In [1]:
import os
from xml.dom.pulldom import default_bufsize
from dotenv import load_dotenv
import duckdb
from collections import defaultdict
import pandas as pd
import re

In [2]:
load_dotenv()  # load variables from a .env file; the AWS keys below are read from os.environ


def _sql_string_literal(value):
    """Render ``value`` as a single-quoted SQL string literal, doubling embedded quotes."""
    return "'" + str(value).replace("'", "''") + "'"


# conn = duckdb.connect(':memory:')
conn = duckdb.connect("citi_bike.duckdb")
conn.execute("PRAGMA threads=8;")
conn.execute("PRAGMA enable_print_progress_bar;")
conn.execute("SET preserve_insertion_order = false;")
conn.execute('INSTALL httpfs;')
conn.execute('LOAD httpfs;')
# authentication: a missing environment variable raises KeyError here, before any S3 access
conn.execute("SET s3_region='us-east-1';")
conn.execute(f"SET s3_access_key_id={_sql_string_literal(os.environ['AWS_ACCESS_KEY_ID'])};")
conn.execute(f"SET s3_secret_access_key={_sql_string_literal(os.environ['AWS_SECRET_ACCESS_KEY'])};")

<_duckdb.DuckDBPyConnection at 0x1c3887ca4f0>

In [3]:
# Years iterated per city; range() excludes its end value, so both spans run through 2025.
nyc_year_range, jc_year_range = range(2013, 2026), range(2015, 2026)

In [4]:
from random import sample

# Inert string literal: it is never assigned or evaluated as code, so none of the
# names inside it (nyc_schema_cols_*, jc_schema_cols_*, schema_cols_map) exist at runtime.
# It only preserves earlier per-year column-order lists for reference.
'''
nyc_schema_cols_1 = [
    "trip_duration",
    # "trip_date",
    "start_time",
    "stop_time",
    "start_station_id",
    "start_station_name",
    "start_station_lat",
    "start_station_long",
    "end_station_id",
    "end_station_name",
    "end_station_lat",
    "end_station_long",
    "user_type"
]
nyc_schema_cols_2 = [
    'start_time',
    'stop_time', 
    'start_station_name', 
    'start_station_id', 
    'end_station_name', 
    'end_station_id', 
    'start_station_lat', 
    'start_station_long', 
    'end_station_lat', 
    'end_station_long',
    'user_type'
]
jc_schema_cols_1 = [
    'trip_duration',
    'start_time',
    'stop_time',
    'start_station_id',
    'start_station_name',
    'start_station_lat',
    'start_station_long',
    'end_station_id',
    'end_station_name',
    'end_station_lat',
    'end_station_long',
    'user_type'
]
jc_schema_cols_2 = [
    'start_time',
    'stop_time',
    'start_station_name',
    'start_station_id',
    'end_station_name',
    'end_station_id',
    'start_station_lat',
    'start_station_long',
    'end_station_lat',
    'end_station_long',
    'user_type',
]

schema_cols_map = {"nyc": {"2013":nyc_schema_cols_1, "2014":nyc_schema_cols_1, "2015":nyc_schema_cols_1, "2016":nyc_schema_cols_1, "2017":nyc_schema_cols_1, "2018":nyc_schema_cols_1, "2019":nyc_schema_cols_1, "2020":nyc_schema_cols_2, "2021":nyc_schema_cols_2, "2022":nyc_schema_cols_2, "2023":nyc_schema_cols_2, "2024":nyc_schema_cols_2, "2025":nyc_schema_cols_2},
            "jc": {"2015": jc_schema_cols_1, "2016": jc_schema_cols_1, "2017": jc_schema_cols_1, "2018": jc_schema_cols_1, "2019": jc_schema_cols_1, "2020": jc_schema_cols_1, "2021": jc_schema_cols_2, "2022": jc_schema_cols_2, "2023": jc_schema_cols_2, "2024": jc_schema_cols_2, "2025": jc_schema_cols_2}}
'''
         
def assert_schema_and_dtypes(all_cols):
    """Check that a described relation matches the canonical trip schema.

    ``all_cols`` is an iterable of ``(column_name, column_type)`` pairs. The
    check passes silently (returns ``None``) when the pairs cover exactly the
    canonical columns with the expected type strings, in any order.

    Raises:
        AssertionError: on a column-count mismatch, a missing canonical
            column, or a type string that differs from the canonical one.
    """
    # Kept as ordered pairs: when several columns are wrong, the first one in
    # this order is the one that gets reported.
    target_schema = dict((
        ("trip_duration", "BIGINT"),
        ("trip_date", "DATE"),
        ("start_time", "TIMESTAMP"),
        ("stop_time", "TIMESTAMP"),
        ("start_station_id", "VARCHAR"),
        ("start_station_name", "VARCHAR"),
        ("start_station_lat", "DOUBLE"),
        ("start_station_long", "DOUBLE"),
        ("end_station_id", "VARCHAR"),
        ("end_station_name", "VARCHAR"),
        ("end_station_lat", "DOUBLE"),
        ("end_station_long", "DOUBLE"),
        ("user_type", "VARCHAR"),
        ("rideable_type", "VARCHAR"),
    ))

    # Index the provided pairs by column name for direct lookup.
    sample_schema = {}
    for name, dtype in all_cols:
        sample_schema[name] = dtype

    assert len(target_schema) == len(sample_schema), f"Expected {len(target_schema)} cols, Got {len(sample_schema)} cols"
    for canonical_name in target_schema:
        canonical_dtype = target_schema[canonical_name]
        assert canonical_name in sample_schema, f"provided schema is missing {canonical_name}"
        assert sample_schema[canonical_name] == canonical_dtype, f"datatype mismatch, expected {canonical_name} -> {canonical_dtype}, got {canonical_name} -> {sample_schema[canonical_name]}"
    

In [16]:
# col_mapping[city][year] describes how one year's raw files are projected onto the
# canonical trip schema:
#   "col_names"     -> SELECT-list expressions (joined with "," by the table-building loop)
#   "filter_clause" -> WHERE clause appended after the FROM, or "" for no filtering
# The per-year lists are assembled from shared fragments so that each layout is written
# once; every year still receives its own fresh list object.

# Station columns shared by the layouts whose raw headers contain spaces.
_LEGACY_STATION_COLS = (
    'CAST("start station id" AS VARCHAR) AS start_station_id',
    '"start station name" AS start_station_name',
    '"start station latitude" AS start_station_lat',
    '"start station longitude" AS start_station_long',
    'CAST("end station id" AS VARCHAR) AS end_station_id',
    '"end station name" AS end_station_name',
    '"end station latitude" AS end_station_lat',
    '"end station longitude" AS end_station_long',
)
# Placeholder used by the layouts that select a NULL rideable_type.
_NO_RIDEABLE_TYPE = 'CAST(NULL AS VARCHAR) AS rideable_type'

# Timestamp formats tried (in order) by TRY_STRPTIME for nyc 2014 and 2016.
_US_OR_ISO_FORMATS = "['%m/%d/%Y %H:%M:%S','%Y-%m-%d %H:%M:%S']"

_STATION_IDS_PRESENT = 'WHERE "start station id" IS NOT NULL AND "end station id" IS NOT NULL'
_START_ID_AND_END_LAT_PRESENT = "WHERE start_station_id IS NOT NULL AND end_lat IS NOT NULL"
_START_AND_END_LAT_PRESENT = "WHERE start_lat IS NOT NULL AND end_lat IS NOT NULL"
_END_LAT_PRESENT = "WHERE end_lat IS NOT NULL"


def _strip_quotes(column):
    """Return SQL that removes literal double-quote characters from ``column``."""
    return f"REPLACE({column},'\"','')"


def _legacy_nyc(start_time, trip_date, stop_time, filter_clause=""):
    """Build an nyc entry for the ``tripduration`` / ``starttime`` / ``usertype`` layout."""
    return {
        "col_names": [
            "tripduration AS trip_duration",
            start_time,
            trip_date,
            stop_time,
            *_LEGACY_STATION_COLS,
            'usertype AS user_type',
            _NO_RIDEABLE_TYPE,
        ],
        "filter_clause": filter_clause,
    }


def _legacy_jc(duration, start_time, stop_time, trip_date, user_type):
    """Build a jc entry for the pre-``started_at`` layouts (never filtered)."""
    return {
        "col_names": [
            f"{duration} AS trip_duration",
            start_time,
            stop_time,
            trip_date,
            *_LEGACY_STATION_COLS,
            f"{user_type} AS user_type",
            _NO_RIDEABLE_TYPE,
        ],
        "filter_clause": "",
    }


def _modern(filter_clause, started_at="started_at", ended_at="ended_at",
            start_station_id="start_station_id", end_lng="end_lng"):
    """Build an entry for the ``started_at`` / ``ended_at`` layout.

    ``started_at`` / ``ended_at`` are the SQL expressions yielding the raw timestamps
    (trip_duration is derived from their difference in seconds); ``start_station_id``
    and ``end_lng`` are the select expressions for those two columns.
    """
    return {
        "col_names": [
            f"CAST({started_at} AS TIMESTAMP) AS start_time",
            f"CAST({ended_at} AS TIMESTAMP) AS stop_time",
            f"CAST({started_at} AS DATE) AS trip_date",
            f"CAST(EXTRACT(EPOCH FROM (CAST({ended_at} AS TIMESTAMP) - CAST({started_at} AS TIMESTAMP))) AS BIGINT) AS trip_duration",
            "start_station_name",
            start_station_id,
            "end_station_name",
            "end_station_id",
            "start_lat AS start_station_lat",
            "start_lng AS start_station_long",
            "end_lat AS end_station_lat",
            f"{end_lng} AS end_station_long",
            "member_casual AS user_type",
            "rideable_type",
        ],
        "filter_clause": filter_clause,
    }


_START_CLEAN = _strip_quotes("starttime")
_STOP_CLEAN = _strip_quotes("stoptime")
_STARTED_CLEAN = _strip_quotes("started_at")
_ENDED_CLEAN = _strip_quotes("ended_at")

col_mapping = {
    "nyc": {
        "2013": _legacy_nyc(
            "CAST(REPLACE(starttime, '\"', '') AS TIMESTAMP) AS start_time",
            "CAST(REPLACE(SPLIT_PART(CAST(starttime AS VARCHAR),' ',1),'\"','') AS DATE) AS trip_date",
            f"CAST({_STOP_CLEAN} AS TIMESTAMP) AS stop_time",
            'WHERE starttime IS NOT NULL AND stoptime IS NOT NULL AND "start station id" IS NOT NULL AND "start station latitude" IS NOT NULL AND "start station longitude" IS NOT NULL AND "end station id" IS NOT NULL AND "end station latitude" IS NOT NULL AND "end station longitude" IS NOT NULL',
        ),
        "2014": _legacy_nyc(
            f"CAST(TRY_STRPTIME(starttime, {_US_OR_ISO_FORMATS}) AS TIMESTAMP) AS start_time",
            f"CAST(TRY_STRPTIME(starttime,{_US_OR_ISO_FORMATS}) AS DATE) as trip_date",
            f"TRY_STRPTIME(stoptime,{_US_OR_ISO_FORMATS}) AS stop_time",
        ),
        "2015": _legacy_nyc(
            "CAST(TRY_STRPTIME(starttime, ['%m/%d/%Y %H:%M:%S','%m/%d/%Y %H:%M','%m/%d/%Y %H:%M:%S']) AS TIMESTAMP) AS start_time",
            "CAST(TRY_STRPTIME(starttime,['%m/%d/%Y %H:%M','%m/%d/%Y %H:%M:%S']) AS DATE) AS trip_date",
            "CAST(TRY_STRPTIME(stoptime,['%m/%d/%Y %H:%M:%S','%m/%d/%Y %H:%M']) AS TIMESTAMP) AS stop_time",
        ),
        "2016": _legacy_nyc(
            f"CAST(TRY_STRPTIME(starttime,{_US_OR_ISO_FORMATS}) AS TIMESTAMP) AS start_time",
            f"CAST(TRY_STRPTIME(starttime,{_US_OR_ISO_FORMATS}) AS DATE) AS trip_date",
            f"CAST(TRY_STRPTIME(stoptime,{_US_OR_ISO_FORMATS}) AS TIMESTAMP) AS stop_time",
        ),
        "2017": _legacy_nyc(
            "CAST(starttime AS TIMESTAMP) AS start_time",
            "CAST(starttime AS DATE) as trip_date",
            "CAST(stoptime AS TIMESTAMP) AS stop_time",
            'WHERE "start station latitude" IS NOT NULL AND "end station latitude" IS NOT NULL AND "end station longitude" IS NOT NULL',
        ),
        "2018": _legacy_nyc(
            f"CAST({_START_CLEAN} AS TIMESTAMP) AS start_time",
            f"CAST({_START_CLEAN} AS DATE) as trip_date",
            f"CAST({_STOP_CLEAN} AS TIMESTAMP) AS stop_time",
            _STATION_IDS_PRESENT,
        ),
        "2019": _legacy_nyc(
            "CAST(starttime AS TIMESTAMP) AS start_time",
            "CAST(starttime AS DATE) AS trip_date",
            "CAST(stoptime AS TIMESTAMP) AS stop_time",
            _STATION_IDS_PRESENT,
        ),
        # 2020-2022 cast start_station_id explicitly; 2023 onward select it as-is.
        **{
            y: _modern(
                _START_ID_AND_END_LAT_PRESENT,
                start_station_id="CAST(start_station_id AS VARCHAR) AS start_station_id",
            )
            for y in ("2020", "2021", "2022")
        },
        **{y: _modern(_START_AND_END_LAT_PRESENT) for y in ("2023", "2024", "2025")},
    },
    "jc": {
        **{
            y: _legacy_jc(
                '"trip duration"',
                'CAST("start time" AS TIMESTAMP) AS start_time',
                'CAST("stop time" AS TIMESTAMP) AS stop_time',
                'CAST("start time" AS DATE) AS trip_date',
                '"user type"',
            )
            for y in ("2015", "2016")
        },
        **{
            y: _legacy_jc(
                '"tripduration"',
                f"CAST({_START_CLEAN} AS TIMESTAMP) AS start_time",
                f"CAST({_STOP_CLEAN} AS TIMESTAMP) AS stop_time",
                f"CAST({_START_CLEAN} AS DATE) AS trip_date",
                '"usertype"',
            )
            for y in ("2017", "2018", "2019")
        },
        # NOTE(review): trip_date here reads `start_time`, presumably the alias defined
        # earlier in the same SELECT rather than a raw column -- confirm against the raw files.
        "2020": _legacy_jc(
            '"tripduration"',
            f"CAST({_START_CLEAN} AS TIMESTAMP) AS start_time",
            f"CAST({_STOP_CLEAN} AS TIMESTAMP) AS stop_time",
            'CAST(start_time as DATE) AS trip_date',
            '"usertype"',
        ),
        "2021": _modern(_START_AND_END_LAT_PRESENT, end_lng='"end_lng"'),
        **{
            y: _modern(
                _END_LAT_PRESENT,
                started_at=_STARTED_CLEAN,
                ended_at=_ENDED_CLEAN,
                end_lng='"end_lng"',
            )
            for y in ("2022", "2023", "2024")
        },
        "2025": _modern(
            _START_AND_END_LAT_PRESENT,
            started_at=_STARTED_CLEAN,
            ended_at=_ENDED_CLEAN,
            end_lng='"end_lng"',
        ),
    },
}


In [6]:
# Annual raw ride totals, keyed city -> year (str) -> count.
# Every year starts at 0; monthly totals are not tracked here.
total_rides = {
    "nyc": {str(year): 0 for year in nyc_year_range},
    "jc": {str(year): 0 for year in jc_year_range},
}

In [7]:
# Maps year (str) -> name (str) of the table persisted for that year; filled in by the
# table-building loop.
nyc_views = dict()

In [8]:
def prune_local_tables_in_disk(city=None):
    """Drop the per-city tables stored in the database behind the notebook-level ``conn``.

    ``city="nyc"`` or ``city="jc"`` drops only tables whose name starts with that
    prefix; any other value (including the default ``None``) drops tables with
    either prefix. Each drop is announced on stdout.
    """
    if city == "nyc":
        prefixes = ("nyc",)
    elif city == "jc":
        prefixes = ("jc",)
    else:
        prefixes = ("nyc", "jc")

    # Snapshot the table names once, before anything is dropped.
    table_names = [row[0] for row in conn.execute("SHOW TABLES;").fetchall()]
    for table in table_names:
        if not table.startswith(prefixes):
            continue
        print(f"Dropping table {table}")
        conn.execute(f"DROP TABLE IF EXISTS {table}")
def show_tables_on_disk():
    """Print, as one Python list, the names returned by ``SHOW TABLES`` on ``conn``."""
    names = []
    for row in conn.execute("SHOW TABLES;").fetchall():
        names.append(row[0])
    print(names)

In [18]:
# Build one cleaned, persisted table per (city, year) from the raw parquet files on S3.
# Tables that already exist are skipped, so re-running the cell only fills in missing years.
# NOTE: total_rides and nyc_views are only updated for tables built in this run; years
# that are skipped keep whatever values those dicts already hold.
tables = [c[0] for c in conn.execute("SHOW TABLES").fetchall()]

for city in ["nyc", "jc"]:
    year_range = nyc_year_range if city == "nyc" else jc_year_range
    for year in year_range:
        table_name = f"{city}_{year}"
        if table_name not in tables:
            try:
                conn.execute("BEGIN TRANSACTION")
                print(f"⏳ Creating table {table_name} from S3 ...")
                conn.execute(f"""
                    CREATE OR REPLACE VIEW raw_view AS
                    SELECT * FROM read_parquet('s3://citibike-nycdata/parquet_files/{city}_files/{year}/*')
                """)
                raw_view_total = conn.execute("SELECT COUNT(*) FROM raw_view").fetchall()[0][0]
                total_rides[city][str(year)] = raw_view_total
                print(f"Total entries found for {year} -> {raw_view_total}")

                # Project the raw files onto the canonical schema for this city/year.
                col_names = ','.join(col_mapping[city][str(year)]["col_names"])
                filter_clause = col_mapping[city][str(year)]["filter_clause"]
                conn.execute(f'CREATE OR REPLACE VIEW temp_view AS SELECT {col_names} FROM raw_view {filter_clause}')
                temp_view_cols = [(col[0], col[1]) for col in conn.execute("DESCRIBE temp_view").fetchall()]
                print("begin schema validity check")
                assert_schema_and_dtypes(temp_view_cols)
                print("done schema validity check")

                # Null check: every column except rideable_type must be fully populated.
                # All counts come from ONE aggregate query, so the S3-backed view is
                # scanned once instead of once per column.
                print("begin nullity check")
                null_checked_cols = []
                for col_name, _ in temp_view_cols:
                    if col_name == "rideable_type":
                        print(f"skipping ... {col_name}")
                        continue
                    null_checked_cols.append(col_name)
                null_exprs = ", ".join(f'COUNT(*) - COUNT("{c}")' for c in null_checked_cols)
                null_counts = conn.execute(f"SELECT {null_exprs} FROM temp_view").fetchall()[0]
                for c, null_count in zip(null_checked_cols, null_counts):
                    assert null_count == 0, f"detected {null_count} nil values for {c}"
                print("done nullity check")

                temp_view_total = conn.execute("SELECT COUNT(*) FROM temp_view").fetchall()[0][0]
                print(f"reduced {raw_view_total} by {raw_view_total-temp_view_total} -> {temp_view_total}")
                print("sample query -> ")
                print(conn.execute("SELECT * FROM temp_view LIMIT 5").fetchall())
                print(conn.description)
                print("attempting to add view")
                conn.execute(f"CREATE OR REPLACE TABLE {table_name} AS SELECT * FROM temp_view;")
                print("done adding view...")
                print(f"✅ {table_name} created & persisted.")
                conn.execute("COMMIT")
                # Record only nyc tables, and only once committed: nyc_views is keyed by
                # year alone, so jc entries would overwrite the nyc ones for shared years.
                if city == "nyc":
                    nyc_views[str(year)] = table_name
            except BaseException:
                # BaseException (not just Exception) so that an interrupted cell does not
                # leave the transaction open and break the next BEGIN TRANSACTION.
                conn.execute("ROLLBACK")
                raise
        else:
            print(f"⚡ {table_name} already exists — skipping.")
        print("--------------------------------------------------------------------")


⚡ nyc_2013 already exists — skipping.
--------------------------------------------------------------------
⚡ nyc_2014 already exists — skipping.
--------------------------------------------------------------------
⚡ nyc_2015 already exists — skipping.
--------------------------------------------------------------------
⚡ nyc_2016 already exists — skipping.
--------------------------------------------------------------------
⚡ nyc_2017 already exists — skipping.
--------------------------------------------------------------------
⚡ nyc_2018 already exists — skipping.
--------------------------------------------------------------------
⚡ nyc_2019 already exists — skipping.
--------------------------------------------------------------------
⚡ nyc_2020 already exists — skipping.
--------------------------------------------------------------------
⚡ nyc_2021 already exists — skipping.
--------------------------------------------------------------------
⚡ nyc_2022 already exists — skipping.

In [None]:
# Hard-coded ride counts, keyed city -> year (str) -> count.
# NOTE(review): presumably raw row counts captured from an earlier run of the
# table-building loop -- confirm before relying on them.
raw_data_total_rides = {
    "nyc": {
        "2013": 11229776,
        "2014": 8081216,
        "2015": 9937969,
        "2016": 13845655,
        "2017": 16364657,
        "2018": 35096678,
        "2019": 20551697,
        "2020": 19562314,
        "2021": 27130122,
        "2022": 29838806,
        "2023": 35106986,
        "2024": 44303209,
        "2025": 35530530,
    },
    "jc": {
        "2015": 52883,
        "2016": 247584,
        "2017": 259456,
        "2018": 353892,
        "2019": 404947,
        "2020": 336802,
        "2021": 644443,
        "2022": 895485,
        "2023": 988851,
        "2024": 1052451,
        "2025": 773301,
    },
}

{'nyc': {'2013': 11229776,
  '2014': 8081216,
  '2015': 9937969,
  '2016': 13845655,
  '2017': 16364657,
  '2018': 35096678,
  '2019': 20551697,
  '2020': 19562314,
  '2021': 27130122,
  '2022': 29838806,
  '2023': 35106986,
  '2024': 44303209,
  '2025': 35530530},
 'jc': {'2015': 0,
  '2016': 247584,
  '2017': 259456,
  '2018': 353892,
  '2019': 404947,
  '2020': 336802,
  '2021': 644443,
  '2022': 895485,
  '2023': 988851,
  '2024': 1052451,
  '2025': 0}}