In [43]:
import pandas as pd
import math

Menghitung jarak euclidean antara dua titik

In [44]:
def euclidean_distance(point1, point2):
    """Compute the Euclidean distance between two points.

    Args:
        point1: Sequence of numeric coordinates.
        point2: Sequence of numeric coordinates with the same length as
            ``point1``.

    Returns:
        The Euclidean distance as a float (``0.0`` for two empty points).

    Raises:
        ValueError: If the two points do not have the same number of
            coordinates.
    """
    # Comparing points of different dimensionality is meaningless; fail loudly
    # instead of silently ignoring the extra coordinates of the longer point.
    if len(point1) != len(point2):
        raise ValueError(
            f"Points must have the same number of dimensions "
            f"(got {len(point1)} and {len(point2)})"
        )

    squared_sum = sum((a - b) ** 2 for a, b in zip(point1, point2))
    return math.sqrt(squared_sum)

Mencari k tetangga terdekat untuk suatu titik

In [45]:
def get_neighbors(X_train, y_train, test_point, k):
    """Find the ``k`` training samples closest to ``test_point``.

    Args:
        X_train: Iterable of training feature rows.
        y_train: Labels indexed in step with ``X_train``.
        test_point: Feature row to find neighbors for.
        k: Number of neighbors to keep.

    Returns:
        A list of up to ``k`` ``(distance, label)`` tuples ordered by
        ascending distance. Samples at equal distance keep their training
        order because the sort is stable.
    """
    scored = [
        (euclidean_distance(test_point, sample), y_train[position])
        for position, sample in enumerate(X_train)
    ]
    nearest_first = sorted(scored, key=lambda pair: pair[0])
    return nearest_first[:k]

Melakukan prediksi untuk satu titik berdasarkan tetangga terdekat

In [46]:
def predict_point(neighbors):
    """Return the majority label among the given neighbors.

    Args:
        neighbors: Iterable of ``(distance, label)`` pairs; only the label
            takes part in the vote.

    Returns:
        The label with the most votes. When several labels tie, the one that
        appears first in ``neighbors`` wins.

    Raises:
        ValueError: If ``neighbors`` is empty (``max`` of an empty sequence).
    """
    tally = {}
    for _distance, label in neighbors:
        tally[label] = tally.get(label, 0) + 1

    # max() keeps the first maximal entry, and dicts preserve insertion order,
    # so ties go to the label seen earliest.
    winner, _count = max(tally.items(), key=lambda entry: entry[1])
    return winner

Melakukan prediksi menggunakan KNN untuk semua data test

In [47]:
def knn_predict(X_train, y_train, X_test, k):
    """Classify every row of ``X_test`` with k-nearest-neighbors voting.

    Args:
        X_train: Iterable of training feature rows.
        y_train: Labels indexed in step with ``X_train``.
        X_test: Iterable of feature rows to classify.
        k: Number of neighbors that vote on each prediction.

    Returns:
        A list with one predicted label per row of ``X_test``, in order.
    """
    return [
        predict_point(get_neighbors(X_train, y_train, sample, k))
        for sample in X_test
    ]

Memuat data training dan testing dari file CSV

In [48]:
def load_data(train_path, test_path):
    """Load the training and testing CSV files and split out model inputs.

    Both files must contain the five feature columns listed below; the
    training file must also contain ``label`` and the testing file must
    contain ``ip_address``.

    Args:
        train_path: Path to the training CSV.
        test_path: Path to the testing CSV.

    Returns:
        A tuple ``(X_train, y_train, X_test, ip_test, train_df, test_df)``:
        the training feature array, training label array, testing feature
        array, testing IP address array, and the two full DataFrames.
    """
    # Feature columns fed to the classifier, in this order.
    feature_columns = [
        'request_count',
        'error_count',
        'avg_bytes',
        'error_rate',
        'suspicious_score',
    ]

    train_df = pd.read_csv(train_path)
    test_df = pd.read_csv(test_path)

    # Training split: features plus labels.
    train_features = train_df[feature_columns].values
    train_labels = train_df['label'].values

    # Testing split: features plus the IP used to identify each row
    # (the label column is not read here).
    test_features = test_df[feature_columns].values
    test_ips = test_df['ip_address'].values

    return train_features, train_labels, test_features, test_ips, train_df, test_df

Mengkonversi label numerik ke kategori serangan

In [49]:
def get_attack_category(label):
    """Translate a numeric class label into its attack category name.

    Args:
        label: Class label; 0 through 3 are the known categories.

    Returns:
        The category name, or ``"Unknown"`` for any other label.
    """
    # Position in this list is the numeric label (0..3).
    category_names = dict(enumerate([
        "Normal Traffic",
        "Suspicious Activity",
        "Potential Attack",
        "Confirmed Attack",
    ]))
    return category_names.get(label, "Unknown")

Menampilkan informasi dataset training dan testing

In [50]:
def print_dataset_info(X_train, y_train, X_test, ip_test, train_df, test_df):
    """Print dataset sizes and a preview of the first rows of each split.

    Args:
        X_train: Training feature rows; each previewed row is unpacked into
            exactly five values (request count, error count, a third value
            that is not displayed, error rate, suspicious score).
        y_train: Training labels indexed in step with ``X_train``.
        X_test: Testing feature rows with the same five-value layout.
        ip_test: IP addresses indexed in step with ``X_test``.
        train_df: Training DataFrame; its ``ip_address`` column supplies the
            IPs shown in the training preview.
        test_df: Not used by this function.
    """
    preview_limit = 5  # show at most the first five rows of each split
    table_rule = "-" * 100

    print("\nINFORMASI DATASET")
    print("=" * 50)

    print(f"\nJumlah Data Training: {len(X_train)}")
    print(f"Jumlah Data Testing: {len(X_test)}")

    print("\nData Training Sample:")
    print(table_rule)
    print(f"{'IP Address':<20} {'Label':<20} {'Request Count':<15} {'Error Count':<15} {'Error Rate':<15} {'Suspicious Score':<15}")
    print(table_rule)

    for row_idx, feature_row in enumerate(X_train[:preview_limit]):
        ip = train_df['ip_address'].iloc[row_idx]
        label = get_attack_category(y_train[row_idx])
        # The third feature is skipped on purpose; it has no column in the table.
        req_count, err_count, _, err_rate, susp_score = feature_row
        print(f"{ip:<20} {label:<20} {req_count:<15.4f} {err_count:<15.4f} {err_rate:<15.4f} {susp_score:<15.4f}")

    print("\nData Testing Sample:")
    print(table_rule)
    print(f"{'IP Address':<20} {'Request Count':<15} {'Error Count':<15} {'Error Rate':<15} {'Suspicious Score':<15}")
    print(table_rule)

    for row_idx, feature_row in enumerate(X_test[:preview_limit]):
        ip = ip_test[row_idx]
        req_count, err_count, _, err_rate, susp_score = feature_row
        print(f"{ip:<20} {req_count:<15.4f} {err_count:<15.4f} {err_rate:<15.4f} {susp_score:<15.4f}")

Menampilkan hasil klasifikasi

In [51]:
def print_classification_results(ip_addresses, predictions, test_df):
    """Print one line per classified IP with its category and key metrics.

    Args:
        ip_addresses: IP address for each prediction.
        predictions: Predicted numeric labels, in step with ``ip_addresses``.
        test_df: DataFrame holding ``ip_address``, ``request_count``,
            ``error_count``, ``error_rate`` and ``suspicious_score`` columns.

    Raises:
        IndexError: If an IP address has no matching row in ``test_df``.
    """
    print("\nHASIL KLASIFIKASI")
    print("=" * 80)
    print(f"{'IP Address':<20} {'Predicted Category':<25} {'Details'}")
    print("-" * 80)

    ip_column = test_df['ip_address']
    row_count = len(test_df)

    for position, (ip, pred) in enumerate(zip(ip_addresses, predictions)):
        predicted_category = get_attack_category(pred)

        # Fetch the metrics for this prediction. Prefer the row at the same
        # position: when ip_addresses is aligned with test_df this is the exact
        # row that was classified, even if the same IP appears more than once
        # (a lookup by IP alone would always return the first duplicate).
        if position < row_count and ip_column.iloc[position] == ip:
            ip_data = test_df.iloc[position]
        else:
            # Inputs are not positionally aligned: fall back to the first row
            # with a matching IP.
            ip_data = test_df[ip_column == ip].iloc[0]

        details = f"RC:{ip_data['request_count']:.3f}, EC:{ip_data['error_count']:.3f}, ER:{ip_data['error_rate']:.3f}, SS:{ip_data['suspicious_score']:.3f}"

        print(f"{ip:<20} {predicted_category:<25} {details}")



Menampilkan statistik ringkasan klasifikasi

In [52]:
def print_summary_statistics(predictions):
    """Print how many predictions fell into each attack category.

    Category names come from ``get_attack_category`` so that this summary
    always agrees with the per-row output, and labels outside the known range
    are counted under "Unknown" instead of raising ``KeyError``.

    Args:
        predictions: Sequence of predicted numeric labels.
    """
    print("\nRINGKASAN KLASIFIKASI")
    print("=" * 50)

    predicted_counts = {}
    for pred in predictions:
        category = get_attack_category(pred)
        predicted_counts[category] = predicted_counts.get(category, 0) + 1

    # With no predictions the loop below never runs, so total is never a
    # zero divisor.
    total = len(predictions)
    for category, count in predicted_counts.items():
        percentage = (count / total) * 100
        print(f"{category:<20}: {count:3d} ({percentage:5.1f}%)")

In [53]:
def save_classification_results(ip_addresses, predictions, test_df, output_file):
    """Save the classification results to a CSV file.

    Args:
        ip_addresses: IP address for each prediction.
        predictions: Predicted numeric labels, in step with ``ip_addresses``.
        test_df: DataFrame holding ``ip_address``, ``request_count``,
            ``error_count``, ``error_rate`` and ``suspicious_score`` columns.
        output_file: Destination path for the CSV.

    Raises:
        IndexError: If an IP address has no matching row in ``test_df``.
    """
    output_columns = [
        'ip_address',
        'request_count',
        'error_count',
        'error_rate',
        'suspicious_score',
        'category',
        'category_code',
    ]

    ip_column = test_df['ip_address']
    row_count = len(test_df)

    results = []
    for position, (ip, pred) in enumerate(zip(ip_addresses, predictions)):
        # Prefer the row at the same position: when ip_addresses is aligned
        # with test_df this is the exact row that was classified, even if the
        # same IP appears more than once (a lookup by IP alone would write the
        # first duplicate's metrics for every occurrence).
        if position < row_count and ip_column.iloc[position] == ip:
            ip_data = test_df.iloc[position]
        else:
            # Inputs are not positionally aligned: fall back to the first row
            # with a matching IP.
            ip_data = test_df[ip_column == ip].iloc[0]

        results.append({
            'ip_address': ip,
            'request_count': ip_data['request_count'],
            'error_count': ip_data['error_count'],
            'error_rate': ip_data['error_rate'],
            'suspicious_score': ip_data['suspicious_score'],
            'category': get_attack_category(pred),
            'category_code': pred,  # keep the numeric category code as well
        })

    # Build the frame once from all records. Passing the column list keeps the
    # header present (and in a fixed order) even when there are no results.
    results_df = pd.DataFrame(results, columns=output_columns)
    results_df.to_csv(output_file, index=False)
    print(f"\nHasil klasifikasi telah disimpan ke: {output_file}")


In [54]:
def evaluate_model(y_true, y_pred):
    """Compute evaluation metrics for the classifier.

    The confusion matrix is binary: any label greater than 0 is an attack
    (positive) and label 0 is normal traffic (negative).

    * TP: an attack predicted as an attack (of any category)
    * TN: normal traffic predicted as normal
    * FP: normal traffic predicted as an attack
    * FN: an attack predicted as normal (missed)

    Precision, recall and F1 are derived from that binary matrix. Accuracy is
    the share of samples whose predicted label matches the true label exactly,
    so an attack assigned to the wrong attack category counts as detected (TP)
    but not as accurate; accuracy can therefore be lower than (TP + TN) / total.

    Args:
        y_true: Sequence of true numeric labels.
        y_pred: Sequence of predicted numeric labels, in step with ``y_true``.

    Returns:
        A dict with ``accuracy``, ``precision``, ``recall``, ``f1_score`` and
        ``confusion_matrix`` (a dict with ``TP``, ``TN``, ``FP``, ``FN``).
        Any metric whose denominator is zero is reported as 0.
    """
    total = len(y_true)

    confusion_matrix = {"TP": 0, "TN": 0, "FP": 0, "FN": 0}
    exact_matches = 0

    for true, pred in zip(y_true, y_pred):
        if true == pred:
            exact_matches += 1

        actual_attack = true > 0
        predicted_attack = pred > 0

        # Classify on attack-vs-normal only. Deciding FP/FN from the predicted
        # label alone would count an attack that was flagged under a different
        # attack category as "normal traffic flagged as attack".
        if predicted_attack:
            confusion_matrix["TP" if actual_attack else "FP"] += 1
        else:
            confusion_matrix["FN" if actual_attack else "TN"] += 1

    tp = confusion_matrix["TP"]
    fp = confusion_matrix["FP"]
    fn = confusion_matrix["FN"]

    # Every ratio is guarded so empty or single-class input yields 0 instead
    # of raising ZeroDivisionError.
    accuracy = exact_matches / total if total > 0 else 0
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0
    f1_score = (
        2 * (precision * recall) / (precision + recall)
        if (precision + recall) > 0
        else 0
    )

    return {
        "accuracy": accuracy,
        "precision": precision,
        "recall": recall,
        "f1_score": f1_score,
        "confusion_matrix": confusion_matrix,
    }

In [56]:
def k_fold_cross_validation(X, y, k_folds=5, k=3):
    """Run k-fold cross validation of the KNN classifier.

    The data is split into ``k_folds`` contiguous folds of
    ``len(X) // k_folds`` samples, in the order given (no shuffling). Each
    fold is used once as the test set while the rest is the training set.
    When the sample count is not divisible by ``k_folds``, the trailing
    remainder is only ever used for training.

    Args:
        X: Sequence of feature rows.
        y: Labels in step with ``X``.
        k_folds: Number of folds; must be between 2 and the number of samples.
        k: Number of neighbors used by KNN in every fold.

    Returns:
        A dict with the ``accuracy``, ``precision``, ``recall`` and
        ``f1_score`` averaged over all folds.

    Raises:
        ValueError: If ``k_folds`` is smaller than 2 or larger than the number
            of samples, since a fold would then have no training or no test
            data.
    """
    # Pair every feature row with its label so both are split together.
    data = list(zip(X, y))

    if k_folds < 2 or k_folds > len(data):
        raise ValueError(
            f"k_folds must be between 2 and the number of samples "
            f"({len(data)}), got {k_folds}"
        )

    fold_size = len(data) // k_folds
    metrics_list = []

    for fold_index in range(k_folds):
        # Boundaries of the test slice for this fold.
        test_start = fold_index * fold_size
        test_end = test_start + fold_size

        test_data = data[test_start:test_end]
        train_data = data[:test_start] + data[test_end:]

        # Separate features from labels again.
        X_train, y_train = zip(*train_data)
        X_test, y_test = zip(*test_data)

        # Predict the held-out fold and score it.
        predictions = knn_predict(list(X_train), list(y_train), list(X_test), k)
        metrics_list.append(evaluate_model(y_test, predictions))

    # Average each metric over the folds.
    metric_names = ('accuracy', 'precision', 'recall', 'f1_score')
    avg_metrics = {
        name: sum(fold_metrics[name] for fold_metrics in metrics_list) / k_folds
        for name in metric_names
    }

    return avg_metrics

In [None]:
def print_evaluation_results(metrics):
    """Print the evaluation metrics, confusion matrix and a short reading of it.

    Args:
        metrics: Dict with ``accuracy``, ``precision``, ``recall``,
            ``f1_score`` and ``confusion_matrix`` (itself a dict with ``TP``,
            ``TN``, ``FP`` and ``FN``).

    Raises:
        KeyError: If any of those keys is missing; lines printed before the
            missing key is reached are still emitted.
    """
    print("\nEVALUASI MODEL")
    print("=" * 50)

    # Headline metrics, one per line with aligned labels.
    print("\nMetrik Evaluasi:")
    metric_rows = (
        ("Accuracy  ", "accuracy"),
        ("Precision ", "precision"),
        ("Recall    ", "recall"),
        ("F1-Score  ", "f1_score"),
    )
    for title, key in metric_rows:
        print(f"{title}: {metrics[key]:.4f}")

    # Raw confusion matrix counts.
    cm = metrics['confusion_matrix']
    print("\nConfusion Matrix:")
    print("-" * 40)
    count_rows = (
        ("True Positive (TP)  ", "TP"),
        ("True Negative (TN)  ", "TN"),
        ("False Positive (FP) ", "FP"),
        ("False Negative (FN) ", "FN"),
    )
    for title, key in count_rows:
        print(f"{title}: {cm[key]}")

    # Plain-language interpretation of the same counts.
    tp, tn, fp, fn = cm['TP'], cm['TN'], cm['FP'], cm['FN']
    print("\nInterpretasi:")
    print(f"- Model berhasil mendeteksi {tp} serangan dengan benar")
    print(f"- Model berhasil mengidentifikasi {tn} traffic normal dengan benar")
    print(f"- Model salah mengklasifikasikan {fp} traffic normal sebagai serangan")
    print(f"- Model gagal mendeteksi {fn} serangan")

In [57]:
def main():
    """Run the whole pipeline: load, describe, cross-validate, predict, report.

    Reads ``data/normalized_logs.csv`` (training) and ``data/testing.csv``
    (testing), prints dataset information and cross-validation averages,
    classifies the test rows with k=3 and prints the results. If the testing
    data has a ``label`` column, the predictions are also evaluated against it.
    """
    print("Loading data...")
    dataset = load_data('data/normalized_logs.csv', 'data/testing.csv')
    X_train, y_train, X_test, ip_test, train_df, test_df = dataset

    # Dataset overview (load_data returns its values in the order expected here).
    print_dataset_info(*dataset)

    # Estimate model quality on the training data alone.
    print("\nMelakukan k-fold cross validation...")
    cv_metrics = k_fold_cross_validation(X_train, y_train)
    print("\nHasil Cross Validation:")
    print(f"Average Accuracy  : {cv_metrics['accuracy']:.4f}")
    print(f"Average Precision : {cv_metrics['precision']:.4f}")
    print(f"Average Recall    : {cv_metrics['recall']:.4f}")
    print(f"Average F1-Score  : {cv_metrics['f1_score']:.4f}")

    # Classify the test rows.
    k = 3
    print(f"\nMelakukan prediksi dengan k={k}...")
    predictions = knn_predict(X_train, y_train, X_test, k)

    print_classification_results(ip_test, predictions, test_df)
    print_summary_statistics(predictions)

    # Without ground-truth labels there is nothing to evaluate against.
    if 'label' not in test_df.columns:
        return

    print("\nEvaluasi performa model pada data testing...")
    print_evaluation_results(evaluate_model(test_df['label'].values, predictions))

# Run the pipeline only when this code is executed directly (as a script or a
# notebook cell), not when it is imported as a module.
if __name__ == "__main__":
    main()


Loading data...

INFORMASI DATASET

Jumlah Data Training: 100
Jumlah Data Testing: 25

Data Training Sample:
----------------------------------------------------------------------------------------------------
IP Address           Label                Request Count   Error Count     Error Rate      Suspicious Score
----------------------------------------------------------------------------------------------------
1.34.111.115         Normal Traffic       0.0000          0.0000          0.0000          0.0000         
101.36.106.89        Normal Traffic       0.0148          0.0084          0.2857          0.3000         
103.149.26.249       Suspicious Activity  0.0025          0.0042          0.5000          0.0000         
103.164.60.19        Potential Attack     1.0000          0.1506          0.0887          1.0000         
103.186.212.5        Normal Traffic       0.0049          0.0042          0.3333          0.0000         

Data Testing Sample:
------------------------------