# Data Cleaning

BTRY 4100 -- Final Project

Andrew Chung, hc893

## Station Ridership Data

In [None]:
import numpy as np
import pandas as pd
import re
from functools import reduce
from datetime import datetime

# data.ny.gov dataset id for hourly ridership (station-level). export_from()
# splices it into https://data.ny.gov/api/views/<id>/rows.csv?accessType=DOWNLOAD
hourly_ridership_url: str = "5wq4-mkjj"

def export_from(url):
  """Download one data.ny.gov dataset as a DataFrame.

  `url` is the dataset id (e.g. "5wq4-mkjj"), not a full URL; it is placed
  into the CSV export endpoint, and the response is parsed by pd.read_csv.
  """
  csv_endpoint = f"https://data.ny.gov/api/views/{url}/rows.csv?accessType=DOWNLOAD"
  return pd.read_csv(csv_endpoint)

def impute(data,
          gc_shuttle = 'S 42nd',
          fk_shuttle = 'S Fkln',
          jz = 'J'):
  """Return a copy of `data` with normalized labels in its 'line' column.

  The labels are rewritten in this order, each on the result of the previous
  step: `gc_shuttle` -> 'SG', `fk_shuttle` -> 'SF', `jz` -> 'JZ' (J/Z merged).
  Other labels are left as they are, and `data` itself is not modified.
  """
  relabeled = data.copy()
  for source_label, target_label in ((gc_shuttle, 'SG'), (fk_shuttle, 'SF'), (jz, 'JZ')):
    relabeled.loc[relabeled['line'] == source_label, 'line'] = target_label
  return relabeled

def extract_lines(text):
  """Return the comma-joined line designations found in parentheses in `text`.

  Example: "Times Sq-42 St (N,Q,R,W,S,1,2,3,7)/42 St (A,C,E)" gives
  "N,Q,R,W,S,1,2,3,7,A,C,E". Tokens longer than two characters are dropped.
  Returns "" when `text` contains no parenthesised group; the previous
  np.concatenate-based version raised ValueError in that case.
  """
  tokens = [
    token
    for group in re.findall(r'\((.*?)\)', text)
    for token in group.split(',')
    # sometimes, station names in parentheses get thrown in the mix;
    # keep only short tokens (line designations)
    if len(token) <= 2
  ]
  return ','.join(tokens)

def main():
  """Build the per-station monthly peak-ridership table for January-February.

  Downloads the hourly ridership dataset (``hourly_ridership_url``) and keeps
  subway rows whose timestamps fall in months 1-2. It further keeps only the
  peak windows (06:30-09:30 and 15:30-20:00, both inclusive) on weekdays,
  excluding three listed holidays. Ridership is summed per station complex
  and month, and the served lines are parsed from the station complex name.
  The result is written to ``MTA_Subway_Ridership_Summarized_Apr21.csv``.
  """
  print("Loading data files from data.ny.gov...")
  print(datetime.now())
  # read in the data: hourly ridership, subway rows only
  hourly_ridership_data = export_from(hourly_ridership_url).query(
    "transit_mode == \'subway\'"
  ).reset_index(drop = True)
  hourly_ridership_data['transit_timestamp'] = pd.to_datetime(hourly_ridership_data['transit_timestamp'])

  print("Hourly Ridership data successfully loaded.")
  print(datetime.now())

  print("Processing hourly ridership data:")

  # define peak time blocks (Series.between includes both endpoints)
  start_time_am = pd.to_datetime('06:30:00').time()
  end_time_am = pd.to_datetime('09:30:00').time()
  start_time_pm = pd.to_datetime('15:30:00').time()
  end_time_pm = pd.to_datetime('20:00:00').time()

  # first, filter by month (Jan-Feb)
  # NOTE(review): there is no year filter, so if the download spans several
  # years their Jan/Feb rows are pooled; the holiday list below is 2025-only.
  hourly_ridership_data = hourly_ridership_data[
    hourly_ridership_data['transit_timestamp'].dt.month < 3
  ]
  # keep peak-hour rows, then collapse to one row per (timestamp, station complex)
  clock_time = hourly_ridership_data['transit_timestamp'].dt.time
  is_peak = (
    clock_time.between(start_time_am, end_time_am) |
    clock_time.between(start_time_pm, end_time_pm)
  )
  hourly_ridership_data = (
    hourly_ridership_data[is_peak]
    .sort_values(by = 'transit_timestamp')
    .reset_index(drop = True)
    .groupby(['transit_timestamp', 'station_complex'], as_index = False)
    .agg({
      'borough': lambda x: x.mode()[0], # stations do not span different boroughs
      'ridership': 'sum'
    })
  )

  # remove weekends (weekday: Monday=0 ... Sunday=6; 5,6 are Sat/Sun)
  hourly_ridership_data = hourly_ridership_data[hourly_ridership_data['transit_timestamp'].dt.weekday < 5]
  # remove holidays. Compare midnight-normalized timestamps against Timestamps:
  # `.dt.date` yields datetime.date objects, which never match the Timestamps
  # produced by pd.to_datetime inside isin(), so that form removed nothing.
  holidays = pd.to_datetime(['2025-01-01', '2025-01-20', '2025-02-17'])
  hourly_ridership_data = hourly_ridership_data[
    ~hourly_ridership_data['transit_timestamp'].dt.normalize().isin(holidays)
  ].copy() # independent frame, so adding 'month' below does not write into a slice

  # group by station complex, aggregate hourly ridership into monthly figures
  hourly_ridership_data['month'] = hourly_ridership_data['transit_timestamp'].dt.month
  monthly_ridership_data = hourly_ridership_data.groupby(['station_complex', 'month'], as_index = False).agg({
    'borough': lambda x: x.mode()[0],
    'ridership': 'sum'
  })
  # parse the served lines out of the station complex name
  monthly_ridership_data['lines'] = monthly_ridership_data['station_complex'].apply(extract_lines)
  monthly_ridership_data.to_csv("MTA_Subway_Ridership_Summarized_Apr21.csv", index = False)

  print("Hourly Ridership Dataset successfully written into .csv.")

# Entry point: run the ridership pipeline when executed as the top-level module.
if __name__ == "__main__":
  main()

## Line Performance Data

In [None]:
import numpy as np
import pandas as pd
import re
from functools import reduce
from datetime import datetime

# Subway line-level datasets on data.ny.gov, identified by dataset id.
# export_from() splices each id into
# https://data.ny.gov/api/views/<id>/rows.csv?accessType=DOWNLOAD
on_time_url: str = "ks33-g5ze" # NOTE(review): not referenced by main() in this cell
trains_delayed_url: str = "9zbp-wz3y"
service_delivered_url: str = "nmu4-7tz9"
late_arrivals_url: str = "x7nj-r656"
major_incidents_url: str = "uqnw-2qfk"
customer_journey_url: str = "s4u6-t435"
wait_assessment_url: str = "62c4-mvcx"
# NOTE(review): same dataset id as on_time_url above; confirm this is the
# intended terminal on-time performance dataset.
terminal_ontime_url: str = "ks33-g5ze"

def export_from(url):
  """Fetch a data.ny.gov dataset, given its id, and parse it with pd.read_csv.

  The id (e.g. "9zbp-wz3y") is inserted between the API base path and the
  CSV download suffix.
  """
  return pd.read_csv(
    f"https://data.ny.gov/api/views/{url}/rows.csv?accessType=DOWNLOAD"
  )

def impute(data,
          gc_shuttle = 'S 42nd',
          fk_shuttle = 'S Fkln',
          jz = 'J'):
  """Copy `data` and normalize shuttle / J-Z labels in its 'line' column.

  Rewrites happen sequentially: `gc_shuttle` becomes 'SG', then `fk_shuttle`
  becomes 'SF', then `jz` becomes 'JZ'. The input frame is untouched.
  """
  out = data.copy()
  renames = [(gc_shuttle, 'SG'), (fk_shuttle, 'SF'), (jz, 'JZ')]
  for old_label, new_label in renames:
    hits = out['line'] == old_label
    out.loc[hits, 'line'] = new_label
  return out

def extract_lines(text):
  """Return the comma-joined line designations found in parentheses in `text`.

  Example: "Times Sq-42 St (N,Q,R,W,S,1,2,3,7)/42 St (A,C,E)" gives
  "N,Q,R,W,S,1,2,3,7,A,C,E". Tokens longer than two characters are dropped.
  Returns "" when `text` contains no parenthesised group; the previous
  np.concatenate-based version raised ValueError in that case.
  """
  tokens = [
    token
    for group in re.findall(r'\((.*?)\)', text)
    for token in group.split(',')
    # sometimes, station names in parentheses get thrown in the mix;
    # keep only short tokens (line designations)
    if len(token) <= 2
  ]
  return ','.join(tokens)

def _pivot_by_class(data, category_col, value_col, class_map, class_columns):
  """Total `value_col` per (line, month), split into one column per class.

  `class_map` maps each raw `category_col` label to a class name, and
  `class_columns` maps each class name to its output column name. Every class
  in `class_columns` gets a column even when no row falls into it. Missing
  (line, month, class) combinations become 0, and the count columns are cast
  to Int64. Labels absent from `class_map` are excluded, with a printed warning.
  """
  df = data.copy()
  df['class'] = df[category_col].map(class_map)
  # .map() yields NaN for unknown labels and groupby drops NaN keys; make that visible
  unmapped = df.loc[df['class'].isna(), category_col].dropna().unique()
  if len(unmapped) > 0:
    print("Warning: ignoring unclassified {} values: {}".format(category_col, list(unmapped)))
  pivot = df.groupby(['line', 'month', 'class']).agg({
    value_col: 'sum'
  }).reset_index().pivot_table(
    index = ['line', 'month'],
    columns = 'class',
    values = value_col,
    aggfunc = 'sum'
  ).reindex(columns = list(class_columns)).reset_index().rename(columns = class_columns).fillna(0)
  for column in class_columns.values():
    pivot[column] = pivot[column].astype('Int64')
  return pivot

def main():
  """Build the per-(line, month) performance table and write it to CSV.

  Downloads these datasets:
    - customer journey metrics (peak period)
    - wait assessment (peak period)
    - service delivered
    - terminal on-time performance
    - 4-5 minute late arrivals (Jan/Feb 2025 only)
    - trains delayed
    - major incidents

  All except customer journey are filtered to day_type == 1 ("Day Type" in
  the late-arrivals file). Line labels are normalized with impute(), each
  metric is aggregated per (line, month), and everything is left-joined onto
  the customer journey table. Incident and delay counts are split into
  infrastructure / non-infrastructure columns. The result is written to
  ``MTA_Subway_Line_Data_2025_Apr21.csv``, index included.
  """
  print("Loading data files from data.ny.gov...")

  # subway line performance data
  ## customer journey metrics
  customer_journey_data = export_from(customer_journey_url).query(
    "period == \"peak\" & line not in ['W', 'S Rock']"
  ).reset_index(drop = True).drop(columns = ['division', 'period'])
  customer_journey_data = impute(customer_journey_data).groupby(['line', 'month']).agg({
    'num_passengers': 'sum', # total over rows sharing a line and month
    'additional platform time': 'mean',
    'additional train time': 'mean',
    'over_five_mins_perc': 'mean'
  }).reset_index().rename(columns = {
    'additional platform time': 'additional_platform_time',
    'additional train time': 'additional_train_time'
  })

  ## wait assessment
  wait_assessment_data = export_from(wait_assessment_url).query(
    "day_type == 1 and period == \"peak\" & line not in ['H', 'W', 'S Rock']"
  ).reset_index(drop = True).drop(columns = ['division', 'day_type', 'period'])
  wait_assessment_data = impute(wait_assessment_data, gc_shuttle = 'GS', fk_shuttle = 'FS').groupby(['line', 'month']).agg({
    'wait assessment': 'mean'
  }).reset_index().rename(columns = {'wait assessment': 'wait_assessment'})

  ## service delivered
  service_delivered_data = export_from(service_delivered_url).query(
    "day_type == 1 & line not in ['H', 'W', 'S Rock']"
  ).reset_index(drop = True).drop(columns = ['division', 'day_type'])
  service_delivered_data = impute(service_delivered_data, gc_shuttle = 'GS', fk_shuttle = 'FS').groupby(['line', 'month']).agg({
    'service delivered': 'mean'
  }).reset_index().rename(columns = {'service delivered': 'service_delivered'})

  ## terminal on-time performance
  # NOTE(review): terminal_ontime_url has the same dataset id as on_time_url,
  # and impute() here uses its default shuttle labels ('S 42nd'/'S Fkln')
  # while the two datasets above pass 'GS'/'FS'; confirm both.
  terminal_ontime_data = export_from(terminal_ontime_url).query(
    "day_type == 1 & line not in ['W', 'S Rock']"
  ).reset_index(drop = True).drop(columns = ['division', 'day_type'])
  terminal_ontime_data = impute(terminal_ontime_data).groupby(['line', 'month']).agg({
    'terminal_on_time_performance': 'mean'
  }).reset_index()

  ## 4-5 minute late arrivals (this file uses capitalized column names)
  late_arrivals_data = export_from(late_arrivals_url).query(
    "`Day Type` == 1 & Line not in ['SI', 'W', 'S Rock']"
  ).reset_index(drop = True).drop(columns = ['Division', 'Day Type']).rename(columns = {
    'Month': 'month',
    'Line': 'line',
    'Percent Late': 'percent_late'
  })
  # keep January and February 2025 only
  late_arrivals_data = impute(
    late_arrivals_data[late_arrivals_data['month'].isin(['2025-01-01', '2025-02-01'])].reset_index(drop = True)
  )
  late_arrivals_data.loc[late_arrivals_data['line'] == 'NW', 'line'] = 'N' # NW -> N
  late_arrivals_data = late_arrivals_data.groupby(['line', 'month']).agg({
    'percent_late': 'mean'
  }).reset_index()

  ## trains delayed
  trains_delayed_data = export_from(trains_delayed_url).query(
    "day_type == 1 & line not in ['W', 'S Rock']"
  ).reset_index(drop = True).drop(columns = ['division', 'day_type'])
  trains_delayed_data = impute(trains_delayed_data, gc_shuttle = 'GS')

  ## major incidents (rows with any missing field are dropped)
  # NOTE(review): unlike the other datasets this is not passed through impute();
  # confirm its line labels (e.g. 'JZ') already match the merged table.
  major_incidents_data = export_from(major_incidents_url).query(
    "day_type == 1"
  ).dropna().reset_index(drop = True)

  print("All data files loaded.")
  print("Merging First 5 datasets...")

  # aggregate line-specific data
  datasets = [
    customer_journey_data,
    wait_assessment_data,
    service_delivered_data,
    terminal_ontime_data,
    late_arrivals_data
  ]
  # every dataset must carry both join keys
  for position, dataset in enumerate(datasets):
    missing = {'line', 'month'} - set(dataset.columns)
    if missing:
      raise ValueError("dataset {} is missing join column(s) {}".format(position, sorted(missing)))

  # Join Datasets: left joins onto customer journey, whose (line, month) rows define the output
  line_data = reduce(lambda left, right: pd.merge(left, right, on = ['line', 'month'], how = 'left'), datasets)

  print("Merging major incidents and train delay data...")

  # major delays and trains delayed data
  # major incidents: I will group the incidents into 2 types
  ## 1. Infrastructural -- signal malfunction, subway car, track, stations and structural
  ## 2. Personal/civil: Persons on trackbed/police/medical, other
  incident_data = _pivot_by_class(
    major_incidents_data, 'category', 'count',
    {
      'Signals': 'Infrastructure',
      'Subway Car': 'Infrastructure',
      'Track': 'Infrastructure',
      'Stations and Structure': 'Infrastructure',
      'Persons on Trackbed/Police/Medical': 'Non-Infrastructure',
      'Other': 'Non-Infrastructure'
    },
    {'Infrastructure': 'infra_critical', 'Non-Infrastructure': 'noninfra_critical'}
  )

  # Delays: in similar fashion, except the reports are already categorized.
  # These events have not spurred major incidents but have nonetheless slowed service.
  ## 1. Infrastructural: Crew Availability, Infra/Equipment, Operating Conditions, Planned ROW work
  ## 2. Non-Infrastructural: Police & Medical, External Factors
  delay_data = _pivot_by_class(
    trains_delayed_data, 'reporting_category', 'delays',
    {
      'Crew Availability': 'Infrastructure',
      'Infrastructure & Equipment': 'Infrastructure',
      'Operating Conditions': 'Infrastructure',
      'Planned ROW Work': 'Infrastructure',
      'External Factors': 'Non-Infrastructure',
      'Police & Medical': 'Non-Infrastructure'
    },
    {'Infrastructure': 'infra_noncritical', 'Non-Infrastructure': 'noninfra_noncritical'}
  )

  count_columns = ['infra_critical', 'noninfra_critical', 'infra_noncritical', 'noninfra_noncritical']
  line_data = line_data.merge(
    incident_data, on = ['line', 'month'], how = 'left'
  ).merge(
    delay_data, on = ['line', 'month'], how = 'left'
  )
  # (line, month) pairs with no reported incidents/delays count as 0 (note there is
  # no existing data for major incidents in shuttle services). Only the count columns
  # are filled, so a missing performance metric stays missing instead of a fake 0.
  line_data[count_columns] = line_data[count_columns].fillna(0)

  line_data.to_csv("MTA_Subway_Line_Data_2025_Apr21.csv")

  print("Line Dataset successfully written into .csv.")

# Entry point: build the line-performance table when executed as the top-level module.
if __name__ == "__main__":
  main()

## Data Integration

In [None]:
import numpy as np
import pandas as pd

def impute_shuttles(data):
  """Drop Rockaway-shuttle rows and tell the two remaining shuttles apart.

  The ridership data marks every shuttle as a bare 'S', so the shuttle is
  inferred from the exact line string of the station it serves (domain
  knowledge):

  - SG (42nd St Shuttle): Times Square "N,Q,R,W,S,1,2,3,7,A,C,E" and
    Grand Central "S,4,5,6,7".
  - SF (Franklin Av Shuttle): Botanic Garden "2,3,4,5,S", Franklin Av "C,S",
    Prospect Park "B,Q,S" and Park Place "S".

  Rows whose 'lines' is exactly 'A,S' (Rockaway shuttle stations) are
  removed; any other 'lines' value passes through unchanged. Returns a new
  DataFrame.
  """
  shuttle_for_station = {
    "N,Q,R,W,S,1,2,3,7,A,C,E": 'SG',
    "S,4,5,6,7": 'SG',
    "2,3,4,5,S": 'SF',
    "C,S": 'SF',
    "B,Q,S": 'SF',
    "S": 'SF',
  }

  def relabel(value):
    tokens = [token.strip() for token in value.split(',')]
    shuttle = shuttle_for_station.get(value)
    if shuttle is None:
      return value
    return ','.join(shuttle if token == 'S' else token for token in tokens)

  kept = data[~data['lines'].isin(['A,S'])].copy()
  kept['lines'] = kept['lines'].apply(relabel)
  return kept

def main():
  """Combine station ridership with line-performance metrics.

  Reads ``MTA_Subway_Ridership_Summarized_Apr21.csv`` and
  ``MTA_Subway_Line_Data_2025_Apr21.csv``, the files written by the earlier
  cells. Ridership is averaged per (lines, month) and shuttles are relabeled
  with impute_shuttles. Each row's line list is normalized (W -> N, J/Z ->
  JZ) and written back to the 'lines' column. Then, for every performance
  metric, the function writes:

  - ``Unweighted_Data_Apr22.csv``: the plain sum of the metric over the
    row's lines for that month.
  - ``Weighted_Data_Apr22.csv``: the sum of metric * w * k / sum(w). Here w
    is each matched line's share of all ``num_passengers`` and k is the
    number of matched lines, so a row's weights sum to k.
  """
  print("Processing Monthly Ridership data...")
  # aggregate by line complexes (mean over stations sharing a line set),
  # then fix shuttle designations
  ridership_data = pd.read_csv(
    "MTA_Subway_Ridership_Summarized_Apr21.csv"
  ).groupby(['lines', 'month'], as_index = False).agg({
    'ridership': 'mean'
  })
  ridership_data = impute_shuttles(ridership_data)

  print("Processing line performance data...")

  # import line performance data; the first CSV column is the index written by to_csv
  line_data = pd.read_csv("MTA_Subway_Line_Data_2025_Apr21.csv").iloc[:, 1:]
  # NOTE(review): only the calendar month is kept, so rows from different
  # years would collide; confirm the source covers a single year.
  line_data['month'] = pd.to_datetime(line_data['month']).dt.month
  # metrics = every column except the join keys and the weighting base
  metrics = line_data.columns.drop(['line', 'month', 'num_passengers']).to_numpy()
  # compute weights by gross ridership
  line_data['line_weight'] = line_data['num_passengers']/line_data['num_passengers'].sum()

  # "composite_data_unweighted": metrics summed with no ridership weighting
  composite_data_unweighted = ridership_data.assign(**{col: None for col in metrics})

  # "composite_data_weighted": metrics weighted by relative line ridership
  # NOTE(review): the 'line_weight' column is created but never filled below.
  composite_data_weighted = ridership_data.assign(**{col: None for col in np.append(metrics, ['line_weight'])})

  def impute_lines(lines):
    """Map a station's line labels onto the labels used in line_data.

    - W becomes N when N is absent; W is dropped when N is already listed.
    - J and Z together become a single trailing 'JZ'; J alone becomes 'JZ'.

    Returns a NumPy array of the adjusted labels.
    """
    arr = np.asarray(lines).tolist()
    # Case 1: N/W
    if 'W' in arr:
      if 'N' in arr:
        arr.remove('W')
      else:
        # element-wise replace; `arr[arr == 'W']` on a list indexes with a bool and hits arr[0]
        arr = ['N' if x == 'W' else x for x in arr]

    # Case 2: J/Z
    if 'J' in arr:
      if 'Z' in arr:
        arr.remove('J')
        arr.remove('Z')
        arr.append('JZ')
      else:
        arr = ['JZ' if x == 'J' else x for x in arr]

    return np.array(arr)

  print("Computing unweighted and weighted composite datasets...")

  # both composites share ridership_data's index, so iterate the source rows
  for idx, row in ridership_data.iterrows():
    lines_arr = impute_lines(np.array(row['lines'].split(',')))
    # fix line formatting in the outputs (assigning to `row` would only change a copy)
    fixed_lines = ','.join(lines_arr)
    composite_data_unweighted.loc[idx, 'lines'] = fixed_lines
    composite_data_weighted.loc[idx, 'lines'] = fixed_lines

    subframe = line_data[
      (line_data['month'] == row['month']) & (line_data['line'].isin(lines_arr))
    ]
    # rescale so this row's weights sum to the number of matched lines
    scaled_weights = subframe['line_weight'] * len(subframe)/subframe['line_weight'].sum()

    for metric in metrics:
      composite_data_unweighted.loc[idx, metric] = subframe[metric].sum()
      composite_data_weighted.loc[idx, metric] = (subframe[metric] * scaled_weights).sum()

  # save to .csv
  composite_data_unweighted.to_csv("Unweighted_Data_Apr22.csv")
  composite_data_weighted.to_csv("Weighted_Data_Apr22.csv")

  print("Successfully saved to .csv.")

# Entry point: build the composite datasets when executed as the top-level module.
if __name__ == "__main__":
  main()