In [6]:
## Читаем с автоматическим офсетом
from confluent_kafka import Consumer, KafkaException
# Конфигурация консюмера
conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group11',
    'auto.offset.reset': 'earliest',  # Чтение с самого начала, если офсеты не найдены
    'enable.auto.commit': True        # Включение автоматического коммита офсетов
}
# Создание консюмера
consumer = Consumer(conf)
# Подписка на топик
consumer.subscribe(['my-topic'])
try:
    for _ in range(5):
        msg = consumer.poll(timeout=1.0)  # Ожидание новых сообщений  
        if msg is None:
            continue
        if msg.error():
            raise KafkaException(msg.error())
        print(f'Received message: {msg.value().decode("utf-8")}')
        
finally:
    # Закрытие консюмера
    consumer.close()


## Читаем с РУЧНЫМ офсетом

In [8]:

from confluent_kafka import Consumer, KafkaException
# Конфигурация консюмера
conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group-222222222', ## другая группа
    'auto.offset.reset': 'earliest',  # Чтение с самого начала, если офсеты не найдены
    'enable.auto.commit': False        # 
}
# Создание консюмера
consumer = Consumer(conf)
# Подписка на топик
consumer.subscribe(['my-topic'])
try:
    for _ in range(15):
        msg = consumer.poll(timeout=1.0)  # Ожидание новых сообщений  
        if msg is None:
            continue
        if msg.error():
            raise KafkaException(msg.error())
        print(f'Received message: {msg.value().decode("utf-8")}')
        
finally:
    # Закрытие консюмера
    consumer.close()

Received message: 123
Received message: 456
Received message: 789
Received message: 000
Received message: 111
Received message: 22222
Received message: 3333
Received message: aaa
Received message: zzzzzz


Чтение офсетов для конкретной группы консумеров
Для чтения текущих офсетов для группы консумеров можно использовать метод committed(partitions). Этот метод возвращает список офсетов для указанных разделов (partitions).

In [9]:
from confluent_kafka import Consumer, TopicPartition

# Конфигурация консюмера
conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False
}

consumer = Consumer(conf)
consumer.subscribe(['my-topic'])

# Получение текущих офсетов для группы консумеров по разделам топика
partitions = [TopicPartition('my-topic', partition=0)]
offsets = consumer.committed(partitions)

for tp in offsets:
    print(f'Partition: {tp.partition}, Offset: {tp.offset}')

consumer.close()


Partition: 0, Offset: 9


### Producer


In [5]:
from confluent_kafka import Producer

# Функция для обработки успешной доставки сообщений
def delivery_report(err, msg):
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}')

# Конфигурация продюсера
conf = {
    'bootstrap.servers': 'localhost:9092',  # Адрес Kafka брокера
    'client.id': 'my-producer'                # Идентификатор клиента
}

# Создание экземпляра продюсера
producer = Producer(conf)

# Отправка сообщений
try:
    for i in range(4):
        key = f'key-{i}'                # Ключ сообщения (опционально)
        value = f'Hello Kafka {i}'      # Значение сообщения
        producer.produce('my-topic', key=key, value=value, callback=delivery_report)

        # Обработка асинхронных операций
        producer.poll(0)  # Проверка на наличие ошибок и вызов обратных функций

    # Ожидание отправки всех сообщений
    producer.flush()

except Exception as e:
    print(f'Error while producing: {e}')

finally:
    # Закрытие продюсера
    print('finally')


Message delivered to my-topic [0] at offset 39
Message delivered to my-topic [0] at offset 40
Message delivered to my-topic [0] at offset 41
Message delivered to my-topic [0] at offset 42
finally


### exactly-once semantics, EOS

 idempotent producer и настроек для транзакций
 Конфигурация продюсера:

enable.idempotence: True: Включает идемпотентный режим для продюсера.
transactional.id: Уникальный идентификатор для транзакций. Он должен быть уникальным для каждого продюсера в вашем приложении.

In [24]:
from confluent_kafka import Producer

# Функция для обработки успешной доставки сообщений
def delivery_report(err, msg):
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}')

# Конфигурация продюсера
conf = {
    'bootstrap.servers': 'localhost:9092',  # Адрес Kafka брокера
    'client.id': 'my-producer',              # Идентификатор клиента
    'enable.idempotence': True,               # Включение идемпотентного режима
    'acks': 'all',                            # Подтверждение от всех реплик
    'retries': 5,                             # Количество попыток повторной отправки
    'transactional.id': 'my-transactional-id'  # Идентификатор транзакции
}

# Создание экземпляра продюсера
producer = Producer(conf)

# Инициализация транзакции
producer.init_transactions()

try:
    # Начало транзакции
    producer.begin_transaction()

    # Отправка сообщений
    for i in range(4):
        key = f'key-{i}'                # Ключ сообщения (опционально)
        value = f'Hello Kafka {i}'      # Значение сообщения
        producer.produce('my-topic', key=key, value=value, callback=delivery_report)

        # Обработка асинхронных операций
        producer.poll(0)  # Проверка на наличие ошибок и вызов обратных функций

    # Завершение транзакции
    producer.commit_transaction()

except Exception as e:
    print(f'Error while producing: {e}')
    # Отмена транзакции в случае ошибки
    producer.abort_transaction()

finally:
    # Ожидание отправки всех сообщений
    producer.flush()


%4|1729585021.552|GETPID|my-producer#producer-22| [thrd:main]: Failed to acquire transactional PID from broker TxnCoordinator/1: Broker: Not coordinator: retrying


Message delivered to my-topic [0] at offset 34
Message delivered to my-topic [0] at offset 35
Message delivered to my-topic [0] at offset 36
Message delivered to my-topic [0] at offset 37


## Schema registry

In [11]:
import requests
import json

In [18]:

response = requests.get("http://localhost:8081/subjects")
response.json()

['file-topic-value']

In [19]:
response = requests.get("http://localhost:8081/subjects/file-topic-value/versions/latest")
response.json()

{'subject': 'file-topic-value', 'version': 1, 'id': 1, 'schema': '"string"'}

Регистрация новой схемы

In [13]:
import requests
import json

url = 'http://localhost:8081/subjects/my-topic-value/versions'
schema = {
    "schema": json.dumps({
        "type": "record",
        "name": "User",
        "fields": [
            {"name": "name", "type": "string"},
            {"name": "age", "type": "int"}
        ]
    })
}

response = requests.post(url, json=schema)

if response.status_code == 200:
    schema_id = response.json().get('id')
    print('Схема зарегистрирована с ID:', schema_id)
else:
    print('Ошибка при регистрации схемы:', response.text)


Ошибка при регистрации схемы: {"error_code":409,"message":"Schema being registered is incompatible with an earlier schema for subject \"file-topic-value\", details: [{errorType:'TYPE_MISMATCH', description:'The type (path '/') of a field in the new schema does not match with the old schema', additionalInfo:'reader type: RECORD not compatible with writer type: STRING'}, {oldSchemaVersion: 1}, {oldSchema: '\"string\"'}, {compatibility: 'BACKWARD'}] io.confluent.kafka.schemaregistry.rest.exceptions.RestIncompatibleSchemaException: Schema being registered is incompatible with an earlier schema for subject \"file-topic-value\", details: [{errorType:'TYPE_MISMATCH', description:'The type (path '/') of a field in the new schema does not match with the old schema', additionalInfo:'reader type: RECORD not compatible with writer type: STRING'}, {oldSchemaVersion: 1}, {oldSchema: '\"string\"'}, {compatibility: 'BACKWARD'}]\nio.confluent.kafka.schemaregistry.rest.exceptions.RestIncompatibleSchemaE

In [21]:
schema_id = 1  # Замените на нужный ID
url = f'http://localhost:8081/schemas/ids/{schema_id}'
response = requests.get(url)

if response.status_code == 200:
    schema = response.json().get('schema')
    print('Полученная схема:', schema)
else:
    print('Ошибка при получении схемы:', response.text)


Полученная схема: {"type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"}]}


AVRO

Avro-файл — это бинарный файл, который используется для хранения данных, сериализованных в формате Avro. Он состоит из двух основных частей:

Заголовок (Header): содержит метаданные и схему данных, которая описывает формат записей в файле.
Тело (Data Blocks): содержит сами данные (записи), закодированные в соответствии с указанной схемой.

In [28]:
#Структура Avro-файла
# 1. Заголовок (Header)
# Каждый Avro-файл начинается с заголовка, который включает в себя:

# Магическое число (magic): первые 4 байта файла — это последовательность байт
#  0x4F 0x62 0x6A 0x01 (или строка "Obj\x01"). Это используется для идентификации файла 
# как Avro-файл.
#Метаданные (Metadata): хранит JSON-объект с информацией о схеме (например, "avro.schema") 
# и другую служебную информацию.
#Синхронизационный маркер (Sync Marker): случайная последовательность из 16 байт, которая используется для синхронизации при чтении файла блоками.

In [24]:
## работа с avro 
import fastavro

# Определение схемы
schema = {
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"}
    ]
}

# Данные, которые будут сериализованы
records = [
    {"name": "Alice", "age": 25},
    {"name": "Bob", "age": 30}
]

# Запись данных в Avro-файл
with open('test.avro', 'wb') as out:
    fastavro.writer(out, schema, records)


In [26]:
import fastavro

# Чтение Avro-файла
with open('test.avro', 'rb') as f:
    reader = fastavro.reader(f)
    for record in reader:
        print(record)


{'name': 'Alice', 'age': 25}
{'name': 'Bob', 'age': 30}


In [None]:
#### Пример в kafka.py!!!!!!!!!!!!!!!1

In [23]:
response = requests.get("http://localhost:8081/subjects/my-topic-value/versions/latest")
response.json()

{'error_code': 40401,
 'message': "Subject 'my-topic-value' not found. io.confluent.rest.exceptions.RestNotFoundException: Subject 'my-topic-value' not found.\nio.confluent.rest.exceptions.RestNotFoundException: Subject 'my-topic-value' not found.\n\tat io.confluent.kafka.schemaregistry.rest.exceptions.Errors.subjectNotFoundException(Errors.java:78)\n\tat io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource.getSchemaByVersion(SubjectVersionsResource.java:152)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.base/java.lang.reflect.Method.invoke(Method.java:566)\n\tat org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52)\

### KAFKA CONNECT

In [15]:
import requests
import json

response = requests.get("http://localhost:8083/connectors")
response.json()

[]

In [16]:
response = requests.get("http://localhost:8083/connectors/file-source-connector")
response.json()

{'error_code': 404, 'message': 'Connector file-source-connector not found'}

Создаем коннектор для чтения из файла

In [17]:
import requests
import json

url = 'http://localhost:8083/connectors'

headers = {
    'Content-Type': 'application/json'
}

data = {
    "name": "file-source-connector",  # Имя коннектора
    "config": {
        "connector.class": "FileStreamSource",  # Класс коннектора
        "tasks.max": "1",  # Количество задач
        "file": "/tmp/tmp/test-file.txt",  # Путь к файлу, который нужно читать
        "topic": "file-topic",  # Топик, куда отправлять данные
        "poll.interval.ms": "1000"  # Интервал опроса файла (в миллисекундах)
    }
}

response = requests.post(url, headers=headers, data=json.dumps(data))

if response.status_code == 201:
    print('Файловый коннектор успешно создан')
else:
    print('Ошибка при создании файлового коннектора:', response.status_code, response.text)


Файловый коннектор успешно создан


## DEBEZIUM CDC

In [None]:
from sqlalchemy import create_engine
import pandas as pd
import os
import time

pwd = "secret"
uid = "debezium"
server = "localhost"
db = "mydb"
port = "5432"
#
engine = create_engine(f'postgresql://{uid}:{pwd}@{server}:{port}/{db}')

In [55]:
df = pd.read_sql('select * from public.my_table', engine)
df

Unnamed: 0,id,name
0,1,John Doe
1,2,Alice
2,3,Bob
3,4,John Doe
4,5,Alice
5,6,Bob


In [74]:
# Создайте DataFrame с данными для вставки
data = {'name': ['asd----', 'Alice', 'Bob']}
df = pd.DataFrame(data)

# Вставка данных в my_table
df.to_sql('my_table', engine, if_exists='append', index=False)

3

In [57]:
response = requests.get("http://localhost:8083/")
response.json()

{'version': '2.6.1',
 'commit': '6b2021cd52659cef',
 'kafka_cluster_id': 'MkU3OEVBNTcwNTJENDM2Qk'}

In [69]:
postgres_connector = {
    "name": "my_table-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "postgr",  
        "database.port": "5432",
        "database.user": "debezium",      
        "database.password": "secret", 
        "database.dbname": "mydb",  
        "plugin.name": "pgoutput",
        "database.server.name": "source",
        "key.converter.schemas.enable": "false",
        "value.converter.schemas.enable": "false",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "table.include.list": "public.my_table", 
        "slot.name": "dbz_sales_transaction_slot"
    }
}


In [70]:
response = requests.get('http://localhost:8083/connectors/')
response.json()

[]

In [71]:
url = 'http://localhost:8083/connectors/'


# Отправляем данные как JSON
response = requests.post(url, json = postgres_connector)

# Проверяем статус код ответа
if response.status_code == 200:
    print("Данные успешно отправлены.")
    print("Ответ сервера:", response.json())  # Если сервер возвращает JSON-ответ
else:
    print("Ошибка при отправке данных:", response.status_code, response.text)

Ошибка при отправке данных: 201 {"name":"my_table-connector","config":{"connector.class":"io.debezium.connector.postgresql.PostgresConnector","database.hostname":"postgr","database.port":"5432","database.user":"debezium","database.password":"secret","database.dbname":"mydb","plugin.name":"pgoutput","database.server.name":"source","key.converter.schemas.enable":"false","value.converter.schemas.enable":"false","transforms":"unwrap","transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState","value.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter":"org.apache.kafka.connect.json.JsonConverter","table.include.list":"public.my_table","slot.name":"dbz_sales_transaction_slot","name":"my_table-connector"},"tasks":[],"type":"source"}


In [72]:
response = requests.get('http://localhost:8083/connectors/my_table-connector')
response.json()

{'name': 'my_table-connector',
 'config': {'connector.class': 'io.debezium.connector.postgresql.PostgresConnector',
  'database.user': 'debezium',
  'database.dbname': 'mydb',
  'slot.name': 'dbz_sales_transaction_slot',
  'transforms': 'unwrap',
  'database.server.name': 'source',
  'database.port': '5432',
  'plugin.name': 'pgoutput',
  'key.converter.schemas.enable': 'false',
  'database.hostname': 'postgr',
  'database.password': 'secret',
  'value.converter.schemas.enable': 'false',
  'name': 'my_table-connector',
  'transforms.unwrap.type': 'io.debezium.transforms.ExtractNewRecordState',
  'value.converter': 'org.apache.kafka.connect.json.JsonConverter',
  'table.include.list': 'public.my_table',
  'key.converter': 'org.apache.kafka.connect.json.JsonConverter'},
 'tasks': [{'connector': 'my_table-connector', 'task': 0}],
 'type': 'source'}

In [73]:
response = requests.get('http://localhost:8083/connectors/my_table-connector/status')
response.json()

{'name': 'my_table-connector',
 'connector': {'state': 'RUNNING', 'worker_id': '172.18.0.5:8083'},
 'tasks': [{'id': 0, 'state': 'RUNNING', 'worker_id': '172.18.0.5:8083'}],
 'type': 'source'}

In [2]:
!pip show confluent-kafka

Name: confluent-kafka
Version: 2.6.0
Summary: Confluent's Python client for Apache Kafka
Home-page: https://github.com/confluentinc/confluent-kafka-python
Author: Confluent Inc
Author-email: support@confluent.io
License: 
Location: /root/myenv/lib/python3.12/site-packages
Requires: 
Required-by: 


In [64]:
from confluent_kafka.admin import AdminClient   

# Определите адреса брокеров Kafka
bootstrap_servers = ['localhost:9092']

# Настройка Kafka
config = {
    'bootstrap.servers': 'localhost:9092'  # Адрес вашего Kafka-брокера
}
# Создайте экземпляр KafkaAdminClient
admin_client = AdminClient(config)

# Запрос метаданных для получения списка топиков
metadata = admin_client.list_topics(timeout=10)

# Получение списка топиков
topics = metadata.topics

# Вывод списка топиков
for topic in topics:
    print(topic)

connect_status
connect_offsets
connect_configs
_schemas
source.public.my_table
__consumer_offsets


In [None]:
from kafka import KafkaConsumer
bootstrap_servers = ['localhost:29092']
consumer = KafkaConsumer( bootstrap_servers=bootstrap_servers)
consumer.topics()

In [None]:
import json
topicName = 'source.public.my_table'
# Initialize consumer variable
consumer = KafkaConsumer (topicName , auto_offset_reset='earliest', 
                          bootstrap_servers = bootstrap_servers, group_id='sales-transactions')

# Read and print message from consumer
for msg in consumer:
    print(json.loads(msg.value))

In [77]:
# Удаление
#response = requests.delete('http://localhost:8083/connectors/my_table-connector')
#response.json()