In [34]:
# OS and Dotenv imports used for environment variable management, keeping API keys secure.
import os
from dotenv import load_dotenv, set_key, find_dotenv

# Requests library for making HTTP requests to the Strava API.
import requests

# Pretty print for better readability of JSON responses.
from pprint import pprint

# Pathlib for file path management
from pathlib import Path

# JSON for handling JSON data,
import json
# datetime for managing timestamps.
from datetime import datetime
# time for sleep functionality.
import time


# Helper Functions
- Strava access tokens expire after about 6 hours for security reasons. `refresh_access_token` is the function that actually communicates with Strava to get replacement tokens.
- `get_access_token` is a second function that makes sure we only refresh when needed, not on every call.

Workflow: is my current token still valid?
- Yes > Good news! No refresh needed.
- No > Call the refresh function.

In [10]:
# Load environment variables
# load_dotenv() is python-dotenv's loader; it is expected to copy the entries of
# a .env file into the process environment so os.getenv() can see them.
load_dotenv()
# Strava application credentials and the refresh token used to mint new access
# tokens. os.getenv() returns None for any variable that is not defined.
# NOTE: these names are bound once, when this cell runs; they do not change if
# the environment or the .env file is modified afterwards.
CLIENT_ID = os.getenv("STRAVA_CLIENT_ID")
CLIENT_SECRET = os.getenv("STRAVA_CLIENT_SECRET")
REFRESH_TOKEN = os.getenv("STRAVA_REFRESH_TOKEN")

def refresh_access_token():
    """
    Refresh the Strava access token using the refresh token.

    Posts a ``refresh_token`` grant to Strava's OAuth token endpoint. On
    success the new tokens are:

    * mirrored into ``os.environ`` and the module-level ``REFRESH_TOKEN`` so
      later calls in the same kernel session see the fresh values rather than
      the ones captured when the notebook started, and
    * written to the .env file (when one can be located) so they survive a
      kernel restart.

    Returns:
        tuple: ``(access_token, refresh_token, expires_at)`` on success, or
        ``(None, None, None)`` when Strava answers with a non-200 status.

    Raises:
        requests.RequestException: if the HTTP request itself fails or times
        out (not caught here).
    """
    global REFRESH_TOKEN

    auth_url = "https://www.strava.com/oauth/token"

    # Prefer the process environment: it holds the most recently issued refresh
    # token, whereas REFRESH_TOKEN may still be the value read at start-up.
    current_refresh_token = os.environ.get("STRAVA_REFRESH_TOKEN") or REFRESH_TOKEN

    payload = {
        'client_id': CLIENT_ID,
        'client_secret': CLIENT_SECRET,
        'refresh_token': current_refresh_token,
        'grant_type': 'refresh_token'
    }

    print("Refreshing access token...")
    # requests has no default timeout; without one a stalled connection would
    # block the notebook indefinitely.
    response = requests.post(auth_url, data=payload, timeout=30)

    if response.status_code != 200:
        print(f"Error refreshing token: {response.status_code} - {response.text}")
        return None, None, None

    token_data = response.json()
    new_access_token = token_data['access_token']
    new_refresh_token = token_data['refresh_token']
    expires_at = token_data['expires_at']

    print(f"New access token obtained! Expires at: {expires_at}")

    # set_key() below only rewrites the file, so update the live session too;
    # otherwise get_access_token() would keep reading the expired values.
    os.environ["STRAVA_ACCESS_TOKEN"] = new_access_token
    os.environ["STRAVA_REFRESH_TOKEN"] = new_refresh_token
    os.environ["STRAVA_TOKEN_EXPIRES_AT"] = str(expires_at)
    REFRESH_TOKEN = new_refresh_token

    # Update the .env file with new tokens
    env_path = find_dotenv()
    if env_path:
        set_key(env_path, "STRAVA_ACCESS_TOKEN", new_access_token)
        set_key(env_path, "STRAVA_REFRESH_TOKEN", new_refresh_token)
        set_key(env_path, "STRAVA_TOKEN_EXPIRES_AT", str(expires_at))
        print("✓ .env file updated with new tokens")
    else:
        print("⚠ Warning: .env file not found, tokens not saved")

    return new_access_token, new_refresh_token, expires_at
    


def get_access_token():
    """
    Get a valid access token, refreshing if necessary.

    Reads ``STRAVA_ACCESS_TOKEN`` and ``STRAVA_TOKEN_EXPIRES_AT`` from the
    process environment. The stored token is reused only while the current
    Unix time is earlier than the stored expiry; in every other case
    (missing token, missing/unparseable expiry, expired token)
    ``refresh_access_token()`` is called.

    Returns:
        The stored or freshly issued access token, or ``None`` when the
        refresh fails.
    """
    access_token = os.getenv("STRAVA_ACCESS_TOKEN")
    expires_at = os.getenv("STRAVA_TOKEN_EXPIRES_AT")

    # A hand-edited or corrupted expiry must trigger a refresh, not a crash.
    try:
        expiry = int(expires_at) if expires_at else None
    except ValueError:
        expiry = None

    # Check if token exists and is still valid
    if access_token and expiry is not None and int(time.time()) < expiry:
        print("Using existing access token (still valid)")
        return access_token

    # Token expired or doesn't exist, refresh it
    print("Token expired or missing, refreshing...")
    new_token, _, _ = refresh_access_token()
    return new_token

First, let's look at the first API call: the athlete endpoint.

In [25]:
# Obtain a usable access token first (get_access_token refreshes it when required)
ACCESS_TOKEN = get_access_token()

if not ACCESS_TOKEN:
    print("Failed to obtain access token")
else:
    # Ask Strava for the authenticated athlete's profile using the bearer token
    athlete_url = "https://www.strava.com/api/v3/athlete"
    headers = {"Authorization": f"Bearer {ACCESS_TOKEN}"}

    response = requests.get(athlete_url, headers=headers)

    if response.status_code != 200:
        print(f"Error: {response.status_code} - {response.text}")
    else:
        # Banner followed by the pretty-printed JSON payload
        print("\n" + "="*50)
        print("ATHLETE PROFILE")
        print("="*50)
        pprint(response.json())

Token expired or missing, refreshing...
Refreshing access token...
New access token obtained! Expires at: 1759747181
✓ .env file updated with new tokens

ATHLETE PROFILE
{'badge_type_id': 0,
 'bio': None,
 'city': None,
 'country': None,
 'created_at': '2020-07-21T05:11:27Z',
 'firstname': 'William',
 'follower': None,
 'friend': None,
 'id': 64341718,
 'lastname': 'Catt',
 'premium': False,
 'profile': 'https://graph.facebook.com/2619742961675163/picture?height=256&width=256',
 'profile_medium': 'https://graph.facebook.com/2619742961675163/picture?height=256&width=256',
 'resource_state': 2,
 'sex': 'M',
 'state': None,
 'summit': False,
 'updated_at': '2023-09-25T00:56:44Z',
 'username': 'cattw',
 'weight': None}


###  Initial Thoughts based on Athlete details
- Name Display "Athlete: <  >"
- Load profile picture into a circle icon (or omit)
- Use their country as a little flag
- * these are all completely non-essential but add a little personality to the display *
Name Display, with profile picture and flag as a sort of header?

What's Next? See the activities output
>

#

In [None]:
# Step 1: Generate the authorization URL
# The URL is assembled from adjacent literals so each query parameter sits on
# its own line; the resulting string is unchanged.
auth_url = (
    "https://www.strava.com/oauth/authorize"
    f"?client_id={CLIENT_ID}"
    "&response_type=code"
    "&redirect_uri=http://localhost"
    "&approval_prompt=force"
    "&scope=read,activity:read_all"
)

print("Visit this URL in your browser -- Manually do it, then paste the code below:")
print(auth_url)

Visit this URL in your browser:
https://www.strava.com/oauth/authorize?client_id=174374&response_type=code&redirect_uri=http://localhost&approval_prompt=force&scope=read,activity:read_all


In [None]:
# Step 2: Paste the code from the URL here
# After approving access, the browser is redirected to http://localhost with a
# `code=` query parameter; that value goes below. An authorisation code is a
# credential, so do not commit a real one with the notebook.
authorisation_code = "PASTE_AUTHORISATION_CODE_HERE"  # Replace with your actual code

token_url = "https://www.strava.com/oauth/token"

payload = {
    'client_id': CLIENT_ID,
    'client_secret': CLIENT_SECRET,
    'code': authorisation_code,
    # Protocol constant defined by OAuth 2.0: it must be spelled exactly
    # "authorization_code" (with a "z") or the token endpoint rejects the request.
    'grant_type': 'authorization_code'
}

# Explicit timeout so a stalled connection cannot hang the notebook forever.
response = requests.post(token_url, data=payload, timeout=30)

if response.status_code == 200:
    tokens = response.json()
    print("✓ Authorisation successful!")

    # Refresh the running session as well: set_key() only rewrites the file,
    # so without this the token helpers would keep using the old values.
    os.environ["STRAVA_ACCESS_TOKEN"] = tokens['access_token']
    os.environ["STRAVA_REFRESH_TOKEN"] = tokens['refresh_token']
    os.environ["STRAVA_TOKEN_EXPIRES_AT"] = str(tokens['expires_at'])
    REFRESH_TOKEN = tokens['refresh_token']

    # Update .env file (find_dotenv() returns an empty string when none exists)
    env_path = find_dotenv()
    if env_path:
        set_key(env_path, "STRAVA_ACCESS_TOKEN", tokens['access_token'])
        set_key(env_path, "STRAVA_REFRESH_TOKEN", tokens['refresh_token'])
        set_key(env_path, "STRAVA_TOKEN_EXPIRES_AT", str(tokens['expires_at']))

        print("✓ .env file updated!")
    else:
        print("⚠ Warning: .env file not found, tokens not saved")
else:
    print(f"Error: {response.status_code} - {response.text}")

✓ Authorization successful!
✓ .env file updated!


### Authorisation complete; now we want to start loading the activities
- We don't want to re-download the activities on every run, so we'll cache them on disk.

In [None]:
# Cache directory setup
# Every cached file is written beneath this folder. The path is relative, so it
# resolves against the notebook's current working directory.
CACHE_DIR = Path("strava_cache")
# exist_ok=True makes re-running this cell harmless when the folder already exists.
CACHE_DIR.mkdir(exist_ok=True)


def get_cache_filepath(activity_id):
    """Return the path of the JSON cache file for ``activity_id`` inside CACHE_DIR."""
    filename = f"activity_{activity_id}.json"
    return CACHE_DIR.joinpath(filename)


def load_cached_activity(activity_id):
    """
    Load an activity from cache if it exists.

    Args:
        activity_id: Identifier used to build the cache file name.

    Returns:
        The parsed JSON content of the cache file (the wrapper written by
        ``save_activity_to_cache``), or ``None`` when the file is missing or
        cannot be read/parsed.
    """
    cache_file = get_cache_filepath(activity_id)

    if not cache_file.exists():
        return None

    try:
        with open(cache_file, 'r', encoding='utf-8') as f:
            cached = json.load(f)
    except (OSError, ValueError) as exc:
        # A truncated file (e.g. a write interrupted mid-way) is a cache miss,
        # not a fatal error; json.JSONDecodeError is a ValueError subclass.
        print(f"⚠ Ignoring unreadable cache file {cache_file}: {exc}")
        return None

    print(f"✓ Loading activity {activity_id} from cache")
    return cached

def save_activity_to_cache(activity_id, activity_data):
    """
    Save an activity to cache.

    The payload is wrapped as ``{'cached_at': <ISO timestamp>, 'activity':
    activity_data}`` and written as indented JSON. The data is first written to
    a sibling ``.tmp`` file and then moved into place, so an interrupted write
    never leaves a half-written file under the final cache name.

    Args:
        activity_id: Identifier used to build the cache file name.
        activity_data: JSON-serialisable activity payload.
    """
    cache_file = get_cache_filepath(activity_id)

    cache_data = {
        'cached_at': datetime.now().isoformat(),
        'activity': activity_data
    }

    # Other code treats "file exists" as "activity is cached", so the final
    # name must only ever appear once the content is complete.
    tmp_file = cache_file.with_name(cache_file.name + ".tmp")
    try:
        with open(tmp_file, 'w', encoding='utf-8') as f:
            json.dump(cache_data, f, indent=2)
        tmp_file.replace(cache_file)
    finally:
        # After a successful replace the temp file is gone; this only cleans
        # up when the write or the move was interrupted.
        if tmp_file.exists():
            tmp_file.unlink()

    print(f"✓ Activity {activity_id} saved to cache")

def get_athlete_activities(access_token, per_page=30, page=1, timeout=30):
    """
    Fetch a list of the athlete's activities.

    Args:
        access_token: Bearer token sent in the Authorization header.
        per_page: Number of activities requested per page.
        page: 1-based page number to request.
        timeout: Seconds to wait for Strava before giving up. requests has no
            default timeout, so without this a stalled connection blocks forever.

    Returns:
        The decoded JSON response on HTTP 200, or ``None`` for any other status
        (the status and body are printed).

    Raises:
        requests.RequestException: if the request fails or times out.
    """
    activities_url = "https://www.strava.com/api/v3/athlete/activities"
    headers = {"Authorization": f"Bearer {access_token}"}
    params = {'per_page': per_page, 'page': page}

    print(f"\nFetching activities (page {page}, {per_page} per page)...")
    response = requests.get(activities_url, headers=headers, params=params, timeout=timeout)

    if response.status_code != 200:
        print(f"Error: {response.status_code} - {response.text}")
        return None

    activities = response.json()
    print(f"✓ Found {len(activities)} activities")
    return activities

def get_activity_details(activity_id, access_token, use_cache=True, timeout=30):
    """
    Get detailed information about a specific activity.

    Args:
        activity_id: Strava activity identifier.
        access_token: Bearer token sent in the Authorization header.
        use_cache: When true, return the cached copy if one exists and store
            freshly fetched data in the cache.
        timeout: Seconds to wait for Strava before giving up. requests has no
            default timeout, so without this a stalled connection blocks forever.

    Returns:
        The activity payload (from cache or API), or ``None`` when the API
        answers with a non-200 status (the status and body are printed).

    Raises:
        requests.RequestException: if the request fails or times out.
    """
    # Check cache first
    if use_cache:
        cached_data = load_cached_activity(activity_id)
        if cached_data:
            # Cache files wrap the payload: {'cached_at': ..., 'activity': ...}
            return cached_data['activity']

    # Fetch from API
    activity_url = f"https://www.strava.com/api/v3/activities/{activity_id}"
    headers = {"Authorization": f"Bearer {access_token}"}

    print(f"Fetching activity {activity_id} from API...")
    response = requests.get(activity_url, headers=headers, timeout=timeout)

    if response.status_code != 200:
        print(f"Error: {response.status_code} - {response.text}")
        return None

    activity_data = response.json()
    print("✓ Activity fetched successfully")

    if use_cache:
        save_activity_to_cache(activity_id, activity_data)

    return activity_data

def fetch_all_activities(access_token, use_cache=True):
    """
    Fetch all activities from Strava.
    Handles pagination automatically.

    Pages of 200 activities are requested until a page comes back empty or
    shorter than the page size. If a page request fails
    (``get_athlete_activities`` returns ``None``) the loop stops and a warning
    is printed, because the returned list is then only a partial listing.

    Args:
        access_token: Bearer token passed to ``get_athlete_activities``.
        use_cache: Accepted for backward compatibility; currently unused, the
            listing is always fetched from the API.

    Returns:
        list: Every activity collected before the loop stopped.
    """
    all_activities = []
    page = 1
    per_page = 200  # Maximum allowed by Strava

    print("Fetching all activities...")

    while True:
        activities = get_athlete_activities(access_token, per_page=per_page, page=page)

        if activities is None:
            # An error is not the same as "no more pages": say so explicitly
            # so a truncated list is not mistaken for the full history.
            print(f"⚠ Page {page} could not be fetched; the activity list may be incomplete")
            break

        if not activities:
            break

        all_activities.extend(activities)
        print(f"✓ Fetched page {page}: {len(activities)} activities (Total so far: {len(all_activities)})")

        # If we got fewer than per_page, we've reached the end
        if len(activities) < per_page:
            break

        page += 1
        time.sleep(0.5)  # Be nice to the API

    print(f"\n✓ Total activities fetched: {len(all_activities)}")
    return all_activities

def fetch_and_cache_all_activity_details(activities, access_token, max_retries=5,
                                         base_wait_seconds=15 * 60, request_delay=1):
    """
    Fetch detailed data for all activities and cache them.

    Activities that already have a cache entry are taken from the cache.
    Others are fetched through ``get_activity_details``. A failed fetch is
    assumed to be a rate limit (``get_activity_details`` returns ``None`` for
    every kind of error, so the cause cannot be told apart here) and is retried
    with a linearly growing wait: ``base_wait_seconds * attempt``.

    Args:
        activities: Activity summaries; each needs ``'id'`` and ``'name'``.
        access_token: Bearer token passed to ``get_activity_details``.
        max_retries: Maximum number of fetch attempts per activity.
        base_wait_seconds: Wait after the first failed attempt; the n-th
            failed attempt waits n times this long. No wait follows the final
            attempt.
        request_delay: Pause in seconds after each activity that was not
            served from the cache.

    Returns:
        list: Detailed payloads for every activity that could be obtained.
    """
    print("\nFetching detailed data for all activities...")
    detailed_activities = []

    for i, activity in enumerate(activities, 1):
        activity_id = activity['id']

        # Check if already cached first
        cached = load_cached_activity(activity_id)
        if cached:
            print(f"[{i}/{len(activities)}] ✓ {activity['name']} (from cache)")
            detailed_activities.append(cached['activity'])
            continue

        print(f"[{i}/{len(activities)}] Processing: {activity['name']}")

        # Try to fetch with retry logic for rate limits
        for attempt in range(1, max_retries + 1):
            detailed = get_activity_details(activity_id, access_token, use_cache=True)

            if detailed:
                detailed_activities.append(detailed)
                break

            if attempt == max_retries:
                print(f"✗ Failed after {max_retries} retries. Skipping activity.")
                break

            # Rate limit (presumably) hit: wait and retry. With the defaults
            # the waits are 15, 30, 45 and 60 minutes.
            wait_time = base_wait_seconds * attempt
            print(f"⚠ Rate limit hit. Waiting {wait_time/60:.0f} minutes before retry {attempt}/{max_retries}...")
            print(f"   Activities processed so far: {len(detailed_activities)}/{len(activities)}")
            time.sleep(wait_time)

        # Normal rate limiting between requests
        time.sleep(request_delay)

    print(f"\n✓ Successfully processed {len(detailed_activities)}/{len(activities)} activities")
    return detailed_activities
    


def save_all_activities_summary(activities, filename="all_activities_summary.json"):
    """
    Write all activities, plus a timestamp and a count, to one JSON file in CACHE_DIR.
    """
    target = CACHE_DIR.joinpath(filename)

    # Same three keys, in the same order, as the file has always contained.
    payload = dict(
        fetched_at=datetime.now().isoformat(),
        total_activities=len(activities),
        activities=activities,
    )

    with target.open('w') as handle:
        json.dump(payload, handle, indent=2)

    print(f"✓ Summary saved to {target}")

def display_all_activities_overview(activities):
    """
    Print a report for ``activities``: counts per type, overall totals and a
    one-line entry per activity.
    """
    from collections import Counter

    rule = "=" * 80

    print("\n" + rule)
    print("ALL ACTIVITIES OVERVIEW")
    print(rule)

    # Tally activities by their 'type' field
    type_counts = Counter(entry['type'] for entry in activities)

    print(f"\nTotal Activities: {len(activities)}")
    print("\nActivity Types:")
    for kind, how_many in type_counts.most_common():
        print(f"  - {kind}: {how_many}")

    # Totals: distance is divided by 1000 (km), moving time by 3600 (hours)
    total_distance = sum(entry.get('distance', 0) for entry in activities) / 1000
    total_time = sum(entry.get('moving_time', 0) for entry in activities) / 3600
    total_elevation = sum(entry.get('total_elevation_gain', 0) for entry in activities)

    print("\nTotals:")
    print(f"  - Distance: {total_distance:.2f} km")
    print(f"  - Moving Time: {total_time:.2f} hours")
    print(f"  - Elevation Gain: {total_elevation:.0f} m")

    print("\n" + rule)
    print("ACTIVITY LIST")
    print(rule)

    for position, entry in enumerate(activities, start=1):
        day = entry['start_date_local'][:10]  # keep only the YYYY-MM-DD prefix
        title = entry['name']
        kind = entry['type']
        km = entry.get('distance', 0) / 1000

        print(f"{position:3d}. {day} | {kind:15s} | {km:6.2f}km | {title}")

In [35]:
# Make sure we hold a usable token before starting
ACCESS_TOKEN = get_access_token()

# 1) Lightweight listing of every activity
all_activities = fetch_all_activities(ACCESS_TOKEN, use_cache=True)

# 2) Print the overview, then 3) write the summary file
display_all_activities_overview(all_activities)
save_all_activities_summary(all_activities)

# 4) Detailed data for ALL activities, cached one file per activity.
# WARNING: one API call per activity that is not cached yet.
detailed_activities = fetch_and_cache_all_activity_details(all_activities, ACCESS_TOKEN)

# Closing banner, emitted as a single print with newline separators
print(
    "\n" + "="*80,
    "✓ ALL DONE!",
    "="*80,
    "Summary file: strava_cache/all_activities_summary.json",
    "Individual activity files: strava_cache/activity_*.json",
    f"Total activities cached: {len(detailed_activities)}",
    sep="\n",
)

Using existing access token (still valid)
Fetching all activities...

Fetching activities (page 1, 200 per page)...
✓ Found 200 activities
✓ Fetched page 1: 200 activities (Total so far: 200)

Fetching activities (page 2, 200 per page)...
✓ Found 161 activities
✓ Fetched page 2: 161 activities (Total so far: 361)

✓ Total activities fetched: 361

ALL ACTIVITIES OVERVIEW

Total Activities: 361

Activity Types:
  - Run: 254
  - Walk: 46
  - RockClimbing: 27
  - Workout: 12
  - Hike: 7
  - Ride: 6
  - Swim: 5
  - AlpineSki: 2
  - Snowboard: 2

Totals:
  - Distance: 2573.85 km
  - Moving Time: 439.09 hours
  - Elevation Gain: 29166 m

ACTIVITY LIST
  1. 2025-09-20 | Run             |   5.02km | Fountains abbey ParkRun!
  2. 2025-09-16 | Run             |  10.05km | Trail Hills with Lou
  3. 2025-09-16 | Walk            |   9.48km | Fudge walk
  4. 2025-09-14 | Run             |  12.23km | hilly Ilkley with andy
  5. 2025-09-13 | Run             |   5.13km | Ilkley ParkRun with Lou
  6. 202

KeyboardInterrupt: 

I didn't account for Strava's rate limits, so let's fetch the remaining activities in batches!


In [36]:
def fetch_remaining_activities_in_batches(activities, access_token, batch_size=50,
                                          request_delay=1.5, batch_pause=300):
    """
    Fetch activities in batches to avoid rate limits.
    Automatically skips already cached activities.

    An activity counts as cached when its cache file exists. The first failed
    fetch stops the whole run (it is assumed to be a rate limit); calling the
    function again resumes with whatever is still uncached.

    Args:
        activities: Activity summaries; each needs ``'id'`` and ``'name'``.
        access_token: Bearer token passed to ``get_activity_details``.
        batch_size: Number of activities fetched between long pauses.
        request_delay: Pause in seconds after each fetched activity.
        batch_pause: Pause in seconds between two batches. It is skipped after
            the final batch, where there is nothing left to wait for.

    Returns:
        None. Results are stored in the cache by ``get_activity_details``.
    """
    # Find which activities still need fetching
    activities_to_fetch = [
        activity for activity in activities
        if not get_cache_filepath(activity['id']).exists()
    ]
    cached_count = len(activities) - len(activities_to_fetch)
    total = len(activities_to_fetch)

    print(f"Already cached: {cached_count}")
    print(f"Need to fetch: {total}")

    if total == 0:
        print("✓ All activities already cached!")
        return

    # Process in batches
    for batch_num in range(0, total, batch_size):
        batch = activities_to_fetch[batch_num:batch_num + batch_size]
        batch_end = batch_num + len(batch)

        print(f"\n{'='*60}")
        print(f"Processing batch: {batch_num + 1} to {batch_end} of {total}")
        print(f"{'='*60}")

        for i, activity in enumerate(batch, 1):
            activity_id = activity['id']
            print(f"[{batch_num + i}/{total}] {activity['name']}")

            detailed = get_activity_details(activity_id, access_token, use_cache=True)

            if not detailed:
                print("⚠ Rate limit likely hit. Stopping batch.")
                print("Resume by running this function again - it will skip cached activities.")
                return

            time.sleep(request_delay)  # Slower to be safe

        if batch_end < total:
            print(f"\n✓ Batch complete. Waiting {batch_pause / 60:.0f} minutes before next batch...")
            time.sleep(batch_pause)  # Break between batches
        else:
            # Last batch: nothing follows, so do not waste the pause.
            print("\n✓ Batch complete.")

    print("\n✓ All activities fetched!")

In [None]:
# Re-check the token first; get_access_token() refreshes it if it has expired.
ACCESS_TOKEN = get_access_token()

# This will automatically skip every activity that already has a cache file
# (132 of them when this note was written). `all_activities` is the list
# produced by fetch_all_activities() in an earlier cell.
fetch_remaining_activities_in_batches(all_activities, ACCESS_TOKEN, batch_size=50)