In [1]:
# Install spark-related dependencies
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3.tgz
!tar xf spark-3.4.0-bin-hadoop3.tgz

!pip install -q findspark
!pip install pyspark
# Set up required environment variables

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.0-bin-hadoop3"

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=df6e7868de64cbadeaf4a0c5971745ff68891c620862908872c268f0ba19c699
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [2]:
# Point Colaboratory to your Google Drive

from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
# Download datasets directly to your Google Drive "Colab Datasets" folder запускается один раз данный код
import requests

# 2007 data

file_url = "https://dataverse.harvard.edu/api/access/datafile/:persistentId?persistentId=doi:10.7910/DVN/HG7NV7/2BHLWK"

r = requests.get(file_url, stream = True) 

with open("/content/gdrive/My Drive/Colab Datasets/2007.csv.bz2", "wb") as file: 
	for block in r.iter_content(chunk_size = 1024): 
		if block: 
			file.write(block)

# 2008 data

file_url = "https://dataverse.harvard.edu/api/access/datafile/:persistentId?persistentId=doi:10.7910/DVN/HG7NV7/EIR0RA"

r = requests.get(file_url, stream = True) 

with open("/content/gdrive/My Drive/Colab Datasets/2008.csv.bz2", "wb") as file: 
	for block in r.iter_content(chunk_size = 1024): 
		if block: 
			file.write(block)

In [3]:
# Инструменты нужные для подключения к Spark, загрузка data,
# Очитска и подготовка
from pyspark import SparkContext
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from pyspark.sql.functions import isnull, isnan, when, count, col
# Настройка констант
CSV_2007= "/content/gdrive/My Drive/Colab Datasets/2007.csv.bz2" 
CSV_2008= "/content/gdrive/My Drive/Colab Datasets/2008.csv.bz2"
APP_NAME = "Gradient-boosted-tree"
SPARK_URL = "local[*]"
RANDOM_SEED = 141389
TRAINING_DATA_RATIO = 0.7
RF_NUM_TREES = 8
RF_MAX_DEPTH = 4
RF_NUM_BINS = 32



In [4]:
# Connect to the Spark server
spark = SparkSession.builder.appName(APP_NAME).master(SPARK_URL).getOrCreate()

# Load datasets
df_2007 = spark.read.options(header="true",inferschema = "true").csv(CSV_2007)
df_2008 = spark.read.options(header="true",inferschema = "true").csv(CSV_2008)

# We concatenate both datasets обьеденение с помошью метода PySpark’s .unionAll
df = df_2007.unionAll(df_2008)
df.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- Car

In [8]:
# просмотр размера кдра
print(f"Размер кадра {df.count():d} строк на {len(df.columns):d} столбцов.")

Размер кадра 9799958 строк на 28 столбцов.


In [6]:
# Проверка на нулевые значения
null_counts = df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c)for c in df.columns]).toPandas().to_dict(orient='records')
print(f"Количество пустых значение в наборе данных {sum(null_counts[0].values()):d} .")

Количество пустых значение в наборе данных 9659715 .


In [7]:
# удаление столба и доп ячейку содержащее нулевое значение
df = df.drop(df.CancellationCode)
df = df.na.drop()

In [9]:
# типы столбоцов наших наборов данных
df.dtypes

[('Year', 'int'),
 ('Month', 'int'),
 ('DayofMonth', 'int'),
 ('DayOfWeek', 'int'),
 ('DepTime', 'string'),
 ('CRSDepTime', 'int'),
 ('ArrTime', 'string'),
 ('CRSArrTime', 'int'),
 ('UniqueCarrier', 'string'),
 ('FlightNum', 'int'),
 ('TailNum', 'string'),
 ('ActualElapsedTime', 'string'),
 ('CRSElapsedTime', 'string'),
 ('AirTime', 'string'),
 ('ArrDelay', 'string'),
 ('DepDelay', 'string'),
 ('Origin', 'string'),
 ('Dest', 'string'),
 ('Distance', 'int'),
 ('TaxiIn', 'string'),
 ('TaxiOut', 'string'),
 ('Cancelled', 'int'),
 ('Diverted', 'int'),
 ('CarrierDelay', 'string'),
 ('WeatherDelay', 'string'),
 ('NASDelay', 'string'),
 ('SecurityDelay', 'string'),
 ('LateAircraftDelay', 'string')]

In [10]:
# cancelled имеет две уникальные переменные 1 - да рейс отменен, 0 - нет рейс не отменен
df.select('Cancelled').distinct().rdd.map(lambda r: r[0]).collect()

[1, 0]

In [11]:
# для запуска модели оставим только столбцы с типом int
feature_cols = ['Year', 'Month', 'DayofMonth', 'DayOfWeek', 'CRSDepTime', 'CRSArrTime', 'FlightNum', 'Distance', 'Diverted']
# вектор и добваленный столбец который будет подтвергатся меотду transform
df = VectorAssembler(inputCols=feature_cols, outputCol="features").transform(df)

In [12]:
# столбцы которые будут использованы для обучения и тестирования модели
df.select("Cancelled", "features").show(5)

+---------+--------------------+
|Cancelled|            features|
+---------+--------------------+
|        0|[2007.0,1.0,1.0,1...|
|        0|[2007.0,1.0,1.0,1...|
|        0|[2007.0,1.0,1.0,1...|
|        0|[2007.0,1.0,1.0,1...|
|        0|[2007.0,1.0,1.0,1...|
+---------+--------------------+
only showing top 5 rows



In [15]:

df.show()

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+--------+------------+------------+--------+-------------+-----------------+--------------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|            features|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+--------+------------+------------+--------+-------------+-----------------+--------------------+
|2007|    1|         1|        1|   1232|      1225|   1341|      1340|           

In [20]:

# Split the data into training and tests sets
(trainingData, testData) = df.randomSplit([TRAINING_DATA_RATIO, 1 - TRAINING_DATA_RATIO])

# Train GBTClassifier model
#rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=RF_NUM_TREES)

rf = GBTClassifier(labelCol = 'Cancelled', featuresCol = 'features')


In [21]:
# Train model
model = rf.fit(trainingData)
# Make predictions
predictions = model.transform(testData)


In [23]:
evaluator = MulticlassClassificationEvaluator(
    labelCol="Cancelled", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Ошибка тестирования = {(1.0 - accuracy):g}")
print(f"Точность = {accuracy:g}")


Ошибка тестирования = 0.0185963
Точность = 0.981404
