In [1]:
# Display the SparkContext (presumably injected by the PySpark kernel) to confirm the session is live.
sc

<pyspark.context.SparkContext at 0x7fcb40149e10>

In [2]:
# Schedule number used further down to restrict the waybill trips to a single schedule.
scheduleNo = 3037

In [3]:
# Query date as zero-padded strings; they are concatenated into labels and output paths below.
strYear, strMonth, strDay = "2016", "09", "01"

In [4]:
# Compact yyyyMMdd form of the query date; used as a path component when results are saved.
dateLit = "".join((strYear, strMonth, strDay))
dateLit

'20160901'

In [5]:
# Hyphenated yyyy-MM-dd form of the query date.
dateHyphen = "-".join((strYear, strMonth, strDay))
dateHyphen

'2016-09-01'

In [6]:
# Month label such as 'Sep2016'. The abbreviation is looked up from strMonth
# instead of being hard-coded to "Sep", so the label follows the query date
# when strMonth is changed. A fixed English table is used so the result does
# not depend on the process locale.
_MONTH_ABBREVIATIONS = ("Jan", "Feb", "Mar", "Apr", "May", "Jun",
                        "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")
monthLit = _MONTH_ABBREVIATIONS[int(strMonth) - 1] + strYear
monthLit

'Sep2016'

In [7]:
# Integer versions of the query date parts, compared against year()/month()/dayofmonth() in the filters below.
queryYear, queryMonth, queryDay = (int(part) for part in (strYear, strMonth, strDay))

In [8]:
# Sanity check: show the parsed year.
queryYear

2016

In [9]:
# Sanity check: show the parsed month.
queryMonth

9

In [10]:
# Sanity check: show the parsed day of month.
queryDay

1

### Load the VTS data (`vts_parse_data_parquet`), which carries the geo locations

In [11]:
# Switch to the bmtcvts database and list up to 10 of its tables with untruncated names.
sqlContext.sql("use bmtcvts")
bmtcvts_tables_df = sqlContext.sql("show tables")
bmtcvts_tables_df.show(10, truncate=False)

+----------------------+-----------+
|tableName             |isTemporary|
+----------------------+-----------+
|vts_data_sept         |false      |
|vts_parse_data        |false      |
|vts_parse_data_1      |false      |
|vts_parse_data_parquet|false      |
+----------------------+-----------+



In [14]:
# Project the columns needed from the parquet-backed VTS table: record id, device,
# accumulated distance, position (lat/longitude), timestamp (ist_date), ignition
# status, heading and speed. The SQL is kept in one string literal with line
# continuations, so the indentation below is part of the query text.
vts_df = sqlContext.sql("select id,device_id,acc_distance,\
                                lat,longitude,\
                                ist_date,ign_status,\
                                vehicle_direction, speed_kmph\
                        from vts_parse_data_parquet")

In [16]:
# Drop unusable VTS records: a zero latitude or longitude is treated as a corrupted
# fix, and only rows with ign_status == 1 (presumably "ignition on") are kept.
has_nonzero_position = (vts_df["lat"] != 0) & (vts_df["longitude"] != 0)
filtered_vts_df = vts_df.filter(has_nonzero_position & (vts_df["ign_status"] == 1))

### Algorithm outline for calculating bus stop matrix having average time taken between bus stops

1) Source WayBill_Trip_Details table<br>
2) Get distinct device_id and route_id combination from WayBill_Trip_Details table <br>
3) Select the bus Stops for the route_id from the Route_Point table and get the distinct device id-bus stop combination <br>
4) Attach the bus stop lat long values with device id-bus stop by joining with bus stop table  <br>
5) Join the dataframe from step 4 with the Vts_Parse_Data table based on device id. <br>
6) After step 5, we would be having lat long values of bus entering the geo fence of the bus stop <br>
7) Calculate the average time between bus stops for this route <br>
8) Similarly do for all the routes<br>
9) Finally make a bus stop matrix<br>
10) One such matrix should be formed for each hour × day-of-week × month combination, i.e. 24 (hours) × 7 (days) × 12 (months) matrices<br>

### 1) Source WayBill_Trip_Details table

In [23]:
import datetime
from pyspark.sql.functions import year, month, dayofmonth

# Switch to the bmtc database, which holds the waybill / route / bus stop tables.
sqlContext.sql("use bmtc")
# Load the waybill trip columns used by this notebook. The SQL is a single string
# literal with line continuations, so the indentation is part of the query text.
waybill_trip_details_df = sqlContext.sql("select id,waybill_id,duty_dt,device_id,\
                                          status,schedule_type_id,\
                                          schedule_no,schedule_name,service_type,service_name,\
                                          trip_number,trip_status,start_point,start_bus_stop_name,\
                                          end_point,end_bus_stop_name,route_id,route_no,\
                                          distance,start_time,\
                                          act_start_time,etm_start_time,end_time,act_end_time,\
                                          etm_end_time,running_time,vehicle_no,duty_start_date, is_dread_trip \
                                          from waybill_trip_details")
# Suffix the join keys with "_waybill" so they stay distinguishable from the
# device_id / route_id columns of the tables joined in later cells.
waybill_trip_details_df = waybill_trip_details_df.withColumnRenamed("device_id","device_id_waybill")
waybill_trip_details_df = waybill_trip_details_df.withColumnRenamed("route_id","route_id_waybill")


### Filter data from waybill_trip_details for 1 Day

In [18]:
from pyspark.sql.functions import col,unix_timestamp,abs, from_unixtime, avg, count, sum, desc, date_format, lit, concat

In [24]:
# Keep only the waybill trips whose ETM start time (etm_start_time) falls on the
# query date: year, month and day of month must all match.
waybill_trip_details_selected_df = waybill_trip_details_df.filter(
    (year(waybill_trip_details_df["etm_start_time"]) == queryYear)
    & (month(waybill_trip_details_df["etm_start_time"]) == queryMonth)
    & (dayofmonth(waybill_trip_details_df["etm_start_time"]) == queryDay)
)

### Filter data from waybill_trip_details for the given schedule

In [21]:
# Discard trips whose status is one of the excluded values. Rows with a NULL
# status are dropped as well, because the predicate evaluates to NULL for them.
waybill_trip_details_selected_df = waybill_trip_details_selected_df.where(
    ~col("status").isin(
        "NEW",
        "325-703-380",
        "325-703-3144",
        "325-703-560",
        "325-702-741",
        "325-702-835"
    )
)

In [76]:
# Restrict the trips to the schedule chosen in scheduleNo.
waybill_trip_details_selected_df = waybill_trip_details_selected_df.where(
    col("schedule_no") == scheduleNo
)

In [77]:
# Narrow the frame to identifiers, the scheduled / actual / ETM start and end
# times, and the start and end points of each trip.
waybill_trip_details_selected_df = waybill_trip_details_selected_df.select([
    "id", "waybill_id", "duty_dt", "device_id_waybill",
    "schedule_no", "route_id_waybill", "trip_number", "is_dread_trip",
    "start_time", "end_time",
    "act_start_time", "act_end_time",
    "etm_start_time", "etm_end_time",
    "start_point", "end_point",
])

### Find the time window for the route from waybill_trip_details table

In [78]:
# Column expressions (nothing is evaluated yet):
#   timeDiff     - absolute scheduled duration in seconds, |end_time - start_time|
#   endTimeLimit - end_time pushed out by that duration, rendered back as HH:mm:ss
timeFmt = "HH:mm:ss"
scheduled_end_seconds = unix_timestamp('end_time', format=timeFmt)
scheduled_start_seconds = unix_timestamp('start_time', format=timeFmt)
timeDiff = abs(scheduled_end_seconds - scheduled_start_seconds)
endTimeLimit = from_unixtime(scheduled_end_seconds + timeDiff, format=timeFmt)

In [79]:
# Patterns for parsing the ETM timestamps and re-rendering their date / time parts.
dateTimeFmt = "yyyy-MM-dd HH:mm:ss.S"
dateTimeFmt1 = "yyyy-MM-dd"
timestampFormat = "HH:mm:ss"

# Column expressions: the date part and time part of etm_start_time, and the time part of etm_end_time.
etm_start_seconds = unix_timestamp('etm_start_time', format=dateTimeFmt)
etm_end_seconds = unix_timestamp('etm_end_time', format=dateTimeFmt)
etm_start_time_date_part = from_unixtime(etm_start_seconds, format=dateTimeFmt1)
etm_start_time_timestamp_part = from_unixtime(etm_start_seconds, format=timestampFormat)
etm_end_time_timestamp_part = from_unixtime(etm_end_seconds, format=timestampFormat)

### Define the window limit as (startTime, (endTime + (endTime-startTime)))
#### This is done to factor in any delays in trip completion. The assumption is that a trip will be completed within at most twice its scheduled duration.

In [80]:
# Attach the extended end-time limit and the business date (date part of etm_start_time).
waybill_trip_details_selected_df = (
    waybill_trip_details_selected_df
    .withColumn("end_time_limit", endTimeLimit)
    .withColumn("business_date", etm_start_time_date_part)
)

In [81]:
# Attach the HH:mm:ss parts of the ETM start and end times; the window query uses them as fallbacks.
waybill_trip_details_selected_df = (
    waybill_trip_details_selected_df
    .withColumn("etm_start_timestamp", etm_start_time_timestamp_part)
    .withColumn("etm_end_timestamp", etm_end_time_timestamp_part)
)

In [82]:
# Expose the selected waybill trips to Spark SQL under the name way_bill_data_table.
sqlContext.registerDataFrameAsTable(waybill_trip_details_selected_df, "way_bill_data_table")
# Build the tracking window of each trip as "<business_date> <time>".
# The actual start/end times are used unless either one is the string 'null',
# is '00:00:00', or the actual start is later than the actual end; in those
# cases both window bounds fall back to the ETM start/end time parts.
# NOTE(review): the checks compare against the literal string 'null'; a real SQL
# NULL would not match them and would end up in the ELSE branch - confirm how
# missing actual times are stored.
# NOTE(review): both bounds reuse business_date, so a trip ending after midnight
# would get an end time on the start date - confirm this cannot occur.
windowQuery = """SELECT id, waybill_id, duty_dt, device_id_waybill, schedule_no, route_id_waybill, trip_number,
is_dread_trip, start_point, end_point,
concat(business_date," ",(CASE 
    WHEN (act_start_time == 'null' or act_start_time == '00:00:00'
        or act_end_time == 'null' or act_end_time == '00:00:00' or act_start_time > act_end_time) 
    THEN etm_start_timestamp 
    ELSE act_start_time 
END)) as window_start_time,
concat(business_date," ",(CASE 
    WHEN (act_start_time == 'null' or act_start_time == '00:00:00'
        or act_end_time == 'null' or act_end_time == '00:00:00' or act_start_time > act_end_time) 
    THEN etm_end_timestamp 
    ELSE act_end_time
END)) as window_end_time
FROM way_bill_data_table"""
waybill_trip_details_sample_df = sqlContext.sql(windowQuery)

In [83]:
# Column expression that pushes window_end_time out by a fixed slack.
windowStartTimeFmt1 = "yyyy-MM-dd HH:mm:ss"
windowExtension1 = 900  # added to a unix timestamp, i.e. 900 seconds
window_end_seconds = unix_timestamp("window_end_time", format=windowStartTimeFmt1)
windowExtensionFunction1 = from_unixtime(
    window_end_seconds + windowExtension1,
    format=windowStartTimeFmt1
)

In [84]:
# Overwrite window_end_time with its extended value (original end + windowExtension1 seconds).
waybill_trip_details_sample_df= waybill_trip_details_sample_df.withColumn("window_end_time",windowExtensionFunction1)

### Filter data from vts_parse_data table for 1 day

In [85]:
# Keep only the VTS records whose ist_date falls on the query date.
vts_sample_df = filtered_vts_df.where(
    (year(filtered_vts_df["ist_date"]) == queryYear)
    & (month(filtered_vts_df["ist_date"]) == queryMonth)
    & (dayofmonth(filtered_vts_df["ist_date"]) == queryDay)
)

### 2) Get distinct device_id and route_id combination from WayBill_Trip_Details table

In [86]:
# Make sure the bmtc database is current, then load the full route_point table
# (it supplies route_id, bus_stop_id and route_order in the joins below).
sqlContext.sql("use bmtc")
route_point_df = sqlContext.sql("select * from route_point")

In [87]:
# Per-trip view: device, schedule, trip number, route, tracking window,
# dread-trip flag and the start / end points of the trip.
waybill_device_id_route_id_df = waybill_trip_details_sample_df.select(
    "device_id_waybill",
    "schedule_no",
    "trip_number",
    "route_id_waybill",
    "window_start_time",
    "window_end_time",
    "is_dread_trip",
    "start_point",
    "end_point"
)

### Use EOD as Window End Time to track the bus for trip 11 and 12

In [88]:
##waybill_device_id_route_id_df.orderBy("trip_number").show(20, truncate=False)

In [89]:
##valid_window_waybill_device_id_route_id_df=waybill_device_id_route_id_df.filter(col("trip_number") < 11)

In [90]:
##till_eod_window_waybill_device_id_route_id_df= waybill_device_id_route_id_df.filter(col("trip_number") >= 11)\
   ##                     .withColumn("window_end_time",lit(dateHyphen+" 23:59:59"))

In [91]:
##last_valid_window_waybill_device_id_route_id_df=waybill_device_id_route_id_df.filter(col("trip_number") == 13)

In [92]:
# Pass-through alias: the end-of-day window variant in the neighbouring cells is
# commented out, so the per-trip frame is used unchanged.
resultant_waybill_device_id_route_id_df=waybill_device_id_route_id_df

In [93]:
##resultant_waybill_device_id_route_id_df = valid_window_waybill_device_id_route_id_df.unionAll(till_eod_window_waybill_device_id_route_id_df)\
##.orderBy("trip_number")
##.unionAll(last_valid_window_waybill_device_id_route_id_df)

In [94]:
##resultant_waybill_device_id_route_id_df.orderBy("trip_number").show(20, truncate=False)

### 3) Select the bus Stops for the route_id from the Route_Point table and get the distinct device id-bus stop combination

In [95]:
# Look up the route_order of each trip's start point in route_point (same route,
# bus_stop_id == start_point) and expose it as start_route_order. The left outer
# join keeps trips with no matching route point (start_route_order is then NULL).
resultant_waybill_device_id_route_id_start_df = (
    resultant_waybill_device_id_route_id_df
    .join(
        route_point_df,
        (resultant_waybill_device_id_route_id_df["route_id_waybill"] == route_point_df["route_id"])
        & (resultant_waybill_device_id_route_id_df["start_point"] == route_point_df["bus_stop_id"]),
        "left_outer"
    )
    .select(
        "device_id_waybill", "schedule_no", "trip_number", "route_id_waybill",
        "window_start_time", "window_end_time", "is_dread_trip",
        "start_point", "end_point", "route_order"
    )
    .withColumnRenamed("route_order", "start_route_order")
)

In [96]:
# Same lookup for the end point: fetch its route_order from route_point and
# expose it as end_route_order, keeping start_route_order alongside.
resultant_waybill_device_id_route_id_start_end_df = (
    resultant_waybill_device_id_route_id_start_df
    .join(
        route_point_df,
        (resultant_waybill_device_id_route_id_start_df["route_id_waybill"] == route_point_df["route_id"])
        & (resultant_waybill_device_id_route_id_start_df["end_point"] == route_point_df["bus_stop_id"]),
        "left_outer"
    )
    .select(
        "device_id_waybill", "schedule_no", "trip_number", "route_id_waybill",
        "window_start_time", "window_end_time", "is_dread_trip",
        "start_point", "end_point",
        "start_route_order", "route_order"
    )
    .withColumnRenamed("route_order", "end_route_order")
)

In [97]:
# Fan each trip out to every route_point row of its route (one row per stop on the route).
waybill_device_id_route_id_bus_stop_id_df = resultant_waybill_device_id_route_id_start_end_df.join(
    route_point_df,
    resultant_waybill_device_id_route_id_start_end_df["route_id_waybill"] == route_point_df["route_id"],
    "left_outer"
)

In [98]:
# Keep only the stops lying between the trip's start and end stop (inclusive on both sides).
waybill_device_id_route_id_bus_stop_id_df = waybill_device_id_route_id_bus_stop_id_df.where(
    col("route_order").between(col("start_route_order"), col("end_route_order"))
)

In [99]:
# Trim to the trip identifiers, the stop and its order on the route, the tracking
# window, the dread-trip flag and the start / end route orders.
waybill_device_id_bus_stop_id_df = waybill_device_id_route_id_bus_stop_id_df.select([
    "device_id_waybill", "schedule_no", "trip_number", "route_id_waybill",
    "bus_stop_id", "route_order",
    "window_start_time", "window_end_time",
    "is_dread_trip", "start_route_order", "end_route_order",
])

In [100]:
# Rename bus_stop_id to route_bus_stop_id so it does not clash with bus_stop.bus_stop_id in the later join.
waybill_device_id_bus_stop_id_df = waybill_device_id_bus_stop_id_df.withColumnRenamed("bus_stop_id", "route_bus_stop_id")

### Populate distance and time_to_travel from route_map table 

In [101]:
# Load route_map and keep the join keys plus the distance / time_to_travel values.
route_map_df = (
    sqlContext.sql("select * from route_map")
    .select("route_id", "end_bus_stop_id", "bus_stop_order", "distance", "time_to_travel")
)

In [102]:
# Inspect the column types of the route_map projection.
route_map_df.printSchema()

root
 |-- route_id: integer (nullable = true)
 |-- end_bus_stop_id: integer (nullable = true)
 |-- bus_stop_order: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- time_to_travel: integer (nullable = true)



#### Populate distance and time_to_travel as 0 for start_bus_stop in a route 

In [103]:
# Rows for the first stop of each trip (route_order equals the trip's start_route_order).
route_start_bus_stop_df = waybill_device_id_bus_stop_id_df.filter(
    col("route_order") == col("start_route_order")
)

In [104]:
# The first stop has nothing before it, so distance and time_to_travel are set to 0.
route_start_bus_stop_df = (
    route_start_bus_stop_df
    .withColumn("distance", lit(0))
    .withColumn("time_to_travel", lit(0))
)

In [105]:
# Inspect the schema; its column order must line up with the frame it is unioned with below.
route_start_bus_stop_df.printSchema()

root
 |-- device_id_waybill: string (nullable = true)
 |-- schedule_no: long (nullable = true)
 |-- trip_number: integer (nullable = true)
 |-- route_id_waybill: integer (nullable = true)
 |-- route_bus_stop_id: integer (nullable = true)
 |-- route_order: integer (nullable = true)
 |-- window_start_time: string (nullable = true)
 |-- window_end_time: string (nullable = true)
 |-- is_dread_trip: integer (nullable = true)
 |-- start_route_order: integer (nullable = true)
 |-- end_route_order: integer (nullable = true)
 |-- distance: integer (nullable = false)
 |-- time_to_travel: integer (nullable = false)



#### Joining with Route_Map for distance and time_to_travel 

In [106]:
# Every stop of a trip except its first one.
route_rest_bus_stop_df = waybill_device_id_bus_stop_id_df.filter(
    col("route_order") != col("start_route_order")
)

In [107]:
# Attach distance and time_to_travel from route_map, matching on the route and on
# the stop being the end stop of a route_map row, then drop route_map's key columns.
# The left outer join keeps stops without a route_map entry (values are then NULL).
mapped_route_rest_bus_stop_df = (
    route_rest_bus_stop_df
    .join(
        route_map_df,
        (route_rest_bus_stop_df["route_id_waybill"] == route_map_df["route_id"])
        & (route_rest_bus_stop_df["route_bus_stop_id"] == route_map_df["end_bus_stop_id"]),
        "left_outer"
    )
    .drop("route_id")
    .drop("end_bus_stop_id")
    .drop("bus_stop_order")
)

In [108]:
# Recombine the first-stop rows (distance/time 0) with the remaining stops.
# NOTE(review): unionAll is understood to match columns by position, not by name -
# confirm both frames keep distance and time_to_travel as their last two columns.
resultant_waybill_device_id_bus_stop_id_df= route_start_bus_stop_df.unionAll(mapped_route_rest_bus_stop_df)

### 4) Attach the bus stop lat long values with device id-bus stop by joining with bus stop table

In [109]:
# Load the bus stop attributes needed for geo-fencing: id, code, name, current
# latitude/longitude and bearing. The SQL is a single string literal with line
# continuations, so the indentation is part of the query text.
filter_bus_stop_df = sqlContext.sql("select bus_stop_id,bus_stop_code,\
                                            bus_stop_name,latitude_current,\
                                            longitude_current,bearing \
                                    from bus_stop")

In [110]:
# Attach the bus stop attributes to every trip/stop row and drop the helper
# columns that are no longer needed. Dread trips are still included at this point.
inscope_filter_bus_stop_with_dread_trip_df = (
    resultant_waybill_device_id_bus_stop_id_df
    .join(
        filter_bus_stop_df,
        resultant_waybill_device_id_bus_stop_id_df["route_bus_stop_id"] == filter_bus_stop_df["bus_stop_id"],
        "left_outer"
    )
    .drop("bus_stop_id")
    .drop("start_route_order")
    .drop("end_route_order")
)

In [111]:
# Inspect the VTS sample schema (note that ist_date is a string column).
vts_sample_df.printSchema()

root
 |-- id: long (nullable = true)
 |-- device_id: string (nullable = true)
 |-- acc_distance: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- vehicle_direction: string (nullable = true)
 |-- ist_date: string (nullable = true)
 |-- ign_status: byte (nullable = true)
 |-- speed_kmph: double (nullable = true)



In [112]:
# Inspect the schema of the trip/stop rows after the bus stop attributes were attached.
inscope_filter_bus_stop_with_dread_trip_df.printSchema()

root
 |-- device_id_waybill: string (nullable = true)
 |-- schedule_no: long (nullable = true)
 |-- trip_number: integer (nullable = true)
 |-- route_id_waybill: integer (nullable = true)
 |-- route_bus_stop_id: integer (nullable = true)
 |-- route_order: integer (nullable = true)
 |-- window_start_time: string (nullable = true)
 |-- window_end_time: string (nullable = true)
 |-- is_dread_trip: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- time_to_travel: integer (nullable = true)
 |-- bus_stop_code: string (nullable = true)
 |-- bus_stop_name: string (nullable = true)
 |-- latitude_current: double (nullable = true)
 |-- longitude_current: double (nullable = true)
 |-- bearing: string (nullable = true)



In [113]:
# Reorder and trim the columns; bus_stop_code and bearing are dropped here.
inscope_filter_bus_stop_with_dread_trip_df = inscope_filter_bus_stop_with_dread_trip_df.select([
    "device_id_waybill", "schedule_no", "trip_number",
    "route_id_waybill", "route_bus_stop_id", "route_order",
    "window_start_time", "window_end_time",
    "latitude_current", "longitude_current",
    "bus_stop_name", "is_dread_trip",
    "distance", "time_to_travel",
])

### Removing the dread trips

In [114]:
# Keep only the rows whose is_dread_trip flag is 0.
inscope_filter_bus_stop_df = inscope_filter_bus_stop_with_dread_trip_df.filter(
    col("is_dread_trip") == 0
)

In [115]:
# Column expression that re-renders ist_date without its fractional-second part.
timeFmt_Ist = "yyyy-MM-dd HH:mm:ss.S"
timeFmt1 = "yyyy-MM-dd HH:mm:ss"
ist_date_seconds = unix_timestamp('ist_date', format=timeFmt_Ist)
istDateTrunc = from_unixtime(ist_date_seconds, format=timeFmt1)

In [116]:
# Add ist_timestamp (second-resolution text) so it can be compared with the window bounds in the geo-fence query.
vts_sample_df=vts_sample_df.withColumn("ist_timestamp", istDateTrunc)

## Continue with run for all bus stops for all trips

### 5) Join dataframe in step 4 with Vts_Parse_Data table based on device id

In [117]:
# Remove exact duplicate trip/stop rows before the (large) join with the VTS data.
inscope_filter_bus_stop_df = inscope_filter_bus_stop_df.distinct()

In [118]:
# Pair every trip/stop row with all VTS records of the same device for the day.
# The left outer join keeps trip/stop rows for devices that reported nothing.
vts_joined_data = inscope_filter_bus_stop_df.join(
    vts_sample_df,
    inscope_filter_bus_stop_df["device_id_waybill"] == vts_sample_df["device_id"],
    "left_outer"
)

In [119]:
# Inspect the combined trip/stop + VTS schema used by the geo-fence query.
vts_joined_data.printSchema()

root
 |-- device_id_waybill: string (nullable = true)
 |-- schedule_no: long (nullable = true)
 |-- trip_number: integer (nullable = true)
 |-- route_id_waybill: integer (nullable = true)
 |-- route_bus_stop_id: integer (nullable = true)
 |-- route_order: integer (nullable = true)
 |-- window_start_time: string (nullable = true)
 |-- window_end_time: string (nullable = true)
 |-- latitude_current: double (nullable = true)
 |-- longitude_current: double (nullable = true)
 |-- bus_stop_name: string (nullable = true)
 |-- is_dread_trip: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- time_to_travel: integer (nullable = true)
 |-- id: long (nullable = true)
 |-- device_id: string (nullable = true)
 |-- acc_distance: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- vehicle_direction: string (nullable = true)
 |-- ist_date: string (nullable = true)
 |-- ign_status: byte (nullable = true)
 |-- speed_kmph: double 

### 6) After step 5, we would be having lat long values of bus entering the geo fence of the bus stop 

In [None]:
# Expose the joined trip/stop + VTS rows to Spark SQL under the name vts_Table.
sqlContext.registerDataFrameAsTable(vts_joined_data, "vts_Table")

# Geo-fence query: for every VTS record inside the trip's tracking window, compute
# the great-circle distance `dist` between the bus stop (latitude_current,
# longitude_current) and the reported position (lat, longitude) using the
# spherical law of cosines, then keep the records with 0 < dist <= 60.
#
# The distance filter used to be written as `HAVING` without a `GROUP BY`, which
# only works on engines that silently treat such a HAVING as a WHERE; newer Spark
# versions interpret it as a global aggregate. The computation now lives in a
# subquery and the filter is a plain WHERE on its `dist` column, which selects
# the same rows and the same columns.
#
# NOTE(review): acos(...) * 180/pi * 60 * 1.1515 yields statute miles, so the
# extra * 1000 gives thousandths of a mile rather than metres (the usual form of
# this formula multiplies by 1.609344 first). If the threshold 60 is meant to be
# metres the fence is about 60% wider than intended - confirm before changing.
query = """SELECT * FROM (
SELECT device_id_waybill,schedule_no, trip_number, route_id_waybill, route_bus_stop_id, route_order, 
window_start_time, window_end_time, ist_date, ist_timestamp, acc_distance, 
(((acos(sin((latitude_current*pi()/180)) * 
sin((lat*pi()/180))+cos((latitude_current*pi()/180)) * 
cos((lat*pi()/180)) * cos(((longitude_current- longitude)* 
pi()/180))))*180/pi())*60*1.1515)*1000 as dist 
FROM vts_Table where ist_timestamp >= window_start_time and ist_timestamp <= window_end_time
) geo_fence_candidates
WHERE (dist > 0 AND dist <= 60) order by IST_DATE,route_bus_stop_id"""

In [None]:
# Run the geo-fence query and save the result as a single CSV part with a header row.
# The write uses the default save mode, so it fails if the target path already exists.
(
    sqlContext.sql(query)
    .coalesce(1)
    .write
    .option("header", "true")
    .csv(
        "lalit/bmtc_project/staging/features/" + dateLit + "Final_Fix_BusStops"
        + "/etm_start_time_based/windowed_bus_stop_geo_fence_distances"
    )
)

### Reading the saved geo-fence distance from HDFS

In [434]:
# Read the saved geo-fence distances back, letting Spark infer the column types
# from the CSV content (so the types differ from the in-memory query result).
windowGeoFenceDf = (
    sqlContext.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(
        "lalit/bmtc_project/staging/features/" + dateLit + "Final_Fix_BusStops"
        + "/etm_start_time_based/windowed_bus_stop_geo_fence_distances"
    )
)

In [435]:
# Number of VTS records that fell inside a bus stop geo-fence.
windowGeoFenceDf.count()

1875

In [436]:
## Instead of saving the geo-fence data to HDFS and reading it again like in last 2 cells, we can use it directly in-memory.
##windowGeoFenceDf=sqlContext.sql(query)

### Finding the earliest entry into a bus stop geo-fence by a device

In [437]:
# Add the record time as epoch seconds so the earliest geo-fence entry can be found with min().
windowGeoFenceDf=windowGeoFenceDf.withColumn("ist_timestamp_seconds", unix_timestamp("ist_timestamp", format=timeFmt1))

In [438]:
# Drop the "_waybill" suffix from the device and route keys.
windowGeoFenceDf = (
    windowGeoFenceDf
    .withColumnRenamed("device_id_waybill", "device_id")
    .withColumnRenamed("route_id_waybill", "route_id")
)

In [439]:
# For each device / schedule / trip / route / stop / window, keep the smallest
# ist_timestamp_seconds, i.e. the first record seen inside the stop's geo-fence.
windowEarliestGeoFenceEntryDf = (
    windowGeoFenceDf
    .groupby([
        "device_id", "schedule_no", "trip_number", "route_id",
        "route_bus_stop_id", "route_order",
        "window_start_time", "window_end_time",
    ])
    .min("ist_timestamp_seconds")
    .withColumnRenamed("min(ist_timestamp_seconds)", "ist_timestamp_seconds")
)

In [440]:
# Inspect the types inferred from the CSV (the window bounds come back as timestamps).
windowEarliestGeoFenceEntryDf.printSchema()

root
 |-- device_id: integer (nullable = true)
 |-- schedule_no: integer (nullable = true)
 |-- trip_number: integer (nullable = true)
 |-- route_id: integer (nullable = true)
 |-- route_bus_stop_id: integer (nullable = true)
 |-- route_order: integer (nullable = true)
 |-- window_start_time: timestamp (nullable = true)
 |-- window_end_time: timestamp (nullable = true)
 |-- ist_timestamp_seconds: long (nullable = true)



In [441]:
# Suffix the key columns with "_d" so they can be told apart from the identically
# named columns of windowEarliestGeoFenceEntryDf in the self-join below.
for geo_fence_key in (
    "device_id",
    "route_id",
    "schedule_no",
    "trip_number",
    "route_bus_stop_id",
    "route_order",
    "window_start_time",
    "window_end_time",
    "ist_timestamp_seconds",
):
    windowGeoFenceDf = windowGeoFenceDf.withColumnRenamed(geo_fence_key, geo_fence_key + "_d")

### Populating distance travelled by device when entering into geo-fence of a bus stop 

In [442]:
# Join each earliest-entry row back to the detailed geo-fence records with the same
# keys and the same timestamp, to recover the columns (such as acc_distance) that
# the aggregation dropped.
earliestGeoFenceEntryWithDistanceDf = windowEarliestGeoFenceEntryDf.join(
    windowGeoFenceDf,
    (windowEarliestGeoFenceEntryDf["device_id"] == windowGeoFenceDf["device_id_d"])
    & (windowEarliestGeoFenceEntryDf["route_id"] == windowGeoFenceDf["route_id_d"])
    & (windowEarliestGeoFenceEntryDf["schedule_no"] == windowGeoFenceDf["schedule_no_d"])
    & (windowEarliestGeoFenceEntryDf["trip_number"] == windowGeoFenceDf["trip_number_d"])
    & (windowEarliestGeoFenceEntryDf["route_bus_stop_id"] == windowGeoFenceDf["route_bus_stop_id_d"])
    & (windowEarliestGeoFenceEntryDf["route_order"] == windowGeoFenceDf["route_order_d"])
    & (windowEarliestGeoFenceEntryDf["window_start_time"] == windowGeoFenceDf["window_start_time_d"])
    & (windowEarliestGeoFenceEntryDf["window_end_time"] == windowGeoFenceDf["window_end_time_d"])
    & (windowEarliestGeoFenceEntryDf["ist_timestamp_seconds"] == windowGeoFenceDf["ist_timestamp_seconds_d"])
)

In [443]:
# Several records can share the earliest timestamp; collapse them to one row per
# key by taking the smallest acc_distance.
earliestGeoFenceEntryWithDistanceDf = (
    earliestGeoFenceEntryWithDistanceDf
    .groupBy([
        "device_id", "schedule_no", "trip_number", "route_id",
        "route_bus_stop_id", "route_order",
        "window_start_time", "window_end_time",
        "ist_timestamp_seconds",
    ])
    .min("acc_distance")
)

In [444]:
# Give the aggregate column its plain name back.
earliestGeoFenceEntryWithDistanceDf= earliestGeoFenceEntryWithDistanceDf.withColumnRenamed("min(acc_distance)", "acc_distance")

## Enriching with useful columns 

In [445]:
# Render the earliest geo-fence entry (epoch seconds) as a readable arrival_time string.
earliestGeoFenceEntryWithDistanceDf=earliestGeoFenceEntryWithDistanceDf.withColumn("arrival_time", from_unixtime("ist_timestamp_seconds", format=timeFmt1))

### Joining with all trips data to find missing values 

In [446]:
# Normalise window_start_time to "yyyy-MM-dd HH:mm:ss" text so it can be compared
# with the string-typed window_start_time of the waybill frame in the join below.
# The parse pattern is timeFmt1 ("yyyy-MM-dd HH:mm:ss"), which is the format the
# window bounds are built with. The previous pattern (dateTimeFmt, with a ".S"
# fraction) does not match that text, so the column would become NULL whenever
# window_start_time is a string, e.g. when the in-memory query result is used
# instead of the CSV round trip. For the timestamp-typed column produced by the
# CSV read no text parsing is needed, so that path is unaffected.
earliestGeoFenceEntryWithDistanceDf = earliestGeoFenceEntryWithDistanceDf.withColumn(
    "window_start_timestamp",
    from_unixtime(unix_timestamp('window_start_time', format=timeFmt1), format="yyyy-MM-dd HH:mm:ss")
)

In [447]:
# Normalise window_end_time to "yyyy-MM-dd HH:mm:ss" text so it can be compared
# with the string-typed window_end_time of the waybill frame in the join below.
# The parse pattern is timeFmt1 ("yyyy-MM-dd HH:mm:ss"), which is the format the
# window bounds are built with. The previous pattern (dateTimeFmt, with a ".S"
# fraction) does not match that text, so the column would become NULL whenever
# window_end_time is a string, e.g. when the in-memory query result is used
# instead of the CSV round trip. For the timestamp-typed column produced by the
# CSV read no text parsing is needed, so that path is unaffected.
earliestGeoFenceEntryWithDistanceDf = earliestGeoFenceEntryWithDistanceDf.withColumn(
    "window_end_timestamp",
    from_unixtime(unix_timestamp('window_end_time', format=timeFmt1), format="yyyy-MM-dd HH:mm:ss")
)

In [448]:
# Remove the original window columns; their normalised string versions replace them next.
earliestGeoFenceEntryWithDistanceDf = (
    earliestGeoFenceEntryWithDistanceDf
    .drop("window_start_time")
    .drop("window_end_time")
)

In [449]:
# Give the normalised window bounds the original column names.
earliestGeoFenceEntryWithDistanceDf = (
    earliestGeoFenceEntryWithDistanceDf
    .withColumnRenamed("window_start_timestamp", "window_start_time")
    .withColumnRenamed("window_end_timestamp", "window_end_time")
)

In [450]:
# Number of trip/stop combinations for which a geo-fence entry was found.
earliestGeoFenceEntryWithDistanceDf.count()

298

In [451]:
# Start from the full trip/stop list (dread trips included) and suffix its keys
# with "_waybill" so they stay distinct from the geo-fence result columns.
# NOTE(review): unlike inscope_filter_bus_stop_df this frame is not passed through
# distinct(), so duplicate trip/stop rows, if any, are carried into the output.
bus_stop_with_dread_trip_df = inscope_filter_bus_stop_with_dread_trip_df
for waybill_key in (
    "schedule_no",
    "trip_number",
    "route_bus_stop_id",
    "route_order",
    "window_start_time",
    "window_end_time",
):
    bus_stop_with_dread_trip_df = bus_stop_with_dread_trip_df.withColumnRenamed(
        waybill_key, waybill_key + "_waybill"
    )

# The stop coordinates get descriptive names instead of a suffix.
bus_stop_with_dread_trip_df = (
    bus_stop_with_dread_trip_df
    .withColumnRenamed("latitude_current", "bus_stop_latitude")
    .withColumnRenamed("longitude_current", "bus_stop_longitude")
)

In [452]:
# Full weekday name ("EEEE" pattern) of the trip's window start.
bus_stop_with_dread_trip_df=bus_stop_with_dread_trip_df.withColumn("dayofweek", date_format("window_start_time_waybill", "EEEE"))

In [453]:
# Constant column holding the query month (taken from the notebook parameter, not from the row).
bus_stop_with_dread_trip_df=bus_stop_with_dread_trip_df.withColumn("month", lit(queryMonth))

In [454]:
# Constant column holding the query year (taken from the notebook parameter, not from the row).
bus_stop_with_dread_trip_df=bus_stop_with_dread_trip_df.withColumn("year", lit(queryYear))

In [455]:
# Inspect the enriched waybill-side schema before the join with the geo-fence results.
bus_stop_with_dread_trip_df.printSchema()

root
 |-- device_id_waybill: string (nullable = true)
 |-- schedule_no_waybill: long (nullable = true)
 |-- trip_number_waybill: integer (nullable = true)
 |-- route_id_waybill: integer (nullable = true)
 |-- route_bus_stop_id_waybill: integer (nullable = true)
 |-- route_order_waybill: integer (nullable = true)
 |-- window_start_time_waybill: string (nullable = true)
 |-- window_end_time_waybill: string (nullable = true)
 |-- bus_stop_latitude: double (nullable = true)
 |-- bus_stop_longitude: double (nullable = true)
 |-- bus_stop_name: string (nullable = true)
 |-- is_dread_trip: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- time_to_travel: integer (nullable = true)
 |-- dayofweek: string (nullable = true)
 |-- month: integer (nullable = false)
 |-- year: integer (nullable = false)



### Left outer join with waybill data (bus_stop_with_dread_trip_df) so as to ensure we identify missing values

In [456]:
# Left outer join from the full trip/stop list to the geo-fence arrivals: stops the
# bus was never seen at keep their row, with NULL arrival columns.
resultantEarliestGeoFenceEntryDf = bus_stop_with_dread_trip_df.join(
    earliestGeoFenceEntryWithDistanceDf,
    (bus_stop_with_dread_trip_df["device_id_waybill"] == earliestGeoFenceEntryWithDistanceDf["device_id"])
    & (bus_stop_with_dread_trip_df["route_id_waybill"] == earliestGeoFenceEntryWithDistanceDf["route_id"])
    & (bus_stop_with_dread_trip_df["schedule_no_waybill"] == earliestGeoFenceEntryWithDistanceDf["schedule_no"])
    & (bus_stop_with_dread_trip_df["trip_number_waybill"] == earliestGeoFenceEntryWithDistanceDf["trip_number"])
    & (bus_stop_with_dread_trip_df["route_bus_stop_id_waybill"] == earliestGeoFenceEntryWithDistanceDf["route_bus_stop_id"])
    & (bus_stop_with_dread_trip_df["route_order_waybill"] == earliestGeoFenceEntryWithDistanceDf["route_order"])
    & (bus_stop_with_dread_trip_df["window_start_time_waybill"] == earliestGeoFenceEntryWithDistanceDf["window_start_time"])
    & (bus_stop_with_dread_trip_df["window_end_time_waybill"] == earliestGeoFenceEntryWithDistanceDf["window_end_time"]),
    "left_outer"
)

In [457]:
# Drop the geo-fence side's copies of the join keys (NULL for unmatched rows);
# the "_waybill" versions are kept and renamed in the next cell.
for geo_fence_side_key in (
    "device_id",
    "schedule_no",
    "trip_number",
    "route_id",
    "route_bus_stop_id",
    "route_order",
    "window_start_time",
    "window_end_time",
):
    resultantEarliestGeoFenceEntryDf = resultantEarliestGeoFenceEntryDf.drop(geo_fence_side_key)

In [458]:
# Strip the "_waybill" suffix so the key columns carry their plain names again.
for plain_key in (
    "device_id",
    "schedule_no",
    "trip_number",
    "route_id",
    "route_bus_stop_id",
    "route_order",
    "window_start_time",
    "window_end_time",
):
    resultantEarliestGeoFenceEntryDf = resultantEarliestGeoFenceEntryDf.withColumnRenamed(
        plain_key + "_waybill", plain_key
    )

In [459]:
# Inspect the merged schema: waybill-side columns plus ist_timestamp_seconds, acc_distance and arrival_time.
resultantEarliestGeoFenceEntryDf.printSchema()

root
 |-- device_id: string (nullable = true)
 |-- schedule_no: long (nullable = true)
 |-- trip_number: integer (nullable = true)
 |-- route_id: integer (nullable = true)
 |-- route_bus_stop_id: integer (nullable = true)
 |-- route_order: integer (nullable = true)
 |-- window_start_time: string (nullable = true)
 |-- window_end_time: string (nullable = true)
 |-- bus_stop_latitude: double (nullable = true)
 |-- bus_stop_longitude: double (nullable = true)
 |-- bus_stop_name: string (nullable = true)
 |-- is_dread_trip: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- time_to_travel: integer (nullable = true)
 |-- dayofweek: string (nullable = true)
 |-- month: integer (nullable = false)
 |-- year: integer (nullable = false)
 |-- ist_timestamp_seconds: long (nullable = true)
 |-- acc_distance: integer (nullable = true)
 |-- arrival_time: string (nullable = true)



In [460]:
# Rename trip_number so it does not clash with schedule_details.trip_number in the next join.
resultantEarliestGeoFenceEntryDf= resultantEarliestGeoFenceEntryDf.withColumnRenamed("trip_number", "results_trip_number")

### Populating the results against schedule_details 

In [461]:
# Load the schedule_details table from the current database.
schedule_details_df=sqlContext.sql("select * from schedule_details")

In [462]:
# Keep the join keys (form_four_id, route_number_id, trip_number) and the scheduled start / end times.
schedule_details_df=schedule_details_df.select("form_four_id", "route_number_id", "trip_number", "start_time", "end_time")

In [463]:
# Inner join of the schedule rows with the arrival results on schedule
# (form_four_id), trip number and route; rows without a counterpart on either
# side are dropped.
schedule_geo_entry_df = schedule_details_df.join(
    resultantEarliestGeoFenceEntryDf,
    (schedule_details_df["form_four_id"] == resultantEarliestGeoFenceEntryDf["schedule_no"])
    & (schedule_details_df["trip_number"] == resultantEarliestGeoFenceEntryDf["results_trip_number"])
    & (schedule_details_df["route_number_id"] == resultantEarliestGeoFenceEntryDf["route_id"])
)

In [464]:
# Drop the result-side join keys; the schedule-side keys are renamed to these names next.
schedule_geo_entry_df = (
    schedule_geo_entry_df
    .drop("schedule_no")
    .drop("results_trip_number")
    .drop("route_id")
)

In [465]:
# Present the schedule-side keys under the names used in the rest of the notebook.
schedule_geo_entry_df = (
    schedule_geo_entry_df
    .withColumnRenamed("form_four_id", "schedule_no")
    .withColumnRenamed("route_number_id", "route_id")
)

In [466]:
###schedule_geo_entry_df=schedule_geo_entry_df.withColumn("scheduled_trip_start_time", concat(lit( dateHyphen + " " ), col("start_time")))

In [467]:
###schedule_geo_entry_df=schedule_geo_entry_df.withColumn("scheduled_trip_end_time", concat(lit( dateHyphen + " " ), col("end_time")))

In [468]:
# Mark the start time as the scheduled one to distinguish it from observed times.
schedule_geo_entry_df=schedule_geo_entry_df.withColumnRenamed("start_time", "scheduled_trip_start_time")

In [469]:
# Mark the end time as the scheduled one to distinguish it from observed times.
schedule_geo_entry_df=schedule_geo_entry_df.withColumnRenamed("end_time", "scheduled_trip_end_time")

In [470]:
# Order the rows by schedule, then trip, then position of the stop along the route.
schedule_geo_entry_df = schedule_geo_entry_df.sort(
    "schedule_no",
    "trip_number",
    "route_order"
)

In [471]:
# Print up to 400 rows with untruncated values for a visual check of the result.
schedule_geo_entry_df.show(400, truncate=False)

+-----------+--------+-----------+-------------------------+-----------------------+---------+-----------------+-----------+-------------------+-------------------+-----------------+------------------+------------------------------------------+-------------+--------+--------------+---------+-----+----+---------------------+------------+-------------------+
|schedule_no|route_id|trip_number|scheduled_trip_start_time|scheduled_trip_end_time|device_id|route_bus_stop_id|route_order|window_start_time  |window_end_time    |bus_stop_latitude|bus_stop_longitude|bus_stop_name                             |is_dread_trip|distance|time_to_travel|dayofweek|month|year|ist_timestamp_seconds|acc_distance|arrival_time       |
+-----------+--------+-----------+-------------------------+-----------------------+---------+-----------------+-----------+-------------------+-------------------+-----------------+------------------+------------------------------------------+-------------+--------+--------------+

In [472]:
# Save the final arrival-time table as a single CSV part with a header row.
# The write uses the default save mode, so it fails if the target path already exists.
(
    schedule_geo_entry_df
    .coalesce(1)
    .write
    .option("header", "true")
    .csv(
        "lalit/bmtc_project/staging/features/" + dateLit
        + "/bus_stops_arrival_time_enriched_features_start_end_stops"
    )
)