https://medium.com/analytics-vidhya/accessing-user-data-via-the-strava-api-using-stravalib-d5bee7fdde17


In [1]:
import numpy as np
import folium

from dataclasses import dataclass
from stravalib import Client

@dataclass
class StravaClient:
    access_token: str
    refresh_token: str
    client_id: str
    client_secret: str

    client = Client()

    def configure_client(self):
        self.client.access_token = self.access_token
        self.client.refresh_token = self.refresh_token
        self.client.client_id = self.client_id
        self.client.client_secret = self.client_secret

    def get_activities(self):
        self.configure_client()
        activities = list(self.client.get_activities())
        return activities

strava_credentials = StravaClient(
    access_token='c50742e984355582328f35f35191bf849e0a4098',
    refresh_token='86b84f2a3a9d1eb288b4b12d2990b7ee1aebfcff',
    client_id='8712',
    client_secret='30509b6d35b37543cfe85bee1018db8fee15e22c'
)

strava_credentials.configure_client()
activities = strava_credentials.get_activities()

In [5]:
def round_coordinates(coords, precision=3):
    """Rounds a list of GPS coordinates."""
    return [[round(x, ndigits=precision), round(y, ndigits=precision)] for x, y in coords]

def sample_coords_by_distance(coords, dists, sampling_interval=500):
    """Sample GPS coordinates based on cumulative distances.

    Args:
      coords: List of GPS coordinates in the form of (lat, lon) tuples.
      dists: List of cumulative distances in meters corresponding to each GPS coordinate.
      sampling_interval: Distance threshold in meters for selecting GPS coordinates.

    Returns:
      Sampled GPS coordinates in the form of a list of (lat, lon) tuples.
    """
    distances = np.array(dists)
    current_distance = 0
    indices = []
    for i, dist in enumerate(distances):
        if dist > current_distance + sampling_interval:
            indices.append(i)
            current_distance = dist
    indices.append(len(distances)-1)
    sampled_coords = [coords[idx] for idx in indices]
    compression_ratio = len(coords) / len(sampled_coords)
    #print("Number of sampled coordinates:", len(sampled_coords))
    #print("Compression ratio:", round(compression_ratio, 2))
    return sampled_coords

def calculate_midpoint(segment):
    """Calculates the midpoint between two GPS coordinate tuples."""
    coord1 = segment[0]
    coord2 = segment[1]

    lat_mid = round((coord1[0] + coord2[0]) / 2., ndigits=3)
    lon_mid = round((coord1[1] + coord2[1]) / 2., ndigits=3)
    midpoint = (lat_mid, lon_mid)

    return midpoint

In [40]:
client = Client()

client.access_token='c50742e984355582328f35f35191bf849e0a4098'
client.refresh_token='86b84f2a3a9d1eb288b4b12d2990b7ee1aebfcff'
client.client_id='8712'
client.client_secret='30509b6d35b37543cfe85bee1018db8fee15e22c'

types = ['latlng', 'distance']

all_segments = []
existing_segments = set()
all_midpoints = []

for activity in activities[:10]:
    activity_data=client.get_activity_streams(activity.id, types=types)
    if 'latlng' in activity_data.keys():
        coords=(activity_data['latlng'].data[:])
        dists = (activity_data['distance'].data[:])
        coords = round_coordinates(coords)
        sampled_coords = sample_coords_by_distance(coords, dists)

        segments = [(tuple(sampled_coords[i]), tuple(sampled_coords[i+1])) for i in range(len(sampled_coords)-1)]

        for segment in segments:
            segment_key = tuple(sorted(segment))
            if segment_key not in existing_segments:
                all_segments.append(tuple(segment))
                existing_segments.add(segment_key)
                midpoint = calculate_midpoint(segment)
                all_midpoints.append(midpoint)


all_segments_np = np.array(all_segments)
all_midpoints_np = np.array(all_midpoints)

In [47]:
activity_data=client.get_activity_streams(activities[20].id, types=types)
if 'latlng' in activity_data.keys():
    coords=(activity_data['latlng'].data[:])
    dists = (activity_data['distance'].data[:])
    coords = round_coordinates(coords)
    sampled_coords = sample_coords_by_distance(coords, dists)

    test_segments = [(tuple(sampled_coords[i]), tuple(sampled_coords[i+1])) for i in range(len(sampled_coords)-1)]
    test_midpoints = [calculate_midpoint(segment) for segment in test_segments]

#https://towardsdatascience.com/using-scikit-learns-binary-trees-to-efficiently-find-latitude-and-longitude-neighbors-909979bd929b

from sklearn.neighbors import BallTree
from geopy.distance import great_circle

train_list = np.array([[np.deg2rad(lat), np.deg2rad(lon)] for lat, lon in all_midpoints_np])
test_list = np.array([[np.deg2rad(lat), np.deg2rad(lon)] for lat, lon in test_midpoints])
tree = BallTree(train_list, metric='haversine')

sampling_interval = 500
search_radius = sampling_interval / 6371e3

shared_segments = []
remainder_segments = []
for idx, test_point in enumerate(test_list):
    print(test_point.reshape(1, -1))
    ind = tree.query_radius(test_point.reshape(1, -1), r=search_radius)
    if ind[0].size>0:
        full_intersect = []
        for point in ind[0]:
            seg = all_segments_np[point]
            inter = segment_intersection(seg, test_segments[idx])
            inter_size = segment_length(inter)
            if inter_size > 0:
                shared_segments.append(inter)
                full_intersect.append(inter)
        if len(full_intersect) > 0:
            remainder_seg = complementary_region(full_intersect, test_segments[idx])
            if len(remainder_seg) > 0:
                remainder_segments.append(remainder_seg)
            else:
                print("no more")


print(len(shared_segments))
shared_segments = list(map(tuple, shared_segments))
unique_shared_segments = list(set(shared_segments))
print(len(unique_shared_segments))

[[0.85245371 0.04166101]]
no more
[[0.85241881 0.04176573]]
no more
[[0.8523839  0.04187045]]
no more
[[0.85234899 0.04197517]]
no more
[[0.85233154 0.04207989]]
no more
[[0.85233154 0.04218461]]
no more
[[0.85236645 0.04228933]]
no more
[[0.85240135 0.04239405]]
no more
[[0.85241881 0.04249877]]
no more
[[0.85236645 0.04258603]]
[[0.85231409 0.04265585]]
[[0.85222682 0.04265585]]
no more
[[0.85215701 0.04263839]]
no more
[[0.85215701 0.04262094]]
no more
[[0.85222682 0.04258603]]
no more
[[0.85229663 0.04256858]]
no more
[[0.85229663 0.04262094]]
no more
[[0.85224427 0.04265585]]
no more
[[0.85217446 0.04265585]]
no more
[[0.85215701 0.04262094]]
no more
[[0.85222682 0.04258603]]
no more
[[0.85229663 0.04256858]]
no more
[[0.85231409 0.04262094]]
no more
[[0.85226173 0.04265585]]
no more
[[0.85217446 0.04265585]]
no more
[[0.85215701 0.04262094]]
no more
[[0.85220937 0.04260349]]
no more
[[0.85227918 0.04256858]]
no more
[[0.85233154 0.04262094]]
no more
[[0.85227918 0.04265585]]
no m

In [57]:
import numpy as np
from stravalib import Client
from typing import Optional, List, Tuple, NamedTuple, Iterable
from functools import partial
from shapely.geometry import LineString
from sklearn.neighbors import BallTree

import sys
sys.path.append('.')
import itertools
from utils import round_coordinates, sample_coords_by_distance, calculate_midpoint, segment_intersection, segment_length, complementary_region

class HistoricalActivities():
    def __init__(self, strava_client: Client, types: Optional[List[str]] = None):
        self.strava_client = strava_client
        self.types = types or ['latlng', 'distance']
        self.path_segments = []
        self.midpoints = []

    def fetch_path_segments(self, sampling_interval: int = 500,limit: int = 10) -> Tuple[np.ndarray, np.ndarray]:
        activities = list(itertools.islice(self.strava_client.get_activities(), limit))

        existing_segments = set()
        for activity in activities:
            activity = Activity(strava_client=self.strava_client, activity_id=activity.id, types=self.types)
            activity.fetch_path_segments(sampling_interval=sampling_interval)
    
            for segment in activity.segments:
                segment_key = tuple(sorted(segment))
                if segment_key not in existing_segments:
                    self.path_segments.append(tuple(segment))
                    existing_segments.add(segment_key)
                    midpoint = calculate_midpoint(segment)
                    self.midpoints.append(midpoint)

        self.path_segments_np = np.array(self.path_segments)
        self.midpoints_np= np.array([(mp[0], mp[1]) for mp in self.midpoints])

    def index(self):
        midpoints = np.array([[np.deg2rad(lat), np.deg2rad(lon)] for lat, lon in self.midpoints_np])
        self.tree_index = BallTree(midpoints, metric='haversine')

        return self.tree_index

class Activity:
    def __init__(self, strava_client: Client, activity_id, types: Optional[List[str]] = None):
        self.strava_client = strava_client
        self.activity_id = activity_id
        self.types = types or ['latlng', 'distance']
        self.path_segments = []
        self.strava_segments = []

    def fetch_strava_segments(self):
        #WOULD BE NICE TO NOT do strava_client.client
        activity_segments = self.strava_client.client.get_activity(self.activity_id, include_all_efforts="True").segment_efforts
        for seg in activity_segments:
            name = seg.name
            start_latlng = seg.to_dict()['segment']['end_latlng']
            end_latlng = seg.to_dict()['segment']['start_latlng']
            self.strava_segments.append({"name": name, "start_latlng": start_latlng, "end_latlng": end_latlng})

    def fetch_path_segments(self, sampling_interval=500):
        #WOULD BE NICE TO NOT do strava_client.client
        activity_data = self.strava_client.client.get_activity_streams(self.activity_id, types=self.types)
        if 'latlng' in activity_data.keys():
            coords=(activity_data['latlng'].data[:])
            dists = (activity_data['distance'].data[:])
            coords = round_coordinates(coords)
            sampled_coords = sample_coords_by_distance(coords, dists, sampling_interval=sampling_interval)

            self.path_segments = [(tuple(sampled_coords[i]), tuple(sampled_coords[i+1])) for i in range(len(sampled_coords)-1)]
            self.midpoints = np.array([calculate_midpoint(segment) for segment in self.path_segments])

    def get_new_path_segments(self, history: HistoricalActivities):
        tree = history.tree_index
        sampling_interval = 500
        search_radius = sampling_interval / 6371e3

        shared_segments = []
        remainder_segments = []
        test_list = np.array([[np.deg2rad(lat), np.deg2rad(lon)] for lat, lon in self.midpoints])

        for idx, test_point in enumerate(test_list):
            ind = tree.query_radius(test_point.reshape(1, -1), r=search_radius)
            if ind[0].size>0:
                full_intersect = []
                for point in ind[0]:
                    seg = history.path_segments_np[point]
                    inter = segment_intersection(seg, self.path_segments[idx])
                    inter_size = segment_length(inter)
                    if inter_size > 0:
                        shared_segments.append(inter)
                        full_intersect.append(inter)
                if len(full_intersect) > 0:
                    remainder_seg = complementary_region(full_intersect, self.path_segments[idx])
                    if len(remainder_seg) > 0:
                        remainder_segments.append(remainder_seg)
                    else:
                        print("no more")
        self.new_segments=remainder_segments
        self.shared_segments=shared_segments 

        return self.new_segments


In [20]:
history = HistoricalActivities(strava_client=strava_credentials)
history.fetch_path_segments()
history.index()


<sklearn.neighbors._ball_tree.BallTree at 0x7febc682e210>

In [58]:
new_activity = Activity(activity_id = activities[20].id, strava_client=strava_credentials)
new_activity.fetch_path_segments()
new_segments = new_activity.get_new_path_segments(history=history)
new_segments

no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more
no more


[]

In [44]:
import shapely.geometry as geometry
from shapely import intersection
from geopy.distance import geodesic

def segment_intersection(segment1, segment2):
    line1 = geometry.LineString(segment1)
    line2 = geometry.LineString(segment2)
    inter = list(intersection(line1, line2).coords)
    return inter

def segment_length(segment):
    if len(segment)>1:
        return geodesic(segment[0], segment[1]).kilometers
    else:
        return 0

def complementary_region(S1,  S2):
    """Compute the complementary region of S1 in S2 using Shapely."""

    united_lines = geometry.LineString(sum([list(x) for x in S1], []))
    diff = geometry.LineString(S2).difference(united_lines)

    if isinstance(diff, list):
        return [list(geom.coords[0]) for geom in diff]
    elif isinstance(diff, geometry.base.BaseGeometry):
        if diff.is_empty:
            return []
        else:
            return [list(diff.coords[0])]
    else:
        raise NotImplementedError("Unexpected type found during difference operation.")

In [57]:
#https://towardsdatascience.com/using-scikit-learns-binary-trees-to-efficiently-find-latitude-and-longitude-neighbors-909979bd929b

from sklearn.neighbors import BallTree
from geopy.distance import great_circle

train_list = np.array([[np.deg2rad(lat), np.deg2rad(lon)] for lat, lon in all_midpoints_np])
test_list = np.array([[np.deg2rad(lat), np.deg2rad(lon)] for lat, lon in test_midpoints])
tree = BallTree(train_list, metric='haversine')

sampling_interval = 500
search_radius = sampling_interval / 6371e3

shared_segments = []
remainder_segments = []
for idx, test_point in enumerate(test_list):
    ind = tree.query_radius(test_point.reshape(1, -1), r=search_radius)
    if ind[0].size>0:
        full_intersect = []
        for point in ind[0]:
            seg = all_segments_np[point]
            inter = segment_intersection(seg, test_segments[idx])
            inter_size = segment_length(inter)
            if inter_size > 0:
                shared_segments.append(inter)
                full_intersect.append(inter)
        if len(full_intersect) > 0:
            remainder_seg = complementary_region(full_intersect, test_segments[idx])
            if len(remainder_seg) > 0:
                remainder_segments.append(remainder_seg)
            else:
                print("no more")


print(len(shared_segments))
shared_segments = list(map(tuple, shared_segments))
unique_shared_segments = list(set(shared_segments))
print(len(unique_shared_segments))

nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
nomore
83
70


In [25]:
folium_map = folium.Map([48.862, 2.346], zoom_start=11)

markers = [folium.Marker(loc, icon=None, popup="").add_to(folium_map) for loc in [p for segment in unique_shared_segments for p in segment]]
folium_map

Okay I was first thinking to store and index all segments and then do a neighbour search BUT it's not clear how to best do this: for two segment A = (a1, a2) and B=(b1, b2) I should get the minimum distance of (A,B) and (A, B-reversed) as ordering matters. Not clear how to set up an appropriate threshold of distance as "close enough" between segment also...

I could do without the neighbour search entirely and just do searchsorted, if we replace each segment by its mid-point. Scalar ftw. Natural distance threshold here. If segments are about 500m, then we could use something 250m and 500m ?

## With Segment names

In [98]:
client = Client()

client.access_token='c50742e984355582328f35f35191bf849e0a4098'
client.refresh_token='86b84f2a3a9d1eb288b4b12d2990b7ee1aebfcff'
client.client_id='8712'
client.client_secret='30509b6d35b37543cfe85bee1018db8fee15e22c'



activity_segments = client.get_activity(activities[0].id, include_all_efforts="True").segment_efforts
seg = activity_segments[0]
print(seg.name)
print(seg.to_dict()['segment']['end_latlng'])
print(seg.to_dict()['segment']['start_latlng'])

commico to lidl
[48.842563, 2.3849]
[48.844209, 2.381499]


In [None]:

#get segments froma activity.
activity_segments = []
for segment_effort in activity.segment_efforts:
    name = segment_effort.name
    start_lat, start_lon = segment_effort.segment.start_latlng.__root__
    end_lat, end_lon = segment_effort.segment.end_latlng.__root__
    activity_segments.append({
        "name": name,
        "start_lat": start_lat,
        "start_lon": start_lon,
        "end_lat": end_lat,
        "end_lon": end_lon,
    })

print(activity_segments)