In [None]:
import ast
import os
from pathlib import Path

import numpy as np
import pandas as pd
import polars as pl
import pyarrow.parquet as pq
from tqdm import tqdm 


# Cleaning stops_0.csv

In [None]:
# Load stops.csv
stops = pd.read_csv(r"C:\Users\moham\Downloads\Data_Cleaning\stops_0.csv")

# Load route_to_stop_sequence to filter stop_ids
route_to_stop = pd.read_csv(r"C:\Users\moham\Downloads\Data_Cleaning\route_to_stop_sequence_v2.csv")

# Collect every stop_id that appears in at least one route's stop sequence.
used_stop_ids = set()
for seq in route_to_stop['stop_id_list']:
    # Cells read from CSV are strings such as "['123', '456']".
    # ast.literal_eval only accepts Python literals, so a malformed or
    # malicious cell cannot execute code the way eval() would.
    ids = ast.literal_eval(seq) if isinstance(seq, str) else seq
    used_stop_ids.update(map(int, ids))

# Clean stops.csv
stops_clean = (
    stops
    .drop(columns=["stop_name"], errors="ignore")     # remove stop_name
    .query("stop_id in @used_stop_ids")               # keep only used stops
)

# Ensure correct dtypes
stops_clean = stops_clean.astype({
    "stop_id": "Int64",
    "stop_lat": "float64",
    "stop_lon": "float64"
})

print(stops_clean.dtypes)
stops_clean.to_csv("stops_clean.csv", index=False)

# Cleaning route_to_stop_sequence.csv

In [None]:
# Re-read the raw route -> stop-sequence table so this cell starts from the file on disk.
route_to_stop = pd.read_csv(
    r"C:\Users\moham\Downloads\Data_Cleaning\route_to_stop_sequence_v2.csv"
)

# Convert stop_id_list column (string → list[int])
def parse_stop_list(x):
    """Parse a stringified stop-id list into a list of ints.

    Accepts text such as "['12', '34']", '["12", "34"]' or " [12, 34] ".
    Surrounding whitespace, the enclosing brackets and either quote style
    are stripped before each token is converted with int().

    Returns an empty list for non-string input (e.g. NaN from a blank CSV
    cell) and for an empty list literal. Raises ValueError if a token is
    not an integer.
    """
    if not isinstance(x, str):
        return []
    # Strip outer whitespace first so " [1, 2] " loses its brackets too.
    inner = x.strip().strip("[]")
    tokens = (tok.strip().strip("'\"").strip() for tok in inner.split(","))
    return [int(tok) for tok in tokens if tok]

# Turn each stringified list into a real list[int].
route_to_stop["stop_id_list"] = route_to_stop["stop_id_list"].apply(parse_stop_list)

# Ensure correct dtype
route_to_stop = route_to_stop.astype({"route_id": "Int64"})

print(route_to_stop.head())
# The list column is written as its Python repr, so it can be parsed back later.
# index=False matches the stops_clean.csv export and avoids a spurious
# "Unnamed: 0" column when the file is reloaded.
route_to_stop.to_csv("route_to_stop_clean.csv", index=False)

# PROCESSING PARQUET FILES WHICH WERE GIVEN TO US


In [None]:
# INPUT: folder holding one sub-folder per day, each with a handful of parquet files.
parent_dir = r"C:\Users\moham\OneDrive\Desktop\Bengaluru Vega\Code\task_1\Demo_date_wise_data"

# OUTPUT root, kept outside OneDrive. Point this at your own temp folder.
new_base_dir = r"C:\Temp_Bengaluru_Output"
base_output = os.path.join(new_base_dir, "cleaned_dataset_partitioned")

# checkpoint_dir holds one "<day>.done" marker per processed day;
# output_dir holds the per-route folders (e.g. route_id_101).
checkpoint_dir, output_dir = (
    os.path.join(base_output, leaf) for leaf in ("checkpoints", "routes_partitioned")
)


In [None]:


# Create the output and checkpoint folders up front (no-op when they already exist).
for _needed_dir in (output_dir, checkpoint_dir):
    os.makedirs(_needed_dir, exist_ok=True)


# ----------------------------
# HELPER: decode byte-like columns (Kept from first block)
# ----------------------------
def decode_bytes(df: pl.DataFrame, cols: list[str]) -> pl.DataFrame:
    """Return `df` with each listed column converted to text.

    For every name in `cols` that exists in the frame, values are mapped
    element-wise: bytes/bytearray are UTF-8 decoded, anything else goes
    through str(); in both cases surrounding double quotes are stripped.
    None stays None. Names not present in the frame are ignored.
    """
    def _as_text(value):
        if isinstance(value, (bytes, bytearray)):
            return value.decode("utf-8").strip('"')
        if value is None:
            return None
        return str(value).strip('"')

    for name in cols:
        if name not in df.columns:
            continue
        df = df.with_columns(
            pl.col(name).map_elements(_as_text, return_dtype=pl.Utf8).alias(name)
        )
    return df

# ----------------------------
# COLUMNS TO RETAIN 
# ----------------------------
cols = [
    # identity / time
    "vehicle_timestamp",
    "trip_id",
    "schedule_relationship",
    "route_id",
    # position
    "latitude",
    "longitude",
    "bearing",
]

# ----------------------------
# PIPELINE
# ----------------------------
# For each day folder: read its parquet files, clean the pings, and write one
# parquet file per route_id. A "<day>.done" marker makes re-runs skip finished days.
# sorted() gives a deterministic processing order across runs and platforms.
for day_folder in sorted(os.listdir(parent_dir)):
    full_path = os.path.join(parent_dir, day_folder)
    if not os.path.isdir(full_path):
        continue

    checkpoint_file = os.path.join(checkpoint_dir, f"{day_folder}.done")

    if os.path.exists(checkpoint_file):
        print(f"‚è≠ Skipping {day_folder} (already processed)")
        continue

    print(f"\nüì¶ Processing {day_folder} ...")

    # Parquet files inside this day folder, in a stable order.
    parquet_files = sorted(
        os.path.join(full_path, f)
        for f in os.listdir(full_path)
        if f.endswith(".parquet")
    )

    # One lazy frame per file, restricted to the target columns the file actually has.
    lazy_frames = []
    for pq_file in tqdm(parquet_files, desc=f"  ‚Üí {day_folder}"):
        scan = pl.scan_parquet(pq_file)
        # Resolve the schema once per file instead of once per candidate column.
        available = set(scan.collect_schema().names())
        lazy_frames.append(scan.select([c for c in cols if c in available]))

    if not lazy_frames:
        print(f"‚ö† No parquet files found in {day_folder}, skipping.")
        continue

    # Concatenate all lazy frames for the day and collect into memory
    df = pl.concat(lazy_frames).collect()

    # ----------------------------
    # CLEANING
    # ----------------------------

    # Remove duplicate pings. Every column is optional at the scan step, so
    # only deduplicate when both key columns were actually read.
    if {"trip_id", "vehicle_timestamp"}.issubset(df.columns):
        df = df.unique(subset=['trip_id', 'vehicle_timestamp'], keep='first')

    df = decode_bytes(df, cols)

    # Convert vehicle_timestamp (epoch seconds) to a millisecond-precision datetime.
    if "vehicle_timestamp" in df.columns:
        df = df.with_columns(
            pl.col("vehicle_timestamp")
            .cast(pl.Int64, strict=False)
            # Convert seconds (input) to milliseconds by multiplying by 1000
            .mul(1000)
            .cast(pl.Datetime("ms"), strict=False)
            .alias("vehicle_timestamp")
        )

    # Convert numerics. strict=False turns unparseable values into null.
    for col in ["current_status", "trip_id", "route_id", "start_date"]:
        if col in df.columns:
            df = df.with_columns(pl.col(col).cast(pl.Int64, strict=False))

    # Normalize schedule_relationship & drop canceled
    if "schedule_relationship" in df.columns:
        df = df.with_columns(
            pl.col("schedule_relationship")
            .cast(pl.Utf8)
            .str.to_uppercase()
            .alias("schedule_relationship")
        )
        # ne_missing treats null as an ordinary value, so rows with an unknown
        # relationship are kept. A plain `!=` evaluates to null for them, and
        # filter() would silently drop those rows along with the canceled ones.
        df = df.filter(pl.col("schedule_relationship").ne_missing("CANCELED"))

    # Convert floats
    for col in ["latitude", "longitude", "bearing"]:
        if col in df.columns:
            df = df.with_columns(pl.col(col).cast(pl.Float64, strict=False))

    # ----------------------------
    # BROADCAST route_id WITHIN trip_id
    # ----------------------------
    if "trip_id" in df.columns and "route_id" in df.columns:
        print("   ‚Üí Applying route_id broadcast within trip_id groups...")
        broadcast_order = [c for c in ("trip_id", "vehicle_timestamp") if c in df.columns]
        df = (
            df.sort(broadcast_order)
             .with_columns(
                 pl.col("route_id")
                 .fill_null(strategy="forward")
                 .fill_null(strategy="backward")
                 .over("trip_id")
                 .alias("route_id")
             )
        )

    # ----------------------------
    # SPLIT BY route_id AND SAVE
    # ----------------------------
    if "route_id" not in df.columns:
        # Nothing to partition on. Leave the day un-checkpointed so it is retried.
        print(f"WARNING: no route_id column found in {day_folder}, skipping.")
        continue

    # Rows whose route_id is still null after the broadcast are not written.
    unique_routes = df["route_id"].unique().drop_nulls().to_list()
    print(f"‚úÖ {len(unique_routes)} unique routes found in {day_folder} for saving.")

    # Only columns that were actually read can be sorted on or written.
    out_cols = [c for c in cols if c in df.columns]
    save_order = [c for c in ("route_id", "trip_id", "vehicle_timestamp") if c in df.columns]
    df = df.sort(save_order)

    # Day folders are already named "date=YYYY-MM-DD", so the label must not
    # prepend another "date=".
    for rid in tqdm(unique_routes, desc=f"‚úç Saving routes for {day_folder}"):
        df_route = df.filter(pl.col("route_id") == rid).select(out_cols)

        # Create folder for each route
        route_folder = os.path.join(output_dir, f"route_id_{rid}")
        os.makedirs(route_folder, exist_ok=True)

        # File format: route_<rid>day<day_folder>.parquet
        out_path = os.path.join(route_folder, f"route_{rid}day{day_folder}.parquet")

        # Save the partitioned file
        df_route.write_parquet(out_path)

    # create checkpoint marker
    with open(checkpoint_file, "w") as f:
        f.write("done")

    print(f"‚úî Finished {day_folder} and checkpointed. Files saved in {output_dir}")


print("\nüéâ All days processed successfully and saved partitioned by route_id!")


üì¶ Processing date=2025-08-10 ...


  ‚Üí date=2025-08-10: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 2/2 [00:00<00:00, 96.81it/s]


   ‚Üí Applying route_id broadcast within trip_id groups...
‚úÖ 2883 unique routes found in date=2025-08-10 for saving.


‚úç Saving routes for date=date=2025-08-10: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 2883/2883 [00:05<00:00, 493.96it/s]


‚úî Finished date=2025-08-10 and checkpointed. Files saved in C:\Temp_Bengaluru_Output\cleaned_dataset_partitioned\routes_partitioned

üì¶ Processing date=2025-08-17 ...


  ‚Üí date=2025-08-17: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 2/2 [00:00<00:00, 127.84it/s]


   ‚Üí Applying route_id broadcast within trip_id groups...
‚úÖ 3955 unique routes found in date=2025-08-17 for saving.


‚úç Saving routes for date=date=2025-08-17: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 3955/3955 [00:11<00:00, 355.84it/s]

‚úî Finished date=2025-08-17 and checkpointed. Files saved in C:\Temp_Bengaluru_Output\cleaned_dataset_partitioned\routes_partitioned

üéâ All days processed successfully and saved partitioned by route_id!





# Comparison between df_old (raw input) and the processed df_new_route_1657


In [14]:
# One raw input file, loaded as-is for a before/after comparison.
df_old = pd.read_parquet(
    r"C:\Users\moham\OneDrive\Desktop\Bengaluru Vega\Code\task_1\Demo_date_wise_data\date=2025-08-10\part-00000-b87f0d47-fbdd-42bf-8877-034accb68352.c000.snappy_flat.parquet"
)

In [26]:
# Fixed seed so the displayed rows are the same on every Restart & Run All.
df_old.sample(4, random_state=42)

Unnamed: 0,id,system_time,current_status,stop_id,vehicle_timestamp,trip_id,start_time,start_date,schedule_relationship,route_id,latitude,longitude,bearing,vehicle_id,label
4093301,8229,2025-08-10T07:25:35.892169,1,"""22406""","""1754810708""","""67527469""","""06:15:00""","""20250810""","""SCHEDULED""","""2606""",12.948546,77.497093,196.0,"""20389""","""KA57F4286"""
6838002,9622,2025-08-10T05:53:35.931186,1,"""38058""","""1754805176""","""67428350""","""05:45:00""","""20250810""","""SCHEDULED""","""14189""",13.008821,77.643013,227.0,"""20804""","""KA57F2203"""
3469383,8303,2025-08-10T07:18:05.827047,1,"""36368""","""1754810255""","""67454621""","""06:35:00""","""20250810""","""SCHEDULED""","""3139""",13.000056,77.676849,269.0,"""23454""","""KA41D2747"""
2745982,6872,2025-08-10T06:35:15.881866,1,"""38058""","""1754807687""","""67454271""","""05:25:00""","""20250810""","""SCHEDULED""","""1856""",12.956732,77.697113,37.0,"""28343""","""KA01AR0778"""


In [23]:
# Processed output for route 1657 on 2025-08-10, as written by the pipeline.
df_new_route_1657 = pd.read_parquet(
    r"C:\Temp_Bengaluru_Output\cleaned_dataset_partitioned\routes_partitioned\route_id_1657\route_1657daydate=2025-08-10.parquet"
)

In [13]:
# Preview the processed route file. On a fresh run `df` is the pipeline's
# last in-memory day frame, not this file.
df_new_route_1657.head()

Unnamed: 0,vehicle_timestamp,trip_id,schedule_relationship,route_id,latitude,longitude,bearing
0,2025-08-10 05:25:48,67410793,SCHEDULED,1657,12.980172,77.571899,73.0
1,2025-08-10 05:26:48,67410793,SCHEDULED,1657,12.982033,77.574593,358.0
2,2025-08-10 05:27:18,67410793,SCHEDULED,1657,12.984824,77.574806,354.0
3,2025-08-10 05:27:48,67410793,SCHEDULED,1657,12.98701,77.574463,350.0
4,2025-08-10 05:28:18,67410793,SCHEDULED,1657,12.987882,77.574242,335.0


# Combining the different route_id's

In [27]:
import pandas as pd
import numpy as np
import pyarrow.parquet as pq
from pathlib import Path


# Root of the per-route parquet partitions; load_all_raw_data walks its sub-folders.
DATA_DIR = Path(r"C:\Temp_Bengaluru_Output\cleaned_dataset_partitioned\routes_partitioned")
# Cleaned stops table. NOTE(review): the cleaning cell writes stops_clean.csv to the
# working directory — confirm it was moved into cleaned_data.
STOPS_FILE = Path(r"C:\Users\moham\Downloads\Data_Cleaning\cleaned_data\stops_clean.csv")
# Cleaned route -> stop-sequence table (columns used below: route_id, stop_id_list).
ROUTE_STOPS_FILE = Path(r"C:\Users\moham\Downloads\Data_Cleaning\cleaned_data\route_to_stop_clean.csv")

# ---------------------------
# Data Loading and Initial Cleaning
# ---------------------------
def load_all_raw_data(base_dir, known_routes):
    """Load and concatenate the parquet files of every known route.

    Parameters
    ----------
    base_dir : pathlib.Path
        Folder containing one sub-folder per route, named "route_id_<id>".
    known_routes : container of int
        Route ids to keep; sub-folders for other routes are skipped.

    Returns
    -------
    pandas.DataFrame
        All rows from the matching route folders, with exact duplicate rows
        removed per file. Empty when nothing matched or nothing loaded.

    Entries that are not directories, or whose name does not reduce to an
    integer route id (e.g. a "checkpoints" folder), are ignored. Files that
    fail to load are reported and skipped so one bad file does not abort
    the whole run.
    """
    all_data = []
    # sorted() makes the row order of the result independent of the filesystem.
    for route_dir in sorted(base_dir.iterdir()):
        if not route_dir.is_dir():
            continue
        try:
            route_id = int(route_dir.name.replace('route_id_', ''))
        except ValueError:
            continue  # not a route folder
        if route_id not in known_routes:
            continue

        print(f"Loading data for {route_dir.name}...")
        for file_path in sorted(route_dir.glob('*.parquet')):
            try:
                df = pd.read_parquet(file_path)
                df = df.drop_duplicates()
                all_data.append(df)
            except Exception as e:
                print(f"Error loading {file_path}: {e}")

    if not all_data:
        return pd.DataFrame()
    return pd.concat(all_data, ignore_index=True)

route_stops = pd.read_csv(ROUTE_STOPS_FILE)
# Plain ints, so membership tests against folder-derived ids are unambiguous.
known_routes = set(route_stops['route_id'].dropna().astype(int))
# The list column was saved as its Python repr. literal_eval parses literals
# only, so file contents can never execute code the way eval() would.
route_stops["stop_id_list"] = route_stops["stop_id_list"].apply(ast.literal_eval)

combined_df = load_all_raw_data(DATA_DIR, known_routes)

Loading data for route_id_10004...
Loading data for route_id_10016...
Loading data for route_id_10019...
Loading data for route_id_10020...
Loading data for route_id_10034...
Loading data for route_id_10044...
Loading data for route_id_10045...
Loading data for route_id_10069...
Loading data for route_id_10070...
Loading data for route_id_10071...
Loading data for route_id_10072...
Loading data for route_id_10094...
Loading data for route_id_10108...
Loading data for route_id_10109...
Loading data for route_id_10122...
Loading data for route_id_10144...
Loading data for route_id_10164...
Loading data for route_id_10165...
Loading data for route_id_10201...
Loading data for route_id_10210...
Loading data for route_id_10211...
Loading data for route_id_10219...
Loading data for route_id_10221...
Loading data for route_id_10239...
Loading data for route_id_10260...
Loading data for route_id_10262...
Loading data for route_id_10264...
Loading data for route_id_10271...
Loading data for rou

In [30]:
combined_df.duplicated().sum() # number of fully identical rows; the recorded output is 0, i.e. no duplicates

np.int64(0)

In [1]:
# Cleaning of trips: turn raw pings into stop-to-stop travel segments

In [61]:
# Raw reference tables: stop coordinates and each route's ordered stop list.
df_stops, route_to_stop_sequence = (
    pd.read_csv(csv_path)
    for csv_path in (
        r"C:\Users\moham\OneDrive\Desktop\Bengaluru Vega\Code\task_1\ref_data\stops_0.csv",
        r"C:\Users\moham\OneDrive\Desktop\Bengaluru Vega\Code\task_1\ref_data\route_to_stop_sequence_v2.csv",
    )
)

In [None]:
import pandas as pd
import numpy as np
import ast
from tqdm import tqdm
from math import radians, sin, cos, atan2, sqrt

# -------------------------
# Utilities
# -------------------------
def haversine_vectorized(lat1, lon1, lat2, lon2):
    """
    Pairwise great-circle distances between two sets of points.

      - lat1, lon1 : 1D array-likes of length m, in degrees
      - lat2, lon2 : 1D array-likes of length n, in degrees
    Returns an (m, n) array of distances in meters, where entry [i, j] is
    the distance from point i of the first set to point j of the second.
    NaN coordinates propagate to NaN distances.
    """
    R = 6371000.0  # Earth radius in meters
    # np.asarray lets callers pass lists as well as arrays.
    lat1r = np.radians(np.asarray(lat1, dtype=float))[:, None]    # (m,1)
    lon1r = np.radians(np.asarray(lon1, dtype=float))[:, None]
    lat2r = np.radians(np.asarray(lat2, dtype=float))[None, :]    # (1,n)
    lon2r = np.radians(np.asarray(lon2, dtype=float))[None, :]

    dlat = lat2r - lat1r
    dlon = lon2r - lon1r

    a = np.sin(dlat / 2.0) ** 2 + np.cos(lat1r) * np.cos(lat2r) * np.sin(dlon / 2.0) ** 2
    # Rounding can push `a` a hair above 1 for near-antipodal pairs, which
    # would make sqrt(1 - a) NaN. Clamp to the valid range (NaN stays NaN).
    a = np.clip(a, 0.0, 1.0)
    c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a))
    return R * c  # (m,n)

def haversine_point(lat1, lon1, lat2, lon2):
    """Great-circle distance in meters between two points given as scalar degrees.

    NaN inputs yield NaN.
    """
    R = 6371000.0
    phi1, phi2 = radians(lat1), radians(lat2)
    dphi = radians(lat2 - lat1)
    dlambda = radians(lon2 - lon1)
    a = sin(dphi/2)**2 + cos(phi1)*cos(phi2)*sin(dlambda/2)**2
    # Rounding can leave `a` marginally outside [0, 1] for near-antipodal
    # points, which would make sqrt(1 - a) raise a math domain error.
    # Explicit comparisons (rather than min/max) leave NaN untouched.
    if a > 1.0:
        a = 1.0
    elif a < 0.0:
        a = 0.0
    return 2*R*atan2(sqrt(a), sqrt(1-a))

def parse_stop_seq_field(val):
    """Parse a stop_id_list value into a list of ints.

    Accepted inputs:
      - list, tuple or numpy array of int-like items (e.g. a list column
        read back from parquet arrives as an array)
      - a string such as "['123','456']" or "123, 456"
      - a missing value (None / NaN), which gives an empty list

    Raises ValueError for any other type, or when an item is not int-like.
    """
    # Sequences go first: pd.isna() on an array returns an array, which
    # cannot be used as a single truth value.
    if isinstance(val, (list, tuple, np.ndarray)):
        return [int(x) for x in val]
    if pd.isna(val):
        return []
    if isinstance(val, str):
        # try literal_eval first
        try:
            parsed = ast.literal_eval(val)
            return [int(x) for x in parsed]
        except (ValueError, SyntaxError, TypeError):
            # fallback: crude split for text that is not a valid Python literal
            s = val.strip()
            if s.startswith('[') and s.endswith(']'):
                s = s[1:-1]
            parts = [p.strip().strip("'\"") for p in s.split(",") if p.strip()]
            return [int(x) for x in parts]
    raise ValueError("Unsupported stop_id_list format")

# -------------------------
# Normalizers
# -------------------------
def normalize_live_df(df):
    """Return a cleaned copy of the live-ping frame with canonical column names.

    Steps:
      - rename common aliases (lat, lon/long, timestamp) to latitude,
        longitude and vehicle_timestamp when the canonical name is absent
      - require trip_id, route_id, vehicle_timestamp, latitude, longitude
        (raises ValueError listing whichever are missing)
      - best-effort cast of route_id to int (left untouched if it fails)
      - parse vehicle_timestamp: numeric columns are read as epoch seconds,
        anything else as datetime text, falling back to epoch seconds when
        more than half of the values fail to parse
      - drop rows lacking a timestamp, latitude or longitude

    The input frame is not modified.
    """
    df = df.copy()
    # rename common variations
    rename_map = {}
    if 'lat' in df.columns and 'latitude' not in df.columns: rename_map['lat'] = 'latitude'
    if 'lon' in df.columns and 'longitude' not in df.columns: rename_map['lon'] = 'longitude'
    if 'long' in df.columns and 'longitude' not in df.columns: rename_map['long'] = 'longitude'
    if 'timestamp' in df.columns and 'vehicle_timestamp' not in df.columns: rename_map['timestamp'] = 'vehicle_timestamp'
    if rename_map:
        df = df.rename(columns=rename_map)

    # required columns
    required = ['trip_id','route_id','vehicle_timestamp','latitude','longitude']
    missing = [c for c in required if c not in df.columns]
    if missing:
        raise ValueError(f"live_df missing required columns: {missing}")

    # Best-effort: keep route_id as-is when it holds NaN or non-numeric values.
    try:
        df['route_id'] = df['route_id'].astype(int)
    except (ValueError, TypeError, OverflowError):
        pass

    # parse vehicle_timestamp robustly:
    ts = df['vehicle_timestamp']
    # is_numeric_dtype also understands pandas extension dtypes (nullable
    # Int64, tz-aware datetimes), on which np.issubdtype raises TypeError.
    if pd.api.types.is_numeric_dtype(ts.dtype):
        df['vehicle_timestamp'] = pd.to_datetime(ts, unit='s', errors='coerce')
    else:
        parsed = pd.to_datetime(ts, errors='coerce')
        # if too many NaT, try numeric->unit seconds
        if parsed.isna().mean() > 0.5:
            try:
                df['vehicle_timestamp'] = pd.to_datetime(ts.astype(float), unit='s', errors='coerce')
            except (ValueError, TypeError):
                df['vehicle_timestamp'] = parsed
        else:
            df['vehicle_timestamp'] = parsed

    # drop rows with no parsed timestamp or lat/lon
    df = df.dropna(subset=['vehicle_timestamp','latitude','longitude']).reset_index(drop=True)
    return df

def normalize_stops_df(stops_df):
    """Return a copy of the stops table with canonical columns and an int stop_id.

    `latitude` / `longitude` are renamed to `stop_lat` / `stop_lon` when the
    canonical name is not already present. Raises ValueError if stop_id,
    stop_lat or stop_lon is still missing afterwards. The input is not modified.
    """
    fallback_names = (('latitude', 'stop_lat'), ('longitude', 'stop_lon'))
    renames = {
        alias: canonical
        for alias, canonical in fallback_names
        if canonical not in stops_df.columns and alias in stops_df.columns
    }

    normalized = stops_df.copy()
    if renames:
        normalized = normalized.rename(columns=renames)

    absent = [name for name in ('stop_id', 'stop_lat', 'stop_lon') if name not in normalized.columns]
    if absent:
        raise ValueError(f"stops_df missing required columns: {absent}")

    normalized['stop_id'] = normalized['stop_id'].astype(int)
    return normalized

# -------------------------
# Find entry/exit pings (vectorized-friendly)
# -------------------------
def find_entry_exit_pings_trip_sorted(trip_df_sorted, stop_lat, stop_lon, radius=50):
    """
    Locate the pings that bracket a trip's presence within `radius` meters of a stop.

    `trip_df_sorted` holds one trip's pings ordered by vehicle_timestamp.
    Returns a (before, after) pair of rows:
      - before: the ping just ahead of the first in-radius ping, or that
        first in-radius ping itself when it is the trip's first row
      - after : the ping just past the last in-radius ping, or that last
        in-radius ping itself when it is the trip's last row
    Returns (None, None) when no ping falls inside the radius.
    """
    # Distance from every ping to the single stop, flattened to shape (m,).
    stop_distances = haversine_vectorized(
        trip_df_sorted['latitude'].values,
        trip_df_sorted['longitude'].values,
        np.array([stop_lat]),
        np.array([stop_lon]),
    ).ravel()

    inside_positions = np.flatnonzero(stop_distances <= radius)
    if inside_positions.size == 0:
        return None, None

    # Step one row outwards on each side, clamped to the trip's bounds.
    entry_pos = max(inside_positions[0] - 1, 0)
    exit_pos = min(inside_positions[-1] + 1, len(trip_df_sorted) - 1)
    return trip_df_sorted.iloc[entry_pos], trip_df_sorted.iloc[exit_pos]

# -------------------------
# Core segmentation
# -------------------------
def segmented_times_between_stops_for_trip(trip_df_sorted, stop_seq, stops_df, radius=50):
    """
    Compute travel times between consecutive stops for one trip.

    Parameters
    ----------
    trip_df_sorted : DataFrame of one trip's pings, sorted by vehicle_timestamp.
    stop_seq       : ordered stop ids of the route.
    stops_df       : stops table with stop_id, stop_lat, stop_lon.
    radius         : distance in meters within which a ping counts as "at" a stop.

    Returns a list of dicts {trip_id, from_stop, to_stop, tA, tB, travel_time_min}.
    tA is the first ping after leaving the from-stop and tB the last ping
    before entering the to-stop. Pairs are skipped when either stop is
    unknown, either stop was never reached, or the travel time is not
    positive. An empty trip yields an empty list.
    """
    if len(trip_df_sorted) == 0 or len(stop_seq) < 2:
        return []

    trip_id = trip_df_sorted['trip_id'].iloc[0]
    stop_ids = [int(s) for s in stop_seq]

    # Resolve coordinates once per stop (first matching row wins) instead of
    # scanning stops_df twice per consecutive pair.
    wanted = stops_df.loc[stops_df['stop_id'].isin(set(stop_ids)), ['stop_id', 'stop_lat', 'stop_lon']]
    wanted = wanted.drop_duplicates('stop_id')
    coord_by_stop = {
        int(sid): (float(lat), float(lon))
        for sid, lat, lon in wanted.itertuples(index=False, name=None)
    }

    # Every interior stop is the "to" stop of one pair and the "from" stop of
    # the next, so its entry/exit pings are computed once and reused.
    pings_by_stop = {}

    def _pings_for(stop_id):
        if stop_id not in pings_by_stop:
            lat, lon = coord_by_stop[stop_id]
            pings_by_stop[stop_id] = find_entry_exit_pings_trip_sorted(trip_df_sorted, lat, lon, radius)
        return pings_by_stop[stop_id]

    results = []
    for stopA, stopB in zip(stop_ids, stop_ids[1:]):
        if stopA not in coord_by_stop or stopB not in coord_by_stop:
            continue

        _, afterA = _pings_for(stopA)
        beforeB, _ = _pings_for(stopB)
        if afterA is None or beforeB is None:
            continue

        tA = pd.to_datetime(afterA['vehicle_timestamp'])
        tB = pd.to_datetime(beforeB['vehicle_timestamp'])
        travel_time_min = (tB - tA).total_seconds() / 60.0
        if travel_time_min > 0:
            results.append({
                "trip_id": trip_id,
                "from_stop": stopA,
                "to_stop": stopB,
                "tA": tA,
                "tB": tB,
                "travel_time_min": travel_time_min
            })
    return results

# -------------------------
# Final pipeline (robust)
# -------------------------
def build_clean_segments(live_df, stops_df, route_to_stop_seq,
                         radius=50, max_travel_time_min=500, max_speed_m_s=30,
                         show_progress=True):
    """
    Build a table of clean stop-to-stop travel segments from raw pings.

    Parameters
    ----------
    live_df             : raw pings (needs trip_id, route_id, vehicle_timestamp,
                          latitude, longitude or their accepted aliases)
    stops_df            : stops table (stop_id, stop_lat, stop_lon)
    route_to_stop_seq   : table with columns route_id and stop_id_list
    radius              : meters within which a ping counts as "at" a stop
    max_travel_time_min : segments longer than this many minutes are dropped
    max_speed_m_s       : segments with a higher straight-line average speed are dropped
    show_progress       : wrap the route loop in a tqdm progress bar

    Returns
    -------
    DataFrame with trip_id, route_id, from_stop, to_stop, tA, tB,
    travel_time_min, distance_m, avg_speed_m_s, start_hour, day_of_week.
    An empty DataFrame when no segment could be built.

    Raises ValueError if route_to_stop_seq lacks its two columns or shares
    no route with live_df.
    """
    # normalize inputs
    live = normalize_live_df(live_df)
    # Keep one row per stop_id: duplicates would multiply segment rows in the
    # merges below. The first occurrence wins, as in the per-trip lookup.
    stops = normalize_stops_df(stops_df).drop_duplicates('stop_id').reset_index(drop=True)
    rts = route_to_stop_seq.copy()
    if 'route_id' not in rts.columns or 'stop_id_list' not in rts.columns:
        raise ValueError("route_to_stop_seq must contain 'route_id' and 'stop_id_list'")

    # Best-effort dtype alignment; values are left as-is when they cannot be cast.
    try:
        rts['route_id'] = rts['route_id'].astype(int)
        live['route_id'] = live['route_id'].astype(int)
    except (ValueError, TypeError, OverflowError):
        pass

    routes_present = rts[rts['route_id'].isin(live['route_id'].unique())].reset_index(drop=True)
    if routes_present.empty:
        raise ValueError("No routes in route_to_stop_seq match live_df.route_id")

    # Loop-invariant coordinate lookup, indexed by stop_id.
    stops_by_id = stops.set_index('stop_id')

    all_segments = []
    route_iter = tqdm(routes_present.itertuples(index=False), total=len(routes_present), desc="Routes") if show_progress else routes_present.itertuples(index=False)

    for row in route_iter:
        route_id = int(row.route_id)
        stop_seq = parse_stop_seq_field(row.stop_id_list)
        if len(stop_seq) < 2:
            continue

        # A stop missing from the stops table cannot be located. Leave it out
        # of the distance matrix rather than letting .loc raise KeyError and
        # abort every route. The per-trip segmentation skips such stops too.
        known_seq = [s for s in stop_seq if s in stops_by_id.index]
        if len(known_seq) < 2:
            continue

        route_pings = live[live['route_id']==route_id].copy()
        if route_pings.empty:
            continue

        # vectorized nearest-stop assignment, stops in route order
        stop_coords = stops_by_id.loc[known_seq]
        st_lats = stop_coords['stop_lat'].values
        st_lons = stop_coords['stop_lon'].values

        p_lats = route_pings['latitude'].values
        p_lons = route_pings['longitude'].values

        # compute (m,n) distances and pick nearest
        dmat = haversine_vectorized(p_lats, p_lons, st_lats, st_lons)  # shape (m,n)
        min_idx = np.argmin(dmat, axis=1)
        min_dists = dmat[np.arange(dmat.shape[0]), min_idx]
        closest_stop_ids = stop_coords.index.values[min_idx]

        route_pings = route_pings.reset_index(drop=True)
        route_pings['closest_stop'] = closest_stop_ids
        route_pings['dist_to_closest_stop_m'] = min_dists
        route_pings['near_stop'] = route_pings['dist_to_closest_stop_m'] <= radius

        # Only trips with at least one ping near a stop can produce segments.
        trips_to_process = route_pings[route_pings['near_stop']].trip_id.unique()
        if len(trips_to_process) == 0:
            continue

        # process each trip
        for trip_id, trip_df in route_pings[route_pings['trip_id'].isin(trips_to_process)].groupby('trip_id'):
            trip_df_sorted = trip_df.sort_values('vehicle_timestamp').reset_index(drop=True)
            segs = segmented_times_between_stops_for_trip(trip_df_sorted, stop_seq, stops, radius=radius)
            for s in segs:
                s['route_id'] = route_id
                all_segments.append(s)

    if not all_segments:
        return pd.DataFrame()

    seg_df = pd.DataFrame(all_segments)

    # -------------------------
    # Enrich: distances & speeds
    # -------------------------
    # Add from/to stop coords
    seg_df = seg_df.merge(stops[['stop_id','stop_lat','stop_lon']].rename(columns={'stop_id':'from_stop','stop_lat':'from_lat','stop_lon':'from_lon'}),
                          on='from_stop', how='left')
    seg_df = seg_df.merge(stops[['stop_id','stop_lat','stop_lon']].rename(columns={'stop_id':'to_stop','stop_lat':'to_lat','stop_lon':'to_lon'}),
                          on='to_stop', how='left')

    # Straight-line distance between the two stops (not the distance driven).
    seg_df['distance_m'] = seg_df.apply(lambda r: haversine_point(r['from_lat'], r['from_lon'], r['to_lat'], r['to_lon']), axis=1)
    seg_df['avg_speed_m_s'] = seg_df['distance_m'] / (seg_df['travel_time_min'] * 60.0)

    # -------------------------
    # Clean filters
    # -------------------------
    seg_df = seg_df[seg_df['travel_time_min'].between(0.001, max_travel_time_min)]
    # .copy() so the column assignments below act on an independent frame
    # rather than on a filtered slice.
    seg_df = seg_df[seg_df['avg_speed_m_s'].between(0.0001, max_speed_m_s)].copy()

    # temporal features
    seg_df['start_hour'] = seg_df['tA'].dt.hour
    seg_df['day_of_week'] = seg_df['tA'].dt.dayofweek

    # final ordering
    ordered_cols = ['trip_id','route_id','from_stop','to_stop','tA','tB','travel_time_min','distance_m','avg_speed_m_s','start_hour','day_of_week']
    existing_cols = [c for c in ordered_cols if c in seg_df.columns]
    clean_segments_df = seg_df[existing_cols].reset_index(drop=True)
    return clean_segments_df

# Settings for this demo run, overriding the function defaults for radius and travel time.
segment_settings = dict(
    radius=35,
    max_travel_time_min=180,
    max_speed_m_s=30,
    show_progress=True,
)
clean_segments_df = build_clean_segments(
    combined_df, df_stops, route_to_stop_sequence, **segment_settings
)

Routes: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 4083/4083 [11:36<00:00,  5.86it/s]  


This is, again, just a demo of the original workflow (the original run took a lot of time).