In [None]:
!pip install xgboost scikit-learn joblib


In [None]:
import numpy as np
import pandas as pd
import random
from datetime import datetime, timedelta

from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

from sklearn.ensemble import IsolationForest
from xgboost import XGBClassifier

import joblib


In [None]:
# Categorical vocabularies used by the synthetic record generator.

# Candidate values for the 'sector' column (picked with random.choice).
SECTORS = ['Healthcare', 'Agriculture', 'Urban']
# Candidate values for the 'protocol' column (picked with random.choice).
PROTOCOLS = ['MQTT', 'HTTP', 'TCP', 'UDP', 'DICOM', 'Modbus']
# Traffic classes for the 'attack_type' column. Order matters: the generator
# passes this list to random.choices together with a positional weights list.
ATTACK_TYPES = ['Normal', 'DDoS', 'Ransomware', 'MITM', 'Injection', 'Spoofing']
# Candidate values for the 'operation_type' column (picked with random.choice).
OPERATIONS = ['Read', 'Write', 'Update', 'Delete']


In [None]:
def random_ip():
    """Return a random dotted-quad address string with every octet in 1..254."""
    octets = [random.randint(1, 254) for _ in range(4)]
    return ".".join(map(str, octets))

def random_timestamp():
    """Return a random datetime within the 30 days preceding the call.

    The offset is drawn at whole-second resolution; the window is anchored to
    datetime.now() at call time.
    """
    window_seconds = 30 * 24 * 60 * 60
    offset = timedelta(seconds=random.randint(0, window_seconds))
    window_start = datetime.now() - timedelta(days=30)
    return window_start + offset


In [None]:
# Sampling profile per traffic class, in the order the features are returned:
# (packet range, latency range, cpu range, memory range, battery range,
#  integrity flag, anomaly flag). Ranges are inclusive (low, high) bounds.
_ATTACK_PROFILES = {
    'Normal':     ((200, 800),   (10, 50),    (10, 40),  (20, 50),  (60, 100), 1, 0),
    'DDoS':       ((1500, 5000), (200, 1000), (60, 90),  (60, 90),  (10, 40),  1, 1),
    'Ransomware': ((500, 1500),  (100, 400),  (80, 100), (80, 100), (5, 30),   0, 1),
    'MITM':       ((300, 1200),  (150, 600),  (30, 60),  (30, 60),  (30, 60),  0, 1),
    'Injection':  ((700, 2000),  (80, 300),   (40, 70),  (40, 70),  (30, 60),  0, 1),
    'Spoofing':   ((300, 1000),  (60, 200),   (30, 60),  (30, 60),  (40, 70),  0, 1),
}


def generate_attack_features(attack):
    """Sample one synthetic feature tuple for the given traffic class.

    Parameters
    ----------
    attack : str
        One of 'Normal', 'DDoS', 'Ransomware', 'MITM', 'Injection', 'Spoofing'.

    Returns
    -------
    tuple
        (packet, latency, cpu, mem, battery, integrity, anomaly) where packet
        is an int drawn with random.randint, the next four are floats drawn
        with random.uniform, and the last two are fixed 0/1 flags of the class.

    Raises
    ------
    ValueError
        If `attack` is not a known class. Previously the function fell through
        and returned None, which only surfaced later as an unpacking TypeError.
    """
    try:
        (packet_rng, latency_rng, cpu_rng, mem_rng, battery_rng,
         integrity, anomaly) = _ATTACK_PROFILES[attack]
    except KeyError:
        raise ValueError(
            f"Unknown attack type {attack!r}; expected one of {sorted(_ATTACK_PROFILES)}"
        ) from None

    # Tuple elements are evaluated left to right, so the RNG is consumed in the
    # same order as before: packet, latency, cpu, memory, battery.
    return (random.randint(*packet_rng), random.uniform(*latency_rng),
            random.uniform(*cpu_rng), random.uniform(*mem_rng),
            random.uniform(*battery_rng), integrity, anomaly)


In [None]:
# Reproducibility: seed the stdlib RNG so the synthetic dataset (and therefore
# every downstream metric) is identical on each Restart & Run All.
# The 'timestamp' column still depends on datetime.now() via random_timestamp().
SEED = 42
N_SAMPLES = 50000
# Sampling probability of each class, positionally aligned with ATTACK_TYPES.
ATTACK_WEIGHTS = [0.5, 0.15, 0.1, 0.1, 0.1, 0.05]

random.seed(SEED)

data = []

for _ in range(N_SAMPLES):
    attack = random.choices(ATTACK_TYPES, weights=ATTACK_WEIGHTS)[0]

    # Class-dependent features; everything else in the row is class-independent noise.
    packet, latency, cpu, mem, battery, integrity, anomaly = generate_attack_features(attack)

    data.append([
        random_timestamp(),
        f"device_{random.randint(1,500)}",
        random.choice(SECTORS),
        random.randint(1,100),
        random_ip(),
        random_ip(),
        random.choice(PROTOCOLS),
        packet,
        latency,
        random.choice([0,1]),
        cpu,
        mem,
        battery,
        random.uniform(20,80),
        random.choice(OPERATIONS),
        integrity,
        attack,
        anomaly
    ])

# Column names, in the same order as the values appended to each row above.
columns = [
    'timestamp','device_id','sector','location_id','ip_src','ip_dest',
    'protocol','packet_size','latency_ms','connection_status',
    'cpu_usage_percent','memory_usage_percent','battery_level',
    'temperature_c','operation_type','data_value_integrity',
    'attack_type','is_anomaly'
]

df = pd.DataFrame(data, columns=columns)
df.head()


In [None]:
# Persist the generated dataset to the working directory (row index omitted).
# This runs before the label-encoding cell, so the CSV keeps the original
# string categories rather than integer codes.
df.to_csv("iot_security_dataset.csv", index=False)
print("Dataset shape:", df.shape)


In [None]:
# Fit one LabelEncoder per categorical column and keep it, keyed by column
# name, so the same mapping can be reapplied to new data later.
categorical_cols = ['device_id','sector','protocol','operation_type','attack_type']
label_encoders = {name: LabelEncoder().fit(df[name]) for name in categorical_cols}

# NOTE(review): this overwrites the string columns of `df` with integer codes
# in place, so re-running the cell would re-fit the encoders on the codes.
for col, le in label_encoders.items():
    df[col] = le.transform(df[col])


In [None]:
# Targets and raw identifiers are excluded from the model inputs.
non_feature_cols = ['attack_type','is_anomaly','timestamp','ip_src','ip_dest']

X = df.drop(columns=non_feature_cols)
y_attack, y_anomaly = df['attack_type'], df['is_anomaly']

# NOTE(review): the scaler is fitted on every row, i.e. before the train/test
# split performed further down, so the validation rows influence the scaling.
scaler = StandardScaler().fit(X)
X_scaled = scaler.transform(X)


In [None]:
# `contamination` is the expected proportion of outliers in the training data.
# Derive it from the is_anomaly labels instead of hard-coding 0.3: the
# generator marks every non-'Normal' row as anomalous, which is about half of
# the rows. scikit-learn requires the value to lie in (0, 0.5], hence the clip.
anomaly_rate = float(np.clip(y_anomaly.mean(), 1e-3, 0.5))

iso_forest = IsolationForest(
    n_estimators=200,
    contamination=anomaly_rate,
    random_state=42
)

iso_forest.fit(X_scaled)


In [None]:
# Hold out 20% of the rows for validation. The attack classes are sampled with
# unequal weights (0.5 down to 0.05), so stratify on the label to keep every
# class in the same proportion in both splits.
X_train, X_test, y_train, y_test = train_test_split(
    X_scaled, y_attack, test_size=0.2, random_state=42, stratify=y_attack
)

# Multi-class classifier over the integer-encoded attack_type labels.
xgb_model = XGBClassifier(
    n_estimators=250,
    max_depth=6,
    learning_rate=0.1,
    eval_metric='mlogloss'
)

xgb_model.fit(X_train, y_train)


In [None]:
y_pred = xgb_model.predict(X_test)

# Label the report rows with the original attack names instead of the integer
# codes assigned by the LabelEncoder; `labels` pins the row order to the codes.
attack_encoder = label_encoders['attack_type']
attack_codes = list(range(len(attack_encoder.classes_)))
attack_names = [str(name) for name in attack_encoder.classes_]

print("Accuracy:", accuracy_score(y_test, y_pred))
print("\nClassification Report:\n",
      classification_report(y_test, y_pred,
                            labels=attack_codes, target_names=attack_names))


In [None]:
# Serialise every fitted object needed at inference time: both models plus the
# preprocessing state (scaler and per-column label encoders).
artifacts = {
    "xgboost_attack_model.pkl": xgb_model,
    "isolation_forest_model.pkl": iso_forest,
    "scaler.pkl": scaler,
    "label_encoders.pkl": label_encoders,
}
for filename, fitted_obj in artifacts.items():
    joblib.dump(fitted_obj, filename)

print("All models saved successfully")


In [None]:
# Integer code -> attack name, taken from the fitted encoder.
# LabelEncoder assigns codes by sorted class name (DDoS=0, Injection=1, MITM=2,
# Normal=3, Ransomware=4, Spoofing=5), NOT by the order of ATTACK_TYPES, so a
# hand-written map in ATTACK_TYPES order would mislabel the predictions.
ATTACK_MAP = {
    int(code): str(name)
    for code, name in enumerate(label_encoders['attack_type'].classes_)
}


TESTING: evaluate the trained models on freshly generated samples from unseen devices and locations

In [None]:
# Dedicated seed so this hold-out sample is reproducible across runs.
# The 'timestamp' column still depends on datetime.now() via random_timestamp().
TEST_SEED = 4242
N_TEST_SAMPLES = 1000

random.seed(TEST_SEED)

test_data = []

for _ in range(N_TEST_SAMPLES):
    # Same class distribution as the training generator (aligned with ATTACK_TYPES).
    attack = random.choices(
        ATTACK_TYPES,
        weights=[0.5, 0.15, 0.1, 0.1, 0.1, 0.05]
    )[0]

    packet, latency, cpu, mem, battery, integrity, anomaly = generate_attack_features(attack)

    test_data.append([
        random_timestamp(),
        f"device_{random.randint(501,800)}",  # unseen devices
        random.choice(SECTORS),
        random.randint(101,150),              # unseen locations
        random_ip(),
        random_ip(),
        random.choice(PROTOCOLS),
        packet,
        latency,
        random.choice([0,1]),
        cpu,
        mem,
        battery,
        random.uniform(20,80),
        random.choice(OPERATIONS),
        integrity,
        attack,
        anomaly
    ])

# Reuse the training column layout so the frames line up column-for-column.
test_df = pd.DataFrame(test_data, columns=columns)
test_df.head()


In [None]:
def safe_label_transform(le, series):
    """Encode `series` with a fitted label encoder, mapping unseen labels to -1.

    Parameters
    ----------
    le : fitted encoder exposing `classes_`
        The position of a label in `le.classes_` is its integer code.
    series : pandas.Series
        Raw labels to encode.

    Returns
    -------
    pandas.Series
        int64 codes with the same index as `series`; any value not present in
        `le.classes_` (including missing values) becomes -1.
    """
    # One dict lookup per element instead of one `le.transform` call per element.
    code_by_label = {label: code for code, label in enumerate(le.classes_)}
    # Series.map leaves unmapped values as NaN; turn those into the -1 sentinel.
    return series.map(code_by_label).fillna(-1).astype('int64')


In [None]:
# Encode the categorical columns of the hold-out frame with the encoders fitted
# on the training data; labels the encoders have not seen become -1.
# NOTE: this overwrites test_df in place. Re-running the cell would feed the
# integer codes back through safe_label_transform, which would turn them into
# -1 because they are not in the encoders' string classes.
for col, le in label_encoders.items():
    test_df[col] = safe_label_transform(le, test_df[col])


In [None]:
# Drop the same target/identifier columns that were excluded when training.
excluded_cols = ['attack_type','is_anomaly','timestamp','ip_src','ip_dest']
X_test_final = test_df.drop(columns=excluded_cols)

y_test_attack, y_test_anomaly = test_df['attack_type'], test_df['is_anomaly']

# Apply the already-fitted scaler (transform only, no re-fitting).
X_test_scaled = scaler.transform(X_test_final)


In [None]:
# Predictions equal to -1 are recoded as 1 and everything else as 0, so the
# output lines up with the 0/1 is_anomaly flag.
iso_preds = (iso_forest.predict(X_test_scaled) == -1).astype(int)

print("Anomaly Detection Results:")
print(classification_report(y_test_anomaly, iso_preds))


In [None]:
attack_preds = xgb_model.predict(X_test_scaled)

# Label the report rows with the original attack names instead of the integer
# codes assigned by the LabelEncoder; `labels` pins the row order to the codes.
holdout_attack_encoder = label_encoders['attack_type']
holdout_attack_codes = list(range(len(holdout_attack_encoder.classes_)))
holdout_attack_names = [str(name) for name in holdout_attack_encoder.classes_]

print("Attack Classification Results:")
print("Accuracy:", accuracy_score(y_test_attack, attack_preds))
print("\nClassification Report:\n",
      classification_report(y_test_attack, attack_preds,
                            labels=holdout_attack_codes,
                            target_names=holdout_attack_names))


In [None]:
# Confusion matrix of true vs. predicted attack codes on the hold-out set.
# The first argument is the ground truth and the second the predictions; in
# scikit-learn's convention rows are true classes and columns are predictions.
print("Confusion Matrix:")
print(confusion_matrix(y_test_attack, attack_preds))


In [None]:
# One-shot recap of the hold-out evaluation: sample count and overall accuracy.
summary_lines = [
    "FINAL TEST SUMMARY",
    "------------------",
    f"Total Test Samples: {len(test_df)}",
    f"Attack Model Accuracy: {accuracy_score(y_test_attack, attack_preds):.4f}",
]
print("\n".join(summary_lines))
