## Real-Time Streaming Data Pipeline (Kafka → Spark → Cassandra)

This notebook walks through an end-to-end real-time data engineering pipeline:

1. Generate mock user data from an API (Random User)
2. Publish events to **Kafka** (topic: `users_created`)
3. Process events with **Spark Structured Streaming**
4. Store results in **Cassandra** (keyspace: `spark_streams`, table: `created_users`)
5. Validate and visualize the stored data


### Project goal
Build a small, reproducible streaming pipeline you can run locally with Docker.

- **Ingestion:** Kafka
- **Processing:** Spark Structured Streaming
- **Storage:** Cassandra (with TTL to avoid unlimited growth)
- **Orchestration / scheduling (optional):** Airflow


### System architecture

`Producer (Python) → Kafka → Spark Streaming → Cassandra → Analytics / Visualization`

Key components:
- **Python Producer:** generates events and sends them to Kafka
- **Kafka:** message broker / event log
- **Spark:** reads Kafka stream, parses JSON, applies basic quality checks, writes to Cassandra
- **Cassandra:** scalable NoSQL storage (table has **TTL** so old rows expire automatically)

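The TTL is set with `default_time_to_live = 1200` in `create_table` later in this notebook, so each row stops being returned 1200 seconds (20 minutes) after it is written. The sketch below models only that timing. `expires_at` and `is_live` are hypothetical helpers, not Cassandra or driver APIs. Cassandra itself tracks expiry per cell.

```python
# Hypothetical helpers that model Cassandra's table-level TTL timing (not a driver API).
# Assumption: default_time_to_live = 1200 seconds, as in create_table().
DEFAULT_TTL_SECONDS = 1200


def expires_at(write_time: float, ttl: int = DEFAULT_TTL_SECONDS) -> float:
    """Epoch second after which a cell written at `write_time` is no longer returned."""
    return write_time + ttl


def is_live(write_time: float, now: float, ttl: int = DEFAULT_TTL_SECONDS) -> bool:
    """True while the cell is still visible to reads."""
    return now < expires_at(write_time, ttl)


written = 1_700_000_000.0  # example write time in epoch seconds
print(is_live(written, written + 600))   # 10 minutes after the write
print(is_live(written, written + 1500))  # 25 minutes after the write
```

Re-writing a row with the same `id` (an upsert) starts a fresh TTL for the cells that write touches.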

### Technology stack
- **Orchestration:** Apache Airflow
- **Streaming:** Apache Kafka + Confluent Platform
- **Processing:** Apache Spark (Structured Streaming)
- **Storage:** Cassandra
- **Containerization:** Docker Compose
- **Language:** Python


### Notes
- This notebook contains both **learning notes** and **pipeline code**.
- Code cells use container hostnames inside Docker (e.g., `broker:29092`, `cassandra:9042`).


## Quick Kafka primer (very short)
- **Producer** publishes messages to a **topic**
- **Consumer** reads messages from a topic
- Messages are ordered per partition and tracked using **offsets**

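The following toy in-memory model makes per-partition ordering and offsets concrete. It assumes messages are routed by hashing their key. Kafka's default partitioner hashes keys with murmur2, and this sketch uses `zlib.crc32` only as a stand-in. `ToyTopic` is a hypothetical class, not part of any Kafka client.

```python
import zlib


class ToyTopic:
    """Hypothetical in-memory topic: each partition is an append-only list."""

    def __init__(self, num_partitions: int):
        self.partitions = [[] for _ in range(num_partitions)]

    def send(self, key: str, value: str) -> tuple:
        # Same key -> same partition (crc32 is only a stand-in for Kafka's murmur2).
        p = zlib.crc32(key.encode("utf-8")) % len(self.partitions)
        self.partitions[p].append(value)
        return p, len(self.partitions[p]) - 1  # offsets start at 0 in each partition

    def read(self, partition: int, from_offset: int) -> list:
        # A consumer resumes from the next offset it has not processed yet.
        return self.partitions[partition][from_offset:]


topic = ToyTopic(num_partitions=3)
for i in range(6):
    p, offset = topic.send("user-42", f"event-{i}")
print(p, offset)         # partition chosen for "user-42" and the last offset (5)
print(topic.read(p, 4))  # resume from offset 4
```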

### Kafka concepts
- **Topic:** stream/category of events (like a channel)
- **Partition:** parallelism & ordering unit
- **Broker:** Kafka server
- **Consumer group:** multiple consumers sharing the load

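Within a consumer group, one consumer at a time reads each partition, so any consumers beyond the partition count sit idle. The sketch below shows a simplified round-robin assignment. Kafka ships several real assignors (range, round-robin, cooperative-sticky). `assign_round_robin` is a hypothetical helper and is not one of them.

```python
def assign_round_robin(partitions: list, consumers: list) -> dict:
    """Hypothetical helper: deal partitions out to group members like cards."""
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(sorted(partitions)):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment


print(assign_round_robin(list(range(6)), ["c1", "c2"]))
print(assign_round_robin(list(range(3)), ["c1", "c2", "c3", "c4"]))  # c4 gets nothing
```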

### Kafka data flow
1. Producer sends events to a topic (`users_created`)
2. Kafka stores events durably
3. Spark consumes the topic continuously
4. Spark writes processed rows to Cassandra


## Getting started
Make sure Docker containers are running (`docker compose up -d`).


### Step 0: Check versions inside containers (optional)
The next cell prints Python and Spark versions.


Check Python and Spark versions inside the Docker container

In [None]:
!python3 --version
!spark-submit --version

##### 1. Create mock streaming data from an API

Fetch data from an API

In [None]:
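import requests

def get_data():
    # Fetch one random user from the Random User API
    res = requests.get("https://randomuser.me/api/", timeout=10)
    res.raise_for_status()   # fail loudly on HTTP errors
    res = res.json()         # parse the JSON body into a dict
    res = res['results'][0]  # the API wraps users in a "results" list

    return res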


In [None]:
import json

print(json.dumps(get_data(), indent=3))

Format the data as needed

In [None]:
import uuid

def format_data(res):
    # Flatten the nested API response into the columns stored in Cassandra
    data = {}
    location = res['location']
    data['id'] = str(uuid.uuid4())
    data['first_name'] = res['name']['first']
    data['last_name'] = res['name']['last']
    data['gender'] = res['gender']
    data['address'] = f"{str(location['street']['number'])} {location['street']['name']}, " \
                      f"{location['city']}, {location['state']}, {location['country']}"
    data['post_code'] = str(location['postcode'])  # the API may return int or str; the column is TEXT
    data['email'] = res['email']
    data['username'] = res['login']['username']
    data['dob'] = res['dob']['date']
    data['registered_date'] = res['registered']['date']
    data['phone'] = res['phone']
    data['picture'] = res['picture']['medium']

    print(data)

    return data

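The Random User API may return `postcode` as a number for some nationalities and as a string for others. Both the Spark schema and the Cassandra column treat `post_code` as text. The minimal sketch below covers the address and post-code formatting. It runs against a hand-written sample shaped like the API's `location` object, so it works offline. `format_address` and `SAMPLE_LOCATION` are hypothetical, and the values are invented.

```python
# Hypothetical, hand-written sample shaped like results[0]["location"] (values invented).
SAMPLE_LOCATION = {
    "street": {"number": 1234, "name": "Main Street"},
    "city": "Springfield",
    "state": "Oregon",
    "country": "United States",
    "postcode": 97477,  # numeric here; other nationalities may return a string
}


def format_address(location: dict) -> tuple:
    """Build the address text and a string post code from a 'location' dict."""
    street = location["street"]
    address = (f"{street['number']} {street['name']}, "
               f"{location['city']}, {location['state']}, {location['country']}")
    # Cast to str so the value always fits the StringType field / TEXT column.
    return address, str(location["postcode"])


address, post_code = format_address(SAMPLE_LOCATION)
print(address)
print(repr(post_code))
```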
In [None]:
format_data(get_data())

Mock API streaming data: send formatted users to Kafka for about 10 seconds

In [None]:
def stream_data():
    import json
    import logging
    import time
    from kafka import KafkaProducer

    # Connect to the Kafka broker via its in-network Docker hostname
    producer = KafkaProducer(bootstrap_servers=['broker:29092'], max_block_ms=5000)
    curr_time = time.time()  # start of the streaming window

    while True:
        if time.time() > curr_time + 10:  # stop after 10 seconds
            break
        try:
            res = get_data()
            res = format_data(res)

            print(res)
            # Publish the record as UTF-8 encoded JSON to the users_created topic
            producer.send('users_created', json.dumps(res).encode('utf-8'))
        except Exception as e:
            logging.error(f'An error occurred: {e}')
            continue

    producer.flush()  # deliver any buffered messages before returning

In [None]:
# Stream mock API data to Kafka (runs for about 10 seconds)
stream_data()

#### **ETL (Extract, Transform, Load) in real time**

Extract user events from Kafka, transform and validate them with Spark, and load them into Cassandra.

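On the wire, each Kafka message value contains only the UTF-8 bytes of the JSON document that the producer built with `json.dumps(res).encode('utf-8')`. Spark's `CAST(value AS STRING)` followed by `from_json(..., schema)` reverses that encoding. The plain-Python sketch below models the same contract. It uses an invented sample record, and the hypothetical `SCHEMA_FIELDS` list is copied from the Spark schema used in this notebook. Fields outside the schema (such as `dob`) are dropped, and missing fields become null.

```python
import json

# Field names declared in the Spark schema (dob is produced but not in the schema).
SCHEMA_FIELDS = ["id", "first_name", "last_name", "gender", "address", "post_code",
                 "email", "username", "registered_date", "phone", "picture"]


def encode(record: dict) -> bytes:
    """What the producer sends as the Kafka message value."""
    return json.dumps(record).encode("utf-8")


def decode(value: bytes) -> dict:
    """Rough analogue of CAST(value AS STRING) + from_json(schema) + select("data.*"):
    keep only schema fields; a missing field becomes None (null in Spark)."""
    parsed = json.loads(value.decode("utf-8"))
    return {field: parsed.get(field) for field in SCHEMA_FIELDS}


sample = {"id": "3f1c2b7e-9a4d-4e2f-8b1a-5c6d7e8f9a0b", "first_name": "Ada",
          "email": "ada@example.com", "dob": "1990-01-01T00:00:00.000Z"}  # invented
row = decode(encode(sample))
print(row["first_name"], row["phone"], "dob" in row)
```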
In [None]:
import logging
from cassandra.cluster import Cluster
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)


In [None]:
# Create (or reuse) a SparkSession with the Kafka and Cassandra connectors
def create_spark_connection():
    s_conn = None

    try:
        s_conn = (SparkSession.builder
            .appName('SparkDataStreaming')
            .master("spark://spark-master:7077")
            # Download the Cassandra and Kafka connector packages at startup
            .config('spark.jars.packages',
                    "com.datastax.spark:spark-cassandra-connector_2.12:3.5.0,"
                    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1")
            # Cassandra host as seen from inside the Docker network
            .config('spark.cassandra.connection.host', 'cassandra')
            .getOrCreate())

        s_conn.sparkContext.setLogLevel("ERROR")
        logging.info("Spark connection created successfully!")
    except Exception as e:
        logging.error(f"Couldn't create the Spark session due to exception {e}")

    return s_conn

In [None]:
# Kafka connection helper: a streaming DataFrame over the users_created topic
def connect_to_kafka(spark_conn):
    spark_df = None
    try:
        spark_df = (spark_conn.readStream
            .format('kafka')
            .option('kafka.bootstrap.servers', 'broker:29092')  # Kafka broker (Docker hostname)
            .option('subscribe', 'users_created')               # Kafka topic
            .option('startingOffsets', 'earliest')  # 'earliest' = from the beginning; 'latest' = only new messages
            .option("maxOffsetsPerTrigger", 100)    # at most 100 offsets per micro-batch
            .load())
        logging.info("Kafka DataFrame created successfully")
    except Exception as e:
        logging.warning(f"Kafka DataFrame could not be created because: {e}")

    return spark_df

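`startingOffsets` applies only when a query starts for the first time. A restarted query resumes from its checkpoint. `maxOffsetsPerTrigger` caps how many offsets a single micro-batch reads across all partitions. The sketch below shows how a backlog would be split into batches. It assumes a single partition and uses a hypothetical `plan_batches` helper. Spark's real planner also splits the cap proportionally across partitions.

```python
def plan_batches(start: int, end: int, max_per_trigger: int) -> list:
    """Hypothetical helper: split offsets [start, end) into half-open micro-batch ranges."""
    batches = []
    while start < end:
        stop = min(start + max_per_trigger, end)
        batches.append((start, stop))
        start = stop
    return batches


# Assumed example: 250 messages already in the partition (offsets 0..249).
log_end = 250
earliest = plan_batches(0, log_end, 100)      # startingOffsets = "earliest"
latest = plan_batches(log_end, log_end, 100)  # "latest": only messages arriving later
print(earliest)
print(latest)
```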
In [None]:
# Cassandra connection helper
def create_cassandra_connection():
    try:
        # Connect to the Cassandra cluster using its Docker hostname
        cluster = Cluster(contact_points=['cassandra'], port=9042)
        cas_session = cluster.connect()  # session without a default keyspace

        logging.info("Connected to Cassandra!")
        return cas_session
    except Exception as e:
        logging.error(f"Could not create Cassandra connection due to {e}")
        return None

In [None]:
def create_keyspace(session):
    # A keyspace is Cassandra's equivalent of a database/schema
    session.execute("""
        CREATE KEYSPACE IF NOT EXISTS spark_streams
        WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
    """)

    print("Keyspace created successfully!")

In [None]:
def create_table(session):
    # Create the target table; rows expire 1200 s after being written (default_time_to_live)
    session.execute("""
    CREATE TABLE IF NOT EXISTS spark_streams.created_users (
        id UUID PRIMARY KEY,
        first_name TEXT,
        last_name TEXT,
        gender TEXT,
        address TEXT,
        post_code TEXT,
        email TEXT,
        username TEXT,
        registered_date TEXT,
        phone TEXT,
        picture TEXT) WITH default_time_to_live = 1200;
    """)

    print("Table created successfully!")

In [None]:
# Spark Data Quality Checks
def create_selection_df_from_kafka(spark_df):
    """
    Parse the raw Kafka stream into a typed Spark DataFrame and apply data quality checks:
    - Schema enforcement
    - Missing value check
    - Business rule validation (regex, allowed values)
    - Deduplication
    - (Optional, not implemented here) send records that fail validation to a dead-letter topic

    Args:
        spark_df: input streaming DataFrame read from Kafka

    Returns:
        validated_df: validated streaming Spark DataFrame
    """
    # -------------------- 1. Define Schema --------------------
    # Define the input schema (field names match the Cassandra table columns)
    schema = StructType([
        StructField("id", StringType(), False),
        StructField("first_name", StringType(), False),
        StructField("last_name", StringType(), False),
        StructField("gender", StringType(), False),
        StructField("address", StringType(), False),
        StructField("post_code", StringType(), False),
        StructField("email", StringType(), False),
        StructField("username", StringType(), False),
        StructField("registered_date", StringType(), False),
        StructField("phone", StringType(), False),
        StructField("picture", StringType(), False)
    ])

    # -------------------- 2. Parse JSON and Enforce Schema --------------------
    # Cast the Kafka value bytes to a string, parse the JSON with the schema, flatten
    df = (spark_df.selectExpr("CAST(value AS STRING)")
          .select(from_json(col('value'), schema).alias('data'))
          .select("data.*"))
    logging.info("Schema applied and JSON parsed.")

    # -------------------- 3. Missing Value Check --------------------
    df_non_null = df.dropna(subset=["id", "email", "registered_date"])  # drop rows missing key fields
    logging.info("Dropped records with null id, email, or registered_date.")

    # -------------------- 4. Business Rule Validation --------------------
    df_valid = df_non_null.filter(
        col("email").rlike(r".+@.+\..+") &       # basic email pattern
        col("gender").isin("male", "female") &   # allowed gender values
        col("id").rlike("^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$")  # UUID format
    )
    logging.info("Applied business rule validation.")

    # -------------------- 5. Deduplicate --------------------
    df_deduped = df_valid.dropDuplicates(["id"])  # keep one row per id
    logging.info("Deduplicated based on id.")

    return df_deduped

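These rules are plain regexes and set membership, so you can unit-test them without a Spark cluster. The minimal pure-Python mirror below reuses the patterns from `create_selection_df_from_kafka`. `validate_records` is hypothetical, and the sample records are invented. Spark's `rlike` matches anywhere in the string, so the sketch uses `re.search` to get the same behaviour.

```python
import re

EMAIL_RE = re.compile(r".+@.+\..+")
UUID_RE = re.compile(r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$")
ALLOWED_GENDERS = {"male", "female"}
REQUIRED = ("id", "email", "registered_date")


def validate_records(records: list) -> list:
    """Hypothetical mirror of the Spark filters, applied in the same order."""
    seen, valid = set(), []
    for r in records:
        if any(r.get(k) is None for k in REQUIRED):        # dropna(subset=...)
            continue
        if not (EMAIL_RE.search(r["email"])
                and r.get("gender") in ALLOWED_GENDERS
                and UUID_RE.search(r["id"])):              # business rules
            continue
        if r["id"] in seen:                                 # dropDuplicates(["id"]);
            continue                                        # keeps the first occurrence here
        seen.add(r["id"])
        valid.append(r)
    return valid


uid = "3f1c2b7e-9a4d-4e2f-8b1a-5c6d7e8f9a0b"  # invented UUID
good = {"id": uid, "email": "ada@example.com", "gender": "female",
        "registered_date": "2015-05-01T10:00:00.000Z"}
records = [
    good,
    dict(good),                                  # duplicate id -> dropped
    {**good, "id": "not-a-uuid"},                # bad id format -> dropped
    {**good, "id": uid.upper(), "email": None},  # missing email -> dropped
]
print(len(validate_records(records)))
```

In a long-running stream, `dropDuplicates` without `withWatermark` keeps every `id` it has seen in state indefinitely. That is acceptable for this short demo run.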
In [None]:
# Create the Spark session
spark_conn = create_spark_connection()

if spark_conn is not None:
    # Connect to Kafka and build the validated streaming DataFrame
    spark_df = connect_to_kafka(spark_conn)
    selection_df = create_selection_df_from_kafka(spark_df)
    session = create_cassandra_connection()

    # Testing data flow on the console
    # query = (selection_df
    #          .writeStream
    #          .format("console")
    #          .outputMode("append")
    #          .start())
    # query.awaitTermination(30)  # let the query run for up to 30 s
    # query.stop()                # then stop the streaming query

    if session is not None:
        create_keyspace(session)  # create the spark_streams keyspace
        create_table(session)     # create spark_streams.created_users

        logging.info("Streaming is being started...")

        # Write the validated stream to Cassandra
        streaming_query = (selection_df.writeStream
                           .format("org.apache.spark.sql.cassandra")
                           .option('checkpointLocation', '/tmp/checkpoint')
                           .option('keyspace', 'spark_streams')
                           .option('table', 'created_users')
                           .option("spark.cassandra.connection.host", "cassandra")
                           .option("spark.cassandra.connection.port", "9042")
                           .option("spark.cassandra.connection.local_dc", "datacenter1")
                           .trigger(processingTime="10 seconds")  # one micro-batch every 10 s
                           .start())

        streaming_query.awaitTermination(30)  # run for up to 30 s
        streaming_query.stop()

## Visualization (Cassandra → Pandas → Charts)

The pipeline stores rows in Cassandra:
- keyspace: `spark_streams`
- table: `created_users`

The table has a **TTL** (`default_time_to_live = 1200`), so rows expire automatically 20 minutes after they are written.

Run the next cells to load recent rows into a DataFrame and visualize them.


In [None]:
# If you see missing packages, uncomment the next line:
# !pip -q install cassandra-driver pandas matplotlib plotly

import pandas as pd
from cassandra.cluster import Cluster

def connect_cassandra():
    """Return a Cassandra session connected to keyspace spark_streams.

    We try common hostnames used in Docker Compose:
    - cassandra_db (service name)
    - cassandra (container name / hostname)
    - 127.0.0.1 (if you exposed port 9042 to the host)
    """
    last_err = None
    for host in ["cassandra_db", "cassandra", "127.0.0.1"]:
        try:
            cluster = Cluster([host], port=9042)
            session = cluster.connect("spark_streams")
            print(f"Connected to Cassandra via host: {host}")
            return session
        except Exception as e:
            last_err = e

    raise RuntimeError(f"Could not connect to Cassandra on any host. Last error: {last_err}")

session = connect_cassandra()


In [None]:
# Load up to N rows from Cassandra
N = 2000
rows = session.execute(f"SELECT * FROM created_users LIMIT {N}")
df = pd.DataFrame(rows)

print("Rows loaded:", len(df))
df.head()


In [None]:
# Basic data quality checks

# 1) Missing values per column
missing = df.isna().sum().sort_values(ascending=False)
missing


In [None]:
# Parse the registered_date column (stored as TEXT) if present
# This enables time-based charts.

if "registered_date" in df.columns:
    df["registered_date"] = pd.to_datetime(df["registered_date"], errors="coerce")

df.dtypes


In [None]:
import matplotlib.pyplot as plt

# Chart 1: gender distribution (if present)
if "gender" in df.columns:
    ax = df["gender"].value_counts().plot(kind="bar")
    ax.set_title("Gender distribution (created_users)")
    ax.set_xlabel("gender")
    ax.set_ylabel("count")
    plt.show()
else:
    print("No 'gender' column found.")


In [None]:
# Chart 2: users per registration month (if registered_date parsed).
# registered_date comes from the API's random sign-up dates, not from when
# rows entered the pipeline, so it spans years; bin by month, not minute.
if "registered_date" in df.columns and df["registered_date"].notna().any():
    tmp = df.dropna(subset=["registered_date"]).set_index("registered_date").sort_index()
    ax = tmp.resample("MS").size().plot()
    ax.set_title("Users by registration month")
    ax.set_xlabel("registration month")
    ax.set_ylabel("users")
    plt.show()
else:
    print("registered_date not available or not parseable.")


In [None]:
# Chart 3: top email domains (quick, useful)
if "email" in df.columns:
    domains = df["email"].astype(str).str.split("@").str[-1]
    ax = domains.value_counts().head(10).plot(kind="bar")
    ax.set_title("Top 10 email domains")
    ax.set_xlabel("domain")
    ax.set_ylabel("count")
    plt.show()
else:
    print("No 'email' column found.")


### Optional: interactive charts (Plotly)
If you want nicer interactive visuals, run the cell below.


In [None]:
# Optional interactive chart
try:
    import plotly.express as px
    if "gender" in df.columns:
        fig = px.histogram(df, x="gender", title="Gender distribution (interactive)")
        fig.show()
    else:
        print("No 'gender' column found.")
except Exception as e:
    print("Plotly not available or failed to render:", e)
