In [1]:
import json
import random
from kafka import KafkaProducer, KafkaConsumer
from collections import Counter

Шаг 1: Генерация данных о действиях пользователей
Эта функция создает случайные данные активности пользователей с полями user_id, action и timestamp.
Возможные действия: login, logout, click и purchase.

In [2]:
def generate_user_activity(num_records=100):
    actions = ["login", "logout", "click", "purchase"]
    data = []
    for _ in range(num_records):
        record = {
            "user_id": random.randint(1, 10),  # Случайный ID пользователя
            "action": random.choice(actions),  # Случайное действие
            "timestamp": random_date()         # Случайная временная метка
        }
        data.append(record)
    return data

In [3]:
def random_date():
    from datetime import datetime, timedelta
    start_date = datetime(2024, 12, 8, 0, 0, 0)
    delta = timedelta(seconds=random.randint(0, 86400))  # Случайное время в пределах суток
    return (start_date + delta).isoformat()

Шаг 2: Настройка Kafka Producer
Producer подключается к Kafka и отправляет сериализованные данные в JSON формате в указанный топик.

Шаг 3: Настройка Kafka Consumer
Consumer читает данные из Kafka топика и десериализует их для обработки.

Шаг 4: Обработка и анализ данных
Этот блок анализирует активность пользователей для поиска самых активных на основе действий click и purchase.

In [4]:
#Шаг 2

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',  # Адрес сервера Kafka
    value_serializer=lambda v: json.dumps(v).encode('utf-8')  # Сериализация данных в JSON
)

# Название топика
topic_name = "example_topic"

# Генерация и отправка данных в Kafka
data = generate_user_activity()
for record in data:
    producer.send(topic_name, record)  # Отправка каждой записи в топик
producer.flush()  # Убедиться, что все данные отправлены
print(f"{len(data)} записей отправлено в топик '{topic_name}'.")


#Шаг 3
consumer = KafkaConsumer(
    topic_name,
    bootstrap_servers='localhost:9092',  # Адрес сервера Kafka
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),  # Десериализация JSON
    auto_offset_reset='earliest',  # Начать чтение с начала топика
    enable_auto_commit=True        # Автоматическое подтверждение обработки сообщений
)


#Шаг 4
action_counter = Counter()

for message in consumer:
    record = message.value
    if record["action"] in ["click", "purchase"]:
        action_counter[record["user_id"]] += 1
    # Для демонстрации останавливаем чтение после обработки всех сгенерированных данных
    if sum(action_counter.values()) >= len(data):
        break

# Вывод результатов
print("Пользователи с наибольшим количеством действий (click, purchase):")
for user_id, count in action_counter.most_common():
    print(f"ID пользователя: {user_id}, Количество действий: {count}")

100 записей отправлено в топик 'example_topic'.
Пользователи с наибольшим количеством действий (click, purchase):
ID пользователя: 5, Количество действий: 17
ID пользователя: 8, Количество действий: 14
ID пользователя: 7, Количество действий: 13
ID пользователя: 10, Количество действий: 11
ID пользователя: 6, Количество действий: 10
ID пользователя: 4, Количество действий: 9
ID пользователя: 3, Количество действий: 8
ID пользователя: 2, Количество действий: 7
ID пользователя: 1, Количество действий: 7
ID пользователя: 9, Количество действий: 4
