In [9]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, LabelBinarizer
from sklearn.decomposition import PCA
import tensorflow as tf
def on_message(msg):
    """Decode one Kafka record and run the model once a full row has arrived.

    Records are expected to carry raw float32 bytes. A record holding the
    single value -1 is the end-of-row separator: if exactly 7 values were
    buffered since the previous separator they are reshaped to (1, 7) and
    passed to ``model.predict``; otherwise the partial row is discarded.
    Any other record is appended to the module-level ``received_data_buffer``,
    which must already exist as a list when this function is called.

    Args:
        msg: Consumed record exposing ``error()`` and ``value()``.
    """
    global received_data_buffer
    if msg.error():
        print(f"Consumer error: {msg.error()}")
        return

    payload = msg.value()
    # np.frombuffer raises on a None payload and on byte counts that are not a
    # whole number of float32 values; skip such records so that one stray
    # message on the topic cannot kill the consume loop.
    if payload is None or len(payload) % np.dtype(np.float32).itemsize != 0:
        print("Undecodable payload, skipping this message")
        return

    received_data = np.frombuffer(payload, dtype=np.float32)
    is_separator = received_data.shape[0] == 1 and received_data[0] == -1
    if not is_separator:
        received_data_buffer.extend(received_data)  # Accumulate received data
        return

    if len(received_data_buffer) == 7:
        X_received = np.array(received_data_buffer).reshape(1, -1)
        prediction = model.predict(X_received)
        print(f'Prediction: {prediction}')
    else:
        print("Unexpected data length, skipping this message")
    received_data_buffer = []  # Reset the buffer for the next row

# Poll the consumer indefinitely (1 s timeout per poll); records that arrive
# without an error are handed to on_message, errored ones are only reported.
while True:
    msg = consumer.poll(1.0)
    if msg is not None:
        if not msg.error():
            on_message(msg)
        else:
            print(f"Consumer error: {msg.error()}")

In [10]:
def prod_datapreprocess(csv_file, n_components=7):
    """Load a CSV and reduce its feature columns to PCA components.

    Steps: normalise the header names, drop rows containing missing or
    infinite values, remove the ``label`` column, standardise the remaining
    columns and project them onto ``n_components`` principal components.

    Args:
        csv_file: Path or file-like object accepted by ``pd.read_csv``.
        n_components: Number of principal components to keep (default 7).

    Returns:
        pd.DataFrame with one row per cleaned input row and columns
        ``"Principal component 1"`` .. ``"Principal component <n_components>"``.

    Raises:
        KeyError: If the normalised header contains no ``label`` column.
    """
    df = pd.read_csv(csv_file)

    # Normalise headers: trim, lower-case, spaces -> underscores, drop parentheses.
    # regex=False makes every replacement a literal one on all pandas versions;
    # '(' and ')' are regex metacharacters and should never be parsed as patterns.
    columns = df.columns.str.strip().str.lower()
    for old, new in ((' ', '_'), ('(', ''), (')', '')):
        columns = columns.str.replace(old, new, regex=False)
    df.columns = columns

    # Keep only rows that are free of NaN / +inf / -inf, then renumber them from 0.
    df_cleaned = df.dropna()
    has_bad_value = df_cleaned.isin([np.nan, np.inf, -np.inf]).any(axis=1)
    df_cleaned = df_cleaned[~has_bad_value].reset_index(drop=True)

    # Only the features are returned, so the label column is simply dropped;
    # it is not encoded, which also means multi-class labels are accepted.
    df_features = df_cleaned.drop(columns='label')

    df_scaled = StandardScaler().fit_transform(df_features)
    principal_components = PCA(n_components=n_components).fit_transform(df_scaled)

    principal_component_headings = [
        f"Principal component {i + 1}" for i in range(n_components)
    ]
    return pd.DataFrame(data=principal_components, columns=principal_component_headings)



In [11]:
# Build the PCA feature frame from the CICIDS2017 Friday-afternoon DDoS CSV.
# The path is relative to the notebook's working directory.
X = prod_datapreprocess('../../dataset/CICIDS2017/MachineLearningCSV/MachineLearningCVE/Friday-WorkingHours-Afternoon-DDos.pcap_ISCX.csv')

  df.columns = df.columns.str.strip().str.lower().str.replace(' ', '_').str.replace('(', '').str.replace(')', '')
  df.columns = df.columns.str.strip().str.lower().str.replace(' ', '_').str.replace('(', '').str.replace(')', '')


In [12]:
# Plain-text preview of the PCA features (pandas elides the middle rows of a large frame).
print(X)

        Principal component 1  Principal component 2  Principal component 3  \
0                   -2.341904              -2.243298               1.543957   
1                   -1.954587              -2.351691               1.405829   
2                   -1.967929              -2.368396               1.420612   
3                   -2.017654              -2.327118               1.410350   
4                   -2.341910              -2.243298               1.543953   
...                       ...                    ...                    ...   
225706              -1.938848              -2.401891               1.434239   
225707              -1.935508              -2.397758               1.430570   
225708              -1.934783              -2.396816               1.429744   
225709              -2.093903              -1.899863               1.271489   
225710              -2.054975              -2.251334               1.660558   

        Principal component 4  Principal component 

In [13]:
from confluent_kafka import Producer, Consumer, KafkaError
import time

# Load the pre-trained TensorFlow model.
# `load_model` is never imported in this notebook, so it is reached through the
# `tf` alias from the import cell; this keeps the cell runnable on a fresh kernel.
model = tf.keras.models.load_model('../../spectre-code/spectre-ann/Model/DDOS_2/A/spectre_ddos_2_h5.h5')

# Standardise the PCA features held in `X` (the frame returned by
# prod_datapreprocess in the earlier cell).
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

print(f'X_scaled shape: {X_scaled.shape}')


# Kafka client configuration; both clients talk to the local broker.
producer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'queue.buffering.max.messages': 100000,
    'max.in.flight.requests.per.connection': 1   # at most one in-flight request per connection
}

consumer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup',
    'session.timeout.ms': 6000,
    'auto.offset.reset': 'earliest',
    'queued.min.messages': 1  # minimum number of records kept in the local consumer queue
}

producer = Producer(producer_conf)
consumer = Consumer(consumer_conf)

X_scaled shape: (225711, 7)


%4|1685310185.354|TERMINATE|rdkafka#producer-5| [thrd:app]: Producer terminating with 372 messages (11160 bytes) still in queue or transit: use flush() to wait for outstanding message delivery


In [None]:
# Deliver anything still queued on the producer before sending new rows.
producer.flush()

# Start the Kafka consumer
consumer.subscribe(['detect_anomalies'])

# Values received since the last separator. Created before the (very long)
# send loop so on_message can use it even if that loop is interrupted.
received_data_buffer = []

# One-element float32 marker sent after every row to signal "row complete".
separator = np.array([-1], dtype=np.float32)

# Send the scaled data to the Kafka topic row by row
for line in X_scaled:
    print(f'Line shape before sending: {line.shape}')
    # on_message decodes every payload as float32, while StandardScaler yields
    # float64 for float64 input. Sending the raw float64 bytes would decode to
    # 14 values per row and fail the consumer's length check, so cast first.
    producer.produce('detect_anomalies', np.asarray(line, dtype=np.float32).tobytes())
    producer.produce('detect_anomalies', separator.tobytes())
    time.sleep(0.5)

# Flush the producer to ensure all messages are sent
producer.flush()

def on_message(msg):
    """Decode one Kafka record and run the model once a full row has arrived.

    Records are expected to carry raw float32 bytes. A record holding the
    single value -1 is the end-of-row separator: if exactly 7 values were
    buffered since the previous separator they are reshaped to (1, 7) and
    passed to ``model.predict``; otherwise the partial row is discarded.
    Any other record is appended to the module-level ``received_data_buffer``,
    which must already exist as a list when this function is called.

    Args:
        msg: Consumed record exposing ``error()`` and ``value()``.
    """
    global received_data_buffer
    if msg.error():
        print(f"Consumer error: {msg.error()}")
        return

    payload = msg.value()
    # np.frombuffer raises on a None payload and on byte counts that are not a
    # whole number of float32 values; skip such records so that one stray
    # message on the topic cannot kill the consume loop.
    if payload is None or len(payload) % np.dtype(np.float32).itemsize != 0:
        print("Undecodable payload, skipping this message")
        return

    received_data = np.frombuffer(payload, dtype=np.float32)
    is_separator = received_data.shape[0] == 1 and received_data[0] == -1
    if not is_separator:
        received_data_buffer.extend(received_data)  # Accumulate received data
        return

    if len(received_data_buffer) == 7:
        X_received = np.array(received_data_buffer).reshape(1, -1)
        prediction = model.predict(X_received)
        print(f'Prediction: {prediction}')
    else:
        print("Unexpected data length, skipping this message")
    received_data_buffer = []  # Reset the buffer for the next row

# Poll the consumer indefinitely (1 s timeout per poll); records that arrive
# without an error are handed to on_message, errored ones are only reported.
while True:
    msg = consumer.poll(1.0)
    if msg is not None:
        if not msg.error():
            on_message(msg)
        else:
            print(f"Consumer error: {msg.error()}")

In [14]:
import threading

def kafka_producer():
    """Publish every row of ``X_scaled`` to the ``detect_anomalies`` topic.

    Each row is sent as raw float32 bytes, followed by the module-level
    ``separator`` record, with a 0.5 s pause between rows. The producer is
    flushed once all rows have been queued.
    """
    for line in X_scaled:
        print(f'Line shape before sending: {line.shape}')
        # The consumer decodes payloads with dtype=np.float32, but the scaled
        # rows are float64; sending them uncast yields 14 decoded values per
        # row and every row is rejected by the length check. Cast explicitly.
        payload = np.asarray(line, dtype=np.float32).tobytes()
        producer.produce('detect_anomalies', payload)
        producer.produce('detect_anomalies', separator.tobytes())
        time.sleep(0.5)
    producer.flush()

def kafka_consumer():
    """Poll ``consumer`` forever and dispatch usable records to ``on_message``.

    Each poll waits up to one second. Empty polls are ignored, records that
    report an error are printed, and all other records go to ``on_message``.
    The loop has no exit condition.
    """
    while True:
        record = consumer.poll(1.0)
        if record is not None:
            if not record.error():
                on_message(record)
            else:
                print(f"Consumer error: {record.error()}")

# kafka_consumer never returns, so an unbounded join on its thread would block
# this cell forever, and a non-daemon thread would also keep the interpreter
# alive at shutdown. Run it as a daemon and wait on it only for a bounded time.
producer_thread = threading.Thread(target=kafka_producer)
consumer_thread = threading.Thread(target=kafka_consumer, daemon=True)

producer_thread.start()
consumer_thread.start()

# Block until every row has been sent, then give the consumer a short window
# to process the trailing records before the cell returns.
producer_thread.join()
consumer_thread.join(timeout=10.0)

Line shape before sending: (7,)
Line shape before sending: (7,)
Line shape before sending: (7,)
Line shape before sending: (7,)
Line shape before sending: (7,)
Line shape before sending: (7,)
Line shape before sending: (7,)
Line shape before sending: (7,)
Line shape before sending: (7,)
Line shape before sending: (7,)
Line shape before sending: (7,)
Line shape before sending: (7,)
Line shape before sending: (7,)
Line shape before sending: (7,)
Line shape before sending: (7,)
Line shape before sending: (7,)
Line shape before sending: (7,)
Line shape before sending: (7,)
Line shape before sending: (7,)
Line shape before sending: (7,)
Line shape before sending: (7,)
Line shape before sending: (7,)
Line shape before sending: (7,)
Line shape before sending: (7,)
Line shape before sending: (7,)
Line shape before sending: (7,)
Line shape before sending: (7,)
Line shape before sending: (7,)
Line shape before sending: (7,)
Line shape before sending: (7,)
Line shape before sending: (7,)
Line sha

KeyboardInterrupt: 

Line shape before sending: (7,)