# Kafka Consumer

A notebook that is used on the local machine to connect to the Kafka server and act as the consumer.

---

##### Libraries

In [1]:
# Libraries in the Consumer Notebook

# General
import time
import os

# Data manipulation
import pandas as pd
import numpy as np

# Data Visualisation
import matplotlib.pyplot as plt
import seaborn as sns

# SQL
import pymysql

# Data types
import json
import pickle
import joblib
import avro

# Avro objects
import io
import fastavro

# Kafka
from confluent_kafka import Consumer, KafkaException, KafkaError

# Machine Learning Model
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, confusion_matrix

---

# Fix Issue with Consumer not serailising issue correctly

There was an issue where the .....

---

# Basic Consumer test

Start the consumer first to be listening for a producer message.

In [None]:
# # Kafka configuration
# conf = {'bootstrap.servers': 'localhost:9092', # Local kafka server address (local machine)
#        'group.id':'jupyter-consumer-group',
#        'auto.offset.reset': 'earliest'} # Start from the beginning of the topic. 

# # Create a consumer instance
# consumer = Consumer(conf)

# # Subscribe to the topic
# consumer.subscribe(['network-data-events'])

# # Use a try, except, finally to allow kafka consumer to shut down gracefully.
# # Try, continuously polls for new messages. Start the porducer to get a messages sent to the consumer
# # except, lets you handle errors, 
# # Finally, lets you execte code. Lets the cosumer code be closed.

# try:
#     # Code block to poll messages from the producer
#     while True:
#         msg = consumer.poll(timeout=1.0)
#         if msg is None: continue
#         if msg.error():
#             if msg.error().code() == KafkaError._PARTITION_EOF:
#                 # End of partition event
#                 print(f'{msg.topic()}[{msg.partition()}] reached end at offset {msg.offset()}')
#             elif msg.error():
#                 raise KafkaException(msg.error())
#         else:
#             print(f'Recieved message: {msg.value().decode("utf-8")}')
# except KeybaordInterrupt:
#     pass
# finally:
#     # Closes the connection
#     consumer.close()

---

# Real Time Anomaly Detection Consumer

This consumer will take in the two topics from the Producer that have the features and the labels that have been separated out into two topics. Every 5th batch the second topic will be used to test the results to get an idea how the model is performing.

This is done as a first test and to reduce computer power.

# Run before running Consumer

### SQL connection

In [None]:
# # Function to connect to MYSQL connection
# def mysql_connection():
#     return pymysql.connect(host='localhost',
#                             user='root',
#                             password='root',
#                             db='mtu_capstone_db')            

### Load schema for features and labels. Created in the Producer

##### Attempts to load schema from file. Can cause issues if not loaded correctly.

In [2]:
# ###############################
# # Attempt 4: Use Fastavro to read schema.
# ###############################
def load_avro_schema_with_fastavro(schema_file_path):
    schema = fastavro.schema.load_schema(schema_file_path)
    return schema

# folder location and file names for the schema files
folder_path = r'C:\Users\Kolobane\OneDrive\CIT MSc Data Science Modules\_Semester Three - Final Project\Project Two - Network Project\Data\Avro Schema'
features_avro_schema_file = "features_avro_schema.avsc" 
label_avro_schema_file = "label_avro_schema.avsc"

# Call each scehma
features_avro_schema = load_avro_schema_with_fastavro(os.path.join(folder_path ,features_avro_schema_file)) 
label_avro_schema = load_avro_schema_with_fastavro(os.path.join(folder_path, label_avro_schema_file))

In [None]:
# # features_avro_schema
# label_avro_schema 

In [None]:
# features_avro_schema 

### Save Model

Models are too large to save, they will be 600mb plus times 50. I don't have the resources for that.

In [None]:
# def save_model(model, filename):
#     model_path = os.path.join('C:\\Users\\Kolobane\\OneDrive\\CIT MSc Data Science Modules\\_Semester Three - Final Project\\Project Two - Network Project\\ML Models\\Random Forest Real Time Models', filename)
#     with open(filename, 'wb') as file:
#         pickle.dump(model, file)

### Function to deserialise avro record sent from the Producer.

##### Attempt 1. Didnt work. Match error

In [None]:
# # Take the avro objects from the producer and with the schema, remove the data for use.
# def deserialise_avro_record(avro_bytes, avro_schema):
#     # Create a datum reader using the schema avro
#     reader = DatumReader(avro_schema)
    
#     # Create a a Binary Decoder with avro bytes
#     decoder = BinaryDecoder(io.BytesIO(avro_bytes))
    
#     return reader.read(decoder)

##### Attempt 2: Create two functions, by listing out the featues in the DatumReader did not work. Deleted code 

###### Attempt 3: Getting a "Received message from topic: batch-network-data, key: b'0'. Error: unhashable type: 'RecordSchema'" error

In [None]:
# def deserialise_features_avro_record(avro_bytes, features_avro_schema):
#     # Create a by bytesIO steam from the avro bytes
#     bytes_reader = io.BytesIO(avro_bytes)
    
#     # deserailise the data using the schema
#     features_data = schemaless_reader(bytes_reader, features_avro_schema)
    
#     return features_data

# def deserialise_label_avro_record(avro_bytes, label_avro_schema):
#     # Create a by bytesIO steam from the avro bytes
#     bytes_reader = io.BytesIO(avro_bytes)
    
#     # deserailise the data using the schema
#     label_data = schemaless_reader(bytes_reader, label_avro_schema)
    
#     return label_data

##### Attempt 4: Add in debugging

In [3]:
def deserialise_features_avro_record(avro_bytes, schema):
    bytes_reader = io.BytesIO(avro_bytes)
    deserialised_data = []
    for record in fastavro.reader(bytes_reader, reader_schema=schema):
        deserialised_data.append(record)
    return deserialised_data

def deserialise_label_avro_record(avro_bytes, schema):
    bytes_reader = io.BytesIO(avro_bytes)
    deserialised_data = []
    for record in fastavro.reader(bytes_reader, reader_schema=schema):
        deserialised_data.append(record)
    return deserialised_data

## Run Consumer 1: Random Forest Basic Model

Set up consumer to listen for the batches sent from the producer. Loads in baseline model, tests the batches, outputs results to database.

In [None]:
# ############################################################
# # Set up the Kafka configuration
# ############################################################
# # Kafka configuration
# conf = {'bootstrap.servers': 'localhost:9092', # Local kafka server address (local machine)
#        'group.id':'jupyter-consumer-group',
#        'auto.offset.reset': 'earliest'} # Start from the beginning of the topic. 

# # Create Kafka Consumer instance
# consumer = Consumer(conf)

# # Subscribe to the topics
# consumer.subscribe(["batch-network-data", "test-batch-labels"]) # consumer needs to listen to both topics.

# ############################################################
# # Load the baseline model: Random Forest
# ############################################################
# # # Model Location
# # model_path = r"C:\Users\Kolobane\OneDrive\CIT MSc Data Science Modules\_Semester Three - Final Project\Project Two - Network Project\ML Models\rf_model_baseline_basic.joblib"

# # # Load the previous pre-trained baseline model.
# # model = joblib.load(model_path)

# ############################################################
# # Random Forest model parameters
# ############################################################
# # # For each model I will use the same parameters as I used in the previous models. For RF basic model, thats n_estimators=100
# # new_model_hyperparameters = {
# #     'n_estimators': 100
# # }

# ############################################################
# # Initialize dictionaries for storing batch data
# ############################################################
# batch_features = {}
# batch_labels = {}

# ############################################################
# # Start Consumer polling
# ############################################################
# try: 
#     while True:
#         msg = consumer.poll(1.0) # poll the first topic
        
#         if msg is None:
#             continue
#         if msg.error():
#             print(f"Consumer error: {msg.error()}")
#             continue   
    
#         ############################################################
#         # Check Topics - this is kept seperate for the next models.
#         ############################################################
    
#         #######################
#         # Check if topics are from "batch-network-data"
#         if msg.topic() == "batch-network-data":
#             batch_number = int(msg.key()) # get the batch number
#             avro_bytes = msg.value()
            
#             # Deserilise the avro data with the label_avro_schema loaded above
#             avro_data = deserialize_avro_record(avro_bytes, features_avro_schema )
#             batch_features[batch_number] = avro_data
            
    
#         #######################
#         # Check if topics are from "test-batch-labels"
#         if msg.topic() == "test-batch-labels":
#             batch_number = int(msg.key())
#             avro_bytes = msg.value()
            
#             # Deserilise the avro data with the features_avro_schema loaded above
#             avro_data = deserialize_avro_record(avro_bytes,label_avro_schema)
#             batch_features[batch_number] = avro_data
            

#         ############################################################
#         # Predict Using Baseline Models
#         ############################################################
    
# #         #######################
# #         # Use baseline model to make predictions with current batch
# #         if batch_number in batch_features and batch_number in batch_labels:
# #             batch_features_data = batch_features[batch_number]
# #             batch_labels_data = batch_labels[batch_number]
            
# #             # Make predictions using the baseline model
# #             baseline_predictions = model.predict(preprocessed_data)
        
# #         else:
# #             print("Batch features or labels are missing for batch number:", batch_number)
        
#         ############################################################
#         # Combine Features and Labels
#         ############################################################
        
# #         #######################
# #         # Combine Features and the labels.
# #         if batch_features_data and batch_labels_data:
# #             combined_data = {
# #                 "features": batch_features_data,
# #                 "labels": batch_labels_data
# #             }
# #         else:
# #             print("Batch features or labels are missing for batch number:", batch_number)
        
#         ############################################################
#         # Evaluate Predictions
#         ############################################################
        
# #         #######################
# #         ## Compare the baseline model with the actual labels.
# #         if batch_number in baseline_predictios and batch_number in batch_labels:
# #             baseline_predictions = baseline_predictions_dict[batch_number]
# #             actual_labels = batch_labels[batch_number]
            
# #             if baseline_predictions and actual_labels:
                
# #                 # Metrics
# #                 accuracy_value = accuracy_score(actual_labels, baseline_predictions)
# #                 precision_value = precision_score(actual_labels, baseline_predictions)
# #                 recall_value = recall_score(actual_labels, baseline_predictions)
# #                 f1_score_value = f1_score(actual_labels, baseline_predictions)
# #                 auc_score = roc_auc_score(actual_labels, baseline_predictions)
                
# #                 # Confusion matrix
# #                 confusion_matrix_result = confusion_matrix(actual_labels, baseline_predictions)
# #                 confusion_matrix = json.dumps(confusion_matrix, matrix_result.tolist())
                
# #                 # feature Importance
# #                 feature_importance_results = model.feature_importances_
# #                 feature_importance = json.dumps(feature_importance_results)           
                
# #             else:
# #                 print("Batch data is empty for batch_number:", batch_number)
        
# #         else:
# #             print("Batch predictions or labels missing in batch_number:", batch_number)
        
        
# #         ############################################################
# #         # Retrain the baseline model to get the next baseline model.
# #         ############################################################
        
# #         #######################
# #         ## retain model to get new baseline model
# #         retrain_start_time = time.time()
        
# #         # Create a new baseline model with the same hyperparameters
# #         new_baseline_model = RandomForestClassifier(**new_model_hyperparameters)
        
# #         # Retrain the baseline model wiht the batch data
# #         new_baseline_model.fit(batch_features_data, batch_labels_data)
        
# #         # Stop the timer
# #         retrain_end_time =  time.time()
        
# #         #######################
# #         # save date
# #         model_training_time_seconds = retrain_end_time -retrain_start_time
        
# #         # save the new model parameters # although I have decided to use the same.
# #         new_model_parameters_results = new_baseline_model.get_params()
# #         new_model_parameters_results_tolist = new_model_parameters_results.tolist()
# #         new_model_parameters = json.dumps(new_model_parameters_results_tolist)
        
# #         ############################################################
# #         # Send values to database
# #         ############################################################
        
# #         #######################
# #         ## connect to the database
# #         conn = mysql_connection()        
# #         cursor = conn.cursor()
        
# #         #######################
# #         ## Create SQL Query
# #         insert_query = """
# #             INSERT INTO rf_basic_rt_model_results(
# #                 batch_number, model_name, timestamp, accuracy_value,
# #                 precision_value, recall_value, f1_value, auc_score,
# #                 confusion_matrix, feature_importance, testing_time_seconds,
# #                 model_training_time_seconds, model_parameters
# #             )
# #             VALUES (%s, %s, NOW(), %s, %s, %s, %s, %s, %s, %s, %s, %s, %s ) """

# #         # Values to be insert#
# #         values =  (
# #             batch_number, model_name, accuracy_value,
# #             precision_value, recall_value, f1_value, auc_score,
# #             confusion_matrix_json, feature_importance_json, testing_time_seconds,
# #             retraining_time_seconds, new_model_parameters_json
# #         )

# #         # Execute the insert query that is defind above
# #         cursor.execute(insert_query, values)
        
# #         # Commit the changes to the database
# #         conn.commit()
        
# #         print(f"Batch {batch_number} values inserted successfully into the database.")
       
              
# ############################################################
# # Error handling and logging
# ############################################################             
# except Exception as e:
#     print(f"Error: {e}")
#     if "conn" in locals() or "conn" in globals():    
#         conn.rollback()
    
# ############################################################
# # Clean up and close
# ############################################################        
# finally:
#     # Good practices to reduce issues by cleaning up and closing connections
              
#     if "conn" in locals() or "conn" in globals():    
#         conn.rollback()
#         conn.close()
        
#     if "cursor" in locals() or "conn" in globals():   
#         cursor.close()
    
#     consumer.close()

### Shorter version of consumer

In [None]:
############################################################
# Set up the Kafka configuration
############################################################
# Kafka configuration
conf = {'bootstrap.servers': 'localhost:9092', # Local kafka server address (local machine)
       'group.id':'jupyter-consumer-group',
       'auto.offset.reset': 'earliest'} # Start from the beginning of the topic. 

# Create Kafka Consumer instance
consumer = Consumer(conf)

# Subscribe to the topics
consumer.subscribe(["batch-network-data", "test-batch-labels"]) # consumer needs to listen to both topics.


############################################################
# Initialize dictionaries for storing batch data
############################################################
batch_features = {}
batch_label = {}

############################################################
# Start Consumer polling
############################################################
try: 
    while True:
        msg = consumer.poll(1.0) # poll the first topic
        
        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue   
    
        # Debugging: Print the topic and keys
        print(f"Received message from topic: {msg.topic()}, key: {msg.key()}")
        
        ############################################################
        # Check Topics - this is kept seperate for the next models.
        ############################################################
    
        #######################
        # Check if topics are from "batch-network-data"
        if msg.topic() == "batch-network-data":
            batch_number = int(msg.key()) # get the batch number
            avro_bytes = msg.value()
            
            # Deserilise the avro data with the label_avro_schema loaded above
            avro_data = deserialise_features_avro_record(avro_bytes, features_avro_schema )
            batch_features[batch_number] = avro_data
            
            # Debugging: print part of producer data
            print(f"Deserialized data for FEATURES: {avro_data}")  
            
        #######################
        # Check if topics are from "test-batch-labels"
        if msg.topic() == "test-batch-labels":
            batch_number = int(msg.key())
            avro_bytes = msg.value()
            
            # Deserilise the avro data with the features_avro_schema loaded above
            avro_data = deserialise_label_avro_record(avro_bytes, label_avro_schema)
            batch_label[batch_number] = avro_data

            # Debugging: print part of producer data
            print(f"Deserialized data for LABELS: {avro_data}")
        

############################################################
# Error handling and logging
############################################################             
# except Exception as e:
#     print(f"Error: {e}")
#     if "conn" in locals() or "conn" in globals():    
#         conn.rollback()
    
except Exception as e:
    print(f"Error: {e}")   
    
############################################################
# Clean up and close
# ############################################################        
# finally:
#     # Good practices to reduce issues by cleaning up and closing connections
              
#     if "conn" in locals() or "conn" in globals():    
#         conn.rollback()
#         conn.close()
        
#     if "cursor" in locals() or "conn" in globals():   
#         cursor.close()
    
#     consumer.close()

finally:
    consumer.close()

Received message from topic: batch-network-data, key: b'0'


IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)



---

---

---

--- 

# Total Code

In [None]:
# ###############################
# Consumer code
# ###############################

############################################################
def load_avro_schema_with_fastavro(schema_file_path):
    schema = fastavro.schema.load_schema(schema_file_path)
    return schema

# folder location and file names for the schema files
folder_path = r'C:\Users\Kolobane\OneDrive\CIT MSc Data Science Modules\_Semester Three - Final Project\Project Two - Network Project\Data\Avro Schema'
features_avro_schema_file = "features_avro_schema.avsc" 
label_avro_schema_file = "label_avro_schema.avsc"

# Call each scehma
features_avro_schema = load_avro_schema_with_fastavro(os.path.join(folder_path ,features_avro_schema_file)) 
label_avro_schema = load_avro_schema_with_fastavro(os.path.join(folder_path, label_avro_schema_file))

############################################################
def deserialise_features_avro_record(avro_bytes, schema):
    bytes_reader = io.BytesIO(avro_bytes)
    deserialised_data = []
    for record in fastavro.reader(bytes_reader, reader_schema=schema):
        deserialised_data.append(record)
    return deserialised_data

def deserialise_label_avro_record(avro_bytes, schema):
    bytes_reader = io.BytesIO(avro_bytes)
    deserialised_data = []
    for record in fastavro.reader(bytes_reader, reader_schema=schema):
        deserialised_data.append(record)
    return deserialised_data

############################################################
    
############################################################
# Set up the Kafka configuration
############################################################
# Kafka configuration
conf = {'bootstrap.servers': 'localhost:9092', # Local kafka server address (local machine)
       'group.id':'jupyter-consumer-group',
       'auto.offset.reset': 'earliest'} # Start from the beginning of the topic. 

# Create Kafka Consumer instance
consumer = Consumer(conf)

# Subscribe to the topics
consumer.subscribe(["batch-network-data", "test-batch-labels"]) # consumer needs to listen to both topics.


############################################################
# Initialize dictionaries for storing batch data
############################################################
batch_features = {}
batch_label = {}

############################################################
# Start Consumer polling
############################################################
try: 
    while True:
        msg = consumer.poll(1.0) # poll the first topic
        
        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue   
    
        # Debugging: Print the topic and keys
        print(f"Received message from topic: {msg.topic()}, key: {msg.key()}")
        
        ############################################################
        # Check Topics - this is kept seperate for the next models.
        ############################################################
    
        #######################
        # Check if topics are from "batch-network-data"
        if msg.topic() == "batch-network-data":
            batch_number = int(msg.key()) # get the batch number
            avro_bytes = msg.value()
            
            # Deserilise the avro data with the label_avro_schema loaded above
            avro_data = deserialise_features_avro_record(avro_bytes, features_avro_schema )
            batch_features[batch_number] = avro_data
            
            # Debugging: print part of producer data
            print(f"Deserialized data for FEATURES: {avro_data}")  
            
        #######################
        # Check if topics are from "test-batch-labels"
        if msg.topic() == "test-batch-labels":
            batch_number = int(msg.key())
            avro_bytes = msg.value()
            
            # Deserilise the avro data with the features_avro_schema loaded above
            avro_data = deserialise_label_avro_record(avro_bytes, label_avro_schema)
            batch_label[batch_number] = avro_data

            # Debugging: print part of producer data
            print(f"Deserialized data for LABELS: {avro_data}")
        

############################################################
# Error handling and logging
############################################################             
# except Exception as e:
#     print(f"Error: {e}")
#     if "conn" in locals() or "conn" in globals():    
#         conn.rollback()
    
except Exception as e:
    print(f"Error: {e}")   
    
############################################################
# Clean up and close
# ############################################################        
# finally:
#     # Good practices to reduce issues by cleaning up and closing connections
              
#     if "conn" in locals() or "conn" in globals():    
#         conn.rollback()
#         conn.close()
        
#     if "cursor" in locals() or "conn" in globals():   
#         cursor.close()
    
#     consumer.close()

finally:
    consumer.close()

# Explore other things