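# Convert Taxi to Parquet

Read the raw green-taxi, yellow-taxi and Uber trip CSVs, normalize them to one shared schema, derive missing pickup/dropoff taxi zones from coordinates, and write the combined trips to Parquet.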

In [1]:
import os
import dask_cudf
import cuspatial
import numpy as np

In [2]:
# dtype for every raw column name the loaders know about. It is passed to
# read_csv and re-applied with .astype after the sources are combined.
# (Despite the name, this is a dict: column name -> dtype string.)
dtype_list = {'dropoff_datetime': 'str',
              'dropoff_latitude': 'float64',
              'dropoff_taxizone_id': 'float64',
              'dropoff_longitude': 'float64',
              'ehail_fee': 'float64',
              'extra': 'float64',
              'fare_amount': 'float64',
              'improvement_surcharge': 'float64',
              'junk1': 'str',
              'junk2': 'str',
              'mta_tax': 'float64',
              'passenger_count': 'str',
              'payment_type': 'str',
              'pickup_datetime': 'str',
              'pickup_latitude': 'float64',
              'pickup_taxizone_id': 'float64',
              'pickup_longitude': 'float64',
              'rate_code_id': 'str',
              'store_and_fwd_flag': 'str',
              'tip_amount': 'float64',
              'tolls_amount': 'float64',
              'total_amount': 'float64',
              'trip_distance': 'float64',
              'trip_type': 'str',
              'vendor_id': 'str',
             }

# Paths to the raw download directories (relative to this notebook) and to
# the Parquet output directory.
relative_path = '../00_download_scripts/raw_data'
config = {'citibike_raw_data_path': f'{relative_path}/bike/',
          'taxi_raw_data_path': f'{relative_path}/taxi/',
          'uber_raw_data_path': f'{relative_path}/uber/',
          'subway_raw_data_path': f'{relative_path}/subway/',
          # plain string: there is nothing to interpolate here
          'parquet_output_path': 'data/'
         }

def glob(x, recursive=False):
    '''
    Return the paths matching the shell-style pattern ``x``, sorted.

    Thin wrapper over the standard library's ``glob.glob`` that sorts the
    result, so the file lists handed to the CSV readers come out in a
    deterministic order.

    Parameters
    ----------
    x : str
        Pathname pattern; may contain ``*``, ``?`` and ``[...]`` wildcards.
        As with ``glob.glob``, names starting with a dot are not matched by
        ``*`` or ``?``.
    recursive : bool, default False
        If true, ``**`` matches any files and zero or more directories and
        subdirectories.

    Returns
    -------
    list of str
        Matching paths in lexicographic order; empty if nothing matches.
    '''
    # Aliased import: this function's own name shadows the ``glob`` module.
    import glob as _glob_module
    return sorted(_glob_module.glob(x, recursive=recursive))

In [3]:
def get_green():
    """
    Read every era of green-taxi CSVs into one dask_cudf DataFrame.

    The raw green files changed layout several times. Each era is read with
    its own column list and then padded to a common set of columns:

    * eras without taxi-zone ids get ``-1.0`` in ``pickup_taxizone_id`` /
      ``dropoff_taxizone_id``;
    * the July-2016-onwards era, which has zone ids but no coordinates, gets
      ``0.0`` latitudes/longitudes;
    * pre-2015 files get a NaN ``improvement_surcharge``;
    * trailing ``junk1`` / ``junk2`` columns are dropped.

    An era whose glob matches no file is skipped with a warning, rather than
    handing ``dask_cudf.read_csv`` an empty path list.

    Returns
    -------
    dask_cudf.DataFrame
        Columns in sorted order, each cast per ``dtype_list``, with
        ``trip_type`` set to the literal ``'green'``.

    Raises
    ------
    FileNotFoundError
        If no green CSV matches any era's pattern.
    """
    import warnings

    raw_dir = config['taxi_raw_data_path']

    def _paths(*patterns):
        # Sorted matches of each pattern under raw_dir, concatenated in pattern order.
        return [path for pattern in patterns
                for path in glob(os.path.join(raw_dir, pattern))]

    no_zone_ids = {'dropoff_taxizone_id': -1.0, 'pickup_taxizone_id': -1.0}
    no_coords = {'dropoff_latitude': 0.0, 'dropoff_longitude': 0.0,
                 'pickup_latitude': 0.0, 'pickup_longitude': 0.0}

    # (era label, header column names, files, constant columns to add)
    eras = [
        ('before 2015',
         "vendor_id,pickup_datetime,dropoff_datetime,store_and_fwd_flag,rate_code_id,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,total_amount,payment_type,trip_type,junk1,junk2",
         _paths('green_tripdata_201[34]*.csv'),
         {**no_zone_ids, 'improvement_surcharge': np.nan}),
        ('january 2015 - june 2015',
         "vendor_id,pickup_datetime,dropoff_datetime,store_and_fwd_flag,rate_code_id,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,junk1,junk2",
         _paths('green_tripdata_2015-0[1-6].csv'),
         no_zone_ids),
        ('july 2015 - june 2016',
         "vendor_id,pickup_datetime,dropoff_datetime,store_and_fwd_flag,rate_code_id,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type",
         _paths('green_tripdata_2015-0[7-9].csv',
                'green_tripdata_2015-1[0-2].csv',
                'green_tripdata_2016-0[1-6].csv'),
         no_zone_ids),
        ('july 2016 or later',
         "vendor_id,pickup_datetime,dropoff_datetime,store_and_fwd_flag,rate_code_id,pickup_taxizone_id,dropoff_taxizone_id,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,junk1,junk2",
         _paths('green_tripdata_2016-0[7-9].csv',
                'green_tripdata_2016-1[0-2].csv',
                'green_tripdata_201[7-9]*.csv'),
         no_coords),
    ]

    frames = []
    for label, schema, paths, fill_values in eras:
        if not paths:
            warnings.warn(f'no green taxi CSVs found for {label}; skipping')
            continue
        names = schema.split(',')
        frame = dask_cudf.read_csv(paths,
                                   header=0,
                                   na_values=["NA"],
                                   parse_dates=[1, 2],
                                   infer_datetime_format=True,
                                   dtype=dtype_list,
                                   names=names)
        frame = frame.assign(**fill_values)
        junk = [name for name in ('junk1', 'junk2') if name in names]
        if junk:
            frame = frame.drop(junk, axis=1)
        frames.append(frame)

    if not frames:
        raise FileNotFoundError(f'no green taxi CSVs found under {raw_dir}')

    # Every era now has the same column set; align them all to one sorted order.
    columns = sorted(frames[0].columns)
    green = dask_cudf.concat([frame[columns] for frame in frames])
    for field in list(green.columns):
        if field in dtype_list:
            green[field] = green[field].astype(dtype_list[field])

    # NOTE(review): this replaces the trip_type values read from the green
    # CSVs with the cab colour, so the original per-trip trip_type is lost.
    green['trip_type'] = 'green'

    return green

In [4]:
def get_yellow():
    """
    Read every era of yellow-taxi CSVs into one dask_cudf DataFrame.

    Files are grouped by schema (x < 2015, 2015 <= x <= 2016.5, 2016.5 < x).
    Each group is read with its own column list and then padded to a common
    set of columns:

    * eras without taxi-zone ids get ``-1.0`` in ``pickup_taxizone_id`` /
      ``dropoff_taxizone_id``;
    * the post-June-2016 era, which has zone ids but no coordinates, gets
      ``0.0`` latitudes/longitudes;
    * ``ehail_fee`` is NaN for every era, and ``improvement_surcharge`` is
      NaN before 2015;
    * ``trip_type`` gets a ``-1.0`` placeholder, overwritten at the end;
    * trailing ``junk1`` / ``junk2`` columns are dropped.

    An era whose glob matches no file is skipped with a warning, rather than
    handing ``dask_cudf.read_csv`` an empty path list.

    Returns
    -------
    dask_cudf.DataFrame
        Columns in sorted order, each cast per ``dtype_list``, with
        ``trip_type`` set to the literal ``'yellow'``.

    Raises
    ------
    FileNotFoundError
        If no yellow CSV matches any era's pattern.
    """
    import warnings

    raw_dir = config['taxi_raw_data_path']

    def _paths(*patterns):
        # Sorted matches of each pattern under raw_dir, concatenated in pattern order.
        return [path for pattern in patterns
                for path in glob(os.path.join(raw_dir, pattern))]

    no_zone_ids = {'dropoff_taxizone_id': -1.0, 'pickup_taxizone_id': -1.0}
    no_coords = {'dropoff_latitude': 0.0, 'dropoff_longitude': 0.0,
                 'pickup_latitude': 0.0, 'pickup_longitude': 0.0}
    # Columns yellow files never have.
    not_in_yellow = {'ehail_fee': np.nan, 'trip_type': -1.0}

    # (era label, header column names, files, constant columns to add)
    eras = [
        ('before 2015',
         "vendor_id,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code_id,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,total_amount",
         _paths('yellow_tripdata_201[0-4]*.csv',
                'yellow_tripdata_2009*.csv'),
         # the all-NaN surcharge is cast to float64 with everything else below
         {**no_zone_ids, **not_in_yellow, 'improvement_surcharge': np.nan}),
        ('january 2015 - june 2016',
         "vendor_id,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code_id,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount",
         _paths('yellow_tripdata_2015*.csv',
                'yellow_tripdata_2016-0[1-6].csv'),
         {**no_zone_ids, **not_in_yellow}),
        ('after june 2016',
         "vendor_id,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,rate_code_id,store_and_fwd_flag,pickup_taxizone_id,dropoff_taxizone_id,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,junk1,junk2",
         _paths('yellow_tripdata_2016-0[7-9].csv',
                'yellow_tripdata_2016-1[0-2].csv',
                'yellow_tripdata_201[7-9]*.csv'),
         {**no_coords, **not_in_yellow}),
    ]

    frames = []
    for label, schema, paths, fill_values in eras:
        if not paths:
            warnings.warn(f'no yellow taxi CSVs found for {label}; skipping')
            continue
        names = schema.split(',')
        frame = dask_cudf.read_csv(paths,
                                   header=0,
                                   na_values=["NA"],
                                   parse_dates=[1, 2],
                                   infer_datetime_format=True,
                                   dtype=dtype_list,
                                   names=names)
        frame = frame.assign(**fill_values)
        junk = [name for name in ('junk1', 'junk2') if name in names]
        if junk:
            frame = frame.drop(junk, axis=1)
        frames.append(frame)

    if not frames:
        raise FileNotFoundError(f'no yellow taxi CSVs found under {raw_dir}')

    # Every era now has the same column set; align them all to one sorted order.
    columns = sorted(frames[0].columns)
    yellow = dask_cudf.concat([frame[columns] for frame in frames])
    for field in list(yellow.columns):
        if field in dtype_list:
            yellow[field] = yellow[field].astype(dtype_list[field])

    yellow['trip_type'] = 'yellow'

    return yellow

In [5]:
def get_uber():
    """
    Read the 2014 and 2015 Uber pickup CSVs into one dask_cudf DataFrame.

    The two eras have different layouts:

    * 2014 files carry pickup coordinates but no zone id, so
      ``pickup_taxizone_id`` is set to ``-1.0``;
    * 2015 files carry a zone id but no coordinates, so the pickup
      latitude/longitude are set to ``0.0``.

    Every other column named in ``dtype_list`` (dropoff fields, fares, ...)
    is added as all-null. Every column is then cast per ``dtype_list``, so
    ``pickup_datetime`` ends up as ``'str'``, as in the taxi loaders. The
    ``junk1`` / ``junk2`` placeholders are never emitted. An era whose glob
    matches no file is skipped with a warning.

    Returns
    -------
    dask_cudf.DataFrame
        Columns in sorted order, with ``trip_type`` set to ``'uber'``.

    Raises
    ------
    FileNotFoundError
        If no Uber CSV matches either era's pattern.
    """
    import warnings

    raw_dir = config['uber_raw_data_path']

    # (era label, header column names, files, parse_dates, constant columns to add)
    eras = [
        ('2014',
         "pickup_datetime,pickup_latitude,pickup_longitude,junk1",
         glob(os.path.join(raw_dir, 'uber*-???14.csv')),
         [0],
         {'pickup_taxizone_id': -1.0}),
        ('2015',
         "junk1,pickup_datetime,junk2,pickup_taxizone_id",
         glob(os.path.join(raw_dir, 'uber*15.csv')),
         [1],
         {'pickup_latitude': 0.0, 'pickup_longitude': 0.0}),
    ]

    frames = []
    for label, schema, paths, date_columns, fill_values in eras:
        if not paths:
            warnings.warn(f'no Uber CSVs found for {label}; skipping')
            continue
        names = schema.split(',')
        frame = dask_cudf.read_csv(paths,
                                   header=0,
                                   na_values=["NA"],
                                   parse_dates=date_columns,
                                   infer_datetime_format=True,
                                   dtype=dtype_list,
                                   names=names)
        junk = [name for name in ('junk1', 'junk2') if name in names]
        if junk:
            frame = frame.drop(junk, axis=1)
        frames.append(frame.assign(**fill_values))

    if not frames:
        raise FileNotFoundError(f'no Uber CSVs found under {raw_dir}')

    # Both eras now share the same four columns; align them to one order.
    columns = sorted(frames[0].columns)
    uberdf = dask_cudf.concat([frame[columns] for frame in frames])

    for field, dtype in dtype_list.items():
        if field in ('junk1', 'junk2'):
            # Placeholders for unwanted CSV columns; never part of the output.
            continue
        if field not in uberdf.columns:
            # Uber files have no dropoff/fare fields: add them as all-null.
            uberdf[field] = np.nan
        uberdf[field] = uberdf[field].astype(dtype)

    uberdf['trip_type'] = 'uber'

    return uberdf[sorted(uberdf.columns)]

In [6]:
green = get_green()
yellow = get_yellow()
uber = get_uber()

# Stack the three sources in the order uber, green, yellow. dask's
# DataFrame.append is deprecated (following pandas, which removed
# DataFrame.append in 2.0), so concatenate directly instead.
all_trips = dask_cudf.concat([uber, green, yellow])

In [7]:
# Taxi-zone polygons used by assign_taxi_zones:
# (polygon offsets, ring offsets, points with 'x'/'y' columns).
taxi_zones = cuspatial.read_polygon_shapefile('zones/cu_taxi_zones.shp')

# Batch boundaries over the polygon list: assign_taxi_zones passes at most
# PIP_BATCH_SIZE polygons to each cuspatial.point_in_polygon call. The count
# is taken from the shapefile instead of a hard-coded 263, so every polygon
# is still covered if the zone file changes.
PIP_BATCH_SIZE = 31
n_zone_polygons = len(taxi_zones[0])
pip_iterations = list(range(0, n_zone_polygons, PIP_BATCH_SIZE))
pip_iterations.append(n_zone_polygons)

def assign_taxi_zones(df, lon_var, lat_var, locid_var):
    """
    Derive NYC taxi-zone ids from coordinates for rows that lack one.

    Rows qualify for a lookup when `locid_var` is the -1.0 sentinel and both
    the longitude and the latitude are non-zero (nulls are first filled with
    0.0). Qualifying rows are tested against the polygons in the module-level
    `taxi_zones`, in the batches delimited by `pip_iterations`, using
    `cuspatial.point_in_polygon`. Every other row keeps its existing
    `locid_var` value. A qualifying row whose point falls in no polygon also
    keeps -1.0.

    A coordinate of 0.0 is treated as "no coordinate". That is safe for a
    New York dataset but would be wrong for data that can contain (0, 0).

    Parameters
    ----------
    df : cudf.DataFrame
        One partition when called through `map_partitions`. It must contain
        the three columns named below and is not modified.
    lon_var : str
        Name of the longitude column. Missing values should be 0.0 or null.
    lat_var : str
        Name of the latitude column. Missing values should be 0.0 or null.
    locid_var : str
        Name of the taxi-zone id column; -1.0 means "derive from coordinates".

    Returns
    -------
    Series
        The updated `locid_var` column, as float64.
    """
    # Work on a copy of just the three columns involved, so `df` is untouched.
    localdf = df[[lon_var, lat_var, locid_var]].copy()

    # Null coordinates become the 0.0 "no coordinate" sentinel.
    localdf[lon_var] = localdf[lon_var].fillna(value=0.0)
    localdf[lat_var] = localdf[lat_var].fillna(value=0.0)

    # Rows that still need a zone id and have coordinates to derive it from.
    needs_zone = ((localdf[locid_var] == -1.0)
                  & (localdf[lon_var] != 0.0)
                  & (localdf[lat_var] != 0.0))

    # Series.any() rather than np.any(): it stays on the Series API, and it
    # skips the polygon tests entirely when nothing needs replacing.
    if needs_zone.any():
        for start, end in zip(pip_iterations[:-1], pip_iterations[1:]):
            in_zone = cuspatial.point_in_polygon(localdf[lon_var],
                                                 localdf[lat_var],
                                                 taxi_zones[0][start:end],
                                                 taxi_zones[1],
                                                 taxi_zones[2]['x'],
                                                 taxi_zones[2]['y'])
            for zone in in_zone.columns:
                # Assumes point_in_polygon yields one row per input point, in
                # input order, so the hit mask is combined positionally.
                hit = needs_zone & in_zone[zone].values
                # Use a single .loc write (not chained `col.loc[m] = v`) so the
                # value really lands in localdf. Restricting the mask to
                # `needs_zone` means rows with a valid id are never overwritten.
                # NOTE(review): the value written is the result's column
                # label; confirm it equals the zone LocationID for this
                # shapefile.
                localdf.loc[hit, locid_var] = zone

    return localdf[locid_var].astype('float64')

In [8]:
# Derive & assign the dropoff and pickup taxi zones. Each zone column is
# recomputed per partition by assign_taxi_zones from its matching
# longitude/latitude columns.
zone_column_sets = (
    ('dropoff_taxizone_id', 'dropoff_longitude', 'dropoff_latitude'),
    ('pickup_taxizone_id', 'pickup_longitude', 'pickup_latitude'),
)
for zone_col, lon_col, lat_col in zone_column_sets:
    all_trips[zone_col] = all_trips.map_partitions(assign_taxi_zones,
                                                   lon_var=lon_col,
                                                   lat_var=lat_col,
                                                   locid_var=zone_col,
                                                   meta=(zone_col, np.float64))

In [9]:
# Fix the column order alphabetically before writing.
column_order = sorted(all_trips.columns)
all_trips = all_trips[column_order]

In [10]:
def _sort_by_pickup(partition):
    """
    Sort one partition's rows by pickup_datetime.

    This orders rows only within each partition; there is no ordering across
    partitions. The loaders cast pickup_datetime to 'str', so this is a
    string sort.
    """
    return partition.sort_values('pickup_datetime')


all_trips = all_trips.map_partitions(_sort_by_pickup, meta=all_trips)

In [11]:
# Enforce the dtype declared in dtype_list on every column that has one.
columns_to_cast = [name for name in all_trips.columns if name in dtype_list]
for column_name in columns_to_cast:
    all_trips[column_name] = all_trips[column_name].astype(dtype_list[column_name])

In [12]:
%%time
# Write all_trips as a directory of Parquet files; the check cells below
# read its part.<i>.parquet files back.
# NOTE(review): has_nulls / object_encoding look like fastparquet-style
# options; confirm the writer dask_cudf uses here honours or ignores them.
all_trips.to_parquet(os.path.join(config['parquet_output_path'], 'all_trips_unprocessed.parquet'),
                     compression='snappy',  # alternative: 'gzip'
                     has_nulls=True,
                     object_encoding='json',
                     index=False
                    )

CPU times: user 1min 17s, sys: 27.7 s, total: 1min 45s
Wall time: 1min 2s


## Check

Register the Parquet output with BlazingSQL and spot-check it with a couple of queries.

In [13]:
# BlazingSQL context used by the check queries below to run SQL over the Parquet output.
from blazingsql import BlazingContext
bc = BlazingContext()

BlazingContext ready


In [14]:
# Register every Parquet part the to_parquet cell wrote, in partition order.
# This avoids assuming exactly 25 parts; the count depends on how dask
# partitioned the input.
taxi_parquet_dir = os.path.join(config['parquet_output_path'], 'all_trips_unprocessed.parquet')
taxi_part_files = sorted(glob(os.path.join(taxi_parquet_dir, 'part.*.parquet')),
                         # numeric part order: part.2 before part.10
                         key=lambda path: int(os.path.basename(path).split('.')[1]))
if not taxi_part_files:
    raise FileNotFoundError(f'no Parquet parts found in {taxi_parquet_dir}')
bc.create_table('taxi', taxi_part_files)

In [15]:
# Preview the registered table; the rendered result shows only head and tail rows.
preview_query = 'select * from taxi'
bc.sql(preview_query)

Unnamed: 0,index,dropoff_datetime,dropoff_latitude,dropoff_longitude,dropoff_taxizone_id,ehail_fee,extra,fare_amount,improvement_surcharge,mta_tax,...,pickup_longitude,pickup_taxizone_id,rate_code_id,store_and_fwd_flag,tip_amount,tolls_amount,total_amount,trip_distance,trip_type,vendor_id
0,69553,,,,,,,,,,...,-73.9932,185.0,,,,,,,uber,
1,50011,,,,,,,,,,...,-73.9777,238.0,,,,,,,uber,
2,78981,,,,,,,,,,...,-73.9706,161.0,,,,,,,uber,
3,25675,,,,,,,,,,...,-74.0060,248.0,,,,,,,uber,
4,74756,,,,,,,,,,...,-73.9846,163.0,,,,,,,uber,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2407618,139,2018-02-01 00:09:05,0.0,0.0,68.0,,0.5,12.0,0.3,0.5,...,0.0000,87.0,1,N,0.00,0.0,13.30,3.30,yellow,1
2407619,50798,2018-02-01 00:05:07,0.0,0.0,4.0,,0.5,8.0,0.3,0.5,...,0.0000,90.0,1,N,2.30,0.0,11.60,1.60,yellow,1
2407620,12630,2018-02-01 00:01:13,0.0,0.0,79.0,,0.5,4.0,0.3,0.5,...,0.0000,114.0,1,N,0.00,0.0,5.30,0.49,yellow,2
2407621,20387,2018-02-01 00:02:30,0.0,0.0,107.0,,0.5,4.0,0.3,0.5,...,0.0000,234.0,1,N,1.05,0.0,6.35,0.40,yellow,1


In [16]:
# Trips picked up in taxi zone 200 between 2014 and 2018. pickup_datetime is
# stored as a string, hence the cast before year().
# NOTE(review): "tax" is total_amount - fare_amount, which also includes the
# tip (e.g. tax 2.16 alongside tip 1.36 in the output); consider renaming.
cols = 'pickup_taxizone_id, dropoff_taxizone_id, trip_distance, passenger_count, fare_amount, total_amount - fare_amount as tax, tip_amount'
zone_200_query = f'select {cols} from taxi where pickup_taxizone_id = 200 and year(cast(pickup_datetime as timestamp)) between 2014 and 2018'
bc.sql(zone_200_query)

Unnamed: 0,pickup_taxizone_id,dropoff_taxizone_id,trip_distance,passenger_count,fare_amount,tax,tip_amount
0,200.0,,,,,,
1,200.0,,,,,,
2,200.0,,,,,,
3,200.0,,,,,,
4,200.0,,,,,,
...,...,...,...,...,...,...,...
763,200.0,242.0,2.20,1,10.0,1.30,0.00
764,200.0,236.0,7.20,1,23.0,0.80,0.00
765,200.0,200.0,0.98,2,6.0,2.16,1.36
766,200.0,127.0,2.87,3,16.0,0.80,0.00
