In [66]:
from datetime import datetime
import pandas as pd
from geopy import distance

import pymongo
from pymongo import MongoClient

import pyspark as ps
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType


import get_recent_days as gtdys

In [67]:
# Connect to the MongoDB server on this machine and open the pipeline test database.
client = MongoClient(host='localhost', port=27017)
db = client['avl_pipeline_test']

# Handles on the input collection (labeled trips) and the output collection (trips).
in_collection, out_collection = db['labeled_trips'], db['trips']

In [68]:
# Number of documents in the input collection.
# Collection.count() is deprecated and was removed in PyMongo 4;
# count_documents({}) with an empty filter counts every document.
in_collection.count_documents({})

111798

In [69]:
class Distance(object):
    """Measures how far arbitrary points are from one fixed bus stop."""

    def __init__(self, stop_latlon):
        """
        Input:
            stop_latlon: Tuple of bus stop lat/lon
        """
        self.stop_latlon = stop_latlon

    def dist_function(self, lat_str, lon_str):
        """
        Input:
            lat_str: Row latitude (anything float() accepts)
            lon_str: Row longitude (anything float() accepts)
        Output:
            Distance between the stop and the row position, read from the
            `.m` attribute of the geopy distance object.
        """
        lat = float(lat_str)
        lon = float(lon_str)

        separation = distance.distance(self.stop_latlon, (lat, lon))
        return separation.m

For each trip ID, I can filter with Spark.  
I can incorporate Spark into the filtering loops and output a DataFrame!

In [70]:
# Build (or reuse) the Spark session, pointing the MongoDB connector at the
# labeled_trips collection for reads and the spark_test collection for writes.
spark = (
    SparkSession.builder
    .appName("myApp")
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/avl_pipeline_test.labeled_trips")
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/avl_pipeline_test.spark_test")
    .getOrCreate()
)

In [71]:
# Echo the session object so the notebook displays it.
spark

In [72]:
# Load the collection named by spark.mongodb.input.uri through the MongoDB
# Spark connector.
df = (
    spark.read
    .format("com.mongodb.spark.sql.DefaultSource")
    .load()
)

I can only do this here because I know that all of these trips are from the same GTFS period.

Eventually, I will pipeline these trips by GTFS period, so that each period's schedule files can be loaded properly, once per period.

In [73]:
# Any single record's timestamp is enough to pick the GTFS directory, since the
# same schedule files are used for every trip loaded here.
sample_timestamp = df.select('time_stamp').take(1)[0][0]
dt_objct = datetime.fromtimestamp(sample_timestamp)

gtfs_lookup_df = pd.read_csv('data/gtfs_lookup.csv')
gtfs_directory = gtdys.get_gtfs_file(dt_objct, gtfs_lookup_df)

sched_txt = 'data/gtfs/{}/stop_times.txt'.format(gtfs_directory)
stops_txt = 'data/gtfs/{}/stops.txt'.format(gtfs_directory)

# Both GTFS text files are parsed with the same CSV settings.
gtfs_csv_options = dict(
    header=True,       # first line holds the column names
    quote='"',         # char for quotes
    sep=",",           # char for separation
    inferSchema=True,  # let Spark infer the column types
)

sched_df = spark.read.csv(sched_txt, **gtfs_csv_options)
stops_df = spark.read.csv(stops_txt, **gtfs_csv_options)


In [84]:
# Preview the first five rows of the stop_times schedule.
sched_df.show(5)

+-------+------------+--------------+-------+-------------+-------------+-----------+-------------+-------------------+
|trip_id|arrival_time|departure_time|stop_id|stop_sequence|stop_headsign|pickup_type|drop_off_type|shape_dist_traveled|
+-------+------------+--------------+-------+-------------+-------------+-----------+-------------+-------------------+
|7225088|    26:00:00|      26:00:00|   4015|            1|             |           |             |                   |
|7225088|    26:00:45|      26:00:45|   6294|            2|             |           |             |                   |
|7225088|    26:01:35|      26:01:35|   6290|            3|             |           |             |                   |
|7225088|    26:02:00|      26:02:00|   6314|            4|             |           |             |                   |
|7225088|    26:02:35|      26:02:35|   6307|            5|             |           |             |                   |
+-------+------------+--------------+---

In [75]:
# One row per distinct trip_id_iso value found in the loaded data.
trip_rows = df.select('trip_id_iso').distinct()

In [76]:
# Bring the distinct trip identifiers back to the driver as a plain Python list.
trips = [value for row in trip_rows.collect() for value in row]

1
78
1
14
90
2
1
76
7
108
69
69
76
35
74
78
81
75
77
74
69
63
76
92
50
70
73
78
77
1
59
82
81
77
64
131
78
79
81
74
73
82
80
1
77
56
121
70
70
109
116
72
1
72
72
75
80
61
75
76
74
69
2
114
66
83
83
75
1
79
76
122
75
81
1
125
129
75
75
82
125
79
80
83
73
82
74
70
81
71
1
115
77
1
78
78
1
81
80
82
83
88
3
81
72
78
119
118
68
81
83
67
70
122
81
82
86
1
86
68
65
120
72
79
73
1
76
16
72
77
76
64
73
70
75
61
4
80
79
1
81
1
109
70
73
75
55
80
69
69
54
52
81
80
6
2
1
68
74
79
76
78
72
78
81
75
84
84
76
79
1
78
114
70
70
1
68
74
69
77
75
77
76
76
82
75
74
71
77
79
70
75
1
81
74
78
72
61
86
76
69
79
1
71
81
84
76
65
129
78
73
71
117
67
80
83
83
6
78
5
84
71
106
71
6
1
79
80
1
75
66
1
115
83
180
1
58
78
69
77
60
75
107
79
75
78
77
10
81
77
76
120
71
125
79
75
59
6
79
73
1
79
80
82
67
66
79
59
76
79
83
78
75
76
116
78
79
1
81
59
57
10
76
143
89
3
75
73
1
77
49
76
73
83
1
16
74
115
72
67
125
111
75
83
1
56
123
63
72
58
73
88
80
62
73
79
126
69
85
76
69
51
84
7
104
1
1
76
52
67
125
82
70
58
57
70
69

KeyboardInterrupt: 

In [90]:
# Take five trip identifiers (positions 10-14) as a small working sample.
# NOTE(review): `trips` is built from distinct(), which presumably gives no
# ordering guarantee, so this slice may select different trips on a re-run — confirm.
test_trip = trips[10:15]

In [91]:
# Show the sampled trip identifiers.
test_trip

['7253684_2016-11-07_WLARK',
 '7253715_2016-11-02_SQLGQ',
 '7253664_2016-11-21_KQ9EB',
 '7253684_2016-11-18_5S2XN',
 '7253660_2016-11-18_3D7RE']

In [92]:
import pprint

In [93]:
# Build, for every sampled trip, a dictionary describing the trip, each
# scheduled stop (when the vehicle was near it) and each edge between
# consecutive stops (travel time). Only the last trip processed is printed.

# Start from a clean slate so that a re-run, or a run in which every trip is
# skipped, never pretty-prints a stale value left over in the kernel.
trip_dict = None

for item in test_trip:

    trip_df = df.filter(df['trip_id_iso'] == item)

    # Skip trips with fewer than 50 recorded rows.
    if trip_df.count() < 50:
        continue

    # The schedule trip_id is read from the row flagged as the trip start.
    # first() returns None when there is no such row, so a single Spark job
    # covers both the existence check and the lookup.
    start_row = trip_df.filter(trip_df['trip_start'] == 1)\
                        .select('trip_id')\
                        .first()

    if start_row is None:
        continue

    trip_id = start_row[0]

    # Scheduled (stop_sequence, stop_id) pairs for this trip, in stop order,
    # fetched once instead of once per stop.
    sched_trp_df = sched_df.filter(sched_df['trip_id'] == trip_id)

    sched_rows = sched_trp_df.select('stop_sequence', 'stop_id')\
                                .orderBy('stop_sequence')\
                                .collect()

    # No schedule rows means there are no stops to measure against.
    if not sched_rows:
        continue

    trip_sequence_lst = [row['stop_sequence'] for row in sched_rows]
    max_sequence = trip_sequence_lst[-1]

    # Distinct stop ids, kept in the order they are first served.
    stops = []
    for row in sched_rows:
        if row['stop_id'] not in stops:
            stops.append(row['stop_id'])

    # Coordinates of every stop on the trip, fetched in one job.
    trip_stops = stops_df.filter(stops_df['stop_id'].isin(stops))

    stop_latlon = {}
    for row in trip_stops.select('stop_id', 'stop_lat', 'stop_lon').collect():
        stop_latlon[row['stop_id']] = (row['stop_lat'], row['stop_lon'])

    first_trip_ts = trip_df.select('time_stamp')\
                            .orderBy('time_stamp')\
                            .first()[0]

    last_trip_ts = trip_df.select('time_stamp')\
                            .orderBy('time_stamp', ascending=False)\
                            .first()[0]

    trip_dict = {}
    trip_dict['trip_id_iso'] = item
    trip_dict['trip_id'] = trip_id
    trip_dict['trip_start'] = first_trip_ts
    trip_dict['trip_end'] = last_trip_ts

    # Subtract the epoch timestamps directly. Converting each one to a naive
    # local datetime first gives a wrong difference when the interval spans a
    # daylight-saving change.
    trip_dict['trip_duration'] = float(last_trip_ts - first_trip_ts)

    trip_dict['stop_sequence'] = trip_sequence_lst

    trip_dict['stops'] = {}

    # Iterate over schedule rows rather than distinct stop ids, so that every
    # stop_sequence gets an entry even when one stop is served twice.
    for sched_row in sched_rows:

        seq = sched_row['stop_sequence']
        stop = sched_row['stop_id']

        stop_details = {}
        trip_dict['stops'][seq] = stop_details

        stop_details['sequence'] = seq
        stop_details['stop_id'] = stop
        stop_details['lat'], stop_details['lon'] = stop_latlon[stop]

        stop_dist = Distance((stop_details['lat'], stop_details['lon']))

        distance_udf = udf(lambda lat, lon: stop_dist.dist_function(lat, lon), FloatType())

        dist_df = trip_df.withColumn("distance", distance_udf(trip_df['LATITUDE'], trip_df['LONGITUDE']))\
                            .select('time_stamp', 'distance')

        # Rows recorded within 50 distance units of the stop, pulled to the
        # driver once so the UDF is not re-evaluated for every statistic.
        stop_hits = dist_df.filter(dist_df['distance'] < 50).collect()

        if not stop_hits:

            # The vehicle never came that close: fall back to the nearest row.
            stop_hits = dist_df.orderBy('distance').limit(1).collect()

        hit_times = [hit['time_stamp'] for hit in stop_hits]

        first_intersect_ts = min(hit_times)
        last_intersect_ts = max(hit_times)

        stop_details['min_dist_stop'] = min(hit['distance'] for hit in stop_hits)

        stop_details['first_intersect_ts'] = first_intersect_ts
        stop_details['last_intersect_ts'] = last_intersect_ts

        stop_details['seconds_at_stop'] = float(last_intersect_ts - first_intersect_ts)

    trip_dict['edges'] = {}

    # An edge runs from one scheduled stop to the next; the final stop starts
    # no edge, which pairing each sequence with its successor handles.
    for edge, next_edge in zip(trip_sequence_lst, trip_sequence_lst[1:]):

        first_stop = trip_dict['stops'][edge]
        last_stop = trip_dict['stops'][next_edge]

        # Time from the last sighting at the first stop to the first sighting
        # at the next one.
        edge_seconds = last_stop['first_intersect_ts'] - first_stop['last_intersect_ts']

        trip_dict['edges'][edge] = {}

        if edge_seconds < 0:

            # The two sighting windows overlap; report no travel time.
            trip_dict['edges'][edge]['edge_time'] = 0

        else:

            trip_dict['edges'][edge]['edge_time'] = float(edge_seconds)

        trip_dict['edges'][edge]['starting_stop'] = first_stop['stop_id']
        trip_dict['edges'][edge]['ending_stop'] = last_stop['stop_id']


# Show the last trip that was actually processed, if there was one.
if trip_dict is not None:
    pprint.pprint(trip_dict)

{'edges': {1: {'edge_time': 127.0, 'ending_stop': 3879, 'starting_stop': 6293},
           2: {'edge_time': 53.0, 'ending_stop': 3852, 'starting_stop': 3879},
           3: {'edge_time': 154.0, 'ending_stop': 3644, 'starting_stop': 3852},
           4: {'edge_time': 25.0, 'ending_stop': 3645, 'starting_stop': 3644},
           5: {'edge_time': 91.0, 'ending_stop': 3649, 'starting_stop': 3645},
           6: {'edge_time': 90.0, 'ending_stop': 3642, 'starting_stop': 3649},
           7: {'edge_time': 40.0, 'ending_stop': 4224, 'starting_stop': 3642},
           8: {'edge_time': 45.0, 'ending_stop': 6479, 'starting_stop': 4224},
           9: {'edge_time': 141.0, 'ending_stop': 6481, 'starting_stop': 6479},
           10: {'edge_time': 74.0, 'ending_stop': 4963, 'starting_stop': 6481},
           11: {'edge_time': 115.0, 'ending_stop': 4947, 'starting_stop': 4963},
           12: {'edge_time': 155.0, 'ending_stop': 3665, 'starting_stop': 4947},
           13: {'edge_time': 74.0, 'ending_s