# Imports

In [None]:
%cd '/opt/itmo-bigdata/hw3'
!python3 -m pip install -r requirements.txt

# Block #1

### Default dir

In [3]:
%cd '/opt/itmo-bigdata/hw3/kafka-notebook'

/opt/itmo-bigdata/hw3/kafka-notebook


### Producer

In [4]:
%%writefile producer.py
import argparse
import random
from json import dumps
from time import sleep
from kafka import KafkaProducer, errors

def write_data(producer, data_cnt):
    """Publish ``data_cnt`` randomly generated device readings to ``hw3_topic``.

    Every reading is a dict with three keys:

    * ``device_id`` - random integer between 1 and 10 (inclusive);
    * ``temperature`` - uniform random value in [60, 110] shifted by +273;
    * ``execution_time`` - the message index multiplied by 5.

    The JSON-encoded ``device_id`` is used as the record key. The function
    pauses for one second after each message and flushes the producer before
    returning.

    Args:
        producer: object exposing ``send(topic, key=..., value=...)`` and
            ``flush()`` (a ``KafkaProducer`` in this script).
        data_cnt: number of messages to publish.
    """
    topic = "hw3_topic"
    for i in range(data_cnt):
        device_id = random.randint(1, 10)
        temperature = random.uniform(60, 110) + 273
        execution_time = i * 5
        cur_data = {"device_id": device_id, "temperature": temperature, "execution_time": execution_time}
        producer.send(topic,
                      key=dumps(device_id).encode('utf-8'),
                      value=cur_data)
        print(f"Data was sent to topic [{topic}]: {cur_data}")
        sleep(1)
    # send() only queues the record for a background sender; flush so that
    # nothing still buffered is lost when the script exits right afterwards.
    producer.flush()

def create_producer(max_retries=5, retry_delay=5):
    """Create a ``KafkaProducer`` for ``kafka:9092``, retrying while no broker answers.

    Values are serialised as UTF-8 encoded JSON and the producer is configured
    with ``acks=1``.

    Args:
        max_retries: maximum number of connection attempts (at least one
            attempt is always made).
        retry_delay: seconds to wait between two failed attempts.

    Returns:
        The connected ``KafkaProducer``.

    Raises:
        RuntimeError: if every attempt fails with ``NoBrokersAvailable``; the
            last broker error is attached as the cause.
    """
    print("Connecting to Kafka brokers")
    attempts = max(1, max_retries)
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            producer = KafkaProducer(bootstrap_servers=['kafka:9092'],
                                     value_serializer=lambda x: dumps(x).encode('utf-8'),
                                     acks=1)
        except errors.NoBrokersAvailable as exc:
            last_error = exc
            print("Waiting for brokers to become available")
            # No point in sleeping after the final attempt.
            if attempt < attempts:
                sleep(retry_delay)
        else:
            print("Connected to Kafka")
            return producer
    raise RuntimeError("Failed to connect to brokers within retry limit") from last_error

if __name__ == '__main__':
    # Script entry point: read the message count from the command line,
    # connect to Kafka and publish that many readings.
    cli = argparse.ArgumentParser(description="Kafka Producer")
    cli.add_argument(
        "--message-count",
        default=10,
        type=int,
        help="Number of messages to produce",
    )
    options = cli.parse_args()

    write_data(create_producer(), options.message_count)


Overwriting producer.py


In [5]:
!python3 producer.py --message-count 30

Connecting to Kafka brokers
Connected to Kafka
Data was sent to topic [hw3_topic]: {'device_id': 5, 'temperature': 378.8648169030971, 'execution_time': 0}
Data was sent to topic [hw3_topic]: {'device_id': 6, 'temperature': 368.30924691389487, 'execution_time': 5}
Data was sent to topic [hw3_topic]: {'device_id': 2, 'temperature': 367.55649140998236, 'execution_time': 10}
Data was sent to topic [hw3_topic]: {'device_id': 8, 'temperature': 364.72706033537463, 'execution_time': 15}
Data was sent to topic [hw3_topic]: {'device_id': 6, 'temperature': 365.4685412219744, 'execution_time': 20}
Data was sent to topic [hw3_topic]: {'device_id': 4, 'temperature': 355.23828862098406, 'execution_time': 25}
Data was sent to topic [hw3_topic]: {'device_id': 2, 'temperature': 371.29977934694836, 'execution_time': 30}
Data was sent to topic [hw3_topic]: {'device_id': 3, 'temperature': 369.2200390614406, 'execution_time': 35}
Data was sent to topic [hw3_topic]: {'device_id': 7, 'temperature': 


### Consumer

In [6]:
%%writefile consumer.py
import argparse
from kafka import KafkaConsumer

def create_consumer(max_messages=None):
    """Consume records from ``hw3_topic`` and print each one.

    The consumer joins group ``itmo_hw3_group`` on ``kafka:9092``, starts from
    the earliest offset when the group has no committed position, and uses
    auto-commit.

    Args:
        max_messages: stop after this many records; ``None`` (or 0) means
            keep consuming until interrupted.
    """
    print("Connecting to Kafka brokers")
    consumer = KafkaConsumer("hw3_topic",
                             group_id="itmo_hw3_group",
                             bootstrap_servers='kafka:9092',
                             auto_offset_reset='earliest',
                             enable_auto_commit=True)

    message_count = 0
    try:
        for message in consumer:
            print(message)

            message_count += 1

            if max_messages and message_count >= max_messages:
                break
    finally:
        # Always release the connection and leave the group; with auto-commit
        # enabled, close() also attempts to commit the offsets consumed so far.
        consumer.close()

if __name__ == '__main__':
    # Script entry point: an optional --message-limit bounds how many
    # records are consumed before the script exits.
    cli = argparse.ArgumentParser(description="Kafka Consumer Example")
    cli.add_argument(
        "--message-limit",
        default=None,
        type=int,
        help="Maximum number of messages to consume",
    )
    options = cli.parse_args()

    create_consumer(max_messages=options.message_limit)

Overwriting consumer.py


In [7]:
!python3 consumer.py --message-limit 10

Connecting to Kafka brokers
ConsumerRecord(topic='hw3_topic', partition=0, offset=0, timestamp=1703539775770, timestamp_type=0, key=b'5', value=b'{"device_id": 5, "temperature": 378.8648169030971, "execution_time": 0}', headers=[], checksum=None, serialized_key_size=1, serialized_value_size=71, serialized_header_size=-1)
ConsumerRecord(topic='hw3_topic', partition=0, offset=1, timestamp=1703539776772, timestamp_type=0, key=b'6', value=b'{"device_id": 6, "temperature": 368.30924691389487, "execution_time": 5}', headers=[], checksum=None, serialized_key_size=1, serialized_value_size=72, serialized_header_size=-1)
ConsumerRecord(topic='hw3_topic', partition=0, offset=2, timestamp=1703539777774, timestamp_type=0, key=b'2', value=b'{"device_id": 2, "temperature": 367.55649140998236, "execution_time": 10}', headers=[], checksum=None, serialized_key_size=1, serialized_value_size=73, serialized_header_size=-1)
ConsumerRecord(topic='hw3_topic', partition=0, offset=3, timestamp=1703539778778

### Apache Flink job

In [8]:
!mkdir -p tmp/checkpoints/logs

In [9]:
%%writefile flink_job1.py
from argparse import ArgumentParser
from pyflink.common import SimpleStringSchema
from pyflink.common.typeinfo import Types, RowTypeInfo
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import DeliveryGuarantee
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer, KafkaSink, KafkaRecordSerializationSchema
from pyflink.datastream.formats.json import JsonRowDeserializationSchema
from pyflink.datastream.functions import MapFunction
from pyflink.datastream.checkpoint_config import CheckpointingMode


def python_data_stream_example(checkpoint_dir):
    """Build and submit the "Devices preprocessing" streaming job.

    The job reads JSON rows (device_id, temperature, execution_time) from the
    Kafka topic ``hw3_topic``, maps each row through ``TemperatureFunction``
    and writes the resulting strings to ``hw3_preprocessed_topic``.

    Args:
        checkpoint_dir: URI of the directory used as checkpoint storage.
    """
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    # Checkpointing: every 10000 ms, exactly-once mode, at least 500 ms between
    # two checkpoints, and never more than one checkpoint in flight.
    env.enable_checkpointing(10000)
    checkpoint_config = env.get_checkpoint_config()
    checkpoint_config.set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)
    checkpoint_config.set_min_pause_between_checkpoints(500)
    checkpoint_config.set_max_concurrent_checkpoints(1)
    checkpoint_config.set_checkpoint_storage_dir(checkpoint_dir)

    # Schema of the incoming JSON records.
    row_type: RowTypeInfo = Types.ROW_NAMED(
        ["device_id", "temperature", "execution_time"],
        [Types.LONG(), Types.DOUBLE(), Types.INT()],
    )
    row_deserializer = JsonRowDeserializationSchema.builder().type_info(row_type).build()

    kafka_source = (
        KafkaSource.builder()
        .set_bootstrap_servers('kafka:9092')
        .set_topics('hw3_topic')
        .set_group_id('pyflink-e2e-source')
        .set_starting_offsets(KafkaOffsetsInitializer.earliest())
        .set_value_only_deserializer(row_deserializer)
        .build()
    )

    record_serializer = (
        KafkaRecordSerializationSchema.builder()
        .set_topic('hw3_preprocessed_topic')
        .set_value_serialization_schema(SimpleStringSchema())
        .build()
    )
    kafka_sink = (
        KafkaSink.builder()
        .set_bootstrap_servers('kafka:9092')
        .set_record_serializer(record_serializer)
        .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build()
    )

    raw_stream = env.from_source(kafka_source, WatermarkStrategy.no_watermarks(), "Kafka Source")
    converted_stream = raw_stream.map(TemperatureFunction(), Types.STRING())
    converted_stream.sink_to(kafka_sink)
    env.execute_async("Devices preprocessing")


class TemperatureFunction(MapFunction):
    """Re-emit a device row as a JSON string with the temperature lowered by 273."""

    def map(self, value):
        """Convert one (device_id, temperature, execution_time) row.

        Args:
            value: row that unpacks into device_id, temperature and
                execution_time, in that order.

        Returns:
            A JSON object string with the same three keys, where
            ``temperature`` has 273 subtracted from it.
        """
        # Local import: json is not part of this script's import block.
        import json

        device_id, temperature, execution_time = value
        # json.dumps instead of str(dict): the dict repr uses single quotes and
        # cannot be parsed as JSON by consumers of the output topic.
        return json.dumps({"device_id": device_id, "temperature": temperature - 273, "execution_time": execution_time})


if __name__ == '__main__':
    # Script entry point: the checkpoint directory can be overridden on the
    # command line; otherwise the default location under /opt/pyflink is used.
    cli = ArgumentParser(description="Flink Job with Checkpointing")
    cli.add_argument(
        "--checkpoint_dir",
        default='file:///opt/pyflink/tmp/checkpoints/logs',
        type=str,
        help="Directory for saving checkpoints",
    )
    options = cli.parse_args()
    python_data_stream_example(options.checkpoint_dir)

Writing flink_job1.py


Job execution:

```commandline
docker-compose exec jobmanager ./bin/flink run -py /opt/pyflink/flink_job1.py -d 
```
Screenshot of the job log:
![job](images/block1_job.png)

Screenshot of the Apache Flink example:
![flink](images/block1_flink.png)

# Block #2

In [None]:
%%writefile flink_job2.py
from argparse import ArgumentParser
from pyflink.common import SimpleStringSchema, Time
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import DeliveryGuarantee
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer, KafkaSink, KafkaRecordSerializationSchema
from pyflink.datastream.formats.json import JsonRowDeserializationSchema
from pyflink.datastream.functions import MapFunction
from pyflink.datastream.checkpoint_config import CheckpointingMode
from pyflink.datastream.window import TumblingEventTimeWindows, SlidingEventTimeWindows, EventTimeSessionWindows
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.common.time import Duration


class FormatTemperature(MapFunction):
    """Render the second field of a record as a ``max_temp: <value>`` string."""

    def map(self, value):
        """Return the formatted string for ``value[1]``."""
        temperature = value[1]
        return f"max_temp: {temperature}"


def python_data_stream_example(checkpoint_dir, window_type):
    """Build and submit the windowed maximum-temperature job.

    Rows (device_id, temperature, execution_time) are read from the Kafka
    topic ``hw3_topic``, keyed by ``device_id`` and grouped into event-time
    windows. For each window the row with the highest temperature is kept,
    formatted by ``FormatTemperature`` and written to
    ``hw3_preprocessed_topic``.

    Args:
        checkpoint_dir: URI of the directory used as checkpoint storage.
        window_type: ``"tumbling"`` (15 s), ``"sliding"`` (15 s window, 5 s
            slide) or ``"session"`` (10 s gap).

    Raises:
        ValueError: if ``window_type`` is not one of the supported values.
    """
    # Local import: this script's import block does not bring TimestampAssigner in.
    from pyflink.common.watermark_strategy import TimestampAssigner

    class ExecutionTimeAssigner(TimestampAssigner):
        """Use the row's execution_time field as its event timestamp."""

        def extract_timestamp(self, value, record_timestamp):
            # Flink event timestamps are in milliseconds. The windows below are
            # sized in seconds, so execution_time is treated as seconds here.
            # NOTE(review): confirm the unit of execution_time with the producer.
            return int(value[2]) * 1000

    # Resolve the window assigner first so an unsupported type fails before
    # any source, sink or environment is set up. The three variants differ
    # only in the assigner, so they share one pipeline below.
    if window_type == "tumbling":
        window_assigner = TumblingEventTimeWindows.of(Time.seconds(15))
    elif window_type == "sliding":
        window_assigner = SlidingEventTimeWindows.of(Time.seconds(15), Time.seconds(5))
    elif window_type == "session":
        window_assigner = EventTimeSessionWindows.with_gap(Time.seconds(10))
    else:
        raise ValueError("Unsupported window type: {}".format(window_type))

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    env.enable_checkpointing(10000)
    env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)
    env.get_checkpoint_config().set_min_pause_between_checkpoints(500)
    env.get_checkpoint_config().set_max_concurrent_checkpoints(1)
    env.get_checkpoint_config().set_checkpoint_storage_dir(checkpoint_dir)

    type_info = Types.ROW_NAMED(["device_id", "temperature", "execution_time"],
                                [Types.LONG(), Types.DOUBLE(), Types.INT()])

    json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()

    source = KafkaSource.builder() \
        .set_bootstrap_servers('kafka:9092') \
        .set_topics('hw3_topic') \
        .set_group_id('pyflink-e2e-source') \
        .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
        .set_value_only_deserializer(json_row_schema) \
        .build()

    sink = KafkaSink.builder() \
        .set_bootstrap_servers('kafka:9092') \
        .set_record_serializer(KafkaRecordSerializationSchema.builder()
                               .set_topic('hw3_preprocessed_topic')
                               .set_value_serialization_schema(SimpleStringSchema())
                               .build()
                               ) \
        .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
        .build()

    watermark_strategy = WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(5)) \
        .with_timestamp_assigner(ExecutionTimeAssigner())

    # A Python timestamp assigner has to be attached with
    # assign_timestamps_and_watermarks(); from_source() only forwards the
    # JVM-side strategy, so the assigner would otherwise never be invoked.
    ds = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")
    ds = ds.assign_timestamps_and_watermarks(watermark_strategy)

    ds = ds.key_by(lambda value: value[0]) \
        .window(window_assigner) \
        .reduce(lambda a, b: a if a[1] > b[1] else b) \
        .map(FormatTemperature(), output_type=Types.STRING())

    ds.sink_to(sink)
    env.execute_async("Devices preprocessing")

if __name__ == '__main__':
    # Script entry point: both the checkpoint directory and the window type
    # are optional; the chosen window type is echoed before the job is built.
    cli = ArgumentParser(description="Flink Job with Different Window Types")
    cli.add_argument(
        "--checkpoint_dir",
        default='file:///opt/pyflink/tmp/checkpoints/logs',
        type=str,
        help="Directory for saving checkpoints",
    )
    cli.add_argument(
        "--window_type",
        default='tumbling',
        type=str,
        help="Type of window to apply (tumbling, sliding, session)",
    )
    options = cli.parse_args()
    print(options.window_type)
    python_data_stream_example(options.checkpoint_dir, options.window_type)

Job executions:
```commandline
docker-compose exec jobmanager ./bin/flink run -d -py /opt/pyflink/flink_job2.py --window_type 'tumbling'
```
```commandline
docker-compose exec jobmanager ./bin/flink run -d -py /opt/pyflink/flink_job2.py --window_type 'sliding'
```
```commandline
docker-compose exec jobmanager ./bin/flink run -d -py /opt/pyflink/flink_job2.py --window_type 'session'
```

# Block #3

In [11]:
%%writefile backoff_consumer.py
import time
from functools import wraps
import argparse
from kafka import KafkaConsumer


def backoff(tries, sleep):
    """Decorator factory that retries a call after a fixed pause.

    The wrapped function is invoked up to ``tries`` times. Any ``Exception``
    raised by an attempt other than the last one is printed and followed by
    a pause of ``sleep`` seconds; an exception from the last attempt
    propagates to the caller.

    Args:
        tries: total number of attempts.
        sleep: seconds to wait between two attempts.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            attempts_left = tries
            while True:
                if attempts_left <= 1:
                    # Final attempt: no handler, so failures reach the caller.
                    return func(*args, **kwargs)
                try:
                    return func(*args, **kwargs)
                except Exception as exc:
                    print(f"Error: {exc}, waiting {sleep} seconds before retry...")
                    time.sleep(sleep)
                attempts_left -= 1
        return wrapper
    return decorator


@backoff(tries=10, sleep=10)
def connect_kafka_consumer():
    """Create the ``hw3_topic`` consumer, retried by ``backoff`` on failure.

    Returns:
        A ``KafkaConsumer`` in group ``itmo_hw3_group`` connected to
        ``kafka:9092``.

    Raises:
        Exception: whatever the ``KafkaConsumer`` constructor raised, after
            the failure has been printed.
    """
    settings = dict(
        group_id="itmo_hw3_group",
        bootstrap_servers='kafka:9092',
        auto_offset_reset='earliest',
        enable_auto_commit=True,
    )
    try:
        kafka_consumer = KafkaConsumer("hw3_topic", **settings)
    except Exception as exc:
        # Report the failure here, then let the decorator decide on a retry.
        print(f"Failed to connect to Kafka: {exc}")
        raise
    print("Connected to Kafka")
    return kafka_consumer


def message_handler(value):
    """Handle one consumed message value by printing it to standard output."""
    print(value)


def create_consumer(max_messages=None):
    """Connect to Kafka (with retries) and pass each message value to ``message_handler``.

    A message whose handler raises is reported and skipped; it does not count
    towards ``max_messages``.

    Args:
        max_messages: stop after this many successfully handled messages;
            ``None`` (or 0) means keep consuming until interrupted.
    """
    consumer = connect_kafka_consumer()

    print("Consumer setup complete. Listening for messages...")
    message_count = 0
    try:
        for message in consumer:
            try:
                message_handler(message.value)
                message_count += 1
                if max_messages and message_count >= max_messages:
                    break
            except Exception as e:
                print(f"Error processing message: {e}")
    finally:
        # Always release the connection and leave the group; with auto-commit
        # enabled, close() also attempts to commit the offsets consumed so far.
        consumer.close()


if __name__ == '__main__':
    # Script entry point: an optional --message-limit bounds how many
    # messages are handled before the script exits.
    cli = argparse.ArgumentParser(description="Kafka Consumer with Backoff for Connection")
    cli.add_argument(
        "--message-limit",
        default=None,
        type=int,
        help="Maximum number of messages to consume",
    )
    options = cli.parse_args()

    create_consumer(max_messages=options.message_limit)

Writing backoff_consumer.py


Example of the backoff at work: the Kafka Docker container is down at first and comes up about 30 seconds later.

In [106]:
!python3 backoff_consumer.py --message-limit 10

Failed to connect to Kafka: NoBrokersAvailable
Error: NoBrokersAvailable, waiting 10 seconds before retry...
Failed to connect to Kafka: NoBrokersAvailable
Error: NoBrokersAvailable, waiting 10 seconds before retry...
Failed to connect to Kafka: NoBrokersAvailable
Error: NoBrokersAvailable, waiting 10 seconds before retry...
Connected to Kafka
Consumer setup complete. Listening for messages...
b'{"device_id": 4, "temperature": 367.60424483866046, "execution_time": 0}'
b'{"device_id": 2, "temperature": 364.71385965918205, "execution_time": 5}'
b'{"device_id": 1, "temperature": 382.3306217475197, "execution_time": 10}'
b'{"device_id": 9, "temperature": 351.5943844046902, "execution_time": 15}'
b'{"device_id": 3, "temperature": 369.785697708518, "execution_time": 20}'
b'{"device_id": 1, "temperature": 335.2740750198138, "execution_time": 25}'
b'{"device_id": 4, "temperature": 382.31956139997146, "execution_time": 30}'
b'{"device_id": 6, "temperature": 339.8556579285353, "ex