# Kafka streaming

Send users' purchases in real time via Kafka.

1. Create a user app interaction flow simulation
2. Authenticate against the Kafka cluster
3. Integrate Kafka streaming into the simulation

In [5]:
import pandas as pd
import json
import random
import time
import hashlib
import os
import uuid
import pandas as pd
from dotenv import load_dotenv
import requests
import os
from kafka import KafkaProducer
from confluent_kafka import Producer
import socket
import json

#### Simulate user app interaction

In [6]:
# Cities a simulated session can be assigned to.
CITIES = ['Ciudad de México', 'Guadalajara', 'Queretaro', 'Monterrey', 'Puebla']

# Payment methods and client platforms sampled for each session.
PAYMENT_ONLINE = ['Debit', 'Credit']
OS_DEVICE = ['WEB', 'ANDROID', 'IOS']

# Event funnel. The lists are sampled with random.choice, so a value that is
# repeated is proportionally more likely to be drawn.
INITIAL_EVENT_FLOW = 'LAUNCH_APP'
SECOND_EVENT_FLOW = ['HOME', 'EXIT_APP'] + ['HOME'] * 2
THIRD_EVENT_FLOW = ['GO_TO_CATEGORY'] * 3 + ['EXIT_APP']
EVENT_CATEGORY_FLOW = [
    'LIQUOR',
    'PHARMACY',
    'TECHNOLOGY',
    'ELECTRO_DOMESTIC',
    'BABY',
    'CLOTHES',
]
FINAL_EVENT_FLOW = ['PURCHASE'] * 3 + ['EXIT_APP'] * 2 + ['PURCHASE']

# Store coordinates per city as (latitude, longitude) tuples.
STORE_COORDS_BY_CITY = {
    'Ciudad de México': (19.372879, -99.049378),
    'Guadalajara':      (20.690072, -103.301842),
    'Queretaro':        (20.606305, -100.412364),
    'Monterrey':        (25.713272, -100.277447),
    'Puebla':           (18.973534, -98.252895),
}

In [7]:
def simulate_event_user_app():
    """Simulate one user session and return it as a dict.

    The session always starts with INITIAL_EVENT_FLOW. A second and a third
    event are drawn at random; only a HOME -> GO_TO_CATEGORY path can end in
    a purchase. Sessions that do not end in a purchase keep the
    'UNCONVERTED' / 'USER_VISIT' defaults and have 'payment' set to None.

    Returns:
        dict with keys 'payment', 'os_device', 'city', 'status',
        'order_type', 'event_1', 'event_2', 'event_3' and 'last_event'.
    """
    # The draws happen in a fixed order so that a seeded `random` yields
    # the same session sequence.
    payment = random.choice(PAYMENT_ONLINE)
    os_device = random.choice(OS_DEVICE)
    city = random.choice(CITIES)
    second_event = random.choice(SECOND_EVENT_FLOW)
    third_event = random.choice(THIRD_EVENT_FLOW)

    status, order_type = 'UNCONVERTED', 'USER_VISIT'

    if second_event != 'HOME':
        # The user left right after launching: no third event, no payment.
        payment, third_event, last_event = None, None, 'LAUNCH_APP'
    elif third_event != 'GO_TO_CATEGORY':
        # The user reached HOME and then left.
        payment, last_event = None, 'HOME'
    else:
        # The category is drawn before the final event is decided.
        last_event = random.choice(EVENT_CATEGORY_FLOW)
        if random.choice(FINAL_EVENT_FLOW) == 'PURCHASE':
            status, order_type = 'COMPLETED', 'PURCHASE'
        else:
            # No purchase: the category is discarded and HOME is recorded.
            payment, last_event = None, 'HOME'

    return {
        'payment': payment,
        'os_device': os_device,
        'city': city,
        'status': status,
        'order_type': order_type,
        'event_1': INITIAL_EVENT_FLOW,
        'event_2': second_event,
        'event_3': third_event,
        'last_event': last_event,
    }

def get_store_location(city:str):
    """Look up the store coordinates registered for `city`.

    Args:
        city: A key of STORE_COORDS_BY_CITY.

    Returns:
        The value stored in STORE_COORDS_BY_CITY for that city.

    Raises:
        KeyError: If `city` is not a key of STORE_COORDS_BY_CITY.
    """
    coords = STORE_COORDS_BY_CITY[city]
    return coords


def create_users_bank(num_users):
    """Create a list of `num_users` synthetic users.

    Args:
        num_users: How many user records to generate.

    Returns:
        A list of dicts, each with a 'created_at' timestamp string
        ('%Y-%m-%d %H:%M:%S', taken when the record is built) and a
        'user_id' holding a random UUID4 as a string.
    """
    def _new_user():
        # The timestamp is read per user, so a long run may span seconds.
        return {
            'created_at': pd.to_datetime('today').strftime('%Y-%m-%d %H:%M:%S'),
            'user_id': str(uuid.uuid4()),
        }

    return [_new_user() for _ in range(num_users)]

In [8]:
# Run a single simulated session and show the resulting dict as the cell output.
user_random_interaction = simulate_event_user_app()
user_random_interaction

{'payment': 'Debit',
 'os_device': 'IOS',
 'city': 'Queretaro',
 'status': 'COMPLETED',
 'order_type': 'PURCHASE',
 'event_1': 'LAUNCH_APP',
 'event_2': 'HOME',
 'event_3': 'GO_TO_CATEGORY',
 'last_event': 'CLOTHES'}

In [9]:
# Build a sample bank of 1000 synthetic users and preview the first ten.
# NOTE(review): get_users_simulation_app reads USERS, not `users`; this cell
# looks like a preview only — confirm it is still needed.
users = create_users_bank(1000)
users[:10]

[{'created_at': '2024-01-18 14:21:17',
  'user_id': '528559cf-ebc9-4b4a-a675-61d36bd72855'},
 {'created_at': '2024-01-18 14:21:17',
  'user_id': 'fbab6e1a-13d9-452d-b31d-15ed51235166'},
 {'created_at': '2024-01-18 14:21:17',
  'user_id': '79b7ddd9-2993-41af-bdb3-26ef73651ac9'},
 {'created_at': '2024-01-18 14:21:17',
  'user_id': 'f4d842d5-e40c-4b04-b727-129f749aa8a3'},
 {'created_at': '2024-01-18 14:21:17',
  'user_id': '36066089-ee87-4048-80db-3cb901866d37'},
 {'created_at': '2024-01-18 14:21:17',
  'user_id': '5ccecd3d-485f-4b2b-819f-f757d7200ed8'},
 {'created_at': '2024-01-18 14:21:17',
  'user_id': 'a9f72901-47e2-4e6f-a674-0ba4424987e6'},
 {'created_at': '2024-01-18 14:21:17',
  'user_id': '1147704c-7b20-428f-b024-0dcc4d6ef0b4'},
 {'created_at': '2024-01-18 14:21:17',
  'user_id': '14ecf889-3f70-407f-a306-dadd621749f4'},
 {'created_at': '2024-01-18 14:21:17',
  'user_id': '21fdf2e9-efe8-4680-ad72-ba788b0e077c'}]

In [10]:
# Bank of 1000 synthetic users from which get_users_simulation_app draws a user_id.
USERS = create_users_bank(1000)

#### Kafka cluster authentication

In [11]:
# Load variables from a .env file into the process environment.
load_dotenv()

# Fail fast with a clear message when a credential is missing: os.getenv
# would otherwise return None and the problem would only surface later,
# when the producer is created or tries to connect.
_REQUIRED_KAFKA_ENV_VARS = (
    'CONFLUENT_BOOTSTRAP_SERVER',
    'CONFLUENT_API_KEY',
    'CONFLUENT_API_SECRET',
)
_missing_kafka_env_vars = [
    name for name in _REQUIRED_KAFKA_ENV_VARS if not os.getenv(name)
]
if _missing_kafka_env_vars:
    raise RuntimeError(
        'Missing required environment variable(s): '
        + ', '.join(_missing_kafka_env_vars)
    )

# Producer configuration: SASL/PLAIN over TLS, credentials taken from the
# environment so that no secret is stored in the notebook.
KAFKA_AUTH = {
    'bootstrap.servers': os.getenv('CONFLUENT_BOOTSTRAP_SERVER'),
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': os.getenv('CONFLUENT_API_KEY'),
    'sasl.password': os.getenv('CONFLUENT_API_SECRET'),
}

In [12]:
# Shared confluent_kafka Producer configured with KAFKA_AUTH; it is used as a
# global by get_users_simulation_app.
kafka_producer = Producer(KAFKA_AUTH)

#### Send data messages to a Kafka topic

In [13]:
def get_users_simulation_app(num_simulations:int, kafka_topic:str):
    """Simulate user sessions and publish each one to a Kafka topic as JSON.

    For every simulation a session is generated with
    simulate_event_user_app, attached to a random user from USERS, enriched
    with the store coordinates of its city and sent through the global
    `kafka_producer`. The function sleeps 1, 2 or 5 seconds after each
    message and flushes the producer once all messages are enqueued.

    Args:
        num_simulations: Number of sessions to generate and send. Values
            of zero or below send nothing.
        kafka_topic: Name of the topic the messages are produced to.

    Returns:
        None. Prints how many messages were produced and, if any delivery
        failed, how many failed.
    """
    produced_records = 0
    failed_deliveries = 0

    def _on_delivery(err, msg):
        # Called by the producer from poll()/flush() once a message has been
        # acknowledged or has definitively failed.
        nonlocal failed_deliveries
        if err is not None:
            failed_deliveries += 1
            print(f'Delivery failed for topic {kafka_topic}: {err}')

    for _ in range(num_simulations):
        date = pd.to_datetime('today').strftime('%Y-%m-%d %H:%M:%S')
        user_simulation = simulate_event_user_app()
        user_id = random.choice(USERS)['user_id']
        latitude, longitude = get_store_location(user_simulation['city'])
        purchase = {
            'user_id': user_id,
            'created_at': date,
            'payment_method': user_simulation['payment'],
            'os_device': user_simulation['os_device'],
            'city': user_simulation['city'],
            'latitude': latitude,
            'longitude': longitude,
            'status': user_simulation['status'],
            'order_type': user_simulation['order_type'],
            'event_1': user_simulation['event_1'],
            'event_2': user_simulation['event_2'],
            'event_3': user_simulation['event_3'],
            'last_event': user_simulation['last_event'],
        }
        record_key = 'purchase_simulator'
        record_data = json.dumps(purchase).encode('utf-8')
        kafka_producer.produce(
            kafka_topic,
            key=record_key,
            value=record_data,
            on_delivery=_on_delivery,
        )
        # Serve delivery callbacks for previously sent messages without blocking.
        kafka_producer.poll(0)
        produced_records += 1
        # Random pause between messages; 1 second is twice as likely as 2 or 5.
        time.sleep(random.choice([1,1,5,2]))
    # Block until every queued message is delivered or has failed.
    kafka_producer.flush()

    print(f'Messages produced to topic {kafka_topic}: {produced_records}')
    if failed_deliveries:
        print(f'Messages that failed delivery: {failed_deliveries}')

In [14]:
# Stream 20 simulated sessions to the 'topic_druid_real_time' topic.
# The function sleeps 1-5 seconds after each message, so this cell takes a while.
get_users_simulation_app(num_simulations=20, kafka_topic='topic_druid_real_time')

Messages produced to topic topic_druid_real_time: 20


#### Results

![image.png](./resources/kafka_results.png)