In [None]:
# Perform necessary imports
from dotenv import load_dotenv
import os
import requests
import json
from datetime import datetime
import pandas as pd
import time

# Load variables from .env file
load_dotenv()

In [None]:
# Strava settings are taken from the process environment (filled from .env by
# load_dotenv() above). A variable that is not set is read as None.

# API locations
BASE_URL = os.environ.get("BASE_URL")
OAUTH_ENDPOINT = os.environ.get("OAUTH_ENDPOINT")

# Application credentials
CLIENT_ID = os.environ.get("CLIENT_ID")
CLIENT_SECRET = os.environ.get("CLIENT_SECRET")

# Tokens / one-time authorization code
AUTHORIZATION_CODE = os.environ.get("AUTHORIZATION_CODE")
ACCESS_TOKEN = os.environ.get("ACCESS_TOKEN")
REFRESH_TOKEN = os.environ.get("REFRESH_TOKEN")

In [None]:
# Automated retrieval of Strava tokens.
# - First run (strava_tokens.json does not exist yet): exchange the manually
#   generated AUTHORIZATION_CODE for tokens and save them to the file. This
#   step is needed only once.
# - Later runs: load the saved tokens. If the access token has expired, request
#   a new one with the saved refresh token and overwrite the file.
strava_tokens_file = 'strava_tokens.json'


def _request_strava_tokens(grant_fields):
    """POST a grant to OAUTH_ENDPOINT and return the parsed JSON reply.

    Parameters
    ----------
    grant_fields : dict
        Grant-specific form fields (``grant_type`` plus ``code`` or
        ``refresh_token``); the client id and secret are added here.

    Raises
    ------
    requests.HTTPError
        If the endpoint answers with an error status, so that an error body is
        never mistaken for (or saved as) a set of tokens.
    """
    token_response = requests.post(
        OAUTH_ENDPOINT,
        # Credentials go in the form body rather than the query string.
        data={
            'client_id': CLIENT_ID,
            'client_secret': CLIENT_SECRET,
            **grant_fields,
        },
        timeout=30,  # do not let a stalled connection hang the notebook
    )
    token_response.raise_for_status()
    return token_response.json()


def _save_strava_tokens(tokens):
    """Write the token payload to the tokens file as JSON."""
    with open(strava_tokens_file, 'w') as outfile:
        json.dump(tokens, outfile)


if os.path.exists(strava_tokens_file):
    # Get the tokens from file to connect to Strava
    with open(strava_tokens_file) as json_file:
        strava_tokens = json.load(json_file)

    # If the access_token has expired, use the saved refresh_token to get a
    # new access_token and persist the reply for the following runs.
    if strava_tokens['expires_at'] < time.time():
        strava_tokens = _request_strava_tokens({
            'grant_type': 'refresh_token',
            'refresh_token': strava_tokens['refresh_token'],
        })
        _save_strava_tokens(strava_tokens)
else:
    # Perform the one-time OAuth code exchange to generate the tokens used for
    # reading activities. A failed exchange raises instead of writing the file.
    strava_tokens = _request_strava_tokens({
        'grant_type': 'authorization_code',
        'code': AUTHORIZATION_CODE,
    })
    _save_strava_tokens(strava_tokens)
    # The payload contains secrets, so only confirm where it was saved.
    print(f"Strava tokens saved to {strava_tokens_file}")

In [None]:
# Load the strava_tokens json file that was generated from the previous step
with open('strava_tokens.json') as json_file:
    strava_tokens = json.load(json_file)

activities_endpoint = f'{BASE_URL}/activities'
access_token = strava_tokens['access_token']

# Activity fields kept from each API record; they become the dataframe columns
activity_columns = [
    "id",
    "name",
    "start_date_local",
    "type",
    "distance",
    "moving_time",
    "elapsed_time",
    "total_elevation_gain",
    "end_latlng",
    "external_id",
]

# Number of activities requested per page
per_page = 200

# Loop through all pages of activities, collecting one plain dict per activity.
# The dataframe is built once at the end instead of being grown cell by cell.
activity_records = []
page = 1
while True:
    # get page of activities from Strava; the token travels in the
    # Authorization header so it does not end up inside the request URL
    activities_response = requests.get(
        activities_endpoint,
        headers={'Authorization': f'Bearer {access_token}'},
        params={'per_page': per_page, 'page': page},
        timeout=30,
    )
    # An error reply is a JSON object, not a list of activities: fail here
    # with the HTTP status instead of trying to index it below.
    activities_response.raise_for_status()
    response_json = activities_response.json()

    # if no results then exit loop
    if not response_json:
        break

    # otherwise keep the selected fields of every activity on this page
    # (a field missing from a record is stored as None)
    activity_records.extend(
        {column: activity.get(column) for column in activity_columns}
        for activity in response_json
    )

    # increment page
    page += 1

# Passing the column list keeps all columns present even with zero activities
activities = pd.DataFrame(activity_records, columns=activity_columns)

# write the records to a CSV file
activities.to_csv('strava_activities.csv')

In [None]:
# Keep only the running activities. The explicit copy makes `runs` an
# independent dataframe, so adding columns to it later neither touches
# `activities` nor triggers pandas' SettingWithCopyWarning.
runs = activities[activities['type'] == "Run"].copy()
runs

In [None]:
# Append a new column to the runs dataframe holding the weekday name of each
# run, derived from its local start timestamp.
# assign() returns a new dataframe, so this cell can be re-run safely and the
# 'day_of_week' column exists even when there are no runs at all.
runs = runs.assign(
    day_of_week=runs['start_date_local'].map(
        lambda stamp: datetime.strptime(stamp, '%Y-%m-%dT%H:%M:%SZ').strftime('%A')
    )
)

runs

In [None]:
# Distance summed per weekday, the overall distance repeated on every row, and
# each weekday's share of the overall distance in percent.
day_total_distance = (
    runs.groupby(['day_of_week'])['distance']
    .sum()
    .reset_index()
    .assign(total_distance=lambda frame: frame['distance'].sum())
    .assign(normalized_distance=lambda frame: frame['distance'] / frame['total_distance'] * 100)
)

# Number of runs per weekday and each weekday's share of all runs in percent.
day_total_counts = (
    runs.groupby(['day_of_week'])['id']
    .count()
    .reset_index()
    .rename(columns={'id': 'day_counts'})
)
day_total_counts['normalized_counts'] = 100 * (day_total_counts['day_counts'] / len(runs))

# One row per weekday carrying both the distance and the count statistics.
day_total_distance = pd.merge(day_total_distance, day_total_counts, on='day_of_week', how='inner')
day_total_distance

In [None]:
# Define the weekday order used on the chart and collect, in that order, the
# percentage of runs and the percentage of distance for every weekday.
weekdays_names = ['Sunday', 'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday']

# Lookup tables keyed by weekday name. A weekday on which no run was ever
# recorded has no row in day_total_distance, so it falls back to 0 below
# instead of failing on an empty selection.
_counts_by_day = dict(zip(day_total_distance['day_of_week'], day_total_distance['normalized_counts']))
_distance_by_day = dict(zip(day_total_distance['day_of_week'], day_total_distance['normalized_distance']))

weekdays_runs_count = [round(float(_counts_by_day.get(d, 0.0)), 2) for d in weekdays_names]
weekdays_runs_distance = [round(float(_distance_by_day.get(d, 0.0)), 2) for d in weekdays_names]


In [None]:
# code copied from matplotlib documentation
import matplotlib.pyplot as plt
import numpy as np

from matplotlib.patches import Circle, RegularPolygon
from matplotlib.path import Path
from matplotlib.projections import register_projection
from matplotlib.projections.polar import PolarAxes
from matplotlib.spines import Spine
from matplotlib.transforms import Affine2D


def radar_factory(num_vars, frame='circle'):
    """
    Create a radar chart with `num_vars` Axes.

    This function creates a RadarAxes projection and registers it under the
    projection name 'radar'.

    Parameters
    ----------
    num_vars : int
        Number of variables for radar chart.
    frame : {'circle', 'polygon'}
        Shape of frame surrounding Axes. Any other value raises ValueError
        when the Axes patch or spines are generated.

    Returns
    -------
    numpy.ndarray
        The `num_vars` evenly spaced axis angles in radians, starting at 0.

    """
    # calculate evenly-spaced axis angles
    theta = np.linspace(0, 2*np.pi, num_vars, endpoint=False)

    class RadarTransform(PolarAxes.PolarTransform):

        def transform_path_non_affine(self, path):
            # Paths with non-unit interpolation steps correspond to gridlines,
            # in which case we force interpolation (to defeat PolarTransform's
            # autoconversion to circular arcs).
            if path._interpolation_steps > 1:
                path = path.interpolated(num_vars)
            return Path(self.transform(path.vertices), path.codes)

    class RadarAxes(PolarAxes):

        name = 'radar'
        PolarTransform = RadarTransform

        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            # rotate plot such that the first axis is at the top
            self.set_theta_zero_location('N')

        def fill(self, *args, closed=True, **kwargs):
            """Override fill so that line is closed by default"""
            return super().fill(closed=closed, *args, **kwargs)

        def plot(self, *args, **kwargs):
            """Override plot so that line is closed by default"""
            lines = super().plot(*args, **kwargs)
            for line in lines:
                self._close_line(line)
            # Hand the lines back like the parent plot() does, so callers can
            # keep styling them or build legends from them.
            return lines

        def _close_line(self, line):
            """Append the first point to the end of the line if it is open."""
            x, y = line.get_data()
            # FIXME: markers at x[0], y[0] get doubled-up
            if x[0] != x[-1]:
                x = np.append(x, x[0])
                y = np.append(y, y[0])
                line.set_data(x, y)

        def set_varlabels(self, labels):
            """Label the `num_vars` axes with `labels`, one per axis angle."""
            self.set_thetagrids(np.degrees(theta), labels)

        def _gen_axes_patch(self):
            # The Axes patch must be centered at (0.5, 0.5) and of radius 0.5
            # in axes coordinates.
            if frame == 'circle':
                return Circle((0.5, 0.5), 0.5)
            elif frame == 'polygon':
                return RegularPolygon((0.5, 0.5), num_vars,
                                      radius=.5, edgecolor="k")
            else:
                raise ValueError("Unknown value for 'frame': %s" % frame)

        def _gen_axes_spines(self):
            if frame == 'circle':
                return super()._gen_axes_spines()
            elif frame == 'polygon':
                # spine_type must be 'left'/'right'/'top'/'bottom'/'circle'.
                spine = Spine(axes=self,
                              spine_type='circle',
                              path=Path.unit_regular_polygon(num_vars))
                # unit_regular_polygon gives a polygon of radius 1 centered at
                # (0, 0) but we want a polygon of radius 0.5 centered at (0.5,
                # 0.5) in axes coordinates.
                spine.set_transform(Affine2D().scale(.5).translate(.5, .5)
                                    + self.transAxes)
                return {'polar': spine}
            else:
                raise ValueError("Unknown value for 'frame': %s" % frame)

    register_projection(RadarAxes)
    return theta

In [None]:
# Radar chart comparing, per weekday, the share of runs and the share of
# distance (both in percent).
N = len(weekdays_names)
theta = radar_factory(N, frame='polygon')
count_data = weekdays_runs_count
distance_data = weekdays_runs_distance

fig, axs = plt.subplots(figsize=(9, 9), subplot_kw=dict(projection='radar'))
fig.subplots_adjust(wspace=0.25, hspace=0.20, top=0.85, bottom=0.05)

# Text properties of the chart title, applied in this order
title_style = dict(
    weight='bold',
    size='medium',
    position=(0.5, 1.1),
    horizontalalignment='center',
    verticalalignment='center',
    fontsize=18,
)
axs.set_title('Strava Recorded Runs Across Different Days of Week', **title_style)

# One outline plus one translucent filled area per series:
# red for normalized run counts, green for normalized running distance.
radar_series = (
    (count_data, 'r', 'Number of Runs (%)'),
    (distance_data, 'g', 'Distance Covered (%)'),
)
for series_values, series_color, series_label in radar_series:
    axs.plot(theta, series_values, color=series_color)
    axs.fill(theta, series_values, facecolor=series_color, alpha=0.15, label=series_label)

axs.set_varlabels(weekdays_names)
axs.legend(frameon=False)

plt.show()