In [0]:
# Databricks text widget named "API". Its value is later used as the Kafka bootstrap server;
# the default text doubles as the "nothing entered" sentinel checked when kafka_server is built.
dbutils.widgets.text("API", "Please enter your API here")

In [0]:
# Current value of the "API" widget (the default placeholder text if the user typed nothing).
API=dbutils.widgets.get("API")

In [0]:
from pyspark.sql.types import *
import pyspark.sql.functions as F
from elasticsearch import Elasticsearch, helpers
from pyspark.sql.functions import col, split, randn
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics, MulticlassMetrics
import matplotlib.pyplot as plt
import pandas as pd 
import numpy as np 
from pyspark.sql.window import Window
import pickle
from collections import defaultdict


# Elasticsearch connection. In the visible part of this notebook `es` is referenced only by
# the commented-out Elasticsearch sink cells.
# NOTE(review): elasticsearch-py interprets `timeout` in seconds, so 60000 is ~16.7 hours —
# confirm this was not meant to be milliseconds.
ES_HOST= '10.0.0.20' #'da2020w-0016.eastus.cloudapp.azure.com' # VM Server 
es = Elasticsearch([{'host': ES_HOST}], timeout=60000)

In [0]:
# Explicit schema used to parse the JSON payload of each Kafka message.
# Every field is declared nullable (StructField's default when the flag is omitted).
schema_structfields = [
    StructField("_id", MapType(StringType(), StringType(), True), True),
    StructField("actualDelay", LongType(), True),
    StructField("angle", DoubleType(), True),
    StructField("anomaly", BooleanType(), True),
    StructField("areaId", LongType(), True),
    StructField("areaId1", LongType(), True),
    StructField("areaId2", LongType(), True),
    StructField("areaId3", LongType(), True),
    StructField("atStop", BooleanType(), True),
    StructField("busStop", LongType(), True),
    StructField("calendar", MapType(StringType(), StringType(), True), True),
    StructField("congestion", BooleanType(), True),
    StructField("currentHour", LongType(), True),
    StructField("dateType", LongType(), True),
    StructField("dateTypeEnum", StringType(), True),
    StructField("delay", LongType(), True),
    StructField("direction", LongType(), True),
    StructField("distanceCovered", DoubleType(), True),
    StructField("ellapsedTime", LongType(), True),
    StructField("filteredActualDelay", LongType(), True),
    StructField("gridID", StringType(), True),
    StructField("journeyPatternId", StringType(), True),
    StructField("justLeftStop", BooleanType(), True),
    StructField("justStopped", BooleanType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("lineId", StringType(), True),
    # Nested struct: a coordinates array plus a type tag.
    StructField(
        "loc",
        StructType([
            StructField("coordinates", ArrayType(DoubleType(), True), True),
            StructField("type", StringType(), True),
        ]),
        True,
    ),
    StructField("longitude", DoubleType(), True),
    StructField("poiId", LongType(), True),
    StructField("poiId2", LongType(), True),
    StructField("probability", DoubleType(), True),
    StructField("systemTimestamp", DoubleType(), True),
    # Parsed as a string->string map; the first map value is extracted later as the epoch time.
    StructField("timestamp", MapType(StringType(), StringType(), True)),
    StructField("vehicleId", LongType(), True),
    StructField("vehicleSpeed", LongType(), True),
]

schema = StructType(schema_structfields)

In [0]:
# Kafka bootstrap server: the widget value if the user supplied one, otherwise the default broker.
kafka_server = '10.0.0.30:9091' if API == "Please enter your API here" else API

# Alternative source definition: subscribe to an explicit list of topics.
# kafka_raw_df = (
#     spark.readStream
#     .format("kafka")
#     .option("kafka.bootstrap.servers", kafka_server)
#     .option("subscribe", "vehicleId_28051,vehicleId_28052")
#     .option("startingOffsets", "earliest")
#     .load()
# )

# Active source definition: subscribe to every topic matching the pattern, reading from the
# earliest offsets and capping how many offsets a single trigger may consume.
kafka_raw_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_server)
    .option("subscribePattern", "vehicleId_.*")
    .option("startingOffsets", "earliest")
    .option("maxOffsetsPerTrigger", 5000000)
    .load()
)

# The Kafka `value` column is binary; expose it as a string so it can be parsed as JSON.
kafka_value_df = kafka_raw_df.selectExpr("CAST(value AS STRING)")

# schema = pickle.load(open("/dbfs/mnt/schema.pkl", "rb"))

# Parse the JSON payload with the explicit schema and flatten it into top-level columns.
stream_raw_df = (
    kafka_value_df
    .select(F.from_json(F.col("value"), schema=schema).alias('json'))
    .select("json.*")
)

display(stream_raw_df)

In [0]:
# Keep only the columns used downstream.
stream_raw_df = stream_raw_df.select(
    "vehicleId", "timestamp", "longitude", 'loc', "lineId", "latitude",
    "journeyPatternId", "delay", "congestion", "busStop", "atStop",
    "areaId3", "areaId2", "areaId1", "areaId", "actualDelay",
)

# Unwrap the two nested fields (first value of the timestamp map, coordinates of `loc`),
# then drop exact duplicate rows and rows in which every column is null.
# NOTE(review): dropDuplicates() on a stream without a watermark keeps all seen rows in
# state indefinitely — confirm this is acceptable for the expected data volume.
fixed_df = (
    stream_raw_df
    .withColumn('timestamp', F.map_values(F.col('timestamp'))[0])
    .withColumn('loc', F.col('loc')['coordinates'])
    .dropDuplicates()
    .na.drop(how="all")
)

In [0]:
# Convert the epoch value (divided by 1000 before conversion) into a Spark timestamp and
# derive minute-based versions of the two delay columns.
# Re-running this cell would apply the timestamp conversion a second time on already
# converted data.
fixed_df = (
    fixed_df
    .withColumn('timestamp', F.to_timestamp(F.from_unixtime(F.col('timestamp') / (1000))))
    .withColumn('mins_delay', F.col('delay') / 60)
    .withColumn('mins_actualDelay', F.col('actualDelay') / 60)
)

In [0]:
# Calendar features derived from the event timestamp, followed by three categorical flags
# built with SQL expressions (season, part of day, city-centre stop membership).
fixed_df = (
    fixed_df
    .withColumn('realHour', F.hour(F.col('timestamp')))
    # dayofweek: 1 = Sunday, 7 = Saturday
    .withColumn("isWeekend", F.dayofweek(F.col('timestamp')).isin([1,7]).cast("int"))
    .withColumn('month', F.month(F.col('timestamp')))
    .withColumn('year', F.year(F.col('timestamp')))
    .withColumn('date', F.to_date(F.col('timestamp')))
    .withColumn("season", F.expr(
        """IF (month IN (3,4,5) ,"Spring", IF (month IN (6,7,8), "Summer" ,IF (month IN (9,10,11) ,"Autumn","Winter")))"""
    ))
    .withColumn("timeInDay", F.expr(
        """IF (realHour IN (5,6,7,8,9,10,11) ,"Morning", IF (realHour IN (12, 13, 14, 15, 16, 17), "Afternoon" ,IF (realHour IN (18, 19, 20) ,"Evening","Night")))"""
    ))
    .withColumn("isInCenter", F.expr(
        """IF (busStop IN (278, 281, 4724, 274, 279, 4725, 4508, 272, 277, 270, 6059, 271, 7402, 1184, 4717, 288, 289, 7591, 299, 298, 297, 302, 301, 273, 315, 7622, 334, 335, 336, 340, 317, 319, 325, 7392, 328, 345, 346, 7588, 404, 405, 406, 320, 1359, 7582, 1358, 7581, 1279, 1278, 4522, 4521, 494, 495, 792, 793) ,1, 0)"""
    ))
)

# Line number = first four characters of journeyPatternId with leading zeros stripped;
# direction = the single character at (1-based) position 5 of journeyPatternId.
# Column slicing `[a:b]` is shorthand for `.substr(a, b)` (start position, length).
fixed_df = (
    fixed_df
    .withColumn('line_num', F.col('journeyPatternId').substr(0, 4))
    .withColumn('line_num', F.regexp_replace('line_num', '^0+', ''))
    .withColumn('direction', F.col('journeyPatternId').substr(5, 5).substr(0, 1))
)
# display(fixed_df)

In [0]:
# Task 2 only looks at records reported while the vehicle is at a stop.
task2_df = fixed_df.filter(F.col("atStop") == True)

In [0]:
# Mean and standard deviation of the delay, calculated on the static train data;
# they are compared against `mins_delay` below.
mean_delay = 3.123
std_delay = 7.0039

# Bounds of the mean +/- 2 standard deviations band.
mean_plus_std = mean_delay + 2 * std_delay
mean_minus_std = mean_delay - 2 * std_delay

task2_df = (
    task2_df
    # Delays outside the band are replaced by null.
    .withColumn("null_mins_delay", F.expr(
        f"""IF ((mins_delay > {mean_plus_std}) or (mins_delay < {mean_minus_std}) ,null, mins_delay)"""
    ))
    # Congestion flags that contradict the delay are replaced by null
    # (original author's note on this line: "24 mins").
    .withColumn("null_congestion", F.expr(
        """IF ((congestion=True and mins_delay < 0) or (congestion=False and mins_delay > 10) ,null, congestion)"""
    ))
)
display(task2_df)

In [0]:
# Integer encodings of the congestion flag: one from the raw column (null maps to 0) and
# one from the cleaned column (null stays null).
task2_df = (
    task2_df
    .withColumn('binary_congestion', F.expr("""IF (congestion=True, 1, 0)"""))
    .withColumn('null_binary_congestion', F.expr("""IF (null_congestion=True, 1, IF(null_congestion=False, 0, null))"""))
)

In [0]:
# Dealing with uncertain data: complete-case analysis, i.e. drop every row that has a null
# in any column (including the values nulled out as outliers earlier).
Complete_case_df = task2_df.dropna()

# Test split: records from 2018.
test_df = Complete_case_df.filter(F.col("year") == 2018)

# Static lookup table that maps lineId to its index.
df_hash = spark.read.csv('lineId_index.csv', header=True)

# Attach the index via a broadcast inner join and keep only the lookup table's lineId column.
test_df = (
    test_df
    .join(df_hash.hint("broadcast"), df_hash["lineId"] == test_df["lineId"], how='inner')
    .drop(test_df.lineId)
)

In [0]:
# Preparing data for machine learning (task 2).
# The label is the city-centre flag; lineId_index arrives from the CSV lookup as a string
# and must be numeric before it can be assembled into the feature vector.
test_df = test_df.withColumn('label', F.col("isInCenter"))
test_df = test_df.withColumn("lineId_index", F.col("lineId_index").cast(IntegerType()))

stages = []

assemblerInputs = ["lineId_index",'null_mins_delay', 'longitude', 'latitude','null_binary_congestion','areaId1']
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

# Pipeline (a single VectorAssembler stage that produces the `features` column).
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(test_df)
test_df = pipelineModel.transform(test_df)

# Move `features` to the front. After transform() `test_df.columns` already contains
# `features`, so it is filtered out here to avoid selecting the same column twice.
selectedCols = ['features'] + [name for name in test_df.columns if name != 'features']
test_df = test_df.select(selectedCols)

In [0]:
# Load the saved task-2 logistic-regression model from the given path.
savedModel = LogisticRegressionModel.load("lrm_model_task2.model")
# Apply it to the streaming test set; the result is the input plus the model's output columns
# (the cells below rely on `prediction`).
predictions = savedModel.transform(test_df)

In [0]:
# Confusion-matrix style summary: number of rows per (label, prediction) pair.
aggDF_task2 = (
    predictions
    .groupBy('label', 'prediction')
    .agg(F.count('label').alias("count"))
)
display(aggDF_task2)

label,prediction,count
1,0.0,123695
0,0.0,950497


In [0]:
def foreach_batch_function(predictions, epoch_id):
  """Evaluate one micro-batch of task-2 predictions and record its metrics.

  Intended as the callback of `writeStream.foreachBatch`. Accuracy and F1 are computed
  with MulticlassClassificationEvaluator over the batch's `label` / `prediction`
  columns, and `(metric, batch_size)` tuples are appended to the notebook-level
  `measures`, `acc_list` and `f1_list` containers.

  Args:
    predictions: micro-batch DataFrame; must contain `label` and `prediction`.
    epoch_id: micro-batch id passed by Spark (not used).
  """
  predictionAndTarget = predictions.select("label", "prediction")
  # The batch is scanned three times (count + two evaluators); cache it so the upstream
  # joins and model scoring are not recomputed for every action.
  predictionAndTarget.persist()
  try:
    batch_size = predictionAndTarget.count()
    if batch_size == 0:
      # An empty batch carries no weight and its metrics are undefined (they may come
      # back as NaN), which would poison the weighted average — skip it.
      return

    evaluator_acc = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName="accuracy")
    evaluator_f1 = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName="f1")

    # Compute both metrics before recording anything, so a failure cannot leave the
    # "acc" and "f1" lists with different lengths.
    measure_acc = evaluator_acc.evaluate(predictionAndTarget)
    measure_f1 = evaluator_f1.evaluate(predictionAndTarget)

    measures["acc"].append((measure_acc,batch_size))
    acc_list.append((measure_acc,batch_size))
    measures["f1"].append((measure_f1,batch_size))
    f1_list.append((measure_f1,batch_size))
  finally:
    predictionAndTarget.unpersist()
  

In [0]:
# Fresh accumulators for the task-2 stream; foreach_batch_function appends to them.
f1_list=[]
acc_list=[]
measures= defaultdict(list)

# foreachBatch replaces whatever sink was selected with .format(), so the former
# .format("memory") call had no effect and is omitted.
# The StreamingQuery handle is kept so the stream can be monitored or stopped later
# (e.g. task2_query.status, task2_query.stop()).
task2_query = (
    predictions.writeStream
    .foreachBatch(foreach_batch_function)
    .start()
)
task2_query

In [0]:
def calc_f1_acc(measures):
  """Print the batch-size-weighted average accuracy and F1 over all recorded batches.

  Args:
    measures: mapping with keys "f1" and "acc", each a list of
      `(metric_value, batch_size)` tuples, one tuple per evaluated micro-batch.

  Returns:
    None. The result is printed. If no rows have been evaluated yet (no batches, or only
    empty ones) a notice is printed instead of dividing by zero or reporting a fake 0.0.
  """
  output_f1=measures["f1"]
  output_acc=measures["acc"]
  test_data_size=sum(size for _, size in output_f1)
  if test_data_size == 0:
    print("no evaluated rows yet: avg acc and avg f1 are undefined")
    return
  avg_acc,avg_f1=0,0
  # Each batch contributes proportionally to the number of rows it contained.
  for (f1_m,f1_size),(acc_m,acc_size) in zip(output_f1,output_acc):
    avg_acc+=acc_m*(acc_size/test_data_size)
    avg_f1+=f1_m*(f1_size/test_data_size)
  print(f"avg acc is {avg_acc:.4f} and avg f1 is {avg_f1:.4f}")

In [0]:
# Report the weighted metrics gathered so far. The streaming query started above runs
# asynchronously, so this reflects only the micro-batches processed up to this moment;
# re-run the cell to refresh the numbers.
calc_f1_acc(measures)

In [0]:
# Static lookup tables, both read with a header row and no schema inference
# (so every column is a string).
source_dest_df = spark.read.csv(
    'source_dest_df_1.csv',
    header=True,
)
# The attractions table is used without its coordinate columns.
atrractions_df = spark.read.csv(
    'atrractions_with_busStops.csv',
    header=True,
).drop("Longitude", "Latitude")


In [0]:
# Enrich the stream with the source/destination table (broadcast left join, so stream rows
# without a match are kept).
first_joined_df = fixed_df.join(
    F.broadcast(source_dest_df),
    on=["line_num", "direction"],
    how="left",
)
display(first_joined_df)

In [0]:
# Enrich with the attractions table keyed by bus stop (broadcast left join).
# NOTE(review): the attractions CSV is read without schema inference, so its busStop is a
# string while the stream's busStop is a long; the join relies on Spark's implicit cast.
joined_df = first_joined_df.join(
    F.broadcast(atrractions_df),
    on=["busStop"],
    how="left",
)
display(joined_df)

In [0]:
# Attraction category: 2 = not at a stop, 0 = no attraction matched (Name is null),
# 1 = an attraction matched.
joined_df = joined_df.withColumn(
    "attraction_catagory",
    F.when(F.col('atStop') == False, 2)
     .when(F.col('Name').isNull(), 0)
     .otherwise(1),
)

In [0]:
# df_to_elastic=joined_df["busStop","vehicleId","line_num","direction","timestamp","loc","congestion","atStop","areaId1","mins_delay","realHour","isWeekend","month",
# "year",
# "date",
# "season",
# "timeInDay",
# "isInCenter",
# "sourceAndDest",
# "source",
# "dest",
# "Name","AddressLocality",
# "distance",
# "rank",
# "attraction_catagory"]

# index="stream_joined_data"
# # es.indices.delete(index=index)

In [0]:
# def write_to_elastic(df_to_elastic, index, settings_with_mapping, ES_HOST):
#   es.indices.create(index=index, ignore=400, body=settings_with_mapping)
#   df_to_elastic.writeStream \
#   .outputMode("append") \
#   .queryName(f"elastic_{index}") \
#   .format("org.elasticsearch.spark.sql") \
#   .option("es.resource", index) \
#   .option("es.nodes.wan.only","true") \
#   .option("es.port","9200") \
#   .option("es.nodes", ES_HOST) \
#   .option('es.nodes.client.only', 'false')\
#   .option("checkpointLocation", "/tmp/") \
#   .start()

In [0]:
# settings_with_mapping = {
#     "settings": {
#         "number_of_shards": 1,
#         "number_of_replicas": 0,
#         "refresh_interval" : -1
#     },
#      "mappings": {
#       "properties": {
#           "mins_delay" : { "type": "long" },#*
#           "areaId1" : { "type": "long" },#*
#           "atStop" : { "type": "boolean" },#*
#           "busStop" : { "type": "long" }, #* 

#         "congestion" : { "type": "boolean" },#*
#           "loc" : { "type": "geo_point" },#*
#           "timestamp" : { "type": "date"}, #*
#           "vehicleId" : { "type": "long" },
#           "realHour" : { "type": "long" },#*
#           "isWeekend" : { "type": "long" },#*
#           "date" : {"type":"date"},#*
#           "month" : {"type": "long"}, #*
#           "year" : {"type": "long"}, #*
#           "season" : {"type": "keyword"},#*
#           "timeInDay" : {"type": "keyword"}, #*
#           "isInCenter" : {"type" : "boolean"}, #*
#           "line_num" : { 'type' : 'keyword'},#*
#           "sourceAndDest" : { 'type' : 'keyword'},#*
#           "source" : { 'type' : 'keyword'},#*
#           "dest" : { 'type' : 'keyword'},#*
#           "direction" : { 'type' : 'long'},#*
#           "Name" : { 'type' : 'keyword'},#*
#           "AddressLocality" : { 'type' : 'keyword'},#*
#           "distance": { "type": "long" },#*
#           "rank": { "type": "long" },#*
#           "attraction_catagory": { "type": "long" },#*

#     }
#      }
# }
# # write_to_elastic(df_to_elastic, index, settings_with_mapping, ES_HOST)
# # if not es.indices.exists(index):
# es.indices.create(index=index, ignore=400, body=settings_with_mapping)

# df_to_elastic.writeStream \
#     .outputMode("append") \
#     .queryName(f"elastic_{index}") \
#     .format("org.elasticsearch.spark.sql") \
#     .option("es.resource", index) \
#     .option("es.nodes.wan.only","true") \
#     .option("es.port","9200") \
#     .option("es.nodes", ES_HOST) \
#     .option("checkpointLocation", "/tmp/") \
#     .start()

In [0]:
# Task 3 also uses only the records reported at a stop.
task_3_df = joined_df.filter(F.col("atStop") == True)

# Test split (2018), integer congestion flag, projection to the modelling columns, and
# replacement of missing source / dest values by the literal "empty".
task3_test_df = (
    task_3_df
    .filter(F.col("year") == 2018)
    .withColumn('binary_congestion', F.expr("""IF (congestion=True, 1, 0)"""))
    .select(
        "mins_delay", "areaId1", "binary_congestion", "loc", "isInCenter",
        "line_num", "source", "dest", "attraction_catagory", 'longitude', 'latitude',
    )
    .withColumn("source", F.when(F.col('source').isNull(), "empty").otherwise(F.col('source')))
    .withColumn("dest", F.when(F.col('dest').isNull(), "empty").otherwise(F.col('dest')))
)


In [0]:
# Categorical column -> CSV file holding its "<col>_index,<col>" lookup table.
categoricalCols = {
  'line_num': 'line_num_index.csv',
  'source': 'source_index.csv',
  'dest': 'dest_index.csv',
}
for c, c_file in categoricalCols.items():
  # The file is read with '|' as separator, so each line arrives as ONE column whose name
  # is the whole header ("<col>_index,<col>"); it is split on "," into index and value.
  packed_name = f"{c}_index,{c}"
  df_hash = spark.read.format("csv").option("header", "true").option('sep','|').load(c_file, inferSchema="true")
  df_hash = (
    df_hash
    .withColumn(f"{c}_index", (split(F.col(packed_name), ",")[0]).cast("int"))
    .withColumn(c, split(F.col(packed_name), ",")[1])
    .drop(packed_name)
  )
  # Inner broadcast join: rows whose value is absent from the lookup table are dropped.
  task3_test_df = task3_test_df.join(df_hash.hint("broadcast"), df_hash[c] == task3_test_df[c], how='inner').drop(df_hash[c])

In [0]:
# Preparing data for machine learning (task 3). The label is the city-centre flag.
task3_test_df = task3_test_df.withColumn('label', F.col("isInCenter"))
stages = []
assemblerInputs = ['source_index','dest_index',"line_num_index",'mins_delay', 'longitude', 'latitude','binary_congestion','areaId1']
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

# Pipeline (a single VectorAssembler stage that produces the `features` column).
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(task3_test_df)
task3_test_df = pipelineModel.transform(task3_test_df)

# Move `features` to the front. After transform() `task3_test_df.columns` already contains
# `features`, so it is filtered out here to avoid selecting the same column twice.
selectedCols = ['features'] + [name for name in task3_test_df.columns if name != 'features']
task3_test_df = task3_test_df.select(selectedCols)



In [0]:
# Load the saved task-3 logistic-regression model from the given path.
savedModel_task3 = LogisticRegressionModel.load("lrm_model_task3.model")
# Apply it to the task-3 streaming test set; the cells below rely on the `prediction` column.
task3_predictions = savedModel_task3.transform(task3_test_df)
display(task3_predictions)

In [0]:
# Confusion-matrix style summary for task 3: number of rows per (label, prediction) pair.
aggDf = (
    task3_predictions
    .groupBy('label', 'prediction')
    .agg(F.count('label').alias("count"))
)
display(aggDf)

In [0]:
def task3_foreach_batch_function(predictions, epoch_id):
  """Evaluate one micro-batch of task-3 predictions and record its metrics.

  Intended as the callback of `writeStream.foreachBatch`. Accuracy and F1 are computed
  with MulticlassClassificationEvaluator over the batch's `label` / `prediction`
  columns, and `(metric, batch_size)` tuples are appended to the notebook-level
  `task3_measures`, `task3_acc_list` and `task3_f1_list` containers.

  Args:
    predictions: micro-batch DataFrame; must contain `label` and `prediction`.
    epoch_id: micro-batch id passed by Spark (not used).
  """
  predictionAndTarget = predictions.select("label", "prediction")
  # The batch is scanned three times (count + two evaluators); cache it so the upstream
  # joins and model scoring are not recomputed for every action.
  predictionAndTarget.persist()
  try:
    batch_size = predictionAndTarget.count()
    if batch_size == 0:
      # An empty batch carries no weight and its metrics are undefined (they may come
      # back as NaN), which would poison the weighted average — skip it.
      return

    evaluator_acc = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName="accuracy")
    evaluator_f1 = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName="f1")

    # Compute both metrics before recording anything, so a failure cannot leave the
    # "acc" and "f1" lists with different lengths.
    measure_acc = evaluator_acc.evaluate(predictionAndTarget)
    measure_f1 = evaluator_f1.evaluate(predictionAndTarget)

    task3_measures["acc"].append((measure_acc,batch_size))
    task3_acc_list.append((measure_acc,batch_size))
    task3_measures["f1"].append((measure_f1,batch_size))
    task3_f1_list.append((measure_f1,batch_size))
  finally:
    predictionAndTarget.unpersist()


In [0]:
# Fresh accumulators for the task-3 stream; task3_foreach_batch_function appends to them.
task3_f1_list=[]
task3_acc_list=[]
task3_measures= defaultdict(list)

# foreachBatch replaces whatever sink was selected with .format(), so the former
# .format("memory") call had no effect and is omitted.
# The StreamingQuery handle is kept so the stream can be monitored or stopped later
# (e.g. task3_query.status, task3_query.stop()).
task3_query = (
    task3_predictions.writeStream
    .foreachBatch(task3_foreach_batch_function)
    .start()
)
task3_query

In [0]:
# Report the weighted task-3 metrics gathered so far. The streaming query started above runs
# asynchronously, so this reflects only the micro-batches processed up to this moment.
calc_f1_acc(task3_measures) 