### Import Libraries and Kafka Configuration

In [6]:
from kafka import KafkaProducer
import csv
import json
import time
import os
from tqdm import tqdm

# Kafka Configuration
KAFKA_BROKER = "localhost:9092"  
KAFKA_TOPIC = "icu_topic"     

DATASET_FILE = r"D:\Hungary\Semester 2\Open-Source Technologies for Data Science\Practice\Project\ICU\Modified_ICU_Dataset.csv"

### Initialize Kafka Producer

In [8]:
producer = KafkaProducer(
    bootstrap_servers=KAFKA_BROKER,
    value_serializer=lambda v: json.dumps(v).encode("utf-8") 
)

print("Kafka Producer initialized.")

Kafka Producer initialized.


### Define the Data Streaming Function

In [9]:
def stream_data_to_kafka(file_path, topic, producer):

    print(f"Streaming data from '{file_path}' to Kafka topic '{topic}'...")

    try:
        with open(file_path, "r") as file:
            reader = csv.DictReader(file)  
            total_rows = sum(1 for _ in open(file_path, "r")) - 1  
            file.seek(0)  
            next(reader)  
            
            for row in tqdm(reader, total=total_rows, desc="Streaming rows", unit="row"):
                producer.send(topic, value=row)
                time.sleep(0.01)  

    except FileNotFoundError:
        print(f"Error: Dataset file not found at '{file_path}'.")
    except Exception as e:
        print(f"Error during streaming: {e}")
    finally:
        producer.flush()  
        print("Data streaming completed.")

### Stream the Data

In [10]:
if not os.path.exists(DATASET_FILE):
    print(f"Dataset file not found: {DATASET_FILE}")
else:
    stream_data_to_kafka(DATASET_FILE, KAFKA_TOPIC, producer)

Streaming data from 'D:\Hungary\Semester 2\Open-Source Technologies for Data Science\Practice\Project\ICU\Modified_ICU_Dataset.csv' to Kafka topic 'icu_topic'...


Streaming rows: 100%|█████████▉| 14999/15000 [04:01<00:00, 62.02row/s]

Data streaming completed.



