In [None]:
!pip install duckdb --user --pre --upgrade && pip install --pre pandas==2.0.*

In [None]:
import os

import boto3
import duckdb
import pandas as pd
pd.__version__

In [None]:
# --- Local working storage ---
MAX_MEMORY = "25GB"  # DuckDB memory limit; set to roughly 75% of the memory available to the kernel
TMP_DIR = "fg-data-v8/"  # note: the trailing slash yields a harmless '//' in the derived paths below
DUCKDB_FILE = f"{TMP_DIR}/taxi.duckdb"
DATA_FOLDER = f"{TMP_DIR}/taxidata"

# --- S3 uploads ---
# Credentials are read from the environment so secrets are never saved inside the notebook.
# Unset variables fall back to '' (the previous placeholder value).
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY_ID", "")
AWS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY", "")
AWS_REGION = 'eu-west-1'
BUCKET = "ayushman-hops"
session = boto3.Session(
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    # Temporary (STS) credentials are only valid together with their session token.
    aws_session_token=os.environ.get("AWS_SESSION_TOKEN") or None,
    # Use the same region that is configured for DuckDB's S3 access.
    region_name=AWS_REGION,
)
s3 = session.resource('s3')

# --- HDFS uploads ---
HOPS_HOST = os.environ.get("HOPS_HOST", "")
HOPS_API_KEY = os.environ.get("HOPS_API_KEY", "")
HDFS_PATH = "/Projects/testproj/Resources/"


In [None]:
!mkdir -p {TMP_DIR}
!mkdir -p {DATA_FOLDER}
!ls -lR {TMP_DIR}

In [None]:
# Open the on-disk DuckDB database with the configured memory limit and temp directory.
con = duckdb.connect(
    DUCKDB_FILE,
    config={'memory_limit': MAX_MEMORY, 'temp_directory': TMP_DIR},
)

# Install both extensions first, then load them (same statement order as before).
for extension_sql in ("INSTALL httpfs;", "INSTALL parquet;", "LOAD httpfs;", "LOAD parquet;"):
    con.execute(extension_sql)

# Hand the S3 region and credentials to DuckDB for s3:// reads and writes.
con.execute(f"""
    SET s3_region='{AWS_REGION}';
    SET s3_access_key_id='{AWS_ACCESS_KEY}';
    SET s3_secret_access_key='{AWS_SECRET_ACCESS_KEY}';
    """)

In [None]:
def setup_zone_table():
    """(Re)create the `locations` table and fill it from the zone lookup CSV on S3.

    The table has four columns (LocationID, Borough, Zone, service_zone) and is loaded
    from `taxidata/taxi+_zone_lookup.csv` in `BUCKET`, treating the first CSV row as a
    header. Both statements are sent to DuckDB in a single `execute` call.
    """
    lookup_csv = f"s3://{BUCKET}/taxidata/taxi+_zone_lookup.csv"
    statements = (
        "\n"
        "    CREATE OR REPLACE TABLE locations (\n"
        "      LocationID INTEGER,\n"
        "      Borough VARCHAR,\n"
        "      Zone VARCHAR,\n"
        "      service_zone VARCHAR\n"
        "    );\n"
        "\n"
        f"    COPY locations FROM '{lookup_csv}' (FORMAT csv, HEADER true);\n"
        "    "
    )
    con.execute(statements)

In [None]:
def _quote_identifier(name):
    """Return `name` as a double-quoted SQL identifier, doubling any embedded quotes."""
    return '"' + str(name).replace('"', '""') + '"'


def get_clean_columns(yyyy, mm):
    """Load one month of raw trips into the `taxidata` table and normalise its columns.

    Reads `s3://{BUCKET}/taxidata/{yyyy}/*{mm}.parquet` into a fresh `taxidata` table,
    adds six empty VARCHAR columns for the pickup/drop-off borough, zone and service
    zone, then walks every column of the table: allow-listed columns are cast to the
    listed type and renamed to their canonical name; every other column is dropped.

    Args:
        yyyy: Year folder to read from (int or str).
        mm: Month. An int is zero-padded to two digits so that month 1 cannot also
            match files ending in "11"; a string is used exactly as given.
    """
    month = f"{mm:02d}" if isinstance(mm, int) else str(mm)
    file_path = f's3://{BUCKET}/taxidata/{yyyy}/*{month}.parquet'
    print(f"Reading {file_path}...")
    con.execute(f"CREATE OR REPLACE TABLE taxidata AS SELECT * FROM read_parquet('{file_path}');")

    # Placeholder columns; they are part of the allow-list below, so they survive the clean-up.
    for zone_column in ("pu_borough", "do_borough", "pu_svc_zone", "do_svc_zone", "pu_zone", "do_zone"):
        con.execute(f"ALTER TABLE taxidata ADD COLUMN {zone_column} VARCHAR;")

    # Lower-cased source column name -> [canonical name, SQL type].
    # Several historical spellings map onto the same canonical column.
    column_allow_list = {
        "pickup_datetime": ["tpep_pickup_datetime", "timestamp"],
        "tpep_pickup_datetime": ["tpep_pickup_datetime", "timestamp"],
        "trip_pickup_datetime": ["tpep_pickup_datetime", "timestamp"],

        "dropoff_datetime": ["tpep_dropoff_datetime", "timestamp"],
        "tpep_dropoff_datetime": ["tpep_dropoff_datetime", "timestamp"],
        "trip_dropoff_datetime": ["tpep_dropoff_datetime", "timestamp"],

        "pulocationid": ["pu_location_id", "integer"],
        "dolocationid": ["do_location_id", "integer"],
        "pu_borough": ["pu_borough", "varchar"],
        "pu_svc_zone": ["pu_svc_zone", "varchar"],
        "pu_zone": ["pu_zone", "varchar"],
        "do_borough": ["do_borough", "varchar"],
        "do_svc_zone": ["do_svc_zone", "varchar"],
        "do_zone": ["do_zone", "varchar"],

        "pickup_zip": ["pickup_zip", "integer"],
        "dropoff_zip": ["dropoff_zip", "integer"],
        "trip_distance": ["trip_distance", "double"],
        "fare_amount": ["fare_amount", "double"],
        "tip_amount": ["tip_amount", "double"],
        "fare_amt": ["fare_amount", "double"],
        "pickup_latitude": ["pickup_latitude", "double"],
        "pickup_longitude": ["pickup_longitude", "double"],
        "start_lat": ["pickup_latitude", "double"],
        "start_lon": ["pickup_longitude", "double"],
        "dropoff_latitude": ["dropoff_latitude", "double"],
        "dropoff_longitude": ["dropoff_longitude", "double"],
        "end_lat": ["dropoff_latitude", "double"],
        "end_lon": ["dropoff_longitude", "double"],
    }

    column_names = con.execute(
        "SELECT column_name FROM information_schema.columns WHERE table_name='taxidata'"
    ).df()["column_name"].tolist()

    for orig in column_names:
        # Quote the source name: raw files may carry names that are not bare identifiers.
        quoted_orig = _quote_identifier(orig)
        target = column_allow_list.get(orig.lower())
        if target is None:
            con.execute(f"ALTER TABLE taxidata DROP COLUMN {quoted_orig};")
            continue
        new_name, data_type = target
        con.execute(f"ALTER TABLE taxidata ALTER {quoted_orig} TYPE {data_type};")
        # Skip the rename when the column already carries its canonical name.
        if new_name != orig:
            con.execute(f"ALTER TABLE taxidata RENAME {quoted_orig} TO {_quote_identifier(new_name)};")

In [None]:
def annotate_zones():
    """Fill the zone columns of `taxidata` from `locations` and return the usable rows.

    One UPDATE sets the pickup/drop-off borough, zone and service-zone columns by
    looking up `pu_location_id` / `do_location_id` in `locations`. The two id columns
    are then dropped, the table is read into pandas, and rows whose borough is
    'Unknown' or whose zone / service zone is 'N/A' are removed.

    Returns:
        pandas.DataFrame with the remaining rows (original row index retained).
    """
    # (target column, lookup column in `locations`, location-id column in `taxidata`)
    lookups = (
        ("pu_borough", "Borough", "pu_location_id"),
        ("do_borough", "Borough", "do_location_id"),
        ("pu_zone", "Zone", "pu_location_id"),
        ("do_zone", "Zone", "do_location_id"),
        ("pu_svc_zone", "service_zone", "pu_location_id"),
        ("do_svc_zone", "service_zone", "do_location_id"),
    )
    assignments = [
        f"{target} = (\n"
        f"            SELECT {source}\n"
        "            FROM locations\n"
        f"            WHERE LocationID = taxidata.{key}\n"
        "          )"
        for target, source, key in lookups
    ]
    update_sql = (
        "\n        UPDATE taxidata\n        SET "
        + ",\n          ".join(assignments)
        + ";\n    "
    )
    con.execute(update_sql)

    # The ids are no longer needed once the descriptive columns are filled in.
    for id_column in ("pu_location_id", "do_location_id"):
        con.execute(f"ALTER TABLE taxidata DROP {id_column};")

    trips = con.execute("SELECT * FROM taxidata").df()
    # Placeholder values that mark a location the lookup table cannot describe.
    rejected_values = (
        ("pu_borough", 'Unknown'),
        ("do_borough", 'Unknown'),
        ("pu_svc_zone", 'N/A'),
        ("do_svc_zone", 'N/A'),
        ("pu_zone", 'N/A'),
        ("do_zone", 'N/A'),
    )
    for column, placeholder in rejected_values:
        trips = trips[trips[column] != placeholder]
    return trips

In [None]:
# Build the `locations` lookup table; the trip annotation step reads from it.
setup_zone_table()

# Show the first four lookup rows as a quick sanity check of the load.
con.execute("SELECT * FROM locations LIMIT 4").df()

In [None]:

years = list(range(2011, 2022))  # 2011 through 2021 inclusive

for yyyy in years:
    for month in range(1, 13):
        # Always a two-digit string, so `mm` never switches between int and str.
        mm = f"{month:02d}"
        get_clean_columns(yyyy, mm)
        df = annotate_zones()
        print("Creating table from df")
        # The name `df` matters: the SQL below refers to the DataFrame by its variable name.
        con.execute("CREATE OR REPLACE TABLE taxidata as SELECT * FROM df")
        print(f"Writing local parquet {yyyy}-{mm}")
        con.execute(f"COPY (SELECT * FROM taxidata) TO '{TMP_DIR}/{yyyy}-{mm}-cleaned.parquet' (FORMAT PARQUET);")
    # Combine only the monthly files written above; the '-cleaned' suffix keeps any other
    # '{yyyy}-*.parquet' file that happens to sit in TMP_DIR out of the yearly output.
    con.execute(f"CREATE OR REPLACE TABLE taxidata AS SELECT * FROM read_parquet('{TMP_DIR}/{yyyy}-*-cleaned.parquet');")
    print(f"Uploading {yyyy} parquet to S3")
    con.execute(f"COPY (SELECT * FROM taxidata) TO 's3://{BUCKET}/taxidata_cleaned/{yyyy}.parquet' (FORMAT PARQUET);")


### Test reads and data shape

In [None]:
# Rebuild `taxidata` from every cleaned parquet file under taxidata_cleaned/ in the bucket.
# NOTE(review): elsewhere in this notebook the value returned by `execute` is only used as a
# handle to call `.df()` on, so `raw_data` is presumably that handle rather than the data
# itself — confirm whether the assignment is still needed.
raw_data = con.execute(f"CREATE OR REPLACE TABLE taxidata AS SELECT * FROM read_parquet('s3://{BUCKET}/taxidata_cleaned/*.parquet');")

In [None]:
# Peek at the first ten rows of the cleaned 2011 file, read directly from S3.
con.execute(f"SELECT * FROM read_parquet('s3://{BUCKET}/taxidata_cleaned/2011.parquet') LIMIT 10;").df()

In [None]:
# Total row count across all cleaned parquet files, read directly from S3.
con.execute(f"SELECT COUNT(*) FROM read_parquet('s3://{BUCKET}/taxidata_cleaned/*.parquet');").df()

In [None]:
# Row count of the local `taxidata` table, for comparison with the count read from S3.
con.execute("SELECT COUNT(*) FROM taxidata").df()


In [None]:
# Close the DuckDB connection now that all reads and writes are done.
con.close()