<a href="https://colab.research.google.com/github/erencsknn/classification_project_with_spark/blob/main/spark_exercise.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from pyspark import SparkContext

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler

In [None]:
# `spark` is not defined anywhere in this notebook; it only exists implicitly in
# environments like the pyspark shell. Create it explicitly so the notebook
# survives Restart & Run All. getOrCreate() reuses an already-running session.
spark = SparkSession.builder.appName("flight-delay-classification").getOrCreate()

# Load the raw flights table; inferSchema gives numeric columns numeric types.
csv = spark.read.csv("flights.csv", inferSchema=True, header=True)
csv.show()

+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|        19|        5|     DL|          11433|        13303|      -3|       1|
|        19|        5|     DL|          14869|        12478|       0|      -8|
|        19|        5|     DL|          14057|        14869|      -4|     -15|
|        19|        5|     DL|          15016|        11433|      28|      24|
|        19|        5|     DL|          11193|        12892|      -6|     -11|
|        19|        5|     DL|          10397|        15016|      -1|     -19|
|        19|        5|     DL|          15016|        10397|       0|      -1|
|        19|        5|     DL|          10397|        14869|      15|      24|
|        19|        5|     DL|          10397|        10423|      33|      34|
|        19|        5|     DL|          11278|      

In [None]:
# A flight is labelled Late (1) when it arrives more than 15 minutes behind
# schedule, otherwise 0. The label is selected as the last column.
LATE_THRESHOLD_MINUTES = 15

kept_columns = ["DayofMonth", "DayOfWeek", "OriginAirportID", "DestAirportID", "DepDelay"]
late_flag = (col("ArrDelay") > LATE_THRESHOLD_MINUTES).cast("int").alias("Late")

data = csv.select(*kept_columns, late_flag)
data.show()

+----------+---------+---------------+-------------+--------+----+
|DayofMonth|DayOfWeek|OriginAirportID|DestAirportID|DepDelay|Late|
+----------+---------+---------------+-------------+--------+----+
|        19|        5|          11433|        13303|      -3|   0|
|        19|        5|          14869|        12478|       0|   0|
|        19|        5|          14057|        14869|      -4|   0|
|        19|        5|          15016|        11433|      28|   1|
|        19|        5|          11193|        12892|      -6|   0|
|        19|        5|          10397|        15016|      -1|   0|
|        19|        5|          15016|        10397|       0|   0|
|        19|        5|          10397|        14869|      15|   1|
|        19|        5|          10397|        10423|      33|   1|
|        19|        5|          11278|        10397|     323|   1|
|        19|        5|          14107|        13487|      -7|   0|
|        19|        5|          11433|        11298|      22| 

In [None]:
# Sanity check: slicing DataFrame.columns yields a plain Python list, which is
# the form VectorAssembler's inputCols expects.
candidate_feature_columns = data.columns[:-1]
type(candidate_feature_columns)

list

In [None]:
# 70/30 train/test split. A fixed seed makes the split (and therefore every
# downstream count and metric) reproducible across kernel restarts; without it
# each run drew a different random split.
SPLIT_SEED = 42

splits = data.randomSplit([0.7, 0.3], seed=SPLIT_SEED)
train, test = splits
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, " Testing Rows:", test_rows)

Training Rows: 604462  Testing Rows: 260064


In [None]:
# Every column except the label becomes a feature. Excluding "Late" by name
# (rather than slicing off the last column) keeps this correct even if the
# column order of `data` changes.
feature_cols = [c for c in data.columns if c != "Late"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# LogisticRegression reads the target from a column named "label" (set below).
training = assembler.transform(train).select(col("features"), col("Late").alias("label"))
training.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[1.0,1.0,10140.0,...|    0|
|[1.0,1.0,10140.0,...|    0|
|[1.0,1.0,10140.0,...|    0|
|[1.0,1.0,10140.0,...|    0|
|[1.0,1.0,10140.0,...|    0|
|[1.0,1.0,10140.0,...|    0|
|[1.0,1.0,10140.0,...|    1|
|[1.0,1.0,10140.0,...|    0|
|[1.0,1.0,10140.0,...|    0|
|[1.0,1.0,10140.0,...|    0|
|[1.0,1.0,10140.0,...|    0|
|[1.0,1.0,10140.0,...|    0|
|[1.0,1.0,10140.0,...|    0|
|[1.0,1.0,10140.0,...|    0|
|[1.0,1.0,10140.0,...|    1|
|[1.0,1.0,10140.0,...|    0|
|[1.0,1.0,10140.0,...|    0|
|[1.0,1.0,10140.0,...|    1|
|[1.0,1.0,10140.0,...|    0|
|[1.0,1.0,10140.0,...|    0|
+--------------------+-----+
only showing top 20 rows



In [None]:
# Fit a binary logistic regression on the assembled feature vector, using the
# 0/1 Late flag (renamed to "label" in `training`) as the target.
lr = LogisticRegression(featuresCol="features", labelCol="label")
model = lr.fit(dataset=training)

In [None]:
# Assemble the held-out split with the same assembler used for training. The
# ground truth is kept as "TrueLabel" so it is easy to tell apart from the
# model's "prediction" column during evaluation.
testing = (
    assembler.transform(test)
    .select(col("features"), col("Late").alias("TrueLabel"))
)
testing.show()

+--------------------+---------+
|            features|TrueLabel|
+--------------------+---------+
|[1.0,1.0,10140.0,...|        0|
|[1.0,1.0,10140.0,...|        0|
|[1.0,1.0,10140.0,...|        1|
|[1.0,1.0,10140.0,...|        1|
|[1.0,1.0,10140.0,...|        0|
|[1.0,1.0,10140.0,...|        0|
|[1.0,1.0,10140.0,...|        0|
|[1.0,1.0,10140.0,...|        0|
|[1.0,1.0,10140.0,...|        0|
|[1.0,1.0,10140.0,...|        0|
|[1.0,1.0,10140.0,...|        0|
|[1.0,1.0,10140.0,...|        0|
|[1.0,1.0,10140.0,...|        0|
|[1.0,1.0,10140.0,...|        0|
|[1.0,1.0,10140.0,...|        0|
|[1.0,1.0,10140.0,...|        1|
|[1.0,1.0,10140.0,...|        0|
|[1.0,1.0,10140.0,...|        0|
|[1.0,1.0,10140.0,...|        0|
|[1.0,1.0,10140.0,...|        0|
+--------------------+---------+
only showing top 20 rows



In [None]:
# Score the held-out rows, then keep only what evaluation needs: the inputs,
# the predicted class, its class probabilities, and the ground truth.
prediction = model.transform(testing)
evaluation_columns = ["features", "prediction", "probability", "TrueLabel"]
predicted = prediction.select(evaluation_columns)
predicted.show(n=100, truncate=False)

+-------------------------------+----------+------------------------------------------+---------+
|features                       |prediction|probability                               |TrueLabel|
+-------------------------------+----------+------------------------------------------+---------+
|[1.0,1.0,10140.0,11259.0,-3.0] |0.0       |[0.959851430528415,0.04014856947158496]   |0        |
|[1.0,1.0,10140.0,11259.0,-1.0] |0.0       |[0.9479904547236673,0.052009545276332725] |0        |
|[1.0,1.0,10140.0,11259.0,21.0] |1.0       |[0.47973543059992424,0.5202645694000758]  |1        |
|[1.0,1.0,10140.0,11259.0,35.0] |1.0       |[0.12131623251366645,0.8786837674863336]  |1        |
|[1.0,1.0,10140.0,11292.0,-4.0] |0.0       |[0.9648198508577102,0.03518014914228984]  |0        |
|[1.0,1.0,10140.0,11292.0,0.0]  |0.0       |[0.9409724931418645,0.059027506858135514] |0        |
|[1.0,1.0,10140.0,11292.0,2.0]  |0.0       |[0.9239759159330253,0.0760240840669747]   |0        |
|[1.0,1.0,10140.0,11

In [None]:
# Confusion-matrix counts from ONE aggregation. The previous four
# filter().count() actions each re-ran the full (uncached) prediction lineage.
# Keys are (prediction, TrueLabel) pairs; missing combinations count as 0.
confusion = {
    (row["prediction"], row["TrueLabel"]): row["count"]
    for row in predicted.groupBy("prediction", "TrueLabel").count().collect()
}
tp = float(confusion.get((1.0, 1), 0))
fp = float(confusion.get((1.0, 0), 0))
tn = float(confusion.get((0.0, 0), 0))
fn = float(confusion.get((0.0, 1), 0))

# Guard the ratios: with no positive predictions (tp + fp == 0) or no positive
# labels (tp + fn == 0), the plain division raised ZeroDivisionError.
# Report 0.0 in that case.
precision = tp / (tp + fp) if tp + fp > 0 else 0.0
recall = tp / (tp + fn) if tp + fn > 0 else 0.0

metrics = spark.createDataFrame([
    ("TP", tp),
    ("FP", fp),
    ("TN", tn),
    ("FN", fn),
    ("Precision", precision),
    ("Recall", recall)], ["metric", "value"])
metrics.show()

+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|           37279.0|
|       FP|            4225.0|
|       TN|          203226.0|
|       FN|           15334.0|
|Precision|0.8982025828835775|
|   Recall|0.7085511185448463|
+---------+------------------+

