## Schema Registry in Apache Kafka

A Schema Registry is a service that provides a central repository for managing and validating schemas for data serialization formats such as Avro, JSON Schema, and Protobuf. Apache Kafka itself does not include one. It is usually deployed as a separate service alongside the brokers, most commonly the Confluent Schema Registry. In a Kafka deployment, the Schema Registry enforces schema evolution rules and ensures that data produced to and consumed from Kafka topics adheres to a registered schema.

### Key Concepts

1. **Schema**: A schema defines the structure of the data. It specifies the fields, their types, and any constraints on the data.

2. **Schema Evolution**: Schema evolution refers to the process of updating schemas over time while maintaining compatibility with existing data. The Schema Registry enforces compatibility rules to ensure that changes to schemas do not break existing producers or consumers.

3. **Compatibility Modes**: The Schema Registry supports several compatibility modes that control how a subject's schema can evolve. The default mode is `BACKWARD`. The available modes are:
    - **Backward (`BACKWARD`)**: Consumers using the new schema can read data written with the immediately preceding version.
    - **Forward (`FORWARD`)**: Consumers using the immediately preceding version can read data written with the new schema.
    - **Full (`FULL`)**: The new schema is both backward and forward compatible with the immediately preceding version.
    - **Backward Transitive (`BACKWARD_TRANSITIVE`)**: Consumers using the new schema can read data written with all previous versions.
    - **Forward Transitive (`FORWARD_TRANSITIVE`)**: Consumers using any previous version can read data written with the new schema.
    - **Full Transitive (`FULL_TRANSITIVE`)**: The new schema is both backward and forward compatible with all previous versions.
    - **None (`NONE`)**: No compatibility checks are performed.

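4. **Schema Registry API**: The Schema Registry exposes a RESTful API for registering, retrieving, and checking schemas. Schemas are stored under *subjects*. With the default naming strategy, the subject for a topic is `<topic>-key` or `<topic>-value`, and each subject keeps a numbered history of schema versions. On the producer side, serializers register or look up the schema and prefix every message with the schema's ID. On the consumer side, deserializers read that ID and can fetch the matching schema from the registry.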
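To make the difference between the compatibility modes concrete, here is a deliberately simplified model of the checks. It assumes a schema is just a mapping from field name to "has a default value" and ignores types, aliases, and nested records. It illustrates Avro-style resolution rules under those assumptions and is not the registry's actual algorithm. The names `can_read` and `is_compatible` are hypothetical.

```python
from typing import Dict, List

# Hypothetical simplified schema: field name -> True if the field has a default value
Schema = Dict[str, bool]

def can_read(reader: Schema, writer: Schema) -> bool:
    """A reader can decode writer data if every reader field is present in the
    writer schema or has a default. Extra writer fields are skipped by the reader."""
    return all(name in writer or has_default for name, has_default in reader.items())

def is_compatible(new: Schema, history: List[Schema], mode: str) -> bool:
    """Check `new` against earlier versions (oldest first) under a compatibility mode."""
    if mode == "NONE" or not history:
        return True
    transitive = mode.endswith("_TRANSITIVE")
    base = mode.replace("_TRANSITIVE", "")
    # Non-transitive modes compare only against the latest registered version
    targets = history if transitive else history[-1:]
    for old in targets:
        if base in ("BACKWARD", "FULL") and not can_read(reader=new, writer=old):
            return False  # new consumers could not read old data
        if base in ("FORWARD", "FULL") and not can_read(reader=old, writer=new):
            return False  # old consumers could not read new data
    return True

v1 = {"id": False, "name": False}
v2 = {"id": False, "name": False, "email": False}  # new field without a default
v3 = {"id": False, "name": False, "email": True}   # new field with a default

print(is_compatible(v2, [v1], "BACKWARD"))  # False
print(is_compatible(v2, [v1], "FORWARD"))   # True
print(is_compatible(v3, [v1], "FULL"))      # True

# Transitive modes also look past the latest version
history = [{"id": False}, {"id": False, "email": True}]
candidate = {"id": False, "email": False}   # drops the default again
print(is_compatible(candidate, history, "BACKWARD"))             # True
print(is_compatible(candidate, history, "BACKWARD_TRANSITIVE"))  # False
```

The transitive result differs because the candidate is only checked against version 1 (which has no `email` field) when the mode is transitive.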

### Benefits of Using a Schema Registry

1. **Data Quality**: Ensures that data produced and consumed by Kafka topics adheres to a predefined schema, improving data quality and consistency.

2. **Schema Evolution**: Facilitates schema evolution by enforcing compatibility rules, allowing schemas to evolve without breaking existing applications.

3. **Centralized Management**: Provides a central repository for managing schemas, making it easier to track and update schemas across multiple applications.

4. **Interoperability**: Supports multiple serialization formats (Avro, JSON Schema, Protobuf), enabling interoperability between different systems and applications.


### Use Case

Consider a Kafka topic that stores user events. With a Schema Registry, you define a schema for user events, and every producer and consumer of the topic uses it. When you later add a field, the registry checks the new version against the subject's compatibility mode and rejects it if it would break readers. For example, in Avro, adding an optional field with a default value is both backward and forward compatible, so the new version is accepted and existing consumers keep reading the topic.
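The default value is what makes such a change safe. During schema resolution, a reader fills in fields that the writer's data lacks from the reader schema's defaults and drops fields it does not know. The sketch below applies that rule to plain dictionaries. The `resolve` helper and the `phone` field are hypothetical, and real Avro resolution additionally handles type promotion, aliases, and nested records.

```python
from typing import Any, Dict, List

# Two versions of a user-event schema as Avro-style field lists.
# v2 adds a hypothetical optional "phone" field whose default is null.
USER_V1 = [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string"},
]
USER_V2 = USER_V1 + [{"name": "phone", "type": ["null", "string"], "default": None}]

def resolve(record: Dict[str, Any], reader_fields: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Project a decoded record onto the reader's fields (simplified Avro resolution)."""
    result = {}
    for field in reader_fields:
        name = field["name"]
        if name in record:
            result[name] = record[name]        # value written by the producer
        elif "default" in field:
            result[name] = field["default"]    # filled in from the reader schema
        else:
            raise ValueError(f"field {name!r} is missing and has no default")
    return result  # fields only the writer knows about are ignored

old_event = {"id": 1, "name": "Ada"}                       # written with v1
new_event = {"id": 2, "name": "Bob", "phone": "555-0100"}  # written with v2

print(resolve(old_event, USER_V2))  # {'id': 1, 'name': 'Ada', 'phone': None}
print(resolve(new_event, USER_V1))  # {'id': 2, 'name': 'Bob'}
```

The first call is the backward direction (new consumer, old data) and the second is the forward direction (old consumer, new data).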

In summary, the Schema Registry manages and validates the schemas used by Kafka topics: it improves data quality, makes schema evolution safe through compatibility checks, and centralizes schema management.

### Examples

The examples below register a JSON Schema and then produce and consume messages that use it. They assume a Schema Registry reachable at `http://schema-registry:8081` and Kafka brokers at `kafka-broker-1:29094,kafka-broker-2:29094`.

```python
import requests
import json

# Schema Registry URL
schema_registry_url = 'http://schema-registry:8081'

# User JSON Schema ("title" is included because JSONSerializer in some
# confluent-kafka versions rejects JSON schemas without one)
user_schema = {
    "title": "User",
    "type": "object",
    "properties": {
        "id": {"type": "integer"},
        "name": {"type": "string"},
        "email": {"type": "string"}
    },
    "required": ["id", "name", "email"]
}

# Register the schema under the subject "<topic>-value" for the topic "user-json"
response = requests.post(
    f"{schema_registry_url}/subjects/user-json-value/versions",
    headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
    data=json.dumps({"schema": json.dumps(user_schema), "schemaType": "JSON"}),
    timeout=10
)

if response.status_code == 200:
    print(f"Schema registered successfully with ID {response.json()['id']}")
else:
    print(f"Failed to register schema: {response.text}")
```
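The subject `user-json-value` follows the default `TopicNameStrategy`: serializers derive the subject from the topic name, so a schema registered under `<topic>-value` is the one they look up. The two other built-in strategies are `RecordNameStrategy` (the fully qualified record name) and `TopicRecordNameStrategy` (`<topic>-<record name>`). The functions below are hypothetical stand-ins that only mimic these naming rules; confluent-kafka provides its own implementations (`topic_subject_name_strategy`, `record_subject_name_strategy`, `topic_record_subject_name_strategy` in `confluent_kafka.schema_registry`), selected through the serializer's `subject.name.strategy` setting.

```python
from typing import Optional

def topic_name_strategy(topic: str, is_key: bool, record_name: Optional[str] = None) -> str:
    # Default: one subject per topic and message part (key or value)
    return f"{topic}-{'key' if is_key else 'value'}"

def record_name_strategy(topic: str, is_key: bool, record_name: str) -> str:
    # One subject per record type, shared by every topic that carries it
    return record_name

def topic_record_name_strategy(topic: str, is_key: bool, record_name: str) -> str:
    # One subject per (topic, record type) pair
    return f"{topic}-{record_name}"

for strategy in (topic_name_strategy, record_name_strategy, topic_record_name_strategy):
    print(strategy.__name__, "->", strategy("user-json", False, "User"))
# topic_name_strategy -> user-json-value
# record_name_strategy -> User
# topic_record_name_strategy -> user-json-User
```

`RecordNameStrategy` and `TopicRecordNameStrategy` are mainly useful when a single topic carries several record types.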

```python
from confluent_kafka import Producer
from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.json_schema import JSONSerializer
import time
import json

# Kafka Configuration
conf = {
    'bootstrap.servers': "kafka-broker-1:29094,kafka-broker-2:29094"
}

# Schema Registry Configuration
schema_registry_conf = {'url': 'http://schema-registry:8081'}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

# User Schema (same schema as registered under "user-json-value")
user_schema = {
    "title": "User",
    "type": "object",
    "properties": {
        "id": {"type": "integer"},
        "name": {"type": "string"},
        "email": {"type": "string"}
    },
    "required": ["id", "name", "email"]
}

# Create the serializers once and reuse them for every message
key_serializer = StringSerializer('utf_8')
json_serializer = JSONSerializer(json.dumps(user_schema), schema_registry_client)

# Report failed deliveries (invoked from poll() and flush())
def delivery_report(err, msg):
    if err is not None:
        print(f"Delivery failed for key {msg.key()}: {err}")

# Create Producer Instance
producer = Producer(conf)

# Kafka Topic
topic = "user-json"

# Produce User Messages
for i in range(10):
    user = {'id': i, 'name': f"User {i}", 'email': f"user{i}@example.com"}
    producer.produce(
        topic=topic,
        key=key_serializer(str(i), SerializationContext(topic, MessageField.KEY)),
        value=json_serializer(user, SerializationContext(topic, MessageField.VALUE)),
        on_delivery=delivery_report
    )
    print(f"Produced: {user}")
    producer.poll(0)  # Serve delivery callbacks without blocking
    time.sleep(1)  # Simulate delay between messages

# Wait until every queued message has been delivered (or has failed)
producer.flush()
print("All user messages produced successfully!")
```

```python
from confluent_kafka import Consumer, KafkaException
from confluent_kafka.serialization import StringDeserializer, SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.json_schema import JSONDeserializer
import json

# Kafka Consumer Configuration (a group ID dedicated to this topic)
conf = {
    'bootstrap.servers': "kafka-broker-1:29094,kafka-broker-2:29094",
    'group.id': 'user-json-group',
    'auto.offset.reset': 'earliest'
}

# Schema Registry Configuration
schema_registry_conf = {'url': 'http://schema-registry:8081'}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

# User Schema
user_schema = {
    "title": "User",
    "type": "object",
    "properties": {
        "id": {"type": "integer"},
        "name": {"type": "string"},
        "email": {"type": "string"}
    },
    "required": ["id", "name", "email"]
}

# Return the decoded dict unchanged (swap in a class constructor if needed)
def user_from_dict(data, ctx):
    return data

# Create the key and value deserializers
json_deserializer = JSONDeserializer(json.dumps(user_schema), from_dict=user_from_dict, schema_registry_client=schema_registry_client)
key_deserializer = StringDeserializer('utf_8')

# Create Consumer Instance
consumer = Consumer(conf)
topic = "user-json"
consumer.subscribe([topic])

# Consume User Messages
try:
    while True:
        msg = consumer.poll(1.0)  # Poll for messages
        if msg is None:
            continue
        if msg.error():
            raise KafkaException(msg.error())

        key = key_deserializer(msg.key(), SerializationContext(msg.topic(), MessageField.KEY))
        user = json_deserializer(msg.value(), SerializationContext(msg.topic(), MessageField.VALUE))
        print(f"Consumed: key={key}, value={user}")

except KeyboardInterrupt:
    print("Stopping Consumer...")
finally:
    consumer.close()
```
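The serializer does not send the schema with each message. In Confluent's wire format, the serialized value starts with a magic byte `0` followed by the schema ID as a 4-byte big-endian integer, and the encoded payload comes after that; the deserializer uses the ID to look up the schema. The stdlib sketch below splits that header off. The helper name is hypothetical, the schema ID `1` is an arbitrary example value, and the Protobuf format adds extra message-index bytes after the ID, which this sketch does not handle.

```python
import json
import struct
from typing import Tuple

MAGIC_BYTE = 0

def split_confluent_header(data: bytes) -> Tuple[int, bytes]:
    """Return (schema_id, payload) from a Schema Registry framed message."""
    if len(data) < 5:
        raise ValueError("message too short for the 5-byte header")
    magic, schema_id = struct.unpack(">bI", data[:5])  # 1 signed byte + 4-byte unsigned int
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte {magic}")
    return schema_id, data[5:]

# Build a framed JSON message by hand to exercise the parser
payload = json.dumps({"id": 0, "name": "User 0", "email": "user0@example.com"}).encode()
framed = struct.pack(">bI", MAGIC_BYTE, 1) + payload

schema_id, body = split_confluent_header(framed)
print(schema_id, json.loads(body))  # 1 {'id': 0, 'name': 'User 0', 'email': 'user0@example.com'}
```

Inside the consumer loop above, `split_confluent_header(msg.value())` would show which schema version each message was written with.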

### Registering an Avro Schema

Below is an example of registering an Avro schema, a sample producer, and a sample consumer using the Schema Registry in Apache Kafka.

```python
import requests
import json

# Schema Registry URL
schema_registry_url = 'http://schema-registry:8081'

# User Avro Schema
user_avro_schema = {
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string"},
        {"name": "email", "type": "string"}
    ]
}

# Register Avro Schema (schemaType defaults to AVRO when omitted)
response = requests.post(
    f"{schema_registry_url}/subjects/user-avro-value/versions",
    headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
    data=json.dumps({"schema": json.dumps(user_avro_schema)})
)

if response.status_code == 200:
    print("Avro schema registered successfully!")
else:
    print(f"Failed to register Avro schema: {response.text}")
```
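Avro payloads are compact because field names are never written: values appear in schema order, `int` and `long` as zigzag variable-length integers, and `string` as a length (encoded as a `long`) followed by UTF-8 bytes. The hand-rolled encoder below applies those rules to this `User` record for illustration only; in practice `AvroSerializer` does this for you and also prepends the 5-byte registry header. The function names are hypothetical.

```python
def zigzag_varint(n: int) -> bytes:
    """Avro int/long encoding: zigzag-map the signed value, then emit 7 bits per byte."""
    if not -(1 << 63) <= n < (1 << 63):
        raise ValueError("value outside the Avro long range")
    z = (n << 1) ^ (n >> 63)  # 0->0, -1->1, 1->2, -2->3, ...
    out = bytearray()
    while True:
        byte = z & 0x7F
        z >>= 7
        if z:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)

def encode_string(s: str) -> bytes:
    data = s.encode("utf-8")
    return zigzag_varint(len(data)) + data

def encode_user(user: dict) -> bytes:
    # Fields in User schema order: id (int), name (string), email (string)
    return zigzag_varint(user["id"]) + encode_string(user["name"]) + encode_string(user["email"])

print(encode_user({"id": 1, "name": "A", "email": "b"}))  # b'\x02\x02A\x02b'
print(zigzag_varint(-1), zigzag_varint(64))               # b'\x01' b'\x80\x01'
```

Because the bytes carry no field names, a reader can only decode them with the writer's schema, which is why the schema ID in the message header matters.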

```python
from confluent_kafka import Producer
from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
import time

# Kafka Configuration
conf = {
    'bootstrap.servers': "kafka-broker-1:29094,kafka-broker-2:29094"
}

# Schema Registry Configuration
schema_registry_conf = {'url': 'http://schema-registry:8081'}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

# User Avro Schema
user_avro_schema_str = """
{
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string"},
        {"name": "email", "type": "string"}
    ]
}
"""

# Create the serializers once (AvroSerializer accepts the schema as a string)
key_serializer = StringSerializer('utf_8')
avro_serializer = AvroSerializer(schema_registry_client, user_avro_schema_str)

# Report failed deliveries (invoked from poll() and flush())
def delivery_report(err, msg):
    if err is not None:
        print(f"Delivery failed for key {msg.key()}: {err}")

# Create Producer Instance
producer = Producer(conf)

# Kafka Topic
topic = "user-avro"

# Produce User Messages
for i in range(10):
    user = {'id': i, 'name': f"User {i}", 'email': f"user{i}@example.com"}
    producer.produce(
        topic=topic,
        key=key_serializer(str(i), SerializationContext(topic, MessageField.KEY)),
        value=avro_serializer(user, SerializationContext(topic, MessageField.VALUE)),
        on_delivery=delivery_report
    )
    print(f"Produced: {user}")
    producer.poll(0)  # Serve delivery callbacks without blocking
    time.sleep(1)  # Simulate delay between messages

# Wait until every queued message has been delivered (or has failed)
producer.flush()
print("All user messages produced successfully!")
```

```python
from confluent_kafka import Consumer, KafkaException
from confluent_kafka.serialization import StringDeserializer, SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer

# Kafka Consumer Configuration (a group ID dedicated to this topic)
conf = {
    'bootstrap.servers': "kafka-broker-1:29094,kafka-broker-2:29094",
    'group.id': 'user-avro-group',
    'auto.offset.reset': 'earliest'
}

# Schema Registry Configuration
schema_registry_conf = {'url': 'http://schema-registry:8081'}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

# User Avro Schema (used as the reader schema)
user_avro_schema_str = """
{
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string"},
        {"name": "email", "type": "string"}
    ]
}
"""

# Return the decoded dict unchanged (swap in a class constructor if needed)
def user_from_dict(data, ctx):
    return data

# Create the key and value deserializers (the schema is passed as a string)
avro_deserializer = AvroDeserializer(schema_registry_client, user_avro_schema_str, from_dict=user_from_dict)
key_deserializer = StringDeserializer('utf_8')

# Create Consumer Instance
consumer = Consumer(conf)
topic = "user-avro"
consumer.subscribe([topic])

# Consume User Messages
try:
    while True:
        msg = consumer.poll(1.0)  # Poll for messages
        if msg is None:
            continue
        if msg.error():
            raise KafkaException(msg.error())

        key = key_deserializer(msg.key(), SerializationContext(msg.topic(), MessageField.KEY))
        user = avro_deserializer(msg.value(), SerializationContext(msg.topic(), MessageField.VALUE))
        print(f"Consumed: key={key}, value={user}")

except KeyboardInterrupt:
    print("Stopping Consumer...")
finally:
    consumer.close()
```

### Registering a Protobuf Schema

Below is an example of registering a Protobuf schema, a sample producer, and a sample consumer using the Schema Registry in Apache Kafka.

```python
import requests
import json

# Schema Registry URL
schema_registry_url = 'http://schema-registry:8081'

# User Protobuf Schema
user_proto_schema = """
syntax = "proto3";
package user;

message User {
    int32 id = 1;
    string name = 2;
    string email = 3;
}
"""

# Register Protobuf Schema under "<topic>-value" for the topic "user-protobuf"
response = requests.post(
    f"{schema_registry_url}/subjects/user-protobuf-value/versions",
    headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
    data=json.dumps({"schema": user_proto_schema, "schemaType": "PROTOBUF"})
)

if response.status_code == 200:
    print("Protobuf schema registered successfully!")
else:
    print(f"Failed to register Protobuf schema: {response.text}")
```
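Besides registering versions, the same REST API can test a candidate schema without registering it, change a subject's compatibility mode, and fetch the latest version. The sketch below only builds `urllib.request.Request` objects from the standard library (pass one to `urllib.request.urlopen` to actually send it). The helper names are hypothetical; the endpoint paths, the `application/vnd.schemaregistry.v1+json` content type, and the payload fields follow the Schema Registry REST API used in the cells above.

```python
import json
from urllib.request import Request

REGISTRY_URL = "http://schema-registry:8081"  # same address as in the examples
CONTENT_TYPE = "application/vnd.schemaregistry.v1+json"

def _json_request(url: str, body: dict, method: str) -> Request:
    return Request(url, data=json.dumps(body).encode("utf-8"), method=method,
                   headers={"Content-Type": CONTENT_TYPE})

def _schema_body(schema: str, schema_type: str) -> dict:
    body = {"schema": schema}
    if schema_type != "AVRO":  # AVRO is assumed when schemaType is omitted
        body["schemaType"] = schema_type
    return body

def register_schema(subject: str, schema: str, schema_type: str = "AVRO") -> Request:
    # POST /subjects/{subject}/versions registers a new version and returns its ID
    return _json_request(f"{REGISTRY_URL}/subjects/{subject}/versions",
                         _schema_body(schema, schema_type), "POST")

def check_compatibility(subject: str, schema: str, schema_type: str = "AVRO") -> Request:
    # POST /compatibility/subjects/{subject}/versions/latest tests without registering
    return _json_request(f"{REGISTRY_URL}/compatibility/subjects/{subject}/versions/latest",
                         _schema_body(schema, schema_type), "POST")

def set_compatibility(subject: str, level: str) -> Request:
    # PUT /config/{subject} sets the subject-level compatibility mode
    return _json_request(f"{REGISTRY_URL}/config/{subject}", {"compatibility": level}, "PUT")

def get_latest(subject: str) -> Request:
    # GET /subjects/{subject}/versions/latest returns the newest schema version
    return Request(f"{REGISTRY_URL}/subjects/{subject}/versions/latest")

req = set_compatibility("user-protobuf-value", "FULL")
print(req.get_method(), req.full_url)
# To send it: urllib.request.urlopen(req) and read the JSON response
```

Running `check_compatibility` before deploying a new producer surfaces a breaking change in CI rather than at the first `produce` call.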

```python
from confluent_kafka import Producer
from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.protobuf import ProtobufSerializer
import time
import sys

# Add the src/proto directory to the Python path.
# user_pb2 is generated from user.proto with: protoc --python_out=<dir> user.proto
sys.path.append('/workspaces/kafka-tutorial/src/proto')

import user_pb2

# Kafka Configuration
conf = {
    'bootstrap.servers': "kafka-broker-1:29094,kafka-broker-2:29094"
}

# Schema Registry Configuration
schema_registry_conf = {'url': 'http://schema-registry:8081'}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

# Protobuf Serializer Configuration
protobuf_serializer_conf = {'use.deprecated.format': False}

# Create the serializers once and reuse them for every message
key_serializer = StringSerializer('utf_8')
protobuf_serializer = ProtobufSerializer(user_pb2.User, schema_registry_client, protobuf_serializer_conf)

# Report failed deliveries (invoked from poll() and flush())
def delivery_report(err, msg):
    if err is not None:
        print(f"Delivery failed for key {msg.key()}: {err}")

# Create Producer Instance
producer = Producer(conf)

# Kafka Topic
topic = "user-protobuf"

# Produce User Messages
for i in range(10):
    user = user_pb2.User(id=i, name=f"User {i}", email=f"user{i}@example.com")
    producer.produce(
        topic=topic,
        key=key_serializer(str(i), SerializationContext(topic, MessageField.KEY)),
        value=protobuf_serializer(user, SerializationContext(topic, MessageField.VALUE)),
        on_delivery=delivery_report
    )
    print(f"Produced: {user}")
    producer.poll(0)  # Serve delivery callbacks without blocking
    time.sleep(1)  # Simulate delay between messages

# Wait until every queued message has been delivered (or has failed)
producer.flush()
print("All user messages produced successfully!")
```

```text
Produced: name: "User 0"
email: "user0@example.com"

Produced: id: 1
name: "User 1"
email: "user1@example.com"

Produced: id: 2
name: "User 2"
email: "user2@example.com"

Produced: id: 3
name: "User 3"
email: "user3@example.com"

Produced: id: 4
name: "User 4"
email: "user4@example.com"

Produced: id: 5
name: "User 5"
email: "user5@example.com"

Produced: id: 6
name: "User 6"
email: "user6@example.com"

Produced: id: 7
name: "User 7"
email: "user7@example.com"

Produced: id: 8
name: "User 8"
email: "user8@example.com"

Produced: id: 9
name: "User 9"
email: "user9@example.com"

All user messages produced successfully!
```
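The first message prints without `id` because in proto3 a scalar field equal to its default value (`0` for `int32`) is not serialized, and the text format omits it as well; a consumer still reads `user.id` as `0`. The hand-written encoder below shows the mechanism: each field is written as a tag `(field_number << 3) | wire_type` followed by its value, and default-valued fields are skipped. It is an illustrative sketch with hypothetical helper names. It covers only this `User` message, rejects negative `id` values (which protobuf encodes as 10-byte varints), and leaves out the registry header and message indexes that `ProtobufSerializer` prepends.

```python
def varint(n: int) -> bytes:
    """Base-128 varint for non-negative integers (protobuf wire type 0)."""
    if n < 0:
        raise ValueError("negative values are not handled in this sketch")
    out = bytearray()
    while True:
        byte = n & 0x7F
        n >>= 7
        if n:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)

def encode_user_message(user_id: int, name: str, email: str) -> bytes:
    out = b""
    if user_id != 0:  # proto3 skips scalars equal to their default
        out += varint((1 << 3) | 0) + varint(user_id)  # field 1, wire type 0 (varint)
    for number, text in ((2, name), (3, email)):
        if text:  # the empty string is the default for string fields
            data = text.encode("utf-8")
            out += varint((number << 3) | 2) + varint(len(data)) + data  # wire type 2 (length-delimited)
    return out

print(encode_user_message(1, "A", "b"))  # b'\x08\x01\x12\x01A\x1a\x01b'
print(encode_user_message(0, "A", "b"))  # b'\x12\x01A\x1a\x01b'
```

If `user_pb2` is importable, `user_pb2.User(id=1, name="A", email="b").SerializeToString()` should produce the same bytes as the first call.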


```python
from confluent_kafka import Consumer
from confluent_kafka.serialization import StringDeserializer, SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.protobuf import ProtobufDeserializer
import sys

# Add the src/proto directory to the Python path
sys.path.append('/workspaces/kafka-tutorial/src/proto')

import user_pb2

# Kafka Configuration
conf = {
    'bootstrap.servers': "kafka-broker-1:29094,kafka-broker-2:29094",
    'group.id': "user-protobuf-consumer-group",
    'auto.offset.reset': 'earliest'
}

# Schema Registry Configuration
schema_registry_conf = {'url': 'http://schema-registry:8081'}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

# Protobuf Deserializer Configuration
protobuf_deserializer_conf = {'use.deprecated.format': False}

# Create Protobuf Deserializer
protobuf_deserializer = ProtobufDeserializer(user_pb2.User, protobuf_deserializer_conf, schema_registry_client)

# Create String Deserializer for the key
string_deserializer = StringDeserializer('utf_8')

# Create Consumer Instance
consumer = Consumer(conf)

# Kafka Topic
topic = "user-protobuf"

# Subscribe to the topic
consumer.subscribe([topic])

# Consume User Messages
try:
    while True:
        msg = consumer.poll(1.0)  # Poll for messages with a timeout of 1 second
        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue

        user = protobuf_deserializer(msg.value(), SerializationContext(topic, MessageField.VALUE))
        key = string_deserializer(msg.key(), SerializationContext(topic, MessageField.KEY))
        print(f"Consumed: key={key}, value={user}")

except KeyboardInterrupt:
    pass
finally:
    # Close the consumer to commit final offsets and clean up resources
    consumer.close()
```

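If this cell fails with `ModuleNotFoundError: No module named 'user_pb2'`, the Python module for the `User` message has not been generated, or it is not in the directory added to `sys.path`. Save the Protobuf definition registered above as `user.proto`, generate the module with `protoc --python_out=/workspaces/kafka-tutorial/src/proto user.proto`, and rerun the cell.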