In [1]:
from sqlalchemy import MetaData, Table, Column, String, Integer, Float, Date, DateTime, func, select
from sqlalchemy.engine.base import Engine
from common.connect import create_postgres_conn, create_aws_conn
from db.tables import create_date_table, create_crime_table, create_log_table
from db.helper import initialize_run_log
from utils.helper import generate_date_range
from dotenv import load_dotenv
import os

load_dotenv()

db_params = {
    "host": 'localhost',
    "port": '5433',
    "username": 'admin',
    "password": 'admin',
    "db": 'mydb',
}

engine = create_postgres_conn(**db_params)

In [3]:
initialize_run_log(engine, config={})

(14, None, datetime.date(2025, 8, 4))

In [2]:
meta = MetaData()
meta.reflect(engine)
print(meta.tables.keys())

# meta.drop_all(bind=engine, checkfirst=True)
# print(meta.tables.keys())

dict_keys(['pipeline_logs', 'date'])


In [5]:
log_table = meta.tables['pipeline_logs']
last_src_update = None
with engine.begin() as conn:
    st = select(func.max(log_table.c.ingested_at))
    result = conn.execute(st)
    last_src_update = result.scalar()

In [6]:
last_src_update

datetime.date(2025, 8, 3)

In [10]:
# Decide tables we need to create

# Log table
log_table = create_log_table(engine=engine)

# crime
crime_table = create_crime_table(engine=engine)

# police_stations
# : WILL BE HANDLED IN DBT

# ward_offices
# : WILL BE HANDLED IN DBT

# dim_date
date_table = create_date_table(engine=engine)


In [None]:
# Need a transformation layer before loading the data into the DB

About the source
- Frequency of update - Daily [based on updated_on attribute]

Incremental
- if db exists
- Check last updated_on date and fetch records past that date

Full load (work on this first)
- If db doesnt exist, perform full load
- set a base date to fetch records for full load maybe 2024-01-01
- Dag based full load (backfill) or full load within a dag run + incremental


Workflow

- Download all files into tmp

- Uncompress it

- json loads

- Transform

- sqlalchemy batch insert into postgres

In [6]:
from datetime import datetime, time
t = '2025-01-01'
datetime.strptime(t,"%Y-%m-%d").date()

datetime.date(2025, 1, 1)

In [4]:
import re
from datetime import datetime, timedelta
from pathlib import Path

# get data from s3, filter by ingest
aws_params = {
    "access_key" : os.getenv("AWS_ACCESS_KEY_ID"),
    "secret_access_key" : os.getenv("AWS_SECRET_ACCESS_KEY"),
    "region" : os.getenv("AWS_REGION")
}

s3_client = create_aws_conn(resource='s3', **aws_params)

TMP = Path("./tmp")
BUCKET_NAME = 'open-crime-etl'

"""Need to find a way to detect what is missing from the db to perform ingestion, airflow can take care of this, however we need to handle how we load it from s3 to db.
In other words, say the ingestion takes place weekly, you would query the api and load each page as batch and sort them by year/month. but say we fetch the next weekly batch, how do you plan on organizing that into s3.
## SOLVED BY ADDING A NEW NAMESPACE i.e `load_date=yyyy-mm-dd/` ##
"""
bucket = s3_client.Bucket(BUCKET_NAME)

# Scan pipeline_logs and fetch the last ingested_at with status = 'SUCCESS'
last_date = datetime(2025,7,27).date() # Fetch this from pipeline_logs where last ingested_at with status = 'SUCCESS'
today = datetime.now().date()
date_range = [(last_date + timedelta(days=i)).strftime("%Y-%m-%d") for i in range((today - last_date).days+1)]
esc_date = "|".join(map(re.escape, date_range))

# Generate dates to add to regex pattern
regex = re.compile(rf"^raw/year=\d{{4}}/month=\d{{2}}/load_date=({esc_date})/.*")

# perform download here
for i in bucket.objects.filter(Prefix=f"raw/"):
    if regex.match(i.key):
        key = i.key.split("/")
        year = key[1]
        month = key[2]
        ingested_at = key[3]
        file = key[-1]

        # Create tmp directory
        if not TMP.exists():
            TMP.mkdir(parents=True, exist_ok=True)

        filename = f"{ingested_at.split('=')[-1]}_{year[-4:]}{month[-2:]}_{file}"
        bucket.download_file(i.key, (TMP / filename))
        break
    # break




In [7]:
# list(TMP.rglob("*.gz"))
TMP

PosixPath('tmp')

In [None]:
import gzip
import json
import pandas as pd
from db.tables import create_crime_table
from sqlalchemy import insert
from sqlalchemy.dialects.postgresql import insert as upsert
from pprint import pprint

# Stream approach, iterate each file -> uncompress -> load -> transform -> batch insert into db

df = None
for file in TMP.rglob("*.gz"):
    print(file.as_posix())
    # Unzip
    with gzip.open(file.as_posix(), 'rt') as f:
        # Load
        data = json.load(f)

        # Transform
        df = pd.DataFrame(data)

        col_drop = [
            ':@computed_region_awaf_s7ux',
            ':@computed_region_6mkv_f3dw',
            ':@computed_region_vrxf_vc4k',
            ':@computed_region_bdys_3d7i',
            ':@computed_region_43wa_7qmu',
            ':@computed_region_rpca_8um6',
            ':@computed_region_d9mm_jgwp',
            ':@computed_region_d3ds_rm58',
            ':@computed_region_8hcu_yrd4',
            'location',
            ':id',
            ':version',
            ':created_at',
            'year',
            'updated_on'
        ]
        
        rename_col = {
            'id' : 'crime_id',
            'case_number' : 'case',
            'date' : 'date_of_occurrence',
            'primary_type' : 'primary_description',
            'description' : 'secondary_description',
            ':updated_at' : 'source_updated_on'
        }
        
        # Drop
        df.drop(columns=col_drop, inplace=True)

        # Rename
        df.rename(columns=rename_col, inplace=True)

        # Handle Null
        df.where(pd.notnull(df), None, inplace=True)

        # Bulk insert (Check if tables exist, create them, then insert)
        meta = MetaData()
        meta.reflect(engine)

        # Check if table exists else create
        crime_table = create_crime_table(engine) if 'crime' not in meta.tables.keys() else meta.tables['crime']

        # Should I define a batch param for the insert or solely rely on the initial batchsize decided when ingesting and storing api data into s3? Would make sense, say, the db gets throttled with reads and you might want to limit the writes? so might want to adjust the batchsize for inserts?

        batchsize = 500

        key_columns = [pk_column.name for pk_column in crime_table.primary_key.columns.values()]
        for start in range(0, len(df), batchsize):
            batch = df.iloc[start : start + batchsize]
            with engine.begin() as conn:
                # Need to perform upsert instead of insert if crime_id exists

                st = upsert(crime_table).values(batch.to_dict(orient='records'))

                up_st = st.on_conflict_do_update(
                    index_elements = [crime_table.c.crime_id],

                    set_= {c.key: c for c in st.excluded if c.key not in key_columns}
                )

                conn.execute(up_st)
            
        # Checkpoint on success, might use sqlite3 for this

    # break

tmp/2025-08-03_202505_part-0011.json.gz
tmp/2025-08-02_202505_part-0034.json.gz
tmp/2025-08-03_202507_part-0002.json.gz
tmp/2025-08-03_202505_part-0033.json.gz
tmp/2025-08-03_202503_part-0001.json.gz
tmp/2025-08-02_202505_part-0031.json.gz
tmp/2025-08-03_202505_part-0025.json.gz
tmp/2025-08-02_202505_part-0005.json.gz
tmp/2025-08-03_202505_part-0045.json.gz
tmp/2025-08-03_202505_part-0030.json.gz
tmp/2025-08-03_202505_part-0018.json.gz
tmp/2025-08-03_202505_part-0043.json.gz
tmp/2025-08-03_202505_part-0008.json.gz
tmp/2025-08-02_202505_part-0007.json.gz
tmp/2025-08-02_202504_part-0001.json.gz
tmp/2025-08-02_202506_part-0004.json.gz
tmp/2025-08-03_202505_part-0037.json.gz
tmp/2025-08-03_202505_part-0041.json.gz
tmp/2025-08-02_202505_part-0012.json.gz
tmp/2025-08-02_202505_part-0024.json.gz
tmp/2025-08-02_202506_part-0003.json.gz
tmp/2025-08-02_202505_part-0009.json.gz
tmp/2025-08-03_202505_part-0002.json.gz
tmp/2025-08-02_202505_part-0014.json.gz
tmp/2025-08-02_202505_part-0010.json.gz
