In [0]:
import pyspark.sql.functions as F
import pickle
from elasticsearch import Elasticsearch
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
import pyspark.sql.types as T
import json
import pandas as pd
import seaborn as sns
from pyspark.sql.functions import isnan, when, count, col
import datetime
from pyspark.sql import Window
from pyspark.sql.functions import monotonically_increasing_id 

In [0]:
kafka_server = dbutils.widgets.get("kafka server ip")

# Open a structured stream against the Kafka broker named by the widget:
#   - subscribePattern: every topic whose name matches vehicleId_.*
#   - startingOffsets : start streaming from the earliest offset
# The chain is wrapped in parentheses instead of backslash continuations,
# because a backslash followed by trailing whitespace is a syntax error.
kafka_raw_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_server)
    .option("subscribePattern", "vehicleId_.*")
    .option("startingOffsets", "earliest")
    .load()
)

# Cast the Kafka message value to a string so it can be parsed as JSON
kafka_value_df = kafka_raw_df.selectExpr("CAST(value AS STRING)")

# Load the pickled schema used to parse the JSON payload; the context
# manager closes the file handle once the schema has been read.
# NOTE(review): pickle.load can execute arbitrary code - only safe as long as
# /dbfs/mnt/schema.pkl comes from a trusted source.
with open("/dbfs/mnt/schema.pkl", "rb") as schema_file:
    schema = pickle.load(schema_file)

# Parse each message with the schema and flatten the resulting struct into
# top-level columns of the streaming DataFrame.
kafka_df = (
    kafka_value_df
    .select(F.from_json(F.col("value"), schema=schema).alias("json"))
    .select("json.*")
)
display(kafka_df)

_id,actualDelay,angle,anomaly,areaId,areaId1,areaId2,areaId3,atStop,busStop,calendar,congestion,currentHour,dateType,dateTypeEnum,delay,direction,distanceCovered,ellapsedTime,filteredActualDelay,gridID,journeyPatternId,justLeftStop,justStopped,latitude,lineId,loc,longitude,poiId,poiId2,probability,systemTimestamp,timestamp,vehicleId,vehicleSpeed
List(595a2300e45b4b2ea81ae77a),0,225.0,False,18342,17,286,4585,False,113,List(1499079397000000),False,23,1,WEEKEND,124,0,0.0481675320491737,23000,0,155131,130002,True,False,53.395714,13,"List(List(-6.26408, 53.395714), Point)",-6.26408,0,0,0.0,615949540.0,List(1499079397000),43041,0
List(595a459ae45b4b4bbcea7299),20,-2.0,False,71673,17,279,4479,False,2002,List(1499088263000000),False,14,0,WEEKDAY,-341,0,0.0,20000,0,152101,131002,False,False,53.343094,13,"List(List(-6.272911, 53.343094), Point)",-6.272911,0,0,0.0,624808199.0,List(1499088263000),43041,0
List(595a2310e45b4b2ea81ae8ee),0,0.0,False,18342,17,286,4585,False,113,List(1499079420000000),False,6,0,WEEKDAY,124,0,0.0,0,0,155130,130002,False,False,53.394713,13,"List(List(-6.264065, 53.394713), Point)",-6.264065,0,0,0.0,615965639.0,List(1499079420000),43041,0
List(595a2320e45b4b2ea81aea86),0,45.0,False,18342,17,286,4585,False,113,List(1499079432000000),False,9,0,WEEKDAY,124,0,0.1495683598544187,12000,0,155129,130002,False,False,53.393372,13,"List(List(-6.263889, 53.393372), Point)",-6.263889,0,0,0.0,615981647.0,List(1499079432000),43041,0
List(595a2334e45b4b2ea81aebf5),0,45.0,False,18341,17,286,4585,False,113,List(1499079451000000),False,14,0,WEEKDAY,124,0,0.1163137463429374,19000,0,155129,130002,False,False,53.392327,13,"List(List(-6.263811, 53.392327), Point)",-6.263811,0,0,0.0,616001550.0,List(1499079451000),43041,0
List(595a45b1e45b4b4bbcea7473),0,45.0,False,71673,17,279,4479,False,2002,List(1499088290000000),False,21,0,WEEKDAY,-341,0,0.0812212281606989,27000,0,152101,131002,False,False,53.342971,13,"List(List(-6.271705, 53.342971), Point)",-6.271705,0,0,0.0,624830637.0,List(1499088290000),43041,0
List(595a2346e45b4b2ea81aeda3),0,45.0,False,18341,17,286,4585,False,114,List(1499079471000000),False,20,0,WEEKDAY,124,0,0.0639458847906384,20000,0,155128,130002,False,False,53.391752,13,"List(List(-6.263795, 53.391752), Point)",-6.263795,0,0,0.0,616019665.0,List(1499079471000),43041,0
List(595a45c4e45b4b4bbcea75ee),0,0.0,False,71675,17,279,4479,False,2002,List(1499088304000000),False,1,1,WEEKEND,-369,0,0.0,0,0,153101,131002,False,False,53.343183,13,"List(List(-6.270578, 53.343183), Point)",-6.270578,0,0,0.0,624850186.0,List(1499088304000),43041,0
List(595a235ae45b4b2ea81aef11),0,0.0,False,18341,17,286,4585,False,115,List(1499079491000000),False,1,0,WEEKDAY,98,0,0.0,0,0,155127,130002,False,False,53.389642,13,"List(List(-6.26445, 53.389642), Point)",-6.26445,0,0,0.0,616039568.0,List(1499079491000),43041,0
List(595a2372e45b4b2ea81af0a7),-22,225.0,False,17988,17,280,4496,False,115,List(1499079511000000),False,7,0,WEEKDAY,76,0,0.1794604612206637,20000,0,155126,130002,False,False,53.388083,13,"List(List(-6.26515, 53.388083), Point)",-6.26515,0,0,0.0,616063547.0,List(1499079511000),43041,0


In [0]:
# All timestamp conversions below are evaluated in the Dublin time zone.
spark.conf.set("spark.sql.session.timeZone", "Europe/Dublin")

# Spark functions are called through the `F.` namespace rather than the
# star-imported bare names, so this cell does not depend on `hour`,
# `date_format`, ... still being bound to the Spark functions in the
# notebook's global namespace when it is re-run.

# The first 10 characters of timestamp.$numberLong are fed to from_unixtime
# (which takes seconds); the sample output shows 13-digit values, so this
# appears to truncate epoch milliseconds to epoch seconds.
epoch_seconds_str = F.substring(kafka_df["timestamp.$numberLong"], 0, 10)
event_timestamp = F.to_timestamp(F.from_unixtime(epoch_seconds_str))

data = kafka_df.select(kafka_df["*"], event_timestamp.alias("timeAndDate"))

# Add the day-of-week label (pattern 'E') and the hour of the record.
data = data.select(
    data["*"],
    F.date_format(data["timeAndDate"], "E").alias("DOW"),
    F.hour(data["timeAndDate"]).cast("long").alias("real_hour"),
)

In [0]:
# Build a one-row DataFrame holding the "current time" chosen in the
# "date & time" widget, parsed from its string form into a timestamp.
# Dedicated names are used (instead of `schema`, `df`, `temp`) so that the
# Kafka JSON schema bound to `schema` earlier in the notebook is not replaced
# by this unrelated single-column schema.
current_time_str = dbutils.widgets.get("date & time")
current_time_schema = StructType([StructField("current_time_1", StringType(), False)])
current_time_row = Row("current_time_1")(current_time_str)
current_time_raw_df = spark.createDataFrame([current_time_row], current_time_schema)

current_time_df = (
    current_time_raw_df
    .withColumn("current_time", current_time_raw_df["current_time_1"].cast(TimestampType()))
    .drop("current_time_1")
)
display(current_time_df)

current_time
2017-07-03T17:00:00.000+0100


In [0]:
# Derive the day-of-week label and the hour of the selected current time.
# Spark functions are called through the `F.` namespace rather than the
# star-imported bare names, so this cell does not depend on `hour` or
# `date_format` still being bound to the Spark functions when it is re-run.
time_settings = (
    current_time_df
    .withColumn("DOW", F.date_format(current_time_df["current_time"], "E"))
    .withColumn("hour", F.hour(current_time_df["current_time"]).cast("long"))
)

In [0]:
# Bring the single settings row to the driver once; every collect() call is a
# separate Spark job, so the row is fetched a single time and read by name.
settings_row = time_settings.collect()[0]
DOW = settings_row["DOW"]

# Deliberately not named `hour`: that name is the star-imported Spark function
# used by other cells, and binding an int to it would break them on re-run.
current_hour = settings_row["hour"]

# Bucket the hour into the part-of-day labels used to filter the
# estimated-arrival table: [0,6) early_morning, [6,12) morning,
# [12,18) noon, [18,24) evening.
if current_hour < 6:
  part_of_the_day = 'early_morning'
elif current_hour < 12:
  part_of_the_day = 'morning'
elif current_hour < 18:
  part_of_the_day = 'noon'
else:
  part_of_the_day = 'evening'

In [0]:
# Attach the selected current time to every bus record, then keep only the
# records taken at a stop within the three hours up to that time.
window_end = F.col("current_time")
window_start = window_end - F.expr('INTERVAL 3 HOURS')
recorded_at = F.col('timeAndDate')

in_last_three_hours = (recorded_at <= window_end) & (recorded_at >= window_start)
is_at_stop = F.col("atStop") == 'true'

# Seconds elapsed between the record and the selected current time.
seconds_since_record = window_end.cast("long") - recorded_at.cast("long")

data_current_time = data.crossJoin(current_time_df)
data_current_time = (
    data_current_time
    .filter(in_last_three_hours & is_at_stop)
    .withColumn("datetime_diff_sec", seconds_since_record)
)

In [0]:
# Start a streaming query that appends the rows of data_current_time to an
# in-memory sink registered under the query name "Dublin_BUS".
dublin_bus_writer = (
    data_current_time.writeStream
    .format("memory")
    .queryName("Dublin_BUS")
    .outputMode("append")
)
query = dublin_bus_writer.start()

In [0]:
# Create a view or table
# Registers the data_current_time DataFrame itself as the temp view
# "Dublin_BUS", so the SQL cells below can refer to it by name.
# NOTE(review): this is the same name as the memory-sink query started in the
# previous cell; presumably this view replaces the sink's table under that
# name - confirm which of the two the SQL cells are meant to read.
temp_table_name = "Dublin_BUS"
data_current_time.createOrReplaceTempView(temp_table_name)

In [0]:
# For every journeyPatternId in Dublin_BUS, keep the latest timeAndDate seen.
max_journey = spark.sql('''
SELECT journeyPatternId, MAX(timeAndDate) AS timeAndDate
FROM Dublin_BUS
GROUP BY journeyPatternId''')

In [0]:
# Start a streaming query that keeps the full, continuously updated
# aggregation result ("complete" mode) in an in-memory sink named
# "max_journey".
max_journey_writer = (
    max_journey.writeStream
    .format("memory")
    .queryName("max_journey")
    .outputMode("complete")
)
query = max_journey_writer.start()

In [0]:
# Keep, per journeyPatternId, only the Dublin_BUS row(s) whose timeAndDate
# equals the maximum stored in max_journey, and expose the stop,
# the seconds since that record and the bus coordinates at that moment.
real_time_bus_locations = spark.sql('''
SELECT t1.journeyPatternId, t1.busStop, t1.timeAndDate, t1.datetime_diff_sec, t1.longitude as bus_current_longitude, t1.latitude as bus_current_latitude
FROM
(SELECT *
FROM Dublin_BUS) t1
INNER JOIN
(SELECT *
FROM max_journey) t2
ON t1.journeyPatternId=t2.journeyPatternId AND t1.timeAndDate=t2.timeAndDate
''')

In [0]:
# Address of the Elasticsearch node, used both by the Python client below and
# by the Spark connector helpers in this notebook.
IP = '10.0.0.21'
es = Elasticsearch(hosts=[{'host': IP}])

In [0]:
def read_from_elastic(index, query="", scroll_size="10000", array_field=""):
    """Load an Elasticsearch index as a Spark DataFrame.

    Reads through the "org.elasticsearch.spark.sql" data source against the
    node given by the notebook-level ``IP`` on port 9200.

    Args:
        index: name of the index to load.
        query: value passed as the ``es.query`` option (empty by default).
        scroll_size: value passed as the ``es.scroll.size`` option.
        array_field: value passed as ``es.read.field.as.array.include``.

    Returns:
        The DataFrame produced by the reader's ``load(index)`` call.
    """
    connector_options = {
        "es.nodes.wan.only": "true",
        "es.port": "9200",
        "es.nodes": IP,
        "es.nodes.client.only": "false",
        "pushdown": "true",
        "es.query": query,
        "es.scroll.size": scroll_size,
        "es.scroll.keepalive": "120m",
        "es.read.field.as.array.include": array_field,
    }
    reader = spark.read.format("org.elasticsearch.spark.sql").options(**connector_options)
    return reader.load(index)

In [0]:
# Load the estimated stop-to-stop arrival times from Elasticsearch.
bus_stops_estimated_time = read_from_elastic(index='bus_stops_estimated_time_arrival')

In [0]:
# Keep only the estimates for the selected day of week and part of the day.
same_day_of_week = F.col('DOW') == DOW
same_part_of_day = F.col('part_of_the_day') == part_of_the_day
bus_stops_estimated_time = bus_stops_estimated_time.filter(same_day_of_week & same_part_of_day)

In [0]:
# Match each bus's latest stop with the estimates that start at that stop on
# the same journey pattern, keeping the estimate's target stop, the median
# travel time, the seconds since the bus was recorded and its coordinates.
bus_is_at_from_stop = real_time_bus_locations["busStop"] == bus_stops_estimated_time["from_busStop"]
same_journey_pattern = real_time_bus_locations["journeyPatternId"] == bus_stops_estimated_time["journeyPatternId"]

real_time_to_origin_stop = (
    real_time_bus_locations
    .join(bus_stops_estimated_time, bus_is_at_from_stop & same_journey_pattern)
    .select(
        real_time_bus_locations["journeyPatternId"],
        bus_stops_estimated_time["median_time_sec"],
        real_time_bus_locations["datetime_diff_sec"],
        bus_stops_estimated_time["to_busStop"],
        real_time_bus_locations["bus_current_longitude"],
        real_time_bus_locations["bus_current_latitude"],
    )
)

In [0]:
# Remaining seconds until the bus reaches the target stop: the median travel
# time minus the seconds already elapsed since the bus was recorded.
remaining_seconds = F.col("median_time_sec") - F.col("datetime_diff_sec")
real_time_to_origin_stop = real_time_to_origin_stop.withColumn('real_time_to_origin_stop', remaining_seconds)

In [0]:
# Drop buses whose remaining time is negative, remove the two inputs of the
# subtraction, and rename the target stop to from_busStop so it can be joined
# as the stop where the user boards.
real_time_to_origin_stop = (
    real_time_to_origin_stop
    .filter(real_time_to_origin_stop["real_time_to_origin_stop"] >= 0)
    .drop("datetime_diff_sec", "median_time_sec")
    .withColumnRenamed('to_busStop', 'from_busStop')
)

In [0]:
def _float_widget(widget_name):
    """Read a notebook widget and convert its text value to a float."""
    return float(dbutils.widgets.get(widget_name))


# Coordinates of the user's origin and destination, taken from the widgets.
origin_longitude = _float_widget("origin longitude")
origin_latitude = _float_widget("origin latitude")
destination_longitude = _float_widget("destination longitude")
# NOTE(review): the widget key below is kept exactly as spelled in the lookup;
# presumably the widget itself is registered under this spelling - confirm.
destination_latitude = _float_widget("destaniation latitude")

In [0]:
# One-row DataFrame carrying the origin and destination coordinates, used to
# cross-join them onto other tables.
point_columns = ("origin_latitude", "origin_longitude", "destination_latitude", "destination_longitude")
point_row = [origin_latitude, origin_longitude, destination_latitude, destination_longitude]
origin_destination_point = sc.parallelize([point_row]).toDF(point_columns)

In [0]:
# Load the bus stop reference data from Elasticsearch.
df_bus_stops = read_from_elastic(index='bus_stops_data')

In [0]:
# Create a view or table
# Registers the raw bus stop data as the temp view "dublin_bus_stops_table"
# so it can be queried with SQL in the next cell.
temp_table_name = "dublin_bus_stops_table"
df_bus_stops.createOrReplaceTempView(temp_table_name)

In [0]:
# Project the raw stop data onto the columns used downstream:
#   busStop     - integer parsed from the part of Unique_British_Isles_Id after 'DB'
#   busStopName - "Name_without_locality, Name"
#   longitude / latitude - taken from POINT_X / POINT_Y
dublin_bus_stops_table = spark.sql('''
SELECT 
   INT(split(Unique_British_Isles_Id, 'DB')[1]) AS busStop, 
   CONCAT(Name_without_locality, ", ", Name) AS busStopName, 
   POINT_X AS longitude, 
   POINT_Y AS latitude
FROM dublin_bus_stops_table
''')

In [0]:
# Put the origin/destination coordinates next to every bus stop so the
# distances can be computed row by row.
distance_tables_to_function = (
    dublin_bus_stops_table
    .crossJoin(origin_destination_point)
)

In [0]:
# Haversine distance between every bus stop and the origin point.
# Column "a" holds the haversine term; the distance is atan2(sqrt(a), sqrt(1-a))
# scaled by 12742000 (twice an Earth radius of 6371000 m), i.e. in meters.
origin_half_dlat = F.radians(F.col("latitude") - F.col("origin_latitude")) / 2
origin_half_dlon = F.radians(F.col("longitude") - F.col("origin_longitude")) / 2
origin_haversine_term = (
    F.pow(F.sin(origin_half_dlat), 2)
    + F.cos(F.radians(F.col("origin_latitude"))) * F.cos(F.radians(F.col("latitude")))
    * F.pow(F.sin(origin_half_dlon), 2)
)
origin_distance = F.atan2(F.sqrt(F.col("a")), F.sqrt(-F.col("a") + 1)) * 12742000

distance_tables_origin = (
    distance_tables_to_function
    .withColumn("a", origin_haversine_term)
    .withColumn("origin_distance_meters", origin_distance)
)

In [0]:
# Haversine distance between every bus stop and the destination point.
# Column "b" holds the haversine term; the distance is atan2(sqrt(b), sqrt(1-b))
# scaled by 12742000 (twice an Earth radius of 6371000 m), i.e. in meters.
destination_half_dlat = F.radians(F.col("latitude") - F.col("destination_latitude")) / 2
destination_half_dlon = F.radians(F.col("longitude") - F.col("destination_longitude")) / 2
destination_haversine_term = (
    F.pow(F.sin(destination_half_dlat), 2)
    + F.cos(F.radians(F.col("destination_latitude"))) * F.cos(F.radians(F.col("latitude")))
    * F.pow(F.sin(destination_half_dlon), 2)
)
destination_distance = F.atan2(F.sqrt(F.col("b")), F.sqrt(-F.col("b") + 1)) * 12742000

distance_tables_origin_destination = (
    distance_tables_origin
    .withColumn("b", destination_haversine_term)
    .withColumn("destination_distance_meters", destination_distance)
)

In [0]:
# The walking-time columns are straight copies of the distance columns, so a
# distance in meters is used as a time in seconds.
# NOTE(review): presumably this assumes a walking speed of 1 m/s - confirm.
origin_walk_seconds = distance_tables_origin_destination["origin_distance_meters"]
destination_walk_seconds = distance_tables_origin_destination["destination_distance_meters"]

distance_tables_with_time = (
    distance_tables_origin_destination
    .withColumn('origin_walking_time_seconds', origin_walk_seconds)
    .withColumn('destination_walking_time_seconds', destination_walk_seconds)
)

# The haversine helper columns are no longer needed once distances exist.
columns_to_drop = ["a", "b"]
distance_time_walking_bus_stops = distance_tables_with_time.drop(*columns_to_drop)


In [0]:
# Stop table seen from the origin side: stop id, name, coordinates and the
# walking time from the origin, with name/coordinates prefixed by "origin".
origin_side_columns = [
    distance_time_walking_bus_stops[column_name]
    for column_name in ("busStop", "busStopName", "longitude", "latitude", "origin_walking_time_seconds")
]
origin_join = distance_time_walking_bus_stops.select(*origin_side_columns)
origin_join = (
    origin_join
    .withColumnRenamed("busStopName", "originBusStopName")
    .withColumnRenamed("longitude", "originLongitude")
    .withColumnRenamed("latitude", "originLatitude")
)

In [0]:
# Stop table seen from the destination side: stop id, name, coordinates and
# the walking time to the destination, with name/coordinates prefixed by
# "destination".
destination_side_columns = [
    distance_time_walking_bus_stops[column_name]
    for column_name in ("busStop", "busStopName", "longitude", "latitude", "destination_walking_time_seconds")
]
destination = distance_time_walking_bus_stops.select(*destination_side_columns)
destination_join = (
    destination
    .withColumnRenamed("busStopName", "destinationBusStopName")
    .withColumnRenamed("longitude", "destinationLongitude")
    .withColumnRenamed("latitude", "destinationLatitude")
)

In [0]:
# Enrich every stop-to-stop estimate with the boarding stop's details
# (matched on from_busStop) and the alighting stop's details (matched on
# to_busStop); the redundant busStop key is dropped after each join.
boards_at_stop = bus_stops_estimated_time.from_busStop == origin_join.busStop
alights_at_stop = bus_stops_estimated_time.to_busStop == destination_join.busStop

joined_table = (
    bus_stops_estimated_time
    .join(origin_join, boards_at_stop)
    .drop("busStop")
)
joined_table = (
    joined_table
    .join(destination_join, alights_at_stop)
    .drop("busStop")
)

In [0]:
# Attach the live bus information: the remaining time until a bus of the same
# journey pattern reaches the boarding stop.
realtime_join_keys = ['journeyPatternId', 'from_busStop']
joined_table = joined_table.join(real_time_to_origin_stop, realtime_join_keys)

In [0]:
# Keep only the options where the walk to the boarding stop takes less time
# than the bus needs to get there.
walk_beats_bus = F.col("origin_walking_time_seconds") < F.col("real_time_to_origin_stop")
joined_table = joined_table.filter(walk_beats_bus)

In [0]:
# Waiting time at the stop: bus arrival time minus the walking time.
wait_seconds = F.col("real_time_to_origin_stop") - F.col("origin_walking_time_seconds")
joined_table = joined_table.withColumn("time_to_wait", wait_seconds)

In [0]:
# Total trip estimate in seconds: bus ride + walk to the boarding stop +
# walk from the alighting stop + waiting time at the boarding stop.
total_trip_seconds = (
    joined_table["median_time_sec"]
    + joined_table["origin_walking_time_seconds"]
    + joined_table["destination_walking_time_seconds"]
    + joined_table["time_to_wait"]
)
final_table_to_user = joined_table.withColumn('final_time_estimation', total_trip_seconds)

In [0]:
def _seconds_to_minutes(seconds_column):
    """Convert a seconds column to minutes, rounded to 3 decimal places."""
    return F.round(seconds_column / 60, 3)


# User-facing projection: every duration expressed in minutes, plus the line
# and the names of the boarding and alighting stops.
final_table_to_user2 = final_table_to_user.select(
    _seconds_to_minutes(final_table_to_user["final_time_estimation"]).alias("estimated_total_travel_time_min"),
    final_table_to_user["line"],
    final_table_to_user["originBusStopName"].alias("from_bus_stop"),
    _seconds_to_minutes(final_table_to_user["origin_walking_time_seconds"]).alias("walking_time_to_bus_stop"),
    _seconds_to_minutes(final_table_to_user["time_to_wait"]).alias("time_to_wait_in_bus_stop"),
    _seconds_to_minutes(final_table_to_user["median_time_sec"]).alias("bus_travel_time"),
    final_table_to_user["destinationBusStopName"].alias("to_bus_stop"),
    _seconds_to_minutes(final_table_to_user["destination_walking_time_seconds"]).alias("walking_time_to_final_destination"),
)

In [0]:
# Start a streaming query that appends the user-facing rows to an in-memory
# sink registered under the query name "final_table_to_user2".
final_table_writer = (
    final_table_to_user2.writeStream
    .format("memory")
    .queryName("final_table_to_user2")
    .outputMode("append")
)
query = final_table_writer.start()

In [0]:
# Create a view or table
# Registers the final_table_to_user2 DataFrame itself as the temp view
# "final_table_to_user2" for the SQL cells below.
# NOTE(review): this is the same name as the memory-sink query started in the
# previous cell; presumably this view replaces the sink's table under that
# name - confirm which of the two the SQL cells are meant to read.
temp_table_name = "final_table_to_user2"
final_table_to_user2.createOrReplaceTempView(temp_table_name)

In [0]:
# For every bus line, keep the smallest estimated total travel time.
best_per_line = spark.sql('''
SELECT line, MIN(estimated_total_travel_time_min) AS estimated_total_travel_time_min
FROM final_table_to_user2
GROUP BY line''')

In [0]:
# Start a streaming query that keeps the full, continuously updated
# aggregation result ("complete" mode) in an in-memory sink named
# "best_per_line".
best_per_line_writer = (
    best_per_line.writeStream
    .format("memory")
    .queryName("best_per_line")
    .outputMode("complete")
)
query = best_per_line_writer.start()

In [0]:
%sql
-- For every line, keep the row(s) of final_table_to_user2 whose
-- estimated_total_travel_time_min equals the per-line minimum in best_per_line.
-- Both sides are selected with *, so line and estimated_total_travel_time_min
-- each appear twice in the result.
SELECT *
FROM
(SELECT *
FROM final_table_to_user2) t1
INNER JOIN
(SELECT *
FROM best_per_line) t2
ON t1.line=t2.line AND t1.estimated_total_travel_time_min=t2.estimated_total_travel_time_min

estimated_total_travel_time_min,line,from_bus_stop,walking_time_to_bus_stop,time_to_wait_in_bus_stop,bus_travel_time,to_bus_stop,walking_time_to_final_destination,line.1,estimated_total_travel_time_min.1
84.377,40,"Wigan Road, Whitworth Road",15.717,46.25,10.5,"Cathedral Street, O'Connell St",11.91,40,84.377
60.381,83,"Fairfield Road, Botanic Rd",7.294,2.139,0.967,"Botanic Gardens, Botanic Rd",49.981,83,60.381
43.717,11,"Home Farm Road, Mobhi Road",15.922,0.228,18.0,"Abbey Street, O'Connell St",9.567,11,43.717
551.05,270,"Mill View, Dublin Rd",220.181,124.819,9.317,"Hunters Run, Littlepace Rd",196.733,270,551.05
121.159,27,"Ardee St, Cork Street",60.27,50.33,5.967,"Carnegie Centre, Lord Edward St",4.592,27,121.159
66.281,39,"Stanhope Street Convent, Stoneybatter",37.621,2.846,1.067,"Aughrim Street, Manor Street",24.748,39,66.281
106.415,15,"Maypark, Malahide Road",66.441,11.942,16.917,"Quay Wall (Stop EA), Eden Quay",11.115,15,106.415
89.559,13,"Bridgefoot Street, Thomas Street",51.761,28.539,4.667,"Carnegie Centre, Lord Edward St",4.592,13,89.559


In [0]:
# One-row sample user table (name, age, profile link, estimated arrival time
# and destination coordinates).
sample_user_columns = ("Name", "Age", "Instagram", "estimated_arrival_time", "destination_longitude", "destination_latitude")
sample_user_rows = [
    ["John", 28, "https://www.instagram.com/discoverdublin/", "2017-07-04 14:20:00", -6.9358641091910585, 53.97606360456374],
]
tinder_users = sc.parallelize(sample_user_rows).toDF(sample_user_columns)

In [0]:
# Elasticsearch node address and Python client used by the write helper below.
IP = '10.0.0.21'
es = Elasticsearch(hosts=[{'host': IP}])

def write_table_to_elastic(df, schema, table_name: str):
    """Create an Elasticsearch index and append a Spark DataFrame to it.

    Args:
        df: Spark DataFrame to write.
        schema: index definition passed as the body of the create-index call.
        table_name: name of the index to create and write to.

    The create call is issued with ``ignore=400``.
    NOTE(review): presumably this is so an already-existing index does not
    raise - confirm against the client version in use.
    """
    es.indices.create(index=table_name, ignore=400, body=schema)

    connector_options = {
        "es.resource": table_name,
        "es.nodes.wan.only": "true",
        "es.port": "9200",
        "es.nodes": IP,
        "es.nodes.client.only": "false",
        "es.batch.write.retry.count": "6",
    }
    writer = df.write.format("org.elasticsearch.spark.sql").options(**connector_options)
    writer.mode("append").save()

In [0]:
# Index definition for writing the tinder_users table to Elastic:
# a single shard, no replicas, and an explicit mapping for the Instagram field.
# NOTE(review): "url.original" does not look like a built-in Elasticsearch
# field type (e.g. keyword/text) - confirm before re-enabling the write below.
SCEHMA = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0
    },
    "mappings": {
        "properties": {
          "Instagram" : { "type": "url.original" }
        }
    }
}
# The one-off write is left disabled; the table is read back from Elastic instead.
#write_table_to_elastic(tinder_users, SCEHMA, "tinder_users")

In [0]:
# Load the stored users from Elasticsearch (replaces the local sample table).
tinder_users = read_from_elastic(index='tinder_users')

In [0]:
# Put the current user's destination next to every stored user. Its columns
# are renamed first so they do not clash with the stored users' own
# destination_latitude / destination_longitude; the origin is not needed.
current_user_destination = (
    origin_destination_point
    .withColumnRenamed("destination_latitude", 'current_user_destination_latitude')
    .withColumnRenamed("destination_longitude", 'current_user_destination_longitude')
)
tinder_users = (
    tinder_users
    .crossJoin(current_user_destination)
    .drop("origin_latitude", "origin_longitude")
)

In [0]:
# Haversine distance between each stored user's destination and the current
# user's destination. Column "a" holds the haversine term; the distance is
# atan2(sqrt(a), sqrt(1-a)) scaled by 12742000 (twice an Earth radius of
# 6371000 m), i.e. in meters.
users_half_dlat = F.radians(F.col("current_user_destination_latitude") - F.col("destination_latitude")) / 2
users_half_dlon = F.radians(F.col("current_user_destination_longitude") - F.col("destination_longitude")) / 2
users_haversine_term = (
    F.pow(F.sin(users_half_dlat), 2)
    + F.cos(F.radians(F.col("destination_latitude"))) * F.cos(F.radians(F.col("current_user_destination_latitude")))
    * F.pow(F.sin(users_half_dlon), 2)
)
users_distance = F.atan2(F.sqrt(F.col("a")), F.sqrt(-F.col("a") + 1)) * 12742000

tinder_users = (
    tinder_users
    .withColumn("a", users_haversine_term)
    .withColumn("destination_distance_meters", users_distance)
)

In [0]:
# Keep the users whose destination is at most 1000 m from the current user's
# destination and whose estimated arrival falls on the same calendar date as
# the selected current time.
destination_is_nearby = F.col("destination_distance_meters") <= 1000
arrives_same_day = F.to_date(F.col("estimated_arrival_time")) == F.to_date(F.col("current_time"))

tinder_users = (
    tinder_users
    .crossJoin(current_time_df)
    .filter(destination_is_nearby & arrives_same_day)
    .select("Name", "Age", "Instagram", "estimated_arrival_time")
)

In [0]:
# Show the matching users only when the "Tinder Mode" widget is set to ON.
Tinder_mode = dbutils.widgets.get("Tinder Mode")
tinder_mode_enabled = Tinder_mode == 'ON'
if tinder_mode_enabled:
  matching_users = tinder_users.select("Name", "Age", "Instagram")
  display(matching_users)

Name,Age,Instagram
Alice,31,https://www.instagram.com/discoverdublin/
