# Pipeline
## Platform: Spark, colab.research.google.com

In [0]:
# Colab preinstalled packages
import pandas as pd
import time
from sklearn.metrics import accuracy_score, confusion_matrix, f1_score, recall_score, precision_score

In [0]:
# install Spark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
!tar xf spark-2.3.1-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
# init Spark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.1-bin-hadoop2.7"
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler

In [6]:
from google.colab import drive
drive.mount('/content/gdrive', force_remount=True)

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3Aietf%3Awg%3Aoauth%3A2.0%3Aoob&scope=email%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdocs.test%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive.photos.readonly%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fpeopleapi.readonly&response_type=code

Enter your authorization code:
··········
Mounted at /content/gdrive


In [0]:
flights_dfs = spark.read.csv("/content/gdrive/My Drive/Colab Notebooks/SparkAzureTutorial/data/flights.csv", header=True, inferSchema=True)

In [8]:
flights_dfs.describe().show(5, False)

+-------+------------------+-----------------+-------+------------------+------------------+------------------+------------------+
|summary|DayofMonth        |DayOfWeek        |Carrier|OriginAirportID   |DestAirportID     |DepDelay          |ArrDelay          |
+-------+------------------+-----------------+-------+------------------+------------------+------------------+------------------+
|count  |2702218           |2702218          |2702218|2702218           |2702218           |2702218           |2702218           |
|mean   |15.797897875004903|3.899480352806472|null   |12742.597593162358|12743.000197985506|10.510732294729737|6.6550108096386005|
|stddev |8.7988350691642   |1.985924603367557|null   |1501.8408475102513|1501.8014309297723|36.02975608466093 |38.547584236791245|
|min    |1                 |1                |9E     |10140             |10140             |-63               |-94               |
|max    |31                |7                |YV     |15376             |15376     

In [9]:
data = flights_dfs.select("DayofMonth", "DayOfWeek", "Carrier", "OriginAirportID", "DestAirportID", "DepDelay", ((col("ArrDelay") > 15).cast("Double").alias("label")))
data.show(5)

+----------+---------+-------+---------------+-------------+--------+-----+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|label|
+----------+---------+-------+---------------+-------------+--------+-----+
|        19|        5|     DL|          11433|        13303|      -3|  0.0|
|        19|        5|     DL|          14869|        12478|       0|  0.0|
|        19|        5|     DL|          14057|        14869|      -4|  0.0|
|        19|        5|     DL|          15016|        11433|      28|  1.0|
|        19|        5|     DL|          11193|        12892|      -6|  0.0|
+----------+---------+-------+---------------+-------------+--------+-----+
only showing top 5 rows



In [10]:
splits = data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1].withColumnRenamed("label", "trueLabel")
test.show(5, truncate=False)
print("Train len: {}, test len: {}".format(train.count(), test.count()))

+----------+---------+-------+---------------+-------------+--------+---------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|trueLabel|
+----------+---------+-------+---------------+-------------+--------+---------+
|1         |1        |9E     |10397          |12191        |-3      |0.0      |
|1         |1        |9E     |10397          |12264        |-5      |0.0      |
|1         |1        |9E     |10423          |11433        |-5      |0.0      |
|1         |1        |9E     |10423          |11433        |-3      |0.0      |
|1         |1        |9E     |10423          |11433        |14      |1.0      |
+----------+---------+-------+---------------+-------------+--------+---------+
only showing top 5 rows

Train len: 1891656, test len: 810562


In [11]:
# categorical features
strIdx = StringIndexer(inputCol = "Carrier", outputCol = "CarrierIdx")
catVect = VectorAssembler(inputCols = ["CarrierIdx", "DayofMonth", "DayOfWeek", "OriginAirportID", "DestAirportID"], outputCol="catFeatures")
catIdx = VectorIndexer(inputCol = catVect.getOutputCol(), outputCol = "idxCatFeatures")
# numerical features
numVect = VectorAssembler(inputCols = ["DepDelay"], outputCol="numFeatures")
minMax = MinMaxScaler(inputCol = numVect.getOutputCol(), outputCol="normFeatures")
# combine with classifier
featVect = VectorAssembler(inputCols=["idxCatFeatures", "normFeatures"], outputCol="features")
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")
pipeline = Pipeline(stages=[strIdx, catVect, catIdx, numVect, minMax, featVect, dt])

print("Training model...")
start_time = time.time()
pipelineModel = pipeline.fit(train)
print("Evaluating model...")
prediction = pipelineModel.transform(test)
print("--- {} seconds ---".format(time.time() - start_time))

predicted = prediction.select("features", "trueLabel", "prediction")
predicted.show(5, truncate=False)

Training model...
Evaluating model...
--- 119.75610280036926 seconds ---
+---------------------------------------------------+---------+----------+
|features                                           |trueLabel|prediction|
+---------------------------------------------------+---------+----------+
|[10.0,1.0,0.0,10397.0,12191.0,0.03115264797507788] |0.0      |0.0       |
|[10.0,1.0,0.0,10397.0,12264.0,0.030114226375908618]|0.0      |0.0       |
|[10.0,1.0,0.0,10423.0,11433.0,0.030114226375908618]|0.0      |0.0       |
|[10.0,1.0,0.0,10423.0,11433.0,0.03115264797507788] |0.0      |0.0       |
|[10.0,1.0,0.0,10423.0,11433.0,0.03997923156801662] |1.0      |0.0       |
+---------------------------------------------------+---------+----------+
only showing top 5 rows



In [12]:
predicted_spark = predicted.toPandas()
y_test = predicted_spark["trueLabel"]
y_pred = predicted_spark["prediction"]
accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred)
recall = recall_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)
print("Accuracy: {}".format(accuracy))
print("Precision: {}".format(precision))
print("Recall: {}".format(recall))
print("F1: {}".format(f1))
# check
cm = confusion_matrix(y_test, y_pred)
tn, fn, fp, tp = cm[0][0], cm[1][0], cm[0][1], cm[1][1]
assert precision == tp/(tp + fp)
assert recall == tp/(tp + fn)

Accuracy: 0.9264177200510263
Precision: 0.8844429398931769
Recall: 0.7264026790072351
F1: 0.7976701347780216


## Platform: Pandas, scikit-learn, colab.research.google.com

In [0]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelBinarizer, StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.tree import DecisionTreeClassifier

In [14]:
flights_df = pd.read_csv("/content/gdrive/My Drive/Colab Notebooks/SparkAzureTutorial/data/flights.csv", header=0)
flights_df.describe()

Unnamed: 0,DayofMonth,DayOfWeek,OriginAirportID,DestAirportID,DepDelay,ArrDelay
count,2702218.0,2702218.0,2702218.0,2702218.0,2702218.0,2702218.0
mean,15.7979,3.89948,12742.6,12743.0,10.51073,6.655011
std,8.798835,1.985925,1501.841,1501.801,36.02976,38.54758
min,1.0,1.0,10140.0,10140.0,-63.0,-94.0
25%,8.0,2.0,11292.0,11292.0,-4.0,-11.0
50%,16.0,4.0,12892.0,12892.0,-1.0,-3.0
75%,23.0,6.0,14057.0,14057.0,9.0,10.0
max,31.0,7.0,15376.0,15376.0,1863.0,1845.0


In [15]:
# create labels
flights_df["late"] = 0
flights_df.loc[flights_df["ArrDelay"] > 15,"late"] = 1
flights_df.head(5)

Unnamed: 0,DayofMonth,DayOfWeek,Carrier,OriginAirportID,DestAirportID,DepDelay,ArrDelay,late
0,19,5,DL,11433,13303,-3,1,0
1,19,5,DL,14869,12478,0,-8,0
2,19,5,DL,14057,14869,-4,-15,0
3,19,5,DL,15016,11433,28,24,1
4,19,5,DL,11193,12892,-6,-11,0


In [0]:
X = flights_df.loc[:, ["DayofMonth", "DayOfWeek", "Carrier", "OriginAirportID", "DestAirportID", "DepDelay"]]
y = flights_df.loc[:, "late"]
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, random_state=42)

In [0]:
class EncodeCategoricalCol():
    def __init__(self, col_name, classes=None):
        self.col_name = col_name
        self.classes = classes
    
    def fit(self, X, y=None):
        self.lb = LabelBinarizer()
        if self.classes:
            self.lb.classes_ = self.classes
        else:
            self.lb.fit(X.loc[:,self.col_name])
        return self

    def transform(self, X):
        vals = self.lb.transform(X[self.col_name])
        cols = ["{}_{}".format(self.col_name, x) for x in self.lb.classes_]
        new_categories = pd.DataFrame(vals, columns=cols, index=X.index)
        X_enc = pd.concat([X, new_categories], axis=1)
        X_enc = X_enc.drop(columns=self.col_name, axis=1)
        return X_enc

In [0]:
class EncodeNumericalCol():
    def __init__(self, col_name):
        self.col_name = col_name
    
    def fit(self, X, y=None):
        self.ss = StandardScaler()
        self.ss.fit(X.loc[:, self.col_name].values.reshape(-1, 1))
        return self

    def transform(self, X):
        X[self.col_name] = self.ss.transform(X[self.col_name].values.reshape(-1, 1))
        return X

In [0]:
enc_day_of_week = EncodeCategoricalCol(col_name="DayOfWeek", classes=list(range(1,8)))
enc_day_of_month = EncodeCategoricalCol(col_name="DayofMonth", classes=list(range(1,32)))
enc_carrier = EncodeCategoricalCol(col_name="Carrier")
enc_dep_delay = EncodeNumericalCol(col_name="DepDelay")
model = DecisionTreeClassifier(max_depth=5, random_state=42)
pipeline = Pipeline([("enc_day_of_week", enc_day_of_week),
                     ("enc_day_of_month", enc_day_of_month),
                     ("enc_carrier", enc_carrier),
                     ("enc_dep_delay", enc_dep_delay),
                     ("model", model)])

In [20]:
start_time = time.time()
print("Training model...")
pipeline.fit(X_train, y_train)
print("Evaluating model...")
y_pred = pipeline.predict(X_test)
print("--- {} seconds ---".format(time.time() - start_time))

accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred)
recall = recall_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)
print("Accuracy: {}".format(accuracy))
print("Precision: {}".format(precision))
print("Recall: {}".format(recall))
print("F1: {}".format(f1))
# check
cm = confusion_matrix(y_test, y_pred)
tn, fn, fp, tp = cm[0][0], cm[1][0], cm[0][1], cm[1][1]
assert precision == tp/(tp + fp)
assert recall == tp/(tp + fn)

Training model...




Evaluating model...




--- 39.889575242996216 seconds ---
Accuracy: 0.926816962842897
Precision: 0.893370849573714
Recall: 0.7188180423594616
F1: 0.7966449693392428


### Reusing Spark split data to compare metrics

In [0]:
train = train.toPandas()
test = test.toPandas()

In [0]:
y_train = train["label"]
X_train = train.drop("label", axis=1)
y_test = test["trueLabel"]
X_test = test.drop("trueLabel", axis=1)

In [23]:
start_time = time.time()
print("Training model...")
pipeline.fit(X_train, y_train)
print("Evaluating model...")
print("--- {} seconds ---".format(time.time() - start_time))

y_pred = pipeline.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred)
recall = recall_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)
print("Accuracy: {}".format(accuracy))
print("Precision: {}".format(precision))
print("Recall: {}".format(recall))
print("F1: {}".format(f1))
# check
cm = confusion_matrix(y_test, y_pred)
tn, fn, fp, tp = cm[0][0], cm[1][0], cm[0][1], cm[1][1]
assert precision == tp/(tp + fp)
assert recall == tp/(tp + fn)

Training model...




Evaluating model...
--- 26.6262948513031 seconds ---




Accuracy: 0.926578102600418
Precision: 0.8940077617272923
Recall: 0.7173449654311682
F1: 0.7959920334294304
