In [3]:
import pandas as pd
import h3
import hashlib
import json

### Data ingestion planning


We want to convert the zip files of CSVs from the citi bike data [website](https://s3.amazonaws.com/tripdata/index.html) into a data format we can work with.

At a high level, the plan is to take the CSVs of individual rides and upload each ride as a row in the `ride_data` database table.

There are a few reasons to reshape the trip data into a uniform per-ride format before aggregating by month. The biggest reason is that the file structure of the data dumps is not consistent. If we pulled down the data and converted it directly into aggregated monthly formats, the aggregation logic and the extraction logic would be tightly coupled: changing the aggregation logic would require updating the code for every file structure.

1. The file structure is inconsistent across years/months. Before 2024 all data is in one file
2. The `ride_id` field is not present until a certain year.
3. We want to easily be able to add more cities.
4. When there are multiple files for a single month we can process each one individually and send to the backend without having to load all of them each time. So our system doesn't need to **understand** the file structure. It can just load all the trips in the format we want then process them. 


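This table will have 5 core fields (the ingestion code below additionally stores `h3_cell_start` and `h3_cell_end`, the H3 cells of each ride's start and end coordinates):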

`id`: int incremental unique ID

`ride_id`: A unique ID for each ride

`locale`: Jersey or NYC for now

`start_date`: The starting time for the ride.

`created_at`: timestamp of when the row was created.



A few solutions:

1. Data before a certain year does not contain the ride_id field. We will create it for those dates by combining and hashing the `start_time` and the `bike_id` since that should be unique.
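2. Duplicated trips across files. Trips from recent years carry a `ride_id`, which is used to deduplicate them. For older trips, the generated `ride_id` should serve the same purpose, since it is derived deterministically from the same `start_time`/`bike_id` pair every time a file is processed.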


### Process

1. Hit index page https://s3.amazonaws.com/tripdata/index.html

2. Get all file names/last modified dates. Compare against db to see if anything requires update.

3. For any file which has been modified or is new, run ingestion.

4. Download zip file, iterate through each sub-file.

In [4]:
import pandas as pd
import requests
import zipfile
import io
from datetime import datetime
from pathlib import Path
import os

def find_column_match(df_columns, possible_names):
    """Return the first column of ``df_columns`` that matches any of ``possible_names``.

    Comparison is case-insensitive. Candidates are tried in the order given;
    when several columns share the same lowercase spelling, the earliest one
    wins. The match is returned with its original casing, or ``None`` when no
    candidate is present.
    """
    # lowercase spelling -> first column carrying that spelling
    canonical = {}
    for column in df_columns:
        canonical.setdefault(column.lower(), column)

    return next(
        (canonical[name.lower()] for name in possible_names if name.lower() in canonical),
        None,
    )


def process_all_csvs_from_zip_url(zip_url, locale, timeout=60):
    """
    Download a ZIP archive from a URL and process every CSV file inside it.

    Parameters:
    zip_url (str): URL of the ZIP file
    locale (str): Locale tag written onto every processed row
    timeout (float): Seconds to wait for the server to connect / send data,
        passed to ``requests.get``. Defaults to 60.

    Returns:
    dict | None: Whatever ``process_all_csvs_from_zip_data`` returns — a mapping
        of CSV file name to processed DataFrame (``None`` when the download is
        not a valid ZIP archive).

    Raises:
    Exception: "Error downloading ZIP file: ..." when the HTTP request fails;
        processing errors from ``process_all_csvs_from_zip_data`` propagate
        unchanged (previously they were wrapped a second time).
    """
    print(f"Downloading ZIP file from: {zip_url}")
    try:
        # The whole archive is buffered in memory below, so streaming buys nothing.
        response = requests.get(zip_url, timeout=timeout)
        response.raise_for_status()
        zip_data = io.BytesIO(response.content)
    except requests.RequestException as e:
        raise Exception(f"Error downloading ZIP file: {str(e)}") from e

    # Not wrapped here: process_all_csvs_from_zip_data already raises
    # "Error processing ZIP file: ..." for its own failures.
    return process_all_csvs_from_zip_data(zip_data, locale)


def _is_csv_member(member_name):
    """Return True if a ZIP member is a data CSV rather than a directory or OS metadata file."""
    # Directory entries end with a slash.
    if member_name.endswith("/"):
        return False
    base_name = os.path.basename(member_name)
    # macOS archive metadata: the __MACOSX folder and AppleDouble "._" files,
    # which can also sit inside subfolders (e.g. "2014/._trips.csv").
    if "__MACOSX" in member_name or base_name.startswith("._"):
        return False
    # Other common OS artefacts.
    if base_name == ".DS_Store" or base_name.lower().endswith("thumbs.db"):
        return False
    return member_name.lower().endswith(".csv")


def _result_key(csv_file_path, results):
    """Choose a key in ``results`` for a CSV: its base name, or a unique longer name on collision."""
    filename_only = os.path.basename(csv_file_path)
    if filename_only not in results:
        return filename_only
    # Same base name already used (e.g. the same file name in another folder):
    # fall back to the full member path, then a numeric suffix, so an earlier
    # result is never overwritten.
    candidate = csv_file_path
    suffix = 2
    while candidate in results:
        candidate = f"{csv_file_path}#{suffix}"
        suffix += 1
    return candidate


def process_all_csvs_from_zip_data(zip_data, locale):
    """
    Process every CSV file in a ZIP archive into the common ride format.

    CSVs that contain a ``ride_id`` column go through ``process_dataframe``;
    all others through ``process_dataframe_old_format``. A failure in one CSV
    is reported and skipped so the remaining files are still processed.

    Parameters:
    zip_data (io.BytesIO): ZIP file data (anything ``zipfile.ZipFile`` accepts)
    locale (str): Locale tag written onto every processed row

    Returns:
    dict | None: Mapping of CSV name (base name; member path on a name
        collision) to processed DataFrame. ``None`` if ``zip_data`` is not a
        valid ZIP archive.

    Raises:
    Exception: "Error processing ZIP file: ..." when the archive contains no
        CSV files or none of them could be processed.
    """
    results = {}
    processed_count = 0
    failed_files = []

    try:
        with zipfile.ZipFile(zip_data, "r") as zip_ref:
            csv_files = [name for name in zip_ref.namelist() if _is_csv_member(name)]

            if not csv_files:
                raise ValueError("No CSV files found in ZIP archive")

            print(f"Found {len(csv_files)} CSV file(s) in ZIP archive:")
            for csv_file in csv_files:
                print(f"  - {csv_file}")

            for csv_file_path in csv_files:
                try:
                    print(f"\nProcessing: {csv_file_path}")

                    # Read CSV directly from the archive, without extracting it
                    with zip_ref.open(csv_file_path) as csv_file:
                        df = pd.read_csv(csv_file)

                    # Newer exports carry ride_id; older ones need one generated.
                    if "ride_id" in df.columns:
                        processed_df = process_dataframe(df, locale)
                    else:
                        processed_df = process_dataframe_old_format(df, locale)

                    results[_result_key(csv_file_path, results)] = processed_df
                    processed_count += 1
                    print(f"  ✓ Successfully processed {len(processed_df)} rows")

                except Exception as e:
                    error_msg = f"Failed to process {csv_file_path}: {str(e)}"
                    print(f"  ✗ {error_msg}")
                    failed_files.append((csv_file_path, str(e)))

            # Summary
            print(f"\n{'='*50}")
            print("PROCESSING SUMMARY:")
            print(f"Successfully processed: {processed_count} files")
            print(f"Failed: {len(failed_files)} files")

            if failed_files:
                print("\nFailed files:")
                for failed_file, error in failed_files:
                    print(f"  - {failed_file}: {error}")

            if processed_count == 0:
                raise ValueError("No CSV files could be processed successfully")

            return results

    except zipfile.BadZipFile:
        # Best effort: callers treat a falsy result as "nothing to upload".
        print("File is not a valid ZIP archive, skipping.")
        return None
    except Exception as e:
        raise Exception(f"Error processing ZIP file: {str(e)}") from e
def process_dataframe(df, locale):
    """
    Reshape a current-format trip DataFrame (one that already has ``ride_id``).

    Parameters:
    df (pd.DataFrame): Raw trip rows with ride_id, started_at and the
        start/end lat/lng columns
    locale (str): Value written to the ``locale`` column of every row

    Returns:
    pd.DataFrame: Columns ride_id, start_date (calendar date of started_at),
        locale, start_lat, start_lng, end_lat, end_lng, aligned to df's index.

    Raises:
    ValueError: If any required column is absent.
    """
    coordinate_columns = ["start_lat", "start_lng", "end_lat", "end_lng"]
    required_columns = ["ride_id", "started_at", *coordinate_columns]

    missing_columns = [name for name in required_columns if name not in df.columns]
    if missing_columns:
        raise ValueError(f"Missing required columns: {missing_columns}")

    output = {
        "ride_id": df["ride_id"],
        # Keep only the calendar date; the time of day is not stored.
        "start_date": pd.to_datetime(df["started_at"]).dt.date,
        # Scalar: broadcast to every row.
        "locale": locale,
    }
    for name in coordinate_columns:
        output[name] = df[name]

    return pd.DataFrame(output)


# Create hashed ride_id from starttime and bikeid combination
def create_ride_id_hash(row):
    """Derive a deterministic 16-hex-character ride id for rows that lack one.

    ``row["start_time"]`` and ``row["bike_id"]`` are formatted as
    ``"<start_time>_<bike_id>"``, hashed with SHA-256, and the first 16 hex
    digits of the digest are returned (shorter ids, same input -> same id).
    """
    key = "{}_{}".format(row["start_time"], row["bike_id"])
    digest = hashlib.sha256(key.encode()).hexdigest()
    return digest[:16]


def process_dataframe_old_format(df, locale):
    """
    Reshape a legacy trip DataFrame (one without a ``ride_id`` column).

    Legacy exports spell their columns differently between releases
    (e.g. "starttime", "Start Time", "start_time"), so each required field is
    located with a case-insensitive lookup over the known spellings below. A
    ``ride_id`` is generated for every row with ``create_ride_id_hash`` from
    the row's start time and bike id.

    Parameters:
    df (pd.DataFrame): Input DataFrame in a legacy layout
    locale: Locale identifier written to the ``locale`` column of every row

    Returns:
    pd.DataFrame: Columns ride_id, start_date (calendar date of the start
        time), locale, start_lat, start_lng, end_lat, end_lng, aligned to
        ``df``'s index. A header-only input yields an empty frame.

    Raises:
    ValueError: If any required field cannot be matched to a column.
    """

    # Define column mappings with multiple possible names (matched case-insensitively)
    column_mappings = {
        "bikeid": ["bikeid", "bike_id", "bike id"],
        "starttime": ["starttime", "start time", "start_time"],
        "start_station_latitude": [
            "start station latitude",
            "start_station_latitude",
            "start station lat",
            "start_lat",
        ],
        "start_station_longitude": [
            "start station longitude",
            "start_station_longitude",
            "start station lng",
            "start station lon",
            "start_lng",
            "start_lon",
        ],
        "end_station_latitude": [
            "end station latitude",
            "end_station_latitude",
            "end station lat",
            "end_lat",
        ],
        "end_station_longitude": [
            "end station longitude",
            "end_station_longitude",
            "end station lng",
            "end station lon",
            "end_lng",
            "end_lon",
        ],
    }

    # Find actual column names in the DataFrame
    available_columns = df.columns.tolist()  # computed once, reused for every lookup
    actual_columns = {}
    missing_columns = []

    for standard_name, possible_names in column_mappings.items():
        matched_column = find_column_match(available_columns, possible_names)
        if matched_column:
            actual_columns[standard_name] = matched_column
        else:
            missing_columns.append(
                f"{standard_name} (tried: {', '.join(possible_names)})"
            )

    if missing_columns:
        raise ValueError(
            f"Missing required columns: {missing_columns}. Available columns: {df.columns.tolist()}"
        )

    start_times = df[actual_columns["starttime"]]
    bike_ids = df[actual_columns["bikeid"]]

    result_df = pd.DataFrame()

    # Hash each (start time, bike id) pair. Iterating the two columns directly
    # is far cheaper than DataFrame.apply(axis=1), which builds a Series per
    # row, and unlike apply it also works when the CSV has a header but no rows.
    ride_ids = [
        create_ride_id_hash({"start_time": start_time, "bike_id": bike_id})
        for start_time, bike_id in zip(start_times, bike_ids)
    ]
    result_df["ride_id"] = pd.Series(ride_ids, index=df.index, dtype=object)

    # Keep only the calendar date of the start time
    result_df["start_date"] = pd.to_datetime(start_times).dt.date

    result_df["locale"] = locale

    # Copy latitude and longitude using the matched column names
    result_df["start_lat"] = df[actual_columns["start_station_latitude"]]
    result_df["start_lng"] = df[actual_columns["start_station_longitude"]]
    result_df["end_lat"] = df[actual_columns["end_station_latitude"]]
    result_df["end_lng"] = df[actual_columns["end_station_longitude"]]

    return result_df


def download_and_save_zip(zip_url, local_path, timeout=60):
    """
    Download a ZIP file and save it locally, streaming it to disk in chunks.

    Parameters:
    zip_url (str): URL to download
    local_path (str): Local path to save the file; parent folders are created
        as needed
    timeout (float): Seconds to wait for the server to connect / send data,
        passed to ``requests.get``. Defaults to 60.

    Returns:
    str: ``local_path``

    Raises:
    Exception: "Error downloading ZIP file: ..." for HTTP/network errors,
        "Error saving ZIP file: ..." for anything else. A partially written
        file is removed before raising.
    """
    print(f"Downloading ZIP file to: {local_path}")
    target = Path(local_path)
    wrote_file = False
    try:
        # ``with`` closes the streamed response and releases its connection.
        with requests.get(zip_url, stream=True, timeout=timeout) as response:
            response.raise_for_status()

            # Create directory if it doesn't exist
            target.parent.mkdir(parents=True, exist_ok=True)

            with open(target, "wb") as f:
                wrote_file = True
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)

    except Exception as e:
        if wrote_file:
            # Don't leave a truncated ZIP behind for a later run to trip over.
            try:
                target.unlink(missing_ok=True)
            except OSError:
                pass  # best effort; the original error is reported below
        if isinstance(e, requests.RequestException):
            raise Exception(f"Error downloading ZIP file: {str(e)}") from e
        raise Exception(f"Error saving ZIP file: {str(e)}") from e

    print(f"ZIP file saved successfully to: {local_path}")
    return local_path



In [5]:
def _latlng_columns_to_cells(lats, lngs, resolution):
    """Map paired lat/lng values to H3 cell ids, using None where either value is missing."""
    return [
        h3.latlng_to_cell(lat, lng, resolution)
        if pd.notnull(lat) and pd.notnull(lng)
        else None
        for lat, lng in zip(lats, lngs)
    ]


def apply_h3_latlng_to_cell(df_by_file, resolution=9) -> dict[str, pd.DataFrame]:
    """
    Replace raw start/end coordinates with H3 cell ids and drop duplicate rides.

    For each DataFrame in ``df_by_file`` this adds ``h3_cell_start`` and
    ``h3_cell_end`` (``None`` where either coordinate is missing), drops the
    four lat/lng columns, and keeps only the first row for each ``ride_id``.
    Input frames are not modified.

    Parameters:
    df_by_file (dict[str, pd.DataFrame]): Frames with ride_id, start_lat,
        start_lng, end_lat and end_lng columns
    resolution (int): H3 resolution passed to ``h3.latlng_to_cell``.
        Defaults to 9 (previously this argument was ignored and 9 always used).

    Returns:
    dict[str, pd.DataFrame]: Same keys, transformed frames.
    """
    output_obj = {}
    for key, source_df in df_by_file.items():
        print(f"[{key}] entries: {source_df.shape[0]}")
        df_in_loop = source_df.copy()

        # A per-value comprehension is much faster than apply(axis=1) and also
        # works on empty frames (where apply returns a DataFrame, not a Series).
        df_in_loop["h3_cell_start"] = _latlng_columns_to_cells(
            df_in_loop["start_lat"], df_in_loop["start_lng"], resolution
        )
        df_in_loop["h3_cell_end"] = _latlng_columns_to_cells(
            df_in_loop["end_lat"], df_in_loop["end_lng"], resolution
        )

        df_in_loop = df_in_loop.drop(
            columns=["start_lat", "start_lng", "end_lat", "end_lng"]
        )
        # Remove duplicates. These are infrequent and probably just bad data.
        dupes = df_in_loop.duplicated(subset=["ride_id"])
        print(f"  - Removing {dupes.sum()} duplicate ride_id entries")

        output_obj[key] = df_in_loop[~dupes]
    return output_obj

In [6]:
from supabase import create_client, Client
import dotenv
dotenv.load_dotenv()

url: str = os.environ.get("URL")
key: str = os.environ.get("ANON_KEY")
# Secrets must never live in the notebook: read the service key from the
# environment (.env), falling back to ANON_KEY when it is not set.
# NOTE(review): the key previously hardcoded here was committed and should be
# rotated. The anon key may lack permission for upserts if row-level security
# is enabled — confirm which key the ingestion needs.
service_key: str = os.environ.get("SUPABASE_SECRET_KEY") or key
if not url or not service_key:
    raise RuntimeError(
        "Set URL and SUPABASE_SECRET_KEY (or ANON_KEY) in the environment / .env"
    )
supabase: Client = create_client(url, service_key)

In [7]:
def get_processed_file_record(file_df_entry):
    """Convert one row of the files DataFrame (a Series) into a JSON-safe dict.

    Round-tripping through ``to_json`` (ISO-8601 dates) turns numpy scalars and
    timestamps into plain JSON types that can be sent to the Supabase client.
    """
    return json.loads(file_df_entry.to_json(date_format="iso"))
    

In [8]:
from concurrent.futures import ThreadPoolExecutor, as_completed
import random
import time

def upload_results_with_guaranteed_retry(
    output, batch_size=1000, max_workers=2, max_retries=5, stagger_seconds=0.2
):
    """
    Upload every DataFrame in ``output`` via a thread pool, one task per file.

    Each file is handed to ``upload_file_with_retry``. The first file that
    fails (after its batch retries are exhausted) cancels uploads that have not
    started yet; uploads already running are allowed to finish.

    Parameters:
    output (dict[str, pd.DataFrame]): File name -> rows to upload
    batch_size (int): Rows per upsert request
    max_workers (int): Number of concurrent upload threads
    max_retries (int): Attempts per batch before giving up
    stagger_seconds (float): Fixed pause between consecutive submissions so
        workers don't all start at the same instant. (Previously the pause
        grew with the file index, making total delay quadratic.)

    Raises:
    RuntimeError: If any file fails to upload.
    """
    print(
        f"Starting upload process with {len(output)} files, max_retries={max_retries}"
    )

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_file = {}

        # Submit all jobs with staggered starts
        for i, (file_name, df) in enumerate(output.items()):
            if i:
                time.sleep(stagger_seconds)
            future = executor.submit(
                upload_file_with_retry, file_name, df, batch_size, max_retries
            )
            future_to_file[future] = file_name

        # Wait for all to complete
        failed_files = []
        for future in as_completed(future_to_file):
            file_name = future_to_file[future]
            try:
                future.result()  # This will raise if all retries failed
                print(f"🎉 Successfully completed {file_name}")
            except Exception as e:
                failed_files.append(file_name)
                print(f"💥 FINAL FAILURE for {file_name}: {e}")

                # Cancel work that has not started; running uploads cannot be
                # cancelled and are awaited when the executor exits.
                for pending_future in future_to_file:
                    if not pending_future.done() and pending_future.cancel():
                        print(f"⏹️  Cancelled remaining upload")
                break

        if failed_files:
            raise RuntimeError(f"Upload process failed. Failed files: {failed_files}")

    print("✅ All uploads completed successfully!")


def upload_file_with_retry(file_name: str, df, batch_size: int, max_retries: int):
    """
    Upload one file's rows in consecutive batches, retrying each batch.

    Rows are serialised through ``DataFrame.to_json`` (ISO dates) so every
    value is a plain JSON type, then sent ``batch_size`` records at a time via
    ``upload_batch_with_retry``. Raises RuntimeError as soon as one batch runs
    out of retries; later batches are not attempted.
    """
    print(f"📁 Processing {file_name} with {len(df)} records")

    records = json.loads(df.to_json(orient="records", date_format="iso"))
    total_batches = (len(records) + batch_size - 1) // batch_size  # ceiling division

    batch_starts = range(0, len(records), batch_size)
    for batch_num, start in enumerate(batch_starts, start=1):
        batch = records[start : start + batch_size]
        if not upload_batch_with_retry(
            batch, file_name, batch_num, total_batches, max_retries
        ):
            raise RuntimeError(f"Batch {batch_num} failed after {max_retries} retries")
        # Brief pause between batches of the same file
        time.sleep(0.1)

    print(f"✅ Completed all batches for {file_name}")


def upload_batch_with_retry(
    batch, file_name: str, batch_num: int, total_batches: int, max_retries: int
) -> bool:
    """
    Upsert one batch into ``ride_data``, retrying with exponential backoff.

    Rows that conflict on (ride_id, locale, start_date) are updated in place.

    Parameters:
    batch (list[dict]): JSON-ready records to upsert
    file_name (str): Source file name, used only in log messages
    batch_num (int): 1-based index of this batch, for logging
    total_batches (int): Number of batches in the file, for logging
    max_retries (int): Total attempts allowed (<= 0 means no attempt is made)

    Returns:
    bool: True once an attempt succeeds, False when every attempt failed.
    """
    for attempt in range(max_retries):
        try:
            # NOTE(review): log_connection_info is not defined in the visible
            # notebook cells. If it is missing, the NameError is caught below
            # and every attempt fails — confirm it is defined before this runs.
            log_connection_info()
            (
                supabase.table("ride_data")
                .upsert(batch, on_conflict="ride_id,locale,start_date")
                .execute()
            )
        except Exception as e:
            remaining_attempts = max_retries - attempt - 1

            print(
                f"[{file_name}] ⚠️ Batch {batch_num} failed (attempt {attempt + 1}/{max_retries}): {e}"
            )

            if remaining_attempts <= 0:
                print(
                    f"[{file_name}] 💥 Batch {batch_num} exhausted all {max_retries} retries"
                )
                return False

            # Only compute a backoff when another attempt will actually follow.
            wait_time = calculate_backoff(attempt)
            print(
                f"[{file_name}] ⏳ Retrying in {wait_time:.1f}s... ({remaining_attempts} attempts left)"
            )
            time.sleep(wait_time)
        else:
            print(
                f"[{file_name}] ✅ Batch {batch_num}/{total_batches}: {len(batch)} records"
            )
            return True

    # Reached only when max_retries <= 0.
    return False


def calculate_backoff(
    attempt: int, base_delay: float = 1.0, max_delay: float = 60.0
) -> float:
    """Return the wait, in seconds, before retry number ``attempt`` (0-based).

    The base wait doubles per attempt (``base_delay * 2**attempt``) and is
    capped at ``max_delay``; a random 10–30% of that capped wait is then added
    so concurrent workers do not retry in lockstep.
    """
    capped = min(base_delay * 2 ** attempt, max_delay)
    return capped + random.uniform(0.1, 0.3) * capped

In [9]:
import psycopg2
import sqlalchemy
from dotenv import load_dotenv
import os

# Load environment variables from .env
from contextlib import closing

load_dotenv()

# Fetch variables
USER = os.getenv("user")
PASSWORD = os.getenv("password")
HOST = os.getenv("host")
PORT = os.getenv("port")
DBNAME = os.getenv("dbname")

# Credentials must come from the environment (.env), never from the notebook.
# Only non-secret settings have defaults; the password has none.
# NOTE(review): the password previously hardcoded here was committed and should be rotated.
user = USER or "postgres.kevndteqglsoslznrntz"
password = PASSWORD
host = HOST or "aws-1-us-east-1.pooler.supabase.com"
port = int(PORT or 5432)
dbname = DBNAME or "postgres"

if not password:
    raise RuntimeError("Database password not set: add `password=...` to .env")

# Smoke-test the connection. closing() releases the cursor and connection even
# when the query fails (previously they leaked on error).
try:
    with closing(
        psycopg2.connect(
            user=user,
            password=password,
            host=host,
            port=port,
            dbname=dbname,
        )
    ) as connection:
        print("Connection successful!")

        with closing(connection.cursor()) as cursor:
            cursor.execute("SELECT NOW();")
            result = cursor.fetchone()
            print("Current Time:", result)

    print("Connection closed.")

except Exception as e:
    print(f"Failed to connect: {e}")

Failed to connect: connection to server at "aws-1-us-east-1.pooler.supabase.com" (3.227.209.82), port 5432 failed: FATAL:  Authentication error, reason: "Authentication query failed: %DBConnection.ConnectionError{message: \"connection not available and request was dropped from queue after 10000ms. This means requests are coming in and your connection pool cannot serve them fast enough. You can address this by:\\n\\n  1. Ensuring your database is available and that you can connect to it\\n  2. Tracking down slow queries and making sure they are running fast enough\\n  3. Increasing the pool_size (although this increases resource consumption)\\n  4. Allowing requests to wait longer by increasing :queue_target and :queue_interval\\n\\nSee DBConnection.start_link/2 for more information\\n\", severity: :error, reason: :queue_timeout}"
connection to server at "aws-1-us-east-1.pooler.supabase.com" (3.227.209.82), port 5432 failed: FATAL:  Authentication error, reason: "Authentication query fa

In [10]:
import sqlalchemy
import psycopg2  # or asyncpg

# Connection settings (user, password, host, port, dbname) are defined in the
# connection-test cell above; they are not repeated here.
from urllib.parse import quote_plus

# Percent-encode the credentials: characters such as "@", ":" or "/" in a
# password would otherwise corrupt the connection URL.
engine = sqlalchemy.create_engine(
    f"postgresql+psycopg2://{quote_plus(str(user))}:{quote_plus(str(password))}@{host}:{port}/{dbname}"
)

In [23]:
# NOTE(review): opens a SQLAlchemy connection that is not closed anywhere in the
# visible cells; call connection.close() (or use `with engine.connect()`) when done.
connection = engine.connect()

In [None]:

# Dry run: download and transform each listed file without uploading anything.
# `output` holds the transformed frames of the last file processed.
file_df = pd.read_csv("./new_files_test.csv")
for _, file in file_df.iterrows():
    # Named zip_url so the Supabase `url` setting from the client cell is not clobbered.
    zip_url = f"https://s3.amazonaws.com/tripdata/{file['file_name']}"
    # Use each file's own locale, as BikeShareProcessor.fetch_and_process_file does.
    df_by_file = process_all_csvs_from_zip_url(zip_url, locale=file["locale"])
    if not df_by_file:
        print(f"No data processed for {file['file_name']}, skipping upload.")
    else:
        output = apply_h3_latlng_to_cell(df_by_file)

In [83]:
from io import StringIO
import logging
from datetime import datetime
import pandas as pd

logger = logging.getLogger(__name__)


class BikeShareProcessor:
    """
    Download, transform and bulk-upsert Citi Bike trip files into Postgres.

    ``conn_details`` is passed as keyword arguments to ``psycopg2.connect``
    (e.g. user, password, host, port, dbname). A fresh connection is opened
    for every file handled by ``process_files_df``.
    """

    # Columns targeted by ON CONFLICT in the upsert.
    # NOTE(review): must match the unique constraint on the target table — confirm.
    CONFLICT_COLUMNS = ("ride_id", "start_date", "locale")

    def __init__(self, conn_details):
        self.conn_details = conn_details
        self.create_new_connection()

    def reset_connection(self):
        """Close the current connection (best effort) and open a new one."""
        try:
            self.conn.close()
        except Exception as exc:
            # The old connection may already be broken or missing; that's fine.
            logger.debug("Ignoring error while closing connection: %s", exc)
        self.create_new_connection()
        logger.info("Database connection reset successfully")

    def create_new_connection(self):
        """Open a psycopg2 connection from ``conn_details`` and record when it was made."""
        self.conn = psycopg2.connect(**self.conn_details)
        self.connection_created_at = datetime.now()

    def process_files_df(self, files: pd.DataFrame):
        """
        Fetch, transform, upload and mark as processed every file in ``files``.

        ``files`` needs at least ``file_name`` and ``locale`` columns. Files
        that yield no data are skipped (``fetch_and_process_file`` reports them).
        """
        for _, file in files.iterrows():
            self.reset_connection()
            output = self.fetch_and_process_file(file)
            if output:
                self.upload_output_obj(output, table_name="ride_data")
                self.mark_file_as_processed(file)

    def fetch_and_process_file(self, file: pd.Series):
        """
        Download one tripdata ZIP and return its H3-annotated frames.

        Returns:
        dict[str, pd.DataFrame] | None: ``None`` when the archive yields no
            data (previously this raised UnboundLocalError).
        """
        zip_url = f"https://s3.amazonaws.com/tripdata/{file['file_name']}"
        df_by_file = process_all_csvs_from_zip_url(zip_url, locale=file["locale"])
        if not df_by_file:
            print(f"No data processed for {file['file_name']}, skipping upload.")
            return None
        return apply_h3_latlng_to_cell(df_by_file)

    def upload_output_obj(self, output_obj: dict[str, pd.DataFrame], table_name: str):
        """Upsert every DataFrame in ``output_obj`` into ``table_name``."""
        for key, frame in output_obj.items():
            print(f"Uploading {key} with {frame.shape[0]} records")
            self.upload_df(frame, table_name)

    def upload_df(self, df: pd.DataFrame, table_name: str, chunk_size=50000):
        """Upsert ``df`` into ``table_name`` in slices of at most ``chunk_size`` rows."""
        for i in range(0, len(df), chunk_size):
            print(f"uploading chunk {i // chunk_size}")
            # Slice by chunk_size (was a hardcoded 50000, which overlapped or
            # skipped rows whenever chunk_size differed).
            chunk = df.iloc[i : i + chunk_size]
            self.bulk_insert_with_staging(chunk, table_name)

    def bulk_insert_with_staging(self, df: pd.DataFrame, table_name: str):
        """
        Upsert ``df`` into ``table_name`` through a temp staging table and COPY.

        Rows are COPY'd into a temp table cloned from ``table_name`` (dropped on
        commit), then inserted with ON CONFLICT on ``CONFLICT_COLUMNS``: rows
        that already exist get their other columns updated, new rows are
        inserted. Commits on success; on any error the transaction is rolled
        back (so the connection stays usable) and the error is re-raised.

        Returns:
        dict: ``{"total_processed": n}``; an empty frame returns
            ``{"inserted": 0, "updated": 0, "total_processed": 0}``.
        """
        if len(df) == 0:
            return {"inserted": 0, "updated": 0, "total_processed": 0}

        logger.info(f"Bulk upserting {len(df)} records to {table_name}")

        # Identifiers are interpolated into SQL; they come from code and the
        # DataFrame's columns here, not from user input.
        staging_table = f"staging_{table_name}_{int(datetime.now().timestamp())}"
        columns = ", ".join(df.columns)
        conflict_columns = list(self.CONFLICT_COLUMNS)
        conflict_list = ", ".join(conflict_columns)
        update_columns = [col for col in df.columns if col not in conflict_columns]

        if update_columns:
            set_clause = ", ".join(f"{col} = EXCLUDED.{col}" for col in update_columns)
            conflict_action = f"DO UPDATE SET {set_clause}"
        else:
            # Only key columns supplied: an empty SET clause would be invalid SQL.
            conflict_action = "DO NOTHING"

        upsert_sql = f"""
            INSERT INTO {table_name} ({columns})
            SELECT {columns} FROM {staging_table}
            ON CONFLICT ({conflict_list})
            {conflict_action}
        """

        # Serialise as tab-separated CSV; missing values become \N (the COPY NULL marker).
        buffer = StringIO()
        df.to_csv(buffer, sep="\t", header=False, index=False, na_rep="\\N")
        buffer.seek(0)
        copy_sql = f"COPY {staging_table} ({columns}) FROM STDIN WITH (FORMAT csv, DELIMITER E'\\t', NULL '\\N')"

        try:
            with self.conn.cursor() as cur:
                cur.execute(
                    f"""
                    CREATE TEMP TABLE {staging_table} (LIKE {table_name} INCLUDING ALL)
                    ON COMMIT DROP
                    """
                )
                cur.copy_expert(copy_sql, buffer)
                cur.execute(upsert_sql)  # executed exactly once
            self.conn.commit()
        except Exception:
            # An aborted transaction would make every later statement fail.
            self.conn.rollback()
            raise

        total_processed = len(df)
        logger.info(
            f"Successfully processed {total_processed} records in {table_name}"
        )
        return {"total_processed": total_processed}

    def mark_file_as_processed(self, file_obj):
        """Upsert ``file_obj`` (a row of the files DataFrame) into Supabase ``processed_files``."""
        print(f"Marking file {file_obj['file_name']} as completed")
        result = (
            supabase.table("processed_files")
            .upsert(
                get_processed_file_record(file_obj),
                on_conflict="file_name,locale",
            )
            .execute()
        )
        print(result)

In [84]:
# Connection settings (user, password, host, port, dbname) come from the
# earlier connection cells. BikeShareProcessor opens (and resets) its own
# connections, so no extra connection is opened here — the previous unused
# `connection_pspg2` was never closed.
conn_details = {
    "user": user,
    "password": password,
    "host": host,
    "port": port,
    "dbname": dbname,
}
new_files = pd.read_csv("./new_files_test.csv")
processor = BikeShareProcessor(conn_details)
processor.process_files_df(new_files)

Downloading ZIP file from: https://s3.amazonaws.com/tripdata/202404-citibike-tripdata.zip
Found 4 CSV file(s) in ZIP archive:
  - 202404-citibike-tripdata_4.csv
  - 202404-citibike-tripdata_1.csv
  - 202404-citibike-tripdata_2.csv
  - 202404-citibike-tripdata_3.csv

Processing: 202404-citibike-tripdata_4.csv
here
  ✓ Successfully processed 217063 rows

Processing: 202404-citibike-tripdata_1.csv


  df = pd.read_csv(csv_file)


here
  ✓ Successfully processed 1000000 rows

Processing: 202404-citibike-tripdata_2.csv


  df = pd.read_csv(csv_file)


here
  ✓ Successfully processed 1000000 rows

Processing: 202404-citibike-tripdata_3.csv


  df = pd.read_csv(csv_file)


here
  ✓ Successfully processed 1000000 rows

PROCESSING SUMMARY:
Successfully processed: 4 files
Failed: 0 files
[202404-citibike-tripdata_4.csv] entries: 217063
  - Removing 0 duplicate ride_id entries
[202404-citibike-tripdata_1.csv] entries: 1000000
  - Removing 0 duplicate ride_id entries
[202404-citibike-tripdata_2.csv] entries: 1000000
  - Removing 0 duplicate ride_id entries
[202404-citibike-tripdata_3.csv] entries: 1000000
  - Removing 0 duplicate ride_id entries
Uploading 202404-citibike-tripdata_4.csv with 217063 records
uploading chunk 0.0
uploading chunk 1.0
uploading chunk 2.0
uploading chunk 3.0
uploading chunk 4.0
Uploading 202404-citibike-tripdata_1.csv with 1000000 records
uploading chunk 0.0
uploading chunk 1.0
uploading chunk 2.0
uploading chunk 3.0
uploading chunk 4.0
uploading chunk 5.0
uploading chunk 6.0
uploading chunk 7.0
uploading chunk 8.0
uploading chunk 9.0
uploading chunk 10.0
uploading chunk 11.0
uploading chunk 12.0
uploading chunk 13.0
uploading chunk 

  df = pd.read_csv(csv_file)


here
  ✓ Successfully processed 1000000 rows

Processing: 202405-citibike-tripdata_2.csv


  df = pd.read_csv(csv_file)


here
  ✓ Successfully processed 1000000 rows

Processing: 202405-citibike-tripdata_1.csv


  df = pd.read_csv(csv_file)


here
  ✓ Successfully processed 1000000 rows

Processing: 202405-citibike-tripdata_5.csv


  df = pd.read_csv(csv_file)


here
  ✓ Successfully processed 133961 rows

Processing: 202405-citibike-tripdata_4.csv


  df = pd.read_csv(csv_file)


here
  ✓ Successfully processed 1000000 rows

PROCESSING SUMMARY:
Successfully processed: 5 files
Failed: 0 files
[202405-citibike-tripdata_3.csv] entries: 1000000
  - Removing 0 duplicate ride_id entries
[202405-citibike-tripdata_2.csv] entries: 1000000
  - Removing 0 duplicate ride_id entries
[202405-citibike-tripdata_1.csv] entries: 1000000
  - Removing 0 duplicate ride_id entries
[202405-citibike-tripdata_5.csv] entries: 133961
  - Removing 0 duplicate ride_id entries
[202405-citibike-tripdata_4.csv] entries: 1000000
  - Removing 0 duplicate ride_id entries
Uploading 202405-citibike-tripdata_3.csv with 1000000 records
uploading chunk 0.0
uploading chunk 1.0
uploading chunk 2.0
uploading chunk 3.0
uploading chunk 4.0
uploading chunk 5.0
uploading chunk 6.0
uploading chunk 7.0
uploading chunk 8.0
uploading chunk 9.0
uploading chunk 10.0
uploading chunk 11.0
uploading chunk 12.0
uploading chunk 13.0
uploading chunk 14.0
uploading chunk 15.0
uploading chunk 16.0
uploading chunk 17.0
up

  df = pd.read_csv(csv_file)


here
  ✓ Successfully processed 783576 rows

Processing: 202406-citibike-tripdata_4.csv


  df = pd.read_csv(csv_file)


here
  ✓ Successfully processed 1000000 rows

Processing: 202406-citibike-tripdata_1.csv


  df = pd.read_csv(csv_file)


here
  ✓ Successfully processed 1000000 rows

Processing: 202406-citibike-tripdata_3.csv


  df = pd.read_csv(csv_file)


here
  ✓ Successfully processed 1000000 rows

Processing: 202406-citibike-tripdata_2.csv


  df = pd.read_csv(csv_file)


here
  ✓ Successfully processed 1000000 rows

PROCESSING SUMMARY:
Successfully processed: 5 files
Failed: 0 files
[202406-citibike-tripdata_5.csv] entries: 783576
  - Removing 0 duplicate ride_id entries
[202406-citibike-tripdata_4.csv] entries: 1000000
  - Removing 0 duplicate ride_id entries
[202406-citibike-tripdata_1.csv] entries: 1000000
  - Removing 0 duplicate ride_id entries
[202406-citibike-tripdata_3.csv] entries: 1000000
  - Removing 0 duplicate ride_id entries
[202406-citibike-tripdata_2.csv] entries: 1000000
  - Removing 0 duplicate ride_id entries
Uploading 202406-citibike-tripdata_5.csv with 783576 records
uploading chunk 0.0
uploading chunk 1.0
uploading chunk 2.0
uploading chunk 3.0
uploading chunk 4.0
uploading chunk 5.0
uploading chunk 6.0
uploading chunk 7.0
uploading chunk 8.0
uploading chunk 9.0
uploading chunk 10.0
uploading chunk 11.0
uploading chunk 12.0
uploading chunk 13.0
uploading chunk 14.0
uploading chunk 15.0
Uploading 202406-citibike-tripdata_4.csv with

  df = pd.read_csv(csv_file)


here
  ✓ Successfully processed 1000000 rows

Processing: 202407-citibike-tripdata_3.csv


  df = pd.read_csv(csv_file)


here
  ✓ Successfully processed 1000000 rows

Processing: 202407-citibike-tripdata_1.csv


  df = pd.read_csv(csv_file)


here
  ✓ Successfully processed 1000000 rows

Processing: 202407-citibike-tripdata_4.csv


  df = pd.read_csv(csv_file)


here
  ✓ Successfully processed 1000000 rows

Processing: 202407-citibike-tripdata_5.csv


  df = pd.read_csv(csv_file)


here
  ✓ Successfully processed 722896 rows

PROCESSING SUMMARY:
Successfully processed: 5 files
Failed: 0 files
[202407-citibike-tripdata_2.csv] entries: 1000000
  - Removing 0 duplicate ride_id entries
[202407-citibike-tripdata_3.csv] entries: 1000000
  - Removing 0 duplicate ride_id entries
[202407-citibike-tripdata_1.csv] entries: 1000000
  - Removing 0 duplicate ride_id entries
[202407-citibike-tripdata_4.csv] entries: 1000000
  - Removing 0 duplicate ride_id entries
[202407-citibike-tripdata_5.csv] entries: 722896
  - Removing 0 duplicate ride_id entries
Uploading 202407-citibike-tripdata_2.csv with 1000000 records
uploading chunk 0.0
uploading chunk 1.0
uploading chunk 2.0
uploading chunk 3.0
uploading chunk 4.0
uploading chunk 5.0
uploading chunk 6.0
uploading chunk 7.0
uploading chunk 8.0
uploading chunk 9.0
uploading chunk 10.0
uploading chunk 11.0
uploading chunk 12.0
uploading chunk 13.0
uploading chunk 14.0
uploading chunk 15.0
uploading chunk 16.0
uploading chunk 17.0
upl

  df = pd.read_csv(csv_file)


here
  ✓ Successfully processed 1000000 rows

Processing: 202408-citibike-tripdata_2.csv


  df = pd.read_csv(csv_file)


here
  ✓ Successfully processed 1000000 rows

Processing: 202408-citibike-tripdata_1.csv


  df = pd.read_csv(csv_file)


here
  ✓ Successfully processed 1000000 rows

Processing: 202408-citibike-tripdata_5.csv


  df = pd.read_csv(csv_file)


here
  ✓ Successfully processed 603575 rows

Processing: 202408-citibike-tripdata_4.csv


  df = pd.read_csv(csv_file)


here
  ✓ Successfully processed 1000000 rows

PROCESSING SUMMARY:
Successfully processed: 5 files
Failed: 0 files
[202408-citibike-tripdata_3.csv] entries: 1000000
  - Removing 0 duplicate ride_id entries
[202408-citibike-tripdata_2.csv] entries: 1000000
  - Removing 0 duplicate ride_id entries
[202408-citibike-tripdata_1.csv] entries: 1000000
  - Removing 0 duplicate ride_id entries
[202408-citibike-tripdata_5.csv] entries: 603575
  - Removing 0 duplicate ride_id entries
[202408-citibike-tripdata_4.csv] entries: 1000000
  - Removing 0 duplicate ride_id entries
Uploading 202408-citibike-tripdata_3.csv with 1000000 records
uploading chunk 0.0
uploading chunk 1.0
uploading chunk 2.0
uploading chunk 3.0
uploading chunk 4.0
uploading chunk 5.0
uploading chunk 6.0
uploading chunk 7.0
uploading chunk 8.0
uploading chunk 9.0
uploading chunk 10.0
uploading chunk 11.0
uploading chunk 12.0
uploading chunk 13.0
uploading chunk 14.0
uploading chunk 15.0
uploading chunk 16.0
uploading chunk 17.0
up

  df = pd.read_csv(csv_file)


here
  ✓ Successfully processed 1000000 rows

Processing: 202409-citibike-tripdata_5.csv


  df = pd.read_csv(csv_file)


here
  ✓ Successfully processed 997898 rows

Processing: 202409-citibike-tripdata_1.csv


  df = pd.read_csv(csv_file)


here
  ✓ Successfully processed 1000000 rows

Processing: 202409-citibike-tripdata_2.csv


  df = pd.read_csv(csv_file)


here
  ✓ Successfully processed 1000000 rows

Processing: 202409-citibike-tripdata_3.csv


  df = pd.read_csv(csv_file)


here
  ✓ Successfully processed 1000000 rows

PROCESSING SUMMARY:
Successfully processed: 5 files
Failed: 0 files
[202409-citibike-tripdata_4.csv] entries: 1000000
  - Removing 0 duplicate ride_id entries
[202409-citibike-tripdata_5.csv] entries: 997898
  - Removing 0 duplicate ride_id entries
[202409-citibike-tripdata_1.csv] entries: 1000000
  - Removing 0 duplicate ride_id entries
[202409-citibike-tripdata_2.csv] entries: 1000000
  - Removing 0 duplicate ride_id entries
[202409-citibike-tripdata_3.csv] entries: 1000000
  - Removing 0 duplicate ride_id entries
Uploading 202409-citibike-tripdata_4.csv with 1000000 records
uploading chunk 0.0
uploading chunk 1.0
uploading chunk 2.0
uploading chunk 3.0
uploading chunk 4.0
uploading chunk 5.0
uploading chunk 6.0
uploading chunk 7.0
uploading chunk 8.0
uploading chunk 9.0
uploading chunk 10.0
uploading chunk 11.0
uploading chunk 12.0
uploading chunk 13.0
uploading chunk 14.0
uploading chunk 15.0
uploading chunk 16.0
uploading chunk 17.0
up

  df = pd.read_csv(csv_file)


here
  ✓ Successfully processed 150054 rows

Processing: 202410-citibike-tripdata_4.csv


  df = pd.read_csv(csv_file)


here
  ✓ Successfully processed 1000000 rows

Processing: 202410-citibike-tripdata_5.csv


  df = pd.read_csv(csv_file)


here
  ✓ Successfully processed 1000000 rows

Processing: 202410-citibike-tripdata_1.csv


  df = pd.read_csv(csv_file)


here
  ✓ Successfully processed 1000000 rows

Processing: 202410-citibike-tripdata_2.csv


  df = pd.read_csv(csv_file)


here
  ✓ Successfully processed 1000000 rows

Processing: 202410-citibike-tripdata_3.csv


  df = pd.read_csv(csv_file)


here
  ✓ Successfully processed 1000000 rows

PROCESSING SUMMARY:
Successfully processed: 6 files
Failed: 0 files
[202410-citibike-tripdata_6.csv] entries: 150054
  - Removing 0 duplicate ride_id entries
[202410-citibike-tripdata_4.csv] entries: 1000000
  - Removing 0 duplicate ride_id entries
[202410-citibike-tripdata_5.csv] entries: 1000000
  - Removing 0 duplicate ride_id entries
[202410-citibike-tripdata_1.csv] entries: 1000000
  - Removing 0 duplicate ride_id entries
[202410-citibike-tripdata_2.csv] entries: 1000000
  - Removing 0 duplicate ride_id entries
[202410-citibike-tripdata_3.csv] entries: 1000000
  - Removing 0 duplicate ride_id entries
Uploading 202410-citibike-tripdata_6.csv with 150054 records
uploading chunk 0.0
uploading chunk 1.0
uploading chunk 2.0
uploading chunk 3.0
Uploading 202410-citibike-tripdata_4.csv with 1000000 records
uploading chunk 0.0
uploading chunk 1.0
uploading chunk 2.0
uploading chunk 3.0
uploading chunk 4.0
uploading chunk 5.0
uploading chunk 6.

  df = pd.read_csv(csv_file)


here
  ✓ Successfully processed 1000000 rows

Processing: 202411-citibike-tripdata_2.csv


  df = pd.read_csv(csv_file)


here
  ✓ Successfully processed 1000000 rows

Processing: 202411-citibike-tripdata_1.csv


  df = pd.read_csv(csv_file)


here
  ✓ Successfully processed 1000000 rows

Processing: 202411-citibike-tripdata_4.csv
here
  ✓ Successfully processed 710134 rows

PROCESSING SUMMARY:
Successfully processed: 4 files
Failed: 0 files
[202411-citibike-tripdata_3.csv] entries: 1000000
  - Removing 0 duplicate ride_id entries
[202411-citibike-tripdata_2.csv] entries: 1000000
  - Removing 0 duplicate ride_id entries
[202411-citibike-tripdata_1.csv] entries: 1000000
  - Removing 0 duplicate ride_id entries
[202411-citibike-tripdata_4.csv] entries: 710134
  - Removing 0 duplicate ride_id entries
Uploading 202411-citibike-tripdata_3.csv with 1000000 records
uploading chunk 0.0
uploading chunk 1.0
uploading chunk 2.0
uploading chunk 3.0
uploading chunk 4.0
uploading chunk 5.0
uploading chunk 6.0
uploading chunk 7.0
uploading chunk 8.0
uploading chunk 9.0
uploading chunk 10.0
uploading chunk 11.0
uploading chunk 12.0
uploading chunk 13.0
uploading chunk 14.0
uploading chunk 15.0
uploading chunk 16.0
uploading chunk 17.0
uploa

  df = pd.read_csv(csv_file)


here
  ✓ Successfully processed 1000000 rows

Processing: 202412-citibike-tripdata_3.csv
here
  ✓ Successfully processed 311171 rows

Processing: 202412-citibike-tripdata_2.csv


  df = pd.read_csv(csv_file)


here
  ✓ Successfully processed 1000000 rows

PROCESSING SUMMARY:
Successfully processed: 3 files
Failed: 0 files
[202412-citibike-tripdata_1.csv] entries: 1000000
  - Removing 0 duplicate ride_id entries
[202412-citibike-tripdata_3.csv] entries: 311171
  - Removing 0 duplicate ride_id entries
[202412-citibike-tripdata_2.csv] entries: 1000000
  - Removing 0 duplicate ride_id entries
Uploading 202412-citibike-tripdata_1.csv with 1000000 records
uploading chunk 0.0
uploading chunk 1.0
uploading chunk 2.0
uploading chunk 3.0
uploading chunk 4.0
uploading chunk 5.0
uploading chunk 6.0
uploading chunk 7.0
uploading chunk 8.0
uploading chunk 9.0
uploading chunk 10.0
uploading chunk 11.0
uploading chunk 12.0
uploading chunk 13.0
uploading chunk 14.0
uploading chunk 15.0
uploading chunk 16.0
uploading chunk 17.0
uploading chunk 18.0
uploading chunk 19.0
Uploading 202412-citibike-tripdata_3.csv with 311171 records
uploading chunk 0.0
uploading chunk 1.0
uploading chunk 2.0
uploading chunk 3.0
u

In [55]:
# First 50,000 rows of `df`: a small slice used for trial inserts.
rows_50k = df.head(n=50000)

In [60]:
# Rows sent per staged bulk insert into `ride_data`.
CHUNK_SIZE = 50_000

# Upload `df` in fixed-size slices. The progress line reports the chunk
# ordinal plus the row span it covers; the old message printed the row
# offset and called it a chunk number.
for chunk_no, start in enumerate(range(0, len(df), CHUNK_SIZE)):
    chunk = df.iloc[start : start + CHUNK_SIZE]
    print(f"uploading chunk {chunk_no} (rows {start}-{start + len(chunk) - 1})")
    processor.bulk_insert_with_staging(chunk, "ride_data")

In [24]:
from sqlalchemy.dialects.postgresql import insert

def upsert_postgresql(df, table_name, engine, conflict_columns=("id",), chunksize=None):
    """Append ``df`` to ``table_name`` using PostgreSQL ``INSERT ... ON CONFLICT``.

    Delegates the write to ``DataFrame.to_sql`` with a custom insertion
    method (``upsert_method_pg``). Rows that collide on ``conflict_columns``
    are updated instead of raising a unique-violation error. Composite keys
    are supported.

    Args:
        df: DataFrame whose columns match the target table's columns.
        table_name: Name of the destination table.
        engine: SQLAlchemy engine/connection accepted by ``to_sql``.
        conflict_columns: Column name(s) backing a unique index/constraint,
            used as the ON CONFLICT target. A single string is accepted and
            treated as one column.
        chunksize: Forwarded to ``to_sql``. ``None`` (the default) writes
            all rows in one statement; set it for large frames so each
            INSERT stays a manageable size.

    Returns:
        Whatever ``to_sql`` returns: the summed row count reported by the
        insertion method, or ``None`` if it could not be determined.
    """
    # A bare string would otherwise be iterated character by character.
    if isinstance(conflict_columns, str):
        conflict_columns = [conflict_columns]
    conflict_columns = list(conflict_columns)

    def _upsert_method(pd_table, conn, keys, data_iter):
        # Adapts the 4-argument pandas `method` signature to our helper.
        return upsert_method_pg(pd_table, conn, keys, data_iter, conflict_columns)

    return df.to_sql(
        table_name,
        engine,
        if_exists="append",
        index=False,
        method=_upsert_method,
        chunksize=chunksize,
    )

def upsert_method_pg(pd_table, conn, keys, data_iter, conflict_columns):
    """pandas ``to_sql`` insertion callable that performs a PostgreSQL upsert.

    Builds one multi-row ``INSERT ... ON CONFLICT (conflict_columns)``
    statement for the rows pandas hands over. On conflict, only the columns
    actually supplied by the DataFrame (``keys``) that are not part of the
    conflict key are overwritten with the incoming ``EXCLUDED`` values.
    Table columns the frame does not supply (e.g. a DB-generated id or a
    defaulted ``created_at``) keep their stored values instead of being
    replaced by the defaults computed for the rejected row. If every supplied
    column is part of the conflict key, there is nothing to update and the
    statement uses ``ON CONFLICT DO NOTHING``, because SQLAlchemy rejects an
    empty ``set_`` mapping.

    Args:
        pd_table: pandas SQL table wrapper; ``pd_table.table`` is the
            SQLAlchemy ``Table`` being written.
        conn: SQLAlchemy connection supplied by pandas.
        keys: Column names, in the same order as the values in each row.
        data_iter: Iterable of row tuples.
        conflict_columns: Column name(s) forming the ON CONFLICT target.

    Returns:
        The driver-reported ``rowcount`` (pandas sums this into ``to_sql``'s
        return value), or 0 when there are no rows.
    """
    if isinstance(conflict_columns, str):
        conflict_columns = [conflict_columns]
    conflict_columns = list(conflict_columns)

    rows = [dict(zip(keys, row)) for row in data_iter]
    if not rows:
        return 0

    stmt = insert(pd_table.table).values(rows)

    key_set = set(conflict_columns)
    update_cols = {name: stmt.excluded[name] for name in keys if name not in key_set}

    if update_cols:
        stmt = stmt.on_conflict_do_update(
            index_elements=conflict_columns,
            set_=update_cols,
        )
    else:
        stmt = stmt.on_conflict_do_nothing(index_elements=conflict_columns)

    result = conn.execute(stmt)
    return result.rowcount


In [8]:
def process_file(file_obj, max_workers=3, batch_size=500):
    """Extract, transform and upload one Citi Bike archive, then record it.

    Steps:
      1. Download the zip at ``https://s3.amazonaws.com/tripdata/<file_name>``
         and parse its CSVs via ``process_all_csvs_from_zip_url`` (locale "NYC").
      2. Apply ``apply_h3_latlng_to_cell`` to the parsed data.
      3. Upload the result with ``upload_results_with_guaranteed_retry``.
      4. Upsert a row into the Supabase ``processed_files`` table, keyed on
         ``(file_name, locale)``.

    Args:
        file_obj: Row/mapping with at least a ``'file_name'`` entry. It is
            also passed whole to ``get_processed_file_record``.
        max_workers: Forwarded to ``upload_results_with_guaranteed_retry``.
        batch_size: Forwarded to ``upload_results_with_guaranteed_retry``.

    Returns:
        The executed Supabase upsert response, or ``None`` when nothing was
        extracted and the file was skipped.
    """
    file_name = file_obj["file_name"]
    url = f"https://s3.amazonaws.com/tripdata/{file_name}"

    df_by_file = process_all_csvs_from_zip_url(url, locale="NYC")
    if not df_by_file:
        # Nothing extracted: skip the upload and do not touch processed_files.
        print(f"No data processed for {file_name}, skipping upload.")
        return None

    output = apply_h3_latlng_to_cell(df_by_file)
    upload_results_with_guaranteed_retry(
        output, max_workers=max_workers, batch_size=batch_size
    )

    # Upsert keyed on (file_name, locale), so re-processing a file updates
    # its existing record rather than inserting another one.
    print(f"Updating processed_files table for {file_name}")
    return (
        supabase.table("processed_files")
        .upsert(
            get_processed_file_record(file_obj),
            on_conflict="file_name,locale",
        )
        .execute()
    )

In [9]:
import time
def log_connection_info():
    """Print the current Unix epoch time as ``Timestamp: <seconds>``.

    Intended to be called before each batch upload so the printed timing can
    be correlated with batch numbers in the upload logs.
    """
    now = time.time()
    print("Timestamp: {}".format(now))

In [10]:
def process_files(file_df) -> None:
    """Call ``process_file`` once per row of ``file_df``, top to bottom.

    Each row is handed over as the pandas Series produced by ``iterrows``,
    so it must carry the fields ``process_file`` reads (e.g. ``file_name``).
    """
    records = (record for _idx, record in file_df.iterrows())
    for record in records:
        process_file(record)

In [67]:
# Manifest of archives to ingest: one row per file, with a `file_name` column.
file_df = pd.read_csv(filepath_or_buffer="./new_files_test.csv")
process_files(file_df=file_df)