# Local Spark with S3 Integration + KPIs

This notebook demonstrates how to:
1. Configure Spark with S3 access (environment variables, with the default AWS credentials provider chain as fallback)
2. Read clickstream data from S3
3. **Calculate KPIs** (Sensor Health, Temp/Humidity, Event Geo)
4. **Write Results** back to S3 (Dev Location)

In [10]:
import os
import json
from pathlib import Path
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, avg, min, max, date_format, lit

# --- Configuration ---

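# 1. Credentials (env vars preferred)
# Note: CREDENTIALS_FILE is defined here but never read below; create_spark_session()
# uses env vars or falls back to the default AWS provider chain.
CREDENTIALS_FILE = Path.home() / '.aws' / 'aws_credentials.json'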

# 2. S3 Paths
# Derive the output bucket from SCRIPT_BUCKET (set in Docker) or fall back to a placeholder
BUCKET = os.environ.get('SCRIPT_BUCKET', 'your-bucket-name').replace('-scripts-', '-bronze-')
# The input path is hard-coded and does not depend on BUCKET
S3_INPUT_PATH = "s3a://ahs-clickstream-etl-dev-bronze-833415032205/clickstream/"
S3_DEV_OUTPUT_PATH = f"s3a://{BUCKET}/dev_output"

print(f"🔹 Input: {S3_INPUT_PATH}")
print(f"🔸 Output: {S3_DEV_OUTPUT_PATH}")

🔹 Input: s3a://ahs-clickstream-etl-dev-bronze-833415032205/clickstream/
🔸 Output: s3a://ahs-clickstream-etl-dev-bronze-833415032205/dev_output
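
The bucket derivation in the configuration cell relies on a naming convention in which the scripts and bronze buckets differ only by one infix. A minimal sketch of that logic as a standalone helper, assuming the convention holds; `derive_bronze_bucket` and the sample bucket name are hypothetical and not part of the notebook:

```python
import os


def derive_bronze_bucket(env=None, default="your-bucket-name"):
    """Return the bronze bucket name derived from SCRIPT_BUCKET.

    Assumes the scripts and bronze buckets share a name apart from the
    '-scripts-' / '-bronze-' infix (hypothetical helper for illustration).
    """
    env = os.environ if env is None else env
    script_bucket = env.get("SCRIPT_BUCKET", default)
    return script_bucket.replace("-scripts-", "-bronze-")


# Example with a made-up bucket name
print(derive_bronze_bucket({"SCRIPT_BUCKET": "acme-etl-dev-scripts-000000000000"}))
# Falls back to the placeholder when the variable is not set
print(derive_bronze_bucket({}))
```

When the placeholder is returned, later writes would target a bucket that most likely does not exist, so it may be worth checking for that value before running the write step.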


## 1. Setup Spark Session

In [11]:
def create_spark_session():
    builder = SparkSession.builder \
        .appName("LocalKPIs") \
        .master("local[*]") \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.262") \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.hadoop.fs.s3a.connection.timeout", "10000") \
        .config("spark.hadoop.fs.s3a.connection.establish.timeout", "5000")
    
    # Check env vars for credentials (Docker injection)
    if os.environ.get('AWS_ACCESS_KEY_ID'):
        print("✅ Using credentials from Environment Variables")
        builder = builder \
            .config("spark.hadoop.fs.s3a.access.key", os.environ['AWS_ACCESS_KEY_ID']) \
            .config("spark.hadoop.fs.s3a.secret.key", os.environ['AWS_SECRET_ACCESS_KEY'])
        # Temporary (STS) credentials also need the session token and a matching provider
        if os.environ.get('AWS_SESSION_TOKEN'):
            builder = builder \
                .config("spark.hadoop.fs.s3a.session.token", os.environ['AWS_SESSION_TOKEN']) \
                .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
    else:
        # Fall back to the default AWS provider chain
        print("⚠️ Env vars not found, using default provider chain")
        builder = builder.config("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain")

    return builder.getOrCreate()

spark = create_spark_session()

✅ Using credentials from Environment Variables
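
The credential branching in `create_spark_session()` can be separated from the Spark builder so it is easy to check without starting a session. The following is a sketch only: `s3a_credential_conf` is a hypothetical helper, and unlike the notebook cell it treats a missing secret key the same as a missing access key.

```python
def s3a_credential_conf(env):
    """Map AWS-style environment variables to S3A Spark settings.

    `env` is any mapping, for example os.environ. Hypothetical helper that
    mirrors the branching in create_spark_session().
    """
    access_key = env.get("AWS_ACCESS_KEY_ID")
    secret_key = env.get("AWS_SECRET_ACCESS_KEY")

    # Without a complete key pair, defer to the default AWS provider chain.
    if not (access_key and secret_key):
        return {
            "spark.hadoop.fs.s3a.aws.credentials.provider":
                "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
        }

    conf = {
        "spark.hadoop.fs.s3a.access.key": access_key,
        "spark.hadoop.fs.s3a.secret.key": secret_key,
    }

    # Temporary credentials need the token and the temporary provider.
    token = env.get("AWS_SESSION_TOKEN")
    if token:
        conf["spark.hadoop.fs.s3a.session.token"] = token
        conf["spark.hadoop.fs.s3a.aws.credentials.provider"] = (
            "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider"
        )
    return conf
```

The returned dictionary could then be applied with a loop such as `for key, value in s3a_credential_conf(os.environ).items(): builder = builder.config(key, value)`.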


## 2. Read Data

In [12]:
try:
    df = spark.read.json(S3_INPUT_PATH)
    print(f"✅ Loaded {df.count()} records")
    df.printSchema()
except Exception as e:
    print(f"❌ Error reading S3: {e}")
    # Stop execution if read fails
    raise

❌ Error reading S3: [UNABLE_TO_INFER_SCHEMA] Unable to infer schema for JSON. It must be specified manually.


AnalysisException: [UNABLE_TO_INFER_SCHEMA] Unable to infer schema for JSON. It must be specified manually.
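
`UNABLE_TO_INFER_SCHEMA` typically appears when Spark finds no readable JSON files under the path, so there is nothing to infer from. One way to avoid inference is to pass an explicit schema. The sketch below uses a DDL string, which `DataFrameReader.schema` accepts; the column names are taken from the KPI cells in this notebook, while the types and the helper name `read_sensor_json` are assumptions that should be checked against the real data.

```python
# Column names come from the KPI cells; the types are assumptions
# (sensor_id matches the bigint used in the table DDL later on).
SENSOR_SCHEMA_DDL = (
    "sensor_id BIGINT, "
    "status STRING, "
    "event_type STRING, "
    "`timestamp` BIGINT, "
    "temperature DOUBLE, "
    "humidity DOUBLE, "
    "location STRUCT<latitude: DOUBLE, longitude: DOUBLE>"
)


def read_sensor_json(spark, path):
    """Read JSON from `path` with a fixed schema instead of inferring one.

    `spark` is an existing SparkSession (hypothetical helper for illustration).
    """
    return spark.read.schema(SENSOR_SCHEMA_DDL).json(path)
```

Supplying a schema does not make missing data appear; if the prefix is empty, it is still worth confirming that the input path and credentials point at the expected bucket.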

## 3. Calculate KPIs

### KPI 1: Sensor Health
Compute the share of faulty readings per sensor. `faulty_pct` is a fraction between 0 and 1, not a percentage.

In [5]:
sensor_faulty_pct = df.groupBy("sensor_id").agg(
    count(when(col("status") == "faulty", True)).alias("faulty_count"),
    count("*").alias("total_count")
).withColumn(
    "faulty_pct", col("faulty_count")/col("total_count")
)

sensor_faulty_pct.show()

+---------+------------+-----------+----------+
|sensor_id|faulty_count|total_count|faulty_pct|
+---------+------------+-----------+----------+
|       37|       12687|      12687|       1.0|
|       42|           0|      12813|       0.0|
+---------+------------+-----------+----------+
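
To make the aggregation concrete, the same per-sensor fraction can be computed in plain Python on a handful of rows. This is only an illustration of what the `groupBy`/`agg` step calculates; the sample rows and the function name `faulty_fraction` are made up.

```python
from collections import Counter


def faulty_fraction(rows):
    """Return {sensor_id: faulty_readings / total_readings}."""
    total = Counter()
    faulty = Counter()
    for sensor_id, status in rows:
        total[sensor_id] += 1
        if status == "faulty":
            faulty[sensor_id] += 1
    return {sensor_id: faulty[sensor_id] / total[sensor_id] for sensor_id in total}


# Made-up sample: sensor 37 is always faulty, sensor 42 once in four readings
rows = [
    (37, "faulty"), (37, "faulty"),
    (42, "active"), (42, "active"), (42, "active"), (42, "faulty"),
]
print(faulty_fraction(rows))  # {37: 1.0, 42: 0.25}
```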



In [6]:
# Note: count("sensor_id") counts readings (rows) per status, not distinct sensors
status_counts = df.groupBy("status").agg(count("sensor_id").alias("sensor_count"))
status_counts.show()

+------+------------+
|status|sensor_count|
+------+------------+
|active|       12813|
|faulty|       12687|
+------+------------+



### KPI 2: Temperature & Humidity
Aggregate temperature and humidity (average, minimum, maximum) by day and location.

In [7]:
def calculate_temp_humidity(df):
    # Derive a "day" column. Casting a numeric column to timestamp treats the value
    # as epoch seconds, so this assumes `timestamp` is in seconds (no *1000 scaling).
    # Verify the unit against the input data.
    df_day = df.withColumn("day", date_format(col("timestamp").cast("timestamp"), "yyyy-MM-dd"))

    result = df_day.groupBy("day", "location.latitude", "location.longitude").agg(
        avg("temperature").alias("avg_temp"),
        min("temperature").alias("min_temp"),
        max("temperature").alias("max_temp"),
        avg("humidity").alias("avg_humidity"),
        min("humidity").alias("min_humidity"),
        max("humidity").alias("max_humidity")
    )
    return result

kpi_weather = calculate_temp_humidity(df)
print("Weather KPI Sample:")
kpi_weather.show()

Weather KPI Sample:
+----------+--------+---------+------------------+--------+--------+------------------+------------+------------+
|       day|latitude|longitude|          avg_temp|min_temp|max_temp|      avg_humidity|min_humidity|max_humidity|
+----------+--------+---------+------------------+--------+--------+------------------+------------+------------+
|2026-01-10|   -10.2|    120.8|33.102940017340565|    32.6|    33.6|55.706187435958036|        54.7|        56.7|
|2026-01-10|    22.5|     88.3|25.404362756575374|    24.9|    25.9| 62.09496605010532|        61.1|        63.1|
+----------+--------+---------+------------------+--------+--------+------------------+------------+------------+
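
The `day` column depends on the unit of the raw `timestamp` field. A small plain-Python sketch of the difference between epoch seconds and epoch milliseconds follows; `epoch_to_day` is a hypothetical helper, the sample values are made up, and it formats in UTC, whereas Spark's `date_format` uses the session time zone.

```python
from datetime import datetime, timezone


def epoch_to_day(value, unit="s"):
    """Format an epoch value as yyyy-MM-dd in UTC.

    unit="s" treats `value` as seconds, unit="ms" as milliseconds.
    """
    seconds = value / 1000 if unit == "ms" else value
    return datetime.fromtimestamp(seconds, tz=timezone.utc).strftime("%Y-%m-%d")


print(epoch_to_day(1768003200))           # 2026-01-10
print(epoch_to_day(1768003200000, "ms"))  # 2026-01-10
```

A millisecond value interpreted as seconds would land tens of thousands of years in the future, so implausible `day` values are a quick sign that the unit assumption is wrong.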



### KPI 3: Event Geo Analytics
Count events per day and event type, and count faulty readings by location.

In [8]:
def calculate_event_geo(df):
    df_day = df.withColumn("day", date_format(col("timestamp").cast("timestamp"), "yyyy-MM-dd"))
    
    # 1. Events by type
    events = df_day.groupBy("day", "event_type").agg(count("*").alias("event_count"))
    
    # 2. Faulty readings by location (counts rows, not distinct sensors)
    faulty_geo = df.where(col("status") == "faulty") \
        .groupBy("location.latitude", "location.longitude") \
        .agg(count("sensor_id").alias("faulty_count"))
        
    return events, faulty_geo

kpi_events, kpi_faulty_geo = calculate_event_geo(df)
print("Event Counts Sample:")
kpi_events.show()
print("Faulty Geo Sample:")
kpi_faulty_geo.show()

Event Counts Sample:
+----------+--------------+-----------+
|       day|    event_type|event_count|
+----------+--------------+-----------+
|2026-01-10|         alert|      12687|
|2026-01-10|sensor_reading|      12813|
+----------+--------------+-----------+

Faulty Geo Sample:
+--------+---------+------------+
|latitude|longitude|faulty_count|
+--------+---------+------------+
|   -10.2|    120.8|       12687|
+--------+---------+------------+



## 4. Write Results to S3 (Dev Location)

Write each KPI DataFrame to `s3a://.../dev_output/` in Parquet format.

In [None]:
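def write_to_s3(df, name):
    # Each KPI gets its own prefix under the dev output path
    path = f"{S3_DEV_OUTPUT_PATH}/{name}"
    print(f"💾 Writing {name} to {path}...")
    try:
        # "overwrite" replaces any existing data at this path
        df.write.mode("overwrite").parquet(path)
        print("✅ Done")
    except Exception as e:
        # Report the failure and continue with the remaining KPIs
        print(f"❌ Failed: {e}")

# Write all KPIs (the sensor health KPI is the sensor_faulty_pct DataFrame from KPI 1)
write_to_s3(sensor_faulty_pct, "sensor_health")
write_to_s3(kpi_weather, "weather_kpi")
write_to_s3(kpi_events, "event_counts")
write_to_s3(kpi_faulty_geo, "faulty_geo")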
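
Because `write_to_s3` prints failures and carries on, a failed write is easy to miss in the cell output. One possible variation is to collect the failures and inspect them afterwards. The sketch below assumes a writer callable that raises on failure, which `write_to_s3` as written does not, since it catches exceptions itself; `write_all` and its parameters are hypothetical names.

```python
def write_all(outputs, write_one):
    """Call write_one(df, name) for every entry and collect failures.

    `outputs` maps an output name to a DataFrame; `write_one` is any
    callable that raises on failure. Returns {name: error message}.
    """
    failures = {}
    for name, df in outputs.items():
        try:
            write_one(df, name)
        except Exception as exc:
            # Keep going so one failed output does not block the others
            failures[name] = str(exc)
    return failures
```

A caller could then raise or log once at the end, for example when the returned dictionary is not empty.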

In [9]:
# USING iceberg requires a Spark session with the Iceberg runtime and a catalog configured
spark.sql("CREATE DATABASE IF NOT EXISTS test_db")
table_ddl = """
CREATE TABLE IF NOT EXISTS test_db.sensor_table (
    sensor_id bigint,
    faulty_count bigint,
    total_count bigint,
    faulty_pct double
) USING iceberg
"""
spark.sql(table_ddl)

DataFrame[]
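
The `USING iceberg` clause only works when the session has Iceberg support. `create_spark_session()` in this notebook does not set that up, so it presumably comes from the runtime environment. As a rough sketch of what such a setup can look like, the helper below builds the settings for a Hadoop-type Iceberg catalog and makes it the default catalog. The function name, catalog name and warehouse path are hypothetical, and the Iceberg Spark runtime jar matching your Spark and Scala versions must also be on the classpath.

```python
def iceberg_catalog_conf(catalog_name, warehouse_path):
    """Return Spark settings for a Hadoop-type Iceberg catalog.

    Hypothetical helper; the catalog name and warehouse path are
    placeholders chosen by the caller.
    """
    prefix = f"spark.sql.catalog.{catalog_name}"
    return {
        # Enables Iceberg's SQL extensions
        "spark.sql.extensions":
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        # Registers the catalog and points it at a warehouse directory
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.type": "hadoop",
        f"{prefix}.warehouse": warehouse_path,
        # Lets unqualified names such as test_db.sensor_table resolve here
        "spark.sql.defaultCatalog": catalog_name,
    }
```

These settings need to be applied to the builder before the session is created; changing them on a running session generally has no effect on catalog registration.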

## Done
Check the `dev_output/` prefix in your S3 bucket to verify the output files.