In [98]:
from confluent_kafka import Producer
import json
import pandas as pd
from config import config
from confluent_kafka.admin import AdminClient, NewTopic

In [99]:
# Student id; it prefixes the name of the topic this notebook works with.
studentId = "ds23m003"
topic = f"{studentId}_DSI_Abschlussprojekt"

In [100]:
def topic_exists(admin, topic):
    """Report whether ``topic`` is among the topics listed by ``admin``.

    Args:
        admin: Object exposing ``list_topics()`` (an ``AdminClient`` in this
            notebook).
        topic: Name of the topic to look for.

    Returns:
        True if any entry in the listed metadata has a ``.topic`` attribute
        equal to ``topic``, otherwise False.
    """
    listed = admin.list_topics().topics
    return any(entry.topic == topic for entry in listed.values())

In [101]:
def create_topic(admin, topic, num_partitions=6, replication_factor=3):
    """Create ``topic`` through ``admin`` and print the outcome.

    Args:
        admin: Object exposing ``create_topics(list_of_NewTopic)`` that returns
            a mapping of topic name to future (an ``AdminClient`` here).
        topic: Name of the topic to create.
        num_partitions: Partition count for the new topic. Defaults to 6, the
            value this notebook previously hard-coded.
        replication_factor: Replication factor for the new topic. Defaults to
            3, the value this notebook previously hard-coded.

    Returns:
        None. Success or failure is reported on stdout; failures are not
        re-raised, so the notebook keeps running.
    """
    new_topic = NewTopic(
        topic,
        num_partitions=num_partitions,
        replication_factor=replication_factor,
    )
    futures = admin.create_topics([new_topic])
    # Use a distinct loop name so the ``topic`` parameter is not shadowed.
    for name, future in futures.items():
        try:
            future.result()  # The result itself is None; raises on failure
        except Exception as e:
            print("Failed to create topic {}: {}".format(name, e))
        else:
            print("Topic {} created".format(name))

In [102]:
# Build the admin client from the shared `config` and create the topic only
# when `topic_exists` reports that it is not already listed.
admin = AdminClient(config)
if not topic_exists(admin, topic):
    create_topic(admin, topic)

Topic ds23m003_DSI_Abschlussprojekt created


In [103]:
def callback(err, event):
    """Delivery-report callback passed to ``producer.produce``.

    Args:
        err: Falsy when delivery succeeded; otherwise the error describing why
            it failed.
        event: The message the report is about. It must expose ``topic()``,
            ``key()``, ``value()`` and ``partition()``.

    Returns:
        None. The outcome is printed to stdout.
    """
    if err:
        # Include the error itself; without it the log only says *that* the
        # delivery failed, not *why*.
        print(f'Produce to topic {event.topic()} failed for event: {event.key()}: {err}')
    else:
        val = event.value().decode('utf8')
        print(f'{val} sent to partition {event.partition()}.')

In [104]:
def fetch_data(path='clean_vie_data.csv'):
    """Load the CSV at ``path`` into a DataFrame.

    Args:
        path: Location of the CSV file. Defaults to ``'clean_vie_data.csv'``,
            the file this notebook previously hard-coded.

    Returns:
        The DataFrame produced by ``pd.read_csv``, or None if reading raised.
        The error is printed rather than re-raised, so callers must handle
        None.
    """
    # NOTE(review): the recorded output shows an "Unnamed: 0" column, which
    # suggests the CSV was saved with its index. Passing index_col=0 would
    # drop it from the produced messages -- confirm with consumers first.
    try:
        # read_csv already returns a DataFrame; no extra wrapping is needed.
        return pd.read_csv(path)
    except Exception as e:
        print(f"Error fetching data: {e}")
        return None

In [105]:
def produce_data(producer, data):
    """Enqueue one JSON-encoded message per row of ``data``.

    Each row is converted with ``row.to_dict()``, serialised with
    ``json.dumps`` and UTF-8 encoded. Messages go to the module-level
    ``topic`` and use the module-level ``callback`` for delivery reports.

    Args:
        producer: Object exposing ``produce(topic, value=..., callback=...)``
            and ``poll(timeout)`` (a ``Producer`` in this notebook).
        data: DataFrame of rows to send. ``None`` or an empty frame sends
            nothing.

    Returns:
        None. The caller is still responsible for calling
        ``producer.flush()`` afterwards.
    """
    if data is None or data.empty:
        return
    for _, row in data.iterrows():
        payload = json.dumps(row.to_dict()).encode('utf-8')
        producer.produce(topic, value=payload, callback=callback)
        # Serve pending delivery reports as we go. Otherwise callbacks only
        # fire at flush() and the local queue can fill up on large frames.
        producer.poll(0)

In [106]:
# Build the producer from the shared `config`, load the CSV, and enqueue one
# message per row via `produce_data`.
producer = Producer(config)

data = fetch_data()
produce_data(producer, data)
# Per the confluent-kafka docs, flush() waits for queued messages to be
# delivered and returns how many are still queued. As the cell's last
# expression its return value is displayed (0 in the recorded run).
producer.flush()

{"Unnamed: 0": 2, "DISTRICT_CODE": 90300, "REF_YEAR": 2002, "INC_TOT_VALUE": 18701, "INC_MAL_VALUE": 21444, "INC_FEM_VALUE": 15804, "BIR": 761} sent to partition 4.
{"Unnamed: 0": 0, "DISTRICT_CODE": 90100, "REF_YEAR": 2002, "INC_TOT_VALUE": 25463, "INC_MAL_VALUE": 31961, "INC_FEM_VALUE": 18536, "BIR": 130} sent to partition 2.
{"Unnamed: 0": 3, "DISTRICT_CODE": 90400, "REF_YEAR": 2002, "INC_TOT_VALUE": 20325, "INC_MAL_VALUE": 23641, "INC_FEM_VALUE": 16876, "BIR": 259} sent to partition 1.
{"Unnamed: 0": 4, "DISTRICT_CODE": 90500, "REF_YEAR": 2002, "INC_TOT_VALUE": 16258, "INC_MAL_VALUE": 17937, "INC_FEM_VALUE": 14395, "BIR": 557} sent to partition 1.
{"Unnamed: 0": 5, "DISTRICT_CODE": 90600, "REF_YEAR": 2002, "INC_TOT_VALUE": 18724, "INC_MAL_VALUE": 21260, "INC_FEM_VALUE": 16068, "BIR": 284} sent to partition 1.
{"Unnamed: 0": 6, "DISTRICT_CODE": 90700, "REF_YEAR": 2002, "INC_TOT_VALUE": 18429, "INC_MAL_VALUE": 21129, "INC_FEM_VALUE": 15725, "BIR": 305} sent to partition 1.
{"Unnamed:

0

In [107]:
#delete topic
def delete_topic(admin, topic):
    """Request deletion of ``topic`` through ``admin`` and print the outcome.

    Args:
        admin: Object exposing ``delete_topics(list_of_names)`` that returns a
            mapping of topic name to future.
        topic: Name of the topic to delete.

    Returns:
        None. Each result is reported on stdout; failures are printed, not
        re-raised.
    """
    pending = admin.delete_topics([topic])
    for name, outcome in pending.items():
        try:
            outcome.result()  # None on success; raises if deletion failed
            print("Topic {} deleted".format(name))
        except Exception as err:
            print("Failed to delete topic {}: {}".format(name, err))


#delete_topic(admin, topic)