### Task 1: Simple Producer and Consumer

Create a simple Kafka producer that sends JSON messages (`user_id`, `timestamp`, and `activity`) to a single topic, and a consumer that reads the messages from that same topic.
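The `value_serializer` and `value_deserializer` arguments used in this task are plain callables that convert between Python objects and the bytes Kafka stores. A minimal, broker-free sketch of that round trip, assuming UTF-8 encoded JSON as in the cells below (the helper names `serialize` and `deserialize` are hypothetical):

```python
import json
import time


def serialize(value: dict) -> bytes:
    """Dict -> bytes, the same logic as the producer's value_serializer."""
    return json.dumps(value).encode("utf-8")


def deserialize(raw: bytes) -> dict:
    """Bytes -> dict, the same logic as the consumer's value_deserializer."""
    return json.loads(raw.decode("utf-8"))


message = {"user_id": 1, "timestamp": time.time(), "activity": "login"}
raw = serialize(message)        # what actually travels through the topic
restored = deserialize(raw)     # what the consumer sees in message.value
print(type(raw).__name__, raw)
print(restored == message)      # True: ints, floats and strings survive the round trip
```

Because both sides must agree on the format, a consumer configured with a different deserializer would fail on these bytes rather than silently misread them.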

In [10]:
import json
import random
import time
from datetime import datetime

from kafka import KafkaConsumer, KafkaProducer

In [None]:
producer = KafkaProducer(
    bootstrap_servers='localhost:19092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

topic = "test_topic"

for i in range(5):
    message = {
        "user_id": i + 1,
        "timestamp": time.time(),
        "activity": "login" if i % 2 == 0 else "logout"
    }
    producer.send(topic, message)
    print(f"Sent message: {message}")
    time.sleep(1)

producer.close()

In [None]:
consumer = KafkaConsumer(
    "test_topic",
    bootstrap_servers="localhost:19092",
    auto_offset_reset="earliest",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    consumer_timeout_ms=5000,
)

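try:
    for message in consumer:
        print(f"Received message: {message.value}")
        time.sleep(1)  # simulate per-message processing
    # The loop ends once consumer_timeout_ms (5 s) passes without a new message
    print("No new messages for 5 seconds. Closing consumer...")
finally:
    consumer.close()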

### Task 2: Producer for two topics and a Consumer reading from multiple topics

Create a producer that sends two kinds of messages (user activities, e.g. `take_product`, `leave_product`, and user transactions, e.g. `deposit`, `withdrawal`) to two different topics, and a consumer that reads messages from both topics.
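The routing in this task boils down to pairing every message with its destination topic. A broker-free sketch: `build_messages` and `TOPIC_BY_KIND` are hypothetical names, the topic names match the cells below, and the seeded `random.Random` is only there to make runs reproducible:

```python
import random
from datetime import datetime

# Hypothetical routing table; topic names match the cells below.
TOPIC_BY_KIND = {
    "activity": "user_activities",
    "transaction": "user_transactions",
}
ACTIVITY_TYPES = ["take_product", "leave_product"]
TRANSACTION_TYPES = ["deposit", "withdrawal"]


def build_messages(user_id: int, rng: random.Random) -> list:
    """Return (topic, message) pairs: one activity and one transaction per user."""
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    activity = {
        "timestamp": timestamp,
        "user_id": user_id,
        "action": rng.choice(ACTIVITY_TYPES),
        "bank": f"bank {user_id}",
    }
    transaction = {
        "timestamp": timestamp,
        "user_id": user_id,
        "transaction_type": rng.choice(TRANSACTION_TYPES),
        "amount": rng.randint(100, 500),
    }
    return [
        (TOPIC_BY_KIND["activity"], activity),
        (TOPIC_BY_KIND["transaction"], transaction),
    ]


rng = random.Random(42)  # seeded only to make runs reproducible
pairs = [pair for uid in (1, 2, 3) for pair in build_messages(uid, rng)]

for topic, msg in pairs:
    print(f"{topic}: {msg}")  # a real producer would call producer.send(topic, msg) here

# A consumer subscribed to both topics sees each record's topic in message.topic;
# grouping the pairs by topic mimics that view.
by_topic = {}
for topic, msg in pairs:
    by_topic.setdefault(topic, []).append(msg)
```

Keeping message construction separate from `producer.send` makes the routing testable without a running broker.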

In [None]:
producer = KafkaProducer(
    bootstrap_servers='localhost:19092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

topic1 = "user_activities"
topic2 = "user_transactions"

activity_types = ["take_product", "leave_product", "open_account", "close_account"]
transaction_types = ["deposit", "withdrawal", "transfer", "payment"]

for i in range(3):
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    activity_message = {
        "timestamp": timestamp,
        "user_id": i + 1,
        "action": random.choice(activity_types),
        "bank": f"bank {i + 1}"
    }
    transaction_message = {
        "timestamp": timestamp,
        "user_id": i + 1,
        "transaction_type": random.choice(transaction_types),
        "amount": random.randint(100, 500)
    }

    producer.send(topic1, activity_message)
    producer.send(topic2, transaction_message)
    print(f"Sent messages: {activity_message} and {transaction_message}")
    time.sleep(1)

producer.close()

In [None]:
consumer = KafkaConsumer(
    "user_activities",
    "user_transactions",
    bootstrap_servers='localhost:19092',
    auto_offset_reset='earliest',
    value_deserializer=lambda v: json.loads(v.decode('utf-8')),
    consumer_timeout_ms=5000
)

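try:
    for message in consumer:
        print(f"Received message from topic {message.topic}: {message.value}")
        time.sleep(1)  # simulate per-message processing
    # The loop ends once consumer_timeout_ms (5 s) passes without a new message
    print("No new messages for 5 seconds. Closing consumer...")
finally:
    consumer.close()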


### Task 3: Producer that sends messages of different formats to one topic

Create a producer that sends messages of different formats to a single topic.

1. Common attributes for all messages:
   - timestamp
   - user_id
   - message_key (identifies the message type)

2. Type-specific attributes:
   - Activity messages: action, bank
   - Transaction messages: transaction_type, amount
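One way to keep the common and type-specific attributes consistent is a small builder that always sets the common fields. A sketch: `make_event` and `SPECIFIC_FIELDS` are hypothetical names, and rejecting messages with missing or extra type-specific fields is an assumption that goes beyond the task text:

```python
from datetime import datetime

# Hypothetical mapping of message_key -> type-specific fields, from the list above.
SPECIFIC_FIELDS = {
    "activity": ("action", "bank"),
    "transaction": ("transaction_type", "amount"),
}


def make_event(message_key: str, user_id: int, **specific) -> dict:
    """Build one message: common attributes first, then the type-specific ones."""
    expected = set(SPECIFIC_FIELDS[message_key])  # KeyError for an unknown type
    if set(specific) != expected:
        raise ValueError(
            f"{message_key} needs exactly {sorted(expected)}, got {sorted(specific)}"
        )
    return {
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "user_id": user_id,
        "message_key": message_key,
        **specific,
    }


activity = make_event("activity", 1, action="take_product", bank="bank 1")
transaction = make_event("transaction", 1, transaction_type="deposit", amount=250.0)
print(activity)
print(transaction)
```

Each returned dict can be passed directly to `producer.send(topic, ...)` with the JSON serializer used in the cell below.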

In [None]:
producer = KafkaProducer(
    bootstrap_servers='localhost:19092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

topic = "user_events"

activity_types = ["take_product", "leave_product"]
transaction_types = ["deposit", "withdrawal", "transfer", "payment"]

messages = [
    {
        "user_id": 1,
        "action": random.choice(activity_types),
        "bank": "bank 1",
        "message_key": "activity",
    },
    {
        "user_id": 1,
        "transaction_type": random.choice(transaction_types),
        "amount": round(random.uniform(0, 5000.00), 2),
        "message_key": "transaction",
    },
    {
        "user_id": 2,
        "action": random.choice(activity_types),
        "bank": "bank 2",
        "message_key": "activity",
    },
    {
        "user_id": 2,
        "transaction_type": random.choice(transaction_types),
        "amount": round(random.uniform(0, 500.00), 2),
        "message_key": "transaction",
    },
]

for msg in messages:
    msg["timestamp"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    producer.send(topic, msg)
    print(f"Sent message: {msg}")
    time.sleep(1)

producer.close()

### Task 4: Processing messages by type

Create a consumer that reads messages from a single topic and processes them according to their type. Implement:

1. Message type identification
   - Use message_key to identify the message type
   - Validate the message format for that type

2. Message processing
   - Separate processing logic for each message type
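The validation in the cell below only checks that the required fields are present. A sketch that also checks value types and replaces the `if`/`elif` chain with a dispatch table; `SCHEMAS`, `HANDLERS`, `validation_errors` and `process` are hypothetical names, and the accepted types are assumptions about what `json.loads` returns for these messages:

```python
from typing import Dict, List, Optional

# Hypothetical schemas: field -> accepted Python type(s) after json.loads.
SCHEMAS = {
    "activity": {"user_id": int, "action": str, "bank": str},
    "transaction": {"user_id": int, "transaction_type": str, "amount": (int, float)},
}


def validation_errors(message: Dict) -> List[str]:
    """Return a list of problems; an empty list means the message is valid."""
    message_key = message.get("message_key")
    schema = SCHEMAS.get(message_key)
    if schema is None:
        return [f"unknown message_key: {message_key!r}"]
    errors = []
    for field, expected_type in schema.items():
        if field not in message:
            errors.append(f"missing field: {field}")
        elif not isinstance(message[field], expected_type):
            errors.append(f"wrong type for {field}: {type(message[field]).__name__}")
    return errors


def handle_activity(message: Dict) -> str:
    return f"User {message['user_id']} performed {message['action']} at {message['bank']}"


def handle_transaction(message: Dict) -> str:
    return f"User {message['user_id']} made {message['transaction_type']} of {message['amount']}"


# Dispatch table: a new message type needs one schema entry and one handler.
HANDLERS = {"activity": handle_activity, "transaction": handle_transaction}


def process(message: Dict) -> Optional[str]:
    """Validate, then route to the handler registered for message_key."""
    errors = validation_errors(message)
    if errors:
        print(f"Invalid message {message}: {errors}")
        return None
    return HANDLERS[message["message_key"]](message)


print(process({"message_key": "activity", "user_id": 1, "action": "take_product", "bank": "bank 1"}))
print(process({"message_key": "transaction", "user_id": 2, "transaction_type": "deposit", "amount": "100"}))
```

Inside the consumer loop, `process(message.value)` would take the place of the validation and `if`/`elif` block.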

In [None]:
def validate_message(message):
    message_type = message.get("message_key")
    
    if message_type == "activity":
        required_fields = ["user_id", "action", "bank"]
    elif message_type == "transaction":
        required_fields = ["user_id", "transaction_type", "amount"]
    else:
        print(f"Unknown message type: {message_type}")
        return False
        
    return all(field in message for field in required_fields)

def process_activity(message):
    print(f"Processing activity: User {message['user_id']} "
          f"performed {message['action']} at {message['bank']}")

def process_transaction(message):
    print(f"Processing transaction: User {message['user_id']} "
          f"made {message['transaction_type']} of {message['amount']}")

consumer = KafkaConsumer(
    "user_events",
    bootstrap_servers='localhost:19092',
    auto_offset_reset='earliest',
    value_deserializer=lambda v: json.loads(v.decode('utf-8')),
    consumer_timeout_ms=5000
)

try:
    for message in consumer:
        value = message.value

        value["processed_timestamp"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        if not validate_message(value):
            print(f"Invalid message format: {value}")
            continue

        if value["message_key"] == "activity":
            process_activity(value)
        elif value["message_key"] == "transaction":
            process_transaction(value)

        time.sleep(1)

except KeyboardInterrupt:
    print("Stopping the consumer...")
finally:
    print("Closing consumer...")
    consumer.close()

### Task 5: Stream processing pipeline

Implement a stream processing pipeline that:

1. Reads messages from a source topic
2. Processes the messages:
   - Transforms them into a new format
   - Adds processed_timestamp

3. Sends the processed messages to a new topic
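The three steps above can be expressed as a generator that never touches Kafka directly, which makes the transformation easy to test on plain lists. A sketch: `pipeline`, `transform`, `REQUIRED` and `fixed_clock` are hypothetical names, and the injectable `now` parameter is an assumption added only so the timestamp is deterministic in the demo:

```python
from datetime import datetime
from typing import Callable, Dict, Iterable, Iterator, Tuple

OUTPUT_TOPIC = "processed_events"

# Required fields per message_key, mirroring validate_message below.
REQUIRED = {
    "activity": ("user_id", "action", "bank"),
    "transaction": ("user_id", "transaction_type", "amount"),
}


def transform(message: Dict, now: Callable[[], datetime]) -> Dict:
    """Map a valid source message to the new format and stamp processed_timestamp."""
    if message["message_key"] == "activity":
        kind = "processed_activity"
        description = f"User performed {message['action']} at {message['bank']}"
    else:
        kind = "processed_transaction"
        description = f"User made {message['transaction_type']} of {message['amount']}"
    return {
        "user_id": message["user_id"],
        "processed_timestamp": now().strftime("%Y-%m-%d %H:%M:%S"),
        "type": kind,
        "description": description,
    }


def pipeline(
    source: Iterable[Dict], now: Callable[[], datetime] = datetime.now
) -> Iterator[Tuple[str, Dict]]:
    """Read, validate, transform, and yield (output_topic, message) pairs."""
    for message in source:
        required = REQUIRED.get(message.get("message_key"))
        if required is None or not all(field in message for field in required):
            continue  # invalid input is skipped, as in the cell below
        yield OUTPUT_TOPIC, transform(message, now)


def fixed_clock() -> datetime:
    return datetime(2024, 1, 1, 12, 0, 0)  # deterministic clock for the demo


source = [
    {"message_key": "activity", "user_id": 1, "action": "take_product", "bank": "bank 1"},
    {"message_key": "transaction", "user_id": 2, "transaction_type": "deposit", "amount": 120.5},
    {"message_key": "unknown", "user_id": 3},
]
results = list(pipeline(source, now=fixed_clock))
for topic, msg in results:
    print(topic, msg)
```

Against a real broker, `source` could be `(record.value for record in consumer)` and each yielded pair passed to `producer.send(topic, msg)`.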

In [None]:
def validate_message(message):
    message_type = message.get("message_key")

    if message_type == "activity":
        required_fields = ["user_id", "action", "bank"]
    elif message_type == "transaction":
        required_fields = ["user_id", "transaction_type", "amount"]
    else:
        print(f"Unknown message type: {message_type}")
        return False

    return all(field in message for field in required_fields)


def process_message(message):
    message_type = message["message_key"]
    processed_message = {
        "user_id": message["user_id"],
        "processed_timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    }

    if message_type == "activity":
        processed_message.update(
            {
                "type": "processed_activity",
                "description": f"User performed {message['action']} at {message['bank']}",
            }
        )
    elif message_type == "transaction":
        processed_message.update(
            {
                "type": "processed_transaction",
                "description": f"User made {message['transaction_type']} of {message['amount']}",
            }
        )

    return processed_message


consumer = KafkaConsumer(
    "user_events",
    bootstrap_servers="localhost:19092",
    auto_offset_reset="earliest",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    consumer_timeout_ms=5000,
)

producer = KafkaProducer(
    bootstrap_servers="localhost:19092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

output_topic = "processed_events"

try:
    for message in consumer:
        value = message.value

        if not validate_message(value):
            print(f"Skipping invalid message: {value}")
            continue

        processed_message = process_message(value)

        producer.send(output_topic, processed_message)
        print(f"Processed and sent: {processed_message}")

        time.sleep(1)

except KeyboardInterrupt:
    print("Stopping the pipeline...")
finally:
    print("Closing consumer and producer...")
    consumer.close()
    producer.close()

### Task: Creating a Kafka utils module

Create a Python module `kafka_utils.py` with helper functions for working with Kafka using the kafka-python-ng package.

The module should contain:
- A function that creates and configures a Kafka producer
- Functions for sending messages to a Kafka topic
- A function for creating a Kafka topic

Test the module with usage examples for each function.
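One possible layout for `kafka_utils.py`, offered as a sketch rather than a reference solution. It relies on `KafkaProducer`, `KafkaAdminClient`, `NewTopic` and `TopicAlreadyExistsError` from kafka-python-ng; the function names, the defaults (broker `localhost:19092`, one partition, replication factor 1) and returning `False` for an already existing topic are assumptions. The `kafka` imports sit inside the functions that need a broker, so `send_message` and `send_messages` accept any object with `send` and `flush` and can be tested without Kafka:

```python
"""kafka_utils.py: a minimal sketch of helpers on top of kafka-python-ng."""
import json
from typing import Any, Iterable, Optional

DEFAULT_SERVERS = "localhost:19092"  # same broker address as the cells above


def create_producer(bootstrap_servers: str = DEFAULT_SERVERS, **config: Any):
    """Create a KafkaProducer with JSON values and UTF-8 string keys.

    Extra keyword arguments (e.g. acks="all") are passed to KafkaProducer.
    """
    from kafka import KafkaProducer  # imported lazily, see the lead-in

    return KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
        # send() calls the key serializer even for key=None, so pass None through
        key_serializer=lambda k: k.encode("utf-8") if k is not None else None,
        **config,
    )


def send_message(producer, topic: str, value: dict,
                 key: Optional[str] = None, timeout: float = 10.0):
    """Send one message and block until the broker acknowledges it."""
    future = producer.send(topic, value=value, key=key)
    return future.get(timeout=timeout)  # RecordMetadata: topic, partition, offset


def send_messages(producer, topic: str, values: Iterable[dict]) -> int:
    """Send many messages asynchronously, flush once, and return how many were sent."""
    count = 0
    for value in values:
        producer.send(topic, value=value)
        count += 1
    producer.flush()  # block until every buffered message has been delivered
    return count


def create_topic(name: str, num_partitions: int = 1, replication_factor: int = 1,
                 bootstrap_servers: str = DEFAULT_SERVERS) -> bool:
    """Create a topic; return False if it already exists."""
    from kafka.admin import KafkaAdminClient, NewTopic
    from kafka.errors import TopicAlreadyExistsError

    admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    try:
        admin.create_topics([
            NewTopic(name=name, num_partitions=num_partitions,
                     replication_factor=replication_factor)
        ])
        return True
    except TopicAlreadyExistsError:
        return False
    finally:
        admin.close()
```

A possible test run against the broker from the earlier cells: call `create_topic("user_events")`, then `producer = create_producer(acks="all")`, `send_message(producer, "user_events", {"user_id": 1, "message_key": "activity"}, key="1")`, `send_messages(producer, "user_events", messages)`, and finally `producer.close()`.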