# Events Generation

In [3]:
import random 
import numpy as np
import pandas as pd
import json
import time

from confluent_kafka import Producer
from confluent_kafka.admin import AdminClient, NewTopic
from loguru import logger
from datetime import timedelta
from faker import Faker

from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import StringSerializer

In [4]:
# Catalogue of product ids used to fill client wishlists: five numbered ids
# (001-005) for each product category, grouped by category.
PROD_IDS = [
    f"{category}_{number:03}"
    for category in ("fert", "seed", "pest", "equip", "supply", "soil")
    for number in range(1, 6)
]

In [20]:
# Scratch check: show the current local timestamp as pandas renders it.
pd.Timestamp.now()

Timestamp('2025-05-03 14:05:38.219516')

In [17]:
class EventsGeneration:
    """Generate synthetic client and IoT events and publish them to Kafka.

    The instance is configured from a ``key=value`` properties file. Entries
    named in ``SCHEMA_REGISTRY_KEYS`` are moved into
    ``schema_registry_config``; every other entry stays in ``config`` and is
    handed to the Kafka admin/producer clients.
    """

    # Properties that configure the Schema Registry client rather than the
    # Kafka clients; they are removed from the Kafka configuration on load.
    SCHEMA_REGISTRY_KEYS = ("url", "basic.auth.user.info")

    # Avro schema of the values published by produce_iot_events.
    IOT_VALUE_SCHEMA = """
    {
        "namespace": "sensors.iot",
        "type": "record",
        "name": "Telemetry",
        "fields" : [
            {"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}},
            {"name": "soil_temperature", "type" : "double"},
            {"name": "soil_humidity", "type" : "double"},
            {"name": "soil_quality", "type" : "double"}
        ]
    }
    """

    def __init__(self, config_path: str):
        """Load the client properties file and split it by target client.

        Args:
            config_path: Path to a ``key=value`` properties file.
        """
        self.config = EventsGeneration.read_config(config_path)
        self.schema_registry_config = {
            key: self.config.pop(key)
            for key in self.SCHEMA_REGISTRY_KEYS
            if key in self.config
        }

    @staticmethod
    def read_config(config_path: str) -> dict:
        """Parse a ``key=value`` properties file into a dict of strings.

        Blank lines and lines starting with ``#`` are ignored. Only the first
        ``=`` separates key from value, so values may themselves contain
        ``=``. Whitespace around both key and value is removed, so
        ``key = value`` and ``key=value`` are equivalent.

        Args:
            config_path: Path to the properties file.

        Returns:
            Mapping of property name to property value.

        Raises:
            ValueError: If a non-comment line has no ``=`` or an empty key.
        """
        config = {}
        with open(config_path) as fh:
            for line_number, raw_line in enumerate(fh, start=1):
                line = raw_line.strip()
                if not line or line.startswith("#"):
                    continue
                parameter, separator, value = line.partition("=")
                parameter = parameter.strip()
                if not separator or not parameter:
                    raise ValueError(
                        f"{config_path}:{line_number}: expected 'key=value', got {line!r}"
                    )
                config[parameter] = value.strip()
        return config

    def create_topics(self, topics: list, num_partitions: int = 3, replication_factor: int = 3) -> None:
        """Create the given topics, skipping those that already exist.

        Creation is best-effort: a failure for one topic is logged as an
        error and does not stop the others or raise to the caller.

        Args:
            topics: Names of the topics to create.
            num_partitions: Partition count for each new topic.
            replication_factor: Replication factor for each new topic.
        """
        admin_client = AdminClient(self.config)
        existing_topics = set(admin_client.list_topics().topics)
        logger.info(f"Available topics in the Kafka cluster: {sorted(existing_topics)}")

        # dict.fromkeys drops duplicate names while keeping the caller's order.
        missing_topics = []
        for name in dict.fromkeys(topics):
            if name in existing_topics:
                logger.info(f"Topic {name} already exists, skipping")
            else:
                missing_topics.append(name)

        # Nothing to do; also avoids sending an empty creation request.
        if not missing_topics:
            return

        futures = admin_client.create_topics([
            NewTopic(name, num_partitions=num_partitions, replication_factor=replication_factor)
            for name in missing_topics
        ])
        for name, future in futures.items():
            try:
                future.result()
                logger.info(f"Topic {name} created")
            except Exception as ex:
                logger.error(f"Error while creating topic {name}: {ex}")

    @staticmethod
    def _sensor_reading(mean, std, lower, upper, present_probability=0.8):
        """Draw one clipped, 1-decimal normal reading, or NaN when it is missing."""
        if random.random() < present_probability:
            return float(np.random.normal(mean, std, 1).clip(lower, upper).round(1)[0])
        return np.nan

    @staticmethod
    def iot_event_generation(previous_timestamp):
        """Build one synthetic soil-sensor event.

        Each of the three readings is independently replaced by NaN with
        probability 0.2 to simulate missing measurements.

        Args:
            previous_timestamp: Timestamp of the previous event; the new event
                is stamped 10-200 seconds after it.

        Returns:
            Tuple ``(key, value)`` where ``key`` is ``sensor_001``..``sensor_020``
            and ``value`` is a dict with ``timestamp``, ``soil_temperature``,
            ``soil_humidity`` and ``soil_quality``.
        """
        key = f"sensor_{random.randint(1,20):03}"
        temp = EventsGeneration._sensor_reading(25, 15, -10, 45)
        hum = EventsGeneration._sensor_reading(45, 20, 0, 100)
        quality = EventsGeneration._sensor_reading(50, 35, 0, 100)
        value = {
            "timestamp": previous_timestamp + timedelta(seconds=random.randint(10, 200)),
            "soil_temperature": temp,
            "soil_humidity": hum,
            "soil_quality": quality,
        }
        return key, value

    @staticmethod
    def client_data_event_generation(previous_timestamp, faker_instance):
        """Build one synthetic client-activity event.

        Args:
            previous_timestamp: Timestamp of the previous event; the new event
                is stamped 10-200 seconds after it.
            faker_instance: Faker instance used for the name and email; its
                ``unique`` proxy is used for both fields.

        Returns:
            Tuple ``(key, value)`` where ``key`` is ``user_0001``..``user_5000``
            and ``value`` is a JSON-serializable dict (the timestamp is an
            ISO-8601 string).
        """
        key = f"user_{random.randint(1, 5000):04}"
        value = {
            "timestamp": (previous_timestamp + timedelta(seconds=random.randint(10, 200))).isoformat(),
            "full_name": faker_instance.unique.name(),
            "email": faker_instance.unique.email(),
            "age": random.randint(21, 70),
            "clicked_ads": random.randint(1,20),
            # np.random.choice samples with replacement by default, so the
            # same product id can appear more than once in a wishlist.
            "wishlist_items": np.random.choice(PROD_IDS, random.randint(1, 10)).tolist(),
            "engagement_score": round(random.random(), 3),
        }
        return key, value

    @staticmethod
    def _delivery_report(sequence_number):
        """Return a delivery callback that logs the real outcome of one message."""
        def report(err, msg):
            if err is not None:
                logger.error(f"Message {sequence_number} delivery failed: {err}")
            else:
                logger.info(f"Message {sequence_number} sent")
        return report

    @staticmethod
    def _publish(producer, topic, events, max_sleep_seconds):
        """Send ``(key, value)`` events one at a time with a random pause between them."""
        for sequence_number, (key, value) in enumerate(events, start=1):
            producer.produce(
                topic=topic,
                key=key,
                value=value,
                on_delivery=EventsGeneration._delivery_report(sequence_number),
            )
            # Flushing per message keeps the stream strictly sequential and
            # triggers the delivery callback before the next event is built.
            producer.flush()
            if max_sleep_seconds > 0:
                time.sleep(random.uniform(0, max_sleep_seconds))

    @staticmethod
    def _client_events(amount_events):
        """Yield ``amount_events`` client events as ``(key, JSON string)`` pairs."""
        fake = Faker()
        current_timestamp = pd.Timestamp.now()
        for _ in range(amount_events):
            key, value = EventsGeneration.client_data_event_generation(current_timestamp, fake)
            # Chain timestamps so each event is later than the previous one.
            current_timestamp = pd.Timestamp(value["timestamp"])
            yield key, json.dumps(value)

    @staticmethod
    def _iot_events(amount_events):
        """Yield ``amount_events`` IoT events as ``(key, dict)`` pairs."""
        current_timestamp = pd.Timestamp.now()
        for _ in range(amount_events):
            key, value = EventsGeneration.iot_event_generation(current_timestamp)
            # Chain timestamps so each event is later than the previous one.
            current_timestamp = value["timestamp"]
            yield key, value

    def produce_client_events(self, amount_events, topic="topic_client_data", max_sleep_seconds=1.0):
        """Publish synthetic client events as JSON strings.

        Args:
            amount_events: Number of events to publish.
            topic: Destination topic.
            max_sleep_seconds: Upper bound of the random pause after each
                message; ``0`` disables the pause.
        """
        producer = Producer(self.config)
        EventsGeneration._publish(
            producer, topic, EventsGeneration._client_events(amount_events), max_sleep_seconds
        )

    def produce_iot_events(self, amount_events, topic="topic_iot_sensors", max_sleep_seconds=1.0):
        """Publish synthetic IoT telemetry, Avro-serialized via the Schema Registry.

        Keys are serialized as UTF-8 strings and values with
        ``IOT_VALUE_SCHEMA``.

        Args:
            amount_events: Number of events to publish.
            topic: Destination topic.
            max_sleep_seconds: Upper bound of the random pause after each
                message; ``0`` disables the pause.
        """
        schema_registry_client = SchemaRegistryClient(self.schema_registry_config)

        def telemetry_to_dict(obj, ctx):
            return obj  # events are already plain dicts

        value_serializer = AvroSerializer(
            schema_str=self.IOT_VALUE_SCHEMA,
            schema_registry_client=schema_registry_client,
            to_dict=telemetry_to_dict
        )

        producer = SerializingProducer({
            **self.config,
            "key.serializer": StringSerializer("utf_8"),
            "value.serializer": value_serializer,
        })
        EventsGeneration._publish(
            producer, topic, EventsGeneration._iot_events(amount_events), max_sleep_seconds
        )


In [18]:
# Topics used by this notebook: one for client events, one for IoT telemetry.
topics_list = ["topic_client_data", "topic_iot_sensors"]
# key=value properties file read by EventsGeneration (path is relative to the
# notebook's working directory).
config_path = "./config/confluent_kafka/client_properties"
generator = EventsGeneration(config_path)

In [7]:
# Show the Schema Registry settings with the API key/secret masked, so the
# credentials are never written into the notebook's saved output.
{
    setting: "<redacted>" if setting == "basic.auth.user.info" else value
    for setting, value in generator.schema_registry_config.items()
}

{'url': 'https://psrc-e0919.us-east-2.aws.confluent.cloud',
 'basic.auth.user.info': '<redacted>'}

# Create Topics **(if they do not exist)**

In [42]:
# Create the topics listed in topics_list; create_topics logs per-topic
# failures instead of raising them.
generator.create_topics(topics_list)

%4|1746281828.988|CONFWARN|ccloud-python-client-758566b2-581f-4b70-97c0-96c6f69c3fb4#producer-1| [thrd:app]: Configuration property session.timeout.ms is a consumer property and will be ignored by this producer instance
[32m2025-05-03 16:17:09.916[0m | [1mINFO    [0m | [36m__main__[0m:[36mcreate_topics[0m:[36m20[0m - [1mAvailable topics in the Kafka cluster: {}[0m
%6|1746281830.856|GETSUBSCRIPTIONS|ccloud-python-client-758566b2-581f-4b70-97c0-96c6f69c3fb4#producer-1| [thrd:main]: Telemetry client instance id changed from AAAAAAAAAAAAAAAAAAAAAA to b0KbJ/blSb6E2hYpJL8XdQ
[32m2025-05-03 16:17:10.920[0m | [1mINFO    [0m | [36m__main__[0m:[36mcreate_topics[0m:[36m29[0m - [1mTopic topic_client_data created[0m
[32m2025-05-03 16:17:11.107[0m | [1mINFO    [0m | [36m__main__[0m:[36mcreate_topics[0m:[36m29[0m - [1mTopic topic_iot_sensors created[0m


# Produce client data events

In [12]:
# Publish 100 synthetic client events (the saved run below was interrupted
# manually, hence the KeyboardInterrupt).
generator.produce_client_events(100)

%4|1746452512.936|CONFWARN|ccloud-python-client-758566b2-581f-4b70-97c0-96c6f69c3fb4#producer-3| [thrd:app]: Configuration property session.timeout.ms is a consumer property and will be ignored by this producer instance
[32m2025-05-05 15:41:54.729[0m | [1mINFO    [0m | [36m__main__[0m:[36mproduce_client_events[0m:[36m70[0m - [1mMessage 1 sent[0m
%6|1746452514.729|GETSUBSCRIPTIONS|ccloud-python-client-758566b2-581f-4b70-97c0-96c6f69c3fb4#producer-3| [thrd:main]: Telemetry client instance id changed from AAAAAAAAAAAAAAAAAAAAAA to zRfS0BOLTHm1F45KJ0go9Q


KeyboardInterrupt: 

# Produce IoT events

In [19]:
# Publish 200 synthetic IoT telemetry events, Avro-serialized through the
# Schema Registry.
generator.produce_iot_events(200)

%4|1746454601.201|CONFWARN|ccloud-python-client-758566b2-581f-4b70-97c0-96c6f69c3fb4#producer-6| [thrd:app]: Configuration property session.timeout.ms is a consumer property and will be ignored by this producer instance
[32m2025-05-05 16:16:43.084[0m | [1mINFO    [0m | [36m__main__[0m:[36mproduce_iot_events[0m:[36m115[0m - [1mMessage 1 sent[0m
%6|1746454603.084|GETSUBSCRIPTIONS|ccloud-python-client-758566b2-581f-4b70-97c0-96c6f69c3fb4#producer-6| [thrd:main]: Telemetry client instance id changed from AAAAAAAAAAAAAAAAAAAAAA to 6EQyTxEkS8u45ganra6rQA
[32m2025-05-05 16:16:44.060[0m | [1mINFO    [0m | [36m__main__[0m:[36mproduce_iot_events[0m:[36m115[0m - [1mMessage 2 sent[0m
[32m2025-05-05 16:16:44.863[0m | [1mINFO    [0m | [36m__main__[0m:[36mproduce_iot_events[0m:[36m115[0m - [1mMessage 3 sent[0m
[32m2025-05-05 16:16:46.857[0m | [1mINFO    [0m | [36m__main__[0m:[36mproduce_iot_events[0m:[36m115[0m - [1mMessage 4 sent[0m
[32m2025-05-05 16: