In [1]:
import os
import numpy as np
from collections import Counter
from typing import Generator, Optional, Dict
from fitparse import FitFile, FitParseError
from dataclasses import dataclass
import traceback
import concurrent.futures
from threading import Lock
import pandas as pd
from pathlib import Path
from tqdm import tqdm
import csv

In [2]:
def analyze_power_fields(fit_path: str):
    """Debug function to examine power-related fields in a FIT file"""
    try:
        fit_file = FitFile(fit_path)
        
        # Look at all record fields to find power-related ones
        for record in fit_file.get_messages('record'):
            fields = record.fields
            dev_fields = record.dev_fields if hasattr(record, 'dev_fields') else []
 
            print(f"\nRecord fields:")
            for field in fields:
                if 'power' in field.name.lower():
                    print(f"Native field: {field.name} = {record.get_value(field.name)}")
                    
            print(f"\nDeveloper fields:")
            for field in dev_fields:
                if 'power' in field.name.lower():
                    print(f"Developer field: {field.name} = {record.get_value(field.name)}")
            
            break  # Just look at first record
            
    except Exception as e:
        print(f"Error analyzing {fit_path}: {e}")

# Test with a few files
import glob
for fit_file in glob.glob('data/fit/*.fit')[:50]:
    print(f"\nAnalyzing {fit_file}:")
    analyze_power_fields(fit_file)


Analyzing data/fit/9697089315.fit:

Record fields:

Developer fields:

Analyzing data/fit/13526142366.fit:

Record fields:

Developer fields:

Analyzing data/fit/4501092610.fit:

Record fields:

Developer fields:

Analyzing data/fit/7355956985.fit:

Record fields:
Native field: power = 0

Developer fields:

Analyzing data/fit/1295842048.fit:

Record fields:

Developer fields:

Analyzing data/fit/12930284250.fit:

Record fields:
Native field: power = 0

Developer fields:

Analyzing data/fit/6447648957.fit:

Record fields:

Developer fields:

Analyzing data/fit/7602898040.fit:

Record fields:
Native field: power = 40
Native field: Form Power = 21
Native field: Air Power = 0

Developer fields:

Analyzing data/fit/5149258619.fit:

Record fields:

Developer fields:

Analyzing data/fit/7416863238.fit:

Record fields:

Developer fields:

Analyzing data/fit/5606804154.fit:

Record fields:
Native field: RP_Power = None

Developer fields:

Analyzing data/fit/5131044181.fit:

Record fields:

Dev

In [17]:
def debug_activity_messages(fit_path: str):
    """Debug function to examine activity-related messages in a FIT file"""
    try:
        fit_file = FitFile(fit_path)
        
        print("\nActivity messages:")
        for message in fit_file.get_messages('activity'):
            print("\nFields:")
            for field in message.fields:
                print(f"{field.name} = {message.get_value(field.name)}")
                
        print("\nSession messages:")
        for message in fit_file.get_messages('session'):
            print("\nFields:")
            for field in message.fields:
                if 'time' in field.name.lower() or 'start' in field.name.lower():
                    print(f"{field.name} = {message.get_value(field.name)}")
        
        print("\nEvent messages (type=start):")
        for message in fit_file.get_messages('event'):
            if message.get_value('event_type') == 'start':
                print("\nFields:")
                for field in message.fields:
                    print(f"{field.name} = {message.get_value(field.name)}")

        # Look at first few records
        for i, record in enumerate(fit_file.get_messages('record')):
            if i == 0:  # Just look at first record
                print("\nRecord fields:")
                for field in record.fields:
                    print(f"{field.name} = {record.get_value(field.name)}")
                    
                # Also look at raw fields if available
                if hasattr(record, 'raw_values'):
                    print("\nRaw values:")
                    for name, value in record.raw_values.items():
                        print(f"{name} = {value}")
            break
                
    except Exception as e:
        print(f"Error analyzing {fit_path}: {e}")

# Test with a few files
import glob
for fit_file in glob.glob('data/fit/*.fit')[:5]:
    print(f"\nAnalyzing {fit_file}:")
    debug_activity_messages(fit_file)


Analyzing data/fit/9697089315.fit:

Activity messages:

Fields:
timestamp = 2023-05-09 11:11:51
total_timer_time = 2013.94
local_timestamp = 2023-05-09 07:11:51
num_sessions = 1
type = manual
event = activity
event_type = stop
event_group = None
unknown_7 = None

Session messages:

Fields:
timestamp = 2023-05-09 11:11:51
start_time = 2023-05-09 10:38:09
start_position_lat = None
start_position_long = None
total_elapsed_time = 2013.94
total_timer_time = 2013.94
time_standing = None
avg_stance_time_percent = None
avg_stance_time = None
avg_stance_time_balance = None

Event messages (type=start):

Fields:
timestamp = 2023-05-09 10:38:09
timer_trigger = manual
unknown_15 = None
event = timer
event_type = start
event_group = 0
unknown_19 = None
unknown_20 = None

Record fields:
timestamp = 2023-05-09 10:38:09
distance = 0.0
heart_rate = 78
unknown_134 = None

Analyzing data/fit/13526142366.fit:

Activity messages:

Fields:
timestamp = 2024-10-18 16:53:38
total_timer_time = 1026.013
local_t

In [24]:

def preprocess_fit_files(fit_directory: str, output_path: str = 'data/processed/activities.csv') -> None:
    """
    Process all FIT files in directory and save relevant features to CSV incrementally.
    """
    
    # Create output directory if it doesn't exist
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    
    # Get all fit files
    fit_files = [f for f in Path(fit_directory).glob('*.fit')]
    print(f"Found {len(fit_files)} FIT files")
    
    # Initialize counters
    skipped_files = 0
    processed_files = 0
    total_records = 0
    
    # Define CSV headers
    headers = ['activity_id', 'timestamp', 'sport', 'heart_rate', 'speed', 'gap',
              'cadence', 'power_ind', 'power', 'latitude', 'longitude',
              'altitude', 'time_into_activity']
    
    # Create/open CSV file with headers
    with open(output_path, 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=headers)
        writer.writeheader()
        
        # Process each file
        for fit_path in tqdm(fit_files):
            try:
                fit_path_str = str(fit_path)
                fit_file = FitFile(fit_path_str)
                session_msg = next(fit_file.get_messages('session'))
                if session_msg.get_value('sport') not in ['running', 'cycling']:
                    continue
                if session_msg.get_value('sub_sport') in ['indoor_running', 'indoor_cycling', 'virtual_activity']:
                    continue
                records = list(fit_file.get_messages('record'))
                has_position_data = any(r.get_value('position_lat') is not None for r in records[:60])
                if not has_position_data:
                    continue
                
                # If we get here, process the activity records
                activity_id = fit_path.stem  # filename without extension
                
                activity_records = []  # Buffer for this activity's records

                activity_start_time = None
                for message in fit_file.get_messages('session'):
                    activity_start_time = message.get_value('start_time')
                    break
                if not activity_start_time:
                    print(f'Warning: no activity start time found for {fit_path}')
                    continue

                sport = session_msg.get_value('sport')
                has_stryd = any('air power' in field.name.lower() for field in session_msg.fields)
                has_power = any('power' in field.name.lower() for field in session_msg.fields)

                power_ind = 0

                if sport == 'running':
                    power_ind = 1 if has_stryd else 0
                elif sport == 'cycling':
                    power_ind = 1 if has_power else 0
                
                for record in fit_file.get_messages('record'):
                    try:
                        power = 0
                        if sport == 'running' and has_stryd:
                                power = record.get_value('power')
                        elif sport == 'cycling' and has_power:
                            power = record.get_value('power')

                        speed = record.get_value('speed') or record.get_value('enhanced_speed')
                        altitude = record.get_value('altitude') or record.get_value('enhanced_altitude') or 0

                        # Extract relevant features
                        record_dict = {
                            'activity_id': activity_id,
                            'timestamp': record.get_value('timestamp'),
                            'sport': sport,
                            'heart_rate': record.get_value('heart_rate'),
                            'speed': speed,
                            'gap': 0,
                            'cadence': record.get_value('cadence'),
                            'power_ind': power_ind,
                            'power': power,
                            'latitude': record.get_value('position_lat'),
                            'longitude': record.get_value('position_long'),
                            'altitude': altitude,
                            'time_into_activity': int((record.get_value('timestamp') - activity_start_time).total_seconds())
                        }
                        
                        # Only add records that have at least heart rate and speed
                        if record_dict['heart_rate'] is not None and speed is not None:
                            activity_records.append(record_dict)
                            
                    except Exception as e:
                        print(f"Error processing record in {fit_path}: {str(e)}")
                        continue
                
                # Write all records for this activity
                writer.writerows(activity_records)
                
                total_records += len(activity_records)
                processed_files += 1
                
                # Flush periodically to ensure data is written to disk
                if processed_files % 100 == 0:
                    f.flush()
                
            except Exception as e:
                print(f"Error processing file {fit_path}: {str(e)}")
                skipped_files += 1
                continue
    
    print(f"\nProcessing complete:")
    print(f"Processed {processed_files} files")
    print(f"Skipped {skipped_files} files")
    print(f"Total records: {total_records}")
    print(f"\nSaved to {output_path}")


In [25]:
preprocess_fit_files('./data/fit/', output_path='./data/records.csv')

Found 3090 FIT files


100%|██████████| 3090/3090 [20:43<00:00,  2.49it/s]  


Processing complete:
Processed 1261 files
Skipped 0 files
Total records: 3769174

Saved to ./data/records.csv





In [26]:
def convert_coordinates(input_path: str, output_path: str):
    """Convert coordinates from semicircles to degrees in place."""
    
    print("Converting coordinates from semicircles to degrees...")
    
    # Conversion constant
    SEMICIRCLES_TO_DEGREES = 180.0 / (2**31)
    
    # Read CSV in chunks to handle large files
    chunk_size = 100000
    first_chunk = True
    
    for chunk in pd.read_csv(input_path, chunksize=chunk_size):
        # Convert coordinates
        chunk['latitude'] = chunk['latitude'] * SEMICIRCLES_TO_DEGREES
        chunk['longitude'] = chunk['longitude'] * SEMICIRCLES_TO_DEGREES
        
        # Write to file
        if first_chunk:
            chunk.to_csv(output_path, index=False, mode='w')
            first_chunk = False
        else:
            chunk.to_csv(output_path, index=False, mode='a', header=False)
    
    print("Conversion complete!")
    
    # Verify a few values
    df_sample = pd.read_csv(output_path, nrows=5)
    print("\nSample of converted coordinates:")
    print(df_sample[['latitude', 'longitude']].head())

In [27]:
convert_coordinates(
    input_path='./data/records.csv',
    output_path='./data/records_converted.csv'
)

Converting coordinates from semicircles to degrees...
Conversion complete!

Sample of converted coordinates:
    latitude  longitude
0  41.500702 -81.603858
1  41.500709 -81.603847
2  41.500722 -81.603832
3  41.500736 -81.603809
4  41.500753 -81.603776


In [28]:
def verify_locations(input_path: str):
    """Sample one location per 30 minutes per activity and get city-level location data."""
    
    print("Loading and preprocessing data...")
    df = pd.read_csv(input_path)
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    
    # Sample one location per 30 minutes per activity
    sampled_locations = []
    for activity_id, group in df.groupby('activity_id'):
        # Get location at each 30-minute mark
        activity_samples = group.groupby(pd.Grouper(key='timestamp', freq='60min')).agg({
            'latitude': 'first',
            'longitude': 'first'
        }).dropna()
        sampled_locations.append(activity_samples)
    
    locations_df = pd.concat(sampled_locations)
    print(f"Sampled {len(locations_df)} locations from {len(df['activity_id'].unique())} activities")
    
    # Get unique lat/lon pairs to minimize API calls
    unique_locations = locations_df.drop_duplicates(['latitude', 'longitude'])
    print(f"Found {len(unique_locations)} unique locations")
    
    # Query Nominatim for each unique location
    location_info = {}
    for idx, row in tqdm(unique_locations.iterrows(), total=len(unique_locations)):
        try:
            url = "https://nominatim.openstreetmap.org/reverse"
            params = {
                'lat': row['latitude'],
                'lon': row['longitude'],
                'format': 'jsonv2',
                'zoom': 10  # City level
            }
            
            response = requests.get(
                url, 
                params=params,
                headers={'User-Agent': 'KineticAI/1.0'}  # Required by Nominatim
            )
            response.raise_for_status()
            data = response.json()
            
            # Get city or nearest populated place
            city = (
                data['address'].get('city') or 
                data['address'].get('town') or 
                data['address'].get('village') or
                data['address'].get('suburb')
            )
            
            state = (
                data['address'].get('state') or
                data['address'].get('state_district')
            )
            
            country = data['address'].get('country')
            
            location_key = f"{city}, {state}, {country}" if state else f"{city}, {country}"
            location_info[(row['latitude'], row['longitude'])] = location_key
            
            # Respect rate limit
            time.sleep(1)
            
        except Exception as e:
            print(f"Error getting location info for {row['latitude']}, {row['longitude']}: {str(e)}")
            continue
    
    # Count activities per location
    location_counts = Counter()
    for idx, row in locations_df.iterrows():
        loc_key = location_info.get((row['latitude'], row['longitude']))
        if loc_key:
            location_counts[loc_key] += 1
    
    # Print results
    print("\nActivity distribution by location:")
    for location, count in sorted(location_counts.items(), key=lambda x: x[1], reverse=True):
        print(f"{location}: {count} records")

In [15]:
verify_locations('./data/records_converted.csv')

Loading and preprocessing data...
Sampled 902 locations from 387 activities
Found 891 unique locations


 24%|██▍       | 217/891 [04:52<12:18,  1.10s/it]

Error getting location info for 0.0, 0.0: 'address'


100%|██████████| 891/891 [19:58<00:00,  1.35s/it]



Activity distribution by location:
Manchester, New Hampshire, United States: 292 records
Cleveland, Ohio, United States: 141 records
None, Virginia, United States: 37 records
City of Yonkers, New York, United States: 35 records
Cleveland Heights, Ohio, United States: 34 records
Goffstown, New Hampshire, United States: 24 records
Ludlow, Vermont, United States: 22 records
Bedford, New Hampshire, United States: 20 records
Hooksett, New Hampshire, United States: 19 records
Town of Bedford, New York, United States: 15 records
Shaker Heights, Ohio, United States: 15 records
Ann Arbor, Michigan, United States: 14 records
City of New York, New York, United States: 13 records
None, Hawaii, United States: 13 records
Pepper Pike, Ohio, United States: 12 records
Merrimack, New Hampshire, United States: 10 records
Dunbarton, New Hampshire, United States: 10 records
None, New York, United States: 9 records
Gates Mills, Ohio, United States: 9 records
None, California, United States: 9 records
Beach

In [29]:
import pandas as pd
import requests
from datetime import datetime, timedelta
import time
from tqdm import tqdm

def add_weather_data(input_path: str, output_path: str):
    """Add weather data to activity records, minimizing API calls by sampling every 30 minutes per activity."""
    
    print("Loading and preprocessing data...")
    df = pd.read_csv(input_path)
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    
    # Filter out (0,0) coordinates
    df = df[~((df['latitude'] == 0) & (df['longitude'] == 0))]
    df = df.sort_values(['activity_id', 'timestamp'])
    
    # Create weather timestamp column (rounded to nearest 30 minutes)
    df['weather_timestamp'] = df['timestamp'].dt.floor('30min')
    df['lat_rounded'] = df['latitude'].round(2)
    df['lon_rounded'] = df['longitude'].round(2)
    
    # For each activity, sample location every 30 minutes
    weather_points = df.groupby(['activity_id', 'weather_timestamp']).agg({
        'lat_rounded': 'first',
        'lon_rounded': 'first'
    }).reset_index()
    
    # Get unique location-time combinations
    unique_queries = weather_points.drop_duplicates(['lat_rounded', 'lon_rounded', 'weather_timestamp'])
    print(f"Found {len(unique_queries)} unique location-time combinations")
    
    # Initialize weather data storage
    weather_cache = {}
    
    # Weather variables we want
    hourly_params = [
        'temperature_2m',
        'relative_humidity_2m',
        'dew_point_2m',
        'wind_speed_10m',
        'wind_direction_10m',
        'precipitation',
        'cloud_cover',
        'surface_pressure'
    ]
    
    print("Fetching weather data...")
    # Process in batches to avoid rate limits
    batch_size = 100
    for i in tqdm(range(0, len(unique_queries), batch_size)):
        batch = unique_queries.iloc[i:i+batch_size]
        
        for _, row in batch.iterrows():
            cache_key = (row['lat_rounded'], row['lon_rounded'], row['weather_timestamp'])
            
            if cache_key in weather_cache:
                continue
                
            # Get weather data for this location and hour
            start_date = row['weather_timestamp'].strftime('%Y-%m-%d')
            end_date = (row['weather_timestamp'] + timedelta(days=1)).strftime('%Y-%m-%d')
            
            url = 'https://archive-api.open-meteo.com/v1/archive'
            params = {
                'latitude': row['lat_rounded'],
                'longitude': row['lon_rounded'],
                'start_date': start_date,
                'end_date': end_date,
                'hourly': ','.join(hourly_params)
            }
            
            try:
                response = requests.get(url, params=params)
                response.raise_for_status()
                data = response.json()
                
                # Find the matching hour in the response
                target_hour = row['weather_timestamp']
                target_hour_str = target_hour.strftime('%Y-%m-%dT%H:00')
                
                try:
                    hour_index = data['hourly']['time'].index(target_hour_str)
                    
                    # Store weather data for this location-hour
                    weather_cache[cache_key] = {
                        param: data['hourly'][param][hour_index]
                        for param in hourly_params
                    }
                except ValueError:
                    print(f"Could not find hour {target_hour_str} in weather data")
                    continue
                
                # Respect rate limits
                time.sleep(0.1)
                
            except Exception as e:
                print(f"Error fetching weather data for {row['lat_rounded']}, {row['lon_rounded']}, {row['weather_timestamp']}: {str(e)}")
                continue
    
    print("Adding weather data to records...")
    # Create a new dataframe with weather data
    weather_data = []
    
    # Group by activity to ensure we use the right weather data within each activity
    for activity_id, group in df.groupby('activity_id'):
        # Get weather points for this activity
        activity_weather = weather_points[weather_points['activity_id'] == activity_id]
        
        # For each record in the activity
        for _, record in group.iterrows():
            # Find the closest weather timestamp for this record
            weather_matches = activity_weather[
                (activity_weather['weather_timestamp'] >= record['weather_timestamp'] - pd.Timedelta(minutes=30)) &
                (activity_weather['weather_timestamp'] <= record['weather_timestamp'] + pd.Timedelta(minutes=30))
            ]
            
            if len(weather_matches) > 0:
                # Use the closest weather point in time
                closest_weather = weather_matches.iloc[0]
                cache_key = (
                    closest_weather['lat_rounded'],
                    closest_weather['lon_rounded'],
                    closest_weather['weather_timestamp']
                )
                
                if cache_key in weather_cache:
                    weather_row = weather_cache[cache_key].copy()
                    weather_row['activity_id'] = record['activity_id']
                    weather_row['timestamp'] = record['timestamp']
                    weather_data.append(weather_row)
                    continue
            
            # If no match found or no weather data, add empty record
            weather_data.append({
                'activity_id': record['activity_id'],
                'timestamp': record['timestamp'],
                **{param: None for param in hourly_params}
            })
    
    weather_df = pd.DataFrame(weather_data)
    
    # Merge weather data with original data
    result = pd.merge(
        df.drop(['lat_rounded', 'lon_rounded', 'weather_timestamp'], axis=1),
        weather_df,
        on=['activity_id', 'timestamp'],
        how='left'
    )
    
    # Save to new CSV
    result.to_csv(output_path, index=False)
    print(f"Saved enriched data to {output_path}")
    
    # Print some stats
    print("\nWeather data statistics:")
    for param in hourly_params:
        missing = result[param].isna().sum()
        print(f"{param}: {missing} missing values ({missing/len(result)*100:.1f}%)")

In [30]:
add_weather_data(
    input_path='./data/records_converted.csv',
    output_path='./data/records_with_weather.csv'
)

Loading and preprocessing data...
Found 3395 unique location-time combinations
Fetching weather data...


100%|██████████| 34/34 [29:21<00:00, 51.82s/it]


Adding weather data to records...
Saved enriched data to ./data/records_with_weather.csv

Weather data statistics:
temperature_2m: 0 missing values (0.0%)
relative_humidity_2m: 0 missing values (0.0%)
dew_point_2m: 0 missing values (0.0%)
wind_speed_10m: 0 missing values (0.0%)
wind_direction_10m: 0 missing values (0.0%)
precipitation: 0 missing values (0.0%)
cloud_cover: 0 missing values (0.0%)
surface_pressure: 0 missing values (0.0%)


Todo:

- [ ] Add time into activity
- [ ] Strip Garmin running power
- [ ] Make sure Stryd running power is used
- [ ] Calculate GAP for running

In [31]:
def calculate_grade_adjusted_speed(df: pd.DataFrame, window_size: int = 5) -> pd.DataFrame:
    """
    Calculate grade adjusted speed using a rolling window to determine gradient.
    
    Args:
        df: DataFrame with latitude, longitude, altitude, and speed columns
        window_size: Number of records to look forward/backward for gradient calculation
    """
    
    def calculate_gradient(group):
        """Calculate gradient for each point using surrounding records"""
        
        # Convert lat/lon to distances
        R = 6371000  # Earth radius in meters
        
        # Convert latitude and longitude to radians
        lat = np.radians(group['latitude'])
        lon = np.radians(group['longitude'])
        
        # Calculate distances between consecutive points
        dlat = lat.diff()
        dlon = dlon = lon.diff()
        
        # Haversine formula for distance
        a = np.sin(dlat/2)**2 + np.cos(lat) * np.cos(lat.shift()) * np.sin(dlon/2)**2
        c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1-a))
        distances = R * c
        
        # Calculate elevation changes
        elevation_changes = group['altitude'].diff()
        
        # Calculate gradients
        gradients = elevation_changes / distances
        
        # Replace inf/nan with 0
        gradients = gradients.fillna(0).replace([np.inf, -np.inf], 0)
        
        # Use rolling average to smooth gradients
        gradients = gradients.rolling(window=window_size, center=True).mean().fillna(0)
        
        return gradients
    
    def calculate_relative_cost(gradient):
        """Calculate relative cost using the formula"""
        i = gradient * 100  # Convert to percentage
        return 15.14 * (i/100)**2 - 2.896 * (i/100)
    
    # Group by activity_id to ensure we don't calculate gradients across activities
    result = []
    for activity_id, group in df.groupby('activity_id'):
        if group['sport'].iloc[0] != 'running':
            # Skip non-running activities
            group['gradient'] = 0
            group['grade_adjusted_speed'] = group['speed']
            result.append(group)
            continue
            
        # Calculate gradients
        group = group.copy()
        group['gradient'] = calculate_gradient(group)
        
        # Calculate relative cost
        group['relative_cost'] = calculate_relative_cost(group['gradient'])
        
        # Calculate grade adjusted speed
        group['grade_adjusted_speed'] = group['speed'] / (1 + group['relative_cost'])
        
        result.append(group)
    
    return pd.concat(result)

df = pd.read_csv('data/records_with_weather.csv')
df = calculate_grade_adjusted_speed(df)
df.to_csv('data/records_with_grade.csv', index=False)

# Print some statistics
print("\nGradient statistics for running activities:")
running_data = df[df['sport'] == 'running']
print(running_data['gradient'].describe())
print("\nGrade adjusted speed vs original speed (running only):")
print(pd.DataFrame({
    'original_speed': running_data['speed'],
    'grade_adjusted_speed': running_data['grade_adjusted_speed']
}).describe())


Gradient statistics for running activities:
count    2.352809e+06
mean     1.654370e-03
std      1.069148e-01
min     -2.401199e+01
25%     -1.311229e-02
50%      0.000000e+00
75%      1.451042e-02
max      2.330814e+01
Name: gradient, dtype: float64

Grade adjusted speed vs original speed (running only):
       original_speed  grade_adjusted_speed
count    2.352809e+06          2.352809e+06
mean     2.941069e+00          2.895937e+00
std      5.264921e-01          5.910295e-01
min      0.000000e+00          0.000000e+00
25%      2.731000e+00          2.687000e+00
50%      2.967000e+00          2.948000e+00
75%      3.182000e+00          3.181356e+00
max      2.185200e+01          2.185200e+01
