from confluent_kafka.admin import AdminClient, NewTopic
Импортируем два класса из библиотеки confluent_kafka:
AdminClient: используется для административных операций с Kafka (например, создание/удаление топиков)
NewTopic: класс для описания нового топика, который мы хотим создать

conf = {
    'bootstrap.servers': 'localhost:9094'
}
Создаём словарь конфигурации для Kafka AdminClient.
'bootstrap.servers' указывает на адрес Kafka-брокера (здесь он работает локально на порту 9094).
Это обязательный параметр — он нужен, чтобы клиент знал, к какому брокеру подключаться.

admin = AdminClient(conf)
Создаём экземпляр AdminClient с указанной конфигурацией.
Этот объект позволит нам выполнять административные действия над Kafka-кластером.

In [14]:
from confluent_kafka.admin import AdminClient, NewTopic

conf = {
    'bootstrap.servers': 'localhost:9094'
}

admin = AdminClient(conf)

In [15]:
new_topic = NewTopic(
    topic='sensor_data',              # Название создаваемого топика — 'sensor_data'
    num_partitions=3,                 # Указываем количество партиций (разделов) — это влияет на параллелизм обработки
    replication_factor=1              # Количество реплик для отказоустойчивости (1 = без репликации)
)

fs = admin.create_topics([new_topic])  # Отправляем запрос на создание топика (можно сразу список из нескольких)

for topic, f in fs.items():            # Обходим словарь: ключ — имя топика, значение — future-объект
    try:
        f.result()                     # Проверяем результат — если всё хорошо, future завершится без ошибок
        print(f"[v] Топик {topic} создан")  # Подтверждаем успешное создание
    except:
        print('Ошибка')                # Если что-то пошло не так (например, топик уже существует), выводим сообщение




Ошибка


Что хранит f (Future-объект)?
- Статус операции — выполнена ли она, завершена с ошибкой, ещё в процессе.
- Результат операции, когда она закончится (если есть).
- Информацию об ошибке, если она произошла во время выполнения.

Механизм ожидания — ты можешь вызвать f.result(), и программа подождёт, пока операция завершится.

Пример:
Пока топик создаётся — f знает, что операция в процессе.
Если создание прошло успешно — f "запоминает" успешное завершение, а f.result() вернёт None.
Если случилась ошибка — f "запоминает" ошибку, и при вызове f.result() будет выброшено исключение.

producer.produce(TOPIC, value=json.dumps(msg))


producer — это объект продюсера Kafka, который отвечает за отправку сообщений в брокер.

produce — метод этого объекта, который создаёт (отправляет) сообщение в указанный топик.

TOPIC — переменная, в которой хранится имя топика, куда отправляем сообщение, здесь 'sensor_data'.

value= — ключевой аргумент метода produce, задаёт содержимое сообщения (payload).

json.dumps(msg) — функция из модуля json, которая превращает Python-словарь msg в строку формата JSON, чтобы её можно было отправить как текст.

Итого: строка говорит "Продюсер, отправь JSON-сообщение msg в топик sensor_data".

producer.poll(0) — это как "проверить почту и сразу же убрать письма".

Вот что происходит:

Продюсер отправляет сообщения асинхронно (то есть, не ждет, пока сообщение дойдет до сервера, а отправляет и идет дальше).

Чтобы узнать, дошло ли сообщение или была ошибка, продюсер иногда должен "проверять статус" отправки.

Метод poll(0) — это команда "быстро проверить статус отправки и обработать все обновления, если они есть".

Аргумент 0 значит — не ждать, а проверить сейчас же и сразу вернуться.

Если не вызывать poll(), то продюсер не узнает о том, успешно ли отправились сообщения, и не обработает возможные ошибки.

Это важно, чтобы гарантировать корректную работу и знать, что сообщение реально ушло.

In [16]:
import json  # Для преобразования словаря в JSON-строку
import random  # Для генерации случайных чисел
import time  # Для управления паузами между сообщениями
from datetime import datetime, timezone  # Для получения текущего времени в ISO формате
from confluent_kafka import Producer  # Kafka Producer для отправки сообщений

conf = {
    'bootstrap.servers': 'localhost:9094'  # Адрес Kafka брокера
}

producer = Producer(conf)  # Создаём объект продюсера с указанной конфигурацией
TOPIC = 'sensor_data'  # Имя топика, в который будем отправлять сообщения

def generate_message():
    # Формируем словарь с данными от сенсора
    return {
        "sensor_id": f'sensor_{random.randint(1, 5)}',  # Идентификатор сенсора, случайное число от 1 до 5
        "timestamp": datetime.now().isoformat(),  # Текущее время в ISO формате
        "value": round(random.uniform(10.0, 40.0), 2)  # Случайное значение сенсора с двумя знаками после запятой
    }
    
try:
    print("[v] Продюсер начал работать..")  # Лог о запуске продюсера
    while True:
        msg = generate_message()  # Генерируем новое сообщение
        producer.produce(TOPIC, value=json.dumps(msg))  # Отправляем сообщение в Kafka, сериализовав в JSON
        producer.poll(0)  # Обработка внутренних событий продюсера (отправка, коллбэки)
        time.sleep(0.01)  # Пауза в 10 миллисекунд между сообщениями, 100 сообщений в секунду

        print('[v] Отправил 1 сообщение', datetime.now().isoformat())  # Лог успешной отправки

except KeyboardInterrupt:
    print("[x] Продюсер остановлен")  # Лог, если прервали работу через Ctrl+C

finally:
    producer.flush()  # Дождаться отправки всех сообщений перед закрытием


[v] Продюсер начал работать..
[v] Отправил 1 сообщение 2025-05-30T20:00:54.523416
[v] Отправил 1 сообщение 2025-05-30T20:00:54.533779
[v] Отправил 1 сообщение 2025-05-30T20:00:54.545617
[v] Отправил 1 сообщение 2025-05-30T20:00:54.557901
[v] Отправил 1 сообщение 2025-05-30T20:00:54.569006
[v] Отправил 1 сообщение 2025-05-30T20:00:54.580174
[v] Отправил 1 сообщение 2025-05-30T20:00:54.591467
[v] Отправил 1 сообщение 2025-05-30T20:00:54.601640
[v] Отправил 1 сообщение 2025-05-30T20:00:54.612395
[v] Отправил 1 сообщение 2025-05-30T20:00:54.623285
[v] Отправил 1 сообщение 2025-05-30T20:00:54.635287
[v] Отправил 1 сообщение 2025-05-30T20:00:54.646462
[v] Отправил 1 сообщение 2025-05-30T20:00:54.657837
[v] Отправил 1 сообщение 2025-05-30T20:00:54.668467
[v] Отправил 1 сообщение 2025-05-30T20:00:54.679645
[v] Отправил 1 сообщение 2025-05-30T20:00:54.692316
[v] Отправил 1 сообщение 2025-05-30T20:00:54.702728
[v] Отправил 1 сообщение 2025-05-30T20:00:54.713239
[v] Отправил 1 сообщение 2025-05-3