# Castro 311 Data Load Notebook

This notebook fetches San Francisco 311 service request data from the Socrata API, filters within 1 km of the Castro Muni station, cleans and converts columns to appropriate types, and loads the result into PostGIS.

## 1. Setup and Imports
Import all required packages and load environment variables.

In [1]:
import os
import requests
import datetime
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point
from shapely.geometry import shape
from sqlalchemy import create_engine, DateTime, BigInteger, Text, text
from dotenv import load_dotenv

# Load .env from working directory
load_dotenv('.env')

# PostGIS connection vars
POSTGIS_USER = os.getenv('POSTGRES_USER')
POSTGIS_PASS = os.getenv('POSTGRES_PASSWORD')
POSTGIS_HOST = os.getenv('POSTGRES_HOST')
POSTGIS_PORT = os.getenv('POSTGRES_PORT')
POSTGIS_DB   = os.getenv('POSTGRES_DB')
engine = create_engine(
    f"postgresql+psycopg2://{POSTGIS_USER}:{POSTGIS_PASS}@{POSTGIS_HOST}:{POSTGIS_PORT}/{POSTGIS_DB}"
)

## 2. Fetch Raw 311 Data
Download all records from the JSON endpoint

In [2]:
# endpoint + pagination settings
URL = 'https://data.sfgov.org/resource/vw6y-z8j6.json'
LIMIT = 50000

# compute January 1 of this year in Socrata’s ISO format
current_year = datetime.datetime.utcnow().year
year_start = f"{current_year}-01-01T00:00:00"

offset = 0
all_records = []

print(f"Fetching records opened since {year_start}…")
while True:
    params = {
        '$limit':  LIMIT,
        '$offset': offset,
        # only this year’s
        '$where':  f"requested_datetime >= '{year_start}'"
    }
    resp = requests.get(URL, params=params)
    resp.raise_for_status()
    chunk = resp.json()
    if not chunk:
        break
    all_records.extend(chunk)
    offset += LIMIT
    print(f"  fetched {len(all_records)} records so far…")

print(f"Done — total records fetched: {len(all_records)}")

Fetching records opened since 2025-01-01T00:00:00…
  fetched 50000 records so far…
  fetched 100000 records so far…
  fetched 150000 records so far…
  fetched 200000 records so far…
  fetched 250000 records so far…
  fetched 300000 records so far…
  fetched 320790 records so far…
Done — total records fetched: 320790


## 3. Create GeoDataFrame

In [3]:
df = pd.DataFrame(all_records)
df.head()

Unnamed: 0,service_request_id,requested_datetime,closed_date,updated_datetime,status_description,status_notes,agency_responsible,service_name,service_subtype,service_details,...,analysis_neighborhood,police_district,lat,long,point,point_geom,source,data_as_of,data_loaded_at,media_url
0,101001243928,2025-01-01T00:00:36.000,2025-01-01T10:26:42.000,2025-01-01T10:26:51.000,Closed,Case Resolved,Recology - Overflowing,Street and Sidewalk Cleaning,garbage_and_debris,city_garbage_can_overflowing,...,Marina,NORTHERN,37.7989126,-122.4359842,"{'latitude': '37.7989126', 'longitude': '-122....","{'type': 'Point', 'coordinates': [-122.4359842...",Mobile,2025-01-11T06:00:00.000,2025-01-12T08:58:27.000,
1,101001243930,2025-01-01T00:01:08.000,2025-01-01T09:46:32.000,2025-01-01T09:46:42.000,Closed,Case Resolved,Recology - Overflowing,Street and Sidewalk Cleaning,garbage_and_debris,city_garbage_can_overflowing,...,Castro/Upper Market,MISSION,37.763931498092944,-122.43319536218104,"{'latitude': '37.763931498092944', 'longitude'...","{'type': 'Point', 'coordinates': [-122.4331953...",Mobile,2025-01-11T06:00:00.000,2025-01-12T08:58:27.000,
2,101001243932,2025-01-01T00:05:36.000,2025-01-01T10:26:46.000,2025-01-01T10:26:53.000,Closed,Case Resolved,Recology - Overflowing,Street and Sidewalk Cleaning,garbage_and_debris,city_garbage_can_overflowing,...,Financial District/South Beach,CENTRAL,37.79220657089487,-122.3975187832069,"{'latitude': '37.79220657089487', 'longitude':...","{'type': 'Point', 'coordinates': [-122.3975187...",Mobile,2025-01-11T06:00:00.000,2025-01-12T08:58:27.000,
3,101001243933,2025-01-01T00:08:03.000,2025-01-01T08:51:16.000,2025-01-01T08:51:23.000,Closed,Case Resolved,Recology - Overflowing,Street and Sidewalk Cleaning,garbage_and_debris,city_garbage_can_overflowing,...,Outer Richmond,RICHMOND,37.780269873363686,-122.48354790743672,"{'latitude': '37.780269873363686', 'longitude'...","{'type': 'Point', 'coordinates': [-122.4835479...",Mobile,2025-01-11T06:00:00.000,2025-01-12T08:58:27.000,
4,101001243935,2025-01-01T00:08:49.000,2025-01-01T10:26:50.000,2025-01-01T10:26:56.000,Closed,Case Resolved,Recology - Overflowing,Street and Sidewalk Cleaning,garbage_and_debris,city_garbage_can_overflowing,...,Russian Hill,CENTRAL,37.80206323892925,-122.41975693040368,"{'latitude': '37.80206323892925', 'longitude':...","{'type': 'Point', 'coordinates': [-122.4197569...",Mobile,2025-01-11T06:00:00.000,2025-01-12T08:58:27.000,


In [4]:
# Convert 'point_geom' dicts to Shapely geometries
df['geometry'] = df['point_geom'].apply(lambda geom: shape(geom) if isinstance(geom, dict) else None)

# Build GeoDataFrame using that geometry column
gdf = gpd.GeoDataFrame(df, geometry='geometry', crs='EPSG:4326')

print('GeoDataFrame created with', len(gdf), 'features')

GeoDataFrame created with 320790 features


## 4. Castro Buffer
1. Buffer 1 km around the Castro Muni stop and keep only those within the circle

In [5]:
# Project GeoDataFrame to Web Mercator (EPSG:3857) for accurate buffering in meters
gdf = gdf.to_crs('EPSG:3857')

# Define the Castro Muni stop as a point and buffer it by 1000 meters
castro_point = gpd.GeoSeries([Point(-122.435054, 37.762192)], crs='EPSG:4326').to_crs('EPSG:3857')
buffer = castro_point.buffer(1000).iloc[0]

# Filter features that fall within the buffer
gdf = gdf[gdf.geometry.within(buffer)]

print('Filtered to', len(gdf), 'features within 1 km of Castro')

Filtered to 11258 features within 1 km of Castro


## 5. Clean Columns and Types

In [6]:
# Columns to keep after renaming
keep = [
    'case_id',           # formerly service_request_id
    'opened_ts',         # parsed earlier from 'requested_datetime'
    'closed_ts',         # formerly closed_date
    'status',            # from status_description
    'status_notes',
    'request_type',      # from service_name
    'category',          # from service_subtype
    'address',
    'agency',            # from agency_responsible
    'geometry'           # spatial point
]

# Rename columns before filtering
gdf = gdf.rename(columns={
    'service_request_id': 'case_id',
    'requested_datetime': 'opened_ts',
    'closed_date': 'closed_ts',
    'status_description': 'status',
    'service_name': 'request_type',
    'service_subtype': 'category',
    'agency_responsible': 'agency'
})

# Convert datetime fields
gdf['opened_ts'] = pd.to_datetime(gdf['opened_ts'], errors='coerce')
gdf['closed_ts'] = pd.to_datetime(gdf['closed_ts'], errors='coerce')

# Drop rows with missing geometry or ID
gdf = gdf.dropna(subset=['geometry', 'case_id'])

# Keep only the desired cleaned columns
gdf = gdf[keep]

print('Cleaned and retained columns:', gdf.columns.tolist())

Cleaned and retained columns: ['case_id', 'opened_ts', 'closed_ts', 'status', 'status_notes', 'request_type', 'category', 'address', 'agency', 'geometry']


## 6. Load into PostGIS

In [7]:
# Define explicit data types for PostGIS schema
dtype_map = {
    'case_id':       BigInteger,
    'opened_ts':     DateTime,
    'closed_ts':     DateTime,
    'status':        Text,
    'status_notes':  Text,
    'request_type':  Text,
    'category':      Text,
    'address':       Text,
    'agency':        Text
}

with engine.begin() as conn:
    # Ensure table exists
    gdf.head(0).to_postgis(
        'castro_311', conn, if_exists='append', index=False, dtype=dtype_map
    )

    # Add unique constraint if not present
    conn.execute(text("""
        DO $$
        BEGIN
            IF NOT EXISTS (
                SELECT 1 FROM pg_constraint
                WHERE conname = 'castro_311_case_id_key'
            ) THEN
                ALTER TABLE castro_311 ADD CONSTRAINT castro_311_case_id_key UNIQUE (case_id);
            END IF;
        END$$;
    """))

    # Write to temp table
    gdf.to_postgis('_castro_311_temp', conn, if_exists='replace', index=False)

    # Perform upsert and track affected row count
    result = conn.execute(text("""
        INSERT INTO castro_311 (case_id, opened_ts, closed_ts, status, request_type, category, address, geometry)
        SELECT
            case_id::BIGINT,
            opened_ts,
            closed_ts,
            status,
            request_type,
            category,
            address,
            geometry
        FROM _castro_311_temp
        ON CONFLICT (case_id)
        DO UPDATE SET
            opened_ts = EXCLUDED.opened_ts,
            closed_ts = EXCLUDED.closed_ts,
            status = EXCLUDED.status,
            request_type = EXCLUDED.request_type,
            category = EXCLUDED.category,
            address = EXCLUDED.address,
            geometry = EXCLUDED.geometry;
    """))

    print(f"✅ Upserted {result.rowcount} records")

    # Drop temp
    conn.execute(text("DROP TABLE _castro_311_temp;"))

✅ Upserted 11258 records
