Задание:
> Run Streams on top of the cluster from previous homework
> 
> Create simple mapper for any of the simulated topic
> 
> Implement simple aggregate processor
> 
> Implement join processor
> 
> Could be done with Kafka Streams or any other approach

Для потоковой обработки хочу попробовать Apache Flink. Это фреймворк, написанный на Java и Scala, у которого реализован API. Для для взаимодействия с API можно использовать библиотеку `pyflink` на Python.

1. Создам топик `purchases`. Пусть он заполняется из producer.py параллельно с топиком `user-events`. Если в `user-events` приходит событие с `event_type=purchase`, то в течение 5 секунд до/после я ожидаю появления в `purchases` стоимости заказа. В 20% случаев стоимость заказа пусть приходит с задержкой более 5 секунд. В качестве времени требуется использовать продуктовое время (время события, а не время записи в Kafka или попадания во Flink). Насчёт пользователей - пусть у нас их будет всего 20 уникальных.
2. Создам топик `visits`. В него пусть попадают события из `user-events`, обогащённые ценой из `purchases` и размеченные по сессиям. Окно сессии - 20 секунд (между событиями одной сессии проходит не более 20 секунд). Идентификатор сессии генерирую случайным образом на лету. Если внутри сессии были покупки, обогащённые из топика `purchases`, то растягиваю цену `amount` (последнюю по `timestamp`-у) на все хиты в сессии. 
3. Создам топик `suspicious-visits`. В него буду записывать идентификаторы таких сессий, в которых нахожу что-то подозрительное:
    * либо суммарное кол-во событий в сессии больше 5. Обозначаю как `type=1`
    * либо в сессии нет ни одного события `event_type=page-view`, хотя есть другие. Обозначаю как `type=2`

In [63]:
!./kafka/bin/kafka-topics.sh --create --topic purchases --partitions 8 --bootstrap-server localhost:19092,localhost:19091,localhost:19090

Created topic purchases.


In [64]:
!./kafka/bin/kafka-topics.sh --create --topic visits --partitions 8 --bootstrap-server localhost:19092,localhost:19091,localhost:19090

Created topic visits.


In [65]:
!./kafka/bin/kafka-topics.sh --create --topic suspicious-visits --partitions 8 --bootstrap-server localhost:19092,localhost:19091,localhost:19090

Created topic suspicious-visits.


Вместе с установочными файлами Flink предлагает из коробки несколько примеров по запуску задач на Python. Буду адаптировать примеры, взятые оттуда. Скачиваю .jar коннектор к Kafka и подключаю его к скрипту через .add_jars.

In [142]:
!~/flink/bin/start-cluster.sh

Starting cluster.
Starting standalonesession daemon on host LAPTOP-CT070LG3.
Starting taskexecutor daemon on host LAPTOP-CT070LG3.


In [139]:
!tree ~/flink/examples/python

[01;34m/home/timosha/flink/examples/python[00m
├── [01;34mdatastream[00m
│   ├── basic_operations.py
│   ├── [01;34mconnectors[00m
│   │   ├── elasticsearch.py
│   │   ├── kafka_avro_format.py
│   │   ├── kafka_csv_format.py
│   │   ├── kafka_json_format.py
│   │   └── pulsar.py
│   ├── event_time_timer.py
│   ├── process_json_data.py
│   ├── state_access.py
│   ├── streaming_word_count.py
│   ├── [01;34mwindowing[00m
│   │   ├── session_with_dynamic_gap_window.py
│   │   ├── session_with_gap_window.py
│   │   ├── sliding_time_window.py
│   │   ├── tumbling_count_window.py
│   │   └── tumbling_time_window.py
│   └── word_count.py
└── [01;34mtable[00m
    ├── basic_operations.py
    ├── mixing_use_of_datastream_and_table.py
    ├── multi_sink.py
    ├── [01;34mpandas[00m
    │   ├── conversion_from_dataframe.py
    │   └── pandas_udaf.py
    ├── process_json_data.py
    ├── process_json_data_with_udf.py
    ├── streaming_word_count.py
    ├── [01;34mwindowing[00m
    │   ├

Для начала просто настрою подключение к Кафке. Читаю топик с конца. Прочитанные сообщения отправляю в вывод.

In [224]:
from datetime import datetime
from pyflink.common import Types
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer
from pyflink.datastream.formats.json import JsonRowDeserializationSchema


def process_user_events(env):
    deserialization_schema = JsonRowDeserializationSchema.Builder() \
        .type_info(Types.ROW_NAMED(
            ['event_type', 'user_id', 'page_id', 'timestamp'],
            [Types.STRING(), Types.INT(), Types.INT(), Types.FLOAT()])) \
        .build()
    source = KafkaSource.builder() \
        .set_bootstrap_servers("localhost:19092,localhost:19091,localhost:19090") \
        .set_topics("user-events") \
        .set_group_id("flink-group") \
        .set_starting_offsets(KafkaOffsetsInitializer.latest()) \
        .set_value_only_deserializer(deserialization_schema) \
        .build()

    stream = env.from_source(source, WatermarkStrategy.no_watermarks(), source_name="user-events")
    stream.map(lambda row: (
        row["event_type"],
        row["user_id"],
        row["page_id"],
        datetime.fromtimestamp(row["timestamp"]).strftime("%Y-%m-%d %H:%M:%S")
    )).print()
    env.execute()

In [146]:
try:
    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars("file:///home/timosha/flink/lib/flink-sql-connector-kafka-4.0.0-2.0.jar")
    process_user_events(env)
except KeyboardInterrupt:
    pass

6> ('page-view', 4, 8, '2025-07-21 19:17:04')
11> ('purchase', 2, 55, '2025-07-21 19:17:04')
9> ('page-view', 3, 84, '2025-07-21 19:17:04')
8> ('click', 7, 81, '2025-07-21 19:17:04')
6> ('page-view', 4, 8, '2025-07-21 19:17:04')
6> ('purchase', 4, 63, '2025-07-21 19:17:04')
6> ('page-view', 14, 92, '2025-07-21 19:17:04')
11> ('purchase', 2, 55, '2025-07-21 19:17:04')
8> ('click', 7, 81, '2025-07-21 19:17:04')
9> ('page-view', 3, 84, '2025-07-21 19:17:04')
9> ('click', 8, 60, '2025-07-21 19:17:04')
6> ('purchase', 4, 63, '2025-07-21 19:17:04')
6> ('page-view', 14, 92, '2025-07-21 19:17:04')
6> ('page-view', 14, 56, '2025-07-21 19:17:04')
11> ('page-view', 2, 81, '2025-07-21 19:17:04')
11> ('page-view', 2, 81, '2025-07-21 19:17:04')
6> ('page-view', 14, 56, '2025-07-21 19:17:04')
9> ('click', 8, 60, '2025-07-21 19:17:04')
9> ('click', 13, 93, '2025-07-21 19:17:04')
9> ('click', 13, 93, '2025-07-21 19:17:04')


Или из консоли:

![image.png](./pic/_1.png)

Теперь подготовлю разметку по сессиям с окном в 30 секунд. Результат будет пушится в `visits`, а я запущу консьюмера для чтения из топика `visits` в консоль.

```python
session_visits = user_events_stream\
    .key_by(lambda e: e['user_id'], key_type=Types.INT()) \
    .window(EventTimeSessionWindows.with_gap(Time.seconds(30))) \
    .process(SessionGenerator(), output_type=visits_serialization_schema)
session_visits.sink_to(visits_sinker)
```

![image.png](./pic/_2.png)

Всё верно:
* у событий внутри одного окна одинаковый `session_id`
* окно открыто до тех пор, пока после последнего из событий не пройдёт больше чем 20 секунд. Например, по пользователю `user_id=5` видно, что его сессия продолжалась суммарно 26 секунд.
* сессию пользователя с `user_id=12` мы ещё не получили, потому что окно для неё ещё не закрыто. а у пользователя `user_id=6` разметили уже две сессии, в которых по одному хиту. Так получилось из-за того, что между хитами прошло больше 20 секунд, и после последнего прошло больше 20 секунд.

Я не смог выяснить, почему Pyflink не отправляет строчку сразу как только закрывает окно, а с задержкой до нескольких минут: возможно, это как-то связано с ватерлиниями (watermark) или с тем, что я поставил параллелизм = 1. Без параллелизма, с другой стороны, просто ничего не работало, а с ватерлинией всё должно быть нормально. Pyflink тяжело дебажить - нет ни внятной документации, ни файлов с логами. Трейсбек любой ошибки занимает по двести строк. Нейросети путаются между последней и предпоследней версиями фреймворка (которые во много несовместимы) и версией API на Java, в которой имплементировано больше фичей, чем в Pyflink. Кошмар! Тысячу раз пожалел, что устроил себе это развлечение!

Сделаю мониторинг подозрительных сессий:
* либо суммарное кол-во событий в сессии больше 5. Обозначаю как type=1
* 
либо в сессии нет ни одного события event_type=page-view, хотя есть другие. Обозначаю как type=2

Эту аггрегацию, кажется, проще всего выполнить сразу вместе с разметкой сессии: пока что все хиты, входящие в сессию, собраны вместе. Попробую модифицировать `SessionGenerator` - пусть онгенерируета данные для обоих топиков,а строкие будут потом фильтроваться кажаяй - в сво топикй.

```python
class SessionGenerator(ProcessWindowFunction):
    def process(self, key, context, elements):
        session_id = str(uuid4()).split('-')[0]
        events = list(elements)
        for element in elements:
            yield Row(
                session_id=session_id,
                user_id=element['user_id'],
                event_type=element['event_type'],
                page_id=element['page_id'],
                amount=element['amount'] if 'amount' in element._fields else 0,
                timestamp=element['timestamp'],
                type=None)
            
        # проверка на подозрительность
        suspicious_type = None
        if len(events) > 3:
            suspicious_type = 1
        elif not any(e['event_type'] == 'page-view' for e in events):
            suspicious_type = 2
            
        if suspicious_type is not None:
            yield Row(
                session_id=session_id,
                user_id=key,
                event_type=None,
                page_id=None,
                amount=None,
                timestamp=None,
                type=suspicious_type)
```

![image.png](./pic/_3.png)

Теперь сделаю джойн на лету с `purchases`. 

В Pyflink нет джойна из коробки, но есть соединение по ключу `connect`, которое можно дополнить своей логикой, расширив класс `CoProcessFunction`.

> Извините за недоразумение! Вы абсолютно правы, и я благодарю за указание на ошибку. В PyFlink 2.0 (основанном на Apache Flink 1.14) метод interval_join не доступен в API DataStream для Python, хотя он существует в Java/Scala API. Это ограничение PyFlink, и моя предыдущая уверенность была ошибочной из-за путаницы с документацией Java API.

In [262]:
from pyflink.datastream.functions import CoProcessFunction

class EnrichPurchaseFunction(CoProcessFunction):
    def __init__(self):
        self.purchase_state = None
        self.events_state = None

    def open(self, context):
        # Создаю стейт, в который буду сохранять покупки и события без пары.
        # Например, покупку, которая пришла раньше события покупки
        self.purchase_state = context.get_list_state(
            ListStateDescriptor("purchases", Types.ROW_NAMED(
                ['amount', 'user_id', 'page_id', 'timestamp'],
                [Types.INT(), Types.INT(), Types.INT(), Types.INT()])))
        self.events_state = context.get_list_state(
            ListStateDescriptor("events", Types.ROW_NAMED(
                ['event_type', 'user_id', 'page_id', 'timestamp'],
                [Types.STRING(), Types.INT(), Types.INT(), Types.INT()])))

    def process_element1(self, user_event, ctx):
        # событие из user_events_stream
        event_timestamp = user_event['timestamp'] * 1000
        for purchase in self.purchase_state:
            purchase_timestamp = purchase['timestamp'] * 1000
            if abs(event_timestamp - purchase_timestamp) <= 5000:  # ±5 секунд
                yield Row(
                    user_id=user_event['user_id'],
                    event_type=user_event['event_type'],
                    page_id=user_event['page_id'],
                    timestamp=user_event['timestamp'],
                    amount=purchase['amount'])
                break
        else:
            if user_event['event_type'] == 'purchase':
                self.events_state.add(user_event)
            yield Row(
                user_id=user_event['user_id'],
                event_type=user_event['event_type'],
                page_id=user_event['page_id'],
                timestamp=user_event['timestamp'],
                amount=0)

    def process_element2(self, purchase, ctx):
        # событие из purchases
        purchase_timestamp = purchase['timestamp'] * 1000
        for event in self.events_state:
            event_timestamp = event['timestamp'] * 1000
            if abs(purchase_timestamp - event_timestamp) <= 5000:
                yield Row(
                    user_id=event['user_id'],
                    event_type=event['event_type'],
                    page_id=event['page_id'],
                    timestamp=event['timestamp'],
                    amount=purchase['amount'])
                break
        else:
            self.purchase_state.add(purchase)

    def on_timer(self, timestamp, ctx):
        # Очистка старых покупок (опционально, для управления размером состояния)
        current_watermark = ctx.get_current_watermark()
        updated_purchases = [
            p for p in self.purchase_state
            if p['timestamp'] * 1000 >= current_watermark - 5000]
        updated_events = [
            p for p in self.updated_events
            if p['timestamp'] * 1000 >= current_watermark - 5000]        
        self.purchase_state.clear()
        self.updated_events.clear()
        self.purchase_state.update(updated_purchases)
        self.updated_events.update(updated_events)       

Отправляю для отладки результат джойна в вывод

![image.png](./pic/_4.png)

Всё корректно. Если покупка приходит как до/после события в течение 5 секунд (по модулю), она успешно присоединяется к событию. При задержке более 5 секунд оплата из топика `purchases` не присоединяется.

Теперь поджойнённый стрим подкладываю в качестве источника под разметку сессий, которая уже протестирована и работает. Дорабатываю разметку так, чтобы растягивать приджойненный `amount` на все хиты в сессии.

![image.png](./pic/_5.png)

Всё готово!