In [2]:
import sqlite3
import pandas as pd
import time
from urllib.parse import urlencode

After importing the necessary packages, start defining constants, including the base API URL for the raw data, the name of the database and table, and the batch size for loading data into the database.

In [3]:
# Constants
BASE_URL = "https://data.cityofnewyork.us/resource/h9gi-nx95.csv"
DB_PATH = "nyc_crashes.db"
TABLE_NAME = "crashes"
BATCH_SIZE = 50000

Next, define the connection to the database using the defined database name. Define more constants to initiate the while loop.

In [4]:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()

total_fetched = 0
batch_num = 0

Let's start running the while loop. We start be defining the offset as batch_num * batch_size. Output which batch and records are being loaded for user verification and updates.
Next, try loading the csv using the base API URL with some parameter adders including ordering the data by latest crash date, the size of the batch and the offset. 

In [5]:
while True:
    offset = batch_num * BATCH_SIZE
    print(f"\nFetching batch {batch_num + 1}, records {offset + 1} to {offset + BATCH_SIZE} ...")

    params = {
        "$order": "crash_date DESC",
        "$limit": BATCH_SIZE,
        "$offset": offset
    }
    query_string = urlencode(params)
    url = f"{BASE_URL}?{query_string}"

    try:
        df = pd.read_csv(
            url,
            parse_dates=["crash_date"],
            dtype={"crash_time": str, "zip_code": str, "collision_id": str}  # Ensure collision_id is string
        )
        if df.empty:
            print("No more data to fetch, exiting.")
            break

        # Normalize column names: lowercase and replace spaces with underscores
        df.columns = [col.lower().replace(" ", "_") for col in df.columns]

        # Get existing collision_ids from DB to identify duplicates
        batch_collision_ids = df["collision_id"].astype(str).tolist()

        if batch_collision_ids:
            placeholders = ",".join("?" for _ in batch_collision_ids)
            query = f"SELECT collision_id FROM {TABLE_NAME} WHERE collision_id IN ({placeholders})"
            cursor.execute(query, batch_collision_ids)
            existing_ids = set(row[0] for row in cursor.fetchall())
        else:
            existing_ids = set()

        # Filter df to only unique rows (collision_id not in existing_ids)
        df_unique = df[~df["collision_id"].astype(str).isin(existing_ids)]

        if df_unique.empty:
            print("No unique records found in batch. Stopping fetch.")
            break

        if batch_num == 0:
            # Create table if not exists based on this batch's schema
            columns_with_types = []
            for col, dtype in df_unique.dtypes.items():
                if pd.api.types.is_integer_dtype(dtype):
                    col_type = "INTEGER"
                elif pd.api.types.is_float_dtype(dtype):
                    col_type = "REAL"
                elif pd.api.types.is_datetime64_any_dtype(dtype):
                    col_type = "TEXT"
                else:
                    col_type = "TEXT"
                columns_with_types.append(f"{col} {col_type}")

            create_table_sql = f"CREATE TABLE IF NOT EXISTS {TABLE_NAME} ({', '.join(columns_with_types)});"
            cursor.execute(create_table_sql)
            conn.commit()

            df_unique.to_sql(TABLE_NAME, conn, if_exists="append", index=False)
            print(f"Inserted first {len(df_unique)} unique records.")
        else:
            df_unique.to_sql(TABLE_NAME, conn, if_exists="append", index=False)
            print(f"Inserted {len(df_unique)} unique records.")

        total_fetched += len(df_unique)
        batch_num += 1

        time.sleep(1)  # Be polite to the API

    except pd.errors.EmptyDataError:
        print("No more data to fetch, exiting.")
        break
    except Exception as e:
        print(f"Error fetching batch at offset {offset}: {e}")
        break


Fetching batch 1, records 1 to 50000 ...
Error fetching batch at offset 0: too many SQL variables


In [53]:
print(f"\nFinished. Total records inserted: {total_fetched}")
conn.close()


Finished. Total records inserted: 200000
