In [1]:
import os
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date


In [2]:
os.environ["HADOOP_HOME"] = r"C:\hadoop"
# CRITICAL: Add bin directory to PATH so hadoop.dll can be found
os.environ["PATH"] = r"C:\hadoop\bin;" + os.environ.get("PATH", "")

print(f"HADOOP_HOME set to: {os.environ['HADOOP_HOME']}")
print(f"Checking for winutils.exe: {os.path.exists(r'C:\hadoop\bin\winutils.exe')}")
print(f"Checking for hadoop.dll: {os.path.exists(r'C:\hadoop\bin\hadoop.dll')}")

# Create SparkSession
spark = SparkSession.builder \
    .appName("FitnessTrackerETL") \
    .master("local[*]") \
    .config("spark.sql.session.timeZone", "UTC") \
    .getOrCreate()

# Suppress verbose logging
spark.sparkContext.setLogLevel("ERROR")

print("SparkSession created successfully!")


HADOOP_HOME set to: C:\hadoop
Checking for winutils.exe: True
Checking for hadoop.dll: True
SparkSession created successfully!


In [3]:
raw_data_path = "../data_lake/raw/synthetic_user_data/"
# Read only Parquet files using pathGlobFilter
df = spark.read \
    .option("pathGlobFilter", "*.parquet") \
    .option("recursiveFileLookup", "true") \
    .parquet(raw_data_path)

# Convert date string to actual date type
df = df.withColumn("date", to_date(col("date")))

df.printSchema()
df.show(5)
print(f"\nTotal records: {df.count()}")

root
 |-- user_id: long (nullable = true)
 |-- date: date (nullable = true)
 |-- steps: long (nullable = true)
 |-- calories_burned: double (nullable = true)
 |-- heart_rate_avg: long (nullable = true)
 |-- sleep_hours: double (nullable = true)
 |-- activity_type: string (nullable = true)

+-------+----------+-----+---------------+--------------+-----------+-------------+
|user_id|      date|steps|calories_burned|heart_rate_avg|sleep_hours|activity_type|
+-------+----------+-----+---------------+--------------+-----------+-------------+
|      1|2023-06-26|10686|         950.16|           134|       5.28|      running|
|      2|2023-06-26| 2062|          838.2|           127|       5.31|      cycling|
|      3|2023-06-26| 1061|          826.0|           103|       9.94|  gym_workout|
|      4|2023-06-26| 7028|         1105.4|           128|       4.08|       hiking|
|      5|2023-06-26| 3980|          306.2|           107|        5.8|      walking|
+-------+----------+-----+-----------

In [4]:
# Data Cleaning
print("Checking for null values in each column")
for column in df.columns:
    null_count = df.filter(col(column).isNull()).count()
    print(f"- Column '{column}': {null_count} null values")

Checking for null values in each column
- Column 'user_id': 0 null values
- Column 'date': 0 null values
- Column 'steps': 0 null values
- Column 'calories_burned': 0 null values
- Column 'heart_rate_avg': 0 null values
- Column 'sleep_hours': 0 null values
- Column 'activity_type': 0 null values


In [5]:
# Outlier Detection: Summary Statistics
print("Descriptive statistics for numerical columns:")
df.describe(['steps', 'calories_burned', 'heart_rate_avg', 'sleep_hours']).show()

Descriptive statistics for numerical columns:
+-------+-----------------+-----------------+------------------+------------------+
|summary|            steps|  calories_burned|    heart_rate_avg|       sleep_hours|
+-------+-----------------+-----------------+------------------+------------------+
|  count|           358497|           358497|            358497|            358497|
|   mean|6347.601795830928|754.0926114026058|119.52330145022134| 6.999896791326002|
| stddev|6444.956548578772|441.3335340242217|26.986836543161438|1.7303321843744777|
|    min|               50|            100.0|                60|               4.0|
|    max|            24999|          2245.55|               169|              10.0|
+-------+-----------------+-----------------+------------------+------------------+



In [6]:
# Feature Engineering
from pyspark.sql.functions import dayofweek, date_format, when

df_transformed = df.withColumn("day_of_week", date_format(col("date"), "E"))

df_transformed = df_transformed.withColumn(
    "calories_to_steps_ratio",
    when(col("steps") > 0, col("calories_burned") / col("steps")).otherwise(0)
)

print("Transformed DataFrame with new features:")
df_transformed.show(10)

Transformed DataFrame with new features:
+-------+----------+-----+---------------+--------------+-----------+-------------+-----------+-----------------------+
|user_id|      date|steps|calories_burned|heart_rate_avg|sleep_hours|activity_type|day_of_week|calories_to_steps_ratio|
+-------+----------+-----+---------------+--------------+-----------+-------------+-----------+-----------------------+
|      1|2023-06-26|10686|         950.16|           134|       5.28|      running|        Mon|    0.08891633913531724|
|      2|2023-06-26| 2062|          838.2|           127|       5.31|      cycling|        Mon|     0.4064985451018429|
|      3|2023-06-26| 1061|          826.0|           103|       9.94|  gym_workout|        Mon|     0.7785108388312912|
|      4|2023-06-26| 7028|         1105.4|           128|       4.08|       hiking|        Mon|    0.15728514513375072|
|      5|2023-06-26| 3980|          306.2|           107|        5.8|      walking|        Mon|    0.07693467336683417|

In [7]:
# Aggregations
from pyspark.sql.functions import avg, max, min

activity_summary_df = df_transformed.groupBy("activity_type").agg(
    avg("steps").alias("avg_steps"),
    avg("calories_burned").alias("avg_calories"),
    avg("heart_rate_avg").alias("avg_hr"),
    avg("sleep_hours").alias("avg_sleep")
)
print('Activity Summary Table')
activity_summary_df.show()

Activity Summary Table
+-------------+------------------+------------------+------------------+------------------+
|activity_type|         avg_steps|      avg_calories|            avg_hr|         avg_sleep|
+-------------+------------------+------------------+------------------+------------------+
|       hiking|15990.771980223173| 1499.115077876141|134.47597271892283|6.9933325516405755|
|  gym_workout|3496.2113607625715| 600.4481081606848|129.60854002528936| 7.015822585351613|
|      cycling|2754.9361022986045| 775.4018412191749|124.51544405301777| 7.003108273808371|
|         yoga|274.91280637973495|199.84642899026622| 79.47105273445135|  6.99331261483132|
|      walking| 8981.155737222844| 558.7292030818091|  89.4894822482307| 6.998759238565741|
|      running|12499.090853670528|1099.1238547914497|144.54433541129967| 6.998144775913941|
|     swimming| 549.0619177018633| 549.4372476708074|134.44444875776398|6.9967175854037285|
+-------------+------------------+------------------+----

In [8]:
from pyspark.sql.functions import year, month

df_to_load = df_transformed.withColumn("year", year(col("date")))
df_to_load = df_to_load.withColumn("month", month(col("date")))

processed_data_path = "../data_lake/processed/fitness_data"

df_to_load.write.mode("overwrite").partitionBy("year", "month").parquet(processed_data_path)

print(f"Successfully saved processed data to: {processed_data_path}")

Successfully saved processed data to: ../data_lake/processed/fitness_data


# Phase 3 EDA

In [9]:
processed_df = spark.read.parquet("../data_lake/processed/fitness_data")

print("Successfully loaded processed data")
print("Inspecting the schema of our clean DataFrame:")
processed_df.printSchema()

print(f"\nTotal records in processed data: {processed_df.count()}")

Successfully loaded processed data
Inspecting the schema of our clean DataFrame:
root
 |-- user_id: long (nullable = true)
 |-- date: date (nullable = true)
 |-- steps: long (nullable = true)
 |-- calories_burned: double (nullable = true)
 |-- heart_rate_avg: long (nullable = true)
 |-- sleep_hours: double (nullable = true)
 |-- activity_type: string (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- calories_to_steps_ratio: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)


Total records in processed data: 358497


In [10]:
# Running SQL Queries in Spark

processed_df.createOrReplaceGlobalTempView("fitness_data_view")
print("Temporary view 'fitness_data_view' created")

most_active_day_query = """
    SELECT
        day_of_week,
        AVG(steps) as average_steps
    FROM
        global_temp.fitness_data_view
    GROUP BY
        day_of_week
    ORDER BY
        average_steps DESC
"""

most_active_day_query = spark.sql(most_active_day_query)

print("\nMost active day of the week based on aveage steps:")
most_active_day_query.show()

Temporary view 'fitness_data_view' created

Most active day of the week based on aveage steps:
+-----------+------------------+
|day_of_week|     average_steps|
+-----------+------------------+
|        Mon| 6386.372050104056|
|        Sun| 6369.312855852672|
|        Sat| 6352.275329438678|
|        Thu| 6349.672144343661|
|        Tue|6339.3561864373505|
|        Wed| 6327.005222444733|
|        Fri| 6309.039030902737|
+-----------+------------------+



In [11]:
# Correlation Analysis
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation

# 1. Select numerical columns for correlation analysis
numerical_cols = ['steps', 'calories_burned', 'heart_rate_avg', 'sleep_hours']
corr_df = processed_df.select(numerical_cols)

# 2. Assemble the columns into a single vector
assembler = VectorAssembler(inputCols=numerical_cols, outputCol="features")
vector_df = assembler.transform(corr_df).select("features")

# 3. Calculate the Pearson correlation matrix
matrix = Correlation.corr(vector_df, "features").head()

print("Correlation Matrix:")
corr_matrix = matrix[0]
print(corr_matrix)

Correlation Matrix:
DenseMatrix([[ 1.00000000e+00,  8.44711501e-01,  2.69043946e-01,
              -1.45788879e-03],
             [ 8.44711501e-01,  1.00000000e+00,  5.19037015e-01,
              -1.00738821e-03],
             [ 2.69043946e-01,  5.19037015e-01,  1.00000000e+00,
               8.07845917e-04],
             [-1.45788879e-03, -1.00738821e-03,  8.07845917e-04,
               1.00000000e+00]])


In [12]:
import pandas as pd

corr_matrix_pd = pd.DataFrame(corr_matrix.toArray(), columns=numerical_cols, index=numerical_cols)

print("Formatted Correlation Matrix:")
print(corr_matrix_pd)

Formatted Correlation Matrix:
                    steps  calories_burned  heart_rate_avg  sleep_hours
steps            1.000000         0.844712        0.269044    -0.001458
calories_burned  0.844712         1.000000        0.519037    -0.001007
heart_rate_avg   0.269044         0.519037        1.000000     0.000808
sleep_hours     -0.001458        -0.001007        0.000808     1.000000


# Phase 3: EDA User Segmentation with Clustering

In [13]:
from pyspark.sql.functions import avg, stddev

user_summary_df = processed_df.groupBy("user_id").agg(
    avg("steps").alias("avg_steps"),
    avg("calories_burned").alias("avg_calories"),
    avg("heart_rate_avg").alias("avg_hr")
)

print("User level summary DataFrame:")
user_summary_df.show()

User level summary DataFrame:
+-------+------------------+-----------------+------------------+
|user_id|         avg_steps|     avg_calories|            avg_hr|
+-------+------------------+-----------------+------------------+
|     26|6896.6120218579235|788.0186338797813|120.73770491803279|
|     29| 6570.404371584699|781.3319125683059|122.89617486338798|
|    474| 5790.273224043716| 705.625737704918|118.13661202185793|
|    964| 6567.163934426229|767.2180327868854| 120.2896174863388|
|   1677| 5743.109289617486|700.7350273224044|118.37704918032787|
|   1697| 6850.737704918033|800.8994535519124|119.39890710382514|
|   1806| 6631.639344262295| 775.327267759563| 121.6775956284153|
|   1950| 5834.808743169399|735.9052459016393|122.93989071038251|
|     65| 5854.005464480874|704.1928961748634|116.60109289617486|
|    191| 6885.551912568306| 766.481912568306|117.63387978142076|
|    418| 6439.021857923497|762.5403825136611|119.20218579234972|
|    541| 5362.284153005465|711.7765027322406|

In [14]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml import Pipeline

features_for_clustering = ['avg_steps', 'avg_calories', 'avg_hr']
assembler = VectorAssembler(inputCols=features_for_clustering, outputCol="features_unscaled")

scaler = StandardScaler(inputCol="features_unscaled", outputCol="features", withStd=True, withMean=True)

kmeans = KMeans(featuresCol="features", k=3, seed=1)
pipeline = Pipeline(stages=[assembler, scaler, kmeans])

model = pipeline.fit(user_summary_df)

predictions = model.transform(user_summary_df)

print("\nUsers with their assigned cluster:")
predictions.show(10)

print("\nCluster sizes:")
predictions.groupBy('prediction').count().show()



Users with their assigned cluster:
+-------+------------------+-----------------+------------------+--------------------+--------------------+----------+
|user_id|         avg_steps|     avg_calories|            avg_hr|   features_unscaled|            features|prediction|
+-------+------------------+-----------------+------------------+--------------------+--------------------+----------+
|     26|6896.6120218579235|788.0186338797813|120.73770491803279|[6896.61202185792...|[1.15062933929504...|         0|
|     29| 6570.404371584699|781.3319125683059|122.89617486338798|[6570.40437158469...|[0.46695520115902...|         0|
|    474| 5790.273224043716| 705.625737704918|118.13661202185793|[5790.27322404371...|[-1.1680631360302...|         1|
|    964| 6567.163934426229|767.2180327868854| 120.2896174863388|[6567.16393442622...|[0.46016381205566...|         2|
|   1677| 5743.109289617486|700.7350273224044|118.37704918032787|[5743.10928961748...|[-1.2669104873812...|         1|
|   1697| 68

In [15]:
# Interpret the Clusters
from pyspark.sql.functions import avg, count

cluster_profiles = predictions.groupBy("prediction").agg(
    avg("avg_steps").alias("avg_steps"),
    avg("avg_calories").alias("avg_calories"),
    avg("avg_hr").alias("avg_hr"),
    count("*").alias("num_users")
).orderBy("avg_steps")

print("Cluster Profiles")
cluster_profiles.show()

Cluster Profiles
+----------+-----------------+-----------------+------------------+---------+
|prediction|        avg_steps|     avg_calories|            avg_hr|num_users|
+----------+-----------------+-----------------+------------------+---------+
|         1|5865.425111536008|716.2896343081629|117.72545946567205|      523|
|         2|6297.943471317715|752.7675108541057| 119.7740100306909|      876|
|         0|6875.600185402027|791.4707276541768|120.81017759562853|      560|
+----------+-----------------+-----------------+------------------+---------+



# Phase 4: Machine Learning Pipeline

In [16]:
from pyspark.sql.functions import col

df_ml = spark.read.parquet("../data_lake/processed/fitness_data")

# Task 1: Predict Activity Type 
features = ['steps', 'calories_burned', 'heart_rate_avg', 'sleep_hours']
target = 'activity_type'

model_df = df_ml.select(features + [target])
print("Distribution of Activity Types:")
model_df.groupBy(target).count().orderBy(col("count").desc()).show()

train_df, test_df = model_df.randomSplit([0.8, 0.2], seed=42)

print(f"Training dataset count: {train_df.count()}")
print(f"Testing dataset count: {test_df.count()}")

Distribution of Activity Types:
+-------------+-----+
|activity_type|count|
+-------------+-----+
|     swimming|51520|
|  gym_workout|51405|
|      cycling|51379|
|       hiking|51171|
|         yoga|51162|
|      walking|51009|
|      running|50851|
+-------------+-----+

Training dataset count: 287006
Testing dataset count: 71491


In [17]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression

label_indexer = StringIndexer(inputCol="activity_type", outputCol="label")

feature_assembler = VectorAssembler(inputCols=features, outputCol="features_unscaled")

scaler = StandardScaler(inputCol="features_unscaled", outputCol="features")

lr = LogisticRegression(featuresCol="features", labelCol="label")

pipeline = Pipeline(stages=[label_indexer, feature_assembler, scaler, lr])

print("Training the classification model...")
model = pipeline.fit(train_df)
print("Training Complete!")

Training the classification model...
Training Complete!


In [18]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# 1. Generate predictions on the test data using the trained classification model
classification_predictions = model.transform(test_df)

# 2. Initialize the evaluator
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

# 3. Evaluate the new classification_predictions DataFrame
accuracy = evaluator.evaluate(classification_predictions)

print(f"Accuracy on test data: {accuracy:.4f}")

Accuracy on test data: 0.8398


In [19]:
predictions = model.transform(test_df)
predictions.select("activity_type", "label", "prediction", "probability").show(10)


+-------------+-----+----------+--------------------+
|activity_type|label|prediction|         probability|
+-------------+-----+----------+--------------------+
|         yoga|  3.0|       3.0|[3.87017488146527...|
|         yoga|  3.0|       3.0|[3.04268689500068...|
|         yoga|  3.0|       3.0|[2.94673553441807...|
|         yoga|  3.0|       3.0|[0.02647790848949...|
|         yoga|  3.0|       3.0|[1.24677506315505...|
|         yoga|  3.0|       3.0|[1.29283148452384...|
|         yoga|  3.0|       3.0|[1.24583886120191...|
|         yoga|  3.0|       3.0|[1.00279981425510...|
|         yoga|  3.0|       3.0|[3.36103520880797...|
|         yoga|  3.0|       3.0|[2.77254885583225...|
+-------------+-----+----------+--------------------+
only showing top 10 rows


In [20]:
from pyspark.sql.functions import col

# Top 10 predictions with probability
predictions.select("activity_type", "prediction", "probability").orderBy(col("probability").desc()).show(10)

# Optional: confusion matrix
predictions.groupBy("label", "prediction").count().show()


+-------------+----------+--------------------+
|activity_type|prediction|         probability|
+-------------+----------+--------------------+
|     swimming|       0.0|[0.99943249786062...|
|     swimming|       0.0|[0.99940005791853...|
|     swimming|       0.0|[0.99937997964430...|
|     swimming|       0.0|[0.99936755053689...|
|     swimming|       0.0|[0.99936602848671...|
|     swimming|       0.0|[0.99935718772116...|
|     swimming|       0.0|[0.99935592779982...|
|     swimming|       0.0|[0.99935098457808...|
|     swimming|       0.0|[0.99934422961770...|
|     swimming|       0.0|[0.99934220189740...|
+-------------+----------+--------------------+
only showing top 10 rows
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  2.0|       0.0| 1125|
|  6.0|       1.0|  586|
|  4.0|       6.0| 2242|
|  5.0|       1.0|  204|
|  1.0|       1.0| 6849|
|  1.0|       6.0|  183|
|  0.0|       1.0|   14|
|  6.0|       4.0| 1776|
|  2.0|       2.0| 7335|
|  

In [21]:
# Task 2: Predict Calories Burned

features_reg = ['steps', 'heart_rate_avg', 'sleep_hours', 'activity_type']
target_reg = "calories_burned"

model_reg_df = df_ml.select(features_reg + [target_reg])

train_reg_df, test_reg_df = model_reg_df.randomSplit([0.8, 0.2], seed=42)

print("Data prepared for regression task")
print(f"Training set size: {train_reg_df.count()}")
print(f"Testing set size: {test_reg_df.count()}")


Data prepared for regression task
Training set size: 287006
Testing set size: 71491


In [22]:
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

categorical_col = "activity_type"
numerical_cols = ['steps', 'heart_rate_avg', 'sleep_hours']

string_indexer = StringIndexer(inputCol=categorical_col, outputCol="activity_idx")

one_hot_encoder = OneHotEncoder(inputCol="activity_idx", outputCol="activity_vec")

assembler_inputs = numerical_cols + ["activity_vec"]
feature_assembler_reg = VectorAssembler(inputCols=assembler_inputs, outputCol="features")

lr_reg = LinearRegression(featuresCol="features", labelCol="calories_burned")

pipeline_reg = Pipeline(stages=[string_indexer, one_hot_encoder, feature_assembler_reg, lr_reg])

In [23]:
print("Training the regression model...")
model_reg = pipeline_reg.fit(train_reg_df)
print("Training complete!")

Training the regression model...
Training complete!


In [24]:
predictions_reg = model_reg.transform(test_reg_df)

print("predictions on the test set:")
predictions_reg.select("calories_burned", "prediction").show(10)

predictions on the test set:
+---------------+------------------+
|calories_burned|        prediction|
+---------------+------------------+
|          242.0|188.13391660088507|
|          282.0| 188.5605137106498|
|          234.0|188.75902697505865|
|          125.0| 188.9788949958397|
|          163.0|188.35986152772023|
|          286.0|189.02363682785875|
|          140.0|189.38861525331225|
|          292.0|188.51746345919435|
|          199.0|188.32785208897747|
|          167.0|188.30982782537615|
+---------------+------------------+
only showing top 10 rows


In [25]:
evaluator_rmse = RegressionEvaluator(
    labelCol="calories_burned", predictionCol="prediction", metricName="rmse"
)
rmse = evaluator_rmse.evaluate(predictions_reg)
print(f"Root Mean Squared Error (RMSE) on test data: {rmse:.4f}")

Root Mean Squared Error (RMSE) on test data: 131.2747


In [26]:
evaluator_r2 = RegressionEvaluator(
    labelCol="calories_burned", predictionCol="prediction",metricName="r2"
)
r2 = evaluator_r2.evaluate(predictions_reg)
print(f"R-squared (R2) on test data: {r2:.4f}")

R-squared (R2) on test data: 0.9114


# Phase 5: Advanced Analytics - Structured Streaming

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

stream_schema = StructType([
    StructField("user_id", LongType()),
    StructField("timestamp", StringType()), 
    StructField("steps", LongType()),
    StructField("calories_burned", DoubleType()),
    StructField("heart_rate_avg", LongType()),
    StructField("sleep_hours", DoubleType()),
    StructField("activity_type", StringType())
])

streaming_df = spark.readStream \
    .schema(stream_schema) \
    .option("maxFilesPerTrigger", 1) \
    .parquet("C:/Project/Fitness Tracker Analysis/data_lake/streaming_input/")

processed_stream_df = streaming_df.withColumn("timestamp", to_timestamp("timestamp"))

query = processed_stream_df.writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("streaming_health_data") \
    .trigger(processingTime="5 seconds") \
    .start()

print("Streaming query started!")

Streaming query started!


In [32]:
import time
from IPython.display import clear_output

print("Starting auto-refresh display...")
print("⏸Press 'Interrupt Kernel' (■ button) to stop\n")
time.sleep(2)

try:
    refresh_count = 0
    while True:
        clear_output(wait=True)
        
        # Try to get data from memory table
        try:
            df = spark.sql("SELECT * FROM streaming_health_data ORDER BY timestamp DESC LIMIT 30")
            total_count = spark.sql("SELECT COUNT(*) as count FROM streaming_health_data").first()["count"]

            
            # Display header
            print("╔" + "="*78 + "╗")
            print(f"║  Auto-Refresh #{refresh_count} | Records: {total_count} | {time.strftime('%H:%M:%S')}".ljust(79) + "║")
            print("╚" + "="*78 + "╝")
            
            # Show data
            if total_count > 0:
                print("\nLATEST 30 RECORDS:")
                print("─"*80)
                df.show(30, truncate=False)
                
                # Activity breakdown
                print("\nACTIVITY BREAKDOWN:")
                print("─"*80)
                spark.sql("""
                    SELECT 
                        activity_type, 
                        COUNT(*) as count,
                        ROUND(AVG(steps), 2) as avg_steps,
                        ROUND(AVG(calories_burned), 2) as avg_calories
                    FROM streaming_health_data 
                    GROUP BY activity_type 
                    ORDER BY count DESC
                """).show(truncate=False)
            else:
                print("\nWaiting for data... (files being processed)")
            
            # Check query status
            try:
                if 'query' in globals():
                    if query.isActive:
                        print(f"Streaming Query: ACTIVE")
                        
                        # Show last progress if available
                        if query.lastProgress:
                            print(f"   Last batch processed: {query.lastProgress.get('batchId', 'N/A')} records")
                    else:
                        print(f"✗ Streaming Query: STOPPED")
                        print("\nQuery stopped - breaking refresh loop")
                        break
                else:
                    print(f"Query variable not found")
            except:
                print(f"Cannot check query status")
            
        except Exception as e:
            print(f"Waiting for data... ({str(e)[:50]})")
        
        print("\n💡 Press 'Interrupt Kernel' (■) to stop auto-refresh")
        
        refresh_count += 1
        time.sleep(3)  # Refresh every 3 seconds
        
except KeyboardInterrupt:
    print("\n\nAuto-refresh stopped by user")
except Exception as e:
    print(f"\nError: {e}")

║  Auto-Refresh #11 | Records: 43 | 15:43:52                                   ║

LATEST 30 RECORDS:
────────────────────────────────────────────────────────────────────────────────
+-------+-------------------+-----+---------------+--------------+-----------+-------------+
|user_id|timestamp          |steps|calories_burned|heart_rate_avg|sleep_hours|activity_type|
+-------+-------------------+-----+---------------+--------------+-----------+-------------+
|164    |2025-10-04 15:43:40|386  |30.88          |135           |7.88       |running      |
|1998   |2025-10-04 15:43:40|130  |9.1            |122           |6.64       |hiking       |
|1484   |2025-10-04 15:43:40|15   |448.0          |111           |6.21       |swimming     |
|1306   |2025-10-04 15:43:40|287  |20.09          |120           |6.8        |hiking       |
|130    |2025-10-04 15:43:40|30   |244.0          |86            |6.38       |yoga         |
|1940   |2025-10-04 15:43:40|148  |8.88           |134           |8.47    

In [None]:
# Stop the query
query.stop()

# Phase 5: Streaming Aggregations with Watermarking


In [1]:
# Phase 5 Setup Cell: Imports and SparkSession Initialization
import os
import sys
import time
from IPython.display import clear_output
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# --- ADD THESE LINES ---
# Point Spark to the correct Python executable for both driver and workers
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
# ---------------------

# --- SparkSession Initialization ---
# This setup is necessary for running Spark locally on Windows.
# Adjust the path if your Hadoop installation is in a different location.
os.environ["HADOOP_HOME"] = r"C:\hadoop"
os.environ["PATH"] = r"C:\hadoop\bin;" + os.environ.get("PATH", "")

# Create or get the existing SparkSession
# Create SparkSession with more memory
spark = SparkSession.builder \
    .appName("FitnessTrackerRecommendations") \
    .master("local[*]") \
    .config("spark.driver.memory", "4g") \
    .config("spark.python.worker.faulthandler.enabled", "true") \
    .config("spark.sql.session.timeZone", "UTC") \
    .getOrCreate()

# Suppress verbose logging for a cleaner output
spark.sparkContext.setLogLevel("ERROR")

print("SparkSession and necessary libraries are ready for Phase 5.")

SparkSession and necessary libraries are ready for Phase 5.


In [None]:
# Define the schema for the incoming streaming data
stream_schema = StructType([
    StructField("user_id", LongType()),
    StructField("timestamp", StringType()), 
    StructField("steps", LongType()),
    StructField("calories_burned", DoubleType()),
    StructField("heart_rate_avg", LongType()),
    StructField("sleep_hours", DoubleType()),
    StructField("activity_type", StringType())
])

# Read the streaming data from the source directory
streaming_df = spark.readStream \
    .schema(stream_schema) \
    .option("maxFilesPerTrigger", 1) \
    .parquet("C:/Project/Fitness Tracker Analysis/data_lake/streaming_input/") # Make sure this path is correct

# Convert the timestamp string to a proper timestamp type
processed_stream_df = streaming_df.withColumn("timestamp", to_timestamp("timestamp"))

# Start the streaming query and write to an in-memory table
query = processed_stream_df.writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("streaming_health_data") \
    .trigger(processingTime="5 seconds") \
    .start()

print("Streaming query started!")

In [36]:
try:
    if 'query' in globals() and query.isActive:
        print("Stopping the previous memory-based query...")
        query.stop()
        print("Previous query stopped.")
except Exception as e:
    print(f"Could not stop query (it may already be stopped): {e}")

In [None]:
# Advanced Stream: Tumbling Window Aggregation
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Adjust the path if your Hadoop installation is in a different location.
os.environ["HADOOP_HOME"] = r"C:\hadoop"
os.environ["PATH"] = r"C:\hadoop\bin;" + os.environ.get("PATH", "")

# --- SparkSession Initialization ---
os.environ["HADOOP_HOME"] = r"C:\hadoop"
os.environ["PATH"] = r"C:\hadoop\bin;" + os.environ.get("PATH", "")

# Create or get the existing SparkSession
spark = SparkSession.builder \
    .appName("FitnessTrackerRecommendations") \
    .master("local[*]") \
    .config("spark.driver.memory", "4g") \
    .config("spark.python.worker.faulthandler.enabled", "true") \
    .config("spark.sql.session.timeZone", "UTC") \
    .getOrCreate()


spark.sparkContext.setLogLevel("ERROR")

# --- Streaming DataFrame Setup ---
stream_schema = StructType([
    StructField("user_id", LongType()),
    StructField("timestamp", StringType()),
    StructField("steps", LongType()),
    StructField("calories_burned", DoubleType()),
    StructField("heart_rate_avg", LongType()),
    StructField("sleep_hours", DoubleType()),
    StructField("activity_type", StringType())
])

streaming_df = spark.readStream \
    .schema(stream_schema) \
    .option("maxFilesPerTrigger", 1) \
    .parquet("C:/Project/Fitness Tracker Analysis/data_lake/streaming_input/")

processed_stream_df = streaming_df.withColumn("timestamp", to_timestamp("timestamp"))

print("Setup complete and ready.")

Setup complete and ready.


In [None]:
# Define the windowed aggregation
windowed_counts_df = processed_stream_df \
    .withWatermark("timestamp", "1 minute") \
    .groupBy(
        window("timestamp", "30 seconds"),
        "activity_type"
    ).count()

# Write to an in-memory table named "windowed_results"
query_memory = windowed_counts_df.writeStream \
    .outputMode("update") \
    .format("memory") \
    .queryName("windowed_results") \
    .start()

print("Streaming query started. Now wait for 2 minutes before running the next cell.")

In [4]:
# Run this cell to see the latest results from the stream
spark.sql("SELECT * FROM windowed_results ORDER BY window.start DESC").show(truncate=False)

+------------------------------------------+-------------+-----+
|window                                    |activity_type|count|
+------------------------------------------+-------------+-----+
|{2025-10-04 18:02:00, 2025-10-04 18:02:30}|yoga         |1    |
|{2025-10-04 18:02:00, 2025-10-04 18:02:30}|running      |1    |
|{2025-10-04 18:02:00, 2025-10-04 18:02:30}|swimming     |1    |
|{2025-10-04 18:02:00, 2025-10-04 18:02:30}|hiking       |1    |
|{2025-10-04 18:02:00, 2025-10-04 18:02:30}|cycling      |2    |
|{2025-10-04 18:02:00, 2025-10-04 18:02:30}|yoga         |3    |
|{2025-10-04 18:02:00, 2025-10-04 18:02:30}|running      |2    |
|{2025-10-04 18:02:00, 2025-10-04 18:02:30}|hiking       |2    |
|{2025-10-04 18:02:00, 2025-10-04 18:02:30}|walking      |1    |
|{2025-10-04 18:01:30, 2025-10-04 18:02:00}|gym_workout  |3    |
|{2025-10-04 18:01:30, 2025-10-04 18:02:00}|cycling      |1    |
|{2025-10-04 18:01:30, 2025-10-04 18:02:00}|swimming     |1    |
|{2025-10-04 18:01:30, 20

In [3]:
# Advanced Analytics

from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import RegressionEvaluator

als_df = spark.read.parquet("../data_lake/processed/fitness_data")

ratings_df = als_df.groupBy("user_id", "activity_type").count().withColumnRenamed("count", "rating")

indexer = StringIndexer(inputCol="activity_type", outputCol="item_id")
indexer_model = indexer.fit(ratings_df)
ratings_indexed_df = indexer_model.transform(ratings_df)

print("Ratings data prepared for ALS:")
ratings_indexed_df.show(10)

(training, test) = ratings_indexed_df.randomSplit([0.8, 0.2])

Ratings data prepared for ALS:
+-------+-------------+------+-------+
|user_id|activity_type|rating|item_id|
+-------+-------------+------+-------+
|     41|      walking|    32|    5.0|
|     52|         yoga|    35|    6.0|
|    110|       hiking|    26|    2.0|
|    132|      cycling|    26|    0.0|
|    187|         yoga|    19|    6.0|
|    347|  gym_workout|    40|    1.0|
|    348|      running|    35|    3.0|
|    395|      cycling|    23|    0.0|
|    398|     swimming|    26|    4.0|
|    658|         yoga|    23|    6.0|
+-------+-------------+------+-------+
only showing top 10 rows


In [4]:
# Training the ALS Model and Making Recommendations

als = ALS(
    maxIter=5,
    regParam=0.01,
    userCol="user_id",
    itemCol="item_id",
    ratingCol="rating",
    coldStartStrategy="drop"
)

print("Training the ALS recommendation model...")
model_als = als.fit(training)
print("Training Complete!")

Training the ALS recommendation model...
Training Complete!


In [5]:
# Evaluate the Model
predictions_als = model_als.transform(test)
print("Predictions on the test set")
predictions_als.show(10)

evaluator_als = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)
rmse_als = evaluator_als.evaluate(predictions_als)
print(f"\nRoot Mean Square Error for ALS model: {rmse_als:.4f}")

Predictions on the test set
+-------+-------------+------+-------+----------+
|user_id|activity_type|rating|item_id|prediction|
+-------+-------------+------+-------+----------+
|    148|      walking|    23|      5| 19.479492|
|    471|      cycling|    17|      0| 26.591143|
|   1342|     swimming|    37|      4| 12.510277|
|   1342|      walking|    15|      5| 10.966165|
|   1580|      cycling|    21|      0| 15.510708|
|   1645|  gym_workout|    22|      1| 18.292093|
|   1645|      walking|    26|      5| 25.087502|
|   1829|      running|    27|      3| 13.023116|
|   1829|     swimming|    29|      4|   9.59326|
|   1959|     swimming|    29|      4| 21.220932|
+-------+-------------+------+-------+----------+
only showing top 10 rows

Root Mean Square Error for ALS model: 9.1810


In [1]:
# Join the final lookup table to get activity name
import os
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, udf
from pyspark.sql.types import StringType
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import StringIndexer

# --- Stop any previous Spark session ---
try:
    if 'spark' in globals():
        print("Stopping existing Spark session...")
        spark.stop()
except:
    pass

In [2]:
# --- Configuration ---
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
os.environ["HADOOP_HOME"] = r"C:\hadoop"
os.environ["PATH"] = r"C:\hadoop\bin;" + os.environ.get("PATH", "")

# --- Create SparkSession with MINIMAL resources ---
print("Starting a new SparkSession...")
spark = SparkSession.builder \
    .appName("FitnessTrackerALS_Minimal") \
    .master("local[1]") \
    .config("spark.driver.memory", "2g") \
    .config("spark.sql.shuffle.partitions", "4") \
    .config("spark.default.parallelism", "2") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")
print("SparkSession ready.")

# --- 1. Load and Prepare Data ---
print("\nPreparing data for ALS model...")
als_df = spark.read.parquet("../data_lake/processed/fitness_data")
ratings_df = als_df.groupBy("user_id", "activity_type").count().withColumnRenamed("count", "rating")

# Index activities
indexer = StringIndexer(inputCol="activity_type", outputCol="item_id")
indexer_model = indexer.fit(ratings_df)
ratings_indexed_df = indexer_model.transform(ratings_df)

# Split data
(training, test) = ratings_indexed_df.randomSplit([0.8, 0.2], seed=42)
print("✅ Data prepared.")

Starting a new SparkSession...
SparkSession ready.

Preparing data for ALS model...
✅ Data prepared.


In [3]:
# --- 2. Train the ALS Model ---
print("\nTraining the ALS recommendation model...")
als = ALS(
    maxIter=5, 
    regParam=0.01, 
    userCol="user_id", 
    itemCol="item_id", 
    ratingCol="rating", 
    coldStartStrategy="drop"
)
model_als = als.fit(training)
print("Training complete.")

# --- 3. Generate Recommendations ---
print("\nGenerating top 3 activity recommendations for all users...")
user_recs = model_als.recommendForAllUsers(3)
print("Recommendations generated.")


Training the ALS recommendation model...
Training complete.

Generating top 3 activity recommendations for all users...
Recommendations generated.


In [4]:

# --- 4. ALTERNATIVE APPROACH: Convert to Pandas instead of Spark operations ---
print("\nFormatting final recommendations...")

# Get the activity labels mapping
id_to_activity_map = indexer_model.labels

# Convert recommendations to Pandas (this avoids the problematic join)
print("Converting to Pandas for safe processing...")
user_recs_pd = user_recs.toPandas()

# Process in Pandas
results = []
for _, row in user_recs_pd.iterrows():
    user_id = row['user_id']
    for rec in row['recommendations']:
        item_id = int(rec['item_id'])
        rating = float(rec['rating'])
        activity = id_to_activity_map[item_id]
        results.append({
            'user_id': user_id,
            'activity_type_rec': activity,
            'predicted_rating': rating
        })

# Display results
print("Formatting complete.\n")
print("Final, human-readable recommendations:")
print(f"Total rows: {len(results)}")
print("\nFirst 15 recommendations:")
for i, rec in enumerate(results[:15]):
    print(f"User {rec['user_id']}: {rec['activity_type_rec']} (score: {rec['predicted_rating']:.2f})")

# --- Clean up ---
print("\nTask complete. Stopping Spark session.")
spark.stop()


Formatting final recommendations...
Converting to Pandas for safe processing...
Formatting complete.

Final, human-readable recommendations:
Total rows: 5877

First 15 recommendations:
User 1: yoga (score: 40.40)
User 1: gym_workout (score: 31.03)
User 1: running (score: 28.01)
User 2: yoga (score: 56.91)
User 2: cycling (score: 30.82)
User 2: gym_workout (score: 30.66)
User 3: hiking (score: 30.57)
User 3: running (score: 28.17)
User 3: cycling (score: 27.86)
User 4: cycling (score: 29.64)
User 4: yoga (score: 29.20)
User 4: swimming (score: 28.77)
User 5: gym_workout (score: 35.39)
User 5: swimming (score: 28.71)
User 5: cycling (score: 27.60)

Task complete. Stopping Spark session.
