In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [3]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [4]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
spark

In [5]:
import sklearn
from sklearn.metrics import classification_report, confusion_matrix

In [6]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

## WARNING, FILE "delay_clean.csv" is > 4 GB  --  added to gitignore
## Using reduced dataset.....

In [7]:
# Load and parse the data file, converting it to a DataFrame
clean = spark.read.format("libsvm").load('/content/drive/MyDrive/Colab_Notebooks/delay_clean_SVM.txt')
clean.show(5)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(153,[0,1,2,3,4,5...|
|  0.0|(153,[0,1,2,3,4,5...|
|  0.0|(153,[0,1,2,3,4,5...|
|  0.0|(153,[0,1,2,3,4,5...|
|  0.0|(153,[0,1,2,3,4,5...|
+-----+--------------------+
only showing top 5 rows



In [8]:
clean.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)



In [9]:
# Number of rows in dataset
number_rows = clean.count()
number_rows

6489057

In [10]:
clean.groupBy('label').count().show()

+-----+-------+
|label|  count|
+-----+-------+
|  0.0|5261690|
|  1.0|1227367|
+-----+-------+



In [11]:
# Index labels, adding metadata to the label column
# Fit on whole dataset to include all labels in index
labelIndexer = StringIndexer(inputCol = "label", outputCol = "indexedLabel").fit(clean)

In [12]:
# Automatically identify categorical features, and index them
# Set maxCategories so features with > 4 distinct values are treated as continuous
featureIndexer = VectorIndexer(inputCol = "features", outputCol = "indexedFeatures", maxCategories = 4).fit(clean)

In [13]:
from pyspark.ml.feature import Normalizer

In [14]:
normalizer = Normalizer(inputCol = "features", outputCol = "normFeatures", p = 1.0)
NormOutput = normalizer.transform(clean)

In [15]:
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = clean.randomSplit([0.7, 0.3])

In [16]:
trainingData.printSchema

<bound method DataFrame.printSchema of +-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(153,[0,1,2,3,4,5...|
|  0.0|(153,[0,1,2,3,4,5...|
|  0.0|(153,[0,1,2,3,4,5...|
|  0.0|(153,[0,1,2,3,4,5...|
|  0.0|(153,[0,1,2,3,4,5...|
|  0.0|(153,[0,1,2,3,4,5...|
|  0.0|(153,[0,1,2,3,4,5...|
|  0.0|(153,[0,1,2,3,4,5...|
|  0.0|(153,[0,1,2,3,4,5...|
|  0.0|(153,[0,1,2,3,4,5...|
|  0.0|(153,[0,1,2,3,4,5...|
|  0.0|(153,[0,1,2,3,4,5...|
|  0.0|(153,[0,1,2,3,4,5...|
|  0.0|(153,[0,1,2,3,4,5...|
|  0.0|(153,[0,1,2,3,4,5...|
|  0.0|(153,[0,1,2,3,4,5...|
|  0.0|(153,[0,1,2,3,4,5...|
|  0.0|(153,[0,1,2,3,4,5...|
|  0.0|(153,[0,1,2,3,4,5...|
|  0.0|(153,[0,1,2,3,4,5...|
+-----+--------------------+
only showing top 20 rows
>

In [17]:
testData.show(10)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(153,[0,1,2,3,4,5...|
|  0.0|(153,[0,1,2,3,4,5...|
|  0.0|(153,[0,1,2,3,4,5...|
|  0.0|(153,[0,1,2,3,4,5...|
|  0.0|(153,[0,1,2,3,4,5...|
|  0.0|(153,[0,1,2,3,4,5...|
|  0.0|(153,[0,1,2,3,4,5...|
|  0.0|(153,[0,1,2,3,4,5...|
|  0.0|(153,[0,1,2,3,4,5...|
|  0.0|(153,[0,1,2,3,4,5...|
+-----+--------------------+
only showing top 10 rows



# Gradient-boosted tree classifier (GBT)

In [18]:
# Train a GBT model
gbt = GBTClassifier(labelCol = "indexedLabel", featuresCol = "indexedFeatures", maxIter = 10)

In [19]:
# Chain indexers and GBT in a Pipeline
pipeline = Pipeline(stages = [labelIndexer, featureIndexer, gbt])

In [20]:
# Train model.  This also runs the indexers
model = pipeline.fit(trainingData)

In [21]:
# Make predictions
predictions = model.transform(testData)

In [22]:
# Select example rows to display
predictions.select("prediction", "indexedLabel", "features").show(5)

+----------+------------+--------------------+
|prediction|indexedLabel|            features|
+----------+------------+--------------------+
|       0.0|         0.0|(153,[0,1,2,3,4,5...|
|       0.0|         0.0|(153,[0,1,2,3,4,5...|
|       0.0|         0.0|(153,[0,1,2,3,4,5...|
|       0.0|         0.0|(153,[0,1,2,3,4,5...|
|       0.0|         0.0|(153,[0,1,2,3,4,5...|
+----------+------------+--------------------+
only showing top 5 rows



In [23]:
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol = "indexedLabel", predictionCol = "prediction", metricName = "accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % accuracy)
print("Test Error = %g" % (1.0 - accuracy))

Accuracy = 0.813148
Test Error = 0.186852


In [24]:
gbtModel = model.stages[2]
print(gbtModel)  # summary only

GBTClassificationModel: uid = GBTClassifier_a65f4f7ffe9d, numTrees=10, numClasses=2, numFeatures=153


In [25]:
y_true = predictions.select(['indexedLabel']).collect()
y_pred = predictions.select(['prediction']).collect()

In [26]:
print(confusion_matrix(y_true, y_pred))

[[1573624    4255]
 [ 359408    8971]]


In [27]:
print(classification_report(y_true, y_pred))

              precision    recall  f1-score   support

         0.0       0.81      1.00      0.90   1577879
         1.0       0.68      0.02      0.05    368379

    accuracy                           0.81   1946258
   macro avg       0.75      0.51      0.47   1946258
weighted avg       0.79      0.81      0.74   1946258



In [28]:
gbt.save("gbt_model.model")

# Random forest classifier (RFC)

In [29]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString

In [30]:
# Train a RandomForest model
rf = RandomForestClassifier(labelCol = "indexedLabel", featuresCol = "indexedFeatures", numTrees = 10)

In [31]:
# Convert indexed labels back to original labels
labelConverter = IndexToString(inputCol = "prediction", outputCol = "predictedLabel",
                               labels = labelIndexer.labels)

In [32]:
# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages = [labelIndexer, featureIndexer, rf, labelConverter])

In [33]:
# Train model.  This also runs the indexers
model = pipeline.fit(trainingData)

In [34]:
# Make predictions
predictions1 = model.transform(testData)

In [35]:
# Select example rows to display
predictions1.select("predictedLabel", "label", "features").show(5)

+--------------+-----+--------------------+
|predictedLabel|label|            features|
+--------------+-----+--------------------+
|           0.0|  0.0|(153,[0,1,2,3,4,5...|
|           0.0|  0.0|(153,[0,1,2,3,4,5...|
|           0.0|  0.0|(153,[0,1,2,3,4,5...|
|           0.0|  0.0|(153,[0,1,2,3,4,5...|
|           0.0|  0.0|(153,[0,1,2,3,4,5...|
+--------------+-----+--------------------+
only showing top 5 rows



In [36]:
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol = "indexedLabel", predictionCol = "prediction", metricName = "accuracy")
accuracy = evaluator.evaluate(predictions1)
print("Accuracy = %g" % accuracy)
print("Test Error = %g" % (1.0 - accuracy))

Accuracy = 0.810724
Test Error = 0.189276


In [37]:
rfModel = model.stages[2]
print(rfModel)  # summary only

RandomForestClassificationModel: uid=RandomForestClassifier_3153acdd891a, numTrees=10, numClasses=2, numFeatures=153


In [38]:
print(rfModel.featureImportances)

(153,[0,2,3,4,5,6,7,8,10,12,14,15,16,17,18,19,20,21,22,24,26,30,32,33,34,35,36,37,38,41,43,45,47,48,66,67,77,82,84,93,95,108,116,122,142,151,152],[0.00043194861127163236,0.024833355027955414,0.15887225682062045,0.01612470647769076,0.0019950141276241697,0.09328693229628032,0.015085566452410826,0.002170013074534715,0.0016202464872729914,0.002217275425233632,0.1301040455744316,0.04752553344321054,0.0002889401420130394,0.0005780426023091405,0.0023235159028454433,0.0007273110665121879,0.060784159910128234,0.09459289655552312,0.05812925429478211,0.022651589751698505,0.0021512710505628124,0.0033208124496303627,0.020187928954084746,0.0022047002245754155,0.0010847080297592346,0.08147137636985742,0.00043377565868896195,0.04306251239324124,0.033490376065235986,0.0003119599222720562,0.005109480344328582,0.05336123739621627,0.00029396639885988793,4.528991814906351e-05,0.010366168063646953,0.00012426726982568093,4.9058635122806144e-05,2.524156883532959e-05,5.746656447374331e-05,0.0004047263726644071

In [39]:
y_true = predictions1.select(['indexedLabel']).collect()
y_pred = predictions1.select(['prediction']).collect()

In [40]:
print(confusion_matrix(y_true, y_pred))

[[1577879       0]
 [ 368379       0]]


In [41]:
print(classification_report(y_true, y_pred))

  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


              precision    recall  f1-score   support

         0.0       0.81      1.00      0.90   1577879
         1.0       0.00      0.00      0.00    368379

    accuracy                           0.81   1946258
   macro avg       0.41      0.50      0.45   1946258
weighted avg       0.66      0.81      0.73   1946258



  _warn_prf(average, modifier, msg_start, len(result))


# Factorization machines classifier

In [42]:
from pyspark.ml.classification import FMClassifier
from pyspark.ml.feature import MinMaxScaler

In [43]:
# Index labels, adding metadata to the label column
# Fit on whole dataset to include all labels in index
labelIndexer = StringIndexer(inputCol = "label", outputCol = "indexedLabel").fit(clean)

# Scale features
featureScaler = MinMaxScaler(inputCol = "features", outputCol = "scaledFeatures").fit(clean) 

In [44]:
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = clean.randomSplit([0.7, 0.3])

In [45]:
# Train a FM model
fm = FMClassifier(labelCol = "indexedLabel", featuresCol = "scaledFeatures", stepSize = 0.001)

In [46]:
# Create a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureScaler, fm])

In [48]:
# Train model
model = pipeline.fit(trainingData)

In [49]:
# Make predictions
predictions2 = model.transform(testData)

In [50]:
# Select example rows to display
predictions2.select("prediction", "indexedLabel", "features").show(5)

+----------+------------+--------------------+
|prediction|indexedLabel|            features|
+----------+------------+--------------------+
|       0.0|         0.0|(153,[0,1,2,3,4,5...|
|       0.0|         0.0|(153,[0,1,2,3,4,5...|
|       0.0|         0.0|(153,[0,1,2,3,4,5...|
|       0.0|         0.0|(153,[0,1,2,3,4,5...|
|       0.0|         0.0|(153,[0,1,2,3,4,5...|
+----------+------------+--------------------+
only showing top 5 rows



In [51]:
# Select (prediction, true label) and compute test accuracy
evaluator = MulticlassClassificationEvaluator(
    labelCol = "indexedLabel", predictionCol = "prediction", metricName = "accuracy")
accuracy = evaluator.evaluate(predictions2)
print("Test set accuracy = %g" % accuracy)
print("Test Error = %g" % (1.0 - accuracy))

Test set accuracy = 0.810811
Test Error = 0.189189


In [52]:
y_true = predictions2.select(['indexedLabel']).collect()
y_pred = predictions2.select(['prediction']).collect()

In [53]:
print(confusion_matrix(y_true, y_pred))

[[1579134       0]
 [ 368464       0]]


In [54]:
print(classification_report(y_true, y_pred))

  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


              precision    recall  f1-score   support

         0.0       0.81      1.00      0.90   1579134
         1.0       0.00      0.00      0.00    368464

    accuracy                           0.81   1947598
   macro avg       0.41      0.50      0.45   1947598
weighted avg       0.66      0.81      0.73   1947598



  _warn_prf(average, modifier, msg_start, len(result))
