In [1]:
from IPython.display import display, HTML
# Inject CSS so <pre> output keeps its whitespace and long lines are not wrapped.
no_wrap_css = "<style>pre { white-space: pre !important; }</style>"
display(HTML(no_wrap_css))

In [2]:
import findspark
import pyspark

In [3]:
from pyspark.sql import SparkSession
# Obtain the Spark session for this notebook (an already-running session is reused).
spark = (
    SparkSession.builder
    .appName("Read lines from a file stream")
    .getOrCreate()
)

24/09/09 20:03:02 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [4]:
from pyspark.sql.functions import *
import pyspark.sql.functions as f

#### Define schemas, one for each folder of the provided data: VertFinalExam and EdgesFinalExam

In [5]:
from pyspark.sql.types import (StructType, StructField, StringType, IntegerType)

In [6]:
# Vertex schema: every attribute is a nullable string column.
vertSchema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in ('id', 'City', 'State', 'Country')
])

In [7]:
# Edge schema: three nullable integer measures followed by the nullable
# string endpoints of the trip (src -> dst).
edgSchema = (
    StructType()
    .add('tripid', IntegerType(), True)
    .add('delay', IntegerType(), True)
    .add('distance', IntegerType(), True)
    .add('src', StringType(), True)
    .add('dst', StringType(), True)
)

#### Create a streaming reader to read streaming data from the reading sources:

In [8]:
# Streaming reader over the vertices folder; Parquet files are read with the
# explicit vertSchema.
vert_df = (
    spark.readStream
    .schema(vertSchema)
    .format("parquet")
    .load("/home/wick/Big_Data/final_project/VertFinalExam/")
)

In [9]:
# Print the schema of the streaming vertices DataFrame (loaded with vertSchema).
vert_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)



In [10]:
# Streaming reader over the edges folder; Parquet files are read with the
# explicit edgSchema.
edg_df = (
    spark.readStream
    .schema(edgSchema)
    .format("parquet")
    .load("/home/wick/Big_Data/final_project/EdgesFinalExam/")
)

In [11]:
# Print the schema of the streaming edges DataFrame (loaded with edgSchema).
edg_df.printSchema()

root
 |-- tripid: integer (nullable = true)
 |-- delay: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- src: string (nullable = true)
 |-- dst: string (nullable = true)



#### For the streaming Edges dataframe, create a new column to indicate delay categories as follows:

    Early: for early delays (-ve delay values).
    Late: for delayed flights (+ve delay values).
    OnTime: for on time flights (0 delay values).


In [12]:
# Label each edge by the sign of its delay: negative -> "Early",
# positive -> "Late", everything else -> "OnTime".
# Note: a NULL delay matches neither comparison, so it also lands in "OnTime".
delay_col = f.col("delay")
delay_category_expr = (
    f.when(delay_col < 0, "Early")
    .when(delay_col > 0, "Late")
    .otherwise("OnTime")
)
edg_df1 = edg_df.withColumn("delay_category", delay_category_expr)


In [13]:
# Print the schema again to confirm the new delay_category column was added.
edg_df1.printSchema()

root
 |-- tripid: integer (nullable = true)
 |-- delay: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- src: string (nullable = true)
 |-- dst: string (nullable = true)
 |-- delay_category: string (nullable = false)



#### For the streaming Vertices dataframe, remove all rows whose state is an empty string (state="")

In [14]:
# Keep only vertices whose State is a non-empty string. Rows with a NULL State
# are dropped as well, because the comparison evaluates to NULL for them.
vert_df1 = vert_df.where(f.col("State") != "")

#### Create a writer for the final streaming Edges dataframe that writes the streaming data to the writing sink in Parquet format.

In [15]:
# Describe (but do not start) the append-mode Parquet sink for the edges stream,
# together with the checkpoint directory the query will use.
edg_writer = (
    edg_df1.writeStream
    .format("parquet")
    .outputMode("append")
    .option("checkpointLocation", "/home/wick/Big_Data/final_project/edg_chkpnt/")
    .option("path", "/home/wick/Big_Data/final_project/first_folder/")
)

#### Create a writer for the final streaming Vertices dataframe that writes the streaming data to the writing sink in Parquet format.

In [16]:
# Describe (but do not start) the append-mode Parquet sink for the vertices
# stream, together with the checkpoint directory the query will use.
vert_writer = (
    vert_df1.writeStream
    .format("parquet")
    .outputMode("append")
    .option("checkpointLocation", "/home/wick/Big_Data/final_project/vert_chkpnt/")
    .option("path", "/home/wick/Big_Data/final_project/second_folder/")
)

#### Start a query for the Edges writer. Copy and paste your EdgesFinalExam data to the edges streaming reading source. Wait to make sure the writing sink folder contains all data. Then stop the query.

In [17]:
# Start the streaming query defined by edg_writer and keep its handle so the
# query can be stopped later.
edg_query = edg_writer.start()

24/09/09 20:04:18 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [18]:
# Block until every file currently available in the source has been processed
# and committed to the sink; stopping right after start() could otherwise cut
# the write short and leave the sink empty or incomplete.
edg_query.processAllAvailable()
edg_query.stop()

#### Start a query for the Vertices writer. Copy and paste your VertFinalExam data to the vertices streaming reading source. Wait to make sure the writing sink folder contains all data. Then stop the query.

In [19]:
# Start the streaming query defined by vert_writer and keep its handle so the
# query can be stopped later.
vert_query = vert_writer.start()

24/09/09 20:04:22 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [20]:
# Block until every file currently available in the source has been processed
# and committed to the sink; stopping right after start() could otherwise cut
# the write short and leave the sink empty or incomplete.
vert_query.processAllAvailable()
vert_query.stop()

#### Using spark.read():

    Read the vertices data from the writing sink directory into static Vertices dataframe.
    Read the edges data from the writing sink directory into static Edges dataframe.


In [21]:
# Static (batch) read of the vertices that the streaming query wrote to its sink.
df_verts = spark.read.load(
    '/home/wick/Big_Data/final_project/second_folder/',
    format='parquet',
)

                                                                                

In [22]:
# Static (batch) read of the edges that the streaming query wrote to its sink.
df_edgs = spark.read.load(
    '/home/wick/Big_Data/final_project/first_folder/',
    format='parquet',
)

In [23]:
# Preview the first five rows of the static vertices DataFrame.
df_verts.show(5)

                                                                                

+---+-----------+-----+-------+
| id|       City|State|Country|
+---+-----------+-----+-------+
|ABE|  Allentown|   PA|    USA|
|ABI|    Abilene|   TX|    USA|
|ABQ|Albuquerque|   NM|    USA|
|ABR|   Aberdeen|   SD|    USA|
|ABY|     Albany|   GA|    USA|
+---+-----------+-----+-------+
only showing top 5 rows



In [24]:
# Preview the first five rows of the static edges DataFrame.
df_edgs.show(5)

+-------+-----+--------+---+---+--------------+
| tripid|delay|distance|src|dst|delay_category|
+-------+-----+--------+---+---+--------------+
|1010630|  -10|     928|RSW|EWR|         Early|
|1021029|   87|     974|RSW|ORD|          Late|
|1021346|    0|     928|RSW|EWR|        OnTime|
|1021044|   18|     928|RSW|EWR|          Late|
|1021730|   29|     748|RSW|IAH|          Late|
+-------+-----+--------+---+---+--------------+
only showing top 5 rows



In [40]:
# Print the schema of the edges as read back from the Parquet sink.
df_edgs.printSchema()

root
 |-- tripid: integer (nullable = true)
 |-- delay: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- src: string (nullable = true)
 |-- dst: string (nullable = true)
 |-- delay_category: string (nullable = false)



#### Create a GraphFrame from these data.

In [25]:
from graphframes import GraphFrame


In [26]:
# Build the graph: df_verts supplies the vertices (with their `id` column) and
# df_edgs supplies the edges (with their `src` / `dst` endpoint columns).
GF = GraphFrame(df_verts, df_edgs)

#### Apply the PageRank algorithm to find the 10 most important vertices. Order the results by rank in descending order.
#### Use maxIter=5

In [27]:
# Run PageRank for a fixed 5 iterations with a reset probability of 0.15.
pagerank = GF.pageRank(maxIter=5, resetProbability=0.15)

# The result's vertices carry a `pagerank` column; sort by it from highest to
# lowest and keep the ten top-ranked vertices.
vertices = pagerank.vertices
top_10_vertices = vertices.orderBy("pagerank", ascending=False).limit(10)

top_10_vertices.show()

                                                                                

+---+--------------+-----+-------+------------------+
| id|          City|State|Country|          pagerank|
+---+--------------+-----+-------+------------------+
|ATL|       Atlanta|   GA|    USA|31.402169285067313|
|DFW|        Dallas|   TX|    USA| 22.76415219751248|
|ORD|       Chicago|   IL|    USA| 21.83241348762772|
|DEN|        Denver|   CO|    USA|16.026921025779515|
|LAX|   Los Angeles|   CA|    USA|14.358865452635795|
|IAH|       Houston|   TX|    USA|13.229634347806075|
|SFO| San Francisco|   CA|    USA|11.322517232690489|
|PHX|       Phoenix|   AZ|    USA|10.852423159730376|
|SLC|Salt Lake City|   UT|    USA| 9.622759351860472|
|LAS|     Las Vegas|   NV|    USA| 8.778471071473987|
+---+--------------+-----+-------+------------------+



# Machine Learning:

### Convert the three delay categories of the Edges dataframe into integers (0, 1, 2).

In [34]:
# Encode the category as an integer label: "Early" -> 0, "Late" -> 1,
# anything else (i.e. "OnTime") -> 2.
delay_category_col = f.col("delay_category")
delay_category_int_expr = (
    f.when(delay_category_col == "Early", 0)
    .when(delay_category_col == "Late", 1)
    .otherwise(2)
)
df_edgs_2 = df_edgs.withColumn("delay_category_int", delay_category_int_expr)

### Split the data 80% train and 20% test.

In [35]:
# Import the necessary function
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline

# 80/20 random train/test split; the seed is fixed so the split is repeatable.
train_data, test_data = df_edgs_2.randomSplit(weights=[0.8, 0.2], seed=1234)



### Your task is to predict the delay category using any Classifier of your choice.
### Prepare your data as needed.
### Perform the required features engineering as needed.

In [42]:
from pyspark.ml.feature import VectorAssembler

# Columns fed to the classifier.
# NOTE(review): target leakage. The label `delay_category_int` is derived
# directly from the sign of `delay`, so keeping `delay` as a feature lets the
# model recover the label trivially; perfect F1/accuracy scores are therefore
# not a meaningful result. Consider predicting from columns that do not
# determine the label (e.g. distance, src, dst) instead.
feature_cols = ['delay', 'distance']

# VectorAssembler combines the feature columns into the single `features`
# vector column. It is only defined here: the pipeline applies it during
# fit/transform, so no standalone transform of the training data is needed.
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

In [43]:
from pyspark.ml.classification import LogisticRegression

# Logistic-regression classifier: reads the assembled `features` vector and
# learns the integer label column `delay_category_int`.
lr = LogisticRegression(
    labelCol='delay_category_int',
    featuresCol='features',
)


### All your steps should be in a pipeline.

In [44]:
# Two-stage pipeline: assemble the feature vector, then fit the classifier.
pipeline = Pipeline().setStages([assembler, lr])


In [45]:
# Train the model: fit every pipeline stage (assembler, then classifier) on the
# training split only; the test split is kept aside for evaluation.
model = pipeline.fit(train_data)

                                                                                

### Train your model on the training data and test on the test data.
### You should obtain at least 0.5 f1-score and 0.6 accuracy.

In [46]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Score the held-out test split with the fitted pipeline.
predictions = model.transform(test_data)

# Both evaluators compare the same label/prediction columns and differ only in
# the metric they report.
shared_eval_args = dict(labelCol='delay_category_int', predictionCol='prediction')
evaluator = MulticlassClassificationEvaluator(metricName='f1', **shared_eval_args)
accuracy_evaluator = MulticlassClassificationEvaluator(metricName='accuracy', **shared_eval_args)

f1_score = evaluator.evaluate(predictions)
accuracy = accuracy_evaluator.evaluate(predictions)

print(f"F1 Score: {f1_score}")
print(f"Accuracy: {accuracy}")




F1 Score: 1.0
Accuracy: 1.0


                                                                                