# Consume messages from an Apache Kafka partitioned topic

In the [previous notebook](4-consume-partition-0.ipynb), we explored consuming messages from individual partitions of a topic.

![Consume messages from an Apache Kafka Topic](../img/consume.png)

In the last sample we created one si
## Prerequisites

To start the tutorial you need to:

* Download the Aiven for Apache Kafka SSL certificates as mentioned in the [README instructions](../README.md#Download-the-required-SSL-certificates)
* Retrieve the Aiven for Apache Kafka hostname and port, from the [Aiven Console](https://console.aiven.io/) in the Aiven for Apache Kafka service overview
* Set the `KAFKA_SERVICE_URI` environment variable (for example in a `.env` file) to the Apache Kafka hostname and port; the code below reads it from there

In [23]:

import os
import json

from confluent_kafka import (
    DeserializingConsumer,
    KafkaError,
    KafkaException,
    TopicPartition,
)

from dotenv import load_dotenv

# Load environment variables
load_dotenv()

KAFKA_SERVICE_URI = os.getenv("KAFKA_SERVICE_URI")
topic = 'pizzaPartitioned'

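def json_serializer(msg, s_obj):
    # Despite its name, this deserializes: JSON bytes in, Python object out.
    # A message without a key (or value) arrives as None, so pass that through.
    if msg is None:
        return None
    return json.loads(msg.decode('utf-8'))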

base_conf = {
    'bootstrap.servers': KAFKA_SERVICE_URI,
    'client.id': 'myclient',
    'group.id': 'base',
    'security.protocol': 'SSL',
    'ssl.ca.location': '../sslcerts/ca.pem',
    'ssl.certificate.location': '../sslcerts/service.cert',
    'ssl.key.location': '../sslcerts/service.key', 
    'value.deserializer': json_serializer,
    'key.deserializer': json_serializer
    }

orders_conf = {**base_conf, 'group.id': 'orders'}

# Create an orders consumer for each store
orders_consumer_store_0 = DeserializingConsumer(orders_conf)
orders_consumer_store_0.assign([TopicPartition(topic, 0)])

orders_consumer_store_1 = DeserializingConsumer(orders_conf)
orders_consumer_store_1.assign([TopicPartition(topic, 1)])

# Create a centralized billing consumer
billing_conf = {**base_conf, 'group.id': 'billing'}
billing_consumer = DeserializingConsumer(billing_conf)
billing_consumer.assign([TopicPartition(topic, 0), TopicPartition(topic, 1)])
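
Two pieces of the cell above can be tried without a broker: how a JSON deserializer turns message bytes into Python objects, and how `{**base_conf, 'group.id': ...}` derives a per-consumer configuration. The following is a minimal sketch under stated assumptions: `json_deserializer` is a hypothetical stand-in for the notebook's function, and the `bootstrap.servers` value and the sample payload are placeholders.

```python
import json


def json_deserializer(data, ctx=None):
    """Decode UTF-8 JSON bytes; pass None through (e.g. a message without a key)."""
    if data is None:
        return None
    return json.loads(data.decode("utf-8"))


# Placeholder settings; the real cell also carries SSL and deserializer options.
base_conf = {"bootstrap.servers": "localhost:9092", "group.id": "base"}

# Later keys win in a dict literal, so only 'group.id' differs from base_conf.
orders_conf = {**base_conf, "group.id": "orders"}
billing_conf = {**base_conf, "group.id": "billing"}

print(json_deserializer(b'{"id": 1, "pizza": "Margherita"}'))
print(orders_conf["group.id"], billing_conf["group.id"], base_conf["group.id"])
```

Because the unpacking builds a new dictionary each time, `base_conf` itself is left unchanged and can be reused for further consumers.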



## Consume messages from the `pizzaPartitioned` topic

To show that each consumer reads only from the partitions assigned to it, we'll consume messages with the custom function `display_orders`.

For every message, it prints the consumer's label followed by the partition, offset, key, and value.

We've color-coded the consumers: the _Northern Store_ consumer prints in red, the _Southern Store_ consumer in green, and the _Accounts Payable_ consumer, which can consume from both partitions, in blue.

In [37]:
from rich.console import Console

console = Console()

def display_orders(consumer:DeserializingConsumer, label:str, style:str = "bold", count:int = 1):
    """Display the orders from a consumer."""
    finished = False
    local_count = 0
    while not finished:
        if (msg:=consumer.poll(timeout=1.0)) is None:
            continue
        elif msg.error():
            raise KafkaException(msg.error())
        else:
            console.print(f"Checking Partition {label} \n" + "-"*10, style=style)
            console.print(f"[{style}]{msg.partition()}:{msg.offset()}: {msg.key()}:{msg.value()}[/{style}]\n\n")
            local_count += 1
            finished = local_count == count

console.print("Run the last block from `Produce-Partitioned-Topics.ipynb`.")
display_orders(orders_consumer_store_0, "Northern Store", "bold red")
display_orders(orders_consumer_store_1, "Southern Store", "bold green")
display_orders(billing_consumer, "Accounts Payable", "bold blue", count=2)
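
The loop in `display_orders` keeps polling until it has seen `count` messages, so it waits indefinitely if the producer notebook has not been run. One possible variation, sketched here as an assumption rather than as part of the tutorial, adds a deadline. `poll_with_deadline` and `FakeConsumer` are hypothetical names; the fake stands in for a real consumer so the sketch runs without a broker, and error handling is omitted for brevity.

```python
import time


def poll_with_deadline(consumer, count=1, max_wait=10.0, poll_timeout=1.0):
    """Collect up to `count` messages, giving up after `max_wait` seconds."""
    messages = []
    deadline = time.monotonic() + max_wait
    while len(messages) < count and time.monotonic() < deadline:
        msg = consumer.poll(poll_timeout)
        if msg is None:
            continue  # nothing arrived within poll_timeout; try again
        messages.append(msg)
    return messages


class FakeConsumer:
    """Stand-in for a Kafka consumer: returns queued items, then None."""

    def __init__(self, items):
        self._items = list(items)

    def poll(self, timeout=None):
        return self._items.pop(0) if self._items else None


got = poll_with_deadline(FakeConsumer(["order-1", "order-2"]), count=2, max_wait=1.0)
print(got)
```

With a real consumer, the returned list would hold message objects, which could then be printed the same way `display_orders` does.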

## Great Work

We configured our three consumers to read from different partitions of the same topic. Assigning partitions explicitly is one way of limiting which messages each consumer receives.
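
To make the assignment concrete, here is a broker-free sketch that models which messages each consumer can see. The dictionary names, the `visible_messages` helper, and the message payloads are invented for illustration; only the consumer names and partition numbers mirror the `assign()` calls earlier in the notebook.

```python
# Hypothetical in-memory model: partition number -> messages stored there.
partitions = {
    0: ["north-order-1", "north-order-2"],
    1: ["south-order-1"],
}

# Partitions each consumer was assigned, mirroring the assign() calls.
assignments = {
    "orders_consumer_store_0": [0],
    "orders_consumer_store_1": [1],
    "billing_consumer": [0, 1],
}


def visible_messages(consumer_name):
    """Return the messages a consumer can read given its assigned partitions."""
    return [m for p in assignments[consumer_name] for m in partitions[p]]


for name in assignments:
    print(name, visible_messages(name))
```

Each store consumer sees only its own partition, while the billing consumer sees the messages from both.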

In our penultimate notebook, we'll take a look at using Apache Flink to filter messages based on a query.

[Proceed to the next notebook](6-transform-with-apache-flink.ipynb) 