In [None]:
# Directory that holds the downloaded zip archive(s); the SQLite output file
# is written here as well.
#DATA_DIR = os.path.dirname(deo_backend.__file__)
DATA_DIR = "."

# Archive of car_ped_stops CSV files to process.
# Individual year files can be replaced from a separate backup archive, but
# that requires updating the Run object.
ZIP_FILENAME = "car_ped_stops_2024-03-16T20_22_24.zip"

In [None]:
import pandas as pd
import requests
import click
import httpx
import sqlite3
import os
from pydantic import BaseModel
from datetime import datetime, date, timedelta
import zipfile
import io
import re

from tqdm import tqdm

# First day (YYYY-MM-DD) of the latest quarter to include in the output.
MOST_RECENT_QUARTER_START_DT = "2023-10-01"
# Last fully elapsed calendar year. Derived from the current year directly:
# subtracting 365 days is off by one on Dec 31 of a leap year (it lands on
# Jan 1 of the same, still incomplete, year).
MOST_RECENT_COMPLETED_YEAR = datetime.now().year - 1
# Maximum distance handed to ST_DWithin when matching stops to the High Injury
# Network. The query kept below shows what 0.0001 corresponds to in meters at
# a sample point.
DIST_FROM_HIN_THRESHOLD = 0.0001
"""
--- This shows that 0.0001 is between 8.54427772 and 11.10338786 meters
SELECT ST_Distance(
    ST_GeographyFromText('SRID=4326;POINT(' || -75.14144069 || ' ' || 39.96071159 || ')'),
    ST_GeographyFromText('SRID=4326;POINT(' || (-75.14144069 + 0.0001) || ' ' || 39.96071159 || ')')
) AS distance_in_meters_lng,
ST_Distance(
    ST_GeographyFromText('SRID=4326;POINT(' || -75.14144069 || ' ' || 39.96071159 || ')'),
    ST_GeographyFromText('SRID=4326;POINT(' || (-75.14144069) || ' ' || 39.96071159  + 0.0001 || ')')
) AS distance_in_meters_lat
"""


class Run(BaseModel):
    """Describes one database build from a downloaded stops archive."""

    # File name (not a full path) of the zip archive located in DATA_DIR.
    zip_filename: str
    # Maps a CSV base name to another zip archive that the CSV should be
    # read from instead of the main archive.
    zip_filename_override_dict: dict[str, str] = {}

    @property
    def db_name(self):
        """Date part of the archive name, with dashes turned into underscores.

        car_ped_stops_2024-03-16T20_22_24.zip -> 2024_03_16
        """
        without_prefix = self.zip_filename.replace("car_ped_stops_", "")
        date_part, *_ = without_prefix.split("T")
        return date_part.replace("-", "_")

    @property
    def zip_filepath(self):
        """Path of the archive inside DATA_DIR."""
        return os.path.join(DATA_DIR, self.zip_filename)


def get_q_end_from_q_start_str(q_start_str):
    """Return the last instant of the calendar quarter containing *q_start_str*.

    The input is parsed with ``pd.to_datetime``; the result is a naive
    ``datetime`` on the quarter's final day at ``datetime.max.time()``.
    """
    q_start = pd.to_datetime(q_start_str)
    # (month, day) of the final calendar day of Q1..Q4.
    quarter_last_days = ((3, 31), (6, 30), (9, 30), (12, 31))
    end_month, end_day = quarter_last_days[(q_start.month - 1) // 3]
    last_day = date(q_start.year, end_month, end_day)
    return datetime.combine(last_day, datetime.max.time())


# Last instant of the quarter that starts at MOST_RECENT_QUARTER_START_DT;
# interpolated into the SQL queries as the upper bound on stop timestamps.
MOST_RECENT_QUARTER_END_DT = get_q_end_from_q_start_str(MOST_RECENT_QUARTER_START_DT)


def add_quarterly_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* with quarter-derived helper columns.

    The incoming ``quarter`` column is preserved as ``quarter_dt_str`` and
    parsed into ``quarter_dt``; ``quarter`` itself is rewritten as a
    ``YYYY-Qn`` label. ``quarter_date``, ``q_str`` (``Qn``) and ``year`` are
    derived from the parsed timestamp. The input frame is not modified.
    """
    out = df.copy()
    quarter_ts = pd.to_datetime(out["quarter"])
    quarter_number = quarter_ts.dt.quarter.astype(str)

    out["quarter_dt_str"] = out["quarter"]
    out["quarter_dt"] = quarter_ts
    out["quarter"] = quarter_ts.dt.year.astype(str) + "-Q" + quarter_number
    out["quarter_date"] = quarter_ts.dt.date
    out["q_str"] = "Q" + quarter_number
    out["year"] = quarter_ts.dt.year
    return out


# Used to get dtype dict
def get_pandas_dtype_and_parse_dates():
    """Fetch the car_ped_stops schema and translate it for ``pd.read_csv``.

    Returns:
        A ``(dtype_dict, parse_dates)`` tuple: a column -> pandas dtype
        mapping, and the list of columns the API reports as dates.

    Raises:
        ValueError: if a column has a type other than geometry, string,
            number or date.
    """
    # Postgres numeric type -> pandas dtype.
    # Add more mappings as needed
    pg_to_pandas = {
        "int4": "int32",
        "int8": "int64",
        "text": "str",
        "numeric": "float",
    }

    def convert_to_pandas_dtype(schema):
        dtypes = {}
        date_columns = []
        for name, meta in schema.items():
            kind = meta["type"]
            if kind == "date":
                # Dates are handled through read_csv's parse_dates argument.
                date_columns.append(name)
            elif kind == "number":
                # Default to float if the pgtype is not mapped
                dtypes[name] = pg_to_pandas.get(meta["pgtype"], "float")
            elif kind in ("geometry", "string"):
                dtypes[name] = "str"
            else:
                raise ValueError(name, meta)
        return dtypes, date_columns

    # `limit 0` returns no rows, only the field descriptions.
    fields = requests.get(
        "https://phl.carto.com/api/v2/sql?q=select * from car_ped_stops limit 0"
    ).json()["fields"]
    return convert_to_pandas_dtype(fields)


# Column -> dtype mapping passed as `dtype=` to pd.read_csv when loading each
# car_ped_stops CSV. `datetimeoccur` is not listed because it is parsed through
# `parse_dates` instead.
# NOTE(review): the frisked/searched/arrested/contraband flags and `age` are
# floats, presumably because they can be missing in the CSVs - confirm.
dtype_dict = {
    "cartodb_id": "int64",
    "the_geom": "str",  # Special handling might be required for geometry
    "the_geom_webmercator": "str",  # Special handling might be required for geometry
    "objectid": "int32",
    "id": "int32",
    "weekday": "str",
    "location": "str",
    "districtoccur": "str",
    "psa": "str",
    "stopcode": "int32",
    "stoptype": "str",
    "inside_or_outside": "str",
    "gender": "str",
    "race": "str",
    "age": "float",
    "individual_frisked": "float",
    "individual_searched": "float",
    "individual_arrested": "float",
    "individual_contraband": "float",
    "vehicle_frisked": "float",
    "vehicle_searched": "float",
    "vehicle_contraband": "float",
    "vehicle_contraband_list": "str",
    "individual_contraband_list": "str",
    "mvc_code": "str",
    "mvc_reason": "str",
    "mvc_code_sec": "str",
    "mvc_code_sec_reason": "str",
    "point_x": "float64",  # Adjusted to 'float64' for numeric
    "point_y": "float64",  # Adjusted to 'float64' for numeric
}

# SQLite query executed against an in-memory `car_ped_stops` table (one CSV at
# a time) that aggregates vehicle stops by quarter, district, PSA, violation
# category and driver demographics.
#   - `driver`: for every (datetimeoccur, location) group of vehicle rows, the
#     row with the smallest id supplies race / gender / age_range, and the
#     group size becomes n_people_in_car.
#   - `stop`: per (datetimeoccur, location), 0/1 flags for searched, arrested,
#     contraband, frisked and intruded (frisked or searched), plus
#     mvc_code_clean (max(mvc_code) with 'i' -> '1', parentheses, dashes and
#     spaces removed, upper-cased).
#   - violation_category is derived from mvc_code_clean prefixes/letters;
#     any other non-null code is 'Other', a null code is 'None'.
# NOTE(review): rows with a NULL age match none of the `age < N` branches and
# therefore fall into the ELSE branch, i.e. are labelled '65+' - confirm that
# this is intended.
# MOST_RECENT_QUARTER_END_DT is interpolated once, when this statement runs.
query = f"""
SELECT
(
    strftime('%Y', datetimeoccur_local) || '-' ||
    CASE
      WHEN strftime('%m', datetimeoccur_local) BETWEEN '01' AND '03' THEN '01'
      WHEN strftime('%m', datetimeoccur_local) BETWEEN '04' AND '06' THEN '04'
      WHEN strftime('%m', datetimeoccur_local) BETWEEN '07' AND '09' THEN '07'
      WHEN strftime('%m', datetimeoccur_local) BETWEEN '10' AND '12' THEN '10'
    END || '-01T00:00:00.000000Z'
) AS quarter,
CAST(strftime('%Y', datetimeoccur_local) as int) as year,
--Replace Above with Postgres equivalents (using extract())
districtoccur, 
psa,
violation_category,
race as "Race", 
gender as "Gender", 
age_range as "Age Range",
count(*) as n_stopped, 
sum(n_people_in_car) as n_people_in_stopped_vehicles,
sum(was_searched) as n_searched,
sum(was_arrested) as n_arrested,
sum(was_found_with_contraband) as n_contraband,
sum(was_frisked) as n_frisked,
sum(was_intruded) as n_intruded
FROM (
    SELECT 
    stop.*, driver.race, driver.age_range, driver.gender,datetimeoccur_local, n_people_in_car,
    CASE
        WHEN 
            (mvc_code_clean LIKE '1332%' AND mvc_code_clean LIKE '%A%' AND mvc_code_clean NOT LIKE '1332AI%') 
        THEN 'Display License Plate'
        WHEN 
            (mvc_code_clean = '3111') OR
            (mvc_code_clean LIKE '3111%' AND mvc_code_clean LIKE '%A%')
        THEN 'Failure to Obey Traffic Sign/Light'   
        WHEN
            (mvc_code_clean LIKE '330%') OR
            (mvc_code_clean LIKE '3311%' AND mvc_code_clean LIKE '%A%') OR
            (mvc_code_clean LIKE '3313%') OR
            (mvc_code_clean LIKE '3315%') OR
            (mvc_code_clean LIKE '3703%')
        THEN 'Improper Pass, Lane, One Way'  
        WHEN 
            (mvc_code_clean LIKE '3331%') OR
            (mvc_code_clean LIKE '3332%') OR
            (mvc_code_clean LIKE '3334%' AND mvc_code_clean LIKE '%A%') OR
            (mvc_code_clean LIKE '3334%' AND mvc_code_clean LIKE '%B%') OR
            (mvc_code_clean LIKE '3335%') OR
            (mvc_code_clean LIKE '3336%') 
        THEN 'Improper Turn/Signal'
        WHEN 
            (mvc_code_clean LIKE '4703%') OR
            (mvc_code_clean LIKE '4706%' AND mvc_code_clean LIKE '%C%')
        THEN 'Inspection/Emission Sticker'   
        WHEN 
            (mvc_code_clean LIKE '4301%') OR
            (mvc_code_clean LIKE '4302%') OR
            (mvc_code_clean LIKE '4303%') OR
            (mvc_code_clean = '4306') 
        THEN 'Lights'
        WHEN 
            (mvc_code_clean LIKE '3112%') OR
            (mvc_code_clean LIKE '3321%') OR
            (mvc_code_clean LIKE '3322%') OR
            (mvc_code_clean LIKE '3323%') OR
            (mvc_code_clean LIKE '3324%') OR
            (mvc_code_clean LIKE '3325%') OR
            (mvc_code_clean LIKE '3342%' AND mvc_code_clean LIKE '%A%') OR
            (mvc_code_clean LIKE '3345%' AND mvc_code_clean LIKE '%A%') OR
            (mvc_code_clean LIKE '3542%') OR
            (mvc_code_clean LIKE '3710%')
        THEN 'Red Light/Stop Sign/Yield'
        WHEN 
            (mvc_code_clean LIKE '1301%' and mvc_code_clean LIKE '%A%') 
        THEN 'Registration'
        WHEN (mvc_code_clean LIKE '3361%') OR
             (mvc_code_clean LIKE '3362%') OR
             (mvc_code_clean LIKE '3363%') OR
             (mvc_code_clean LIKE '3365%') OR
             (mvc_code_clean LIKE '3367%') OR
             (mvc_code_clean LIKE '3714%') OR
             (mvc_code_clean LIKE '3736%') 
        THEN 'Speeding/Reckless/Careless Driving'
        WHEN
            (mvc_code_clean LIKE '4524%' AND mvc_code_clean NOT LIKE '%A%')
        THEN 'Tint'
        WHEN 
            (mvc_code_clean LIKE '4524%' AND mvc_code_clean LIKE '%A%')
        THEN 'Windshield Obstruction'
        WHEN mvc_code_clean is not null THEN 'Other'
        ELSE 'None'
    END AS violation_category
    FROM (
            SELECT car_ped_stops.datetimeoccur as datetimeoccur_d,location as location_d,gender, 
            n_people_in_car,
            CASE
                WHEN race = 'Black - Latino' THEN 'Latino'
                WHEN race = 'White - Latino' THEN 'Latino'
                WHEN race = 'White - Non-Latino' THEN 'White'
                WHEN race = 'Black - Non-Latino' THEN 'Black'
                WHEN race = 'Asian' THEN 'Asian'
                WHEN race = 'American Indian' THEN 'All Other Races'
                WHEN race = 'Unknown' THEN 'All Other Races'
                ELSE race
            END as race,
            CASE
                WHEN age <25 THEN 'Under 25'
                WHEN age <35 THEN '25-34'
                WHEN age <45 THEN '35-44'
                WHEN age <55 THEN '45-54'
                WHEN age < 65 THEN '55-64'
                ELSE '65+'
            END as age_range
            FROM (
                SELECT location as driverl, min(id) as id, count(*) as n_people_in_car
                FROM car_ped_stops
                where stoptype='vehicle'
                GROUP by datetimeoccur, location
            ) inner_driver
            LEFT JOIN car_ped_stops
            ON inner_driver.id = car_ped_stops.id
        ) driver
    LEFT JOIN (
        SELECT datetimeoccur as datetimeoccur_utc, datetimeoccur_local, 
        location, min(districtoccur) as districtoccur, min(psa) as psa,
        CASE 
            WHEN sum(individual_searched) > 0 or sum(vehicle_searched) > 0 
            THEN 1 ELSE 0 
        END as was_searched,
        CASE 
            WHEN sum(individual_arrested) > 0 
            THEN 1 ELSE 0 
        END as was_arrested,
        CASE 
            WHEN sum(individual_contraband) > 0 or sum(vehicle_contraband) > 0 
            THEN 1 ELSE 0 
        END as was_found_with_contraband,
        CASE
            WHEN sum(individual_frisked) > 0 or sum(vehicle_frisked) > 0 
            THEN 1 ELSE 0 
        END as was_frisked,
        CASE
            WHEN
                sum(individual_frisked) > 0 or sum(vehicle_frisked) > 0
                or sum(individual_searched) > 0 or sum(vehicle_searched) > 0
            THEN 1 ELSE 0
        END as was_intruded,
        UPPER(
            REPLACE(
                REPLACE(
                    REPLACE(
                        REPLACE(
                            REPLACE(max(mvc_code),'i','1'),
                        '(', ''),
                    ')', ''),
                '-',''),
            ' ','')
        ) as mvc_code_clean
        FROM car_ped_stops
        WHERE stoptype='vehicle'
        GROUP by datetimeoccur_utc, location
    ) stop
    ON stop.datetimeoccur_utc=driver.datetimeoccur_d
    AND stop.location=driver.location_d
) query
WHERE datetimeoccur_local  <= '{MOST_RECENT_QUARTER_END_DT}'
GROUP by districtoccur,psa, quarter,race, gender, age_range, violation_category
"""


def get_df_quarterly_reason_from_zipfiles(run: Run):
    """Aggregate every CSV in the run's archive with the module-level ``query``.

    Each CSV is loaded into pandas, given a timezone-naive
    ``datetimeoccur_local`` column (America/New_York), written to a fresh
    in-memory SQLite database and aggregated there. CSVs listed in
    ``run.zip_filename_override_dict`` are read from the override archive
    instead of the main one.

    Args:
        run: Supplies the archive path and the per-CSV overrides.

    Returns:
        The concatenation of the per-CSV aggregation results.

    Raises:
        FileNotFoundError: if an override archive does not contain the CSV
            it is supposed to replace.
    """
    from contextlib import closing

    def read_stops_csv(csv_file):
        return pd.read_csv(csv_file, dtype=dtype_dict, parse_dates=["datetimeoccur"])

    def get_df_from_zipped_csv(zip_filename, csv_filename):
        # Match on base name only, in case the zip structure changed.
        wanted = os.path.basename(csv_filename)
        with zipfile.ZipFile(zip_filename, "r") as z:
            for available_filename in sorted(
                f for f in z.namelist() if f.endswith(".csv")
            ):
                if os.path.basename(available_filename) == wanted:
                    with z.open(available_filename) as csv_file:
                        return read_stops_csv(csv_file)
        # Previously this fell through and returned None, which surfaced
        # later as an unrelated TypeError.
        raise FileNotFoundError(
            f"{wanted} not found in override archive {zip_filename}"
        )

    dfs = []
    print(run.zip_filename)
    with zipfile.ZipFile(run.zip_filepath, "r") as z:
        csv_files = sorted(f for f in z.namelist() if f.endswith(".csv"))
        with tqdm(total=len(csv_files)) as pbar:
            for filename in csv_files:
                filename_raw = os.path.basename(filename)
                # NOTE(review): the override path is used as given, it is not
                # joined with DATA_DIR like the main archive - confirm.
                zip_filename_override = run.zip_filename_override_dict.get(
                    filename_raw
                )
                if zip_filename_override:
                    print(f"Overriding for {zip_filename_override}: {filename_raw}")
                    this_df = get_df_from_zipped_csv(zip_filename_override, filename)
                else:
                    with z.open(filename) as csv_file:
                        this_df = read_stops_csv(csv_file)
                this_df["datetimeoccur_local"] = (
                    this_df["datetimeoccur"]
                    .dt.tz_convert("America/New_York")
                    .dt.tz_localize(None)
                )
                this_df = this_df.sort_values("id").reset_index(drop=True)
                # Dramatically improves speed for some reason.
                this_df["id"] = this_df.index
                pbar.set_description(f"{filename_raw} has {len(this_df)} rows")
                # Close each in-memory database once its result is read so the
                # loaded tables do not pile up across CSVs.
                with closing(sqlite3.connect(":memory:")) as con:
                    this_df.to_sql("car_ped_stops", if_exists="replace", con=con)
                    dfs.append(pd.read_sql(query, con=con))
                pbar.update()

    return pd.concat(dfs)


def make_request(sql: str):
    """POST *sql* to the Carto SQL API and return the decoded JSON payload.

    Args:
        sql: The SQL statement, sent as the ``q`` form field.

    Returns:
        The decoded JSON object; it is guaranteed to contain a ``rows`` key.

    Raises:
        ValueError: if the body is not valid JSON (the message is the raw
            response text), or if the JSON has no ``rows`` key (the message
            contains the SQL and the payload).
    """
    with httpx.Client(timeout=90) as client:
        response = client.post("https://phl.carto.com/api/v2/sql", data={"q": sql})
        try:
            payload = response.json()
        except ValueError as err:
            # JSON and text-decoding failures are both ValueError subclasses;
            # surface the raw body and keep the original cause attached.
            raise ValueError(response.text) from err
        if "rows" not in payload:
            raise ValueError(f"{sql}\n\n{payload}")
        return payload


def get_hin_random_sample_from_odp(dist_from_hin_threshold=DIST_FROM_HIN_THRESHOLD):
    """Return a random sample of stops with their High Injury Network match.

    Up to 1000 randomly ordered stops from MOST_RECENT_COMPLETED_YEAR with
    ``point_y < 42`` are left-joined to ``high_injury_network_2020`` using
    ``ST_DWithin``.

    Args:
        dist_from_hin_threshold: Maximum distance passed to ``ST_DWithin``
            for a stop to count as being on the network.

    Returns:
        A DataFrame with ``point_x``, ``point_y``, ``distance_from_hin`` and
        ``on_hin`` columns.
    """
    # The HIN geometry is static, so it only needs to be downloaded once:
    #   response = requests.get(
    #       "https://phl.carto.com/api/v2/sql?q=SELECT+*+FROM+high_injury_network_2020&filename=high_injury_network_2020&format=geojson&skipfields=cartodb_id"
    #   )
    #   geojson = response.json()
    #   json.dump(geojson, open(f"{DATA_DIR}/hin.geojson", "w"))

    # NOTE(review): because of the LEFT JOIN, a stop within the threshold of
    # several HIN rows would appear once per matching row - confirm whether
    # that can happen.
    response_hin = make_request(
        f"""
        SELECT stops.point_x, stops.point_y, ST_DISTANCE(stops.the_geom, hin.the_geom) as distance_from_hin,
        hin.street_name is not null as on_hin FROM
        (
            SELECT * FROM car_ped_stops 
            where point_y < 42 
            and extract(year from datetimeoccur) = '{MOST_RECENT_COMPLETED_YEAR}'
            order by random() limit 1000
        ) stops
        LEFT JOIN high_injury_network_2020 hin
        ON ST_DWithin(stops.the_geom, hin.the_geom, {dist_from_hin_threshold})
        """
    )
    return pd.DataFrame(response_hin["rows"])


def get_hin_by_quarter_from_odp():
    """Count locatable stops, and those matched to the HIN, per quarter.

    The distinct quarters are fetched first, then one request is issued per
    quarter returning, per district and PSA, the number of stops with a
    usable geometry and how many of those matched a
    ``high_injury_network_2020`` row. The per-quarter frames are
    concatenated and passed through ``add_quarterly_columns``.
    """
    quarters = make_request(
        """
    SELECT 
       distinct date_trunc('quarter',datetimeoccur AT TIME ZONE 'America/New_York') as quarter
       from car_ped_stops order by quarter
    """
    )

    def fetch_quarter(quarter):
        # One request per quarter.
        payload = make_request(
            f"""
        SELECT 
            districtoccur, psa, date_trunc('quarter',datetimeoccur AT TIME ZONE 'America/New_York') as quarter,
            sum(CASE WHEN hin.street_name is not null 
            AND st_y(car_ped_stops.the_geom)<42 and car_ped_stops.the_geom is not null
            THEN 1 ELSE 0 END
            ) as n_stopped_locatable_on_hin,
            CAST(sum(
                case when st_y(car_ped_stops.the_geom)<42 and car_ped_stops.the_geom is not null then 1 else 0 end
            ) as float) as n_stopped_locatable
            FROM car_ped_stops
            LEFT JOIN high_injury_network_2020 hin
            ON ST_DWithin(car_ped_stops.the_geom, hin.the_geom, {DIST_FROM_HIN_THRESHOLD})
            WHERE datetimeoccur AT TIME ZONE 'America/New_York'  <= '{MOST_RECENT_QUARTER_END_DT}'
            AND date_trunc('quarter',datetimeoccur AT TIME ZONE 'America/New_York') = '{quarter}'
            GROUP BY districtoccur,psa, date_trunc('quarter',datetimeoccur AT TIME ZONE 'America/New_York')

        """
        )
        return pd.DataFrame(payload["rows"])

    per_quarter_frames = [
        fetch_quarter(row["quarter"]) for row in tqdm(quarters["rows"])
    ]
    return add_quarterly_columns(pd.concat(per_quarter_frames))

def get_shootings_from_odp():
    """Fetch shooting counts per district and quarter from the Carto SQL API.

    Returns a DataFrame with ``n_shootings``, ``n_outside`` (rows where
    ``inside='0'``), a zero-padded two-character ``districtoccur`` and the
    columns added by ``add_quarterly_columns``.
    """
    raw_sql = """
        select
        count(*) as n_shootings,
        sum(case WHEN inside='0' THEN 1 else 0 END) as n_outside,
        dist as districtoccur,
        date_trunc('quarter',date_) as quarter
        from shootings
        group by date_trunc('quarter',date_), dist
        order by quarter,dist
    """
    # Collapse every whitespace run so the query is sent as a single line.
    compact_sql = " ".join(raw_sql.split())
    payload = requests.get(
        "https://phl.carto.com/api/v2/sql", params={"q": compact_sql}
    ).json()
    shootings = pd.DataFrame(payload["rows"])
    # Districts are normalised to two-character strings, e.g. "3" -> "03".
    district_as_text = shootings["districtoccur"].astype(str)
    shootings["districtoccur"] = district_as_text.str.zfill(2)
    return add_quarterly_columns(shootings)



def make_db(run:Run):
    """Build ``open_data_philly_<db_name>.db`` in DATA_DIR for *run*.

    Tables written, each replaced if it already exists:
        car_ped_stops_quarterly          stops per quarter / district / PSA /
                                         demographics
        car_ped_stops_quarterly_reason   same, additionally split by
                                         violation_category
        car_ped_stops_hin_random_sample  random sample of stops vs. the HIN
        car_ped_stops_hin_pct            locatable / on-HIN counts per quarter
        shootings                        shootings per quarter and district

    Each table is written as soon as its data is available, so the tables
    produced before a later failing step remain in the file.

    Args:
        run: Identifies the source archive and the database name.
    """
    from contextlib import closing

    sqlite_file = os.path.join(DATA_DIR, f"open_data_philly_{run.db_name}.db")

    def write_table(df, table_name):
        # A sqlite3 connection is not closed when it goes out of scope in a
        # `with con:` block or by to_sql, so close it explicitly per write.
        with closing(sqlite3.connect(sqlite_file)) as con:
            df.to_sql(table_name, con=con, index=False, if_exists="replace")

    df_quarterly_reason = get_df_quarterly_reason_from_zipfiles(run)
    # Collapse the violation categories. The summed `year` column is
    # meaningless here but is overwritten by add_quarterly_columns below.
    # NOTE(review): groupby drops groups whose key contains NaN by default,
    # so rows with e.g. a missing psa would be absent from this table but
    # present in the *_reason table - confirm.
    df_quarterly = (
        df_quarterly_reason.drop("violation_category", axis=1)
        .groupby(
            [
                "districtoccur",
                "psa",
                "quarter",
                "Race",
                "Gender",
                "Age Range",
            ]
        )
        .sum()
    ).reset_index()
    write_table(add_quarterly_columns(df_quarterly), "car_ped_stops_quarterly")
    write_table(
        add_quarterly_columns(df_quarterly_reason),
        "car_ped_stops_quarterly_reason",
    )

    print("Pulling from HIN")
    write_table(get_hin_random_sample_from_odp(), "car_ped_stops_hin_random_sample")
    # get_hin_by_quarter_from_odp already applies add_quarterly_columns;
    # applying it again would overwrite quarter_dt_str with the formatted
    # 'YYYY-Qn' label and re-parse that label as a date.
    write_table(get_hin_by_quarter_from_odp(), "car_ped_stops_hin_pct")

    print("Get Shooting data")
    write_table(get_shootings_from_odp(), "shootings")
    print(f"Complete and saved to {sqlite_file}")

# Build the database for the archive named by ZIP_FILENAME. This runs as soon
# as these statements are executed.
run = Run(zip_filename=ZIP_FILENAME)
make_db(run=run)