- spike_flag: Easily filter/aggregate viral traffic events.
- product_id, sku: Enables product-level analytics and joins with other domains.
- cart_spike_magnitude: Quantifies demand surges for anomaly detection and dashboards.

In [1]:
# Welcome to your new notebook
# Clickstream Event Simulator Notebook

import json
import uuid
import random
import time
from datetime import datetime
import threading


In [2]:
!pip install azure-eventhub

Collecting azure-eventhub
  Downloading azure_eventhub-5.15.0-py3-none-any.whl.metadata (73 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m73.1/73.1 kB[0m [31m1.0 MB/s[0m eta [36m0:00:00[0m00:01[0m
Downloading azure_eventhub-5.15.0-py3-none-any.whl (327 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m327.8/327.8 kB[0m [31m5.1 MB/s[0m eta [36m0:00:00[0m:00:01[0m
[?25hInstalling collected packages: azure-eventhub
Successfully installed azure-eventhub-5.15.0


In [3]:
from azure.eventhub import EventHubProducerClient, EventData

# -- Configurable Variables --
EVENT_HUB_CONNECTION_STR = "Endpoint=sb://esehmtcy1fnna28n1janu7yk.servicebus.windows.net/;SharedAccessKeyName=key_fa045807-0fdc-4e3f-924c-7278d3b6f88c;SharedAccessKey=c3BFsqEJEODb1MFC9zL4IfHeQrH6VLgFy+AEhKzxLOY=;EntityPath=es_b3a98257-45d8-490a-a086-75712a99d0d5"
EVENT_HUB_NAME = "es_b3a98257-45d8-490a-a086-75712a99d0d5"
EVENTS_PER_SECOND = 10  # Adjust simulation load

USER_COUNTRIES = [
    ("DE", "Germany"), ("UK", "United Kingdom"), ("US", "United States"),
    ("FR", "France"), ("IT", "Italy"), ("ES", "Spain"),
    ("NL", "Netherlands"), ("IN", "India"), ("CN", "China")
]

PAGES = ["/", "/genz-pros", "/altars", "/colours", "/kids", "/cart", "/checkout"]
PRODUCT_IDS = [f"PROD{4000 + i}" for i in range(20)]
SKU = [f"SKU{4000 + i}" for i in range(20)]

EVENT_TYPES = [
    "page_view", "product_click", "add_to_cart", "remove_from_cart",
    "checkout_initiated", "purchase_completed", "account_created",
    "newsletter_subscribed", "newsletter_unsubscribed"
]

REFERRAL_TYPES = [
    "organic_search", "facebook", "instagram", "tiktok", "pinterest", "twitter", "direct", "other_social", "affiliate"
]



In [4]:
# --- Helper Functions ---
def random_ip():
    return ".".join(str(random.randint(1, 254)) for _ in range(4))

def now_utc():
    return datetime.utcnow().isoformat() + "Z"

def random_click_path():
    start_pages = ["/", "/genz-pros", "/altars", "/colours", "/kids"]
    num_steps = random.randint(2, 6)
    path = [random.choice(start_pages)]
    for _ in range(num_steps - 1):
        path.append(random.choice(PAGES))
    return path

def random_referral():
    ref = random.choice(REFERRAL_TYPES)
    if ref == "organic_search":
        return {"source_type": "search", "platform": "Google"}
    elif ref in {"facebook", "instagram", "tiktok", "pinterest", "twitter", "other_social"}:
        return {"source_type": "social", "platform": ref.capitalize()}
    elif ref == "affiliate":
        return {"source_type": "affiliate", "platform": "AffiliateNetwork"}
    else:  # direct
        return {"source_type": "direct", "platform": ""}

def random_browser():
    browsers = ["Chrome", "Firefox", "Safari", "Edge", "Opera", "Brave", "IE"]
    return random.choice(browsers)

def random_os():
    os_list = ["Windows 10", "Windows 11", "macOS 13", "Linux (Ubuntu)", "iOS 17", "Android 14"]
    return random.choice(os_list)

def random_device():
    devices = ["Desktop", "Laptop", "Tablet", "Mobile"]
    return random.choice(devices)

In [5]:
# --- Create Producer Client ---
producer = EventHubProducerClient.from_connection_string(
    conn_str=EVENT_HUB_CONNECTION_STR,
    eventhub_name=EVENT_HUB_NAME
)

In [6]:
# --- Event Generator ---
def generate_event():
    country_code, country = random.choice(USER_COUNTRIES)
    event_type = random.choice(EVENT_TYPES)
    timestamp = now_utc()
    user_id = str(uuid.uuid4())
    session_id = str(uuid.uuid4())
    product_id = random.choice(PRODUCT_IDS)
    spike_flag = random.random() < 0.05  # 5% of events are spikes
    sku = random.choice(SKU)
    cart_spike_magnitude = random.randint(1, 100) if spike_flag else 0


    client_info = {
        "ip_address": random_ip(),
        "browser": random_browser(),
        "os": random_os(),
        "device": random_device()
    }

    click_path = random_click_path()
    referral = random_referral()

    payload = {}

    # Standard event payload enrichment
    if event_type == "page_view":
        payload["page"] = random.choice(PAGES)
    elif event_type in ("product_click", "add_to_cart", "remove_from_cart"):
        payload["product_id"] = random.choice(PRODUCT_IDS)
        payload["price_eur"] = round(random.uniform(49.99, 199.99), 2)
    elif event_type in ("checkout_initiated", "purchase_completed"):
        payload["cart_items"] = random.randint(1, 5)
        payload["total_value_eur"] = round(random.uniform(59.99, 499.99), 2)
    elif event_type == "account_created":
        payload["account_type"] = random.choice(["guest", "registered"])
    elif event_type in ("newsletter_subscribed", "newsletter_unsubscribed"):
        payload["newsletter"] = "ZAVA Deals"

    event = {
        "event_id": str(uuid.uuid4()),
        "timestamp": timestamp,
        "event_type": event_type,
        "user_id": user_id,
        "session_id": session_id,
        "sku": sku,
        "country": country,
        "country_code": country_code,
        "referral_source_type": referral["source_type"],
        "referral_platform": referral["platform"],
        "product_id": product_id
    }
    return event


In [7]:
# --- Event Emission ---
def emit_event(event):
    message = json.dumps(event)
    try:
        event_data = EventData(message)
        with producer:
            producer.send_batch([event_data])
    except Exception as e:
        print("Failed to send event to Event Hub:", str(e))
    print(message)

# --- Continuous Simulation ---
def start_simulation(rate_per_second=EVENTS_PER_SECOND):
    """Emit clickstream events continuously at rate_per_second."""
    interval = 1.0 / rate_per_second
    try:
        while True:
            evt = generate_event()
            emit_event(evt)
            time.sleep(interval)
    except KeyboardInterrupt:
        print("Simulation stopped.")

# --- Run Simulation in Background ---
thread = threading.Thread(target=start_simulation, args=(EVENTS_PER_SECOND,), daemon=True)
thread.start()

print("ZAVA clickstream simulator is running. Press Ctrl+C to stop the notebook.")

while True:
    time.sleep(1)

ZAVA clickstream simulator is running. Press Ctrl+C to stop the notebook.
{"event_id": "12782561-0d86-4a38-ab05-fec23dccbbde", "timestamp": "2025-09-15T11:50:42.368404Z", "event_type": "newsletter_subscribed", "user_id": "205766d5-f755-46f1-aefd-ca4396c3968a", "session_id": "47595370-56cc-450e-94f5-9b4e49a26a14", "sku": "SKU4019", "country": "Italy", "country_code": "IT", "referral_source_type": "social", "referral_platform": "Other_social", "product_id": "PROD4019"}
{"event_id": "f723e5e6-c51e-4c43-ba5e-cec9fd04f7f5", "timestamp": "2025-09-15T11:50:43.043693Z", "event_type": "checkout_initiated", "user_id": "bfcdd505-d79c-44a5-9ea1-9de4ed4db940", "session_id": "47a7b86c-e44b-434a-bd74-668ab820eeee", "sku": "SKU4010", "country": "Spain", "country_code": "ES", "referral_source_type": "search", "referral_platform": "Google", "product_id": "PROD4017"}
{"event_id": "69d6979e-5c03-4aea-8eed-2eb24f7761c8", "timestamp": "2025-09-15T11:50:44.276399Z", "event_type": "add_to_cart", "user_id": "d