## firebase setup

In [2]:
import firebase_admin
from firebase_admin import credentials
from firebase_admin import firestore
import os

SERVICE_ACCOUNT_KEY_PATH = os.environ.get("FIREBASE_SERVICE_ACCOUNT_KEY_PATH", "./cstam2-1f2ec-firebase-adminsdk-fbsvc-2ab61a7ed6.json")

try:
    # Check if the default app is already initialized
    app = firebase_admin.get_app()
    print("Firebase Admin SDK already initialized. Reusing existing app instance.")
except ValueError:
    # If not initialized, proceed with initialization
    try:
        cred = credentials.Certificate(SERVICE_ACCOUNT_KEY_PATH)
        app = firebase_admin.initialize_app(cred)
        print("Firebase Admin SDK initialized successfully.")
    except Exception as e:
        print(f"Error during Firebase Admin SDK initialization: {e}")
        # It's crucial to handle this error, as your app can't write to Firestore without it.
        raise # Re-raise to stop the script if Firebase initialization fails

db = firestore.client(app=app) 


Firebase Admin SDK already initialized. Reusing existing app instance.


## setup spark

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lit, to_timestamp, from_unixtime
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType, DoubleType
import time
import logging
# ---------- CONFIG ----------
KAFKA_BOOTSTRAP = "kafka:29092"                  

CHECKPOINT_BASE = "/data/checkpoints"

# Configure logger
logger = logging.getLogger("DataCleaning")
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = logging.Formatter("[%(asctime)s] %(levelname)s - %(message)s")
handler.setFormatter(formatter)
if not logger.hasHandlers():
    logger.addHandler(handler)
# ---------- Spark session ----------
spark = (
    SparkSession.builder
    .appName("health-streams-to-firebase")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("ERROR")

# ---------- utils ----------
def read_topic(topic, schema):
    return (
        spark.readStream
             .format("kafka")
             .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP)
             .option("subscribe", topic)
             .option("startingOffsets", "latest")
             .load()
             .selectExpr("CAST(value AS STRING) as json_str")
             .select(from_json(col("json_str"), schema).alias("data"))   # <<< fixed
             .select("data.*")
    )


heart_rate_schema = StructType([
    StructField("user_id", StringType(), True),
    StructField("time", TimestampType(), True),
    StructField("heart_rate", IntegerType(), True)
])
steps_schema = StructType([
    StructField("user_id", StringType(), True),
    StructField("time", TimestampType(), True),
    StructField("steps", IntegerType(), True)
])
calories_schema = StructType([
    StructField("user_id", StringType(), True),
    StructField("time", TimestampType(), True),
    StructField("calories", DoubleType(), True)
])

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/06 09:36:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## cleaning functions

In [5]:
from pyspark.sql.functions import from_json, col
from pyspark.sql.functions import current_timestamp

## ---------- timestamp normalization & cleaning ----------
def normalize_timestamp(df):
    candidates = [c for c in ["timestamp", "ts", "ts_ms", "time", "created_at"] if c in df.columns]
    logger.info(f"Normalizing timestamp — candidates: {candidates}")

    if not candidates:
        logger.warning("No timestamp found — creating current-time column.")
        return df.withColumn("timestamp", current_timestamp())

    src = candidates[0]
    logger.info(f"Using '{src}' as timestamp source.")
    src_col = col(src)

    # Only convert numeric timestamps; keep native timestamps unchanged
    df = df.withColumn(
        "timestamp",
        when(src_col.cast(DoubleType()).isNotNull(), 
             to_timestamp(from_unixtime(
                 (src_col.cast(DoubleType()) / when(src_col.cast(DoubleType()) > 1e12, lit(1000.0)).otherwise(lit(1.0))).cast("long")
             ))
        ).otherwise(src_col.cast(TimestampType()))
    )

    return df.drop(src)
def clean_heart_rate(df):
    logger.info("Cleaning heart rate stream...")
    df = df.withColumn("heart_rate", col("heart_rate").cast(IntegerType()))
    df = normalize_timestamp(df)
    df = df.dropna(subset=["heart_rate", "timestamp"])
    df = df.filter((col("heart_rate") > 0) & (col("heart_rate") < 300))
    logger.info("Heart rate cleaning complete (streaming mode).")
    return df


def clean_calories(df):
    logger.info("Cleaning calories stream...")
    df = df.withColumn("calories", col("calories").cast(DoubleType()))
    df = normalize_timestamp(df)
    df = df.dropna(subset=["calories", "timestamp"])
    df = df.filter(col("calories") >= 0)
    logger.info("Calories cleaning complete (streaming mode).")
    return df


def clean_steps(df):
    logger.info("Cleaning steps stream...")
    df = df.withColumn("steps", col("steps").cast(IntegerType()))
    df = normalize_timestamp(df)
    df = df.dropna(subset=["steps", "timestamp"])
    df = df.filter(col("steps") >= 0)
    logger.info("Steps cleaning complete (streaming mode).")
    return df
    

  
# ---------- prepare streams ----------

heart_rate_df = read_topic("heartrate", heart_rate_schema)
calories_df = read_topic("calories", calories_schema)
steps_df = read_topic("steps", steps_schema)

hr_clean = clean_heart_rate(heart_rate_df)
cal_clean = clean_calories(calories_df)
st_clean = clean_steps(steps_df)


[2025-11-06 09:37:11,530] INFO - Cleaning heart rate stream...
[2025-11-06 09:37:11,572] INFO - Normalizing timestamp — candidates: ['time']
[2025-11-06 09:37:11,573] INFO - Using 'time' as timestamp source.
[2025-11-06 09:37:11,649] INFO - Heart rate cleaning complete (streaming mode).
[2025-11-06 09:37:11,650] INFO - Cleaning calories stream...
[2025-11-06 09:37:11,674] INFO - Normalizing timestamp — candidates: ['time']
[2025-11-06 09:37:11,675] INFO - Using 'time' as timestamp source.
[2025-11-06 09:37:11,740] INFO - Calories cleaning complete (streaming mode).
[2025-11-06 09:37:11,741] INFO - Cleaning steps stream...
[2025-11-06 09:37:11,758] INFO - Normalizing timestamp — candidates: ['time']
[2025-11-06 09:37:11,759] INFO - Using 'time' as timestamp source.
[2025-11-06 09:37:11,832] INFO - Steps cleaning complete (streaming mode).


## anomaly detection functions

In [6]:
import time
from datetime import datetime

def send_alert_to_firestore(user_id, alert_type, value, firestore_client,timestamp):
    """
    Write an anomaly alert to Firestore.
    Each alert becomes a document in the 'alerts' collection.

    Args:
        user_id (str): The ID of the affected user.
        alert_type (str): Type of the alert, e.g., 'heart_rate', 'calories', etc.
        firestore_client: Initialized Firestore client (google.cloud.firestore.Client).
    """
    try:
        alerts_ref = firestore_client.collection("alerts")
        alert_doc = {
            "user_id": user_id,
            "type": alert_type,
            "value": value,
            "detected_at": timestamp,
            "alert_created_at_unix": int(time.time()),
            "status":"waiting"
        }

        alerts_ref.add(alert_doc)
        print(f"[ok] Alert written for user={user_id}: {alert_type} — {value}")

    except Exception as e:
        print(f"[error] Failed to write alert for user={user_id}: {e}")
        
def detect_heart_rate_anomalies(df, firestore_client):
    anomalies = df.filter((col("heart_rate") > 180) | (col("heart_rate") < 45))
    if anomalies.rdd.isEmpty():
        return
    for row in anomalies.toLocalIterator():
        send_alert_to_firestore(
            user_id=row.user_id,
            alert_type="heart_rate",
            value=row.heart_rate,
            firestore_client=firestore_client,
            timestamp= row.timestamp
        )
        
def detect_low_calories(df, firestore_client):
    low_activity = df.filter(col("calories") < 0.5)  # e.g. less than 0.5 kcal/min or similar
    if low_activity.rdd.isEmpty():
        return
    for row in low_activity.toLocalIterator():
        send_alert_to_firestore(
            user_id=row.user_id,
            alert_type="low_calories",
            value=row.calories,
            firestore_client=firestore_client,
            timestamp= row.timestamp
        )
        
def detect_inactivity(df, firestore_client):
    inactive = df.filter(col("steps") == 0)
    if inactive.rdd.isEmpty():
        return
    for row in inactive.toLocalIterator():
        send_alert_to_firestore(
            user_id=row.user_id,
            alert_type="inactivity",
            value=0,
            firestore_client=firestore_client,
            timestamp= row.timestamp
        )


## write stream functions

In [7]:
import logging
import time
import datetime
import decimal
import json
from google.api_core.exceptions import GoogleAPICallError, RetryError

# logger (reuse your existing logger or this)
fs_logger = logging.getLogger("firestore-writer")
fs_logger.setLevel(logging.INFO)
if not fs_logger.hasHandlers():
    h = logging.StreamHandler()
    h.setFormatter(logging.Formatter("[%(levelname)s] %(asctime)s - %(message)s"))
    fs_logger.addHandler(h)


def _serialize_value(v):
    """Make values Firestore-safe: datetime -> ISO, Decimal -> float, fallback to str for non-JSON."""
    if v is None:
        return None
    if isinstance(v, (datetime.datetime, datetime.date)):
        return v.isoformat()
    if isinstance(v, decimal.Decimal):
        return float(v)
    # Bytes -> decode
    if isinstance(v, (bytes, bytearray)):
        try:
            return v.decode()
        except Exception:
            return str(v)
    # Test JSON-serializable quickly
    try:
        json.dumps(v)
        return v
    except Exception:
        return str(v)


def _commit_with_retries(batch_obj, max_retries=3, base_backoff=0.5):
    """Commit a Firestore batch with retries on transient errors."""
    attempt = 0
    while True:
        try:
            batch_obj.commit()
            return
        except (GoogleAPICallError, RetryError, IOError) as e:
            attempt += 1
            if attempt > max_retries:
                fs_logger.exception("Firestore commit failed after %d attempts", attempt - 1)
                raise
            backoff = base_backoff * (2 ** (attempt - 1))
            fs_logger.warning("Transient error committing firestore batch (attempt %d). Backing off %.2fs. Error: %s",
                              attempt, backoff, str(e))
            time.sleep(backoff)


def make_firestore_writer(collection_name, firestore_client, batch_size=500, max_retries=3):
    """
    Writes to Firestore under this structure:
      users/{user_id}/{collection_name}/{auto_doc_id}

    collection_name: e.g. "heart_rate", "calories", "steps"
    """
    def write_batch_to_firestore(batch_df, epoch_id):
        epoch_str = str(epoch_id) if epoch_id is not None else "(no-epoch)"
        try:
            rows = batch_df.count()
        except Exception as e:
            fs_logger.exception("[epoch %s] failed to count batch: %s", epoch_str, e)
            rows = None

        if not rows:
            fs_logger.info("[epoch %s] empty, skipping collection=%s", epoch_str, collection_name)
            return

        fs_logger.info("[epoch %s] writing %s rows to Firestore subcollections '%s'", epoch_str, rows, collection_name)

        docs_written = 0
        ops_in_current_batch = 0
        fs_batch = firestore_client.batch()

        try:
            for row in batch_df.toLocalIterator():
                data = row.asDict(recursive=True)
                user_id = data.get("user_id")

                if not user_id:
                    fs_logger.warning("[epoch %s] skipping row without user_id: %s", epoch_str, data)
                    continue

                # Serialize all values to Firestore-safe types
                for k, v in list(data.items()):
                    data[k] = _serialize_value(v)

                # Path: users/{user_id}/{collection_name}/{auto_doc_id}
                doc_ref = (
                    firestore_client.collection("users")
                    .document(str(user_id))
                    .collection(collection_name)
                    .document()
                )

                fs_batch.set(doc_ref, data)
                ops_in_current_batch += 1

                if ops_in_current_batch >= batch_size:
                    _commit_with_retries(fs_batch, max_retries=max_retries)
                    docs_written += ops_in_current_batch
                    ops_in_current_batch = 0
                    fs_batch = firestore_client.batch()

            if ops_in_current_batch > 0:
                _commit_with_retries(fs_batch, max_retries=max_retries)
                docs_written += ops_in_current_batch

            fs_logger.info("[epoch %s] wrote %d docs under users/*/%s/", epoch_str, docs_written, collection_name)

        except Exception as e:
            fs_logger.exception("[epoch %s] Firestore write failed: %s", epoch_str, e)
            raise

    return write_batch_to_firestore


## cleaning stream queries

In [10]:
# start streams separately with their own checkpoints
# Pass the initialized Firestore client 'db' to your writer functions
hr_query = (
    hr_clean
    .writeStream
    .foreachBatch(make_firestore_writer("heart_rate", db)) # Pass the Firestore client here
    .option("checkpointLocation", f"{CHECKPOINT_BASE}/heart_rate")
    .outputMode("append")
    .start()
)
cal_clean.writeStream.format("console").outputMode("append").start()

cal_query = (
    cal_clean
    .writeStream
    .foreachBatch(make_firestore_writer("calories", db)) # Pass the Firestore client here
    .option("checkpointLocation", f"{CHECKPOINT_BASE}/calories")
    .outputMode("append")
    .start()
)

st_query = (
    st_clean
    .writeStream
    .foreachBatch(make_firestore_writer("steps", db)) # Pass the Firestore client here
    .option("checkpointLocation", f"{CHECKPOINT_BASE}/steps")
    .outputMode("append")
    .start()
)
# Start streams for each detection type
hr_alerts = (
    hr_clean.writeStream
    .foreachBatch(lambda batch_df, epoch_id: detect_heart_rate_anomalies(batch_df, db))
    .option("checkpointLocation", f"{CHECKPOINT_BASE}/alerts_hr")
    .start()
)

cal_alerts = (
    cal_clean.writeStream
    .foreachBatch(lambda batch_df, epoch_id: detect_low_calories(batch_df, db))
    .option("checkpointLocation", f"{CHECKPOINT_BASE}/alerts_calories")
    .start()
)

st_alerts = (
    st_clean.writeStream
    .foreachBatch(lambda batch_df, epoch_id: detect_inactivity(batch_df, db))
    .option("checkpointLocation", f"{CHECKPOINT_BASE}/alerts_steps")
    .start()
)

spark.streams.awaitAnyTermination()


print("Started all streams. Waiting...")
spark.streams.awaitAnyTermination()

Started all streams. Waiting...
-------------------------------------------
Batch: 0
-------------------------------------------
+-------+--------+---------+
|user_id|calories|timestamp|
+-------+--------+---------+
+-------+--------+---------+



[INFO] 2025-11-06 09:38:29,947 - [epoch 2741] starting write: collection=heart_rate rows=1022
[INFO] 2025-11-06 09:38:30,164 - [epoch 55] starting write: collection=steps rows=17
[INFO] 2025-11-06 09:38:30,188 - [epoch 55] starting write: collection=calories rows=17
[INFO] 2025-11-06 09:38:31,501 - [epoch 55] wrote 17 documents to Firestore collection=steps
[INFO] 2025-11-06 09:38:31,630 - [epoch 55] wrote 17 documents to Firestore collection=calories


[ok] Alert written for user=4702921684: inactivity — 0
[ok] Alert written for user=6290855005: inactivity — 0
[ok] Alert written for user=5553957443: inactivity — 0
[ok] Alert written for user=8583815059: inactivity — 0
[ok] Alert written for user=5577150313: inactivity — 0
[ok] Alert written for user=1503960366: inactivity — 0
[ok] Alert written for user=8378563200: inactivity — 0
[ok] Alert written for user=1644430081: inactivity — 0


[INFO] 2025-11-06 09:38:33,095 - [epoch 2741] wrote 1022 documents to Firestore collection=heart_rate


[ok] Alert written for user=1844505072: inactivity — 0
[ok] Alert written for user=1927972279: inactivity — 0
[ok] Alert written for user=8053475328: inactivity — 0
[ok] Alert written for user=2022484408: inactivity — 0
[ok] Alert written for user=8877689391: inactivity — 0
[ok] Alert written for user=2026352035: inactivity — 0


[INFO] 2025-11-06 09:38:33,793 - [epoch 2742] starting write: collection=heart_rate rows=4


[ok] Alert written for user=7086361926: inactivity — 0
[ok] Alert written for user=2320127002: inactivity — 0
[ok] Alert written for user=2347167796: inactivity — 0


[INFO] 2025-11-06 09:38:34,525 - [epoch 2742] wrote 4 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:38:34,771 - [epoch 2743] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:38:34,959 - [epoch 2743] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:38:35,613 - [epoch 2744] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:38:35,920 - [epoch 2744] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:38:36,555 - [epoch 2745] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:38:36,940 - [epoch 2745] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:38:37,698 - [epoch 2746] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:38:37,992 - [epoch 2746] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:38:38,641 - [epoch 2747] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:38:38,932 - [epoch 2747] wrot

-------------------------------------------
Batch: 1
-------------------------------------------
+----------+----------------+-------------------+
|   user_id|        calories|          timestamp|
+----------+----------------+-------------------+
|2873212765|0.87830001115799|2016-04-12 00:00:00|
+----------+----------------+-------------------+



[INFO] 2025-11-06 09:39:01,076 - [epoch 2769] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:39:01,512 - [epoch 56] wrote 1 documents to Firestore collection=calories
[INFO] 2025-11-06 09:39:01,521 - [epoch 56] wrote 1 documents to Firestore collection=steps


[ok] Alert written for user=2873212765: inactivity — 0


[INFO] 2025-11-06 09:39:01,705 - [epoch 2770] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:39:02,074 - [epoch 2770] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:39:02,703 - [epoch 2771] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:39:03,081 - [epoch 2771] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:39:03,700 - [epoch 2772] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:39:04,091 - [epoch 2772] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:39:04,733 - [epoch 2773] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:39:05,091 - [epoch 2773] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:39:05,687 - [epoch 2774] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:39:06,101 - [epoch 2774] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:39:06,695 - [epoch 2775] star

-------------------------------------------
Batch: 2
-------------------------------------------
+----------+----------------+-------------------+
|   user_id|        calories|          timestamp|
+----------+----------------+-------------------+
|7007744171|1.08120000362396|2016-04-12 00:00:00|
+----------+----------------+-------------------+



[INFO] 2025-11-06 09:40:00,983 - [epoch 2829] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:40:01,311 - [epoch 57] wrote 1 documents to Firestore collection=calories
[INFO] 2025-11-06 09:40:01,337 - [epoch 57] wrote 1 documents to Firestore collection=steps
[INFO] 2025-11-06 09:40:01,388 - [epoch 2829] wrote 1 documents to Firestore collection=heart_rate


[ok] Alert written for user=7007744171: inactivity — 0


[INFO] 2025-11-06 09:40:01,964 - [epoch 2830] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:40:02,470 - [epoch 2830] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:40:03,040 - [epoch 2831] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:40:03,408 - [epoch 2831] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:40:03,978 - [epoch 2832] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:40:04,416 - [epoch 2832] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:40:04,991 - [epoch 2833] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:40:05,425 - [epoch 2833] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:40:05,994 - [epoch 2834] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:40:06,413 - [epoch 2834] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:40:06,985 - [epoch 2835] star

-------------------------------------------
Batch: 3
-------------------------------------------
+----------+----------------+-------------------+
|   user_id|        calories|          timestamp|
+----------+----------------+-------------------+
|3372868164|0.93529999256134|2016-04-12 00:00:00|
+----------+----------------+-------------------+



[INFO] 2025-11-06 09:41:01,308 - [epoch 2889] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:41:01,312 - [epoch 58] wrote 1 documents to Firestore collection=calories
[INFO] 2025-11-06 09:41:01,345 - [epoch 58] wrote 1 documents to Firestore collection=steps


[ok] Alert written for user=3372868164: inactivity — 0


[INFO] 2025-11-06 09:41:01,695 - [epoch 2889] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:41:02,267 - [epoch 2890] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:41:02,687 - [epoch 2890] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:41:03,259 - [epoch 2891] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:41:03,699 - [epoch 2891] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:41:04,271 - [epoch 2892] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:41:04,700 - [epoch 2892] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:41:05,307 - [epoch 2893] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:41:05,714 - [epoch 2893] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:41:06,286 - [epoch 2894] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:41:06,704 - [epoch 2894] wrot

-------------------------------------------
Batch: 4
-------------------------------------------
+----------+----------------+-------------------+
|   user_id|        calories|          timestamp|
+----------+----------------+-------------------+
|3977333714|8.08104991912842|2016-04-12 00:00:00|
+----------+----------------+-------------------+



[INFO] 2025-11-06 09:42:00,959 - [epoch 2948] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:42:01,285 - [epoch 59] wrote 1 documents to Firestore collection=steps
[INFO] 2025-11-06 09:42:01,307 - [epoch 59] wrote 1 documents to Firestore collection=calories
[INFO] 2025-11-06 09:42:01,524 - [epoch 2949] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:42:01,979 - [epoch 2949] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:42:02,545 - [epoch 2950] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:42:02,978 - [epoch 2950] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:42:03,547 - [epoch 2951] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:42:03,984 - [epoch 2951] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:42:04,550 - [epoch 2952] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:42:04,978 - [epoch 2952] wrote 1

-------------------------------------------
Batch: 5
-------------------------------------------
+----------+----------------+-------------------+
|   user_id|        calories|          timestamp|
+----------+----------------+-------------------+
|4020332650|1.37539994716644|2016-04-12 00:00:00|
+----------+----------------+-------------------+



[INFO] 2025-11-06 09:43:00,836 - [epoch 3008] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:43:01,260 - [epoch 60] wrote 1 documents to Firestore collection=calories
[INFO] 2025-11-06 09:43:01,275 - [epoch 3008] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:43:01,302 - [epoch 60] wrote 1 documents to Firestore collection=steps


[ok] Alert written for user=4020332650: inactivity — 0


[INFO] 2025-11-06 09:43:01,849 - [epoch 3009] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:43:02,259 - [epoch 3009] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:43:02,827 - [epoch 3010] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:43:03,280 - [epoch 3010] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:43:03,845 - [epoch 3011] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:43:04,281 - [epoch 3011] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:43:04,844 - [epoch 3012] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:43:05,293 - [epoch 3012] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:43:05,861 - [epoch 3013] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:43:06,284 - [epoch 3013] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:43:06,846 - [epoch 3014] star

-------------------------------------------
Batch: 6
-------------------------------------------
+----------+----------------+-------------------+
|   user_id|        calories|          timestamp|
+----------+----------------+-------------------+
|4057192912|1.23360002040863|2016-04-12 00:00:00|
+----------+----------------+-------------------+



[INFO] 2025-11-06 09:44:01,168 - [epoch 3068] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:44:01,280 - [epoch 61] wrote 1 documents to Firestore collection=calories
[INFO] 2025-11-06 09:44:01,313 - [epoch 61] wrote 1 documents to Firestore collection=steps


[ok] Alert written for user=4057192912: inactivity — 0


[INFO] 2025-11-06 09:44:01,583 - [epoch 3068] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:44:02,145 - [epoch 3069] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:44:02,597 - [epoch 3069] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:44:03,190 - [epoch 3070] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:44:03,595 - [epoch 3070] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:44:04,159 - [epoch 3071] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:44:04,598 - [epoch 3071] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:44:05,161 - [epoch 3072] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:44:05,599 - [epoch 3072] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:44:06,161 - [epoch 3073] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:44:06,630 - [epoch 3073] wrot

-------------------------------------------
Batch: 7
-------------------------------------------
+----------+----------------+-------------------+
|   user_id|        calories|          timestamp|
+----------+----------------+-------------------+
|4319703577|1.01110005378723|2016-04-12 00:00:00|
+----------+----------------+-------------------+



[INFO] 2025-11-06 09:45:00,860 - [epoch 3127] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:45:01,282 - [epoch 62] wrote 1 documents to Firestore collection=calories
[INFO] 2025-11-06 09:45:01,300 - [epoch 62] wrote 1 documents to Firestore collection=steps
[INFO] 2025-11-06 09:45:01,422 - [epoch 3128] starting write: collection=heart_rate rows=1


[ok] Alert written for user=4319703577: inactivity — 0


[INFO] 2025-11-06 09:45:01,881 - [epoch 3128] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:45:02,465 - [epoch 3129] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:45:02,871 - [epoch 3129] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:45:03,432 - [epoch 3130] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:45:03,890 - [epoch 3130] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:45:04,454 - [epoch 3131] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:45:04,898 - [epoch 3131] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:45:05,458 - [epoch 3132] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:45:05,885 - [epoch 3132] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:45:06,447 - [epoch 3133] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:45:06,896 - [epoch 3133] wrot

-------------------------------------------
Batch: 8
-------------------------------------------
+----------+-----------------+-------------------+
|   user_id|         calories|          timestamp|
+----------+-----------------+-------------------+
|6962181067|0.903999984264374|2016-04-12 00:00:00|
+----------+-----------------+-------------------+



[INFO] 2025-11-06 09:46:01,181 - [epoch 3187] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:46:01,312 - [epoch 63] wrote 1 documents to Firestore collection=steps
[INFO] 2025-11-06 09:46:01,314 - [epoch 63] wrote 1 documents to Firestore collection=calories


[ok] Alert written for user=6962181067: inactivity — 0


[INFO] 2025-11-06 09:46:01,743 - [epoch 3188] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:46:02,201 - [epoch 3188] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:46:02,760 - [epoch 3189] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:46:03,204 - [epoch 3189] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:46:03,765 - [epoch 3190] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:46:04,209 - [epoch 3190] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:46:04,771 - [epoch 3191] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:46:05,199 - [epoch 3191] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:46:05,762 - [epoch 3192] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:46:06,223 - [epoch 3192] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:46:06,792 - [epoch 3193] star

-------------------------------------------
Batch: 9
-------------------------------------------
+----------+----------------+-------------------+
|   user_id|        calories|          timestamp|
+----------+----------------+-------------------+
|8792009665|1.17270004749298|2016-04-12 00:00:00|
+----------+----------------+-------------------+



[INFO] 2025-11-06 09:47:01,059 - [epoch 3247] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:47:01,275 - [epoch 64] wrote 1 documents to Firestore collection=steps
[INFO] 2025-11-06 09:47:01,278 - [epoch 64] wrote 1 documents to Firestore collection=calories
[INFO] 2025-11-06 09:47:01,491 - [epoch 3247] wrote 1 documents to Firestore collection=heart_rate


[ok] Alert written for user=8792009665: inactivity — 0


[INFO] 2025-11-06 09:47:02,056 - [epoch 3248] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:47:02,502 - [epoch 3248] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:47:03,062 - [epoch 3249] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:47:03,485 - [epoch 3249] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:47:04,049 - [epoch 3250] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:47:04,501 - [epoch 3250] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:47:05,063 - [epoch 3251] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:47:05,509 - [epoch 3251] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:47:06,074 - [epoch 3252] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:47:06,505 - [epoch 3252] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:47:07,065 - [epoch 3253] star

-------------------------------------------
Batch: 10
-------------------------------------------
+----------+----------------+-------------------+
|   user_id|        calories|          timestamp|
+----------+----------------+-------------------+
|4388161847|1.30429995059967|2016-04-12 00:00:00|
+----------+----------------+-------------------+



[INFO] 2025-11-06 09:48:01,300 - [epoch 65] wrote 1 documents to Firestore collection=steps
[INFO] 2025-11-06 09:48:01,340 - [epoch 3307] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:48:01,361 - [epoch 65] wrote 1 documents to Firestore collection=calories


[ok] Alert written for user=4388161847: inactivity — 0


[INFO] 2025-11-06 09:48:01,791 - [epoch 3307] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:48:02,355 - [epoch 3308] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:48:02,788 - [epoch 3308] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:48:03,348 - [epoch 3309] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:48:03,792 - [epoch 3309] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:48:04,353 - [epoch 3310] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:48:04,800 - [epoch 3310] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:48:05,361 - [epoch 3311] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:48:05,799 - [epoch 3311] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:48:06,373 - [epoch 3312] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:48:06,810 - [epoch 3312] wrot

-------------------------------------------
Batch: 11
-------------------------------------------
+----------+----------------+-------------------+
|   user_id|        calories|          timestamp|
+----------+----------------+-------------------+
|6775888955|1.27849996089935|2016-04-12 00:00:00|
+----------+----------------+-------------------+



[INFO] 2025-11-06 09:49:01,081 - [epoch 3366] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:49:01,290 - [epoch 66] wrote 1 documents to Firestore collection=calories
[INFO] 2025-11-06 09:49:01,315 - [epoch 66] wrote 1 documents to Firestore collection=steps


[ok] Alert written for user=6775888955: inactivity — 0


[INFO] 2025-11-06 09:49:01,639 - [epoch 3367] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:49:02,094 - [epoch 3367] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:49:02,657 - [epoch 3368] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:49:03,090 - [epoch 3368] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:49:03,656 - [epoch 3369] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:49:04,094 - [epoch 3369] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:49:04,664 - [epoch 3370] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:49:05,097 - [epoch 3370] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:49:05,657 - [epoch 3371] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:49:06,119 - [epoch 3371] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:49:06,680 - [epoch 3372] star

-------------------------------------------
Batch: 12
-------------------------------------------
+----------+----------------+-------------------+
|   user_id|        calories|          timestamp|
+----------+----------------+-------------------+
|4445114986|1.14939999580383|2016-04-12 00:00:00|
+----------+----------------+-------------------+



[INFO] 2025-11-06 09:50:00,945 - [epoch 3426] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:50:01,314 - [epoch 67] wrote 1 documents to Firestore collection=steps
[INFO] 2025-11-06 09:50:01,342 - [epoch 67] wrote 1 documents to Firestore collection=calories
[INFO] 2025-11-06 09:50:01,407 - [epoch 3426] wrote 1 documents to Firestore collection=heart_rate


[ok] Alert written for user=4445114986: inactivity — 0


[INFO] 2025-11-06 09:50:01,979 - [epoch 3427] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:50:02,385 - [epoch 3427] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:50:02,945 - [epoch 3428] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:50:03,386 - [epoch 3428] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:50:03,945 - [epoch 3429] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:50:04,405 - [epoch 3429] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:50:04,965 - [epoch 3430] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:50:05,408 - [epoch 3430] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:50:05,967 - [epoch 3431] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:50:06,402 - [epoch 3431] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:50:06,964 - [epoch 3432] star

-------------------------------------------
Batch: 13
-------------------------------------------
+----------+-----------------+-------------------+
|   user_id|         calories|          timestamp|
+----------+-----------------+-------------------+
|4558609924|0.928099989891052|2016-04-12 00:00:00|
+----------+-----------------+-------------------+



[INFO] 2025-11-06 09:51:01,259 - [epoch 3486] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:51:01,292 - [epoch 68] wrote 1 documents to Firestore collection=steps
[INFO] 2025-11-06 09:51:01,311 - [epoch 68] wrote 1 documents to Firestore collection=calories


[ok] Alert written for user=4558609924: inactivity — 0


[INFO] 2025-11-06 09:51:01,705 - [epoch 3486] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:51:02,276 - [epoch 3487] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:51:02,718 - [epoch 3487] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:51:03,301 - [epoch 3488] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:51:03,710 - [epoch 3488] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:51:04,298 - [epoch 3489] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:51:04,718 - [epoch 3489] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:51:05,317 - [epoch 3490] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:51:05,726 - [epoch 3490] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:51:06,312 - [epoch 3491] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:51:06,733 - [epoch 3491] wrot

-------------------------------------------
Batch: 14
-------------------------------------------
+----------+----------------+-------------------+
|   user_id|        calories|          timestamp|
+----------+----------------+-------------------+
|4702921684|1.40079998970032|2016-04-12 00:00:00|
+----------+----------------+-------------------+



[INFO] 2025-11-06 09:52:01,022 - [epoch 3545] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:52:01,314 - [epoch 69] wrote 1 documents to Firestore collection=steps
[INFO] 2025-11-06 09:52:01,317 - [epoch 69] wrote 1 documents to Firestore collection=calories
                                                                                

[ok] Alert written for user=4702921684: inactivity — 0


[INFO] 2025-11-06 09:52:01,612 - [epoch 3546] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:52:02,012 - [epoch 3546] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:52:02,583 - [epoch 3547] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:52:03,029 - [epoch 3547] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:52:03,588 - [epoch 3548] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:52:04,025 - [epoch 3548] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:52:04,587 - [epoch 3549] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:52:05,030 - [epoch 3549] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:52:05,588 - [epoch 3550] starting write: collection=heart_rate rows=1
[INFO] 2025-11-06 09:52:06,029 - [epoch 3550] wrote 1 documents to Firestore collection=heart_rate
[INFO] 2025-11-06 09:52:06,593 - [epoch 3551] star

In [9]:
for q in spark.streams.active:
    q.stop()
print("✅ Stopped all active queries.")


✅ Stopped all active queries.
