In [1]:
%pip install -r requirements.txt

Note: you may need to restart the kernel to use updated packages.


In [2]:

from trino import dbapi
from trino.auth import OAuth2Authentication

conn = dbapi.connect(
    host="smart-cities-trino.pre-prod.cloud.vtti.vt.edu",
    port=443,
    http_scheme="https",
    auth=OAuth2Authentication(),   # <-- this is the right one
    catalog="smartcities_iceberg",  # optional default
    # schema="...",                # optional default
)

In [3]:

cur = conn.cursor()
cur.execute("SHOW SCHEMAS")
print(cur.fetchall())

Open the following URL in browser for the external authentication:
https://smart-cities-trino.pre-prod.cloud.vtti.vt.edu/oauth2/token/initiate/2f700dfab49b71babd3aca4004865125447eddff533becdcbfca2e3bcc90c7c0


TrinoAuthError: Exceeded max attempts while getting the token

In [None]:
cur.execute("SHOW CATALOGS")
print(cur.fetchall())

[['smartcities_iceberg'], ['system']]


In [None]:
cur.execute("SHOW SCHEMAS FROM smartcities_iceberg")
print(cur.fetchall())

[['alexandria'], ['cci'], ['falls-church'], ['information_schema'], ['smart_cities_test'], ['system'], ['tables'], ['vtti']]


In [None]:
cur.execute("""
SELECT table_schema, table_name
FROM smartcities_iceberg.information_schema.tables
ORDER BY table_schema, table_name
LIMIT 20
""")
print(cur.fetchall())

[['alexandria', 'bsm'], ['alexandria', 'psm'], ['alexandria', 'safety-event'], ['alexandria', 'speed-distribution'], ['alexandria', 'vehicle-count'], ['alexandria', 'vru-count'], ['cci', 'bsm'], ['falls-church', 'hiresdata'], ['falls-church', 'maple_washington'], ['falls-church', 'mediantraveltimes'], ['falls-church', 'old_hiresdata'], ['falls-church', 'old_mediantraveltimes'], ['falls-church', 'old_priority_requests'], ['falls-church', 'old_safety_conflicts'], ['falls-church', 'old_safety_pedcompliance'], ['falls-church', 'old_safety_redlightrunners'], ['falls-church', 'old_safety_simpledelay'], ['falls-church', 'old_tmc'], ['falls-church', 'old_tmc_crosswalk'], ['falls-church', 'old_tmc_lanes']]


## Best Practices for Querying

### 1. Always Verify Column Names First

**CRITICAL:** Column names may not be what you expect! Run the schema verification cells in this notebook to see actual field names.

```python
# See actual columns in a table
cur.execute("SELECT * FROM smartcities_iceberg.alexandria.bsm LIMIT 1")
actual_columns = [desc[0] for desc in cur.description]
print(actual_columns)
```

**Common Issues:**

- ❌ `vehicle_id` may not exist
- ❌ `latitude`, `longitude` → Use `lat`, `lon` instead
- ✅ Always check the schema verification output first!

### 2. Use Cursor Method (Not pd.read_sql)

To avoid pandas SQLAlchemy warnings, use the cursor method:

```python
# Good: Use cursor method
cur.execute("SELECT * FROM table LIMIT 5")
columns = [desc[0] for desc in cur.description]
results = cur.fetchall()
df = pd.DataFrame(results, columns=columns)

# Avoid: pd.read_sql() triggers warning
# df = pd.read_sql("SELECT * FROM table", conn)
```

### 3. Start with SELECT \*

When exploring or unsure about column names, use `SELECT *` to get all columns:

```sql
SELECT
    from_unixtime(publish_timestamp / 1000000) as timestamp,
    *
FROM smartcities_iceberg.alexandria.bsm
LIMIT 10
```

Then filter columns in pandas:

```python
df_filtered = df[['timestamp', 'lat', 'lon', 'speed']]
```

### 4. Handle publish_timestamp Correctly

`publish_timestamp` is a **bigint** (microseconds since Unix epoch), NOT a timestamp type.

**Convert to readable timestamp:**

```sql
from_unixtime(publish_timestamp / 1000000) as timestamp
```

**Filter by time:**

```sql
WHERE publish_timestamp >= to_unixtime(current_timestamp - interval '24' hour) * 100000000
```

**Use in aggregations:**

```sql
date_trunc('hour', from_unixtime(publish_timestamp / 1000000))
```


In [None]:
import pandas as pd

# Get all tables from all schemas
cur.execute("""
SELECT table_schema, table_name
FROM smartcities_iceberg.information_schema.tables
WHERE table_schema NOT IN ('information_schema', 'system')
ORDER BY table_schema, table_name
""")
all_tables = cur.fetchall()

tables_df = pd.DataFrame(all_tables, columns=['Schema', 'Table'])
print(f"Total tables found: {len(tables_df)}\n")
print(tables_df.to_string(index=False))

Total tables found: 52

           Schema                      Table
       alexandria                        bsm
       alexandria                        psm
       alexandria               safety-event
       alexandria         speed-distribution
       alexandria              vehicle-count
       alexandria                  vru-count
              cci                        bsm
     falls-church                  hiresdata
     falls-church           maple_washington
     falls-church          mediantraveltimes
     falls-church              old_hiresdata
     falls-church      old_mediantraveltimes
     falls-church      old_priority_requests
     falls-church       old_safety_conflicts
     falls-church   old_safety_pedcompliance
     falls-church old_safety_redlightrunners
     falls-church     old_safety_simpledelay
     falls-church                    old_tmc
     falls-church          old_tmc_crosswalk
     falls-church              old_tmc_lanes
     falls-church           saf

In [None]:
# Function to examine table structure
def describe_table(schema, table):
    cur.execute(f"""
    SELECT column_name, data_type
    FROM smartcities_iceberg.information_schema.columns
    WHERE table_schema = '{schema}' AND table_name = '{table}'
    ORDER BY ordinal_position
    """)
    columns = cur.fetchall()
    return pd.DataFrame(columns, columns=['Column', 'Data Type'])


# Let's examine some key tables from each schema
print("=" * 80)
print("ALEXANDRIA SCHEMA - BSM Table")
print("=" * 80)
print(describe_table('alexandria', 'bsm'))

ALEXANDRIA SCHEMA - BSM Table
Empty DataFrame
Columns: [Column, Data Type]
Index: []


In [None]:
print("\n" + "=" * 80)
print("ALEXANDRIA SCHEMA - PSM Table (Pedestrian Safety Message)")
print("=" * 80)
print(describe_table('alexandria', 'psm'))

print("\n" + "=" * 80)
print("ALEXANDRIA SCHEMA - Safety Event Table")
print("=" * 80)
print(describe_table('alexandria', 'safety-event'))

print("\n" + "=" * 80)
print("ALEXANDRIA SCHEMA - Vehicle Count Table")
print("=" * 80)
print(describe_table('alexandria', 'vehicle-count'))

print("\n" + "=" * 80)
print("FALLS CHURCH SCHEMA - HiRes Data Table")
print("=" * 80)
print(describe_table('falls-church', 'hiresdata'))


ALEXANDRIA SCHEMA - PSM Table (Pedestrian Safety Message)
                  Column  Data Type
0                   city    varchar
1           intersection    varchar
2                  table    varchar
3      publish_timestamp     bigint
4          location_name    varchar
5            source_type    varchar
6            vendor_name    varchar
7         vendor_version    varchar
8                   misc    varchar
9                msg_cnt    integer
10            basic_type    integer
11                    id     bigint
12              sec_mark    integer
13                   lat     double
14                   lon     double
15                  elev       real
16   accuracy_semi_major       real
17   accuracy_semi_minor       real
18  accuracy_orientation       real
19                 speed       real
20               heading       real
21             accel_lon       real
22             accel_lat       real
23            accel_vert       real
24             accel_yaw       real
25  e

In [None]:
# Store schemas in variables for later use to ensure correct field names
print("=" * 80)
print("STORING SCHEMAS FOR KEY TABLES")
print("=" * 80)

# Get and store BSM schema
bsm_schema = describe_table('alexandria', 'bsm')
print("\nAlexandria BSM Schema:")
print(bsm_schema.to_string(index=False))
bsm_columns = bsm_schema['Column'].tolist()

# Get and store PSM schema
psm_schema = describe_table('alexandria', 'psm')
print("\n\nAlexandria PSM Schema:")
print(psm_schema.to_string(index=False))
psm_columns = psm_schema['Column'].tolist()

# Get and store Safety Event schema
safety_event_schema = describe_table('alexandria', 'safety-event')
print("\n\nAlexandria Safety Event Schema:")
print(safety_event_schema.to_string(index=False))
safety_event_columns = safety_event_schema['Column'].tolist()

# Get and store Vehicle Count schema
vehicle_count_schema = describe_table('alexandria', 'vehicle-count')
print("\n\nAlexandria Vehicle Count Schema:")
print(vehicle_count_schema.to_string(index=False))
vehicle_count_columns = vehicle_count_schema['Column'].tolist()

# Get and store Falls Church HiRes Data schema
hiresdata_schema = describe_table('falls-church', 'hiresdata')
print("\n\nFalls Church HiRes Data Schema:")
print(hiresdata_schema.to_string(index=False))
hiresdata_columns = hiresdata_schema['Column'].tolist()

print("\n" + "=" * 80)
print("Schemas stored! Use these column lists to ensure correct field names in queries.")
print("=" * 80)

STORING SCHEMAS FOR KEY TABLES

Alexandria BSM Schema:
Empty DataFrame
Columns: [Column, Data Type]
Index: []


Alexandria PSM Schema:
              Column Data Type
                city   varchar
        intersection   varchar
               table   varchar
   publish_timestamp    bigint
       location_name   varchar
         source_type   varchar
         vendor_name   varchar
      vendor_version   varchar
                misc   varchar
             msg_cnt   integer
          basic_type   integer
                  id    bigint
            sec_mark   integer
                 lat    double
                 lon    double
                elev      real
 accuracy_semi_major      real
 accuracy_semi_minor      real
accuracy_orientation      real
               speed      real
             heading      real
           accel_lon      real
           accel_lat      real
          accel_vert      real
           accel_yaw      real
event_responder_type   integer
       activity_type   integ

In [None]:
# Helper function to check if columns exist in a table
def check_columns(columns_to_check, available_columns, table_name):
    """
    Verify that requested columns exist in the table schema

    Args:
        columns_to_check: List of column names you want to use
        available_columns: List of actual columns from the schema
        table_name: Name of the table (for error messages)

    Returns:
        tuple: (all_valid: bool, missing_columns: list)
    """
    missing = [col for col in columns_to_check if col not in available_columns]

    if missing:
        print(f"WARNING: The following columns do not exist in {table_name}:")
        print(f"  Missing: {missing}")
        print(f"  Available columns: {available_columns}")
        return False, missing
    else:
        print(f"✓ All columns exist in {table_name}")
        return True, []


# Example: Verify BSM columns before using them
print("Example: Checking if columns exist in BSM table:")
columns_i_want = ['publish_timestamp',
                  'vehicle_id', 'lat', 'lon', 'speed', 'heading']
check_columns(columns_i_want, bsm_columns, 'alexandria.bsm')

# Show common column name corrections
print("\n" + "=" * 80)
print("COMMON FIELD NAME CORRECTIONS:")
print("=" * 80)
print("BSM Table:")
print("  ✗ latitude  → ✓ lat")
print("  ✗ longitude → ✓ lon")
print("\nSafety Event Table:")
print("  ✗ latitude  → ✓ lat")
print("  ✗ longitude → ✓ lon")
print("=" * 80)

Example: Checking if columns exist in BSM table:
  Missing: ['publish_timestamp', 'vehicle_id', 'lat', 'lon', 'speed', 'heading']
  Available columns: []

COMMON FIELD NAME CORRECTIONS:
BSM Table:
  ✗ latitude  → ✓ lat
  ✗ longitude → ✓ lon

Safety Event Table:
  ✗ latitude  → ✓ lat
  ✗ longitude → ✓ lon


## Using Stored Schemas

The schemas for key tables are now stored in variables:

- `bsm_columns` - List of columns in alexandria.bsm
- `psm_columns` - List of columns in alexandria.psm
- `safety_event_columns` - List of columns in alexandria.safety-event
- `vehicle_count_columns` - List of columns in alexandria.vehicle-count
- `hiresdata_columns` - List of columns in falls-church.hiresdata

**Always verify column names before writing queries** to avoid errors!


In [None]:
# Example: Dynamically build a query using stored schema
def build_select_query(table_schema, table_name, columns_to_select, available_columns, limit=10):
    """
    Build a SELECT query dynamically after verifying columns exist

    Args:
        table_schema: Schema name (e.g., 'alexandria')
        table_name: Table name (e.g., 'bsm')
        columns_to_select: List of columns to select
        available_columns: List of available columns from schema
        limit: Number of rows to return

    Returns:
        str: SQL query string or None if columns are invalid
    """
    # Verify all columns exist
    is_valid, missing = check_columns(
        columns_to_select, available_columns, f"{table_schema}.{table_name}")

    if not is_valid:
        print(f"Cannot build query - missing columns: {missing}")
        return None

    # Build column list
    column_str = ", ".join(columns_to_select)

    # Build query
    query = f"""
    SELECT {column_str}
    FROM smartcities_iceberg.{table_schema}."{table_name}"
    LIMIT {limit}
    """

    return query


# Example: Build a safe BSM query
print("Building a query for BSM data:")
columns_needed = ['publish_timestamp', 'vehicle_id', 'lat', 'lon', 'speed']
query = build_select_query(
    'alexandria', 'bsm', columns_needed, bsm_columns, limit=5)

if query:
    print("\nGenerated Query:")
    print(query)

Building a query for BSM data:
  Missing: ['publish_timestamp', 'vehicle_id', 'lat', 'lon', 'speed']
  Available columns: []
Cannot build query - missing columns: ['publish_timestamp', 'vehicle_id', 'lat', 'lon', 'speed']


In [None]:
# Let's see what the ACTUAL column names are by querying sample data
print("=" * 80)
print("VERIFYING ACTUAL COLUMN NAMES IN BSM TABLE")
print("=" * 80)

# Get a sample row to see actual column names
cur.execute("""
SELECT *
FROM smartcities_iceberg.alexandria.bsm
LIMIT 1
""")

# Get actual column names from the query result
actual_bsm_columns = [desc[0] for desc in cur.description]
print(f"\nActual columns in BSM table ({len(actual_bsm_columns)} total):")
for i, col in enumerate(actual_bsm_columns, 1):
    print(f"  {i:2d}. {col}")

# Store for comparison
print(f"\nColumns stored from schema query: {len(bsm_columns)} total")
if set(actual_bsm_columns) != set(bsm_columns):
    print("WARNING: Mismatch between schema query and actual columns!")
else:
    print("✓ Schema matches actual columns")

VERIFYING ACTUAL COLUMN NAMES IN BSM TABLE


TrinoExternalError: TrinoExternalError(type=EXTERNAL, name=ICEBERG_CATALOG_ERROR, message="Failed to load table: bsm in alexandria namespace", query_id=20251112_025236_00037_y6d6c)

In [None]:
# Check Safety Event columns too
print("\n" + "=" * 80)
print("VERIFYING ACTUAL COLUMN NAMES IN SAFETY-EVENT TABLE")
print("=" * 80)

cur.execute("""
SELECT *
FROM smartcities_iceberg.alexandria."safety-event"
LIMIT 1
""")

actual_safety_columns = [desc[0] for desc in cur.description]
print(
    f"\nActual columns in Safety-Event table ({len(actual_safety_columns)} total):")
for i, col in enumerate(actual_safety_columns, 1):
    print(f"  {i:2d}. {col}")

print("\n" + "=" * 80)
print("STORE THESE VERIFIED COLUMNS FOR USE IN QUERIES")
print("=" * 80)
print(f"\nbsm_columns_verified = {actual_bsm_columns}")
print(f"\nsafety_event_columns_verified = {actual_safety_columns}")

# Update the stored columns with verified ones
bsm_columns = actual_bsm_columns
safety_event_columns = actual_safety_columns

## ⚠️ IMPORTANT: Workflow for Using This Notebook

**Before running the example queries below, you MUST:**

1. **Run ALL cells in order** from the beginning of the notebook
2. **Pay special attention to the schema verification cells** (above) which show you the ACTUAL column names
3. **Note the actual column names** displayed in the verification output
4. **Update queries** to use the correct column names based on what you see

### Common Issues:

- ❌ Assuming field names like `vehicle_id`, `latitude`, `longitude`
- ✅ Use the ACTUAL field names shown in the verification cells above
- ❌ Using `timestamp` instead of `publish_timestamp` for filtering
- ✅ `publish_timestamp` is a **bigint** (microseconds) - use conversion functions

### Safe Approach:

All example queries below now use `SELECT *` to get ALL columns. After seeing what's available, you can filter to specific columns in your DataFrame:

```python
# After running a query with SELECT *
df_filtered = df[['column1', 'column2', 'column3']]
```


In [None]:
# Function to check data availability and time range
def check_data_range(schema, table, timestamp_col='publish_timestamp'):
    try:
        query = f"""
        SELECT 
            COUNT(*) as record_count,
            from_unixtime(MIN({timestamp_col}) / 1000) as earliest_record,
            from_unixtime(MAX({timestamp_col}) / 1000) as latest_record
        FROM smartcities_iceberg.{schema}."{table}"
        """
        cur.execute(query)
        result = cur.fetchone()
        return {
            'Schema': schema,
            'Table': table,
            'Record Count': result[0] if result else 0,
            'Earliest': result[1] if result else None,
            'Latest': result[2] if result else None
        }
    except Exception as e:
        return {
            'Schema': schema,
            'Table': table,
            'Error': str(e)
        }


# Check data ranges for key tables
print("=" * 80)
print("DATA AVAILABILITY AND TIME RANGES")
print("=" * 80)

ranges = []
tables_to_check = [
    ('alexandria', 'bsm'),
    ('alexandria', 'psm'),
    ('alexandria', 'safety-event'),
    ('alexandria', 'vehicle-count'),
    ('falls-church', 'hiresdata'),
]

for schema, table in tables_to_check:
    print(f"\nChecking {schema}.{table}...")
    range_info = check_data_range(schema, table)
    ranges.append(range_info)
    if 'Error' not in range_info:
        print(f"  Records: {range_info['Record Count']:,}")
        print(
            f"  Time range: {range_info['Earliest']} to {range_info['Latest']}")
    else:
        print(f"  Error: {range_info['Error']}")

In [None]:
# Get sample data from BSM table (using cursor to avoid pandas warning)
print("=" * 80)
print("SAMPLE DATA: Alexandria BSM (Basic Safety Message)")
print("=" * 80)

cur.execute("""
SELECT *
FROM smartcities_iceberg.alexandria.bsm
LIMIT 5
""")

columns = [desc[0] for desc in cur.description]
results = cur.fetchall()
df_bsm = pd.DataFrame(results, columns=columns)
print(df_bsm)

In [None]:
# Get sample data from Safety Event table (using cursor to avoid pandas warning)
print("\n" + "=" * 80)
print("SAMPLE DATA: Alexandria Safety Events")
print("=" * 80)

cur.execute("""
SELECT *
FROM smartcities_iceberg.alexandria."safety-event"
LIMIT 5
""")

columns = [desc[0] for desc in cur.description]
results = cur.fetchall()
df_safety = pd.DataFrame(results, columns=columns)
print(df_safety)

In [None]:
# Get sample data from Falls Church median travel times (using cursor to avoid pandas warning)
print("\n" + "=" * 80)
print("SAMPLE DATA: Falls Church - Median Travel Times")
print("=" * 80)

cur.execute("""
SELECT *
FROM smartcities_iceberg."falls-church".mediantraveltimes
LIMIT 5
""")

columns = [desc[0] for desc in cur.description]
results = cur.fetchall()
df_travel = pd.DataFrame(results, columns=columns)
print(df_travel)

## API Summary

This smart-cities API provides access to connected vehicle and traffic management data from multiple cities:

### Available Data Types:

1. **BSM (Basic Safety Message)**: Real-time vehicle position, speed, heading from connected vehicles
2. **PSM (Pedestrian Safety Message)**: Pedestrian detection and safety data
3. **Safety Events**: Detected safety conflicts and incidents
4. **Vehicle/VRU Counts**: Traffic volume data
5. **Speed Distribution**: Speed profiles across locations
6. **High-Resolution Traffic Data**: Detailed signal performance metrics
7. **Travel Times**: Corridor travel time measurements

### Geographic Coverage:

- Alexandria, VA
- Falls Church, VA
- CCI (Center for Connected Infrastructure)
- VTTI (Virginia Tech Transportation Institute)

### Data Collection Approaches:

Run the cells below to see examples of collecting data over time periods.


## Example 1: Collecting BSM Data Over a Time Period

This example shows how to collect vehicle trajectory data (BSM) for a specific time range:


In [None]:
# Collect BSM data for the last 24 hours
# Using SELECT * to get all columns - customize after verifying column names above
# Note: publish_timestamp is a bigint (microseconds since epoch)

cur.execute("""
SELECT 
    from_unixtime(publish_timestamp / 1000000) as timestamp,
    *
FROM smartcities_iceberg.alexandria.bsm
WHERE publish_timestamp >= to_unixtime(current_timestamp - interval '24' hour) * 1000000
ORDER BY publish_timestamp DESC
LIMIT 1000
""")

columns = [desc[0] for desc in cur.description]
results = cur.fetchall()
df_bsm_daily = pd.DataFrame(results, columns=columns)

print(f"Collected {len(df_bsm_daily)} BSM records from the last 24 hours")
print(f"\nColumns available: {list(df_bsm_daily.columns)}")
print(f"\nFirst few rows:")
print(df_bsm_daily.head())

# After seeing the columns, you can select specific ones like this:
# columns_i_want = ['timestamp', 'publish_timestamp', 'lat', 'lon', 'speed', 'heading']
# df_bsm_daily_filtered = df_bsm_daily[columns_i_want]

## Example 2: Collecting Safety Events Over a Date Range

This example shows how to collect safety event data between specific dates:


In [None]:
# Collect safety events for a specific date range
# Using SELECT * to get all columns - customize after verifying column names above
# Note: publish_timestamp is a bigint (microseconds since epoch)

cur.execute("""
SELECT 
    from_unixtime(publish_timestamp / 1000000) as timestamp,
    *
FROM smartcities_iceberg.alexandria."safety-event"
WHERE publish_timestamp BETWEEN 
    to_unixtime(timestamp '2024-01-01 00:00:00') * 1000000 AND 
    to_unixtime(timestamp '2024-12-31 23:59:59') * 1000000
ORDER BY publish_timestamp DESC
""")

columns = [desc[0] for desc in cur.description]
results = cur.fetchall()
df_safety_events = pd.DataFrame(results, columns=columns)

print(f"Collected {len(df_safety_events)} safety events")
print(f"\nColumns available: {list(df_safety_events.columns)}")

if len(df_safety_events) > 0:
    # Check if 'event_type' column exists before using it
    if 'event_type' in df_safety_events.columns:
        print("\nEvent type breakdown:")
        print(df_safety_events['event_type'].value_counts())
    print(f"\nFirst few rows:")
    print(df_safety_events.head())

## Example 3: Aggregated Traffic Data by Hour

This example shows how to aggregate traffic metrics over time periods:


In [None]:
# Aggregate vehicle counts by hour for the last 7 days
# Note: publish_timestamp is a bigint (microseconds since epoch)
# Note: Adjust aggregation fields based on actual columns available

cur.execute("""
SELECT 
    date_trunc('hour', from_unixtime(publish_timestamp / 1000000)) as hour,
    COUNT(*) as vehicle_count,
    AVG(speed) as avg_speed,
    MAX(speed) as max_speed,
    MIN(speed) as min_speed
FROM smartcities_iceberg.alexandria.bsm
WHERE publish_timestamp >= to_unixtime(current_timestamp - interval '7' day) * 1000000
GROUP BY date_trunc('hour', from_unixtime(publish_timestamp / 1000000))
ORDER BY hour DESC
""")

columns = [desc[0] for desc in cur.description]
results = cur.fetchall()
df_hourly_traffic = pd.DataFrame(results, columns=columns)

print(f"Collected {len(df_hourly_traffic)} hourly aggregates")
print(f"\nColumns: {list(df_hourly_traffic.columns)}")
print(df_hourly_traffic.head(10))

# Plot if matplotlib is available
try:
    import matplotlib.pyplot as plt

    if len(df_hourly_traffic) > 0:
        fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8))

        ax1.plot(df_hourly_traffic['hour'], df_hourly_traffic['vehicle_count'])
        ax1.set_ylabel('Vehicle Count')
        ax1.set_title('Hourly Vehicle Counts - Last 7 Days')
        ax1.grid(True)

        ax2.plot(df_hourly_traffic['hour'], df_hourly_traffic['avg_speed'])
        ax2.set_ylabel('Average Speed')
        ax2.set_xlabel('Time')
        ax2.set_title('Average Speed by Hour')
        ax2.grid(True)

        plt.tight_layout()
        plt.show()
except ImportError:
    print("\nInstall matplotlib to visualize the data: %pip install matplotlib")

## Example 4: Batch Collection Over Multiple Days

This example shows how to collect data in batches over a longer time period:


In [None]:
from datetime import datetime, timedelta

# Function to collect data in daily batches (using cursor to avoid pandas warning)


def collect_data_batches(start_date, end_date, table_schema, table_name, columns_to_select='*'):
    """
    Collect data in daily batches to avoid memory issues with large datasets

    Args:
        start_date: Start date (string or datetime)
        end_date: End date (string or datetime)
        table_schema: Database schema name
        table_name: Table name
        columns_to_select: Columns to select (default '*' for all columns)

    Returns:
        List of dataframes, one per day
    """
    if isinstance(start_date, str):
        start_date = datetime.strptime(start_date, '%Y-%m-%d')
    if isinstance(end_date, str):
        end_date = datetime.strptime(end_date, '%Y-%m-%d')

    batches = []
    current_date = start_date

    while current_date <= end_date:
        next_date = current_date + timedelta(days=1)

        # Note: publish_timestamp is a bigint (microseconds since epoch)
        # Using SELECT * to get all columns, or specify columns_to_select
        if columns_to_select == '*':
            select_clause = "from_unixtime(publish_timestamp / 1000000) as timestamp, *"
        else:
            select_clause = f"from_unixtime(publish_timestamp / 1000000) as timestamp, {columns_to_select}"

        query = f"""
        SELECT {select_clause}
        FROM smartcities_iceberg.{table_schema}."{table_name}"
        WHERE publish_timestamp >= to_unixtime(timestamp '{current_date.strftime('%Y-%m-%d 00:00:00')}') * 100000000
          AND publish_timestamp < to_unixtime(timestamp '{next_date.strftime('%Y-%m-%d 00:00:00')}') * 100000000
        """

        print(f"Collecting data for {current_date.strftime('%Y-%m-%d')}...")

        # Use cursor instead of pd.read_sql to avoid pandas warning
        cur.execute(query)
        columns = [desc[0] for desc in cur.description]
        results = cur.fetchall()
        df_batch = pd.DataFrame(results, columns=columns)

        print(f"  Found {len(df_batch)} records")

        if len(df_batch) > 0:
            batches.append(df_batch)

        current_date = next_date

    return batches


# Example: Collect safety events for a week in daily batches
print("=" * 80)
print("Collecting safety events in daily batches...")
print("=" * 80)

# Note: Using SELECT * to get all columns - adjust dates based on actual data availability
batches = collect_data_batches(
    start_date='2024-01-01',
    end_date='2024-01-07',
    table_schema='alexandria',
    table_name='safety-event',
    columns_to_select='*'  # Get all columns
)

if batches:
    # Combine all batches
    df_all = pd.concat(batches, ignore_index=True)
    print(f"\nTotal records collected: {len(df_all)}")
    print(f"\nColumns available: {list(df_all.columns)}")
    print(
        f"\nDate range: {df_all['timestamp'].min()} to {df_all['timestamp'].max()}")
    print(f"\nFirst few rows:")
    print(df_all.head())
else:
    print("\nNo data found in this date range")

## Tips for Data Collection

### STEP 1: Always Verify Column Names First!

**Run the schema verification cells at the beginning of this notebook to see actual column names.**

Don't assume field names! Common mistakes:

- ❌ `vehicle_id` - may not exist
- ❌ `latitude`, `longitude` - actual names are `lat`, `lon`
- ✅ Check the verification output to see real column names

### STEP 2: Understand the Data Types

**publish_timestamp is a BIGINT (Unix epoch in milliseconds)**

Since `publish_timestamp` is stored as a bigint (microseconds since Unix epoch), you need to convert it:

**For display/conversion to readable timestamp:**

```sql
from_unixtime(publish_timestamp / 1000000) as timestamp
```

**For time-based filtering:**

```sql
WHERE publish_timestamp >= to_unixtime(current_timestamp - interval '24' hour) * 100000000
```

**For date range filtering:**

```sql
WHERE publish_timestamp BETWEEN
    to_unixtime(timestamp '2024-01-01 00:00:00') * 100000000 AND
    to_unixtime(timestamp '2024-12-31 23:59:59') * 1000000
```

**For aggregation:**

```sql
date_trunc('hour', from_unixtime(publish_timestamp / 1000000))
```

### STEP 3: Start with SELECT \*

When exploring a new table, always start with `SELECT *` to see all available columns:

```python
cur.execute("SELECT * FROM smartcities_iceberg.alexandria.bsm LIMIT 5")
columns = [desc[0] for desc in cur.description]
results = cur.fetchall()
df = pd.DataFrame(results, columns=columns)
print(df.columns.tolist())  # See what columns are actually available
```

Then filter in pandas after seeing the data:

```python
df_filtered = df[['column1', 'column2', 'column3']]
```

### Performance Tips:

1. **Verify column names first** - Run schema verification cells
2. **Start with SELECT \*** - See what's available before selecting specific columns
3. **Add LIMIT**: Always test queries with LIMIT first to check structure
4. **Use WHERE clauses**: Filter by publish_timestamp to reduce data volume
5. **Aggregate when possible**: Use GROUP BY and aggregation functions (COUNT, AVG, etc.)
6. **Batch large requests**: Use the batch collection function for multi-day/multi-week requests

### Common Time Intervals:

- Last hour: `to_unixtime(current_timestamp - interval '1' hour) * 1000000`
- Last 24 hours: `to_unixtime(current_timestamp - interval '24' hour) * 100000000`
- Last week: `to_unixtime(current_timestamp - interval '7' day) * 1000000`
- Last month: `to_unixtime(current_timestamp - interval '30' day) * 1000000`

### Key Tables for Traffic Safety Analysis:

- **alexandria.bsm**: Vehicle trajectories (position, speed, acceleration)
- **alexandria.safety-event**: Detected safety conflicts
- **alexandria.vehicle-count**: Traffic volumes
- **alexandria.speed-distribution**: Speed profiles
- **falls-church.hiresdata**: High-resolution signal data
- **falls-church.mediantraveltimes**: Travel time measurements


In [None]:
# Phase 1.2: Identify intersections with sufficient data overlap
print("=" * 80)
print("PHASE 1.2: INTERSECTION DATA OVERLAP ANALYSIS")
print("=" * 80)

# Find intersections that have data in all key tables
query_intersections = """
WITH bsm_intersections AS (
    SELECT DISTINCT intersection
    FROM alexandria.bsm
    WHERE publish_timestamp > 0 AND publish_timestamp < 9999999999999999
    LIMIT 100
),
psm_intersections AS (
    SELECT DISTINCT intersection  
    FROM alexandria.psm
    WHERE publish_timestamp > 0 AND publish_timestamp < 9999999999999999
    LIMIT 100
),
event_intersections AS (
    SELECT DISTINCT intersection
    FROM alexandria."safety-event"
    WHERE time_at_site > 0 AND time_at_site < 9999999999999999
    LIMIT 100
),
count_intersections AS (
    SELECT DISTINCT intersection
    FROM alexandria."vehicle-count"
    WHERE publish_timestamp > 0 AND publish_timestamp < 9999999999999999
    LIMIT 100
)
SELECT 
    b.intersection,
    CASE WHEN p.intersection IS NOT NULL THEN 'Y' ELSE 'N' END as has_psm,
    CASE WHEN e.intersection IS NOT NULL THEN 'Y' ELSE 'N' END as has_events,
    CASE WHEN c.intersection IS NOT NULL THEN 'Y' ELSE 'N' END as has_counts
FROM bsm_intersections b
LEFT JOIN psm_intersections p ON b.intersection = p.intersection
LEFT JOIN event_intersections e ON b.intersection = e.intersection
LEFT JOIN count_intersections c ON b.intersection = c.intersection
ORDER BY b.intersection
"""

try:
    cur.execute(query_intersections)
    columns = [desc[0] for desc in cur.description]
    results = cur.fetchall()
    intersection_coverage = pd.DataFrame(results, columns=columns)

    print(f"\nFound {len(intersection_coverage)} intersections with BSM data")
    print("\nData availability by intersection:")
    print(intersection_coverage.to_string(index=False))

    # Find intersections with complete data
    complete_data = intersection_coverage[
        (intersection_coverage['has_psm'] == 'Y') &
        (intersection_coverage['has_events'] == 'Y') &
        (intersection_coverage['has_counts'] == 'Y')
    ]

    print(f"\n{'='*80}")
    print(
        f"Intersections with COMPLETE data (BSM + PSM + Events + Counts): {len(complete_data)}")
    if len(complete_data) > 0:
        print("Recommended intersections for analysis:")
        print(complete_data['intersection'].tolist())
    else:
        print("⚠ WARNING: No intersections have complete data across all tables")
        print("Consider using intersections with partial data and handling missing values")

except Exception as e:
    print(f"Error analyzing intersection coverage: {e}")
    print("Proceeding with available data...")

In [None]:
# Phase 2.1: Collect severity-weighted safety events for baseline
from datetime import datetime, timedelta
print("=" * 80)
print("PHASE 2.1: SEVERITY-WEIGHTED BASELINE CONSTRUCTION")
print("=" * 80)

# Severity weights from checkpoint document
SEVERITY_WEIGHTS = {
    'FATAL': 10,
    'INJURY': 3,
    'PDO': 1,  # Property Damage Only
    'DEFAULT': 1  # For events without explicit severity
}

# Define function that will eventually go in FastAPI backend


def collect_baseline_events(intersection=None, start_date=None, end_date=None):
    """
    Collect and weight safety events for baseline risk calculation.
    This will become an API backend function.

    Args:
        intersection: Specific intersection or None for all
        start_date: Start of analysis period (datetime or timestamp)
        end_date: End of analysis period

    Returns:
        DataFrame with weighted events by intersection, TOD, DOW
    """

    # Build query with optional filters
    where_clauses = ["time_at_site > 0", "time_at_site < 9999999999999999"]

    if intersection:
        where_clauses.append(f"intersection = '{intersection}'")
    if start_date:
        # Convert to milliseconds timestamp if needed
        where_clauses.append(
            f"time_at_site >= {int(start_date.timestamp() * 1000000)}")
    if end_date:
        where_clauses.append(
            f"time_at_site <= {int(end_date.timestamp() * 1000000)}")

    where_clause = " AND ".join(where_clauses)

    query = f"""
    SELECT 
        intersection,
        event_type,
        event_id,
        from_unixtime(time_at_site / 1000000) as event_time,
        time_at_site,
        HOUR(from_unixtime(time_at_site / 1000000)) as hour_of_day,
        DAY_OF_WEEK(from_unixtime(time_at_site / 1000000)) as day_of_week,
        object1_class,
        object2_class,
        detection_area
    FROM alexandria."safety-event"
    WHERE {where_clause}
    ORDER BY intersection, time_at_site
    """

    try:
        cur.execute(query)
        columns = [desc[0] for desc in cur.description]
        results = cur.fetchall()
        df = pd.DataFrame(results, columns=columns)

        if len(df) > 0:
            # Apply severity weighting
            # Note: We'll need to map event types to severity levels
            # For now, use a simple heuristic
            def assign_severity_weight(row):
                event_type = str(row['event_type']).upper()

                # VRU-involved events get higher weight
                is_vru = ('PEDESTRIAN' in str(row['object1_class']).upper() or
                          'CYCLIST' in str(row['object1_class']).upper() or
                          'PEDESTRIAN' in str(row['object2_class']).upper() or
                          'CYCLIST' in str(row['object2_class']).upper())

                if 'FATAL' in event_type or 'DEATH' in event_type:
                    return SEVERITY_WEIGHTS['FATAL']
                elif 'INJURY' in event_type or is_vru:
                    return SEVERITY_WEIGHTS['INJURY']
                else:
                    return SEVERITY_WEIGHTS['PDO']

            df['severity_weight'] = df.apply(assign_severity_weight, axis=1)
            df['is_vru_involved'] = df.apply(
                lambda row: 1 if ('PEDESTRIAN' in str(row['object1_class']).upper() or
                                  'CYCLIST' in str(row['object1_class']).upper() or
                                  'PEDESTRIAN' in str(row['object2_class']).upper() or
                                  'CYCLIST' in str(row['object2_class']).upper()) else 0,
                axis=1
            )

        return df

    except Exception as e:
        print(f"Error collecting baseline events: {e}")
        return pd.DataFrame()


# Test the function - collect last 30 days of data

end_dt = datetime.now()
start_dt = end_dt - timedelta(days=30)

print(f"\nCollecting events from {start_dt.date()} to {end_dt.date()}...")
baseline_events = collect_baseline_events(start_date=start_dt, end_date=end_dt)

if len(baseline_events) > 0:
    print(f"\n✓ Collected {len(baseline_events)} events")
    print(f"\nEvent type distribution:")
    print(baseline_events['event_type'].value_counts())
    print(f"\nVRU-involved events: {baseline_events['is_vru_involved'].sum()}")
    print(f"\nSeverity weight distribution:")
    print(baseline_events['severity_weight'].value_counts().sort_index())
    print(f"\nSample records:")
    print(baseline_events[['intersection', 'event_time',
          'event_type', 'severity_weight', 'is_vru_involved']].head(10))
else:
    print("⚠ No events found in the specified date range")
    print("Try adjusting the date range or checking data availability")

In [None]:
# Phase 2.2: Build exposure metrics from vehicle and VRU count data
print("\n" + "=" * 80)
print("PHASE 2.2: EXPOSURE METRICS CONSTRUCTION")
print("=" * 80)


def collect_exposure_metrics(intersection=None, start_date=None, end_date=None):
    """
    Collect vehicle and VRU counts for exposure normalization.
    This will become an API backend function.

    Returns:
        Two DataFrames: vehicle_counts, vru_counts
    """

    # Build WHERE clause
    where_clauses = ["publish_timestamp > 0",
                     "publish_timestamp < 9999999999999999"]

    if intersection:
        where_clauses.append(f"intersection = '{intersection}'")
    if start_date:
        where_clauses.append(
            f"publish_timestamp >= {int(start_date.timestamp() * 1000000)}")
    if end_date:
        where_clauses.append(
            f"publish_timestamp <= {int(end_date.timestamp() * 1000000)}")

    where_clause = " AND ".join(where_clauses)

    # Collect vehicle counts
    vehicle_query = f"""
    SELECT 
        intersection,
        from_unixtime(publish_timestamp / 1000000) as time,
        publish_timestamp,
        approach,
        movement,
        class as vehicle_class,
        count as vehicle_count,
        HOUR(from_unixtime(publish_timestamp / 1000000)) as hour_of_day,
        DAY_OF_WEEK(from_unixtime(publish_timestamp / 1000000)) as day_of_week
    FROM alexandria."vehicle-count"
    WHERE {where_clause}
    ORDER BY intersection, publish_timestamp
    """

    # Collect VRU counts
    vru_query = f"""
    SELECT 
        intersection,
        from_unixtime(publish_timestamp / 1000000) as time,
        publish_timestamp,
        approach,
        count as vru_count,
        HOUR(from_unixtime(publish_timestamp / 1000000)) as hour_of_day,
        DAY_OF_WEEK(from_unixtime(publish_timestamp / 1000000)) as day_of_week
    FROM alexandria."vru-count"
    WHERE {where_clause}
    ORDER BY intersection, publish_timestamp
    """

    try:
        # Get vehicle counts
        cur.execute(vehicle_query)
        columns = [desc[0] for desc in cur.description]
        results = cur.fetchall()
        vehicle_df = pd.DataFrame(results, columns=columns)

        # Get VRU counts
        cur.execute(vru_query)
        columns = [desc[0] for desc in cur.description]
        results = cur.fetchall()
        vru_df = pd.DataFrame(results, columns=columns)

        return vehicle_df, vru_df

    except Exception as e:
        print(f"Error collecting exposure metrics: {e}")
        return pd.DataFrame(), pd.DataFrame()


# Test the function
print(
    f"\nCollecting exposure metrics from {start_dt.date()} to {end_dt.date()}...")
vehicle_counts, vru_counts = collect_exposure_metrics(
    start_date=start_dt, end_date=end_dt)

if len(vehicle_counts) > 0:
    print(f"\n✓ Vehicle Counts: {len(vehicle_counts)} records")
    print(f"  Intersections: {vehicle_counts['intersection'].nunique()}")
    print(f"  Total vehicles: {vehicle_counts['vehicle_count'].sum():,}")
    print(f"\n  Sample vehicle counts:")
    print(vehicle_counts[['intersection', 'time',
          'approach', 'movement', 'vehicle_count']].head(5))
else:
    print("\n⚠ No vehicle count data found")

if len(vru_counts) > 0:
    print(f"\n✓ VRU Counts: {len(vru_counts)} records")
    print(f"  Intersections: {vru_counts['intersection'].nunique()}")
    print(f"  Total VRUs: {vru_counts['vru_count'].sum():,}")
    print(f"\n  Sample VRU counts:")
    print(vru_counts[['intersection', 'time',
          'approach', 'vru_count']].head(5))
else:
    print("\n⚠ No VRU count data found")

# Calculate normalization constants (V_max, N_VRU_max) for index formulas
if len(vehicle_counts) > 0 and len(vru_counts) > 0:
    print("\n" + "=" * 80)
    print("NORMALIZATION CONSTANTS FOR INDEX COMPUTATION")
    print("=" * 80)

    # Aggregate to 15-minute intervals to match index granularity
    vehicle_counts['time_15min'] = pd.to_datetime(
        vehicle_counts['time'], utc=True).dt.floor('15min')
    vru_counts['time_15min'] = pd.to_datetime(
        vru_counts['time'], utc=True).dt.floor('15min')

    vehicle_15min = vehicle_counts.groupby(['intersection', 'time_15min'])[
        'vehicle_count'].sum().reset_index()
    vru_15min = vru_counts.groupby(['intersection', 'time_15min'])[
        'vru_count'].sum().reset_index()

    V_max = vehicle_15min['vehicle_count'].max()
    N_VRU_max = vru_15min['vru_count'].max()

    print(f"\nV_max (max vehicle volume per 15-min): {V_max}")
    print(f"N_VRU_max (max VRU count per 15-min): {N_VRU_max}")
    print(f"\nThese constants will be used to normalize the Safety Index formulas.")

    # Store for later use
    NORMALIZATION_CONSTANTS = {
        'V_max': V_max,
        'N_VRU_max': N_VRU_max
    }

In [None]:
# Phase 3.1: BSM-derived vehicle features (15-minute aggregates)
import numpy as np
print("\n" + "=" * 80)
print("PHASE 3.1: BSM-DERIVED VEHICLE FEATURES")
print("=" * 80)


def collect_bsm_features(intersection=None, start_date=None, end_date=None):
    """
    Extract vehicle behavior features from BSM data aggregated to 15-minute intervals.

    Features computed:
    - Vehicle count (exposure metric V)
    - Average speed (S)
    - Speed variance (σ_S) 
    - Hard braking frequency
    - Heading change rate (proxy for weaving/erratic behavior)

    Args:
        intersection: Specific intersection or None for all
        start_date: Start of analysis period
        end_date: End of analysis period

    Returns:
        DataFrame with 15-minute aggregated features
    """

    # Build WHERE clause
    where_clauses = ["publish_timestamp > 0",
                     "publish_timestamp < 9999999999999999"]

    if intersection:
        where_clauses.append(f"intersection = '{intersection}'")
    if start_date:
        where_clauses.append(
            f"publish_timestamp >= {int(start_date.timestamp() * 1000000)}")
    if end_date:
        where_clauses.append(
            f"publish_timestamp <= {int(end_date.timestamp() * 1000000)}")

    where_clause = " AND ".join(where_clauses)

    # Query BSM data with relevant fields
    # Note: Using verified column names (lat, lon, id, speed, heading, brake_applied_status)
    query = f"""
    SELECT 
        intersection,
        from_unixtime(publish_timestamp / 1000000) as time,
        publish_timestamp,
        id as vehicle_id,
        lat,
        lon,
        speed,
        heading,
        brake_applied_status,
        accel_lon,
        accel_lat
    FROM alexandria.bsm
    WHERE {where_clause}
    ORDER BY intersection, publish_timestamp
    """

    try:
        cur.execute(query)
        columns = [desc[0] for desc in cur.description]
        results = cur.fetchall()
        df = pd.DataFrame(results, columns=columns)

        if len(df) == 0:
            print("⚠ No BSM data found for specified criteria")
            return pd.DataFrame()

        print(f"✓ Retrieved {len(df):,} BSM records")

        # Convert time to datetime and create 15-minute bins
        df['time'] = pd.to_datetime(df['time'], utc=True)
        df['time_15min'] = df['time'].dt.floor('15min')

        # Identify hard braking events (brake_applied_status & 0x04 for hard braking)
        # Brake status is a bitmask: bit 2 (0x04) indicates hard braking
        df['hard_braking'] = df['brake_applied_status'].apply(
            lambda x: 1 if pd.notna(x) and (int(x) & 0x04) else 0
        )

        # Calculate features by 15-minute interval and intersection
        print("Computing aggregated features...")

        # Group by intersection and 15-minute interval
        grouped = df.groupby(['intersection', 'time_15min'])

        features = grouped.agg({
            'vehicle_id': 'nunique',  # Unique vehicle count
            'speed': ['mean', 'std'],  # Average speed and speed variance
            'hard_braking': 'sum',  # Count of hard braking events
            # Heading variance (weaving proxy)
            'heading': lambda x: calculate_heading_change_rate(x),
            'accel_lon': 'std',  # Longitudinal acceleration variance
            'accel_lat': 'std'  # Lateral acceleration variance
        }).reset_index()

        # Flatten column names
        features.columns = [
            'intersection',
            'time_15min',
            'vehicle_count',
            'avg_speed',
            'speed_variance',
            'hard_braking_count',
            'heading_change_rate',
            'accel_lon_variance',
            'accel_lat_variance'
        ]

        # Add time-of-day and day-of-week for contextual analysis
        features['hour_of_day'] = features['time_15min'].dt.hour
        features['day_of_week'] = features['time_15min'].dt.dayofweek

        print(f"✓ Generated {len(features)} 15-minute feature records")

        return features

    except Exception as e:
        print(f"Error collecting BSM features: {e}")
        import traceback
        traceback.print_exc()
        return pd.DataFrame()


def calculate_heading_change_rate(heading_series):
    """
    Calculate rate of heading changes as a proxy for weaving behavior.
    Returns standard deviation of heading changes.
    """
    if len(heading_series) < 2:
        return 0.0

    # Calculate absolute differences in heading
    # Note: Need to handle circular nature of heading (0-360 degrees)
    headings = heading_series.dropna().values
    if len(headings) < 2:
        return 0.0

    # Calculate angular differences
    diffs = []
    for i in range(1, len(headings)):
        diff = abs(headings[i] - headings[i-1])
        # Handle wrap-around (e.g., 359° to 1° is a 2° change, not 358°)
        if diff > 180:
            diff = 360 - diff
        diffs.append(diff)

    return np.std(diffs) if len(diffs) > 0 else 0.0


# Test the function

print(
    f"\nCollecting BSM features from {start_dt.date()} to {end_dt.date()}...")
bsm_features = collect_bsm_features(start_date=start_dt, end_date=end_dt)

if len(bsm_features) > 0:
    print(f"\n{'='*80}")
    print("BSM FEATURE SUMMARY")
    print("=" * 80)
    print(f"Total 15-minute intervals: {len(bsm_features)}")
    print(f"Intersections covered: {bsm_features['intersection'].nunique()}")
    print(
        f"Date range: {bsm_features['time_15min'].min()} to {bsm_features['time_15min'].max()}")

    print(f"\nFeature statistics:")
    print(bsm_features[['vehicle_count', 'avg_speed', 'speed_variance',
          'hard_braking_count', 'heading_change_rate']].describe())

    print(f"\nSample records:")
    print(bsm_features[['intersection', 'time_15min', 'vehicle_count', 'avg_speed',
          'speed_variance', 'hard_braking_count']].head(10).to_string(index=False))

    # Store normalization constants
    if 'NORMALIZATION_CONSTANTS' not in dir():
        NORMALIZATION_CONSTANTS = {}

    NORMALIZATION_CONSTANTS['speed_ref'] = bsm_features['avg_speed'].quantile(
        0.85)  # 85th percentile speed
    NORMALIZATION_CONSTANTS['speed_variance_max'] = bsm_features['speed_variance'].max(
    )

    print(f"\n{'='*80}")
    print("UPDATED NORMALIZATION CONSTANTS")
    print("=" * 80)
    print(
        f"S_ref (reference speed, 85th percentile): {NORMALIZATION_CONSTANTS['speed_ref']:.2f} m/s")
    print(
        f"σ_max (max speed variance): {NORMALIZATION_CONSTANTS['speed_variance_max']:.2f}")
else:
    print("⚠ No BSM features generated - check data availability")

In [None]:
# Phase 3.2: PSM-derived VRU features (15-minute aggregates)
print("\n" + "=" * 80)
print("PHASE 3.2: PSM-DERIVED VRU FEATURES")
print("=" * 80)


def collect_psm_features(intersection=None, start_date=None, end_date=None):
    """
    Extract VRU (Vulnerable Road User) behavior features from PSM data.

    Features computed:
    - VRU count (pedestrians, cyclists)
    - Average VRU speed
    - VRU activity type distribution
    - Emergency responder presence

    Args:
        intersection: Specific intersection or None for all
        start_date: Start of analysis period
        end_date: End of analysis period

    Returns:
        DataFrame with 15-minute aggregated VRU features
    """

    # Build WHERE clause
    where_clauses = ["publish_timestamp > 0",
                     "publish_timestamp < 9999999999999999"]

    if intersection:
        where_clauses.append(f"intersection = '{intersection}'")
    if start_date:
        where_clauses.append(
            f"publish_timestamp >= {int(start_date.timestamp() * 1000000)}")
    if end_date:
        where_clauses.append(
            f"publish_timestamp <= {int(end_date.timestamp() * 1000000)}")

    where_clause = " AND ".join(where_clauses)

    # Query PSM data with relevant fields
    query = f"""
    SELECT 
        intersection,
        from_unixtime(publish_timestamp / 1000000) as time,
        publish_timestamp,
        id as vru_id,
        lat,
        lon,
        speed,
        heading,
        basic_type,
        activity_type,
        activity_sub_type,
        event_responder_type
    FROM alexandria.psm
    WHERE {where_clause}
    ORDER BY intersection, publish_timestamp
    """

    try:
        cur.execute(query)
        columns = [desc[0] for desc in cur.description]
        results = cur.fetchall()
        df = pd.DataFrame(results, columns=columns)

        if len(df) == 0:
            print("⚠ No PSM data found for specified criteria")
            print("   Note: PSM data may be sparse due to limited VRU device adoption")
            return pd.DataFrame()

        print(f"✓ Retrieved {len(df):,} PSM records")

        # Convert time to datetime and create 15-minute bins
        df['time'] = pd.to_datetime(df['time'], utc=True)
        df['time_15min'] = df['time'].dt.floor('15min')

        # Classify VRU types based on basic_type field
        # PSM basic_type values (from J2735 standard):
        # 0 = unknown, 1 = pedestrian, 2 = cyclist, 3 = wheelchair, 4 = animal
        df['is_pedestrian'] = df['basic_type'].apply(
            lambda x: 1 if x == 1 else 0)
        df['is_cyclist'] = df['basic_type'].apply(lambda x: 1 if x == 2 else 0)
        df['is_emergency_responder'] = df['event_responder_type'].apply(
            lambda x: 1 if pd.notna(x) and x > 0 else 0
        )

        # Calculate features by 15-minute interval and intersection
        print("Computing aggregated VRU features...")

        grouped = df.groupby(['intersection', 'time_15min'])

        features = grouped.agg({
            'vru_id': 'nunique',  # Unique VRU count
            'speed': 'mean',  # Average VRU speed
            'is_pedestrian': 'sum',  # Pedestrian count
            'is_cyclist': 'sum',  # Cyclist count
            'is_emergency_responder': 'sum'  # Emergency responder count
        }).reset_index()

        # Flatten column names
        features.columns = [
            'intersection',
            'time_15min',
            'vru_count',
            'avg_vru_speed',
            'pedestrian_count',
            'cyclist_count',
            'emergency_responder_count'
        ]

        # Add time context
        features['hour_of_day'] = features['time_15min'].dt.hour
        features['day_of_week'] = features['time_15min'].dt.dayofweek

        print(f"✓ Generated {len(features)} 15-minute VRU feature records")

        return features

    except Exception as e:
        print(f"Error collecting PSM features: {e}")
        import traceback
        traceback.print_exc()
        return pd.DataFrame()


# Test the function
print(
    f"\nCollecting PSM features from {start_dt.date()} to {end_dt.date()}...")
psm_features = collect_psm_features(start_date=start_dt, end_date=end_dt)

if len(psm_features) > 0:
    print(f"\n{'='*80}")
    print("PSM FEATURE SUMMARY")
    print("=" * 80)
    print(f"Total 15-minute intervals: {len(psm_features)}")
    print(f"Intersections covered: {psm_features['intersection'].nunique()}")
    print(
        f"Date range: {psm_features['time_15min'].min()} to {psm_features['time_15min'].max()}")

    print(f"\nVRU statistics:")
    print(f"  Total VRU observations: {psm_features['vru_count'].sum():.0f}")
    print(f"  Pedestrians: {psm_features['pedestrian_count'].sum():.0f}")
    print(f"  Cyclists: {psm_features['cyclist_count'].sum():.0f}")
    print(
        f"  Emergency responders: {psm_features['emergency_responder_count'].sum():.0f}")

    print(f"\nFeature statistics:")
    print(psm_features[['vru_count', 'avg_vru_speed',
          'pedestrian_count', 'cyclist_count']].describe())

    print(f"\nSample records:")
    print(psm_features[['intersection', 'time_15min', 'vru_count',
          'pedestrian_count', 'cyclist_count']].head(10).to_string(index=False))
else:
    print("⚠ No PSM features generated")
    print("   PSM data may be limited or unavailable for this time period")
    print("   The Safety Index can still be computed with reduced VRU component accuracy")

In [None]:
# Phase 3.3: Aggregate safety events by 15-minute intervals
print("\n" + "=" * 80)
print("PHASE 3.3: SAFETY EVENT AGGREGATION")
print("=" * 80)


def aggregate_safety_events(intersection=None, start_date=None, end_date=None):
    """
    Aggregate safety events to 15-minute intervals with severity weighting.

    Features computed:
    - Total event count
    - VRU-involved event count (I_VRU)
    - Vehicle-only event count
    - Severity-weighted event score
    - Event type distribution

    Args:
        intersection: Specific intersection or None for all
        start_date: Start of analysis period
        end_date: End of analysis period

    Returns:
        DataFrame with 15-minute aggregated safety events
    """

    # Reuse the baseline_events collection function but aggregate differently
    events_df = collect_baseline_events(intersection, start_date, end_date)

    if len(events_df) == 0:
        print("⚠ No safety events to aggregate")
        return pd.DataFrame()

    print(f"✓ Retrieved {len(events_df)} safety events")
    print("Aggregating to 15-minute intervals...")

    # Convert event_time to datetime and create 15-minute bins
    events_df['event_time'] = pd.to_datetime(events_df['event_time'], utc=True)
    events_df['time_15min'] = events_df['event_time'].dt.floor('15min')

    # Group by intersection and 15-minute interval
    grouped = events_df.groupby(['intersection', 'time_15min'])

    aggregated = grouped.agg({
        'event_id': 'count',  # Total event count
        'is_vru_involved': 'sum',  # VRU-involved events (I_VRU)
        'severity_weight': 'sum',  # Severity-weighted score
        # Most common event type
        'event_type': lambda x: x.mode()[0] if len(x.mode()) > 0 else x.iloc[0]
    }).reset_index()

    # Rename columns
    aggregated.columns = [
        'intersection',
        'time_15min',
        'total_event_count',
        'vru_event_count',
        'severity_weighted_score',
        'dominant_event_type'
    ]

    # Calculate vehicle-only events
    aggregated['vehicle_event_count'] = aggregated['total_event_count'] - \
        aggregated['vru_event_count']

    # Add time context
    aggregated['hour_of_day'] = aggregated['time_15min'].dt.hour
    aggregated['day_of_week'] = aggregated['time_15min'].dt.dayofweek

    # Calculate I_VRU (VRU conflict intensity) per interval
    # This is the key metric for VRU Safety Index
    aggregated['I_VRU'] = aggregated['vru_event_count']

    print(f"✓ Generated {len(aggregated)} 15-minute event aggregates")

    return aggregated


# Test the function
print(
    f"\nAggregating safety events from {start_dt.date()} to {end_dt.date()}...")
aggregated_events = aggregate_safety_events(
    start_date=start_dt, end_date=end_dt)

if len(aggregated_events) > 0:
    print(f"\n{'='*80}")
    print("SAFETY EVENT AGGREGATION SUMMARY")
    print("=" * 80)
    print(f"Total 15-minute intervals with events: {len(aggregated_events)}")
    print(
        f"Intersections covered: {aggregated_events['intersection'].nunique()}")
    print(
        f"Date range: {aggregated_events['time_15min'].min()} to {aggregated_events['time_15min'].max()}")

    print(f"\nEvent statistics:")
    print(
        f"  Total events: {aggregated_events['total_event_count'].sum():.0f}")
    print(
        f"  VRU-involved: {aggregated_events['vru_event_count'].sum():.0f} ({100*aggregated_events['vru_event_count'].sum()/aggregated_events['total_event_count'].sum():.1f}%)")
    print(
        f"  Vehicle-only: {aggregated_events['vehicle_event_count'].sum():.0f}")
    print(
        f"  Total severity-weighted score: {aggregated_events['severity_weighted_score'].sum():.0f}")

    print(f"\nI_VRU (VRU conflict intensity) statistics:")
    print(aggregated_events['I_VRU'].describe())

    print(f"\nSample records:")
    print(aggregated_events[['intersection', 'time_15min', 'total_event_count',
          'vru_event_count', 'I_VRU', 'severity_weighted_score']].head(10).to_string(index=False))

    # Store I_max for normalization
    if 'NORMALIZATION_CONSTANTS' not in dir():
        NORMALIZATION_CONSTANTS = {}

    NORMALIZATION_CONSTANTS['I_max'] = aggregated_events['I_VRU'].max()

    print(f"\n{'='*80}")
    print("UPDATED NORMALIZATION CONSTANTS")
    print("=" * 80)
    print(
        f"I_max (max VRU conflict intensity): {NORMALIZATION_CONSTANTS['I_max']:.0f}")
else:
    print("⚠ No aggregated events generated")

In [None]:
# Phase 4: Create master feature table by joining all data sources
print("\n" + "=" * 80)
print("PHASE 4: MASTER FEATURE TABLE CONSTRUCTION")
print("=" * 80)


def create_master_feature_table(bsm_features, psm_features, aggregated_events,
                                vehicle_counts, vru_counts):
    """
    Join all feature sources into a unified feature table.

    Inputs:
    - bsm_features: Vehicle behavior features (speed, variance, braking)
    - psm_features: VRU features (pedestrian/cyclist counts, speed)
    - aggregated_events: Safety events aggregated to 15-min intervals
    - vehicle_counts: Vehicle exposure metrics
    - vru_counts: VRU exposure metrics

    Returns:
        Comprehensive feature table ready for index computation
    """

    print("Building master feature table...")

    # Start with BSM features as the base (most comprehensive)
    if len(bsm_features) == 0:
        print("⚠ ERROR: No BSM features available - cannot build master table")
        return pd.DataFrame()

    master = bsm_features.copy()
    print(f"  Base table (BSM features): {len(master)} records")

    # Aggregate vehicle counts to 15-minute intervals
    if len(vehicle_counts) > 0:
        vehicle_counts['time_15min'] = pd.to_datetime(
            vehicle_counts['time'], utc=True).dt.floor('15min')
        vehicle_agg = vehicle_counts.groupby(['intersection', 'time_15min'])[
            'vehicle_count'].sum().reset_index()
        vehicle_agg.rename(
            columns={'vehicle_count': 'vehicle_volume'}, inplace=True)

        # Left join to master
        master = master.merge(
            vehicle_agg, on=['intersection', 'time_15min'], how='left')
        print(f"  + Vehicle counts: {len(vehicle_agg)} records merged")
    else:
        master['vehicle_volume'] = 0
        print("  ⚠ No vehicle count data available")

    # Aggregate VRU counts to 15-minute intervals
    if len(vru_counts) > 0:
        vru_counts['time_15min'] = pd.to_datetime(
            vru_counts['time'], utc=True).dt.floor('15min')
        vru_agg = vru_counts.groupby(['intersection', 'time_15min'])[
            'vru_count'].sum().reset_index()
        vru_agg.rename(columns={'vru_count': 'vru_volume'}, inplace=True)

        # Left join to master
        master = master.merge(
            vru_agg, on=['intersection', 'time_15min'], how='left')
        print(f"  + VRU counts: {len(vru_agg)} records merged")
    else:
        master['vru_volume'] = 0
        print("  ⚠ No VRU count data available")

    # Join PSM features
    if len(psm_features) > 0:
        psm_cols = ['intersection', 'time_15min', 'vru_count', 'avg_vru_speed',
                    'pedestrian_count', 'cyclist_count']
        psm_subset = psm_features[psm_cols].copy()
        psm_subset.rename(columns={'vru_count': 'psm_vru_count'}, inplace=True)

        # Left join to master
        master = master.merge(
            psm_subset, on=['intersection', 'time_15min'], how='left')
        print(f"  + PSM features: {len(psm_features)} records merged")
    else:
        master['psm_vru_count'] = 0
        master['avg_vru_speed'] = 0
        master['pedestrian_count'] = 0
        master['cyclist_count'] = 0
        print("  ⚠ No PSM data available")

    # Join aggregated safety events
    if len(aggregated_events) > 0:
        event_cols = ['intersection', 'time_15min', 'total_event_count', 'vru_event_count',
                      'vehicle_event_count', 'severity_weighted_score', 'I_VRU']
        event_subset = aggregated_events[event_cols].copy()

        # Left join to master
        master = master.merge(
            event_subset, on=['intersection', 'time_15min'], how='left')
        print(f"  + Safety events: {len(aggregated_events)} records merged")
    else:
        master['total_event_count'] = 0
        master['vru_event_count'] = 0
        master['vehicle_event_count'] = 0
        master['severity_weighted_score'] = 0
        master['I_VRU'] = 0
        print("  ⚠ No safety event data available")

    # Fill NaN values with 0 for count/intensity metrics
    count_cols = ['vehicle_volume', 'vru_volume', 'psm_vru_count', 'pedestrian_count',
                  'cyclist_count', 'total_event_count', 'vru_event_count',
                  'vehicle_event_count', 'severity_weighted_score', 'I_VRU']

    for col in count_cols:
        if col in master.columns:
            master[col] = master[col].fillna(0)

    # Fill NaN values for continuous features with median
    continuous_cols = ['avg_vru_speed']
    for col in continuous_cols:
        if col in master.columns and master[col].notna().any():
            master[col] = master[col].fillna(master[col].median())

    print(f"\n✓ Master feature table created: {len(master)} records")
    print(f"  Columns: {len(master.columns)}")
    print(f"  Intersections: {master['intersection'].nunique()}")
    print(
        f"  Date range: {master['time_15min'].min()} to {master['time_15min'].max()}")

    return master


# Create the master feature table
print("Creating master feature table from collected data...")

# Check which data sources we have
data_sources_available = {
    'bsm_features': 'bsm_features' in dir() and len(bsm_features) > 0,
    'psm_features': 'psm_features' in dir() and len(psm_features) > 0,
    'aggregated_events': 'aggregated_events' in dir() and len(aggregated_events) > 0,
    'vehicle_counts': 'vehicle_counts' in dir() and len(vehicle_counts) > 0,
    'vru_counts': 'vru_counts' in dir() and len(vru_counts) > 0
}

print("\nData sources availability:")
for source, available in data_sources_available.items():
    status = "✓" if available else "✗"
    print(f"  {status} {source}")

if data_sources_available['bsm_features']:
    master_features = create_master_feature_table(
        bsm_features=bsm_features,
        psm_features=psm_features if data_sources_available['psm_features'] else pd.DataFrame(
        ),
        aggregated_events=aggregated_events if data_sources_available['aggregated_events'] else pd.DataFrame(
        ),
        vehicle_counts=vehicle_counts if data_sources_available['vehicle_counts'] else pd.DataFrame(
        ),
        vru_counts=vru_counts if data_sources_available['vru_counts'] else pd.DataFrame(
        )
    )

    if len(master_features) > 0:
        print(f"\n{'='*80}")
        print("MASTER FEATURE TABLE SUMMARY")
        print("=" * 80)
        print(f"\nShape: {master_features.shape}")
        print(f"\nColumn names:")
        for i, col in enumerate(master_features.columns, 1):
            print(f"  {i:2d}. {col}")

        print(f"\nKey metrics summary:")
        key_metrics = ['vehicle_count', 'vehicle_volume', 'avg_speed', 'speed_variance',
                       'hard_braking_count', 'vru_volume', 'I_VRU', 'total_event_count']
        available_metrics = [
            m for m in key_metrics if m in master_features.columns]
        print(master_features[available_metrics].describe())

        print(f"\nSample records:")
        display_cols = ['intersection', 'time_15min', 'vehicle_count', 'avg_speed',
                        'I_VRU', 'total_event_count', 'vru_volume']
        display_cols = [
            c for c in display_cols if c in master_features.columns]
        print(master_features[display_cols].head(10).to_string(index=False))
else:
    print("\n⚠ ERROR: Cannot create master feature table - BSM features are required")
    master_features = pd.DataFrame()

In [None]:
# Phase 5: Compute all normalization constants
print("\n" + "=" * 80)
print("PHASE 5: NORMALIZATION CONSTANTS")
print("=" * 80)


def compute_normalization_constants(master_features):
    """
    Compute all normalization constants from the master feature table.

    Constants computed:
    - I_max: Maximum VRU conflict intensity (events per 15-min)
    - V_max: Maximum vehicle volume per 15-min interval
    - σ_max: Maximum speed variance
    - S_ref: Reference speed (85th percentile of average speeds)
    - N_VRU_max: Maximum VRU count per 15-min interval

    Returns:
        Dictionary of normalization constants
    """

    if len(master_features) == 0:
        print("⚠ ERROR: No master features available")
        return {}

    constants = {}

    # I_max: Maximum VRU conflict intensity
    if 'I_VRU' in master_features.columns:
        constants['I_max'] = float(master_features['I_VRU'].max())
        if constants['I_max'] == 0:
            constants['I_max'] = 1.0  # Avoid division by zero
            print("  ⚠ Warning: I_max is 0, setting to 1.0 to avoid division by zero")
    else:
        constants['I_max'] = 1.0
        print("  ⚠ Warning: I_VRU not found, setting I_max to 1.0")

    # V_max: Maximum vehicle volume (use both BSM vehicle_count and detector vehicle_volume)
    v_sources = []
    if 'vehicle_count' in master_features.columns:
        v_sources.append(master_features['vehicle_count'].max())
    if 'vehicle_volume' in master_features.columns:
        v_sources.append(master_features['vehicle_volume'].max())

    constants['V_max'] = float(max(v_sources)) if v_sources else 1.0
    if constants['V_max'] == 0:
        constants['V_max'] = 1.0
        print("  ⚠ Warning: V_max is 0, setting to 1.0 to avoid division by zero")

    # σ_max: Maximum speed variance
    if 'speed_variance' in master_features.columns:
        constants['sigma_max'] = float(master_features['speed_variance'].max())
        if constants['sigma_max'] == 0:
            constants['sigma_max'] = 1.0
            print("  ⚠ Warning: sigma_max is 0, setting to 1.0 to avoid division by zero")
    else:
        constants['sigma_max'] = 1.0
        print("  ⚠ Warning: speed_variance not found, setting sigma_max to 1.0")

    # S_ref: Reference speed (85th percentile)
    if 'avg_speed' in master_features.columns:
        # Filter out zeros and NaNs for realistic speed reference
        valid_speeds = master_features['avg_speed'][master_features['avg_speed'] > 0]
        if len(valid_speeds) > 0:
            constants['S_ref'] = float(valid_speeds.quantile(0.85))
            if constants['S_ref'] == 0:
                constants['S_ref'] = 1.0
                print("  ⚠ Warning: S_ref is 0, setting to 1.0")
        else:
            constants['S_ref'] = 1.0
            print("  ⚠ Warning: No valid speeds found, setting S_ref to 1.0")
    else:
        constants['S_ref'] = 1.0
        print("  ⚠ Warning: avg_speed not found, setting S_ref to 1.0")

    # N_VRU_max: Maximum VRU count (use both PSM and detector counts)
    vru_sources = []
    if 'psm_vru_count' in master_features.columns:
        vru_sources.append(master_features['psm_vru_count'].max())
    if 'vru_volume' in master_features.columns:
        vru_sources.append(master_features['vru_volume'].max())
    if 'pedestrian_count' in master_features.columns:
        vru_sources.append(master_features['pedestrian_count'].max())
    if 'cyclist_count' in master_features.columns:
        vru_sources.append(master_features['cyclist_count'].max())

    constants['N_VRU_max'] = float(max(vru_sources)) if vru_sources else 1.0
    if constants['N_VRU_max'] == 0:
        constants['N_VRU_max'] = 1.0
        print("  ⚠ Warning: N_VRU_max is 0, setting to 1.0 to avoid division by zero")

    # Additional constants for Vehicle Index
    # Hard braking rate normalization
    if 'hard_braking_count' in master_features.columns:
        constants['hard_braking_max'] = float(
            master_features['hard_braking_count'].max())
        if constants['hard_braking_max'] == 0:
            constants['hard_braking_max'] = 1.0
    else:
        constants['hard_braking_max'] = 1.0

    # Heading change rate normalization
    if 'heading_change_rate' in master_features.columns:
        constants['heading_change_max'] = float(
            master_features['heading_change_rate'].max())
        if constants['heading_change_max'] == 0:
            constants['heading_change_max'] = 1.0
    else:
        constants['heading_change_max'] = 1.0

    return constants


# Compute normalization constants from master feature table
if 'master_features' in dir() and len(master_features) > 0:
    NORMALIZATION_CONSTANTS = compute_normalization_constants(master_features)

    print("\n✓ Normalization constants computed:")
    print("=" * 80)
    print(
        f"  I_max (max VRU conflict intensity):    {NORMALIZATION_CONSTANTS['I_max']:.2f}")
    print(
        f"  V_max (max vehicle volume):            {NORMALIZATION_CONSTANTS['V_max']:.2f}")
    print(
        f"  σ_max (max speed variance):            {NORMALIZATION_CONSTANTS['sigma_max']:.2f}")
    print(
        f"  S_ref (reference speed, 85th pct):     {NORMALIZATION_CONSTANTS['S_ref']:.2f} m/s")
    print(
        f"  N_VRU_max (max VRU count):             {NORMALIZATION_CONSTANTS['N_VRU_max']:.2f}")
    print(
        f"  hard_braking_max:                      {NORMALIZATION_CONSTANTS['hard_braking_max']:.2f}")
    print(
        f"  heading_change_max:                    {NORMALIZATION_CONSTANTS['heading_change_max']:.2f}")

    print(f"\n{'='*80}")
    print("These constants will be used to normalize features in the Safety Index formulas.")
    print("In production, these should be periodically recalibrated (e.g., monthly).")
    print("=" * 80)
else:
    print("⚠ ERROR: Master feature table not available - cannot compute constants")
    NORMALIZATION_CONSTANTS = {}

## Phase 6: Safety Index Computation

Implement the VRU, Vehicle, and Combined Safety Index formulas from the checkpoint document.

**Formulas:**

- **VRU Index** = 100 × [0.4×(I_VRU/I_max) + 0.2×(V/V_max) + 0.2×(S/S_ref) + 0.2×(σ_S/σ_max)]
- **Vehicle Index** = 100 × [0.3×(I_vehicle/I_max) + 0.3×(V/V_max) + 0.2×(σ_S/σ_max) + 0.2×(hard_braking_rate)]
- **Combined Index** = 0.6×VRU_Index + 0.4×Vehicle_Index


In [None]:
# Phase 6: Implement Safety Index formulas
print("\n" + "=" * 80)
print("PHASE 6: SAFETY INDEX COMPUTATION")
print("=" * 80)


def compute_safety_indices(master_features, norm_constants):
    """
    Compute VRU, Vehicle, and Combined Safety Indices for each 15-minute interval.

    Formulas from checkpoint document:
    - VRU Index = 100 × [0.4×(I_VRU/I_max) + 0.2×(V/V_max) + 0.2×(S/S_ref) + 0.2×(σ_S/σ_max)]
    - Vehicle Index = 100 × [0.3×(I_vehicle/I_max) + 0.3×(V/V_max) + 0.2×(σ_S/σ_max) + 0.2×(hard_braking)]
    - Combined Index = 0.6×VRU_Index + 0.4×Vehicle_Index

    Args:
        master_features: Master feature table with all metrics
        norm_constants: Dictionary of normalization constants

    Returns:
        DataFrame with computed indices added
    """

    if len(master_features) == 0:
        print("⚠ ERROR: No master features available")
        return pd.DataFrame()

    if not norm_constants:
        print("⚠ ERROR: No normalization constants available")
        return master_features

    df = master_features.copy()

    # Extract normalization constants
    I_max = norm_constants.get('I_max', 1.0)
    V_max = norm_constants.get('V_max', 1.0)
    sigma_max = norm_constants.get('sigma_max', 1.0)
    S_ref = norm_constants.get('S_ref', 1.0)
    N_VRU_max = norm_constants.get('N_VRU_max', 1.0)
    hard_braking_max = norm_constants.get('hard_braking_max', 1.0)

    print("Computing normalized components...")

    # ========== VRU Safety Index Components ==========

    # Component 1: VRU conflict intensity (I_VRU/I_max)
    df['I_VRU_norm'] = df['I_VRU'] / I_max if I_max > 0 else 0

    # Component 2: Vehicle volume exposure (V/V_max)
    # Use vehicle_count from BSM as primary, fall back to vehicle_volume from detectors
    df['V'] = df['vehicle_count'].fillna(0)
    if 'vehicle_volume' in df.columns:
        df['V'] = df['V'].combine_first(df['vehicle_volume'])
    df['V_norm'] = df['V'] / V_max if V_max > 0 else 0

    # Component 3: Speed factor (S/S_ref)
    df['S_norm'] = df['avg_speed'] / S_ref if S_ref > 0 else 0

    # Component 4: Speed variance (σ_S/σ_max)
    df['sigma_norm'] = df['speed_variance'] / sigma_max if sigma_max > 0 else 0

    # Compute VRU Safety Index
    df['VRU_Index'] = 100 * (
        0.4 * df['I_VRU_norm'] +
        0.2 * df['V_norm'] +
        0.2 * df['S_norm'] +
        0.2 * df['sigma_norm']
    )

    # Cap at 100
    df['VRU_Index'] = df['VRU_Index'].clip(0, 100)

    print("  ✓ VRU Safety Index computed")

    # ========== Vehicle Safety Index Components ==========

    # Component 1: Vehicle-vehicle conflict intensity
    if 'vehicle_event_count' in df.columns:
        df['I_vehicle'] = df['vehicle_event_count']
    else:
        # Fall back to total events minus VRU events
        df['I_vehicle'] = df.get('total_event_count', 0) - \
            df.get('vru_event_count', 0)

    df['I_vehicle_norm'] = df['I_vehicle'] / I_max if I_max > 0 else 0

    # Component 2: Vehicle volume (same as VRU index)
    # Already computed as V_norm

    # Component 3: Speed variance (same as VRU index)
    # Already computed as sigma_norm

    # Component 4: Hard braking rate
    if 'hard_braking_count' in df.columns:
        df['hard_braking_norm'] = df['hard_braking_count'] / \
            hard_braking_max if hard_braking_max > 0 else 0
    else:
        df['hard_braking_norm'] = 0

    # Compute Vehicle Safety Index
    df['Vehicle_Index'] = 100 * (
        0.3 * df['I_vehicle_norm'] +
        0.3 * df['V_norm'] +
        0.2 * df['sigma_norm'] +
        0.2 * df['hard_braking_norm']
    )

    # Cap at 100
    df['Vehicle_Index'] = df['Vehicle_Index'].clip(0, 100)

    print("  ✓ Vehicle Safety Index computed")

    # ========== Combined Safety Index ==========

    df['Combined_Index'] = (
        0.6 * df['VRU_Index'] +
        0.4 * df['Vehicle_Index']
    )

    # Cap at 100
    df['Combined_Index'] = df['Combined_Index'].clip(0, 100)

    print("  ✓ Combined Safety Index computed")

    return df


# Compute safety indices
if 'master_features' in dir() and len(master_features) > 0 and NORMALIZATION_CONSTANTS:
    print("Computing safety indices for all 15-minute intervals...")

    indices_df = compute_safety_indices(
        master_features, NORMALIZATION_CONSTANTS)

    if len(indices_df) > 0:
        print(f"\n{'='*80}")
        print("SAFETY INDEX SUMMARY")
        print("=" * 80)

        print(f"\nVRU Safety Index statistics:")
        print(indices_df['VRU_Index'].describe())

        print(f"\nVehicle Safety Index statistics:")
        print(indices_df['Vehicle_Index'].describe())

        print(f"\nCombined Safety Index statistics:")
        print(indices_df['Combined_Index'].describe())

        # Show distribution by intersection
        print(f"\n{'='*80}")
        print("AVERAGE SAFETY INDICES BY INTERSECTION")
        print("=" * 80)

        intersection_summary = indices_df.groupby('intersection').agg({
            'VRU_Index': ['mean', 'std', 'max'],
            'Vehicle_Index': ['mean', 'std', 'max'],
            'Combined_Index': ['mean', 'std', 'max'],
            'time_15min': 'count'
        }).round(2)

        intersection_summary.columns = [
            '_'.join(col).strip() for col in intersection_summary.columns.values]
        intersection_summary.rename(
            columns={'time_15min_count': 'num_intervals'}, inplace=True)

        print(intersection_summary.to_string())

        # Show highest risk intervals
        print(f"\n{'='*80}")
        print("TOP 10 HIGHEST RISK INTERVALS (Combined Index)")
        print("=" * 80)

        top_risk = indices_df.nlargest(10, 'Combined_Index')[
            ['intersection', 'time_15min', 'VRU_Index', 'Vehicle_Index', 'Combined_Index',
             'I_VRU', 'vehicle_count', 'avg_speed', 'speed_variance']
        ]
        print(top_risk.to_string(index=False))

        # Show safest intervals
        print(f"\n{'='*80}")
        print("TOP 10 SAFEST INTERVALS (Combined Index)")
        print("=" * 80)

        safest = indices_df.nsmallest(10, 'Combined_Index')[
            ['intersection', 'time_15min', 'VRU_Index',
                'Vehicle_Index', 'Combined_Index']
        ]
        print(safest.to_string(index=False))

        # Time-of-day analysis
        print(f"\n{'='*80}")
        print("AVERAGE SAFETY INDICES BY HOUR OF DAY")
        print("=" * 80)

        hourly_summary = indices_df.groupby('hour_of_day').agg({
            'VRU_Index': 'mean',
            'Vehicle_Index': 'mean',
            'Combined_Index': 'mean'
        }).round(2)

        print(hourly_summary.to_string())

else:
    print("⚠ ERROR: Cannot compute safety indices - missing master features or normalization constants")
    indices_df = pd.DataFrame()

In [None]:
# Phase 7: Empirical Bayes stabilization
print("\n" + "=" * 80)
print("PHASE 7: EMPIRICAL BAYES STABILIZATION")
print("=" * 80)


def apply_empirical_bayes(indices_df, baseline_events, lambda_param=None):
    """
    Apply Empirical Bayes adjustment to safety indices.

    Adjusts raw indices based on:
    1. Historical baseline risk at the intersection
    2. Reliability of the current estimate (based on sample size)

    Formula: Adjusted_Index = λ × Raw_Index + (1-λ) × Baseline_Index

    Where λ = N / (N + k), with:
    - N = number of observations in current period
    - k = tuning parameter (default: 50 for 15-min intervals)

    Args:
        indices_df: DataFrame with computed raw indices
        baseline_events: Historical safety events for baseline calculation
        lambda_param: Fixed lambda value (0-1), or None for adaptive lambda

    Returns:
        DataFrame with EB-adjusted indices added
    """

    if len(indices_df) == 0:
        print("⚠ No indices to adjust")
        return indices_df

    df = indices_df.copy()

    # Calculate historical baseline indices by intersection
    print("Computing baseline risk scores from historical data...")

    if len(baseline_events) > 0:
        # Calculate baseline severity-weighted event rate by intersection and time-of-day
        baseline_events['hour_of_day'] = baseline_events['hour_of_day'].astype(
            int)

        baseline_summary = baseline_events.groupby(['intersection', 'hour_of_day']).agg({
            'severity_weight': 'sum',
            'event_id': 'count',
            'is_vru_involved': 'sum'
        }).reset_index()

        baseline_summary.rename(columns={
            'severity_weight': 'baseline_severity',
            'event_id': 'baseline_event_count',
            'is_vru_involved': 'baseline_vru_count'
        }, inplace=True)

        # Merge baseline with current data
        df['hour_of_day'] = df['hour_of_day'].astype(int)
        df = df.merge(baseline_summary, on=[
                      'intersection', 'hour_of_day'], how='left')

        # Fill missing baselines with intersection average
        intersection_avg = baseline_summary.groupby('intersection').agg({
            'baseline_severity': 'mean',
            'baseline_event_count': 'mean',
            'baseline_vru_count': 'mean'
        })

        for col in ['baseline_severity', 'baseline_event_count', 'baseline_vru_count']:
            df[col] = df.apply(
                lambda row: intersection_avg.loc[row['intersection'], col]
                if pd.isna(row[col]) and row['intersection'] in intersection_avg.index
                else row[col],
                axis=1
            )

        # Fill any remaining NaNs with global mean
        df['baseline_severity'] = df['baseline_severity'].fillna(
            baseline_summary['baseline_severity'].mean())
        df['baseline_event_count'] = df['baseline_event_count'].fillna(
            baseline_summary['baseline_event_count'].mean())
        df['baseline_vru_count'] = df['baseline_vru_count'].fillna(
            baseline_summary['baseline_vru_count'].mean())

        print(
            f"  ✓ Baseline computed for {len(baseline_summary)} intersection-hour combinations")
    else:
        print("  ⚠ No baseline data available - using global mean")
        df['baseline_severity'] = df['severity_weighted_score'].mean(
        ) if 'severity_weighted_score' in df.columns else 0
        df['baseline_event_count'] = df['total_event_count'].mean(
        ) if 'total_event_count' in df.columns else 0
        df['baseline_vru_count'] = df['vru_event_count'].mean(
        ) if 'vru_event_count' in df.columns else 0

    # Calculate lambda (weight for raw vs baseline)
    print("\nCalculating Empirical Bayes weights...")

    if lambda_param is not None:
        # Use fixed lambda
        df['lambda'] = lambda_param
        print(f"  Using fixed lambda = {lambda_param}")
    else:
        # Adaptive lambda based on sample size
        # Lambda = N / (N + k), where k is a tuning parameter
        k = 50  # Tuning parameter: higher k = more shrinkage toward baseline

        # Use vehicle count as proxy for sample size (more vehicles = more reliable estimate)
        df['N'] = df['vehicle_count'].fillna(
            0) + 1  # Add 1 to avoid division by zero
        df['lambda'] = df['N'] / (df['N'] + k)

        print(f"  Using adaptive lambda with k={k}")
        print(
            f"  Lambda range: {df['lambda'].min():.3f} to {df['lambda'].max():.3f}")
        print(f"  Mean lambda: {df['lambda'].mean():.3f}")

    # Convert baseline to index scale (approximate)
    # Use baseline event count relative to maximum as a simple baseline index
    max_baseline = df['baseline_event_count'].max()
    df['baseline_index'] = (df['baseline_event_count'] /
                            max_baseline * 100) if max_baseline > 0 else 20.0
    df['baseline_index'] = df['baseline_index'].fillna(
        20.0)  # Default baseline = 20 (low-medium risk)

    # Apply Empirical Bayes adjustment
    print("\nApplying Empirical Bayes adjustment...")

    df['VRU_Index_EB'] = df['lambda'] * df['VRU_Index'] + \
        (1 - df['lambda']) * df['baseline_index']
    df['Vehicle_Index_EB'] = df['lambda'] * df['Vehicle_Index'] + \
        (1 - df['lambda']) * df['baseline_index']
    df['Combined_Index_EB'] = df['lambda'] * df['Combined_Index'] + \
        (1 - df['lambda']) * df['baseline_index']

    # Cap at 100
    df['VRU_Index_EB'] = df['VRU_Index_EB'].clip(0, 100)
    df['Vehicle_Index_EB'] = df['Vehicle_Index_EB'].clip(0, 100)
    df['Combined_Index_EB'] = df['Combined_Index_EB'].clip(0, 100)

    print("  ✓ Empirical Bayes adjustment applied")

    # Calculate adjustment magnitude
    df['EB_adjustment'] = df['Combined_Index'] - df['Combined_Index_EB']

    print(f"\n  Adjustment statistics:")
    print(f"    Mean adjustment: {df['EB_adjustment'].mean():.2f}")
    print(f"    Std adjustment: {df['EB_adjustment'].std():.2f}")
    print(f"    Max upward adjustment: {df['EB_adjustment'].min():.2f}")
    print(f"    Max downward adjustment: {df['EB_adjustment'].max():.2f}")

    return df


# Apply Empirical Bayes stabilization
if 'indices_df' in dir() and len(indices_df) > 0:
    print("Applying Empirical Bayes stabilization to safety indices...")

    baseline_data = baseline_events if 'baseline_events' in dir() and len(
        baseline_events) > 0 else pd.DataFrame()

    indices_df_eb = apply_empirical_bayes(
        indices_df, baseline_data, lambda_param=None)

    if len(indices_df_eb) > 0:
        print(f"\n{'='*80}")
        print("EMPIRICAL BAYES ADJUSTED INDICES SUMMARY")
        print("=" * 80)

        print(f"\nVRU Safety Index (EB-adjusted):")
        print(indices_df_eb['VRU_Index_EB'].describe())

        print(f"\nVehicle Safety Index (EB-adjusted):")
        print(indices_df_eb['Vehicle_Index_EB'].describe())

        print(f"\nCombined Safety Index (EB-adjusted):")
        print(indices_df_eb['Combined_Index_EB'].describe())

        # Compare raw vs EB-adjusted
        print(f"\n{'='*80}")
        print("COMPARISON: RAW vs EB-ADJUSTED INDICES")
        print("=" * 80)

        comparison = pd.DataFrame({
            'Metric': ['VRU Index', 'Vehicle Index', 'Combined Index'],
            'Raw Mean': [
                indices_df_eb['VRU_Index'].mean(),
                indices_df_eb['Vehicle_Index'].mean(),
                indices_df_eb['Combined_Index'].mean()
            ],
            'EB Mean': [
                indices_df_eb['VRU_Index_EB'].mean(),
                indices_df_eb['Vehicle_Index_EB'].mean(),
                indices_df_eb['Combined_Index_EB'].mean()
            ],
            'Raw Std': [
                indices_df_eb['VRU_Index'].std(),
                indices_df_eb['Vehicle_Index'].std(),
                indices_df_eb['Combined_Index'].std()
            ],
            'EB Std': [
                indices_df_eb['VRU_Index_EB'].std(),
                indices_df_eb['Vehicle_Index_EB'].std(),
                indices_df_eb['Combined_Index_EB'].std()
            ]
        })

        print(comparison.round(2).to_string(index=False))

        print(f"\n{'='*80}")
        print("IMPACT OF EMPIRICAL BAYES ADJUSTMENT")
        print("=" * 80)
        print("Note: EB adjustment typically:")
        print("  • Reduces variance (more stable estimates)")
        print("  • Shrinks extreme values toward baseline")
        print("  • Improves prediction accuracy for low-sample situations")
        print("=" * 80)

else:
    print("⚠ ERROR: No indices available for Empirical Bayes adjustment")
    indices_df_eb = pd.DataFrame()

## Phase 8: Validation & Next Steps

### Validation Approaches

**1. Predictive Validation**

- Split data into training (historical) and testing (recent) periods
- Test if high-index intervals predict future safety events
- Metrics: ROC-AUC, precision-recall curves

**2. Sensitivity Analysis**

- Vary formula weights (e.g., VRU vs Vehicle Index contributions)
- Test different Empirical Bayes k parameters
- Assess robustness to normalization constant changes

**3. Expert Review**

- Present high-risk intervals to traffic safety experts
- Validate against known problematic intersections
- Incorporate domain knowledge for weight calibration

**4. Comparison with Baseline Methods**

- Compare against simple crash rate
- Benchmark against national safety metrics (e.g., USDOT FAST tool)

### Production Deployment Checklist

**Backend (FastAPI)**

- [ ] Refactor data collection functions into API endpoints
- [ ] Implement PostgreSQL schema for feature storage
- [ ] Set up Cloud Scheduler for 15-minute triggers
- [ ] Add caching layer (Redis) for normalization constants
- [ ] Implement error handling and data quality checks
- [ ] Add logging and monitoring (Cloud Logging)

**Frontend (Streamlit)**

- [ ] Create interactive map visualization (Folium/Pydeck)
- [ ] Build real-time dashboard with index charts
- [ ] Add intersection comparison tool
- [ ] Implement time-series plots and trend analysis
- [ ] Add download functionality for reports

**Infrastructure**

- [ ] Deploy to Google Cloud Run (auto-scaling)
- [ ] Set up Cloud SQL PostgreSQL with PostGIS
- [ ] Configure VPC for secure Trino access
- [ ] Implement CI/CD pipeline (GitHub Actions)
- [ ] Set up monitoring and alerting

### Calibration & Tuning

**Normalization Constants**: Recalibrate monthly using rolling 3-6 month window

**Empirical Bayes k parameter**: Tune via cross-validation

- Lower k (e.g., 20) → more responsive to recent data
- Higher k (e.g., 100) → more stable, less noisy

**Formula Weights**: Consider intersection-specific tuning

- High-speed corridors → increase speed variance weight
- School zones → increase VRU component weight

### Future Enhancements

1. **Predictive Modeling**: Train ML model to forecast next-interval risk
2. **Weather Integration**: Incorporate precipitation, visibility data
3. **Event Detection**: Real-time alerting for extreme index values
4. **Mobile App**: Push notifications for high-risk conditions
5. **Policy Evaluation**: Before/after analysis for interventions


In [None]:
# ============================================================================
# COMPLETE END-TO-END WORKFLOW: Safety Index Computation
# ============================================================================

import numpy as np
import pandas as pd
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')


# ============================================================================
# CONFIGURATION
# ============================================================================

DATE_RANGE_DAYS = 7              # Number of days to analyze
# None = all intersections, or specify like "glebe-potomac"
TARGET_INTERSECTION = None
EXPORT_RESULTS = True            # Set to False to skip CSV export
EXPORT_PATH = None               # Auto-generate if None
CREATE_VISUALIZATIONS = True     # Set to False to skip plots

# Time range
end_dt = datetime.now()
start_dt = end_dt - timedelta(days=DATE_RANGE_DAYS)

print("=" * 80)
print("VIRGINIA TRANSPORTATION SAFETY INDEX - END-TO-END WORKFLOW")
print("=" * 80)
print(f"\nConfiguration:")
print(
    f"  Date range: {start_dt.date()} to {end_dt.date()} ({DATE_RANGE_DAYS} days)")
print(f"  Target intersection: {TARGET_INTERSECTION or 'ALL'}")
print(f"  Export results: {EXPORT_RESULTS}")
print(f"  Create visualizations: {CREATE_VISUALIZATIONS}")
print("\n" + "=" * 80)

# ============================================================================
# PHASE 1: DATA QUALITY ASSESSMENT
# ============================================================================

print("\n[1/7] Assessing data quality...")

try:
    tables_to_check = [
        ('alexandria', 'bsm', 'publish_timestamp'),
        ('alexandria', 'psm', 'publish_timestamp'),
        ('alexandria', 'safety-event', 'time_at_site'),
        ('alexandria', 'vehicle-count', 'publish_timestamp'),
        ('alexandria', 'vru-count', 'publish_timestamp'),
    ]

    quality_results = []
    for schema, table, ts_col in tables_to_check:
        result = check_table_coverage(schema, table, ts_col)
        quality_results.append(result)

    print(f"  ✓ Quality assessment complete")
except Exception as e:
    print(f"  ⚠ Warning: Quality assessment failed: {e}")

# ============================================================================
# PHASE 2: HISTORICAL BASELINE CONSTRUCTION
# ============================================================================

print("\n[2/7] Collecting historical baseline data...")

try:
    # Collect severity-weighted safety events
    baseline_events_wf = collect_baseline_events(
        intersection=TARGET_INTERSECTION,
        start_date=start_dt,
        end_date=end_dt
    )

    # Collect exposure metrics
    vehicle_counts_wf, vru_counts_wf = collect_exposure_metrics(
        intersection=TARGET_INTERSECTION,
        start_date=start_dt,
        end_date=end_dt
    )

    print(
        f"  ✓ Baseline: {len(baseline_events_wf)} events, {len(vehicle_counts_wf)} vehicle counts, {len(vru_counts_wf)} VRU counts")
except Exception as e:
    print(f"  ✗ Error: Baseline collection failed: {e}")
    raise

# ============================================================================
# PHASE 3: FEATURE ENGINEERING
# ============================================================================

print("\n[3/7] Engineering features from BSM, PSM, and safety events...")

try:
    # BSM-derived vehicle features
    bsm_features_wf = collect_bsm_features(
        intersection=TARGET_INTERSECTION,
        start_date=start_dt,
        end_date=end_dt
    )

    # PSM-derived VRU features
    psm_features_wf = collect_psm_features(
        intersection=TARGET_INTERSECTION,
        start_date=start_dt,
        end_date=end_dt
    )

    # Aggregate safety events
    aggregated_events_wf = aggregate_safety_events(
        intersection=TARGET_INTERSECTION,
        start_date=start_dt,
        end_date=end_dt
    )

    print(f"  ✓ Features: {len(bsm_features_wf)} BSM intervals, {len(psm_features_wf)} PSM intervals, {len(aggregated_events_wf)} event intervals")
except Exception as e:
    print(f"  ✗ Error: Feature engineering failed: {e}")
    raise

# ============================================================================
# PHASE 4: MASTER FEATURE TABLE
# ============================================================================

print("\n[4/7] Creating master feature table...")

try:
    master_features_wf = create_master_feature_table(
        bsm_features=bsm_features_wf,
        psm_features=psm_features_wf,
        aggregated_events=aggregated_events_wf,
        vehicle_counts=vehicle_counts_wf,
        vru_counts=vru_counts_wf
    )

    print(
        f"  ✓ Master table: {len(master_features_wf)} 15-minute intervals across {master_features_wf['intersection'].nunique()} intersections")
except Exception as e:
    print(f"  ✗ Error: Master table creation failed: {e}")
    raise

# ============================================================================
# PHASE 5: NORMALIZATION CONSTANTS
# ============================================================================

print("\n[5/7] Computing normalization constants...")

try:
    norm_constants_wf = compute_normalization_constants(master_features_wf)

    print(
        f"  ✓ Constants: I_max={norm_constants_wf['I_max']:.1f}, V_max={norm_constants_wf['V_max']:.1f}, σ_max={norm_constants_wf['sigma_max']:.2f}")
except Exception as e:
    print(f"  ✗ Error: Normalization computation failed: {e}")
    raise

# ============================================================================
# PHASE 6: SAFETY INDEX COMPUTATION
# ============================================================================

print("\n[6/7] Computing safety indices...")

try:
    indices_wf = compute_safety_indices(master_features_wf, norm_constants_wf)

    print(
        f"  ✓ Indices computed: Mean Combined Index = {indices_wf['Combined_Index'].mean():.1f}")
except Exception as e:
    print(f"  ✗ Error: Index computation failed: {e}")
    raise

# ============================================================================
# PHASE 7: EMPIRICAL BAYES STABILIZATION
# ============================================================================

print("\n[7/7] Applying Empirical Bayes stabilization...")

try:
    indices_eb_wf = apply_empirical_bayes(
        indices_wf, baseline_events_wf, lambda_param=None)

    print(
        f"  ✓ EB adjustment applied: Mean EB Combined Index = {indices_eb_wf['Combined_Index_EB'].mean():.1f}")
except Exception as e:
    print(f"  ✗ Error: Empirical Bayes adjustment failed: {e}")
    raise

# ============================================================================
# RESULTS SUMMARY
# ============================================================================

print("\n" + "=" * 80)
print("RESULTS SUMMARY")
print("=" * 80)

print(f"\n📊 Data Coverage:")
print(f"  Total 15-minute intervals: {len(indices_eb_wf)}")
print(f"  Intersections analyzed: {indices_eb_wf['intersection'].nunique()}")
print(
    f"  Date range: {indices_eb_wf['time_15min'].min()} to {indices_eb_wf['time_15min'].max()}")

print(f"\n🎯 Safety Index Statistics (EB-Adjusted):")
print(f"  Combined Index:")
print(f"    Mean: {indices_eb_wf['Combined_Index_EB'].mean():.2f}")
print(f"    Std:  {indices_eb_wf['Combined_Index_EB'].std():.2f}")
print(f"    Min:  {indices_eb_wf['Combined_Index_EB'].min():.2f}")
print(f"    Max:  {indices_eb_wf['Combined_Index_EB'].max():.2f}")

print(f"\n  VRU Index:")
print(f"    Mean: {indices_eb_wf['VRU_Index_EB'].mean():.2f}")
print(f"    Max:  {indices_eb_wf['VRU_Index_EB'].max():.2f}")

print(f"\n  Vehicle Index:")
print(f"    Mean: {indices_eb_wf['Vehicle_Index_EB'].mean():.2f}")
print(f"    Max:  {indices_eb_wf['Vehicle_Index_EB'].max():.2f}")

# Top 10 highest risk intervals
print(f"\n⚠️  TOP 10 HIGHEST RISK INTERVALS:")
top_risk = indices_eb_wf.nlargest(10, 'Combined_Index_EB')[
    ['intersection', 'time_15min', 'Combined_Index_EB',
        'VRU_Index_EB', 'Vehicle_Index_EB', 'I_VRU', 'vehicle_count']
]
print(top_risk.to_string(index=False))

# ============================================================================
# EXPORT RESULTS
# ============================================================================

if EXPORT_RESULTS:
    print(f"\n📁 Exporting results...")

    if EXPORT_PATH is None:
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        EXPORT_PATH = f'safety_indices_{timestamp}.csv'

    # Select columns for export
    export_cols = [
        'intersection', 'time_15min', 'hour_of_day', 'day_of_week',
        'Combined_Index_EB', 'VRU_Index_EB', 'Vehicle_Index_EB',
        'Combined_Index', 'VRU_Index', 'Vehicle_Index',
        'vehicle_count', 'avg_speed', 'speed_variance', 'hard_braking_count',
        'I_VRU', 'total_event_count', 'vru_event_count', 'vehicle_event_count',
        'vru_volume', 'vehicle_volume'
    ]

    # Only include columns that exist
    export_cols = [c for c in export_cols if c in indices_eb_wf.columns]

    indices_eb_wf[export_cols].to_csv(EXPORT_PATH, index=False)
    print(f"  ✓ Results exported to: {EXPORT_PATH}")

# ============================================================================
# VISUALIZATIONS
# ============================================================================

if CREATE_VISUALIZATIONS:
    print(f"\n📈 Creating visualizations...")

    try:
        import matplotlib.pyplot as plt
        import seaborn as sns

        # 1. Time series plot
        fig, axes = plt.subplots(3, 1, figsize=(14, 10))

        # If analyzing specific intersection, show detailed view
        if TARGET_INTERSECTION:
            data = indices_eb_wf[indices_eb_wf['intersection']
                                 == TARGET_INTERSECTION].sort_values('time_15min')
        else:
            # Show average across all intersections
            data = indices_eb_wf.groupby('time_15min').agg({
                'Combined_Index_EB': 'mean',
                'VRU_Index_EB': 'mean',
                'Vehicle_Index_EB': 'mean'
            }).reset_index().sort_values('time_15min')

        # Combined Index
        axes[0].plot(data['time_15min'], data['Combined_Index_EB'],
                     linewidth=2, color='purple')
        axes[0].axhline(y=50, color='orange', linestyle='--',
                        alpha=0.5, label='Medium Risk')
        axes[0].axhline(y=70, color='red', linestyle='--',
                        alpha=0.5, label='High Risk')
        axes[0].set_ylabel('Combined Index')
        axes[0].set_title(
            f'Safety Indices Over Time - {TARGET_INTERSECTION or "All Intersections (Average)"}')
        axes[0].grid(True, alpha=0.3)
        axes[0].legend()

        # VRU Index
        axes[1].plot(data['time_15min'], data['VRU_Index_EB'],
                     linewidth=2, color='blue')
        axes[1].axhline(y=50, color='orange', linestyle='--', alpha=0.5)
        axes[1].axhline(y=70, color='red', linestyle='--', alpha=0.5)
        axes[1].set_ylabel('VRU Index')
        axes[1].grid(True, alpha=0.3)

        # Vehicle Index
        axes[2].plot(data['time_15min'], data['Vehicle_Index_EB'],
                     linewidth=2, color='green')
        axes[2].axhline(y=50, color='orange', linestyle='--', alpha=0.5)
        axes[2].axhline(y=70, color='red', linestyle='--', alpha=0.5)
        axes[2].set_ylabel('Vehicle Index')
        axes[2].set_xlabel('Time')
        axes[2].grid(True, alpha=0.3)

        plt.xticks(rotation=45)
        plt.tight_layout()
        plt.show()

        # 2. Heatmap: Average index by hour and day of week
        fig, ax = plt.subplots(figsize=(10, 8))
        pivot = indices_eb_wf.groupby(['hour_of_day', 'day_of_week'])[
            'Combined_Index_EB'].mean().reset_index()
        heatmap_data = pivot.pivot(
            index='hour_of_day', columns='day_of_week', values='Combined_Index_EB')

        sns.heatmap(heatmap_data, cmap='RdYlGn_r', annot=True, fmt='.1f',
                    xticklabels=['Mon', 'Tue', 'Wed',
                                 'Thu', 'Fri', 'Sat', 'Sun'],
                    yticklabels=range(24), cbar_kws={'label': 'Combined Index'})
        plt.title('Average Safety Index by Hour and Day of Week')
        plt.xlabel('Day of Week')
        plt.ylabel('Hour of Day')
        plt.tight_layout()
        plt.show()

        # 3. Distribution histogram
        fig, axes = plt.subplots(1, 3, figsize=(15, 4))

        for idx, (col, ax, color) in enumerate(zip(
            ['Combined_Index_EB', 'VRU_Index_EB', 'Vehicle_Index_EB'],
            axes,
            ['purple', 'blue', 'green']
        )):
            ax.hist(indices_eb_wf[col], bins=30,
                    edgecolor='black', alpha=0.7, color=color)
            mean_val = indices_eb_wf[col].mean()
            ax.axvline(mean_val, color='red', linestyle='--',
                       linewidth=2, label=f'Mean: {mean_val:.1f}')
            ax.set_xlabel('Index Value')
            ax.set_ylabel('Frequency')
            ax.set_title(col.replace('_EB', '').replace('_', ' '))
            ax.legend()
            ax.grid(True, alpha=0.3)

        plt.tight_layout()
        plt.show()

        print(f"  ✓ Visualizations created")

    except ImportError:
        print(f"  ⚠ Matplotlib not available - skipping visualizations")
    except Exception as e:
        print(f"  ⚠ Visualization error: {e}")

# ============================================================================
# COMPLETION
# ============================================================================

print("\n" + "=" * 80)
print("✓ WORKFLOW COMPLETE!")
print("=" * 80)
print(f"\nNext steps:")
print(
    f"  1. Review the exported CSV: {EXPORT_PATH if EXPORT_RESULTS else '(export disabled)'}")
print(f"  2. Analyze high-risk intervals for intervention opportunities")
print(f"  3. Compare indices across intersections to prioritize resources")
print(f"  4. Use for real-time monitoring or policy evaluation")
print("\n" + "=" * 80)

## 🚀 Complete End-to-End Workflow

Run this cell to execute the entire safety index computation pipeline with one click.

**Configurable parameters:**

- `DATE_RANGE_DAYS`: Number of days to analyze (default: 7)
- `TARGET_INTERSECTION`: Specific intersection or `None` for all (default: None)
- `EXPORT_PATH`: Where to save results CSV (default: auto-generated filename)


## Phase 7: Empirical Bayes Stabilization

Apply Empirical Bayes adjustment to reduce noise in low-sample-size situations.

**Formula:** `Adjusted_Index = λ × Raw_Index + (1-λ) × Baseline_Index`

Where λ depends on sample size (more data → higher λ → trust raw index more)


## Phase 5: Normalization Constants

Compute all normalization constants required for the Safety Index formulas.

These constants scale raw metrics to [0, 1] range before combining into the final index.


## Phase 4: Master Feature Table Construction

Join all feature sources into a single unified table aligned on 15-minute intervals.

**This becomes the core data model in PostgreSQL for the production system.**


## Phase 3: Feature Engineering

Extract and aggregate features from BSM, PSM, and safety event data at 15-minute intervals.

**This becomes the core data processing pipeline in the FastAPI backend.**


## Phase 2: Historical Baseline Construction

Collect severity-weighted crash data to establish baseline risk scores for Empirical Bayes stabilization.

**Note**: This prototype will become the `build_baseline()` function in the FastAPI backend.


In [None]:
# Phase 1.1: Check temporal coverage and data availability for each key table
import pandas as pd
from datetime import datetime

print("=" * 80)
print("PHASE 1: DATA QUALITY ASSESSMENT")
print("=" * 80)

# Function to safely check table coverage


def check_table_coverage(schema, table, timestamp_col='publish_timestamp'):
    """
    Check temporal coverage and record counts for a table.
    Handles both bigint timestamps and actual timestamp columns.
    """
    try:
        # Try with publish_timestamp first
        query = f"""
        SELECT 
            COUNT(*) as total_records,
            COUNT(DISTINCT intersection) as num_intersections,
            MIN({timestamp_col}) as earliest_ts,
            MAX({timestamp_col}) as latest_ts
        FROM {schema}."{table}"
        WHERE {timestamp_col} > 0 AND {timestamp_col} < 9999999999999999
        """

        cur.execute(query)
        result = cur.fetchone()

        if result and result[0] > 0:
            # Convert timestamps if they're bigints
            try:
                earliest = datetime.fromtimestamp(
                    result[2] / 1000) if result[2] and result[2] > 999999999999 else result[2]
                latest = datetime.fromtimestamp(
                    result[3] / 1000) if result[3] and result[3] > 999999999999 else result[3]
            except:
                earliest = result[2]
                latest = result[3]

            return {
                'schema': schema,
                'table': table,
                'total_records': result[0],
                'num_intersections': result[1],
                'earliest': earliest,
                'latest': latest,
                'status': 'OK'
            }
        else:
            return {'schema': schema, 'table': table, 'status': 'EMPTY'}

    except Exception as e:
        return {'schema': schema, 'table': table, 'status': f'ERROR: {str(e)[:100]}'}


# Check all key tables
tables_to_check = [
    ('alexandria', 'bsm', 'publish_timestamp'),
    ('alexandria', 'psm', 'publish_timestamp'),
    # Note: using time_at_site not publish_timestamp
    ('alexandria', 'safety-event', 'time_at_site'),
    ('alexandria', 'vehicle-count', 'publish_timestamp'),
    ('alexandria', 'vru-count', 'publish_timestamp'),
]

coverage_results = []
for schema, table, ts_col in tables_to_check:
    print(f"\nChecking {schema}.{table}...")
    result = check_table_coverage(schema, table, ts_col)
    coverage_results.append(result)

    if result['status'] == 'OK':
        print(f"  ✓ Records: {result['total_records']:,}")
        print(f"  ✓ Intersections: {result['num_intersections']}")
        print(f"  ✓ Date range: {result['earliest']} to {result['latest']}")
    else:
        print(f"  ✗ Status: {result['status']}")

# Create summary DataFrame
coverage_df = pd.DataFrame(coverage_results)
print("\n" + "=" * 80)
print("COVERAGE SUMMARY")
print("=" * 80)
print(coverage_df.to_string(index=False))

## Phase 1: Data Quality Assessment

Before collecting data, we need to assess what's actually available and identify any quality issues.


# DATA COLLECTION FOR SAFETY INDEX

This section implements the systematic data collection workflow for the Virginia Transportation Safety Index (VTSI).

## Project Requirements

Based on the checkpoint document, we need to compute three indices refreshed every 15 minutes:

1. **VRU Safety Index**: Risk to pedestrians and cyclists
2. **Vehicle Safety Index**: Risk of vehicle-vehicle collisions
3. **Combined Safety Index**: Weighted combination of both

## Data Collection Phases

1. **Phase 1**: Data source verification and quality assessment
2. **Phase 2**: Historical baseline construction (severity-weighted crashes)
3. **Phase 3**: Feature engineering (BSM, PSM, safety events, counts)
4. **Phase 4**: Safety index computation with normalization
5. **Phase 5**: Validation and calibration


## ⚠️ IMPORTANT: Notebook vs. Production Architecture

### This Notebook's Purpose

This notebook is for **development, prototyping, and demonstration** purposes. It allows us to:

- Validate data collection queries and logic
- Explore data quality issues
- Test index computation formulas
- Demonstrate the methodology to stakeholders

### Production Architecture (Eventual Deployment)

```
┌─────────────────────────────────────────────────────────────┐
│                     Production System                        │
├─────────────────────────────────────────────────────────────┤
│                                                               │
│  ┌──────────────┐      ┌────────────────────────────────┐  │
│  │ Cloud        │      │   FastAPI Backend              │  │
│  │ Scheduler    │─────>│   (Google Cloud Run)           │  │
│  │ (Every 15min)│      │                                 │  │
│  └──────────────┘      │   - Data Collection Service    │  │
│                        │   - Index Computation Service   │  │
│                        │   - Feature Engineering         │  │
│                        └──────────┬─────────────────────┘  │
│                                   │                          │
│                                   ↓                          │
│                        ┌──────────────────────────┐         │
│                        │  PostgreSQL + PostGIS    │         │
│                        │  (Google Cloud SQL)      │         │
│                        │                          │         │
│                        │  - safety_index_features │         │
│                        │  - safety_index_computed │         │
│                        │  - intersection_baselines│         │
│                        └──────────┬───────────────┘         │
│                                   │                          │
│                                   ↓                          │
│                        ┌──────────────────────────┐         │
│                        │  Streamlit Frontend      │         │
│                        │  (Google Cloud Run)      │         │
│                        │                          │         │
│                        │  - Interactive Map       │         │
│                        │  - Real-time Dashboard   │         │
│                        │  - VRU/Vehicle Views     │         │
│                        └──────────────────────────┘         │
└─────────────────────────────────────────────────────────────┘
```

### API Endpoints (To Be Implemented)

- `POST /api/collect-data` - Triggered every 15 minutes to collect and process new data
- `GET /api/safety-index?intersection=X&time=Y` - Query computed indices
- `GET /api/features?intersection=X` - Get raw features for debugging
- `GET /api/intersections` - List available intersections

### Event-Driven Data Collection Flow

1. **Cloud Scheduler** triggers `/api/collect-data` every 15 minutes
2. **FastAPI service** queries Trino API for latest BSM, PSM, events, counts
3. **Feature engineering** computes aggregates and derives metrics
4. **Index computation** applies formulas and stores results in PostgreSQL
5. **Frontend** queries PostgreSQL for visualization (not Trino directly)

**Note**: The code developed in this notebook will be refactored into the FastAPI backend services.
