In [None]:
from pyspark.sql.types import *
import pyspark.sql.functions as F
from elasticsearch import Elasticsearch, helpers
import requests
import numpy as np
from pyspark.sql.window import Window
from pyspark.sql.functions import when, col, lag, date_format, countDistinct, from_utc_timestamp, row_number, lit, month, hour
from pyspark.sql.functions import randn, rand
import time
import datetime
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import LogisticRegressionModel
from pyspark.sql import types
import numpy as np
import pyspark.sql.functions
from sklearn.metrics import confusion_matrix
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.streaming import *
from pyspark.sql.functions import *
import pickle
import pyspark.sql.functions as F

In [0]:
# Explicit schema for the raw bus JSON records. Every field is nullable.
# The '_id', 'calendar' and 'timestamp' values arrive wrapped in a one-member
# struct ('$oid' / '$numberLong') and 'loc' is a struct with a coordinate array.
schema = types.StructType([
    types.StructField(field_name, field_type, True)
    for field_name, field_type in [
        ('_id',                 types.StructType([types.StructField('$oid', types.StringType(), True)])),
        ('actualDelay',         types.LongType()),
        ('angle',               types.DoubleType()),
        ('anomaly',             types.BooleanType()),
        ('areaId',              types.LongType()),
        ('areaId1',             types.LongType()),
        ('areaId2',             types.LongType()),
        ('areaId3',             types.LongType()),
        ('atStop',              types.BooleanType()),
        ('busStop',             types.LongType()),
        ('calendar',            types.StructType([types.StructField('$numberLong', types.StringType(), True)])),
        ('congestion',          types.BooleanType()),
        ('currentHour',         types.LongType()),
        ('dateType',            types.LongType()),
        ('dateTypeEnum',        types.StringType()),
        ('delay',               types.LongType()),
        ('direction',           types.LongType()),
        ('distanceCovered',     types.DoubleType()),
        ('ellapsedTime',        types.LongType()),
        ('filteredActualDelay', types.LongType()),
        ('gridID',              types.StringType()),
        ('journeyPatternId',    types.StringType()),
        ('justLeftStop',        types.BooleanType()),
        ('justStopped',         types.BooleanType()),
        ('latitude',            types.DoubleType()),
        ('lineId',              types.StringType()),
        ('loc',                 types.StructType([
                                    types.StructField('coordinates', types.ArrayType(types.DoubleType(), True), True),
                                    types.StructField('type', types.StringType(), True),
                                ])),
        ('longitude',           types.DoubleType()),
        ('poiId',               types.LongType()),
        ('poiId2',              types.LongType()),
        ('probability',         types.DoubleType()),
        ('systemTimestamp',     types.DoubleType()),
        ('timestamp',           types.StructType([types.StructField('$numberLong', types.StringType(), True)])),
        ('vehicleId',           types.LongType()),
        ('vehicleSpeed',        types.LongType()),
    ]
])

### Read static buses data

In [0]:
# Load the static Dublin bus JSON data from Spark's file storage, applying the known schema
df = spark.read.json(
    path='/mnt/dacoursedatabricksstg/dacoursedatabricksdata/busFile',
    schema=schema,
)

### Read external data that was uploaded from the app - batch

In [None]:
# Batch variant: read the files uploaded from the app as a static DataFrame.
# The streaming variant in the next cell binds the same name, so whichever cell runs last wins.
df_external_data = spark.read.json(
    path='/FileStore/Ron_Eden/FromWeb/Stream',
    schema=schema,
)

### Read external data that was uploaded from the app - stream

In [None]:
# Streaming variant: treat the upload directory as a file stream source.
# This rebinds the name that the batch cell above also assigns.
df_external_data = spark.readStream.json(
    path='/FileStore/Ron_Eden/FromWeb/Stream',
    schema=schema,
)

### Data preprocess - Task 1

In [0]:
# flatten_df = expands every struct column into plain columns so the data is easier to read/save
def flatten_df(nested_df):
    """Return `nested_df` with each top-level struct column expanded one level.

    Non-struct columns are kept first, in their original order. Each member of
    a struct column `parent` follows as a column named `parent_member`.
    Only one level is expanded; a struct nested inside a struct stays a struct.
    """
    plain_names = []
    struct_names = []
    for name, dtype in nested_df.dtypes:
        # dtypes yields (column name, type string); struct types start with 'struct'
        if dtype.startswith('struct'):
            struct_names.append(name)
        else:
            plain_names.append(name)

    expanded = []
    for parent in struct_names:
        for member in nested_df.select(parent + '.*').columns:
            expanded.append(F.col(parent + '.' + member).alias(parent + '_' + member))

    return nested_df.select(plain_names + expanded)

def preprocess_data(df):
  """Flatten the raw bus records and derive the features used by the later tasks.

  Works on batch and streaming DataFrames alike (no trip id is needed).

  Args:
    df: DataFrame with the raw bus record layout; it must contain the
      `timestamp.$numberLong` and `_id.$oid` struct members plus the plain
      columns selected below.

  Returns:
    DataFrame with the columns `_idRecord, rushHours, hour, date, season,
    longitude, latitude, areaId1, vehicleSpeed, timestamp, congestion, delay,
    lineId, justStopped, justLeftStop, atStop, ellapsedTime, journeyPatternId,
    DayInWeek, geo`, where `congestion` is 1/0 and `geo` is
    `[longitude, latitude]` as `array<float>`.
  """
  ################################# Prepare the data for streaming (no trip_id) #################################
  flat_df = flatten_df(df)
  # flatten_df names struct members '<parent>_<member>'; give the two we keep '$'-free names
  flat_df = flat_df.withColumnRenamed('timestamp_$numberLong', 'timestamp_numberLong')
  flat_df = flat_df.withColumnRenamed('_id_$oid', '_idRecord')
  # The raw value is divided by 1000 before from_unixtime (which takes seconds),
  # i.e. it is treated as epoch milliseconds
  flat_df = flat_df.withColumn('timestamp', F.to_timestamp(F.from_unixtime(F.col('timestamp_numberLong') / 1000)))
  flat_df = (
      flat_df
      .withColumn('date', F.to_date(F.col('timestamp')))
      .withColumn('month', F.month('timestamp'))
      .withColumn('hour', F.hour('timestamp').cast('bigint'))
  )
  flat_df = flat_df.withColumn('DayInWeek', F.date_format('date', 'E'))
  # season code: 1 = Jun-Aug, 2 = Mar-May, 3 = Sep-Nov, 0 = remaining months
  flat_df = flat_df.withColumn(
      'season',
      F.when(F.col('month').between(6, 8), 1)
      .when(F.col('month').between(3, 5), 2)
      .when(F.col('month').between(9, 11), 3)
      .otherwise(0)
  )
  # rushHours: 1 for hours 7-9 and 16-18 (inclusive), else 0
  flat_df = flat_df.withColumn(
      'rushHours',
      F.when(F.col('hour').between(7, 9) | F.col('hour').between(16, 18), 1)
      .otherwise(0)
  )
  ################################# Take relevant columns #################################
  # The previously computed 'calendar', 'isWeekend' and 'geoCoordinate' columns were
  # never part of this selection, so they are no longer computed at all.
  flat_df = flat_df.select(
      '_idRecord', 'rushHours', 'hour', 'date', 'season', 'longitude', 'latitude',
      'areaId1', 'vehicleSpeed', 'timestamp', 'congestion', 'delay', 'lineId',
      'justStopped', 'justLeftStop', 'atStop', 'ellapsedTime', 'journeyPatternId',
      'DayInWeek',
  )
  ################################# Convert congestion type to numeric #################################
  union_df = flat_df.withColumn(
      'congestion',
      F.when(F.col('congestion') == 'true', 1)
      .otherwise(0)
  )
  # [longitude, latitude] as array<float>, built natively instead of through a
  # Python UDF so rows are not serialized to a Python worker.
  union_df = union_df.withColumn(
      'geo',
      F.array(F.col('longitude').cast('float'), F.col('latitude').cast('float'))
  )
  return union_df


### Connection to Elastic

In [0]:
# Address of the Elasticsearch node; write_stream also passes it as the connector's `es.nodes`
ES_HOST = '10.0.0.19'
# NOTE(review): elasticsearch-py presumably interprets `timeout` in seconds, which would
# make this about 16.7 hours - confirm whether 60 seconds was intended.
es = Elasticsearch(hosts=[{'host': ES_HOST}], timeout=60000)

### Write stream data to Elasticsearch

In [0]:
def write_stream(df, index: str, settings_for_elastic, checkpoints_location):
  """Create the Elasticsearch index if needed and start streaming `df` into it.

  Args:
    df: streaming DataFrame to write.
    index: target index name; the streaming query is named `<index>_to_es`.
    settings_for_elastic: body passed to index creation (settings and mappings).
    checkpoints_location: checkpoint directory for this streaming query.

  Returns:
    The started StreamingQuery, so the caller can stop it, await it or
    inspect its status. Earlier versions discarded it and returned None.
  """
  # ignore=400 suppresses HTTP 400 responses from index creation,
  # e.g. the one returned when the index already exists
  es.indices.create(index=index, ignore=400, body=settings_for_elastic)
  query = (
      df.writeStream
      .outputMode("append")
      .queryName(f"{index}_to_es")
      .format("org.elasticsearch.spark.sql")
      .option("es.nodes.wan.only", "true")
      .option("checkpointLocation", checkpoints_location)
      .option("es.resource", index)
      .option("es.nodes", ES_HOST)
      .option("es.port", "9200")
      .start()
  )
  return query

### Settings for elastic

In [0]:
# Index settings and field mappings for the enriched bus records
# (bus data joined with weather and event data).
# NOTE(review): refresh_interval = -1 turns off Elasticsearch's periodic refresh;
# confirm that an explicit refresh happens elsewhere if the data must be searchable promptly.
SETTINGS = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0,
        "refresh_interval": -1
    },
    "mappings": {
        "properties": {
            "actualDelay": {"type": "long"},
            "areaId1": {"type": "long"},
            "busStop": {"type": "long"},
            "congestion": {"type": "boolean"},
            "journeyPatternId": {"type": "keyword"},
            "lineId": {"type": "keyword"},
            "timestamp": {"type": "date"},
            "vehicleId": {"type": "long"},
            "vehicleSpeed": {"type": "long"},
            "_idRecord": {"type": "keyword"},
            "date": {"type": "date"},
            "month": {"type": "long"},
            "hour": {"type": "long"},
            "season": {"type": "long"},
            # integrate_weather_data casts the weather measurements to DoubleType;
            # mapping them as "long" would index only their integer part.
            "rain": {"type": "double"},
            "temp": {"type": "double"},
            "sun": {"type": "double"},
            "wdsp": {"type": "double"},
            "event": {"type": "boolean"},
            "rushHours": {"type": "long"},
            "geo": {"type": "geo_point"},
        }
    }
}

# Index settings and field mappings for the prediction records.
SETTINGS2 = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0,
        "refresh_interval": -1
    },
    "mappings": {
        "properties": {
            "date": {"type": "date"},
            "hour": {"type": "long"},
            # The prediction cells write the raw `lineId` column; map it as a keyword
            # (as in SETTINGS) instead of leaving it to dynamic mapping.
            "lineId": {"type": "keyword"},
            # Kept for backward compatibility with indices that store the numeric index.
            "lineId_index": {"type": "long"},
            "prediction": {"type": "long"},
            "areaId1": {"type": "long"},
            "journeyPatternId": {"type": "keyword"},
        }
    }
}

### Uncertain data - Task 2

In [0]:
def filter_uncertain_data(union_df):
  """Replace uncertain `vehicleSpeed` readings with NaN.

  A reading is marked NaN in two passes:
    1. speed is 0 although the record shows no congestion and the bus is not
       at a stop, has not just left one and has not just stopped;
    2. the speed is not within 0..180 (inclusive).
  All other columns are returned unchanged.
  """
  speed = F.col("vehicleSpeed")

  # Pass 1: a standstill with nothing in the record that explains it
  unexplained_standstill = (
      (F.col("congestion") == 0)
      & (F.col("atStop") == False)
      & (F.col("justLeftStop") == False)
      & (F.col("justStopped") == False)
      & (speed == 0)
  )
  first_pass = union_df.withColumn(
      'vehicleSpeed',
      F.when(unexplained_standstill, np.nan).otherwise(speed)
  )

  # Pass 2: keep only speeds inside the accepted range
  within_range = (speed >= 0) & (speed <= 180)
  second_pass = first_pass.withColumn(
      'vehicleSpeed',
      F.when(within_range, speed).otherwise(np.nan)
  )
  return second_pass

### Preprocess for prediction - Task 2

In [0]:
def prepare_columns_for_regression(df,categoricalCols, continuousCols,labelCol):
    """Encode the categorical columns, assemble the feature vector and set the label.

    Args:
        df: input DataFrame (batch or streaming).
        categoricalCols: dict mapping a categorical column name to the path of
            its '|'-separated lookup CSV, which must have a header with that
            column name and an `index` column.
        continuousCols: list of numeric column names appended after the
            categorical indexes in the feature vector.
        labelCol: name of the column copied into `label`.

    Returns:
        `df` with one `<column>_index` column per categorical column, a
        `features` vector column and a `label` column. Rows whose categorical
        value is missing from a lookup table are dropped by the inner join.
    """
    index_cols = []
    for c, c_file in categoricalCols.items():
        index_col = f'{c}_index'
        join_key = f'{c}_1'
        df_hash = spark.read.format("csv").option("header", "true").option('sep','|').load(c_file, inferSchema="true")
        # Keep only the key and its index. Any other lookup column (such as the 'count'
        # written by create_hash_tables_for_stream_prediction) would be joined in once
        # per categorical column and leave duplicate column names in the result.
        df_hash = df_hash.select(F.col(c).alias(join_key), F.col('index').alias(index_col))
        df = df.join(df_hash.hint("broadcast"), df_hash[join_key] == df[c], how='inner')
        df = df.drop(join_key)
        index_cols.append(index_col)
    # Feature order follows the order of categoricalCols, then continuousCols
    assembler = VectorAssembler(inputCols=index_cols + continuousCols, outputCol="features")
    # A Pipeline holding only this transformer adds nothing, so apply it directly
    data = assembler.transform(df)
    data = data.withColumn('label', F.col(labelCol))
    return data

### Train the model (Logistic Regression) - Task 2

In [0]:
def train(data, model_name):
  """Fit a logistic regression on `data` and persist it.

  Args:
    data: DataFrame with a `features` vector column and a `label` column.
    model_name: path the fitted model is saved to; an existing model at that
      path is replaced.

  Returns:
    The fitted LogisticRegressionModel.
  """
  logr = LogisticRegression(featuresCol='features', labelCol='label')
  model = logr.fit(data)
  # Plain save() raises when the path already exists, which breaks re-running the
  # training cell; overwrite() makes the call repeatable.
  model.write().overwrite().save(model_name)
  return model

### Evaluation - Task 2

In [0]:
def evaluation(predictions, classes, type):
  """Print accuracy, precision, recall and area under ROC for binary predictions.

  Args:
    predictions: DataFrame with `label`, `prediction` and `rawPrediction` columns.
    classes: the two label values as `[negative, positive]`; this fixes the
      row/column order of the confusion matrix.
    type: free-text tag included in the printed line (the name shadows the
      builtin but is kept for existing callers).

  Precision and recall are reported for the positive class `classes[1]`.
  They were previously computed from the `classes[0]` cell of the matrix,
  which yields negative predictive value and specificity instead.
  An undefined ratio (zero denominator) is reported as NaN.
  """
  # Collect both columns in one pass instead of two separate jobs
  scored = predictions.select("label", "prediction").toPandas()
  y_true = scored["label"]
  y_pred = scored["prediction"].astype('int32')
  # Rows are true labels, columns are predicted labels, both ordered as `classes`
  cm2 = confusion_matrix(y_true, y_pred, labels=classes)
  tn, fp = cm2[0][0], cm2[0][1]
  fn, tp = cm2[1][0], cm2[1][1]
  total = cm2.sum()
  accuracy = (tp + tn) / total if total else float('nan')
  precision = tp / (tp + fp) if (tp + fp) else float('nan')
  recall = tp / (tp + fn) if (tp + fn) else float('nan')
  print(f"LogisticRegression {type}: accuracy,precision,recall",accuracy,precision,recall)
  evaluator=BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",labelCol="label")
  # Get the ROC
  print("The area under ROC for test set is {}".format(evaluator.evaluate(predictions)))

### Integration of weather data - Task 3

In [0]:
def integrate_weather_data(df):
  """Attach hourly weather measurements to `df` (which may be a stream).

  The weather CSV's `date` column holds a 'dd/MM/yyyy <time>' string. It is
  split into a `date` and an integer `hour` (first two characters of the time
  part), and `rain`, `temp`, `sun` and `wdsp` are cast to double. The result
  is inner-joined to `df` on (`hour`, `date`), so rows without a matching
  weather row are dropped.
  """
  weather = spark.read.csv("/FileStore/tables/weather_ron_eden_updated.csv", header = True)
  weather = weather.withColumnRenamed('date', 'timestamp')

  # "<date> <time>" -> [date, time]
  stamp_parts = F.split(F.col("timestamp"), " ")
  weather = (
      weather
      .withColumn("date", F.to_date(stamp_parts.getItem(0), 'dd/MM/yyyy'))
      .withColumn("hour", stamp_parts.getItem(1)[0:2].cast(IntegerType()))
  )

  for measure in ("temp", "rain", "sun", "wdsp"):
    weather = weather.withColumn(measure, F.col(measure).cast(DoubleType()))

  weather = weather.select('rain', 'temp', 'sun', 'wdsp', 'date', 'hour')
  return df.join(weather, on=['hour', 'date'], how='inner')

### Integration of Twitter data - Task 3

In [0]:
def integrate_events_data(df):
  """Attach the hourly `event` flag from the tweets CSV to `df` (which may be a stream).

  The CSV's `date` column holds a 'yyyy-MM-dd <time>' string. It is split into
  a `date` and an integer `hour` (first two characters of the time part), and
  `event` is cast to double. The result is inner-joined to `df` on
  (`hour`, `date`), so rows without a matching event row are dropped.
  """
  events = spark.read.csv("/FileStore/tables/event_tweet_update.csv", header = True)
  events = events.withColumnRenamed('date', 'DateTime')

  # "<date> <time>" -> [date, time]
  moment_parts = F.split(F.col("DateTime"), " ")
  events = (
      events
      .withColumn("date", F.to_date(moment_parts.getItem(0), 'yyyy-MM-dd'))
      .withColumn("hour", moment_parts.getItem(1)[0:2].cast(IntegerType()))
      .withColumn("event", F.col("event").cast(DoubleType()))
      .drop('DateTime')
  )
  return df.join(events, on=['hour', 'date'], how='inner')

### Hash tables for stream preprocess for prediction 

In [0]:
def create_hash_tables_for_stream_prediction(training_df=None):
  """Write one value-to-index lookup table per categorical column.

  For each of `areaId1`, `lineId` and `journeyPatternId`, the distinct values
  are counted, sorted by descending count and numbered from 0 in that order.
  Each table (columns: the value, `count`, `index`) is written as a
  '|'-separated CSV with a header, overwriting any previous output.

  Args:
    training_df: DataFrame to build the tables from. Defaults to the
      notebook-level `trainingData`, which must then already be defined.
  """
  from pyspark.sql.functions import monotonically_increasing_id

  if training_df is None:
    # Backward-compatible default: the frame this function always used
    training_df = trainingData

  hash_tables = (
      ('areaId1', '/areaId1_hash_new1.csv'),
      ('lineId', '/lineId_hash.csv'),
      ('journeyPatternId', '/journeyPatternId_hash.csv'),
  )
  for column_name, output_path in hash_tables:
    value_count = training_df.groupBy(column_name).count()
    value_count = value_count.sort(F.desc("count"))
    # Number the rows in their sorted order, starting at 0
    value_count = value_count.withColumn('index', F.row_number().over(Window.orderBy(monotonically_increasing_id())) - 1)
    value_count.write.format('csv').option('header',True).mode('overwrite').option('sep','|').save(output_path)

### Read stream data from Kafka - Final Task

In [0]:
kafka_server = '10.0.0.30:9091'

# Subscribe to every topic matching the pattern, starting from the earliest offsets
kafka_raw_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_server)
    .option("subscribePattern", "vehicleId_.*")
    .option("startingOffsets", "earliest")
    .load()
)

# The Kafka payload is binary; decode it to the JSON string
kafka_value_df = kafka_raw_df.selectExpr("CAST(value AS STRING)")

# NOTE(review): unpickling executes code from the file - only load a trusted schema.pkl.
# This rebinds the notebook-level name `schema` that the earlier read cells use.
# The file is opened in a `with` block so the handle is closed after loading.
with open("/dbfs/mnt/schema.pkl", "rb") as schema_file:
    schema = pickle.load(schema_file)

# Parse the JSON string with the loaded schema and promote its fields to top-level columns
kafka_df = (
    kafka_value_df
    .select(F.from_json(F.col("value"), schema=schema).alias('json'))
    .select("json.*")
)

### Definitions for regression 

In [0]:
################################# Definition of the columns for the regression #################################
# Categorical column -> lookup CSV that maps each of its values to a numeric index
catcols = {
    'areaId1': 'areaId1_hash_new1.csv',
    'lineId': 'lineId_hash.csv',
    'journeyPatternId': 'journeyPatternId_hash.csv',
}
# Numeric columns that go into the feature vector as they are
num_cols = [
    'vehicleSpeed', 'delay', 'season', 'rushHours', 'hour',
    'congestion', 'rain', 'temp', 'sun', 'wdsp',
]
# Column the model predicts, and its two possible values
labelCol = 'event'
classes = [0, 1]

### Train the model on static historical data

In [0]:
# Task 1: flatten the raw records and derive the time, season, rush-hour and geo columns
union_df = preprocess_data(df)
# Task 2: replace uncertain vehicleSpeed readings with NaN
union_df_filter_uncertain = filter_uncertain_data(union_df)
# Dropping uncertain data: na.drop() removes every row that has a null/NaN in any column,
# which includes the readings marked NaN above
df_no_null = union_df_filter_uncertain.na.drop()
# Task 3: inner-join the hourly weather measurements, then the hourly event flag
df_joined_weather = integrate_weather_data(df_no_null)
df_joined = integrate_events_data(df_joined_weather)
# Split the data for prediction - the split is by date rather than random because the test
# data is real-time data (time biased). Everything before 2018-05-01 is used for training here.
trainingData_for_validation = df_joined.where(df_joined.date < lit('2018-05-01'))
data = prepare_columns_for_regression(trainingData_for_validation,catcols, num_cols,labelCol)
# Fit the model and save it under the given path
model = train(data, "/lr_model_no_uncertain_for_validation.model")

### Evaluate the model on static historical data which simulates the test data - 2 new months  

In [0]:
# Validation window: 2018-05-01 up to and including 2018-07-01
validationData = df_joined.where(df_joined.date >= lit('2018-05-01'))
validationData = validationData.where(validationData.date <= lit('2018-07-01'))
# Same feature preparation as for training
validationData_for_regression =  prepare_columns_for_regression(validationData,catcols, num_cols,labelCol)
# Reload the model saved by the training cell (LogisticRegressionModel is the
# pyspark.ml.classification class) and score the validation rows
validation_model = LogisticRegressionModel.load("/lr_model_no_uncertain_for_validation.model")
validation_predictions = validation_model.transform(validationData_for_regression)
# Prints accuracy, precision, recall and area under ROC
evaluation(validation_predictions, classes, "no uncertain")

### Train the model on all the trainData (without splitting for validation)

In [0]:
# Final training set: every joined row dated before 2018-07-01
trainingData = df_joined.where(df_joined.date < lit('2018-07-01'))
# Note: `data` and `model` are rebound here, replacing the values from the validation training cell
data = prepare_columns_for_regression(trainingData,catcols, num_cols,labelCol)
model = train(data, "/lr_model_no_uncertain_V3.model")

### Executing all tasks on the stream data

In [0]:
# Task 1: flatten the Kafka records and derive the time, season, rush-hour and geo columns
streaming_union_df = preprocess_data(kafka_df)
# Task 2: replace uncertain vehicleSpeed readings with NaN, then drop rows with null/NaN values
streaming_union_df_filter_uncertain = filter_uncertain_data(streaming_union_df)
streaming_df_no_null = streaming_union_df_filter_uncertain.na.drop()
# Task 3: stream-static inner joins with the weather and event tables
streaming_df_joined_weather = integrate_weather_data(streaming_df_no_null)
streaming_df_joined = integrate_events_data(streaming_df_joined_weather)

# Write to Elastic - drop irrelevant columns
# (names in the list that are not present in the frame are presumably ignored by drop - TODO confirm)
streaming_df_to_elastic = streaming_df_joined.drop('areaId', 'areaId2', 'areaId3', 'atStop', 'gridId', 'ellapsedTime', 'justLeftStop', 'justStopped', '_c0')
# SETTINGS maps 'congestion' and 'event' as boolean, so cast the numeric flags back
streaming_df_to_elastic = streaming_df_to_elastic.withColumn('congestion', streaming_df_to_elastic.congestion.cast(BooleanType()))
streaming_df_to_elastic = streaming_df_to_elastic.withColumn('event', streaming_df_to_elastic.event.cast(BooleanType()))
# Write raw data:
write_stream(streaming_df_to_elastic, 'dublin_bus_with_weather_and_tweets_stream',SETTINGS, "/tmp/Ron_Eden/Stream/")

### Predict on streaming data - last 3 months

In [0]:
streaming_test_data = streaming_df_joined.filter(streaming_df_joined.date > lit('2018-07-01')) # keep only rows dated after 2018-07-01 (the period held out for testing)
# Same feature preparation as for training
test_data = prepare_columns_for_regression(streaming_test_data, catcols, num_cols, labelCol)
# Load the model trained on all data before 2018-07-01
# (LogisticRegressionModel is the pyspark.ml.classification class)
test_model = LogisticRegressionModel.load("/lr_model_no_uncertain_V3.model")
no_null_predictions = test_model.transform(test_data)
# write predictions to elastic (only the columns needed in the 'predictions' index)
write_stream(no_null_predictions[['date','hour','lineId','prediction','journeyPatternId','areaId1']], 'predictions',SETTINGS2, "/tmp/eden/Stream4/")

### Transform external data and upload to Elastic Search

In [None]:
# Task 1: flatten the uploaded records and derive the time, season, rush-hour and geo columns
streaming_df_external = preprocess_data(df_external_data)
# Task 2: replace uncertain vehicleSpeed readings with NaN, then drop rows with null/NaN values
streaming_external_df_filter_uncertain = filter_uncertain_data(streaming_df_external)
streaming_external_df_no_null = streaming_external_df_filter_uncertain.na.drop()
# Task 3: inner joins with the weather and event tables
streaming_external_df_joined_weather = integrate_weather_data(streaming_external_df_no_null)
# integrate_events_data takes only the DataFrame; the empty `start_date=, end_date=`
# keyword arguments that used to be here were a syntax error.
streaming_external_df_joined = integrate_events_data(streaming_external_df_joined_weather)

# Write to Elastic - drop irrelevant columns
streaming_df_to_elastic = streaming_external_df_joined.drop('areaId', 'areaId2', 'areaId3', 'atStop', 'gridId', 'ellapsedTime', 'justLeftStop', 'justStopped', '_c0')
# Continue from the frame with the columns dropped; restarting from
# streaming_external_df_joined here silently undid the drop above.
streaming_df_to_elastic = streaming_df_to_elastic.withColumn('congestion', streaming_df_to_elastic.congestion.cast(BooleanType()))
streaming_df_to_elastic = streaming_df_to_elastic.withColumn('event', streaming_df_to_elastic.event.cast(BooleanType()))
# Write raw data:
# NOTE(review): this uses the same index (and therefore the same query name inside
# write_stream) and the same checkpoint directory as the Kafka pipeline cell - confirm
# the two are never meant to run in the same session, or give this one its own checkpoint.
write_stream(streaming_df_to_elastic, 'dublin_bus_with_weather_and_tweets_stream',SETTINGS, "/tmp/Ron_Eden/Stream/")

### Predict on the external new data

In [None]:
# Same feature preparation as for training, applied to the uploaded external data
external_test_data = prepare_columns_for_regression(streaming_external_df_joined, catcols, num_cols, labelCol)
# Load the model trained on all data before 2018-07-01
# (LogisticRegressionModel is the pyspark.ml.classification class)
test_model = LogisticRegressionModel.load("/lr_model_no_uncertain_V3.model")
# Note: `test_model` and `no_null_predictions` are rebound here, replacing the Kafka-stream values
no_null_predictions = test_model.transform(external_test_data)
# write predictions to elastic
# NOTE(review): same 'predictions' index and checkpoint directory as the Kafka prediction
# cell - confirm the two are not expected to run side by side.
write_stream(no_null_predictions[['date','hour','lineId','prediction','journeyPatternId','areaId1']], 'predictions',SETTINGS2, "/tmp/eden/Stream4/")