# In [None]:
import json
import logging
import os
import time
from datetime import datetime

import mysql.connector
import requests
from mysql.connector import pooling

# Configuration
# Connection settings and the API token can be overridden through environment
# variables so real credentials do not have to live in source control. The
# literals are kept only as fallbacks so the script behaves exactly as before
# when nothing is set in the environment.
# NOTE(review): the fallback password/token are committed in plain text;
# consider rotating them and removing the defaults.
MYSQL_USER = os.environ.get('MYSQL_USER', 'root')
MYSQL_PASSWORD = os.environ.get('MYSQL_PASSWORD', 'Pavilion227')
MYSQL_HOST = os.environ.get('MYSQL_HOST', '127.0.0.1')
MYSQL_PORT = int(os.environ.get('MYSQL_PORT', '3306'))
MYSQL_DATABASE = os.environ.get('MYSQL_DATABASE', 'motor_vehicle_collisions')
# TABLE_NAME is interpolated directly into the SQL text of the INSERT
# statement, so it deliberately stays a fixed literal rather than being read
# from the environment.
TABLE_NAME = 'crashes'
# Sent as the X-App-Token header on every API request.
APP_TOKEN = os.environ.get('SOCRATA_APP_TOKEN', 'tUS0Z2WOHE7hPQfW5gPwvSE6r')

# Logging setup
# Root logger: INFO and above, each line prefixed with timestamp and level,
# emitted through a single stream handler.
logging.basicConfig(
    handlers=[logging.StreamHandler()],
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# DB connection pool
# Built once at module load; insert_records() borrows a connection from it for
# every batch. If the pool cannot be created the script cannot do anything
# useful, so it logs the error and terminates with exit status 1.
try:
    cnxpool = pooling.MySQLConnectionPool(
        pool_name="mypool",
        pool_size=5,
        user=MYSQL_USER,
        password=MYSQL_PASSWORD,
        host=MYSQL_HOST,
        port=MYSQL_PORT,
        database=MYSQL_DATABASE
    )
except mysql.connector.Error as err:
    logging.error(f"❌ Database connection failed: {err}")
    # Raise SystemExit directly instead of calling exit(): the `exit` name is
    # injected by the `site` module for interactive sessions and is not
    # guaranteed to exist when running as a program (e.g. `python -S`).
    raise SystemExit(1)
else:
    logging.info("✅ Database connection pool created successfully.")

def fetch_data(offset=0, limit=1000):
    """Fetch one page of 2024 crash records from the NYC Open Data API.

    Args:
        offset: Number of rows to skip before the first returned row.
        limit: Maximum number of rows to return in this page.

    Returns:
        The decoded JSON payload (a list of record dicts) on success, or an
        empty list when the request fails, the server answers with a non-200
        status, or the body is not valid JSON. Errors are logged, not raised.
    """
    url = "https://data.cityofnewyork.us/resource/h9gi-nx95.json"
    params = {
        "$limit": limit,
        "$offset": offset,
        # crash_date alone is not a total order (many crashes share the same
        # timestamp), so offset paging could skip or repeat rows between
        # requests. The row identifier is added as a tiebreaker to keep the
        # ordering stable across pages.
        "$order": "crash_date, :id",
        "$where": "crash_date between '2024-01-01T00:00:00' and '2024-12-31T23:59:59'",
    }
    headers = {"X-App-Token": APP_TOKEN}

    # Passing the query through `params` lets requests handle URL encoding of
    # the spaces and quotes in the $where clause.
    try:
        response = requests.get(url, params=params, headers=headers, timeout=30)
    except requests.RequestException as e:
        logging.error(f"❌ Request error: {e}")
        return []

    if response.status_code != 200:
        logging.error(f"❌ Failed API request: {response.status_code} - {response.text}")
        return []

    try:
        data = response.json()
    except ValueError as e:
        # Raised when the response body cannot be decoded as JSON.
        logging.error(f"❌ Request error: {e}")
        return []

    logging.info(f"📦 Fetched batch at offset {offset}")
    return data

def _to_int(value, default=None):
    """Return *value* as an int, or *default* if it is empty or not numeric.

    Integer-like input ("3", 3) is converted directly; float-like input
    ("3.0", "2.7") is truncated via float, matching int(float(value)).
    """
    if not value:
        return default
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        pass
    try:
        return int(float(value))
    except (TypeError, ValueError, OverflowError):
        return default


def _to_float(value, default=None):
    """Return *value* as a float, or *default* if it is empty or not numeric."""
    if not value:
        return default
    try:
        return float(value)
    except (TypeError, ValueError, OverflowError):
        return default


def _to_zip(value):
    """Return *value* as an int only when it consists purely of decimal digits."""
    if not value:
        return None
    text = str(value)
    return int(text) if text.isdecimal() else None


def flatten_record(record):
    """Map one API record to a dict keyed by the column names of the INSERT.

    Text fields are copied as-is (``None`` when absent). Numeric fields are
    converted, and a missing or malformed value falls back to the field's
    default (``None`` for zip/coordinates/collision id, zero for the counts)
    instead of raising, so one bad value cannot abort a whole batch.

    Args:
        record: Dict for a single crash, using the API's lower-case keys.

    Returns:
        Dict with the 29 upper-case keys expected by insert_records().
    """
    get = record.get
    location = get('location')
    return {
        'CRASH_DATE': get('crash_date'),
        'CRASH_TIME': get('crash_time'),
        'BOROUGH': get('borough'),
        'ZIP_CODE': _to_zip(get('zip_code')),
        'LATITUDE': _to_float(get('latitude')),
        'LONGITUDE': _to_float(get('longitude')),
        # Stored as the Python string form of whatever the API returned.
        'LOCATION': str(location) if location else None,
        'ON_STREET_NAME': get('on_street_name'),
        'CROSS_STREET_NAME': get('cross_street_name'),
        'OFF_STREET_NAME': get('off_street_name'),
        'NUMBER_OF_PERSONS_INJURED': _to_int(get('number_of_persons_injured'), 0),
        # NOTE(review): this is the only count kept as a float; preserved from
        # the original — presumably it matches the column type, confirm.
        'NUMBER_OF_PERSONS_KILLED': _to_float(get('number_of_persons_killed'), 0.0),
        'NUMBER_OF_PEDESTRIANS_INJURED': _to_int(get('number_of_pedestrians_injured'), 0),
        'NUMBER_OF_PEDESTRIANS_KILLED': _to_int(get('number_of_pedestrians_killed'), 0),
        'NUMBER_OF_CYCLIST_INJURED': _to_int(get('number_of_cyclist_injured'), 0),
        'NUMBER_OF_CYCLIST_KILLED': _to_int(get('number_of_cyclist_killed'), 0),
        'NUMBER_OF_MOTORIST_INJURED': _to_int(get('number_of_motorist_injured'), 0),
        'NUMBER_OF_MOTORIST_KILLED': _to_int(get('number_of_motorist_killed'), 0),
        'CONTRIBUTING_FACTOR_VEHICLE_1': get('contributing_factor_vehicle_1'),
        'CONTRIBUTING_FACTOR_VEHICLE_2': get('contributing_factor_vehicle_2'),
        'CONTRIBUTING_FACTOR_VEHICLE_3': get('contributing_factor_vehicle_3'),
        'CONTRIBUTING_FACTOR_VEHICLE_4': get('contributing_factor_vehicle_4'),
        'CONTRIBUTING_FACTOR_VEHICLE_5': get('contributing_factor_vehicle_5'),
        'COLLISION_ID': _to_int(get('collision_id')),
        # NOTE(review): source keys for vehicle types 1-2 have no underscore
        # before the digit while 3-5 do; kept exactly as in the original —
        # confirm against the dataset's field names.
        'VEHICLE_TYPE_CODE_1': get('vehicle_type_code1'),
        'VEHICLE_TYPE_CODE_2': get('vehicle_type_code2'),
        'VEHICLE_TYPE_CODE_3': get('vehicle_type_code_3'),
        'VEHICLE_TYPE_CODE_4': get('vehicle_type_code_4'),
        'VEHICLE_TYPE_CODE_5': get('vehicle_type_code_5'),
    }

def insert_records(records):
    """Upsert a batch of flattened crash records into the crashes table.

    Borrows a connection from the module-level pool, runs a single
    ``executemany`` with named parameters, and commits. Database errors are
    logged and the batch is rolled back; they are not re-raised.

    Args:
        records: List of dicts keyed by the upper-case column names used in
            the statement below. An empty or ``None`` value is a no-op.
    """
    if not records:
        logging.info("⚠️ No records to insert.")
        return

    # Initialised up front so the cleanup in `finally` is safe even when
    # acquiring the connection or cursor is what failed.
    conn = None
    cursor = None
    try:
        conn = cnxpool.get_connection()
        cursor = conn.cursor()

        # NOTE(review): on a duplicate key only the six columns listed after
        # ON DUPLICATE KEY UPDATE are refreshed; all other columns keep their
        # stored values — confirm this is intended.
        insert_query = f"""
        INSERT INTO {TABLE_NAME} (
            CRASH_DATE, CRASH_TIME, BOROUGH, ZIP_CODE, LATITUDE, LONGITUDE, LOCATION,
            ON_STREET_NAME, CROSS_STREET_NAME, OFF_STREET_NAME, NUMBER_OF_PERSONS_INJURED,
            NUMBER_OF_PERSONS_KILLED, NUMBER_OF_PEDESTRIANS_INJURED, NUMBER_OF_PEDESTRIANS_KILLED,
            NUMBER_OF_CYCLIST_INJURED, NUMBER_OF_CYCLIST_KILLED, NUMBER_OF_MOTORIST_INJURED,
            NUMBER_OF_MOTORIST_KILLED, CONTRIBUTING_FACTOR_VEHICLE_1, CONTRIBUTING_FACTOR_VEHICLE_2,
            CONTRIBUTING_FACTOR_VEHICLE_3, CONTRIBUTING_FACTOR_VEHICLE_4, CONTRIBUTING_FACTOR_VEHICLE_5,
            COLLISION_ID, VEHICLE_TYPE_CODE_1, VEHICLE_TYPE_CODE_2, VEHICLE_TYPE_CODE_3,
            VEHICLE_TYPE_CODE_4, VEHICLE_TYPE_CODE_5
        ) VALUES (
            %(CRASH_DATE)s, %(CRASH_TIME)s, %(BOROUGH)s, %(ZIP_CODE)s, %(LATITUDE)s, %(LONGITUDE)s, %(LOCATION)s,
            %(ON_STREET_NAME)s, %(CROSS_STREET_NAME)s, %(OFF_STREET_NAME)s, %(NUMBER_OF_PERSONS_INJURED)s,
            %(NUMBER_OF_PERSONS_KILLED)s, %(NUMBER_OF_PEDESTRIANS_INJURED)s, %(NUMBER_OF_PEDESTRIANS_KILLED)s,
            %(NUMBER_OF_CYCLIST_INJURED)s, %(NUMBER_OF_CYCLIST_KILLED)s, %(NUMBER_OF_MOTORIST_INJURED)s,
            %(NUMBER_OF_MOTORIST_KILLED)s, %(CONTRIBUTING_FACTOR_VEHICLE_1)s, %(CONTRIBUTING_FACTOR_VEHICLE_2)s,
            %(CONTRIBUTING_FACTOR_VEHICLE_3)s, %(CONTRIBUTING_FACTOR_VEHICLE_4)s, %(CONTRIBUTING_FACTOR_VEHICLE_5)s,
            %(COLLISION_ID)s, %(VEHICLE_TYPE_CODE_1)s, %(VEHICLE_TYPE_CODE_2)s, %(VEHICLE_TYPE_CODE_3)s,
            %(VEHICLE_TYPE_CODE_4)s, %(VEHICLE_TYPE_CODE_5)s
        )
        ON DUPLICATE KEY UPDATE
            CRASH_DATE=VALUES(CRASH_DATE),
            CRASH_TIME=VALUES(CRASH_TIME),
            BOROUGH=VALUES(BOROUGH),
            ZIP_CODE=VALUES(ZIP_CODE),
            LATITUDE=VALUES(LATITUDE),
            LONGITUDE=VALUES(LONGITUDE)
        """

        cursor.executemany(insert_query, records)
        conn.commit()
        # rowcount is the driver-reported affected-row count; with the
        # ON DUPLICATE KEY UPDATE clause it may differ from len(records).
        logging.info(f"✅ Inserted {cursor.rowcount} records.")
    except mysql.connector.Error as err:
        logging.error(f"❌ Database insert error: {err}")
        # Discard any partially applied batch before the connection goes back
        # to the pool.
        if conn is not None:
            try:
                conn.rollback()
            except mysql.connector.Error as rollback_err:
                logging.error(f"❌ Rollback failed: {rollback_err}")
    finally:
        if cursor is not None:
            cursor.close()
        if conn is not None:
            # For a pooled connection, close() hands it back to the pool.
            conn.close()
def main(batch_size=1000, max_batches=50):
    """Page through the API and load each page into the database.

    Repeatedly fetches a page, flattens every record, and upserts the batch,
    pausing one second between requests. Stops when a page comes back empty
    (which is also what fetch_data returns on error), when a page is shorter
    than ``batch_size`` (the last page), or after ``max_batches`` pages.

    Args:
        batch_size: Rows requested per page; also the offset increment.
        max_batches: Upper bound on the number of pages fetched in one run.
    """
    offset = 0

    for _ in range(max_batches):
        data = fetch_data(offset, batch_size)
        if not data:
            break
        insert_records([flatten_record(record) for record in data])
        if len(data) < batch_size:
            # A short page means there is nothing left to fetch, so skip the
            # extra request (and sleep) that would only return an empty page.
            break
        offset += batch_size
        time.sleep(1)  # Prevent rate limit issues
    else:
        # The loop ran out of batches without seeing the end of the data, so
        # the load may be incomplete.
        logging.warning(
            f"⚠️ Stopped after max_batches={max_batches}; more records may remain."
        )

# Start the load only when this file is executed as a script, not when it is
# imported as a module.
if __name__ == "__main__":
    main()