In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## Streaming & GraphFrames:
- Define schemas one for each folder of the provided data <b>VertFinalExam</b> and <b>EdgesFinalExam</b>
- The data you have contains two dataframes one for graphframe vertices and the other for graphframe edges.                                
- Create two emply folders that you will use as a streaming reading sources.
- Create a streaming reader to read streaming data from the reading sources:
    - You have two folders. So, you will need to create two streaming dataframes each dataframe should read from each folder.
    - For example The first streaming dataframe e.g. <b>df1</b> should read from the first streaming data source folder e.g. <b>MyFirstFolder</b> and the second streaming dataframe e.g. <b>df2</b> should read from the second streaming data source folder e.g. <b>MySecondFolder</b>
- Now you should have two streaming dataframes one for Edges and one for Vertices.
- For the streaming Edges dataframe create a new column to indicate delay categories as follow:
    - Early: for early delays (-ve delay values).
    - Late: for delayed flights (+ve delay values).
    - OnTime: for on time flights (0 delay values).
- For the streaming Vertices dataframe remove all rows that contain state as an emplty string <b>state=""</b>.
- Create a writer for the final streaming Edges dataframe to write the streaming data in writing sink in a parquet fromat.
- Create a writer for the final streaming Vertices dataframe to write the streaming data in writing sink in a parquet fromat.
- Start a query for the Edges writer. Copy and paste your <b>EdgesFinalExam</b> data to the edges streaming reading source. Wait to make sure the writing sink folder contains all data. Then stop the query.
- Start a query for the Vertices writer. Copy and paste your <b>VertFinalExam</b> data to the vertices streaming reading source. Wait to make sure the writing sink folder contains all data. Then stop the query.
- Using <b>spark.read()</b>:
    - Read the vertices data from the writing sink directory into static Vertices dataframe.
    - Read the edges data from the writing sink directory into static Edges dataframe.
- Create a <b>GraphFrame</b> from these data.
- Apply <b>PageRank</b> algorithm to find the most <b>10</b> important Vertices. Order the results based on the rank in descending order. <b>Use maxIter=5</b>

## Machine Learning:
- In this part you will use the Edges dataframe you used for GraphFrame creation in the previous part.
- Convert the three dealy categories of the Edges dataframe into integers (0,1,2).
- Split the data 80% train and 20% test.
- Your task is to predict the delay category using any <b>Classifier</b> of your choice.
- Prepare your data as needed.
- Perform the required features engineering as needed.
- All your steps should be in a pipeline.
- Train your model on the trainig data and test on the test data.
- You should obtain at least <b>0.5 f1-score</b> and <b>0.6 accuracy</b>.
#### Note: Model training will take sometime. Do not worry.

# Good Luck

In [3]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://bitbucket.org/habedi/datasets/raw/b6769c4664e7ff68b001e2f43bc517888cbe3642/spark/spark-3.0.2-bin-hadoop2.7.tgz
!tar xf spark-3.0.2-bin-hadoop2.7.tgz
!rm -rf spark-3.0.2-bin-hadoop2.7.tgz*
!pip -q install findspark pyspark graphframes
!wget https://repos.spark-packages.org/graphframes/graphframes/0.8.2-spark3.0-s_2.12/graphframes-0.8.2-spark3.0-s_2.12.jar -P /content/spark-3.0.2-bin-hadoop2.7/jars/
!cp /content/spark-3.0.2-bin-hadoop2.7/jars/graphframes-0.8.2-spark3.0-s_2.12.jar /content/spark-3.0.2-bin-hadoop2.7/graphframes-0.8.2-spark3.0-s_2.12.zip
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.2-bin-hadoop2.7"
os.environ["HADOOP_HOME"] = os.environ["SPARK_HOME"]

os.environ["PYSPARK_DRIVER_PYTHON"] = "jupyter"
os.environ["PYSPARK_DRIVER_PYTHON_OPTS"] = "notebook"
os.environ["PYSPARK_SUBMIT_ARGS"] = "--master local[*] pyspark-shell"
import findspark
findspark.init()

!export PYSPARK_SUBMIT_ARGS="--master local[*] pyspark-shell"
!export PYSPARK_DRIVER_PYTHON=jupyter
!export PYSPARK_DRIVER_PYTHON_OPTS=notebook
from pyspark.sql import SparkSession
from graphframes import *

spark = SparkSession.builder.master("local[*]").appName("GraphFrames").getOrCreate()
os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages graphframes:graphframes:0.8.1-spark3.0-s_2.12 pyspark-shell"


--2024-06-26 10:46:24--  https://repos.spark-packages.org/graphframes/graphframes/0.8.2-spark3.0-s_2.12/graphframes-0.8.2-spark3.0-s_2.12.jar
Resolving repos.spark-packages.org (repos.spark-packages.org)... 52.84.125.113, 52.84.125.99, 52.84.125.73, ...
Connecting to repos.spark-packages.org (repos.spark-packages.org)|52.84.125.113|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 247882 (242K) [binary/octet-stream]
Saving to: ‘/content/spark-3.0.2-bin-hadoop2.7/jars/graphframes-0.8.2-spark3.0-s_2.12.jar.2’


2024-06-26 10:46:24 (5.32 MB/s) - ‘/content/spark-3.0.2-bin-hadoop2.7/jars/graphframes-0.8.2-spark3.0-s_2.12.jar.2’ saved [247882/247882]



In [63]:
import pyspark
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.feature import *
from pyspark.ml.classification import *
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import *

In [5]:
edgeSchema = StructType([StructField('tripid', IntegerType(), True),
                           StructField('delay', IntegerType(), True),
                           StructField('distance', IntegerType(), True),
                           StructField('src', StringType(), True),
                           StructField('dst', StringType(), True)])

In [6]:
vertSchema = StructType([StructField('id', StringType(), True),
                           StructField('City', StringType(), True),
                           StructField('State', StringType(), True),
                           StructField('Country', StringType(), True)
                           ])

In [7]:
!mkdir MyFirstFolder
!mkdir MySecondFolder


In [8]:
!mkdir MyFirstFolder_OUT_EDGE
!mkdir MySecondFolder_OUT_VERT

In [9]:
df_edge = spark.readStream.format("parquet") \
    .schema(edgeSchema) \
    .load("/content/MyFirstFolder")

In [10]:
df_vert = spark.readStream.format("parquet") \
    .schema(vertSchema) \
    .load("/content/MySecondFolder")

In [11]:
df_edge_cat = df_edge.withColumn("delay_category", when(col("delay") < 0, "Early"). \
                        when(col("delay") > 0, "Late"). \
                        when(col("delay") == 0, "OnTime") )

In [12]:
df_vert_rm = df_vert.filter(col("State") != "")

In [13]:
writer_edge = df_edge.writeStream.outputMode("append") \
    .format("parquet")  \
    .option("path", "/content/MyFirstFolder_OUT_EDGE") \
    .option("checkpointLocation", "chkpnt1") \
    .trigger(processingTime='4 seconds')

In [14]:
writer_vert = df_vert.writeStream.outputMode("append") \
    .format("parquet")  \
    .option("path", "/content/MySecondFolder_OUT_VERT") \
    .option("checkpointLocation", "chkpnt2") \
    .trigger(processingTime='4 seconds')

In [15]:
query_edge = writer_edge.start()

In [16]:
query_edge.stop()

In [17]:
query_vert = writer_vert.start()


In [18]:
query_vert.stop()

In [19]:
df_edge_static = spark.read.format("parquet") \
    .schema(edgeSchema) \
    .load("/content/MyFirstFolder_OUT_EDGE")

In [20]:
df_edge_static.printSchema()

root
 |-- tripid: integer (nullable = true)
 |-- delay: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- src: string (nullable = true)
 |-- dst: string (nullable = true)



In [23]:
df_edge_static.show()

+-------+-----+--------+---+---+--------------+
| tripid|delay|distance|src|dst|delay_category|
+-------+-----+--------+---+---+--------------+
|1010630|  -10|     928|RSW|EWR|         Early|
|1021029|   87|     974|RSW|ORD|          Late|
|1021346|    0|     928|RSW|EWR|        OnTime|
|1021044|   18|     928|RSW|EWR|          Late|
|1021730|   29|     748|RSW|IAH|          Late|
|1020535|  605|     974|RSW|ORD|          Late|
|1021820|   71|     974|RSW|ORD|          Late|
|1021743|    0|     928|RSW|EWR|        OnTime|
|1022017|    0|     928|RSW|EWR|        OnTime|
|1020600|   -2|     748|RSW|IAH|         Early|
|1021214|   29|     891|RSW|CLE|          Late|
|1020630|   -5|     928|RSW|EWR|         Early|
|1031029|   13|     974|RSW|ORD|          Late|
|1031346|  279|     928|RSW|EWR|          Late|
|1031740|   29|     748|RSW|IAH|          Late|
|1030535|    0|     974|RSW|ORD|        OnTime|
|1031808|   -3|     974|RSW|ORD|         Early|
|1031516|   -2|    1396|RSW|DEN|        

In [24]:
df_vert_static = spark.read.format("parquet") \
    .schema(vertSchema) \
    .load("/content/MySecondFolder_OUT_VERT")

In [29]:
df_vert_static.show()

+---+-------------+-----+-------+
| id|         City|State|Country|
+---+-------------+-----+-------+
|ABE|    Allentown|   PA|    USA|
|ABI|      Abilene|   TX|    USA|
|ABQ|  Albuquerque|   NM|    USA|
|ABR|     Aberdeen|   SD|    USA|
|ABY|       Albany|   GA|    USA|
|ACK|    Nantucket|   MA|    USA|
|ACT|         Waco|   TX|    USA|
|ACV|       Eureka|   CA|    USA|
|ACY|Atlantic City|   NJ|    USA|
|ADQ|       Kodiak|   AK|    USA|
|AEX|   Alexandria|   LA|    USA|
|AGS|      Augusta|   GA|    USA|
|AHN|       Athens|   GA|    USA|
|AIA|     Alliance|   NE|    USA|
|AKN|  King Salmon|   AK|    USA|
|ALB|       Albany|   NY|    USA|
|ALO|     Waterloo|   IA|    USA|
|ALS|      Alamosa|   CO|    USA|
|ALW|  Walla Walla|   WA|    USA|
|AMA|     Amarillo|   TX|    USA|
+---+-------------+-----+-------+
only showing top 20 rows



In [30]:
graph = GraphFrame(df_vert_static, df_edge_static)

In [32]:
ranks = graph.pageRank(maxIter=5)
results = ranks.vertices
results.show()

+---+-------------+-----+-------+-------------------+
| id|         City|State|Country|           pagerank|
+---+-------------+-----+-------+-------------------+
|YUY|Rouyn-Noranda|   PQ| Canada| 0.2814969239277225|
|MLS|   Miles City|   MT|    USA| 0.2814969239277225|
|BTM|        Butte|   MT|    USA|0.33736459210901926|
|SCE|State College|   PA|    USA|0.32687029830292075|
|OGS|   Ogdensburg|   NY|    USA| 0.2814969239277225|
|ATW|     Appleton|   WI|    USA| 0.4358828773204816|
|ALO|     Waterloo|   IA|    USA| 0.3266002186935446|
|HTS|   Huntington|   WV|    USA| 0.2814969239277225|
|PNS|    Pensacola|   FL|    USA| 0.7794786797678428|
|YVR|    Vancouver|   BC| Canada| 0.2814969239277225|
|YAK|      Yakutat|   AK|    USA| 0.7775114233416804|
|DLG|   Dillingham|   AK|    USA| 0.2814969239277225|
|SNA|Orange County|   CA|    USA|  2.631315509431739|
|BHB|   Bar Harbor|   ME|    USA| 0.2814969239277225|
|IMT|Iron Mountain|   MI|    USA|0.30393461597531574|
|MFR|      Medford|   OR|   

In [33]:
results_ranked = results.orderBy("pagerank", ascending=False).limit(10)
results_ranked.show()

+---+--------------+-----+-------+------------------+
| id|          City|State|Country|          pagerank|
+---+--------------+-----+-------+------------------+
|ATL|       Atlanta|   GA|    USA|31.402169285067313|
|DFW|        Dallas|   TX|    USA| 22.76415219751248|
|ORD|       Chicago|   IL|    USA| 21.83241348762772|
|DEN|        Denver|   CO|    USA|16.026921025779515|
|LAX|   Los Angeles|   CA|    USA|14.358865452635795|
|IAH|       Houston|   TX|    USA|13.229634347806075|
|SFO| San Francisco|   CA|    USA|11.322517232690489|
|PHX|       Phoenix|   AZ|    USA|10.852423159730376|
|SLC|Salt Lake City|   UT|    USA| 9.622759351860472|
|LAS|     Las Vegas|   NV|    USA| 8.778471071473987|
+---+--------------+-----+-------+------------------+



In [91]:
df_edge_static.printSchema()

root
 |-- tripid: integer (nullable = true)
 |-- delay: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- src: string (nullable = true)
 |-- dst: string (nullable = true)
 |-- delay_category: string (nullable = true)



In [36]:
df_edge_staic_int = df_edge_static.withColumn("delay_category", when(col("delay_category") == "Early",0). \
                        when(col("delay_category") =="Late",1). \
                        when(col("delay_category") == "OnTime",2) )

In [38]:
df_edge_staic_int.printSchema()

root
 |-- tripid: integer (nullable = true)
 |-- delay: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- src: string (nullable = true)
 |-- dst: string (nullable = true)
 |-- delay_category: integer (nullable = true)



In [41]:

df_edge_staic_int.select([count(when(col(c).isNull(), c)).alias(c) for c in df_edge_staic_int.columns]
   ).show()

+------+-----+--------+---+---+--------------+
|tripid|delay|distance|src|dst|delay_category|
+------+-----+--------+---+---+--------------+
|     0|    0|       0|  0|  0|             0|
+------+-----+--------+---+---+--------------+



In [113]:
df_edge_staic_int = df_edge_staic_int.drop("delay")

In [93]:
train_data, test_data = df_edge_staic_int.randomSplit([0.80, 0.20], seed=1234)

In [94]:
stCols = [k for (k,v) in train_data.dtypes if v=='string']
stCols

['src', 'dst']

In [95]:
stColsInd = [k + '_ind' for (k,v) in train_data.dtypes if v=='string']
stColsInd

['src_ind', 'dst_ind']

In [96]:
OHEcols = [k + '_OHE' for (k,v) in train_data.dtypes if v=='string']
OHEcols

['src_OHE', 'dst_OHE']

In [97]:
numcols = [k  for (k,v) in train_data.dtypes if ((v=='int') & (k!='delay_category'))]
numcols

['tripid', 'distance']

In [98]:
stind = StringIndexer(inputCols=stCols,outputCols=stColsInd,handleInvalid='skip')

In [99]:
OHE = OneHotEncoder(inputCols=stColsInd,outputCols=OHEcols)

In [100]:
All_Col = numcols + OHEcols
All_Col

['tripid', 'distance', 'src_OHE', 'dst_OHE']

In [101]:
vecAssemb = VectorAssembler(inputCols=All_Col,outputCol='features')

In [102]:
model = RandomForestClassifier(labelCol="delay_category", featuresCol="features")


In [103]:
pipeline = Pipeline(stages=[stind, OHE, vecAssemb, model])

In [104]:
model_fitting = pipeline.fit(train_data)

In [105]:
predictions = model_fitting.transform(test_data)

In [106]:
predictions.printSchema()

root
 |-- tripid: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- src: string (nullable = true)
 |-- dst: string (nullable = true)
 |-- delay_category: integer (nullable = true)
 |-- src_ind: double (nullable = false)
 |-- dst_ind: double (nullable = false)
 |-- src_OHE: vector (nullable = true)
 |-- dst_OHE: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [107]:
predictions.select("features", "delay_category", "prediction","probability").show(5)

+--------------------+--------------+----------+--------------------+
|            features|delay_category|prediction|         probability|
+--------------------+--------------+----------+--------------------+
|(558,[0,1,9,257],...|             2|       0.0|[0.47009469483003...|
|(558,[0,1,9,278],...|             0|       0.0|[0.46529704420883...|
|(558,[0,1,28,256]...|             0|       0.0|[0.47117251419272...|
|(558,[0,1,97,258]...|             2|       0.0|[0.46529704420883...|
|(558,[0,1,41,260]...|             1|       0.0|[0.46269852225867...|
+--------------------+--------------+----------+--------------------+
only showing top 5 rows



In [108]:
evaluator_f1 = MulticlassClassificationEvaluator(predictionCol='prediction',
                                              labelCol='delay_category', metricName='f1')

In [109]:
evaluator_accuracy = MulticlassClassificationEvaluator(predictionCol='prediction',
                                              labelCol='delay_category', metricName='accuracy')

In [110]:
f1 = evaluator_f1.evaluate(predictions)
print("f1= ",f1)

f1=  0.3885913156508042


In [111]:
accuracy = evaluator_accuracy.evaluate(predictions)
print("accuracy= ",accuracy)

accuracy=  0.5017819215954498
