<a href="https://colab.research.google.com/github/MichalisSant/Mike_Sant/blob/main/IDS_to_IPS.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# ===============================
# IDS -> IPS με γραφήματα (Colab)
# ===============================

# Εγκατάσταση βιβλιοθηκών
!pip install tensorflow scikit-learn pandas matplotlib

# Εισαγωγή βιβλιοθηκών
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
from sklearn.metrics import classification_report, accuracy_score
import matplotlib.pyplot as plt



In [None]:
# -----------------------------------
# Συνάρτηση φόρτωσης dataset
# -----------------------------------
def load_dataset(csv_file_path):
    df = pd.read_csv(csv_file_path)
    # Υποθέτουμε ότι η τελευταία στήλη είναι η ετικέτα (label)
    X = df.iloc[:, :-1].values
    y = df.iloc[:, -1].values
    return X, y



In [None]:
# -----------------------------------
# Δημιουργία IDS μοντέλου (νευρωνικό δίκτυο)
# -----------------------------------
def create_and_train_model(input_dim):
    model = Sequential()
    model.add(Dense(256, input_dim=input_dim, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(128, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(64, activation='relu'))
    model.add(Dense(3, activation='softmax'))  # 3 κατηγορίες: κανονικό, ύποπτο, κακόβουλο
    model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    return model



In [None]:
# -----------------------------------
# IPS - Προληπτικές Ενέργειες
# -----------------------------------
def block_ip(ip_address):
    print(f"[IPS] Μπλοκάρισμα IP: {ip_address}")

def isolate_device(device_id):
    print(f"[IPS] Απομόνωση συσκευής με ID: {device_id}")

def alert_admin(message):
    print(f"[IPS] Ειδοποίηση Διαχειριστή: {message}")



In [None]:
# -----------------------------------
# Λογική IPS (αποφάσεις)
# -----------------------------------
def ips_response(y_pred, metadata=None):
    for i, prediction in enumerate(y_pred):
        if prediction == 1:  # ύποπτη κίνηση
            alert_admin("Εντοπίστηκε ύποπτη δραστηριότητα")
        elif prediction == 2:  # κακόβουλη κίνηση
            if metadata and "ip" in metadata[i]:
                block_ip(metadata[i]["ip"])
            if metadata and "device_id" in metadata[i]:
                isolate_device(metadata[i]["device_id"])
            alert_admin("Εντοπίστηκε κακόβουλη εισβολή – λήψη μέτρων!")



In [None]:
# -----------------------------------
# Προσομοίωση IPS με καταγραφή accuracy
# -----------------------------------
def simulate_streaming_with_ips(csv_file_path, model, scaler, batch_size=500, total_batches=2000, eval_every=200):
    batch_counter = 0
    train_accs, test_accs = [], []
    checkpoints = []

    # Φόρτωση δεδομένων για αξιολόγηση
    X_all, y_all = load_dataset(csv_file_path)
    X_all = scaler.transform(X_all)
    X_train, X_test, y_train, y_test = train_test_split(X_all, y_all, test_size=0.2, random_state=42)

    while batch_counter < total_batches:
        # Δημιουργία μεταδεδομένων (IP + device_id)
        metadata = [{"ip": f"192.168.1.{i%255}", "device_id": f"dev{i%100}"} for i in range(len(X_all))]

        # Διαίρεση σε batches
        for i in range(0, len(X_all), batch_size):
            batch_counter += 1
            X_batch = X_all[i:i + batch_size]
            y_batch = y_all[i:i + batch_size]
            meta_batch = metadata[i:i + batch_size]

            # Εκπαίδευση IDS με το batch
            model.train_on_batch(X_batch, y_batch)

            # Πρόβλεψη και αντίδραση IPS
            y_pred_probs = model.predict(X_batch, verbose=0)
            y_pred = np.argmax(y_pred_probs, axis=1)
            ips_response(y_pred, metadata=meta_batch)

            # Περιοδική αξιολόγηση
            if batch_counter % eval_every == 0:
                y_train_pred = np.argmax(model.predict(X_train, verbose=0), axis=1)
                y_test_pred = np.argmax(model.predict(X_test, verbose=0), axis=1)

                train_acc = accuracy_score(y_train, y_train_pred)
                test_acc = accuracy_score(y_test, y_test_pred)

                train_accs.append(train_acc)
                test_accs.append(test_acc)
                checkpoints.append(batch_counter)

                print(f"[ΑΞΙΟΛΟΓΗΣΗ] Batch {batch_counter} -> Train Acc: {train_acc:.4f}, Test Acc: {test_acc:.4f}")

            if batch_counter >= total_batches:
                break
# Γράφημα accuracy
    plt.figure(figsize=(8,5))
    plt.plot(checkpoints, train_accs, label="Train Accuracy")
    plt.plot(checkpoints, test_accs, label="Test Accuracy")
    plt.xlabel("Batch number")
    plt.ylabel("Accuracy")
    plt.title("IPS - Εξέλιξη Ακρίβειας με το χρόνο")
    plt.legend()
    plt.show()



In [None]:
# -----------------------------------
# Εκκίνηση παραδείγματος
# -----------------------------------
csv_file_path = 'https://raw.githubusercontent.com/kdemertzis/EKPA/main/Data/pcap_data.csv'

# Φόρτωση και τυποποίηση δεδομένων
X, y = load_dataset(csv_file_path)
scaler = StandardScaler()
X = scaler.fit_transform(X)

# Χωρισμός σε training/test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Δημιουργία και αρχική εκπαίδευση IDS
model = create_and_train_model(X_train.shape[1])
model.fit(X_train, y_train, epochs=5, batch_size=64, validation_split=0.2)

# Αξιολόγηση IDS πριν τη χρήση IPS
y_pred = np.argmax(model.predict(X_test), axis=1)
print("[Αρχική IDS Αξιολόγηση]")
print(classification_report(y_test, y_pred))

# Εκκίνηση IPS προσομοίωσης με γραφήματα
simulate_streaming_with_ips(csv_file_path, model, scaler)
