In [0]:
# ============================================================
#         FINAL CUSTOMER PROFILE INCREMENTAL GENERATOR
#         (with Full Name & Contact Number)
# ============================================================
%pip install faker
from faker import Faker
import pandas as pd
import random, os, glob
from datetime import datetime

# Faker instance bound to the en_IN locale (used for customer names).
fake = Faker("en_IN")

# ----------------------------------------
# Folder that holds the customer increment CSVs
# ----------------------------------------
customer_path = "/Volumes/kusha_solutions/products_recommendation_online_ml/streaming_sales_data/customers/"
os.makedirs(customer_path, exist_ok=True)

# ----------------------------------------
# Resume numbering after the highest CustomerID already on disk
# ----------------------------------------
files = glob.glob(customer_path + "/*.csv")

if not files:
    # Nothing written yet: numbering begins at 1.
    start_id = 1
    print("ðŸ“Œ Starting fresh from CustomerID 1")
else:
    # Stack every earlier batch and continue one past the largest ID seen.
    previous_batches = []
    for csv_file in files:
        previous_batches.append(pd.read_csv(csv_file))
    df_old = pd.concat(previous_batches, ignore_index=True)
    highest_id = df_old["CustomerID"].astype(int).max()
    start_id = highest_id + 1
    print(f"ðŸ“Œ Continuing from CustomerID {start_id}")

# ================================
# Categorical Pools
# ================================
# Cities used to fill the customer "Location" column.
locations = [
    "Bengaluru", "Mumbai", "Delhi", "Hyderabad", "Chennai",
    "Pune", "Kolkata", "Ahmedabad", "Jaipur", "Surat",
    "Lucknow", "Nagpur", "Indore", "Bhopal", "Chandigarh",
    "Patna", "Vadodara", "Kochi", "Noida", "Gurgaon",
    "Visakhapatnam", "Coimbatore", "Mysuru", "Thane", "Rajkot",
    "Vijayawada", "Ghaziabad", "Kanpur", "Nashik", "Thiruvananthapuram",
    "Madurai", "Goa", "Mangalore", "Varanasi", "Amritsar",
    "Dehradun", "Ranchi", "Guwahati", "Bhubaneswar", "Jamshedpur",
    "Hosur", "Tirupati", "Warangal", "Guntur", "Hubli",
    "Belagavi", "Udaipur", "Shimla", "Raipur", "Jabalpur",
]

# Remaining categorical value pools for the customer profile.
payment_methods = [
    "UPI",
    "Credit Card",
    "Debit Card",
    "Wallet",
    "Cash on Delivery",
]
frequency = [
    "Weekly",
    "Monthly",
    "Quarterly",
    "Yearly",
]
seasons = [
    "Winter",
    "Summer",
    "Spring",
    "Fall",
]

# ================================
# Number of new customers this run
# ================================
new_customers = 10000   # batch size for this run; adjust as needed


def _make_customer(offset):
    """Build one synthetic customer record.

    `offset` is the zero-based position inside this batch; it is added to
    `start_id` to obtain the CustomerID. The remaining fields are random draws.
    """
    # Ten-digit number drawn from 6000000000-9999999999 (first digit 6-9).
    contact = str(random.randint(6000000000, 9999999999))

    profile = {}
    profile["CustomerID"] = start_id + offset
    profile["CustomerName"] = fake.name()
    profile["ContactNumber"] = contact
    profile["Age"] = random.randint(18, 70)
    profile["Gender"] = random.choice(["Male", "Female"])
    profile["Location"] = random.choice(locations)
    profile["SubscriptionStatus"] = random.choice(["Prime", "Regular"])
    profile["PaymentMethod"] = random.choice(payment_methods)
    profile["PreviousPurchases"] = random.randint(0, 50)
    profile["FrequencyOfPurchases"] = random.choice(frequency)
    profile["PreferredSeason"] = random.choice(seasons)
    profile["AvgReviewRating"] = round(random.uniform(2.0, 5.0), 1)
    return profile


# One record per new customer, collected first and framed once.
data = [_make_customer(offset) for offset in range(new_customers)]

df_new = pd.DataFrame(data)

# ================================
# Save increment file
# ================================
# Timestamped file name so each run writes a separate increment file.
run_stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
file_name = f"customer_profile_{run_stamp}.csv"
output_file = os.path.join(customer_path, file_name)
df_new.to_csv(output_file, index=False)

# Run summary.
print("\nðŸŽ‰ CUSTOMER PROFILE BATCH GENERATED SUCCESSFULLY!")
print(f"âž• New Customers Added : {new_customers}")
print(f"ðŸ’¾ File Saved As       : {file_name}\n")


In [0]:
# Load one generated customer batch through Spark's CSV reader
# (header row used for column names, column types inferred) and show it.
df = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load("/Volumes/kusha_solutions/products_recommendation_online_ml/streaming_sales_data/customers/customer_profile_20251208_071550.csv")
)
display(df)

In [0]:
# ============================================================
#             PRODUCT INCREMENTAL DATA GENERATOR
#   Multi Color/Size Variants | Brand Mapped | Smart Description
# ============================================================

from faker import Faker
import pandas as pd
import random, os, glob
from datetime import datetime

fake = Faker()

# ----------------------------------------
# Folder that holds the product increment CSVs
# ----------------------------------------
product_path = "/Volumes/kusha_solutions/products_recommendation_online_ml/streaming_sales_data/products/"
os.makedirs(product_path, exist_ok=True)

# ----------------------------------------
# Load earlier increments so ProductID numbering can continue
# ----------------------------------------
files = glob.glob(product_path + "/*.csv")

if not files:
    # First run: empty catalog, numbering begins at 1.
    existing = pd.DataFrame()
    start_id = 1
    print("ðŸ†• Starting new product catalog from ProductID 1")
else:
    # Stack every earlier increment and continue one past the largest ID seen.
    earlier_increments = []
    for csv_file in files:
        earlier_increments.append(pd.read_csv(csv_file))
    existing = pd.concat(earlier_increments, ignore_index=True)
    highest_id = existing["ProductID"].astype(int).max()
    start_id = highest_id + 1
    print(f"ðŸ”„ Existing products found. Starting ProductID from {start_id}")

# =============================
# Product Mappings
# =============================
# Product names available within each category.
category_items = {
    "Clothing": [
        "Jeans", "Shirt", "T-shirt", "Kurta", "Sweater",
        "Dress", "Skirt", "Pants", "Hoodie", "Crop Top",
        "Blazer", "Leggings", "Nightwear", "Jumpsuit", "Formal Shirt",
    ],
    "Footwear": [
        "Sneakers", "Sports Shoes", "Boots", "Loafers", "Heels", "Sandals",
    ],
    "Accessories": [
        "Watch", "Handbag", "Sunglasses", "Bracelet",
        "Necklace", "Backpack", "Cap", "Belt",
    ],
    "Electronics": [
        "Smartphone", "Laptop", "Smartwatch", "Tablet", "Earbuds",
        "Bluetooth Speaker", "Camera", "Gaming Console", "Smart TV",
    ],
    "Home & Kitchen": [
        "Cookware Set", "Microwave", "Dinner Set", "Vacuum Cleaner",
        "Mixer Grinder", "Coffee Maker", "Bedsheet", "Iron",
    ],
    "Beauty": [
        "Perfume", "Face Wash", "Moisturizer",
        "Shampoo", "Beard Trimmer", "Sunscreen",
    ],
    "Sports": [
        "Football", "Cricket Bat", "Dumbbells", "Yoga Mat", "Tennis Racket",
    ],
    "Furniture": [
        "Office Chair", "Study Table", "Bookshelf", "Bean Bag", "Wall Clock",
    ],
}

# Brands that may be paired with a product of the given category.
brand_map = {
    "Clothing": [
        "Nike", "Adidas", "Zara", "Puma",
        "Levis", "H&M", "Allen Solly", "Roadster",
    ],
    "Footwear": [
        "Nike", "Adidas", "Reebok", "Puma", "Bata", "Skechers", "Woodland",
    ],
    "Accessories": [
        "Titan", "Fossil", "Boat", "Gucci", "Wildcraft", "American Tourister",
    ],
    "Electronics": [
        "Apple", "Samsung", "Dell", "HP", "Lenovo", "Asus",
        "Sony", "Mi", "OnePlus", "Vivo", "Oppo", "Boat",
    ],
    "Home & Kitchen": [
        "Prestige", "Philips", "Hawkins", "Bajaj", "Milton", "Pigeon", "Butterfly",
    ],
    "Beauty": [
        "Lakme", "Loreal", "Nivea", "Mamaearth", "Beardo", "Dove",
    ],
    "Sports": [
        "Nivia", "Yonex", "Puma", "SG", "Adidas",
    ],
    "Furniture": [
        "Ikea", "Godrej", "Urban Ladder", "Pepperfry", "HomeTown",
    ],
}

# Colour names from which a product's available colours are drawn.
colors = [
    "Black", "White", "Blue", "Red", "Green", "Yellow", "Grey",
    "Brown", "Maroon", "Gold", "Silver", "Pink", "Beige",
]

# =============================
# Description Generator
# Creates item description based on product name
# =============================
def generate_description(name, brand, category):
    """Return the one-line description text for a product.

    The text is "<brand> <name> - Premium quality <category in lower case>
    product designed for comfort & daily use."
    """
    template = "{0} {1} - Premium quality {2} product designed for comfort & daily use."
    return template.format(brand, name, category.lower())

# =============================
# Config
# =============================
new_products_count     = 1000   # brand-new catalog rows per run
update_products_count  = 150    # upper bound on existing rows to revise

rows = []

# ----------------------------------------
# Build the new catalog entries
# ----------------------------------------
category_names = list(category_items.keys())
colored_categories = ("Clothing", "Footwear", "Accessories")
size_charts = {
    "Footwear": ["6", "7", "8", "9", "10", "11"],
    "Clothing": ["XS", "S", "M", "L", "XL", "XXL"],
}

for offset in range(new_products_count):

    category = random.choice(category_names)
    product_name = random.choice(category_items[category])
    brand = random.choice(brand_map[category])

    # Colours: 1-4 distinct values for wearable categories, otherwise "NA".
    color_list = "NA"
    if category in colored_categories:
        color_count = random.randint(1, 4)
        color_list = ",".join(random.sample(colors, color_count))

    # Sizes: 1-5 distinct values for footwear/clothing, otherwise "NA".
    size_list = "NA"
    chart = size_charts.get(category)
    if chart is not None:
        size_count = random.randint(1, 5)
        size_list = ",".join(random.sample(chart, size_count))

    # Selling price is drawn between 50% and 100% of the MRP.
    mrp = round(random.uniform(200, 80000), 2)
    price = round(random.uniform(mrp * 0.5, mrp), 2)
    discount_percent = round((mrp - price) / mrp * 100, 2)

    new_row = {
        "ProductID": start_id + offset,
        "ProductName": product_name,
        "Category": category,
        "Brand": brand,
        "Description": generate_description(product_name, brand, category),
        "AvailableColors": color_list,
        "AvailableSizes": size_list,
        "MRP": mrp,
        "Price": price,
        "DiscountPercent": discount_percent,
        "Stock": random.randint(20, 800),
        "Rating": round(random.uniform(3.0, 5.0), 1),
        "ReviewsCount": random.randint(10, 7000),
        "IsUpdated": "No",
        "LastUpdated": datetime.now(),
    }
    rows.append(new_row)

# =============================
# UPDATE OLD PRODUCTS (SCD Simulation)
# =============================
# Revise a random sample of already-published products and append the new
# versions to `rows` (flagged IsUpdated="Yes").
if not existing.empty:
    # `existing` stacks every increment file, so a product revised in an earlier
    # run appears once per version. Keep only the newest version of each
    # ProductID so a revision never starts from stale values and no product is
    # revised twice in the same run.
    if "LastUpdated" in existing.columns:
        version_key = existing["LastUpdated"].astype(str)
    else:
        version_key = pd.Series("", index=existing.index)
    latest = (
        existing
        .assign(_version_key=version_key)
        .sort_values("_version_key", kind="stable")
        .drop_duplicates(subset="ProductID", keep="last")
        .drop(columns="_version_key")
    )

    updates = latest.sample(min(update_products_count, len(latest)))

    for record in updates.to_dict("records"):
        # Price drifts between -10% and +12% but is capped at the MRP.
        drifted_price = round(record["Price"] * random.uniform(0.9, 1.12), 2)
        record["Price"] = min(record["MRP"], drifted_price)
        # Keep the stored discount consistent with the new price.
        record["DiscountPercent"] = round(
            (record["MRP"] - record["Price"]) / record["MRP"] * 100, 2
        )
        record["Stock"] = max(0, record["Stock"] + random.randint(-40, 120))
        record["Rating"] = round(
            min(5.0, max(3.0, record["Rating"] + random.uniform(-0.3, 0.3))), 1
        )
        record["ReviewsCount"] += random.randint(5, 350)

        # pandas.read_csv parses the literal "NA" as NaN; restore the marker so
        # revised rows are written the same way as newly created ones.
        for column in ("AvailableColors", "AvailableSizes"):
            if column in record and pd.isna(record[column]):
                record[column] = "NA"

        record["IsUpdated"] = "Yes"
        record["LastUpdated"] = datetime.now()
        rows.append(record)

# =============================
# SAVE OUTPUT
# =============================
# Write new and revised rows together as one timestamped increment file.
df = pd.DataFrame(rows)
file_name = f"products_increment_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
df.to_csv(os.path.join(product_path,file_name), index=False)

# Report the number of rows actually flagged as updated; the configured
# `update_products_count` is only an upper bound and overstates the figure
# when fewer existing products were available.
updated_rows = int((df["IsUpdated"] == "Yes").sum()) if "IsUpdated" in df.columns else 0

print("\nðŸŽ‰ PRODUCT DATASET CREATED SUCCESSFULLY!")
print(f"ðŸ†• New Products: {new_products_count}")
print(f"ðŸ”„ Updated Products: {updated_rows}")
print(f"ðŸ’¾ Saved as: {file_name}\n")


In [0]:
# Load one generated product increment through Spark's CSV reader
# (header row used for column names, column types inferred) and show it.
df_products = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load("/Volumes/kusha_solutions/products_recommendation_online_ml/streaming_sales_data/products/products_increment_20251208_073644.csv")
)
display(df_products)

In [0]:
# ============================================================
# SALES STREAMING DATA GENERATOR
# Multi-Product Orders | Cold-start Safe | Recommendation Ready
# ============================================================

%pip install faker

import pandas as pd
import random, os, time, glob
from datetime import datetime
from faker import Faker

fake = Faker()

# ============================================================
# PATHS
# ============================================================
customer_path = "/Volumes/kusha_solutions/products_recommendation_online_ml/streaming_sales_data/customers/"
product_path  = "/Volumes/kusha_solutions/products_recommendation_online_ml/streaming_sales_data/products/"
sales_path    = "/Volumes/kusha_solutions/products_recommendation_online_ml/streaming_sales_data/sales/"

os.makedirs(sales_path, exist_ok=True)

# ============================================================
# LOAD CUSTOMER & PRODUCT DATA
# ============================================================
# File names end in a YYYYMMDD_HHMMSS stamp, so sorting them also orders the
# increments from oldest to newest.
customer_files = sorted(glob.glob(customer_path + "/*.csv"))
product_files  = sorted(glob.glob(product_path + "/*.csv"))

# Fail with an actionable message instead of pandas' "No objects to
# concatenate" when one of the generators has not been run yet.
if not customer_files:
    raise FileNotFoundError(f"No customer CSV files found in {customer_path}; run the customer generator first.")
if not product_files:
    raise FileNotFoundError(f"No product CSV files found in {product_path}; run the product generator first.")

customers = pd.concat([pd.read_csv(f) for f in customer_files], ignore_index=True)
products  = pd.concat([pd.read_csv(f) for f in product_files], ignore_index=True)

# Product increments re-emit revised rows under the same ProductID. Keep only
# the last occurrence (newest file) so sales use the current price and revised
# products are not over-represented when sampling.
products = products.drop_duplicates(subset="ProductID", keep="last").reset_index(drop=True)

print(f"âœ… Loaded Customers : {len(customers)}")
print(f"âœ… Loaded Products  : {len(products)}")

# ============================================================
# STREAMING CONFIG
# ============================================================
records_per_batch = 3000   # orders generated per output file
interval_seconds  = 5      # pause between batches

print("\nâš¡ Streaming Started (Ctrl+C to stop)\n")

# ============================================================
# ACTIVITY CLASSIFICATION (NO SCHEMA CHANGE)
# ============================================================
def assign_activity_type(customer):
    """Classify a customer as "power", "active" or "casual".

    The class is derived from the customer's "FrequencyOfPurchases" value,
    compared case-insensitively after trimming whitespace:

    * "weekly" (or the legacy labels "very frequent" / "high") -> "power"
    * "monthly" (or the legacy label "medium")                 -> "active"
    * anything else, including a missing value                 -> "casual"

    The customer generator in this notebook emits Weekly / Monthly /
    Quarterly / Yearly, so those labels must be recognised here; matching
    only the legacy labels would classify every customer as "casual".

    Parameters
    ----------
    customer : mapping-like
        Any object with a dict-style ``.get(key, default)`` method
        (a dict or a pandas Series row).

    Returns
    -------
    str
        One of "power", "active", "casual".
    """
    freq = str(customer.get("FrequencyOfPurchases", "")).strip().lower()

    if freq in ("weekly", "very frequent", "high"):
        return "power"
    if freq in ("monthly", "medium"):
        return "active"
    return "casual"

# ============================================================
# SALES GENERATION LOGIC
# ============================================================
def _pick_variant(raw_options):
    """Pick one entry from a comma-separated option string, or "NA" if none apply.

    pandas.read_csv parses the literal "NA" as NaN, so a missing value is
    treated the same as the explicit "NA" marker instead of being stringified
    to "nan".
    """
    if pd.isna(raw_options):
        return "NA"
    text = str(raw_options).strip()
    if text in ("", "NA"):
        return "NA"
    return random.choice(text.split(","))


def generate_sales_batch():
    """Generate one batch of interaction records and write it as a CSV file.

    For each of `records_per_batch` orders a random customer is drawn, an
    8-character order id is created, and one or more product interactions
    (view / add_to_cart / purchase) are produced. The number of interactions
    and the interaction weights depend on `assign_activity_type(customer)`.
    A product drawn twice for the same order is skipped, so an order may end
    up with fewer rows than interactions attempted.

    The batch is written to `sales_path` as ``sales_<YYYYMMDD_HHMMSS>.csv``
    and a one-line summary is printed. Returns None.
    """
    sales_data = []

    for _ in range(records_per_batch):

        cust = customers.sample(1).iloc[0]
        order_id = fake.uuid4()[:8]
        activity_type = assign_activity_type(cust)

        # ------------------------------
        # Interaction density by user type
        # ------------------------------
        if activity_type == "power":
            interactions_per_order = random.randint(2, 5)
            interaction_weights = [35, 35, 30]   # view, cart, purchase

        elif activity_type == "active":
            interactions_per_order = random.randint(1, 3)
            interaction_weights = [55, 30, 15]

        else:  # casual
            interactions_per_order = 1
            interaction_weights = [75, 20, 5]

        # ------------------------------
        # Generate products per order
        # ------------------------------
        used_products = set()

        for _attempt in range(interactions_per_order):

            prod = products.sample(1).iloc[0]

            # avoid duplicate product in same order
            if prod["ProductID"] in used_products:
                continue
            used_products.add(prod["ProductID"])

            # Products without size/colour variants carry "NA" in the CSV,
            # which arrives here as NaN; _pick_variant maps both to "NA".
            size = _pick_variant(prod.get("AvailableSizes", "NA"))
            color = _pick_variant(prod.get("AvailableColors", "NA"))

            interaction = random.choices(
                ["view", "add_to_cart", "purchase"],
                weights=interaction_weights
            )[0]

            sales_data.append({
                "OrderID"         : order_id,
                "CustomerID"      : cust["CustomerID"],
                "CustomerLocation": cust["Location"],
                "ProductID"       : prod["ProductID"],
                "Category"        : prod["Category"],
                "Brand"           : prod["Brand"],
                "InteractionType" : interaction,
                "Quantity"        : random.randint(1, 3),
                "PriceAtPurchase" : prod["Price"],
                "DiscountUsed"    : random.choice(["Yes", "No"]),
                # Payment method is only meaningful for completed purchases.
                "PaymentMethod"   : cust["PaymentMethod"] if interaction == "purchase" else "NA",
                "SizeSelected"    : size,
                "ColorSelected"   : color,
                "Season"          : cust["PreferredSeason"],
                "Timestamp"       : datetime.now()
            })

    df = pd.DataFrame(sales_data)

    filename = f"sales_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
    df.to_csv(os.path.join(sales_path, filename), index=False)

    print(f"ðŸ“„ Generated {len(df)} records â†’ {filename}")

# ============================================================
# CONTINUOUS STREAM LOOP
# ============================================================
# Emit one batch every `interval_seconds` until the kernel is interrupted.
try:
    while True:
        generate_sales_batch()
        time.sleep(interval_seconds)
except KeyboardInterrupt:
    # Ctrl+C / kernel interrupt is the advertised way to stop the stream, so
    # end the cell with a short message instead of a traceback.
    print("\nStreaming stopped by user.\n")


In [0]:
# Load one generated sales batch through Spark's CSV reader
# (header row used for column names, column types inferred) and show it.
df_sales = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load("/Volumes/kusha_solutions/products_recommendation_online_ml/streaming_sales_data/sales/sales_20251208_093707.csv")
)
display(df_sales)