# Hands-on with Kafka

![kafka](https://images.contentful.com/gt6dp23g0g38/53UO4964r0e7kRVm0mcUUZ/f6f6d7b1b90e8e88a5be0d1845bdf950/what_is_kafka_and_how_does_it_work.png)

## Installing the Kafka Python Client

First, install the Kafka Python client, `kafka-python` (for example with `pip install kafka-python`). We'll use it to consume messages from a Kafka cluster.

In [1]:
import json
from kafka import KafkaConsumer, TopicPartition

Next, we'll instantiate the consumer.

Our data was serialized as JSON when it was published, so we need to deserialize it as JSON when consuming. In production, a binary format such as Avro, which we covered earlier, is recommended instead.

In [2]:
consumer = KafkaConsumer(
  bootstrap_servers=['localhost:9092'],
  auto_offset_reset='earliest',
  value_deserializer = lambda v: json.loads(v.decode('ascii')),
  key_deserializer = lambda v: json.loads(v.decode('ascii')),
)
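
The deserializers above have to mirror whatever serializers were used when publishing. As a minimal sketch (the helper names `serialize` and `deserialize` and the sample order are made up for illustration), the round trip below shows why decoding as `ascii` works even when an order contains emoji toppings: `json.dumps` escapes non-ASCII characters by default, so the encoded bytes are pure ASCII.

```python
import json


def serialize(obj):
    # Same shape as a JSON value_serializer: JSON text, then ASCII bytes.
    # json.dumps escapes non-ASCII characters by default (ensure_ascii=True).
    return json.dumps(obj).encode('ascii')


def deserialize(raw):
    # Same shape as the consumer-side lambdas used above.
    return json.loads(raw.decode('ascii'))


# Hypothetical order, modelled on the fields in the pizza-orders topic.
order = {
    'shop': 'Luigis Pizza',
    'pizzas': [{'pizzaName': 'Salami',
                'additionalToppings': ['🍓 strawberry']}],
}

raw = serialize(order)
restored = deserialize(raw)
print(raw)
print(restored == order)
```

If the publisher had used `ensure_ascii=False` with a UTF-8 encoding instead, the consumer would need to decode as `utf-8` to match.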

We set `auto_offset_reset` to `earliest` so that, when the consumer has no committed offset, it starts reading from the earliest available message (order) in the topic.

List the available topics:

In [3]:
consumer.topics()

{'pizza-orders'}

Let's assume we are the owners of a pizza delivery chain, and we'd like to push the orders to Apache Kafka as they come in.

We have a topic consisting of pizza orders, which contains the Order ID, Order Details (pizzas and toppings), client's Name, Address, Phone Number, the Shop Name, Total Cost and Timestamp.

There is only one partition for the topic. First, let's subscribe to the topic:

In [4]:
topic_name = "pizza-orders"

In [5]:
consumer.subscribe(topics=[topic_name])
consumer.subscription()

{'pizza-orders'}

We can then start reading the events. Let's read and pretty-print the first 5:

In [6]:
from pprint import pprint

In [7]:
for ix, message in enumerate(consumer, start=1):
    print("%d:%d: k=%s" % (message.partition,
                           message.offset,
                           message.key))
    pprint("-------------------")
    pprint(message.value)
    print()
    if ix == 5:
        break

0:0: k={'shop': 'Ill Make You a Pizza You Cant Refuse'}
'-------------------'
{'address': '21087 Calvin Plains\nJonesland, NY 76392',
 'cost': 1.82,
 'id': 0,
 'name': 'Jessica Smith',
 'phoneNumber': '001-701-915-3000',
 'pizzas': [{'additionalToppings': [], 'pizzaName': 'Salami'}],
 'shop': 'Ill Make You a Pizza You Cant Refuse',
 'timestamp': 1765371003381}

0:1: k={'shop': 'Ill Make You a Pizza You Cant Refuse'}
'-------------------'
{'address': '19721 Drew Key\nNew Donaldport, NH 05690',
 'cost': 2.71,
 'id': 1,
 'name': 'Roger Brown',
 'phoneNumber': '475-943-3780x8105',
 'pizzas': [{'additionalToppings': ['🍓 strawberry', '🐟 tuna'],
             'pizzaName': 'Mari & Monti'}],
 'shop': 'Ill Make You a Pizza You Cant Refuse',
 'timestamp': 1765371005414}

0:2: k={'shop': 'Luigis Pizza'}
'-------------------'
{'address': '278 Phillips Crossing Apt. 661\nPort Maryfurt, GU 86535',
 'cost': 34.83,
 'id': 2,
 'name': 'Matthew Williams',
 'phoneNumber': '924-927-0965x759',
 'pizzas
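
The `enumerate` plus `break` pattern above can also be written with `itertools.islice`, which stops after a fixed number of items from any iterator. The sketch below runs without a broker and rests on a few assumptions: `Message` is a stand-in namedtuple with the same `partition`, `offset`, `key` and `value` attributes as the records printed above, and `format_header` and `take` are hypothetical helper names.

```python
from collections import namedtuple
from itertools import islice

# Stand-in for the records yielded by the consumer (illustration only).
Message = namedtuple('Message', ['partition', 'offset', 'key', 'value'])


def format_header(message):
    # Same "partition:offset: k=key" header as in the loop above.
    return "%d:%d: k=%s" % (message.partition, message.offset, message.key)


def take(messages, n):
    # Lazily pull at most n messages, then stop iterating.
    return list(islice(messages, n))


# Fake stream of 8 messages standing in for the Kafka consumer.
fake_stream = (
    Message(0, offset, {'shop': 'Luigis Pizza'}, {'id': offset})
    for offset in range(8)
)

first_five = take(fake_stream, 5)
headers = [format_header(m) for m in first_five]
for header in headers:
    print(header)
```

With a real consumer, `take(consumer, 5)` would read at most five records from it, assuming the consumer is used as a plain iterator as in the loop above.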

Now continue by printing the next 5 events. Notice the offset number and order ID:

In [9]:
for ix, message in enumerate(consumer, start=1):
    print("%d:%d: k=%s" % (message.partition,
                           message.offset,
                           message.key))
    pprint(message.value)
    print()
    if ix == 5:
        break

0:10: k={'shop': 'Its-a me! Mario Pizza!'}
{'address': '144 Ross Port\nAliciafurt, KY 28535',
 'cost': 5.66,
 'id': 0,
 'name': 'Ricky Sexton',
 'phoneNumber': '8667536484',
 'pizzas': [{'additionalToppings': ['🫒 olives',
                                    '🧀 blue cheese',
                                    '🌶️ hot pepper'],
             'pizzaName': 'Salami'},
            {'additionalToppings': ['🧄 garlic'], 'pizzaName': 'Mari & Monti'},
            {'additionalToppings': ['🫒 olives', '🍓 strawberry'],
             'pizzaName': 'Salami'},
            {'additionalToppings': [], 'pizzaName': 'Mari & Monti'},
            {'additionalToppings': ['🫑 green peppers'],
             'pizzaName': 'Margherita'}],
 'shop': 'Its-a me! Mario Pizza!',
 'timestamp': 1765372047348}

0:11: k={'shop': 'Luigis Pizza'}
{'address': '49595 Gary Port Suite 707\nNorth Michaelfort, CA 18853',
 'cost': 17.53,
 'id': 1,
 'name': 'Russell Sanders',
 'phoneNumber': '(873)972-6401x764',
 'pi
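
The `timestamp` values in these orders look like Unix epoch times in milliseconds. That is an assumption, since the unit is not stated anywhere above. Under that assumption, a small helper (the name `order_time` is hypothetical) turns them into readable UTC datetimes:

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def order_time(timestamp_ms):
    # Assumes milliseconds since the Unix epoch. Integer arithmetic via
    # timedelta avoids floating-point rounding of the millisecond part.
    return EPOCH + timedelta(milliseconds=timestamp_ms)


# Timestamp taken from the first order printed earlier (offset 0).
placed_at = order_time(1765371003381)
print(placed_at.isoformat())
```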

In [18]:
# Rewind every assigned partition to its earliest available offset
consumer.seek_to_beginning()
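
`seek_to_beginning()` rewinds to the start of the log. To replay from a specific offset instead, the kafka-python consumer also exposes `assignment()` and `seek(partition, offset)`. A minimal sketch follows, where `rewind_to` is a hypothetical helper name and the consumer is assumed to already have partitions assigned (with `subscribe`, that typically happens only once messages have been fetched):

```python
def rewind_to(consumer, offset):
    """Seek every partition currently assigned to `consumer` to `offset`.

    Returns the partitions that were moved. Works with any object that
    exposes assignment() and seek(partition, offset).
    """
    partitions = sorted(consumer.assignment())
    for partition in partitions:
        consumer.seek(partition, offset)
    return partitions
```

For example, `rewind_to(consumer, 2)` would make the next iteration start again at the order stored at offset 2.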

In [None]:
# Close the consumer when we are done reading
consumer.close()

This is the full code to publish an example order:

```python
import json

from kafka import KafkaProducer

topic_name = "pizza-orders"

producer = KafkaProducer(
  bootstrap_servers=['brave-fish-11463-us1-kafka.upstash.io:9092'],
  sasl_mechanism='SCRAM-SHA-256',
  security_protocol='SASL_SSL',
  sasl_plain_username='YnJhdmUtZmlzaC0xMTQ2MyQSvwXBuLOQsV1W7YffuC8cDaZcA3fKQwakMhnQGgg',
  sasl_plain_password='MDUxNjc4YzEtYzYxNy00NTE1LWEwNWYtMDBhODRlZmE0OGJm',
  # Serialize keys and values as JSON, matching the consumer's deserializers
  value_serializer=lambda v: json.dumps(v).encode('ascii'),
  key_serializer=lambda v: json.dumps(v).encode('ascii')
)

message = {'address': '8697 Anthony Valley\nPort Kellymouth, FL 64221',
 'id': 10,
 'name': 'Patricia Castaneda',
 'phoneNumber': '328-798-9970x51560',
 'pizzas': [{'additionalToppings': ['🍅 tomato'], 'pizzaName': 'Mari & Monti'}],
 'shop': 'Mammamia Pizza',
 'timestamp': 1696609343719}

# Messages with the same key are routed to the same partition
key = {'shop': 'Mammamia Pizza'}

producer.send(topic_name, key=key, value=message)
# send() is asynchronous; flush() blocks until buffered messages are sent
producer.flush()
```