## firebase setup

In [1]:
import firebase_admin
from firebase_admin import credentials
from firebase_admin import firestore
import os

SERVICE_ACCOUNT_KEY_PATH = os.environ.get("FIREBASE_SERVICE_ACCOUNT_KEY_PATH", "./cstam2-1f2ec-firebase-adminsdk-fbsvc-2ab61a7ed6.json")

try:
    # Check if the default app is already initialized
    app = firebase_admin.get_app()
    print("Firebase Admin SDK already initialized. Reusing existing app instance.")
except ValueError:
    # If not initialized, proceed with initialization
    try:
        cred = credentials.Certificate(SERVICE_ACCOUNT_KEY_PATH)
        app = firebase_admin.initialize_app(cred)
        print("Firebase Admin SDK initialized successfully.")
    except Exception as e:
        print(f"Error during Firebase Admin SDK initialization: {e}")
        # It's crucial to handle this error, as your app can't write to Firestore without it.
        raise # Re-raise to stop the script if Firebase initialization fails

db = firestore.client(app=app) 


An error occurred: module 'importlib.metadata' has no attribute 'packages_distributions'
Firebase Admin SDK initialized successfully.




## setup spark

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lit, to_timestamp, from_unixtime
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType, DoubleType
import time
import logging
# ---------- CONFIG ----------
KAFKA_BOOTSTRAP = "kafka:29092"                  

CHECKPOINT_BASE = "/data/checkpoints"

# Configure logger
logger = logging.getLogger("DataCleaning")
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = logging.Formatter("[%(asctime)s] %(levelname)s - %(message)s")
handler.setFormatter(formatter)
if not logger.hasHandlers():
    logger.addHandler(handler)
# ---------- Spark session ----------
spark = (
    SparkSession.builder
    .appName("health-streams-to-firebase")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("ERROR")

# ---------- utils ----------
def read_topic(topic, schema):
    return (
        spark.readStream
             .format("kafka")
             .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP)
             .option("subscribe", topic)
             .option("startingOffsets", "latest")
             .load()
             .selectExpr("CAST(value AS STRING) as json_str")
             .select(from_json(col("json_str"), schema).alias("data"))   # <<< fixed
             .select("data.*")
    )


heart_rate_schema = StructType([
    StructField("user_id", StringType(), True),
    StructField("time", TimestampType(), True),
    StructField("heart_rate", IntegerType(), True)
])
steps_schema = StructType([
    StructField("user_id", StringType(), True),
    StructField("time", TimestampType(), True),
    StructField("steps", IntegerType(), True)
])
calories_schema = StructType([
    StructField("user_id", StringType(), True),
    StructField("time", TimestampType(), True),
    StructField("calories", DoubleType(), True)
])

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/12 19:56:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/11/12 19:56:06 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


## cleaning functions

In [3]:
from pyspark.sql.functions import from_json, col
from pyspark.sql.functions import current_timestamp

## ---------- timestamp normalization & cleaning ----------
def normalize_timestamp(df):
    candidates = [c for c in ["timestamp", "ts", "ts_ms", "time", "created_at"] if c in df.columns]
    logger.info(f"Normalizing timestamp — candidates: {candidates}")

    if not candidates:
        logger.warning("No timestamp found — creating current-time column.")
        return df.withColumn("timestamp", current_timestamp())

    src = candidates[0]
    logger.info(f"Using '{src}' as timestamp source.")
    src_col = col(src)

    # Only convert numeric timestamps; keep native timestamps unchanged
    df = df.withColumn(
        "timestamp",
        when(src_col.cast(DoubleType()).isNotNull(), 
             to_timestamp(from_unixtime(
                 (src_col.cast(DoubleType()) / when(src_col.cast(DoubleType()) > 1e12, lit(1000.0)).otherwise(lit(1.0))).cast("long")
             ))
        ).otherwise(src_col.cast(TimestampType()))
    )

    return df.drop(src)
def clean_heart_rate(df):
    logger.info("Cleaning heart rate stream...")
    df = df.withColumn("heart_rate", col("heart_rate").cast(IntegerType()))
    df = normalize_timestamp(df)
    df = df.dropna(subset=["heart_rate", "timestamp"])
    df = df.filter((col("heart_rate") > 0) & (col("heart_rate") < 300))
    logger.info("Heart rate cleaning complete (streaming mode).")
    return df


def clean_calories(df):
    logger.info("Cleaning calories stream...")
    df = df.withColumn("calories", col("calories").cast(DoubleType()))
    df = normalize_timestamp(df)
    df = df.dropna(subset=["calories", "timestamp"])
    df = df.filter(col("calories") >= 0)
    logger.info("Calories cleaning complete (streaming mode).")
    return df


def clean_steps(df):
    logger.info("Cleaning steps stream...")
    df = df.withColumn("steps", col("steps").cast(IntegerType()))
    df = normalize_timestamp(df)
    df = df.dropna(subset=["steps", "timestamp"])
    df = df.filter(col("steps") >= 0)
    logger.info("Steps cleaning complete (streaming mode).")
    return df
    

  
# ---------- prepare streams ----------

heart_rate_df = read_topic("heartrate", heart_rate_schema)
calories_df = read_topic("calories", calories_schema)
steps_df = read_topic("steps", steps_schema)

hr_clean = clean_heart_rate(heart_rate_df)
cal_clean = clean_calories(calories_df)
st_clean = clean_steps(steps_df)


[2025-11-12 19:56:08,227] INFO - Cleaning heart rate stream...
[2025-11-12 19:56:08,261] INFO - Normalizing timestamp — candidates: ['time']
[2025-11-12 19:56:08,261] INFO - Using 'time' as timestamp source.
[2025-11-12 19:56:08,352] INFO - Heart rate cleaning complete (streaming mode).
[2025-11-12 19:56:08,353] INFO - Cleaning calories stream...
[2025-11-12 19:56:08,368] INFO - Normalizing timestamp — candidates: ['time']
[2025-11-12 19:56:08,368] INFO - Using 'time' as timestamp source.
[2025-11-12 19:56:08,494] INFO - Calories cleaning complete (streaming mode).
[2025-11-12 19:56:08,496] INFO - Cleaning steps stream...
[2025-11-12 19:56:08,523] INFO - Normalizing timestamp — candidates: ['time']
[2025-11-12 19:56:08,524] INFO - Using 'time' as timestamp source.
[2025-11-12 19:56:08,580] INFO - Steps cleaning complete (streaming mode).


## anomaly detection functions

In [4]:
import time
from datetime import datetime

def send_alert_to_firestore(user_id, alert_type, value, firestore_client,timestamp):
    """
    Write an anomaly alert to Firestore.
    Each alert becomes a document in the 'alerts' collection.

    Args:
        user_id (str): The ID of the affected user.
        alert_type (str): Type of the alert, e.g., 'heart_rate', 'calories', etc.
        firestore_client: Initialized Firestore client (google.cloud.firestore.Client).
    """
    try:
        alerts_ref = firestore_client.collection("alerts")
        alert_doc = {
            "user_id": user_id,
            "type": alert_type,
            "value": value,
            "detected_at": timestamp,
            "alert_created_at_unix": int(time.time()),
            "status":"waiting"
        }

        alerts_ref.add(alert_doc)
        print(f"[ok] Alert written for user={user_id}: {alert_type} — {value}")

    except Exception as e:
        print(f"[error] Failed to write alert for user={user_id}: {e}")
        
def detect_heart_rate_anomalies(df, firestore_client):
    anomalies = df.filter((col("heart_rate") > 180) | (col("heart_rate") < 45))
    if anomalies.rdd.isEmpty():
        return
    for row in anomalies.toLocalIterator():
        send_alert_to_firestore(
            user_id=row.user_id,
            alert_type="heart_rate",
            value=row.heart_rate,
            firestore_client=firestore_client,
            timestamp= row.timestamp
        )
        
def detect_low_calories(df, firestore_client):
    low_activity = df.filter(col("calories") < 0.5)  # e.g. less than 0.5 kcal/min or similar
    if low_activity.rdd.isEmpty():
        return
    for row in low_activity.toLocalIterator():
        send_alert_to_firestore(
            user_id=row.user_id,
            alert_type="low_calories",
            value=row.calories,
            firestore_client=firestore_client,
            timestamp= row.timestamp
        )
        
def detect_inactivity(df, firestore_client):
    inactive = df.filter(col("steps") == 0)
    if inactive.rdd.isEmpty():
        return
    for row in inactive.toLocalIterator():
        send_alert_to_firestore(
            user_id=row.user_id,
            alert_type="inactivity",
            value=0,
            firestore_client=firestore_client,
            timestamp= row.timestamp
        )


## write stream functions

In [6]:
import logging
import time
import datetime
import decimal
import json
from google.api_core.exceptions import GoogleAPICallError, RetryError

# logger (reuse your existing logger or this)
fs_logger = logging.getLogger("firestore-writer")
fs_logger.setLevel(logging.INFO)
if not fs_logger.hasHandlers():
    h = logging.StreamHandler()
    h.setFormatter(logging.Formatter("[%(levelname)s] %(asctime)s - %(message)s"))
    fs_logger.addHandler(h)


def _serialize_value(v):
    """Make values Firestore-safe: datetime -> ISO, Decimal -> float, fallback to str for non-JSON."""
    if v is None:
        return None
    if isinstance(v, (datetime.datetime, datetime.date)):
        return v.isoformat()
    if isinstance(v, decimal.Decimal):
        return float(v)
    # Bytes -> decode
    if isinstance(v, (bytes, bytearray)):
        try:
            return v.decode()
        except Exception:
            return str(v)
    # Test JSON-serializable quickly
    try:
        json.dumps(v)
        return v
    except Exception:
        return str(v)


def _commit_with_retries(batch_obj, max_retries=3, base_backoff=0.5):
    """Commit a Firestore batch with retries on transient errors."""
    attempt = 0
    while True:
        try:
            batch_obj.commit()
            return
        except (GoogleAPICallError, RetryError, IOError) as e:
            attempt += 1
            if attempt > max_retries:
                fs_logger.exception("Firestore commit failed after %d attempts", attempt - 1)
                raise
            backoff = base_backoff * (2 ** (attempt - 1))
            fs_logger.warning("Transient error committing firestore batch (attempt %d). Backing off %.2fs. Error: %s",
                              attempt, backoff, str(e))
            time.sleep(backoff)


def make_firestore_writer(collection_name, firestore_client, batch_size=500, max_retries=3):
    """
    Writes to Firestore under this structure:
      users/{user_id}/{collection_name}/{auto_doc_id}

    collection_name: e.g. "heart_rate", "calories", "steps"
    """
    def write_batch_to_firestore(batch_df, epoch_id):
        epoch_str = str(epoch_id) if epoch_id is not None else "(no-epoch)"
        try:
            rows = batch_df.count()
        except Exception as e:
            fs_logger.exception("[epoch %s] failed to count batch: %s", epoch_str, e)
            rows = None

        if not rows:
            fs_logger.info("[epoch %s] empty, skipping collection=%s", epoch_str, collection_name)
            return

        fs_logger.info("[epoch %s] writing %s rows to Firestore subcollections '%s'", epoch_str, rows, collection_name)

        docs_written = 0
        ops_in_current_batch = 0
        fs_batch = firestore_client.batch()

        try:
            for row in batch_df.toLocalIterator():
                data = row.asDict(recursive=True)
                user_id = data.get("user_id")

                if not user_id:
                    fs_logger.warning("[epoch %s] skipping row without user_id: %s", epoch_str, data)
                    continue

                # Serialize all values to Firestore-safe types
                for k, v in list(data.items()):
                    data[k] = _serialize_value(v)

                # Path: users/{user_id}/{collection_name}/{auto_doc_id}
                doc_ref = (
                    firestore_client.collection("users")
                    .document(str(user_id))
                    .collection(collection_name)
                    .document()
                )

                fs_batch.set(doc_ref, data)
                ops_in_current_batch += 1

                if ops_in_current_batch >= batch_size:
                    _commit_with_retries(fs_batch, max_retries=max_retries)
                    docs_written += ops_in_current_batch
                    ops_in_current_batch = 0
                    fs_batch = firestore_client.batch()

            if ops_in_current_batch > 0:
                _commit_with_retries(fs_batch, max_retries=max_retries)
                docs_written += ops_in_current_batch

            fs_logger.info("[epoch %s] wrote %d docs under users/*/%s/", epoch_str, docs_written, collection_name)

        except Exception as e:
            fs_logger.exception("[epoch %s] Firestore write failed: %s", epoch_str, e)
            raise

    return write_batch_to_firestore


## cleaning stream queries

In [7]:
# start streams separately with their own checkpoints
# Pass the initialized Firestore client 'db' to your writer functions
hr_query = (
    hr_clean
    .writeStream
    .foreachBatch(make_firestore_writer("heart_rate", db)) # Pass the Firestore client here
    .option("checkpointLocation", f"{CHECKPOINT_BASE}/heart_rate")
    .outputMode("append")
    .start()
)

cal_query = (
    cal_clean
    .writeStream
    .foreachBatch(make_firestore_writer("calories", db)) # Pass the Firestore client here
    .option("checkpointLocation", f"{CHECKPOINT_BASE}/calories")
    .outputMode("append")
    .start()
)

st_query = (
    st_clean
    .writeStream
    .foreachBatch(make_firestore_writer("steps", db)) # Pass the Firestore client here
    .option("checkpointLocation", f"{CHECKPOINT_BASE}/steps")
    .outputMode("append")
    .start()
)
# Start streams for each detection type
hr_alerts = (
    hr_clean.writeStream
    .foreachBatch(lambda batch_df, epoch_id: detect_heart_rate_anomalies(batch_df, db))
    .option("checkpointLocation", f"{CHECKPOINT_BASE}/alerts_hr")
    .start()
)

cal_alerts = (
    cal_clean.writeStream
    .foreachBatch(lambda batch_df, epoch_id: detect_low_calories(batch_df, db))
    .option("checkpointLocation", f"{CHECKPOINT_BASE}/alerts_calories")
    .start()
)

st_alerts = (
    st_clean.writeStream
    .foreachBatch(lambda batch_df, epoch_id: detect_inactivity(batch_df, db))
    .option("checkpointLocation", f"{CHECKPOINT_BASE}/alerts_steps")
    .start()
)

spark.streams.awaitAnyTermination()


print("Started all streams. Waiting...")
spark.streams.awaitAnyTermination()

[INFO] 2025-11-12 19:56:19,225 - [epoch 15305] writing 20 rows to Firestore subcollections 'heart_rate'
[INFO] 2025-11-12 19:56:19,228 - [epoch 269] writing 1 rows to Firestore subcollections 'steps'
[INFO] 2025-11-12 19:56:19,230 - [epoch 269] writing 1 rows to Firestore subcollections 'calories'
[INFO] 2025-11-12 19:56:20,330 - [epoch 269] wrote 1 docs under users/*/steps/
[INFO] 2025-11-12 19:56:20,346 - [epoch 269] wrote 1 docs under users/*/calories/
[INFO] 2025-11-12 19:56:20,349 - [epoch 15305] wrote 20 docs under users/*/heart_rate/


[ok] Alert written for user=1503960366: inactivity — 0


[INFO] 2025-11-12 19:56:20,854 - [epoch 15306] writing 5 rows to Firestore subcollections 'heart_rate'
[INFO] 2025-11-12 19:56:21,564 - [epoch 15306] wrote 5 docs under users/*/heart_rate/
[INFO] 2025-11-12 19:56:21,861 - [epoch 15307] writing 1 rows to Firestore subcollections 'heart_rate'
[INFO] 2025-11-12 19:56:22,036 - [epoch 15307] wrote 1 docs under users/*/heart_rate/
[INFO] 2025-11-12 19:56:22,707 - [epoch 15308] writing 1 rows to Firestore subcollections 'heart_rate'
[INFO] 2025-11-12 19:56:22,944 - [epoch 15308] wrote 1 docs under users/*/heart_rate/
[INFO] 2025-11-12 19:56:23,597 - [epoch 15309] writing 1 rows to Firestore subcollections 'heart_rate'
[INFO] 2025-11-12 19:56:23,958 - [epoch 15309] wrote 1 docs under users/*/heart_rate/
[INFO] 2025-11-12 19:56:24,608 - [epoch 15310] writing 1 rows to Firestore subcollections 'heart_rate'
[INFO] 2025-11-12 19:56:24,968 - [epoch 15310] wrote 1 docs under users/*/heart_rate/
[INFO] 2025-11-12 19:56:25,614 - [epoch 15311] writing 

KeyboardInterrupt: 

[INFO] 2025-11-12 19:56:38,791 - [epoch 15324] writing 1 rows to Firestore subcollections 'heart_rate'
INFO:firestore-writer:[epoch 15324] writing 1 rows to Firestore subcollections 'heart_rate'
[INFO] 2025-11-12 19:56:39,029 - [epoch 15324] wrote 1 docs under users/*/heart_rate/
INFO:firestore-writer:[epoch 15324] wrote 1 docs under users/*/heart_rate/
[INFO] 2025-11-12 19:56:39,632 - [epoch 15325] writing 1 rows to Firestore subcollections 'heart_rate'
INFO:firestore-writer:[epoch 15325] writing 1 rows to Firestore subcollections 'heart_rate'
[INFO] 2025-11-12 19:56:40,040 - [epoch 15325] wrote 1 docs under users/*/heart_rate/
INFO:firestore-writer:[epoch 15325] wrote 1 docs under users/*/heart_rate/
[INFO] 2025-11-12 19:56:40,636 - [epoch 15326] writing 1 rows to Firestore subcollections 'heart_rate'
INFO:firestore-writer:[epoch 15326] writing 1 rows to Firestore subcollections 'heart_rate'
[INFO] 2025-11-12 19:56:41,051 - [epoch 15326] wrote 1 docs under users/*/heart_rate/
INFO:fi

In [8]:
for q in spark.streams.active:
    q.stop()
print("✅ Stopped all active queries.")


✅ Stopped all active queries.


[Stage 98:>                                                         (0 + 1) / 1]