In [0]:
import pyspark.sql.functions as F
from pyspark.sql.functions import countDistinct
from pyspark.sql.functions import sqrt
from pyspark.sql.types import *
from pyspark.sql.functions import udf
from math import pi
from time import sleep
from elasticsearch import Elasticsearch
import numpy as np
import seaborn as sns
import pandas as pd
import json
from pyspark.sql.window import Window
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
import pickle
from pyspark.ml.regression import LinearRegressionModel

# Session-wide time zone used by Spark SQL date/time conversions (e.g. from_unixtime later on).
spark.conf.set("spark.sql.session.timeZone", "Europe/Dublin")

# Elasticsearch node shared by the batch/streaming connectors and the Python client below.
ES_HOST = "10.0.0.6"
# Python client used for index management (exists/create/delete).
# timeout=800000 is the client request timeout (presumably seconds) -- effectively unbounded.
es = Elasticsearch(hosts=[{"host": ES_HOST}], timeout=800000)



In [0]:
# ---- Previously saved data: Elasticsearch indices and trained-model paths that get READ ----
stops_index = "stops_index"
light_index = "lights_data"
travel_time_index = "time_travel_data_journey_clean"
lr_imputation_path_trained = "/FileStore/lr_imputation4"
lr_speed_path_trained = "/FileStore/lr_speed2"

# ---- Flags for rebuilding static data from the raw files (all False = reuse saved data) ----
upload_stops_data = False
upload_lights_data = False
create_new_static_data = False

stops_data_file_location = "/FileStore/tables/stops.csv"
lights_data_file_location = "dbfs:/FileStore/shared_uploads/ploznik@campus.technion.ac.il/traffic_lights_locations.csv"

# Write-side index names are aliases of the read-side ones. Data uploaded by the flags above
# then always lands in the index that is read back later; the two can no longer drift apart.
stops_index_name = stops_index
lights_index_name = light_index
travel_time_index_name = travel_time_index

bus_data_location = '/mnt/dacoursedatabricksstg/dacoursedatabricksdata/busFile'

# Destination of freshly trained models (the *_trained paths above are the ones loaded).
lr_imputation_path_new = "/FileStore/lr_imputation5"
lr_speed_path_new = "/FileStore/lr_speed3"


In [0]:
# Index-creation body used by write_to_elastic by default (static stops / lights / travel-time indices).
es_schema = {
    "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 0,
    },
    "mappings": {
        "properties": {
            "date": {"type": "date"},
            "loc": {"type": "geo_point"},
        }
    },
}

# Index-creation body used by write_stream by default (streamed bus records).
# refresh_interval -1 disables Elasticsearch's periodic index refresh.
DEFUALT_SCEHMA = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0,
        "refresh_interval": -1,
    },
    "mappings": {
        "properties": {
            "actualDelay": {"type": "long"},
            "filteredActualDelay": {"type": "long"},
            "delay": {"type": "long"},
            "areaId": {"type": "long"},
            "areaId1": {"type": "long"},
            "areaId2": {"type": "long"},
            "areaId3": {"type": "long"},
            "atStop": {"type": "boolean"},
            "busStop": {"type": "long"},
            "congestion": {"type": "boolean"},
            "gridID": {"type": "keyword"},
            "journeyPatternId": {"type": "keyword"},
            "lineId": {"type": "keyword"},
            "loc": {"type": "geo_point"},
            # The streamed `timestamp` column is epoch SECONDS: the feed's millisecond value is
            # divided by 1000 and later passed directly to from_unixtime. With "epoch_millis",
            # every document would be dated in January 1970.
            "timestamp": {"type": "date", "format": "epoch_second"},
            "date": {"type": "date"},
            "vehicleId": {"type": "long"},
            "direction": {"type": "long"},
        }
    },
}

# Correctly spelled alias; the misspelled name is kept for existing callers.
DEFAULT_SCHEMA = DEFUALT_SCEHMA
  

def write_to_elastic(df, index: str, settings=es_schema):
    """(Re)create an Elasticsearch index and batch-write a DataFrame into it.

    An existing index with the same name is deleted first, so documents from a previous
    run do not linger.

    Args:
        df: Spark DataFrame to write.
        index: target index name (also passed as ``es.resource``).
        settings: index-creation body (settings + mappings); defaults to ``es_schema``.
    """
    if es.indices.exists(index=index):
        es.indices.delete(index=index)

    # ignore=400: a "bad request" answer from create (e.g. index already exists) is not raised.
    es.indices.create(index=index, ignore=400, body=settings)

    connector_options = {
        "es.resource": index,
        "es.nodes.wan.only": "true",
        "es.port": "9200",
        "es.nodes": ES_HOST,
        "es.nodes.client.only": "false",
    }
    df.write.format("org.elasticsearch.spark.sql").options(**connector_options).save()
  
  
# Elasticsearch query DSL that matches every document (JSON text, 4-space indented).
query = json.dumps({"query": {"match_all": {}}}, indent=4)


def read_elastic(index, body="", scroll_size="10000", array_field=""):
    """Load an Elasticsearch index as a Spark DataFrame through the ES-Spark connector.

    Args:
        index: index to read; it must already exist.
        body: query DSL JSON string passed as ``es.query``. An empty string (the default)
            means "match all documents". Previously this argument was ignored and the
            notebook-global ``query`` was used instead, which breaks once that name is rebound.
        scroll_size: value for ``es.scroll.size`` (documents per scroll request).
        array_field: value for ``es.read.field.as.array.include``.

    Returns:
        The DataFrame returned by ``spark.read...load(index)``.

    Raises:
        Exception: if the index does not exist.
    """
    if not es.indices.exists(index=index):
        raise Exception("Index doesn't exist!")

    es_query = body if body else '{"query": {"match_all": {}}}'

    return spark.read \
                .format("org.elasticsearch.spark.sql") \
                .option("es.nodes.wan.only", "true") \
                .option("es.port", "9200") \
                .option("es.nodes", ES_HOST) \
                .option("es.nodes.client.only", "false") \
                .option("pushdown", "true") \
                .option("es.query", es_query) \
                .option("es.scroll.size", scroll_size) \
                .option("es.scroll.keepalive", "120m") \
                .option("es.read.field.as.array.include", array_field) \
                .load(index)

  
def write_stream(df, index: str, settings=None, mode=True, wait=True, checkpoint_location=None):
    """Start a structured-streaming query that appends ``df`` to an Elasticsearch index.

    The index is created first with ``settings``. A 400 response (e.g. "index already
    exists") is ignored, so an existing index keeps its mapping.

    Args:
        df: streaming DataFrame to sink.
        index: target index; also names the query ``f"{index}_to_es"``.
        settings: index-creation body; ``None`` (default) means ``DEFUALT_SCEHMA``. It is
            resolved at call time instead of being bound as a shared default object.
        mode: unused; kept only for backward compatibility.
        wait: if True, block in ``awaitTermination()``; otherwise return right after start.
        checkpoint_location: streaming checkpoint directory. By default a per-index path is
            used. The previous code referenced an undefined ``checkpoint_dir`` (NameError), and
            concurrently running queries must not share one checkpoint directory.

    Returns:
        The started StreamingQuery.
    """
    if settings is None:
        settings = DEFUALT_SCEHMA
    if checkpoint_location is None:
        checkpoint_location = f"/tmp/es_checkpoints/{index}"

    es.indices.create(index=index, ignore=400, body=settings)

    streaming_query = df.writeStream \
        .outputMode("append") \
        .queryName(f"{index}_to_es") \
        .format("org.elasticsearch.spark.sql") \
        .option("es.nodes.wan.only", "true") \
        .option("checkpointLocation", checkpoint_location) \
        .option("es.resource", index) \
        .option("es.nodes", ES_HOST) \
        .option("es.port", "9200") \
        .start()

    if wait:
        streaming_query.awaitTermination()
    return streaming_query

In [0]:
def _drop_missing_coords(frame, lat_col, lon_col):
    """Drop rows whose latitude/longitude is null or exactly 0 (treated as distorted)."""
    frame = frame.filter(frame[lat_col].isNotNull())
    frame = frame.filter(frame[lon_col].isNotNull())
    frame = frame.filter(frame[lon_col] != 0)
    return frame.filter(frame[lat_col] != 0)


def _with_lon_lat_array(frame, lon_col, lat_col, out_col):
    """Add `out_col` = [lon, lat] (both double), the array form uploaded to Elasticsearch."""
    return frame.withColumn(out_col, F.array(frame[lon_col].cast("double"), frame[lat_col].cast("double")))


if upload_stops_data:
    stops_data = (
        spark.read.option("header", True).csv(stops_data_file_location)
        # NOTE: on a PySpark Column, [9:11] means substr(startPos=9, length=11) with a 1-based
        # start -- NOT a Python slice -- so this keeps up to 11 characters from the 9th one on.
        # Presumably this strips a fixed-width prefix from the stop code; confirm against data.
        .withColumn("stop_id", F.col("stop_id").substr(9, 11))
        .withColumn("stop_id", F.col("stop_id").cast("int"))
        .withColumn("stop_lat", F.col("stop_lat").cast("double"))
        .withColumn("stop_lon", F.col("stop_lon").cast("double"))
    )
    stops_data = _drop_missing_coords(stops_data, "stop_lat", "stop_lon")
    stops_data = _with_lon_lat_array(stops_data, "stop_lon", "stop_lat", "stops_loc")

    write_to_elastic(stops_data, stops_index_name)

if upload_lights_data:
    # Rename the raw "Lat"/"Long" CSV columns to typed light_lat/light_lon.
    light_data = (
        spark.read.option("header", True).csv(lights_data_file_location)
        .withColumn("light_lat", F.col("Lat").cast("double")).drop("Lat")
        .withColumn("light_lon", F.col("Long").cast("double")).drop("Long")
    )
    light_data = _drop_missing_coords(light_data, "light_lat", "light_lon")
    light_data = _with_lon_lat_array(light_data, "light_lon", "light_lat", "lights_loc")

    write_to_elastic(light_data, lights_index_name)

In [0]:
# Static reference data read back from Elasticsearch; helper columns are dropped so they do
# not collide with the bus feed's own "datetime"/"loc" columns after joins.
stops_df = read_elastic(index=stops_index, body=query).drop("datetime", "loc")
lights_df = read_elastic(index=light_index, body=query).drop("datetime", "loc")
# Expose the stop id under the bus feed's column name so bus records can join on "busStop".
# stop_id is intentionally kept alongside it.
stops_df = stops_df.withColumn("busStop", stops_df["stop_id"])

# Nearest-traffic-light lookup: KMeans with one cluster per light (k = number of lights).
# predictions_k_L maps each light to its cluster id, so a bus position assigned to a cluster
# can be joined back to that cluster's light(s).
# An explicit seed makes the clustering reproducible between runs. PySpark's default KMeans
# seed is derived from Python's string hash, which may vary between driver processes.
KMEANS_SEED = 464
features_col_L = ["light_lat", "light_lon"]
assembler_L = VectorAssembler(inputCols=features_col_L, outputCol="features")
L_df = assembler_L.transform(lights_df)
k_L = lights_df.count()
kmeans_L = KMeans(k=k_L, seed=KMEANS_SEED)
kmeans_model_L = kmeans_L.fit(L_df.select("features"))
predictions_k_L = kmeans_model_L.transform(L_df).drop("features")

# Bus positions use the same (lat, lon) feature order as the lights, so kmeans_model_L applies.
features_col_LS = ["latitude", "longitude"]
assembler_LS = VectorAssembler(inputCols=features_col_LS, outputCol="features")



In [0]:
# Speed-model features. Order matters: a fitted LinearRegressionModel's coefficients are
# positional, so the saved models expect vectors in exactly this order.
features_col = [
    "actualDelay", "congestion", "atStop", "delay", "distanceCovered",
    "justLeftStop", "justStopped", "ellapsedTime", "latitude", "longitude",
    "distance_from_stop", "distance_from_junction",
]
assembler = VectorAssembler(inputCols=features_col, outputCol="features")

# Imputation features: the same list (same order) minus the imputation target, ellapsedTime.
features_col_r = [name for name in features_col if name != "ellapsedTime"]
assembler_r = VectorAssembler(inputCols=features_col_r, outputCol="features_r")

# Both regressions share the same elastic-net hyper-parameters.
_lr_hyperparams = dict(maxIter=10, regParam=0.9, elasticNetParam=0.8)
lr_r = LinearRegression(labelCol="ellapsedTime", featuresCol="features_r", **_lr_hyperparams)
lr_2 = LinearRegression(labelCol="intervalSpeed-kmh", featuresCol="features", **_lr_hyperparams)

In [0]:
if create_new_static_data:
    # Rebuild the static artefacts from the historical bus feed: the ellapsedTime imputation
    # model, the speed model and the per-segment travel-time index.
    bus_path = bus_data_location

    # Predefined schema: avoids a schema-inference pass over the JSON input.
    schema_stat = StructType([StructField('_id',StructType([StructField('$oid',StringType(),True)]),True),
                       StructField('actualDelay',LongType(),True),
                       StructField('angle',DoubleType(),True),
                       StructField('anomaly',BooleanType(),True),
                       StructField('areaId',LongType(),True),
                       StructField('areaId1',LongType(),True),
                       StructField('areaId2',LongType(),True),
                       StructField('areaId3',LongType(),True),
                       StructField('atStop',BooleanType(),True),
                       StructField('busStop',LongType(),True),
                       StructField('calendar',StructType([StructField('$numberLong',StringType(),True)]),True),
                       StructField('congestion',BooleanType(),True),
                       StructField('currentHour',LongType(),True),
                       StructField('dateType',LongType(),True),
                       StructField('dateTypeEnum',StringType(),True),
                       StructField('delay',LongType(),True),
                       StructField('direction',LongType(),True),
                       StructField('distanceCovered',DoubleType(),True),
                       StructField('ellapsedTime',LongType(),True),
                       StructField('filteredActualDelay',LongType(),True),
                       StructField('gridID',StringType(),True),
                       StructField('journeyPatternId',StringType(),True),
                       StructField('justLeftStop',BooleanType(),True),
                       StructField('justStopped',BooleanType(),True),
                       StructField('latitude',DoubleType(),True),
                       StructField('lineId',StringType(),True),
                       StructField('loc',StructType([StructField('coordinates',ArrayType(DoubleType(),True),True),StructField('type',StringType(),True)]),True),
                       StructField('longitude',DoubleType(),True),
                       StructField('poiId',LongType(),True),
                       StructField('poiId2',LongType(),True),
                       StructField('probability',DoubleType(),True),
                       StructField('systemTimestamp',DoubleType(),True),
                       StructField('timestamp',StructType([StructField('$numberLong',StringType(),True)]),True),
                       StructField('vehicleId',LongType(),True),
                       StructField('vehicleSpeed',LongType(),True)])

    data = spark.read.json(bus_path, schema=schema_stat)

    # ---- Cleaning (same steps as the streaming preprocessing) ----
    droped_columns = ["anomaly", "systemTimestamp", "calendar", "dateTypeEnum", "probability", "poiId2", "poiId", "direction", "_id","vehicleSpeed","currentHour","angle"]
    data = data.drop(*droped_columns).withColumnRenamed("dateType","isWeekend")
    data = data.withColumn('timestamp', data['timestamp']['$numberLong'].cast('bigint'))
    data = data.withColumn("datetime", F.to_date(F.from_unixtime(data["timestamp"]/1000)))
    data = data.withColumn("timestamp", data["timestamp"]/1000)
    data = data.withColumn("ellapsedTime", data["ellapsedTime"]/1000)
    data = data.withColumn("filteredActualDelay", data["filteredActualDelay"]/1000)
    data = data.withColumn("loc", data["loc"]["coordinates"])
    data = data.withColumn('timestamp', data['timestamp'].cast('int'))
    data = data.withColumn('actualDelay', data['actualDelay'].cast('int'))
    data = data.withColumn('areaId', data['areaId'].cast('int'))
    data = data.withColumn('areaId1', data['areaId1'].cast('int'))
    data = data.withColumn('areaId2', data['areaId2'].cast('int'))
    data = data.withColumn('areaId3', data['areaId3'].cast('int'))
    data = data.withColumn('busStop', data['busStop'].cast('int'))
    data = data.withColumn('isWeekend', data['isWeekend'].cast('byte'))
    data = data.withColumn('delay', data['delay'].cast('int'))
    data = data.withColumn('ellapsedTime', data['ellapsedTime'].cast('int'))
    data = data.withColumn('filteredActualDelay', data['filteredActualDelay'].cast('int'))
    data = data.withColumn('vehicleId', data['vehicleId'].cast('int'))

    data = data.withColumn("intervalSpeed-kmh",  F.when(data['ellapsedTime'] != 0,(3.6*1000*data['distanceCovered'])/data['ellapsedTime']).otherwise(0))

    # ---- Enrichment: stop coordinates + nearest traffic light ----
    # Join on the shared column name. stops_df also has a busStop column (== stop_id), so an
    # expression join (data.busStop == stops_df.stop_id) would keep TWO busStop columns, and
    # the later select("busStop", ...) would fail as an ambiguous reference.
    data = data.join(stops_df, "busStop")
    data = data.drop("features", "prediction")
    data = assembler_LS.transform(data)
    data = kmeans_model_L.transform(data)
    # "features" was only needed as KMeans input.
    data = data.drop("features")

    # Attach the light(s) of the matched cluster. Joining on the name keeps a single
    # "prediction" (cluster id) column, as the streaming path does.
    data = data.join(predictions_k_L, "prediction").drop("prediction")

    data = data.withColumn("distance_from_stop", sqrt(((data["stop_lon"] - data["longitude"])**2) + ((data["stop_lat"] - data["latitude"])**2)))
    data = data.withColumn("distance_from_junction", sqrt(((data["light_lon"] - data["longitude"])**2) + ((data["light_lat"] - data["latitude"])**2)))

    # ---- Regression imputation training (target: ellapsedTime) ----
    # Boolean flags -> 0/1 so VectorAssembler can consume them.
    for flag_col in ("justLeftStop", "justStopped", "atStop", "congestion"):
        data = data.withColumn(flag_col, F.when(F.col(flag_col), F.lit(1)).otherwise(0))

    data = assembler_r.transform(data)
    # NOTE(review): the model is fit on all rows, including the ellapsedTime == 0 rows it is
    # later used to impute -- confirm this is intended.
    lr_model_r = lr_r.fit(data)
    trainingSummary = lr_model_r.summary  # .rootMeanSquaredError / .r2 for inspection

    # Where the bus moved but ellapsedTime is 0, substitute the model's prediction.
    data = lr_model_r.transform(data)
    data = data.withColumn("ellapsedTime", \
                  F.when((data["ellapsedTime"] == 0) & (data["distanceCovered"] != 0), data["prediction"]).otherwise(data["ellapsedTime"]))
    data = data.drop("prediction", "features_r")

    # ---- Speed estimate training (target: intervalSpeed-kmh) ----
    train_2, test_2 = data.randomSplit(weights=[0.7, 0.3], seed=464)
    train_2 = assembler.transform(train_2)
    test_2 = assembler.transform(test_2)

    lr_model_2 = lr_2.fit(train_2)
    trainingSummary = lr_model_2.summary
    # Held-out metrics: lr_evaluator_2.evaluate(lr_predictions_2) (r2) and
    # test_result_2.rootMeanSquaredError.
    lr_predictions_2 = lr_model_2.transform(test_2)
    lr_evaluator_2 = RegressionEvaluator(predictionCol="prediction", \
                     labelCol="intervalSpeed-kmh",metricName="r2")
    test_result_2 = lr_model_2.evaluate(test_2)

    # ---- Save the models ----
    # overwrite() so that re-running this cell does not fail because the path already exists.
    lr_model_r.write().overwrite().save(lr_imputation_path_new)
    lr_model_2.write().overwrite().save(lr_speed_path_new)

    # ---- Travel time estimate ----
    # Haversine great-circle distance in metres from the bus to its stop
    # (12742000 m = 2 x 6371 km mean Earth radius).
    data = data.withColumn("b", (
        F.pow(F.sin(F.radians(F.col("latitude") - F.col("stop_lat")) / 2), 2) +
        F.cos(F.radians(F.col("stop_lat"))) * F.cos(F.radians(F.col("latitude"))) *
        F.pow(F.sin(F.radians(F.col("longitude") - F.col("stop_lon")) / 2), 2)
    )).withColumn("destination_distance_meters", F.atan2(F.sqrt(F.col("b")), F.sqrt(-F.col("b") + 1)) * 12742000)
    # drop() returns a new DataFrame; it must be assigned to take effect.
    data = data.drop("b")

    data = data.select("busStop","vehicleId","timestamp","destination_distance_meters","journeyPatternId")
    data = data.withColumn("hour_timestamp", (data["timestamp"]/3600).cast('int'))

    # Stop served just before this one by the same vehicle on the same journey pattern.
    my_window6 = Window.partitionBy("vehicleId","journeyPatternId").orderBy(F.col("timestamp"))
    data = data.withColumn("prev_stop", F.lag(data.busStop).over(my_window6))
    # Within one (journey, vehicle, stop, hour) group, ordered newest first: the gap to the
    # next-later record, accumulated into a running total.
    my_window = Window.partitionBy("journeyPatternId","vehicleId","busStop","hour_timestamp").orderBy(F.col("timestamp").desc())
    data = data.withColumn("prev_timestamp", F.lag(data.timestamp).over(my_window))
    data = data.withColumn("timestamp_diff_sec", F.when(F.isnull(data.prev_timestamp - data.timestamp), 0).otherwise(data.prev_timestamp- data.timestamp))
    data = data.withColumn("total_time", F.sum(data.timestamp_diff_sec).over(my_window))

    data = data.groupBy("journeyPatternId","vehicleId","busStop","hour_timestamp","prev_stop").agg(F.max("total_time"),F.max("destination_distance_meters"))
    data = data.dropna()
    # Keep plausible segments (0 < time < 1200 s) between two different stops, then average
    # per (busStop, prev_stop, journeyPatternId).
    data = data.where((data["max(total_time)"] > 0) &(data["max(total_time)"] < 1200) & (data["busStop"] != data["prev_stop"])).groupBy("busStop","prev_stop","journeyPatternId").agg(F.mean("max(total_time)"), F.mean("max(destination_distance_meters)"))
    # busStop == prev_stop rows were filtered out above, so the averages are used as-is.
    data = data \
        .withColumnRenamed("avg(max(total_time))", "avg_travel_time") \
        .withColumnRenamed("avg(max(destination_distance_meters))", "avg_travel_dist")
    write_to_elastic(data, travel_time_index_name)

In [0]:
# Historical per-(busStop, prev_stop, journeyPatternId) travel statistics; connector helper
# columns are dropped.
time_travel_data_journey = (
    read_elastic(index=travel_time_index, body=query)
    .drop("datetime", "loc", "line_loc")
)

# Pre-trained regressions: ellapsedTime imputation first, then the speed model.
lr_model_r, lr_model_2 = (
    LinearRegressionModel.load(model_path)
    for model_path in (lr_imputation_path_trained, lr_speed_path_trained)
)

In [0]:
kafka_server = '10.0.0.30:9091'

# Subscribe to every per-vehicle topic (names matching the regex "vehicleId_.*").
# To follow a single bus, use e.g. .option("subscribePattern", "vehicleId_33569") instead.
# ("batchDuration" was removed: it is not a Kafka source option and was silently ignored.)
kafka_raw_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_server)
    .option("subscribePattern", "vehicleId_.*")
    .option("startingOffsets", "earliest")
    .load()
)

kafka_value_df = kafka_raw_df.selectExpr("CAST(value AS STRING)")

# Record schema saved earlier with pickle. `with` closes the file handle.
# SECURITY: pickle.load can execute arbitrary code -- only load this file from a trusted path.
with open("/dbfs/mnt/schema.pkl", "rb") as schema_file:
    schema = pickle.load(schema_file)

kafka_df = (
    kafka_value_df
    .select(F.from_json(F.col("value"), schema=schema).alias('json'))
    .select("json.*")
)

display(kafka_df)

In [0]:
# Streaming preprocessing: same cleaning steps as the historical-data training path.
data = kafka_df
droped_columns = ["anomaly", "systemTimestamp", "calendar", "dateTypeEnum", "probability", "poiId2", "poiId", "direction", "_id", "vehicleSpeed", "currentHour", "angle"]
data = data.drop(*droped_columns).withColumnRenamed("dateType", "isWeekend")

# Unwrap {"$numberLong": ...} and derive the calendar date (value treated as epoch milliseconds).
data = data.withColumn('timestamp', data['timestamp']['$numberLong'].cast('bigint'))
data = data.withColumn("datetime", F.to_date(F.from_unixtime(data["timestamp"] / 1000)))

# Millisecond-valued columns -> seconds.
for millis_col in ("timestamp", "ellapsedTime", "filteredActualDelay"):
    data = data.withColumn(millis_col, data[millis_col] / 1000)

data = data.withColumn("loc", data["loc"]["coordinates"])

# Narrow the numeric column types (applied in this order).
target_types = [
    ('timestamp', 'int'), ('actualDelay', 'int'), ('areaId', 'int'), ('areaId1', 'int'),
    ('areaId2', 'int'), ('areaId3', 'int'), ('busStop', 'int'), ('isWeekend', 'byte'),
    ('delay', 'int'), ('ellapsedTime', 'int'), ('filteredActualDelay', 'int'), ('vehicleId', 'int'),
]
for col_name, spark_type in target_types:
    data = data.withColumn(col_name, data[col_name].cast(spark_type))

# Speed over the reporting interval; 0 when no time elapsed.
is_moving = data['ellapsedTime'] != 0
interval_speed = (3.6 * 1000 * data['distanceCovered']) / data['ellapsedTime']
data = data.withColumn("intervalSpeed-kmh", F.when(is_moving, interval_speed).otherwise(0))

display(data, processingTime = "20 seconds")

In [0]:
# Attach each bus record's stop coordinates, then tag it with its nearest traffic-light cluster.
joined_data_stops = data.join(stops_df, "busStop").drop("features", "prediction")
joined_data_stops = kmeans_model_L.transform(assembler_LS.transform(joined_data_stops))
# "features" was only needed as KMeans input; "prediction" (cluster id) is kept for the next join.
joined_data_stops = joined_data_stops.drop("features")
display(joined_data_stops, processingTime = "20 seconds")

In [0]:
# Join the traffic light(s) of the matched cluster; the cluster id was only the join key.
joined_data_lights = joined_data_stops.join(predictions_k_L, "prediction").drop("prediction")


def _euclidean_deg(frame, lon_col, lat_col):
    """Straight-line distance, in raw degrees, between (lon_col, lat_col) and the bus's (longitude, latitude)."""
    return sqrt(((frame[lon_col] - frame["longitude"])**2) + ((frame[lat_col] - frame["latitude"])**2))


joined_data_lights = joined_data_lights.withColumn("distance_from_stop", _euclidean_deg(joined_data_lights, "stop_lon", "stop_lat"))
joined_data_lights = joined_data_lights.withColumn("distance_from_junction", _euclidean_deg(joined_data_lights, "light_lon", "light_lat"))
display(joined_data_lights, processingTime = "20 seconds")

In [0]:
# Streaming ellapsedTime imputation.
# Start from the fully joined stream. The imputation features include distance_from_stop and
# distance_from_junction, which exist only after the stop/traffic-light joins; the
# preprocessed `data` stream lacks them, so assembling features from it fails. Rebuilding
# from joined_data_lights also makes this cell safe to re-run.
data = joined_data_lights

# Boolean flags -> 0/1 so VectorAssembler can consume them.
for flag_col in ("justLeftStop", "justStopped", "atStop", "congestion"):
    data = data.withColumn(flag_col, F.when(F.col(flag_col), F.lit(1)).otherwise(0))

data = lr_model_r.transform(assembler_r.transform(data))
# Where the bus moved but reported no elapsed time, use the model's prediction instead.
needs_imputation = (data["ellapsedTime"] == 0) & (data["distanceCovered"] != 0)
data = data.withColumn("ellapsedTime", F.when(needs_imputation, data["prediction"]).otherwise(data["ellapsedTime"]))
data = data.drop("prediction", "features_r")

In [0]:
# Apply the speed model; its output is added as the "prediction" column, and the assembled
# "features" vector is discarded afterwards.
data = lr_model_2.transform(assembler.transform(data)).drop("features")


In [0]:
# Preview the stream after the speed model has been applied.
display(data)

In [0]:
# Sink the enriched stream into Elasticsearch without blocking the notebook.
write_stream(df=data, index='mission_1_data', wait=False)


In [0]:
# Keep the calendar date as "date"; "datetime" becomes a full timestamp string
# (from_unixtime on the epoch-seconds timestamp).
data = data.withColumn('date', data['datetime'])

data = data.withColumn("datetime", F.from_unixtime(data["timestamp"]))

data = data.withColumn('time', F.concat(F.hour(data['datetime']),F.lit(":"),F.minute(data['datetime']),F.lit(":"),F.second(data['datetime'])))

# Haversine great-circle distance in metres from the bus to its stop
# (12742000 m = 2 x 6371 km mean Earth radius).
data = data.withColumn("b", (
        F.pow(F.sin(F.radians(F.col("latitude") - F.col("stop_lat")) / 2), 2) +
        F.cos(F.radians(F.col("stop_lat"))) * F.cos(F.radians(F.col("latitude"))) *
        F.pow(F.sin(F.radians(F.col("longitude") - F.col("stop_lon")) / 2), 2)
    )).withColumn("destination_distance_meters", F.atan2(F.sqrt(F.col("b")), F.sqrt(-F.col("b") + 1)) * 12742000)
# drop() returns a new DataFrame; assign it so the helper column "b" actually disappears.
data = data.drop("b")


est_times = data.drop("datetime")

temp_table_name = "stream"
# Named stream_query rather than `query`: rebinding `query` would clobber the module-level
# match-all Elasticsearch query string that is passed to read_elastic.
stream_query = est_times.writeStream.format("memory").queryName(temp_table_name).outputMode("append").start()
# NOTE(review): this replaces the temp view the memory sink registered under the same name, so
# later spark.sql(... FROM stream ...) reads the streaming DataFrame itself -- confirm intended.
est_times.createOrReplaceTempView(temp_table_name)


In [0]:
# Latest timestamp seen per vehicle in the "stream" view.
max_journey_sql = '''
SELECT vehicleId, MAX(timestamp) AS timestamp
FROM stream
GROUP BY vehicleId'''
max_journey = spark.sql(max_journey_sql)

# Materialise it as the in-memory table "max_journey"; complete mode rewrites the whole
# aggregate on every trigger.
memory_sink = max_journey.writeStream.format("memory").queryName("max_journey")
query_1 = memory_sink.outputMode("complete").start()

In [0]:
# Preview the per-vehicle latest-timestamp aggregate.
display(max_journey)

In [0]:
# Latest record per vehicle: join the stream with the "max_journey" table on (vehicleId, timestamp).
arriving_sql = '''
SELECT t1.journeyPatternId, t1.loc, t1.date, t1.timestamp, t1.busStop, t1.destination_distance_meters
FROM
(SELECT *
FROM stream) t1
INNER JOIN
(SELECT *
FROM max_journey) t2
ON t1.vehicleId=t2.vehicleId AND t1.timestamp=t2.timestamp'''
arriving_times = spark.sql(arriving_sql)

In [0]:
# Preview the latest record per vehicle (stream joined with max_journey).
display(arriving_times)

In [0]:
# Scale the historical average travel time by the fraction of the average approach distance
# that is still left to the stop.
# NOTE(review): time_travel_data_journey has one row per (busStop, prev_stop, journeyPatternId),
# so this join can yield several estimates per bus (one per prev_stop) -- confirm intended.
arriving_times = arriving_times.join(time_travel_data_journey, ["busStop", "journeyPatternId"])
# Guard the division: a zero/null avg_travel_dist gives a null estimate instead of raising a
# divide-by-zero error when ANSI SQL mode is enabled.
distance_fraction = F.when(
    arriving_times["avg_travel_dist"] > 0,
    arriving_times["destination_distance_meters"] / arriving_times["avg_travel_dist"],
)
arriving_times = arriving_times.withColumn("est_time_to_arrival", distance_fraction * arriving_times["avg_travel_time"])
display(arriving_times)


In [0]:
# Only what the arrival index needs: which journey, which stop, and the ETA.
arrive_next = arriving_times.select("journeyPatternId", "busStop", "est_time_to_arrival")

In [0]:
# Presumably a pause so the upstream streaming queries produce rows before the preview.
sleep(10)

display(arrive_next)

In [0]:
sleep(5)  # short pause before starting the sink (presumably to let upstream streams warm up)
write_stream(df=arrive_next, index='arrive_next_3', wait=False)

In [0]:
# Live bus positions. `joined_df` was not defined anywhere in this notebook (NameError).
# `data` is the enriched stream built above and carries every column the live-location SQL
# selects (journeyPatternId, loc, date, timestamp, vehicleId).
# NOTE(review): assumed `joined_df` referred to this stream -- confirm.
live_locations = data.drop("datetime")
temp_table_name = "test"
query_2 = live_locations.writeStream.format("memory").queryName(temp_table_name).outputMode("append").start()
live_locations.createOrReplaceTempView(temp_table_name)

In [0]:
# Latest timestamp seen per vehicle in the "test" view.
live_max_sql = '''
SELECT vehicleId, MAX(timestamp) AS timestamp
FROM test
GROUP BY vehicleId'''
max_journey = spark.sql(live_max_sql)

In [0]:
# Spark allows only one ACTIVE streaming query per name. query_1 already runs a memory sink
# called "max_journey", so starting another one under that name raises
# IllegalArgumentException. Reuse the active query when there is one (this also makes the cell
# re-runnable); otherwise start it from the "test"-based aggregate.
# NOTE(review): reuse assumes the "test" and "stream" tables hold the same vehicle rows -- confirm.
query_3 = next((q for q in spark.streams.active if q.name == "max_journey"), None)
if query_3 is None:
    query_3 = (
      max_journey.writeStream
        .format("memory")
        .queryName("max_journey")
        .outputMode("complete")
        .start())

In [0]:
# Most recent position per vehicle: join "test" with "max_journey" on (vehicleId, timestamp).
live_locations_sql = '''
SELECT t1.journeyPatternId, t1.loc, t1.date, t1.timestamp
FROM
(SELECT *
FROM test) t1
INNER JOIN
(SELECT *
FROM max_journey) t2
ON t1.vehicleId=t2.vehicleId AND t1.timestamp=t2.timestamp'''
live_locations = spark.sql(live_locations_sql)

In [0]:
# Preview the latest known location per vehicle.
display(live_locations)

In [0]:
# Sink the latest-location stream into Elasticsearch without blocking the notebook.
write_stream(df=live_locations, index='live_location_2', wait=False)

In [0]:
# Presumably a pause so the streaming queries have produced rows before previewing again.
sleep(10)
display(live_locations)