In [1]:
from kafka import KafkaProducer
from kafka import KafkaConsumer
from json import loads
import json
from time import sleep
import pandas as pd

In [8]:
def data_preprocessing(df):
    """Strip the timestamp and per-process attack columns from a HAI frame.

    The ``attack`` column already aggregates ``attack_P1``, ``attack_P2`` and
    ``attack_P3``, so only ``attack`` is kept as the label.

    Args:
        df: DataFrame to clean.

    Returns:
        A ``(frame, preprocessed)`` tuple. When every column to remove is
        present, ``frame`` is a new DataFrame without them and ``preprocessed``
        is ``True``. Otherwise the input frame is returned untouched together
        with ``False``.
    """
    redundant_columns = ['time', 'attack_P1', 'attack_P2', 'attack_P3']
    available = df.columns

    # All-or-nothing: a frame missing any expected column is passed back as-is.
    for name in redundant_columns:
        if name not in available:
            return df, False

    return df.drop(columns=redundant_columns), True

In [9]:
def json_serializer(data):
    """Encode ``data`` as UTF-8 JSON bytes (used as the Kafka value serializer).

    Args:
        data: Any object accepted by ``json.dumps``.

    Returns:
        The JSON text of ``data`` as ``bytes``.
    """
    json_text = json.dumps(data)
    return bytes(json_text, "utf-8")

In [12]:
class KafkaDataStreamer:
    """Thin wrapper around a ``KafkaProducer`` that publishes to a single topic.

    Values are serialized with ``json_serializer``. Because ``send`` hands the
    record to the producer rather than waiting for delivery, call ``close()``
    (or use the instance as a context manager) when finished so pending
    records are flushed and the producer's resources are released.
    """

    def __init__(self, bootstrap_servers, topic, api_version=(0, 10, 1)):
        """Create the producer.

        Args:
            bootstrap_servers: Broker address list passed to ``KafkaProducer``.
            topic: Topic every record is published to.
            api_version: Kafka API version tuple passed to ``KafkaProducer``.
                Defaults to the previously hard-coded ``(0, 10, 1)``.
        """
        self.bootstrap_servers = bootstrap_servers
        self.topic = topic
        # Initialize the Kafka producer
        self.producer = KafkaProducer(bootstrap_servers=self.bootstrap_servers,
                                      value_serializer=json_serializer,
                                      api_version=api_version)

    def stream_data(self, data):
        """Publish ``data`` to the configured topic.

        Args:
            data: Object exposing ``to_dict()`` (e.g. a pandas DataFrame or
                Series); the resulting dict is the message value.

        Returns:
            Whatever ``producer.send`` returns, so callers can check delivery.
            Existing callers that ignore the result are unaffected.
        """
        message = data.to_dict()
        return self.producer.send(self.topic, value=message)

    def flush(self):
        """Ask the producer to deliver any records it is still holding."""
        self.producer.flush()

    def close(self):
        """Flush pending records, then close the producer."""
        try:
            self.producer.flush()
        finally:
            # Close even if the flush fails so the producer is not leaked.
            self.producer.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        # Never suppress an exception raised inside the ``with`` body.
        return False

In [13]:
# Connection settings shared by the cells below.
# Broker list handed to both the Kafka consumer and the producer.
bootstrap_servers = ["localhost:9092"]
# Topic that receives the preprocessed records.
topic_producer = "hai-preprocessed-mao"

In [14]:
class HaiConsumer:
    """Reads JSON records from a Kafka topic, preprocesses them, and republishes them."""

    def __init__(self, topic, bootstrap_servers):
        """Create the underlying ``KafkaConsumer``.

        Args:
            topic: Topic to read records from.
            bootstrap_servers: Broker address list passed to ``KafkaConsumer``.
        """
        self.topic = topic
        self.bootstrap_servers = bootstrap_servers
        self.consumer = KafkaConsumer(
            self.topic,
            bootstrap_servers=self.bootstrap_servers,
            auto_offset_reset='earliest',
            enable_auto_commit=True,
            value_deserializer=lambda x: loads(x.decode('utf-8')))

    def consume(self, output_topic=None):
        """Forward every successfully preprocessed record to the output topic.

        Each message value is wrapped in a one-row DataFrame and passed through
        ``data_preprocessing``; records it reports as not preprocessed are
        skipped. The method returns only when iteration over the consumer ends.

        Args:
            output_topic: Topic to publish to. Defaults to the notebook-level
                ``topic_producer`` setting, matching the previous behaviour.
        """
        if output_topic is None:
            output_topic = topic_producer

        # Publish through the brokers this instance was configured with rather
        # than whatever the notebook-global ``bootstrap_servers`` currently holds.
        streamer = KafkaDataStreamer(self.bootstrap_servers, output_topic)
        try:
            for message in self.consumer:
                df = pd.DataFrame([message.value])
                df, preprocessed = data_preprocessing(df)
                if not preprocessed:
                    continue
                streamer.stream_data(df)
        finally:
            # The producer is created per call, so release it here. This also
            # runs on kernel interrupt, giving pending sends a chance to go out.
            try:
                streamer.producer.flush()
            finally:
                streamer.producer.close()

                
            

In [15]:
# Topic the consumer reads its input records from.
topic = 'hai-input-mao'
# `bootstrap_servers` comes from the configuration cell earlier in the notebook.
consumer = HaiConsumer(topic, bootstrap_servers)
# Runs the consume -> preprocess -> republish loop. The call iterates over the
# Kafka consumer, so it is expected to keep this cell busy while the stream is
# open (presumably until the kernel is interrupted — confirm consumer timeout).
consumer.consume()