# AADT Confidence Interval - State Route (SR) 99, District 3


---

## FHWA Links
* Guidelines for Obtaining AADT Estimates from Non-Traditional Sources:
    * https://www.fhwa.dot.gov/policyinformation/travel_monitoring/pubs/aadtnt/Guidelines_for_AADT_Estimates_Final.pdf

---
  
## AADT Analysis Locations
* 10 locations were used in the analysis
* Locations were chosen based on where Traffic Operations cameras are installed and recording
    * for additional information contact Zhenyu Zhu with Traffic Operations

## Traffic Census Data
* https://dot.ca.gov/programs/traffic-operations/census/traffic-volumes
* Back AADT, Peak Month, and Peak Hour usually represent traffic South or West of the count location.  
* Ahead AADT, Peak Month, and Peak Hour usually represent traffic North or East of the count location. Listing of routes with their designated  

* Because each Traffic Census location (e.g., "IRWINDALE, ARROW HIGHWAY") includes both Back and Ahead counts, only one [OBJECTID*] per location was pulled; this analysis uses the northbound nodes.
    * for more information see the diagram: https://traffic.onramp.dot.ca.gov/downloads/traffic/files/performance/census/Back_and_Ahead_Leg_Traffic_Count_Diagram.pdf

## StreetLight Analysis Data
* Analysis Type == Network Performance
* Segment Metrics
* 2022 was used to match currently available Traffic Census Data (as of 8/27/2025)
* Pulled a variety of Day Types, but plan to look only at "All Day Types"
* Pulled a variety of Day Parts, but plan to look only at "All Day Parts"

---


## How this notebook estimates StreetLight vs. Traffic Census differences

**What we’re trying to answer:**  
Across selected corridor locations, is the Non-Traditional AADT generally higher or lower than Traffic Census (aka Traditional) AADT, and by how much? We also show how certain we are about that average difference.

---

### The data we use
- **Traffic Census (TC):** The official counts by location (`objectid`) with two directions: *ahead* and *back*.
- **StreetLight (STL):** Volume by road segment (“**zonename**”) with tags like **daytype** (e.g., All Days) and **daypart** (e.g., All Day).
- **Location mapping:** For each TC location, a list of the STL zonenames that represent the *ahead* side and the *behind* side of that location.

---

### How we build one number per location (AADT)
1) **Pick the TC value** that matches the counter’s direction:  
   - Even `objectid` → use the TC *back* value  
   - Odd `objectid` → use the TC *ahead* value  
   *(This mirrors the direction convention previously reviewed.)*

2) **Filter StreetLight to the same conditions** you care about (usually **All Days** and **All Day**).

3) **For each STL zonename**, take the average volume within that filter.  
   *(This gives one “typical” value per segment under the chosen daytype/daypart.)*

4) **Sum the STL segments for this location**:  
   - Add up the “ahead” segments.  
   - Add up the “behind” segments.  
   - Then add those two sides together.  
   *(Result = StreetLight AADT for that location.)*

Now each location has:
- **TC AADT** (the benchmark)  
- **STL AADT** (the estimate from StreetLight)
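
The four steps above can be sketched in plain Python on made-up numbers. Everything in this block (the zone names, volumes, the weekday label, and the `objectid` value) is hypothetical and only illustrates the arithmetic; the notebook's own implementation uses pandas and appears in Steps 1 and 2.

```python
from statistics import mean

# Hypothetical Traffic Census row for one location (values are made up).
tc_row = {"objectid": 1234, "back_aadt": 52000, "ahead_aadt": 50000}

# Step 1: even objectid -> back value, odd objectid -> ahead value.
tc_aadt = tc_row["back_aadt"] if tc_row["objectid"] % 2 == 0 else tc_row["ahead_aadt"]

# Hypothetical StreetLight rows: (zonename, daytype, daypart, volume).
stl_rows = [
    ("zone_nb_1", "0: All Days (M-Su)", "0: All Day (12am-12am)", 26000),
    ("zone_sb_1", "0: All Days (M-Su)", "0: All Day (12am-12am)", 25000),
    ("zone_nb_1", "1: Weekday (M-Th)", "0: All Day (12am-12am)", 27500),  # filtered out
]

# Step 2: keep only the day type and day part of interest.
kept = [
    r for r in stl_rows
    if r[1] == "0: All Days (M-Su)" and r[2] == "0: All Day (12am-12am)"
]

# Step 3: average volume per zonename within the filter.
by_zone = {}
for zonename, _, _, volume in kept:
    by_zone.setdefault(zonename, []).append(volume)
zone_mean = {z: mean(v) for z, v in by_zone.items()}

# Step 4: sum the "ahead" zones, sum the "behind" zones, then add both sides.
ahead_zones, behind_zones = ["zone_nb_1"], ["zone_sb_1"]
stl_aadt = sum(zone_mean[z] for z in ahead_zones) + sum(zone_mean[z] for z in behind_zones)

print("TC AADT:", tc_aadt, "| STL AADT:", stl_aadt)
```

In this toy case the even `objectid` selects the *back* value, and the weekday row is dropped before averaging, so it has no effect on the StreetLight total.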

---

### Turn those into apples-to-apples differences (TCE, in %)
For every location with both numbers:
- **Traffic Count Error (TCE)** = the percent difference between STL and TC.  
  - Negative TCE → STL is lower than TC.  
  - Positive TCE → STL is higher than TC.

We collect one TCE value per location.
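
As a small illustration, the percent difference can be written as a helper that follows the formula given in the Step 3 docstring, `100 * (non_trad_aadt - traditional_aadt) / traditional_aadt`. The function name `tce_percent` and the AADT pairs below are made up for this sketch.

```python
def tce_percent(stl_aadt, tc_aadt):
    """Percent difference of StreetLight AADT relative to Traffic Census AADT."""
    if stl_aadt is None or tc_aadt is None or tc_aadt == 0:
        return None  # undefined without a nonzero benchmark
    return 100.0 * (stl_aadt - tc_aadt) / tc_aadt

# Made-up AADT pairs (STL, TC), for illustration only.
print(tce_percent(51000, 52000))  # negative: STL is lower than TC
print(tce_percent(44000, 40000))  # positive: STL is higher than TC
```

Dividing by the TC value makes locations with very different volumes comparable, which is why TC serves as the benchmark.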

---

### Summarize and add a confidence band (CI)
- **Average TCE**: the typical over/under across all locations.  
- **95% Confidence Interval**: a “margin of error” around that average, based on how much the location-level TCEs vary and how many locations you have.  
  - If the interval **crosses 0%**, the average difference isn’t statistically clear (could be slightly above or below zero).  
  - If the interval is **entirely below 0%**, STL tends to be lower than TC.  
  - If it’s **entirely above 0%**, STL tends to be higher.

We also show a **t-statistic** and **p-value** for the “is the average difference basically zero?” question; lower p-values mean a clearer difference.
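
The interval arithmetic can be sketched with the standard library alone. The ten TCE values below are invented and are not results from this analysis. The critical value is hardcoded for 9 degrees of freedom (10 locations) so the sketch runs without SciPy; the notebook itself obtains it from `stats.t.ppf` in Step 4.

```python
from math import sqrt
from statistics import mean, stdev

# Made-up TCE values (%) for 10 locations; not results from this analysis.
tces = [-4.0, -2.5, -6.0, 1.0, -3.0, -5.5, 0.5, -1.5, -4.5, -2.0]

n = len(tces)
mean_tce = mean(tces)
se = stdev(tces) / sqrt(n)  # sample standard deviation (n - 1) over sqrt(n)

# Two-sided 95% critical value of Student's t with n - 1 = 9 degrees of freedom.
# Hardcoded assumption for this sketch; scipy.stats.t.ppf(0.975, 9) is approximately 2.262.
T_CRIT_95_DOF9 = 2.262

ci_lo = mean_tce - T_CRIT_95_DOF9 * se
ci_hi = mean_tce + T_CRIT_95_DOF9 * se
t_stat = mean_tce / se  # tests "is the average difference zero?"

print(f"mean = {mean_tce:.2f}%, 95% CI = [{ci_lo:.2f}%, {ci_hi:.2f}%], t = {t_stat:.2f}")
print("CI excludes 0" if ci_lo > 0 or ci_hi < 0 else "CI includes 0")
```

With only 10 locations the critical value is noticeably larger than the familiar 1.96 of the normal distribution, so the interval is wider than a large-sample approximation would suggest.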

---

### What to look for
- **The average TCE** (direction and size).  
- **Whether the 95% CI includes 0%.**  
- **Any locations with missing segments or mismatched data** (these are flagged so you can QA them).

## Import Packages

In [1]:
import numpy as np
import pandas as pd
import scipy.stats as stats
import csv
import re

In [2]:
# Import the Traffic Census location definitions from the utils module
#from osow_frp_o_d_utils_v3 import origin_intersections, destination_intersections
import shs_ct_tc_locations_utils as tc_locs

### Identify the corridor

In [3]:
# Identify the corridor to be analyzed
CORRIDOR_VAR_NAME = "sr_99_d3_tc_aadt_locations"

In [4]:
# Resolve the object from the module by name
try:
    aadt_locations = getattr(tc_locs, CORRIDOR_VAR_NAME)
except AttributeError:
    raise KeyError(
        f"'{CORRIDOR_VAR_NAME}' not found in shs_ct_tc_locations_utils. "
        "Double-check the variable name."
    )

### Identify the Google Cloud Storage path

In [5]:
# Identify the GCS path to the data
gcs_path = "gs://calitp-analytics-data/data-analyses/big_data/compare_traffic_counts/0_2022/"

## Step 0, Pull in the Data

In [6]:
# This function will pull in the data and clean the column headers in a way that will make them easier to work with
def getdata_and_cleanheaders(path):
    # Read the CSV file
    df = pd.read_csv(path)

    # Clean column headers: remove spaces, convert to lowercase, and strip trailing asterisks
    cleaned_columns = []
    for column in df.columns:
        cleaned_column = column.replace(" ", "").lower().rstrip("*")
        cleaned_columns.append(cleaned_column)

    df.columns = cleaned_columns
    return df
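
To see what the header cleaning does, the same string transformation can be applied to a few sample names. The raw headers below are assumed spellings for illustration (the actual CSV headers may differ), and `clean_header` is a hypothetical helper that isolates the one-line rule used in the function above.

```python
def clean_header(column):
    # Same transformation as in getdata_and_cleanheaders, applied to one name.
    return column.replace(" ", "").lower().rstrip("*")

# Example raw headers; the exact spellings in the source CSVs are assumed here.
samples = ["OBJECTID*", "BACK_AADT", "Zone Name", "Average Daily Segment Traffic (StL Volume)"]
for raw in samples:
    print(f"{raw!r} -> {clean_header(raw)!r}")
```

This is why later cells refer to columns such as `objectid` and `averagedailysegmenttraffic(stlvolume)`: spaces are removed and parentheses are kept.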

In [7]:
# pull in the data & create dataframes
df_tc = getdata_and_cleanheaders(f"{gcs_path}caltrans_traffic_census_2022.csv")  # Traffic Census

In [None]:
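# Identify the StreetLight Analysis to be used in the AADT comparison
# NOTE: this filename refers to 605 / D7, while CORRIDOR_VAR_NAME above selects SR 99 / D3;
# confirm that it is the intended StreetLight export before running the comparison.
df_stl = getdata_and_cleanheaders(f"{gcs_path}streetlight_605_d7_all_vehicles_np_2022.csv")  # StreetLight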

In [None]:
# Save a local copy of the Traffic Census data for side-by-side inspection
df_tc.to_csv("df_tc.csv", index=False)

In [None]:
# Save a local copy of the StreetLight data for side-by-side inspection
df_stl.to_csv("df_stl.csv", index=False)

## Normalizer

In [None]:
def _ensure_list(x):
    if x is None: return []
    if isinstance(x, (list, tuple, set)): return list(x)
    return [x]

def explode_locations_to_objectids(aadt_locs):
    """
    Returns a list of dicts where each item is ONE objectid with:
      name, daytype, objectids [list[str]], ahead_zones [list[str]], behind_zones [list[str]]
    This shape is accepted by your existing traditional/non_traditional builders.
    """
    rows = []

    # Case A: "flat" list like interstate_605_aadt_locations
    if isinstance(aadt_locs, list) and aadt_locs and isinstance(aadt_locs[0], dict) and "objectid" in aadt_locs[0]:
        for loc in aadt_locs:
            oid = str(loc.get("objectid"))
            nm  = f"{loc.get('location_description','UNKNOWN')} [{oid}]"
            day = loc.get("daytype", "0: All Days (M-Su)")

            ahead, behind = [], []
            for k, v in loc.items():
                if not k.startswith("zonename_"):
                    continue
                idx = int(k.split("_")[1])
                # assume even indexes (0,2) are "ahead"/NB and odd (1,3) are "behind"/SB (matches your list)
                if idx % 2 == 0: ahead.append(v)
                else:            behind.append(v)

            rows.append({
                "name": nm,
                "daytype": day,
                "objectids": [oid],
                "ahead_zones": [z for z in ahead if z],
                "behind_zones": [z for z in behind if z],
            })
        return rows

    # Case B: nested dict(s) like sr_605_d7_tc_aadt_locations
    def _gather_objectids(node):
        ids = []
        if "objectid"  in node: ids.extend(_ensure_list(node["objectid"]))
        if "objectids" in node: ids.extend(_ensure_list(node["objectids"]))
        return [str(i) for i in ids if i is not None and str(i).strip() != ""]

    if isinstance(aadt_locs, list):
        iterable = []
        for item in aadt_locs:
            if isinstance(item, dict):
                iterable.append(item)
    elif isinstance(aadt_locs, dict):
        iterable = [aadt_locs]
    else:
        iterable = []

    for block in iterable:
        for base_name, loc in block.items():
            day = loc.get("daytype", "0: All Days (M-Su)")
            nodes = loc.get("nodes", {}) or {}
            for node_name, node in nodes.items():
                oids = _gather_objectids(node)
                if not oids: continue
                nm = f"{base_name} [{','.join(oids)}]"

                ahead = _ensure_list(node.get("zonename_ahead", []))
                behind = _ensure_list(node.get("zonename_behind", []))

                rows.append({
                    "name": nm,
                    "daytype": day,
                    "objectids": oids,
                    "ahead_zones": [z for z in ahead if z],
                    "behind_zones": [z for z in behind if z],
                })
    return rows

## Step 1, Build a per-location summary of Traffic Census locations

In [None]:
def traditional_aadt_by_location(aadt_locations, df_tc, as_df=True, use_parity=True):
    """
    Build a per-location summary of *traditional* (Traffic Census) AADT.

    Output columns:
      location, daytype, objectids, n_objectids, n_found_in_tc, missing_objectids,
      traditional_ahead_mean, traditional_behind_mean, traditional_aadt
    """
    # Requires: import pandas as pd; import numpy as np

    def _ensure_list(x):
        if x is None: return []
        if isinstance(x, (list, tuple, set)): return list(x)
        return [x]

    def _gather_objectids(node_dict):
        ids = []
        if not isinstance(node_dict, dict): return ids
        if "objectid"  in node_dict: ids.extend(_ensure_list(node_dict["objectid"]))
        if "objectids" in node_dict: ids.extend(_ensure_list(node_dict["objectids"]))
        return [str(i) for i in ids if i is not None and str(i).strip() != ""]

    def _dedup(seq):
        seen=set(); out=[]
        for x in seq:
            if x not in seen:
                out.append(x); seen.add(x)
        return out

    def _normalize_one_location(name, loc, include_oid_in_name=True):
        # Handle nested "nodes" dict (sr_605_d7_tc_aadt_locations-style)
        nodes = (loc.get("nodes") if isinstance(loc, dict) else None) or {}
        all_ids=[]
        for _, node in nodes.items():
            all_ids.extend(_gather_objectids(node))
        if not all_ids and isinstance(loc, dict) and "objectid" in loc:
            all_ids = [str(loc["objectid"])]

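        # Dedupe before building the name so the "[oid,...]" suffix matches the
        # name built in Step 2; otherwise the merge on 'location' can miss rows.
        all_ids = _dedup(all_ids)
        name_out = name
        if include_oid_in_name and all_ids:
            name_out = f"{name} [{','.join(all_ids)}]"

        return {
            "name": name_out,
            "daytype": (loc.get("daytype") if isinstance(loc, dict) else None) or "0: All Days (M-Su)",
            "objectids": all_ids,
        }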

    def _normalize_input(aadt_locs):
        # Already normalized DataFrame?
        if isinstance(aadt_locs, pd.DataFrame) and {"name","daytype","objectids"}.issubset(aadt_locs.columns):
            return aadt_locs.to_dict(orient="records")
        # Already normalized list[dict]?
        if isinstance(aadt_locs, list) and aadt_locs and isinstance(aadt_locs[0], dict) and \
           {"name","daytype","objectids"}.issubset(aadt_locs[0].keys()):
            return aadt_locs

        recs = []
        # Case 1: dict keyed by location names
        if isinstance(aadt_locs, dict):
            for nm, loc in aadt_locs.items():
                recs.append(_normalize_one_location(nm, loc))
            return recs

        # Case 2: list of items
        if isinstance(aadt_locs, list):
            for item in aadt_locs:
                if not isinstance(item, dict):
                    continue
                if "nodes" in item:
                    nm = item.get("location_description") or item.get("name") or "UNKNOWN"
                    recs.append(_normalize_one_location(nm, item))
                elif "objectid" in item:
                    # Flat interstate_605_aadt_locations-style row
                    oid = str(item.get("objectid"))
                    nm  = item.get("location_description") or item.get("name") or "UNKNOWN"
                    recs.append({
                        "name": f"{nm} [{oid}]",
                        "daytype": item.get("daytype", "0: All Days (M-Su)"),
                        "objectids": [oid],
                    })
                else:
                    # Fallback: assume dict keyed by name
                    for nm, loc in item.items():
                        recs.append(_normalize_one_location(nm, loc))
        return recs

    def _traditional_aadt_for_ids(df_tc_in, obj_ids):
        """
        If use_parity=True: even OID -> back_aadt, odd OID -> ahead_aadt (matches your reviewed analysis).
        Else: average ahead/back per objectid (original behavior).
        """
        obj_ids = [str(x) for x in (obj_ids or []) if str(x).strip()]
        if not obj_ids:
            return np.nan, np.nan, np.nan, 0

        sub = df_tc_in[df_tc_in["objectid"].astype(str).isin(obj_ids)].copy()
        if sub.empty:
            return np.nan, np.nan, np.nan, 0

        if use_parity:
            vals = []
            for oid in obj_ids:
                row = sub[sub["objectid"].astype(str) == oid]
                if row.empty:
                    continue
                val = row.iloc[0]["back_aadt"] if int(oid) % 2 == 0 else row.iloc[0]["ahead_aadt"]
                vals.append(pd.to_numeric(val, errors="coerce"))
            vals = pd.Series(vals, dtype="float64").dropna()
            if vals.empty:
                return np.nan, np.nan, np.nan, 0
            overall = float(vals.mean())
            return overall, np.nan, np.nan, int(vals.shape[0])

        # Fallback: average ahead/back per objectid group
        ahead_vals = pd.to_numeric(sub.get("ahead_aadt"), errors="coerce").dropna()
        back_vals  = pd.to_numeric(sub.get("back_aadt"),  errors="coerce").dropna()
        mean_ahead = float(ahead_vals.mean()) if not ahead_vals.empty else np.nan
        mean_back  = float(back_vals.mean())  if not back_vals.empty  else np.nan
        overall    = np.nanmean([mean_ahead, mean_back])
        count_used = int(sub.shape[0])
        return overall, mean_ahead, mean_back, count_used

    # ---- main ----
    norm = _normalize_input(aadt_locations)
    tc_ids_all = set(df_tc["objectid"].astype(str).unique())

    rows = []
    for loc in norm:
        obj_ids = [str(x) for x in (loc.get("objectids") or [])]
        overall, mean_ahead, mean_back, n_found = _traditional_aadt_for_ids(df_tc, obj_ids)
        missing = [x for x in obj_ids if x not in tc_ids_all]

        rows.append({
            "location": loc.get("name"),
            "daytype":  loc.get("daytype"),
            "objectids": "|".join(obj_ids),   # pipe-separated string
            "n_objectids": len(obj_ids),
            "n_found_in_tc": int(n_found),
            "missing_objectids": "|".join(missing) if missing else "",
            "traditional_ahead_mean": mean_ahead,
            "traditional_behind_mean": mean_back,
            "traditional_aadt": overall,
        })

    return pd.DataFrame(rows) if as_df else rows




In [None]:
# run step 1 - traditional aadt counts
trad_df = traditional_aadt_by_location(aadt_locations, df_tc, as_df=True)

In [None]:
#trad_df.head()

In [None]:
# Export Step 1 as a CSV to take a look
trad_df.to_csv("step_1_traditional_aadt_by_location.csv", index=False)

## Step 2, Identify Traffic Census location names for the StreetLight segments

In [None]:
def non_traditional_aadt_by_location(
    aadt_locations,
    df_stl,
    daytype_filter="0: All Days (M-Su)",
    daypart_filter="0: All Day (12am-12am)",
    zonename_col="zonename",
    stl_volume_col="averagedailysegmenttraffic(stlvolume)",
    as_df=True,
    agg="sum",                 # "sum" mirrors your pipeline; "mean" averages zones
    key_mode="id",             # "id" = use trailing numeric id after '/', else "label"
    **kwargs                   # absorb unused args (e.g., modeoftravel_filter)
):
    """
    Minimal version that just computes numbers. No mutation of df_stl.
    Handles aadt_locations as:
      - dict: { "LOC": {daytype, nodes{...}}, ... }
      - list[dict] where each dict is either a normalized record or a mapping of many LOCs.
    """
    import re
    import numpy as np
    import pandas as pd

    def _ensure_list(x):
        if x is None: return []
        if isinstance(x, (list, tuple, set)): return list(x)
        return [x]

    def _dedup(seq):
        seen=set(); out=[]
        for v in seq:
            if v not in seen:
                out.append(v); seen.add(v)
        return out

    # --- normalize inputs into list of dicts with ahead/behind arrays ---
    def _normalize_one_location(name, loc, include_oid=True):
        ahead, behind, oids = [], [], []
        nodes = (loc.get("nodes") or {}) if isinstance(loc, dict) else {}
        for _, n in nodes.items():
            ahead += [z for z in _ensure_list(n.get("zonename_ahead")) if z]
            behind += [z for z in _ensure_list(n.get("zonename_behind")) if z]
            if "objectid"  in n:  oids += _ensure_list(n["objectid"])
            if "objectids" in n:  oids += _ensure_list(n["objectids"])
        nm = name
        if include_oid and oids:
            nm = f"{name} [{','.join(_dedup([str(x) for x in oids if str(x).strip()]))}]"
        return {
            "name": nm,
            "daytype": loc.get("daytype", "0: All Days (M-Su)") if isinstance(loc, dict) else "0: All Days (M-Su)",
            "ahead_zones": _dedup(ahead),
            "behind_zones": _dedup(behind),
        }

    def _normalize_locations(aadt_locs):
        # Already normalized dataframe?
        if isinstance(aadt_locs, pd.DataFrame) and {"name","daytype","ahead_zones","behind_zones"}.issubset(aadt_locs.columns):
            return aadt_locs.to_dict("records")

        # Mapping dict? (your case if you pass sr_99_d3_tc_aadt_locations[0])
        if isinstance(aadt_locs, dict):
            return [_normalize_one_location(nm, loc) for nm, loc in aadt_locs.items()]

        # List inputs
        recs=[]
        if isinstance(aadt_locs, list):
            if not aadt_locs:
                return recs
            first = aadt_locs[0]

            # Case 1: list of normalized dicts
            if isinstance(first, dict) and {"name","daytype","ahead_zones","behind_zones"}.issubset(first.keys()):
                return aadt_locs

            # Case 2: list where items are mappings of many locations (YOUR CASE)
            # e.g., [ { "LOC1": {...}, "LOC2": {...} }, { "LOC3": {...} } ]
            all_items_are_mappings = all(isinstance(item, dict) and not {"name","ahead_zones","behind_zones"}.issubset(item.keys()) for item in aadt_locs)
            if all_items_are_mappings:
                for mapping in aadt_locs:
                    for nm, loc in mapping.items():
                        recs.append(_normalize_one_location(nm, loc))
                return recs

            # Case 3: list where each item is a single location dict with "nodes"
            for item in aadt_locs:
                if not isinstance(item, dict):
                    continue
                if "nodes" in item:
                    nm = item.get("location_description") or item.get("name") or "UNKNOWN"
                    recs.append(_normalize_one_location(nm, item))
                elif "objectid" in item:
                    # flat record variant
                    oid = str(item.get("objectid"))
                    nm  = item.get("location_description") or item.get("name") or "UNKNOWN"
                    day = item.get("daytype", "0: All Days (M-Su)")
                    ahead, behind = [], []
                    for k, v in item.items():
                        if isinstance(k, str) and k.startswith("zonename_"):
                            try: idx = int(k.split("_")[1])
                            except ValueError: idx = None  # non-numeric suffix, e.g. "zonename_ahead"
                            (ahead if (idx is not None and idx % 2 == 0) else behind).append(v)
                    recs.append({
                        "name": f"{nm} [{oid}]",
                        "daytype": day,
                        "ahead_zones": _dedup([z for z in ahead if z]),
                        "behind_zones": _dedup([z for z in behind if z]),
                    })
        return recs

    # --- choose key function (no mutations) ---
    if key_mode == "id":
        def make_key_series(labels_series):
            s = labels_series.astype(str).str.strip()
            keys = s.str.extract(r'/\s*([0-9]+)\s*$')[0]
            fallback = s.str.extract(r'([0-9]+)(?!.*[0-9])')[0]
            keys = keys.fillna(fallback)
            keys = keys.fillna(s.str.lower())
            return keys
        def key_from_label(label):
            lab = str(label).strip()
            m = re.search(r'/\s*([0-9]+)\s*$', lab) or re.search(r'([0-9]+)(?!.*[0-9])', lab)
            return m.group(1) if m else lab.lower()
    else:  # key_mode == "label"
        def make_key_series(labels_series):
            return labels_series.astype(str).str.strip()
        def key_from_label(label):
            return str(label).strip()

    # --- filter df_stl (exact if provided; otherwise use all) ---
    required = {zonename_col, stl_volume_col}
    if not required.issubset(df_stl.columns) or "daytype" not in df_stl.columns or "daypart" not in df_stl.columns:
        missing = required.union({"daytype","daypart"}) - set(df_stl.columns)
        raise KeyError(f"df_stl missing required column(s): {sorted(missing)}")

    if daytype_filter is None or daypart_filter is None:
        stl_slice = df_stl.loc[:, [zonename_col, stl_volume_col]].copy()
    else:
        mask = (df_stl["daytype"] == daytype_filter) & (df_stl["daypart"] == daypart_filter)
        stl_slice = df_stl.loc[mask, [zonename_col, stl_volume_col]].copy()
        if stl_slice.empty:
            # fall back to all rows if the exact strings don't exist
            stl_slice = df_stl.loc[:, [zonename_col, stl_volume_col]].copy()

    # coerce numeric; key by label or id
    vals = pd.to_numeric(stl_slice[stl_volume_col], errors="coerce")
    labels = stl_slice[zonename_col]
    keys = make_key_series(labels)
    good = vals.notna() & keys.notna()
    if not good.any():
        raise ValueError("No usable StreetLight rows after filtering/keying; check stl_volume_col, zonename_col, and key_mode.")

    tmp = pd.DataFrame({"key": keys[good].astype(str), "val": vals[good].astype(float)})
    zone_mean = tmp.groupby("key")["val"].mean()
    zone_rows = tmp.groupby("key")["val"].size()
    present_keys = set(zone_mean.index)

    def agg_for_labels(label_list):
        labels = [str(z).strip() for z in _ensure_list(label_list) if str(z).strip()]
        if not labels:
            return np.nan, 0, []
        ks = [key_from_label(z) for z in labels]
        present = [labels[i] for i, k in enumerate(ks) if k in present_keys]
        missing = [labels[i] for i, k in enumerate(ks) if k not in present_keys]
        vals_here = zone_mean.reindex([key_from_label(z) for z in present]).dropna().to_numpy()
        if agg == "sum":
            val = float(np.sum(vals_here)) if vals_here.size else np.nan
        else:
            val = float(np.mean(vals_here)) if vals_here.size else np.nan
        n_rows = int(zone_rows.reindex([key_from_label(z) for z in present]).fillna(0).sum())
        return val, n_rows, missing

    # --- build rows ---
    norm = _normalize_locations(aadt_locations)
    if not norm:
        raise ValueError("aadt_locations normalized to 0 locations. If you pass a list containing a mapping, pass the mapping (e.g., sr_99_d3_tc_aadt_locations[0]) or keep this function's new normalization.")

    rows = []
    for loc in norm:
        ahead = _ensure_list(loc.get("ahead_zones"))
        behind = _ensure_list(loc.get("behind_zones"))
        val_a, n_a, miss_a = agg_for_labels(ahead)
        val_b, n_b, miss_b = agg_for_labels(behind)
        overall = float(np.nansum([v for v in (val_a, val_b) if v is not None]))
        rows.append({
            "location": loc.get("name"),
            "daytype_expected": loc.get("daytype"),
            "daytype_used": (daytype_filter if daytype_filter is not None else ""),
            "daypart_used": (daypart_filter if daypart_filter is not None else ""),
            "ahead_zones": "|".join(ahead),
            "behind_zones": "|".join(behind),
            "non_trad_ahead_mean": val_a,
            "non_trad_behind_mean": val_b,
            "non_trad_aadt": overall,
            "stl_ahead_rows": n_a,
            "stl_behind_rows": n_b,
            "missing_ahead_zones": "|".join(miss_a) if miss_a else "",
            "missing_behind_zones": "|".join(miss_b) if miss_b else "",
        })

    return pd.DataFrame(rows) if as_df else rows
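
The `key_mode="id"` matching rule can be tried on its own. The sketch below copies the two regular expressions from `key_from_label` above into a standalone function; the zonename labels are hypothetical, and real StreetLight labels may look different.

```python
import re

def key_from_label(label):
    """Trailing id after '/', else the last run of digits, else the lowercased label."""
    lab = str(label).strip()
    m = re.search(r'/\s*([0-9]+)\s*$', lab) or re.search(r'([0-9]+)(?!.*[0-9])', lab)
    return m.group(1) if m else lab.lower()

# Hypothetical zonename labels, for illustration only.
print(key_from_label("SR 99 NB / 1234567"))  # id after the slash
print(key_from_label("segment 42 north"))    # no slash, so the last digit run is used
print(key_from_label("Main St"))             # no digits, so the lowercased label is the key
```

One consequence of this rule is that two different labels ending in the same digits map to the same key, so `key_mode="label"` is likely the safer choice when zonenames do not carry a unique id.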


In [None]:
# This runs the "non_traditional_aadt_by_location" function if you have the raw nested structure:
# stl_df = non_traditional_aadt_by_location(
#     aadt_locations,
#     df_stl,
#     daytype_filter="0: All Days (M-Su)",
#     daypart_filter="0: All Day (12am-12am)",
#     zonename_col="zonename",
#     stl_volume_col="averagedailysegmenttraffic(stlvolume)",
#     as_df=True
# )

# stl_df = non_traditional_aadt_by_location(
#     aadt_locations=sr_99_d3_tc_aadt_locations[0],   # <-- note the [0]
#     df_stl=df_stl,
#     daytype_filter="0: All Days (M-Su)",
#     daypart_filter="0: All Day (12am-12am)",
#     zonename_col="zonename",
#     stl_volume_col="averagedailysegmenttraffic(stlvolume)",
#     key_mode="id",        # use 'label' if IDs aren't in zonename
#     as_df=True
# )


def _pick_mapping(locs):
    # your data is a list with one big dict; if it’s already a dict, just return it
    return locs[0] if isinstance(locs, list) else locs

stl_df = non_traditional_aadt_by_location(
    aadt_locations=_pick_mapping(aadt_locations),  # object resolved from tc_locs above
    df_stl=df_stl,
    daytype_filter="0: All Days (M-Su)",
    daypart_filter="0: All Day (12am-12am)",
    zonename_col="zonename",
    stl_volume_col="averagedailysegmenttraffic(stlvolume)",
    key_mode="id",
    as_df=True
)

In [None]:
# Export step 2 to a CSV
stl_df.to_csv("step_2_non_traditional_aadt_by_location.csv", index=False)

## Step 3, Build the per-location comparison DataFrame

In [None]:
# # ------------------------------------------------------
# # 3) Build the per-location comparison DataFrame
# # ------------------------------------------------------

def build_aadt_comparison_df(
    aadt_locations,
    df_tc,
    df_stl,
    daytype_filter="0: All Days (M-Su)",
    daypart_filter="0: All Day (12am-12am)",
    modeoftravel_filter=None,
    zonename_col="zonename",
    stl_volume_col="averagedailysegmenttraffic(stlvolume)"
) -> pd.DataFrame:
    """
    Build a per-location comparison combining:
      - Traditional (Traffic Census) AADT
      - Non-traditional (StreetLight) AADT
      - TCE (%) = 100 * (non_trad_aadt - traditional_aadt) / traditional_aadt

    Returns a pandas DataFrame (one row per location).
    """

    # 1) Build the two sides using your updated functions
    trad_df = traditional_aadt_by_location(
        aadt_locations=aadt_locations,
        df_tc=df_tc,
        as_df=True
    )

    nt_df = non_traditional_aadt_by_location(
        aadt_locations=aadt_locations,
        df_stl=df_stl,
        daytype_filter=daytype_filter,
        daypart_filter=daypart_filter,
        modeoftravel_filter=modeoftravel_filter,
        zonename_col=zonename_col,
        stl_volume_col=stl_volume_col,
        as_df=True
    )

    # 2) Merge on 'location'
    merged = pd.merge(
        trad_df,
        nt_df,
        how="inner",
        on="location",
        suffixes=("_trad", "_nt")
    )

    # 3) Compute TCE (%), guarding against zero / NaN
    def _tce(row):
        t = row.get("traditional_aadt")
        n = row.get("non_trad_aadt")
        if pd.notna(t) and t != 0 and pd.notna(n):
            return 100.0 * (n - t) / t
        return np.nan

    merged["tce_percent"] = merged.apply(_tce, axis=1)

    # 4) Stable, readable column order (only keep those that exist)
    preferred_cols = [
        "location",
        # IDs & zones (pipe-joined for spreadsheet safety)
        "objectids", "n_objectids", "n_found_in_tc", "missing_objectids",
        "ahead_zones", "behind_zones",
        # AADT metrics
        "traditional_ahead_mean", "traditional_behind_mean", "traditional_aadt",
        "non_trad_ahead_mean", "non_trad_behind_mean", "non_trad_aadt",
        "tce_percent",
        # Filters / metadata
        "daytype",            # from Step 1
        "daytype_expected",   # from Step 2 (original location metadata)
        "daytype_used", "daypart_used", "modeoftravel_used",
        # Debug / row counts
        "stl_ahead_rows", "stl_behind_rows",
        "missing_ahead_zones", "missing_behind_zones",
    ]
    cols = [c for c in preferred_cols if c in merged.columns]
    merged = merged[cols].copy()

    return merged

In [None]:
# 3.1) Build the combined comparison DataFrame
cmp_df = build_aadt_comparison_df(
    aadt_locations=aadt_locations,  # your dict/list structure
    df_tc=df_tc,                    # Traffic Census dataframe
    df_stl=df_stl,                  # StreetLight dataframe
    daytype_filter="0: All Days (M-Su)",
    daypart_filter="0: All Day (12am-12am)",
    zonename_col="zonename",
    stl_volume_col="averagedailysegmenttraffic(stlvolume)"
)

In [None]:
print("rows in cmp_df:", len(cmp_df))
print(cmp_df["tce_percent"].describe())
print("any STL zeros?", (cmp_df["non_trad_aadt"] == 0).sum())
print("missing STL zones (any)?",
      (cmp_df["missing_ahead_zones"] != "").sum() +
      (cmp_df["missing_behind_zones"] != "").sum())

In [None]:
# 3.2) Quick peek
#cmp_df.head()

In [None]:
# 3.3) (Optional) sort by absolute TCE to see big deltas first
cmp_df = cmp_df.sort_values("tce_percent", key=lambda s: s.abs(), ascending=False)

In [None]:
# 3.4) Export to CSV 
cmp_df.to_csv("step_3_comparison_dataframe.csv", index=False)

## Step 4, Confidence Interval over TCE

In [None]:
# # ------------------------------------------------------
# # 4) Confidence interval over TCE
# # ------------------------------------------------------

def tce_confidence_interval(detail_df, confidence=0.95):
    """
    Compute a two-sided t-based confidence interval for the mean of
    `detail_df["tce_percent"]`.

    Returns: (mean_tce, ci_lo, ci_hi, tcrit, t_stat)
    All five values are None when no valid rows remain. The last four are
    None when n < 2 or the standard error is zero.
    """
    # Coerce to numeric, then drop NaN and +/-inf
    tces = pd.to_numeric(detail_df["tce_percent"], errors="coerce") \
             .replace([np.inf, -np.inf], np.nan) \
             .dropna().values
    n = len(tces)
    if n == 0:
        return None, None, None, None, None

    mean_tce = float(np.mean(tces))
    tcrit = ci_lo = ci_hi = t_stat = None
    if n > 1:
        std_tce = float(np.std(tces, ddof=1))  # sample standard deviation
        se = std_tce / np.sqrt(n)              # standard error of the mean
        if se > 0:
            dof = n - 1
            tcrit = float(stats.t.ppf((1 + confidence) / 2, dof))
            ci_lo = mean_tce - tcrit * se
            ci_hi = mean_tce + tcrit * se
            t_stat = mean_tce / se             # tests H0: mean TCE == 0

    return mean_tce, ci_lo, ci_hi, tcrit, t_stat

def tce_confidence_interval_df(detail_df, confidence=0.95) -> pd.DataFrame:
    """
    Same as tce_confidence_interval, but returns a one-row DataFrame with
    extra fields useful for reporting/export.
    """
    tces = pd.to_numeric(detail_df["tce_percent"], errors="coerce") \
             .replace([np.inf, -np.inf], np.nan) \
             .dropna()
    n = int(tces.shape[0])
    if n == 0:
        return pd.DataFrame([{
            "confidence": confidence,
            "n": 0,
            "dof": None,
            "mean_tce": None,
            "std_tce": None,
            "se": None,
            "t_critical": None,
            "margin_of_error": None,
            "ci_lower": None,
            "ci_upper": None,
            "t_statistic": None,
            "p_value_two_sided": None
        }])

    mean_tce = float(tces.mean())
    if n > 1:
        std_tce = float(tces.std(ddof=1))
        se = std_tce / np.sqrt(n)
        dof = n - 1
        if se > 0:
            tcrit = float(stats.t.ppf((1 + confidence) / 2, dof))
            moe = tcrit * se
            ci_lo = mean_tce - moe
            ci_hi = mean_tce + moe
            t_stat = mean_tce / se
            # Survival function avoids precision loss in 1 - cdf for large |t|
            p_val = float(2 * stats.t.sf(abs(t_stat), dof))
        else:
            tcrit = moe = ci_lo = ci_hi = t_stat = p_val = None
    else:
        std_tce = 0.0
        se = 0.0
        dof = None
        tcrit = moe = ci_lo = ci_hi = t_stat = p_val = None

    return pd.DataFrame([{
        "confidence": confidence,
        "n": n,
        "dof": dof,
        "mean_tce": mean_tce,
        "std_tce": std_tce if n > 1 else None,
        "se": se if n > 1 else None,
        "t_critical": tcrit,
        "margin_of_error": moe,
        "ci_lower": ci_lo,
        "ci_upper": ci_hi,
        "t_statistic": t_stat,
        "p_value_two_sided": p_val
    }])


In [None]:
# 4.0) Normalize to objectid rows (works for either of your location formats)
norm_rows = explode_locations_to_objectids(aadt_locations)  # or sr_605_d7_tc_aadt_locations

# 4.1) Build comparison
cmp_df = build_aadt_comparison_df(
    aadt_locations=norm_rows,
    df_tc=df_tc,
    df_stl=df_stl,
    daytype_filter="0: All Days (M-Su)",
    daypart_filter="0: All Day (12am-12am)",
    modeoftravel_filter=None,
    zonename_col="zonename",
    stl_volume_col="averagedailysegmenttraffic(stlvolume)"
)

In [None]:
print("rows in cmp_df:", len(cmp_df))
print(cmp_df["tce_percent"].describe())
print("any STL zeros?", (cmp_df["non_trad_aadt"] == 0).sum())
print("missing STL zones (any)?",
      (cmp_df["missing_ahead_zones"] != "").sum() +
      (cmp_df["missing_behind_zones"] != "").sum())

In [None]:
# 4.2) Get the CI summary as a DataFrame
tce_summary_df = tce_confidence_interval_df(cmp_df, confidence=0.95)

In [None]:
# 4.3) Quick peek
print(tce_summary_df)

In [None]:
#cmp_df["tce_percent"].describe()

In [None]:
# how many observations? should ≈ number of objectids that matched both TC and STL
#len(cmp_df)

In [None]:
# # spot check a row you know well
# cmp_df.loc[cmp_df["location"].str.contains("IRWINDALE", case=False)].head()

In [None]:
# 4.4) Export the CI summary to CSV
tce_summary_df.to_csv("step_4_summary.csv", index=False)

In [None]:
mean_tce, ci_lower, ci_upper, t_critical, t_statistic = tce_confidence_interval(
    cmp_df, confidence=0.95
)

print("Mean TCE:", mean_tce)
print("95% Confidence Interval:", (ci_lower, ci_upper))
print("t-test statistic:", t_statistic)
print("t-critical:", t_critical)

## Sanity Checks

In [None]:
# 1) Did the merge keep enough rows?
print("cmp_df rows:", len(cmp_df))

In [None]:
# If this is small (< 5) for SR-99, inspect which side is missing:
trad_df = traditional_aadt_by_location(aadt_locations, df_tc, as_df=True)
nt_df   = non_traditional_aadt_by_location(aadt_locations, df_stl, as_df=True)
print("trad rows:", len(trad_df), "nt rows:", len(nt_df))
print("only in trad:", len(set(trad_df.location) - set(nt_df.location)))
print("only in nt:",   len(set(nt_df.location)   - set(trad_df.location)))

In [None]:
# 2) See what labels SR-99 actually has
print("daytype samples:", df_stl["daytype"].dropna().unique()[:10])
print("daypart samples:", df_stl["daypart"].dropna().unique()[:10])

In [None]:
# 3) How many rows survive the filter?
filt = (df_stl["daytype"] == "0: All Days (M-Su)") & (df_stl["daypart"] == "0: All Day (12am-12am)")
print("stl_filtered rows (strict):", int(filt.sum()))

### Mean TCE: -3.62%
Traffic Census Error (TCE)
* A mean TCE of -3.62% means that, on average, the StreetLight AADT estimates are about 3.62% lower than the official Caltrans Traffic Census counts.
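
As a minimal sketch of the sign convention, assume `tce_percent` is the StreetLight AADT minus the Traffic Census AADT, expressed as a percentage of the Traffic Census AADT. That formula is an assumption consistent with the interpretation above. The function name and the AADT values below are hypothetical and are not results from this analysis.

```python
def tce_percent(non_trad_aadt: float, traditional_aadt: float) -> float:
    """Percent difference of non-traditional AADT relative to traditional AADT.

    Assumed formula: (non_trad - traditional) / traditional * 100.
    """
    if traditional_aadt == 0:
        raise ValueError("traditional_aadt must be non-zero")
    return (non_trad_aadt - traditional_aadt) / traditional_aadt * 100.0


# Hypothetical values for illustration only
low = tce_percent(48_190, 50_000)   # StreetLight below Census -> negative TCE
high = tce_percent(52_000, 50_000)  # StreetLight above Census -> positive TCE
print(round(low, 2), round(high, 2))
```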

### 95% Confidence Interval: (-10.78%, +3.54%)
* Based on the sample of locations, we are 95% confident that the true average TCE (i.e., the average percent difference between StreetLight and Traffic Census across the entire population of locations) lies between -10.78% and +3.54%.
    * Because this interval includes zero, the true average error may be zero, meaning StreetLight might not be systematically over- or underestimating on average.
    * The interval is also wide (about 14 percentage points), which points to high variability between locations, a small sample size, or both.
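
To illustrate how sample size drives the interval width, the sketch below holds the sample standard deviation fixed and recomputes the margin of error for a few sample sizes. Two assumptions are built in: `N_ASSUMED = 20` is inferred from the reported critical value of 2.093 (which corresponds to 19 degrees of freedom at 95% confidence) rather than stated in the results, and `T_CRIT_95` hard-codes standard two-sided 95% Student's t table values for illustration.

```python
import math

# Reported (rounded) results from this notebook
CI_LOWER, CI_UPPER = -10.78, 3.54
T_CRIT_REPORTED = 2.093

# Assumption: n = 20, inferred from the critical value (dof = 19)
N_ASSUMED = 20

# Standard two-sided 95% t critical values, keyed by sample size n (dof = n - 1)
T_CRIT_95 = {10: 2.262, 20: 2.093, 30: 2.045, 40: 2.023, 60: 2.001}

margin = (CI_UPPER - CI_LOWER) / 2        # half-width of the reported interval
se = margin / T_CRIT_REPORTED             # implied standard error
std_implied = se * math.sqrt(N_ASSUMED)   # implied sample standard deviation


def margin_of_error(n: int, std: float) -> float:
    """Margin of error for the mean at 95% confidence, holding std fixed."""
    return T_CRIT_95[n] * std / math.sqrt(n)


for n in sorted(T_CRIT_95):
    print(f"n={n:>2}: +/- {margin_of_error(n, std_implied):.2f} percentage points")
```

Under these assumptions the margin shrinks roughly with the square root of the sample size, so adding locations narrows the interval only gradually.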

### T-Test Statistic  
* **-1.059**: The observed sample mean is about **1.059 standard errors** below zero, the value expected if StreetLight and Traffic Census agreed on average. Because the absolute value (1.059) is smaller than the critical value (2.093), the result is **not statistically significant** at the 5% level.
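
As a quick consistency check, the t-statistic can be recovered from the reported mean and interval alone, because the margin of error equals the critical value times the standard error. The sketch below uses only the rounded figures reported in this section, so it agrees with the reported statistic only to within rounding.

```python
# Reported (rounded) results from this notebook
MEAN_TCE = -3.62
CI_LOWER, CI_UPPER = -10.78, 3.54
T_CRITICAL = 2.093

margin = (CI_UPPER - CI_LOWER) / 2   # margin of error = t_critical * SE
se = margin / T_CRITICAL             # implied standard error
t_stat = MEAN_TCE / se               # tests H0: true mean TCE == 0

# Two-sided decision at the 5% level: reject H0 only if |t| exceeds t_critical
significant = abs(t_stat) > T_CRITICAL

print(f"margin of error: {margin:.2f}")
print(f"implied SE:      {se:.2f}")
print(f"t-statistic:     {t_stat:.2f}")
print(f"significant:     {significant}")
```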

### Summary
* On average, StreetLight AADT is about 3.6% lower than Traffic Census AADT at this subset of locations.
* With 95% confidence, the true average difference could be anywhere from 10.8% below to 3.5% above the Traffic Census value.
* Because zero is in that range, we cannot conclude that StreetLight is underestimating; the difference is not statistically significant.


# AADT Confidence Interval - Interstate 605, District 7

## FHWA Links
* Guidelines for Obtaining AADT Estimates from Non-Traditional Sources:
    * https://www.fhwa.dot.gov/policyinformation/travel_monitoring/pubs/aadtnt/Guidelines_for_AADT_Estimates_Final.pdf

## AADT Analysis Locations
* Locations were determined based on the location of installed and recording Traffic Operations cameras
    * for additional information contact Zhenyu Zhu with Traffic Operations

## Traffic Census Data
* https://dot.ca.gov/programs/traffic-operations/census/traffic-volumes
* Back AADT, Peak Month, and Peak Hour usually represent traffic South or West of the count location.  
* Ahead AADT, Peak Month, and Peak Hour usually represent traffic North or East of the count location. Listing of routes with their designated  

* Because both the Back and Ahead counts are included at each location in the Traffic Census Data (e.g., "IRWINDALE, ARROW HIGHWAY"), only one [OBJECTID*] per location was pulled; the North Bound Nodes were used for this analysis. 
    * for more information see the diagram: https://traffic.onramp.dot.ca.gov/downloads/traffic/files/performance/census/Back_and_Ahead_Leg_Traffic_Count_Diagram.pdf

## StreetLight Analysis Data
* Analysis Type == Network Performance
* Segment Metrics
* 2022 was used to match currently available Traffic Census Data (as of 8/27/2025)
* A variety of Day Types were pulled, but the plan is to look only at "All Day Types"
* A variety of Day Parts were pulled, but the plan is to look only at "All Day Parts"


