# NYC 311 Service Requests Data Pipeline

*Library + jobs to fetch, cache, and analyze NYC 311 service request data.*

# As Jobs

## LTP Pipeline

In [2]:
from pathlib import Path

# Per-user, cross-platform base under the home dir
# retrieve for one day only:
dir_base = Path.home() / "TASK_HBC_TSY"

%cd -q ..
!python -m hbc.jobs.dispatch \
  --job-name=job_fetch_nyc_open_data_311_service_requests \
  --as-of=20091231 \
  --dir-base={dir_base} \
  --incremental=True \
  --log-level=DEBUG
%cd -q notebooks/

Log file: /Users/alexandershubert/TASK_HBC_TSY/LOGS/job_fetch_nyc_open_data_311_service_requests/job_fetch_nyc_open_data_311_service_requests_20251225123402.txt
  data[col] = pd.to_datetime(data[col], errors="ignore")
  data[col] = pd.to_datetime(data[col], errors="ignore")
  data[col] = pd.to_datetime(data[col], errors="ignore")
  data[col] = pd.to_datetime(data[col], errors="ignore")


In [3]:
# restore cache integrity for the last missing dates
%cd -q ..
!python -m hbc.jobs.dispatch  \
      --job-name=job_fetch_nyc_open_data_311_service_requests \
      --as-of=20091231 \
      --incremental=false \
      --log-level=INFO \
      --last-missing-dates=10
%cd -q notebooks/

Log file: /var/folders/jj/dn25brln45j26cvj4y_lgbzr0000gn/T/hbc_nyc_dp/LOGS/job_fetch_nyc_open_data_311_service_requests/job_fetch_nyc_open_data_311_service_requests_20251225123432.txt
  data[col] = pd.to_datetime(data[col], errors="ignore")
  data[col] = pd.to_datetime(data[col], errors="ignore")
  data[col] = pd.to_datetime(data[col], errors="ignore")
  data[col] = pd.to_datetime(data[col], errors="ignore")
  data[col] = pd.to_datetime(data[col], errors="ignore")
  data[col] = pd.to_datetime(data[col], errors="ignore")
  data[col] = pd.to_datetime(data[col], errors="ignore")
  data[col] = pd.to_datetime(data[col], errors="ignore")
  data[col] = pd.to_datetime(data[col], errors="ignore")
  data[col] = pd.to_datetime(data[col], errors="ignore")
  data[col] = pd.to_datetime(data[col], errors="ignore")
  data[col] = pd.to_datetime(data[col], errors="ignore")
  data[col] = pd.to_datetime(data[col], errors="ignore")
  data[col] = pd.to_datetime(data[col], errors="ignore")
  data[col] = pd.t

  data[col] = pd.to_datetime(data[col], errors="ignore")
  data[col] = pd.to_datetime(data[col], errors="ignore")
  data[col] = pd.to_datetime(data[col], errors="ignore")
  data[col] = pd.to_datetime(data[col], errors="ignore")
  data[col] = pd.to_datetime(data[col], errors="ignore")
  data[col] = pd.to_datetime(data[col], errors="ignore")
  data[col] = pd.to_datetime(data[col], errors="ignore")
  data[col] = pd.to_datetime(data[col], errors="ignore")
  data[col] = pd.to_datetime(data[col], errors="ignore")
  data[col] = pd.to_datetime(data[col], errors="ignore")
  data[col] = pd.to_datetime(data[col], errors="ignore")
  data[col] = pd.to_datetime(data[col], errors="ignore")


## Analytics

In [5]:
%cd -q ..
!python -m hbc.jobs.dispatch  \
      --job-name=job_analyse_nyc_open_data_311_service_requests \
      --as-of=20091231 \
      --log-level=INFO \
      --n-worst=10 \
      --n-best=10 \
      --n-days=10
%cd -q notebooks/

Log file: /var/folders/jj/dn25brln45j26cvj4y_lgbzr0000gn/T/hbc_nyc_dp/LOGS/job_analyse_nyc_open_data_311_service_requests/job_analyse_nyc_open_data_311_service_requests_20251225123635.txt


***

In [10]:
import sys
from pathlib import Path

# This cell runs before the "As Library" section's imports, so on a fresh
# kernel (Restart & Run All) DataContainer / ul would be undefined here.
# Make the project root (parent of notebooks/) importable and import what
# this section uses.
if str(Path.cwd().parent) not in sys.path:
    sys.path.insert(0, str(Path.cwd().parent))

from hbc import DataContainer, utils as ul

dc = DataContainer('nyc_open_data_311_service_requests')

In [13]:
# Fetch every service request created on the as-of day (same date as the jobs' --as-of).
as_of_iso = ul.date_as_iso_format(ul.str_as_date('20091231'))
dc.get(query=f"$filter=created_date eq '{as_of_iso}'")

In [14]:
# (rows, columns) of the day fetched above
(len(dc.df.index), len(dc.df.columns))

(3844, 55)

In [16]:
# Export the fetched day to CSV in the current (notebooks/) directory.
# The ".csv" extension makes the file type explicit for readers and tools.
dc.df.to_csv('nyc_open_data_311_service_requests.csv', index=False)

# As Library 

In [6]:
%load_ext autoreload
%autoreload 2
%matplotlib inline

### Imports

In [7]:
import sys
from pathlib import Path

# Put the project root (one directory above notebooks/) at the front of
# sys.path, before anything else is imported, so `hbc` can be imported.
p = str(Path.cwd().parent)
if p not in sys.path:
    sys.path.insert(0, p)

import logging
import os
import warnings

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# NOTE(review): this silences *every* warning for the whole kernel, including
# pandas deprecations such as the to_datetime(errors="ignore") FutureWarning
# the jobs above print. Consider narrowing it with category=/module= filters.
warnings.filterwarnings("ignore")

## API

In [8]:
# Project API: app context, data container, shared helpers, analysis and plotting engines.
from hbc import DataContainer, app_context
from hbc import utils as ul
from hbc.quant.analysis import AnalyticalEngine
from hbc.quant.plots import PlotEngine

In [9]:
# Display the active application context: as_of date and base/analytics/log directories.
app_context

AppContext
as_of : 2025-12-25
dir_base: PosixPath('/var/folders/jj/dn25brln45j26cvj4y_lgbzr0000gn/T/hbc_nyc_dp'),
dir_analytics: PosixPath('/var/folders/jj/dn25brln45j26cvj4y_lgbzr0000gn/T/hbc_nyc_dp/ANALYTICS'),
dir_logging: PosixPath('/var/folders/jj/dn25brln45j26cvj4y_lgbzr0000gn/T/hbc_nyc_dp/LOGS')

## Logging

In [5]:
import logging

# Handle on the root logger (the same object logging.getLogger() returns).
logger = logging.root

In [6]:
# Configure logging once. With reset_handlers=True each ul.conf_log call
# presumably replaces the handlers set by the previous call. Calling it several
# times in a row therefore leaves only the LAST configuration in effect, and
# creates throw-away handlers along the way. Pick a single mode instead:
#   "console" - console only, DEBUG level (no file writes)
#   "file"    - file only, INFO level (no console output at all)
#   "both"    - console and file, INFO level
LOG_MODE = "both"

_LOG_MODES = {
    "console": dict(level=logging.DEBUG, console=True, file=False),
    "file": dict(level=logging.INFO, console=False, file=True),
    "both": dict(level=logging.INFO, console=True, file=True),
}
ul.conf_log(reset_handlers=True, **_LOG_MODES[LOG_MODE])

Log file: /var/folders/jj/dn25brln45j26cvj4y_lgbzr0000gn/T/hbc_nyc_dp/LOGS/hbc_job_generic.txt
Log file: /var/folders/jj/dn25brln45j26cvj4y_lgbzr0000gn/T/hbc_nyc_dp/LOGS/hbc_job_generic.txt


<RootLogger root (INFO)>

# Cache to SQLite Database via ASP.NET REST API with EF Core

In [7]:
# Local SQLite persistence layer plus the data container used to fill it.
from hbc.ltp.persistence.db import SqlLiteDataBase
from hbc import DataContainer
import hbc

In [8]:
# Keep the root logger at INFO (setLevel accepts the level name as a string).
logger.setLevel("INFO")

In [9]:
# Reset the database: delete every row from each table whose name does not
# start with "_". DESTRUCTIVE — clears the locally cached datasets listed below.
db = SqlLiteDataBase()
try:
    print(db.all_dbs)
    print(db.all_tables)
    # one db.execute result per table (the printed output suggests deleted-row counts)
    status = [db.execute(f'delete from {table}') for table in db.all_tables if not table.startswith('_')]
    print(status)
finally:
    # release the connection even if one of the DELETEs fails
    db.close()

['main:/Users/alexandershubert/git/hbc_tsy_enhanced/hbc_db/hbc.db']
['nyc_open_data_311_call_center_inquiry', 'nyc_open_data_311_customer_satisfaction_survey', 'nyc_open_data_311_service_requests']
[0, 0, 4155]


In [10]:
import os
from pathlib import Path

# Customer-satisfaction survey: get() without a query fetches the default
# sample (100 rows according to the log output).
dc = DataContainer("nyc_open_data_311_customer_satisfaction_survey")
dc.get()
(len(dc.df.index), len(dc.df.columns))

2025-12-25 12:26:17 fetch_nycopen.py       72 INFO  root    : Fetched 100 rows
2025-12-25 12:26:17 base.py                52 INFO  root    : using validator: ValidatorGeneric
2025-12-25 12:26:17 base.py                53 INFO  root    : cleaning...
2025-12-25 12:26:17 base.py                56 INFO  root    : normalizing...
2025-12-25 12:26:17 base.py                59 INFO  root    : validating...
2025-12-25 12:26:17 base.py                62 INFO  root    : dropping flagged rows...
2025-12-25 12:26:17 base.py                65 INFO  root    : finalizing...
2025-12-25 12:26:17 container.py           35 INFO  root    : Retrieved dataFrame with shape=(100, 14)


(100, 14)

In [24]:
# retrieve: distinct campaigns via an OData-style $apply=groupby query
distinct_campaigns = "$apply=groupby((campaign))"
dc.get(query=distinct_campaigns)
(len(dc.df.index), len(dc.df.columns))

2025-12-25 12:00:45 fetch_nycopen.py       43 INFO  root    : using pagination at fetching with page_size=10000 timeout=30
2025-12-25 12:00:46 fetch_nycopen.py       72 INFO  root    : Fetched 6 rows
2025-12-25 12:00:46 base.py                52 INFO  root    : using validator: ValidatorGeneric
2025-12-25 12:00:46 base.py                53 INFO  root    : cleaning...
2025-12-25 12:00:46 base.py                56 INFO  root    : normalizing...
2025-12-25 12:00:46 base.py                59 INFO  root    : validating...
2025-12-25 12:00:46 base.py                62 INFO  root    : finalizing...
2025-12-25 12:00:46 container.py           63 ERROR root    : DataContainer nyc_open_data_311_customer_satisfaction_survey does not adhere to schema. Missing columns: agent_customer_service, agent_job_knowledge, answer_satisfaction, channel, completion_time, nps, overall_satisfaction, start_time, survey_language, survey_type, wait_time, year


(6, 2)

In [25]:
# retrieve: only rows of one campaign via an OData-style $filter query
campaign_filter = "$filter=campaign eq 'Campaign 4'"
dc.get(query=campaign_filter)
(len(dc.df.index), len(dc.df.columns))

2025-12-25 12:00:47 fetch_nycopen.py       43 INFO  root    : using pagination at fetching with page_size=10000 timeout=30
2025-12-25 12:00:48 fetch_nycopen.py       72 INFO  root    : Fetched 4889 rows
2025-12-25 12:00:48 base.py                52 INFO  root    : using validator: ValidatorGeneric
2025-12-25 12:00:48 base.py                53 INFO  root    : cleaning...
2025-12-25 12:00:48 base.py                56 INFO  root    : normalizing...
2025-12-25 12:00:48 base.py                59 INFO  root    : validating...
2025-12-25 12:00:48 base.py                62 INFO  root    : finalizing...


(4889, 14)

In [50]:
# caching: push the current dc.df into the cache (synced via the REST batch API, per the log)
dc.to_cache()

2025-12-25 12:08:42 rest.py               182 INFO  root    : Synced 4155 rows via batch API


In [31]:
# caching: read back from the cache without a query (10 rows in the output below)
dc.from_cache().shape

(10, 15)

In [32]:
# caching: get by filter (OData-style $filter, same syntax as dc.get)
dc.from_cache(query="$filter=campaign eq 'Campaign 4'").shape

(4889, 15)

In [33]:
# caching: get the distinct values of `year` via $apply=groupby
dc.from_cache(query="$apply=groupby((year))").shape

(4, 1)

In [34]:
# caching: get page 2 with page size 50 ($top = page size, $skip = rows already seen)
dc.from_cache(query="$top=50&$skip=50").shape

(50, 15)

In [35]:
# caching: get total count. NOTE(review): with $count=true the output below is
# still the full frame (4889, 15); the count itself is the first element of .shape.
dc.from_cache(query="$count=true").shape

(4889, 15)

## DataContainer

In [11]:
# Container for the NYC 311 service requests dataset.
dc = DataContainer('nyc_open_data_311_service_requests')

In [12]:
# No query: fetch the default sample (100 rows per the log), then clean/validate it.
dc.get()

2025-12-25 12:26:21 fetch_nycopen.py       72 INFO  root    : Fetched 100 rows
2025-12-25 12:26:21 base.py                52 INFO  root    : using validator: ValidatorNYCOpen311Service
2025-12-25 12:26:21 base.py                53 INFO  root    : cleaning...
2025-12-25 12:26:21 base.py                56 INFO  root    : normalizing...
2025-12-25 12:26:21 base.py                59 INFO  root    : validating...
2025-12-25 12:26:21 valid_nycopen.py      183 INFO  root    : Validation summary -> flagged 10 rows. closed_date before created_date: 4; closed_date set but status not closed: 5; incident_zip outside NYC range: 2; resolution_action_updated_date before created_date: 7
2025-12-25 12:26:21 base.py                62 INFO  root    : dropping flagged rows...
2025-12-25 12:26:21 base.py                65 INFO  root    : finalizing...
2025-12-25 12:26:21 container.py           67 ERROR root    : DataContainer nyc_open_data_311_service_requests does not adhere to schema. Missing columns: br

In [13]:
# Pin the analysis date used by the queries below (same day as the jobs' --as-of).
app_context.as_of = ul.str_as_date('20091231')

In [14]:
# Fetch all service requests created on app_context.as_of.
as_of_iso = ul.date_as_iso_format(app_context.as_of)
dc.get(query=f"$filter=created_date eq '{as_of_iso}'")

2025-12-25 12:26:27 fetch_nycopen.py       43 INFO  root    : using pagination at fetching with page_size=10000 timeout=30
2025-12-25 12:26:30 fetch_nycopen.py       72 INFO  root    : Fetched 4155 rows
2025-12-25 12:26:30 base.py                52 INFO  root    : using validator: ValidatorNYCOpen311Service
2025-12-25 12:26:30 base.py                53 INFO  root    : cleaning...
2025-12-25 12:26:30 base.py                56 INFO  root    : normalizing...
2025-12-25 12:26:30 base.py                59 INFO  root    : validating...
2025-12-25 12:26:30 valid_nycopen.py      183 INFO  root    : Validation summary -> flagged 311 rows. closed_date before created_date: 52; closed_date set but status not closed: 89; incident_zip outside NYC range: 4; resolution_action_updated_date before created_date: 270
2025-12-25 12:26:30 base.py                62 INFO  root    : dropping flagged rows...
2025-12-25 12:26:31 base.py                65 INFO  root    : finalizing...
2025-12-25 12:26:31 containe

In [15]:
# Push the validated rows to the cache (3844 of the 4155 fetched rows remain after flagged rows are dropped, per the logs).
dc.to_cache()

2025-12-25 12:26:37 rest.py               182 INFO  root    : Synced 3844 rows via batch API


In [19]:
# NOTE(review): despite $top=0, the log shows all 3844 cached rows returned —
# the cache API apparently treats 0 as "no limit"; confirm before relying on it.
dc.from_cache(query='$top=0')

2025-12-25 12:27:06 container.py           58 INFO  root    : Retrieved dataFrame with shape=(3844, 54)


## Analytics

In [20]:
# Confirm the context now carries the pinned as_of date (2009-12-31).
app_context

AppContext
as_of : 2009-12-31
dir_base: PosixPath('/var/folders/jj/dn25brln45j26cvj4y_lgbzr0000gn/T/hbc_nyc_dp'),
dir_analytics: PosixPath('/var/folders/jj/dn25brln45j26cvj4y_lgbzr0000gn/T/hbc_nyc_dp/ANALYTICS'),
dir_logging: PosixPath('/var/folders/jj/dn25brln45j26cvj4y_lgbzr0000gn/T/hbc_nyc_dp/LOGS')

In [21]:
# Fresh container for the analytics below; its data is read from the cache.
dc = DataContainer('nyc_open_data_311_service_requests')

In [22]:
# Load the as-of day's service requests from the cache (not the remote API).
as_of_iso = ul.date_as_iso_format(app_context.as_of)
dc.from_cache(query=f"$filter=created_date eq '{as_of_iso}'")

2025-12-25 12:27:19 container.py           58 INFO  root    : Retrieved dataFrame with shape=(3844, 54)


In [23]:
# Short alias for the cached frame. NOTE(review): if dc.df returns the stored
# frame (not a copy), columns added to df below also appear on dc.df.
df = dc.df

In [24]:
# Named tuple of df's column names, so columns can be referenced as cols.<name>.
cols = ul.cols_as_named_tuple(df)

In [25]:
# Elapsed whole days (Timedelta.days) from creation to closure, as nullable
# Int64 so rows without a closed_date get <NA> instead of a float NaN.
closed_at = pd.to_datetime(df[cols.closed_date])
created_at = pd.to_datetime(df[cols.created_date])
df["hbc_days_to_close"] = (closed_at - created_at).dt.days.astype("Int64")
# rebuild the column tuple so cols.hbc_days_to_close is available
cols = ul.cols_as_named_tuple(df)

In [26]:
# Keep rows whose close took at least one whole day (days_to_close != 0).
# Rows with <NA> days_to_close yield <NA> in the mask; pandas boolean
# indexing treats <NA> as False, so those rows are dropped as well.
m = df[cols.hbc_days_to_close].eq(0)
df_closed_not_same_day = df[~m]

In [27]:
# Clustered map of request locations (count per rounded lat/lon), saved as HTML
# under <dir_analytics>/plots.
plots_dir = ul.mk_dir(app_context.dir_analytics / "plots")
path = ul.path_to_str(plots_dir / "closed_requests_by_location.html")

_ = PlotEngine.plot_geo_map(
    df=df_closed_not_same_day,
    col_latitude=cols.latitude,
    col_longitude=cols.longitude,
    aggregation="count",
    round_precision=3,
    cluster=True,
    start_zoom=11,
    tiles="CartoDB positron",
    savepath=path,
)
print(path)

/var/folders/jj/dn25brln45j26cvj4y_lgbzr0000gn/T/hbc_nyc_dp/ANALYTICS/plots/closed_requests_by_location.html


In [28]:
# Days-to-close statistics grouped by agency (code + name); the result holds
# 'best', 'worst', 'median' and 'mean' entries (see res.keys() below).
agency_group = [cols.agency, cols.agency_name]
res = AnalyticalEngine.descriptive_stats(
    n_best=10,
    n_worst=10,
    df=df_closed_not_same_day,
    col_metric=cols.hbc_days_to_close,
    group=agency_group,
)


In [29]:
# Which summaries descriptive_stats produced.
res.keys()

dict_keys(['best', 'worst', 'median', 'mean'])

In [30]:
# Agencies with the largest hbc_days_to_close values (slowest to close).
res['worst']

Unnamed: 0_level_0,Unnamed: 1_level_0,hbc_days_to_close
agency,agency_name,Unnamed: 2_level_1
DCA,Department of Consumer Affairs,559
TLC,Taxi and Limousine Commission,539
DOT,Department of Transportation,434
DSNY,Department of Sanitation,421
DOB,Department of Buildings,296
DEP,Department of Environmental Protection,223
HPD,Department of Housing Preservation and Development,126
DOE,Central - Department of Education,119
DOHMH,Department of Health and Mental Hygiene,61
DPR,Department of Parks and Recreation,50


### New dataset: nyc_open_data_311_call_center_inquiry

In [None]:
# Container for the 311 call-center inquiry dataset.
dc = DataContainer('nyc_open_data_311_call_center_inquiry')

In [None]:
# Default fetch, without any filter.
dc.get()

In [None]:
# Up to 250 NYPD inquiries; plain string because the where-clause has no placeholders.
dc.get(where="agency='NYPD'", limit=250)

In [None]:
# (rows, columns) of the NYPD subset
dc.df.shape

In [None]:
# Peek at the first rows
dc.df.head()

In [None]:
# NOTE(review): identical to the previous cell; candidate for removal.
dc.df.head()

In [None]:
# Inquiries for a single day.
# NOTE(review): other cells pass 'YYYYMMDD' to ul.str_as_date; confirm it also
# accepts the dashed 'YYYY-MM-DD' form used here.
call_date_iso = ul.date_as_iso_format(ul.str_as_date('2010-01-03'))
dc.get(where=f"date = '{call_date_iso}'")

In [None]:
# First rows of the single-day fetch
dc.df.head()

### New dataset: nyc_open_data_311_customer_satisfaction_survey

In [None]:
# Container for the 311 customer-satisfaction survey dataset.
dc = DataContainer('nyc_open_data_311_customer_satisfaction_survey')

In [None]:
# Default fetch, without any filter.
dc.get()

In [None]:
# Only neutral answers; plain string because the where-clause has no placeholders.
dc.get(where="answer_satisfaction='Neutral'")

In [None]:
# First rows of the neutral-answer subset
dc.df.head()