# Getting Started

In [3]:
import pandas as pd
import json
import random
import hashlib
import time
import sys
import os
import logging
from datetime import datetime
from confluent_kafka import Producer


# Build a catalog of products

In [4]:
df = pd.read_csv("usercode/TV_DATASET_USA.csv")


In [5]:
df['BRAND'].value_counts()


SAMSUNG        182
LG             122
SONY            88
TCL             37
INSIGNIA™       34
VIZIO           24
HISENSE         20
ROKU            19
TOSHIBA         13
AMAZON           9
PEERLESS-AV      4
PIONEER          3
FURRION          1
SHARP            1
Name: BRAND, dtype: int64

In [6]:
df.info()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 557 entries, 0 to 556
Data columns (total 8 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   PRODUCT_NAME  557 non-null    object
 1   PRICING       557 non-null    object
 2   URL           557 non-null    object
 3   BRAND         557 non-null    object
 4   SCREEN_SIZE   557 non-null    int64 
 5   DISPLAY_TYPE  557 non-null    object
 6   RESOLUTION    557 non-null    object
 7   OS            557 non-null    object
dtypes: int64(1), object(7)
memory usage: 34.9+ KB


In [7]:
df['PRICING'] = df['PRICING'].apply(lambda x : float(x.replace('$','').replace(',','')))


# Set Up Variables and Functions

In [8]:
cities = ['New York','Los Ángeles','Chicago','Houston','Philadelphia']

payment_modes = ['Credit_card','Stripe','Paypal','Apple_Pay','Google_Pay','Samsung_Pay']

payment_store = ['Cash','Credit_card']

sources = ['Facebook','Instagram','Organic','Twitter','Influencer_1','Influencer_2','Influencer_3','Influencer_4']

purchase_statuses = ['COMPLETED','FAILED_CHECKOUT','FAILED_API_RESPONSE','INSUFICCIENT_FUNDS','COMPLETED','COMPLETED','COMPLETED','COMPLETED','COMPLETED','COMPLETED','FAILED_API_RESPONSE','INSUFICCIENT_FUNDS','USER_ERROR','FRAUD','COMPLETED','COMPLETED','COMPLETED']

commission = [0.2,0.25,0.3,0.27,0.35,0.4,0.37,0.15,0.1]

NY_coords = [(40.76046814557239, -73.97764793953105),(40.76921169592604, -73.98326984936075),(40.762515994719834, -73.98095242088134)]
LA_coords = [(34.07210945806006, -118.35747350374957),(34.071754649810025, -118.37593530089991)]
CHI_coords = [(41.89819876058171, -87.62280110486684),(41.89182575694393, -87.6249468719774),(41.88375296758592, -87.62814652743663)]
HOU_coords = [(29.742233338438325, -95.44654054545151),(29.743148850926094, -95.45312636612748),(29.739981565214627, -95.46428435510245)]
PHI_coords = [(40.089499621312456, -75.39015007888118),(40.085310197975055, -75.40444450974655),(40.09069475292698, -75.3815277170056)]

def get_pay_method(sources,purchase_statuses,payment_modes,payment_store):
    if sources == 'Organic':
        payment = random.choice(payment_store)
        status = 'COMPLETED'
        order_type = 'STORE'
    elif sources != 'Organic':
        payment = random.choice(payment_modes)
        status = random.choice(purchase_statuses)
        order_type = 'ONLINE'
    return payment,status,order_type

def get_coords(city):
    if city == 'New York':
        coords = random.choice(NY_coords)
    elif city == 'Los Ángeles':
        coords = random.choice(LA_coords)
    elif city == 'Chicago':
        coords = random.choice(CHI_coords)
    elif city == 'Houston':
        coords = random.choice(HOU_coords)
    elif city == 'Philadelphia':
        coords = random.choice(PHI_coords)
    return coords


#  Generate Records

In [9]:
delivered_records = 0

x = 1

data_purchase = []

while(x < 10):

    date = pd.to_datetime('today').strftime("%Y-%m-%d %H:%M:%S")
    product = df['PRODUCT_NAME'][random.randint(0,556)]
    pricing =  df[df['PRODUCT_NAME']==product]['PRICING'].values[0]
    commission_temp =  random.choice(commission)
    brand = df[df['PRODUCT_NAME']==product]['BRAND'].values[0]
    screen = df[df['PRODUCT_NAME']==product]['SCREEN_SIZE'].values[0]
    display = df[df['PRODUCT_NAME']==product]['DISPLAY_TYPE'].values[0]
    resolution = df[df['PRODUCT_NAME']==product]['DISPLAY_TYPE'].values[0]
    source_temp = random.choice(sources)
    pay = get_pay_method(source_temp,purchase_statuses,payment_modes,payment_store)
    city = random.choice(cities)

    purchase = {'purchase_ID': str(hashlib.sha256(f"{x} {product} {pricing} {commission_temp} {date} {source_temp} {pay[1]}".encode('utf-8')).hexdigest())[:10],
            'Product_name' : product,
            'Pricing':str(pricing),
            'Commission':str(commission_temp),
            'Revenue' : str(round(pricing*commission_temp,2)),
            'Payment_Mehtod':pay[0],
            'Status' : pay[1],
            'Order_Type' : pay[2],
            'City':city,
            'Location': str(get_coords(city)),
            'Latitud' : str(get_coords(city)[0]) ,
            'Longitud' :  str(get_coords(city)[1]),
            'Source':source_temp,
            'Brand' : brand,
            'Screen' : screen,
            'Display' :  display ,
            'Resolution' : resolution,
            'Created_at': date}

    data_purchase.append(pd.DataFrame(purchase,index =[x]))
    delivered_records += 1
    x += 1
    print(purchase)
    time.sleep(random.choice([1,1.5]))

print('\n')
print("{} messages were produced".format(delivered_records))
print('\n')


{'purchase_ID': '30dce4106d', 'Product_name': 'Package - Samsung - 85" Class CU8000 Crystal UHD Smart Tizen TV - Black and SAMSUNG C Series 2.1ch DTS Virtual: X Soundbar - Titan Black', 'Pricing': '1749.98', 'Commission': '0.35', 'Revenue': '612.49', 'Payment_Mehtod': 'Credit_card', 'Status': 'COMPLETED', 'Order_Type': 'ONLINE', 'City': 'Los Ángeles', 'Location': '(34.071754649810025, -118.37593530089991)', 'Latitud': '34.07210945806006', 'Longitud': '-118.35747350374957', 'Source': 'Influencer_1', 'Brand': 'SAMSUNG', 'Screen': 85, 'Display': 'LED', 'Resolution': 'LED', 'Created_at': '2026-01-17 15:09:41'}
{'purchase_ID': '5762d7a60e', 'Product_name': 'Sony - 85" Class BRAVIA XR Z9J LED 8K UHD Smart Google TV', 'Pricing': '5999.99', 'Commission': '0.25', 'Revenue': '1500.0', 'Payment_Mehtod': 'Samsung_Pay', 'Status': 'COMPLETED', 'Order_Type': 'ONLINE', 'City': 'Los Ángeles', 'Location': '(34.071754649810025, -118.37593530089991)', 'Latitud': '34.07210945806006', 'Longitud': '-118.3574

#  Create a Kafka Producer Instance

In [None]:
from confluent_kafka import Producer

conf = {
    'bootstrap.servers': '$BOOTSTRAP_SERVERS',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': '$SASL_USERNAME',
    'sasl.password': '$SASL_PASSWORD'   
}


producer = Producer(conf)
print("Producteur Kafka créé avec succès !")

Producteur Kafka créé avec succès !


#  Send Records to Kafka Topic

In [11]:
delivered_records = 0
topic = 'ecommerce-topic-1' 
x = 1

while True:
    # Génération de la date actuelle
    date = pd.to_datetime('today').strftime("%Y-%m-%d %H:%M:%S")
    
    # Sélection aléatoire d'un produit et de ses caractéristiques
    idx = random.randint(0, 556)
    product = df['PRODUCT_NAME'].iloc[idx]
    pricing = df[df['PRODUCT_NAME'] == product]['PRICING'].values[0]
    brand = df[df['PRODUCT_NAME'] == product]['BRAND'].values[0]
    screen = df[df['PRODUCT_NAME'] == product]['SCREEN_SIZE'].values[0]
    display = df[df['PRODUCT_NAME'] == product]['DISPLAY_TYPE'].values[0]
    resolution = df[df['PRODUCT_NAME'] == product]['DISPLAY_TYPE'].values[0]
    
    # Sélection des variables aléatoires
    commission_temp = random.choice(commission)
    source_temp = random.choice(sources)
    pay = get_pay_method(source_temp, purchase_statuses, payment_modes, payment_store)
    city = random.choice(cities)

    # Création du dictionnaire avec clés en MINUSCULES
    purchase = {
        'purchase_id': str(hashlib.sha256(f"{x} {product} {pricing} {date}".encode('utf-8')).hexdigest())[:10],
        'product_name': product,
        'pricing': str(int(pricing)),
        'commission': str(commission_temp),
        'revenue': str(round(pricing * commission_temp, 2)),
        'payment_method': pay[0],
        'status': pay[1],
        'order_type': pay[2],
        'city': city,
        'location': str(get_coords(city)),
        'latitude': str(get_coords(city)[0]),
        'longitude': str(get_coords(city)[1]),
        'source': source_temp,
        'brand': brand,
        'screen_size': str(screen),
        'display_type': display,
        'resolution': str(resolution),
        'created_at': date
    }

    # Encodage et envoi vers Kafka
    record_key = "Purchase_simulator"
    record_value = json.dumps(purchase).encode('utf-8')
    
    producer.produce(topic, key=record_key, value=record_value)
    producer.poll(0)
    
    print(f"Envoi réussi : {purchase['product_name']} | Status : {purchase['status']}")
    
    delivered_records += 1
    x += 1
    
    # Pause et Flush pour assurer l'envoi
    time.sleep(random.choice([1, 1.5]))
    producer.flush()
    

Envoi réussi : TCL - 50" Class 5-Series QLED 4K UHD Smart Google TV | Status : COMPLETED
Envoi réussi : Package - Samsung - 85" Class QN90C NEO QLED 4K Smart TV - TITAN BLACK and Q-Series  9.1.4ch  Wireless True Dolby Atmos Soundbar +  Rear Speakers w/ Q-Symphony - Titan Black | Status : COMPLETED
Envoi réussi : VIZIO - 24" Class D-Series LED 720P Smart TV | Status : FAILED_CHECKOUT
Envoi réussi : Samsung - 43" QMB series LED 4K UHD Digital Signage Display - Black | Status : FAILED_API_RESPONSE
Envoi réussi : Samsung - 85" Class QN900A Series Neo QLED 8K UHD Smart Tizen TV | Status : COMPLETED
Envoi réussi : LG - 55" Class NanoCell 75UQA Series LED 4K UHD Smart webOS TV | Status : COMPLETED
Envoi réussi : Hisense - 43" Class R6G Series LED 4K UHD Smart Roku TV | Status : USER_ERROR
Envoi réussi : Samsung - 85" Class QN800 Neo QLED 8K UHD Smart Tizen TV | Status : COMPLETED
Envoi réussi : Samsung - 85" Class  Q70A Series QLED 4K UHD Smart Tizen TV | Status : COMPLETED
Envoi réussi : LG 