<a href="https://colab.research.google.com/github/CUSPADS2022IBX/IBXRidership/blob/main/Turnstile%20Data%20Processing/TOD_Turnstile_Data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Data Source: http://web.mta.info/developers/turnstile.html

Example:

The data below shows the entry/exit register values for one turnstile at control area (A002) from 09/27/14 at 00:00 hours to 09/29/14 at 00:00 hours

Schema Example:
C/A,UNIT,SCP,STATION,LINENAME,DIVISION,DATE,TIME,DESC,ENTRIES,EXITS
A002,R051,02-00-00,LEXINGTON AVE,456NQR,BMT,09-27-14,00:00:00,REGULAR,0004800073,0001629137,

Data cleaning and processing resources used:

1)https://medium.com/qri-io/taming-the-mtas-unruly-turnstile-data-c945f5f96ba0

2)https://toddwschneider.com/dashboards/nyc-subway-turnstiles/#notes


In [1]:
!pip install pyspark
!pip install --upgrade xlrd

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 22 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 51.8 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=aa9d6d7fe79ce4c2c1b289ad8b4c64614d07946cae47f059b3dc1d4f5380db6e
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1
Collecting xlrd
  Downloading xlrd-2.0.1-py2.py3-none-any.whl (96 kB)
[K     |████████████████████████████████| 96 kB 2.8 MB/s 
[?25hInstalling collected packages: xlrd
  Attempting uninstall: xlrd
    Found existing

In [17]:
import pandas as pd
import numpy as np
from datetime import datetime, date, timedelta
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.window import Window
from pyspark import SparkFiles

sc = pyspark.SparkContext.getOrCreate()
spark = SparkSession(sc)

#The start date has to match the date of the first URL in the MTA turnstile data, otherwise the URL pull will not work. Data is reported every Saturday.
#NOTE: On November 20, 2021 MTA changed their normal turnstile count periods from [12AM, 4AM, 8AM, 12PM, 4PM, 8PM] to [3AM, 7AM, 11AM, 3PM, 7PM, 11PM]
start_date = date(2022,3, 5)
end_date = date(2022, 4, 23)

#Create a list of dates for the date range requested
date_range = list(pd.date_range(start_date, end_date, freq='7D').strftime("%y%m%d"))

#MTA tunrstile schema, 'EXITS' kepts giving nulls when imported as IntegerType
mta_turnstile_schema = T.StructType([
  T.StructField('C/A', T.StringType(), True),
  T.StructField('UNIT', T.StringType(), True),
  T.StructField('SCP', T.StringType(), True),
  T.StructField('STATION', T.StringType(), True),
  T.StructField('LINENAME', T.StringType(), True),
  T.StructField('DIVISION', T.StringType(), True),
  T.StructField('DATE', T.StringType(), True),
  T.StructField('TIME', T.StringType(), True),
  T.StructField('DESC', T.StringType(), True),
  T.StructField('ENTRIES', T.IntegerType(), True),
  T.StructField('EXITS', T.FloatType(), True),
  ])

#Create empty dataframe with previous scheme
bigdf = spark.createDataFrame([], mta_turnstile_schema)

#Download each .txt file on to Spark job node and load into Spark DataFrame and union onto Empty DataFrame we created
for date_string in date_range:
  url = 'http://web.mta.info/developers/data/nyct/turnstile/turnstile_{}.txt'.format(date_string)
  spark.sparkContext.addFile(url)
  df = spark.read.csv(SparkFiles.get('turnstile_{}.txt'.format(date_string)), mta_turnstile_schema, header=True)
  bigdf = bigdf.union(df)

#Change 'EXITS' column data type to IntegerType and concate 'DATE' and 'TIME' columns and cast to datetime
bigdf = bigdf.withColumn('EXITS',bigdf.EXITS.cast(T.IntegerType()))\
             .withColumn('timestamp',
                         F.unix_timestamp(F.concat(bigdf.DATE,bigdf.TIME),'MM/dd/yyyyHH:mm:ss').cast('timestamp'))
             
#Create columns to represent unique observation id, and unique turnstile id for data processing             
bigdf = bigdf.withColumn('unit_division', F.concat(bigdf.UNIT,bigdf.DIVISION))\
             .withColumn('unit_id', F.concat(bigdf['C/A'],bigdf.UNIT,bigdf.SCP))

In [18]:
bigdf.show(5)

+----+----+--------+-------+--------+--------+----------+--------+-------+-------+-------+-------------------+-------------+----------------+
| C/A|UNIT|     SCP|STATION|LINENAME|DIVISION|      DATE|    TIME|   DESC|ENTRIES|  EXITS|          timestamp|unit_division|         unit_id|
+----+----+--------+-------+--------+--------+----------+--------+-------+-------+-------+-------------------+-------------+----------------+
|A002|R051|02-00-00|  59 ST| NQR456W|     BMT|02/26/2022|03:00:00|REGULAR|7689737|2671257|2022-02-26 03:00:00|      R051BMT|A002R05102-00-00|
|A002|R051|02-00-00|  59 ST| NQR456W|     BMT|02/26/2022|07:00:00|REGULAR|7689741|2671278|2022-02-26 07:00:00|      R051BMT|A002R05102-00-00|
|A002|R051|02-00-00|  59 ST| NQR456W|     BMT|02/26/2022|11:00:00|REGULAR|7689758|2671380|2022-02-26 11:00:00|      R051BMT|A002R05102-00-00|
|A002|R051|02-00-00|  59 ST| NQR456W|     BMT|02/26/2022|15:00:00|REGULAR|7689791|2671465|2022-02-26 15:00:00|      R051BMT|A002R05102-00-00|
|A002|

In [19]:
print(bigdf.count())
bigdf.describe(['ENTRIES', 'EXITS']).show()

1684150
+-------+--------------------+--------------------+
|summary|             ENTRIES|               EXITS|
+-------+--------------------+--------------------+
|  count|             1684150|             1684150|
|   mean|  4.27020266821542E7|3.4312966725517325E7|
| stddev|2.2301099300602618E8|  1.96194359027336E8|
|    min|                   0|                   0|
|    max|          2147319334|          2122443392|
+-------+--------------------+--------------------+



In [20]:
#Use utility function window to partition by turnstile and order by timestamp
window = Window.partitionBy('unit_id').orderBy('timestamp')

#Use previous window to find the 'net_entries' and 'net_exits'. Remove all entries that are above 10000, because
#turnstiles act as odometers, and when turnstile reaches end it resets creating a large value. 10000 is a good cutoff.
#Also drop first rows of each turnstile data, because .lag function creates None for first row.
net_entry_exit = bigdf.withColumn('net_entries', F.abs(F.col('ENTRIES') - F.lag(F.col('ENTRIES'), 1).over(window)))\
                      .withColumn('net_exits', F.abs(F.col('EXITS') - F.lag(F.col('EXITS'), 1).over(window))).dropna()

In [21]:
print(net_entry_exit.count())
net_entry_exit.describe(['net_entries', 'net_exits']).show()

1679109
+-------+------------------+------------------+
|summary|       net_entries|         net_exits|
+-------+------------------+------------------+
|  count|           1679109|           1679109|
|   mean|3636.3177625752705| 3958.484421201959|
| stddev|1866207.4947073061|2090951.0729941803|
|    min|                 0|                 0|
|    max|        1278711326|        1871701216|
+-------+------------------+------------------+



In [22]:
net_entry_exit = net_entry_exit.filter((F.col('net_entries')<10000) & (F.col('net_exits')<10000))

In [23]:
print(net_entry_exit.count())
net_entry_exit.describe(['net_entries', 'net_exits']).show()

1678901
+-------+-----------------+------------------+
|summary|      net_entries|         net_exits|
+-------+-----------------+------------------+
|  count|          1678901|           1678901|
|   mean|70.30216433250085| 89.40866793217707|
| stddev|  104.14214187233|142.35420633867332|
|    min|                0|                 0|
|    max|             9834|              8934|
+-------+-----------------+------------------+



In [24]:
net_entry_exit.show(5)

+----+----+--------+-------+--------+--------+----------+--------+-------+-------+-------+-------------------+-------------+----------------+-----------+---------+
| C/A|UNIT|     SCP|STATION|LINENAME|DIVISION|      DATE|    TIME|   DESC|ENTRIES|  EXITS|          timestamp|unit_division|         unit_id|net_entries|net_exits|
+----+----+--------+-------+--------+--------+----------+--------+-------+-------+-------+-------------------+-------------+----------------+-----------+---------+
|A002|R051|02-00-01|  59 ST| NQR456W|     BMT|02/26/2022|07:00:00|REGULAR|6803071|1576712|2022-02-26 07:00:00|      R051BMT|A002R05102-00-01|          2|       10|
|A002|R051|02-00-01|  59 ST| NQR456W|     BMT|02/26/2022|11:00:00|REGULAR|6803085|1576759|2022-02-26 11:00:00|      R051BMT|A002R05102-00-01|         14|       47|
|A002|R051|02-00-01|  59 ST| NQR456W|     BMT|02/26/2022|15:00:00|REGULAR|6803131|1576797|2022-02-26 15:00:00|      R051BMT|A002R05102-00-01|         46|       38|
|A002|R051|02-00

In [25]:
#Upload Remote_complex_lookup table and create key table for unit_division join
#Manually checked if complex_id was correct (google sheets for reference: https://docs.google.com/spreadsheets/d/1kMmoqzq3uWM5J8Esrzi1DPEBrdezzVtQ1Rv5ZAIsEfk/edit?usp=sharing)
remote_complex_url = 'https://raw.githubusercontent.com/qri-io/data-stories-scripts/master/nyc-turnstile-counts/lookup/remote_complex_lookup.csv'
remote_complex = pd.read_csv(remote_complex_url).sort_values('station')
remote_complex['complex_id'] = remote_complex['complex_id'].astype('Int64').astype('str')
remote_complex['unit_division'] = remote_complex['remote ']+remote_complex['division']
remote_complex = pd.DataFrame(remote_complex.groupby(['unit_division', 'complex_id']).size()).reset_index()
remote_complex_spark = spark.createDataFrame(remote_complex)

print(remote_complex.count())
remote_complex_spark.show(5)

unit_division    506
complex_id       506
0                506
dtype: int64
+-------------+----------+---+
|unit_division|complex_id|  0|
+-------------+----------+---+
|      R001BMT|       635|  2|
|      R001IRT|       635|  1|
|      R002BMT|       628|  3|
|      R003BMT|        86|  1|
|      R004BMT|        85|  1|
+-------------+----------+---+
only showing top 5 rows



In [26]:
#join to the entry_exit_df to create unique complex_Id column to aggregate on
complex_id_df = net_entry_exit.join(remote_complex_spark, net_entry_exit.unit_division==remote_complex_spark.unit_division, how='left')\
                              .select('complex_id','net_entries','net_exits', 'timestamp').sort(F.col('complex_id')).dropna()

In [27]:
complex_id_df.show(5)

+----------+-----------+---------+-------------------+
|complex_id|net_entries|net_exits|          timestamp|
+----------+-----------+---------+-------------------+
|         1|         20|       18|2022-02-26 07:00:00|
|         1|         11|      100|2022-02-27 03:00:00|
|         1|        111|       73|2022-02-26 11:00:00|
|         1|        132|      132|2022-02-26 15:00:00|
|         1|        116|      248|2022-02-26 19:00:00|
+----------+-----------+---------+-------------------+
only showing top 5 rows



In [28]:
print(complex_id_df.count())
complex_id_df.describe(['net_entries', 'net_exits']).show()

1669809
+-------+-----------------+------------------+
|summary|      net_entries|         net_exits|
+-------+-----------------+------------------+
|  count|          1669809|           1669809|
|   mean|70.50102137430089| 89.51339344799315|
| stddev|104.3006992604758|142.42225900403463|
|    min|                0|                 0|
|    max|             9834|              8934|
+-------+-----------------+------------------+



In [37]:
#Create new column for 'Day' 
complex_id_df = complex_id_df.withColumn('Day', F.date_trunc('day', F.col('timestamp')))

#Create new column for 'DOW' (day of week) to aggregate by weekends and weekdays.
complex_id_df = complex_id_df.withColumn('DOW', F.when((F.dayofweek(F.col('timestamp'))<7) & (F.dayofweek(F.col('timestamp'))>1),'weekday')\
                                                .when((F.dayofweek(F.col('timestamp'))==7) | (F.dayofweek(F.col('timestamp'))==1),'weekend'))

#Creates new column 'TOD' (Time of Day)
#NOTE: Hour intervals are messaged up due to MTA practices, these hours intervals were chosen based on how well the distributions matched with each TOD interval
complex_id_df = complex_id_df.withColumn('TOD', F.when((F.date_format(F.col('timestamp'), 'HH:mm:ss')> '00:00:00') & (F.date_format(F.col('timestamp'), 'HH:mm:ss')<= '07:00:00'), 'overnight')\
                                                .when((F.date_format(F.col('timestamp'), 'HH:mm:ss')> '07:00:00') & (F.date_format(F.col('timestamp'), 'HH:mm:ss')<= '16:00:00'), 'morning')\
                                                .when((F.date_format(F.col('timestamp'), 'HH:mm:ss')> '16:00:00') | (F.date_format(F.col('timestamp'), 'HH:mm:ss')<= '00:00:00'), 'evening'))

In [38]:
complex_id_df.show(50)

+----------+-----------+---------+-------------------+-------------------+-------+---------+
|complex_id|net_entries|net_exits|          timestamp|                Day|    DOW|      TOD|
+----------+-----------+---------+-------------------+-------------------+-------+---------+
|         1|         20|       18|2022-02-26 07:00:00|2022-02-26 00:00:00|weekend|overnight|
|         1|        115|       87|2022-03-06 15:00:00|2022-03-06 00:00:00|weekend|  morning|
|         1|        111|       73|2022-02-26 11:00:00|2022-02-26 00:00:00|weekend|  morning|
|         1|        132|      132|2022-02-26 15:00:00|2022-02-26 00:00:00|weekend|  morning|
|         1|        116|      248|2022-02-26 19:00:00|2022-02-26 00:00:00|weekend|  evening|
|         1|         61|      164|2022-02-26 23:00:00|2022-02-26 00:00:00|weekend|  evening|
|         1|         11|      100|2022-02-27 03:00:00|2022-02-27 00:00:00|weekend|overnight|
|         1|         15|       17|2022-02-27 07:00:00|2022-02-27 00:00

In [39]:
#Aggregate on complex_id, timestamp and unit_division to get all entries and exits for each station
complex_id_entry_exit_df = complex_id_df.groupBy('Day','DOW','TOD','complex_id')\
                                        .sum('net_entries','net_exits')\
                                        .withColumnRenamed('sum(net_entries)', 'entries')\
                                        .withColumnRenamed('sum(net_exits)', 'exits')
complex_id_entry_exit_df.show(5)

+-------------------+-------+---------+----------+-------+-----+
|                Day|    DOW|      TOD|complex_id|entries|exits|
+-------------------+-------+---------+----------+-------+-----+
|2022-03-09 00:00:00|weekday|  morning|       613|  11317|37339|
|2022-03-22 00:00:00|weekday|overnight|       613|    569|  378|
|2022-03-30 00:00:00|weekday|  evening|       613|  21359|12695|
|2022-04-04 00:00:00|weekday|  morning|       607|  19108|36139|
|2022-03-03 00:00:00|weekday|overnight|        14|    235|  421|
+-------------------+-------+---------+----------+-------+-----+
only showing top 5 rows



In [40]:
print(complex_id_entry_exit_df.count())
complex_id_entry_exit_df.describe(['entries', 'exits']).show()

71715
+-------+------------------+-----------------+
|summary|           entries|            exits|
+-------+------------------+-----------------+
|  count|             71715|            71715|
|   mean|1641.5427734783518|2084.226033605243|
| stddev| 3607.969891814449|4590.236950551468|
|    min|                 0|                0|
|    max|             85791|            88760|
+-------+------------------+-----------------+



In [41]:
#Aggregate on complex_id, weekend/weekday status, Morning, Evening, Overnight
final_agg_df = complex_id_entry_exit_df.groupBy('DOW','TOD','complex_id').mean('entries','exits').sort(F.col('complex_ID'))

In [42]:
weekend_morning = final_agg_df.filter((final_agg_df.DOW == 'weekend') & (final_agg_df.TOD == 'morning')).toPandas()
weekend_evening = final_agg_df.filter((final_agg_df.DOW == 'weekend') & (final_agg_df.TOD == 'evening')).toPandas()
weekend_overnight = final_agg_df.filter((final_agg_df.DOW == 'weekend') & (final_agg_df.TOD == 'overnight')).toPandas()
weekday_morning = final_agg_df.filter((final_agg_df.DOW == 'weekday') & (final_agg_df.TOD == 'morning')).toPandas()
weekday_evening = final_agg_df.filter((final_agg_df.DOW == 'weekday') & (final_agg_df.TOD == 'evening')).toPandas()
weekday_overnight = final_agg_df.filter((final_agg_df.DOW == 'weekday') & (final_agg_df.TOD == 'overnight')).toPandas()

In [43]:
#Sanity Check
(weekday_morning['avg(entries)'].sum() + weekday_evening['avg(entries)'].sum() + weekday_overnight['avg(entries)'].sum())

2438463.8876068373

In [44]:
#Sanity Check
weekday_morning['avg(exits)'].sum() + weekday_evening['avg(exits)'].sum() + weekday_overnight['avg(exits)'].sum()

3004871.1155982907

In [45]:
#Sanity Check
weekend_morning['avg(entries)'].sum() + weekend_evening['avg(entries)'].sum() + weekend_overnight['avg(entries)'].sum()

1262210.8958333333

In [48]:
#Sanity Check
weekend_morning['avg(exits)'].sum() + weekend_evening['avg(exits)'].sum() + weekend_overnight['avg(exits)'].sum()

1831265.1416666666

In [49]:
weekday_evening.count()

DOW             427
TOD             427
complex_id      427
avg(entries)    427
avg(exits)      427
dtype: int64

In [47]:
weekend_evening.loc[:425].to_csv('weekend_evening.csv')
weekend_morning.loc[:425].to_csv('weekend_morning.csv')
weekend_overnight.loc[:425].to_csv('weekend_overnight.csv')
weekday_morning.loc[:425].to_csv('weekday_morning.csv')
weekday_evening.loc[:425].to_csv('weekday_evening.csv')
weekday_overnight.loc[:425].to_csv('weekday_overnight.csv')