In [52]:
import json
import time
import random
from datetime import datetime
from kafka import KafkaProducer

# Kafka configuration
KAFKA_BROKER = "0.0.0.0:9094"
TOPIC = "user-activities"

# Initialize Kafka producer
producer = KafkaProducer(
    bootstrap_servers=KAFKA_BROKER,
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

def generate_message():
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    return {
        "id": random.randint(1, 1000),
        "name": "user_" + str(random.randint(1, 100)),
        "timestamp": timestamp,  # Ensure ISO format for timestamp
        "active": random.choice([True, False]),
        "value": round(random.uniform(1.0, 100.0), 2)
    }

In [53]:
message = generate_message()
producer.send(TOPIC, message)
print(f"Sent: {message}")

Sent: {'id': 840, 'name': 'user_22', 'timestamp': '2024-08-04 16:27:58', 'active': True, 'value': 17.04}


In [15]:
from kafka.admin import KafkaAdminClient, NewTopic


admin_client = KafkaAdminClient(
    bootstrap_servers="0.0.0.0:9094", 
    client_id='test'
)

topic_list = []
topic_list.append(NewTopic(name="user-activities", num_partitions=3, replication_factor=1))
admin_client.create_topics(new_topics=topic_list, validate_only=False)

CreateTopicsResponse_v3(throttle_time_ms=0, topic_errors=[(topic='user-activities', error_code=0, error_message=None)])

In [54]:
topic_list = []
topic_list.append(NewTopic(name="user-activities-out", num_partitions=3, replication_factor=1))
admin_client.create_topics(new_topics=topic_list, validate_only=False)

CreateTopicsResponse_v3(throttle_time_ms=0, topic_errors=[(topic='user-activities-out', error_code=0, error_message=None)])

In [None]:
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'user-activities-out',
    bootstrap_servers=KAFKA_BROKER,
    auto_offset_reset='earliest',  # Start reading at the earliest message
    enable_auto_commit=True,
    group_id='my-group',  # Consumer group ID
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

print(f"Consuming messages from topic: {TOPIC}")

# Consume messages from the topic
for message in consumer:
    print(f"Received message: {message.value}")

Consuming messages from topic: user-activities
