In [3]:
import random
from confluent_kafka import Producer, Consumer, KafkaError
import json
from collections import Counter
from datetime import datetime, timedelta

In [4]:
# Доступные действия и их вероятности
actions = ["login", "logout", "purchase", "click"]
action_weights = [0.5, 0.1, 0.1, 0.3]  # Вероятности для действий

# Списки товаров для покупок
purchase_items = ["laptop", "phone", "headphones", "monitor", "keyboard", "mouse", "tablet"]

# Генерация данных
def generate_realistic_actions(user_id):
    num_actions = random.randint(5, 20)  # Количество действий от 5 до 20
    actions_list = []
    current_time = datetime.now() - timedelta(days=random.randint(1, 30))  # Начало временной шкалы

    for _ in range(num_actions):
        action = random.choices(actions, weights=action_weights, k=1)[0]
        current_time += timedelta(seconds=random.randint(5, 300))  # Разброс времени между действиями

        record = {"user_id": user_id, "action": action, "timestamp": current_time.isoformat()}

        # Для purchase добавляем дополнительные детали
        if action == "purchase":
            record["item"] = random.choice(purchase_items)
            record["amount"] = round(random.uniform(50, 2000), 2)  # Случайная сумма покупки

        actions_list.append(record)

    return actions_list

# Генерация данных для пользователей
data = []
for user_id in range(1, 101):  # Генерация для 100 пользователей
    user_actions = generate_realistic_actions(user_id)
    data.extend(user_actions)

# Запись данных в файл
with open('realistic_client_actions.json', 'w') as f:
    json.dump(data, f, indent=4)

print(f"Сгенерировано {len(data)} записей. Данные сохранены в 'realistic_client_actions.json'.")


Сгенерировано 1315 записей. Данные сохранены в 'realistic_client_actions.json'.


In [None]:
conf = {
    'bootstrap.servers': 'localhost:9092',
    'client.id': 'python-producer',
    'acks': 'all',
    'linger.ms': 10
}

producer = Producer(conf)

# Callback для подтверждения отправки
def delivery_report(err, msg):
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}]")

# Чтение данных из файла и отправка в Kafka
with open('realistic_client_actions.json') as f:
    data = json.load(f)
    for record in data:
        producer.produce(
            'example_topic',
            key=str(record['user_id']),
            value=json.dumps(record),
            callback=delivery_report
        )
        producer.poll(0)  # Для вызова колбэков

producer.flush()


In [6]:
consumer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': True
}

consumer = Consumer(consumer_conf)
consumer.subscribe(['example_topic'])

action_counter = Counter()

try:
    while True:
        msg = consumer.poll(1.0)  # Таймаут ожидания сообщений
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue  # Конец раздела
            else:
                print(f"Consumer error: {msg.error()}")
                break

        # Обработка сообщения
        record = json.loads(msg.value().decode('utf-8'))
        action = record['action']
        user_id = record['user_id']
        action_counter[(user_id, action)] += 1

except KeyboardInterrupt:
    print("Consumer stopped manually.")

finally:
    consumer.close()


Consumer stopped manually.


In [9]:
# Отфильтруем пользователей с действиями "click" и "purchase"
filtered_actions = [(user, action, count) for (user, action), count in action_counter.items() if action in ['click', 'purchase']]
# Отсортируем по количеству действий в порядке убывания
sorted_actions = sorted(filtered_actions, key=lambda x: x[2], reverse=True)

# Выведем топ-10 пользователей с наибольшим количеством действий "click" и "purchase"
for user_id, action, count in sorted_actions[:10]:
    print(f'User ID: {user_id}, Действие: {action}, Количество: {count}')

User ID: 68, Действие: click, Количество: 9
User ID: 87, Действие: click, Количество: 9
User ID: 20, Действие: click, Количество: 8
User ID: 37, Действие: click, Количество: 8
User ID: 48, Действие: click, Количество: 8
User ID: 17, Действие: click, Количество: 7
User ID: 21, Действие: click, Количество: 7
User ID: 27, Действие: click, Количество: 7
User ID: 38, Действие: click, Количество: 7
User ID: 64, Действие: click, Количество: 7
