In [2]:
sc

<pyspark.context.SparkContext at 0x7f16fef41e10>

In [76]:
sc.applicationId

u'application_1522648856070_0188'

In [4]:
import sys
sys.path.append("/usr/lib/python2.7/site-packages")

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import datetime
from pyspark.sql.functions import year, month, dayofmonth
from pyspark.sql.functions import col, asc, desc,log

In [5]:
# For displaying multiple outputs
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

## Load the VTS data for the 365R/1-All Days schedule

In [8]:
sqlContext.sql("use bmtc_eta_default")
sqlContext.sql("show tables").show()

DataFrame[]

+-----------------+-----------+
|        tableName|isTemporary|
+-----------------+-----------+
|         vts_365r|      false|
|vts_365r_filtered|      false|
+-----------------+-----------+



In [7]:
vts_365r_df = sqlContext.sql("select * from vts_365r_filtered")

In [9]:
vts_365r_df.show(2)

+----------+---------+----------+------------+--------------------+---------+---------+-----------------+-------------+-------+
|        id|device_id|ign_status|acc_distance|            ist_date|      lat|longitude|vehicle_direction|ist_date_part|isvalid|
+----------+---------+----------+------------+--------------------+---------+---------+-----------------+-------------+-------+
|5867192541|150814730|         0|     9201134|2016-07-01 00:12:...|12.790125|77.706451|             40.0|   2016-07-01|      1|
|5867355190|150814730|         0|     9201134|2016-07-01 00:27:...|12.790125|77.706451|             40.0|   2016-07-01|      1|
+----------+---------+----------+------------+--------------------+---------+---------+-----------------+-------------+-------+
only showing top 2 rows



### Load static data into Spark data frames

In [10]:
sqlContext.sql("use bmtc")

# Get the route map
route_map_df = sqlContext.sql("select route_id,start_bus_stop_id,end_bus_stop_id,\
                                      distance,time_to_travel,bus_stop_order,status \
                               from route_map")

# Get the route_point
route_point_df = sqlContext.sql("select route_id, route_order, \
                                bus_stop_id from route_point")

bus_stop_df = sqlContext.sql("select bus_stop_id, bus_stop_name, \
                              latitude_current,longitude_current from bus_stop")

# Drop corrupted locations
bus_stop_df = bus_stop_df.na.drop(subset=["latitude_current"])
bus_stop_df = bus_stop_df.na.drop(subset=["longitude_current"])

# Join the bus stop ID with lat,long
route_point_joined_df = route_point_df.join(bus_stop_df,\
                                            ["bus_stop_id"],\
                                            "left_outer")

form_four_df = sqlContext.sql("select form_four_id,form_four_name,schedule_number_id,\
                                      schedule_number_name,no_of_trips,start_time,\
                                      route_id,route_number,toll_zone,\
                                      area_limit,total_km,total_dead_km,\
                                      actual_km,total_running_time,total_break_time,\
                                      total_steering_time,spread_over_hours,ot_hours \
                               from form_four")

schedule_df = sqlContext.sql('select * from schedule')

schedule_df = schedule_df.select("schedule_id","schedule_number", \
                                 "depot_id","route_id","schedule_type")

schedule_details_df = sqlContext.sql('select * from schedule_details')

schedule_details_df = schedule_details_df.select("schedule_details_id", \
                                                 "form_four_id","schedule_number","number_of_trips",\
                                                 "trip_number","trip_type","start_point","end_point", \
                                                 "route_number_id", "route_number","route_direction",\
                                                 "distance","start_time","end_time",\
                                                 "running_time","break_type_id","shift_type_id","is_dread_trip")

DataFrame[]

In [11]:
sqlContext.sql("use bmtcwaybill")

# Get the waybill details, and clean it
waybill_trip_details_df = sqlContext.sql("select id,waybill_id,duty_dt,device_id,\
                                          status,schedule_type_id,schedule_no,schedule_name,\
                                          service_type,service_name,trip_number,\
                                          start_point,start_bus_stop_name,end_point,end_bus_stop_name,\
                                          route_id,route_no,distance,start_time,\
                                          act_start_time,etm_start_time,end_time,act_end_time,\
                                          etm_end_time,running_time,is_dread_trip \
                                          from waybill_trip_details")

waybill_trip_details_filtered_df = waybill_trip_details_df.filter(((year(waybill_trip_details_df.duty_dt) == 2016) & 
                                                                   (month(waybill_trip_details_df.duty_dt) >= 6)) |
                                                                  (year(waybill_trip_details_df.duty_dt) == 2017))

waybill_trip_details_filtered_365_df = waybill_trip_details_filtered_df.filter(col('schedule_no')==3037)

DataFrame[]

## Explore the 365R/1-All Days schedule

In [12]:
# Take schedule 365R/1-All Days, and we see that it takes two schedule no, which corresponds to two form four IDs
waybill_trip_details_filtered_365_df.filter(col('schedule_name')=='365R/1-All Days').\
select("schedule_no").distinct().show()

+-----------+
|schedule_no|
+-----------+
|       3037|
+-----------+



In [13]:
form_four_df.filter(col('form_four_id').isin([3037])).show()

+------------+--------------+------------------+--------------------+-----------+----------+--------+------------+---------+----------+--------+-------------+---------+------------------+----------------+-------------------+-----------------+--------+
|form_four_id|form_four_name|schedule_number_id|schedule_number_name|no_of_trips|start_time|route_id|route_number|toll_zone|area_limit|total_km|total_dead_km|actual_km|total_running_time|total_break_time|total_steering_time|spread_over_hours|ot_hours|
+------------+--------------+------------------+--------------------+-----------+----------+--------+------------+---------+----------+--------+-------------+---------+------------------+----------------+-------------------+-----------------+--------+
|        3037|             1|              2828|     365R/1-All Days|         13|  07:45:00|   20239|        null|        0|         0|   185.4|          0.4|    185.0|              9:55|            null|               11:0|             12:0|  

In [14]:
# Check the waybill data having schedule no listed above
waybill_trip_details_filtered_365_df.filter((col('schedule_no')==3037) & (col('duty_dt') == '2017-01-02')).\
                                                        select('schedule_no','trip_number',\
                                                             'start_point','start_bus_stop_name',\
                                                             'end_point','end_bus_stop_name',\
                                                             'route_id','is_dread_trip').orderBy('trip_number').show()

+-----------+-----------+-----------+--------------------+---------+--------------------+--------+-------------+
|schedule_no|trip_number|start_point| start_bus_stop_name|end_point|   end_bus_stop_name|route_id|is_dread_trip|
+-----------+-----------+-----------+--------------------+---------+--------------------+--------+-------------+
|       3037|          1|       1291| Depot-34 (Kottanur)|    10793|       Depot-34 Gate|    9197|            1|
|       3037|          1|       1291| Depot-34 (Kottanur)|    10793|       Depot-34 Gate|    9197|            1|
|       3037|          2|      10793|       Depot-34 Gate|    10094| Basavanapura Church|   20240|            0|
|       3037|          3|      10093| Basavanapura Church|     8456|Kempegowda Bus St...|   29325|            0|
|       3037|          4|        160|Kempegowda Bus St...|    10094| Basavanapura Church|   20239|            0|
|       3037|          5|      10093| Basavanapura Church|     8456|Kempegowda Bus St...|   2932

In [15]:
# Get the schedule details
schedule_details_df.filter((col("schedule_number")==2828) & (col('form_four_id')==3037) ).show()

+-------------------+------------+---------------+---------------+-----------+---------+-----------+---------+---------------+------------+---------------+--------+----------+--------+------------+-------------+-------------+-------------+
|schedule_details_id|form_four_id|schedule_number|number_of_trips|trip_number|trip_type|start_point|end_point|route_number_id|route_number|route_direction|distance|start_time|end_time|running_time|break_type_id|shift_type_id|is_dread_trip|
+-------------------+------------+---------------+---------------+-----------+---------+-----------+---------+---------------+------------+---------------+--------+----------+--------+------------+-------------+-------------+-------------+
|              41110|        3037|           2828|             13|          1|        3|       1291|    10793|           9197|        null|           null|     0.2|  07:45:00|07:47:00|    00:02:00|            3|            1|            1|
|              41111|        3037|      

## Feature Extraction

### Extract the route ID as part of 365R/1-All Days

In [16]:
schedule_365_route_ids_list = schedule_details_df.filter((col("schedule_number")==2828) & (col('form_four_id')==3037)). \
select('route_number_id').rdd.map(lambda x: x[0]).distinct().collect()
schedule_365_route_ids_list

[20240, 20241, 29325, 9196, 9197, 20239]

### Select all the bus stops on the above selected routes for 365R/1-All Days

In [17]:
bus_stops_on_365R_df = route_point_df.where(col('route_id').isin(schedule_365_route_ids_list))

In [18]:
bus_stops_on_365R_df.count()
bus_stops_on_365R_df.show()

128

+--------+-----------+-----------+
|route_id|route_order|bus_stop_id|
+--------+-----------+-----------+
|    9196|          1|      10795|
|    9196|          2|      10381|
|    9196|          3|      10380|
|    9196|          4|       1291|
|    9197|          1|       1291|
|    9197|          2|      10380|
|    9197|          3|      10381|
|    9197|          4|      10793|
|   20239|          1|        160|
|   20239|          2|       5841|
|   20239|          3|         33|
|   20239|          4|       5840|
|   20239|          5|       8455|
|   20239|          6|        124|
|   20239|          7|         61|
|   20239|          8|        126|
|   20239|          9|        158|
|   20239|         10|        159|
|   20239|         11|        353|
|   20239|         12|       8579|
+--------+-----------+-----------+
only showing top 20 rows



In [19]:
bus_stops_on_365R_joined_df = bus_stops_on_365R_df.join(bus_stop_df,\
                                            ["bus_stop_id"],\
                                            "left_outer")
bus_stops_on_365R_joined_df.show()
bus_stops_on_365R_joined_df.count()

+-----------+--------+-----------+--------------------+----------------+-----------------+
|bus_stop_id|route_id|route_order|       bus_stop_name|latitude_current|longitude_current|
+-----------+--------+-----------+--------------------+----------------+-----------------+
|      10795|    9196|          1|       Depot-34 Gate|     12.87021282|       77.5857047|
|      10381|    9196|          2|               Dummy|     12.87132242|      77.58583613|
|      10380|    9196|          3|           Connector|     12.87155318|       77.5860849|
|       1291|    9196|          4| Depot-34 (Kottanur)|     12.87193846|      77.58619537|
|       1291|    9197|          1| Depot-34 (Kottanur)|     12.87193846|      77.58619537|
|      10380|    9197|          2|           Connector|     12.87155318|       77.5860849|
|      10381|    9197|          3|               Dummy|     12.87132242|      77.58583613|
|      10793|    9197|          4|       Depot-34 Gate|     12.87018994|      77.58574426|

128

## We now inflate the vts_365r_filtered table with the joined bus stop table we have built

In [47]:
# Finall!! 
# Enable cross joins
spark.conf.set("spark.sql.crossJoin.enabled", "true")

In [50]:
vts_365r_joined_df = vts_365r_df.join(bus_stops_on_365R_joined_df)

### Figure out which row is within the geo-fence of one of the bus stops

In [51]:
from math import *

pi_broabcast = sc.broadcast(pi)

In [52]:
pi_broabcast.value

3.141592653589793

In [53]:
# Return the Haversine distance between two geo-points
# The returned value is in metres
def distBetweenGeoPoints(lat1, long1, lat2, long2):
    dist =  ((acos(sin((lat1*pi_broabcast.value/180)) * 
                   sin((lat2*pi_broabcast.value/180))+cos((lat1*pi_broabcast.value/180)) * 
                   cos((lat2*pi_broabcast.value/180)) * cos(((long1 - long2)* 
                                               pi_broabcast.value/180))))*180/pi_broabcast.value*60*1.1515*1.60934)*1000
    return dist

# The returned value is in metres

In [68]:
# Test if this is working well
# Copying coordinates from https://andrew.hedges.name/experiments/haversine/
# and verifying
ret = distBetweenGeoPoints(38.898556, -77.037852, 38.897147, -77.043934)
ret
type(ret)
# We expect a distance of 549 metres

549.1280124617311

float

In [55]:
from pyspark.sql.functions import udf

distBetweenGeoPointsUDF = udf(distBetweenGeoPoints)

In [56]:
vts_365r_joined_with_dist_df = vts_365r_joined_df.withColumn("distance", 
                                                             distBetweenGeoPointsUDF(vts_365r_joined_df.lat, 
                                                                                     vts_365r_joined_df.longitude,
                                                                                     vts_365r_joined_df.latitude_current, 
                                                                                     vts_365r_joined_df.longitude_current))

In [71]:
vts_365r_joined_with_dist_df.show(2)

+----------+---------+----------+------------+--------------------+---------+---------+-----------------+-------------+-------+-----------+--------+-----------+-------------+----------------+-----------------+------------------+
|        id|device_id|ign_status|acc_distance|            ist_date|      lat|longitude|vehicle_direction|ist_date_part|isvalid|bus_stop_id|route_id|route_order|bus_stop_name|latitude_current|longitude_current|          distance|
+----------+---------+----------+------------+--------------------+---------+---------+-----------------+-------------+-------+-----------+--------+-----------+-------------+----------------+-----------------+------------------+
|5867192541|150814730|         0|     9201134|2016-07-01 00:12:...|12.790125|77.706451|             40.0|   2016-07-01|      1|      10795|    9196|          1|Depot-34 Gate|     12.87021282|       77.5857047|15832.192804946722|
|5867192541|150814730|         0|     9201134|2016-07-01 00:12:...|12.790125|77.7064

In [82]:
# Perfect!!
# Learning: need to do cache in this case before applying a filter
vts_365r_joined_with_dist_df.cache()

DataFrame[id: bigint, device_id: string, ign_status: tinyint, acc_distance: string, ist_date: string, lat: double, longitude: double, vehicle_direction: string, ist_date_part: string, isvalid: string, bus_stop_id: int, route_id: int, route_order: int, bus_stop_name: string, latitude_current: double, longitude_current: double, distance: string]

In [83]:
# Filter by geo-fence radius
geofence_radius = 50
vts_365r_joined_within_stop_df = vts_365r_joined_with_dist_df.filter(vts_365r_joined_with_dist_df.distance < 50)
# The above code works if we have cached the vts_365r_joined_with_dist_df dataframe

In [None]:
vts_365r_joined_within_stop_df.show()

In [58]:
# code to truncate milliseconds from the time stamp
from pyspark.sql.functions import unix_timestamp, from_unixtime

timeFmt_Ist = "yyyy-MM-dd HH:mm:ss.S"
timeFmt1 = "yyyy-MM-dd HH:mm:ss"
istDateTrunc = from_unixtime(unix_timestamp('ist_date', 
                                            format=timeFmt_Ist), format=timeFmt1)

vts_365r_within_stop_df = vts_365r_joined_within_stop_df.withColumn("ist_timestamp", istDateTrunc)

In [59]:
# Finding the earliest entry into a bus stop geo-fence by a device
vts_365r_seconds_within_stop_df = vts_365r_within_stop_df.withColumn("ist_timestamp_seconds", 
                                                                     unix_timestamp("ist_timestamp", 
                                                                                    format=timeFmt1))

In [None]:
vts_365r_min_within_stop_df = vts_365r_seconds_within_stop_df.groupby("device_id", "schedule_no", 
                                                                      "trip_number", "route_id", 
                                                                      "route_bus_stop_id","route_order").\
min("ist_timestamp_seconds").withColumnRenamed("min(ist_timestamp_seconds)", "ist_timestamp_seconds")

In [None]:
vts_365r_min_within_stop_df.show()