In [1]:
import findspark
findspark.init()

In [153]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pyspark.sql.functions as F

from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier, LogisticRegression, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StringIndexer, Tokenizer, VectorAssembler
from pyspark.ml.feature import HashingTF, IDF, StopWordsRemover
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import Bucketizer
from pyspark.ml.regression import LinearRegression
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace, round
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder


In [3]:
spark = SparkSession.builder \
                    .master('local[*]') \
                    .appName('ML_apache_spark') \
                    .getOrCreate()

## Flight data

In [157]:
path = r"C:\Users\se.vi.dmitriev\Downloads\DS Materials\scripts\flights-larger.csv"
df = spark.read.csv(path, header=True, inferSchema=True, nullValue='NA')
print(df.count())
df.printSchema()
df.show(5)

275000
root
 |-- mon: integer (nullable = true)
 |-- dom: integer (nullable = true)
 |-- dow: integer (nullable = true)
 |-- carrier: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- org: string (nullable = true)
 |-- mile: integer (nullable = true)
 |-- depart: double (nullable = true)
 |-- duration: integer (nullable = true)
 |-- delay: integer (nullable = true)

+---+---+---+-------+------+---+----+------+--------+-----+
|mon|dom|dow|carrier|flight|org|mile|depart|duration|delay|
+---+---+---+-------+------+---+----+------+--------+-----+
| 10| 10|  1|     OO|  5836|ORD| 157|  8.18|      51|   27|
|  1|  4|  1|     OO|  5866|ORD| 466|  15.5|     102| null|
| 11| 22|  1|     OO|  6016|ORD| 738|  7.17|     127|  -19|
|  2| 14|  5|     B6|   199|JFK|2248| 21.17|     365|   60|
|  5| 25|  3|     WN|  1675|SJC| 386| 12.92|      85|   22|
+---+---+---+-------+------+---+----+------+--------+-----+
only showing top 5 rows



In [158]:
df = df.drop('flight')
df.show(5)

+---+---+---+-------+---+----+------+--------+-----+
|mon|dom|dow|carrier|org|mile|depart|duration|delay|
+---+---+---+-------+---+----+------+--------+-----+
| 10| 10|  1|     OO|ORD| 157|  8.18|      51|   27|
|  1|  4|  1|     OO|ORD| 466|  15.5|     102| null|
| 11| 22|  1|     OO|ORD| 738|  7.17|     127|  -19|
|  2| 14|  5|     B6|JFK|2248| 21.17|     365|   60|
|  5| 25|  3|     WN|SJC| 386| 12.92|      85|   22|
+---+---+---+-------+---+----+------+--------+-----+
only showing top 5 rows



In [159]:
df = df.filter(df['delay'].isNotNull())
df.show(5)

+---+---+---+-------+---+----+------+--------+-----+
|mon|dom|dow|carrier|org|mile|depart|duration|delay|
+---+---+---+-------+---+----+------+--------+-----+
| 10| 10|  1|     OO|ORD| 157|  8.18|      51|   27|
| 11| 22|  1|     OO|ORD| 738|  7.17|     127|  -19|
|  2| 14|  5|     B6|JFK|2248| 21.17|     365|   60|
|  5| 25|  3|     WN|SJC| 386| 12.92|      85|   22|
|  3| 28|  1|     B6|LGA|1076| 13.33|     182|   70|
+---+---+---+-------+---+----+------+--------+-----+
only showing top 5 rows



In [160]:
df_km = df.withColumn('km', round(df['mile'] * 1.60934, 0)).drop('mile')
df_km.show(5)

+---+---+---+-------+---+------+--------+-----+------+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|
+---+---+---+-------+---+------+--------+-----+------+
| 10| 10|  1|     OO|ORD|  8.18|      51|   27| 253.0|
| 11| 22|  1|     OO|ORD|  7.17|     127|  -19|1188.0|
|  2| 14|  5|     B6|JFK| 21.17|     365|   60|3618.0|
|  5| 25|  3|     WN|SJC| 12.92|      85|   22| 621.0|
|  3| 28|  1|     B6|LGA| 13.33|     182|   70|1732.0|
+---+---+---+-------+---+------+--------+-----+------+
only showing top 5 rows



In [161]:
df_km = df_km.withColumn('label', F.when(df_km['delay'] >= 15, 1).otherwise(0))
df_km.show(5)

+---+---+---+-------+---+------+--------+-----+------+-----+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|
+---+---+---+-------+---+------+--------+-----+------+-----+
| 10| 10|  1|     OO|ORD|  8.18|      51|   27| 253.0|    1|
| 11| 22|  1|     OO|ORD|  7.17|     127|  -19|1188.0|    0|
|  2| 14|  5|     B6|JFK| 21.17|     365|   60|3618.0|    1|
|  5| 25|  3|     WN|SJC| 12.92|      85|   22| 621.0|    1|
|  3| 28|  1|     B6|LGA| 13.33|     182|   70|1732.0|    1|
+---+---+---+-------+---+------+--------+-----+------+-----+
only showing top 5 rows



In [164]:
df_km_train, df_km_test = df_km.randomSplit([0.8, 0.2], seed=123)

org_indexer = StringIndexer(inputCol='org', outputCol='org_idx')
one_hot = OneHotEncoder(inputCols=['org_idx', 'dow'], outputCols=['org_dummy', 'dow_dummy'])
assembler = VectorAssembler(inputCols=['km', 'org_dummy', 'dow_dummy'], outputCol='features')
grad_boost = GBTClassifier(featuresCol='features', labelCol='label')

pipeline = Pipeline(stages=[org_indexer, one_hot, assembler, grad_boost])

grid = ParamGridBuilder().build()

evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='accuracy')

cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=grid,
                    evaluator=evaluator,
                    numFolds=3,
                    seed=123)

cv_model = cv.fit(df_km_train)
predictions = cv_model.transform(df_km_test)
predictions.show(5)


+---+---+---+-------+---+------+--------+-----+------+-----+-------+-------------+-------------+--------------------+--------------------+--------------------+----------+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|org_idx|    org_dummy|    dow_dummy|            features|       rawPrediction|         probability|prediction|
+---+---+---+-------+---+------+--------+-----+------+-----+-------+-------------+-------------+--------------------+--------------------+--------------------+----------+
|  0|  1|  2|     AA|JFK| 14.92|     245|    6|2239.0|    0|    2.0|(7,[2],[1.0])|(6,[2],[1.0])|(14,[0,3,10],[223...|[-0.0917681676909...|[0.45424428643828...|       1.0|
|  0|  1|  2|     AA|LGA|   6.0|     240|   45|2235.0|    1|    3.0|(7,[3],[1.0])|(6,[2],[1.0])|(14,[0,4,10],[223...|[-0.1114909574302...|[0.44448435503825...|       1.0|
|  0|  1|  2|     AA|LGA| 15.33|     190|    4|1765.0|    0|    3.0|(7,[3],[1.0])|(6,[2],[1.0])|(14,[0,4,10],[176...|[-0.1527358629289...|[0.4242

In [165]:
evaluator.evaluate(predictions)

0.5837219243024792

In [166]:
cv_model.bestModel.stages[3].featureImportances

SparseVector(14, {0: 0.3028, 1: 0.0739, 2: 0.0749, 3: 0.1016, 4: 0.1177, 5: 0.0859, 6: 0.026, 7: 0.0127, 8: 0.0429, 9: 0.038, 10: 0.0131, 11: 0.0107, 12: 0.0273, 13: 0.0725})

In [115]:
df_km_train, df_km_test = df_km.randomSplit([0.8, 0.2], seed=123)

# Create Pipeline
org_indexer = StringIndexer(inputCol='org', outputCol='org_idx')
one_hot = OneHotEncoder(inputCols=['org_idx', 'dow'], outputCols=['org_dummy', 'dow_dummy'])
assembler = VectorAssembler(inputCols=['km', 'org_dummy', 'dow_dummy'], outputCol='features')
regression = LinearRegression(featuresCol='features', labelCol='duration')

pipeline = Pipeline(stages=[org_indexer, one_hot, assembler, regression])

grid = ParamGridBuilder() \
            .addGrid(regression.regParam, [0.01, 0.1, 1.0, 10.0]) \
            .addGrid(regression.elasticNetParam, [0.0, 0.5, 1.0]) \
            .build()

evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='duration', metricName='rmse')

cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=grid,
                    evaluator=evaluator,
                    numFolds=3,
                    seed=123)

cv_model = cv.fit(df_km_train)
predictions = cv_model.transform(df_km_test)


In [147]:
cv_model.bestModel.stages[3].explainParam('regParam')

'regParam: regularization parameter (>= 0). (default: 0.0, current: 0.01)'

In [152]:
cv_model.bestModel.stages[3].coefficients

DenseVector([0.0742, 28.2962, 20.24, 52.4901, 46.7533, 15.6219, 17.9826, 17.965, 0.0813, -0.1033, -0.0583, -0.0383, -0.0641, -0.1285])

In [126]:
cv_model.avgMetrics

[11.052413498108365,
 11.052659240793442,
 11.053492405495746,
 11.054343422161995,
 11.085017269097982,
 11.159554307857988,
 11.175207198941633,
 11.508601873691342,
 11.68709254875606,
 14.533931464708136,
 17.010321330716437,
 19.196764340956136]

In [127]:
predictions.show(5)

+---+---+---+-------+---+------+--------+-----+------+-----+-------+-------------+-------------+--------------------+------------------+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|org_idx|    org_dummy|    dow_dummy|            features|        prediction|
+---+---+---+-------+---+------+--------+-----+------+-----+-------+-------------+-------------+--------------------+------------------+
|  0|  1|  2|     AA|JFK| 14.92|     245|    6|2239.0|    0|    2.0|(7,[2],[1.0])|(6,[2],[1.0])|(14,[0,3,10],[223...|234.73261405569363|
|  0|  1|  2|     AA|LGA|   6.0|     240|   45|2235.0|    1|    3.0|(7,[3],[1.0])|(6,[2],[1.0])|(14,[0,4,10],[223...|228.69894491497925|
|  0|  1|  2|     AA|LGA| 15.33|     190|    4|1765.0|    0|    3.0|(7,[3],[1.0])|(6,[2],[1.0])|(14,[0,4,10],[176...|193.81736450835942|
|  0|  1|  2|     AA|LGA| 15.58|     160|   20|1180.0|    1|    3.0|(7,[3],[1.0])|(6,[2],[1.0])|(14,[0,4,10],[118...| 150.4009293213965|
|  0|  1|  2|     AA|LGA| 19.75|     170|

In [128]:
rmse = evaluator.evaluate(predictions)
print(rmse)

10.988867765829333


In [106]:
print(model.stages[3].intercept)

15.986938664677428


In [107]:
df_km.show()

+---+---+---+-------+---+------+--------+-----+------+-----+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|
+---+---+---+-------+---+------+--------+-----+------+-----+
| 10| 10|  1|     OO|ORD|  8.18|      51|   27| 253.0|    1|
| 11| 22|  1|     OO|ORD|  7.17|     127|  -19|1188.0|    0|
|  2| 14|  5|     B6|JFK| 21.17|     365|   60|3618.0|    1|
|  5| 25|  3|     WN|SJC| 12.92|      85|   22| 621.0|    1|
|  3| 28|  1|     B6|LGA| 13.33|     182|   70|1732.0|    1|
|  5| 28|  6|     B6|ORD|  9.58|     130|   47|1191.0|    1|
|  1| 19|  2|     UA|SFO| 12.75|     123|  135|1093.0|    1|
|  8|  5|  5|     US|LGA|  13.0|      71|  -10| 344.0|    0|
|  5| 27|  5|     AA|ORD| 14.42|     195|  -11|1926.0|    0|
|  8| 20|  6|     B6|JFK| 14.67|     198|   20|1902.0|    1|
|  2|  3|  1|     AA|JFK| 15.92|     200|   -9|1754.0|    0|
|  8| 26|  5|     B6|JFK| 20.58|     193|  102|1654.0|    1|
|  4|  9|  5|     AA|ORD|  20.5|     125|   32|1180.0|    1|
|  3|  8|  2|     UA|ORD

In [9]:
df_km = StringIndexer(inputCol='carrier', outputCol='carrier_idx').fit(df_km).transform(df_km)
df_km = StringIndexer(inputCol='org', outputCol='org_idx').fit(df_km).transform(df_km)
df_km.show(5)

+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|carrier_idx|org_idx|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+
| 10| 10|  1|     OO|ORD|  8.18|      51|   27| 253.0|    1|        2.0|    0.0|
| 11| 22|  1|     OO|ORD|  7.17|     127|  -19|1188.0|    0|        2.0|    0.0|
|  2| 14|  5|     B6|JFK| 21.17|     365|   60|3618.0|    1|        4.0|    2.0|
|  5| 25|  3|     WN|SJC| 12.92|      85|   22| 621.0|    1|        3.0|    5.0|
|  3| 28|  1|     B6|LGA| 13.33|     182|   70|1732.0|    1|        4.0|    3.0|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+
only showing top 5 rows



In [53]:
# Create One Hot Encoding for 'org'
df_km = OneHotEncoder(inputCols=['org_idx'], outputCols=['org_dummy'], dropLast=True) \
                    .fit(df_km) \
                    .transform(df_km)
df_km.show(5)

+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+--------------------+-------------+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|carrier_idx|org_idx|            features|    org_dummy|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+--------------------+-------------+
| 10| 10|  1|     OO|ORD|  8.18|      51|   27| 253.0|    1|        2.0|    0.0|[10.0,10.0,1.0,2....|(7,[0],[1.0])|
| 11| 22|  1|     OO|ORD|  7.17|     127|  -19|1188.0|    0|        2.0|    0.0|[11.0,22.0,1.0,2....|(7,[0],[1.0])|
|  2| 14|  5|     B6|JFK| 21.17|     365|   60|3618.0|    1|        4.0|    2.0|[2.0,14.0,5.0,4.0...|(7,[2],[1.0])|
|  5| 25|  3|     WN|SJC| 12.92|      85|   22| 621.0|    1|        3.0|    5.0|[5.0,25.0,3.0,3.0...|(7,[5],[1.0])|
|  3| 28|  1|     B6|LGA| 13.33|     182|   70|1732.0|    1|        4.0|    3.0|[3.0,28.0,1.0,4.0...|(7,[3],[1.0])|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+

In [10]:
df_km = VectorAssembler(
    inputCols=['mon', 'dom', 'dow', 'carrier_idx', 'org_idx', 'km', 'depart', 'duration'],
    outputCol='features'
).transform(df_km)

df_km.select('features', 'delay').show(10)

+--------------------+-----+
|            features|delay|
+--------------------+-----+
|[10.0,10.0,1.0,2....|   27|
|[11.0,22.0,1.0,2....|  -19|
|[2.0,14.0,5.0,4.0...|   60|
|[5.0,25.0,3.0,3.0...|   22|
|[3.0,28.0,1.0,4.0...|   70|
|[5.0,28.0,6.0,4.0...|   47|
|[1.0,19.0,2.0,0.0...|  135|
|[8.0,5.0,5.0,6.0,...|  -10|
|[5.0,27.0,5.0,1.0...|  -11|
|[8.0,20.0,6.0,4.0...|   20|
+--------------------+-----+
only showing top 10 rows



In [13]:
# Divide into train and test sets
df_km_train, df_km_test = df_km.randomSplit([0.8, 0.2], seed=17)
training_ratio = df_km_train.count() / df_km_test.count()
print(training_ratio)

3.9921529213939193


## Decision Tree

In [16]:
# Train the model
decision_tree = DecisionTreeClassifier(featuresCol='features', labelCol='label')
model_tree = decision_tree.fit(df_km_train)
prediction = model_tree.transform(df_km_test)
prediction.select('label', 'probability', 'prediction').show(5, truncate=False)

+-----+----------------------------------------+----------+
|label|probability                             |prediction|
+-----+----------------------------------------+----------+
|1    |[0.5734357848518112,0.4265642151481888] |0.0       |
|1    |[0.5734357848518112,0.4265642151481888] |0.0       |
|0    |[0.37154355176634096,0.628456448233659] |1.0       |
|1    |[0.37154355176634096,0.628456448233659] |1.0       |
|0    |[0.47085201793721976,0.5291479820627802]|1.0       |
+-----+----------------------------------------+----------+
only showing top 5 rows



In [18]:
prediction.show(5)

+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+--------------------+-----------------+--------------------+----------+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|carrier_idx|org_idx|            features|    rawPrediction|         probability|prediction|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+--------------------+-----------------+--------------------+----------+
|  0|  1|  2|     AA|JFK|  6.58|     230|   50|2570.0|    1|        1.0|    2.0|[0.0,1.0,2.0,1.0,...|  [2612.0,1943.0]|[0.57343578485181...|       0.0|
|  0|  1|  2|     AA|LGA|   6.0|     240|   45|2235.0|    1|        1.0|    3.0|[0.0,1.0,2.0,1.0,...|  [2612.0,1943.0]|[0.57343578485181...|       0.0|
|  0|  1|  2|     AA|LGA|  11.5|     195|  -11|1765.0|    0|        1.0|    3.0|[0.0,1.0,2.0,1.0,...|[35822.0,60592.0]|[0.37154355176634...|       1.0|
|  0|  1|  2|     AA|LGA| 20.42|     185|   31|1765.0|    1|        1.0|    3.0|[0.0,1.0

In [17]:
# Calculate the confusion matrix
prediction.groupby('label', 'prediction').count().show()
TP = prediction.filter('label=1 AND label=prediction').count()
TN = prediction.filter('label=0 AND label=prediction').count()
FP = prediction.filter('label=0 AND label<>prediction').count()
FN = prediction.filter('label=1 AND label<>prediction').count()

accuracy = (TP + TN) / (TP + TN + FP + FN)
print(accuracy)

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0| 6856|
|    0|       0.0|13598|
|    1|       1.0|19259|
|    0|       1.0|12026|
+-----+----------+-----+

0.635052861477802


## Logistic Regression

In [20]:
model_logreg = LogisticRegression(featuresCol='features', labelCol='label').fit(df_km_train)
prediction_logreg = model_logreg.transform(df_km_test)
prediction_logreg.select('label', 'probability', 'prediction').show(5, truncate=False)

+-----+----------------------------------------+----------+
|label|probability                             |prediction|
+-----+----------------------------------------+----------+
|1    |[0.5567959243275945,0.44320407567240544]|0.0       |
|1    |[0.5243884719960039,0.4756115280039961] |0.0       |
|0    |[0.4542830437458366,0.5457169562541633] |1.0       |
|1    |[0.3165134296412002,0.6834865703587998] |1.0       |
|0    |[0.5764246565510142,0.42357534344898573]|0.0       |
+-----+----------------------------------------+----------+
only showing top 5 rows



In [21]:
# Calculate the confusion matrix
prediction_logreg.groupby('label', 'prediction').count().show()
TP = prediction_logreg.filter('label=1 AND label=prediction').count()
TN = prediction_logreg.filter('label=0 AND label=prediction').count()
FP = prediction_logreg.filter('label=0 AND label<>prediction').count()
FN = prediction_logreg.filter('label=1 AND label<>prediction').count()

accuracy = (TP + TN) / (TP + TN + FP + FN)
print(f"Accuracy: {accuracy}")

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0| 9455|
|    0|       0.0|14931|
|    1|       1.0|16660|
|    0|       1.0|10693|
+-----+----------+-----+

Accuracy: 0.6105838922282998


## Evaluate Model

In [25]:
precision = TP / (TP + FP)
recall = TP / (TP + FN)
print(f"Precision: {precision:.2f} | Recall: {recall:.2f}")

# Weighted precision
multi_evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', 
                                                    labelCol='label', 
                                                    metricName='weightedPrecision')
weighted_precision = multi_evaluator.evaluate(prediction_logreg)
print(f"Weighted precision: {weighted_precision:.2f}")

# AUC
binary_evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction',
                                                 labelCol='label', 
                                                 metricName='areaUnderROC')
AUC = binary_evaluator.evaluate(prediction_logreg)
print(f"AUC: {AUC:.2f}")

Precision: 0.61 | Recall: 0.64
Weighted precision: 0.61
AUC: 0.65


## Regression Model

In [83]:
df_km.show(5)
df_regr = df_km

+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+--------------------+-------------+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|carrier_idx|org_idx|            features|    org_dummy|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+--------------------+-------------+
| 10| 10|  1|     OO|ORD|  8.18|      51|   27| 253.0|    1|        2.0|    0.0|[10.0,10.0,1.0,2....|(7,[0],[1.0])|
| 11| 22|  1|     OO|ORD|  7.17|     127|  -19|1188.0|    0|        2.0|    0.0|[11.0,22.0,1.0,2....|(7,[0],[1.0])|
|  2| 14|  5|     B6|JFK| 21.17|     365|   60|3618.0|    1|        4.0|    2.0|[2.0,14.0,5.0,4.0...|(7,[2],[1.0])|
|  5| 25|  3|     WN|SJC| 12.92|      85|   22| 621.0|    1|        3.0|    5.0|[5.0,25.0,3.0,3.0...|(7,[5],[1.0])|
|  3| 28|  1|     B6|LGA| 13.33|     182|   70|1732.0|    1|        4.0|    3.0|[3.0,28.0,1.0,4.0...|(7,[3],[1.0])|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+

In [84]:
buckets = Bucketizer(splits=[0.0, 3.0, 6.0, 9.0, 12.0, 15.0, 18.0, 21.0, 24.0],
                     inputCol='depart',
                     outputCol='depart_bucket')
df_regr = buckets.transform(df_regr)
df_regr.show(5)

+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+--------------------+-------------+-------------+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|carrier_idx|org_idx|            features|    org_dummy|depart_bucket|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+--------------------+-------------+-------------+
| 10| 10|  1|     OO|ORD|  8.18|      51|   27| 253.0|    1|        2.0|    0.0|[10.0,10.0,1.0,2....|(7,[0],[1.0])|          2.0|
| 11| 22|  1|     OO|ORD|  7.17|     127|  -19|1188.0|    0|        2.0|    0.0|[11.0,22.0,1.0,2....|(7,[0],[1.0])|          2.0|
|  2| 14|  5|     B6|JFK| 21.17|     365|   60|3618.0|    1|        4.0|    2.0|[2.0,14.0,5.0,4.0...|(7,[2],[1.0])|          7.0|
|  5| 25|  3|     WN|SJC| 12.92|      85|   22| 621.0|    1|        3.0|    5.0|[5.0,25.0,3.0,3.0...|(7,[5],[1.0])|          4.0|
|  3| 28|  1|     B6|LGA| 13.33|     182|   70|1732.0|    1|        4.0|    3.0|[3.0,28.0,

In [85]:
df_regr = OneHotEncoder(inputCol='depart_bucket', outputCol='depart_dummy').fit(df_regr).transform(df_regr)
df_regr.show(5)

+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+--------------------+-------------+-------------+-------------+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|carrier_idx|org_idx|            features|    org_dummy|depart_bucket| depart_dummy|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+--------------------+-------------+-------------+-------------+
| 10| 10|  1|     OO|ORD|  8.18|      51|   27| 253.0|    1|        2.0|    0.0|[10.0,10.0,1.0,2....|(7,[0],[1.0])|          2.0|(7,[2],[1.0])|
| 11| 22|  1|     OO|ORD|  7.17|     127|  -19|1188.0|    0|        2.0|    0.0|[11.0,22.0,1.0,2....|(7,[0],[1.0])|          2.0|(7,[2],[1.0])|
|  2| 14|  5|     B6|JFK| 21.17|     365|   60|3618.0|    1|        4.0|    2.0|[2.0,14.0,5.0,4.0...|(7,[2],[1.0])|          7.0|    (7,[],[])|
|  5| 25|  3|     WN|SJC| 12.92|      85|   22| 621.0|    1|        3.0|    5.0|[5.0,25.0,3.0,3.0...|(7,[5],[1.0])|          4.0|(7,[4],

In [86]:
df_regr.printSchema()

root
 |-- mon: integer (nullable = true)
 |-- dom: integer (nullable = true)
 |-- dow: integer (nullable = true)
 |-- carrier: string (nullable = true)
 |-- org: string (nullable = true)
 |-- depart: double (nullable = true)
 |-- duration: integer (nullable = true)
 |-- delay: integer (nullable = true)
 |-- km: double (nullable = true)
 |-- label: integer (nullable = false)
 |-- carrier_idx: double (nullable = false)
 |-- org_idx: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- org_dummy: vector (nullable = true)
 |-- depart_bucket: double (nullable = true)
 |-- depart_dummy: vector (nullable = true)



In [87]:
df_regr = VectorAssembler(inputCols=['km', 'org_dummy', 'depart_dummy'], outputCol='features_regr').transform(df_regr)
df_regr_train, df_regr_test = df_regr.randomSplit([0.8, 0.2], seed=123)
prediction_model = LinearRegression(featuresCol='features_regr', labelCol='duration').fit(df_regr_train)
prediction = prediction_model.transform(df_regr_test)
prediction.show(5)


+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+--------------------+-------------+-------------+-------------+--------------------+------------------+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|carrier_idx|org_idx|            features|    org_dummy|depart_bucket| depart_dummy|       features_regr|        prediction|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+--------------------+-------------+-------------+-------------+--------------------+------------------+
|  0|  1|  2|     AA|JFK| 14.92|     245|    6|2239.0|    0|        1.0|    2.0|[0.0,1.0,2.0,1.0,...|(7,[2],[1.0])|          4.0|(7,[4],[1.0])|(15,[0,3,12],[223...|233.31585741170153|
|  0|  1|  2|     AA|LGA|   6.0|     240|   45|2235.0|    1|        1.0|    3.0|[0.0,1.0,2.0,1.0,...|(7,[3],[1.0])|          2.0|(7,[2],[1.0])|(15,[0,4,10],[223...|226.48855602014007|
|  0|  1|  2|     AA|LGA| 15.33|     190|    4|1765.0|    0|        1.0|    3.0|

In [88]:
rmse = RegressionEvaluator(predictionCol='prediction', labelCol='label', metricName='rmse').evaluate(prediction)
print(rmse)

174.5781016318243


In [89]:
print(f"Intercept: {prediction_model.intercept}")
print(f"Coefficients: {prediction_model.coefficients}")

Intercept: 10.132522248072398
Coefficients: [0.07429245674971649,27.30668249486161,20.1718880481367,51.905644634781815,46.04651215996389,15.243799792732002,17.44111102148055,17.66400952456524,-14.353146739270645,0.7997661451820286,4.265880776487429,7.2705918137934145,4.9368798662321005,9.093981173471848,9.115793883754197]


## SMS data

In [26]:
path = r"C:\Users\se.vi.dmitriev\Downloads\DS Materials\scripts\SMSSpamCollection"
schema = StructType([
    StructField('label', StringType()),
    StructField('text', StringType())
])

df = spark.read.csv(path, schema=schema, sep='\t', header=False, inferSchema=True, nullValue='NA')
df.printSchema()

root
 |-- label: string (nullable = true)
 |-- text: string (nullable = true)



In [29]:
df = df.withColumn('label', F.when(df['label']=='ham', 0).otherwise(1))
df.show(5)

+-----+--------------------+
|label|                text|
+-----+--------------------+
|    0|Go until jurong p...|
|    0|Ok lar... Joking ...|
|    1|Free entry in 2 a...|
|    0|U dun say so earl...|
|    0|Nah I don't think...|
+-----+--------------------+
only showing top 5 rows



In [39]:
df_prep = df.withColumn('tokens', regexp_replace(df['text'], '[_():;,.!?\\-]', ' '))
df_prep = df_prep.withColumn('tokens', regexp_replace(df_prep['tokens'], '\d+', ' '))
# Merge multiple spaces
df_prep = df_prep.withColumn('tokens', regexp_replace(df_prep['tokens'], '\s+', ' '))
df_prep = Tokenizer(inputCol='tokens', outputCol='words').transform(df_prep)
df_prep.show()


+-----+--------------------+--------------------+--------------------+
|label|                text|              tokens|               words|
+-----+--------------------+--------------------+--------------------+
|    0|Go until jurong p...|Go until jurong p...|[go, until, juron...|
|    0|Ok lar... Joking ...|Ok lar Joking wif...|[ok, lar, joking,...|
|    1|Free entry in 2 a...|Free entry in a w...|[free, entry, in,...|
|    0|U dun say so earl...|U dun say so earl...|[u, dun, say, so,...|
|    0|Nah I don't think...|Nah I don't think...|[nah, i, don't, t...|
|    1|FreeMsg Hey there...|FreeMsg Hey there...|[freemsg, hey, th...|
|    0|Even my brother i...|Even my brother i...|[even, my, brothe...|
|    0|As per your reque...|As per your reque...|[as, per, your, r...|
|    1|WINNER!! As a val...|WINNER As a value...|[winner, as, a, v...|
|    1|Had your mobile 1...|Had your mobile m...|[had, your, mobil...|
|    0|I'm gonna be home...|I'm gonna be home...|[i'm, gonna, be, ...|
|    1

In [45]:
# Remove stop words
df_prep = StopWordsRemover(inputCol='words', outputCol='words_cleaned').transform(df_prep)
df_prep = HashingTF(numFeatures=1024, inputCol='words_cleaned', outputCol='words_hash').transform(df_prep)
tf_idf = IDF(inputCol='words_hash', outputCol='features').fit(df_prep).transform(df_prep)
tf_idf.show(5)

+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|label|                text|              tokens|               words|       words_cleaned|          words_hash|            features|
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|    0|Go until jurong p...|Go until jurong p...|[go, until, juron...|[go, jurong, poin...|(1024,[12,171,191...|(1024,[12,171,191...|
|    0|Ok lar... Joking ...|Ok lar Joking wif...|[ok, lar, joking,...|[ok, lar, joking,...|(1024,[3,493,565,...|(1024,[3,493,565,...|
|    1|Free entry in 2 a...|Free entry in a w...|[free, entry, in,...|[free, entry, wkl...|(1024,[16,24,35,5...|(1024,[16,24,35,5...|
|    0|U dun say so earl...|U dun say so earl...|[u, dun, say, so,...|[u, dun, say, ear...|(1024,[44,168,210...|(1024,[44,168,210...|
|    0|Nah I don't think...|Nah I don't think...|[nah, i, don'

In [46]:
# Split data
tf_idf_train, tf_idf_test = tf_idf.randomSplit([0.8, 0.2], seed=123)

# Train and evaluate model
predict_spam = LogisticRegression(featuresCol='features', labelCol='label').fit(tf_idf_train).transform(tf_idf_test)
weighted_precision = MulticlassClassificationEvaluator(predictionCol='prediction', 
                                                       labelCol='label', 
                                                       metricName='weightedPrecision') \
                                                    .evaluate(predict_spam)
print(f"Weighted precision: {weighted_precision:.2f}")

Weighted precision: 0.96


In [48]:
# Calculate confusion matrix
predict_spam.groupby('label', 'prediction').count().show()
TP = predict_spam.filter('label=1 AND label=prediction').count()
FP = predict_spam.filter('label=0 AND label<>prediction').count()
precision = TP / (TP + FP)
print(f"Precision: {precision:.2f}")

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    0|       0.0|  942|
|    0|       1.0|   25|
|    1|       1.0|  129|
|    1|       0.0|   17|
+-----+----------+-----+

Precision: 0.84


In [43]:
# Terminate the session
spark.stop()