In [1]:
# Importing Spark Session 
from pyspark.sql import SparkSession

In [None]:
# Creating a Spark session Object
spark = SparkSession. \
    builder. \
    config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1'). \
    config('spark.ui.port', '0'). \
    config('spark.sql.warehouse.dir', f'/user/warehouse'). \
    enableHiveSupport(). \
    appName(f' Python - Kafka and Spark Integration for Machine Learning'). \
    master('yarn'). \
    getOrCreate()

In [3]:
# Creating a variable to store Bootstrap server details
kafka_bootstrap_servers = 'localhost:9092'

In [4]:
# Creating a Spark Readstream object
df_cta_readstream = spark. \
  readStream. \
  format('kafka'). \
  option('kafka.bootstrap.servers', kafka_bootstrap_servers). \
  option('subscribe', 'cta_topic_kc'). \
  load()

In [5]:
# Creating a query name object to read the data from the Readstream
df_cta_readstream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)"). \
    writeStream. \
    format("memory"). \
    queryName("df_cta_sql_ml"). \
    start()

24/03/02 17:26:35 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-0fffc591-5cea-477d-afb8-f27fc8d35f46. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/02 17:26:35 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.query.StreamingQuery at 0x7f6284a7b3a0>

[Stage 0:>                                                          (0 + 1) / 1]

In [55]:
# Checking the count of the streaming data 
spark.sql('SELECT count(1) FROM df_cta_sql_ml').show()

+--------+
|count(1)|
+--------+
|    5530|
+--------+



In [56]:
# Checking the samples of the streaming data 
spark.sql('SELECT * FROM df_cta_sql_ml').show(truncate=False)

+----+-------------------------------------------------------------------------------------------------------------------+
|key |value                                                                                                              |
+----+-------------------------------------------------------------------------------------------------------------------+
|NULL|g,001,30057,Ashland/63rd,41160,Clinton,2024-03-02T11:26:17,2024-03-02T11:27:17,0,41.88566,-87.64782                |
|NULL|g,004,30004,Harlem/Lake,41350,Oak Park,2024-03-02T11:26:16,2024-03-02T11:27:16,0,41.88707,-87.78872                |
|NULL|g,005,30004,Harlem/Lake,41400,Roosevelt,2024-03-02T11:25:40,2024-03-02T11:29:40,0,41.86744,-87.62659               |
|NULL|g,006,30004,Harlem/Lake,40030,Pulaski,2024-03-02T11:25:06,2024-03-02T11:27:06,0,41.8849,-87.71652                  |
|NULL|g,007,30057,Ashland/63rd,41080,47th,2024-03-02T11:26:18,2024-03-02T11:27:18,0,41.81284,-87.61892                   |
|NULL|g,601,3000

In [11]:
# Importing required SQL functions
from pyspark.sql.functions import lit, date_format, to_date, split, substring,unix_timestamp, from_unixtime

                                                                                

### Writing ML streaming data to HDFS

In [57]:
#!hdfs dfs -rm -R -skipTrash /final_project_hdfs/ml

In [58]:
# Writing the data into HDFS in a new path /final_project_hdfs/ml/data
df_cta_readstream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)"). \
    withColumn('route_color', split('value', ',')[0]). \
    withColumn('run_number', split('value', ',')[1]). \
    withColumn('dest_name', split('value', ',')[3]). \
    withColumn('next_station_id', split('value', ',')[4]). \
    withColumn('next_station_name', split('value', ',')[5]). \
    withColumn('lat', split('value', ',')[9]). \
    withColumn('lon', split('value', ',')[10]). \
    withColumn('transit_date', to_date(split('value', ',')[6], "yyyy-MM-dd'T'HH:mm:ss")). \
    withColumn('year', date_format('transit_date', 'yyyy')). \
    withColumn('month', date_format('transit_date', 'MM')). \
    withColumn('dayofmonth', date_format('transit_date', 'dd')). \
    withColumn('hour', date_format('transit_date', 'HH')). \
    withColumn('minute', date_format('transit_date', 'mm')). \
    withColumn('is_delayed', split('value', ',')[8]). \
    writeStream. \
    format('csv'). \
    option("checkpointLocation", '/final_project_hdfs/ml/checkpoint'). \
    option('path', '/final_project_hdfs/ml/data'). \
    option('header',True). \
    option('sep','|'). \
    trigger(processingTime='10 seconds'). \
    start()

24/03/02 19:20:53 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/03/02 19:20:53 WARN StreamingQueryManager: Stopping existing streaming query [id=2d9d4a30-cfe0-4117-920c-b929b0101318, runId=bc173ec8-b4f4-41dc-8ee3-809d3884c9c5], as a new run is being started.


<pyspark.sql.streaming.query.StreamingQuery at 0x7f62845438b0>

In [54]:
!hdfs dfs -ls /final_project_hdfs/ml/data

Found 416 items
drwxr-xr-x   - guruprasadvk10 supergroup          0 2024-03-02 18:39 /final_project_hdfs/ml/data/_spark_metadata
-rw-r--r--   1 guruprasadvk10 supergroup       1987 2024-03-02 17:36 /final_project_hdfs/ml/data/part-00000-0037e94a-92e3-4818-aad9-d2ea1a2611c6-c000.csv
-rw-r--r--   1 guruprasadvk10 supergroup       1209 2024-03-02 18:36 /final_project_hdfs/ml/data/part-00000-00b52e82-8d89-42a4-bcac-6ac4d2c2effa-c000.csv
-rw-r--r--   1 guruprasadvk10 supergroup       2031 2024-03-02 17:50 /final_project_hdfs/ml/data/part-00000-00c8e9b3-e262-4d1f-9fec-343ad32e854a-c000.csv
-rw-r--r--   1 guruprasadvk10 supergroup       1225 2024-03-02 18:12 /final_project_hdfs/ml/data/part-00000-01ffbd6c-82d8-459e-b9ad-db994f84299f-c000.csv
-rw-r--r--   1 guruprasadvk10 supergroup       2383 2024-03-02 18:17 /final_project_hdfs/ml/data/part-00000-026e023a-4e8f-482d-998f-1630950d0a02-c000.csv
-rw-r--r--   1 guruprasadvk10 supergroup       2162 2024-03-02 18:16 /final_project_hdfs/ml/data/part

### Creating Hive tables on the ML streaming data stored in the HDFS

In [59]:
# Selecting the Hive DB
spark.sql("use cta_db")
# Drop table if it already exists
spark.sql("DROP TABLE IF EXISTS cta_ml_data;")

In [60]:
# Create HIve table cta_ml_data
spark.sql("CREATE TABLE cta_ml_data (   value STRING,    route_color  STRING,  run_number STRING,  dest_name STRING, next_station_id STRING, next_station_name STRING, \
          lat STRING, lon STRING , transit_date STRING,  year STRING, month STRING, dayofmonth STRING, hour STRING , minute STRING, is_delayed STRING ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|';")
# Load data into Hive from HDFS path
spark.sql("LOAD DATA INPATH '/final_project_hdfs/ml/data' INTO TABLE cta_ml_data;")

24/03/02 19:21:05 WARN HiveMetaStore: Location: hdfs://localhost:9000/user/hive/warehouse/cta_db.db/cta_ml_data specified for non-external table:cta_ml_data


In [61]:
# Running sample queries on hive table
spark.sql("SELECT * FROM cta_ml_data  order by run_number LIMIT 5;")

                                                                                

value,route_color,run_number,dest_name,next_station_id,next_station_name,lat,lon,transit_date,year,month,dayofmonth,hour,minute,is_delayed
,"blue,110,30077,Fo...",blue,110,Forest Park,40590,Damen,41.91427,-87.68446,2024-03-02,2024,3,2,0,0
,"blue,112,30171,O'...",blue,112,O'Hare,40010,Austin,41.87109,-87.77971,2024-03-02,2024,3,2,0,0
,"blue,107,30077,Fo...",blue,107,Forest Park,40550,Irving Park,41.96149,-87.74362,2024-03-02,2024,3,2,0,0
,"blue,203,30077,Fo...",blue,203,Forest Park,40370,Washington,41.88471,-87.62946,2024-03-02,2024,3,2,0,0
,"blue,113,30077,Fo...",blue,113,Forest Park,41340,LaSalle,41.87818,-87.6293,2024-03-02,2024,3,2,0,0


In [62]:
spark.sql("SELECT count(1) FROM cta_ml_data;")

                                                                                

count(1)
6089


In [63]:
!hdfs dfs -ls /final_project_hdfs/ml

Found 2 items
drwxr-xr-x   - guruprasadvk10 supergroup          0 2024-03-02 17:28 /final_project_hdfs/ml/checkpoint
drwxr-xr-x   - guruprasadvk10 supergroup          0 2024-03-02 19:21 /final_project_hdfs/ml/data


In [None]:
#!hdfs dfs -ls -R /final_project_hdfs/ml/data

### Building Machine Learning Model

In [64]:
# Exporting required libraries for building the model
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

In [65]:
# Creating list of numeric and Categorical columns
catCols=['route_color', 'run_number', 'dest_name', 'next_station_id', 'next_station_name']
numCols=['year','month','day','hour','minute']
print(catCols)
print(numCols)

['route_color', 'run_number', 'dest_name', 'next_station_id', 'next_station_name']
['year', 'month', 'day', 'hour', 'minute']


In [66]:
# Creating a String Indexer object on the categorical columns
string_indexer=[
    StringIndexer(inputCol=x, outputCol=x+"_StringIndexer",handleInvalid="skip")
    for x in catCols
]
string_indexer

[StringIndexer_c26df6e97feb,
 StringIndexer_9d39cf756252,
 StringIndexer_06f5bdd3275e,
 StringIndexer_42c199204d62,
 StringIndexer_33293fef0160]

In [67]:
# Creating a One Hot Encoding object on the categorical columns
one_hot_encoder=[
    OneHotEncoder(inputCols=[f"{x}_StringIndexer" for x in catCols],
                  outputCols=[f"{x}_OneHotEncoder" for x in catCols],
                 )
]
one_hot_encoder

[OneHotEncoder_38144cd3c1ed]

In [68]:
# Creating an assembler object
assemblerInput=[x for x in numCols]
assemblerInput += [f"{x}_OneHotEncoder" for x in catCols]
assemblerInput

['year',
 'month',
 'day',
 'hour',
 'minute',
 'route_color_OneHotEncoder',
 'run_number_OneHotEncoder',
 'dest_name_OneHotEncoder',
 'next_station_id_OneHotEncoder',
 'next_station_name_OneHotEncoder']

In [69]:
# Creating a Vector object on the assembler object as ML can accept the features as a Vector
vector_assembler=VectorAssembler(
    inputCols=assemblerInput,outputCol="vectorAssembler_features"
)

In [70]:
# Creating a list called stages that contains the stages in the pipeline
stages=[]
stages += string_indexer
stages += one_hot_encoder
stages += [vector_assembler]
stages

[StringIndexer_c26df6e97feb,
 StringIndexer_9d39cf756252,
 StringIndexer_06f5bdd3275e,
 StringIndexer_42c199204d62,
 StringIndexer_33293fef0160,
 OneHotEncoder_38144cd3c1ed,
 VectorAssembler_08e6c845c54f]

In [71]:
# Creating a pipeline from the stages list
from pyspark.ml import Pipeline
pipeline= Pipeline().setStages(stages)

In [72]:
# Creating a temporary Dataframe extracting the data from df_cta_sql_ml
temp_df=spark.sql("select CAST(value AS STRING), \
                  split(value, ',')[0] as route_color, \
                  split(value, ',')[1] as run_number, \
                  split(value, ',')[3] as dest_name, \
                  split(value, ',')[4] as next_station_id, \
                  split(value, ',')[5] as next_station_name, \
                  split(value, ',')[8] as is_delayed, \
                  to_date(split(split(value, ',')[6],'T')[0],'yyyy-MM-dd') as transit_date, \
                  year(transit_date) as year, \
                  month(transit_date) as month, \
                  day(transit_date) as day, \
                  to_date(split(split(value, ',')[6],'T')[1],'HH:mm:ss') as transit_time, \
                  hour(transit_time) as hour, \
                  minute(transit_time) as minute \
                  from df_cta_sql_ml"
         )
# Selecting the required columns
ml_df_orig=temp_df.select("route_color","run_number","dest_name","next_station_id","next_station_name","year","month","day","hour","minute","is_delayed")
# Renaming the column is_delayed as output
ml_df = ml_df_orig.withColumnRenamed("is_delayed","output")

In [73]:
ml_df.count()

5576

In [74]:
ml_df.printSchema()

root
 |-- route_color: string (nullable = true)
 |-- run_number: string (nullable = true)
 |-- dest_name: string (nullable = true)
 |-- next_station_id: string (nullable = true)
 |-- next_station_name: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)
 |-- output: string (nullable = true)



In [75]:
# Showing sample rows from ml_df
ml_df.show(10)

+-----------+----------+-------------+---------------+--------------------+----+-----+---+----+------+------+
|route_color|run_number|    dest_name|next_station_id|   next_station_name|year|month|day|hour|minute|output|
+-----------+----------+-------------+---------------+--------------------+----+-----+---+----+------+------+
|          g|       001| Ashland/63rd|          41160|             Clinton|2024|    3|  2|   0|     0|     0|
|          g|       004|  Harlem/Lake|          41350|            Oak Park|2024|    3|  2|   0|     0|     0|
|          g|       005|  Harlem/Lake|          41400|           Roosevelt|2024|    3|  2|   0|     0|     0|
|          g|       006|  Harlem/Lake|          40030|             Pulaski|2024|    3|  2|   0|     0|     0|
|          g|       007| Ashland/63rd|          41080|                47th|2024|    3|  2|   0|     0|     0|
|          g|       601|  Harlem/Lake|          41080|                47th|2024|    3|  2|   0|     0|     0|
|         

In [76]:
# Fitting  and transforming the pipeline object on the ml_df
ml_df_fit=pipeline.fit(ml_df)
ml_df_trans=ml_df_fit.transform(ml_df)
# Selecting the required columns to display 
ml_df_trans.select('route_color', 'run_number', 'dest_name', 'next_station_id', 'next_station_name','year','month','day','hour','minute','vectorAssembler_features').show(5,truncate=False)

+-----------+----------+------------+---------------+-----------------+----+-----+---+----+------+-------------------------------------------------------------------+
|route_color|run_number|dest_name   |next_station_id|next_station_name|year|month|day|hour|minute|vectorAssembler_features                                           |
+-----------+----------+------------+---------------+-----------------+----+-----+---+----+------+-------------------------------------------------------------------+
|g          |001       |Ashland/63rd|41160          |Clinton          |2024|3    |2  |0   |0     |(333,[0,1,2,7,34,103,110,241],[2024.0,3.0,2.0,1.0,1.0,1.0,1.0,1.0])|
|g          |004       |Harlem/Lake |41350          |Oak Park         |2024|3    |2  |0   |0     |(333,[0,1,2,7,66,99,226,266],[2024.0,3.0,2.0,1.0,1.0,1.0,1.0,1.0]) |
|g          |005       |Harlem/Lake |41400          |Roosevelt        |2024|3    |2  |0   |0     |(333,[0,1,2,7,40,99,104,234],[2024.0,3.0,2.0,1.0,1.0,1.0,1.0,1.0]) 

In [77]:
# creating an objet called Data that contains the features and output
from pyspark.sql.functions import col
data=ml_df_trans.select(
    col("vectorAssembler_features").alias("features"),
    col("output").alias("label")
)

In [78]:
# Casting the datatype of output column as Integer
data_df=data.selectExpr("features","cast(label as int) label")
data_df.show(5,truncate=False)

+-------------------------------------------------------------------+-----+
|features                                                           |label|
+-------------------------------------------------------------------+-----+
|(333,[0,1,2,7,34,103,110,241],[2024.0,3.0,2.0,1.0,1.0,1.0,1.0,1.0])|0    |
|(333,[0,1,2,7,66,99,226,266],[2024.0,3.0,2.0,1.0,1.0,1.0,1.0,1.0]) |0    |
|(333,[0,1,2,7,40,99,104,234],[2024.0,3.0,2.0,1.0,1.0,1.0,1.0,1.0]) |0    |
|(333,[0,1,2,7,88,99,147,236],[2024.0,3.0,2.0,1.0,1.0,1.0,1.0,1.0]) |0    |
|(333,[0,1,2,7,50,103,141,255],[2024.0,3.0,2.0,1.0,1.0,1.0,1.0,1.0])|0    |
+-------------------------------------------------------------------+-----+
only showing top 5 rows



In [79]:
# Printing the schema of the data frame on which the ML model will be built
data_df.printSchema()

root
 |-- features: vector (nullable = true)
 |-- label: integer (nullable = true)



### Building a Logistic Regression Model

In [80]:
# Split the dataset into Test and Train sets based on 30:70 ratio
test_df, train_df = data_df.randomSplit([0.3, 0.7])

In [81]:
# Creating a Logistic regression object
lr=LogisticRegression(maxIter=10, regParam= 0.01)

In [82]:
# Training the Logistic regression Model using the tran dataset
model=lr.fit(train_df)
# Creating an object for the Model summary
trainingSummary = model.summary
# Printing the Model accuracy
trainingSummary.accuracy

                                                                                

0.9918388166284111

In [83]:
# Creating an object to store the Model predictions
predictions = model.transform(test_df)
# Printing sample rows from the predictions
#predictions.select("prediction","label","features").show(10)
predictions.show(truncate=False)

+------------------------------------------------------------------+-----+----------------------------------------+------------------------------------------+----------+
|features                                                          |label|rawPrediction                           |probability                               |prediction|
+------------------------------------------------------------------+-----+----------------------------------------+------------------------------------------+----------+
|(333,[0,1,2,5,14,95,113,246],[2024.0,3.0,2.0,1.0,1.0,1.0,1.0,1.0])|0    |[6.6549711900460915,-6.6549711900460915]|[0.998714050556637,0.0012859494433630214] |0.0       |
|(333,[0,1,2,5,14,95,119,262],[2024.0,3.0,2.0,1.0,1.0,1.0,1.0,1.0])|0    |[6.647070912240843,-6.647070912240843]  |[0.9987038641820852,0.0012961358179147675]|0.0       |
|(333,[0,1,2,5,14,95,119,262],[2024.0,3.0,2.0,1.0,1.0,1.0,1.0,1.0])|0    |[6.647070912240843,-6.647070912240843]  |[0.9987038641820852,0.0012961358179

In [84]:
# Printing the percent of area under the ROC curve
model.summary.areaUnderROC

0.9954643428114651

In [85]:
# Printing Sample Recall and Precision of the MOdel predictions
model.summary.pr.show()

+------------------+-------------------+
|            recall|          precision|
+------------------+-------------------+
|               0.0|                0.0|
|               0.0|                0.0|
|               0.0|                0.0|
|0.1111111111111111| 0.6666666666666666|
|0.1111111111111111| 0.5714285714285714|
|              0.25| 0.6428571428571429|
|0.3055555555555556| 0.6470588235294118|
|0.3888888888888889| 0.6086956521739131|
|0.4444444444444444|               0.64|
|0.4444444444444444| 0.6153846153846154|
|0.5833333333333334| 0.6176470588235294|
|0.7222222222222222| 0.5777777777777777|
|0.7222222222222222| 0.5416666666666666|
|0.7222222222222222|               0.52|
|0.7222222222222222| 0.5098039215686274|
|              0.75|                0.5|
|0.8611111111111112|                0.5|
|0.8888888888888888|0.47761194029850745|
|0.8888888888888888|  0.463768115942029|
|0.8888888888888888| 0.4507042253521127|
+------------------+-------------------+
only showing top

In [86]:
predictions.count()

1669

### Storing Model results in HDFS and querying using Hive

In [87]:
# Saving the dataframe data in HDFS in parquet format
!hdfs dfs -rm -R -skipTrash /model_results
predictions.write.format("parquet").save("/model_results/")

Deleted /model_results


                                                                                

In [88]:
# CHecking the contents of HDFS path
!hdfs dfs -ls /model_results

Found 3 items
-rw-r--r--   1 guruprasadvk10 supergroup          0 2024-03-02 19:22 /model_results/_SUCCESS
-rw-r--r--   1 guruprasadvk10 supergroup      50083 2024-03-02 19:22 /model_results/part-00000-9b06e66e-5e3c-4a3a-8e51-dd7b77835a7c-c000.snappy.parquet
-rw-r--r--   1 guruprasadvk10 supergroup      50265 2024-03-02 19:22 /model_results/part-00001-9b06e66e-5e3c-4a3a-8e51-dd7b77835a7c-c000.snappy.parquet


In [89]:
# Dropping the hive table if it already exists
spark.sql("use cta_db")
spark.sql("DROP TABLE IF EXISTS prediction_results;")

In [90]:
spark.sql("CREATE EXTERNAL TABLE prediction_results( features STRING) STORED AS PARQUET LOCATION '/model_results/';")
# Selecting count of rows 
spark.sql("SELECT count(1) FROM prediction_results;")

count(1)
1669
