In [52]:
# import required libraries
from kafka import KafkaProducer
from pizza import PizzaProvider
from faker import Faker
import random as rn
import json
import time
from pprint import pprint

In [53]:
# Instantiate Faker and register the pizza provider.
fake = Faker()
# Seed both sources of randomness so a fresh run is repeatable:
# Faker generates the field values, while the stdlib `random` module (`rn`)
# is used elsewhere in this notebook for random counts and sleep times.
Faker.seed(42)
rn.seed(42)
fake.add_provider(PizzaProvider)

In [54]:
def produce_pizza_order(order_count = 1):
    """Build one fake pizza order together with the key it is published under.

    Args:
        order_count: Value stored as the order's ``id``.

    Returns:
        A ``(key, message)`` tuple. ``key`` is ``{'shop': <shop>}``; ``message``
        is a dict with ``id``, ``shop``, ``name``, ``phoneNumber``, ``address``
        and ``pizzas``. ``pizzas`` holds 1-6 entries, each with a ``pizzaName``
        and a list of 1-4 ``additionalToppings`` (repeats are possible).
    """
    shop_name = fake.pizza_shop()

    def _random_pizza():
        # Draw the toppings before the pizza name so the generators are
        # always consumed in the same order.
        extras = [fake.pizza_toppings() for _ in range(rn.randint(1, 4))]
        return {'pizzaName': fake.pizza_name(), 'additionalToppings': extras}

    # Randomly pick how many pizzas go into this order.
    ordered_pizzas = [_random_pizza() for _ in range(rn.randint(1, 6))]

    # Customer details; name and phone number go through faker's `unique` proxy.
    customer_name = fake.unique.name()
    customer_phone = fake.unique.phone_number()
    customer_address = fake.address()

    order = {
        'id': order_count,
        'shop': shop_name,
        'name': customer_name,
        'phoneNumber': customer_phone,
        'address': customer_address,
        'pizzas': ordered_pizzas,
    }

    return {'shop': shop_name}, order

In [55]:
def produce_messages(hostname='localhost',
                     port='9092',
                     topic_name='pizza-orders',
                     nr_messages=2,
                     max_waiting_time_in_sec=5
                     ):
    """Generate fake pizza orders and publish them to a Kafka topic.

    Each order comes from ``produce_pizza_order(i)`` with ``i`` counting up
    from 0, and is sent with its key and value serialized as JSON. After each
    send the function sleeps a random whole number of seconds between 0 and
    ``max_waiting_time_in_sec`` (inclusive).

    Args:
        hostname: Kafka broker host.
        port: Kafka broker port; a string or an integer is accepted.
        topic_name: Topic the orders are sent to.
        nr_messages: Number of orders to produce.
        max_waiting_time_in_sec: Upper bound of the random pause between sends.

    The producer is always closed before returning, including when an
    exception interrupts the loop.
    """
    producer = KafkaProducer(
        # format() rather than "+" so an integer port does not raise TypeError
        bootstrap_servers=['{}:{}'.format(hostname, port)],
        # json.dumps escapes non-ASCII characters by default, so encoding
        # the result as ASCII cannot fail.
        value_serializer=lambda value: json.dumps(value).encode('ascii'),
        key_serializer=lambda key: json.dumps(key).encode('ascii')
    )

    try:
        for i in range(nr_messages):
            key, message = produce_pizza_order(i)

            pprint('Sending: {}'.format(message))

            producer.send(topic_name, key=key, value=message)

            # Random pause between sends.
            time.sleep(rn.randint(0, max_waiting_time_in_sec))

            # Flush at every 100th index (this includes the very first message).
            if i % 100 == 0:
                producer.flush()

        # Make sure everything still buffered is delivered.
        producer.flush()
    finally:
        # Release the producer's connections even if sending failed.
        producer.close()

In [50]:
# Run the producer with its defaults: 2 orders sent to topic 'pizza-orders' via localhost:9092.
produce_messages()

("Sending: {'id': 0, 'shop': 'Mammamia Pizza', 'name': 'Sean Rasmussen', "
 "'phoneNumber': '586-850-1429x401', 'address': '569 Paul Ports Apt. 406\\nNew "
 "Saraside, TX 94798', 'pizzas': [{'pizzaName': 'Margherita', "
 "'additionalToppings': ['mozarella']}, {'pizzaName': 'Marinara', "
 "'additionalToppings': ['garlic', 'tomato', 'tomato']}, {'pizzaName': "
 "'Salami', 'additionalToppings': ['garlic', 'garlic']}, {'pizzaName': "
 "'Peperoni', 'additionalToppings': ['ham', 'ham']}]}")
("Sending: {'id': 1, 'shop': 'Mammamia Pizza', 'name': 'Nancy Gonzalez', "
 "'phoneNumber': '(159)514-8465x64823', 'address': '299 Sullivan Village Apt. "
 "443\\nFloydmouth, NH 58406', 'pizzas': [{'pizzaName': 'Peperoni', "
 "'additionalToppings': ['ham', 'banana']}, {'pizzaName': 'Salami', "
 "'additionalToppings': ['olives']}, {'pizzaName': 'Marinara', "
 "'additionalToppings': ['tuna', 'olives']}, {'pizzaName': 'Peperoni', "
 "'additionalToppings': ['blue cheese', 'mozarella', 'ham']}, {'pizzaName': "