## Step 1: Configuration & Imports

This cell contains all necessary library imports and user-configurable parameters. **Modify the configuration variables in this cell to match your environment.**


In [1]:
import os
import time
from datetime import timedelta, datetime
# DataFrame libraries
import pandas as pd
import cudf # The core GPU DataFrame library
from urllib.parse import quote_plus
# Database and Parallelization
from sqlalchemy import create_engine, text
from dask.distributed import Client, progress
from dask_cuda import LocalCUDACluster # For managing GPU workers

In [2]:

# --- 1A. Configuration ---
username = 'xuhang.liu'
password = 'xuhangLIU@HOMES'
hostname = 'homes-database.epfl.ch'
port = '30767'
dbname = 'Shenzhen_Taxi'
# Database credentials
DB_CONFIG = {
    "user": username,
    "password":password,
    "host": hostname,
    "port": port,
    "dbname": dbname
}

# Create a SQLAlchemy database URL
# DB_URL = f"postgresql+psycopg2://{DB_CONFIG['user']}:{DB_CONFIG['password']}@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['dbname']}"
password = quote_plus(password)
DB_URL = f"postgresql://{username}:{password}@{hostname}:{port}/{dbname}"
# Table and File names
GPS_TABLE_NAME = "taxi_data2020_01_01"
OUTPUT_CSV_FILE = "stay_points_gpu_output.csv"

# Algorithm parameters
VELOCITY_THRESHOLD_KMH = 1
TIME_THRESHOLD_MINUTES = 10

# --- 1B. Display Settings ---
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)

print("Step 1: Configuration loaded and libraries imported successfully.")
print("cuDF has been imported for GPU acceleration.")


Step 1: Configuration loaded and libraries imported successfully.
cuDF has been imported for GPU acceleration.


## Step 2: Define Data Scope by Time Range

Instead of fetching all taxi IDs, we now define the processing scope by the total time range of the data. We will query the earliest and latest timestamps to determine the entire period to be analyzed. This avoids the bottleneck of fetching millions of unique IDs.


In [3]:
def get_time_range(db_url: str):
    """Queries the database to find the maximum timestamp and sets a fixed start time of 2020-01-01."""
    try:
        engine = create_engine(db_url)
        with engine.connect() as connection:
            print("Connection successful. Querying for max time...")
            # Query only for the maximum time, as the start time is fixed.
            query = text(f"SELECT MAX(time) as max_time FROM {GPS_TABLE_NAME} WHERE time >= '2020-01-01 00:00:00';")
            result = connection.execute(query).fetchone()

            # Hardcode the start time to the beginning of 2020
            min_time = datetime(2020, 1, 1, 0, 0, 0)

            if result and result.max_time:
                max_time = result.max_time
                print(f"Time range has been set: from {min_time} to {max_time}")
                # Ensure the found max_time is not before our hardcoded min_time
                if max_time < min_time:
                    print("Warning: The latest timestamp in the database is before 2020. No data will be processed.")
                    return min_time, min_time # Return a zero-duration range
                return min_time, max_time
            else:
                raise ValueError("Could not retrieve a valid max_time from the database.")
    except Exception as e:
        print(f"Error in Step 2: Could not connect or fetch time range. Please check DB_CONFIG.")
        print(f"Details: {e}")
        return None, None

# --- 2A. Connect and get the total time range ---
print("Step 2: Defining processing scope by time...")
total_start_time, total_end_time = get_time_range(DB_URL)

# --- 2B. Generate time chunks for parallel processing ---
time_chunks_all = []
if total_start_time and total_end_time:
    chunk_duration = timedelta(hours=1)  # Process data in 1-hour chunks
    
    # CRITICAL: Overlap must be >= TIME_THRESHOLD_MINUTES to catch boundary-spanning events
    overlap_duration = timedelta(minutes=TIME_THRESHOLD_MINUTES + 5) # Add a 5-min buffer
    
    current_start = total_start_time
    while current_start < total_end_time:
        current_end = current_start + chunk_duration
        if current_end > total_end_time:
            current_end = total_end_time
            
        time_chunks_all.append((current_start, current_end))
        
        if current_end == total_end_time:
            break # Reached the end
            
        current_start = current_end - overlap_duration

    print(f"\nGenerated {len(time_chunks_all)} time chunks to process in parallel.")
    print(f"Example chunks:")

else:
    print("Could not determine time range. Halting execution.")



Step 2: Defining processing scope by time...
Connection successful. Querying for max time...
Time range has been set: from 2020-01-01 00:00:00 to 2020-01-12 12:08:24

Generated 368 time chunks to process in parallel.
Example chunks:


## Step 3: Process Trajectories & Identify Stay Points

This is the core of the project. We define the functions for processing a single taxi, ensuring the heavy computation happens on the GPU using `cudf`. We then use `Dask` and `dask-cuda` to distribute these tasks across one or more GPUs.


In [None]:
# --- 3A. Define the logic for processing a single time chunk on the GPU ---

def fetch_chunk_data_cpu(start_time, end_time, db_url: str) -> pd.DataFrame:
    """Fetches all GPS data within a specific time window into a CPU-based Pandas DataFrame."""
    query = text(f"""
        SELECT taxiid, time, lon, lat, velocity
        FROM {GPS_TABLE_NAME}
        WHERE 
            time >= :start_time AND time < :end_time AND
            validity = '1' AND passenger = '0' AND zoneid > 0 AND
            velocity < :velocity_threshold
    """)
    try:
        worker_engine = create_engine(db_url)
        with worker_engine.connect() as connection:
            return pd.read_sql_query(
                query,
                connection,
                params={
                    "start_time": start_time,
                    "end_time": end_time,
                    "velocity_threshold": VELOCITY_THRESHOLD_KMH
                }
            )
    except Exception as e:
        print(f"Worker failed to fetch data for chunk {start_time}-{end_time}: {e}")
        return pd.DataFrame()

def identify_stay_points_gpu(gdf: cudf.DataFrame) -> cudf.DataFrame:
    """The core algorithm, operating on a cuDF DataFrame for a single taxi's trajectory."""
    # This function is now designed to be used with groupby().apply()
    if gdf.shape[0] < 2:
        return None # Not enough data to process

    gdf = gdf.sort_values('time')
    gdf['is_staying'] = gdf['velocity'] < VELOCITY_THRESHOLD_KMH
    gdf['block_id'] = gdf['is_staying'].diff().ne(0).cumsum()
    
    staying_blocks = gdf[gdf['is_staying']]
    if staying_blocks.empty:
        return None
    
    agg_funcs = {'time': ['min', 'max'], 'lon': ['mean'], 'lat': ['mean']}
    stay_points = staying_blocks.groupby('block_id').agg(agg_funcs)
    
    stay_points.columns = ['start_time', 'end_time', 'lon', 'lat']
    stay_points['duration'] = stay_points['end_time'] - stay_points['start_time']
    
    min_duration = pd.to_timedelta(TIME_THRESHOLD_MINUTES, unit='m')
    significant_stays = stay_points[stay_points['duration'] >= min_duration].copy()

    if significant_stays.empty:
        return None
        
    significant_stays['duration_minutes'] = significant_stays['duration'].dt.total_seconds() / 60
    return significant_stays[['start_time', 'end_time', 'duration_minutes', 'lon', 'lat']].reset_index(drop=True)

def process_time_chunk_gpu(time_chunk, db_url: str):
    """Master function for a single worker: Fetch chunk to CPU -> GPU transfer -> GPU process -> CPU result."""
    start_time, end_time = time_chunk
    try:
        # 1. Fetch data for the entire time chunk to CPU memory
        chunk_df_cpu = fetch_chunk_data_cpu(start_time, end_time, db_url)
        if chunk_df_cpu.empty:
            return None

        # 2. Transfer data from CPU to GPU
        chunk_gdf = cudf.from_pandas(chunk_df_cpu)
        
        # 3. Process entirely on the GPU using groupby().apply()
        # This is the key change: process all taxis in the chunk in one go.
        result_gdf = chunk_gdf.groupby('taxiid').apply(identify_stay_points_gpu)

        if result_gdf.empty:
            return None
            
        # 4. Transfer result back to CPU for consolidation
        result_df_cpu = result_gdf.to_pandas()
        return result_df_cpu.reset_index() # taxiid is in the index after groupby
            
    except Exception as e:
        print(f"Worker error on time chunk {start_time}-{end_time}: {e}")
    return None

print("Step 3A: GPU-based processing logic for time chunks defined.")


Step 3A: GPU-based processing logic for time chunks defined.


In [5]:
time_chunks = time_chunks_all[-2]
print(time_chunks)

(datetime.datetime(2020, 1, 12, 10, 30), datetime.datetime(2020, 1, 12, 11, 30))


In [6]:
# --- 3B. Execute Processing in Parallel on the GPU ---

start_time = time.time()

if time_chunks:
    # Use dask_cuda to create a cluster of workers, one for each visible GPU
    print("Initializing Dask CUDA cluster...")
    cluster = LocalCUDACluster()
    client = Client(cluster)
    print(f"Dask client connected to GPU cluster: {client.dashboard_link}")

    print(f"\nSubmitting {len(time_chunks)} time chunks as tasks to Dask workers...")
    futures = client.map(process_time_chunk_gpu, time_chunks, db_url=DB_URL)
    
    progress(futures)
    
    print("\nGathering results from workers...")
    results = client.gather(futures)
    

    print("Dask client and CUDA cluster closed.")
else:
    print("No time chunks to process. Skipping parallel execution.")
    results = []

print(f"\nStep 3B: Parallel processing finished in {time.time() - start_time:.2f} seconds.")



Initializing Dask CUDA cluster...


  "plugins": worker_plugins(
2025-10-14 19:35:08,339 - tornado.application - ERROR - Exception in callback <bound method SystemMonitor.update of <SystemMonitor: cpu: 0 memory: 346 MB fds: 80>>
Traceback (most recent call last):
  File "/home/lxhep/miniconda3/envs/rapids-env/lib/python3.10/site-packages/tornado/ioloop.py", line 945, in _run
    val = self.callback()
  File "/home/lxhep/miniconda3/envs/rapids-env/lib/python3.10/site-packages/distributed/system_monitor.py", line 210, in update
    gpu_metrics = nvml.real_time()
  File "/home/lxhep/miniconda3/envs/rapids-env/lib/python3.10/site-packages/distributed/diagnostics/nvml.py", line 370, in real_time
    "utilization": _get_utilization(h),
  File "/home/lxhep/miniconda3/envs/rapids-env/lib/python3.10/site-packages/distributed/diagnostics/nvml.py", line 339, in _get_utilization
    return pynvml.nvmlDeviceGetUtilizationRates(h).gpu
  File "/home/lxhep/miniconda3/envs/rapids-env/lib/python3.10/site-packages/pynvml.py", line 3711, in

Dask client connected to GPU cluster: http://127.0.0.1:8787/status

Submitting 2 time chunks as tasks to Dask workers...

Gathering results from workers...


2025-10-14 19:35:10,442 - distributed.worker - ERROR - Compute Failed
Key:       process_time_chunk_gpu-fdf710478d9df61643443f414406ffc5
State:     executing
Task:  <Task 'process_time_chunk_gpu-fdf710478d9df61643443f414406ffc5' process_time_chunk_gpu(..., ...)>
Exception: "TypeError('cannot unpack non-iterable datetime.datetime object')"
Traceback: '  File "/tmp/ipykernel_44109/4185844314.py", line 51, in process_time_chunk_gpu\n'

2025-10-14 19:35:10,444 - distributed.worker - ERROR - Compute Failed
Key:       process_time_chunk_gpu-6dc062b2f4a867a5689f6d752331a605
State:     executing
Task:  <Task 'process_time_chunk_gpu-6dc062b2f4a867a5689f6d752331a605' process_time_chunk_gpu(..., ...)>
Exception: "TypeError('cannot unpack non-iterable datetime.datetime object')"
Traceback: '  File "/tmp/ipykernel_44109/4185844314.py", line 51, in process_time_chunk_gpu\n'



TypeError: cannot unpack non-iterable datetime.datetime object

## Step 4: Consolidate & Store Results

The final step is to combine the results (which are now standard Pandas DataFrames) into a single master DataFrame and save it to a CSV file.


In [None]:
print("Step 4: Consolidating and deduplicating results...")

# Filter out any `None` values which may result from empty chunks or errors
final_results_list = [res for res in results if res is not None and not res.empty]

if not final_results_list:
    print("No stay points were identified across all time chunks.")
else:
    # Concatenate all individual Pandas DataFrames into one
    final_df = pd.concat(final_results_list, ignore_index=True)
    print(f"Consolidated {final_df.shape[0]} raw stay points before deduplication.")
    
    # --- Deduplication Step ---
    # Sort by duration to ensure we keep the longest record of an event
    final_df.sort_values('duration_minutes', ascending=False, inplace=True)
    
    # Drop duplicates based on the unique identifier of a stay point event
    final_df.drop_duplicates(subset=['taxiid', 'start_time'], keep='first', inplace=True)
    
    # Reorder columns for better readability and sort the final output
    final_df = final_df[['taxiid', 'start_time', 'end_time', 'duration_minutes', 'lon', 'lat']]
    final_df.sort_values(['taxiid', 'start_time'], inplace=True)
    
    print(f"Successfully consolidated to {len(final_df)} unique stay points after deduplication.")
    
    try:
        final_df.to_csv(OUTPUT_CSV_FILE, index=False)
        print(f"Results successfully saved to '{OUTPUT_CSV_FILE}'")
    except Exception as e:
        print(f"Error saving results to file: {e}")

    print("\n--- Final Results Sample ---")
    display(final_df.head(20))



In [None]:
client.close()
cluster.close()