In [1]:
from pyspark.sql.types import *
import json

# Infer the schema from the bus file once and rebuild it from its JSON form,
# so the streaming reader in the next cell can be given an explicit schema.
bus_file_sample = spark.read.json('/mnt/dacoursedatabricksstg/dacoursedatabricksdata/busFile')
schema_json = bus_file_sample.schema.json()

schema_as_dict = json.loads(schema_json)
new_schema = StructType.fromJson(schema_as_dict)

In [2]:
from pyspark.sql.functions import lit
from pyspark.sql.functions import udf

# Stream the JSON files under the data directory using the pre-inferred schema.
streamed_df = (
    spark.readStream
    .schema(new_schema)
    .json('/mnt/dacoursedatabricksstg/dacoursedatabricksdata')
)

# Keep only the records whose atStop flag is true.
filtered_df = streamed_df.where(streamed_df['atStop'] == True)

def get_timestamp(s):
    """Convert a ``{'$numberLong': '<digits>'}`` value to whole seconds.

    The last three digits of the number are discarded (i.e. the value is
    divided by 1000, truncating toward zero); the caller casts the result to a
    Spark timestamp, so the input is presumably epoch milliseconds.

    Args:
        s: Mapping-like value (dict or Spark Row) with a ``'$numberLong'`` entry.

    Returns:
        The value divided by 1000 as an int, or None when ``s`` or its
        ``'$numberLong'`` entry is None (so a SQL NULL yields NULL instead of
        failing the whole job).
    """
    if s is None:
        return None
    raw_millis = s['$numberLong']
    if raw_millis is None:
        return None
    millis = int(raw_millis)
    # Integer arithmetic instead of slicing off the last three characters:
    # the slice produced '' (and a ValueError) for values under four digits.
    seconds = abs(millis) // 1000
    return seconds if millis >= 0 else -seconds

# Expose the parser to Spark as a UDF that returns a long.
get_timestamp_udf = udf(get_timestamp, LongType())

# Derive a timestamp column, then keep only rows dated after 2018-08-01
# (the period set aside for testing).
event_time = get_timestamp_udf('timestamp').cast('timestamp')
filtered_df = filtered_df.withColumn('date', event_time)
filtered_df = filtered_df.where(filtered_df['date'] > lit('2018-08-01'))

In [3]:
# Write the filtered stream to an in-memory table (for testing only);
# the query name "counts" is also the name of that table.
memory_sink = filtered_df.writeStream.format("memory").queryName("counts")
query = memory_sink.start()

In [4]:
# To stop the streaming query started in the previous cell, run: query.stop()

In [5]:
from pyspark.sql import SQLContext

# Read back everything the streaming query has written to the `counts` table.
sqlContext = SQLContext(sc)
counts_sql = "select * from counts"
df = sqlContext.sql(counts_sql)

In [6]:
# Number of rows selected from the in-memory `counts` table.
df.count()

In [7]:
# Project the snapshot down to the raw fields used for feature engineering.
raw_columns = [
    '_id', 'atStop', 'busStop', 'congestion', 'delay',
    'justLeftStop', 'justStopped', 'latitude', 'longitude',
    'vehicleId', 'vehicleSpeed', 'actualDelay', 'timestamp', 'journeyPatternId',
]
df = df.select(*raw_columns)

In [8]:
#correct original data and add new fields
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType,TimestampType,StringType,DateType

def get_weekday(s):
  """Return ``s.weekday()``, or None when ``s`` is None.

  Assumes ``s`` is a datetime (the value Spark hands a Python UDF for a
  timestamp column), for which ``weekday()`` is 0 for Monday ... 6 for Sunday.
  The None guard lets a SQL NULL pass through as NULL instead of raising
  AttributeError inside the UDF.
  """
  if s is None:
    return None
  return s.weekday()

def get_hour(s):
  """Return the ``hour`` attribute of ``s``, or None when ``s`` is None.

  The None guard lets a SQL NULL pass through as NULL instead of raising
  AttributeError inside the UDF.
  """
  if s is None:
    return None
  return s.hour

def get_is_weekend(s):
  """Return 1 when a weekday index denotes Saturday or Sunday, otherwise 0.

  Args:
    s: Weekday index in the ``datetime.weekday()`` convention (Monday=0 ...
       Sunday=6), which is what ``get_weekday`` produces for the column this
       function is applied to. None yields 0.
  """
  # weekday() numbers Saturday as 5 and Sunday as 6. Checking for [6, 7]
  # flagged only Sundays and never matched a Saturday.
  return 1 if s in (5, 6) else 0

def get_id(s):
  """Return the ``'$oid'`` entry of ``s``, or None when ``s`` is None.

  ``s`` is a mapping-like value (dict or Spark Row). The None guard lets a
  SQL NULL pass through as NULL instead of raising TypeError inside the UDF.
  """
  if s is None:
    return None
  return s['$oid']

def get_month(s):
  """Return the ``month`` attribute of ``s``, or None when ``s`` is None.

  The None guard lets a SQL NULL pass through as NULL instead of raising
  AttributeError inside the UDF.
  """
  if s is None:
    return None
  return s.month

def get_only_date(s):
  """Return ``str(s.date())`` (the date part as text), or None when ``s`` is None.

  The None guard lets a SQL NULL pass through as NULL instead of raising
  AttributeError inside the UDF.
  """
  if s is None:
    return None
  return str(s.date())

def get_just_left(s):
  """Map a truthy flag to 1 and a falsy one (including None) to 0."""
  return int(bool(s))

def get_just_stopped(s):
  """Return 1 for a truthy flag, 0 for a falsy one (including None)."""
  if s:
    return 1
  return 0

def get_congestion(s):
  """Encode a congestion flag as an integer: truthy -> 1, falsy/None -> 0."""
  if not s:
    return 0
  return 1

def get_timestamp(s):
    """Convert a ``{'$numberLong': '<digits>'}`` value to whole seconds.

    Note: a helper with the same name is also defined in an earlier cell;
    this definition replaces it from this cell onward.

    The last three digits of the number are discarded (i.e. the value is
    divided by 1000, truncating toward zero); the caller casts the result to a
    Spark timestamp, so the input is presumably epoch milliseconds.

    Args:
        s: Mapping-like value (dict or Spark Row) with a ``'$numberLong'`` entry.

    Returns:
        The value divided by 1000 as an int, or None when ``s`` or its
        ``'$numberLong'`` entry is None (so a SQL NULL yields NULL instead of
        failing the whole job).
    """
    if s is None:
        return None
    raw_millis = s['$numberLong']
    if raw_millis is None:
        return None
    millis = int(raw_millis)
    # Integer arithmetic instead of slicing off the last three characters:
    # the slice produced '' (and a ValueError) for values under four digits.
    seconds = abs(millis) // 1000
    return seconds if millis >= 0 else -seconds

# Wrap each helper above as a Spark UDF, grouped by declared return type.

# Helpers that return text.
get_id_udf = udf(get_id, StringType())
get_only_date_udf = udf(get_only_date, StringType())

# Helpers that return a long derived from the timestamp.
get_timestamp_udf = udf(get_timestamp, LongType())
get_weekday_udf = udf(get_weekday, LongType())
get_month_udf = udf(get_month, LongType())
get_hour_udf = udf(get_hour, LongType())
get_is_weekend_udf = udf(get_is_weekend, LongType())

# Helpers that encode a flag as 0/1.
get_just_left_udf = udf(get_just_left, LongType())
get_just_stopped_udf = udf(get_just_stopped, LongType())
get_congestion_udf = udf(get_congestion, LongType())


# Derive the calendar and flag features, then drop the raw columns they replace.
# Order matters: 'date' feeds weekday/month/only_date/hour, and 'weekday'
# feeds is_weekend.
df = (
    df
    .withColumn('date', get_timestamp_udf('timestamp').cast('timestamp'))
    .withColumn('weekday', get_weekday_udf('date'))
    .withColumn('month', get_month_udf('date'))
    .withColumn('only_date', get_only_date_udf('date'))
    .withColumn('is_weekend', get_is_weekend_udf('weekday'))
    .withColumn('hour', get_hour_udf('date'))
    .withColumn('id', get_id_udf('_id'))
    .withColumn('just_left', get_just_left_udf('justLeftStop'))
    .withColumn('just_stopped', get_just_stopped_udf('justStopped'))
    # Use the dedicated congestion UDF rather than borrowing get_just_left_udf.
    .withColumn('conges', get_congestion_udf('congestion'))
    .drop('_id', 'atStop', 'justLeftStop', 'justStopped', 'congestion', 'timestamp')
)

In [9]:
from pyspark.sql.functions import *
import pyspark.sql.functions as f
from pyspark.sql.window import Window
from pyspark.sql.types import *
from pyspark.sql.functions import lit

#keep only rows where the vehicle's previous record was at a different stop
my_window = Window.partitionBy('vehicleId').orderBy('date')
previous_stop = f.lag(df.busStop.cast("bigint")).over(my_window)
target_df = df.withColumn('origin', previous_stop)
target_df = target_df.filter(target_df['busStop'] != target_df['origin'])

#seconds since the vehicle's previous retained row (0 when there is none)
previous_seconds = f.lag(target_df.date.cast("bigint")).over(my_window)
target_df = target_df.withColumn('prev_ts', previous_seconds)
elapsed_seconds = target_df.date.cast("bigint") - target_df.prev_ts
target_df = target_df.withColumn(
    'time_to_reach_next',
    f.when(f.isnull(elapsed_seconds), 0).otherwise(elapsed_seconds),
)
target_df = target_df.filter(target_df['time_to_reach_next'] != 0)

#shape into training rows: the current stop becomes the destination
target_df = target_df.withColumnRenamed('busStop', 'dest').drop('prev_ts')

#add field of from->to
def get_from_to(frm,to):
  """Build the segment key ``'<frm>-><to>'`` from two stop identifiers."""
  return '{}->{}'.format(frm, to)

get_from_to_udf = udf(get_from_to, StringType())
# The UDF call already yields a Column, so it is used directly.
segment_key = get_from_to_udf(f.col('origin'), f.col('dest'))
target_df = target_df.withColumn('from_to', segment_key)

# Number of rows sharing the same from->to segment.
w = Window.partitionBy('from_to')
target_df = target_df.withColumn('seg_count', f.count('from_to').over(w))

In [10]:
# Keep rows with more than 30 seconds and less than two hours between stops,
# on segments that occur more than 20 times.
target_df = target_df.filter(target_df['time_to_reach_next'] < 7200)
target_df = target_df.filter(target_df['time_to_reach_next'] > 30)
target_df = target_df.filter(target_df['seg_count'] > 20)

In [11]:
# Number of rows left after the duration and segment-count filters.
target_df.count()

In [12]:
# Remove the destination, the target value and the segment columns derived from them.
learn_df = target_df.drop('dest', 'time_to_reach_next', 'from_to', 'seg_count')

In [13]:
# Load the stored true_paths_v2 table and prefix its join keys with tp_
# so they do not clash with learn_df's columns.
true_paths_reader = spark.read.options(header='true', inferSchema = 'true')
true_paths = (
    true_paths_reader.load('/user/hive/warehouse/true_paths_v2')
    .withColumnRenamed('origin', 'tp_origin')
    .withColumnRenamed('journeyPatternId', 'tp_journeyPatternId')
)

In [14]:
# Left-join on journey pattern and origin stop.
same_pattern = learn_df.journeyPatternId == true_paths.tp_journeyPatternId
same_origin = learn_df.origin == true_paths.tp_origin
learn_df = learn_df.join(true_paths, same_pattern & same_origin, how = 'left')

In [15]:
# Discard the score and the renamed join keys brought in by true_paths.
learn_df = learn_df.drop('top_score', 'tp_origin', 'tp_journeyPatternId')

In [16]:
#add the from->to segment key (the UDF call already yields a Column)
segment_key = get_from_to_udf(f.col('origin'), f.col('dest'))
learn_df = learn_df.withColumn('from_to', segment_key)

In [17]:
# Pair every row id in target_df with its from_to value.
checker = (
    target_df.select('id', 'from_to')
    .withColumnRenamed('id', '_id')
    .withColumnRenamed('from_to', 'true_from_to')
)

# Keep the learn_df rows whose id and from_to both match a target_df row,
# and print what fraction of target_df they represent.
id_matches = learn_df.id == checker._id
segment_matches = learn_df.from_to == checker.true_from_to
checked_df = learn_df.join(checker, id_matches & segment_matches)
print(checked_df.count() / target_df.count())

In [18]:
# Load the stored segment_data table; its key is renamed so it can be
# joined against from_to without a name clash.
segment_reader = spark.read.options(header='true', inferSchema = 'true')
segment_data = (
    segment_reader.load('/user/hive/warehouse/segment_data')
    .withColumnRenamed('from_to', 'sd_from_to')
)

In [19]:
# Drop the checker columns, then attach the stored segment data by from_to.
df_to_predict = checked_df.drop('_id', 'true_from_to')
on_segment = df_to_predict.from_to == segment_data.sd_from_to
df_to_predict = df_to_predict.join(segment_data, on_segment)

In [20]:
# Batch-read the bus file (schema inferred by Spark); this is the same path
# the first cell uses to infer the streaming schema.
original_data = spark.read.json('/mnt/dacoursedatabricksstg/dacoursedatabricksdata/busFile')

In [21]:
#keep at-stop records and only the raw fields used for feature engineering
total_df = original_data[original_data.atStop == True]
total_df = total_df[['_id','atStop','busStop','congestion','delay','justLeftStop','justStopped','latitude' ,'longitude','vehicleId','vehicleSpeed','actualDelay','timestamp','journeyPatternId']]

#derive calendar and flag features, then drop the raw columns they replace
#(order matters: 'date' feeds weekday/month/only_date/hour, 'weekday' feeds is_weekend)
total_df = (
    total_df
    .withColumn('date', get_timestamp_udf('timestamp').cast('timestamp'))
    .withColumn('weekday', get_weekday_udf('date'))
    .withColumn('month', get_month_udf('date'))
    .withColumn('only_date', get_only_date_udf('date'))
    .withColumn('is_weekend', get_is_weekend_udf('weekday'))
    .withColumn('hour', get_hour_udf('date'))
    .withColumn('id', get_id_udf('_id'))
    .withColumn('just_left', get_just_left_udf('justLeftStop'))
    .withColumn('just_stopped', get_just_stopped_udf('justStopped'))
    # Use the dedicated congestion UDF rather than borrowing get_just_left_udf.
    .withColumn('conges', get_congestion_udf('congestion'))
    .drop('_id', 'atStop', 'justLeftStop', 'justStopped', 'congestion', 'timestamp')
)

#keep only rows where the vehicle's previous record was at a different stop
my_window = Window.partitionBy('vehicleId').orderBy('date')
previous_stop = f.lag(total_df.busStop.cast("bigint")).over(my_window)
learn_df = total_df.withColumn('origin', previous_stop)
learn_df = learn_df.filter(learn_df['busStop'] != learn_df['origin'])

#seconds since the vehicle's previous retained row (0 when there is none)
previous_seconds = f.lag(learn_df.date.cast("bigint")).over(my_window)
learn_df = learn_df.withColumn('prev_ts', previous_seconds)
elapsed_seconds = learn_df.date.cast("bigint") - learn_df.prev_ts
learn_df = learn_df.withColumn(
    'time_to_reach_next',
    f.when(f.isnull(elapsed_seconds), 0).otherwise(elapsed_seconds),
)
learn_df = learn_df.filter(learn_df['time_to_reach_next'] != 0)

#shape into training rows: the current stop becomes the destination
learn_df = learn_df.withColumnRenamed('busStop', 'dest').drop('prev_ts')

#add the from->to segment key (the UDF call already yields a Column)
get_from_to_udf = udf(get_from_to, StringType())
segment_key = get_from_to_udf(f.col('origin'), f.col('dest'))
learn_df = learn_df.withColumn('from_to', segment_key)

#number of rows sharing the same segment
w = Window.partitionBy('from_to')
learn_df = learn_df.withColumn('seg_count', f.count('from_to').over(w))

#keep rows with more than 30 seconds and less than two hours between stops
learn_df = learn_df.filter(learn_df['time_to_reach_next'] < 7200)
learn_df = learn_df.filter(learn_df['time_to_reach_next'] > 30)
#keep only common segments (more than 100 rows)
learn_df = learn_df.filter(learn_df['seg_count'] > 100)
#keep rows dated before 2018-08-01; later rows are held out for the streaming test
learn_df = learn_df.filter(learn_df['date'] < lit('2018-08-01'))

my_window = Window.partitionBy('from_to')
#mean and standard deviation of the travel time within each segment
learn_df = learn_df.withColumn('mean_seg', f.mean('time_to_reach_next').over(my_window))
learn_df = learn_df.withColumn('stdev_seg', f.stddev('time_to_reach_next').over(my_window))

#remove outliers: keep rows strictly within 3 standard deviations of the segment mean
upper_bound = learn_df.mean_seg + 3 * learn_df.stdev_seg
learn_df = learn_df.filter(learn_df['time_to_reach_next'] < upper_bound)
lower_bound = learn_df.mean_seg - 3 * learn_df.stdev_seg
learn_df = learn_df.filter(learn_df['time_to_reach_next'] > lower_bound)

#longest and shortest remaining travel time within each segment
learn_df = learn_df.withColumn('max_in_seg', f.max('time_to_reach_next').over(my_window))
learn_df = learn_df.withColumn('min_in_seg', f.min('time_to_reach_next').over(my_window))

#previous travel time observed on the same segment, ordered by date
my_window = Window.partitionBy('from_to').orderBy('date')
previous_duration = f.lag(learn_df.time_to_reach_next).over(my_window)
learn_df = learn_df.withColumn('prev_time', previous_duration)

#previous coordinates of the same vehicle, used for the distance features
my_window = Window.partitionBy('vehicleId').orderBy('date')
learn_df = learn_df.withColumn('prev_lat', f.lag(learn_df.latitude).over(my_window))
learn_df = learn_df.withColumn('prev_lon', f.lag(learn_df.longitude).over(my_window))
#fill nulls with 0, then discard rows whose previous latitude is 0
learn_df = learn_df.na.fill(0)
learn_df = learn_df.filter(learn_df['prev_lat'] != 0)

from math import radians, cos, sin, asin, sqrt

# Sentinel returned when a coordinate is missing. It must be a Python float:
# a "float" UDF that returns a Python int produces NULL in Spark.
_MISSING_DISTANCE = 9999.0


def _haversine_m(longit_a, latit_a, longit_b, latit_b):
    """Haversine great-circle distance in metres between two points given in degrees."""
    # Transform to radians
    longit_a, latit_a, longit_b, latit_b = map(radians, [longit_a, latit_a, longit_b, latit_b])
    dist_longit = longit_b - longit_a
    dist_latit = latit_b - latit_a
    # Haversine term; clamped to [0, 1] so rounding cannot push asin/sqrt out of domain
    area = sin(dist_latit/2)**2 + cos(latit_a) * cos(latit_b) * sin(dist_longit/2)**2
    area = min(1.0, max(0.0, area))
    # Calculate the central angle
    central_angle = 2 * asin(sqrt(area))
    radius = 6371 # THIS IN KM
    # Distance in km, converted to metres
    return central_angle * radius * 1000


@f.udf("float")
def get_distance(longit_a, latit_a, longit_b, latit_b):
    """Spark UDF: haversine distance in metres between points a and b.

    Args:
        longit_a, latit_a: Longitude and latitude of point a, in degrees.
        longit_b, latit_b: Longitude and latitude of point b, in degrees.

    Returns:
        Distance in metres, or 9999.0 when any argument is None.
    """
    if None in [longit_a, latit_a, longit_b, latit_b]:
        return _MISSING_DISTANCE
    return _haversine_m(longit_a, latit_a, longit_b, latit_b)


@f.udf("float")
def get_dis_from_center(longit_a, latit_a):
    """Spark UDF: haversine distance in metres from a point to the fixed city-center point.

    Args:
        longit_a, latit_a: Coordinates of the point, in degrees.

    Returns:
        Distance in metres, or 9999.0 when either argument is None.
    """
    # NOTE(review): 53.34 is bound to longit_b and -6.26 to latit_b. The
    # magnitudes suggest these are latitude and longitude respectively, i.e.
    # the opposite of the variable names, and the call in this notebook also
    # passes a latitude column first. Left unchanged so the values stay
    # consistent with previously stored features - TODO confirm.
    longit_b, latit_b = 53.3422665, -6.2554468 #city center coordinates
    if None in [longit_a, latit_a]:
        return _MISSING_DISTANCE
    return _haversine_m(longit_a, latit_a, longit_b, latit_b)

# NOTE(review): get_distance and get_dis_from_center declare their parameters
# longitude-first, but latitude columns are passed first below, so the argument
# roles are swapped relative to the parameter names - confirm this is intended.
stop_to_stop = get_distance(f.col('prev_lat'), f.col('prev_lon'), f.col('latitude'), f.col('longitude'))
learn_df = learn_df.withColumn('distnce_between', stop_to_stop)

# Midpoint between the previous and the current position, and its distance from the center.
learn_df = learn_df.withColumn('lat_between', (learn_df.prev_lat + learn_df.latitude) / 2)
learn_df = learn_df.withColumn('lon_between', (learn_df.prev_lon + learn_df.longitude) / 2)
to_center = get_dis_from_center(f.col('lat_between'), f.col('lon_between'))
learn_df = learn_df.withColumn('dis_from_center', to_center)

# Keep rows whose distances are below 100000.
learn_df = learn_df.filter(learn_df['distnce_between'] < 100000)
learn_df = learn_df.filter(learn_df['dis_from_center'] < 100000)

# Speed over the segment; drop segments with speed over 200kmh (55 per second).
learn_df = learn_df.withColumn('speed_in_seg', learn_df.distnce_between / learn_df.time_to_reach_next)
learn_df = learn_df.filter(learn_df['speed_in_seg'] < 55)
learn_df = learn_df.drop('prev_lat', 'prev_lon', 'lat_between', 'lon_between')

In [22]:
# Take the first 1000 rows to the driver and rebuild a small DataFrame
# with the same column names.
small_list = learn_df.take(1000)
small_df = spark.createDataFrame(small_list, schema=learn_df.columns)

In [23]:
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.ml import Pipeline
import numpy 

# Feature order matters: speed_in_seg is last, so center[-1] below is the
# speed_in_seg coordinate of each cluster center.
numericCols = [
    'delay', 'vehicleSpeed', 'actualDelay', 'max_in_seg', 'min_in_seg',
    'stdev_seg', 'distnce_between', 'dis_from_center', 'hour',
    'latitude', 'longitude', 'mean_seg', 'prev_time', 'speed_in_seg',
]
assembler = VectorAssembler(inputCols = numericCols, outputCol="features")
ml_df = assembler.transform(learn_df)

# Fit k-means with 5 clusters and a fixed seed.
kmeans = KMeans(k=5, seed=1)
model = kmeans.fit(ml_df)

# Assign every row to a cluster.
predictions = model.transform(ml_df)

# Map cluster index -> [center vector, last coordinate of that center].
centers = model.clusterCenters()
centroids = {}
for i, center in enumerate(centers):
  centroids[i] = [center, center[-1]]

In [24]:
# Collect df_to_predict to the driver as a pandas DataFrame; the speed is
# imputed row by row in plain Python below.
helper_df = df_to_predict.toPandas()

def impute_speed(a,b,c,d,e,f,g,h,i,j,k,l,m,n):
  """Blend the cluster-center speeds, weighted by closeness to each center.

  The fourteen positional arguments form the feature vector, in the same
  order as the center vectors stored in the module-level ``centroids``
  mapping (``{cluster_index: [center_vector, center_speed]}``).

  Returns:
    The sum over centers of ``weight * center_speed / total_weight``, where
    ``weight = distance ** (-2/fuzzifier - 1)`` and ``distance`` is the
    Euclidean distance to the center. When the vector coincides exactly with
    a center, that center's speed is returned directly (the weight would
    otherwise be infinite and the result NaN). Returns 0 when ``centroids``
    is empty.
  """
  fetur_vec = numpy.asarray([a,b,c,d,e,f,g,h,i,j,k,l,m,n])
  fuzzifier = 1.3
  # NOTE(review): fuzzy c-means memberships normally use -2/(fuzzifier - 1);
  # the exponent is kept exactly as written here - confirm which is intended.
  exponent = -2/fuzzifier - 1

  weighted_speeds = []  # (weight, center speed) per centroid
  for center_vec, center_speed in centroids.values():
    distance = numpy.linalg.norm(fetur_vec - center_vec)
    if distance == 0:
      # Exact hit on a center: its speed is the answer.
      return center_speed
    weighted_speeds.append((distance ** exponent, center_speed))

  total_weight = sum(weight for weight, _ in weighted_speeds)
  return sum((weight * center_speed) / total_weight for weight, center_speed in weighted_speeds)

# Columns passed to impute_speed, in the order of its positional parameters.
speed_feature_columns = [
    'delay', 'vehicleSpeed', 'actualDelay', 'max_in_seg', 'min_in_seg',
    'stdev_seg', 'distnce_between', 'dis_from_center', 'hour',
    'latitude', 'longitude', 'mean_seg', 'prev_time', 'speed_in_seg',
]
helper_df['new_speed'] = [
    impute_speed(*feature_row)
    for feature_row in zip(*(helper_df[name] for name in speed_feature_columns))
]


In [25]:
# Turn the pandas frame (now carrying new_speed) back into a Spark DataFrame.
sqlCtx = SQLContext(sc)
altered_df = sqlCtx.createDataFrame(helper_df)

In [26]:
# Replace the original speed_in_seg column with the imputed new_speed values.
altered_df = (
    altered_df
    .drop('speed_in_seg')
    .withColumnRenamed('new_speed', 'speed_in_seg')
)

In [27]:
from pyspark.ml.regression import GBTRegressionModel
# Load a previously saved GBT regression model from the warehouse path.
streaming_gbt_model = GBTRegressionModel.load('/user/hive/warehouse/streaming_gbt_model')

In [28]:
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline

# Placeholder for the field to be predicted.
altered_df = altered_df.withColumn('time_to_reach_next', lit(0))

ml_df = altered_df

# Assemble the model inputs; note that speed_in_seg sits in a different
# position here than in the k-means feature list.
numericCols = [
    'delay', 'vehicleSpeed', 'actualDelay', 'max_in_seg', 'min_in_seg',
    'stdev_seg', 'distnce_between', 'dis_from_center', 'hour', 'speed_in_seg',
    'latitude', 'longitude', 'mean_seg', 'prev_time',
]
assembler = VectorAssembler(inputCols = numericCols, outputCol="features")
ml_df = assembler.transform(ml_df).select('features', 'id')

In [29]:
# Score the assembled features and keep the prediction keyed by row id
# (renamed to _id for the join that follows).
gbt_predictions = streaming_gbt_model.transform(ml_df)
preds = gbt_predictions.select("prediction", "id").withColumnRenamed('id', '_id')

In [30]:
# Attach each prediction to its row and store it as time_to_reach_next,
# replacing the placeholder column.
predicted_df = (
    altered_df.join(preds, altered_df.id == preds._id)
    .drop('time_to_reach_next', '_id')
    .withColumnRenamed('prediction', 'time_to_reach_next')
)

In [31]:
# The commented-out line would save predicted_df as a table; presumably it was
# run once, since the line below reloads predicted_df from the warehouse path
# and replaces the DataFrame computed in the previous cell.
# predicted_df.write.saveAsTable("predicted_df")
predicted_df = spark.read.options(header='true', inferSchema = 'true').load('/user/hive/warehouse/predicted_df')

In [32]:
# Show the journey pattern, segment key, segment mean time, speed and
# predicted time_to_reach_next for the reloaded predictions.
display(predicted_df[['journeyPatternId','from_to','mean_seg','speed_in_seg','time_to_reach_next']])

journeyPatternId,from_to,mean_seg,speed_in_seg,time_to_reach_next
01231001,4518->509,85.41085899513774,5.186134152703045,88.92921419909192
01400003,6229->6364,57.34652383034542,7.9506655769456,46.12935945464972
00071008,2041->3071,117.68272023233308,7.752837870332268,90.01042876863866
00270004,6130->1262,66.27543659832953,7.944515554296605,70.73132424569441
00070008,416->417,69.3415795586528,5.271007489573638,72.31117903208228
00271001,2379->2380,60.617735303922686,5.209307360653201,60.295493065727065
00070008,423->424,80.28022471910111,7.608975140351362,66.09665904741827
039A1001,1863->1864,48.69296577946769,7.750656394019913,44.04630271138805
00701001,1479->315,160.60432925931352,5.212914751699954,145.39734879163277
00150003,4415->301,244.2998909011561,5.245932866519469,223.242172820139


In [33]:
# Same display call as in the previous cell.
display(predicted_df[['journeyPatternId','from_to','mean_seg','speed_in_seg','time_to_reach_next']])

journeyPatternId,from_to,mean_seg,speed_in_seg,time_to_reach_next
01231001,4518->509,85.41085899513774,5.186134152703045,88.92921419909192
01400003,6229->6364,57.34652383034542,7.9506655769456,46.12935945464972
00071008,2041->3071,117.68272023233308,7.752837870332268,90.01042876863866
00270004,6130->1262,66.27543659832953,7.944515554296605,70.73132424569441
00070008,416->417,69.3415795586528,5.271007489573638,72.31117903208228
00271001,2379->2380,60.617735303922686,5.209307360653201,60.295493065727065
00070008,423->424,80.28022471910111,7.608975140351362,66.09665904741827
039A1001,1863->1864,48.69296577946769,7.750656394019913,44.04630271138805
00701001,1479->315,160.60432925931352,5.212914751699954,145.39734879163277
00150003,4415->301,244.2998909011561,5.245932866519469,223.242172820139


In [34]:
# Observed travel times from target_df, keyed by row id.
true_results = (
    target_df.select('id', 'time_to_reach_next')
    .withColumnRenamed('id', '_id')
    .withColumnRenamed('time_to_reach_next', 'true_time_to_reach_next')
)

# Pair each prediction with its observed value and compute the squared error.
final_df = predicted_df.join(true_results, predicted_df.id == true_results._id)
prediction_error = final_df.time_to_reach_next - final_df.true_time_to_reach_next
final_df = final_df.withColumn('squared_error', prediction_error ** 2)

In [35]:
# Mean observed travel time, aggregated by Spark instead of collecting the
# whole column to the driver twice and summing it in a Python loop.
mean_time = final_df.agg({'true_time_to_reach_next': 'avg'}).first()[0]
if mean_time is None:
  # avg() over zero rows is NULL; fail with a clear message.
  raise ValueError('final_df has no true_time_to_reach_next values to average')

# Evaluate only on rows whose observed time is below 1200.
filtered_df = final_df.filter(final_df.true_time_to_reach_next < 1200)

In [36]:
# Root-mean-square error over filtered_df, with the mean computed by Spark
# instead of collecting the column to the driver twice.
mean_squared_error = filtered_df.agg({'squared_error': 'avg'}).first()[0]
if mean_squared_error is None:
  # avg() over zero rows is NULL; fail with a clear message.
  raise ValueError('filtered_df has no squared_error values to average')
# 0.5 rather than (1/2), which would be 0 under integer division.
RMSE = mean_squared_error ** 0.5

In [37]:
# Report the root-mean-square error computed in the previous cell.
print(RMSE)