# Spark Streaming

Real-time recommendation engine with:
- Session windows (a 7-day gap for the historical data, since we do not have minute-level or hourly data)
- Job A: Pair generation with Redis pipelining
- Job B: Similarity computation (background thread)
- Job C: Recommendation generation

In [None]:
import shutil
import os

CHECKPOINT_DIR = "../checkpoints/streaming"

# Start every run from an empty checkpoint directory so the streaming query
# (which uses CHECKPOINT_DIR as its checkpointLocation) does not resume from a
# previous run's state.
try:
    shutil.rmtree(CHECKPOINT_DIR)
except FileNotFoundError:
    pass  # first run: nothing to clear
else:
    print(f"Cleared: {CHECKPOINT_DIR}")
os.makedirs(CHECKPOINT_DIR, exist_ok=True)

## Configuration

In [None]:
import os
import time
import random
import math
import threading
from datetime import datetime
from dataclasses import dataclass
from collections import defaultdict

@dataclass
class Config:
    """Tunable settings for the streaming recommendation pipeline.

    Field order is part of the positional constructor signature; append new
    fields at the end.

    Raises:
        ValueError: if ``similarity_method`` is not one of the supported
            methods (the similarity job would otherwise silently treat any
            unknown value as PMI).
    """

    # kafka
    kafka_servers: str = "kafka-broker-1:19092,kafka-broker-2:19092,kafka-broker-3:19092"  # kafka.bootstrap.servers
    kafka_topic: str = "movielens-ratings"

    # spark
    trigger_interval: str = "10 seconds"  # processing-time trigger per micro-batch
    max_offsets: int = 500000  # maxOffsetsPerTrigger: cap on records read per micro-batch

    # streaming
    session_gap: str = "7 days"  # inactivity gap that closes a user's session window
    watermark_delay: str = "1 day"  # event-time watermark delay
    max_session_items: int = 50  # longer sessions are sampled down to this many events
    rating_threshold: float = 3.5  # ratings >= this count as positive

    # redis
    pipeline_batch: int = 5000  # commands buffered per Redis pipeline before execute()
    top_k_neighbors: int = 500  # co-occurring items scored per item by the similarity job
    top_k_recs: int = 10  # recommendations stored per user

    # similarity
    similarity_method: str = "conditional"  # conditional, jaccard, cosine, pmi

    def __post_init__(self):
        """Validate ``similarity_method`` so typos fail fast instead of meaning PMI."""
        supported = ("conditional", "jaccard", "cosine", "pmi")
        if self.similarity_method not in supported:
            raise ValueError(
                f"similarity_method must be one of {supported}, "
                f"got {self.similarity_method!r}"
            )

config = Config()

# Job B (similarity) runs on a background thread when enabled, waiting
# BACKGROUND_INTERVAL seconds between runs.
BACKGROUND_JOB_B = True
BACKGROUND_INTERVAL = 60

print(
    f"Kafka: {config.kafka_servers}",
    f"Session gap: {config.session_gap}, Max items: {config.max_session_items}",
    sep="\n",
)

## Redis Connection

In [None]:
import redis
from redis.cluster import RedisCluster, ClusterNode

def get_redis():
    """Return a connected Redis client, preferring the three-node cluster.

    First tries a RedisCluster seeded with redis-node-{1,2,3}:6379. If building
    it or its PING fails for any reason, falls back to a standalone client on
    redis-node-1:6379. Both clients decode responses to ``str``. Whatever the
    standalone PING raises propagates if the fallback also fails.
    """
    startup_nodes = [ClusterNode(f"redis-node-{idx}", 6379) for idx in (1, 2, 3)]
    try:
        client = RedisCluster(startup_nodes=startup_nodes, decode_responses=True)
        client.ping()
    except Exception as exc:
        print(f"Cluster failed ({exc}), trying standalone...")
        client = redis.Redis(host="redis-node-1", port=6379, decode_responses=True)
        client.ping()
        print("Connected to Redis (standalone)")
    else:
        print("Connected to Redis Cluster")
    return client

redis_conn = get_redis()

# Clear data left by previous runs. Deletes are queued on a non-transactional
# pipeline and flushed every `config.pipeline_batch` keys instead of paying one
# round trip per key.
_pending_deletes = 0
_clear_pipe = redis_conn.pipeline(transaction=False)
for pattern in ["user:*", "item:*", "metrics:*"]:
    for key in redis_conn.scan_iter(match=pattern, count=1000):
        _clear_pipe.delete(key)
        _pending_deletes += 1
        if _pending_deletes >= config.pipeline_batch:
            _clear_pipe.execute()
            _clear_pipe = redis_conn.pipeline(transaction=False)
            _pending_deletes = 0
if _pending_deletes:
    _clear_pipe.execute()
print("Redis cleared")

## Spark Session

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, LongType, StringType

# detect spark version for correct kafka package
import pyspark
spark_version = pyspark.__version__
# Spark 4.x is built only for Scala 2.13; Spark 3.x builds default to 2.12.
# Set SCALA_VERSION to override the guess (e.g. a Scala-2.13 build of Spark 3.x).
scala_version = os.getenv("SCALA_VERSION") or ("2.13" if spark_version.startswith("4.") else "2.12")
kafka_pkg = f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"

spark = (
    SparkSession.builder
    .appName("RecommendationEngine")
    .master(os.getenv("SPARK_MASTER_URL", "spark://spark-master:7077"))
    .config("spark.jars.packages", kafka_pkg)
    .config("spark.sql.shuffle.partitions", "24")
    # Ingest rate is limited by the Kafka source's maxOffsetsPerTrigger;
    # spark.streaming.backpressure.* only applies to the legacy DStream API.
    .config("spark.sql.adaptive.enabled", "false")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("WARN")
print(f"Spark {spark_version}, Scala {scala_version}")

# Schema of the JSON payload carried in each Kafka message's value.
schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("user_id", IntegerType()),
        ("movie_id", IntegerType()),
        ("rating", FloatType()),
        ("original_timestamp", LongType()),  # epoch seconds; used as event time
        ("event_timestamp", LongType()),
        ("event_id", StringType()),
    )
])

## Job A: Pair Generation

In [None]:
class JobA:
    """Session-based pair generation with pipelining.

    Each micro-batch row is one (user, session window) with its list of rating
    events. Positively rated events are put in chronological order, long
    sessions are sampled down, the movies are added to ``user:history:{uid}``,
    and every pair of *distinct* movies gets a proximity-weighted increment in
    both ``item:cooc:{m1}`` and ``item:cooc:{m2}``.
    """

    def __init__(self, redis_client, cfg):
        self.r = redis_client
        self.cfg = cfg
        self.batch_count = 0  # non-empty batches processed
        self.total_pairs = 0  # running sum of per-batch distinct pair counts

    def _sample_session(self, events):
        """Cap a time-ordered session at ``cfg.max_session_items`` events.

        Keeps up to ``min(10, max_items // 4)`` events from each end and a
        random, time-ordered sample of the middle. When the cap is below 4,
        nothing is pinned and the whole session is sampled.
        """
        max_items = self.cfg.max_session_items
        if len(events) <= max_items:
            return events

        keep = max(0, min(10, max_items // 4))
        # Explicit end index: events[-0:] would be the whole list when keep == 0.
        end = len(events) - keep
        first = events[:keep]
        middle = events[keep:end]
        last = events[end:]

        sample_size = max(0, max_items - 2 * keep)
        if len(middle) > sample_size:
            sampled = sorted(random.sample(middle, sample_size), key=lambda x: x['event_time'])
        else:
            sampled = middle

        return first + sampled + last

    @staticmethod
    def _accumulate_pairs(events, cooc_updates):
        """Add proximity-weighted counts for every pair of distinct movies in ``events``.

        Pairs are keyed as ``cooc_updates[smaller_id][larger_id]``.
        """
        n = len(events)
        for i in range(n):
            m1 = events[i]['movie_id']
            for j in range(i + 1, n):
                m2 = events[j]['movie_id']
                if m1 == m2:
                    # the same movie rated twice in one session is not a co-occurrence
                    continue
                weight = 1.0 / (1 + (j - i) * 0.1)  # closer in the session = higher weight
                lo, hi = (m1, m2) if m1 < m2 else (m2, m1)
                cooc_updates[lo][hi] += weight

    def _write_updates(self, user_updates, cooc_updates):
        """Send SADD/ZINCRBY commands through pipelines flushed every ``cfg.pipeline_batch`` commands.

        Returns:
            Number of distinct movie pairs written.
        """
        pipe = self.r.pipeline(transaction=False)
        pipe_count = 0
        pair_count = 0

        # user histories
        for uid, movies in user_updates.items():
            pipe.sadd(f"user:history:{uid}", *[str(m) for m in movies])
            pipe_count += 1
            if pipe_count >= self.cfg.pipeline_batch:
                pipe.execute()
                pipe = self.r.pipeline(transaction=False)
                pipe_count = 0

        # co-occurrence, stored in both directions so each item's neighbours
        # live under its own key
        for m1, neighbors in cooc_updates.items():
            for m2, weight in neighbors.items():
                pipe.zincrby(f"item:cooc:{m1}", weight, str(m2))
                pipe.zincrby(f"item:cooc:{m2}", weight, str(m1))
                pipe_count += 2
                pair_count += 1
                if pipe_count >= self.cfg.pipeline_batch:
                    pipe.execute()
                    pipe = self.r.pipeline(transaction=False)
                    pipe_count = 0

        if pipe_count > 0:
            pipe.execute()
        return pair_count

    def process(self, batch_df, batch_id):
        """Process a batch of sessions.

        Args:
            batch_df: micro-batch DataFrame with ``user_id`` and ``events``
                (a list of ``movie_id``/``rating``/``event_time`` structs).
            batch_id: Spark micro-batch id; used only in the log line.

        Returns:
            The user ids whose history was updated, or None for an empty batch.
        """
        start = time.time()
        rows = batch_df.collect()

        if not rows:
            return None

        threshold = self.cfg.rating_threshold
        user_updates = defaultdict(set)
        cooc_updates = defaultdict(lambda: defaultdict(float))

        for row in rows:
            # collect_list gives no ordering guarantee, but sampling and the
            # proximity weighting both assume chronological order.
            positive = sorted(
                (e for e in (row['events'] or []) if e['rating'] >= threshold),
                key=lambda e: e['event_time'],
            )
            if len(positive) < 2:
                continue

            positive = self._sample_session(positive)
            user_updates[row['user_id']].update(e['movie_id'] for e in positive)
            self._accumulate_pairs(positive, cooc_updates)

        pair_count = self._write_updates(user_updates, cooc_updates)

        elapsed = time.time() - start
        self.batch_count += 1
        self.total_pairs += pair_count

        # metrics
        self.r.hset("metrics:job_a", mapping={
            "batch_count": self.batch_count,
            "total_pairs": self.total_pairs,
            "last_sessions": len(rows),
            "last_time_ms": int(elapsed * 1000)
        })

        print(f"[JobA] batch={batch_id} sessions={len(rows)} pairs={pair_count} time={elapsed:.1f}s")
        return list(user_updates.keys())

## Job B: Similarity Computation

In [None]:
class JobB:
    """Compute item similarity from co-occurrence.

    For every ``item:cooc:{item}`` sorted set, scores its top
    ``cfg.top_k_neighbors`` neighbours with ``cfg.similarity_method`` and
    replaces ``item:sim:{item}`` with the result.
    """

    def __init__(self, redis_client, cfg):
        self.r = redis_client
        self.cfg = cfg
        self.run_count = 0

    def _item_totals(self, items):
        """Return ``{item: sum of its co-occurrence weights}`` using batched pipelines.

        Items whose sorted set is empty or missing are omitted.
        """
        totals = {}
        step = max(1, self.cfg.pipeline_batch)
        for offset in range(0, len(items), step):
            chunk = items[offset:offset + step]
            pipe = self.r.pipeline(transaction=False)
            for item in chunk:
                pipe.zrange(f"item:cooc:{item}", 0, -1, withscores=True)
            for item, members in zip(chunk, pipe.execute()):
                if members:
                    totals[item] = sum(float(score) for _, score in members)
        return totals

    @staticmethod
    def _score(method, cooc, count_i, count_j, total):
        """Similarity for one neighbour pair.

        ``count_i``/``count_j`` are the two items' total co-occurrence weights and
        ``total`` is the sum over all items (used by PMI only). Any method other
        than conditional/jaccard/cosine is treated as PMI.
        """
        if method == "conditional":
            return cooc / count_i if count_i > 0 else 0
        if method == "jaccard":
            union = count_i + count_j - cooc
            return cooc / union if union > 0 else 0
        if method == "cosine":
            return cooc / math.sqrt(count_i * count_j) if count_i > 0 and count_j > 0 else 0
        pmi = math.log((cooc * total) / (count_i * count_j + 1) + 1)
        return max(0, pmi)

    def compute(self):
        """Update similarity matrix for all items and record ``metrics:job_b``."""
        start = time.time()

        # get all items with co-occurrence data
        items = [k.split(":")[-1] for k in self.r.scan_iter(match="item:cooc:*", count=1000)]
        if not items:
            return

        item_counts = self._item_totals(items)
        # PMI normaliser is loop-invariant: compute it once, not once per pair.
        total = sum(item_counts.values()) or 1

        method = self.cfg.similarity_method
        items_processed = 0

        for item in items:
            coocs = self.r.zrevrange(f"item:cooc:{item}", 0, self.cfg.top_k_neighbors - 1, withscores=True)
            if not coocs:
                continue

            count_i = item_counts.get(item, 1)
            similarities = {
                other: self._score(method, cooc, count_i, item_counts.get(other, 1), total)
                for other, cooc in coocs
            }

            # delete + zadd in one round trip, shrinking the window in which a
            # reader sees no item:sim key (the pipeline is not transactional).
            pipe = self.r.pipeline(transaction=False)
            pipe.delete(f"item:sim:{item}")
            pipe.zadd(f"item:sim:{item}", similarities)
            pipe.execute()
            items_processed += 1

        elapsed = time.time() - start
        self.run_count += 1

        self.r.hset("metrics:job_b", mapping={
            "run_count": self.run_count,
            "items_processed": items_processed,
            "last_time_ms": int(elapsed * 1000),
            "method": method
        })

        print(f"[JobB] items={items_processed} method={method} time={elapsed:.1f}s")

## Job C: Recommendations

In [None]:
class JobC:
    """Generate recommendations from similarity matrix."""

    def __init__(self, redis_client, cfg):
        self.r = redis_client
        self.cfg = cfg
        self.run_count = 0

    def generate(self, user_ids=None):
        """Rebuild ``user:recs:{uid}`` for the given users (default: every user with history).

        For each movie in ``user:history:{uid}`` the highest-scoring entries of
        ``item:sim:{movie}`` are read. Their scores are summed per candidate not
        already in the history, and the ``cfg.top_k_recs`` best candidates
        replace the user's stored recommendations. Records ``metrics:job_c``.
        """
        start = time.time()

        # get users to process
        if user_ids is None:
            user_ids = [k.split(":")[-1] for k in self.r.scan_iter(match="user:history:*", count=1000)]

        if not user_ids:
            return

        users_updated = 0

        for uid in user_ids:
            history = self.r.smembers(f"user:history:{uid}")
            if not history:
                continue

            # Fetch every neighbour list in one round trip instead of one per movie.
            # ZREVRANGE's stop index is inclusive: 0..50 yields up to 51 neighbours.
            pipe = self.r.pipeline(transaction=False)
            for movie in history:
                pipe.zrevrange(f"item:sim:{movie}", 0, 50, withscores=True)

            scores = defaultdict(float)
            for sims in pipe.execute():
                for other, score in sims:
                    if other not in history:
                        scores[other] += score

            top = sorted(scores.items(), key=lambda x: x[1], reverse=True)[:self.cfg.top_k_recs]
            if not top:
                # nothing to store (no candidates, or top_k_recs <= 0); ZADD
                # with an empty mapping would be an error
                continue

            pipe = self.r.pipeline(transaction=False)
            pipe.delete(f"user:recs:{uid}")
            pipe.zadd(f"user:recs:{uid}", dict(top))
            pipe.execute()
            users_updated += 1

        elapsed = time.time() - start
        self.run_count += 1

        self.r.hset("metrics:job_c", mapping={
            "run_count": self.run_count,
            "users_updated": users_updated,
            "last_time_ms": int(elapsed * 1000)
        })

        print(f"[JobC] users={users_updated} time={elapsed:.1f}s")

## Background Similarity Runner

In [None]:
class BackgroundRunner:
    """Run Job B in background thread.

    Calls ``job_b.compute()`` repeatedly on a daemon thread, waiting
    ``interval`` seconds between runs. Exceptions from ``compute()`` are
    printed and the loop continues.
    """

    def __init__(self, job_b, interval):
        self.job_b = job_b
        self.interval = interval  # seconds to wait between compute() runs
        self.running = False
        self.thread = None
        # An Event (rather than time.sleep) lets stop() interrupt the wait at once.
        self._stop_event = threading.Event()

    def _run(self):
        while self.running:
            try:
                self.job_b.compute()
            except Exception as e:
                print(f"[BackgroundRunner] error: {e}")
            if self._stop_event.wait(self.interval):
                break  # stop() was called during the wait

    def start(self):
        self._stop_event.clear()
        self.running = True
        self.thread = threading.Thread(target=self._run, daemon=True)
        self.thread.start()
        print(f"[BackgroundRunner] started (interval={self.interval}s)")

    def stop(self):
        """Signal the loop to exit and wait up to 5 s for a running compute() to finish."""
        self.running = False
        self._stop_event.set()
        if self.thread:
            self.thread.join(timeout=5)
        print("[BackgroundRunner] stopped")

## Create Jobs and Stream

In [None]:
from pyspark.sql.functions import col, from_json, session_window, collect_list, struct, from_unixtime, timestamp_seconds

# create jobs
job_a = JobA(redis_conn, config)
job_b = JobB(redis_conn, config)
job_c = JobC(redis_conn, config)

# background runner (only when BACKGROUND_JOB_B is enabled)
background_runner = BackgroundRunner(job_b, BACKGROUND_INTERVAL) if BACKGROUND_JOB_B else None

# kafka stream; startingOffsets only applies when the checkpoint is empty
raw_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", config.kafka_servers)
    .option("subscribe", config.kafka_topic)
    .option("startingOffsets", "earliest")
    .option("maxOffsetsPerTrigger", config.max_offsets)
    .option("failOnDataLoss", "false")
    .load()
)

# parse events
# original_timestamp is epoch seconds. timestamp_seconds converts it directly,
# avoiding a format-to-local-time-string-and-parse-back round trip that is
# ambiguous during DST fall-back hours in the session time zone.
parsed = (
    raw_stream
    .select(from_json(col("value").cast("string"), schema).alias("e"))
    .select(
        col("e.user_id").alias("user_id"),
        col("e.movie_id").alias("movie_id"),
        col("e.rating").alias("rating"),
        timestamp_seconds(col("e.original_timestamp")).alias("event_time")
    )
    .filter(col("user_id").isNotNull())
)

# session windows: one row per (user, session) with that session's events.
# collect_list does not guarantee element order; consumers must sort.
sessionized = (
    parsed
    .withWatermark("event_time", config.watermark_delay)
    .groupBy(
        col("user_id"),
        session_window(col("event_time"), config.session_gap)
    )
    .agg(
        collect_list(
            struct("movie_id", "rating", "event_time")
        ).alias("events")
    )
)

print("Stream configuration Done - Ready to process")

## Pipeline Processor

In [None]:
def process_batch(batch_df, batch_id):
    """foreachBatch callback: run Job A, then Job C for the users it touched.

    Job B (similarity) is not run here; it runs on the background runner when
    enabled.

    Errors are logged with a full traceback and then swallowed, so one bad
    micro-batch does not stop the streaming query. Because the callback returns
    normally, Spark treats the batch as processed, and any Redis writes made
    before the failure remain applied.
    """
    import traceback

    try:
        # job A: pairs
        affected_users = job_a.process(batch_df, batch_id)

        # job C: recommendations (only for affected users)
        if affected_users:
            job_c.generate(affected_users)
    except Exception as e:
        print(f"[Pipeline] batch {batch_id} error: {e}")
        traceback.print_exc()

## Start Streaming

In [None]:
# Launch Job B's background loop (if configured) before the stream starts.
if background_runner is not None:
    background_runner.start()

# Wire the sessionized stream into the Job A -> Job C callback and start it.
writer = (
    sessionized.writeStream
    .queryName("recommendation_pipeline")
    .option("checkpointLocation", f"{CHECKPOINT_DIR}/pipeline")
    .trigger(processingTime=config.trigger_interval)
    .foreachBatch(process_batch)
)
query = writer.start()

print(f"Streaming started: {query.name}")
print(f"Active: {query.isActive}")

## Monitor Progress

In [None]:
from IPython.display import clear_output

def monitor(interval=10, iterations=None):
    """Print a refreshing status view while the streaming query is active.

    Args:
        interval: seconds to sleep between refreshes.
        iterations: stop after this many refreshes; None (or 0) keeps going
            until the query stops.
    """
    key_patterns = (("cooc", "item:cooc:*"), ("sim", "item:sim:*"), ("recs", "user:recs:*"))
    shown = 0
    while query.isActive:
        clear_output(wait=True)

        stamp = datetime.now().strftime('%H:%M:%S')
        print(f"=== Pipeline Status ({stamp}) ===")
        print(f"Query active: {query.isActive}")

        # per-job metrics hashes written by the jobs
        for job_name in ("job_a", "job_b", "job_c"):
            metrics = redis_conn.hgetall(f"metrics:{job_name}")
            if metrics:
                print(f"\n{job_name.upper()}: {dict(metrics)}")

        # key counts per namespace
        counts = {
            label: sum(1 for _ in redis_conn.scan_iter(match=pattern, count=100))
            for label, pattern in key_patterns
        }
        print(f"\nKeys: cooc={counts['cooc']} sim={counts['sim']} recs={counts['recs']}")

        shown += 1
        if iterations and shown >= iterations:
            break
        time.sleep(interval)

# monitor realtime
# monitor(interval=10, iterations=30)

In [None]:
# wait for streaming to complete (or interrupt manually)
# Blocks this cell until the streaming query terminates. Interrupting the
# kernel raises KeyboardInterrupt here, which only ends the wait; presumably the
# query itself keeps running until query.stop() is called (verify).
try:
    query.awaitTermination()
except KeyboardInterrupt:
    print("Interrupted")

## Check Results

In [None]:
# Count keys per Redis namespace to confirm each job produced output.
cooc_keys, sim_keys, history_keys, rec_keys = (
    list(redis_conn.scan_iter(match=f"{prefix}:*", count=1000))
    for prefix in ("item:cooc", "item:sim", "user:history", "user:recs")
)

for label, keys in (
    ("Co-occurrence keys", cooc_keys),
    ("Similarity keys", sim_keys),
    ("User history keys", history_keys),
    ("Recommendation keys", rec_keys),
):
    print(f"{label}: {len(keys)}")

In [None]:
# Spot-check: print up to 10 stored recommendations for the first user found.
if rec_keys:
    first_key = rec_keys[0]
    user_part = first_key.rpartition(":")[2]
    top_recs = redis_conn.zrevrange(first_key, 0, 9, withscores=True)
    print(f"\nSample recommendations for user {user_part}:")
    for movie_id, rec_score in top_recs:
        print(f"  Movie {movie_id}: {rec_score:.3f}")

## Stop Streaming

In [None]:
# stop when done
# Stop the streaming query, then the background Job B thread. Only ordinary
# exceptions are caught (and reported), so Ctrl-C still works, and a failure
# to stop the query never prevents the background runner from being stopped.
try:
    query.stop()
    print("Query stopped")
except Exception as e:  # e.g. NameError if the query was never started
    print(f"Query stop failed: {e}")

if background_runner:
    background_runner.stop()
