# Python Simple Producer Example

In [1]:
# Importing Modules
from kafka import KafkaProducer
from time import sleep
import json
from datetime import datetime

In [2]:
# Messages will be serialized as JSON
def serializer(message):
    return json.dumps(message).encode('utf-8')
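
To see what the serializer actually produces, here is a small round-trip sketch. The `deserializer` helper is not part of the original notebook; it is a hypothetical counterpart that a consumer could use to decode the same bytes.

```python
import json

def serializer(message):
    # Same as above: Python object -> JSON text -> UTF-8 bytes
    return json.dumps(message).encode('utf-8')

def deserializer(payload):
    # Hypothetical inverse for the consumer side: UTF-8 bytes -> Python object
    return json.loads(payload.decode('utf-8'))

payload = serializer('Hello World!')
print(payload)                # b'"Hello World!"'
print(deserializer(payload))  # Hello World!
```

Note that a plain string is sent with its JSON quotes included, so a consumer needs to JSON-decode the value to get the original text back.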

Let's create a Kafka producer using the kafka-python module.

[KafkaProducer()](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html) is a Kafka client that publishes records to the Kafka cluster. The producer is thread-safe, and sharing a single producer instance across threads will generally be faster than having multiple instances.

*bootstrap_servers* – ‘host[:port]’ string (or list of ‘host[:port]’ strings) that the producer should contact to bootstrap initial cluster metadata. This does not have to be the full node list. It just needs to have at least one broker that will respond to a Metadata API Request. Default port is 9092. If no servers are specified, will default to localhost:9092.

*value_serializer* (callable) – used to convert user-supplied message values to bytes. If not None, called as f(value), should return bytes.

*api_version* (tuple) – Specify which Kafka API version to use. If set to None, the client will attempt to infer the broker version by probing various APIs. Example: (0, 10, 2).

In [3]:
# Make a simple KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer = serializer,
                         api_version=(0,10,1))

*send()* is asynchronous. When called, it adds the record to a buffer of pending record sends and returns immediately. This allows the producer to batch individual records together for efficiency. Its first argument is the name of the **topic** and its second argument is the message. It returns a future rather than a delivery confirmation.

In [4]:
# Send a test message to the topic named 'topic'
producer.send('topic','Hello World!')

<kafka.producer.future.FutureRecordMetadata at 0x198204c3b80>
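
The `FutureRecordMetadata` object shown above can be used to wait for the broker's acknowledgement. The sketch below assumes the future's `get(timeout=...)` method from kafka-python. `send_and_wait` is a hypothetical helper name, and the 10-second timeout is an arbitrary choice.

```python
def send_and_wait(producer, topic, value, timeout=10):
    """Send one record and block until it is acknowledged or the timeout expires."""
    # send() only queues the record and returns a future
    future = producer.send(topic, value)
    # get() blocks until the result is available; it raises if the send
    # failed or the timeout (in seconds) elapsed
    metadata = future.get(timeout=timeout)
    # The record metadata carries the topic, partition and offset
    return metadata.topic, metadata.partition, metadata.offset

# Usage with the producer created above (requires a reachable broker):
# topic, partition, offset = send_and_wait(producer, 'topic', 'Hello World!')
```

Blocking on every record gives up the batching benefit of asynchronous sends, so this pattern suits occasional messages better than tight loops.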

In [5]:
# Example message
print('Current datetime: {}'.format(datetime.now().strftime("%m/%d/%Y at %H:%M:%S")))

Current datetime: 09/20/2022 at 19:59:12


In [6]:
# Iterate 10 times
for i in range(10):
    
    # Generate a new message 
    message = "Current datetime: {}".format(datetime.now().strftime("%m/%d/%Y at %H:%M:%S"))
    
    # Send the message to the topic
    producer.send('topic',message)
    
    # Wait 1 second
    sleep(1)
    
    # Report that the message was queued (send() does not wait for delivery)
    print("Message sent ",i)

Message sent  0
Message sent  1
Message sent  2
Message sent  3
Message sent  4
Message sent  5
Message sent  6
Message sent  7
Message sent  8
Message sent  9
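
The loop above prints its line as soon as `send()` returns, which is before delivery is confirmed. A minimal sketch of reporting actual delivery follows, assuming the `add_callback` and `add_errback` methods of kafka-python's future object. `send_with_report`, `on_success` and `on_error` are hypothetical names.

```python
def on_success(metadata):
    # Called with the record metadata once the broker acknowledges the record
    print("Delivered to", metadata.topic,
          "partition", metadata.partition,
          "offset", metadata.offset)

def on_error(exc):
    # Called with the exception if the send fails
    print("Delivery failed:", exc)

def send_with_report(producer, topic, value):
    """Queue one record and attach success and failure handlers to its future."""
    future = producer.send(topic, value)
    future.add_callback(on_success)
    future.add_errback(on_error)
    return future

# Usage with the producer created above (requires a reachable broker):
# send_with_report(producer, 'topic', 'Hello World!')
```

Unlike blocking on each future, the handlers run when the result arrives, so the loop can keep queuing messages.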


In [7]:
# Close producer
producer.close()
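
Because `send()` only queues records, it can be useful to drain the buffer explicitly before closing the producer. The sketch below assumes kafka-python's `flush()` method. `send_all` is a hypothetical helper, not part of the library.

```python
def send_all(producer, topic, messages):
    """Queue every message, then block until the buffered records have been sent."""
    count = 0
    for message in messages:
        producer.send(topic, message)
        count += 1
    # flush() blocks until all previously queued records have completed
    producer.flush()
    return count

# Usage with a producer created as above (requires a reachable broker):
# send_all(producer, 'topic', ['first', 'second'])
# producer.close()
```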