In [1]:
import os
import warnings

import numpy as np
import pandas as pd
pd.set_option('display.max_columns', None)

Clean

In [2]:
# Read in function
# Column -> dtype map for the raw runtime CSV. Only these columns are loaded
# (see usecols below); the compact dtypes (int16/float32/category) presumably
# keep memory down on multi-week exports.
dtypes = {
    'DoW': 'category',
    'serviceDate': 'str',
    'routeId': 'category',
    'directionId': 'category',
    'blockId': 'int16',
    'tripId': 'int32',
    'scheduledTripStartTime': 'str',
    'scheduledRuntimeSeconds': 'int16',
    'scheduledFromStopDepartureTime': 'str',
    'observedRuntimeSeconds': 'float32',
    'stopPathLength': 'float32',
    'fromStopfrom_stop_stopPathIndex': 'int16',
    'fromStopfrom_stop_name': 'category',
    'fromStopfrom_stop_id': 'category',
    'fromStopfrom_stop_gtfsStopSequence': 'int16',
    'fromStopfrom_stop_isScheduleAdherenceStop': 'bool',
    'fromStopfrom_stop_isFirstStopInTrip': 'bool',
    'fromStopfrom_stop_isLastStopInTrip': 'bool',
    'fromStopfrom_stop_isWaitStop': 'bool',
    'toStopto_stop_stopPathIndex': 'int16',
    'toStopto_stop_name': 'category',
    'toStopto_stop_id': 'category',
    'toStopto_stop_gtfsStopSequence': 'int16',
    'toStopto_stop_isScheduleAdherenceStop': 'bool',
    'toStopto_stop_isFirstStopInTrip': 'bool',
    'toStopto_stop_isLastStopInTrip': 'bool',
    'toStopto_stop_isWaitStop': 'bool',
    'speed_mph': 'float32',
}
# Flatten the verbose "fromStopfrom_stop_*" / "toStopto_stop_*" headers into camelCase.
columnsToRename = {
    'fromStopfrom_stop_stopPathIndex': 'fromStopPathIndex',
    'fromStopfrom_stop_name': 'fromStopName',
    'fromStopfrom_stop_id': 'fromStopId',
    'fromStopfrom_stop_gtfsStopSequence': 'fromStopSequence',
    'fromStopfrom_stop_isScheduleAdherenceStop': 'fromStopIsScheduledAdherenceStop',
    'fromStopfrom_stop_isFirstStopInTrip': 'fromStopIsTripFirstStop',
    'fromStopfrom_stop_isLastStopInTrip': 'fromStopIsTripLastStop',
    'fromStopfrom_stop_isWaitStop': 'fromStopIsWaitStop',
    'toStopto_stop_stopPathIndex': 'toStopPathIndex',
    'toStopto_stop_name': 'toStopName',
    'toStopto_stop_id': 'toStopId',
    'toStopto_stop_gtfsStopSequence': 'toStopSequence',
    'toStopto_stop_isScheduleAdherenceStop': 'toStopIsScheduledAdherenceStop',
    'toStopto_stop_isFirstStopInTrip': 'toStopIsTripFirstStop',
    'toStopto_stop_isLastStopInTrip': 'toStopIsTripLastStop',
    'toStopto_stop_isWaitStop': 'toStopIsWaitStop',
}

def readRuntimeData (path):
    """Load a runtime CSV with the declared dtypes and tidy stop column names.

    Parameters
    ----------
    path : str or file-like
        Anything accepted by ``pd.read_csv``.

    Returns
    -------
    pd.DataFrame
        Only the columns listed in ``dtypes`` (other CSV columns are ignored),
        with the stop columns renamed per ``columnsToRename``.
    """
    wantedColumns = [*dtypes]
    rawDf = pd.read_csv(path, dtype=dtypes, usecols=wantedColumns)
    return rawDf.rename(columns=columnsToRename)

In [3]:
# Correct and cast datetime
def castDatetime (dateColumn, timeColumn):
  """Combine a service-date column and a GTFS-style time column into datetimes.

  Schedule times may run past midnight (e.g. "25:05:00" means 01:05 on the
  day after the service date). The time is therefore parsed as an offset from
  midnight of the service date rather than as a wall-clock time, so any hour
  value (24, 48, ...) rolls over by the right number of days.

  Parameters
  ----------
  dateColumn : pd.Series
      Service dates as strings parseable by ``pd.to_datetime`` (e.g. "2022-10-26").
  timeColumn : pd.Series
      Times as "HH:MM:SS" strings, HH possibly >= 24. Missing values are
      allowed. Assumed to share ``dateColumn``'s index (rows align on index).

  Returns
  -------
  pd.Series
      datetime64 values; NaT where either input is missing.
  """
  serviceMidnight = pd.to_datetime(dateColumn)
  # to_timedelta accepts hours >= 24 in "HH:MM:SS" and maps missing values to NaT.
  offsetFromMidnight = pd.to_timedelta(timeColumn)
  return serviceMidnight + offsetFromMidnight

In [4]:
columnsToCast = [
  'scheduledTripStartTime', 
  'scheduledFromStopDepartureTime',
]

def castDatetimeColumns (runtimeDf):
  """Turn the service-date and scheduled-time string columns into datetimes.

  Each column in ``columnsToCast`` is combined with ``serviceDate`` via
  ``castDatetime``; ``serviceDate`` itself is then parsed. Rows containing
  any missing value are dropped.

  Works on a copy, so the caller's frame keeps its raw string columns and the
  function can safely be called again on the same input.

  Returns
  -------
  pd.DataFrame
      New frame with the datetime columns and NA rows removed.
  """
  castDf = runtimeDf.copy()
  # Scheduled departure time from terminal and each stop
  for column in columnsToCast:
    castDf[column] = castDatetime(castDf['serviceDate'], castDf[column])
  castDf['serviceDate'] = pd.to_datetime(castDf['serviceDate'])

  return castDf.dropna()

In [5]:
# Add cumulative distance
tripGroupByColumns = [
  'routeId',
  'serviceDate',
  'directionId',
  'tripId',
]

def addCumDistance (runtimeDf):
  """Add ``cumDistance``: running total of ``stopPathLength`` along each trip.

  Rows are ordered by ``tripGroupByColumns`` then ``toStopSequence``. Any row
  with a missing value is dropped before summing. Returns a new, re-ordered
  frame; the input frame is not modified.
  """
  sortKeys = [*tripGroupByColumns, "toStopSequence"]
  orderedDf = runtimeDf.sort_values(sortKeys)
  orderedDf = orderedDf.dropna()

  pathLengthByTrip = orderedDf.groupby(tripGroupByColumns)['stopPathLength']
  orderedDf['cumDistance'] = pathLengthByTrip.cumsum()
  return orderedDf

In [6]:
# Add scheduled and observed arrival times
def addArrivalTimes (runtimeDf):
  """Add scheduled and observed arrival timestamps for each stop.

  New columns:
  - ``scheduledToStopArrivalTime``: scheduledFromStopDepartureTime +
    scheduledRuntimeSeconds.
  - ``observedCumRuntimeSeconds``: per-trip running sum of
    observedRuntimeSeconds in toStopSequence order, cast to int32
    (fractional seconds are truncated).
  - ``observedToStopArrivalTime``: scheduledTripStartTime +
    observedCumRuntimeSeconds.

  Returns a new frame sorted by ``tripGroupByColumns`` + toStopSequence with
  NA rows dropped. The caller's frame is not modified.
  """
  # assign() returns a copy, so the caller's frame is left untouched.
  runtimeDf = runtimeDf.assign(
    scheduledToStopArrivalTime=(
      runtimeDf['scheduledFromStopDepartureTime'] +
      pd.to_timedelta(runtimeDf['scheduledRuntimeSeconds'], unit='s')
    )
  )

  runtimeDf = runtimeDf.sort_values(
    tripGroupByColumns + ['toStopSequence']
  ).dropna()

  # Cumulative observed seconds since the first stop path of the trip
  runtimeDf['observedCumRuntimeSeconds'] = runtimeDf.groupby(
    tripGroupByColumns
  )['observedRuntimeSeconds'].cumsum().astype('int32')

  # NOTE(review): observed arrivals are anchored to the *scheduled* trip start,
  # so an early/late departure from the first stop is not reflected — confirm intended.
  runtimeDf['observedToStopArrivalTime'] = (
    runtimeDf['scheduledTripStartTime'] +
    pd.to_timedelta(
      runtimeDf['observedCumRuntimeSeconds'],
      unit = 's'
    )
  )
  return runtimeDf

In [7]:
def readAndProcessData (path):
  """Read a runtime CSV and run the full cleaning pipeline on it.

  Steps, in order: load with declared dtypes and renamed columns, cast the
  date/time columns, add per-trip cumulative distance, then add scheduled and
  observed arrival times.
  """
  return (
    readRuntimeData(path=path)
    .pipe(castDatetimeColumns)
    .pipe(addCumDistance)
    .pipe(addArrivalTimes)
  )

In [8]:
# Folder holding the raw runtime CSVs. Set the OTIS_RAW_DATA_DIR environment
# variable to point at your own copy; the original author's path is the fallback.
boxPath = os.environ.get(
  "OTIS_RAW_DATA_DIR", "C:/Users/leeje/Box/Practicum_Otis_Bus/raw-data"
)
runtime = readAndProcessData(f"{boxPath}/oct_16_31_weekday_runtimes.csv")

Calculate speed, lateness, and headway

In [9]:
# Shorter name for the speed column.
runtime = runtime.rename(columns={"speed_mph": "speed"})

# Sequential surrogate key (0..n-1) for every arrival row. It stays a regular
# column, not the index; the previous-bus / previous-stop self-joins below look
# rows up by it.
numInstances = len(runtime)
runtime['instanceId'] = range(numInstances)

In [10]:
# Lateness baseline: compare each arrival with a bus that ran its whole trip
# at a uniform (average) speed.
#
# Expected speed per trip = total trip distance / total observed trip time,
# read from each trip's last-stop row and averaged across service dates.
# observed=True keeps only (routeId, directionId, tripId) combinations that
# actually occur. routeId/directionId are categorical, and without it pandas adds
# rows for unobserved category combinations (which dropna then removes) and,
# on pandas >= 2.1, warns that the default is changing.
cumDistTime = runtime.query("toStopIsTripLastStop==True").groupby(
  ["routeId", "directionId", "tripId"],
  observed=True,
)[[
  "observedCumRuntimeSeconds",
  "cumDistance"
]].mean().dropna().reset_index()

# A non-positive total runtime would give an infinite/negative speed (and an
# expected runtime of 0 downstream that dropna keeps); leave it NaN instead so
# those trips are dropped with the other missing values.
totalRuntime = cumDistTime["observedCumRuntimeSeconds"]
cumDistTime["expectedSpeed"] = cumDistTime["cumDistance"] / totalRuntime.where(totalRuntime > 0)

In [11]:
# Attach each trip's expected (uniform) speed to every arrival of that trip.
# merge() returns a fresh 0..n-1 index; rows are matched on route, direction
# and trip, and arrivals of trips missing from cumDistTime get NaN.
runtime = runtime.merge(
  cumDistTime[["routeId", "directionId", "tripId", "expectedSpeed"]],
  on=["routeId", "directionId", "tripId"],
  how="left",
)

In [12]:
# Seconds a bus moving at its trip's uniform expected speed would need to
# cover the cumulative distance up to this stop.
runtime["expectedCumRuntimeSeconds"] = runtime["cumDistance"].div(runtime["expectedSpeed"])
runtime = runtime.dropna()

# Observed minus uniform-pace runtime: positive = behind that pace, negative =
# ahead of it. astype("int") truncates to whole seconds.
runtime["late"] = (
  runtime["observedCumRuntimeSeconds"]
  .sub(runtime["expectedCumRuntimeSeconds"])
  .astype("int")
)

In [13]:
sameServiceStopCols = ["routeId", "directionId", "serviceDate", "toStopId"]

# For every arrival, record the instanceId of the bus scheduled to reach the
# same stop just before it (same route, direction and service day).
# It is NA for the first scheduled arrival at each stop.
runtime = runtime.sort_values(by=[*sameServiceStopCols, "scheduledToStopArrivalTime"])

runtime["prevInstanceId"] = (
    runtime
    .groupby(sameServiceStopCols)["instanceId"]
    .shift(periods=1)
    .astype("Int64")
)


In [14]:
sameTripCols = ["routeId", "directionId", "serviceDate", "tripId"]

# For every arrival, record the instanceId of the same trip's previous stop
# (previous row in toStopSequence order); NA for the trip's first row.
runtime = runtime.sort_values(by=[*sameTripCols, "toStopSequence"])

runtime["lagInstanceId"] = (
    runtime
    .groupby(sameTripCols)["instanceId"]
    .shift(periods=1)
    .astype("Int64")
)

In [15]:
# Keep only arrivals that have both a previous bus at the stop and a previous
# stop on the trip.
# NOTE(review): the lookup tables for the self-joins below are copied from this
# *filtered* frame. A row whose previous bus/stop is dropped here (e.g. the
# second bus of the day at a stop, or a trip's second stop) therefore finds no
# match and gets NaN prev/lag features — confirm this is intended.

runtime = runtime.dropna(how="any", subset=["prevInstanceId", "lagInstanceId"])

In [17]:
toJoinFrom = runtime.copy(deep=True)  # independent snapshot used as the lookup side of the previous-bus join

In [18]:
# Previous-bus lookup: each arrival's id, observed arrival time and scheduled
# trip start, renamed so they attach to the arrival whose prevInstanceId
# points at it.
toJoin = (
    toJoinFrom
    .loc[:, ["instanceId", "observedToStopArrivalTime", "scheduledTripStartTime"]]
    .rename(columns={
        "instanceId": "prevInstanceId",
        "observedToStopArrivalTime": "prevBusArrivalTime",
        "scheduledTripStartTime": "prevBusTripStartTime",
    })
    .dropna(subset=["prevInstanceId"])
)

runtimeDf = (
    runtime
    .merge(toJoin, on="prevInstanceId", how="left")
    .dropna(subset=["prevInstanceId"])
)

In [19]:
# Observed headway, in seconds: gap between this bus's observed arrival at the
# stop and the previous bus's observed arrival there (NaN if unknown).
runtimeDf["headway"] = (
  runtimeDf["observedToStopArrivalTime"]
  .sub(runtimeDf["prevBusArrivalTime"])
  .div(np.timedelta64(1, 's'))
)


In [20]:
# Expected headway, in seconds: gap between the two trips' *scheduled trip
# start* times (not their scheduled arrivals at this stop).
runtimeDf["expectedHeadway"] = (
  runtimeDf["scheduledTripStartTime"]
  .sub(runtimeDf["prevBusTripStartTime"])
  .div(np.timedelta64(1, 's'))
)

In [21]:
toJoinFrom = runtimeDf.dropna(subset=["instanceId"]).copy()  # snapshot including headway, for the lag join

In [25]:
# Lagged features: headway, speed and lateness at the previous stop of the
# same trip, attached through lagInstanceId.
toJoin = (
    toJoinFrom
    .loc[:, ["instanceId", "headway", "speed", "late"]]
    .rename(columns={
        "instanceId": "lagInstanceId",
        "headway": "lagHeadway",
        "speed": "lagSpeed",
        "late": "lagLate",
    })
)

runtimeDf = runtimeDf.merge(toJoin, on="lagInstanceId", how="left")

In [23]:
# Flag the stop where bunching starts: this arrival is bunched but the same
# trip's previous stop (lagBunched) was not.
# NOTE(review): neither "bunched" nor "lagBunched" is created anywhere in this
# notebook (this cell ran as In [23], before the lag join in In [25]), so on a
# fresh kernel an unguarded version raises KeyError. The flag is only computed
# when both columns exist.
# TODO: add the cells that define `bunched` and join its lagged value.
if {"bunched", "lagBunched"}.issubset(runtimeDf.columns):
    runtimeDf["initBunching"] = False
    runtimeDf.loc[
        (runtimeDf["lagBunched"] == False) & (runtimeDf["bunched"] == True), 
        "initBunching"
    ] = True
else:
    warnings.warn(
        "Skipping initBunching: 'bunched' and/or 'lagBunched' columns are missing from runtimeDf."
    )

In [29]:
# Change of each feature relative to the same trip's previous stop.
for var in ["headway", "speed", "late"]:
  runtimeDf[f"{var}LagDiff"] = runtimeDf[var].sub(runtimeDf["lag" + var.capitalize()])

In [30]:
# Replace pandas' nullable missing marker (pd.NA, e.g. in the Int64 id columns)
# with np.nan, element by element.
# NOTE(review): element-wise mapping rebuilds every column from Python objects,
# which presumably turns Int64 into float64 and category into object — verify
# if later steps depend on those dtypes.
# DataFrame.applymap was deprecated in pandas 2.1 in favour of DataFrame.map
# (same behaviour), so use map when it exists and fall back on older pandas.
runtimeDf = (runtimeDf.map if hasattr(pd.DataFrame, "map") else runtimeDf.applymap)(
  lambda x: np.nan if x is pd.NA else x
)
toJoinFrom = runtimeDf.copy().dropna(subset=["instanceId"])

In [31]:
# Previous-bus features: the current values and stop-to-stop changes of the
# bus that reached this stop just before, attached through prevInstanceId.
toJoin = (
    toJoinFrom
    .loc[:, [
        "instanceId", "headway", "speed", "late",
        "headwayLagDiff", "speedLagDiff", "lateLagDiff",
    ]]
    .rename(columns={
        "instanceId": "prevInstanceId",
        "headway": "prevBus_headway",
        "speed": "prevBus_speed",
        "late": "prevBus_late",
        "headwayLagDiff": "prevBus_headwayLagDiff",
        "speedLagDiff": "prevBus_speedLagDiff",
        "lateLagDiff": "prevBus_lateLagDiff",
    })
)

runtimeDf = runtimeDf.merge(toJoin, on="prevInstanceId", how="left")

In [32]:
# Feature names: current values, their change from the previous stop
# (...LagDiff), and the same six features for the previous bus (prevBus_...).
lagDiffPredictors = [f"{name}LagDiff" for name in ("speed", "headway", "late")]
lagPredictors = ["speed", "headway", "late", *lagDiffPredictors]
preBusPredictors = ["prevBus_" + name for name in lagPredictors]

allLagPredictors = [*lagPredictors, *preBusPredictors]

Filter

In [47]:
routes = ["21", "47", "33"]
desired_time = "2022-10-26 08:30:16"
desired_date = desired_time.split(" ")[0]  # date part of desired_time: "2022-10-26"

# Arrivals on the demo routes for the demo service date.
query = "(routeId.isin(@routes)) & (serviceDate == @desired_date)"
runtime_sel = runtimeDf.query(query).copy()

In [48]:
# First/last observed arrival of every trip that day; keep the trips whose
# window strictly contains desired_time.
trips = runtime_sel.groupby("tripId")["observedToStopArrivalTime"].agg(["min", "max"])
trips_sel = trips.loc[(trips["min"] < desired_time) & (trips["max"] > desired_time)].copy()
tripIds_sel = list(trips_sel.index)

In [49]:
# Rows of the in-service trips, plus the observed departure from each stop
# path's from-stop (observed arrival minus the observed runtime of that path).
runtime_used = runtime_sel.query("tripId in @tripIds_sel").assign(
    observedFromStopDepartureTime=lambda frame: (
        frame["observedToStopArrivalTime"]
        - pd.to_timedelta(frame["observedRuntimeSeconds"], unit="s")
    )
)

In [50]:
# For each in-service trip, keep the stop path it is travelling at desired_time.
runtime_used = runtime_used.loc[
    (runtime_used["observedFromStopDepartureTime"] < desired_time)
    & (runtime_used["observedToStopArrivalTime"] > desired_time)
].copy()

Produce realtime data

In [67]:
import geopandas as gpd
import json
stops = gpd.read_file(filename="../../db/stops-all.geojson")  # stop locations (StopId, Lon, Lat used below)

In [68]:
# ==============================================================================
# Generate fake transit view data (locations)
# ==============================================================================

# One record per in-service bus: its route, trip and the stop it is heading to,
# renamed to the field names the demo transit view expects.
realtime = runtime_used.loc[
    :, ["routeId", "directionId", "tripId", "toStopId", "toStopName", "toStopSequence"]
].rename(
    columns={
        "routeId": "route_id",
        "tripId": "trip",
        "toStopId": "next_stop_id",
        "toStopName": "next_stop_name",
        "toStopSequence": "next_stop_sequence",
    }
)

In [69]:
direction_dict = {
    21: {0: "Eastbound", 1: "Westbound"},
    33: {0: "Southbound", 1: "Northbound"},
    47: {0: "Southbound", 1: "Northbound"},
}
# Human-readable direction label per (route, directionId). A list
# comprehension over the two columns replaces row-wise apply(axis=1). It is
# faster, and when `realtime` is empty (no bus in service at desired_time) it
# yields an empty column. apply would instead return an empty DataFrame, which
# cannot be assigned to one column.
realtime["Direction"] = [
    direction_dict[int(routeValue)][int(directionValue)]
    for routeValue, directionValue in zip(realtime["route_id"], realtime["directionId"])
]
realtime = realtime.drop("directionId", axis=1)


In [70]:
# NOTE(review): `stops` and `realtime` are overwritten below, so re-running this
# cell alone fails (StopId no longer exists) or re-merges coordinates; re-run
# from the cells that build them.
stops.StopId = stops.StopId.astype(str)

stops = (
    stops[["StopId", "Lon", "Lat"]]
    .rename(columns={"StopId": "next_stop_id", "Lon": "lng", "Lat": "lat"})
    .drop_duplicates(subset=["next_stop_id"])
)

realtime = realtime.merge(stops, how="left", on="next_stop_id")

for route in routes:
    subset = realtime.query("route_id == @route").copy()
    # Stops missing from the GeoJSON leave NaN lng/lat after the left merge.
    # json.dump would write those as bare NaN tokens, which are not valid JSON
    # (browsers' JSON.parse rejects them), so emit null for missing values.
    records = [
        {key: (None if pd.isna(value) else value) for key, value in record.items()}
        for record in subset.to_dict(orient="records")
    ]
    json_data = {"bus": records}

    with open(f"../../db/demo-transit-view/{route}.json", "w") as f:
        json.dump(json_data, f)


Produce predictions

In [71]:
from joblib import load

In [81]:
runtime_used["period"] = runtime_used["scheduledTripStartTime"].dt.hour  # hour of scheduled trip start (listed in catPredictors)

In [78]:
# Base numeric and categorical feature groups (not referenced again in this notebook).
numBasePredictors = "toStopPathIndex".split()
catPredictors = "directionId period".split()

In [84]:
model = load(filename="../serialized-models/21-11.joblib")  # NOTE(review): the prediction loop reloads `model`, so this looks like a leftover; joblib.load unpickles — trusted files only

In [88]:
# Predictions for 11..20 steps ahead, one serialized model per (route, steps).
# A row is flagged when the positive-class probability exceeds the cut-off.
# NOTE(review): 0.012 is a hard-coded threshold, presumably tuned offline — confirm.
# NOTE(review): joblib.load unpickles the file; only load trusted model files.
predictionSubsets = {}
for route in ["47", "21", "33"]:
  routeSubset = runtime_used.query("routeId == @route").copy()
  for steps in range(11, 21):
    if routeSubset.empty:
      # No bus of this route is in service at desired_time. scikit-learn style
      # predict_proba rejects zero-row input, so add an empty pred_* column
      # instead; every route frame then has the same columns for the concat.
      routeSubset[f"pred_{steps}"] = np.zeros(0, dtype=bool)
      continue
    model = load(f"../serialized-models/{route}-{steps}.joblib")
    probs = model.predict_proba(routeSubset)[:, 1]
    routeSubset[f"pred_{steps}"] = probs > 0.012
  predictionSubsets[route] = routeSubset

subset_47 = predictionSubsets["47"]
subset_21 = predictionSubsets["21"]
subset_33 = predictionSubsets["33"]
  

In [90]:
# Stack the per-route prediction frames into one table with a fresh 0..n-1 index.
prediction = pd.concat([subset_21, subset_33, subset_47], axis=0, ignore_index=True)

In [93]:
# One JSON file per in-service row, named route-direction-trip, holding the
# 10 boolean predictions: {"prediction": [pred_11, ..., pred_20]}.
for rowIndex, data in prediction.iterrows():
  route, direction, trip = data["routeId"], data["directionId"], data["tripId"]
  predictions = [data[f"pred_{steps}"] for steps in range(11, 21)]
  json_data = {"prediction": predictions}
  with open(f"../../db/demo-prediction-forward/{route}-{direction}-{trip}.json", "w") as f:
    json.dump(json_data, f)