In [1]:
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from scipy.stats import gaussian_kde
from sklearn.neighbors import NearestNeighbors
from collections import defaultdict
from tqdm import tqdm
import os
from pathlib import Path
from geopy.distance import great_circle
import traceback

os.chdir("../..")
os.getcwd()

'/home/bwool/RESEARCH/TRB-Home-Data-Quality-2025'

#  Cleaning 2019 Mobile Location Data

##  Overview

This notebook processes raw mobile location data from 2019 by:
- Removing duplicate and low-accuracy observations
- Deduplicating spatially within 30-minute bins
- Adding derived features
- Saving cleaned parquet files

📦 Raw data size: **~7.5 GB**  
📁 Cleaned data size: **~2.7 GB**


##  Data Processing Flow

```mermaid
flowchart TD
    A[Read Parquet File] --> B[Sort by caid + utc_timestamp]
    B --> C[Convert utc_timestamp to datetime_pdt]
    C --> D[Drop low-accuracy pings (accuracy > 50m)]
    D --> E[Compute speed & acceleration per user]
    E --> F[Drop implausible speed (>50 m/s) or accel (>10 m/s²)]
    F --> G[Drop duplicate timestamps per user]
    G --> H[Drop duplicate location pings per 30-min bin (rounded lat/lon)]
    H --> I[Drop users with <10 observations]
    I --> J[Compute time_diff_minutes]
    J --> K[Map id_type to is_iOS]
    K --> L[Drop unneeded columns]
    L --> M[Reorder columns]
    M --> N[Write cleaned parquet]    
```

## Cleaning Function

In [2]:
# helper function to remove implausible points
def compute_speed_accel(df):
    df = df.sort_values('datetime_pdt')
    coords = df[['latitude', 'longitude']].to_numpy()
    times = (df['datetime_pdt'].astype('int64') / 1e9).to_numpy()  # seconds

    speeds = [0]
    accels = [0]
    for i in range(1, len(df)):
        d = great_circle(coords[i], coords[i-1]).meters
        dt = times[i] - times[i-1]
        if dt <= 0:
            speeds.append(speeds[-1])
            accels.append(0)
            continue
        speed = d / dt
        accel = (speed - speeds[-1]) / dt
        speeds.append(speed)
        accels.append(accel)

    df['speed_mps'] = speeds
    df['accel_mps2'] = accels
    return df

def process_parquet(filepath:str, output_filepath:str=None) -> pd.DataFrame:
    # === Load & sort ===
    data= pd.read_parquet(filepath, engine='pyarrow')
    data = data.sort_values(['caid', 'utc_timestamp'], kind='mergesort')

    # === Drop users with less than 10 observations ===
    valid_users = data['caid'].value_counts()[lambda x: x >= 10].index
    data = data[data['caid'].isin(valid_users)]

    # === Timestamps ===
    data['datetime_utc'] = pd.to_datetime(data['utc_timestamp'], unit='s', utc=True)
    data['datetime_pdt'] = data['datetime_utc'].dt.tz_convert('America/Los_Angeles')

    # === Remove low-accuracy observations ===
    data = data[data['horizontal_accuracy'] <= 50]
    
    # === Remove improbable observations ===
    """data = data.groupby('caid', group_keys=False).apply(compute_speed_accel)
    data = data[(data['speed_mps'] <= 50) & (data['accel_mps2'].abs() <= 10)]"""

    # === Remove duplicate timestamps ===
    data = data.drop_duplicates(subset=["caid", "datetime_pdt"])

    # === Remove duplicate location recordings per 30-minute bin ===
    data['time_bin'] = data['datetime_pdt'].dt.floor('30min')
    #  round lat/lon for fuzzy deduplication (4th decimal ~8.5m)
    data['lat_bin'] = data['latitude'].round(4)
    data['lon_bin'] = data['longitude'].round(4)
    data = data.sort_values('datetime_pdt').drop_duplicates(subset=['caid', 'time_bin', 'lat_bin', 'lon_bin'])
    data = data.drop(columns=['time_bin', 'lat_bin', 'lon_bin'])

    # === Again, Drop users with less than 10 observations ===
    valid_users = data['caid'].value_counts()[lambda x: x >= 10].index
    data = data[data['caid'].isin(valid_users)]

    # === Compute time difference in minutes within the same user ===
    data["time_diff_minutes"] = data.groupby("caid")["datetime_pdt"].diff().shift(-1) / 60

    # === Encode device type ===
    data['is_iOS'] = data['id_type'].map({'idfa': True, 'aaid': False})

    # === Drop uneccesary columns===
    data = data.drop(columns=['id_type', 'geo_hash', 'altitude', 'iso_country_code', 'utc_timestamp', 'datetime_utc'])

    # === Reorder ===
    new_order = ['caid', 'datetime_pdt', 'latitude', 'longitude', 'is_iOS', 'time_diff_minutes', 'horizontal_accuracy', 'ip_address']
    data = data[new_order]

    # === Save output ===
    if output_filepath:
        data.to_parquet(output_filepath, engine='pyarrow', index=False)

    return data

##  Run Processing

In [None]:
def process_2019_data():
    # === Set up folder and file list ===
    folder_2019 = "00_Data/01_Sample_Data/2019_Sample_Data"
    output_folder = "00_Data/02_Cleaned_Sample_Data/2019_Cleaned_Data"
    os.makedirs(output_folder, exist_ok=True)

    raw_files_2019 = [os.path.join(folder_2019, f) for f in os.listdir(folder_2019) if f.endswith(".parquet")]


    # === Process files with progress tracking ===
    for filepath in tqdm(raw_files_2019, desc="Processing 2019 files"):
        try:
            # Compute original file size
            original_size_bytes = os.path.getsize(filepath)

            # Build output path
            filename = os.path.basename(filepath)
            output_path = os.path.join(output_folder, filename)

            # Process and save cleaned file
            process_parquet(filepath, output_filepath=output_path)

            # Compute cleaned file size
            cleaned_size_bytes = os.path.getsize(output_path)

            # Report sizes
            original_gb = original_size_bytes / (1024 ** 3)
            cleaned_gb = cleaned_size_bytes / (1024 ** 3)

            print(f"{filename}: {original_gb:.3f} GB -> {cleaned_gb:.3f} GB")

        except Exception as e:
            print(f"\nError processing {filepath}: {type(e).__name__}: {e}")
            traceback.print_exc()            

process_2019_data()
# 7m 19.1s

Processing 2019 files:  11%|█         | 1/9 [00:48<06:27, 48.40s/it]

part-00001-85a0c7d9-db42-457d-ab9d-d104038b7a1e-c000.snappy.parquet: 0.788 GB -> 0.310 GB


Processing 2019 files:  22%|██▏       | 2/9 [01:36<05:39, 48.50s/it]

part-00002-85a0c7d9-db42-457d-ab9d-d104038b7a1e-c000.snappy.parquet: 0.761 GB -> 0.299 GB


Processing 2019 files:  33%|███▎      | 3/9 [02:23<04:46, 47.75s/it]

part-00003-85a0c7d9-db42-457d-ab9d-d104038b7a1e-c000.snappy.parquet: 0.771 GB -> 0.296 GB


Processing 2019 files:  44%|████▍     | 4/9 [03:13<04:02, 48.49s/it]

part-00000-85a0c7d9-db42-457d-ab9d-d104038b7a1e-c000.snappy.parquet: 0.782 GB -> 0.301 GB


Processing 2019 files:  56%|█████▌    | 5/9 [04:02<03:15, 48.81s/it]

part-00005-85a0c7d9-db42-457d-ab9d-d104038b7a1e-c000.snappy.parquet: 0.786 GB -> 0.308 GB


Processing 2019 files:  67%|██████▋   | 6/9 [04:52<02:27, 49.21s/it]

part-00007-85a0c7d9-db42-457d-ab9d-d104038b7a1e-c000.snappy.parquet: 0.782 GB -> 0.311 GB


Processing 2019 files:  78%|███████▊  | 7/9 [05:40<01:37, 48.78s/it]

part-00004-85a0c7d9-db42-457d-ab9d-d104038b7a1e-c000.snappy.parquet: 0.779 GB -> 0.306 GB


Processing 2019 files:  89%|████████▉ | 8/9 [06:29<00:48, 48.90s/it]

part-00008-85a0c7d9-db42-457d-ab9d-d104038b7a1e-c000.snappy.parquet: 0.767 GB -> 0.295 GB


Processing 2019 files: 100%|██████████| 9/9 [07:19<00:00, 48.78s/it]

part-00006-85a0c7d9-db42-457d-ab9d-d104038b7a1e-c000.snappy.parquet: 0.777 GB -> 0.302 GB





##  Feature Comparison: Raw vs Cleaned

| **Field Name**           | **Kept?** | **Renamed / Derived?**     | **Notes**                                      |
|--------------------------|-----------|-----------------------------|------------------------------------------------|
| `caid`                   | ✅        | —                           | Unique user ID                                 |
| `utc_timestamp`          | ❌        | ➡ `datetime_pdt`           | Converted to timezone datetime (PDT)           |
| `id_type`                | ❌        | ➡ `is_iOS`                 | Encoded to boolean                             |
| `geo_hash`               | ❌        | —                           | Dropped                                        |
| `latitude`               | ✅        | —                           | Retained                                       |
| `longitude`              | ✅        | —                           | Retained                                       |
| `horizontal_accuracy`    | ✅        | —                           | Retained (filtered: must be ≤ 50 m)                                     |
| `ip_address`             | ✅        | —                           | Retained                                       |
| `altitude`               | ❌        | —                           | Dropped                                        |
| `iso_country_code`       | ❌        | —                           | Dropped                                        |
| *(derived)*              | ✅        | `time_diff_minutes`        | Time diff to next observation (same user)      |
