In [None]:
# Install spark-related dependencies
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-3.0.3/spark-3.0.3-bin-hadoop3.2.tgz
!tar xf spark-3.0.3-bin-hadoop3.2.tgz

!pip install -q findspark
!pip install pyspark==3.0.3
# Set up required environment variables

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop3.2"

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark==3.0.3
  Downloading pyspark-3.0.3.tar.gz (209.1 MB)
[K     |████████████████████████████████| 209.1 MB 65 kB/s 
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 45.6 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.3-py2.py3-none-any.whl size=209435971 sha256=cbe16f7412b6691bbb7b5b1aa7ced49fc4b0a1d5942cb05d016d98e81b02ec7a
  Stored in directory: /root/.cache/pip/wheels/7e/6d/0a/6b0bf301bc056d9af03194b732b9f49ad2fceb205aab2984fd
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.3


In [None]:
# Point Colaboratory to your Google Drive.
# The drive is mounted at /content/gdrive; the dataset path used by this
# notebook lives under that mount point.

from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
# Tools we need to connect to the Spark server, load our data,
# clean it and prepare it
from pyspark import SparkContext
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from pyspark.sql.functions import isnan, when, count, col
# ---- Notebook configuration ----

# Spark session
APP_NAME = "Flight Delays"
SPARK_URL = "local[*]"  # local mode, one worker thread per available core

# Input CSV, read from the Google Drive mount under /content/gdrive
CSV_combined = "/content/gdrive/My Drive/Colab Datasets/combined.csv"

# Train/test split
TRAINING_DATA_RATIO = 0.7
RANDOM_SEED = 141109

# Random-forest hyperparameters
RF_NUM_TREES = 8
RF_MAX_DEPTH = 4
RF_NUM_BINS = 32

In [None]:

# Start a Spark session, or reuse the one that is already running
spark = (
    SparkSession.builder
    .master(SPARK_URL)
    .appName(APP_NAME)
    .getOrCreate()
)

# Read the CSV: the first line supplies the column names and the column types
# are inferred from the data
df_combined = spark.read.csv(CSV_combined, header=True, inferSchema=True)

# Working DataFrame for the cells below. This is a plain alias of df_combined;
# nothing is concatenated here.
df = df_combined

In [None]:
# Report the size of the loaded DataFrame
n_rows = df.count()
n_cols = len(df.columns)
print(f"The shape is {n_rows:d} rows by {n_cols:d} columns.")

The shape is 22000 rows by 30 columns.


In [None]:
# One aggregate per column: how many rows hold NaN or null in that column
missing_per_column = [
    count(when(isnan(name) | col(name).isNull(), name)).alias(name)
    for name in df.columns
]

# The aggregation yields a single row; to_dict turns it into a one-element
# list of {column name: missing count}
null_counts = df.select(missing_per_column).toPandas().to_dict(orient='records')

total_missing = sum(null_counts[0].values())
print(f"We have {total_missing:d} null values in this dataset.")

We have 138804 null values in this dataset.


In [None]:
# Remove the CancellationCode column before filtering out incomplete rows.
# It is dropped by name rather than through the df.CancellationCode attribute
# so the cell can be re-run: drop() ignores a name that is no longer in the
# schema, whereas the attribute lookup raises AttributeError on a second run.
df = df.drop("CancellationCode")

# Discard every row that still has a null in any remaining column.
# NOTE(review): this filters on all columns, not only the ones later used as
# features -- confirm that this is intended.
df = df.na.drop()

In [None]:
# Display the (column name, type) pairs of the cleaned DataFrame
df.dtypes


[('_c0', 'int'),
 ('Year', 'int'),
 ('Month', 'int'),
 ('DayofMonth', 'int'),
 ('DayOfWeek', 'int'),
 ('DepTime', 'double'),
 ('CRSDepTime', 'int'),
 ('ArrTime', 'double'),
 ('CRSArrTime', 'int'),
 ('UniqueCarrier', 'string'),
 ('FlightNum', 'int'),
 ('TailNum', 'string'),
 ('ActualElapsedTime', 'double'),
 ('CRSElapsedTime', 'double'),
 ('AirTime', 'double'),
 ('ArrDelay', 'double'),
 ('DepDelay', 'double'),
 ('Origin', 'string'),
 ('Dest', 'string'),
 ('Distance', 'double'),
 ('TaxiIn', 'double'),
 ('TaxiOut', 'double'),
 ('Cancelled', 'int'),
 ('Diverted', 'int'),
 ('CarrierDelay', 'double'),
 ('WeatherDelay', 'double'),
 ('NASDelay', 'double'),
 ('SecurityDelay', 'double'),
 ('LateAircraftDelay', 'double')]

In [None]:
# List the distinct values of the label column
[row[0] for row in df.select('Cancelled').distinct().collect()]


[1, 0]

In [None]:
# Columns fed to the model as predictors.
# '_c0' is deliberately left out: it is the unnamed first column of the CSV
# (a running row number), so it identifies a row rather than describing the
# flight and must not be used as a feature.
feature_cols = ['Year', 'Month', 'DayofMonth',
                'DayOfWeek', 'CRSDepTime', 'CRSArrTime',
                'FlightNum', 'Diverted']

In [None]:
# Pack the feature columns into a single vector column named "features".
# A "features" column left over from a previous run of this cell is dropped
# first (drop() is a no-op when the column is absent); without that,
# VectorAssembler rejects the already-existing output column and re-running
# the cell fails.
df = VectorAssembler(inputCols=feature_cols, outputCol="features").transform(df.drop("features"))

In [None]:
# Preview the label next to the assembled feature vector (first 5 rows)
label_and_features = df.select("Cancelled", "features")
label_and_features.show(n=5)


+---------+--------------------+
|Cancelled|            features|
+---------+--------------------+
|        0|[16003.0,2003.0,1...|
|        0|[16004.0,2003.0,1...|
|        0|[16005.0,2003.0,6...|
|        0|[16008.0,2003.0,7...|
|        0|[16009.0,2003.0,1...|
+---------+--------------------+
only showing top 5 rows



In [None]:
# Generate a labelIndexer: maps the values of "Cancelled" to label indices in
# the "indexedLabel" column
labelIndexer = StringIndexer(inputCol="Cancelled", outputCol="indexedLabel").fit(df)

# Generate the indexed feature vector: features with at most 4 distinct values
# are treated as categorical, all others as continuous
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(df)

# NOTE(review): both indexers above are fitted on the full DataFrame, i.e.
# before the train/test split below -- confirm this is acceptable.

# Split the data into training and test sets. Passing the seed makes the split
# repeatable for the same input instead of changing on every run.
(trainingData, testData) = df.randomSplit(
    [TRAINING_DATA_RATIO, 1 - TRAINING_DATA_RATIO], seed=RANDOM_SEED)

# Configure the RandomForest model. The depth and bin limits come from the
# notebook constants (previously defined but never passed, so the library
# defaults were used), and the seed fixes the forest's random sampling.
rf = RandomForestClassifier(
    labelCol="indexedLabel",
    featuresCol="indexedFeatures",
    numTrees=RF_NUM_TREES,
    maxDepth=RF_MAX_DEPTH,
    maxBins=RF_NUM_BINS,
    seed=RANDOM_SEED,
)

# Chain indexers and the forest model in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf])

In [None]:
# Train model: run the pipeline (label indexer, feature indexer, random
# forest) on the training split
model = pipeline.fit(trainingData)
# Make predictions on the held-out test split; the result is scored by the
# evaluator through its "indexedLabel" and "prediction" columns
predictions = model.transform(testData)

In [None]:
# Score the test-set predictions: accuracy compares the "prediction" column
# against the indexed true label
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="indexedLabel",
    predictionCol="prediction",
)
accuracy = evaluator.evaluate(predictions)
test_error = 1.0 - accuracy

print(f"Test Error = {test_error:g}")
print(f"Accuracy = {accuracy:g}")

Test Error = 0.00257566
Accuracy = 0.997424
