# Data Processing



## Athlete Data Processing Pipeline

This notebook rewrites the data processing portion of our project so that we can interactively test and tweak the transformation steps. In this pipeline, we:
- Load athlete data (from local JSON files)
- Clean the raw activity data
- Extract heart rate zones and build a pace-to-HR regressor
- Process activities into weekly and block features
- (Optionally) process personal best blocks


In [1]:
import os
import json
import math
import datetime
import time
import logging
from statistics import stdev
from scipy import signal, stats
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor

# Set up logging: configure the root logger at INFO level and create the
# module-level logger that the functions in this notebook write to.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

## Configuration and Helper Functions

Set up file paths and basic helper functions such as loading JSON data,
extracting error‑free activities, and retrieving athlete heart rate zones.

In [2]:
# Root folder for the athlete JSON files. This is a relative path, so it is
# resolved against the current working directory at run time.
DATA_DIR = "../data"  # Directory where athlete files are stored

def load_json_file(filepath):
    """Read and parse the UTF-8 JSON document stored at ``filepath``.

    Returns the decoded JSON value. Any exception raised while opening,
    reading or parsing the file is logged through the module logger and
    reported to the caller as ``None`` instead of being propagated.
    """
    parsed = None
    try:
        with open(filepath, 'r', encoding='utf-8') as handle:
            parsed = json.loads(handle.read())
    except Exception as e:
        logger.error(f"Error loading {filepath}: {e}")
    return parsed

def get_error_free_activities(activities):
    """Return the activities whose ``"errors"`` entry is missing or falsy.

    The relative order of the kept activities is unchanged and the input
    list itself is not modified.
    """
    return [activity for activity in activities if not activity.get("errors")]

def get_athlete_zones(athlete_data):
    """Return the upper heart-rate bounds of the athlete's first four HR zones.

    The bounds are read from ``athlete_data["_Zones"]["heart_rate"]["zones"]``
    (the ``'max'`` entry of zones 0-3). If that lookup fails for any reason
    (missing key, fewer than four zones, wrong structure, ...), default
    bounds at 60/70/80/90 % of a maximum heart rate of 190 are returned.
    """
    try:
        zone_entries = athlete_data["_Zones"]["heart_rate"]["zones"]
        upper_bounds = []
        for position in (0, 1, 2, 3):
            upper_bounds.append(zone_entries[position]['max'])
        return upper_bounds
    except Exception:
        # Fallback: derive the zone limits from an assumed maximum heart rate.
        fallback_max_hr = 190
        return [round(fallback_max_hr * share) for share in (0.6, 0.7, 0.8, 0.9)]

## HR Regressor and Signal Metrics

We need a function that builds a pace-to-heart‑rate regressor for cases where HR is missing.
We also need helper functions that compute signal metrics (standard deviation and frequency of peaks).

In [3]:
def calculate_signal_metrics(values):
    """Summarise a numeric series by its spread and its peak density.

    Returns a ``(std, freq)`` tuple where ``std`` is the sample standard
    deviation of ``values`` and ``freq`` is the number of local peaks found
    by ``scipy.signal.find_peaks`` divided by the series length, rounded to
    two decimals (``None`` when no peak is found).

    ``(None, None)`` is returned when fewer than two values are given, or
    when the computation raises; in the latter case the error is logged.
    """
    if len(values) < 2:
        return None, None
    try:
        spread = stdev(values)
        peak_indices, _ = signal.find_peaks(values)
        peak_count = len(peak_indices)
        if peak_count > 0:
            peak_rate = round(peak_count / len(values), 2)
        else:
            peak_rate = None
        return spread, peak_rate
    except Exception as e:
        logger.error(f"Error in calculate_signal_metrics: {e}")
        return None, None

def build_pace_to_hr_regressor(activities, athlete_id, zones):
    """
    Build a linear regressor from pace to average heart rate using run activities.

    Parameters:
      - activities: iterable of activity dicts; only those whose "type" is
        "Run" and that carry both "average_heartrate" and "average_speed"
        are used.
      - athlete_id: stored in the "athlete_id" column of the returned data.
      - zones: accepted for interface compatibility; not used here.

    Returns:
      - (regressor, df): a fitted LinearRegression (pace -> mean_hr) and the
        DataFrame of valid samples (columns "athlete_id", "mean_hr", "pace").
        The regressor is None when five or fewer valid samples remain.
        The target is fitted as a column vector, so ``predict`` returns a
        2-D array.
    """
    # NOTE(review): only "Run" is used here, whereas extract_activity_features
    # also treats "Trail Run" as a run - confirm whether that is intended.
    samples = []
    for act in activities:
        if act.get("type") != "Run":
            continue
        hr = act.get("average_heartrate")
        pace = act.get("average_speed")
        if hr is not None and pace is not None:
            samples.append({"athlete_id": athlete_id, "mean_hr": hr, "pace": pace})

    # Explicit columns keep the frame well-formed (and dropna safe) even when
    # no sample was collected.
    df = pd.DataFrame(samples, columns=["athlete_id", "mean_hr", "pace"])
    # Drop NaN samples *before* checking the sample count, so that invalid
    # rows cannot push an otherwise too-small data set past the threshold.
    df = df.dropna(subset=["mean_hr", "pace"])
    if df.shape[0] <= 5:
        return None, df

    X = df["pace"].values.reshape(-1, 1)
    y = df["mean_hr"].values.reshape(-1, 1)
    reg = LinearRegression()
    reg.fit(X, y)
    return reg, df

## Activity Processing Functions

The following functions mimic your original pipeline but are now rewritten to work in a notebook.

- `get_non_run_activity_data`: Extracts basic data from any activity.
- `get_run_activity_data`: Processes run-specific data (including HR zones, cadence, elevation changes, and signal metrics).
- `extract_activity_features`: Converts a raw activity into a feature row (as a dict) for later DataFrame assembly.

In [4]:
def get_non_run_activity_data(activity, zones):
    """Collect the generic fields that are recorded for every activity.

    Returns a dict with the keys ``activity_id``, ``activity_type``,
    ``elapsed_time``, ``distance`` and ``mean_hr`` (in that order). Fields
    missing from ``activity`` come back as ``None``. ``zones`` is accepted
    but not used by this function.
    """
    # Numeric codes of the known activity types (extend as needed); any
    # other or missing type is encoded as 34 ("Other").
    type_codes = {'Ride': 1, 'Run': 2, 'Swim': 3, 'Walk': 4}

    summary = {"activity_id": activity.get("id")}
    summary["activity_type"] = type_codes.get(activity.get("type"), 34)
    # (output key, key in the raw activity)
    passthrough = (
        ("elapsed_time", "elapsed_time"),
        ("distance", "distance"),
        ("mean_hr", "average_heartrate"),
    )
    for target_key, source_key in passthrough:
        summary[target_key] = activity.get(source_key)
    return summary

def _zero_if_none(value):
    """Return 0 for a missing (None) numeric field, otherwise the value itself."""
    return 0 if value is None else value


def get_run_activity_data(activity, zones, hr_regressor=None):
    """
    Process a running activity and extract detailed metrics.

    The result contains, in order:
      - the basic fields from get_non_run_activity_data
        (activity_id, activity_type, elapsed_time, distance, mean_hr);
      - cadence, elevation (elev_high - elev_low) and pace;
      - standard deviation / peak frequency of the per-lap heart rate,
        elevation gain and pace (hr_std, hr_freq, elev_std, elev_freq,
        pace_std, pace_freq), each None when no lap value is available.

    Parameters:
      - activity: raw activity dict.
      - zones: forwarded to get_non_run_activity_data.
      - hr_regressor: optional fitted model; when a lap has no heart rate,
        it is estimated with ``hr_regressor.predict([[pace]])[0][0]``. The
        activity-level mean_hr is never estimated.

    Numeric fields that are absent *or* explicitly null (elev_high, elev_low,
    lap average_speed, lap total_elevation_gain) are treated as 0, and a
    null "laps" entry is treated as no laps.
    """
    features = get_non_run_activity_data(activity, zones)
    features["cadence"] = activity.get("average_cadence")
    # dict.get only falls back to its default when the key is absent, so a
    # key that is present with a null value must be coerced explicitly;
    # otherwise the subtraction below raises TypeError.
    elev_high = _zero_if_none(activity.get("elev_high"))
    elev_low = _zero_if_none(activity.get("elev_low"))
    features["elevation"] = elev_high - elev_low
    features["pace"] = activity.get("average_speed")

    # Collect the per-lap series used for the signal metrics.
    lap_hr, lap_elevation, lap_pace = [], [], []
    for lap in activity.get("laps") or []:
        pace_val = _zero_if_none(lap.get("average_speed"))
        hr = lap.get("average_heartrate")
        if hr is None and hr_regressor is not None:
            # Estimate HR from pace; the regressor returns a 2-D array.
            hr = hr_regressor.predict([[pace_val]])[0][0]
        if hr is not None:
            lap_hr.append(hr)
        lap_elevation.append(_zero_if_none(lap.get("total_elevation_gain")))
        lap_pace.append(pace_val)

    # Same (simplified) metrics for heart rate, elevation and pace.
    for prefix, series in (("hr", lap_hr), ("elev", lap_elevation), ("pace", lap_pace)):
        std_val, freq = calculate_signal_metrics(series) if series else (None, None)
        features[f"{prefix}_std"] = std_val
        features[f"{prefix}_freq"] = freq

    return features

def extract_activity_features(activity, zones, hr_regressor=None):
    """
    Build the feature dict for one raw activity.

    Activities whose "type" is "Run" or "Trail Run" go through
    get_run_activity_data (with the optional hr_regressor); everything else
    goes through get_non_run_activity_data.
    """
    is_run = activity.get("type") in ("Run", "Trail Run")
    if not is_run:
        return get_non_run_activity_data(activity, zones)
    return get_run_activity_data(activity, zones, hr_regressor)

## Processing Blocks and Weeks

We now define functions that process a block of activities (e.g. a 3-month period)
and split it into weeks. Note that `get_weeks` sorts the activities by start date itself,
so the input does not have to be in chronological order.

In [5]:
def get_weeks(activities, week_duration=7):
    """
    Partition activities into consecutive "weeks" of ``week_duration`` days.

    Each activity gets a ``"start_date_dt"`` entry (the date part of its
    ``"start_date"`` string parsed as a datetime) and the input list is
    sorted in place, oldest first. A week starts at its first activity; the
    next activity that lies ``week_duration`` or more days after that start
    opens a new week. Returns a list of lists of activity dicts (``[]`` for
    empty input).
    """
    if not activities:
        return []

    # Annotate every activity with its parsed start day, then order the list.
    for entry in activities:
        day_text = entry["start_date"][:10]
        entry["start_date_dt"] = datetime.datetime.strptime(day_text, '%Y-%m-%d')
    activities.sort(key=lambda entry: entry["start_date_dt"])

    buckets = [[]]
    anchor = activities[0]["start_date_dt"]
    for entry in activities:
        days_since_anchor = (entry["start_date_dt"] - anchor).days
        if days_since_anchor >= week_duration:
            # This activity falls outside the current window: open a new week
            # that starts on its date.
            buckets.append([])
            anchor = entry["start_date_dt"]
        buckets[-1].append(entry)
    return buckets

def process_activity_block(activities, zones, hr_regressor=None):
    """
    Turn a block of raw activities into feature tables.

    Steps:
      - Run extract_activity_features on every activity (in input order).
      - Split the activities into weeks with get_weeks and aggregate each week.
    Returns:
      - A DataFrame with one row of features per activity.
      - A DataFrame with one row per week: week_num, total_activities,
        total_distance and avg_elapsed_time.
    """
    # Activity-level table (room to attach athlete or block info per row).
    activities_df = pd.DataFrame(
        [extract_activity_features(item, zones, hr_regressor) for item in activities]
    )

    # Week-level aggregates, computed from the raw activity dicts.
    weekly_rows = []
    for week_index, week_items in enumerate(get_weeks(activities)):
        frame = pd.DataFrame(week_items)
        has_distance = "distance" in frame.columns
        has_elapsed = "elapsed_time" in frame.columns
        weekly_rows.append({
            "week_num": week_index,
            "total_activities": len(week_items),
            "total_distance": frame["distance"].sum() if has_distance else 0,
            "avg_elapsed_time": frame["elapsed_time"].mean() if has_elapsed else None,
        })

    return activities_df, pd.DataFrame(weekly_rows)


## Main Transformation Function

This function loads the athlete’s raw data (athlete metadata, zones, activities),
filters out activities with errors, builds the HR regressor, and then computes the activity-level and weekly features.

In [6]:
def transform_athlete_data(athlete_id, populate_all_from_files=True):
    """
    Run the full transformation pipeline for one athlete.

    Files read below DATA_DIR:
      - athlete_{athlete_id}_athlete.json (athlete metadata)
      - athlete_{athlete_id}_zones.json (zones, stored under "_Zones")
      - {athlete_id}/{activity_id}.json, one file per activity; only names
        made of digits plus the ".json" extension are loaded.

    ``populate_all_from_files`` is accepted but not used by this function.

    Returns:
      - None when the metadata or zones file cannot be loaded; otherwise a
        dict with "athlete_data" (metadata including "_Zones"),
        "activities_df" and "weeks_df".
    """
    metadata_path = os.path.join(DATA_DIR, f"athlete_{athlete_id}_athlete.json")
    zones_path = os.path.join(DATA_DIR, f"athlete_{athlete_id}_zones.json")
    activities_dir = os.path.join(DATA_DIR, str(athlete_id))

    athlete_data = load_json_file(metadata_path)
    zones_data = load_json_file(zones_path)
    if any(payload is None for payload in (athlete_data, zones_data)):
        logger.error("Athlete metadata or zones data could not be loaded.")
        return None

    # Keep the zones next to the metadata (athlete stats are not used here).
    athlete_data["_Zones"] = zones_data

    # Gather every per-activity JSON file from the athlete's folder.
    all_activities = []
    if not os.path.exists(activities_dir):
        logger.error(f"Activities directory {activities_dir} does not exist.")
    else:
        for fname in os.listdir(activities_dir):
            if not (fname.endswith(".json") and fname[:-5].isdigit()):
                continue
            loaded = load_json_file(os.path.join(activities_dir, fname))
            if loaded:
                all_activities.append(loaded)
    logger.info(f"Loaded {len(all_activities)} activities for athlete {athlete_id}")

    # Drop activities flagged with errors.
    clean_activities = get_error_free_activities(all_activities)

    zones = get_athlete_zones(athlete_data)
    logger.info(f"Using HR zones: {zones}")

    # Pace -> HR model from the run activities; may be unavailable.
    hr_regressor, _ = build_pace_to_hr_regressor(clean_activities, athlete_id, zones)
    if not hr_regressor:
        logger.info("Insufficient data for HR regressor; proceeding without it.")
    else:
        logger.info("HR regressor built successfully.")

    # Activity-level features and weekly aggregates.
    activities_df, weeks_df = process_activity_block(clean_activities, zones, hr_regressor)
    logger.info("Processed activities into feature DataFrames.")

    outcome = {"athlete_data": athlete_data}
    outcome["activities_df"] = activities_df
    outcome["weeks_df"] = weeks_df
    return outcome

## Testing the Pipeline

Let's run the pipeline for a given athlete (make sure the required JSON files exist in the correct folders).

In [None]:
# Run the pipeline for one example athlete and preview the outputs.
athlete_id = 64383208  # example athlete id
result = transform_athlete_data(athlete_id)

if not result:
    # The pipeline returns None when the required input files are unavailable.
    print("Error processing athlete data.")
else:
    print("Athlete metadata:")
    print(result["athlete_data"])
    for heading, table_key in (
        ("\nActivity-level features:", "activities_df"),
        ("\nWeekly aggregates:", "weeks_df"),
    ):
        print(heading)
        display(result[table_key].head())

# TODO: This may need to be redone, since it did not take everything into account.