### Create a stream of website visit records

In [1]:
from kafka import KafkaProducer
import datetime
import random
import json
import time

# Kafka topic that the generated visit records are published to.
visits_topic = "spark.streaming.website.visits"

def string_serializer(value):
    """Return *value* encoded as UTF-8 bytes.

    Passed to the producer as both the key and the value serializer.
    """
    utf8_bytes = value.encode("utf-8")
    return utf8_bytes
    
# Producer connected to the local broker; keys and values are both passed
# through string_serializer before being sent.
visits_producer = KafkaProducer(
    bootstrap_servers=["localhost:9092"],
    key_serializer=string_serializer,
    value_serializer=string_serializer,
)

# Pools of values that each generated visit record draws from.
countries = [
    "USA",
    "India",
    "Brazil",
    "Australia",
    "Russia",
]
last_actions = [
    "Catalog",
    "FAQ",
    "Order",
    "ShoppingCart",
]

#Generate 100 sample visit records
try:
    # range(100) yields exactly 100 iterations (range(1,100) gave only 99).
    for _ in range(100):

        #Build the record; it is serialized to a JSON string below.
        #random.choice keeps working if the value lists grow or shrink.
        json_record = {
            "country": random.choice(countries),
            "last_action": random.choice(last_actions),
            "visit_date": datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S"),
            "duration": random.randint(1, 20),
        }

        #Use country as Key. Each country will go through the same partition,
        #hence updates to a given country can be handled sequentially
        kafka_key = json_record["country"]
        kafka_value = json.dumps(json_record)
        print(kafka_value)
        visits_producer.send(visits_topic, key=kafka_key, value=kafka_value)

        #sleep for 1-3 seconds
        time.sleep(random.randint(1, 3))
finally:
    #Always push out buffered records and release the producer, even when
    #the loop is interrupted before all records have been sent.
    visits_producer.flush()
    visits_producer.close()

ModuleNotFoundError: No module named 'kafka.vendor.six.moves'

In [None]:
#Consumer test - to test if topic data is published correctly
from kafka import KafkaConsumer

def string_deserializer(value):
    """Return the UTF-8 bytes in *value* decoded to text.

    Passed to the consumer as its value deserializer.
    """
    text = value.decode("utf-8")
    return text
    
# Consumer for the local broker. With auto_offset_reset="latest" it is
# configured to start from the newest offsets, so run this cell while the
# producer cell is still publishing.
visits_consumer = KafkaConsumer(
    bootstrap_servers=["localhost:9092"],
    value_deserializer=string_deserializer,
    auto_offset_reset="latest",
)

try:
    # subscribe() is documented to take a list of topic names.
    visits_consumer.subscribe([visits_topic])

    # poll() returns {TopicPartition: [ConsumerRecord, ...]}. Its default
    # timeout of 0 ms returns immediately, so wait a few seconds to give
    # the consumer time to fetch records.
    records_by_partition = visits_consumer.poll(timeout_ms=5000)

    # Flatten the per-partition lists so the count and the loop below work on
    # individual records rather than on the dict's TopicPartition keys.
    messages = [
        record
        for partition_records in records_by_partition.values()
        for record in partition_records
    ]
    print("Total messages :", len(messages))
    for message in messages:
        print("%d:%d: vs=%s" % (message.partition, message.offset, message.value))
finally:
    # Release the broker connection once the check is done.
    visits_consumer.close()