In [1]:
!pip install kafka-python



# GlobalMart Streaming Data Generator
Streams have different distribution and patterns of data generation, to make seem more realistic.

### User Stream
User stream has a constant rate of 10 users per second. 
- Users age is normally distributed centered at 26 and std 10.
- Users country is selected from a list of countries with weights based on population.
- Users email is generated using the user_id.

### Product Stream
- Products are generated with a constant rate of 10 products per second.
- Products price is dependent on the category.
- Products inventory is dependent on the category.
- Product ratings are distributed normally centered at 4.2 and std 0.8.

### Transaction Stream
- Transactions are generated with a constant rate of 100 transactions per second.
- Transactions are generated using existing users and products if available.

### Session Stream
- Everything is random, except users; it uses existing users to generate sessions.


In [None]:
import time
import json
import random
import uuid
from datetime import datetime
from kafka import KafkaProducer

# Configuration
# Using the service names from docker-compose.yml as we expect this to run inside the docker network
KAFKA_BROKERS = ['kafka1:9092', 'kafka2:9092']

TOPICS = {
    'users': 'new_users',
    'transactions': 'new_transactions',
    'products': 'new_products',
    'sessions': 'new_sessions'
}

# Global pools to store generated entities for referential integrity simulation
USER_POOL = []
PRODUCT_POOL = []

def get_producer():
    """Create and return a KafkaProducer instance."""
    try:
        producer = KafkaProducer(
            bootstrap_servers=KAFKA_BROKERS,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        print(f"Connected to Kafka at {KAFKA_BROKERS}")
        return producer
    except Exception as e:
        print(f"Error connecting to Kafka: {e}")
        return None

# --- Data Generators ---

def generate_user():
    """Generate a random user record with realistic distributions."""
    # Weighted countries based on approximate population (simplified relative weights)
    countries = ["Egypt", "Eswatini", "Timor-Leste", "Cambodia", "Federated States of Micronesia"]
    weights = [110, 1.2, 1.3, 16, 0.1] # Rough relative population scale
    
    # Age: Normal distribution centered at 26, clipped to realistic bounds
    age = int(random.gauss(26, 10))
    age = max(18, min(90, age))
    
    domains = ["example.com", "gmail.com", "yahoo.com", "hotmail.com", "outlook.com"]
    
    user_id = str(uuid.uuid4())
    
    user = {
        "user_id": user_id,
        # Using user_id in email ensures uniqueness better than random int
        "email": f"user_{user_id}@{random.choice(domains)}",
        "age": age,
        "country": random.choices(countries, weights=weights, k=1)[0],
        "registeration_date": datetime.now().isoformat()
    }
    
    USER_POOL.append(user)
    # Keep pool size manageable
    if len(USER_POOL) > 10000:
        USER_POOL.pop(0)
        
    return user

def generate_product():
    """Generate a random product record with category-specific biases."""
    categories = ["Electronics", "Clothing", "Home", "Books", "Sports", "Beauty", "Toys"]
    category = random.choice(categories)
    
    # Category biases for price and inventory
    if category == "Electronics":
        price = round(random.uniform(100.0, 3000.0), 2)
        inventory = random.randint(10, 200)
    elif category == "Books":
        price = round(random.uniform(5.0, 50.0), 2)
        inventory = random.randint(50, 2000)
    elif category == "Clothing":
        price = round(random.uniform(10.0, 200.0), 2)
        inventory = random.randint(20, 500)
    else:
        price = round(random.uniform(10.0, 500.0), 2)
        inventory = random.randint(0, 1000)

    # Ratings: Biased normal distribution towards higher ratings (3.5 - 5.0)
    rating = random.gauss(4.2, 0.8)
    rating = round(max(1.0, min(5.0, rating)), 1)

    product = {
        "product_id": str(uuid.uuid4()),
        "name": f"{category}_Product_{random.randint(1, 10000)}",
        "category": category,
        "price": price,
        "inventory": inventory,
        "ratings": rating
    }
    
    PRODUCT_POOL.append(product)
    if len(PRODUCT_POOL) > 5000:
        PRODUCT_POOL.pop(0)
        
    return product

def generate_transaction():
    """Generate a random transaction record using existing users and products if available."""
    products = []
    total_amount = 0.0
    num_products = random.randint(1, 5)
    
    # Use existing user if available to establish country preference
    if USER_POOL:
        user_record = random.choice(USER_POOL)
        user_id = user_record["user_id"]
        user_country = user_record.get("country", "Unknown")
    else:
        user_id = str(uuid.uuid4())
        user_country = "Unknown"

    # Define simple category preferences by country
    country_preferences = {
        "Egypt": ["Electronics", "Clothing"],
        "Eswatini": ["Home", "Books"],
        "Timor-Leste": ["Sports", "Beauty"],
        "Cambodia": ["Toys", "Electronics"],
        "Federated States of Micronesia": ["Clothing", "Home"]
    }

    preferred_categories = country_preferences.get(user_country, [])
    
    for _ in range(num_products):
        if PRODUCT_POOL:
            # Filter pool by preference if applicable (50% chance to stick to preference)
            candidates = PRODUCT_POOL
            if preferred_categories and random.random() < 0.5:
                preferred_candidates = [p for p in PRODUCT_POOL if p["category"] in preferred_categories]
                if preferred_candidates:
                    candidates = preferred_candidates
            
            prod = random.choice(candidates)
            prod_id = prod["product_id"]
            price = prod["price"]
        else:
            prod_id = str(uuid.uuid4())
            price = round(random.uniform(10.0, 1000.0), 2)
            
        quantity = random.randint(1, 3)
        products.append({
            "product_id": prod_id,
            "quantity": quantity,
            "price": price
        })
        total_amount += price * quantity
    
    return {
        "transaction_id": str(uuid.uuid4()),
        "user_id": user_id,
        "timestamp": datetime.now().isoformat(),
        "products": products,
        "total_amount": round(total_amount, 2),
        "payment_method": random.choice(["Credit Card", "PayPal", "Debit Card", "Apple Pay", "Google Pay"])
    }

def generate_session():
    """Generate a random session record."""
    events = []
    base_time = datetime.now()
    num_events = random.randint(1, 10)
    
    user_id = random.choice(USER_POOL)["user_id"] if USER_POOL else str(uuid.uuid4())
    
    for i in range(num_events):
        events.append({
            "eventType": random.choice(["ADD_TO_CART", "REMOVE_FROM_CART", "CLEAR_CART"]),
            "timestamp": base_time.isoformat() 
        })
    
    return {
        "user_id": user_id,
        "session_id": str(uuid.uuid4()),
        "timestamp": datetime.now().isoformat(),
        "events": events
    }


In [None]:

def main():
    print("Initializing Kafka Producer...")
    producer = get_producer()
    
    if not producer:
        print("Failed to initialize producer. Please check your Kafka connection settings.")
        return

    print("Starting data stream. Press Ctrl+C to stop.")
    
    try:
        while True:
            # Produce User
            user_data = generate_user()
            producer.send(TOPICS['users'], user_data)
            
            # Produce Product
            product_data = generate_product()
            producer.send(TOPICS['products'], product_data)

            for _ in range(10): 
                # Produce Transaction
                transaction_data = generate_transaction()
                producer.send(TOPICS['transactions'], transaction_data)
            
                # Produce Session
                session_data = generate_session()
                producer.send(TOPICS['sessions'], session_data)
            
            # Flush periodically to ensure data is sent
            producer.flush()
            
            print(f"Produced 4 records (User, Product, Transaction, Session) at {datetime.now().strftime('%H:%M:%S')}")
            
            # Sleep for a random interval to simulate continuous streaming
            time.sleep(random.uniform(1.0, 3.0))

    except KeyboardInterrupt:
        print("\nStopping stream...")
    except Exception as e:
        print(f"\nAn error occurred: {e}")
    finally:
        if producer:
            producer.close()
            print("Producer closed.")

if __name__ == "__main__":
    main()

Initializing Kafka Producer...
Connected to Kafka at ['kafka1:9092', 'kafka2:9092']
Starting data stream. Press Ctrl+C to stop.
Produced 4 records (User, Product, Transaction, Session) at 11:45:07
Produced 4 records (User, Product, Transaction, Session) at 11:45:09
Produced 4 records (User, Product, Transaction, Session) at 11:45:10
Produced 4 records (User, Product, Transaction, Session) at 11:45:12
Produced 4 records (User, Product, Transaction, Session) at 11:45:15
Produced 4 records (User, Product, Transaction, Session) at 11:45:16
Produced 4 records (User, Product, Transaction, Session) at 11:45:17
Produced 4 records (User, Product, Transaction, Session) at 11:45:19
Produced 4 records (User, Product, Transaction, Session) at 11:45:21
Produced 4 records (User, Product, Transaction, Session) at 11:45:24
Produced 4 records (User, Product, Transaction, Session) at 11:45:26
Produced 4 records (User, Product, Transaction, Session) at 11:45:28
Produced 4 records (User, Product, Transacti