# Tracking One Train

The challenge here is that we need to track S-Bahn trains along the route they follow using only the planned departure time at each station. There is no unique ID for each train trip; instead we have to:
- identify the trip by the first station it starts at
- match the next stop based on the route and planned departure

## Imports 

In [1]:
import os
from uuid import uuid4
# settings.py
from dotenv import load_dotenv
load_dotenv()

from joblib import Parallel, delayed
import pandas as pd
import sqlalchemy
from sqlalchemy import create_engine

pd.options.plotting.backend = "plotly"

## DB Connection 

In [2]:
# Database engine for the `sbahn` PostgreSQL database.
# Username and password are read from the POSTGRE_USERNAME / POSTGRE_PASSWORD
# environment variables (load_dotenv() is called in the imports cell);
# the host address and database name are hard-coded in the URL.
engine = create_engine('postgresql://{username}:{password}@167.99.243.10/sbahn'.format(username=os.getenv('POSTGRE_USERNAME'), password=os.getenv('POSTGRE_PASSWORD')),
                       executemany_mode='batch'
                      )

## Getting Data

### Basic Data

In [3]:
# Every departure joined to its station name; `line` is the two characters of
# `product` starting at position 7 (SQL SUBSTRING is 1-based).
all_departures = """
select d.departure_id, d.destination, SUBSTRING(d.product, 7,2) as line, s.station_name, d.departure_time
from departures as d, stations as s
where d.station = s.station_id
;
"""

# Loads the whole result set into memory as a DataFrame.
all_departures_df = pd.read_sql(all_departures, engine)

### Loading Additional Data

This data originates from the other notebook and makes it easier to calculate the next stop.



In [4]:
line_data = pd.read_csv('line_data.csv')

In [5]:
line_data.head()

Unnamed: 0,line,line_id,start,end,from,to,order,delta
0,S8,1,Flughafen München,Herrsching,Flughafen München,Flughafen Besucherpark,1,2.0
1,S8,1,Flughafen München,Herrsching,Flughafen Besucherpark,Hallbergmoos,2,5.0
2,S8,1,Flughafen München,Herrsching,Hallbergmoos,Ismaning,3,7.0
3,S8,1,Flughafen München,Herrsching,Ismaning,Unterföhring,4,4.0
4,S8,1,Flughafen München,Herrsching,Unterföhring,Johanneskirchen,5,4.0


### Matching Logic

In [6]:
current_line_id = 5

In [7]:
current_line = line_data[line_data.line_id.eq(current_line_id)]

In [8]:
current_line

Unnamed: 0,line,line_id,start,end,from,to,order,delta
226,S2,5,Erding,Petershausen,Erding,Altenerding,1,1.0
227,S2,5,Erding,Petershausen,Altenerding,Aufhausen,2,3.0
228,S2,5,Erding,Petershausen,Aufhausen,St. Koloman,3,4.0
229,S2,5,Erding,Petershausen,St. Koloman,Ottenhofen,4,3.0
230,S2,5,Erding,Petershausen,Ottenhofen,Markt Schwaben,5,4.0
231,S2,5,Erding,Petershausen,Markt Schwaben,Poing,6,6.0
232,S2,5,Erding,Petershausen,Poing,Grub,7,3.0
233,S2,5,Erding,Petershausen,Grub,Heimstetten,8,2.0
234,S2,5,Erding,Petershausen,Heimstetten,Feldkirchen,9,2.0
235,S2,5,Erding,Petershausen,Feldkirchen,Riem,10,4.0


In [9]:
# Template for picking ONE not-yet-searched departure (trip_searched=false) at a
# given station; fill `{starting_station}` with str.format.
# The trailing backslashes join the literal into a single-line SQL string.
# Note: the query has no ORDER BY and no line filter, so which row `limit 1`
# returns is decided by the database.
departures_query = """\
select d.departure_id, d.destination, SUBSTRING(d.product, 7,2) as line, s.station_name, d.departure_time \
from departures_new as d, stations as s \
where d.station = s.station_id \
and d.trip_searched=false \
and s.station_name = '{starting_station}' \
limit 1 \
;\
"""

In [10]:
# Template for finding candidate departures of the same train at the next station:
# same station, destination and line, not yet searched, and with a departure time
# in the half-open window (departure_time, offset_time].
# Placeholders: {station}, {destination}, {line}, {departure_time}, {offset_time}.
# The literal starts with a newline, and the SUBSTRING line has no trailing
# backslash, so the resulting SQL string contains two line breaks.
next_stop_query = """
select d.departure_id, d.destination, SUBSTRING(d.product, 7,2) as line, s.station_name, d.departure_time \
from departures_new as d, stations as s \
where d.station = s.station_id \
and d.trip_searched=false \
and s.station_name = '{station}' \
and d.destination = '{destination}' \
and SUBSTRING(d.product, 7,2) = '{line}'
and d.departure_time > '{departure_time}' \
and d.departure_time <= '{offset_time}' \
;\
"""

### Update Database records

In [11]:
# Template that marks a set of departures as searched and stamps them with a trip id.
# `{departure_ids}` is wrapped in single quotes here, so the caller must pass the
# ids already joined with "','" (quote-comma-quote) to form a valid IN list.
update_query = """
UPDATE departures_new
SET trip_searched = true,
    trip_id = '{trip_id}'
WHERE departure_id in ('{departure_ids}'); """

## Writing Loop

This is really damn slow, but it at least has a small memory footprint and could work well as a running task.

In [12]:
# Database-backed matching: for every line, repeatedly take an unsearched departure
# at the line's first station and follow it stop by stop via SQL queries.
for current_line_id in line_data.line_id.unique().tolist():
    print(f"Starting line: {current_line_id}")
    # select current line data
    current_line = line_data[line_data.line_id.eq(current_line_id)]
    # the row with order == 1 names the station where trips on this line start
    first_stop = current_line[current_line['order'].eq(1)]['from'].values[0]
    starting_departure = departures_query.format(starting_station = first_stop)
    # keep going while the database still returns an unsearched departure at the first stop
    while pd.read_sql(starting_departure, engine).shape[0] > 0:
        print(f"Starting a trip")
        # list of one-row DataFrames, one per matched stop, in travel order
        trip = []
        for index, row in current_line.sort_values(by='order', ascending=True).iterrows():
            if len(trip) == 0:
                # first iteration: the starting departure itself opens the trip
                current_start = pd.read_sql(starting_departure, engine)
                trip.append(current_start.copy())
                continue
            station = row['from']
            line = row['line']
            # destination and time are taken from the most recently matched stop
            destination = trip[-1].destination.values[0]
            departure_time = trip[-1].departure_time.values[0]
            # upper bound of the search window: this row's delta plus 5 minutes
            # NOTE(review): `row.delta` comes from the row whose 'from' is the station
            # being searched, not from the previous row — confirm this is the intended segment.
            offset_time = departure_time + pd.Timedelta(row.delta+5, unit='m')
            # stop once the station being searched is the train's destination
            if station == destination:
                print('Found trip from start to destination')
                break

            # search for next stop
            next_stop_q = next_stop_query.format(station=station, line=line, departure_time=departure_time, offset_time=offset_time, destination=destination)
            next_stop = pd.read_sql(next_stop_q, engine)

            # if there is a match, keep the earliest candidate
            if next_stop.shape[0] > 0:
                trip.append(next_stop.sort_values(by='departure_time', ascending=True).head(1).copy())
            # otherwise abandon the chain and keep only the starting departure
            else:
                print('No match found')
                trip = [trip[0]]
                break
        # Debug stop: leaves the while loop after the first trip, so the save/update
        # code below is unreachable while this `break` is present.
        break
        print('saving trip')
        tmp = pd.concat(trip)
        trip_id = uuid4()
        departure_ids = tmp.departure_id.to_list()
        # ids are joined with "','" because update_query wraps the placeholder in quotes
        update_q = update_query.format(trip_id=trip_id, departure_ids="','".join(departure_ids))
        engine.execute(update_q)
    # Debug stop: only the first line id is processed while this `break` is present.
    break

Starting line: 1
Starting a trip
Found trip from start to destination


# Pandas Implementation

### Getting Departures

Here we just want to grab the most crucial information to match the departures to the lines we have stored in a csv from the other notebook.


In [13]:
# Same query text as the `all_departures` defined in the "Basic Data" cell:
# every departure joined to its station name, with the line taken from `product`.
all_departures = """
select d.departure_id, d.destination, SUBSTRING(d.product, 7,2) as line, s.station_name, d.departure_time
from departures as d, stations as s
where d.station = s.station_id
;
"""

In [14]:
all_departures_df = pd.read_sql(all_departures, engine)

In [15]:
all_departures_df.head()

Unnamed: 0,departure_id,destination,line,station_name,departure_time
0,795bab1475ba9a8cef40cd395471d6ae#1609435440000...,Maisach,S3,Giesing,2020-12-31 17:24:00
1,325683058c54b8bb02e7f98642f90125#1609447440000...,Deisenhofen,S3,Mammendorf,2020-12-31 20:44:00
2,491aef0ead0d52d15b942ef05640e32c#1609450860000...,Aying,S7,Höhenkirchen-Siegertsbrunn,2020-12-31 21:41:00
3,cd2e054e7878fc95cbf607f0ddebfd60#1609436460000...,Geltendorf,S4,Trudering,2020-12-31 17:41:00
4,3a70df4118f9b7d48048b0d84444b419#1609452240000...,Holzkirchen,S3,Mammendorf,2020-12-31 22:04:00


In [16]:
all_departures_df.shape

(570437, 5)

In [17]:
all_departures_df.departure_id.duplicated().sum()

0

## Redoing Logic in Pandas

In [18]:
# Bookkeeping columns used by the matching helpers below.
# Re-running this cell overwrites the columns and therefore resets all matching state.
all_departures_df['trip_id'] = None
all_departures_df['trip_searched'] = False
all_departures_df['trip_matched'] = False  


In [19]:
def get_starts(stop: str, line: str):
    """Return the unsearched departures of `line` at station `stop`.

    Reads the module-level ``all_departures_df``; rows whose ``trip_searched``
    flag is already set are excluded.
    """
    departures = all_departures_df
    is_candidate = (
        (departures.trip_searched == False)  # noqa: E712 - element-wise comparison
        & (departures.line == line)
        & (departures.station_name == stop)
    )
    return departures[is_candidate]

In [20]:
def get_next_stop(station, line, departure_time, offset_time, destination):
    """Return unsearched departures that could be the same train at `station`.

    Reads the module-level ``all_departures_df``. A row qualifies when it is at
    `station`, on `line`, heading to `destination`, not yet searched, and its
    departure time lies in the window ``(departure_time, offset_time]``.
    """
    departures = all_departures_df
    # exclusive lower bound, inclusive upper bound
    in_window = (departures.departure_time > departure_time) & (departures.departure_time <= offset_time)
    same_train = (
        (departures.station_name == station)
        & (departures.line == line)
        & (departures.destination == destination)
    )
    not_searched = departures.trip_searched == False  # noqa: E712 - element-wise comparison
    return departures[same_train & in_window & not_searched]

In [21]:
def update_departures(trip_id, departure_ids):
    """Record the outcome of one trip search in the module-level ``all_departures_df``.

    All rows whose ``departure_id`` is in `departure_ids` are flagged as searched.
    When exactly one id is given, those rows get ``trip_id`` None and
    ``trip_matched`` False; otherwise they get `trip_id` and ``trip_matched`` True.
    """
    rows = all_departures_df.departure_id.isin(departure_ids)
    # searched in both outcomes
    all_departures_df.loc[rows, ['trip_searched']] = True
    if len(departure_ids) == 1:
        all_departures_df.loc[rows, ['trip_id']] = None
        all_departures_df.loc[rows, ['trip_matched']] = False
        return
    all_departures_df.loc[rows, ['trip_id']] = trip_id
    all_departures_df.loc[rows, ['trip_matched']] = True
### Also works, but it is very slow

In [22]:
# In-memory version of the matching loop: same procedure as the SQL loop, but the
# lookups go through get_starts / get_next_stop on all_departures_df.
for current_line_id in line_data.line_id.unique().tolist():
    print(f"Starting line: {current_line_id}")
    # select current line data
    current_line = line_data[line_data.line_id.eq(current_line_id)]
    line = current_line.line.values[0]
    # the row with order == 1 names the station where trips on this line start
    first_stop = current_line[current_line['order'].eq(1)]['from'].values[0]
    
    # keep going while there is an unsearched departure of this line at the first stop
    while get_starts(first_stop, line).shape[0] > 0:
        print(f"Starting a trip")
        # list of one-row DataFrames, one per matched stop, in travel order
        trip = []
        for index, row in current_line.sort_values(by='order', ascending=True).iterrows():
            if len(trip) == 0:
                # first iteration: the first unsearched start opens the trip
                current_start = get_starts(first_stop, line).head(1)
                trip.append(current_start.copy())
                continue
            station = row['from']
            # rebinds the outer `line`, which the while condition also reads
            line = row['line']
            # destination and time are taken from the most recently matched stop
            destination = trip[-1].destination.values[0]
            departure_time = trip[-1].departure_time.values[0]
            # upper bound of the search window: this row's delta plus 5 minutes
            # NOTE(review): `row.delta` comes from the row whose 'from' is the station
            # being searched, not from the previous row — confirm this is the intended segment.
            offset_time = departure_time + pd.Timedelta(row.delta+5, unit='m')
            # stop once the station being searched is the train's destination
            if station == destination:
                print('Found trip from start to destination')
                break

            # search for next stop
            next_stop = get_next_stop(station=station, line=line, departure_time=departure_time, offset_time=offset_time, destination=destination)

            # if there is a match, keep the earliest candidate
            if next_stop.shape[0] > 0:
                trip.append(next_stop.sort_values(by='departure_time', ascending=True).head(1).copy())
            # otherwise abandon the chain and keep only the starting departure
            else:
                print('No match found')
                trip = [trip[0]]
                break
        # Debug stop: leaves the while loop after the first trip, so the update
        # code below is unreachable while this `break` is present.
        break
        tmp = pd.concat(trip)
        trip_id = uuid4()
        departure_ids = tmp.departure_id.to_list()
        update_departures(trip_id, departure_ids)
    # Debug stop: only the first line id is processed while this `break` is present.
    break


Starting line: 1
Starting a trip
Found trip from start to destination


### Third Revision

In [23]:
# Bookkeeping columns for the third revision; `line_id` is added so that matched
# rows can record which line definition produced them.
# Re-running this cell overwrites the columns and therefore resets all matching state.
all_departures_df['trip_id'] = None
all_departures_df['line_id'] = None
all_departures_df['trip_searched'] = False
all_departures_df['trip_matched'] = False  


In [24]:
def get_starts(df, stop, line):
    """Return the rows of `df` that are unsearched departures of `line` at `stop`.

    A row qualifies when ``station_name`` equals `stop`, ``line`` equals `line`
    and ``trip_searched`` is False.
    """
    is_candidate = (
        (df.trip_searched == False)  # noqa: E712 - element-wise comparison
        & (df.line == line)
        & (df.station_name == stop)
    )
    return df[is_candidate]

In [25]:
def get_next_stop(df, station, line, departure_time, offset_time, destination):
    """Return rows of `df` that could be the same train departing from `station`.

    A row qualifies when it is at `station`, on `line`, heading to `destination`,
    has ``trip_searched`` False, and its ``departure_time`` lies in the window
    ``(departure_time, offset_time]``.
    """
    # exclusive lower bound, inclusive upper bound
    in_window = (df.departure_time > departure_time) & (df.departure_time <= offset_time)
    same_train = (
        (df.station_name == station)
        & (df.line == line)
        & (df.destination == destination)
    )
    not_searched = df.trip_searched == False  # noqa: E712 - element-wise comparison
    return df[same_train & in_window & not_searched]

In [26]:
def update_departures(df, line_id, trip_id, departure_ids):
    """Record the outcome of one trip search in `df` (modified in place).

    All rows whose ``departure_id`` is in `departure_ids` are flagged as searched.
    When exactly one id is given, those rows get ``trip_id`` None and
    ``trip_matched`` False and ``line_id`` is left untouched; otherwise they get
    `trip_id`, `line_id` and ``trip_matched`` True.
    """
    rows = df.departure_id.isin(departure_ids)
    # searched in both outcomes
    df.loc[rows, ['trip_searched']] = True
    if len(departure_ids) == 1:
        df.loc[rows, ['trip_id']] = None
        df.loc[rows, ['trip_matched']] = False
        return
    df.loc[rows, ['trip_id']] = trip_id
    df.loc[rows, ['line_id']] = line_id
    df.loc[rows, ['trip_matched']] = True

In [27]:
def process_line(line_id, stops_df):
    """Chain the departures of one line into trips and return the searched rows.

    For the line definition `line_id` (rows of the module-level ``line_data``),
    repeatedly takes the first unsearched departure at the line's first stop and
    follows it station by station, picking at each stop the earliest departure
    returned by ``get_next_stop``. The chain ends when the station being searched
    equals the train's destination, when the line has no further stops, or when
    no candidate is found (in which case only the starting departure is kept).
    Each outcome is written back through ``update_departures``.

    Parameters
    ----------
    line_id : value from ``line_data.line_id`` selecting the line definition.
    stops_df : DataFrame of departures with the bookkeeping columns
        (``trip_id``, ``line_id``, ``trip_searched``, ``trip_matched``).
        It is modified in place by ``update_departures``.

    Returns
    -------
    DataFrame with the rows of `stops_df` whose ``trip_searched`` flag is set.

    Raises
    ------
    IndexError if ``line_data`` has no rows for `line_id`, or no row with order 1.
    """
    # select current line data
    current_line = line_data[line_data.line_id.eq(line_id)]
    line = current_line.line.values[0]
    first_stop = current_line[current_line['order'].eq(1)]['from'].values[0]

    # The stop order is the same for every trip, so sort once. The first row only
    # stands for the starting station (handled via `first_stop`), so it is skipped.
    later_stops = current_line.sort_values(by='order', ascending=True).iloc[1:]

    while True:
        # One scan per trip: the same result decides whether to continue and
        # supplies the starting departure.
        starts = get_starts(stops_df, first_stop, line)
        if starts.shape[0] == 0:
            break

        # list of one-row DataFrames, one per matched stop, in travel order
        trip = [starts.head(1).copy()]
        for _, row in later_stops.iterrows():
            station = row['from']
            previous = trip[-1]
            destination = previous.destination.values[0]
            departure_time = previous.departure_time.values[0]
            # upper bound of the search window: this row's delta plus 5 minutes
            # NOTE(review): `row.delta` comes from the row whose 'from' is the station
            # being searched, not from the previous row — confirm this is the intended segment.
            offset_time = departure_time + pd.Timedelta(row.delta + 5, unit='m')

            # stop once the station being searched is the train's destination
            if station == destination:
                break

            # `row['line']` is passed directly so the outer `line` used by the
            # start lookup is never rebound inside the loop.
            next_stop = get_next_stop(
                stops_df,
                station=station,
                line=row['line'],
                departure_time=departure_time,
                offset_time=offset_time,
                destination=destination,
            )

            if next_stop.shape[0] > 0:
                # keep the earliest candidate
                trip.append(next_stop.sort_values(by='departure_time', ascending=True).head(1).copy())
            else:
                # no continuation found: keep only the starting departure so it is
                # marked as searched-but-unmatched
                trip = [trip[0]]
                break

        matched_rows = pd.concat(trip)
        trip_id = uuid4()
        departure_ids = matched_rows.departure_id.to_list()
        update_departures(stops_df, line_id, trip_id, departure_ids)
    return stops_df[stops_df.trip_searched]

### Parallel Processing

In [28]:
line_ids = line_data.line_id.unique().tolist()

In [29]:
%%time
# weird bug with joblib/pandas, thus the nbytes argument
# https://stackoverflow.com/questions/56036527/pandas-merge-command-failing-in-parallel-loop-valueerror-buffer-source-array
# One task per line id, up to 8 at a time. Each task is handed its own full copy
# of all_departures_df, and process_line returns the rows it flagged as searched,
# so `accumulator` is a list with one DataFrame per line id.
accumulator = Parallel(n_jobs=8, max_nbytes=None)(
    delayed(process_line)(line_id, all_departures_df.copy()) for line_id in line_ids)

CPU times: user 21.2 s, sys: 3.19 s, total: 24.4 s
Wall time: 1h 49min 18s


In [30]:
#assert len(accumulator) == len(line_ids)

## Rejoining Results

Assuming we've done the parallel processing we can now analyze the results here:

In [31]:
updated_trips = pd.concat(accumulator)

### Comparing Results

In [32]:
all_departures_df.shape

(570437, 9)

In [33]:
updated_trips.shape, updated_trips.trip_searched.sum()

((371472, 9), 371472)

### Saving Locally

In [34]:
updated_trips.to_csv('matched_trips.csv')

### Dropping Set of problematic duplicates

In some cases departures still end up duplicated, probably only on the S1, where departures are timed very close together (or maybe the trains are sharing a unit?). For now we drop those and treat them as if they were not matched.

In [35]:
# keep=False flags every row whose departure_id occurs more than once
duplicates = updated_trips.duplicated(subset='departure_id', keep=False)

# Drop every row that belongs to a trip containing at least one duplicated departure.
# This cell reassigns `updated_trips`, so re-running it operates on the already-filtered frame.
# NOTE(review): if any duplicated row has a null trip_id, `isin` may also match the other
# rows with a null trip_id and drop them too — confirm this is intended.
trip_ids_to_drop = updated_trips[duplicates].trip_id.unique()
updated_trips = updated_trips[~updated_trips.trip_id.isin(trip_ids_to_drop)]

In [36]:
updated_trips.trip_searched.sum()

288209

### Creating New Table with required data

In [37]:
unmatched_departures = all_departures_df[~all_departures_df.departure_id.isin(updated_trips.departure_id)]

In [38]:
merged_trips = pd.concat([updated_trips, unmatched_departures]).drop_duplicates(keep='first', subset='departure_id')

In [39]:
merged_trips

Unnamed: 0,departure_id,destination,line,station_name,departure_time,trip_id,trip_searched,trip_matched,line_id
6,be5caa40ac9304b82575afe80a7f0b31#1609452240000...,Gilching-Argelsried,S8,Flughafen München,2020-12-31 22:04:00,b6fc5a97-b148-4410-9b04-253d25fc21ca,True,True,1
11,71ff651e3d2583658ff020b6439dac60#1609452540000...,Herrsching,S8,Daglfing,2020-12-31 22:09:00,fe5cb6d4-1c1f-4fe5-afbf-173fb70d7af1,True,True,1
16,0e0e8efa7f892575da18eefbb5c6554b#1609435020000...,Herrsching,S8,Ostbahnhof München,2020-12-31 17:17:00,1243ecac-9d9e-4527-b69b-f29e607c3090,True,True,1
36,30d4adc8157bb8f3287d6ca727155b0d#1609440300000...,Herrsching,S8,Johanneskirchen,2020-12-31 18:45:00,8adafab4-bd30-4dee-a0b2-e0cbd9519f8a,True,True,1
37,f8a330ec64b9684a971f6caa844c883b#1609444860000...,Herrsching,S8,Weßling,2020-12-31 20:01:00,5d4992e9-08ef-43f6-8ddb-f0d582bff293,True,True,1
...,...,...,...,...,...,...,...,...,...
570429,6c3c4eab7c5b9f0fc58a1781d2588318#1609456980000...,Wolfratshausen,S7,Hohenschäftlarn,2020-12-31 23:23:00,,False,False,
570430,6ab07aa7324dc0898045395655b591c5#1609461960000...,Petershausen,S2,St. Koloman,2021-01-01 00:46:00,,False,False,
570432,856eee39e676cb6b38b65a5289bc4867#1609466400000...,Kreuzstraße,S7,Baierbrunn,2021-01-01 02:00:00,,False,False,
570434,710f2711ae3723c9b9ade9aae5d6ff94#1609466580000...,Petershausen,S2,Vierkirchen-Esterhofen,2021-01-01 02:03:00,,False,False,


In [40]:
merged_trips.shape, merged_trips.trip_searched.sum()

((570437, 9), 288209)

In [41]:
merged_trips.to_sql("matched_departures",engine, index=False,if_exists='replace')

## Getting Statistics

In [44]:
updated_trips.groupby(['line_id']).count()['trip_id']

line_id
1      36834
2      15746
3       9850
4       4560
6        696
7      28328
8      17326
9      23641
101    29016
102    20754
103    19272
105    25649
107    28002
109    20975
110     7560
Name: trip_id, dtype: int64