#### **Producer**

In [None]:
from confluent_kafka import Producer
import socket
import pandas as pd
import time

##### The server details (where the topic resides)

In [None]:
# Producer settings: the Kafka broker to connect to, and a client id set to
# this machine's hostname.
conf = {
    "bootstrap.servers": "localhost:9092",
    "client.id": socket.gethostname(),
}

In [None]:
# Create the Kafka producer from the settings in `conf` (defined in the cell above).
producer = Producer(conf)

##### Take each data point from the dummy dataset and push it to the topic as JSON, one every 10 seconds

In [None]:
# Stream dummy.csv to the "electra" topic one row at a time, one row every 10 s.
try:
    for chunk in pd.read_csv('dummy.csv', chunksize=1):
        # Each chunk is a one-row DataFrame: take the row by position instead of
        # relying on the chunk's index label, and use a name that does not shadow
        # the stdlib `json` module used elsewhere in this notebook.
        row_json = chunk.iloc[0].to_json()
        producer.produce("electra", row_json)
        # Serve the client's delivery events between sends so its internal
        # queue is drained during the long-running loop.
        producer.poll(0)
        time.sleep(10)
finally:
    # Deliver anything still queued, even if the loop is interrupted.
    producer.flush()

#### **Consumer**

In [None]:
import json
import os
import socket
import subprocess

import pandas as pd
import pyspark
from confluent_kafka import Consumer
from pyspark.ml import PipelineModel
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType

##### The server details (where the topic resides); the group ID identifies the consumer group this consumer belongs to

In [None]:
# Consumer settings: the broker holding the topic, the consumer group this
# consumer joins, and where to start reading when the group has no committed
# offset yet ('earliest' = from the oldest available message).
conf = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "foo",
    "auto.offset.reset": "earliest",
}

##### Creating a Spark session

In [None]:
# Start (or reuse, via getOrCreate) a local Spark session named "electra-pred",
# used to turn each incoming record into a DataFrame for the model.
spark = (
    SparkSession.builder
    .master("local")
    .appName("electra-pred")
    .getOrCreate()
)

##### Defining a schema to convert the incoming JSON into a Spark DataFrame for prediction

In [None]:
# Column layout of one incoming record. Field order matters: main() fills the
# row positionally from the decoded JSON values. "label" is the last field;
# main() drops it from the printed row and appends the model's prediction.
electraSchema = (
    StructType()
    .add("Time", "long")
    .add("smac", "string")
    .add("dmac", "string")
    .add("sip", "string")
    .add("dip", "string")
    .add("request", "integer")
    .add("fc", "integer")
    .add("error", "integer")
    .add("address", "integer")
    .add("data", "integer")
    .add("label", "string")
)

##### Loading the trained model

In [None]:
# Load the fitted Spark ML pipeline saved under ./electra-model; main() calls
# model.transform() on each incoming record to obtain a label prediction.
model = PipelineModel.load('./electra-model')

##### Creating a consumer instance and subscribing to the Kafka topic

In [None]:
# Create the consumer from the consumer `conf` above (group "foo") and subscribe
# it to the "electra" topic that the producer section writes to.
consumer = Consumer(conf)
consumer.subscribe(['electra'])

In [None]:
# Local file holding line 1 (presumably a CSV header) plus a sliding window of recent rows.
_WINDOW_CSV = "d.csv"
# HDFS file that accumulates every scored row.
_HDFS_CSV = "/electra/electra_modbus.csv"
# NOTE(review): position of the prediction value in the transformed row, and the
# index of the label column within the first pipeline stage's labelsArray. Both
# depend on how ./electra-model was built -- confirm against the training code.
_PREDICTION_FIELD_INDEX = 22
_LABEL_INDEXER_POSITION = 4


def _append_to_window(path, line):
    """Delete line 2 of *path* (keeping line 1) and append *line* plus a newline.

    Python equivalent of ``sed '2d'`` followed by ``echo line >> path``, without
    passing the row through a shell. A missing file is treated as empty.
    """
    try:
        with open(path, "rb") as fh:
            rows = fh.readlines()
    except FileNotFoundError:
        rows = []
    # Drop the oldest data row; a no-op when there are fewer than two lines,
    # exactly like sed '2d'.
    del rows[1:2]
    rows.append(line.encode("utf-8") + b"\n")
    tmp_path = path + ".tmp"
    with open(tmp_path, "wb") as fh:
        fh.writelines(rows)
    os.replace(tmp_path, path)


def _append_to_hdfs(path, line):
    """Append *line* plus a newline to the HDFS file *path* using the ``hdfs`` CLI."""
    try:
        # Argument list (no shell) and the row on stdin, so field values from the
        # message can never be interpreted as shell syntax.
        result = subprocess.run(
            ["hdfs", "dfs", "-appendToFile", "-", path],
            input=(line + "\n").encode("utf-8"),
            check=False,
        )
    except FileNotFoundError:
        print("Error: 'hdfs' command not found; row not appended to HDFS")
        return
    if result.returncode != 0:
        print(f"Error: hdfs append exited with status {result.returncode}")


def main():
    """Consume records from 'electra', predict a label for each, and persist the rows.

    For every message: decode the JSON object, build a one-row Spark DataFrame
    with ``electraSchema``, and run ``model`` on it. The result is formatted as a
    CSV row (every input field except the last one, "label", followed by the
    predicted label). That row is printed, appended to the local window file
    (whose second line is dropped first), and appended to the HDFS aggregate
    file. Malformed messages are reported and skipped.

    Runs until interrupted. The consumer is closed on exit, so re-run the
    consumer-creation cell before calling main() again.
    """
    try:
        # Loop-invariant: the label strings depend only on the loaded model.
        labels = list(model.stages[0].labelsArray[_LABEL_INDEXER_POSITION])
        while True:
            msg = consumer.poll(1.0)  # wait up to 1 s for a message
            if msg is None:
                continue
            if msg.error():
                print('Error: {}'.format(msg.error()))
                continue

            payload = msg.value()
            if payload is None:
                continue
            try:
                record = json.loads(payload.decode('utf-8'))
            except (UnicodeDecodeError, json.JSONDecodeError) as exc:
                print(f"Skipping malformed message: {exc}")
                continue
            if not isinstance(record, dict):
                print("Skipping message that is not a JSON object")
                continue

            # Values are used in key order, which must match electraSchema's field
            # order (presumably true, since the producer serialises CSV rows in
            # column order).
            values = list(record.values())
            df = spark.createDataFrame(data=[tuple(values)], schema=electraSchema)

            prediction_row = model.transform(df).collect()[0]
            prediction = int(prediction_row[_PREDICTION_FIELD_INDEX])

            # Replace the last input field (the original label) with the prediction.
            line = ",".join(str(x) for x in values[:-1]) + "," + labels[prediction]
            print(line)

            _append_to_window(_WINDOW_CSV, line)
            _append_to_hdfs(_HDFS_CSV, line)
    finally:
        consumer.close()


if __name__ == "__main__":
    main()