In [1]:
import pandas as pd

In [2]:
# Display the version string of the imported pandas package.
pd.__version__

'1.4.2'

In [None]:
# Load the whole parquet file into a single in-memory DataFrame.
df = pd.read_parquet('raw_yellow_tripdata_2021_output_2021-12.parquet')

In [None]:
# Preview the first five rows of the loaded frame.
df.head(n=5)

In [None]:
# Show the pandas dtype of every column.
df.dtypes

In [None]:
# Print the CREATE TABLE statement pandas derives from the frame's columns.
# No connection is passed here, so this is not tied to the Postgres engine.
print(pd.io.sql.get_schema(df, name='yellow_taxi_data'))

In [None]:
# Keep the generated schema text in a variable and display its raw repr.
schema_def = pd.io.sql.get_schema(df, name='yellow_taxi_data')
schema_def

In [None]:
from sqlalchemy import create_engine

In [None]:
import os

# Connection URL for the Postgres database. It can be overridden through the
# NY_TAXI_DB_URL environment variable so real credentials never have to be
# written into the notebook; the fallback is the original local-development URL.
engine = create_engine(
    os.environ.get('NY_TAXI_DB_URL', 'postgresql://root:root@localhost:5432/ny_taxi')
)

In [None]:
# Open a connection only to check that the database is reachable. The `with`
# block closes it again, so the check does not leave a connection open.
with engine.connect() as connection:
    print('connection ok:', not connection.closed)

In [None]:
# Same schema preview as before, but generated against the engine's connection.
print(pd.io.sql.get_schema(df, name='yellow_taxi_data', con=engine))

# Custom code for parquet

## Use iterator from pandas to batch insert rows into database (instead of 1mil rows at once)
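- `pd.read_parquet` doesn't offer an iterator (there is no `chunksize` option as in `pd.read_csv`)
- so the `fastparquet` library is used instead, iterating over the file's row groups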
1. Create an empty table from a zero-row frame (`df.head(n=0)`) with `if_exists='replace'`
2. Insert the chunks with `if_exists='append'`

In [None]:
# Write a zero-row slice of the frame so the table is (re)created with the
# frame's columns but without any data rows.
df.iloc[:0].to_sql("yellow_taxi_data", engine, if_exists='replace')

In [None]:
from fastparquet import ParquetFile

# Stream the parquet file one row group at a time and append each group to the
# table, rather than inserting the whole file in one call. The loop variable is
# deliberately not named `df`, so the frame loaded earlier is not overwritten.
pf = ParquetFile('yellow_tripdata_2021-01.parquet')
for row_group_df in pf.iter_row_groups():
    row_group_df.to_sql(name="yellow_taxi_data", con=engine, if_exists='append')

In [None]:
# df = pd.read_csv('yellow_tripdata_2021-01.csv', nrows=100)

In [None]:
# Run both trip timestamp columns through pd.to_datetime and store the result
# back in the same columns.
df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
df['tpep_dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'])

In [None]:
from sqlalchemy import create_engine

In [None]:
import os

# Connection URL for the Postgres database. It can be overridden through the
# NY_TAXI_DB_URL environment variable so real credentials never have to be
# written into the notebook; the fallback is the original local-development URL.
engine = create_engine(
    os.environ.get('NY_TAXI_DB_URL', 'postgresql://root:root@localhost:5432/ny_taxi')
)

In [None]:
# Print the CREATE TABLE statement again, now that the timestamp columns have
# been passed through pd.to_datetime.
print(pd.io.sql.get_schema(df, name='yellow_taxi_data', con=engine))

## FOR CSV

In [None]:
# Open the CSV lazily: the reader yields DataFrames of up to 100,000 rows each
# instead of loading the whole file at once.
df_iter = pd.read_csv(
    'yellow_tripdata_2021-01.csv',
    iterator=True,
    chunksize=100_000,
)

In [None]:
# Pull the first chunk from the reader; later cells use it to create the table.
df = next(df_iter)

In [None]:
# Number of rows in the current chunk.
df.shape[0]

In [None]:
# Run both trip timestamp columns through pd.to_datetime and store the result
# back in the same columns.
df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
df['tpep_dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'])

In [None]:
# Display the current chunk after the timestamp conversion.
df

In [None]:
# Write a zero-row slice of the chunk so the table is (re)created with the
# chunk's columns but without any data rows.
df.iloc[:0].to_sql('yellow_taxi_data', engine, if_exists='replace')

In [None]:
# Append the first chunk to the table; %time reports how long the insert takes.
%time df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')

In [None]:
from time import time

In [None]:
# Insert every remaining chunk from the reader. Iterating over `df_iter`
# directly ends the loop cleanly once the file is exhausted, whereas calling
# next() inside `while True` always finished with an uncaught StopIteration.
t_start = time()
for df in df_iter:
    # Pass this chunk's timestamp columns through pd.to_datetime before writing.
    df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
    df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

    df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')

    t_end = time()

    print('inserted another chunk, took %.3f second' % (t_end - t_start))

    # Restart the clock here so the next measurement still includes the time
    # spent reading the next chunk, as the original loop did.
    t_start = time()

In [None]:
# Download the taxi zone lookup CSV into the current working directory.
!wget https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv

In [None]:
# Read the zone lookup CSV into its own DataFrame.
df_zones = pd.read_csv('taxi+_zone_lookup.csv')

In [None]:
# Preview the first five rows of the zone lookup table.
df_zones.head(n=5)

In [None]:
# Write the zone lookup frame to the `zones` table, replacing the table if it
# already exists.
df_zones.to_sql(name='zones', con=engine, if_exists='replace')

In [None]:
## Workaround for week 3:
- convert the old `.csv` zones file to Parquet so that it can be manually uploaded to GCS

In [None]:
import pyarrow.csv as pv
import pyarrow.parquet as pq

In [None]:
def format_to_parquet(src_file):
    """Convert a CSV file into a Parquet file with the same base path.

    Args:
        src_file: Path of the input file. Only paths ending in ``.csv`` are
            converted; for any other path the function prints ``no`` and
            does nothing else.

    Returns:
        None. The table is written to ``src_file`` with its trailing
        ``.csv`` replaced by ``.parquet``.
    """
    csv_suffix = '.csv'
    if not src_file.endswith(csv_suffix):
        print("no")
        return

    # Swap only the trailing extension. str.replace would also rewrite any
    # earlier '.csv' in the path (e.g. a directory named 'raw.csv.d').
    dest_file = src_file[:-len(csv_suffix)] + '.parquet'
    table = pv.read_csv(src_file)
    pq.write_table(table, dest_file)

In [None]:
# List the files in the current working directory.
!ls

In [None]:
# Convert the zones CSV to Parquet.
# NOTE(review): the download cell saves 'taxi+_zone_lookup.csv' (with a '+');
# confirm that a file with this un-plussed name exists before running.
format_to_parquet("taxi_zone_lookup.csv")

In [None]:
## Workaround for week 3:
- convert the old `.csv` zones file to Parquet so that it can be manually uploaded to GCS

In [None]:
import pyarrow.csv as pv
import pyarrow.parquet as pq

In [None]:
def format_to_parquet(src_file):
    """Convert a CSV file into a Parquet file with the same base path.

    Args:
        src_file: Path of the input file. Only paths ending in ``.csv`` are
            converted; for any other path the function prints ``no`` and
            does nothing else.

    Returns:
        None. The table is written to ``src_file`` with its trailing
        ``.csv`` replaced by ``.parquet``.
    """
    csv_suffix = '.csv'
    if not src_file.endswith(csv_suffix):
        print("no")
        return

    # Swap only the trailing extension. str.replace would also rewrite any
    # earlier '.csv' in the path (e.g. a directory named 'raw.csv.d').
    dest_file = src_file[:-len(csv_suffix)] + '.parquet'
    table = pv.read_csv(src_file)
    pq.write_table(table, dest_file)

In [None]:
# List the files in the current working directory.
!ls

In [None]:
# Convert the zones CSV to Parquet.
# NOTE(review): the download cell saves 'taxi+_zone_lookup.csv' (with a '+');
# confirm that a file with this un-plussed name exists before running.
format_to_parquet("taxi_zone_lookup.csv")

In [2]:
import pyarrow.csv as pv
import pyarrow.parquet as pq

In [8]:
def format_to_parquet(src_file):
    """Convert a CSV file into a Parquet file with the same base path.

    Args:
        src_file: Path of the input file. Only paths ending in ``.csv`` are
            converted; for any other path the function prints ``no`` and
            does nothing else.

    Returns:
        None. The table is written to ``src_file`` with its trailing
        ``.csv`` replaced by ``.parquet``.
    """
    csv_suffix = '.csv'
    if not src_file.endswith(csv_suffix):
        print("no")
        return

    # Swap only the trailing extension. str.replace would also rewrite any
    # earlier '.csv' in the path (e.g. a directory named 'raw.csv.d').
    dest_file = src_file[:-len(csv_suffix)] + '.parquet'
    table = pv.read_csv(src_file)
    pq.write_table(table, dest_file)

In [9]:
# List the files in the current working directory.
!ls

data_dictionary_trip_records_yellow.pdf  pipeline.py
docker-compose.yaml			 README.md
Dockerfile				 taxi_zone_lookup.csv
ingest_data.py				 upload-data.ipynb
output.parquet				 upload-data.py
pg-test-connection.ipynb		 yellow_tripdata_2021-01.parquet


In [10]:
# Convert the zones CSV shown in the directory listing to Parquet.
format_to_parquet("taxi_zone_lookup.csv")