In [1]:
import requests
import os

In [2]:
# Slice of the TLC trip-record dataset to download and process.
colors = ['green', 'yellow']        # taxi services
years = [2019, 2020]
months = [m for m in range(1, 13)]  # 1 (January) .. 12 (December)

In [3]:
def build_url(color, year, month):
    """Return the CloudFront download URL for one monthly trip-data parquet file.

    The month is zero-padded to two digits, e.g. ``build_url('green', 2019, 1)``
    ends in ``green_tripdata_2019-01.parquet``.
    """
    url = f'https://d37ci6vzurychx.cloudfront.net/trip-data/{color}_tripdata_{year}-{month:02d}.parquet'
    return url

In [4]:
def _download_file(url, local_path, timeout, chunk_size):
    """Stream ``url`` into ``local_path``, writing via a temp file then renaming."""
    tmp_path = f'{local_path}.part'
    with requests.get(url, stream=True, timeout=timeout) as response:
        # Fail loudly: otherwise an HTML/XML error page would be saved under a
        # .parquet name and only blow up later inside Spark.
        response.raise_for_status()
        os.makedirs(os.path.dirname(local_path), exist_ok=True)
        try:
            with open(tmp_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=chunk_size):
                    f.write(chunk)
        except BaseException:
            # Don't leave a truncated file in the directory Spark reads from.
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
            raise
    # Only a complete download ever appears under the final name.
    os.replace(tmp_path, local_path)


def download(colors, years, months, timeout=60, chunk_size=1 << 20):
    """Download the monthly TLC trip-data parquet files for every combination.

    Each (color, year, month) file is fetched from ``build_url`` and stored as
    ``./buckets/data/raw/{color}/{year}/{month}/{color}_tripdata_{year}_{month}.parquet``.
    In the local path the month is NOT zero-padded, matching the staging readers.

    Args:
        colors: taxi services, e.g. ``['green', 'yellow']``.
        years: years to fetch.
        months: month numbers (1-12) to fetch for each year.
        timeout: seconds ``requests`` waits for the server before giving up.
        chunk_size: bytes per chunk when streaming a response to disk.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
    """
    for color in colors:
        for year in years:
            for month in months:
                url = build_url(color, year, month)
                local_prefix = f'./buckets/data/raw/{color}/{year}/{month}'
                local_file = f'{color}_tripdata_{year}_{month}.parquet'
                _download_file(url, f'{local_prefix}/{local_file}', timeout, chunk_size)

In [5]:
# Raw files only need to be fetched once; set to True to (re)download them.
DOWNLOAD_RAW = False
if DOWNLOAD_RAW:
    download(colors, years, months)

# Partitioning

In [135]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types
import math

In [136]:
# Local Spark session that uses every core of this machine (`local[*]`).
# NOTE(review): per the Spark configuration docs, `spark.driver.cores` only
# applies in cluster mode; in local mode the core count comes from the master
# URL. `spark.driver.memory` only takes effect if no JVM is running yet in this
# kernel, because the driver heap is fixed at launch. Confirm before tuning.
spark = (
    SparkSession.builder
        .appName('test')
        .master('local[*]')
        .config('spark.driver.memory', '2g')
        .config('spark.driver.cores', 3)
        .getOrCreate()
)

In [246]:
# payment_type code -> label. Built once at module level instead of on every row.
_PAYMENT_TYPE_DESCRIPTIONS = {
    1: 'Credit card',
    2: 'Cash',
    3: 'No charge',
    4: 'Dispute',
    5: 'Unknown',
    6: 'Voided trip',
}


@F.udf(returnType=types.StringType())
def get_payment_type_description(x):
    """Map a numeric ``payment_type`` code to its human-readable label.

    Returns None for null input, for values that cannot be converted to an int
    (NaN, inf, non-numeric), and for codes outside 1-6 such as 0. This mirrors a
    SQL ``CASE`` with no ``ELSE``. Previously an unexpected code raised
    ``KeyError`` inside the Python worker and failed the whole Spark job.
    """
    if x is None:
        return None
    try:
        code = int(x)  # fractional codes are truncated, as before
    except (TypeError, ValueError, OverflowError):
        return None
    return _PAYMENT_TYPE_DESCRIPTIONS.get(code)

In [291]:
def stg_green_tripdata(year, month):
    """Load one month of raw green-taxi trips and normalise it to the staging schema.

    Reads the parquet data under ``buckets/data/raw/green/{year}/{month}`` (month
    not zero-padded). It then lower-cases the column names, casts each field to
    its staging type, tags rows with ``service_type = 'Green'`` and drops rows
    whose ``vendorid`` is null. Keep the column list aligned with the yellow
    staging frame, since the two are unioned together.

    Args:
        year: year of the raw partition.
        month: month number (1-12) of the raw partition.

    Returns:
        A Spark DataFrame with one row per trip.
    """
    df_raw = spark.read.parquet(f'buckets/data/raw/green/{year}/{month}')
    # Raw files mix cases (e.g. VendorID, PULocationID); normalise to lower case.
    df_raw = df_raw.toDF(*[c.lower() for c in df_raw.columns])
    df_clean = df_raw.select(
        # 64-bit surrogate key. F.hash is a 32-bit Murmur3 hash: with ~2^32
        # buckets, collisions are expected once a month holds more than ~100k
        # trips, so it cannot identify a trip. Note: tripid is LongType.
        F.xxhash64('vendorid', 'lpep_pickup_datetime', 'lpep_dropoff_datetime').alias('tripid'),
        F.col('vendorid').cast(types.IntegerType()).alias('vendorid'),
        F.col('ratecodeid').cast(types.IntegerType()).alias('ratecodeid'),
        F.col('pulocationid').cast(types.IntegerType()).alias('pickup_locationid'),
        F.col('dolocationid').cast(types.IntegerType()).alias('dropoff_locationid'),

        F.col('lpep_pickup_datetime').cast(types.TimestampType()).alias('pickup_datetime'),
        F.col('lpep_dropoff_datetime').cast(types.TimestampType()).alias('dropoff_datetime'),

        F.col('store_and_fwd_flag').cast(types.StringType()).alias('store_and_fwd_flag'),
        F.col('passenger_count').cast(types.IntegerType()).alias('passenger_count'),
        F.col('trip_distance').cast(types.DoubleType()).alias('trip_distance'),
        F.col('trip_type').cast(types.IntegerType()).alias('trip_type'),

        F.col('fare_amount').cast(types.DoubleType()).alias('fare_amount'),
        F.col('extra').cast(types.DoubleType()).alias('extra'),
        F.col('mta_tax').cast(types.DoubleType()).alias('mta_tax'),
        F.col('tip_amount').cast(types.DoubleType()).alias('tip_amount'),

        F.col('tolls_amount').cast(types.DoubleType()).alias('tolls_amount'),
        F.col('ehail_fee').cast(types.DoubleType()).alias('ehail_fee'),
        F.col('improvement_surcharge').cast(types.DoubleType()).alias('improvement_surcharge'),
        F.col('total_amount').cast(types.DoubleType()).alias('total_amount'),
        F.col('payment_type').cast(types.DoubleType()).alias('payment_type'),
        get_payment_type_description(F.col('payment_type')).alias('payment_type_description'),

        F.col('congestion_surcharge').cast(types.DoubleType()).alias('congestion_surcharge'),
        F.lit('Green').alias('service_type'),
    ).filter('vendorid is not null')
    return df_clean

In [290]:
def stg_yellow_tripdata(year, month):
    """Load one month of raw yellow-taxi trips and normalise it to the staging schema.

    Reads the parquet data under ``buckets/data/raw/yellow/{year}/{month}`` (month
    not zero-padded). It then lower-cases the column names, casts each field to
    its staging type, tags rows with ``service_type = 'Yellow'`` and drops rows
    whose ``vendorid`` is null. The raw yellow data has no ``trip_type`` or
    ``ehail_fee`` columns here, so they are filled with the constants 1 and 0.0.
    Keep the column list aligned with the green staging frame, since the two
    are unioned together.

    Args:
        year: year of the raw partition.
        month: month number (1-12) of the raw partition.

    Returns:
        A Spark DataFrame with one row per trip.
    """
    df_raw = spark.read.parquet(f'buckets/data/raw/yellow/{year}/{month}')
    # Raw files mix cases (e.g. VendorID, PULocationID); normalise to lower case.
    df_raw = df_raw.toDF(*[c.lower() for c in df_raw.columns])
    df_clean = df_raw.select(
        # 64-bit surrogate key. F.hash is a 32-bit Murmur3 hash: with ~2^32
        # buckets, collisions are expected once a month holds more than ~100k
        # trips, so it cannot identify a trip. Note: tripid is LongType.
        F.xxhash64('vendorid', 'tpep_pickup_datetime', 'tpep_dropoff_datetime').alias('tripid'),
        F.col('vendorid').cast(types.IntegerType()).alias('vendorid'),
        F.col('ratecodeid').cast(types.IntegerType()).alias('ratecodeid'),
        F.col('pulocationid').cast(types.IntegerType()).alias('pickup_locationid'),
        F.col('dolocationid').cast(types.IntegerType()).alias('dropoff_locationid'),

        F.col('tpep_pickup_datetime').cast(types.TimestampType()).alias('pickup_datetime'),
        F.col('tpep_dropoff_datetime').cast(types.TimestampType()).alias('dropoff_datetime'),

        F.col('store_and_fwd_flag').cast(types.StringType()).alias('store_and_fwd_flag'),
        F.col('passenger_count').cast(types.IntegerType()).alias('passenger_count'),
        F.col('trip_distance').cast(types.DoubleType()).alias('trip_distance'),
        F.lit(1).alias('trip_type'),

        F.col('fare_amount').cast(types.DoubleType()).alias('fare_amount'),
        F.col('extra').cast(types.DoubleType()).alias('extra'),
        F.col('mta_tax').cast(types.DoubleType()).alias('mta_tax'),
        F.col('tip_amount').cast(types.DoubleType()).alias('tip_amount'),

        F.col('tolls_amount').cast(types.DoubleType()).alias('tolls_amount'),
        F.lit(0.0).alias('ehail_fee'),
        F.col('improvement_surcharge').cast(types.DoubleType()).alias('improvement_surcharge'),
        F.col('total_amount').cast(types.DoubleType()).alias('total_amount'),
        F.col('payment_type').cast(types.DoubleType()).alias('payment_type'),
        get_payment_type_description(F.col('payment_type')).alias('payment_type_description'),

        F.col('congestion_surcharge').cast(types.DoubleType()).alias('congestion_surcharge'),
        F.lit('Yellow').alias('service_type'),
    ).filter('vendorid is not null')
    return df_clean

In [289]:
def dim_zones(path='../data/taxi+_zone_lookup.csv'):
    """Load the taxi-zone lookup CSV as the zone dimension.

    The CSV is read with a header row and no schema inference, so columns are
    strings except ``locationid``, which is cast to int. Every ``'Boro'``
    substring in ``service_zone`` is replaced with ``'Green'``. Rows whose
    ``borough`` is ``'Unknown'`` are dropped; rows with a null borough are
    dropped too, because ``NULL != 'Unknown'`` is not true.

    Args:
        path: location of the lookup CSV. The default is relative to the
            notebook's working directory and, unlike the ``buckets/data/...``
            paths used elsewhere, sits outside ``buckets``, so it is
            overridable here.

    Returns:
        A Spark DataFrame with one row per zone.
    """
    df_zone = spark.read.csv(path, header='true')
    return (
        df_zone
        # Column lookup is case-insensitive by default, so this matches the
        # CSV header's LocationID — NOTE(review): assumes default
        # spark.sql.caseSensitive=false.
        .withColumn('locationid', F.col('locationid').cast(types.IntegerType()))
        .withColumn('service_zone', F.regexp_replace('service_zone', 'Boro', 'Green'))
        .filter("borough != 'Unknown'")
    )

In [288]:
def fact_trips(yellow_df, green_df, zones_df):
    """Build the trip fact table: yellow and green trips enriched with zone names.

    Side effect: registers (or replaces) the session temp views
    ``trips_unioned`` and ``dim_zones`` and queries them with Spark SQL.
    Pickup and dropoff are both inner-joined to the zones, so trips whose
    location id has no row in ``zones_df`` are dropped.

    Args:
        yellow_df: staged yellow trips.
        green_df: staged green trips with the same column names.
        zones_df: zone dimension with ``locationid``, ``borough`` and ``zone``.

    Returns:
        A Spark DataFrame with the staged trip columns plus pickup/dropoff
        borough and zone.
    """
    # Match columns by name, not position: if either staging select is ever
    # reordered, a positional union would silently shift values between columns.
    df_trip_data = yellow_df.unionByName(green_df)
    df_trip_data.createOrReplaceTempView('trips_unioned')
    zones_df.createOrReplaceTempView('dim_zones')
    df_fact = spark.sql('''
    select 
        trips_unioned.tripid, 
        trips_unioned.vendorid, 
        trips_unioned.service_type,
        trips_unioned.ratecodeid, 
        trips_unioned.pickup_locationid, 
        pickup_zone.borough as pickup_borough, 
        pickup_zone.zone as pickup_zone, 
        trips_unioned.dropoff_locationid,
        dropoff_zone.borough as dropoff_borough, 
        dropoff_zone.zone as dropoff_zone,  
        trips_unioned.pickup_datetime, 
        trips_unioned.dropoff_datetime, 
        trips_unioned.store_and_fwd_flag, 
        trips_unioned.passenger_count, 
        trips_unioned.trip_distance, 
        trips_unioned.trip_type, 
        trips_unioned.fare_amount, 
        trips_unioned.extra, 
        trips_unioned.mta_tax, 
        trips_unioned.tip_amount, 
        trips_unioned.tolls_amount, 
        trips_unioned.ehail_fee, 
        trips_unioned.improvement_surcharge, 
        trips_unioned.total_amount, 
        trips_unioned.payment_type, 
        trips_unioned.payment_type_description, 
        trips_unioned.congestion_surcharge
    from trips_unioned
    join dim_zones as pickup_zone
        on trips_unioned.pickup_locationid = pickup_zone.locationid
    join dim_zones as dropoff_zone
        on trips_unioned.dropoff_locationid = dropoff_zone.locationid
    ''')
    return df_fact

In [293]:
def partition(years, months, n=10, mode='overwrite'):
    """Build the trip fact table month by month and write each month as parquet.

    The zone dimension is loaded once. Then, for every (year, month) pair, the
    green and yellow staging frames are combined into the fact table, shuffled
    into ``n`` partitions and written to
    ``buckets/data/fact/{year}/{month:02d}``. Progress is printed before and
    after each month.

    Args:
        years: years to process.
        months: month numbers (1-12) to process for every year.
        n: number of partitions (output files) per month.
        mode: Spark save mode handed to ``DataFrameWriter.parquet``.
    """
    periods = [(year, month) for year in years for month in months]
    zones = dim_zones()
    for year, month in periods:
        print(f'processing {year}/{month}')
        green = stg_green_tripdata(year, month)
        yellow = stg_yellow_tripdata(year, month)
        output_path = f'buckets/data/fact/{year}/{month:02d}'
        (
            fact_trips(yellow, green, zones)
            .repartition(n)
            .write
            .parquet(output_path, mode=mode)
        )
        print(f'done {year}/{month}')


In [294]:
# Build and write the fact table for every configured month (slow: ~1 min per month locally).
partition(years=years, months=months)

processing 2019/1




22/08/05 15:29:16 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                

done 2019/1
processing 2019/2




22/08/05 15:30:11 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                

done 2019/2
processing 2019/3




22/08/05 15:31:05 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                

done 2019/3
processing 2019/4




22/08/05 15:32:00 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                

done 2019/4
processing 2019/5




22/08/05 15:32:55 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                

done 2019/5
processing 2019/6




22/08/05 15:33:46 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                

done 2019/6
processing 2019/7




22/08/05 15:34:34 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                

done 2019/7
processing 2019/8


[Stage 646:>                                                       (0 + 8) / 10]

22/08/05 15:35:20 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                

done 2019/8
processing 2019/9


[Stage 652:>                                                       (0 + 8) / 10]

22/08/05 15:36:07 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                

done 2019/9
processing 2019/10


[Stage 658:>                                                       (0 + 8) / 10]

22/08/05 15:36:56 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                

done 2019/10
processing 2019/11


[Stage 664:>                                                       (0 + 8) / 10]

22/08/05 15:37:47 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                

done 2019/11
processing 2019/12


[Stage 670:>                                                       (0 + 8) / 10]

22/08/05 15:38:38 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                

done 2019/12
processing 2020/1


[Stage 676:>                                                       (0 + 8) / 10]

22/08/05 15:39:27 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                

done 2020/1
processing 2020/2




22/08/05 15:40:14 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                

done 2020/2
processing 2020/3


[Stage 688:>                                                       (0 + 8) / 10]

22/08/05 15:40:48 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                

done 2020/3
processing 2020/4


[Stage 694:>                                                       (0 + 8) / 10]

22/08/05 15:41:04 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                

done 2020/4
processing 2020/5




22/08/05 15:41:11 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                

done 2020/5
processing 2020/6




22/08/05 15:41:19 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                

done 2020/6
processing 2020/7


[Stage 712:>                                                       (0 + 8) / 10]

22/08/05 15:41:28 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                

done 2020/7
processing 2020/8


[Stage 718:>                                                       (0 + 8) / 10]

22/08/05 15:41:39 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                

done 2020/8
processing 2020/9




22/08/05 15:41:52 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                

done 2020/9
processing 2020/10


[Stage 730:>                                                       (0 + 8) / 10]

22/08/05 15:42:08 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                

done 2020/10
processing 2020/11




22/08/05 15:42:23 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                

done 2020/11
processing 2020/12




22/08/05 15:42:37 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                

done 2020/12


In [295]:
# Stop the session, not just its SparkContext: SparkSession.stop() also clears
# the cached active/default session, so a later builder.getOrCreate() starts fresh.
spark.stop()