## Read YJMob100k Data

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
from matplotlib.collections import LineCollection

# Load the raw records, order them per user/day/time slot, renumber the rows,
# and finally discard rows that repeat the same (uid, d, t, x, y).
df = (
    pd.read_csv("data/yjmob100k-dataset1.csv.gz", compression="infer")
    .sort_values(by=["uid", "d", "t"])
    .reset_index(drop=True)
    .drop_duplicates(subset=["uid", "d", "t", "x", "y"])
)

# Separate copy handed to the stop/trip detection cell further down.
mob = df.copy()

## Split Trajectories

In [None]:
import pandas as pd
import numpy as np
from typing import Optional
from tqdm.auto import tqdm
import time

# ========= 1. Required helper functions =========

def ensure_int_types(df: pd.DataFrame) -> None:
    """
    Validate that uid, d, t, x, y exist and cast them to int32 in place.

    A column is cast only when it holds no missing values. Integer x/y
    columns that are already int64 are left as they are. Non-integer
    columns containing missing values are skipped without a message; a
    cast that raises is reported with a printed warning.

    Raises:
        KeyError: if any of the required columns is absent.
    """
    required = ('uid', 'd', 't', 'x', 'y')

    # Fail on the first missing column before touching any data.
    absent = [name for name in required if name not in df.columns]
    if absent:
        raise KeyError(f"Missing required column: {absent[0]}")

    for name in required:
        column = df[name]
        clean_integer = pd.api.types.is_integer_dtype(column) and not column.isna().any()

        if clean_integer:
            # Coordinates stored as int64 keep their width.
            keep_width = name in ('x', 'y') and column.dtype == np.int64
            if not keep_width:
                df[name] = column.astype(np.int32)
            continue

        # Best effort for everything else (e.g. float columns without gaps).
        try:
            if not df[name].isna().any():
                df[name] = df[name].astype(np.int32)
        except Exception as e:
            print(f"Warning: failed to convert column {name}: {e}")
                
def build_place_from_grid(df: pd.DataFrame, G: int = 1, inplace_col: Optional[str] = 'place'):
    """
    Snap x/y onto a grid of cell size G and derive a packed place key.

    The grid coordinates are written to df['xg'] and df['yg'] as int64.
    The key is xg shifted left by 32 bits, XOR-ed with the low 32 bits of
    yg. When inplace_col is truthy the key is also stored in that column.

    Returns:
        The int64 array of place keys, one per row.

    Raises:
        ValueError: if G is zero or negative.
    """
    if G <= 0:
        raise ValueError("G must be a positive integer")

    # Floor-divide both coordinates first, then attach them to the frame.
    cell_x = (df['x'].values // G).astype(np.int64)
    cell_y = (df['y'].values // G).astype(np.int64)
    df['xg'] = cell_x
    df['yg'] = cell_y

    low_y_bits = cell_y & 0xffffffff
    key = (cell_x << 32) ^ low_y_bits

    if inplace_col:
        df[inplace_col] = key
    return key.astype(np.int64)

def consecutive_run_id_spatial_only(
    df_sorted: pd.DataFrame,
    by_cols=('uid','d'),
    xg_col='xg',
    yg_col='yg',
    max_grid_gap: int = 3
):
    """
    Assign an anchor-based run id to every row within each by_cols group.

    Inside a group, rows are visited in their existing order (the frame is
    expected to be sorted by time beforehand). The first row of a run is its
    anchor. A row whose Manhattan distance to the current anchor exceeds
    max_grid_gap starts a new run and becomes the new anchor. Run ids start
    at 0 in every group.

    Parameters:
        df_sorted: frame holding the grouping columns and the grid columns.
        by_cols: columns that delimit independent groups.
        xg_col, yg_col: names of the grid coordinate columns.
        max_grid_gap: largest Manhattan distance (in grid cells) from the
            anchor that still belongs to the same run.

    Returns:
        An int32 Series of run ids carrying the index labels of df_sorted,
        so it can be assigned straight back as a column. Rows that pandas
        leaves out of every group (missing grouping keys) are omitted.

    The result is always a Series. A groupby().apply() that returns one
    Series per group yields a wide DataFrame when there is only a single
    group, which is why the groups are walked explicitly here.
    """
    n_rows = len(df_sorted)
    # -1 marks rows not (yet) assigned to any group.
    run_ids = np.full(n_rows, -1, dtype=np.int32)

    if n_rows:
        xg_all = df_sorted[xg_col].to_numpy()
        yg_all = df_sorted[yg_col].to_numpy()

        # Mapping of group key -> integer row positions of that group.
        group_positions = df_sorted.groupby(list(by_cols), sort=False).indices

        for positions in group_positions.values():
            # Plain Python numbers keep the inner loop cheap.
            xs = xg_all[positions].tolist()
            ys = yg_all[positions].tolist()

            anchor_x, anchor_y = xs[0], ys[0]
            current_run = 0
            group_ids = []
            for cur_x, cur_y in zip(xs, ys):
                if abs(cur_x - anchor_x) + abs(cur_y - anchor_y) > max_grid_gap:
                    # Too far from the anchor: open a new run anchored here.
                    current_run += 1
                    anchor_x, anchor_y = cur_x, cur_y
                group_ids.append(current_run)

            run_ids[positions] = group_ids

    assigned = run_ids >= 0
    if assigned.all():
        return pd.Series(run_ids, index=df_sorted.index, dtype=np.int32)
    return pd.Series(run_ids[assigned], index=df_sorted.index[assigned], dtype=np.int32)

# ========= 2. Main function (enhanced logging version) =========

def detect_stops_and_trips(
    df: pd.DataFrame,
    G: int = 1,
    min_stop_duration_slots: int = 2,  # (core parameter) duration in time slots
    max_grid_gap: int = 1,              # (core parameter) neighboring grid definition
    include_stops_in_trips: bool = False,
    place_col: str = 'place',
    verbose: bool = True
):
    """
    Segment each (uid, d) sequence into spatial runs and label them STOP or MOVE.

    Pipeline (8 logged steps): copy and cast dtypes, build grid/place
    columns, sort by (uid, d, t), assign anchor-based run ids, aggregate
    per-run statistics, label runs, number the trips, and merge the
    run-level fields back onto the rows. The work is done on a copy; the
    caller's frame is not modified.

    Parameters:
        df: rows with the columns uid, d, t, x, y.
        G: grid cell size forwarded to build_place_from_grid.
        min_stop_duration_slots: a run is a STOP when
            end_t - start_t + 1 >= this value, otherwise a MOVE.
        max_grid_gap: Manhattan grid distance forwarded to
            consecutive_run_id_spatial_only.
        include_stops_in_trips: when True, STOP runs also receive a
            trip_seq (their cumulative stop count) instead of NaN.
        place_col: name of the row-level column receiving the place key.
        verbose: print per-step timings with tqdm.write and show a
            progress bar.

    Returns:
        (df, run_stats)
        df: row-level frame with a fresh index, extended by xg, yg,
            place_col, run_id, run_len, duration, is_stop, stop_seq,
            segment_type and traj_seq (the row-level name of trip_seq).
        run_stats: one row per (uid, d, run_id) with place, xg, yg,
            start_t, end_t, run_len, duration, is_stop, segment_type,
            stop_seq and trip_seq.

    Sequence numbers:
        stop_seq is the 1-based count of STOP runs within a (uid, d);
        it is NaN on MOVE runs. trip_seq on a MOVE run is the number of
        STOP runs seen so far in that (uid, d), so MOVE runs before the
        first stop get 0.
    """
    t_start = time.perf_counter()
    if verbose:
        tqdm.write(f"--- detect_stops_and_trips (G={G}, min_duration={min_stop_duration_slots}, max_gap={max_grid_gap}) ---")

    total_steps = 8
    # The context manager closes the progress bar even when a step raises.
    with tqdm(total=total_steps, disable=not verbose, desc="Detect stops & trips") as pbar:

        # Step 1) Copy and type check
        t_step = time.perf_counter()
        if verbose: tqdm.write("[1/8] Copy & ensure dtypes ...")
        df = df.copy()
        ensure_int_types(df)
        if verbose: tqdm.write(f"  > Done. ({(time.perf_counter()-t_step):.2f}s) | df.shape={df.shape}")
        pbar.update(1)

        # Step 2) Build/confirm place (ensure xg, yg columns exist)
        t_step = time.perf_counter()
        if verbose: tqdm.write(f"[2/8] Build/confirm place with G={G} (ensure xg, yg)...")
        build_place_from_grid(df, G=G, inplace_col=place_col)
        df['xg'] = df['xg'].astype(np.int64)
        df['yg'] = df['yg'].astype(np.int64)
        if verbose: tqdm.write(f"  > Done. ({(time.perf_counter()-t_step):.2f}s) | df.shape={df.shape}")
        pbar.update(1)

        # Step 3) Sort (critical: runs are built in time order).
        # ignore_index gives unique 0..n-1 labels, so the run_id Series of
        # step 4 aligns row by row whatever index the caller passed in.
        # The final merge returns a fresh index anyway.
        t_step = time.perf_counter()
        if verbose: tqdm.write("[3/8] Sort by (uid, d, t) ...")
        df.sort_values(['uid','d','t'], kind='mergesort', inplace=True, ignore_index=True)
        if verbose: tqdm.write(f"  > Done. ({(time.perf_counter()-t_step):.2f}s)")
        pbar.update(1)

        # Step 4) Compute consecutive run_id (spatial-only)
        t_step = time.perf_counter()
        if verbose:
            tqdm.write(f"[4/8] Compute consecutive run_id (spatial-only, max_gap={max_grid_gap})...")
        df['run_id'] = consecutive_run_id_spatial_only(
            df,
            by_cols=('uid','d'),
            xg_col='xg',
            yg_col='yg',
            max_grid_gap=max_grid_gap
        )
        if verbose: tqdm.write(f"  > Done. ({(time.perf_counter()-t_step):.2f}s)")
        pbar.update(1)

        # Step 5) Aggregate run-level statistics
        t_step = time.perf_counter()
        if verbose: tqdm.write("[5/8] Aggregate run stats ...")
        grp_keys = ['uid','d','run_id']
        run_stats = (
            df.groupby(grp_keys, sort=False)
              .agg(
                  place=(place_col,'first'),
                  xg=('xg', 'first'),
                  yg=('yg', 'first'),
                  start_t=('t','min'),
                  end_t=('t','max'),
                  run_len=('t','size')
              )
              .reset_index()
        )
        if verbose: tqdm.write(f"  > Done. ({(time.perf_counter()-t_step):.2f}s) | run_stats.shape={run_stats.shape}")
        pbar.update(1)

        # Step 6) Label STOP / MOVE (duration-based)
        t_step = time.perf_counter()
        if verbose:
            tqdm.write(f"[6/8] Label STOP (duration >= {min_stop_duration_slots}) / MOVE ...")

        # Inclusive span in time slots between first and last record of the run.
        run_stats['duration'] = (run_stats['end_t'] - run_stats['start_t']) + 1
        run_stats['is_stop'] = (run_stats['duration'] >= min_stop_duration_slots)
        run_stats['segment_type'] = np.where(run_stats['is_stop'], 'STOP', 'MOVE')

        # 1-based stop counter per (uid, d); NaN on MOVE runs.
        run_stats['stop_seq'] = (
            run_stats['is_stop']
            .groupby([run_stats['uid'], run_stats['d']], sort=False)
            .cumsum()
            .where(run_stats['is_stop'], other=np.nan)
            .astype(float)
        )
        if verbose:
            n_stops = int(run_stats['is_stop'].sum())
            tqdm.write(f"  > Done. ({(time.perf_counter()-t_step):.2f}s) | Found {n_stops:,} STOPS")
        pbar.update(1)

        # Step 7) Compute trip_seq
        t_step = time.perf_counter()
        if verbose: tqdm.write("[7/8] Compute trip_seq for MOVE segments ...")
        run_stats = run_stats.sort_values(['uid','d','start_t'], kind='mergesort')

        # Number of STOP runs seen so far within the (uid, d).
        cum_stops = (
            run_stats['is_stop']
            .groupby([run_stats['uid'], run_stats['d']], sort=False)
            .cumsum()
            .astype(np.int32)
        )
        run_stats['trip_seq'] = np.where(run_stats['segment_type']=='MOVE', cum_stops, np.nan).astype(float)

        if include_stops_in_trips:
            run_stats['trip_seq'] = np.where(
                run_stats['segment_type']=='STOP',
                cum_stops,
                run_stats['trip_seq']
            ).astype(float)
        if verbose: tqdm.write(f"  > Done. ({(time.perf_counter()-t_step):.2f}s)")
        pbar.update(1)

        # Step 8) Merge run-level fields back to row-level df
        t_step = time.perf_counter()
        if verbose: tqdm.write("[8/8] Merge run-level fields back to row-level ...")
        merge_cols = ['uid','d','run_id','run_len','duration','is_stop','stop_seq','segment_type','trip_seq']

        for col in merge_cols:
            if col not in run_stats.columns:
                raise KeyError(f"Internal error: missing column {col} in run_stats")

        df = df.merge(
            run_stats[merge_cols],
            on=['uid','d','run_id'],
            how='left'
        )

        df['is_stop'] = df['is_stop'].astype(bool)
        df['run_len'] = df['run_len'].astype(np.int32)
        df['duration'] = df['duration'].astype(np.int32)
        df['stop_seq'] = df['stop_seq'].astype(float)
        # The row-level frame exposes the trip number under the name traj_seq.
        df['traj_seq'] = df['trip_seq'].astype(float)
        df.drop(columns=['trip_seq'], inplace=True)
        if verbose: tqdm.write(f"  > Done. ({(time.perf_counter()-t_step):.2f}s) | final df.shape={df.shape}")
        pbar.update(1)

    if verbose:
        n_runs = len(run_stats)
        n_stops = int(run_stats['is_stop'].sum())
        n_moves = int((run_stats['segment_type']=='MOVE').sum())
        tqdm.write(f"\n[Done] runs={n_runs:,}, STOPs={n_stops:,}, MOVEs={n_moves:,}")
        tqdm.write(f"--- Total time: {time.perf_counter()-t_start:.2f}s ---")

    return df, run_stats

In [None]:
# Parameters of this detection run.
MIN_DURATION = 2
GRID_SIZE = 1
GRID_GAP = 3

# Run the detection on the day-0 records only.
detailed_df, run_summary = detect_stops_and_trips(
    mob.loc[mob["d"] == 0],
    verbose=True,
    max_grid_gap=GRID_GAP,
    min_stop_duration_slots=MIN_DURATION,
    G=GRID_SIZE,
)

# Persist the row-level and run-level tables without the pandas index.
detailed_df.to_parquet("data/detailed_df.parquet", index=False)
run_summary.to_parquet("data/run_summary.parquet", index=False)

## Stop-to-Stop Trajectory Extraction

In [None]:
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import os
import gc
from tqdm.auto import tqdm

# ==========================================================
# Configuration
# ==========================================================
DATA_FILE = "data/detailed_df.parquet"
OUTPUT_FILE = "all_stop_move_stop_sequences.parquet"

# ==========================================================
# Step 1: Stream loading and basic preprocessing
# ==========================================================
print("Loading basic information from detailed_df...")
try:
    # NOTE(review): metadata_df is not referenced again in the visible part
    # of this file; confirm it is still needed before keeping this read.
    metadata_df = pd.read_parquet(DATA_FILE, engine="pyarrow", columns=["uid", "d", "t"])
except Exception as e:
    print(f"Failed to load: {e}")
    # exit() is the interactive-shell helper from the site module and ends
    # with status 0; raise SystemExit so the failure gives a non-zero status.
    raise SystemExit(1) from e

print("Preparing for streaming read...")

# ==========================================================
# Step 2: Define streaming group generator
# ==========================================================
def stream_person_day_groups(df):
    """
    Yield (uid, d, frame) for every run of consecutive rows sharing (uid, d).

    Requires df to be sorted by ['uid', 'd', 't']: only adjacent rows are
    compared, so an unsorted frame yields the same (uid, d) more than once.

    Each yielded frame holds the rows of one run with all columns of df,
    their original dtypes, and a fresh 0..n-1 index. The group boundaries
    are located with one vectorised comparison and the frames are cut with
    positional slices, rather than rebuilding them from per-row tuples
    (which is slow and widens e.g. int32 columns to int64).
    Nothing is yielded for an empty frame.
    """
    n_rows = len(df)
    if n_rows == 0:
        return

    uid_values = df['uid'].to_numpy()
    d_values = df['d'].to_numpy()

    # True at position i when row i+1 starts a new (uid, d) run.
    changed = (uid_values[1:] != uid_values[:-1]) | (d_values[1:] != d_values[:-1])
    boundaries = (changed.nonzero()[0] + 1).tolist()
    starts = [0] + boundaries
    ends = boundaries + [n_rows]

    # Keys as plain Python scalars, taken from the first row of each run.
    uid_keys = df['uid'].iloc[starts].tolist()
    d_keys = df['d'].iloc[starts].tolist()

    for uid, d, lo, hi in zip(uid_keys, d_keys, starts, ends):
        yield uid, d, df.iloc[lo:hi].reset_index(drop=True)

# ==========================================================
# Step 3: Load and filter data
# ==========================================================
print("Loading full trajectory table and sorting...")

cols_needed = ["uid", "x", "y", "d", "t", "stop_seq", "traj_seq"]
df_iter = pd.read_parquet(DATA_FILE, columns=cols_needed)

# ✅ Keep only d == 0
df_iter = df_iter[df_iter['d'] == 0].copy()

# Keep rows that belong to a stop or to a trip following at least one stop,
# then sort so that each (uid, d) forms one contiguous block.
df_iter = df_iter[(df_iter['stop_seq'] > 0) | (df_iter['traj_seq'] > 0)].sort_values(by=['uid', 'd', 't'])
print(f"Base data row count (d=0 only): {len(df_iter):,}")

# ==========================================================
# Step 4: Prepare output file
# ==========================================================
if os.path.exists(OUTPUT_FILE):
    os.remove(OUTPUT_FILE)

# An empty frame with the final column set fixes the Parquet schema up front.
schema_df = df_iter.head(0).copy()
schema_df['trip_group_uid'] = pd.Series(dtype='int32')
schema_df['trip_group_d'] = pd.Series(dtype='int32')
schema_df['trip_group_id'] = pd.Series(dtype='int64')
schema_df['global_seq_id'] = pd.Series(dtype='int64')   # ✅ Global auto-increment ID column
# Define schema
pa_schema = pa.Table.from_pandas(schema_df, preserve_index=False).schema
del schema_df
gc.collect()

total_trips_found = 0
global_seq_id = 0   # ✅ Initialize global counter

# Trajectories are buffered and written in batches: every write_table call
# emits its own row group, and one tiny row group per trajectory bloats the
# file metadata and slows down reading.
WRITE_BATCH_ROWS = 200_000
pending_frames = []
pending_rows = 0


def _write_frames(parquet_writer, frames):
    """Concatenate the buffered trajectory frames and write them as one table."""
    batch = pd.concat(frames, ignore_index=True)
    parquet_writer.write_table(pa.Table.from_pandas(batch, schema=pa_schema, preserve_index=False))


# ==========================================================
# Step 5: Main loop (streaming)
# ==========================================================
with pq.ParquetWriter(OUTPUT_FILE, pa_schema, compression='snappy') as writer:
    for uid, d, daily_table in tqdm(stream_person_day_groups(df_iter), desc="Processing (uid, day)"):
        trip_ids = daily_table.loc[daily_table['traj_seq'] > 0, 'traj_seq'].unique()
        if len(trip_ids) == 0:
            continue

        for trip_id_float in trip_ids:
            trip_id = int(trip_id_float)
            sub_move = daily_table[daily_table['traj_seq'] == trip_id]
            # Trip k lies between stop k (origin) and stop k + 1 (destination).
            prev_stop_last = daily_table[daily_table['stop_seq'] == trip_id].tail(1)
            next_stop_first = daily_table[daily_table['stop_seq'] == trip_id + 1].head(1)

            # Concatenate only the pieces that exist; empty placeholder
            # frames in pd.concat trigger deprecated dtype handling.
            parts = [part for part in (prev_stop_last, sub_move, next_stop_first) if not part.empty]
            if not parts:
                continue

            global_seq_id += 1  # ✅ Increment global ID
            sub_df = pd.concat(parts, ignore_index=True).assign(
                trip_group_uid=uid,
                trip_group_d=d,
                trip_group_id=trip_id,
                global_seq_id=global_seq_id  # ✅ Assign global ID
            )
            pending_frames.append(sub_df)
            pending_rows += len(sub_df)
            total_trips_found += 1

        # No gc.collect() here: a full collection per (uid, day) dominates
        # the runtime, and the per-day frame is released on rebinding anyway.
        if pending_rows >= WRITE_BATCH_ROWS:
            _write_frames(writer, pending_frames)
            pending_frames = []
            pending_rows = 0

    # Write whatever is left in the buffer before the writer closes.
    if pending_frames:
        _write_frames(writer, pending_frames)
        pending_frames = []
        pending_rows = 0

print("\n--- ✅ Processing completed ---")
print(f"[Success] Saved {total_trips_found:,} stop-to-stop trajectories to {OUTPUT_FILE}")

# ==========================================================
# Step 6: Verification
# ==========================================================
print("\nVerifying output file...")
df_all = pd.read_parquet(OUTPUT_FILE)
n_trip_groups = df_all.groupby(['trip_group_uid', 'trip_group_d', 'trip_group_id']).ngroups
print(f"Verification passed, total rows: {len(df_all):,}")
print("Number of unique trip groups:", n_trip_groups)
print("Maximum global_seq_id:", df_all['global_seq_id'].max())
# Cross-check what was read back against what the loop counted.
if n_trip_groups != total_trips_found:
    print(f"Warning: expected {total_trips_found:,} trip groups, found {n_trip_groups:,}")