# Confluent 101 Workshop

<img src="images/jupyter-setup.png">


# What is Confluent?

<img src="images/confluent-logo.png" width="400" height="300"> 
Confluent is a company that provides a platform built around Apache Kafka, a popular open-source distributed streaming platform. At its core, Confluent aims to empower organizations to harness the full potential of real-time data streams, enabling them to build scalable, event-driven architectures that drive business innovation.

## Key Components:

### 1. Apache Kafka:
   Apache Kafka is a distributed event streaming platform that allows users to publish, subscribe to, store, and process streams of records in real-time. It is known for its scalability, fault-tolerance, and durability, making it suitable for a wide range of use cases, from real-time analytics to data integration.

### 2. Confluent Platform:
<img src="images/cp-demo-overview.jpg" width="1200" height="500"> 
   Confluent Platform is an enterprise-grade distribution of Apache Kafka, enhanced with additional features and tools to simplify deployment, management, and monitoring of Kafka clusters. It includes features such as Schema Registry, Connectors, Control Center, and ksqlDB, providing comprehensive capabilities for building end-to-end streaming applications.

### 3. Ecosystem Integrations:
   Confluent offers a rich ecosystem of connectors that link Kafka with other data systems, databases, cloud services, and event sources. This allows organizations to use Kafka as the central nervous system of their data infrastructure, so that data can flow between systems across the organization.


# What is Kafka?

<img src="images/kafka-logo.png" width="400" height="300"> 
Apache Kafka is a distributed event streaming platform designed to handle high-volume, real-time data streams. It serves as a central nervous system for modern applications, enabling the processing and analysis of continuous data flows at scale. Kafka is built for reliability, scalability, and fault-tolerance, making it suitable for a wide range of use cases across industries.

## Key Capabilities:

### 1. Publish-Subscribe Messaging:
   Kafka follows a publish-subscribe messaging model, where producers publish messages to topics, and consumers subscribe to these topics to receive messages. This decouples data producers from consumers, allowing for flexible, asynchronous communication.
<img src="images/kafka-multitenancy.png" width="800" height="400"> 
### 2. Fault-Tolerant Storage:
   Kafka provides fault-tolerant storage of data streams using a distributed commit log. Messages are durably stored on disk and replicated across multiple brokers for resilience. This ensures that data is not lost even in the event of hardware failures.
<img src="images/kafka-broker-topology.png" width="800" height="400"> 

### 3. Scalability:
   Kafka is designed to scale horizontally to handle massive data volumes and high throughput. It supports partitioning of topics, allowing messages to be distributed across multiple brokers for parallel processing. This enables Kafka clusters to scale seamlessly to accommodate growing data workloads.

### 4. Stream Processing:
   Kafka supports stream processing through Kafka Streams, a client library that ships with Apache Kafka, and, in Confluent Platform, through ksqlDB. These tools enable real-time processing and analysis of data streams directly within the Kafka ecosystem, facilitating the development of complex event-driven applications.
<img src="images/kafka-decoupling.png" width="800" height="400"> 

## Use Cases:

### 1. Real-Time Analytics:
   Kafka is widely used for real-time analytics applications, including monitoring, dashboards, and operational intelligence. It enables organizations to ingest, process, and analyze large volumes of data streams in real-time, allowing for timely insights and decision-making.

### 2. Data Integration:
   Kafka serves as a central data hub for integrating disparate data sources and systems within an organization. It facilitates data movement, replication, and synchronization between databases, applications, and services, enabling seamless data integration across the enterprise.

### 3. Event-Driven Architectures:
   Kafka is a fundamental building block for event-driven architectures, where applications react to events and state changes in real-time. It enables event sourcing, event-driven microservices, and event-driven workflows, fostering agility, responsiveness, and scalability in modern application development.

### 4. Log and Event Collection:
   Kafka is used for log and event collection in distributed systems, infrastructure monitoring, and security analytics. It acts as a centralized platform for collecting, aggregating, and analyzing logs, metrics, and events from diverse sources, facilitating observability and troubleshooting.


# Agenda - Day 1
- Get connected
- Kafka
- Schema Registry
- RBAC
- Connect

## Getting Started

### Helpful Links
- [BIP Confluent Control Center](https://bip-controlcenter.bip-sbx-ec4ac126.eks.local)
- [BIP AKHQ](https://akhq.bip-sbx-ec4ac126.eks.local/ui/bip-kafka)


In [None]:
# cell 01 - click me and press (shift + enter) to run me
1+1

In [None]:
# cell 02
a = 1
b = 10
a + b

In [None]:
# cell 03
a = a+1
a

In [None]:
# cell 04
!pip install confluent-kafka fastavro avro prometheus-client faker kubernetes

In [None]:
# cell 05
from prometheus_client import start_http_server, Gauge
from confluent_kafka import DeserializingConsumer, Consumer, Producer, TopicPartition, SerializingProducer, ConsumerGroupState
from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka.schema_registry import SchemaRegistryClient, Schema
from confluent_kafka.schema_registry.avro import AvroDeserializer, AvroSerializer
from confluent_kafka.serialization import StringDeserializer
from IPython.display import display, Image
from IPython.display import Markdown as md
from pprint import pprint as pp
from jinja2 import Template
from faker import Faker
from helpers import render_from_template, admin_create_topic, admin_delete_topic, send_data_to_topic, consume, get_image
from helpers import list_confluent_rolebindings, create_confluent_rolebinding, create_confluent_rolebinding_schema
import getpass
import time
import json

### Let's get your credentials setup for reuse.
_**Note: your password will be hidden as you type, but it will be stored in the `_password` variable. Do not print `_password` to the screen.**_

In [None]:
# cell 06
_username = input("Enter your IDM username (SBX): ")
_password = getpass.getpass("Enter your IDM password (SBX): ")

# Kafka

## Key Terms

<img src="images/kafka-event-anatomy.png" width="600" height="300"> 

- **Broker**:
   A Kafka broker is a single instance of a Kafka server that stores and manages topic partitions. Brokers are responsible for receiving messages from producers, storing them on disk, and serving them to consumers.

- **Topic**:
   A Kafka topic is a logical category or feed name to which messages are published by producers. Topics are partitioned and replicated across multiple brokers for scalability and fault-tolerance.

- **Partition**:
    A Kafka topic is divided into one or more partitions, which are the basic unit of parallelism and scalability in Kafka. Each partition is an ordered, append-only sequence of messages. At any given time one broker acts as the leader for a partition and handles its writes, while replicas of the partition may be kept on other brokers. Partitions allow Kafka to scale horizontally and distribute message processing across multiple brokers.

- **Key**:
   In Kafka, a key is an optional attribute of each message that is used for routing and partitioning. When a key is present, the producer's default partitioner hashes it to choose the partition, so messages with the same key land in the same partition (as long as the partition count does not change) and keep their relative order.

- **Value**:
   The value in Kafka refers to the actual payload or data of the message that is published to a topic by a producer. It represents the information being transmitted from the producer to the consumer.

- **Header**:
   Headers in Kafka are key-value pairs associated with each message that provide additional metadata or contextual information about the message. Headers can be used to store information such as trace identifiers, the message's origin, or any custom metadata relevant to the application.

- **Offset**:
   An offset is a unique identifier assigned to each message within a partition of a Kafka topic. Offsets are used by consumers to keep track of their progress in reading messages from a topic. Each message in a partition has a monotonically increasing offset starting from 0.

- **Producer**:
   A Kafka producer is an application or process that publishes messages to Kafka topics. Producers are responsible for creating messages and sending them to Kafka brokers for storage and distribution to consumers.

- **Consumer**:
   A Kafka consumer is an application or process that subscribes to one or more topics and reads messages from them. Consumers can be part of a consumer group, where each consumer in the group reads messages from a subset of the partitions of the subscribed topics.

- **Consumer Group**:
   A consumer group is a group of consumer instances that collectively consume messages from one or more topics. Each consumer group divides the partitions of the subscribed topics among its members, ensuring that each message is consumed by only one consumer within the group.

- **Event**:
   An event, in the context of Confluent or Kafka, is an individual message that is produced to or consumed from a Kafka topic. The terms message, record, and event are often used interchangeably.

<img src="images/kafka-one-topic-multi-consumer.png" width="600" height="300"> 
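
To make the key, partition, and offset terms concrete, here is a minimal in-memory sketch. `ToyTopic` is a hypothetical class written for this illustration, and the CRC32 hash is only a stand-in: real Kafka clients use their own partitioner implementations.

```python
import zlib


class ToyTopic:
    """In-memory stand-in for a topic: one append-only list per partition."""

    def __init__(self, num_partitions):
        self.partitions = [[] for _ in range(num_partitions)]

    def partition_for(self, key):
        # Hash the key bytes and take the remainder, so equal keys always
        # map to the same partition. CRC32 is an illustrative choice only.
        return zlib.crc32(key) % len(self.partitions)

    def append(self, key, value):
        partition = self.partition_for(key)
        log = self.partitions[partition]
        offset = len(log)  # offsets start at 0 and increase by one per record
        log.append((key, value))
        return partition, offset


topic = ToyTopic(num_partitions=3)
placements = [
    topic.append(b"user-1", b"login"),
    topic.append(b"user-2", b"login"),
    topic.append(b"user-1", b"logout"),
]
for partition, offset in placements:
    print(f"partition={partition} offset={offset}")
```

Both `user-1` records end up in the same partition, and the second one gets a higher offset than the first, which is what preserves per-key ordering.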

### Admin Client
The Kafka Admin Client is a client API for managing and administering Apache Kafka clusters. It is available in the Java client and in other client libraries, including the `confluent_kafka.admin.AdminClient` class used in this notebook. It provides an interface to perform administrative tasks such as creating, listing, and deleting topics, managing consumer groups, and querying metadata about brokers, topics, and partitions.

The Admin Client is typically used by administrators or system operators to perform administrative tasks programmatically, rather than using command-line tools like `kafka-topics` or `kafka-configs`. This allows for automation and integration with other systems.

Some common tasks performed using the Admin Client include:
- Listing topics in a Kafka cluster.
- Creating and deleting topics.
- Altering topic configurations.
- Describing topic properties.
- Adding and removing partitions.
- Listing consumer groups and their offsets.

Using the Admin Client, developers can programmatically manage Kafka clusters, making it easier to automate administrative tasks and integrate Kafka with other systems and workflows.
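
In the `confluent-kafka` Python client, operations such as `create_topics()` and `delete_topics()` are asynchronous and return a dictionary that maps each topic name to a future. The sketch below shows one way to wait on those futures and collect the outcome. `summarize_admin_futures` is a hypothetical helper, and the stand-in futures exist only so the example runs without a cluster.

```python
from concurrent.futures import Future


def summarize_admin_futures(futures, timeout=10):
    """Wait for each admin operation and report success or the error text."""
    outcome = {}
    for name, future in futures.items():
        try:
            future.result(timeout=timeout)  # raises if the operation failed
            outcome[name] = "ok"
        except Exception as exc:
            outcome[name] = f"failed: {exc}"
    return outcome


# Stand-in futures so the sketch runs without a Kafka cluster.
succeeded, failed = Future(), Future()
succeeded.set_result(None)
failed.set_exception(RuntimeError("topic already exists"))

demo = summarize_admin_futures({"DEMO_TOPIC_A": succeeded, "DEMO_TOPIC_B": failed})
print(demo)
```

Against a real cluster the same helper could be called as `summarize_admin_futures(admin_client.create_topics([new_topic]))`.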

In [None]:
# cell 07
# Set our Kafka bootstrap server
bootstrap_servers = 'bip-kafka.confluent.svc.cluster.local:9092'
broker_config = {
    'bootstrap.servers': bootstrap_servers,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': _username,
    'sasl.password': _password
}

In [None]:
# cell 08
# Create AdminClient instance
admin_client = AdminClient(broker_config)

# List topics
topics = admin_client.list_topics().topics
topics_cleaned = [name for name in topics if 'confluent' not in name]

# Print the list of topics
display(topics_cleaned)

In [None]:
# cell 09
# Create a new topic
topic_name = f"DEMO_TOPIC_{_username.upper()}"
topic_partitions = 1
topic_replication_factor = 1

new_topic = NewTopic(topic_name, topic_partitions, topic_replication_factor)
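# create_topics() is asynchronous and returns a {topic_name: future} dict;
# result() blocks until the broker responds and raises on failure.
for name, future in admin_client.create_topics([new_topic]).items():
    try:
        future.result()
        print(f"Created topic {name}")
    except Exception as e:
        print(f"Failed to create topic {name}: {e}")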

In [None]:
# cell 10
# List topics
topics = admin_client.list_topics().topics
topics_cleaned = [k for k,v in topics.items() if "DEMO_TOPIC" in k]

# Print the list of topics
display(topics_cleaned)

### Producer
In Apache Kafka, a producer is a client application that publishes (produces) records to Kafka topics. It sends data in the form of records to Kafka brokers, which then store and distribute these records across the partitions of the topic.

Producers are typically responsible for:
- Determining which topic to publish messages to.
- Serializing the message data into bytes.
- Choosing a partition explicitly, supplying a key for the partitioner to hash, or letting the client pick a partition automatically.
- Sending the records to Kafka brokers for distribution.
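
The serialization responsibility can be sketched without a broker. The hypothetical `build_record` helper below turns a string key and a dictionary value into the bytes a producer would send. JSON is an arbitrary choice here, and the topic name is a placeholder.

```python
import json


def build_record(topic, key, value):
    """Return produce() keyword arguments with key and value as bytes."""
    return {
        "topic": topic,
        "key": key.encode("utf-8"),
        # sort_keys makes the byte output deterministic for the same input
        "value": json.dumps(value, sort_keys=True).encode("utf-8"),
    }


record = build_record("DEMO_TOPIC_EXAMPLE", "user-1", {"id": 1, "action": "login"})
print(record)
```

With a `confluent_kafka.Producer` instance at hand, the result could be passed on as `producer.produce(**record)`, since `produce()` accepts `topic`, `key`, and `value` keyword arguments.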

Now, let's set up a basic Confluent Kafka producer using Python.

In [None]:
# cell 11
producer_config = {
    'bootstrap.servers': bootstrap_servers,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': _username,
    'sasl.password': _password
}

In [None]:
# cell 12
# Create a Kafka producer instance
try:
    producer = Producer(producer_config)
    print("Successfully created producer!")
except Exception as e:
    print(e)

In [None]:
# cell 13
message = input(f"Enter a message to write to the {topic_name} Kafka topic:")
producer.produce(topic_name, message)
producer.flush()

### We have successfully sent a simple message to a Kafka topic

In [None]:
# cell 14
render_from_template("consume", {'topic_name': topic_name, '_username': _username, '_password': _password})

In [None]:
# cell 15
# List all demo topics
display([name for name in producer.list_topics().topics if "DEMO_TOPIC" in name])

In [None]:
# cell 16
friend_topic = "DEMO_TOPIC_CUCUMBER"

In [None]:
# cell 17
message = input(f"Enter a message to write to the {friend_topic} Kafka topic:")
message = f"({_username.upper()}): {message}"
producer.produce(friend_topic, message)
producer.flush()

### What other kind of data can we put into Kafka?

In [None]:
# cell 18
images_topic = topic_name + "_IMAGES"
admin_create_topic(admin_client, images_topic, 1, 1)

In [None]:
# cell 19
new_image = get_image()
Image(new_image)

In [None]:
# cell 20
producer.produce(images_topic, new_image)
producer.flush()

In [None]:
# cell 21
consumer_config = {
    'bootstrap.servers': bootstrap_servers,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': _username,
    'sasl.password': _password,
    'group.id': f"DEMO_IMAGES_{_username.upper()}",
    'auto.offset.reset': 'earliest'
}
consumer = Consumer(consumer_config)
consumer.subscribe([images_topic])

incoming_images = consume(consumer, 3)
consumer.close()
recovered_images = [Image(i) for i in incoming_images]
display(*recovered_images)

## Kafka Consumer Groups

### Overview
<img src="images/consumer-group-heartbeats.png" height=300 width=500>

A Kafka consumer group is a set of consumers that work together to consume and process messages from one or more Kafka topics. Consumer groups provide scalability and fault tolerance by distributing message processing across multiple consumers.

### Joining a Consumer Group
To join a consumer group, a Kafka consumer instance must specify the group ID when it subscribes to one or more topics. When a consumer joins a group, it becomes part of the group's consumer pool and is eligible to receive messages from the subscribed topics.

### Assignments
Once consumers have joined a group, Kafka's group coordinator assigns partitions from the subscribed topics to each consumer within the group. Each partition is assigned to exactly one consumer in the group, ensuring that each message is processed by only one consumer.
<img src="images/consumer-group-healthy.png" height=400 width=800>
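
The assignment idea can be simulated in a few lines. This sketch splits a topic's partitions into contiguous ranges, similar in spirit to a range-style assignor. The strategy a real group uses depends on its `partition.assignment.strategy` setting, and `assign_partitions` is a hypothetical function.

```python
def assign_partitions(num_partitions, consumers):
    """Split partitions 0..n-1 into contiguous ranges, one per consumer."""
    members = sorted(consumers)
    base, extra = divmod(num_partitions, len(members))
    assignment, start = {}, 0
    for index, member in enumerate(members):
        # The first `extra` members take one additional partition each.
        count = base + (1 if index < extra else 0)
        assignment[member] = list(range(start, start + count))
        start += count
    return assignment


for partitions in (2, 10, 3, 1):
    print(partitions, assign_partitions(partitions, ["consumer-a", "consumer-b"]))
```

Every partition appears in exactly one consumer's list, and a consumer can end up with an empty list when there are fewer partitions than group members.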

### Heartbeats
Consumers within a group periodically send heartbeats to the group coordinator to signal that they are alive and processing messages. Heartbeats prevent the group coordinator from considering a consumer as failed and triggering a rebalance when the consumer is still active.

### Rebalancing
Rebalancing occurs when the group coordinator detects a change in group membership or in the partitions of the subscribed topics. This can happen when a consumer joins or leaves the group, or when the number of partitions for a topic changes. During a rebalance, partitions are reassigned to consumers to ensure an even distribution of workload across the group.

<img src="images/consumer-group-unhealthy.png" height=400 width=800>
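
Heartbeats and rebalancing can be tied together in a toy model: a member whose last heartbeat is older than the session timeout (the role played by `session.timeout.ms` in a real consumer) is dropped, and the partitions are redistributed. The function names and timestamps below are made up for illustration, and the round-robin split is just one possible strategy.

```python
def live_members(last_heartbeat, now, session_timeout):
    """Members whose most recent heartbeat is within the session timeout."""
    return sorted(
        member for member, seen in last_heartbeat.items()
        if now - seen <= session_timeout
    )


def round_robin(num_partitions, members):
    """Deal partitions out to members one at a time."""
    if not members:
        return {}
    assignment = {member: [] for member in members}
    for partition in range(num_partitions):
        assignment[members[partition % len(members)]].append(partition)
    return assignment


heartbeats = {"consumer-a": 100.0, "consumer-b": 100.0}
before = round_robin(2, live_members(heartbeats, now=101.0, session_timeout=10.0))

heartbeats["consumer-a"] = 112.0  # consumer-b has stopped sending heartbeats
after = round_robin(2, live_members(heartbeats, now=113.0, session_timeout=10.0))

print("before:", before)
print("after: ", after)
```

After the timeout, the surviving consumer takes over both partitions, which is the behavior to look for when a consumer is killed in the exercises.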


In [None]:
# cell 22
multi_topic_name = topic_name + "_MULTI"

In [None]:
# cell 23
render_from_template("consume-multi", { 'topic_name': multi_topic_name, '_username': _username, '_password': _password})

### Two consumers, two partitions
- Now we'll observe a scenario in which two consumers pull from two partitions.
- What do you notice about how the partitions are aligned with the consumers?
- Do you see any examples where one consumer reports on behalf of both partitions?

In [None]:
# cell 24
admin_create_topic(admin_client, multi_topic_name, 2, 1)

In [None]:
# cell 25
send_data_to_topic(producer, multi_topic_name, 50, fake=True)

In [None]:
# cell 26
admin_delete_topic(admin_client, multi_topic_name)

### Two consumers, ten partitions
- Now we'll observe a scenario in which two consumers pull from ten partitions.
- What do you notice about how the partitions are aligned with the consumers?

In [None]:
# cell 27
admin_create_topic(admin_client, multi_topic_name, 10, 1)

In [None]:
# cell 28
send_data_to_topic(producer, multi_topic_name, 50, fake=True)

In [None]:
# cell 29
admin_delete_topic(admin_client, multi_topic_name)

### Two consumers, three partitions
- Now we'll observe a scenario in which two consumers pull from three partitions.
- What do you notice about how the partitions are aligned with the consumers?
- Do you see any examples where one consumer reports on behalf of multiple partitions?

In [None]:
# cell 30
admin_create_topic(admin_client, multi_topic_name, 3, 1)

In [None]:
# cell 31
send_data_to_topic(producer, multi_topic_name, 50, fake=True)

In [None]:
# cell 32
admin_delete_topic(admin_client, multi_topic_name)

### Two consumers, one partition
- Now we'll observe a scenario in which two consumers pull from one partition.
- What do you notice about how the partitions are aligned with the consumers?
- What do you notice is missing?
- What can you infer about the number of partitions and how that affects the number of consumers able to be in a group?

In [None]:
# cell 33
admin_create_topic(admin_client, multi_topic_name, 1, 1)

In [None]:
# cell 34
send_data_to_topic(producer, multi_topic_name, 50, fake=True)

In [None]:
# cell 35
admin_delete_topic(admin_client, multi_topic_name)

### Consumer Re-balancing
- Now we'll observe what happens when group membership changes.
- Let's go back to a two-partition setup.
- Launch two terminals and run `python consume-split.py` in each.
- What do you notice about how the partitions are aligned with the consumers?
- Use `ctrl-c` in one terminal to kill the consumer.
- What changed?
- Resume that dead consumer by running `python consume-split.py` again.
- What behaviors did you notice?

In [None]:
# cell 36
render_from_template("consume-split", { 'topic_name': multi_topic_name, '_username': _username, '_password': _password })
admin_create_topic(admin_client, multi_topic_name, 2, 1)

In [None]:
# cell 37
send_data_to_topic(producer, multi_topic_name, 50, fake=True)

## Kafka Message Deletion

Kafka offers two main deletion modes for managing message retention and cleanup: log compaction and message deletion. Each deletion mode serves different use cases and offers unique benefits.

### Message Deletion (cleanup.policy = delete)
Message deletion is a retention policy that removes messages from a topic once they exceed a configured age (`retention.ms`) or the partition exceeds a configured size (`retention.bytes`). This mode discards old or obsolete messages to free up storage space and manage the data lifecycle.

#### Use Cases:
- **Retention Limits**: Message deletion is often used to enforce data retention policies. By specifying a retention period, organizations can automatically delete old messages to comply with regulatory requirements or minimize storage costs.
- **Transient Data**: Message deletion is suitable for managing transient data or ephemeral streams where only recent data is relevant. This mode allows organizations to prioritize storage resources for current data while discarding older messages.
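
As a rough sketch of time-based retention, the hypothetical function below drops every record older than the retention window. It is a simplification: a real broker removes whole log segments rather than individual records, and it does so on a periodic check rather than instantly.

```python
def apply_time_retention(records, now_ms, retention_ms):
    """Keep only records whose age does not exceed retention_ms."""
    return [r for r in records if now_ms - r["timestamp_ms"] <= retention_ms]


records = [
    {"offset": 0, "timestamp_ms": 0},
    {"offset": 1, "timestamp_ms": 5_000},
    {"offset": 2, "timestamp_ms": 12_000},
]
kept = apply_time_retention(records, now_ms=15_000, retention_ms=10_000)
print([r["offset"] for r in kept])
```

The surviving records keep their original offsets; deletion never renumbers what remains.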

### Log Compaction (cleanup.policy = compact)
Log compaction is a retention policy that keeps at least the latest value for each key within a topic partition while discarding older values for that key. This ensures that the latest state for each key is always available to consumers, even in the presence of large volumes of data. A record with a `null` value (a tombstone) marks its key for deletion.

#### Use Cases:
- **Change Data Capture (CDC)**: Log compaction is commonly used in CDC pipelines to capture and replicate changes from source systems to downstream consumers. By retaining only the latest state for each key, log compaction ensures that consumers receive an accurate representation of the source data's current state.
- **Key-Value Storage**: Log compaction is suitable for maintaining key-value stores where only the latest value for each key needs to be retained. This mode enables efficient storage and retrieval of key-value pairs with minimal overhead.
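
The effect of compaction can be sketched on a plain Python list. The hypothetical `compact` function below keeps the newest record per key and treats a `None` value as a tombstone. It models only the end state, not how or when the broker's log cleaner actually runs.

```python
def compact(log, drop_tombstones=True):
    """Return (offset, key, value) for the newest record of each key."""
    latest = {}
    for offset, (key, value) in enumerate(log):
        latest[key] = (offset, value)  # later records overwrite earlier ones
    survivors = [
        (offset, key, value)
        for key, (offset, value) in latest.items()
        if not (drop_tombstones and value is None)
    ]
    return sorted(survivors, key=lambda item: item[0])


log = [
    (b"key1", b"apple"),
    (b"key2", b"apple"),
    (b"key1", b"banana"),
    (b"key2", None),  # tombstone: delete key2
]
print(compact(log))
print(compact(log, drop_tombstones=False))
```

With `drop_tombstones=False` the tombstone itself is still present, which corresponds to the period during which Kafka retains it (`delete.retention.ms`) so that consumers can observe the deletion.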

[Ephemeral SBX AKHQ](https://akhq.bip-sbx-ec4ac126.eks.local/ui/bip-kafka/)

In [None]:
# cell 38
delete_topic = f"{topic_name}_DELETE"
compact_topic = f"{topic_name}_COMPACT"

admin_create_topic(admin_client, topics=[
    NewTopic(topic=delete_topic, num_partitions=1, replication_factor=1, config={"cleanup.policy": "delete", "retention.ms": "10000"}),
    NewTopic(topic=compact_topic, num_partitions=1, replication_factor=1, config={"cleanup.policy": "compact", "delete.retention.ms": "10000"})
])

### cleanup.policy = delete
- For this test, let's send some data to our `delete_topic`
- Via AKHQ, observe the messages on your topic
- Wait 10-30 seconds and refresh the topic view in AKHQ
- What happened?

In [None]:
# cell 39
send_data_to_topic(producer, delete_topic, 50, fake=True)

### cleanup.policy = compact
- For this test, let's send some data to our `compact_topic`
- Via AKHQ, observe the messages on your topic
- Wait 10-30 seconds and refresh the topic view in AKHQ
- What happened?

In [None]:
# cell 40
producer.produce(compact_topic, key=b'key1', value=b'apple')
producer.produce(compact_topic, key=b'key2', value=b'apple')
producer.flush()

In [None]:
# cell 41
producer.produce(compact_topic, key=b'key1', value=b'banana')
producer.flush()

In [None]:
# cell 42
producer.produce(compact_topic, key=b'key2', value=None)
producer.flush()

## Compaction Troubleshooting
If your compacted topic still shows older values for a key after you expected them to be gone, there are several likely causes. Note that `delete.retention.ms` does not control when compaction runs; it controls how long tombstones (records with a `null` value) are kept once a log has been compacted.

**Configuration Mismatch:**
Confirm that `cleanup.policy=compact` and the related settings are actually applied to the topic. The admin client's `describe_configs()` method returns the topic's configuration entries, which you can compare against the broker defaults.

**Log Cleaner Timing:**
Compaction is performed by background log cleaner threads, not at the moment a record becomes obsolete. How soon a log is cleaned depends on broker settings such as `log.cleaner.backoff.ms` and `log.cleaner.min.cleanable.ratio`, so there can be a delay even when everything is configured correctly.

**Segment Size:**
The active (most recently written) segment is never compacted. On a low-volume topic, records stay in the active segment until it rolls, which is governed by `segment.ms` and `segment.bytes`. Once segments have rolled, the log is only cleaned when the share of uncompacted data exceeds the `min.cleanable.dirty.ratio` threshold.

**Consumer Lag:**
Compaction does not wait for consumers. A consumer that lags far behind may simply never see values that were superseded and compacted away. `delete.retention.ms` bounds how long a consumer has to read a tombstone before it is removed.

**Broker Load:**
High broker load or resource constraints may impact the log compaction process. Monitor the broker's CPU, memory, and disk usage to ensure that it has enough resources to perform compaction efficiently.

**Topic-Level Overrides:**
Check for topic-level overrides of settings such as `min.cleanable.dirty.ratio`, `min.compaction.lag.ms`, or `segment.ms` that differ from the broker defaults and change when records become eligible for compaction.
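
The `min.cleanable.dirty.ratio` threshold mentioned above is a simple fraction. The sketch below is a rough model of that check, assuming the ratio is the uncompacted ("dirty") bytes divided by the total bytes of the log, with 0.5 as the default threshold. `is_cleanable` is a hypothetical function and the byte counts are made up.

```python
def is_cleanable(clean_bytes, dirty_bytes, min_cleanable_dirty_ratio=0.5):
    """True when the dirty share of the log exceeds the configured ratio."""
    total = clean_bytes + dirty_bytes
    if total == 0:
        return False  # nothing to clean in an empty log
    return dirty_bytes / total > min_cleanable_dirty_ratio


print(is_cleanable(clean_bytes=900, dirty_bytes=100))  # mostly clean log
print(is_cleanable(clean_bytes=100, dirty_bytes=900))  # mostly dirty log
```

This is why a topic that receives only a handful of test messages may appear not to compact: lowering the ratio or the segment roll time on a sandbox topic makes the effect visible sooner.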

-------------------

# Schemas and Data Structure


## Importance of Data Structure:

Data in Kafka needs to be structured and well-defined to ensure interoperability and compatibility across different systems and applications. Without a clear understanding of the data structure, it becomes challenging to process, interpret, and analyze the data effectively.

## Serialization and Deserialization:

Serialization is the process of converting data from its native format into a byte stream, suitable for transmission or storage. Deserialization is the reverse process, converting the byte stream back into its original format.

<img src="images/serdes-topology.png" width="600" height="300">

### Why Serialization and Deserialization Matter:

- **Interoperability:** Serialized data can be transmitted and consumed by different systems, regardless of their programming languages or platforms.
  
- **Efficiency:** Serialized data is more compact and efficient for transmission and storage compared to raw data formats.
  
- **Compatibility:** Deserialization ensures that data can be reconstructed accurately, maintaining compatibility across different systems.
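
A small round trip shows both ideas: the data survives serialization unchanged, and a schema-aware binary layout can be much smaller than self-describing text. The packed layout below (a 4-byte id, then length-prefixed strings) is invented for this comparison and is not a Kafka or Avro format.

```python
import json
import struct

user = {"id": 123, "username": "john_doe", "email": "john.doe@example.com"}


def to_json_bytes(record):
    return json.dumps(record).encode("utf-8")


def from_json_bytes(data):
    return json.loads(data.decode("utf-8"))


def to_packed_bytes(record):
    """4-byte id, then each string as a 2-byte length plus UTF-8 bytes."""
    out = struct.pack(">i", record["id"])
    for field in ("username", "email"):
        encoded = record[field].encode("utf-8")
        out += struct.pack(">H", len(encoded)) + encoded
    return out


json_bytes = to_json_bytes(user)
packed_bytes = to_packed_bytes(user)
print(len(json_bytes), len(packed_bytes))
print(from_json_bytes(json_bytes) == user)
```

The packed form is smaller because field names are not repeated in every message; the reader must already know the layout, which is exactly the role a schema plays.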

## Serialization and Deserialization in Confluent Kafka:

There are three common approaches to serialization and deserialization with Confluent Kafka:

### 1. Externalized Schema:

- **Description:** In this approach, the schema is decided externally by the producer and consumer applications. There is no enforcement of schema compatibility or validation by Kafka.
  
- **Pros:** Flexibility for applications to define and evolve schemas independently.
  
- **Cons:** Lack of schema enforcement can lead to compatibility issues and data inconsistencies.

### 2. Embedded Schema (Registryless):

- **Description:** With embedded schema serialization, the schema is included with each message sent to Kafka. Producers encode the schema along with the message data.
  
- **Pros:** Ensures that the schema is always available with the message, simplifying deserialization.
  
- **Cons:** Increases message size due to the inclusion of the schema with each message.

### 3. Schema Registry:

- **Description:** Confluent Schema Registry is a centralized service that stores and manages schemas independently of producer and consumer applications. Producers only send a unique identifier (schema ID) with each message, and consumers retrieve the schema from the registry.
  
- **Pros:** Promotes schema reuse, centralizes schema management, and ensures schema evolution and compatibility.
  
- **Cons:** Requires additional infrastructure (Schema Registry) and introduces network overhead for schema retrieval.

<img src="images/schema-methods.png" width=800>
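
With Schema Registry, the Confluent serializers prefix each message with a small header: a magic byte of `0` followed by the schema ID as a 4-byte big-endian integer, and then the serialized payload. The sketch below builds and parses that framing by hand. The `frame` and `unframe` names and the schema ID `42` are placeholders; in practice `AvroSerializer` and `AvroDeserializer` handle this for you.

```python
import struct

MAGIC_BYTE = 0


def frame(schema_id, payload):
    """Prefix payload with the magic byte and a 4-byte big-endian schema ID."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + payload


def unframe(message):
    """Split a framed message back into (schema_id, payload)."""
    if len(message) < 5:
        raise ValueError("message is too short to carry a schema header")
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, message[5:]


framed = frame(42, b"avro-bytes-go-here")
print(framed)
print(unframe(framed))
```

The header costs five bytes per message regardless of how large the schema is, which is the main size advantage over embedding the schema in every message.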

## Supported Schema Formats:

Confluent Schema Registry supports the following schema formats:

- **JSON Schema**
- **Protocol Buffers (Protobuf)**
- **Avro**

## Example Schema (Avro):
In this Avro schema example, we define a User record with three fields: id (integer), username (string), and email (string). This schema can be used to serialize and deserialize data representing user information in Kafka messages.

```json
{
  "type": "record",
  "name": "User",
  "namespace": "com.example",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "username", "type": "string"},
    {"name": "email", "type": "string"}
  ]
}
```
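
To show what this schema produces on the wire, the sketch below hand-encodes a `User` record following Avro's binary encoding rules: an `int` is written as a zigzag-encoded variable-length integer, a `string` as its byte length (encoded the same way) followed by UTF-8 bytes, and a record as its fields in schema order. The function names are hypothetical; a real application would use a library such as `avro` or `fastavro`.

```python
def zigzag_varint(n):
    """Avro int/long encoding: zigzag, then 7 bits per byte, low bits first."""
    z = (n << 1) ^ (n >> 63)  # maps small negative numbers to small positives
    out = bytearray()
    while True:
        byte = z & 0x7F
        z >>= 7
        if z:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def encode_string(text):
    data = text.encode("utf-8")
    return zigzag_varint(len(data)) + data


def encode_user(user):
    """Concatenate the fields in the order the schema declares them."""
    return (
        zigzag_varint(user["id"])
        + encode_string(user["username"])
        + encode_string(user["email"])
    )


encoded = encode_user({"id": 123, "username": "john_doe", "email": "john.doe@example.com"})
print(encoded)
print(len(encoded))
```

Note that the output contains no field names or type markers, so it can only be decoded by a reader that has the schema. Also note that an `id` of `0` encodes to a single `\x00` byte, so Avro payloads can legitimately contain null bytes.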


## Example (Embedded Schema)
Now let's walk through an example. We have already demonstrated producing and consuming data to and from a Kafka topic without using a defined schema. 

In [None]:
# cell 43
embedded_schema_topic_name = topic_name + "_EMBED"
admin_create_topic(admin_client, embedded_schema_topic_name, 1, 1)

In [None]:
# cell 44
import io
import json
from avro.schema import parse
import avro.io

# Step 1: Define an Avro schema in a Python dictionary
schema_dict = {
    "type": "record",
    "name": "User",
    "namespace": "com.example",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "username", "type": "string"},
        {"name": "email", "type": "string"}
    ]
}

# Step 2: Encode the schema into a byte string
schema_bytes = json.dumps(schema_dict).encode('utf-8')

# Step 3: Use the schema to encode a message into a byte string
message_data = {"id": 123, "username": "john_doe", "email": "john.doe@example.com"}
# Parse the schema dictionary into an Avro schema object
avro_schema = parse(json.dumps(schema_dict))

output_stream = io.BytesIO()
encoder = avro.io.BinaryEncoder(output_stream)
writer = avro.io.DatumWriter(avro_schema)
writer.write(message_data, encoder)
message_bytes = output_stream.getvalue()

# Step 4: Compose the payload: schema JSON, a null-byte separator, then the Avro body
magic_byte = b'\x00'
payload = schema_bytes + magic_byte + message_bytes

print(payload)

send_data_to_topic(producer, embedded_schema_topic_name, 1, fake=False, messages=[payload])

In [None]:
# cell 45
consumer_config = {
    'bootstrap.servers': bootstrap_servers,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': _username,
    'sasl.password': _password,
    'group.id': f"DEMO_EMBED_{_username.upper()}",
    'auto.offset.reset': 'earliest'
}
consumer = Consumer(consumer_config)
consumer.subscribe([embedded_schema_topic_name])
incoming_messages = consume(consumer, 5)
consumer.close()
raw_message = incoming_messages[0]
# Split on the first null byte only: the Avro body may itself contain \x00
schema, encoded_message = raw_message.split(b'\x00', 1)
avro_schema = parse(schema.decode('utf-8'))
decoder = avro.io.BinaryDecoder(io.BytesIO(encoded_message))
reader = avro.io.DatumReader(avro_schema)
decoded_message = reader.read(decoder)
decoded_message

## Example (Schema Registry)

In [None]:
# cell 46
registry_schema_topic_name = topic_name + "_REGISTRY"
admin_create_topic(admin_client, registry_schema_topic_name, 1, 1)

In [None]:
# cell 47
# Initialize Schema Registry client
schema_registry_url = 'https://bip-schemaregistry.confluent.svc.cluster.local:8081'
schema_registry_client = SchemaRegistryClient({ 
        'url': schema_registry_url,
        'basic.auth.user.info': f'{_username}:{_password}'
    })

# Register a schema
schema = {
    "type": "record",
    "name": "User",
    "namespace": f"com.example.{_username.lower()}",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "username", "type": "string"},
        {"name": "email", "type": "string"}
    ]
}

fq_schema = Schema(json.dumps(schema), "AVRO")

subject = registry_schema_topic_name + "-value"
schema_id = schema_registry_client.register_schema(subject, fq_schema)

print("Schema registered successfully with ID:", schema_id)

# Retrieve a schema by ID
retrieved_schema = schema_registry_client.get_schema(schema_id)
display(json.loads(retrieved_schema.schema_str))

In [None]:
# cell 48
value_avro_serializer = AvroSerializer(schema_registry_client = schema_registry_client, schema_str = fq_schema.schema_str)

# Initialize Serializing Producer
conf = {
    'bootstrap.servers': bootstrap_servers,
    'value.serializer': value_avro_serializer,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': _username,
    'sasl.password': _password
}
serial_producer = SerializingProducer(conf)

message_data = {
    "id": 456,
    "username": _username,
    "email": "john.foe@example.com"
}

serial_producer.produce(topic=registry_schema_topic_name, value=message_data)
serial_producer.flush()

In [None]:
# cell 49
value_avro_deserializer = AvroDeserializer(schema_registry_client = schema_registry_client)

# Initialize Deserializing Consumer
conf = {
    'bootstrap.servers': bootstrap_servers,
    'value.deserializer': value_avro_deserializer,
    'group.id': f"DEMO_REGISTRY_{_username.upper()}",
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': _username,
    'sasl.password': _password,
    'auto.offset.reset': 'earliest'
}

deserial_consumer = DeserializingConsumer(conf)
deserial_consumer.subscribe([registry_schema_topic_name])
incoming_messages = consume(deserial_consumer, 10)
deserial_consumer.close()
incoming_messages

#### Check the contents of the topic in AKHQ

#### Check the contents of the topic in Control Center

# BREAK

# Role Based Access Control (RBAC)

<img src="images/rbac-authentication-overview.png" height=400 width=1200>

## Confluent Role-Based Access Control (RBAC) and Metadata Service (MDS)

Confluent Role-Based Access Control (RBAC) is a security feature that provides fine-grained access control to Confluent components such as brokers, Schema Registry, connectors, and ksqlDB. RBAC ensures that only authorized users or applications can access and act on Confluent resources.

### Metadata Service (MDS)
The Metadata Service (MDS) is the central authority for access control in a Confluent cluster. It holds the role bindings and uses them to decide, for every Confluent component, whether a given user or application may perform a requested action.

### MDS Functions as a Middleman
MDS acts as an intermediary between users or applications and Confluent components. It handles authentication, authorization, and role management, ensuring that only authenticated and authorized users can access Confluent resources.

### IDM LDAP Server Integration
MDS is often integrated with an Identity Management (IDM) LDAP server, which serves as the user directory for managing user identities, groups, and roles. The IDM LDAP server provides a centralized repository of user information, allowing MDS to authenticate users and enforce access control policies based on LDAP user attributes and group memberships.

### Access Control for Confluent Components
With MDS, access control policies can be defined at a granular level for Confluent components such as brokers, Schema Registry, connectors, and ksqlDB. Administrators assign roles to users or groups, specifying the actions they are allowed to perform on specific resources.

Together, RBAC and MDS give a Confluent deployment a single place to define and audit who may do what, which helps protect sensitive data and supports regulatory compliance.
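
Because roles can be assigned to users or to groups, the permissions a user ends up with are the combination of the bindings on the user and the bindings on every group the user belongs to. The sketch below models only that lookup in plain Python. It is not how MDS is implemented; the `RoleBinding` class, the `effective_bindings` helper, the group name `professors`, and the user `snape` are all hypothetical.

```python
from dataclasses import dataclass
from typing import Iterable, List


@dataclass(frozen=True)
class RoleBinding:
    principal: str      # "User:<name>" or "Group:<name>"
    role: str           # e.g. "DeveloperRead"
    resource_type: str  # e.g. "Topic"
    resource_name: str


def effective_bindings(user: str, groups: Iterable[str],
                       bindings: Iterable[RoleBinding]) -> List[RoleBinding]:
    """Return the bindings that apply to a user directly or via a group."""
    principals = {f"User:{user}"} | {f"Group:{g}" for g in groups}
    return [b for b in bindings if b.principal in principals]


bindings = [
    RoleBinding("User:dumbledore", "DeveloperWrite", "Topic", "DEMO_TOPIC_RBAC_ALICE"),
    RoleBinding("Group:professors", "DeveloperRead", "Topic", "DEMO_TOPIC_RBAC_ALICE"),
    RoleBinding("User:snape", "DeveloperRead", "Topic", "OTHER_TOPIC"),
]

# Hypothetical stand-in for an LDAP group lookup
ldap_groups = {"dumbledore": ["professors"], "snape": []}

for b in effective_bindings("dumbledore", ldap_groups["dumbledore"], bindings):
    print(b.principal, b.role, b.resource_type, b.resource_name)
```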

## Let's continue with an IDM service account

In [None]:
# cell 50
_sa_username = input("Enter service account IDM username (SBX)")
_sa_password = getpass.getpass("Enter service account IDM password (SBX)")

### Let's run a test
- Use our admin accounts and AdminClient to create a topic
- Try to use the `dumbledore` service account to produce some data to it

In [None]:
# cell 51
bootstrap_servers = 'bip-kafka.confluent.svc.cluster.local:9092'
broker_config = {
    'bootstrap.servers': bootstrap_servers,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': _username, # <--- Admin User (you)
    'sasl.password': _password
}
admin_client = AdminClient(broker_config)

In [None]:
# cell 52
rbac_test_topic_name = f"DEMO_TOPIC_RBAC_{_username.upper()}"
admin_create_topic(admin_client, rbac_test_topic_name, 1, 1)

In [None]:
# cell 53
producer_config = {
    'bootstrap.servers': bootstrap_servers,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': _sa_username, # <--- Service Account User
    'sasl.password': _sa_password
}

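# Create a Kafka producer instance.
# Note: constructing a Producer does not validate credentials or permissions;
# those are only exercised once the client talks to the broker.
try:
    producer = Producer(producer_config)
    print("Successfully created service account producer!")
except Exception as e:
    print(e)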

In [None]:
# cell 54
message = input(f"Enter a message to write to the {rbac_test_topic_name} Kafka topic:")
producer.produce(rbac_test_topic_name, message)
producer.flush()
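
`produce()` is asynchronous, so a failed write (for example one rejected for missing permissions) does not raise at the call site. The outcome of each message is reported to a delivery callback, which is served when `flush()` or `poll()` runs. Without a callback, the cell above may finish without showing why nothing arrived in the topic. A minimal sketch of such a callback follows; `delivery_report` and `FakeMessage` are hypothetical names, and `FakeMessage` is only a stand-in so the example runs without a broker.

```python
def delivery_report(err, msg):
    """Delivery callback: called once per message with its final outcome."""
    if err is not None:
        line = f"Delivery failed for topic {msg.topic()}: {err}"
    else:
        line = f"Delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}"
    print(line)
    return line


# With a real producer (not run here) the callback would be passed like this:
#   producer.produce(rbac_test_topic_name, message, on_delivery=delivery_report)
#   producer.flush()


class FakeMessage:
    """Stand-in for a Kafka message object, for illustration only."""

    def __init__(self, topic, partition=0, offset=0):
        self._topic, self._partition, self._offset = topic, partition, offset

    def topic(self):
        return self._topic

    def partition(self):
        return self._partition

    def offset(self):
        return self._offset


# Simulate a rejected write and a successful one
delivery_report("Topic authorization failed", FakeMessage("DEMO_TOPIC_RBAC_ALICE"))
delivery_report(None, FakeMessage("DEMO_TOPIC_RBAC_ALICE", partition=0, offset=7))
```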

In [None]:
# cell 55
md(f"""
#### Let's add the DeveloperWrite permission for the Topic
- Go to [BIP Control Center](https://bip-controlcenter.bip-sbx-ec4ac126.eks.local/access-control/manage/assignments)
- Sign in using **YOUR** ({_username}) credentials
  - Simulating an operational task you might take or request
- Go to Assignments tab
- Click on the ID (e.g. `xXStDwdjT6WrSKOj3A35Hw`) next to the `Kafka cluster`
- Navigate to the `Topic` tab
- Click the `+ Add Role Assignment` button
- Fill out the form using the following information:
  - **Principal Type:** User
  - **Principal Name:** {_sa_username}
  - **Role:** DeveloperWrite
  - **Pattern Type:** Literal
  - **Resource ID:** {rbac_test_topic_name}
- Then click the `Save` button
""")

In [None]:
# cell 56
consumer_config = {
    'bootstrap.servers': bootstrap_servers,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': _sa_username,
    'sasl.password': _sa_password,
    'group.id': f"DEMO_RBAC_{_username.upper()}",
    'auto.offset.reset': 'earliest'
}
consumer = Consumer(consumer_config)
consumer.subscribe([rbac_test_topic_name])

incoming_messages = consume(consumer, 5)
consumer.close()
incoming_messages

In [None]:
# cell 57
md(f"""
#### Let's add the DeveloperManage permission for the Consumer Group
- Go to [BIP Control Center](https://bip-controlcenter.bip-sbx-ec4ac126.eks.local/access-control/manage/assignments)
- Sign in using **YOUR** ({_username}) credentials
  - Simulating an operational task you might take or request
- Go to Assignments tab
- Click on the ID (e.g. `xXStDwdjT6WrSKOj3A35Hw`) next to the `Kafka cluster`
- Navigate to the `Group` tab
- Click the `+ Add Role Assignment` button
- Fill out the form using the following information:
  - **Principal Type:** User
  - **Principal Name:** {_sa_username}
  - **Role:** DeveloperManage
  - **Pattern Type:** Literal
  - **Resource ID:** DEMO_RBAC_{_username.upper()}
- Then click the `Save` button
""")

In [None]:
# cell 58
md(f"""
#### Let's add the DeveloperRead permission for the Consumer Group
- Go to [BIP Control Center](https://bip-controlcenter.bip-sbx-ec4ac126.eks.local/access-control/manage/assignments)
- Sign in using **YOUR** ({_username}) credentials
  - Simulating an operational task you might take or request
- Go to Assignments tab
- Click on the ID (e.g. `xXStDwdjT6WrSKOj3A35Hw`) next to the `Kafka cluster`
- Navigate to the `Group` tab
- Click the `+ Add Role Assignment` button
- Fill out the form using the following information:
  - **Principal Type:** User
  - **Principal Name:** {_sa_username}
  - **Role:** DeveloperRead
  - **Pattern Type:** Literal
  - **Resource ID:** DEMO_RBAC_{_username.upper()}
- Then click the `Save` button
""")

In [None]:
# cell 59
md(f"""
#### Let's add the DeveloperRead permission for the Topic
- Go to [BIP Control Center](https://bip-controlcenter.bip-sbx-ec4ac126.eks.local/access-control/manage/assignments)
- Sign in using **YOUR** ({_username}) credentials
  - Simulating an operational task you might take or request
- Go to Assignments tab
- Click on the ID (e.g. `xXStDwdjT6WrSKOj3A35Hw`) next to the `Kafka cluster`
- Navigate to the `Topic` tab
- Click the `+ Add Role Assignment` button
- Fill out the form using the following information:
  - **Principal Type:** User
  - **Principal Name:** {_sa_username}
  - **Role:** DeveloperRead
  - **Pattern Type:** Literal
  - **Resource ID:** {rbac_test_topic_name}
- Then click the `Save` button
""")
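
The role assignments above can be read as a checklist: producing needs a write role on the topic, while consuming needs a read role on both the topic and the consumer group. The sketch below expresses that checklist as data and reports what is still missing for a given action. The required sets mirror the assignments this walkthrough ends up with and are not an authoritative permission matrix; `REQUIRED`, `missing_bindings`, and the `ALICE` names are hypothetical.

```python
TOPIC = "DEMO_TOPIC_RBAC_ALICE"  # placeholder for rbac_test_topic_name
GROUP = "DEMO_RBAC_ALICE"        # placeholder for the consumer group.id

# (resource type, resource name, role) needed for each action in this walkthrough
REQUIRED = {
    "produce": {("Topic", TOPIC, "DeveloperWrite")},
    "consume": {("Topic", TOPIC, "DeveloperRead"), ("Group", GROUP, "DeveloperRead")},
}


def missing_bindings(action, granted):
    """Return the required bindings for an action that have not been granted yet."""
    return sorted(REQUIRED[action] - set(granted))


# State after the first two assignments in the walkthrough
granted = [
    ("Topic", TOPIC, "DeveloperWrite"),
    ("Group", GROUP, "DeveloperManage"),
]

print("produce:", missing_bindings("produce", granted))
print("consume:", missing_bindings("consume", granted))
```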

In [None]:
# cell 60
list_confluent_rolebindings()

#### Sample ConfluentRoleBinding (Topic Write)

```yaml
---
apiVersion: platform.confluent.io/v1beta1
kind: ConfluentRolebinding
metadata:
  annotations:
  finalizers:
  - confluentrolebinding.finalizers.platform.confluent.io
  name: demo-rbac-dstrivelli-topic-write
  namespace: confluent
spec:
  principal:
    name: dumbledore
    type: user
  resourcePatterns:
  - name: DEMO_TOPIC_RBAC_DSTRIVELLI
    patternType: LITERAL
    resourceType: Topic
  role: DeveloperWrite
...
```

In [None]:
# cell 61
create_confluent_rolebinding(
    name=f"demo-rbac-{_username.lower()}-topic-write",
    principal_name='dumbledore',
    principal_type='user',
    resource_id=rbac_test_topic_name,
    pattern_type='LITERAL',
    resource_type='Topic',
    role='DeveloperWrite',
)
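
The implementation of `create_confluent_rolebinding` is not shown in this part of the notebook. As a rough illustration of how its arguments map onto the sample manifest above, the sketch below assembles the same structure as a Python dictionary. The function `rolebinding_manifest` is hypothetical and only builds the document; submitting it to the cluster (for example with `kubectl apply` or a Kubernetes client) is left out.

```python
import json


def rolebinding_manifest(name, principal_name, principal_type, resource_id,
                         pattern_type, resource_type, role, namespace="confluent"):
    """Build a ConfluentRolebinding manifest shaped like the sample YAML."""
    return {
        "apiVersion": "platform.confluent.io/v1beta1",
        "kind": "ConfluentRolebinding",
        "metadata": {"name": name, "namespace": namespace},
        "spec": {
            "principal": {"name": principal_name, "type": principal_type},
            "resourcePatterns": [
                {
                    "name": resource_id,
                    "patternType": pattern_type,
                    "resourceType": resource_type,
                }
            ],
            "role": role,
        },
    }


# Hypothetical values; in the notebook these come from _username and rbac_test_topic_name
manifest = rolebinding_manifest(
    name="demo-rbac-alice-topic-write",
    principal_name="dumbledore",
    principal_type="user",
    resource_id="DEMO_TOPIC_RBAC_ALICE",
    pattern_type="LITERAL",
    resource_type="Topic",
    role="DeveloperWrite",
)
print(json.dumps(manifest, indent=2))
```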

### Let's set up a new test
- Delete and recreate the RBAC topic
- Try to produce Avro-Serialized data to a topic
- Try to consume Avro-Serialized data from a topic
- Add the missing ConfluentRoleBindings

In [None]:
# cell 62
admin_delete_topic(admin_client, rbac_test_topic_name)
admin_create_topic(admin_client, rbac_test_topic_name, 1, 1)

In [None]:
# cell 63

# Initialize Schema Registry client
schema_registry_url = 'https://bip-schemaregistry.confluent.svc.cluster.local:8081'
rbac_schema_registry_client = SchemaRegistryClient({ 
        'url': schema_registry_url,
        'basic.auth.user.info': f'{_sa_username}:{_sa_password}'
    })

# Register a schema
schema = {
    "type": "record",
    "name": "User",
    "namespace": f"com.example.rbac.{_username.lower()}",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "username", "type": "string"},
        {"name": "email", "type": "string"}
    ]
}

fq_schema = Schema(json.dumps(schema), "AVRO")

subject = rbac_test_topic_name + "-value"
schema_id = rbac_schema_registry_client.register_schema(subject, fq_schema)

print("Schema registered successfully with ID:", schema_id)

### Schema Registry RBAC Role Mappings
<img src="images/schema-rbac.png" width=1000>

#### Sample ConfluentRoleBinding (Schema Write)
```yaml
---
apiVersion: platform.confluent.io/v1beta1
kind: ConfluentRolebinding
metadata:
  name: demo-rbac-dstrivelli-schema-write
  namespace: confluent
spec:
  clustersScopeByIds:
    schemaRegistryClusterId: id_bip-schemaregistry_confluent # <------ Notice!
  principal:
    name: dumbledore
    type: user
  resourcePatterns:
  - name: DEMO_TOPIC_RBAC_DSTRIVELLI
    patternType: PREFIXED
    resourceType: Subject
  role: DeveloperWrite
... 
```

In [None]:
# cell 64
create_confluent_rolebinding_schema(
    name=f"demo-rbac-{_username.lower()}-schema-write",
    principal_name='dumbledore',
    principal_type='user',
    resource_id=rbac_test_topic_name,
    pattern_type='PREFIXED',
    resource_type='Subject',
    role='DeveloperWrite',
)
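
The schema binding uses `PREFIXED` rather than `LITERAL` because the resource name is the topic name, while the subject actually registered is `<topic>-value`. A literal pattern on the topic name would not cover that subject; a prefixed one does. The sketch below shows the difference with a simplified matcher. `pattern_matches` is a hypothetical helper that ignores wildcards and is not the matching code used by MDS.

```python
def pattern_matches(pattern_name: str, pattern_type: str, resource_name: str) -> bool:
    """Simplified LITERAL / PREFIXED resource pattern matching."""
    if pattern_type == "LITERAL":
        return resource_name == pattern_name
    if pattern_type == "PREFIXED":
        return resource_name.startswith(pattern_name)
    raise ValueError(f"unsupported pattern type: {pattern_type}")


topic = "DEMO_TOPIC_RBAC_ALICE"  # placeholder for rbac_test_topic_name
subject = topic + "-value"       # subject registered for the topic's value schema

print(pattern_matches(topic, "LITERAL", subject))   # False
print(pattern_matches(topic, "PREFIXED", subject))  # True
```

One consequence of the prefixed pattern is that it also covers any other subject starting with the same string, such as `<topic>-key`, so a literal binding on the exact subject name is the narrower choice when only one subject should be writable.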

In [None]:
# cell 65
# Avro Serialized Production

retrieved_schema = rbac_schema_registry_client.get_schema(schema_id)
value_avro_serializer = AvroSerializer(schema_registry_client = rbac_schema_registry_client, schema_str = retrieved_schema.schema_str)

# Initialize Serializing Producer
conf = {
    'bootstrap.servers': bootstrap_servers,
    'value.serializer': value_avro_serializer,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': _sa_username,
    'sasl.password': _sa_password
}
rbac_serial_producer = SerializingProducer(conf)

message_data = {
    "id": 456,
    "username": _sa_username,
    "email": "john.foe@example.com"
}

rbac_serial_producer.produce(topic=rbac_test_topic_name, value=message_data)
rbac_serial_producer.flush()

In [None]:
# cell 66
# Avro Deserialized Consumption
value_avro_deserializer = AvroDeserializer(schema_registry_client = rbac_schema_registry_client)

# Initialize Deserializing Consumer
conf = {
    'bootstrap.servers': bootstrap_servers,
    'value.deserializer': value_avro_deserializer,
    'group.id': f"DEMO_RBAC_{_username.upper()}",
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': _sa_username,
    'sasl.password': _sa_password,
    'auto.offset.reset': 'earliest'
}

rbac_deserial_consumer = DeserializingConsumer(conf)
rbac_deserial_consumer.subscribe([rbac_test_topic_name])
incoming_messages = consume(rbac_deserial_consumer, 3)
rbac_deserial_consumer.close()
incoming_messages

# Cleanup! Thanks for joining!

In [32]:
# cell 67
import os

# Clean up Python Files
files = ['consume.py', 'consume-multi.py', 'consume-split.py']
for f in files:
    if os.path.exists(f):
        os.remove(f)

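# Close any consumers that may still be open; skip ones that were never
# created or are already closed, so one failure does not block the other.
for name in ('consumer', 'deserial_consumer'):
    try:
        globals()[name].close()
    except Exception:
        pass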

# Clean up Topics
bootstrap_servers = 'bip-kafka.confluent.svc.cluster.local:9092'
broker_config = {
    'bootstrap.servers': bootstrap_servers,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': _username,
    'sasl.password': _password
}
admin_client = AdminClient(broker_config)
topics = admin_client.list_topics().topics
# Also match DEMO_TOPIC_RBAC_<USER>, which a "DEMO_TOPIC_<USER>" substring test misses
topics_cleaned = [k for k in topics if k.startswith("DEMO_TOPIC_") and _username.upper() in k]
if topics_cleaned:
    admin_client.delete_topics(topics_cleaned)

# Clean up Consumer Groups
cg_states = ["UNKNOWN","PREPARING_REBALANCING","COMPLETING_REBALANCING","STABLE","DEAD","EMPTY"]
states = {ConsumerGroupState[state] for state in cg_states}
future = admin_client.list_consumer_groups(request_timeout=10, states=states)
# list_consumer_groups() returns a future; result() raises if the listing failed
list_consumer_groups_result = future.result()
groupids = [valid.group_id for valid in list_consumer_groups_result.valid]

filtered_groups = [g for g in groupids if _username.upper() in g]
for group in filtered_groups:
    try:
        # delete_consumer_groups() is asynchronous too: wait on the per-group
        # future so a failed deletion is reported instead of silently dropped
        admin_client.delete_consumer_groups([group])[group].result()
        print(f"Consumer group '{group}' deleted successfully.")
    except Exception as e:
        print(f"Failed to delete consumer group '{group}': {e}")

# Clean up Schemas
schema_registry_url = 'https://bip-schemaregistry.confluent.svc.cluster.local:8081'
schema_registry_client = SchemaRegistryClient({ 
        'url': schema_registry_url,
        'basic.auth.user.info': f'{_username}:{_password}'
    })

subjects = schema_registry_client.get_subjects()
filtered_subjects = [s for s in subjects if f"{_username.upper()}" in s]
for subject in filtered_subjects:
    try:
        schema_registry_client.delete_subject(subject)
        print(f"Deleted subject '{subject}' successfully.")
    except Exception as e:
        print(f"Failed to delete subject '{subject}': {e}")

Consumer group 'DEMO_CUCUMBER' deleted successfully.
Consumer group 'DEMO_IMAGES_CUCUMBER' deleted successfully.


# Supporting Links

- https://docs.confluent.io/platform/current/security/rbac/index.html#rbac-and-acls
- https://www.kafka-streams-book.com/
- https://docs.confluent.io/operator/current/co-api.html