In [1]:
import json
import os

import joblib
import pandas as pd
import pika
from tensorflow.keras.models import load_model
import tensorflow as tf


In [2]:
# RABBIT MQ
RABBIT_MQ_URL = os.getenv('RABBITMQ_URL', 'amqp://guest:guest@localhost:5672/%2f')
RABBIT_MQ_EXCHANGE = os.getenv('RABBITMQ_EXCHANGE', 'mlfingerprint')
RABBIT_MQ_QUEUE = os.getenv('RABBITMQ_QUEUE', 'packets_queue')
RABBIT_MQ_ROUTING_KEY = os.getenv('RABBITMQ_ROUTING_KEY', 'packets')
RABBIT_MQ_RESULT_QUEUE = os.getenv('RABBITMQ_RESULT_QUEUE', 'result_queue')
RABBIT_MQ_RESULT_ROUTING_KEY = os.getenv('RABBITMQ_RESULT_ROUTING_KEY', 'results')

# Model
MODEL = 'dense_mlp_model.keras'
SCALER = 'scaler.save'
LABEL_ENCODER = 'label_encoder.joblib'
tf.keras.utils.disable_interactive_logging()

In [3]:
# Connect to RabbitMQ
PARAMS = pika.URLParameters(RABBIT_MQ_URL)
connection = pika.BlockingConnection(PARAMS)
channel = connection.channel()
print('RabbitMQ Connection successfully')

RabbitMQ Connection successfully


In [4]:
# Load model, scaler, and label encoder
model = load_model(MODEL)
scaler = joblib.load(SCALER)
le = joblib.load(LABEL_ENCODER)

In [5]:
def predict_packet(packet, model, scaler, label_encoder):
    # packet should be a dictionary with keys: 'packetId', 'length', 'direction', 'iat', 'entropy', 'clientProtocolVersion'
    features = ['packetId', 'length', 'direction', 'iat', 'entropy', 'clientProtocolVersion']
    X = pd.DataFrame([packet], columns=features)
    X['direction'] = X['direction'].map({'INCOMING': 0, 'OUTGOING': 1})
    X_scaled = scaler.transform(X.values)
    y_proba = model.predict(X_scaled, verbose=0)
    predicted_label = label_encoder.inverse_transform([y_proba.argmax()])
    return predicted_label[0], y_proba[0]

In [6]:
def consume_packets(ch, method, properties, body):
    packet = json.loads(body)
    label, proba = predict_packet(packet, model, scaler, le)
    result = {
        'clientId': packet.get('clientId'),
        'client': label,
        'percentage': float(max(proba)) * 100  # Convert to percentage
    }
    channel.basic_publish(
        exchange=RABBIT_MQ_EXCHANGE,
        routing_key=RABBIT_MQ_RESULT_ROUTING_KEY,  # Change to your result queue
        body=json.dumps(result)
    )
    # print(f"Processed packet: {result}")

In [None]:
channel.basic_consume(
    queue=RABBIT_MQ_QUEUE,
    on_message_callback=consume_packets,
    auto_ack=True,
)
print(f"[*] Listening on queue '{RABBIT_MQ_QUEUE}' for incoming packets")

channel.start_consuming()

[*] Listening on queue 'packets_queue' for incoming packets
