# Kafka


## 1. От объектов к событиям

* Традиционно бизнес-системы работали с **объектами** и **их состоянием**: например, пользователь с полями name, email, balance.
* Но если сохранять только текущее состояние, теряется история изменений.
* Современный подход — **event sourcing**: хранить не состояние, а **последовательность событий**, которые к нему привели (например: «пополнение счёта», «списание», «покупка»).
* Это позволяет:

  * восстанавливать состояние в любой момент времени;
  * легко интегрировать разные системы;
  * строить асинхронные архитектуры (каждый сервис реагирует на события, а не ждёт синхронных вызовов).

Kafka — это платформа, которая идеально поддерживает такую событийную модель.

---

## 2. Общая идея Kafka

Kafka — это **распределённый commit-log**, т.е. устойчивая, масштабируемая и реплицируемая структура данных, куда **пишутся события** (записи) и откуда их **читают потребители**.

* События хранятся **в порядке поступления**, и это порядок фиксируется навсегда.
* Kafka не удаляет сообщения сразу — она хранит их в течение заданного времени (retention).
* Можно пересчитывать данные, переигрывать события.

---

## 3. Архитектура: продюсеры, брокеры, консумеры

### Продюсер (Producer)

* Отправляет события в Kafka.
* Выбирает, в какой **топик** и **партицию** записать сообщение.
* Может включать **идемпотентный режим** (чтобы избежать дубликатов) и **транзакции**.

### Брокер (Broker)

* Сервер Kafka, который физически хранит данные (обычно на диске).
* Один кластер Kafka состоит из множества брокеров.
* Каждый брокер хранит часть партиций.

### Консумер (Consumer)

* Читает события из Kafka.
* Может быть **одиночным** (читает все события сам) или входить в **consumer-group** (где партиции распределяются между участниками группы).

---

## 4. Топики, партиции и сегменты

* **Топик** — логическое имя потока сообщений 
* **Партиция** — физическая часть топика, независимый log.
  Каждое сообщение в партиции имеет **offset** — смещение (номер записи).
* **Сегменты** — файлы внутри партиции. Когда один сегмент достигает лимита размера или времени, создаётся новый.

Каждая партиция — **неизменяемая последовательность** записей, к которой можно только добавлять данные.

---

## 5. Репликация и отказоустойчивость

Kafka гарантирует надёжность через **репликацию**:

* У каждой партиции есть **лидер** и **фолловеры**.
* Только лидер принимает запись от продюсеров.
* Фолловеры асинхронно копируют данные.
* Если лидер падает, Kafka выбирает нового лидера (из ISR — in-sync replicas).

Конфигурация `acks` определяет, когда продюсер получает подтверждение:

* `acks=0` — не ждёт подтверждения (быстро, но небезопасно);
* `acks=1` — ждёт от лидера;
* `acks=all` — ждёт от всех синхронизированных реплик.

---

## 6. Retention (жизненный цикл данных)

Kafka не удаляет события сразу после прочтения.
Удаление управляется политикой хранения (retention policy):

* **По времени** (`retention.ms`) — старые сегменты удаляются после заданного времени;
* **По размеру** (`retention.bytes`) — удаляются старейшие, когда общий размер превышает лимит.

Kafka — не база данных, а **журнал событий с ограниченной историей**.

---

## 7. Партицирование и ключи

При записи продюсер выбирает, в какую партицию отправить сообщение:

* Если указан **ключ**, Kafka использует хэш ключа, чтобы выбрать партицию.
* Сообщения с одинаковым ключом всегда попадают в одну партицию → сохраняется порядок.
* Если ключ не указан — выбирается случайная партиция (обычно round-robin).

Это важно для **производительности** (распределение нагрузки) и **упорядоченности** событий.

---

## 8. Гарантии доставки

Kafka предоставляет три уровня гарантий:

1. **At most once** — сообщение может потеряться (минимальная задержка);
2. **At least once** — сообщение не потеряется, но возможны дубли;
3. **Exactly once** — ни потерь, ни дублей (достигается с идемпотентными продюсерами и транзакциями).


---

## 9. Консумеры и consumer-группы

* Консумеры читают данные независимо от продюсеров.
* Несколько консумеров могут объединяться в **группу**.
  Тогда Kafka распределяет партиции топика между членами группы.
* **Group Coordinator** следит за состоянием группы, распределяет партиции и хранит offsets.

### Offsets

* Offset показывает, где консумер «остановился».
* Сохраняется в специальном топике `__consumer_offsets`.
* Если консумер упал — после перезапуска продолжит чтение с последнего зафиксированного offset.

---

## 10. Heartbeat и ребалансировка

* Консумеры регулярно посылают **heartbeat** координатору.
* Если координатор не получает heartbeat в течение `session.timeout.ms`, он считает консумера «упавшим» и запускает **ребалансировку**.
* Ребалансировка — перераспределение партиций между оставшимися участниками группы.

Это важно, но может вызывать «остановки мира» (STW): на время ребаланса чтение приостанавливается.

---

## 11. Производительность и масштабирование

* Kafka масштабируется горизонтально за счёт увеличения числа партиций и брокеров.
* Чем больше партиций — тем выше параллелизм, но растёт нагрузка на координаторов.
* Для больших нагрузок важно заранее планировать число партиций 

---

## Итог

* Kafka — это **не очередь**, а **распределённый журнал** событий.
* Всё крутится вокруг **топиков, партиций, offset’ов и ретеншена**.
* Продюсеры добавляют данные, консумеры читают независимо.
* Состояние консумера — это его offset, а не данные в Kafka.
* Репликация и ack-механизм обеспечивают надёжность, а ребалансировка — устойчивость к сбоям.



# Запуск в `Docker`:

```bash
docker compose -f docker/docker-compose-kafka.yml up
```       

## Пример клиента (Confluent Kafka)

In [None]:
from confluent_kafka.admin import AdminClient

client = AdminClient({
    "bootstrap.servers": "localhost:9092"
})



In [4]:
from confluent_kafka import Producer, Consumer, TopicPartition

In [5]:
TOPIC_NAME = "some_topic"

producer = Producer({
        "bootstrap.servers": "localhost:9092"
    })

for idx in range(0, 25):
    producer.produce(TOPIC_NAME, key=bytes(idx), value=b"Msg %d" % idx)
    producer.flush()

In [6]:
consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "group1",
    "auto.offset.reset": "earliest"
})

consumer.subscribe([TOPIC_NAME])

# tp = TopicPartition(topic=TOPIC_NAME, partition=0, offset=0)
# consumer.assign([tp])
# consumer.seek(tp)

for _ in range(25): 
    msg = consumer.consume(num_messages=1, timeout=1.0)
    if len(msg) > 0:
        print(msg[0].value()) 

consumer.close()

b'Msg 0'
b'Msg 6'
b'Msg 7'
b'Msg 10'
b'Msg 14'
b'Msg 20'
b'Msg 24'
b'Msg 1'
b'Msg 4'
b'Msg 5'
b'Msg 8'
b'Msg 9'
b'Msg 11'
b'Msg 13'
b'Msg 16'
b'Msg 19'
b'Msg 22'
b'Msg 2'
b'Msg 3'
b'Msg 12'
b'Msg 15'
b'Msg 17'
b'Msg 18'
b'Msg 21'


%3|1759699948.485|FAIL|rdkafka#producer-4| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1759699948.998|FAIL|rdkafka#producer-3| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1759699949.485|FAIL|rdkafka#producer-4| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)
%3|1759699950.000|FAIL|rdkafka#producer-3| [thrd:localhost:9092/1]: localhost:9092/1: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1759699951.000|FAIL|rdkafka#producer-3| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)
%3|17