### MAST30034: Applied Data Science Project 1
---
# Preprocessing Part 2: Aggregating Data by MMWR Week
#### Xavier Travers (1178369)

The goal is to aggregate the TLC data by MMWR week (defined [here](https://ndc.services.cdc.gov/wp-content/uploads/MMWR_Week_overview.pdf)).
For now, the aggregation is done per month (see the TODO at the end of this notebook): this means counting trips to and from each of the boroughs per month.
This is done for each of the taxi types.

In [8]:
# imports used throughout this notebook
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
import os
import re
from itertools import chain

# for printouts: when True, the aggregation cells print progress messages and row counts
DEBUGGING = True

In [9]:
from pyspark.sql import SparkSession

# Create a spark session (which will run spark jobs)
# NOTE(review): eagerEval is presumably what makes a `DataFrame` left as the
# last expression of a cell render as a table -- confirm against the Spark docs
spark = (
    SparkSession.builder.appName('MAST30034 XT Project 1')
    .config('spark.sql.repl.eagerEval.enabled', True) 
    .config('spark.sql.parquet.cacheMetadata', 'true')
    .getOrCreate()
)

In [10]:
# read in the taxi zones dataset
# (joined on PULocationID / DOLocationID below to look up the boroughs of each trip)
zones_df = spark.read.parquet('../data/raw/tlc_zones/zones')

In [26]:
# create a general function to count months since start of timeline
# used on TLC and covid data
# TODO: move this to a helper script
def create_nth_month_column(df: DataFrame, *, start_year: int = 2019,
                            start_month: int = 7) -> DataFrame:
    """Create a `DataFrame` with a `month_index` column containing the number of
    months elapsed since the start of the timeline.

    The index is `(month - start_month) + 12 * (year - start_year)`.
    For example, with the default start of 2019-07, 2019-07 has `month_index = 0`,
    and 2020-01 has `month_index = 6`. Months before the start get a negative index.

    Args:
        df (`DataFrame`): `DataFrame` to modify (must have `month` and `year` columns)
        start_year (int, optional): Year of the first month of the timeline.
            Defaults to 2019.
        start_month (int, optional): Month number of the first month of the timeline.
            Defaults to 7.

    Returns:
        `DataFrame`: `df` with the added `month_index` column
    """
    months_elapsed = (
        (F.col('month') - start_month)
        + 12 * (F.col('year') - start_year)
    )
    return df.withColumn('month_index', months_elapsed)


### 1. Aggregating the TLC data by month

In [23]:
# TODO move this to a helper script
def prefix_column_names(df: DataFrame, prefix: str) -> DataFrame:
    """ Add a prefix to the columns names of a `DataFrame`.

    Args:
    - df (`DataFrame`): The `DataFrame` for which to add prefixes
    - prefix (str): The prefix

    Returns:
        `DataFrame`: A `DataFrame` with the same columns, in the same order,
        each renamed to `prefix + <original name>`
    """

    # Rename all columns positionally in one projection. Renaming them one at a
    # time with `withColumnRenamed` matches columns by name, so a freshly
    # prefixed column that collides with a later original name (e.g. prefix 'a'
    # with columns 'b' and 'ab') would be prefixed twice.
    return df.toDF(*[prefix + col for col in df.columns])

In [24]:
# define the tlc dataset names
# NOTE: this list is redefined (without 'fhv') right before the aggregation loop below
TLC_NAMES = ['yellow', 'green', 'fhv', 'fhvhv']

In [25]:
# TODO move this to a helper script
def add_borough_names(df: DataFrame) -> DataFrame:
    """Attach the pickup and dropoff zone details to every trip.

    The `LocationID`, `borough` and `zone` columns of `zones_df` are joined
    twice: once prefixed with `PU` (on `PULocationID`) and once prefixed with
    `DO` (on `DOLocationID`). Both joins are inner joins, so trips whose
    location IDs have no match in `zones_df` are dropped.

    Args:
        df (`DataFrame`): A `DataFrame` with `PULocationID` and `DOLocationID`.

    Returns:
        `DataFrame`: `df` joined with the `PU`- and `DO`-prefixed zone columns
    """
    zone_lookup = zones_df.select(['LocationID', 'borough', 'zone'])

    joined = df
    # pickup side first, then dropoff side
    for side in ('PU', 'DO'):
        side_lookup = prefix_column_names(zone_lookup, side)
        joined = joined.join(side_lookup, on=f'{side}LocationID', how='inner')

    return joined


In [27]:
# this filter is used to reduce the amt of columns stored
# (both aggregation functions below select exactly these columns, in this order,
# so that their outputs can be unioned)
OUT_COL_NAMES = [
    'year', # year group
    'month', # month group
    'type', # taxi type group
    'PUborough', # pickup borough group
    'DOborough', # dropoff borough group
    # grouping by sharing configuration
    'shared', # (binary value for fhvhv and #passengers for yellow/green)
    'total_trips', # total trips in the group
    'total_distance', # total sum of distance travelled on this route/sharing combo
    'avg_distance', # avg distance
]

In [28]:
# TODO move this to a helper script
def aggregate_trips_green_yellow(df: DataFrame, year:int, month:int, taxi_type:str) -> DataFrame:
    """ Group the trips from the green/yellow datasets by:
    - year
    - month
    - type
    - PUborough
    - DOborough
    - shared (the `passenger_count` of the trip)

    Trips are inner-joined with `zones_df` on both the pickup and the dropoff
    location, so trips whose location IDs are not in `zones_df` are dropped.

    Args:
        df (DataFrame): Dataset to aggregate (must have `PULocationID`,
            `DOLocationID`, `passenger_count` and `trip_distance` columns)
        year (int): Year of df
        month (int): Month of df
        taxi_type (str): Taxi type (`'green'` or `'yellow'`) of df

    Returns:
        DataFrame: Grouped dataframe (columns as in `OUT_COL_NAMES`) with aggregate values:
        - total_trips: # of trips in the above grouping
        - total_distance: # the sum of the distance travelled in the above grouping
        - avg_distance: average length in miles travelled for this grouping
    """
    # filter for only needed columns, then look up the pickup/dropoff zones
    join_selections = ['PULocationID', 'DOLocationID', 'passenger_count', 'trip_distance']
    joined_df = df\
        .select(join_selections)\
        .join(prefix_column_names(zones_df, 'PU'), 'PULocationID', 'inner')\
        .join(prefix_column_names(zones_df, 'DO'), 'DOLocationID', 'inner')

    # group the dataset and compute the aggregates
    group_filters = ['PUborough', 'DOborough', 'passenger_count']
    summary_df = joined_df.groupBy(group_filters).agg(
        F.count('*').alias('total_trips'),
        F.sum('trip_distance').alias('total_distance'),
        F.avg('trip_distance').alias('avg_distance')
    )

    # force the data into memory (should be <10'000 rows)
    # (otherwise the java executors seem to run out of memory)
    out_rows = summary_df.collect()
    # rebuild with the aggregate's own schema: inferring the schema from the
    # rows fails when there are no rows or when a column only holds nulls
    out_df = spark.createDataFrame(out_rows, schema=summary_df.schema)

    return out_df\
        .withColumn('type', F.lit(taxi_type))\
        .withColumn('year', F.lit(year))\
        .withColumn('month', F.lit(month))\
        .withColumnRenamed('passenger_count', 'shared')\
        .select(OUT_COL_NAMES)

In [29]:
def aggregate_trips_fhvhv(df: DataFrame, year:int, month:int) -> DataFrame:
    """ Group the trips from the fhvhv datasets by:
    - year
    - month
    - type (company name mapped from `hvfhs_license_num`)
    - PUborough
    - DOborough
    - shared (`shared_request_flag` mapped to 1.0 for 'Y' and 0.0 for 'N')

    Trips are inner-joined with `zones_df` on both the pickup and the dropoff
    location, so trips whose location IDs are not in `zones_df` are dropped.

    Args:
        df (DataFrame): Dataset to aggregate (must have `PULocationID`,
            `DOLocationID`, `shared_request_flag`, `trip_miles` and
            `hvfhs_license_num` columns)
        year (int): Year of df
        month (int): Month of df

    Returns:
        DataFrame: Grouped dataframe (columns as in `OUT_COL_NAMES`) with aggregate values:
        - total_trips: # of trips in the above grouping
        - total_distance: # the sum of the distance travelled in the above grouping
        - avg_distance: average length in miles travelled for this grouping
    """
    # filter for only needed columns, then look up the pickup/dropoff zones
    join_selections = ['PULocationID', 'DOLocationID', 'shared_request_flag', 'trip_miles', 'hvfhs_license_num']
    joined_df = df\
        .select(join_selections)\
        .join(prefix_column_names(zones_df, 'PU'), 'PULocationID', 'inner')\
        .join(prefix_column_names(zones_df, 'DO'), 'DOLocationID', 'inner')

    # group the dataset and compute the aggregates
    group_filters = ['PUborough', 'DOborough', 'hvfhs_license_num', 'shared_request_flag']
    summary_df = joined_df.groupBy(group_filters).agg(
        F.count('*').alias('total_trips'),
        F.sum('trip_miles').alias('total_distance'),
        F.avg('trip_miles').alias('avg_distance')
    )

    # force the data into memory (should be <10'000 rows)
    # (otherwise the java executors run out of memory at write time)
    out_rows = summary_df.collect()
    # rebuild with the aggregate's own schema: inferring the schema from the
    # rows fails when there are no rows or when a column only holds nulls
    out_df = spark.createDataFrame(out_rows, schema=summary_df.schema)

    # create maps to map columns into other types/values
    # from: https://stackoverflow.com/questions/42980704/pyspark-create-new-column-with-mapping-from-a-dict
    licenses_dict = {
        'HV0002': 'juno',
        'HV0003': 'uber',
        'HV0004': 'via',
        'HV0005': 'lyft'
    }
    license_mapping_expr = F.create_map([F.lit(x) for x in chain(*licenses_dict.items())])

    flags_dict = {
        'Y': 1.0,
        'N': 0.0
    }
    flag_mapping_expr = F.create_map([F.lit(x) for x in chain(*flags_dict.items())])

    return out_df\
        .withColumn('year', F.lit(year))\
        .withColumn('month', F.lit(month))\
        .withColumn('type', license_mapping_expr[F.col('hvfhs_license_num')])\
        .withColumn('shared', flag_mapping_expr[F.col('shared_request_flag')])\
        .select(OUT_COL_NAMES)

In [30]:
def aggregate_trips(df: DataFrame, year:int, month:int, taxi_type: str) -> DataFrame:
    """Aggregate one month of TLC trips and add its `month_index` column.

    Args:
        df (DataFrame): Dataset to aggregate
        year (int): Year of df
        month (int): Month of df
        taxi_type (str): Taxi type of df; `'fhvhv'` uses the fhvhv aggregation,
            anything else uses the green/yellow aggregation

    Returns:
        DataFrame: The aggregated trips with an added `month_index` column
    """
    if taxi_type == 'fhvhv':
        monthly_df = aggregate_trips_fhvhv(df, year, month)
    else:
        monthly_df = aggregate_trips_green_yellow(df, year, month, taxi_type)

    return create_nth_month_column(monthly_df)

In [31]:
# only these dataset types are aggregated here ('fhv' is left out)
TLC_NAMES = ['green', 'yellow', 'fhvhv']
aggregated_df = None

# iterate through TLC datasets and aggregate them
# strap in, this takes a while (~5-10 mins)
for name in TLC_NAMES:
    if DEBUGGING:
        print(f'\nAGGREGATING "{name}" DATA')
    temp_df = None
    # sorted so that the files are processed in a reproducible order
    for filename in sorted(os.listdir(f'../data/raw/tlc/{name}')):
        # the first two `-`/`.` separated parts of the name are the year and month
        filedata = re.split(r'[-.]', filename)
        try:
            file_year, file_month = int(filedata[0]), int(filedata[1])
        except (IndexError, ValueError):
            # skip stray entries (e.g. hidden files) instead of crashing on them
            if DEBUGGING:
                print(f'SKIPPING "{filename}" (no year/month in its name)')
            continue

        tlc_df = spark.read.parquet(f'../data/raw/tlc/{name}/{filename}')
        month_df = aggregate_trips(tlc_df, file_year, file_month, name)
        temp_df = month_df if temp_df is None else temp_df.union(month_df)

    if temp_df is None:
        # no usable files for this dataset type: nothing to count or write
        if DEBUGGING:
            print(f'NO DATA FOUND FOR "{name}"')
        continue

    if DEBUGGING:
        print(temp_df.count())

    # save the aggregated data of this dataset type on its own
    temp_df.write.mode('overwrite').parquet(f'../data/curated/tlc/aggregated/{name}')
    if aggregated_df is None:
        aggregated_df = temp_df
    else:
        aggregated_df = aggregated_df.union(temp_df)

if DEBUGGING:
    print(aggregated_df.count())

# save the aggregated data
aggregated_df.write.mode('overwrite').parquet('../data/curated/tlc/aggregated/all_types')

# preview the result (kept as the last expression so that the cell displays it)
aggregated_df.limit(20)


AGGREGATING "green" DATA


                                                                                

4616
22/08/07 13:10:50 WARN MemoryManager: Total allocation exceeds 95.00% (983,197,274 bytes) of heap memory
Scaling row group sizes to 91.57% for 8 writers
22/08/07 13:10:50 WARN MemoryManager: Total allocation exceeds 95.00% (983,197,274 bytes) of heap memory
Scaling row group sizes to 81.39% for 9 writers
22/08/07 13:10:50 WARN MemoryManager: Total allocation exceeds 95.00% (983,197,274 bytes) of heap memory
Scaling row group sizes to 73.25% for 10 writers
22/08/07 13:10:50 WARN MemoryManager: Total allocation exceeds 95.00% (983,197,274 bytes) of heap memory
Scaling row group sizes to 66.59% for 11 writers
22/08/07 13:10:50 WARN MemoryManager: Total allocation exceeds 95.00% (983,197,274 bytes) of heap memory
Scaling row group sizes to 61.04% for 12 writers
22/08/07 13:10:50 WARN MemoryManager: Total allocation exceeds 95.00% (983,197,274 bytes) of heap memory
Scaling row group sizes to 56.35% for 13 writers
22/08/07 13:10:50 WARN MemoryManager: Total allocation exceeds 95.00% (98

                                                                                


AGGREGATING "yellow" DATA


                                                                                

5984


                                                                                


AGGREGATING "fhvhv" DATA


                                                                                

4034


                                                                                

14634




22/08/07 13:14:51 WARN MemoryManager: Total allocation exceeds 95.00% (983,197,274 bytes) of heap memory
Scaling row group sizes to 91.57% for 8 writers
22/08/07 13:14:51 WARN MemoryManager: Total allocation exceeds 95.00% (983,197,274 bytes) of heap memory
Scaling row group sizes to 81.39% for 9 writers
22/08/07 13:14:51 WARN MemoryManager: Total allocation exceeds 95.00% (983,197,274 bytes) of heap memory
Scaling row group sizes to 91.57% for 8 writers
22/08/07 13:14:51 WARN MemoryManager: Total allocation exceeds 95.00% (983,197,274 bytes) of heap memory
Scaling row group sizes to 81.39% for 9 writers
22/08/07 13:14:51 WARN MemoryManager: Total allocation exceeds 95.00% (983,197,274 bytes) of heap memory
Scaling row group sizes to 73.25% for 10 writers
22/08/07 13:14:51 WARN MemoryManager: Total allocation exceeds 95.00% (983,197,274 bytes) of heap memory
Scaling row group sizes to 66.59% for 11 writers
22/08/07 13:14:51 WARN MemoryManager: Total allocation exceeds 95.00% (983,197,2



22/08/07 13:14:54 WARN MemoryManager: Total allocation exceeds 95.00% (983,197,274 bytes) of heap memory
Scaling row group sizes to 91.57% for 8 writers


                                                                                

### 2. Aggregating the COVID data by month.

In [32]:
# read in the covid dataset (the curated cases-by-day data)
covid_df = spark.read.parquet('../data/curated/covid/cases-by-day')

In [33]:
# check that the data is read in correctly (preview of up to 5 rows)
covid_df.limit(5)

22/08/07 13:16:43 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


year,month,date_of_interest,CASE_COUNT,PROBABLE_CASE_COUNT,HOSPITALIZED_COUNT,DEATH_COUNT,PROBABLE_DEATH_COUNT,CASE_COUNT_7DAY_AVG,ALL_CASE_COUNT_7DAY_AVG,HOSP_COUNT_7DAY_AVG,DEATH_COUNT_7DAY_AVG,ALL_DEATH_COUNT_7DAY_AVG,BX_CASE_COUNT,BX_PROBABLE_CASE_COUNT,BX_HOSPITALIZED_COUNT,BX_DEATH_COUNT,BX_PROBABLE_DEATH_COUNT,BX_CASE_COUNT_7DAY_AVG,BX_PROBABLE_CASE_COUNT_7DAY_AVG,BX_ALL_CASE_COUNT_7DAY_AVG,BX_HOSPITALIZED_COUNT_7DAY_AVG,BX_DEATH_COUNT_7DAY_AVG,BX_ALL_DEATH_COUNT_7DAY_AVG,BK_CASE_COUNT,BK_PROBABLE_CASE_COUNT,BK_HOSPITALIZED_COUNT,BK_DEATH_COUNT,BK_PROBABLE_DEATH_COUNT,BK_CASE_COUNT_7DAY_AVG,BK_PROBABLE_CASE_COUNT_7DAY_AVG,BK_ALL_CASE_COUNT_7DAY_AVG,BK_HOSPITALIZED_COUNT_7DAY_AVG,BK_DEATH_COUNT_7DAY_AVG,BK_ALL_DEATH_COUNT_7DAY_AVG,MN_CASE_COUNT,MN_PROBABLE_CASE_COUNT,MN_HOSPITALIZED_COUNT,MN_DEATH_COUNT,MN_PROBABLE_DEATH_COUNT,MN_CASE_COUNT_7DAY_AVG,MN_PROBABLE_CASE_COUNT_7DAY_AVG,MN_ALL_CASE_COUNT_7DAY_AVG,MN_HOSPITALIZED_COUNT_7DAY_AVG,MN_DEATH_COUNT_7DAY_AVG,MN_ALL_DEATH_COUNT_7DAY_AVG,QN_CASE_COUNT,QN_PROBABLE_CASE_COUNT,QN_HOSPITALIZED_COUNT,QN_DEATH_COUNT,QN_PROBABLE_DEATH_COUNT,QN_CASE_COUNT_7DAY_AVG,QN_PROBABLE_CASE_COUNT_7DAY_AVG,QN_ALL_CASE_COUNT_7DAY_AVG,QN_HOSPITALIZED_COUNT_7DAY_AVG,QN_DEATH_COUNT_7DAY_AVG,QN_ALL_DEATH_COUNT_7DAY_AVG,SI_CASE_COUNT,SI_PROBABLE_CASE_COUNT,SI_HOSPITALIZED_COUNT,SI_DEATH_COUNT,SI_PROBABLE_DEATH_COUNT,SI_CASE_COUNT_7DAY_AVG,SI_PROBABLE_CASE_COUNT_7DAY_AVG,SI_ALL_CASE_COUNT_7DAY_AVG,SI_HOSPITALIZED_COUNT_7DAY_AVG,SI_DEATH_COUNT_7DAY_AVG,SI_ALL_DEATH_COUNT_7DAY_AVG,INCOMPLETE
2020,2,02/29/2020,1,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
2020,3,03/01/2020,0,0,1,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
2020,3,03/02/2020,0,0,2,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,2,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
2020,3,03/03/2020,1,0,7,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,3,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,1,0,2,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
2020,3,03/04/2020,5,0,2,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,1,0,0,0,0,0,0,0,0,2,0,1,0,0,0,0,0,0,0,0,2,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


In [34]:
# map each column-name prefix of the covid data to the borough it stands for
# (the empty prefix is for the columns without a borough code)
BOROUGH_PREFIXES = {
    '': 'Overall',
    'BX_': 'Bronx',
    'BK_': 'Brooklyn',
    'MN_': 'Manhattan',
    'QN_': 'Queens',
    'SI_': 'Staten Island'
}

# column-name suffixes of the counts to keep
COL_SUFFIXES = ['CASE_COUNT', 'HOSPITALIZED_COUNT', 'DEATH_COUNT']

# every prefix/suffix combination is a column to aggregate
from itertools import product

COL_AGGREGATES = [
    borough_prefix + count_suffix
    for borough_prefix, count_suffix in product(BOROUGH_PREFIXES, COL_SUFFIXES)
]

# columns that define the aggregation groups
COL_GROUPS = ['year', 'month']

# chosen columns to send in the db: the groups first, then the counts
COL_CHOSEN = [*COL_GROUPS, *COL_AGGREGATES]

In [35]:
# keep only the chosen columns and drop february 2020 (month 02, year 2020):
# only one day of that month has any data (it would skew results as an outlier)
is_feb_2020 = (F.col('month') == 2) & (F.col('year') == 2020)
covid_df = covid_df.select(COL_CHOSEN).where(~is_feb_2020)

In [36]:
# per (year, month) group: the number of rows (days), plus the total and the
# average of every chosen count column
monthly_aggs = [F.count('*').alias('num_days')]
monthly_aggs += [F.sum(metric).alias(f'TOTAL_{metric}') for metric in COL_AGGREGATES]
monthly_aggs += [F.avg(metric).alias(f'AVG_{metric}') for metric in COL_AGGREGATES]

grouped_df = covid_df.groupBy(COL_GROUPS)

# pull the (small) aggregated result into memory and rebuild a dataframe from it
aggregated_rows = grouped_df.agg(*monthly_aggs).collect()

aggregated_df = spark.createDataFrame(aggregated_rows).sort('year', 'month')
aggregated_df.limit(5)

year,month,num_days,TOTAL_CASE_COUNT,TOTAL_HOSPITALIZED_COUNT,TOTAL_DEATH_COUNT,TOTAL_BX_CASE_COUNT,TOTAL_BX_HOSPITALIZED_COUNT,TOTAL_BX_DEATH_COUNT,TOTAL_BK_CASE_COUNT,TOTAL_BK_HOSPITALIZED_COUNT,TOTAL_BK_DEATH_COUNT,TOTAL_MN_CASE_COUNT,TOTAL_MN_HOSPITALIZED_COUNT,TOTAL_MN_DEATH_COUNT,TOTAL_QN_CASE_COUNT,TOTAL_QN_HOSPITALIZED_COUNT,TOTAL_QN_DEATH_COUNT,TOTAL_SI_CASE_COUNT,TOTAL_SI_HOSPITALIZED_COUNT,TOTAL_SI_DEATH_COUNT,AVG_CASE_COUNT,AVG_HOSPITALIZED_COUNT,AVG_DEATH_COUNT,AVG_BX_CASE_COUNT,AVG_BX_HOSPITALIZED_COUNT,AVG_BX_DEATH_COUNT,AVG_BK_CASE_COUNT,AVG_BK_HOSPITALIZED_COUNT,AVG_BK_DEATH_COUNT,AVG_MN_CASE_COUNT,AVG_MN_HOSPITALIZED_COUNT,AVG_MN_DEATH_COUNT,AVG_QN_CASE_COUNT,AVG_QN_HOSPITALIZED_COUNT,AVG_QN_DEATH_COUNT,AVG_SI_CASE_COUNT,AVG_SI_HOSPITALIZED_COUNT,AVG_SI_DEATH_COUNT
2020,3,31,65182,18427,2183,12681,3805,467,18516,5319,662,9538,2729,287,19983,6000,656,4461,750,111,2102.6451612903224,594.4193548387096,70.41935483870968,409.06451612903226,122.74193548387096,15.064516129032258,597.2903225806451,171.5806451612903,21.35483870967742,307.6774193548387,88.03225806451613,9.258064516129032,644.6129032258065,193.5483870967742,21.161290322580644,143.90322580645162,24.193548387096776,3.5806451612903225
2020,4,30,109297,27565,12713,26882,6234,2626,28742,7738,3857,11741,3863,1623,34120,8920,4042,7808,1263,565,3643.233333333333,918.8333333333334,423.76666666666665,896.0666666666667,207.8,87.53333333333333,958.0666666666668,257.93333333333334,128.56666666666666,391.3666666666667,128.76666666666668,54.1,1137.3333333333333,297.3333333333333,134.73333333333332,260.26666666666665,42.1,18.83333333333333
2020,5,31,28417,3910,2816,6494,944,581,8654,1174,790,3795,545,391,8185,1078,900,1289,231,154,916.6774193548388,126.12903225806453,90.83870967741936,209.48387096774192,30.451612903225808,18.741935483870968,279.16129032258067,37.87096774193548,25.483870967741936,122.41935483870968,17.580645161290324,12.612903225806452,264.03225806451616,34.774193548387096,29.032258064516128,41.58064516129032,7.451612903225806,4.967741935483871
2020,6,30,10844,1528,675,2248,316,149,3111,549,179,1733,208,98,3219,417,213,533,55,36,361.4666666666666,50.93333333333333,22.5,74.93333333333334,10.533333333333331,4.966666666666667,103.7,18.3,5.966666666666667,57.766666666666666,6.933333333333334,3.2666666666666666,107.3,13.9,7.1,17.766666666666666,1.8333333333333333,1.2
2020,7,31,9786,1150,239,2018,265,56,2706,360,67,1920,174,39,2496,305,68,646,56,9,315.6774193548387,37.096774193548384,7.709677419354839,65.09677419354838,8.548387096774194,1.8064516129032255,87.29032258064517,11.612903225806452,2.161290322580645,61.935483870967744,5.612903225806452,1.2580645161290325,80.51612903225806,9.838709677419354,2.193548387096774,20.83870967741936,1.8064516129032255,0.2903225806451613


In [37]:
# reshape the monthly table from one set of columns per borough into one row
# per (year, month, borough), with the same lowercase column names for every borough
aggregated_borough_df = None
for prefix, borough_name in BOROUGH_PREFIXES.items():
    temp_df = aggregated_df\
        .select(
            # grouping columns and the day count are shared by all boroughs
            [F.col(col_name)
                for col_name in COL_GROUPS + ['num_days']] +
            # strip the borough prefix from this borough's totals and averages
            [F.col(f'TOTAL_{prefix}{suffix}').alias(f'TOTAL_{suffix}'.lower())
                for suffix in COL_SUFFIXES] +
            [F.col(f'AVG_{prefix}{suffix}').alias(f'AVG_{suffix}'.lower())
                for suffix in COL_SUFFIXES])\
        .withColumn('borough', F.lit(borough_name))

    if aggregated_borough_df is None:
        aggregated_borough_df = temp_df
    else:
        aggregated_borough_df = aggregated_borough_df.union(temp_df)

# add the months-since-start index used to line this up with the TLC data
aggregated_borough_df = create_nth_month_column(aggregated_borough_df)

aggregated_borough_df.write.mode('overwrite').parquet('../data/curated/covid/cases-by-month')

In [None]:
# TODO: Redo aggregation by CDC week rather than by month (too few points otherwise)