# Load Data



### Set env variables

Assumes that `SOCRATA_APP_TOKEN`, `SOCRATA_API_KEY_ID`, and `SOCRATA_API_KEY_SECRET` are set in `.env`.

In [1]:
from dotenv import load_dotenv

# Load the key/value pairs from the `.env` file into the process environment.
load_dotenv()


True

### Load Packages

In [2]:
import os
import sys

# Absolute path of the `src` directory that sits one level above the
# notebook's working directory; it is added to the import path so the
# `ingestion` imports below resolve.
INGESTION_PATH = os.path.abspath(os.path.join(os.getcwd(), '..', 'src'))

# Guard the insert so that re-running this cell does not stack duplicate
# entries at the front of sys.path.
if INGESTION_PATH not in sys.path:
    sys.path.insert(0, INGESTION_PATH)

from ingestion import fetch
from ingestion import config



### Full Pull of Data
Takes about 48 minutes

In [3]:
# await fetch.fetch_all_data(save = True)

### Incremental Pull of Data

In [4]:
# await fetch.fetch_current_month(save = True)

### Function

In [5]:
# def _create_socrata_client(api_endpoint, app_token, username, password):
#     """Create a Socrata client for the given endpoint and credentials (timeout=600)."""
#     client = Socrata(api_endpoint, app_token=app_token, username=username, password=password, timeout = 600)
#     return client


In [6]:
# def _fetch_and_save_month_sync(year: int, month: int):
#     """Runs in a worker thread. Creates its own Socrata client (no sharing)."""
#     # per-thread client
#     # client = Socrata("data.cityofnewyork.us", APP_TOKEN)
#     client = _create_socrata_client(api_endpoint="data.cityofnewyork.us", app_token=APP_TOKEN, username=API_KEY_ID, password=API_KEY_SECRET)

#     start = f"{year}-{month:02d}-01T00:00:00"
#     if month == 12:
#         end = f"{year+1}-01-01T00:00:00"
#     else:
#         end = f"{year}-{month+1:02d}-01T00:00:00"

#     where_clause = f"{date_column} >= '{start}' AND {date_column} < '{end}'"
#     print(f"Fetching {year}-{month:02d} ...")

#     # client.get_all handles paging internally (blocking)
#     results = list(client.get_all(dataset_id, where=where_clause))

#     if results:
#         df = pd.DataFrame.from_records(results)
#         # write to year/month partition
#         file_path = os.path.join(output_dir, f"year={year}/month={month:02d}/part-0000.parquet")
#         os.makedirs(os.path.dirname(file_path), exist_ok=True)
#         # choose your engine; fastparquet avoids some pyarrow quirks in notebooks
#         df.to_parquet(file_path, index=False, engine="fastparquet")
#         print(f"Saved {file_path} ({len(df):,} rows)")

#         # clean up memory
#         del df
#         del results
#         gc.collect()
#     else:
#         print(f"No data for {year}-{month:02d}")

# async def fetch_and_save(year: int, month: int, sem: asyncio.Semaphore):
#     async with sem:
#         # run the sync worker in a thread
#         await asyncio.to_thread(_fetch_and_save_month_sync, year, month)

# async def pull_latest_data():
#     sem = asyncio.Semaphore(MAX_CONCURRENCY)
#     tasks = [fetch_and_save(y, m, sem) for y in years for m in months]
#     await asyncio.gather(*tasks)


### Apply Function

In [7]:
# dataset_id = "erm2-nwe9"
# date_column = "created_date"
# output_dir = "../data/311-service-requests"
# os.makedirs(output_dir, exist_ok=True)

# years = range(2010, 2026)   # 2010–2025 (the range end is exclusive)
# months = range(1, 13)       # 1..12
# MAX_CONCURRENCY = 20         # tune for your machine / API limits

In [8]:
# await pull_latest_data()

### Load Data

In [9]:
import polars as pl
import numpy as np
from pathlib import Path


## Exploratory Data Analysis with Polars


### Load all parquet files using Polars lazy API


In [10]:
# Root directory of the landed 311 service-request parquet files.
data_path = Path("../data/landing/311-service-requests")

# Recursive glob so that every parquet file below the root is picked up.
parquet_glob = str(data_path / "**/*.parquet")

# Lazy scan: nothing is read until a query is collected. With hive
# partitioning enabled, `key=value` directory names become columns.
# (An explicit schema could be passed with `schema=config.SCHEMA`.)
lf = pl.scan_parquet(parquet_glob, hive_partitioning=True)

### Basic Information


In [11]:
# Dataset dimensions and per-column dtypes.
# The schema is resolved once and reused for the column count and the listing.
schema = lf.collect_schema()
row_count = lf.select(pl.len()).collect().item()

print("Dataset Shape:")
print(f"Rows: {row_count:,}")
print(f"Columns: {len(schema)}")
print(f"\nColumn Names and Types:")
for name, dtype in schema.items():
    print(f"  {name}: {dtype}")


Dataset Shape:
Rows: 41,188,398
Columns: 43

Column Names and Types:
  unique_key: String
  created_date: Datetime(time_unit='ms', time_zone=None)
  closed_date: Datetime(time_unit='ms', time_zone=None)
  agency: String
  agency_name: String
  complaint_type: String
  descriptor: String
  location_type: String
  incident_zip: String
  incident_address: String
  street_name: String
  cross_street_1: String
  cross_street_2: String
  intersection_street_1: String
  intersection_street_2: String
  address_type: String
  city: String
  landmark: String
  facility_type: String
  status: String
  due_date: Datetime(time_unit='ms', time_zone=None)
  resolution_description: String
  resolution_action_updated_date: Datetime(time_unit='ms', time_zone=None)
  community_board: String
  bbl: String
  borough: String
  x_coordinate_state_plane: Float64
  y_coordinate_state_plane: Float64
  open_data_channel_type: String
  park_facility_name: String
  park_borough: String
  vehicle_type: String
  tax

### Preview the Data


In [12]:
# Preview the first 10 rows (the head of the scan, not a random sample).
# `head` is applied to the lazy frame before `collect`, so only the 10-row
# result is materialized.
lf.head(10).collect()


unique_key,created_date,closed_date,agency,agency_name,complaint_type,descriptor,location_type,incident_zip,incident_address,street_name,cross_street_1,cross_street_2,intersection_street_1,intersection_street_2,address_type,city,landmark,facility_type,status,due_date,resolution_description,resolution_action_updated_date,community_board,bbl,borough,x_coordinate_state_plane,y_coordinate_state_plane,open_data_channel_type,park_facility_name,park_borough,vehicle_type,taxi_company_borough,taxi_pick_up_location,bridge_highway_name,bridge_highway_direction,road_ramp,bridge_highway_segment,latitude,longitude,location,year,month
str,datetime[ms],datetime[ms],str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,datetime[ms],str,datetime[ms],str,str,str,f64,f64,str,str,str,str,str,str,str,str,str,str,f64,f64,str,i64,i64
"""15630099""",2010-01-01 00:00:00,2010-01-01 00:00:00,"""HPD""","""Department of Housing Preserva…","""HEATING""","""HEAT""","""RESIDENTIAL BUILDING""","""10466""","""1405 EAST 233 STREET""","""EAST 233 STREET""","""SETON AVENUE""","""AMUNDSON AVENUE""",,,"""ADDRESS""","""BRONX""",,"""N/A""","""Closed""",,"""The Department of Housing Pres…",2010-01-01 00:00:00,"""0 Unspecified""","""2049610077""","""Unspecified""",1029542.0,263206.0,"""UNKNOWN""","""Unspecified""","""Unspecified""",,,,,,,,40.888997,-73.836192,"""{'latitude': '40.8889973845275…",2010,1
"""15630431""",2010-01-01 00:00:00,2010-01-02 00:00:00,"""HPD""","""Department of Housing Preserva…","""HEATING""","""HEAT""","""RESIDENTIAL BUILDING""","""11226""","""22 MARTENSE STREET""","""MARTENSE STREET""","""FLATBUSH AVENUE""","""MARTENSE COURT""",,,"""ADDRESS""","""BROOKLYN""",,"""N/A""","""Closed""",,"""More than one complaint was re…",2010-01-02 00:00:00,"""0 Unspecified""","""3050890015""","""Unspecified""",995823.0,176542.0,"""UNKNOWN""","""Unspecified""","""Unspecified""",,,,,,,,40.651236,-73.958293,"""{'latitude': '40.6512361161984…",2010,1
"""15630315""",2010-01-01 00:00:00,2010-01-16 00:00:00,"""HPD""","""Department of Housing Preserva…","""PLUMBING""","""BASIN/SINK""","""RESIDENTIAL BUILDING""","""11106""","""31-54 29 STREET""","""29 STREET""","""31 AVENUE""","""BROADWAY""",,,"""ADDRESS""","""ASTORIA""",,"""N/A""","""Closed""",,"""The Department of Housing Pres…",2010-01-16 00:00:00,"""0 Unspecified""","""4005797501""","""Unspecified""",1004767.0,217644.0,"""UNKNOWN""","""Unspecified""","""Unspecified""",,,,,,,,40.764035,-73.925935,"""{'latitude': '40.7640351269140…",2010,1
"""15631513""",2010-01-01 00:00:00,2010-01-02 00:00:00,"""HPD""","""Department of Housing Preserva…","""HEATING""","""HEAT""","""RESIDENTIAL BUILDING""","""11210""","""3103 FOSTER AVENUE""","""FOSTER AVENUE""","""NOSTRAND AVENUE""","""NEW YORK AVENUE""",,,"""ADDRESS""","""BROOKLYN""",,"""N/A""","""Closed""",,"""More than one complaint was re…",2010-01-02 00:00:00,"""0 Unspecified""","""3049640047""","""Unspecified""",998974.0,171975.0,"""UNKNOWN""","""Unspecified""","""Unspecified""",,,,,,,,40.638696,-73.946947,"""{'latitude': '40.6386960094759…",2010,1
"""15629899""",2010-01-01 00:00:00,2010-01-16 00:00:00,"""HPD""","""Department of Housing Preserva…","""PLUMBING""","""WATER-LEAKS""","""RESIDENTIAL BUILDING""","""11106""","""31-54 29 STREET""","""29 STREET""","""31 AVENUE""","""BROADWAY""",,,"""ADDRESS""","""ASTORIA""",,"""N/A""","""Closed""",,"""The Department of Housing Pres…",2010-01-16 00:00:00,"""0 Unspecified""","""4005797501""","""Unspecified""",1004767.0,217644.0,"""UNKNOWN""","""Unspecified""","""Unspecified""",,,,,,,,40.764035,-73.925935,"""{'latitude': '40.7640351269140…",2010,1
"""15631565""",2010-01-01 00:00:00,2010-01-05 00:00:00,"""HPD""","""Department of Housing Preserva…","""HEATING""","""HEAT""","""RESIDENTIAL BUILDING""","""11213""","""1510 CARROLL STREET""","""CARROLL STREET""","""ALBANY AVENUE""","""TROY AVENUE""",,,"""ADDRESS""","""BROOKLYN""",,"""N/A""","""Closed""",,"""The Department of Housing Pres…",2010-01-05 00:00:00,"""0 Unspecified""","""3014120025""","""Unspecified""",1001429.0,182100.0,"""UNKNOWN""","""Unspecified""","""Unspecified""",,,,,,,,40.666482,-73.938076,"""{'latitude': '40.6664824995760…",2010,1
"""15629728""",2010-01-01 00:00:00,2010-01-12 00:00:00,"""HPD""","""Department of Housing Preserva…","""HEATING""","""HEAT""","""RESIDENTIAL BUILDING""","""11210""","""3101 FOSTER AVENUE""","""FOSTER AVENUE""","""NOSTRAND AVENUE""","""NEW YORK AVENUE""",,,"""ADDRESS""","""BROOKLYN""",,"""N/A""","""Closed""",,"""The Department of Housing Pres…",2010-01-12 00:00:00,"""0 Unspecified""","""3049640047""","""Unspecified""",998968.0,171975.0,"""UNKNOWN""","""Unspecified""","""Unspecified""",,,,,,,,40.638696,-73.946969,"""{'latitude': '40.6386960194481…",2010,1
"""15631756""",2010-01-01 00:00:00,2010-01-16 00:00:00,"""HPD""","""Department of Housing Preserva…","""PLUMBING""","""WATER-LEAKS""","""RESIDENTIAL BUILDING""","""11106""","""31-54 29 STREET""","""29 STREET""","""31 AVENUE""","""BROADWAY""",,,"""ADDRESS""","""ASTORIA""",,"""N/A""","""Closed""",,"""The Department of Housing Pres…",2010-01-16 00:00:00,"""0 Unspecified""","""4005797501""","""Unspecified""",1004767.0,217644.0,"""UNKNOWN""","""Unspecified""","""Unspecified""",,,,,,,,40.764035,-73.925935,"""{'latitude': '40.7640351269140…",2010,1
"""15631479""",2010-01-01 00:00:00,2010-01-08 00:00:00,"""HPD""","""Department of Housing Preserva…","""HEATING""","""HEAT""","""RESIDENTIAL BUILDING""","""11212""","""1115 WILLMOHR STREET""","""WILLMOHR STREET""","""EAST 95 STREET""","""EAST 96 STREET""",,,"""ADDRESS""","""BROOKLYN""",,"""N/A""","""Closed""",,"""More than one complaint was re…",2010-01-08 00:00:00,"""0 Unspecified""","""3046700001""","""Unspecified""",1007155.0,178733.0,"""UNKNOWN""","""Unspecified""","""Unspecified""",,,,,,,,40.657228,-73.917447,"""{'latitude': '40.6572278703010…",2010,1
"""15634816""",2010-01-01 00:00:00,2010-01-06 00:00:00,"""HPD""","""Department of Housing Preserva…","""HEATING""","""HEAT""","""RESIDENTIAL BUILDING""","""11237""","""305 ELDERT STREET""","""ELDERT STREET""","""KNICKERBOCKER AVENUE""","""IRVING AVENUE""",,,"""ADDRESS""","""BROOKLYN""",,"""N/A""","""Closed""",,"""The Department of Housing Pres…",2010-01-06 00:00:00,"""0 Unspecified""","""3034130066""","""Unspecified""",1010053.0,191755.0,"""UNKNOWN""","""Unspecified""","""Unspecified""",,,,,,,,40.692962,-73.906953,"""{'latitude': '40.6929623370148…",2010,1


### Missing Values Analysis


In [13]:
# Total number of rows; used as the denominator for the percentages below.
total_rows = lf.select(pl.len()).collect().item()

# One-row frame holding the number of nulls in every column.
null_counts = lf.select(pl.all().null_count()).collect()

# Reshape to one row per column, add the share of nulls (in percent, rounded
# to two decimals) and list the most incomplete columns first.
null_pct = (pl.col("Null_Count") / total_rows * 100).round(2)
null_summary = (
    pl.DataFrame(
        {
            "Column": null_counts.columns,
            "Null_Count": null_counts.row(0),
        }
    )
    .with_columns(null_pct.alias("Null_Percentage"))
    .sort("Null_Count", descending=True)
)

print(f"Missing Values Summary (Total Rows: {total_rows:,})")
null_summary


Missing Values Summary (Total Rows: 41,188,398)


Column,Null_Count,Null_Percentage
str,i64,f64
"""taxi_company_borough""",41158784,99.93
"""road_ramp""",41094402,99.77
"""bridge_highway_direction""",41072784,99.72
"""bridge_highway_name""",41017072,99.58
"""bridge_highway_segment""",41013461,99.58
…,…,…
"""status""",0,0.0
"""open_data_channel_type""",0,0.0
"""park_facility_name""",0,0.0
"""year""",0,0.0


NameError: name 'pd' is not defined

In [None]:
# Look up the missing-value summary row for the `vehicle_type` column.
null_summary.filter(pl.col('Column') == 'vehicle_type')


### Top Complaint Types


In [None]:
# Top complaint types.
# The query runs against `lf`, the lazy frame created by `pl.scan_parquet`;
# no `df` variable exists in this notebook.
top_complaints = (
    lf.group_by("complaint_type")
    .agg(pl.len().alias("count"))
    .sort("count", descending=True)
    .head(20)
    .collect()
)

print("Top 20 Complaint Types:")
top_complaints


### Temporal Analysis


In [None]:
# Requests over time (by year-month).
# `created_date` is already a Datetime column, so the year and month are read
# from it with the `.dt` namespace; `.str.to_datetime()` is only valid on
# String columns. The derived values overwrite the hive-partition `year` and
# `month` columns, so the grouping follows the request's creation date.
temporal = (
    lf.with_columns(
        pl.col("created_date").dt.year().alias("year"),
        pl.col("created_date").dt.month().alias("month"),
    )
    .group_by(["year", "month"])
    .agg(pl.len().alias("num_requests"))
    .sort(["year", "month"])
    .collect()
)

print("Service Requests Over Time:")
temporal


### Geographic Distribution


In [None]:
# Requests by borough.
# The query runs against `lf`, the lazy frame created by `pl.scan_parquet`;
# no `df` variable exists in this notebook.
by_borough = (
    lf.group_by("borough")
    .agg(pl.len().alias("count"))
    .sort("count", descending=True)
    .collect()
)

print("Service Requests by Borough:")
by_borough


### Agency Analysis


In [None]:
# Top agencies handling requests.
# The query runs against `lf`, the lazy frame created by `pl.scan_parquet`;
# no `df` variable exists in this notebook.
top_agencies = (
    lf.group_by("agency")
    .agg(pl.len().alias("count"))
    .sort("count", descending=True)
    .head(15)
    .collect()
)

print("Top 15 Agencies:")
top_agencies


### Status Distribution


In [None]:
# Request status distribution.
# The query runs against `lf`, the lazy frame created by `pl.scan_parquet`;
# no `df` variable exists in this notebook.
status_dist = (
    lf.group_by("status")
    .agg(pl.len().alias("count"))
    .sort("count", descending=True)
    .collect()
)

print("Status Distribution:")
status_dist


### Response Time Analysis


In [None]:
# Response-time statistics for requests that have both a creation and a
# closing timestamp.
# `created_date` and `closed_date` are already Datetime columns, so they are
# subtracted directly; `.str.to_datetime()` is only valid on String columns.
# `total_hours()` converts the resulting duration to whole hours.
response_hours_expr = (
    pl.col("closed_date") - pl.col("created_date")
).dt.total_hours()

response_times = (
    lf.filter(
        pl.col("closed_date").is_not_null() & pl.col("created_date").is_not_null()
    )
    .with_columns(response_hours_expr.alias("response_hours"))
    # Keep non-negative durations only (drops rows closed before they were created).
    .filter(pl.col("response_hours") >= 0)
    .select(
        pl.col("response_hours").mean().alias("mean_hours"),
        pl.col("response_hours").median().alias("median_hours"),
        pl.col("response_hours").std().alias("std_hours"),
        pl.col("response_hours").min().alias("min_hours"),
        pl.col("response_hours").max().alias("max_hours"),
        pl.col("response_hours").quantile(0.25).alias("q25_hours"),
        pl.col("response_hours").quantile(0.75).alias("q75_hours"),
        pl.col("response_hours").quantile(0.90).alias("q90_hours"),
        pl.len().alias("count_with_close_date"),
    )
    .collect()
)

print("Response Time Statistics (in hours):")
response_times


### Year-over-Year Growth


In [None]:
# Yearly trends.
# `created_date` is already a Datetime column, so the year is read from it
# with the `.dt` namespace; `.str.to_datetime()` is only valid on String
# columns. The derived value overwrites the hive-partition `year` column.
yearly = (
    lf.with_columns(pl.col("created_date").dt.year().alias("year"))
    .group_by("year")
    .agg(pl.len().alias("num_requests"))
    .sort("year")
    .collect()
)

print("Year-over-Year Requests:")
yearly


### Top Complaint Types by Borough


In [None]:
# Complaint counts per (borough, complaint type), ordered by borough and then
# by descending count.
# The query runs against `lf`, the lazy frame created by `pl.scan_parquet`;
# no `df` variable exists in this notebook.
complaints_by_borough = (
    lf.group_by(["borough", "complaint_type"])
    .agg(pl.len().alias("count"))
    .sort(["borough", "count"], descending=[False, True])
    .collect()
)

# Rows are already ordered by count within each borough, so the first five
# rows of every group are its top complaints. `maintain_order=True` keeps the
# boroughs in their sorted order instead of an arbitrary group order.
top_per_borough = complaints_by_borough.group_by(
    "borough", maintain_order=True
).head(5)

print("Top 5 Complaint Types per Borough:")
top_per_borough
