# Лабораторная работа №2
## Обработка данных с использованием Kafka

**Выполнил:** Васильев А.С.
**Группа:** 6131-010402D
**Преподаватель:** Попов С.Б.  

## Импорт необходимых модулей

In [1]:
from kafka import KafkaProducer, KafkaConsumer, KafkaAdminClient
from kafka.admin import NewTopic
from kafka.errors import TopicAlreadyExistsError, UnknownTopicOrPartitionError
import csv
import time
import pandas as pd

## Конфигурация Kafka

In [2]:
# Список серверов Kafka
BOOTSTRAP_SERVERS = ['localhost:9092']
TOPIC_1 = 'rides1'   # Топик для одного consumer
TOPIC_2 = 'rides2'   # Топик для двух consumer

### Очистка топиков (если необходимо)

In [3]:

admin = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVERS)

try:
    topics = admin.list_topics()

    # фильтруем внутренние топики 
    user_topics = [t for t in topics if not t.startswith('__')]

    if not user_topics:
        print("Нет поль-их топиков для удаления")
        exit(-1)

    print(f"найдено топиков {user_topics}")

    for topic in user_topics:
        try:
            admin.delete_topics([topic])
            time.sleep(1)
        except UnknownTopicOrPartitionError:
            print (f"топик {topic} уже удален")
        except Exception as e:
            print(f"Ошибка при удалении {topic}: {e}")

    print("Все топики удалены")

finally:
    admin.close()

найдено топиков ['rides1']
Все топики удалены


---
# Вариант 1: Один источник - один обработчик
---

## 1.1. Создание топика с одним разделом

In [4]:
# Создание административного клиента
admin_client : KafkaAdminClient = None

try:
    admin_client = KafkaAdminClient(
        bootstrap_servers=BOOTSTRAP_SERVERS,
        client_id='admin')
    
    print("Подключение установлено!")
   
except Exception as e:
   print(f"Ошибка подключения к Kafka: {e}")

# Создание топика с одним разделом
topic_list = [NewTopic(name=TOPIC_1, num_partitions=1, replication_factor=1)]

try:
    admin_client.create_topics(new_topics=topic_list, validate_only=False)
    print(f"Топик '{TOPIC_1}' успешно создан с 1 разделом")
except TopicAlreadyExistsError:
    print(f"Топик '{TOPIC_1}' уже существует")
except Exception as e:
    print(f"Ошибка при создании топика: {e}")
finally:
    admin_client.close()

Подключение установлено!
Топик 'rides1' успешно создан с 1 разделом


## 1.2. Реализация источника данных (Producer)

In [6]:
# Создание Producer
producer = KafkaProducer(
    bootstrap_servers=BOOTSTRAP_SERVERS,
    key_serializer=lambda k: k.encode('utf-8') if k else None,
    value_serializer=lambda v: v.encode('utf-8')
)

# Чтение CSV файла и отправка данных в Kafka
csv_file = 'rides.csv'
message_count = 0

print(f"Начинаем отправку данных из {csv_file} в топик '{TOPIC_1}'...\n")

with open(csv_file, 'r', encoding='utf-8') as file:
    csv_reader = csv.reader(file)
    header = next(csv_reader)  # Пропускаем заголовок
    
    for row in csv_reader:
        # row[0] - VendorID
        # Формируем строку для отправки
        value = ','.join(row)
        
        # Отправка сообщения с ключом (VendorID)
        key = str(row[0])
        producer.send(TOPIC_1, key=key, value=value)
        
        message_count += 1
        
        if message_count % 50 == 0:
            print(f"Отправлено {message_count} сообщений...")

# Гарантируем доставку всех сообщений
producer.flush()
producer.close()

print(f"\nВсего отправлено {message_count} сообщений в топик '{TOPIC_1}'")
print("Producer завершил работу")

Начинаем отправку данных из rides.csv в топик 'rides1'...

Отправлено 50 сообщений...
Отправлено 100 сообщений...
Отправлено 150 сообщений...
Отправлено 200 сообщений...
Отправлено 250 сообщений...

Всего отправлено 266 сообщений в топик 'rides1'
Producer завершил работу


## 1.3. Реализация обработчика данных (Consumer)

In [12]:
# Создание Consumer
consumer = KafkaConsumer(
    TOPIC_1,
    bootstrap_servers=BOOTSTRAP_SERVERS,
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='rides1-group',
    value_deserializer=lambda x: x.decode('utf-8'),
    key_deserializer=lambda x: x.decode('utf-8') if x else None
)

print(f"Consumer подключился к топику '{TOPIC_1}'")
print("Начинаем обработку сообщений...\n")

# Словарь для хранения статистики по каждому VendorID
stats = {}
message_count = 0

try:
    for message in consumer:
        # Парсинг данных
        row = message.value.split(',')
        
        vendor_id = row[0]
        passenger_count = int(row[3])
        trip_distance = float(row[4])
        total_amount = float(row[16])
        
        # Инициализация статистики для нового VendorID
        if vendor_id not in stats:
            stats[vendor_id] = {
                'trip_count': 0,
                'total_passengers': 0,
                'total_distance': 0.0,
                'total_amount': 0.0
            }
        
        # Обновление статистики
        stats[vendor_id]['trip_count'] += 1
        stats[vendor_id]['total_passengers'] += passenger_count
        stats[vendor_id]['total_distance'] += trip_distance
        stats[vendor_id]['total_amount'] += total_amount
        
        message_count += 1
        
        if message_count % 50 == 0:
            print(f"Обработано {message_count} сообщений...")
        
        # Прерываем после обработки всех сообщений
        # (в реальной системе consumer работал бы постоянно)
        if message_count >= 266:  # Известное количество записей в файле
            break
            
except KeyboardInterrupt:
    print("\nОстановка consumer...")
finally:
    consumer.close()

# Вывод результатов
print(f"\n{'='*70}")
print("ИТОГОВЫЕ РЕЗУЛЬТАТЫ ОБРАБОТКИ (Вариант 1: один consumer)")
print(f"{'='*70}\n")

for vendor_id in sorted(stats.keys()):
    print(f"VendorID: {vendor_id}")
    print(f"  Количество поездок: {stats[vendor_id]['trip_count']}")
    print(f"  Всего пассажиров: {stats[vendor_id]['total_passengers']}")
    print(f"  Общее расстояние: {stats[vendor_id]['total_distance']:.2f} миль")
    print(f"  Общая сумма оплаты: ${stats[vendor_id]['total_amount']:.2f}")
    print()

print(f"Всего обработано сообщений: {message_count}")
print("\nConsumer завершил работу")

Consumer подключился к топику 'rides1'
Начинаем обработку сообщений...

Обработано 50 сообщений...
Обработано 100 сообщений...
Обработано 150 сообщений...
Обработано 200 сообщений...
Обработано 250 сообщений...

ИТОГОВЫЕ РЕЗУЛЬТАТЫ ОБРАБОТКИ (Вариант 1: один consumer)

VendorID: 1
  Количество поездок: 87
  Всего пассажиров: 101
  Общее расстояние: 394.30 миль
  Общая сумма оплаты: $1976.63

VendorID: 2
  Количество поездок: 179
  Всего пассажиров: 242
  Общее расстояние: 744.65 миль
  Общая сумма оплаты: $3709.17

Всего обработано сообщений: 266

Consumer завершил работу


---
# Вариант 2: Один источник - два обработчика в группе
---

## 2.1. Создание топика с двумя разделами

In [9]:
from kafka import KafkaAdminClient
import time

admin_client = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVERS)
try:
    admin_client.delete_topics(['rides2'])
    print("Топик 'rides2' удален")
    time.sleep(2)
except:
    pass
finally:
    admin_client.close()

In [10]:
# Создание административного клиента
admin_client = KafkaAdminClient(
    bootstrap_servers=BOOTSTRAP_SERVERS,
    client_id='admin'
)

# Создание топика с ДВУМЯ разделами
topic_list = [NewTopic(name=TOPIC_2, num_partitions=2, replication_factor=1)]

try:
    admin_client.create_topics(new_topics=topic_list, validate_only=False)
    print(f"Топик '{TOPIC_2}' успешно создан с 2 разделами")
except TopicAlreadyExistsError:
    print(f"Топик '{TOPIC_2}' уже существует")
except Exception as e:
    print(f"Ошибка при создании топика: {e}")
finally:
    admin_client.close()

Топик 'rides2' успешно создан с 2 разделами


## 2.2. Реализация источника данных (Producer)

In [13]:
# Создание Producer
producer = KafkaProducer(
    bootstrap_servers=BOOTSTRAP_SERVERS,
    key_serializer=lambda k: k.encode('utf-8') if k else None,
    value_serializer=lambda v: v.encode('utf-8')
)

# Чтение CSV файла и отправка данных в Kafka
csv_file = 'rides.csv'
message_count = 0

print(f"Начинаем отправку данных из {csv_file} в топик '{TOPIC_2}' с 2 разделами...\n")
print(f"{'VendorID':<10} {'Partition':<10} {'Offset':<10} {'Passengers':<12} {'Distance':<12} {'Amount':<12}")
print("-" * 70)

with open(csv_file, 'r', encoding='utf-8') as file:
    csv_reader = csv.reader(file)
    header = next(csv_reader)  # Пропускаем заголовок
    
    for row in csv_reader:
        # Извлекаем нужные данные
        vendor_id = row[0]
        passenger_count = row[3]
        trip_distance = row[4]
        total_amount = row[16]
        
        # Формируем значение сообщения (вся строка)
        value = ','.join(row)
        
        # Отправка сообщения с ключом (VendorID) для распределения по разделам
        record = producer.send(TOPIC_2, key=vendor_id, value=value)
        
        # Получаем результат отправки
        result = record.get(timeout=60)
        
        # Выводим информацию о распределении по разделам
        print(f"{vendor_id:<10} {result.partition:<10} {result.offset:<10} {passenger_count:<12} {trip_distance:<12} {total_amount:<12}")
        
        message_count += 1
        
        # Задержка 0.5 секунды перед отправкой следующего сообщения
        time.sleep(0.5)

# Гарантируем доставку всех сообщений
producer.flush()
producer.close()

print("\n" + "-" * 70)
print(f"Всего отправлено {message_count} сообщений в топик '{TOPIC_2}'")
print("Producer завершил работу")

Начинаем отправку данных из rides.csv в топик 'rides2' с 2 разделами...

VendorID   Partition  Offset     Passengers   Distance     Amount      
----------------------------------------------------------------------
1          1          0          1            1.50         9.3         
1          1          1          1            9.50         27.8        
2          0          0          1            5.85         22.3        
2          0          1          1            1.90         14.16       
2          0          2          1            1.25         7.8         
1          1          2          1            9.70         33.8        
2          0          3          1            5.27         26.39       
2          0          4          1            1.32         8.8         
2          0          5          1            .73          10.12       
2          0          6          1            18.65        66.36       
1          1          3          2            8.00         28.3 

---
#                                                                               Полезные утилиты
---

## Просмотр списка топиков

In [None]:
admin_client = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVERS)
topics = admin_client.list_topics()
print("Список топиков:")
for topic in topics:
    print(f"  - {topic}")
admin_client.close()

Список топиков:
  - rides2
  - rides1
  - __consumer_offsets


## Просмотр списка групп потребителей

In [None]:
admin_client = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVERS)
groups = admin_client.list_consumer_groups()
print("Список групп потребителей:")
for group in groups:
    print(f"  - {group}")
admin_client.close()

Список групп потребителей:
  - ('rides1-group', 'consumer')
  - ('rides2', 'consumer')


## Просмотр описания группы

In [None]:
admin_client = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVERS)
group_description = admin_client.describe_consumer_groups(['rides2'])
print("Описание группы 'rides2':")
print(group_description)
admin_client.close()

Описание группы 'rides2':
[GroupInformation(error_code=0, group='rides2', state='Empty', protocol_type='consumer', protocol='', members=[], authorized_operations=None)]
