# NYC Department of Finance
## 2023 Parking and Camera Violations
---

# Idea
- View by parking violation
- Include definition and listed fine amounts
- Display # violations & total $ paid in 2023
- Waterfall breakdown of financial data
- Heat map of # violations per hour per day of week
- Include # violations per month

# Dependencies

In [None]:
from collections import Counter
import json
import os
from pathlib import Path
import time

import numpy as np
import pandas as pd
import plotly.colors as pc
import plotly.io as pio
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import requests

from models import Violation


# Use Plotly's dark template as the default for every figure.
pio.templates.default = 'plotly_dark'

# App token for the NYC Open Data API, read from the environment (None when the
# variable is unset). Used as the default `token` of the fetch function.
NYC_OPEN_DATA_TOKEN = os.getenv("NYC_OPEN_DATA_TOKEN")

# Directory holding the daily/monthly parquet files and the JSON inputs/outputs.
DATA_DIR = Path('data')

# Exclusive upper bound of the day-by-day download loop (`while date < END_DATE`).
END_DATE = pd.Timestamp(2024, 1, 1)
# Step by which the download loop advances `date` on each iteration.
TIME_DELTA = pd.Timedelta(days=1)


# Functions

In [None]:
def fetch_violation_amounts_by_issue_date(
    date: pd.Timestamp,
    limit: int = 100_000,
    offset: int = 0,
    year: int = 2023,
    token: str = NYC_OPEN_DATA_TOKEN,
    timeout: float = 60.0,
) -> list[dict]:
    """Fetch one page of violation records issued on ``date`` from NYC Open Data.

    The ``issue_date`` column stores dates in two formats, so the query matches
    both the US-style (``mm/dd/yyyy``) and the ISO 8601 millisecond rendering of
    ``date``. Rows are ordered by ``summons_number`` so that consecutive
    ``offset`` values page through a stable result set.

    Args:
        date: Calendar day whose violations should be fetched.
        limit: Maximum number of rows to return (``$limit``).
        offset: Number of rows to skip (``$offset``), for paging.
        year: Currently unused; kept so existing callers keep working.
        token: App token sent as the ``X-App-Token`` header. The header is
            omitted when the token is empty or ``None``.
        timeout: Seconds to wait for the server before giving up, so a stalled
            connection cannot hang a long-running download loop.

    Returns:
        The decoded JSON response: a list with one dict per record.

    Raises:
        requests.HTTPError: If the API answers with an error status.
        requests.Timeout: If the server does not respond within ``timeout``.
    """
    date_iso = date.isoformat(timespec='milliseconds')
    date_us = date.strftime('%m/%d/%Y')

    url = "https://data.cityofnewyork.us/resource/nc67-uf89.json"
    params = {
        "$where": f"issue_date in('{date_us}', '{date_iso}')",
        "$select": "summons_number, issue_date, violation_time, violation, fine_amount, penalty_amount, interest_amount, reduction_amount, payment_amount, amount_due, violation_status, license_type, state, issuing_agency",
        "$order": "summons_number ASC",
        "$limit": limit,
        "$offset": offset
    }
    headers = {"X-App-Token": token} if token else {}

    response = requests.get(url, params=params, headers=headers, timeout=timeout)
    response.raise_for_status()
    return response.json()


def persist_as_parquet(response_json, filepath: Path, overwrite: bool = False) -> None:
    """Write API records to a snappy-compressed parquet file.

    Args:
        response_json: Records to store; anything ``pd.DataFrame`` accepts
            (here, the list of dicts returned by the API).
        filepath: Destination parquet file.
        overwrite: When False (the default), refuse to replace an existing file.

    Raises:
        FileExistsError: If ``filepath`` already exists and ``overwrite`` is False.
    """
    if filepath.exists() and not overwrite:
        # The trailing space keeps the two sentences apart once the adjacent
        # string literals are concatenated.
        raise FileExistsError(
            f"The filename {filepath.name!r} already exists in directory {str(filepath.parent)!r}! "
            "To overwrite file, set `overwrite=True`."
        )
    df_json = pd.DataFrame(response_json)
    df_json.to_parquet(filepath, index=False, compression='snappy')


def load_parquets_by_month(month: int, input_dir: Path, year: int = 2023) -> pd.DataFrame:
    """Load and clean all daily parquet files for one month.

    Reads every file in ``input_dir`` matching
    ``nc67-uf89_issue-date_{year}-{month:02}*.parquet``, concatenates them,
    parses the date/time and amount columns, drops rows missing ``issue_date``,
    ``violation_time``, ``violation`` or ``fine_amount``, and adds two ordered
    categorical columns: ``hour`` ('12 AM' .. '11 PM') and ``day_of_week``
    ('Sun' .. 'Sat').

    Args:
        month: Month number (1-12).
        input_dir: Directory containing the daily parquet files.
        year: Four-digit year used in the file-name pattern.

    Returns:
        The cleaned records, with ``violation_time`` reduced to a plain time of day.

    Raises:
        FileNotFoundError: If no daily file matches the pattern.
    """
    hours = ['12 AM'] + [f'{h} AM' for h in range(1, 12)] + ['12 PM'] + [f'{h} PM' for h in range(1, 12)]
    days_map = {6: 'Sun', 0: 'Mon', 1: 'Tue', 2: 'Wed', 3: 'Thu', 4: 'Fri', 5: 'Sat'}

    pattern = f'nc67-uf89_issue-date_{year}-{month:0>2}*.parquet'
    # Sorted so the row order does not depend on the filesystem's listing order.
    daily_files = sorted(input_dir.glob(pattern))
    if not daily_files:
        raise FileNotFoundError(f"No daily parquet files matching {pattern!r} found in {str(input_dir)!r}.")

    # ignore_index: each daily file restarts its index at 0; avoid duplicate labels.
    dff = pd.concat([pd.read_parquet(f) for f in daily_files], ignore_index=True)

    # Assign data types
    dff['issue_date'] = pd.to_datetime(dff['issue_date'], format='mixed', errors='coerce')
    # Appending 'M' completes the meridiem so '%p' can parse it; unparseable values become NaT.
    dff['violation_time'] = pd.to_datetime(dff['violation_time']+'M', format='%I:%M%p', errors='coerce')

    for col in ['fine_amount', 'penalty_amount', 'interest_amount', 'reduction_amount', 'payment_amount', 'amount_due']:
        dff[col] = dff[col].astype(float)

    # Drop blanks
    dff.dropna(subset=['issue_date', 'violation_time', 'violation', 'fine_amount'], inplace=True)

    # Get hour and day of week
    dff['hour'] = dff['violation_time'].dt.strftime('%I %p').str.replace(r'^0', '', regex=True)
    dff['hour'] = pd.Categorical(dff['hour'], categories=hours, ordered=True)

    dff['day_of_week'] = dff['issue_date'].dt.day_of_week.map(days_map)
    dff['day_of_week'] = pd.Categorical(dff['day_of_week'], categories=list(days_map.values()), ordered=True)

    # Tidy up
    dff['violation_time'] = dff['violation_time'].dt.time  # Drop erroneous date-component

    return dff


# Data
- Original dataset provided is missing records
- Will use NYC Open Data API to find records containing "2023" in the `issue_date` column
- Initial query reveals 16,526,342 records available
- `issue_date` column contains mixed timestamp formats:
    - US-style: `mm/dd/yyyy`
    - ISO 8601: `yyyy-mm-ddThh:mm:ss.fff`
- Will loop through each calendar day in 2023 and store raw results as parquet
- Transform and compile daily results into monthly parquet datasets for aggregation

## Query
- Takes ~60 minutes w/ 2-sec sleep

In [None]:
# Download the records for every calendar day from 2023-01-01 up to (but not
# including) END_DATE and store each day's raw result as its own parquet file.
# Problems are printed and collected in `errors` instead of aborting the run.
PAGE_SIZE = 100_000  # rows requested per API call

date = pd.Timestamp(2023, 1, 1)
errors = []

while date < END_DATE:
    date_label = date.strftime('%Y-%m-%d')
    data_fp = DATA_DIR / f"nc67-uf89_issue-date_{date_label}_v2.parquet"

    if data_fp.exists():
        # Lets an interrupted run resume without re-downloading (and without
        # sleeping for) days that were already saved.
        print(f"SKIPPED DATE {date_label}: {data_fp.name!r} already exists")
        date += TIME_DELTA
        continue

    try:
        # Page through the day's records: a full page means more rows may
        # follow, so keep requesting until a short (or empty) page comes back.
        # Otherwise a day with PAGE_SIZE or more rows would be silently truncated.
        data = []
        offset = 0
        while True:
            page = fetch_violation_amounts_by_issue_date(date=date, limit=PAGE_SIZE, offset=offset)
            data.extend(page)
            if len(page) < PAGE_SIZE:
                break
            offset += PAGE_SIZE
            time.sleep(2)

        if data:
            persist_as_parquet(response_json=data, filepath=data_fp, overwrite=False)
            print(f"Saved data for DATE {date_label}")
        else:
            message = f"NO DATA for DATE {date_label}"
            print(message)
            errors.append(message)

    except Exception as e:
        # Best-effort download: record the failure and carry on with the next day.
        message = f"EXCEPTION encountered for DATE {date_label}: {e!r}"
        print(message)
        errors.append(message)

    finally:
        date += TIME_DELTA
        time.sleep(2)


## Transform

In [None]:
# Compile each month's daily extracts into a single cleaned monthly parquet file.
for month in range(1, 13):
    monthly_name = f'nc67-uf89_month_2023-{month:0>2}_v2.parquet'
    df_m = load_parquets_by_month(month=month, input_dir=DATA_DIR)
    df_m.to_parquet(DATA_DIR / monthly_name, index=False, compression='snappy')


## Aggregate

In [None]:
# Read the violation-code reference data and index one Violation object per
# description, so records can be looked up by their `violation` text.
codes_fp = DATA_DIR / 'nyc_parking_violation_codes.json'
violation_details = json.loads(codes_fp.read_text(encoding='utf-8'))

violations = {}
for detail in violation_details:
    violations[detail['description']] = Violation.from_dict(detail)


In [None]:
# Fold each monthly dataset into the per-violation Violation objects:
# counts, weekly series, dollar totals, hour x weekday grid and breakdowns.
for month in range(1, 13):
    df_m = pd.read_parquet(DATA_DIR / f"nc67-uf89_month_2023-{month:0>2}_v2.parquet")

    # Give missing categories an explicit label so groupby does not drop those rows.
    for col in ['violation_status', 'issuing_agency', 'state', 'license_type']:
        df_m[col] = df_m[col].fillna('none')

    # Label each record with the ISO date on which its weekly period starts.
    df_m['period'] = df_m['issue_date'].dt.to_period('W').dt.start_time.dt.strftime('%Y-%m-%d')
    df_m['count'] = 1

    counts = df_m['violation'].value_counts()
    period_count = df_m.groupby(['violation', 'period'])['count'].sum()
    period_fine = df_m.groupby(['violation', 'period'])['fine_amount'].sum()
    amounts = df_m.groupby('violation')[[col for col in df_m.columns if 'amount' in col]].sum().round(0).astype(int)
    # observed=False keeps every hour/day category, so each violation yields a full grid.
    hour_dow_counts = df_m.groupby(['violation', 'hour', 'day_of_week'], observed=False)['count'].sum().astype(int).reset_index().pivot(index=['violation', 'hour'], columns='day_of_week', values='count')

    statuses = df_m.groupby(['violation', 'violation_status'])['count'].sum()
    agencies = df_m.groupby(['violation', 'issuing_agency'])['count'].sum()
    states = df_m.groupby(['violation', 'state'])['count'].sum()
    license_types = df_m.groupby(['violation', 'license_type'])['count'].sum()

    # Update Violation objects
    for v_key in df_m['violation'].unique():
        # Violations absent from the reference data are left out of the aggregation.
        if v_key not in violations:
            continue

        v = violations[v_key]

        v.total_count += int(counts.loc[v_key])

        # A week can straddle two months, so its period key may already hold the
        # earlier month's partial tally: add to it rather than overwrite it.
        for period, n in period_count.loc[v_key].to_dict().items():
            v.period_count[period] = v.period_count.get(period, 0) + n
        for period, fine in period_fine.loc[v_key].to_dict().items():
            v.period_fine[period] = v.period_fine.get(period, 0) + fine

        v.total_fine += amounts.loc[v_key].get('fine_amount').item()
        v.total_penalty += amounts.loc[v_key].get('penalty_amount').item()
        v.total_interest += amounts.loc[v_key].get('interest_amount').item()
        v.total_reduction += amounts.loc[v_key].get('reduction_amount').item()
        v.total_payment += amounts.loc[v_key].get('payment_amount').item()
        v.total_due += amounts.loc[v_key].get('amount_due').item()

        v.hour_dow_counts += hour_dow_counts.loc[v_key].values

        # Counter addition sums the tallies key by key across months.
        v.statuses = dict(Counter(statuses.loc[v_key].to_dict()) + Counter(v.statuses))
        v.agencies = dict(Counter(agencies.loc[v_key].to_dict()) + Counter(v.agencies))
        v.states = dict(Counter(states.loc[v_key].to_dict()) + Counter(v.states))
        v.license_types = dict(Counter(license_types.loc[v_key].to_dict()) + Counter(v.license_types))
    


In [None]:
# Build a synthetic "ALL VIOLATIONS" record by folding every per-violation
# record into it.
V_ALL = Violation(
    code=0,
    description="ALL VIOLATIONS",
    definition="An aggregation of all parking and camera violations for 2023 as of June 18, 2025. Records missing issue date, violation, or fine amount are omitted along with records assigned to \"BLUE ZONE\", which is no longer a valid NYC violation.",
    fine_amount_manhattan_96st_and_below=[],
    fine_amount_all_other_areas=[]
)

for v in violations.values():
    # Plain running totals.
    for attr in ('total_count', 'total_fine', 'total_penalty', 'total_interest',
                 'total_reduction', 'total_payment', 'total_due'):
        setattr(V_ALL, attr, getattr(V_ALL, attr) + getattr(v, attr))

    V_ALL.hour_dow_counts = V_ALL.hour_dow_counts + v.hour_dow_counts

    # Weekly series: add key by key, then keep the periods in consecutive order.
    for attr in ('period_count', 'period_fine'):
        combined = Counter(getattr(V_ALL, attr)) + Counter(getattr(v, attr))
        setattr(V_ALL, attr, dict(sorted(combined.items())))

    # Listed fine amounts: pool the distinct values in ascending order.
    for attr in ('fine_amount_manhattan_96st_and_below', 'fine_amount_all_other_areas'):
        pooled = set(getattr(V_ALL, attr))
        pooled.update(getattr(v, attr))
        setattr(V_ALL, attr, sorted(pooled))

    # Categorical breakdowns: add key by key, largest tally first.
    for attr in ('statuses', 'agencies', 'states', 'license_types'):
        combined = Counter(getattr(V_ALL, attr)) + Counter(getattr(v, attr))
        ranked = sorted(combined.items(), key=lambda pair: pair[1], reverse=True)
        setattr(V_ALL, attr, dict(ranked))
    

## Serialize

In [None]:
# Write the aggregate record followed by every individual violation as one JSON array.
json_fp = DATA_DIR / "nyc_parking_violation_data.json"
with json_fp.open('w', encoding='utf-8') as f:
    all_violations = [V_ALL, *violations.values()]
    serialized = [entry.to_dict() for entry in all_violations]
    json.dump(serialized, f)
