# Strava data ingestion

This notebook authenticates with the Strava API, fetches athletes data and activities and store them in JSON, CSV, PostgreSQL Database.

---

## Dependencies and configuration

Import everything needed and set up configuration (constants ...)

In [6]:
import json
import time
from pathlib import Path
from typing import Dict, Any
import threading
import requests
import polars as pl

from utils import config

In [2]:
_token_lock = threading.Lock()

def _save_token(data: Dict[str, Any]):
    config.TOKEN_FILE.write_text(json.dumps(data))

def _load_token():
    if config.TOKEN_FILE.exists():
        try:
            return json.loads(config.TOKEN_FILE.read_text())
        except json.JSONDecodeError:
            return None
    return None

def refresh_access_token(force=False) -> str:
    with _token_lock:
        cached = _load_token()
        now = time.time()
        if cached and not force and cached.get('expires_at', 0) - 30 > now:
            return cached['access_token']
        resp = requests.post(
            config.TOKEN_URL,
            data={
                'client_id': config.STRAVA_CLIENT_ID,
                'client_secret': config.STRAVA_CLIENT_SECRET,
                'grant_type': 'refresh_token',
                'refresh_token': config.STRAVA_REFRESH_TOKEN,
            }, timeout=30
        )
        if resp.status_code != 200:
            raise RuntimeError(f'Token refresh failed: {resp.status_code} {resp.text}')
        data = resp.json()
        token_record = {
            'access_token': data['access_token'],
            'expires_at': data['expires_at']
        }
        _save_token(token_record)
        return token_record['access_token']
    
def get_authorization_code() -> str:
    resp = requests.post(
        config.TOKEN_URL,
        data={
            'client_id': config.STRAVA_CLIENT_ID,
            'client_secret': config.STRAVA_CLIENT_SECRET,
            'code': config.STRAVA_USER_AUTHORIZATION_CODE,
            'grant_type': 'authorization_code',
        }, timeout=30
    )
    if resp.status_code != 200:
        raise RuntimeError(f'Authorization code exchange failed: {resp.status_code} {resp.text}')
    data = resp.json()
    token_record = {
        'access_token': data['access_token'],
        'expires_at': data['expires_at']
    }
    _save_token(token_record)
    return token_record['access_token']

# -----------------------------------------------------------------------------------------------------
# -----------------------------------------------------------------------------------------------------
# -----------------------------------------------------------------------------------------------------

SESSION = requests.Session()

def api_get(path: str, params: Dict[str, Any] = None, retries: int = 3):
    url = f"{config.BASE_URL.rstrip('/')}/{path.lstrip('/')}"
    for attempt in range(retries):
        token = refresh_access_token()
        resp = SESSION.get(url, params=params, headers={'Authorization': f'Bearer {token}'}, timeout=60)

        print(f"GET {url} - Status: {resp.status_code}")

        if resp.status_code == 401:
            # Unauthorized, possibly token expired
            token = refresh_access_token(force=True)
            continue

        elif resp.status_code >= 500:
            # Server error, retry
            time.sleep(2 ** attempt)
            continue

        elif resp.status_code != 200:
            raise RuntimeError(f'API request failed: {resp.status_code} {resp.text}')

        return resp.json()
    raise RuntimeError(f'Exceeded retries for {url}')

In [3]:
get_authorization_code()

'b7b4dda1e219aeabb0a7813bed748fe4b34038fb'

## Connect to the API and fetches data.

Data is then stored in CSV or JSON files.

### 1: Athlete data

fetc basic athelte data - profile.

In [4]:
athlete_raw = api_get('athlete')  # triggers headers

# Normalize/convert the JSON payload to a Polars DataFrame
try:
    athlete_data = pl.json_normalize(athlete_raw)
except Exception:
    athlete_data = pl.from_dicts([athlete_raw])

print(f"Athlete: {athlete_data}")

# Persist as JSON (row-oriented to mirror a list of records)
athlete_data.write_json(str(config.ATHLETE_DATA))

GET https://www.strava.com/api/v3/athlete - Status: 200
Athlete: shape: (1, 33)
┌──────────┬──────────┬─────────────┬───────────┬───┬─────────────┬─────┬─────────────┬────────────┐
│ id       ┆ username ┆ resource_st ┆ firstname ┆ … ┆ postable_cl ┆ ftp ┆ bikes       ┆ shoes      │
│ ---      ┆ ---      ┆ ate         ┆ ---       ┆   ┆ ubs_count   ┆ --- ┆ ---         ┆ ---        │
│ i64      ┆ null     ┆ ---         ┆ str       ┆   ┆ ---         ┆ i64 ┆ list[struct ┆ list[struc │
│          ┆          ┆ i64         ┆           ┆   ┆ i64         ┆     ┆ [8]]        ┆ t[8]]      │
╞══════════╪══════════╪═════════════╪═══════════╪═══╪═════════════╪═════╪═════════════╪════════════╡
│ 10097604 ┆ null     ┆ 3           ┆ Baptiste  ┆ … ┆ 2           ┆ 230 ┆ [{"b7438340 ┆ [{"g221679 │
│          ┆          ┆             ┆           ┆   ┆             ┆     ┆ ",false,"Tr ┆ 02",false, │
│          ┆          ┆             ┆           ┆   ┆             ┆     ┆ iban 520…   ┆ "Saucony   │
│          

### 2. Activities data

Get all activities for this athlete by 200 batches.

The 'all activities' DF is concatanated with the one containing fetched activities through the last request.

In [5]:
ACTIVITIES_PER_PAGE = 200

activities_df = None
athlete_activities = pl.DataFrame()
page = 1

while True:
    print(f"Fetching activities page {max(1, (athlete_activities.height // ACTIVITIES_PER_PAGE) + 1)} - {ACTIVITIES_PER_PAGE} per page")
    activities_raw = api_get('athlete/activities', params={'page': page, 'per_page': ACTIVITIES_PER_PAGE})
    page += 1

    # Convert list[dict] to Polars DataFrame
    activities_df = pl.from_dicts(activities_raw) if activities_raw else pl.DataFrame()

    # Use diagonal concat to allow differing schemas between pages
    athlete_activities = pl.concat([athlete_activities, activities_df], how="diagonal", rechunk=True) if not athlete_activities.is_empty() else activities_df

    if activities_df.height < ACTIVITIES_PER_PAGE:
        break

print(f"Fetched {athlete_activities.height} activities")

# Write in a line-delimited JSON format similar to pandas orient='records', lines=True
athlete_activities.write_ndjson(str(config.ATHLETE_ACTIVITIES))

Fetching activities page 1 - 200 per page
GET https://www.strava.com/api/v3/athlete/activities - Status: 200
Fetching activities page 2 - 200 per page
GET https://www.strava.com/api/v3/athlete/activities - Status: 200
Fetching activities page 3 - 200 per page
GET https://www.strava.com/api/v3/athlete/activities - Status: 200
Fetching activities page 4 - 200 per page
GET https://www.strava.com/api/v3/athlete/activities - Status: 200
Fetched 709 activities


### 3. Stream data

Get for each activities all available stream. This includes :

- Heart rate
- Pace / Speed
- Elevation
- Cadance (bike)
- Watts

In [None]:
if 'athlete_activities' not in locals() or athlete_activities is None or not isinstance(athlete_activities, pl.DataFrame) or athlete_activities.is_empty():
    if config.ATHLETE_ACTIVITIES.exists():
        # Read line-delimited JSON
        athlete_activities = pl.read_ndjson(str(config.ATHLETE_ACTIVITIES))
    else:
        raise FileNotFoundError(f"{config.ATHLETE_ACTIVITIES} does not exist. Please fetch activities first.")

activity_ids = athlete_activities.get_column('id').to_list()


def fetch_activity_streams(activity_id: int) -> Dict[str, Any]:
    types = ['time', 'latlng', 'distance', 'altitude', 'velocity_smooth', 'heartrate', 'cadence', 'watts']

    path = f'activities/{activity_id}/streams'
    params = {'keys': ','.join(types), 'key_by_type': 'true'}
    return api_get(path, params=params)


for activity_id in activity_ids:
    # Skip manual activities
    manual_df = athlete_activities.filter(pl.col('id') == activity_id).select('manual')
    is_manual = manual_df.to_series().item() if not manual_df.is_empty() else False

    if is_manual:
        print(f"Skipping manual activity {activity_id}")
        continue

    # if stream file already exists, skip (respect configured suffix)
    stream_file = Path(f"{config.STREAM_PATH}{activity_id}.json")
    if stream_file.exists():
        print(f"Skipping existing streams for activity {activity_id}")
        continue

    streams_json = fetch_activity_streams(activity_id)

    # Normalize JSON and persist using Polars
    try:
        streams_df = pl.json_normalize(streams_json)
    except Exception:
        # Fallback if json_normalize fails due to nested/irregular structure
        streams_df = pl.from_dicts([streams_json]) if isinstance(streams_json, dict) else pl.from_dicts(streams_json)

    # Write line-delimited JSON to match existing data pattern
    streams_df.write_ndjson(str(stream_file))

GET https://www.strava.com/api/v3/activities/15863024854/streams - Status: 200
GET https://www.strava.com/api/v3/activities/15843027348/streams - Status: 200
GET https://www.strava.com/api/v3/activities/15820804606/streams - Status: 200
GET https://www.strava.com/api/v3/activities/15812832711/streams - Status: 200
GET https://www.strava.com/api/v3/activities/15799475039/streams - Status: 200
GET https://www.strava.com/api/v3/activities/15784031938/streams - Status: 200
GET https://www.strava.com/api/v3/activities/15733260383/streams - Status: 200
GET https://www.strava.com/api/v3/activities/15697711082/streams - Status: 200
GET https://www.strava.com/api/v3/activities/15695740011/streams - Status: 200
GET https://www.strava.com/api/v3/activities/15663331065/streams - Status: 200
GET https://www.strava.com/api/v3/activities/15645363840/streams - Status: 200
GET https://www.strava.com/api/v3/activities/15619585116/streams - Status: 200
GET https://www.strava.com/api/v3/activities/1561540