In [6]:
from pathlib import Path
from collections import namedtuple
from datetime import datetime
from decimal import Decimal
import pandas as pd
import pytz

In [2]:
# Timezone objects shared by the cells below.
UTC = pytz.UTC
# NOTE(review): despite the name, this is the 'US/Eastern' zone, which presumably
# follows daylight saving rules rather than fixed EST — confirm before relying on it.
EST = pytz.timezone('US/Eastern')

## Read in Yellow Taxi trip records

Reference (data dictionary for yellow taxi trip records): http://www.nyc.gov/html/tlc/downloads/pdf/data_dictionary_trip_records_yellow.pdf

In [16]:
# Column names for the yellow-taxi CSV, in file order. The list is passed as
# `names=` to pd.read_csv, so the position of each entry matters.
YELLOW_COLS = [
    'vendor_id',
    'pickup_datetime',
    'dropoff_datetime',
    'passenger_count',
    'trip_distance',
    'pickup_lon',
    'pickup_lat',
    'rate_code_id',
    'store_and_fwd_flag',
    'dropoff_lon',
    'dropoff_lat',
    'payment_type',
    'fare_amount',
    'extra',
    'mta_tax',
    'tip_amount',
    'tolls_amount',
    'improvement_surcharge',
    'total_amount',
]

# Columns read as integers.
INTEGER_FIELDS = [
    'vendor_id',
    'passenger_count',
    'rate_code_id',
    'payment_type',
]

# Columns holding timestamp text that is parsed into datetime objects.
DATETIME_FIELDS = [
    'pickup_datetime',
    'dropoff_datetime',
]

# Columns converted to decimal.Decimal values.
DECIMAL_FIELDS = [
    'trip_distance',
    'pickup_lon',
    'pickup_lat',
    'dropoff_lon',
    'dropoff_lat',
    'fare_amount',
    'extra',
    'mta_tax',
    'tip_amount',
    'tolls_amount',
    'improvement_surcharge',
    'total_amount',
]

In [27]:
def make_nyc_dt(text):
    """Parse a ``YYYY-MM-DD HH:MM:SS`` string and attach the ``EST`` timezone.

    Parameters
    ----------
    text : str
        Timestamp text in the exact format ``%Y-%m-%d %H:%M:%S``.

    Returns
    -------
    datetime
        The parsed value after ``EST.localize`` has been applied to it.

    Raises
    ------
    ValueError
        If ``text`` does not match the expected format.
    """
    naive_dt = datetime.strptime(text, '%Y-%m-%d %H:%M:%S')
    return EST.localize(naive_dt)

def read_yellow_trip(csv_pth, chunksize=100000):
    """Open a yellow-taxi trip CSV as an iterator of typed DataFrame chunks.

    The file's own header row is consumed (``header=0``) and the columns are
    renamed to ``YELLOW_COLS``. NA detection is switched off, so every field
    reaches its converter/dtype as the raw text found in the file.

    Parameters
    ----------
    csv_pth : str or path-like
        Location of the CSV file to read.
    chunksize : int, default 100000
        Maximum number of rows per yielded DataFrame.

    Returns
    -------
    iterator of pandas.DataFrame
        The chunked reader returned by ``pd.read_csv``; each chunk has the
        ``YELLOW_COLS`` columns with integers for ``INTEGER_FIELDS``,
        ``Decimal`` objects for ``DECIMAL_FIELDS``, localized datetimes for
        ``DATETIME_FIELDS`` and ``bytes`` for ``store_and_fwd_flag``.
    """
    # Per-value converters for the columns pandas cannot type natively.
    # INTEGER_FIELDS are intentionally NOT listed here: they are typed through
    # `dtype` below. Naming a column in both places makes pandas ignore the
    # dtype and emit a ParserWarning about the conflict.
    converters = {'store_and_fwd_flag': str.encode}
    converters.update({col: Decimal for col in DECIMAL_FIELDS})
    converters.update({col: make_nyc_dt for col in DATETIME_FIELDS})

    return pd.read_csv(
        csv_pth,
        header=0,
        names=YELLOW_COLS,
        # Keep only the leading columns that have a name in YELLOW_COLS.
        usecols=range(len(YELLOW_COLS)),
        index_col=False, parse_dates=False, na_filter=False,
        dtype={col: int for col in INTEGER_FIELDS},
        converters=converters,
        chunksize=chunksize,
        memory_map=True
    )

In [33]:
%%time
# Pull only the first 1000-row chunk so the parser can be timed on a small sample.
df = next(read_yellow_trip('../raw_trip_data/yellow_tripdata_2016-06.csv', chunksize=1000))

CPU times: user 108 ms, sys: 2.85 ms, total: 111 ms
Wall time: 110 ms


In [34]:
import psycopg2
from pgcopy import CopyManager, Replace
from io import BytesIO, StringIO

In [39]:
# Open the PostgreSQL connection used by the bulk-load cell below.
# Only the database name is given; host/user/password presumably come from
# the client defaults or environment — confirm for your setup.
conn = psycopg2.connect(database='liang-bo.wang_project1')

In [42]:
%%time
mgr = CopyManager(conn, 'taxi_trips', ['taxi_type', *YELLOW_COLS])

for i, df in enumerate(read_yellow_trip('../raw_trip_data/yellow_tripdata_2016-06.csv', chunksize=1000)):
    print(f'Reading chunk {i}')
    records = [
        (b'YELLOW', *tup)
        for tup in df.itertuples(index=False, name='YellowTrip')
    ]
    with conn:
        mgr.copy(records, BytesIO)

Reading chunk 0
Reading chunk 1
Reading chunk 2
Reading chunk 3
Reading chunk 4
Reading chunk 5
Reading chunk 6
Reading chunk 7
Reading chunk 8
Reading chunk 9
Reading chunk 10
Reading chunk 11
Reading chunk 12
Reading chunk 13


KeyboardInterrupt: 

In [38]:
# Release the database connection held in `conn`.
conn.close()