In [1]:
from dataprocessing.processors.refined_ingestion import TrackingDataRefinedProcess, TimetableRefinedProcess, BusStopRefinedProcess, haversine
from pyspark.sql import functions as F, types as T
from pyspark.sql.window import Window

In [2]:
# from dataprocessing.processors.sparketl import ETLSpark

# etl_spark = ETLSpark()

# df = etl_spark.sqlContext.read.parquet("/data/trusted/vehicles").withColumn("hour", F.hour(F.col("event_timestamp"))).sort(F.asc("event_timestamp"))

In [3]:
# Build the refined tracking-data process for one day.
# NOTE(review): the positional arguments look like (year, month, day) — confirm against the class signature.
tracking_data = TrackingDataRefinedProcess(2020,5,3)

In [4]:
# Row count of the process's DataFrame, taken before the line/vehicle filter is applied.
tracking_data.df.count()

931661

In [5]:
# Restrict the process's DataFrame to line 561 / vehicle EA210.
# NOTE: this overwrites tracking_data.df in place; the unfiltered frame can only be
# recovered by re-running the cell that constructs tracking_data.
tracking_data.df = tracking_data.df.filter("line_code = 561 and vehicle = 'EA210'")

In [8]:
# Compute per-event metrics on the filtered tracking data; later cells read
# delta_time, delta_velocity and delta_distance from this result.
events = tracking_data.compute_metrics()

In [9]:
# Derive stop events from the computed events; later cells rely on its
# line_code, vehicle and stop_timestamp columns.
stop_events = tracking_data.stop_events(events)

In [10]:
# Print a preview of the stop events.
stop_events.show()

+---------+-------+-------------------+----+-----+---+----+------+----------+----------+----+-----+---+----+------------+--------+
|line_code|vehicle|     stop_timestamp|year|month|day|hour|minute|  latitude| longitude|year|month|day|hour|avg_velocity|distance|
+---------+-------+-------------------+----+-----+---+----+------+----------+----------+----+-----+---+----+------------+--------+
|      561|  EA210|2020-05-03 10:58:43|2020|    5|  3|  10|    58|-25.466313|-49.258228|2020|    5|  3|  10|       27.31|  662.88|
|      561|  EA210|2020-05-03 16:31:54|2020|    5|  3|  16|    31|-25.462851|-49.260816|2020|    5|  3|  16|       23.19|  565.05|
|      561|  EA210|2020-05-03 16:37:10|2020|    5|  3|  16|    37|-25.445436| -49.26923|2020|    5|  3|  16|       11.95|  216.22|
|      561|  EA210|2020-05-03 17:08:02|2020|    5|  3|  17|     8|-25.483258|-49.270653|2020|    5|  3|  17|       29.41| 1941.77|
|      561|  EA210|2020-05-03 18:21:03|2020|    5|  3|  18|    21|-25.488501|-49.27

In [25]:
# One window per (line_code, vehicle), ordered chronologically by stop time.
window_spec = Window.partitionBy(
    stop_events["line_code"], stop_events["vehicle"]
).orderBy(stop_events["stop_timestamp"])

# Pair every stop with the stop that preceded it inside the same window.
stop_events_windowed = (
    stop_events
    .withColumn("last_stop", F.lag("stop_timestamp").over(window_spec))
    # The first stop of each window has no predecessor, so lag() yields null there; drop it.
    .filter("last_stop is not null")
    .select(
        "line_code",
        "vehicle",
        "last_stop",
        F.col("stop_timestamp").alias("actual_stop"),
    )
)

In [35]:
# Print a preview of the (last_stop, actual_stop) pairs.
stop_events_windowed.show()

+---------+-------+-------------------+-------------------+
|line_code|vehicle|          last_stop|        actual_stop|
+---------+-------+-------------------+-------------------+
|      561|  EA210|2020-05-03 06:20:18|2020-05-03 06:23:48|
|      561|  EA210|2020-05-03 06:23:48|2020-05-03 06:25:47|
|      561|  EA210|2020-05-03 06:25:47|2020-05-03 06:26:10|
|      561|  EA210|2020-05-03 06:26:10|2020-05-03 06:27:14|
|      561|  EA210|2020-05-03 06:27:14|2020-05-03 06:28:28|
|      561|  EA210|2020-05-03 06:28:28|2020-05-03 06:30:23|
|      561|  EA210|2020-05-03 06:30:23|2020-05-03 06:31:50|
|      561|  EA210|2020-05-03 06:31:50|2020-05-03 06:32:55|
|      561|  EA210|2020-05-03 06:32:55|2020-05-03 06:33:21|
|      561|  EA210|2020-05-03 06:33:21|2020-05-03 06:38:38|
|      561|  EA210|2020-05-03 06:38:38|2020-05-03 06:43:39|
|      561|  EA210|2020-05-03 06:43:39|2020-05-03 06:48:40|
|      561|  EA210|2020-05-03 06:48:40|2020-05-03 06:49:47|
|      561|  EA210|2020-05-03 06:49:47|2

In [62]:
# Assign each event to the stop-to-stop interval it falls in, then aggregate
# velocity and distance per interval.
events_computed = (
    events
    .filter("delta_time is not null")
    .join(stop_events_windowed, ["line_code", "vehicle"], "inner")
    # NOTE(review): between() is inclusive on both bounds, so an event stamped exactly
    # at a stop time matches both the interval ending there and the one starting
    # there — confirm this is intended for the distance sum.
    .filter(F.col("event_timestamp").between(F.col("last_stop"), F.col("actual_stop")))
    .groupBy("year", "month", "day", "hour", "line_code", "vehicle", "actual_stop")
    .agg(
        F.round(F.mean("delta_velocity"), 2).alias("avg_velocity"),
        F.round(F.sum("delta_distance"), 2).alias("distance"),
    )
    # Rename so the result shares the stop_timestamp key used by stop_events.
    .withColumnRenamed("actual_stop", "stop_timestamp")
)

In [63]:
# Print a preview of the per-interval aggregates.
events_computed.show()

+----+-----+---+----+---------+-------+-------------------+------------+--------+
|year|month|day|hour|line_code|vehicle|     stop_timestamp|avg_velocity|distance|
+----+-----+---+----+---------+-------+-------------------+------------+--------+
|2020|    5|  3|   6|      561|  EA210|2020-05-03 06:23:48|       42.23| 2245.67|
|2020|    5|  3|   6|      561|  EA210|2020-05-03 06:25:47|       30.79| 1047.65|
|2020|    5|  3|   6|      561|  EA210|2020-05-03 06:26:10|       14.74|   84.26|
|2020|    5|  3|   6|      561|  EA210|2020-05-03 06:27:14|       34.45|  669.03|
|2020|    5|  3|   6|      561|  EA210|2020-05-03 06:28:28|       33.45|   802.6|
|2020|    5|  3|   6|      561|  EA210|2020-05-03 06:30:23|       43.41| 1486.36|
|2020|    5|  3|   6|      561|  EA210|2020-05-03 06:31:50|       27.52|  529.34|
|2020|    5|  3|   6|      561|  EA210|2020-05-03 06:32:55|        36.1|  752.81|
|2020|    5|  3|   6|      561|  EA210|2020-05-03 06:33:21|       22.16|  208.97|
|2020|    5|  3|

In [75]:
# Enrich each stop event with its interval metrics and list the result chronologically.
(
    stop_events
    .join(
        events_computed,
        ["year", "month", "day", "hour", "line_code", "vehicle", "stop_timestamp"],
        "inner",
    )
    .sort(F.asc("stop_timestamp"))
    .show()
)

+----+-----+---+----+---------+-------+-------------------+------+----------+----------+------------+--------+
|year|month|day|hour|line_code|vehicle|     stop_timestamp|minute|  latitude| longitude|avg_velocity|distance|
+----+-----+---+----+---------+-------+-------------------+------+----------+----------+------------+--------+
|2020|    5|  3|   6|      561|  EA210|2020-05-03 06:23:48|    23|-25.473771|-49.262943|       42.23| 2245.67|
|2020|    5|  3|   6|      561|  EA210|2020-05-03 06:25:47|    25|-25.467953|-49.262276|       30.79| 1047.65|
|2020|    5|  3|   6|      561|  EA210|2020-05-03 06:26:10|    26| -25.46767|-49.261583|       14.74|   84.26|
|2020|    5|  3|   6|      561|  EA210|2020-05-03 06:27:14|    27|-25.464385|-49.259098|       34.45|  669.03|
|2020|    5|  3|   6|      561|  EA210|2020-05-03 06:28:28|    28|-25.459136|-49.262753|       33.45|   802.6|
|2020|    5|  3|   6|      561|  EA210|2020-05-03 06:30:23|    30|-25.446936|-49.268555|       43.41| 1486.36|
|

In [33]:
def event_edges(events, year=2020, month=5, day=3, max_distance=30):
    """Match tracking events to the bus stops they pass close to.

    Events are first joined to the timetable trips of the same line and
    vehicle and kept only when the event's time of day lies between the
    trip's ``start_time`` and ``end_time`` (inclusive). The surviving events
    are then joined to every bus stop of the same ``line_code``/``line_way``
    and kept only when the haversine distance to the stop is below
    ``max_distance``.

    Args:
        events: DataFrame of tracking events. Must provide ``line_code``,
            ``vehicle``, ``event_timestamp``, ``latitude`` and ``longitude``.
        year, month, day: Values passed to ``TimetableRefinedProcess`` and
            ``BusStopRefinedProcess`` to select the timetable and bus-stop
            data. Defaults keep the previously hard-coded 2020-05-03.
        max_distance: Exclusive upper bound on the event-to-stop distance,
            in whatever unit ``haversine`` returns (presumably meters —
            TODO confirm). Defaults to the previously hard-coded 30.

    Returns:
        DataFrame with one row per (event, bus stop) pair within range,
        including a ``distance`` column.
    """
    trips = TimetableRefinedProcess(year, month, day).trips().drop("year", "month", "day")
    bus_stops = (
        BusStopRefinedProcess(year, month, day).bus_stops()
        .drop("year", "month", "day")
        # Rename so the stop coordinates do not clash with the event coordinates after the join.
        .withColumnRenamed("latitude", "bus_stop_latitude")
        .withColumnRenamed("longitude", "bus_stop_longitude")
    )

    # Keep only events whose time of day falls inside a timetable trip of the same vehicle.
    events_in_trip = (
        events.withColumn("event_time", F.date_format(F.col("event_timestamp"), 'HH:mm:ss')).alias("se")
            .join(trips.alias("tr"), ["line_code", "vehicle"])
            .filter(F.col("event_time").between(F.col("start_time"), F.col("end_time")))
    )

    return (events_in_trip.alias("se").join(bus_stops.alias("bs"), ["line_code", "line_way"], 'inner')
            .withColumn("distance",
                        haversine(F.col('se.longitude').cast(T.DoubleType()),
                                  F.col('se.latitude').cast(T.DoubleType()),
                                  F.col('bs.bus_stop_longitude').cast(T.DoubleType()),
                                  F.col('bs.bus_stop_latitude').cast(T.DoubleType())))
            .filter(F.col("distance") < max_distance))

In [37]:
# Preview the events that fall within range of a bus stop, in chronological order.
# The helper defined in this notebook is `event_edges`; the former call to
# `event_stop_edges` only worked through leftover kernel state.
(
    event_edges(events)
    .select(
        "line_code",
        "event_timestamp",
        "latitude",
        "longitude",
        "vehicle",
        "moving_status",
        "line_way",
        "number",
        "seq",
        "distance",
        "delta_velocity",
        "delta_time",
    )
    .sort(F.asc("event_timestamp"))
    .show()
)

+---------+-------------------+----------+----------+-------+-------------+-----------------+------+---+--------+--------------+----------+
|line_code|    event_timestamp|  latitude| longitude|vehicle|moving_status|         line_way|number|seq|distance|delta_velocity|delta_time|
+---------+-------------------+----------+----------+-------+-------------+-----------------+------+---+--------+--------------+----------+
|      561|2020-05-03 06:20:34|-25.488551|-49.275153|  EA210|       MOVING|Praca Rui Barbosa|150629|  2|     8.0|         41.61|         4|
|      561|2020-05-03 06:20:36|-25.488616|-49.275056|  EA210|       MOVING|Praca Rui Barbosa|150629|  2|   19.43|         21.83|         2|
|      561|2020-05-03 06:21:34|-25.484915|-49.271936|  EA210|       MOVING|Praca Rui Barbosa|150285|  4|   24.14|         57.61|         4|
|      561|2020-05-03 06:21:50| -25.48316|-49.270521|  EA210|       MOVING|Praca Rui Barbosa|150168|  5|   24.26|         50.76|         1|
|      561|2020-05-0