# Consume messages from an Aiven for Apache Kafka®️ partitioned topic

In the [previous notebook](4-consume-partition-0.ipynb), we explored consuming messages from separate partitions of a topic.

![Consume messages from an Apache Kafka Topic](../img/5%20-%20multiple%20kafka%20consumer%20groups%20python.png)

You may have noticed that our `conf` sets a `group.id` value. Up until now we've only had one group, but we can have multiple groups that each consume our messages independently.

Our stores are doing well and our billing department wants to be able to track our **Overall Orders**. We can do this with no disruption to our Aiven for Apache Kafka®️ configuration.

In this notebook we'll create two groups. One is for our stores, where each store reads orders from its own partition only. The second is for billing and consumes messages from both partitions.
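
The idea that each group reads the same messages without disturbing the others can be sketched without a broker. The snippet below is a simplified in-memory model, not the Kafka client: `topic_log`, `committed`, and `consume` are hypothetical names invented for illustration, and the only behavior it assumes is that read positions are tracked per group and per partition.

```python
from collections import defaultdict

# In-memory stand-in for a topic with two partitions (not a real Kafka client).
topic_log = {
    0: ["order-n1", "order-n2"],  # partition 0: Northern Store
    1: ["order-s1"],              # partition 1: Southern Store
}

# Each group keeps its own read position (offset) for every partition.
committed = defaultdict(lambda: defaultdict(int))


def consume(group_id, partitions):
    """Return unread messages for group_id and advance only that group's offsets."""
    unread = []
    for partition in partitions:
        start = committed[group_id][partition]
        unread.extend(topic_log[partition][start:])
        committed[group_id][partition] = len(topic_log[partition])
    return unread


orders_north = consume("orders", [0])    # the store group reads partition 0 only
billing_all = consume("billing", [0, 1]) # billing still sees every message
```

Reading with the `orders` group does not move the `billing` group's position, which is why billing can be added without changing anything for the stores.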

In [1]:

import os
import json

from confluent_kafka import (
    DeserializingConsumer,
    KafkaException,
    TopicPartition,
)

from dotenv import load_dotenv

# Load environment variables
load_dotenv()

KAFKA_SERVICE_URI = os.getenv("KAFKA_SERVICE_URI")
topic = 'pizzasPartitioned'

def json_deserializer(data, ctx):
    """Decode a JSON-encoded key or value; pass None through unchanged."""
    if data is None:
        return None
    return json.loads(data.decode('utf-8'))

base_conf = {
    'bootstrap.servers': KAFKA_SERVICE_URI,
    'client.id': 'myclient',
    'group.id': 'base',
    'security.protocol': 'SSL',
    'ssl.ca.location': '../sslcerts/ca.pem',
    'ssl.certificate.location': '../sslcerts/service.cert',
    'ssl.key.location': '../sslcerts/service.key',
    'value.deserializer': json_deserializer,
    'key.deserializer': json_deserializer,
}

orders_conf = {**base_conf, 'group.id': 'orders'}

# Create an orders consumer for each store
orders_consumer_northern_store = DeserializingConsumer(orders_conf)
orders_consumer_northern_store.assign([TopicPartition(topic, 0)])

orders_consumer_southern_store = DeserializingConsumer(orders_conf)
orders_consumer_southern_store.assign([TopicPartition(topic, 1)])

# Create a centralized billing consumer
billing_conf = {**base_conf, 'group.id': 'billing'}
billing_consumer = DeserializingConsumer(billing_conf)
billing_consumer.assign([TopicPartition(topic, 0), TopicPartition(topic, 1)])
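
The per-group configurations above rely on dictionary unpacking: a later key replaces an earlier one, and the original dictionary is not modified. A minimal sketch of that pattern follows. The helper `conf_for_group` and the placeholder values in `example_base` are hypothetical, chosen only for illustration.

```python
def conf_for_group(base, group_id):
    """Return a copy of base with 'group.id' replaced; base itself is left unchanged."""
    return {**base, "group.id": group_id}


# Placeholder values for illustration; not a working connection config.
example_base = {
    "bootstrap.servers": "kafka.example.com:9092",
    "client.id": "myclient",
    "group.id": "base",
}

example_orders = conf_for_group(example_base, "orders")
example_billing = conf_for_group(example_base, "billing")
```

Because every derived dictionary is a fresh copy, changing one group's settings later cannot leak into another group's consumer.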



## Consume messages from the `pizzasPartitioned` topic

To show that a partition can be read by one or more groups, we'll consume from each of our consumers using the custom function `display_orders`.

It prints the consumer's label, followed by each message's partition, offset, key, and value.

We've color-coded the consumers so you'll see the _Northern Store_ consumer in red, the _Southern Store_ in green, and _Billing_ in blue.

In [4]:
from rich.console import Console

console = Console()

def display_orders(consumer: DeserializingConsumer, label: str, style: str = "bold", count: int = 1):
    """Print `count` messages from `consumer`, waiting until they arrive."""
    received = 0
    while received < count:
        # poll() returns None when no message arrives within the timeout
        if (msg := consumer.poll(timeout=1.0)) is None:
            continue
        if msg.error():
            raise KafkaException(msg.error())
        console.print(f"Checking consumer {label}\n" + "-" * 10, style=style)
        console.print(f"[{style}]{msg.partition()}:{msg.offset()}: {msg.key()}:{msg.value()}[/{style}]\n\n")
        received += 1

console.print("Run the last block from `3-produce-partitioned-topic.ipynb`.")
display_orders(orders_consumer_northern_store, "Northern Store", "bold red")
display_orders(orders_consumer_southern_store, "Southern Store", "bold green")
display_orders(billing_consumer, "Billing", "bold blue", count=2)
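
The `display_orders` loop keeps polling until it has received `count` messages, so it waits indefinitely if the producer cell has not been run. One possible variant, sketched below, gives up after a number of consecutive empty polls. The function `drain`, its `max_empty_polls` parameter, and the `FakeMessage` and `FakeConsumer` classes are all hypothetical stand-ins so the sketch runs without a broker; it assumes only that the consumer exposes `poll(timeout=...)` and that messages expose `error()`, `partition()`, `offset()`, and `value()`, as used in the cell above.

```python
def drain(consumer, count=1, max_empty_polls=5, timeout=1.0):
    """Collect up to count messages, stopping after max_empty_polls consecutive empty polls."""
    received, empty = [], 0
    while len(received) < count and empty < max_empty_polls:
        msg = consumer.poll(timeout=timeout)
        if msg is None:
            empty += 1
            continue
        if msg.error():
            raise RuntimeError(msg.error())
        empty = 0  # a message arrived, so reset the empty-poll counter
        received.append((msg.partition(), msg.offset(), msg.value()))
    return received


class FakeMessage:
    """Hypothetical stand-in exposing the accessors used by drain()."""

    def __init__(self, partition, offset, value):
        self._partition, self._offset, self._value = partition, offset, value

    def error(self):
        return None

    def partition(self):
        return self._partition

    def offset(self):
        return self._offset

    def value(self):
        return self._value


class FakeConsumer:
    """Hypothetical stand-in: poll() returns queued messages, then None."""

    def __init__(self, messages):
        self._messages = list(messages)

    def poll(self, timeout=1.0):
        return self._messages.pop(0) if self._messages else None


fake = FakeConsumer([FakeMessage(0, 0, {"pizza": "margherita"})])
result = drain(fake, count=2, max_empty_polls=3)
```

Here `drain` asks for two messages but only one is queued, so it returns the single message after three empty polls instead of blocking.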

## Great Work

In this notebook:
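  - We configured three consumers in two consumer groups: one consumer per store, each reading a single partition, and a billing consumer reading both partitions of the same topic.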

This is one way of limiting which messages each consumer reads from a topic. In our penultimate notebook, we'll take a look at using Aiven for Apache Flink® to filter messages based on a query.

Select the [next notebook](6-transform-with-apache-flink.ipynb) or click the button below to continue.

[![Button to Part 6](https://img.shields.io/badge/6-Transformations%20Using%20Aiven%20for%20Apache%20Flink-a03586?style=for-the-badge&labelColor=ec6147)](6-transform-with-apache-flink.ipynb)