# ETL
This notebook contains exploration and development of the ETL 
pipeline that will become the script at us_accidents/ETL/ETL.py.

In [6]:
import polars as pl
import pandas as pd
import os
import zipfile
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

from us_accidents import PROJECT_ROOT
os.chdir(PROJECT_ROOT)

user = "us_accidents_admin"
password = os.environ.get("US_ACCIDENTS_ADMIN_PASSWORD")
db_host = 'localhost:5432'
db_name = 'us_accidents'
DB_URI = f"postgresql://{user}:{password}@{db_host}/{db_name}"

# postgres://user:secret@localhost:5432/mydatabasename


In [7]:
assert os.path.exists(os.path.join(PROJECT_ROOT, "data", "us-accidents.zip"))

if not os.path.exists(os.path.join(PROJECT_ROOT, "data", "US_Accidents_March23.csv")):
    logger.info("Unzipping the file")
    with zipfile.ZipFile('data/us-accidents.zip', 'r') as zip_ref:
        zip_ref.extractall('data/')
else:
    logger.info("File already unzipped")

INFO:__main__:File already unzipped


In [8]:
df = pl.scan_csv("data/US_Accidents_March23.csv", try_parse_dates=True)

df = df.select(
    # pl.col("ID"),
    pl.col("Severity").alias('severity'),
    pl.col("Start_Time").alias('datetime'),
    pl.col("Start_Lat").alias('lat'),
    pl.col("Start_Lng").alias('lng'),
    pl.col("Weather_Condition").alias('weather_condition'),
    pl.col("City"),
    pl.col("State"),
    pl.col("County")
)

In [9]:
print(df.head(3).collect().to_pandas().to_markdown(index=False))

|   severity | datetime            |     lat |      lng | weather_condition   | City         | State   | County     |
|-----------:|:--------------------|--------:|---------:|:--------------------|:-------------|:--------|:-----------|
|          3 | 2016-02-08 05:46:00 | 39.8651 | -84.0587 | Light Rain          | Dayton       | OH      | Montgomery |
|          2 | 2016-02-08 06:07:59 | 39.9281 | -82.8312 | Light Rain          | Reynoldsburg | OH      | Franklin   |
|          2 | 2016-02-08 06:49:27 | 39.0631 | -84.0326 | Overcast            | Williamsburg | OH      | Clermont   |


In [10]:
print(
    pd.DataFrame(list(zip(*df.collect_schema().to_python().items()))).T.to_markdown(index=False)
)

| 0                 | 1                           |
|:------------------|:----------------------------|
| severity          | <class 'int'>               |
| datetime          | <class 'datetime.datetime'> |
| lat               | <class 'float'>             |
| lng               | <class 'float'>             |
| weather_condition | <class 'str'>               |
| City              | <class 'str'>               |
| State             | <class 'str'>               |
| County            | <class 'str'>               |


In [11]:
# Develop a table for cities.

# Create a lazy frame grouped by city, state, and county
cities_df = df.select(
    pl.col('City'),
    pl.col('State'),
    pl.col('County')
).unique().with_row_index(name='city_id')

print(cities_df.head().collect().to_pandas().to_markdown(index=False))


|   city_id | City            | State   | County     |
|----------:|:----------------|:--------|:-----------|
|         0 | Davidsville     | PA      | Somerset   |
|         1 | Fairfield       | CT      | Fairfield  |
|         2 | North Highlands | CA      | Sacramento |
|         3 | Peoria          | AZ      | Maricopa   |
|         4 | Brookeville     | MD      | Montgomery |


In [12]:
# So there are lots of cities with the same name in different states/counties.
cities_df.group_by('City').agg(pl.len().alias('count')).sort('count', descending=True).head(5).collect()

City,count
str,u32
"""Washington""",32
"""Salem""",31
"""Franklin""",31
"""Clinton""",28
"""Greenville""",27


In [13]:
cities_df.head(3).collect()

city_id,City,State,County
u32,str,str,str
0,"""Clarissa""","""MN""","""Todd"""
1,"""Stuart""","""FL""","""Martin"""
2,"""Mccomb""","""MS""","""Pike"""


In [14]:
# Does the name, county, and state uniquely identify a city?
# cities_df.group_by('name', 'county', 'state').agg(pl.len().alias('count')).sort('count', descending=True).head(5).collect()
# Yes.

In [15]:
# Does the name and state uniquely identify a city?
(
    cities_df
    .group_by('City', 'State')
    .agg(pl.len().alias('count'))
    .sort('count', descending=True)
    .head(5)
    .collect()
)
# No.


City,State,count
str,str,u32
"""Fredericksburg""","""VA""",8
"""Atlanta""","""GA""",7
"""Alexandria""","""VA""",6
"""Newport News""","""VA""",6
"""Lynchburg""","""VA""",6


In [16]:
# Are county names unique?
cities_df.select(
    pl.col('County'),
    pl.col('State')
).group_by('County').agg(pl.len()).sort('len', descending=True).head(5).collect()

County,len
str,u32
"""Washington""",289
"""Montgomery""",248
"""Jefferson""",247
"""Franklin""",198
"""Lake""",169


In [17]:
# Are city, county combos unique?
(
    cities_df
    .group_by('City', 'County')
    .agg(pl.len().alias('count'))
    .sort('count', descending=True)
    .head(5)
    .collect()
)


City,County,count
str,str,u32
"""Madison""","""Madison""",7
"""Carrollton""","""Carroll""",4
"""York""","""York""",4
"""Washington""","""Washington""",4
"""Fayetteville""","""Fayette""",3


In [18]:
# Confirm that all state codes are 2 characters.
df.select(pl.col('State').str.len_bytes().alias('len')).unique().collect()

len
u32
2


In [19]:
df = (
    df.join(
        cities_df,
        on=['City', 'County', 'State'], 
        how='left'
    )
    .select(pl.exclude(['City', 'County', 'State']))
)

In [20]:
print(df.head(5).collect().to_pandas().to_markdown(index=False))

|   severity | datetime            |     lat |      lng | weather_condition   |   city_id |
|-----------:|:--------------------|--------:|---------:|:--------------------|----------:|
|          3 | 2016-02-08 05:46:00 | 39.8651 | -84.0587 | Light Rain          |     17397 |
|          2 | 2016-02-08 06:07:59 | 39.9281 | -82.8312 | Light Rain          |     10484 |
|          2 | 2016-02-08 06:49:27 | 39.0631 | -84.0326 | Overcast            |      8302 |
|          3 | 2016-02-08 07:23:34 | 39.7478 | -84.2056 | Mostly Cloudy       |     17397 |
|          2 | 2016-02-08 07:39:07 | 39.6278 | -84.1884 | Mostly Cloudy       |     17397 |


In [22]:
#df.collect().write_database(table_name='accidents', connection=DB_URI)
# ~10 min.

In [None]:
# Write the cities too
#cities_df.collect().write_database(table_name="cities", connection=DB_URI)

In [148]:
# Show null proportion for each column.
df.select(pl.col("*")).null_count().collect() 

In [None]:
# Clean up
os.remove("data/US_Accidents_March23.csv")