In [1]:
import os
import json
import pandas as pd
import configparser
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, avg, count, round

In [2]:
def get_aws_credentials(profile='default'):
    
    config = configparser.ConfigParser()
    config.read(os.path.expanduser('~/.aws/credentials'))
    
    access_key = config[profile]['aws_access_key_id']
    secret_key = config[profile]['aws_secret_access_key']
    
    return access_key, secret_key

aws_access_key, aws_secret_key = get_aws_credentials()

In [3]:
config = configparser.ConfigParser()
config.read(os.path.expanduser('~/etc/strava/config.conf'))
BUCKET_NAME = config['S3']['BUCKET_NAME']

In [5]:
# Initialize SparkSession
spark = SparkSession.builder\
    .appName("ReadProcessedDataFromS3") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
    .config("spark.hadoop.fs.s3a.access.key", aws_access_key) \
    .config("spark.hadoop.fs.s3a.secret.key", aws_secret_key) \
    .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com") \
    .getOrCreate()

# Define S3 path for processed folder
s3_bucket = f"s3a://{BUCKET_NAME}/processed/"
activities_json = "activities.json"
activities_path = s3_bucket + activities_json
# Load JSON (or CSV) data from S3 processed folder into a DataFrame
# Modify 'json' to your file format (e.g., 'csv', 'parquet', etc.)
activities = spark.read.json(activities_path)

In [7]:
activities.show()

+-----------------+-------------+-------------+---------------+-----------------+-------------+-------------+-------------+-------+------------+-----------------------------+--------+------------+---------+--------+--------------------+--------------------+-------+-----------------+---------+-------------+----------+-----------------+-----------+----------+-----------+-------------+----------------+--------------+------+--------------------+-------------+---------+---------+-----------+--------------------+-----------+--------+-------+--------------+----------+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+-----------------+-------+----+-----------+-------------+----------+----------+----------------------+------------+
|achievement_count|      athlete|athlete_count|average_cadence|average_heartrate|average_speed|average_watts|comment_count|commute|device_watts|display_hide_heartrate_option|distance|elapsed_time

In [None]:
# Only include Run actvities.
# In minutes and miles.
long_run_condition = (
    (col("sport_type") == "TrailRun") | (col("sport_type") == "Run") & \
    (col("elapsed_time") > 60) & \
    (col("distance") > 10)
)

filtered_activities = activities.filter(long_run_condition)
filtered_activities.show(5)

+-----------------+-------------+-------------+---------------+-----------------+-------------+-------------+-------------+-------+------------+-----------------------------+--------+------------+---------+--------+--------------------+--------------------+-------+-----------------+---------+-------------+----------+-----------------+-----------+----------+-----------+-------------+----------------+--------------+------+--------------------+-------------+---------+---------+-----------+--------------------+-----------+--------+-------+--------------+----------+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+-----------------+-------+----+-----------+-------------+----------+----------+----------------------+------------+
|achievement_count|      athlete|athlete_count|average_cadence|average_heartrate|average_speed|average_watts|comment_count|commute|device_watts|display_hide_heartrate_option|distance|elapsed_time

In [6]:
averages = ["distance", "elapsed_time", "total_elevation_gain"]
average_metrics = [round(avg(column)).alias(f"avg_{column}") for column in averages]
long_run_count = count("sport_type").alias("number_of_long_runs")
avg_cadence_spm = round(avg("steps_per_minute")).alias("avg_cadence_spm")
avg_speed_mph = round(avg("average_speed")).alias("avg_speed_mph")
avg_heartrate_bpm = round(avg("average_heartrate")).alias("avg_heartrate_bpm")
avg_watts_ftp = round(avg("average_watts")).alias("avg_watts_ftp")

agg_expressions = average_metrics + [long_run_count, avg_cadence_spm, avg_speed_mph, avg_heartrate_bpm, avg_watts_ftp]


In [7]:
aggregated_df = filtered_activities.agg(*agg_expressions)

In [8]:
aggregated_df.show()

+------------+----------------+------------------------+-------------------+---------------+-------------+-----------------+-------------+
|avg_distance|avg_elapsed_time|avg_total_elevation_gain|number_of_long_runs|avg_cadence_spm|avg_speed_mph|avg_heartrate_bpm|avg_watts_ftp|
+------------+----------------+------------------------+-------------------+---------------+-------------+-----------------+-------------+
|        10.0|            98.0|                  1276.0|                396|          172.0|          6.0|            138.0|        289.0|
+------------+----------------+------------------------+-------------------+---------------+-------------+-----------------+-------------+



In [51]:
s3_bucket = f"s3a://{BUCKET_NAME}/processed/"
file = "stats.json"
path = s3_bucket + file
# Load JSON (or CSV) data from S3 processed folder into a DataFrame
# Modify 'json' to your file format (e.g., 'csv', 'parquet', etc.)
stats = spark.read.json(path)

In [52]:
stats.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|     all_ride_totals|      all_run_totals|     all_swim_totals|  recent_ride_totals|   recent_run_totals|  recent_swim_totals|     ytd_ride_totals|      ytd_run_totals|     ytd_swim_totals|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|{0, 0.0, 0:00:00,...|{681, 5334.12, 38...|{0, 0.0, 0:00:00,...|{0, 0, 0.0, 0:00:...|{30, 15, 116.12, ...|{0, 0, 0.0, 0:00:...|{0, 0.0, 0:00:00,...|{314, 2891.25, 19...|{0, 0.0, 0:00:00,...|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+



In [None]:


# Step 4: Load - Save Transformed Data
output_path = "s3://your-bucket/output/transformed-data/"
df_aggregated.write.mode("overwrite").parquet(output_path)  # Save as Parquet
# Alternatively, save as JSON:
# df_aggregated.write.mode("overwrite").json(output_path)

# Step 5: Stop Spark Session
spark.stop()


FileNotFoundError: [WinError 2] The system cannot find the file specified

In [None]:
with open('data/raw/athlete.json', 'r') as j:
    athlete = json.load(j)
with open('data/raw/comments.json', 'r') as j:
    comments = json.load(j)
with open('data/raw/stats.json', 'r') as j:
    stats = json.load(j)
with open('data/raw/activities.json','r') as j:
    activities = json.load(j)

athlete = spark.read.option("multiLine", "true").option("mode", "PERMISSIVE").json("data/raw/athlete.json")
stats = spark.read.option("multiLine", "true").option("mode", "PERMISSIVE").json("data/processed/stats.json")
activities = spark.read.option("multiLine", "true").option("mode", "PERMISSIVE").json("data/processed/activities.json")
comments = spark.read.option("multiLine", "true").option("mode", "PERMISSIVE").json("data/raw/comments.json")