<a href="https://colab.research.google.com/github/hawa1983/Traffic-Data-Bank/blob/main/trafficdatabank.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
from pathlib import Path

# --------- Config ----------
# Input CSV of licence-plate observations; main() reads it and writes its outputs next to it.
CSV_PATH = "Port_Jefferson_LP_Count_Cleaned.csv"

# Column names used to address the input data.
PLATE_COL = "Plate"
LOCATION_COL = "Location"
DIRECTION_COL = "Direction"
TIME_COL = "Time"              # time-of-day text already present in the file (e.g., "6:00 AM")
TIMESTAMP_COL = "Timestamp"    # created by main() from BASE_DATE + TIME_COL

# Placeholder calendar date prepended to every Time value so the times can be
# parsed and compared as full datetimes; every row shares this one date.
BASE_DATE = "2025-01-01"

# Length, in minutes, of the forward-looking window used when searching for
# later sightings of the same plate.
WINDOW_MIN = 15
# ---------------------------

def normalize_text(s):
    """Return ``s`` as a trimmed, lower-cased string.

    Missing values (anything ``pd.isna`` reports as missing) are passed
    through unchanged so they stay recognisable as missing downstream.
    """
    return s if pd.isna(s) else str(s).strip().lower()

def appears_within(df_plate, idx, pairs, window_min=15):
    """Tell whether the plate is seen at any target point shortly after row ``idx``.

    Parameters
    ----------
    df_plate : DataFrame holding the observations of a single plate.
    idx : index label of the reference observation inside ``df_plate``.
    pairs : iterable of ``(location, direction)`` tuples to look for.
    window_min : size of the look-ahead window in minutes.

    Returns
    -------
    bool
        True when at least one row with a timestamp strictly after the
        reference time and no later than ``window_min`` minutes after it
        matches one of ``pairs``; False otherwise.
    """
    start = df_plate.loc[idx, TIMESTAMP_COL]
    deadline = start + pd.Timedelta(minutes=window_min)
    stamps = df_plate[TIMESTAMP_COL]

    in_window = (stamps > start) & (stamps <= deadline)
    if not in_window.any():
        return False

    later = df_plate.loc[in_window, [LOCATION_COL, DIRECTION_COL]]
    return any(
        ((later[LOCATION_COL] == loc) & (later[DIRECTION_COL] == direc)).any()
        for loc, direc in pairs
    )

def no_other_appearance_within(df_plate, idx, window_min=15):
    """Check that the plate is not seen at a different location soon after row ``idx``.

    Looks at rows of ``df_plate`` whose timestamp is strictly after the
    reference observation and at most ``window_min`` minutes later.

    Returns True when there are no such rows, or when every one of them is at
    the same location as the reference observation; False (as a truthy/falsy
    boolean) when any of them is at another location.
    """
    stamps = df_plate[TIMESTAMP_COL]
    origin_time = df_plate.loc[idx, TIMESTAMP_COL]
    origin_loc = df_plate.loc[idx, LOCATION_COL]
    window_end = origin_time + pd.Timedelta(minutes=window_min)

    in_window = (stamps > origin_time) & (stamps <= window_end)
    if in_window.any():
        return (df_plate.loc[in_window, LOCATION_COL] == origin_loc).all()
    return True

def classify_observation(row, df_plate, idx):
    """Label a single observation according to the entry/exit rules.

    Parameters
    ----------
    row : mapping-like observation providing the location and direction values.
    df_plate : all observations of the same plate (passed to the window helpers).
    idx : index label of ``row`` inside ``df_plate``.

    Returns
    -------
    str
        One of "Pass Thru", "Resident Entry", "HWY Bound or Resident Entry",
        "Resident Exit" or "Unclassified".
    """
    here = (row[LOCATION_COL], row[DIRECTION_COL])

    eastbound_exits = [("ardmer dr", "eastbound"), ("chereb lane", "eastbound")]

    # Each entry rule: (entry points, exit points that mean the vehicle passed
    # through, label used when no such exit is seen inside the window).
    entry_rules = (
        # Rule 1
        ((("woodhull ave", "northbound"), ("lincoln ave", "northbound")),
         eastbound_exits,
         "Resident Entry"),
        # Rule 2
        ((("ardmer dr", "westbound"), ("chereb lane", "westbound")),
         [("woodhull ave", "southbound"),
          ("lincoln ave", "southbound"),
          ("norwood ave", "westbound")],
         "Resident Entry"),
        # Rule 3
        ((("norwood ave", "eastbound"),),
         eastbound_exits,
         "HWY Bound or Resident Entry"),
    )

    for entry_points, exit_points, fallback in entry_rules:
        if here not in entry_points:
            continue
        if appears_within(df_plate, idx, exit_points, WINDOW_MIN):
            return "Pass Thru"
        return fallback

    # Rule 4 (exits): only counts when the plate is not seen elsewhere afterwards.
    exit_observations = (
        ("woodhull ave", "southbound"),
        ("lincoln ave", "southbound"),
        ("ardmer dr", "eastbound"),
        ("chereb lane", "eastbound"),
    )
    if here in exit_observations and no_other_appearance_within(df_plate, idx, WINDOW_MIN):
        return "Resident Exit"

    return "Unclassified"

def final_plate_classification(per_obs_classes):
    """Collapse a plate's per-observation labels into one plate-level label.

    The highest-ranked label present in ``per_obs_classes`` wins; the ranking
    is Pass Thru > Resident Exit > Resident Entry >
    HWY Bound or Resident Entry > Unclassified. When none of the known labels
    is present the result is "Unclassified".
    """
    ranked = (
        "Pass Thru",
        "Resident Exit",
        "Resident Entry",
        "HWY Bound or Resident Entry",
        "Unclassified",
    )
    return next((label for label in ranked if label in per_obs_classes), "Unclassified")

def main():
    """Classify every plate observation in ``CSV_PATH`` and write two CSV reports.

    Steps:
      1. Read the CSV and build ``TIMESTAMP_COL`` from ``BASE_DATE`` + ``TIME_COL``.
      2. Normalize the location/direction text (trimmed, lower-case) so it can be
         compared against the rule tables in ``classify_observation``.
      3. Classify each observation within its own plate's sightings.
      4. Roll the per-observation labels up to one label per plate.

    Outputs (written next to ``CSV_PATH``):
      * ``Port_Jefferson_LP_Classified_Observations.csv`` - input rows (with
        normalized location/direction) plus ``Observation_Classification``.
      * ``Port_Jefferson_LP_Final_By_Plate.csv`` - one row per plate with
        ``Plate_Final_Classification``.
    """
    df = pd.read_csv(CSV_PATH)

    # Build Timestamp from single date + Time column (e.g., "6:00 AM").
    # errors="coerce" turns unparseable times into NaT instead of raising.
    df[TIMESTAMP_COL] = pd.to_datetime(BASE_DATE + " " + df[TIME_COL].astype(str), errors="coerce")

    # NaT never compares as inside a time window, so such rows silently fall
    # through to the "no later sighting" outcome; make that visible.
    unparsed = int(df[TIMESTAMP_COL].isna().sum())
    if unparsed:
        print(f"Warning: {unparsed} row(s) have a {TIME_COL!r} value that could not be parsed; "
              "they cannot take part in time-window matching.")

    # The rule tables are lower-case, so the exported Location/Direction
    # columns hold the normalized text (same as before).
    df[LOCATION_COL] = df[LOCATION_COL].map(normalize_text)
    df[DIRECTION_COL] = df[DIRECTION_COL].map(normalize_text)

    # Sort by plate then time; the fresh RangeIndex gives every row a unique label.
    df = df.sort_values([PLATE_COL, TIMESTAMP_COL]).reset_index(drop=True)

    # Classify each observation. The groupby already yields each plate's rows
    # (in sorted order, with their original index labels), so use that slice
    # directly instead of re-filtering the whole frame for every plate.
    labels = {}
    for _, df_plate in df.groupby(PLATE_COL, sort=False):
        for idx in df_plate.index:
            labels[idx] = classify_observation(df_plate.loc[idx], df_plate, idx)

    # Rows whose plate is missing are skipped by groupby and stay NaN here.
    df["Observation_Classification"] = pd.Series(labels, dtype="object")

    # Final per-plate
    plate_final = (
        df.groupby(PLATE_COL)["Observation_Classification"]
          .apply(lambda s: final_plate_classification(set(s.dropna())))
          .rename("Plate_Final_Classification")
          .reset_index()
    )

    # Output
    out_obs = Path(CSV_PATH).with_name("Port_Jefferson_LP_Classified_Observations.csv")
    out_plate = Path(CSV_PATH).with_name("Port_Jefferson_LP_Final_By_Plate.csv")

    df.to_csv(out_obs, index=False)
    plate_final.to_csv(out_plate, index=False)

    print(f"Saved per-observation classifications to: {out_obs}")
    print(f"Saved final per-plate classifications to: {out_plate}")

# Entry point: run the whole pipeline when this code is executed as __main__.
if __name__ == "__main__":
    main()


Saved per-observation classifications to: Port_Jefferson_LP_Classified_Observations.csv
Saved final per-plate classifications to: Port_Jefferson_LP_Final_By_Plate.csv


## New version: classification with match details and summary exports

In [1]:
import pandas as pd
from pathlib import Path

# --------- Config ----------
# Input CSV of licence-plate observations read by main().
CSV_PATH = "Port_Jefferson_LP_Count_Cleaned.csv"

# Column names used to address the input data.
PLATE_COL = "Plate"
LOCATION_COL = "Location"
DIRECTION_COL = "Direction"
PERIOD_COL = "Period"          # exported as-is and used as a grouping key in the movements summary
TIME_COL = "Time"              # time-of-day text already present in the file (e.g., "6:00 AM")
TIMESTAMP_COL = "Timestamp"    # created by main() from BASE_DATE + TIME_COL

# Placeholder calendar date prepended to every Time value so the times can be
# parsed and compared as full datetimes; every row shares this one date.
BASE_DATE = "2025-01-01"

# Length, in minutes, of the forward-looking window used when searching for
# later sightings of the same plate.
WINDOW_MIN = 15
# ---------------------------

def normalize_text(s):
    """Trim and lower-case ``s`` for rule matching; missing values are returned untouched."""
    missing = pd.isna(s)
    if not missing:
        s = str(s).strip().lower()
    return s

def find_match_within(df_plate, idx, pairs, window_min=15):
    """Find the earliest later sighting of the plate at any of the target points.

    Parameters
    ----------
    df_plate : DataFrame holding the observations of a single plate.
    idx : index label of the reference observation inside ``df_plate``.
    pairs : iterable of ``(location, direction)`` tuples to look for.
    window_min : size of the look-ahead window in minutes.

    Returns
    -------
    (matched, match_row)
        ``(True, row)`` where ``row`` is the matching observation with the
        smallest timestamp among rows strictly after the reference time and
        at most ``window_min`` minutes later; ``(False, None)`` when no row in
        that window matches any pair.

    Notes
    -----
    The earliest match is chosen across *all* pairs by timestamp, so the
    result does not depend on the order of ``pairs`` or on ``df_plate``
    being pre-sorted. Ties on timestamp resolve to the row that comes first
    in ``df_plate``.
    """
    t0 = df_plate.loc[idx, TIMESTAMP_COL]
    stamps = df_plate[TIMESTAMP_COL]
    in_window = (stamps > t0) & (stamps <= t0 + pd.Timedelta(minutes=window_min))
    if not in_window.any():
        return False, None

    future = df_plate.loc[in_window]

    # Flag every future row that sits at any of the requested points.
    is_target = pd.Series(False, index=future.index)
    for loc, direc in pairs:
        is_target |= (future[LOCATION_COL] == loc) & (future[DIRECTION_COL] == direc)

    hits = future.loc[is_target]
    if hits.empty:
        return False, None

    # Stable sort keeps the original row order among equal timestamps.
    earliest = hits.sort_values(TIMESTAMP_COL, kind="stable").iloc[0]
    return True, earliest

def no_other_appearance_within(df_plate, idx, window_min=15):
    """Report whether the plate stays away from other locations right after row ``idx``.

    Only rows timed strictly after the reference observation and no more than
    ``window_min`` minutes later are considered. The result is True when there
    are none, or when all of them share the reference observation's location.
    """
    start = df_plate.loc[idx, TIMESTAMP_COL]
    home = df_plate.loc[idx, LOCATION_COL]
    horizon = pd.Timedelta(minutes=window_min)

    later = df_plate[TIMESTAMP_COL].gt(start) & df_plate[TIMESTAMP_COL].le(start + horizon)
    if not later.any():
        return True
    return df_plate.loc[later, LOCATION_COL].eq(home).all()

def classify_observation(row, df_plate, idx):
    """Label one observation and report the sighting that made it a pass-through.

    Parameters
    ----------
    row : mapping-like observation providing the location and direction values.
    df_plate : all observations of the same plate (passed to the window helpers).
    idx : index label of ``row`` inside ``df_plate``.

    Returns
    -------
    (label, match)
        ``label`` is one of "Pass Thru", "Resident Entry",
        "HWY Bound or Resident Entry", "Resident Exit" or "Unclassified".
        ``match`` is a dict with keys "Match_Location", "Match_Direction" and
        "Match_Timestamp"; they are filled from the matching row only for
        "Pass Thru" and are otherwise None / None / NaT.
    """
    here = (row[LOCATION_COL], row[DIRECTION_COL])

    # Match details stay empty unless a pass-through exit is found.
    match = {
        "Match_Location": None,
        "Match_Direction": None,
        "Match_Timestamp": pd.NaT
    }

    eastbound_exits = [("ardmer dr", "eastbound"), ("chereb lane", "eastbound")]

    # Each entry rule: (entry points, exit points that mean the vehicle passed
    # through, label used when no such exit is seen inside the window).
    entry_rules = (
        # Rule 1: Entry S->N
        ((("woodhull ave", "northbound"), ("lincoln ave", "northbound")),
         eastbound_exits,
         "Resident Entry"),
        # Rule 2: Entry E->W
        ((("ardmer dr", "westbound"), ("chereb lane", "westbound")),
         [("woodhull ave", "southbound"),
          ("lincoln ave", "southbound"),
          ("norwood ave", "westbound")],
         "Resident Entry"),
        # Rule 3: Entry from West via Norwood EB
        ((("norwood ave", "eastbound"),),
         eastbound_exits,
         "HWY Bound or Resident Entry"),
    )

    for entry_points, exit_points, fallback in entry_rules:
        if here not in entry_points:
            continue
        found, hit = find_match_within(df_plate, idx, exit_points, WINDOW_MIN)
        if not found:
            return fallback, match
        match["Match_Location"] = hit[LOCATION_COL]
        match["Match_Direction"] = hit[DIRECTION_COL]
        match["Match_Timestamp"] = hit[TIMESTAMP_COL]
        return "Pass Thru", match

    # Rule 4: Exit - only counts when the plate is not seen elsewhere afterwards.
    exit_observations = (
        ("woodhull ave", "southbound"),
        ("lincoln ave", "southbound"),
        ("ardmer dr", "eastbound"),
        ("chereb lane", "eastbound"),
    )
    if here in exit_observations and no_other_appearance_within(df_plate, idx, WINDOW_MIN):
        return "Resident Exit", match

    return "Unclassified", match

def final_plate_classification(per_obs_classes):
    """Pick the single plate-level label from a plate's per-observation labels.

    Labels are checked from most to least significant (Pass Thru,
    Resident Exit, Resident Entry, HWY Bound or Resident Entry, Unclassified)
    and the first one found in ``per_obs_classes`` is returned. If none is
    found the plate is "Unclassified".
    """
    # Priority for final roll-up, most significant first.
    for candidate in ("Pass Thru", "Resident Exit", "Resident Entry",
                      "HWY Bound or Resident Entry", "Unclassified"):
        if candidate in per_obs_classes:
            break
    else:
        candidate = "Unclassified"
    return candidate

def main():
    """Classify every plate observation in ``CSV_PATH`` and write three CSV reports.

    Steps:
      1. Read the CSV and build ``TIMESTAMP_COL`` from ``BASE_DATE`` + ``TIME_COL``.
      2. Keep the as-read location/direction text in ``Location_Original`` /
         ``Direction_Original``, then normalize the working columns (trimmed,
         lower-case) for rule matching.
      3. Classify each observation within its own plate's sightings and record
         the matching exit sighting for pass-throughs.
      4. Roll the per-observation labels up to one label per plate and merge it
         back onto every row.

    Outputs (written to the current working directory):
      * ``full_tracking_classified.csv`` - one row per observation with both
        classifications and the match details. Location/Direction hold the
        normalized text; the trailing ``Location_Original`` and
        ``Direction_Original`` columns hold the text exactly as read.
      * ``pass_thru_summary.csv`` - one row per "Pass Thru" observation with
        entry/exit time, minutes between them and the path taken (header only
        when there are none).
      * ``all_movements_summary.csv`` - observation and unique-plate counts per
        Location/Direction/Period/Time.
    """
    df = pd.read_csv(CSV_PATH)

    # Build a Timestamp from BASE_DATE + Time (e.g., "6:00 AM").
    # errors="coerce" turns unparseable times into NaT instead of raising.
    df[TIMESTAMP_COL] = pd.to_datetime(BASE_DATE + " " + df[TIME_COL].astype(str), errors="coerce")

    # NaT never compares as inside a time window, so such rows silently fall
    # through to the "no later sighting" outcome; make that visible.
    unparsed = int(df[TIMESTAMP_COL].isna().sum())
    if unparsed:
        print(f"Warning: {unparsed} row(s) have a {TIME_COL!r} value that could not be parsed; "
              "they cannot take part in time-window matching.")

    # Capture the original text BEFORE it is overwritten; previously the
    # "*_Original" columns were filled from the already-normalized values.
    df["Location_Original"] = df[LOCATION_COL]
    df["Direction_Original"] = df[DIRECTION_COL]

    # Use normalized fields for rules (the rule tables are lower-case).
    df[LOCATION_COL] = df[LOCATION_COL].map(normalize_text)
    df[DIRECTION_COL] = df[DIRECTION_COL].map(normalize_text)

    # Sort by plate then time; the fresh RangeIndex gives every row a unique label.
    df = df.sort_values([PLATE_COL, TIMESTAMP_COL]).reset_index(drop=True)

    # Per-observation classification + match details, keyed by row label.
    # The groupby already yields each plate's rows, so use that slice directly
    # instead of re-filtering the whole frame for every single observation.
    obs_cls, match_loc, match_dir, match_ts = {}, {}, {}, {}
    for _, df_plate in df.groupby(PLATE_COL, sort=False):
        for idx in df_plate.index:
            cls, match = classify_observation(df_plate.loc[idx], df_plate, idx)
            obs_cls[idx] = cls
            match_loc[idx] = match["Match_Location"]
            match_dir[idx] = match["Match_Direction"]
            match_ts[idx] = match["Match_Timestamp"]

    # Rows whose plate is missing are skipped by groupby and stay NaN/NaT here.
    df["Observation_Classification"] = pd.Series(obs_cls)
    df["Match_Location"] = pd.Series(match_loc)
    df["Match_Direction"] = pd.Series(match_dir)
    df["Match_Timestamp"] = pd.Series(match_ts)

    # Plate-level final classification
    plate_final = (
        df.groupby(PLATE_COL)["Observation_Classification"]
          .apply(lambda s: final_plate_classification(set(s.dropna())))
          .rename("Plate_Final_Classification")
          .reset_index()
    )

    # Merge plate final back to each row
    df = df.merge(plate_final, on=PLATE_COL, how="left")

    # ---------- Output 1: full_tracking_classified.csv ----------
    # Existing columns keep their order; the original-text columns are appended
    # at the end so readers of the earlier layout are unaffected.
    full_tracking_cols = [
        PLATE_COL, LOCATION_COL, DIRECTION_COL, PERIOD_COL, TIME_COL, TIMESTAMP_COL,
        "Observation_Classification", "Plate_Final_Classification",
        "Match_Location", "Match_Direction", "Match_Timestamp",
        "Location_Original", "Direction_Original"
    ]
    out_full = "full_tracking_classified.csv"
    df[full_tracking_cols].to_csv(out_full, index=False)

    # ---------- Output 2: pass_thru_summary.csv ----------
    pass_thru_cols = [
        PLATE_COL, "Entry_Timestamp", "Exit_Timestamp", "Minutes_Between",
        "Entry_Point", "Exit_Point", "Path"
    ]
    pass_thru = df[df["Observation_Classification"] == "Pass Thru"].copy()
    if not pass_thru.empty:
        pass_thru["Entry_Timestamp"] = pass_thru[TIMESTAMP_COL]
        pass_thru["Exit_Timestamp"] = pass_thru["Match_Timestamp"]
        pass_thru["Minutes_Between"] = (pass_thru["Exit_Timestamp"] - pass_thru["Entry_Timestamp"]).dt.total_seconds() / 60.0
        pass_thru["Entry_Point"] = pass_thru[LOCATION_COL].str.title() + " " + pass_thru[DIRECTION_COL].str.title()
        pass_thru["Exit_Point"] = pass_thru["Match_Location"].fillna("").str.title() + " " + pass_thru["Match_Direction"].fillna("").str.title()
        pass_thru["Path"] = pass_thru["Entry_Point"] + " -> " + pass_thru["Exit_Point"]

        pass_thru = pass_thru.sort_values([PLATE_COL, "Entry_Timestamp"])
        pass_thru[pass_thru_cols].to_csv("pass_thru_summary.csv", index=False)
    else:
        # write empty schema if none
        pd.DataFrame(columns=pass_thru_cols).to_csv("pass_thru_summary.csv", index=False)

    # ---------- Output 3: all_movements_summary.csv ----------
    # Aggregate by Location/Direction/Period/Time; dropna=False keeps groups
    # whose key contains a missing value.
    grp = df.groupby([LOCATION_COL, DIRECTION_COL, PERIOD_COL, TIME_COL], dropna=False)
    summary = grp.agg(
        Observations=(PLATE_COL, "count"),
        UniquePlates=(PLATE_COL, "nunique")
    ).reset_index()

    summary = summary.sort_values([LOCATION_COL, DIRECTION_COL, PERIOD_COL, TIME_COL])
    summary.to_csv("all_movements_summary.csv", index=False)

    print(f"Saved:\n- {out_full}\n- pass_thru_summary.csv\n- all_movements_summary.csv")

# Entry point: run the whole pipeline when this code is executed as __main__.
if __name__ == "__main__":
    main()


Saved:
- full_tracking_classified.csv
- pass_thru_summary.csv
- all_movements_summary.csv
