In [309]:
import pandas as pd
from sodapy import Socrata
from dotenv import load_dotenv
import os
from datetime import datetime
from sqlalchemy import create_engine, text, MetaData, Table, Column, String, Integer, Date, ForeignKey
from sqlalchemy.dialects.postgresql import insert

In [310]:
def load_data_from_api(limit = 10000, app_token = None):
    """Fetch records of the NYC Open Data dataset ``43nn-pn8j`` as a DataFrame.

    Args:
        limit: Maximum number of records requested in the single API call.
        app_token: Optional Socrata application token passed to the client.
            The default ``None`` keeps the original token-less access.

    Returns:
        pandas.DataFrame built from the list of record dictionaries that
        sodapy returns.
    """
    client = Socrata("data.cityofnewyork.us", app_token)
    try:
        results = client.get("43nn-pn8j", limit=limit)
    finally:
        # Release the client's HTTP session even when the request fails.
        client.close()
    # Convert to pandas DataFrame
    return pd.DataFrame.from_records(results)

# Request up to `limit` results; the API returns JSON, which sodapy converts
# to a Python list of dictionaries.
limit = 10000
results_df = load_data_from_api(limit)

# TODO: Use app_token to avoid throttling



In [311]:
results_df.head()

Unnamed: 0,camis,dba,boro,building,street,zipcode,phone,inspection_date,critical_flag,record_date,...,:@computed_region_sbqj_enih,:@computed_region_92fq_4b7q,cuisine_description,action,violation_code,violation_description,score,inspection_type,grade,grade_date
0,50177106,LAKAY BISTRO LLC,Brooklyn,55,RALPH AVENUE,11221,7184046099,1900-01-01T00:00:00.000,Not Applicable,2025-10-31T06:00:15.000,...,52,17,,,,,,,,
1,50164066,VINILE ITALIAN CHOPHOUSE,Manhattan,31,WEST 17 STREET,10011,6142842871,1900-01-01T00:00:00.000,Not Applicable,2025-10-31T06:00:15.000,...,7,10,,,,,,,,
2,50165820,AB STABLE LLC,Manhattan,301,PARK AVENUE,10022,5182826019,1900-01-01T00:00:00.000,Not Applicable,2025-10-31T06:00:15.000,...,10,51,,,,,,,,
3,50158367,TIPSY SCOOP,Queens,3815,23RD AVE,11105,3472959369,1900-01-01T00:00:00.000,Not Applicable,2025-10-31T06:00:15.000,...,72,4,,,,,,,,
4,50175756,QAHWAH TIME,Brooklyn,66,COURT STREET,11201,9297265837,1900-01-01T00:00:00.000,Not Applicable,2025-10-31T06:00:15.000,...,54,38,,,,,,,,


In [312]:
results_df.columns

Index(['camis', 'dba', 'boro', 'building', 'street', 'zipcode', 'phone',
       'inspection_date', 'critical_flag', 'record_date', 'latitude',
       'longitude', 'community_board', 'council_district', 'census_tract',
       'bin', 'bbl', 'nta', 'location', ':@computed_region_f5dn_yrer',
       ':@computed_region_yeji_bk3q', ':@computed_region_sbqj_enih',
       ':@computed_region_92fq_4b7q', 'cuisine_description', 'action',
       'violation_code', 'violation_description', 'score', 'inspection_type',
       'grade', 'grade_date'],
      dtype='object')

# Data preprocessing

In [313]:
def remove_unnecessary_columns(df):
    """Return a copy of ``df`` without the ``:@computed_region_*`` columns.

    The input frame is not modified. Columns that are already absent are
    skipped, so calling this again on an already-cleaned frame (for example
    when the cell is re-run) is a no-op instead of a ``KeyError``.
    """
    unnecessary_columns = [':@computed_region_f5dn_yrer', ':@computed_region_yeji_bk3q', ':@computed_region_sbqj_enih', ':@computed_region_92fq_4b7q']
    return df.drop(columns=unnecessary_columns, errors='ignore')

def preprocess_data(df):
    """Return a cleaned copy of ``df`` with placeholder values filled in.

    * ``boro`` values equal to the string ``'0'`` become ``'Unknown'``.
    * Missing ``violation_code`` values become ``'N/A'``.
    * Missing ``violation_description`` values become
      ``'No violation description available'``.

    The input frame is left unchanged; all other columns keep their values,
    including any missing ones.
    """
    cleaned = df.copy()
    # replace '0' in the 'boro' column with 'Unknown'
    cleaned['boro'] = cleaned['boro'].replace('0', 'Unknown')
    # fill missing violation codes / descriptions with explicit placeholders
    cleaned = cleaned.fillna({'violation_code': 'N/A', 'violation_description': 'No violation description available'})
    return cleaned

def transform_date_columns(df, date_columns):
    """Return a copy of ``df`` with ``date_columns`` rendered as ``YYYY-MM-DD`` strings.

    Args:
        df: Frame containing the columns to convert; it is not modified.
        date_columns: Names of columns holding values ``pd.to_datetime`` can parse.

    Returns:
        A new DataFrame in which each listed column is parsed to datetimes and
        then formatted with ``'%Y-%m-%d'``. Missing values stay missing.
    """
    converted = df.copy()
    for col in date_columns:
        converted[col] = pd.to_datetime(converted[col]).dt.strftime('%Y-%m-%d')
    return converted

# Run the cleaning steps in order, rebinding results_df to each step's return
# value: drop the computed-region columns, fill placeholder values, then
# reformat the three date columns as YYYY-MM-DD strings.
results_df = remove_unnecessary_columns(results_df)
results_df = preprocess_data(results_df)
results_df = transform_date_columns(results_df, ['inspection_date', 'grade_date', 'record_date'])

In [314]:
results_df.head()

Unnamed: 0,camis,dba,boro,building,street,zipcode,phone,inspection_date,critical_flag,record_date,...,nta,location,cuisine_description,action,violation_code,violation_description,score,inspection_type,grade,grade_date
0,50177106,LAKAY BISTRO LLC,Brooklyn,55,RALPH AVENUE,11221,7184046099,1900-01-01,Not Applicable,2025-10-31,...,BK35,"{'type': 'Point', 'coordinates': [-73.92384459...",,,,No violation description available,,,,
1,50164066,VINILE ITALIAN CHOPHOUSE,Manhattan,31,WEST 17 STREET,10011,6142842871,1900-01-01,Not Applicable,2025-10-31,...,MN13,"{'type': 'Point', 'coordinates': [-73.99339275...",,,,No violation description available,,,,
2,50165820,AB STABLE LLC,Manhattan,301,PARK AVENUE,10022,5182826019,1900-01-01,Not Applicable,2025-10-31,...,MN19,"{'type': 'Point', 'coordinates': [-73.97435784...",,,,No violation description available,,,,
3,50158367,TIPSY SCOOP,Queens,3815,23RD AVE,11105,3472959369,1900-01-01,Not Applicable,2025-10-31,...,QN72,"{'type': 'Point', 'coordinates': [-73.90882428...",,,,No violation description available,,,,
4,50175756,QAHWAH TIME,Brooklyn,66,COURT STREET,11201,9297265837,1900-01-01,Not Applicable,2025-10-31,...,BK09,"{'type': 'Point', 'coordinates': [-73.99129146...",,,,No violation description available,,,,


# Splitting data into individual dataframes before table creation

In [315]:
def create_violation_codes_df(df):
    """Build the violation-code lookup frame: one row per distinct ``violation_code``.

    When a code occurs with several descriptions, the first row encountered
    is the one kept. The result has a fresh 0..n-1 index.
    """
    code_columns = ['violation_code', 'violation_description']
    unique_codes = df[code_columns].drop_duplicates(subset=['violation_code'])
    return unique_codes.reset_index(drop=True)

def create_cuisines_df(df):
    """Build a single-column frame of the distinct, non-missing cuisine descriptions.

    Values keep their order of first appearance and the result has a fresh
    0..n-1 index.
    """
    cuisine_column = df[['cuisine_description']]
    distinct_cuisines = cuisine_column.drop_duplicates()
    known_cuisines = distinct_cuisines.dropna()
    return known_cuisines.reset_index(drop=True)

def create_restaurants_df(df):
    """Build the restaurants frame: one row per ``camis``.

    Rows are de-duplicated on ``camis`` alone because that column is later
    declared as the table's primary key; de-duplicating on ``(dba, camis)``
    would let a restaurant listed under two name spellings through twice and
    make ``ADD PRIMARY KEY`` fail. The first row seen for each ``camis`` wins.

    Returns:
        DataFrame with columns dba, camis, building, street, zipcode, boro,
        phone and cuisine_description, and a fresh 0..n-1 index.
    """
    restaurant_columns = ['dba', 'camis', 'building', 'street', 'zipcode', 'boro', 'phone', 'cuisine_description']
    restaurants_df = df[restaurant_columns].drop_duplicates(subset=['camis']).reset_index(drop=True)
    return restaurants_df

# Derive the per-entity frames from the cleaned results: violation-code
# lookup, distinct cuisines, and restaurants.
violation_codes_df = create_violation_codes_df(results_df)
cuisines_df = create_cuisines_df(results_df)
restaurants_df = create_restaurants_df(results_df)

# Creating the SQL Schema and inserting data

In [316]:
# Created a Supabase DB to store restaurant and violation code data

def connect_to_db():
    """Create a SQLAlchemy engine for the Supabase Postgres database.

    ``DB_PASSWORD`` and ``DB_URL`` are read from the environment after loading
    ``.env`` (values in the file override existing variables). ``DB_URL`` is
    the value placed between ``db.`` and ``.supabase.co`` in the host name.

    Returns:
        sqlalchemy Engine configured with ``sslmode=require``.

    Raises:
        RuntimeError: If either environment variable is missing or empty.
    """
    from sqlalchemy.engine import URL

    load_dotenv(override=True)
    PASSWORD = os.getenv("DB_PASSWORD")
    DB_URL = os.getenv("DB_URL")

    # Fail early with a clear message instead of building a URL containing "None".
    missing = [name for name, value in (("DB_PASSWORD", PASSWORD), ("DB_URL", DB_URL)) if not value]
    if missing:
        raise RuntimeError(f"Missing required environment variable(s): {', '.join(missing)}")

    # URL.create takes the password as a separate field, so characters such as
    # '@', ':' or '/' in it cannot corrupt the connection URL.
    connection_url = URL.create(
        "postgresql",
        username="postgres",
        password=PASSWORD,
        host=f"db.{DB_URL}.supabase.co",
        port=5432,
        database="postgres",
    )
    return create_engine(connection_url, connect_args={"sslmode": "require"})

engine = connect_to_db()

# Connectivity check: open one connection and close it again right away so it
# is returned to the pool instead of being left dangling.
try:
    with engine.connect():
        pass
    print("Connected to DB successfully!")
except Exception as e:
    # Best-effort check: report the problem but let the notebook continue.
    print(e)

def insert_table(df, table_name, primary_key):
    """Recreate ``table_name`` from ``df`` and declare ``primary_key`` on it.

    The drop, the load and the primary-key declaration run in one transaction:
    either the table ends up fully rebuilt with its key, or the previous state
    is kept.

    Args:
        df: DataFrame whose columns become the table's columns (index not written).
        table_name: Target table. Interpolated into SQL as-is, so it must be a
            trusted identifier.
        primary_key: Column (or comma-separated columns) for the primary key.
            Also interpolated as-is.

    Note:
        ``DROP ... CASCADE`` also removes objects that depend on the table,
        such as foreign-key constraints in other tables that reference it.
    """
    # engine.begin() commits when the block exits normally and rolls back on error.
    with engine.begin() as conn:
        conn.execute(text(f"DROP TABLE IF EXISTS {table_name} CASCADE;"))
        # Pass the open connection so pandas writes inside the same transaction.
        df.to_sql(table_name, conn, if_exists='append', index=False)
        conn.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({primary_key});"))

# Table names and primary-key columns used by the database cells.
RESTAURANT_TABLE = 'restaurants'
VIOLATION_CODE_TABLE = 'violation_codes'
RESTAURANT_PRIMARY_KEY = 'camis'
VIOLATION_CODE_PRIMARY_KEY = 'violation_code'
MAPPING_TABLE = 'rest_to_violations'

# Load the two lookup tables; insert_table drops any existing table of the
# same name before writing.
# NOTE(review): cuisines_df is built earlier but no table is loaded from it
# here; confirm whether that is intentional.
insert_table(restaurants_df, RESTAURANT_TABLE, RESTAURANT_PRIMARY_KEY)
insert_table(violation_codes_df, VIOLATION_CODE_TABLE, VIOLATION_CODE_PRIMARY_KEY)


Connected to DB successfully!


In [317]:
# Table and row deletion helper functions
def delete_rows_from_table(table_name):
    """Delete every row from ``table_name`` and commit; the table itself is kept.

    ``table_name`` is interpolated into the SQL text as-is.
    """
    statement = text(f"""
    DELETE FROM {table_name};
    """)

    with engine.connect() as connection:
        connection.execute(statement)
        connection.commit()

def drop_table_if_exists(table_name):
    """Drop ``table_name`` (with CASCADE) if it exists, and commit the change.

    ``table_name`` is interpolated into the SQL text as-is, so it must be a
    trusted identifier.
    """
    drop_sql_query = f"""
    DROP TABLE IF EXISTS {table_name} CASCADE;
    """

    # engine.begin() commits on exit; with a plain connect() and no commit the
    # DROP would be rolled back when the connection is closed.
    with engine.begin() as conn:
        conn.execute(text(drop_sql_query))

def get_all_rows(table_name):
    """Return every row of ``table_name`` as the list produced by ``fetchall()``.

    ``table_name`` is interpolated into the SQL text as-is.
    """
    with engine.connect() as connection:
        result = connection.execute(text(f"SELECT * FROM {table_name};"))
        return result.fetchall()

In [318]:
# Uncomment to drop the mapping table before recreating it. The statement
# below uses CREATE TABLE IF NOT EXISTS, so an existing table is otherwise
# left exactly as it is.
# drop_table_if_exists(MAPPING_TABLE)

# DDL for the restaurant-to-violation mapping table. Each row is identified by
# the composite primary key (camis, violation_code, inspection_date); camis and
# violation_code are declared as references to the restaurant and
# violation-code tables.
create_sql_query = f"""
CREATE TABLE IF NOT EXISTS {MAPPING_TABLE} (
    camis TEXT REFERENCES {RESTAURANT_TABLE}(camis),
    violation_code TEXT REFERENCES {VIOLATION_CODE_TABLE}(violation_code),
    inspection_date DATE DEFAULT '1900-01-01',
    action TEXT DEFAULT NULL,
    critical_flag TEXT DEFAULT 'Not Applicable',
    grade TEXT DEFAULT NULL,
    score INTEGER DEFAULT NULL,
    grade_date DATE DEFAULT NULL,
    record_date DATE DEFAULT CURRENT_DATE,
    inspection_type TEXT DEFAULT NULL,
    PRIMARY KEY (camis, violation_code, inspection_date)
);
"""

# Execute the DDL and commit it explicitly.
with engine.connect() as conn:
    conn.execute(text(create_sql_query))
    conn.commit()

# Printed whether the table was newly created or already existed.
print("restaurants to violations mapping table created")

restaurants to violations mapping table created


In [319]:
metadata = MetaData()

# SQLAlchemy description of the mapping table, kept in step with the
# CREATE TABLE statement: same table and foreign-key targets (taken from the
# shared constants), the same ten columns, and inspection_date typed as a date.
restaurant_to_violations = Table(
    MAPPING_TABLE, metadata,
    Column('camis', String, ForeignKey(f'{RESTAURANT_TABLE}.{RESTAURANT_PRIMARY_KEY}'), primary_key=True),
    Column('violation_code', String, ForeignKey(f'{VIOLATION_CODE_TABLE}.{VIOLATION_CODE_PRIMARY_KEY}'), primary_key=True),
    Column('inspection_date', Date, primary_key=True),
    Column('action', String),
    Column('critical_flag', String),
    Column('grade', String),
    Column('score', Integer),
    Column('grade_date', Date),
    Column('record_date', Date),
    Column('inspection_type', String)
)

# create_all() checks for existing tables by default, so a mapping table that
# is already present in the database is left untouched.
metadata.create_all(engine)

# extract only columns relevant to the mapping table

features = [
    'camis',
    'inspection_date',
    'action',
    'violation_code',
    'critical_flag',
    'score',
    'grade',
    'grade_date',
    'record_date',
    'inspection_type'
]

# Take an explicit copy so mapping_df is independent of results_df.
mapping_df = results_df[features].copy()
mapping_df

Unnamed: 0,camis,inspection_date,action,violation_code,critical_flag,score,grade,grade_date,record_date,inspection_type
0,50177106,1900-01-01,,,Not Applicable,,,,2025-10-31,
1,50164066,1900-01-01,,,Not Applicable,,,,2025-10-31,
2,50165820,1900-01-01,,,Not Applicable,,,,2025-10-31,
3,50158367,1900-01-01,,,Not Applicable,,,,2025-10-31,
4,50175756,1900-01-01,,,Not Applicable,,,,2025-10-31,
...,...,...,...,...,...,...,...,...,...,...
9995,50069635,2025-05-06,Violations were cited in the following area(s).,02B,Critical,22,B,2025-05-06,2025-10-31,Cycle Inspection / Re-inspection
9996,50067806,2024-12-13,Violations were cited in the following area(s).,20-08,Not Critical,,,,2025-10-31,Administrative Miscellaneous / Initial Inspection
9997,41476556,2023-05-10,Violations were cited in the following area(s).,10F,Not Critical,2,A,2023-05-10,2025-10-31,Cycle Inspection / Initial Inspection
9998,50105185,2023-08-24,Establishment Closed by DOHMH. Violations were...,06C,Critical,35,,,2025-10-31,Cycle Inspection / Re-inspection


In [320]:
# mapping_df.to_sql(MAPPING_TABLE, con=engine, if_exists='append', index=False)

def upsert_ignore(table, conn, keys, data_iter):
    """
    Insert method for pd.to_sql that ignores duplicate key conflicts.

    Builds one multi-row PostgreSQL ``INSERT ... ON CONFLICT DO NOTHING`` on
    the (camis, violation_code, inspection_date) key, so rows whose key is
    already present are skipped instead of raising.

    Args:
        table: pandas SQL table wrapper; its ``.table`` attribute is the
            SQLAlchemy table the insert targets.
        conn: SQLAlchemy connection supplied by pandas.
        keys: Column names, in the order of the values in each row.
        data_iter: Iterable of row value tuples.

    Returns:
        The row count reported for the statement (0 for an empty batch),
        which lets ``to_sql`` report how many rows were inserted.
    """
    data = [dict(zip(keys, row)) for row in data_iter]
    if not data:
        # Nothing to insert; avoid building an INSERT with an empty VALUES list.
        return 0
    stmt = (
        insert(table.table)
        .values(data)
        .on_conflict_do_nothing(
            index_elements=["camis", "violation_code", "inspection_date"]
        )
    )
    return conn.execute(stmt).rowcount

# convert mapping_df to sql and insert into mapping table
mapping_df.to_sql(
    MAPPING_TABLE,
    con=engine,
    if_exists="append",
    index=False,
    method=upsert_ignore
)

# Count in the database instead of fetching every row back just to call len().
with engine.connect() as conn:
    row_count = conn.execute(text(f"SELECT COUNT(*) FROM {MAPPING_TABLE};")).scalar_one()

# Compare against the rows actually sent (the API may return fewer than the
# requested limit). A table holding exactly that many rows is a success, not a
# shortfall. The table is not emptied beforehand, so rows left from earlier
# runs are included in the count.
expected_rows = len(mapping_df)
if row_count < expected_rows:
    print(f"Note: Expected {expected_rows} rows, but got {row_count}. Duplicate rows are ignored by default")
else:
    print("Success: All rows were inserted into the mapping table")

Note: Expected 10000 rows, but got 9999. Duplicate rows are ignored by default
