### 1. Setup and Imports
 Import all necessary libraries for data manipulation, geospatial analysis, machine learning, and visualization.


In [1]:
!pip install geopy folium shapely

Collecting geopy
  Downloading geopy-2.4.1-py3-none-any.whl.metadata (6.8 kB)
Collecting folium
  Downloading folium-0.20.0-py2.py3-none-any.whl.metadata (4.2 kB)
Collecting shapely
  Downloading shapely-2.1.1-cp311-cp311-win_amd64.whl.metadata (7.0 kB)
Collecting geographiclib<3,>=1.52 (from geopy)
  Downloading geographiclib-2.0-py3-none-any.whl.metadata (1.4 kB)
Collecting branca>=0.6.0 (from folium)
  Downloading branca-0.8.1-py3-none-any.whl.metadata (1.5 kB)
Collecting xyzservices (from folium)
  Downloading xyzservices-2025.4.0-py3-none-any.whl.metadata (4.3 kB)
Downloading geopy-2.4.1-py3-none-any.whl (125 kB)
Downloading geographiclib-2.0-py3-none-any.whl (40 kB)
Downloading folium-0.20.0-py2.py3-none-any.whl (113 kB)
Downloading shapely-2.1.1-cp311-cp311-win_amd64.whl (1.7 MB)
   ---------------------------------------- 0.0/1.7 MB ? eta -:--:--
   ------------------ --------------------- 0.8/1.7 MB 16.9 MB/s eta 0:00:01
   ------------------ --------------------- 0.8/1.7 MB 1

In [1]:
import os
import pandas as pd
import numpy as np
from datetime import datetime
import time
from geopy.distance import great_circle
from sklearn.cluster import DBSCAN
import folium
from shapely.geometry import Point, LineString

### 2. Configuration and Constants
Define paths and parameters that will be used throughout the notebook. This makes the code cleaner and easier to modify.

 --- Configuration ---
 IMPORTANT: This assumes your notebook is in the root folder, and the data is in a subfolder.
 Project Root
  |- main.ipynb
  |- Geolife Trajectories 1.3/
     |- Data/
        |- 000/
        |- 001/
        ...

In [2]:
GEOLIFE_BASE_PATH = os.path.join('Geolife Trajectories 1.3', 'Data')

In [3]:
# For faster development/demonstration, we can limit the number of users to process.
USER_SUBSET = 181 

# Parameters for filtering driving trajectories
MIN_AVG_SPEED_KMH = 20  # Minimum average speed for a trip 'driving'
MAX_ALLOWED_SPEED_KMH = 150 # A sanity check to remove erroneous GPS points

# DBSCAN parameters for clustering Origin-Destination points
OD_CLUSTER_EPS = 0.5  # Epsilon in km. Two points are neighbors if they are within 0.5 km.
OD_CLUSTER_MIN_SAMPLES = 10 # Minimum number of trips to form a dense O-D cluster.

# Anomaly detection parameters
ANOMALY_DISTANCE_THRESHOLD_METERS = 200 

# Output file
OUTPUT_MAP_FILE = 'geolife_route_analysis.html'

### 3. Data Ingestion and Preprocessing
 This section contains functions to parse the raw .plt files, clean the data, and compile it into a single Pandas DataFrame.

In [4]:
def parse_plt_file(file_path: str, user_id: str) -> pd.DataFrame:
    """Parses a single .plt file into a DataFrame."""
    try:
        col_names = ['lat', 'lon', 'ignored', 'altitude_ft', 'days_since_1899', 'date', 'time']
        df = pd.read_csv(file_path, skiprows=6, header=None, names=col_names, usecols=['lat', 'lon', 'date', 'time'])
        if df.empty: return pd.DataFrame()
        
        df['timestamp'] = pd.to_datetime(df['date'] + ' ' + df['time'])
        df['user_id'] = user_id
        df['trajectory_id'] = os.path.basename(file_path).replace('.plt', '')
        return df[['user_id', 'trajectory_id', 'timestamp', 'lat', 'lon']]
    except Exception:
        return pd.DataFrame()

In [7]:
def load_all_trajectories(base_path: str, user_limit: int = None) -> pd.DataFrame:
    """Loads all trajectories from the GeoLife dataset into a single DataFrame."""
    all_dfs = []
    user_folders = sorted([d for d in os.listdir(base_path) if os.path.isdir(os.path.join(base_path, d))])
    
    if user_limit:
        user_folders = user_folders[:user_limit]
        
    print(f"Starting to process {len(user_folders)} users...")
    
    for i, user_id in enumerate(user_folders):
        user_path = os.path.join(base_path, user_id, 'Trajectory')
        if not os.path.exists(user_path): continue
        
        plt_files = [f for f in os.listdir(user_path) if f.endswith('.plt')]
        for traj_file in plt_files:
            df = parse_plt_file(os.path.join(user_path, traj_file), user_id)
            if not df.empty:
                all_dfs.append(df)
        print(f"Progress: {i+1}/{len(user_folders)} users processed.", end='\r')
        
    print("\nConcatenating all trajectories...")
    return pd.concat(all_dfs, ignore_index=True)

In [8]:
# Execute the data loading
start_time = time.time()
raw_df = load_all_trajectories(GEOLIFE_BASE_PATH, USER_SUBSET)
end_time = time.time()

print(f"\nData loading complete. Took {end_time - start_time:.2f} seconds.")
print(f"Loaded {len(raw_df)} points from {raw_df['trajectory_id'].nunique()} trajectories.")
raw_df.head()


Starting to process 181 users...
Progress: 110/181 users processed.

  df['timestamp'] = pd.to_datetime(df['date'] + ' ' + df['time'])


Progress: 181/181 users processed.
Concatenating all trajectories...

Data loading complete. Took 641.78 seconds.
Loaded 24859853 points from 17727 trajectories.


Unnamed: 0,user_id,trajectory_id,timestamp,lat,lon
0,0,20090402060732,2009-04-02 06:07:32,40.000102,116.327021
1,0,20090402060732,2009-04-02 06:07:42,40.002471,116.327546
2,0,20090402060732,2009-04-02 06:07:47,39.999973,116.326985
3,0,20090402060732,2009-04-02 06:07:52,39.999907,116.327082
4,0,20090402060732,2009-04-02 06:07:57,39.999883,116.327228


### 4. Feature Engineering & Driving Segment Detection
 We will now process the raw data to calculate speed and filter for trajectories that represent vehicle travel.

In [9]:
print("Validating GPS coordinates...")
points_before_cleaning = len(raw_df)
raw_df = raw_df[
    (raw_df['lat'].between(-90, 90)) &
    (raw_df['lon'].between(-180, 180))
]
points_after_cleaning = len(raw_df)
points_removed = points_before_cleaning - points_after_cleaning
print(f"Removed {points_removed} invalid GPS points.")
print(f"Remaining points: {points_after_cleaning}")

Validating GPS coordinates...
Removed 1 invalid GPS points.
Remaining points: 24859852


In [10]:
def calculate_speed(df: pd.DataFrame) -> pd.DataFrame:
    """
    Calculates distance and speed for each point in the trajectories.
    This function uses the Haversine formula for accurate distance calculation on a sphere.
    """
    df = df.sort_values(['trajectory_id', 'timestamp'])
    
    # Shift coordinates to get previous point's lat/lon
    df['prev_lat'] = df.groupby('trajectory_id')['lat'].shift(1)
    df['prev_lon'] = df.groupby('trajectory_id')['lon'].shift(1)
    df['prev_timestamp'] = df.groupby('trajectory_id')['timestamp'].shift(1)

   
    df = df.dropna()

    # Calculate distance using geopy's great_circle function (Haversine)
    df['distance_km'] = df.apply(
        lambda row: great_circle((row['lat'], row['lon']), (row['prev_lat'], row['prev_lon'])).kilometers,
        axis=1
    )
    
  
    df['time_diff_hr'] = (df['timestamp'] - df['prev_timestamp']).dt.total_seconds() / 3600.0
    
    
    df['speed_kmh'] = df['distance_km'] / df['time_diff_hr'].replace(0, np.nan)
    
    # Clean up infinite values and NaNs that might result from the calculation
    df.replace([np.inf, -np.inf], np.nan, inplace=True)
    df.dropna(subset=['speed_kmh'], inplace=True)
    
    return df[['trajectory_id', 'timestamp', 'lat', 'lon', 'speed_kmh']]

print("Calculating speeds for all trajectories...")
speed_df = calculate_speed(raw_df.copy())
print("Speed calculation complete.")
speed_df.head()

Calculating speeds for all trajectories...
Speed calculation complete.


Unnamed: 0,trajectory_id,timestamp,lat,lon,speed_kmh
22561929,20000101231219,2000-01-01 23:13:21,39.990964,116.327041,12.732507
22561930,20000101231219,2000-01-01 23:15:23,39.993207,116.326827,7.379291
19395124,20070412093132,2007-04-12 09:39:37,39.974317,116.33045,0.080677
19395125,20070412093132,2007-04-12 09:40:49,39.974467,116.33045,0.833963
19395126,20070412093132,2007-04-12 09:43:37,39.974583,116.33045,0.277988


#### Filtering for Driving Trips
We apply heuristics on speed to identify trips likely made by car.

In [11]:
print("Filtering for driving trajectories...")

trajectory_stats = speed_df.groupby('trajectory_id')['speed_kmh'].agg(['mean', 'max']).reset_index()

# Filter based on our defined speed thresholds
driving_trajectories_ids = trajectory_stats[
    (trajectory_stats['mean'] >= MIN_AVG_SPEED_KMH) & 
    (trajectory_stats['max'] < MAX_ALLOWED_SPEED_KMH)
]['trajectory_id']

driving_df = speed_df[speed_df['trajectory_id'].isin(driving_trajectories_ids)].copy()

print(f"Filtered down to {driving_df['trajectory_id'].nunique()} driving trajectories.")
print(f"Total driving GPS points: {len(driving_df)}")


Filtering for driving trajectories...
Filtered down to 4792 driving trajectories.
Total driving GPS points: 4643511


### 5. Origin-Destination (O-D) Pair Identification
 We use DBSCAN clustering on the start and end points of trips to find common O-D zones.

In [12]:
# Extract start and end points for each driving trajectory
od_points = driving_df.groupby('trajectory_id').agg(
    start_lat=('lat', 'first'),
    start_lon=('lon', 'first'),
    end_lat=('lat', 'last'),
    end_lon=('lon', 'last')
).reset_index()

print(f"Extracted {len(od_points)} Origin-Destination pairs.")


Extracted 4792 Origin-Destination pairs.


In [14]:
# Cluster the start points
print("Clustering start points with DBSCAN...")
# Convert epsilon from km to degrees for DBSCAN (approximate)
kms_per_radian = 6371.0088
eps_in_degrees = OD_CLUSTER_EPS / kms_per_radian

dbscan_start = DBSCAN(eps=eps_in_degrees, min_samples=OD_CLUSTER_MIN_SAMPLES, algorithm='ball_tree', metric='haversine')
od_points['start_cluster'] = dbscan_start.fit_predict(np.radians(od_points[['start_lat', 'start_lon']]))

Clustering start points with DBSCAN...


In [13]:
# Cluster the end points
print("Clustering end points with DBSCAN...")
dbscan_end = DBSCAN(eps=eps_in_degrees, min_samples=OD_CLUSTER_MIN_SAMPLES, algorithm='ball_tree', metric='haversine')
od_points['end_cluster'] = dbscan_end.fit_predict(np.radians(od_points[['end_lat', 'end_lon']]))

Clustering end points with DBSCAN...


In [15]:
# Filter out trips that don't belong to a cluster (noise points, labeled -1)
clustered_od = od_points[(od_points['start_cluster'] != -1) & (od_points['end_cluster'] != -1)]

# Identify the most frequent O-D pair
if not clustered_od.empty:
    od_pair_counts = clustered_od.groupby(['start_cluster', 'end_cluster']).size().reset_index(name='count')
    most_frequent_pair = od_pair_counts.sort_values('count', ascending=False).iloc[0]

    start_cluster_id = int(most_frequent_pair['start_cluster'])
    end_cluster_id = int(most_frequent_pair['end_cluster'])

    print(f"\nMost frequent O-D pair is from Start Cluster {start_cluster_id} to End Cluster {end_cluster_id}")
    print(f"Number of trips in this pair: {most_frequent_pair['count']}")
else:
    print("\nCould not find any dense O-D clusters with the current settings. Try adjusting DBSCAN parameters or processing more users.")
    start_cluster_id = -1 # Set to invalid to skip subsequent steps
    end_cluster_id = -1


Most frequent O-D pair is from Start Cluster 0 to End Cluster 0
Number of trips in this pair: 254


### 6. Normal Route Generation
 For the most frequent O-D pair, we will now generate a "normal" or representative route.
 A simple and effective method is to find the trajectory that is most "central" to all other trajectories in the cluster.

In [16]:
if start_cluster_id != -1:
    # Get all trajectory IDs for our chosen O-D pair
    target_od_trips_ids = clustered_od[
        (clustered_od['start_cluster'] == start_cluster_id) & 
        (clustered_od['end_cluster'] == end_cluster_id)
    ]['trajectory_id']

    target_od_df = driving_df[driving_df['trajectory_id'].isin(target_od_trips_ids)]

    # For simplicity, we'll select one of these trips as our "normal route".
    # A more advanced method would involve averaging or finding a central path.
    normal_route_id = target_od_trips_ids.iloc[0]
    normal_route_df = target_od_df[target_od_df['trajectory_id'] == normal_route_id]

    print(f"Selected trajectory {normal_route_id} as the 'Normal Route'.")
    print(f"Normal route has {len(normal_route_df)} points.")
else:
    normal_route_df = pd.DataFrame()

Selected trajectory 20070416032733 as the 'Normal Route'.
Normal route has 32 points.


### 7. Anomaly Detection
 Now, we'll take another trip from the same O-D pair and check how much it deviates from our "normal route".

In [17]:
# Select a different trip from the same cluster to act as our "test trip"
if len(target_od_trips_ids) > 1:
    test_trip_id = target_od_trips_ids.iloc[1]
    test_trip_df = target_od_df[target_od_df['trajectory_id'] == test_trip_id].copy()
    print(f"Selected trajectory {test_trip_id} as the 'Test Trip'.")

    # Use Shapely for efficient distance calculation
    # Create a LineString object for the normal route
    normal_route_line = LineString(zip(normal_route_df['lon'], normal_route_df['lat']))

    # Calculate the minimum distance from each point in the test trip to the normal route line
    def calculate_deviation_meters(row):
        point = Point(row['lon'], row['lat'])
        # .distance returns distance in the units of the coordinates (degrees)
        # We need to convert this to meters. This is an approximation.
        # A more precise way would be to project to a metric CRS, but this is sufficient.
        degree_dist = point.distance(normal_route_line)
        # Approx 1 degree = 111 km
        return degree_dist * 111000

    print("Calculating deviation from normal route for test trip...")
    test_trip_df['deviation_m'] = test_trip_df.apply(calculate_deviation_meters, axis=1)

    # Flag points as anomalous if their deviation exceeds the threshold
    test_trip_df['is_anomaly'] = test_trip_df['deviation_m'] > ANOMALY_DISTANCE_THRESHOLD_METERS
    
    anomalies_found = test_trip_df['is_anomaly'].sum()
    print(f"Anomaly detection complete. Found {anomalies_found} anomalous points.")

else:
    if normal_route_df.empty:
        print("Skipping anomaly detection because no normal route was generated.")
    else:
        print("Not enough trips in the selected O-D cluster to perform anomaly detection.")
    test_trip_df = pd.DataFrame()


Selected trajectory 20070425154640 as the 'Test Trip'.
Calculating deviation from normal route for test trip...
Anomaly detection complete. Found 38 anomalous points.


### 8. Interactive Visualization
 We use Folium to plot the normal route, the test trip, and the detected anomalies on an interactive map.

In [18]:
if not test_trip_df.empty:
    # Create a map centered on the start of the normal route
    map_center = [normal_route_df['lat'].iloc[0], normal_route_df['lon'].iloc[0]]
    m = folium.Map(location=map_center, zoom_start=12, tiles='CartoDB dark_matter')

    # Plot the Normal Route (blue)
    folium.PolyLine(
        locations=normal_route_df[['lat', 'lon']].values,
        color='blue',
        weight=5,
        opacity=0.8,
        tooltip='Normal Route'
    ).add_to(m)

    # Plot the Test Trip (green)
    folium.PolyLine(
        locations=test_trip_df[['lat', 'lon']].values,
        color='green',
        weight=3,
        opacity=0.8,
        tooltip='Test Trip'
    ).add_to(m)

    # Plot the Anomalous Points (red circles)
    anomalous_points = test_trip_df[test_trip_df['is_anomaly']]
    for _, row in anomalous_points.iterrows():
        folium.CircleMarker(
            location=[row['lat'], row['lon']],
            radius=6,
            color='red',
            fill=True,
            fill_color='red',
            fill_opacity=0.7,
            tooltip=f"Anomaly! Deviation: {row['deviation_m']:.0f}m"
        ).add_to(m)

    # Save the map to an HTML file
    m.save(OUTPUT_MAP_FILE)
    print(f"\nInteractive map saved to '{OUTPUT_MAP_FILE}'.")
    print("Open this file in a web browser to explore the results.")

else:
    print("Skipping map generation as no test trip was available.")


Interactive map saved to 'geolife_route_analysis.html'.
Open this file in a web browser to explore the results.


### 9. Scalability Discussion 
### How This Solution Scales with Apache Spark

While this notebook uses Pandas for its implementation, the logic is designed to be scalable to a distributed computing environment like **Apache Spark**, which is essential for handling petabyte-scale vehicle data at a company like Motorq.

1.  **Data Ingestion:**
    * **Current:** Reads files sequentially into memory.
    * **Spark:** Spark can read thousands of files in parallel directly from a distributed file system (like S3 or HDFS) into a Spark DataFrame. The initial parsing logic (`parse_plt_file`) can be converted into a User-Defined Function (UDF) and applied across the cluster.

2.  **Feature Engineering (Speed Calculation):**
    * **Current:** Uses `groupby().shift()` in Pandas.
    * **Spark:** Spark's Window functions (`lag` over a `WindowSpec` partitioned by `trajectory_id` and ordered by `timestamp`) are designed for exactly this type of calculation and will execute it in a distributed manner, avoiding memory bottlenecks.

3.  **O-D Clustering:**
    * **Current:** Runs DBSCAN on a single machine.
    * **Spark:** While traditional DBSCAN is not easily parallelizable, Spark MLlib has implementations of scalable clustering algorithms like K-Means or Bisecting K-Means. For a more direct parallel DBSCAN, custom implementations or third-party libraries built on Spark could be used to handle massive numbers of O-D points.

4.  **Anomaly Detection:**
    * **Current:** Calculates distance for a single test trip.
    * **Spark:** The process of comparing millions of trips against their respective "normal routes" can be massively parallelized. A `crossJoin` followed by a UDF to calculate geospatial distance could find deviations across an entire fleet's data simultaneously.

By leveraging Spark, this entire pipeline can transition from a proof-of-concept on a single machine to a production-grade system capable of providing real-time insights for millions of vehicles.
