In [1]:
import time
import pandas as pd
from kafka import KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic
import json  # For handling JSON serialization

# Stream the rows of Attack.csv to the Kafka topic 'swat', one JSON message per row.
TOPIC_NAME = 'swat'

# Initialize Kafka Admin Client
admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092")
try:
    existing_topics = admin_client.list_topics()
    print(f"Existing topics: {existing_topics}")

    # Check if topic exists, create if not
    if TOPIC_NAME not in existing_topics:
        topic_list = [NewTopic(name=TOPIC_NAME, num_partitions=1, replication_factor=1)]
        admin_client.create_topics(new_topics=topic_list, validate_only=False)
finally:
    # The admin client is only needed for the topic check above, so release it here.
    admin_client.close()

# Read CSV into a pandas DataFrame.
# Done before the producer is created so a missing file does not leave a producer open.
df = pd.read_csv('./Attack.csv')  # Replace 'Attack.csv' with your actual CSV file path

# Create a Kafka producer
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    key_serializer=lambda k: k.encode('utf-8'),
    value_serializer=lambda v: v.encode('utf-8')  # Serialize messages as UTF-8 encoded strings
)

try:
    # Loop through each row of the CSV and send it row by row
    for index, row in df.iterrows():
        key = f"row-{index}"  # Unique key for each row
        # NOTE(review): json.dumps writes missing values as the non-standard token NaN;
        # confirm that downstream consumers accept it.
        value = json.dumps(row.to_dict())  # Serialize the row as JSON string

        # Send the row to Kafka
        producer.send(TOPIC_NAME, key=key, value=value)
        print(f"Sent row key: {key}, row value: {value}")

        # Optional: Wait before sending the next row
        time.sleep(5)  # Adjust sleep time as needed
finally:
    # Close the producer even when the loop is interrupted (e.g. KeyboardInterrupt),
    # so the kernel does not keep a live producer after the cell stops.
    producer.close()


Existing topics: ['swat-randomforest', 'swat-forcast', '__consumer_offsets', 'swat', 'swat-ocsvm', 'devices', 'swat-zscore', 'swat-forecast']
Sent row key: row-0, row value: {"Timestamp": " 28/12/2015 10:00:00 AM", "FIT101": 2.427057, "LIT101": 522.8467, "MV101": 2, "P101": 2, "P102": 1, "AIT201": 262.0161, "AIT202": 8.396437, "AIT203": 328.6337, "FIT201": 2.445391, "MV201": 2, "P201": 1, "P202": 1, "P203": 2, "P204": 1, "P205": 2, "P206": 1, "DPIT301": 19.74838, "FIT301": 2.206835, "LIT301": 956.1651, "MV301": 1, "MV302": 2, "MV303": 1, "MV304": 1, "P301": 1, "P302": 2, "AIT401": 148.808, "AIT402": 156.0882, "FIT401": 1.713517, "LIT401": 942.0662, "P401": 1, "P402": 2, "P403": 1, "P404": 1, "UV401": 2, "AIT501": 7.878621, "AIT502": 145.1166, "AIT503": 264.5475, "AIT504": 12.03538, "FIT501": 1.723789, "FIT502": 1.279621, "FIT503": 0.7352687, "FIT504": 0.3077859, "P501": 2, "P502": 1, "PIT501": 250.8652, "PIT502": 1.649953, "PIT503": 189.5988, "FIT601": 0.000128152, "P601": 1, "P602": 1

KeyboardInterrupt: 

In [5]:

from kafka import KafkaConsumer, KafkaProducer

import json

from json import loads
    
from csv import DictReader

import time

from kafka.errors import NoBrokersAvailable


def is_kafka_broker_up(bootstrap_servers):
    """Report whether a Kafka producer can be created for ``bootstrap_servers``.

    A throwaway ``KafkaProducer`` is constructed with a 500 ms request timeout
    and closed immediately.

    Returns:
        True if the producer was created and closed without raising
        ``NoBrokersAvailable``; False if that exception was raised.
    """
    try:
        # Construct and discard the probe producer in one step.
        KafkaProducer(bootstrap_servers=bootstrap_servers, request_timeout_ms=500).close()
    except NoBrokersAvailable:
        return False
    return True
# Forward slashes are accepted on both Windows and POSIX, and avoid the invalid "\S" escape
# sequence that the backslash-separated literal contained.
csv_file_path = '../dataset/SWaT_Dataset_Normal_v0 2.csv'
def kafka_producer():
    """Stream every row of the CSV at ``csv_file_path`` to the 'swat' Kafka topic.

    Waits until ``is_kafka_broker_up`` reports that the broker at
    localhost:9092 is reachable (re-checking every 30 seconds), then sends each
    CSV row as a UTF-8 encoded JSON object. After every 20 rows the function
    sleeps for 10 seconds to pace the stream.

    The producer is flushed and closed when the file is exhausted or the loop
    is interrupted, so buffered records are handed to the broker before exit.
    """
    # Set up for Kafka Producer
    bootstrap_servers = ['localhost:9092']
    topicname = 'swat'

    # Check if Kafka broker is up before attempting to create the producer
    while not is_kafka_broker_up(bootstrap_servers):
        print("Kafka broker is not up. Waiting for 30 seconds...")
        time.sleep(30)

    producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
    try:
        # newline='' is what the csv module expects so quoted fields containing
        # line breaks are parsed correctly.
        with open(csv_file_path, 'r', newline='') as new_obj:
            for index, row in enumerate(DictReader(new_obj), start=1):
                producer.send(topicname, json.dumps(row).encode('utf-8'))
                print("Data sent successfully")

                # Pause after each batch of 20 rows.
                if index % 20 == 0:
                    time.sleep(10)
    finally:
        # send() is asynchronous: flush pending records, then release the client,
        # even if the loop was interrupted.
        producer.flush()
        producer.close()

# Run the producer; this call blocks while the CSV rows are streamed to Kafka.
kafka_producer()

Data sent successfully
Data sent successfully
Data sent successfully
Data sent successfully
Data sent successfully
Data sent successfully
Data sent successfully
Data sent successfully
Data sent successfully
Data sent successfully
Data sent successfully
Data sent successfully
Data sent successfully
Data sent successfully
Data sent successfully
Data sent successfully
Data sent successfully
Data sent successfully
Data sent successfully
Data sent successfully
Data sent successfully
Data sent successfully
Data sent successfully
Data sent successfully
Data sent successfully
Data sent successfully
Data sent successfully
Data sent successfully
Data sent successfully
Data sent successfully
Data sent successfully
Data sent successfully
Data sent successfully
Data sent successfully
Data sent successfully
Data sent successfully
Data sent successfully
Data sent successfully
Data sent successfully
Data sent successfully
Data sent successfully
Data sent successfully
Data sent successfully
Data sent s

KeyboardInterrupt: 

In [7]:
import time
import pandas as pd
from kafka import KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic

# Stream the rows of Attack.csv to the Kafka topic 'swat', keyed by the row index.
TOPIC_NAME = 'swat'

# Initialize Kafka Admin Client
admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092")
try:
    existing_topics = admin_client.list_topics()
    print(existing_topics)

    # Check if topic exists, create if not
    if TOPIC_NAME not in existing_topics:
        topic_list = [NewTopic(name=TOPIC_NAME, num_partitions=1, replication_factor=1)]
        admin_client.create_topics(new_topics=topic_list, validate_only=False)
finally:
    # The admin client is only needed for the topic check above, so release it here.
    admin_client.close()

# Read CSV into a pandas DataFrame.
# Done before the producer is created so a missing file does not leave a producer open.
df = pd.read_csv('./Attack.csv')  # Replace './Attack.csv' with your actual CSV file path

# Create a Kafka producer; no serializers are configured, so keys and values
# are encoded to bytes explicitly at the send() call below.
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
)

try:
    # Loop through each row of the CSV
    for index, row in df.iterrows():
        # Convert the row to string format for both key and value
        key = str(index)  # Use index or a specific column as the key
        value = row.to_json()  # Convert the row to JSON string

        # Send the data to Kafka
        producer.send(TOPIC_NAME, key=key.encode('utf-8'), value=value.encode('utf-8'))

        print(f"Sent key: {key}, value: {value}")

        # Wait for 2 seconds before sending the next message
        time.sleep(2)
finally:
    # Close the producer even when the loop is interrupted (e.g. KeyboardInterrupt),
    # so the kernel does not keep a live producer after the cell stops.
    producer.close()

['swat', '__consumer_offsets']
Sent key: 0, value: {"Timestamp":" 28\/12\/2015 10:00:00 AM","FIT101":2.427057,"LIT101":522.8467,"MV101":2,"P101":2,"P102":1,"AIT201":262.0161,"AIT202":8.396437,"AIT203":328.6337,"FIT201":2.445391,"MV201":2,"P201":1,"P202":1,"P203":2,"P204":1,"P205":2,"P206":1,"DPIT301":19.74838,"FIT301":2.206835,"LIT301":956.1651,"MV301":1,"MV302":2,"MV303":1,"MV304":1,"P301":1,"P302":2,"AIT401":148.808,"AIT402":156.0882,"FIT401":1.713517,"LIT401":942.0662,"P401":1,"P402":2,"P403":1,"P404":1,"UV401":2,"AIT501":7.878621,"AIT502":145.1166,"AIT503":264.5475,"AIT504":12.03538,"FIT501":1.723789,"FIT502":1.279621,"FIT503":0.7352687,"FIT504":0.3077859,"P501":2,"P502":1,"PIT501":250.8652,"PIT502":1.649953,"PIT503":189.5988,"FIT601":0.000128152,"P601":1,"P602":1,"P603":1,"Normal\/Attack":"Normal"}
Sent key: 1, value: {"Timestamp":" 28\/12\/2015 10:00:01 AM","FIT101":2.446274,"LIT101":522.886,"MV101":2,"P101":2,"P102":1,"AIT201":262.0161,"AIT202":8.396437,"AIT203":328.6337,"FIT201

KeyboardInterrupt: 