In [1]:
import requests
import sqlite3
import pandas as pd
import numpy as np
import time
from datetime import datetime

# Extraction

## Open Meteo API Call

https://open-meteo.com/en/docs/historical-forecast-api?location_mode=csv_coordinates&hourly=&daily=temperature_2m_max,temperature_2m_min,precipitation_sum&temperature_unit=fahrenheit

In [2]:
def openMeteo_APICall(latitude, longitude, start_date, end_date):

    url = "https://archive-api.open-meteo.com/v1/archive"

    params = {
        "latitude": latitude,
        "longitude": longitude,
	    "start_date": start_date,
        "end_date": end_date,
        "daily": ["temperature_2m_max", "temperature_2m_min", "precipitation_sum", "rain_sum", "snowfall_sum"],
        "temperature_unit": "fahrenheit",
        "timezone": "America/New_York" #Eastern Time
    }

    response = requests.get(url, params=params, timeout=30)
    data = response.json()
    

    if response.status_code == 200: #OK
        if not "error" in data:
            return data
        else:
            print("Response Error: ", data['error'], data['reason'])
    else:
        print("HTTP Call Error:", response.status_code, response.text)
        return None

In [None]:
weather_data = openMeteo_APICall(33.8034, -84.3963, "2018-01-01", "2023-12-31")

## NHTSA API By Location Call

https://crashviewer.nhtsa.dot.gov/CrashAPI

In [3]:
def NHTSA_APICall(stateCode, countyCode, startYear, endYear):
    
    url = f"https://crashviewer.nhtsa.dot.gov/CrashAPI/crashes/GetCrashesByLocation?fromCaseYear={startYear}&toCaseYear={endYear}&state={stateCode}&county={countyCode}&format=json"
    
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
    }

    response = requests.get(url, headers=headers, timeout=30)
    data = response.json()
    
    if response.status_code == 200: #OK
        return data
    else:
        print("Error:", response.status_code, response.text)
        return None

In [None]:
crash_data = NHTSA_APICall(13, 121, 2018, 2023)

## NHTSA API Case Specifics Call

In [4]:
def NHTSA_GetCaseSpecifics(db, stateCode, year):

    conn = sqlite3.connect(db)

    crash_df = pd.read_sql_query("SELECT * FROM crashes", conn)

    caseSpecDB = pd.DataFrame(columns=["state_case","year","month","day"])
    tempdb = pd.read_sql_query(f"SELECT name FROM sqlite_master WHERE type='table' AND name='case_specifics'", conn)
    
    if len(tempdb["name"]) > 0: #case specifics table already exists in database
        caseSpecDB = pd.read_sql_query("SELECT * FROM case_specifics", conn)

        for scase in crash_df["state_case"]:
            if scase in caseSpecDB["state_case"].unique():
                print(f"Skipping case {scase}, as it is in the dataset already")
            else:
                url = f"https://crashviewer.nhtsa.dot.gov/CrashAPI/crashes/GetCaseDetails?stateCase={scase}&caseYear={year}&state={stateCode}&format=json"

                new_df = NHTSA_CaseSpec_APICall(url, scase)
                caseSpecDB = pd.concat([caseSpecDB, new_df])

        return caseSpecDB
    
def NHTSA_CaseSpec_APICall(url, scase):
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
    }

    time.sleep(3) #Being cautious since this will be sending multiple API calls in rapid succession when new crash specifics emerge

    response = requests.get(url, headers=headers, timeout=30)
    data = response.json()

    new_df = pd.DataFrame([[scase, None, None, None]])
    if data["Count"] == 1: #Some cases don't have detailed information released yet, in which case the API will return a count of 0
        new_df = pd.DataFrame([[
        scase,
        data["Results"][0][0]['CrashResultSet']['YEAR'],
        data["Results"][0][0]['CrashResultSet']['MONTH'],
        data["Results"][0][0]['CrashResultSet']['DAY']
        ]], columns = ["state_case","year","month","day"])

        print(f"Successfully got date for {scase}")

    return new_df


In [None]:
casespecs_db = NHTSA_GetCaseSpecifics("crashinfo.db", 13, 2023)

In [None]:
scase = "130112"
url = f"https://crashviewer.nhtsa.dot.gov/CrashAPI/crashes/GetCaseDetails?stateCase={scase}&caseYear=2023&state=13&format=json"

temp_df = NHTSA_CaseSpec_APICall(url, scase)
temp_df

## OpenAQ API Call

In [None]:
from openaq import OpenAQ

def OpenAQ_Location_APICall(latitude, longitude, radius, start_date, end_date):

    client = OpenAQ(api_key="7755a70a98b5f75b8d6c20c291ea8d3bec9a8b18542a10126358c7d19f49a75a")
    response = client.locations.list(
        coordinates = [latitude, longitude],
        radius = radius,
        limit = 1000
    )
    #client.close()

    return response

In [None]:
latitude = 33.8034
longitude = -84.3963
openAQ = OpenAQ_Location_APICall(latitude, longitude, 10000, "2023-01-01", "2023-01-17")

In [57]:
for loc in openAQ.results:
    print(loc.id, loc.name, loc.coordinates, loc.sensors)

1951 United Ave Coordinates(latitude=33.7206, longitude=-84.3578) [SensorBase(id=3451, name='o3 ppm', parameter=ParameterBase(id=10, name='o3', units='ppm', display_name='O₃')), SensorBase(id=3453, name='pm25 µg/m³', parameter=ParameterBase(id=2, name='pm25', units='µg/m³', display_name='PM2.5')), SensorBase(id=3452, name='so2 ppm', parameter=ParameterBase(id=9, name='so2', units='ppm', display_name='SO₂'))]
1972 NR-Georgia Tech Coordinates(latitude=33.778315, longitude=-84.391418) [SensorBase(id=5077788, name='co ppm', parameter=ParameterBase(id=8, name='co', units='ppm', display_name='CO')), SensorBase(id=5077750, name='no ppm', parameter=ParameterBase(id=35, name='no', units='ppm', display_name='NO')), SensorBase(id=5077563, name='no2 ppm', parameter=ParameterBase(id=7, name='no2', units='ppm', display_name='NO₂')), SensorBase(id=5077742, name='nox ppm', parameter=ParameterBase(id=19840, name='nox', units='ppm', display_name='NOx')), SensorBase(id=5077784, name='pm25 µg/m³', paramet

In [64]:
def OpenAQ_Sensor_APICall(sensorID, limit):
    
    client = OpenAQ(api_key="7755a70a98b5f75b8d6c20c291ea8d3bec9a8b18542a10126358c7d19f49a75a")
    response = client.measurements.list(
        sensors_id=sensorID,
        data="days",
        limit=limit,
        page=3
    )

    return response

In [65]:
sensorData = OpenAQ_Sensor_APICall(1972, 1000) #ga tech ozone sensor

In [66]:
for item in sensorData.results:
    print(item.period.datetime_from.local, item.value)

2021-12-29T00:00:00-06:00 0.0197
2021-12-30T00:00:00-06:00 0.0162
2021-12-31T00:00:00-06:00 0.0211
2022-01-01T00:00:00-06:00 0.0218
2022-01-02T00:00:00-06:00 0.0187
2022-01-03T00:00:00-06:00 0.0292
2022-01-04T00:00:00-06:00 0.0201
2022-01-05T00:00:00-06:00 0.0215
2022-01-06T00:00:00-06:00 0.0219
2022-01-07T00:00:00-06:00 0.025
2022-01-08T00:00:00-06:00 0.0285
2022-01-09T00:00:00-06:00 0.0254
2022-01-10T00:00:00-06:00 0.0288
2022-01-11T00:00:00-06:00 0.0138
2022-01-12T00:00:00-06:00 0.0145
2022-01-13T00:00:00-06:00 0.0256
2022-01-14T00:00:00-06:00 0.0277
2022-01-15T00:00:00-06:00 0.0253
2022-01-16T00:00:00-06:00 0.018
2022-01-17T00:00:00-06:00 0.0236
2022-01-18T00:00:00-06:00 0.0223
2022-01-19T00:00:00-06:00 0.0309
2022-01-20T00:00:00-06:00 0.0247
2022-01-21T00:00:00-06:00 0.0245
2022-01-22T00:00:00-06:00 0.0272
2022-01-23T00:00:00-06:00 0.0192
2022-01-24T00:00:00-06:00 0.0258
2022-01-25T00:00:00-06:00 0.025
2022-01-26T00:00:00-06:00 0.0315
2022-01-27T00:00:00-06:00 0.034
2022-01-28T00:

# Transformation

In [77]:
def transform_weather(data):
    weather_df = pd.DataFrame({
                    "latitude": data["latitude"],
                    "longitude": data["longitude"],
                    "date": data["daily"]["time"],
                    "temp_max_F": data["daily"]["temperature_2m_max"],
                    "temp_min_F": data["daily"]["temperature_2m_min"],
                    "precip_sum": data["daily"]["precipitation_sum"],
                    "rain_sum": data["daily"]["rain_sum"],
                    "snowfall_sum": data["daily"]["snowfall_sum"]
                })
    return weather_df

def transform_accidents(data):
    accident_df = pd.DataFrame.from_dict(data['Results'][0])

    accident_df = accident_df.drop(columns=["CITY","COUNTY","STATE","TWAY_ID2","VE_FORMS"])

    accident_df = accident_df.rename(columns={
        "CITYNAME": "city",
        "COUNTYNAME": "county",
        "CaseYear": "year",
        "FATALS": "fatals",
        "LATITUDE": "latitude",
        "LONGITUD": "longitude",
        "STATENAME": "state",
        "ST_CASE": "state_case",
        "TOTALVEHICLES": "vehicles",
        "TWAY_ID": "road_occurred"
    })

    return accident_df

def transform_ozone_measure(data):
    ozone_df = pd.DataFrame(columns=["datetime","ozone_ppm"])
    for item in data.results:
        date = item.period.datetime_from.local[:10]

        new_df = pd.DataFrame([[
            date,
            item.value
        ]], columns=["datetime","ozone_ppm"])

        ozone_df = pd.concat([ozone_df, new_df])
    return ozone_df

In [None]:
weather_df = transform_weather(weather_data)
crash_df = transform_accidents(crash_data)

In [78]:
ozone_df = transform_ozone_measure(sensorData)
ozone_df

  ozone_df = pd.concat([ozone_df, new_df])


Unnamed: 0,datetime,ozone_ppm
0,2021-12-29,0.0197
0,2021-12-30,0.0162
0,2021-12-31,0.0211
0,2022-01-01,0.0218
0,2022-01-02,0.0187
...,...,...
0,2024-11-13,0.0194
0,2024-11-14,0.0233
0,2024-11-15,0.0225
0,2024-11-16,0.0231


# Loading To SQLite

In [None]:
def load_to_database(weather_df, crash_df, casespecs_df, ozone_df, db="crashinfo.db"):
    conn = sqlite3.connect(db)

    weather_df.to_sql("weather", conn, if_exists="replace", index=False)
    print("Weather (weather) data loaded into SQLite")
    crash_df.to_sql("crashes", conn, if_exists="replace", index=False)
    print("Crash data (crashes) loaded into SQLite")
    casespecs_df.to_sql("case_specifics", conn, if_exists="replace", index=False)
    print("Case Specific (case_specifics) data loaded into SQLite")
    ozone_df.to_sql("gtech_ozone", conn, if_exists="replace", index=False)
    print("Georgia Tech Ozone (gtech_ozone) data loaded into SQLite")

    conn.close()

In [None]:
load_to_database(weather_df, crash_df, casespecs_db, db="crashinfo.db")

# Accessing SQLite

In [80]:
conn = sqlite3.connect("crashinfo.db")
cursor = conn.cursor()

In [None]:
pd.read_sql_query("SELECT * FROM crashes", conn)

In [None]:
tempdb = pd.read_sql_query(f"SELECT name FROM sqlite_master WHERE type='table' AND name='{"weather"}'", conn)

if len(tempdb["name"]) > 0:
    print("Table exists.")
else:
    print("Table does not exist.")

# Testing

In [None]:
url = f"https://crashviewer.nhtsa.dot.gov/CrashAPI/crashes/GetCaseDetails?stateCase=130035&caseYear=2023&state=13&format=json"

headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
}

response = requests.get(url, headers=headers, timeout=30)
data = response.json()

In [None]:
case_specifics = pd.DataFrame([[1,2,3,4]], columns=["state_case","year","month","day"])

new_df = pd.DataFrame([[
    "130035",
    data["Results"][0][0]['CrashResultSet']['YEAR'],
    data["Results"][0][0]['CrashResultSet']['MONTH'],
    data["Results"][0][0]['CrashResultSet']['DAY']
]], columns = ["state_case","year","month","day"])

case_specifics = pd.concat([case_specifics, new_df])

case_specifics


In [None]:
caseSpecDB = pd.read_sql_query("SELECT * FROM case_specifics", conn)
crashDB = pd.read_sql_query("SELECT * FROM crashes", conn)
for item in crashDB["state_case"]:
    if item not in caseSpecDB["state_case"].unique():
        print(item)

In [None]:
weatherDB = pd.read_sql_query("SELECT * FROM weather", conn)
weatherDB

In [None]:
caseSpecDB = pd.read_sql_query("SELECT * FROM case_specifics", conn)
caseSpecDB

In [81]:
ozoneDB = pd.read_sql_query("SELECT * FROM gtech_ozone", conn)
ozoneDB

Unnamed: 0,datetime,ozone_ppm
0,2021-12-29,0.0197
1,2021-12-30,0.0162
2,2021-12-31,0.0211
3,2022-01-01,0.0218
4,2022-01-02,0.0187
...,...,...
995,2024-11-13,0.0194
996,2024-11-14,0.0233
997,2024-11-15,0.0225
998,2024-11-16,0.0231


To do:
expand ozone and weather datasets to bring in more information
create the views to draw conclusions between vehicle crashes, weather conditions, and ozone levels
analyze database schema (look at how to make relations)
implement the running of this program on a batch pipeline with tooling
add commentary to ETL_Functions and pipeline
add README file explaining the project and parts of it
move api key to separate txt file and import it in