<a href="https://colab.research.google.com/github/UllasAcharya16/ZeroDay-Attack/blob/main/TABA_MODEL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# =========================
# Zero-Day / Attack Detection (KMeans + PCA + Semi-circular Hull Augmentation)
# =========================

# Step 1: Import Libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from scipy.spatial import ConvexHull
from scipy.spatial import distance
from sklearn.metrics import pairwise_distances_argmin_min

# ---- File paths (keep your existing filenames if those are what you have) ----
# All three CSVs are loaded with pandas.read_csv further down, and only their
# numeric columns are used.
# NOTE(review): the /content/ prefix is presumably the Colab runtime directory
# (the notebook carries an "Open In Colab" badge) - adjust when running elsewhere.

# Unlabeled training data: the scaler, the PCA projection and the 2-cluster
# KMeans model are all fitted on this file.
UNLABELED_TRAIN_PATH = '/content/reduced_transactions_200.csv'
# Known attack samples: projected into the fitted space and used to decide
# which of the two clusters is treated as the attack cluster.
KNOWN_ATTACK_PATH    = '/content/fraud_transaction.csv'          # rename if you have a different file
# Test samples: projected into the fitted space and classified against the
# cluster boundaries.
TEST_DATA_PATH       = '/content/fraudulent_transactions.csv'    # rename if you have a different file

def point_in_hull(point, hull_points, tol=1e-12):
    """Return True when ``point`` lies inside or on the convex hull of ``hull_points``.

    The hull is built once and the point is tested against every facet's
    half-space inequality. Comparing hull vertex counts before and after
    adding the point is not a valid test: an outside point can replace an
    existing vertex and leave the count unchanged (e.g. the unit square plus
    the point (2, 2) still has four vertices).

    Args:
        point: Coordinates of the query point (array-like, one value per
            dimension).
        hull_points: Array-like of shape (n_points, n_dims) whose convex hull
            defines the region.
        tol: Non-negative slack applied to each facet inequality so that
            points lying on the boundary are counted as inside despite
            floating-point round-off.

    Returns:
        bool: True if the point is inside or on the hull. False if it is
        outside, if its dimensionality does not match ``hull_points``, or if
        no hull can be built (too few points, degenerate/collinear input,
        invalid values).
    """
    try:
        hull = ConvexHull(hull_points)
    except (ValueError, RuntimeError):
        # scipy's QhullError derives from RuntimeError; malformed input raises
        # ValueError. Without a hull nothing can be "inside", so keep the
        # original best-effort contract and report False.
        return False

    query = np.asarray(point, dtype=float).ravel()
    normals = hull.equations[:, :-1]
    offsets = hull.equations[:, -1]
    if query.shape[0] != normals.shape[1]:
        return False

    # Each facet satisfies normal . x + offset <= 0 for points on the inner
    # side, so the point is inside only if it satisfies every facet.
    return bool(np.all(normals @ query + offsets <= tol))

def create_augmented_cluster_points(cluster_points, semi_circle_radius=0.1):
    """
    Thicken a 2-D cluster by attaching a semi-circular arc to every point.

    For each point (the pivot) the nearest other point of the same cluster is
    located; a 10-sample half circle of radius ``semi_circle_radius`` is then
    placed around the pivot on the side facing away from that neighbour.
    Rows identical to the pivot are excluded from the neighbour search, and a
    pivot with no remaining neighbour contributes no arc.

    Returns the original points followed by all arc samples, or the input
    array itself when no arc was generated.
    """
    arcs = []
    for pivot in cluster_points:
        same_as_pivot = np.all(cluster_points == pivot, axis=1)
        neighbours = cluster_points[~same_as_pivot]
        if len(neighbours) == 0:
            continue

        gaps = distance.cdist([pivot], neighbours)
        toward_nearest = neighbours[np.argmin(gaps)] - pivot

        # Heading (in degrees) pointing away from the nearest neighbour.
        heading = (np.arctan2(toward_nearest[1], toward_nearest[0]) * 180 / np.pi + 180) % 360

        # Sweep 90 degrees to either side of that heading.
        sweep = np.linspace(np.deg2rad(heading - 90), np.deg2rad(heading + 90), 10)
        arc_x = pivot[0] + semi_circle_radius * np.cos(sweep)
        arc_y = pivot[1] + semi_circle_radius * np.sin(sweep)
        arcs.append(np.column_stack((arc_x, arc_y)))

    if not arcs:
        return cluster_points
    return np.vstack([cluster_points, *arcs])

# --- Training data: load, keep numeric columns, standardise ---
print("=== STEP 1: Loading and clustering unlabeled training data ===")
df = pd.read_csv(UNLABELED_TRAIN_PATH)
df_numeric = df.select_dtypes(include=[np.number])

scaler = StandardScaler()
scaled_data = scaler.fit_transform(df_numeric)

# --- Project the standardised features onto the first two principal components ---
pca = PCA(n_components=2)
reduced_data = pca.fit_transform(scaled_data)

# --- Split the 2-D projection into two clusters; which one is "attack" is
# --- not known at this point, it is decided later from known attack samples.
kmeans = KMeans(n_clusters=2, random_state=0)
kmeans.fit(reduced_data)
labels = kmeans.labels_
centroids = kmeans.cluster_centers_

# Report how many training rows landed in each cluster.
print(f"Created 2 clusters from training data:")
print(f"Cluster 0: {np.sum(labels == 0)} points")
print(f"Cluster 1: {np.sum(labels == 1)} points")
print()

# --- Known attack samples: push them through the already-fitted scaler and PCA ---
print("=== STEP 2: Using known attack data to identify ATTACK cluster ===")
known_attack_df = pd.read_csv(KNOWN_ATTACK_PATH)
known_attack_numeric = known_attack_df.select_dtypes(include=[np.number])
scaled_known_attack = scaler.transform(known_attack_numeric)
known_attack_reduced = pca.transform(scaled_known_attack)

# Index of the nearest centroid for every known attack sample.
attack_cluster_assignments, _ = pairwise_distances_argmin_min(known_attack_reduced, centroids)

# Tally the known attack samples per cluster.
cluster_0_attack_count = (attack_cluster_assignments == 0).sum()
cluster_1_attack_count = (attack_cluster_assignments == 1).sum()

print(f"Known attack points in Cluster 0: {cluster_0_attack_count}")
print(f"Known attack points in Cluster 1: {cluster_1_attack_count}")

# The cluster that attracts strictly more known attack samples is labelled
# ATTACK; on a tie cluster 1 is chosen. The other cluster is NORMAL.
attack_cluster_id = 0 if cluster_0_attack_count > cluster_1_attack_count else 1
normal_cluster_id = 1 - attack_cluster_id

if attack_cluster_id == 0:
    print(f"Cluster 0 identified as ATTACK cluster ({cluster_0_attack_count} attack points)")
    print(f"Cluster 1 identified as NORMAL cluster ({cluster_1_attack_count} attack points)")
else:
    print(f"Cluster 1 identified as ATTACK cluster ({cluster_1_attack_count} attack points)")
    print(f"Cluster 0 identified as NORMAL cluster ({cluster_0_attack_count} attack points)")
print()

# Step 6: Plot training data with identified clusters
# Attack cluster is drawn in red, normal cluster in blue; each cluster is
# outlined by the convex hull of its semi-circle-augmented points.
fig, ax = plt.subplots(figsize=(10, 8))
colors = ['red' if i == attack_cluster_id else 'blue' for i in range(2)]
cluster_names = [f'Attack Cluster ({attack_cluster_id})' if i == attack_cluster_id else f'Normal Cluster ({normal_cluster_id})' for i in range(2)]
semi_circle_radius = 0.1

# Store augmented boundaries for later use (cluster id -> augmented point array)
cluster_boundaries = {}

for cluster_id in range(2):
    cluster_points = reduced_data[labels == cluster_id]
    color = colors[cluster_id]

    # One labelled scatter per cluster. Drawing every point with its own
    # scatter call and then passing bare labels to legend() attaches the
    # labels to the first few artists in the axes instead of to the clusters,
    # so the legend showed the wrong markers/colours.
    ax.scatter(cluster_points[:, 0], cluster_points[:, 1], color=color,
               edgecolor='black', s=100, zorder=5, label=cluster_names[cluster_id])

    # Create augmented points and store boundary
    augmented_points = create_augmented_cluster_points(cluster_points, semi_circle_radius)
    cluster_boundaries[cluster_id] = augmented_points

    # Draw convex hull (needs at least 3 points in 2-D)
    if len(augmented_points) >= 3:
        hull = ConvexHull(augmented_points)
        for simplex in hull.simplices:
            # Each simplex is a pair of point indices forming one hull edge.
            ax.plot(augmented_points[simplex, 0], augmented_points[simplex, 1], 'k-', linewidth=2)

# Plot known attack points used for identification
ax.scatter(known_attack_reduced[:, 0], known_attack_reduced[:, 1],
          c='orange', marker='x', s=100, label='Known Attack (for ID)', zorder=6)

plt.title("Training Data Clusters with Identified Attack/Normal Labels", fontsize=16)
# Let matplotlib pair each legend entry with the artist that carries its label.
plt.legend(loc='best')
plt.grid(True)
plt.show()

# --- Test data: same numeric-column selection, scaling and PCA projection
# --- as the training data (nothing is re-fitted here).
print("=== STEP 3: Processing test data ===")
test_df = pd.read_csv(TEST_DATA_PATH)
test_numeric = test_df.select_dtypes(include=[np.number])
scaled_test = scaler.transform(test_numeric)
test_reduced = pca.transform(scaled_test)

# Nearest centroid (and the distance to it) per test sample; only used to
# group the test points for the plot below.
closest_clusters, distances = pairwise_distances_argmin_min(test_reduced, centroids)

# --- Overlay the test samples on the training clusters and their boundaries ---
fig, ax = plt.subplots(figsize=(10, 8))
train_colors = ['lightcoral' if i == attack_cluster_id else 'lightblue' for i in range(2)]
test_color = 'darkgreen'

for cluster_id in range(2):
    cluster_type = "Attack" if cluster_id == attack_cluster_id else "Normal"
    train_pts = reduced_data[labels == cluster_id]
    test_pts = test_reduced[closest_clusters == cluster_id]

    # Training samples of this cluster.
    ax.scatter(
        train_pts[:, 0],
        train_pts[:, 1],
        c=train_colors[cluster_id],
        alpha=0.6,
        label=f'Training {cluster_type} Cluster {cluster_id}',
    )

    # Test samples nearest to this cluster's centroid; the legend label is
    # only attached on the first pass so "Test Data" appears once.
    ax.scatter(
        test_pts[:, 0],
        test_pts[:, 1],
        c=test_color,
        edgecolors='black',
        s=50,
        label='Test Data' if cluster_id == 0 else "",
    )

    # The outline comes from the stored training boundary, never from test points.
    augmented_points = cluster_boundaries[cluster_id]
    if len(augmented_points) >= 3:
        hull = ConvexHull(augmented_points)
        for simplex in hull.simplices:
            ax.plot(augmented_points[simplex, 0], augmented_points[simplex, 1], 'k-', linewidth=2)

# Mark both cluster centres.
ax.scatter(centroids[:, 0], centroids[:, 1], c='black', s=200, marker='X',
          label='Centroids')

plt.title("Test Data with Identified Attack/Normal Cluster Boundaries")
plt.legend()
plt.grid(True)
plt.show()

# Step 10: SIMPLIFIED Boundary-based classification
print("=== SIMPLIFIED ZERO-DAY / ATTACK DETECTION ===")
print()

def calculate_anomaly_score(d_normal, d_attack):
    """
    Calculate anomaly score for points outside both clusters.
    Higher score = more anomalous

    Args:
        d_normal: Distance from the point to the normal cluster centre.
        d_attack: Distance from the point to the attack cluster centre.

    Returns:
        ``min_distance * (1 + min_distance / max_distance)``: the distance to
        the nearer centre, boosted by up to a factor of two when the point is
        equally far from both centres. Returns 0 when both distances are 0.
    """
    min_distance = min(d_normal, d_attack)
    max_distance = max(d_normal, d_attack)

    # Anomaly score: combination of minimum distance (isolation) and distance ratio
    isolation_score = min_distance  # How far from nearest cluster

    if max_distance == 0:
        # Both distances are zero: the point sits on both centres at once.
        # Treat it as perfectly balanced instead of dividing by zero; the
        # final score is 0 because the isolation term is 0.
        balance_score = 1.0
    else:
        balance_score = min_distance / max_distance  # How balanced between clusters

    # Combined anomaly score (higher = more anomalous)
    anomaly_score = isolation_score * (1 + balance_score)
    return anomaly_score

# Classify every projected test sample against the two stored cluster
# boundaries; centroid distances are only consulted for samples that fall
# outside both boundaries.
for i, point in enumerate(test_reduced):
    inside_normal = point_in_hull(point, cluster_boundaries[normal_cluster_id])
    inside_attack = point_in_hull(point, cluster_boundaries[attack_cluster_id])

    # The attack boundary takes precedence when a sample is inside both.
    if inside_attack:
        print(f"Test Point {i+1}: 🚨 ATTACK")
        print()
        continue

    if inside_normal:
        print(f"Test Point {i+1}: ✅ NORMAL")
        print()
        continue

    # Outside both boundaries: fall back to distances from the two centroids.
    d_norm = np.linalg.norm(point - centroids[normal_cluster_id])
    d_attk = np.linalg.norm(point - centroids[attack_cluster_id])
    anomaly_score = calculate_anomaly_score(d_norm, d_attk)

    attack_line = f"   Distance to attack center: {d_attk:.3f}"
    normal_line = f"   Distance to normal center: {d_norm:.3f}"

    # The nearer centre decides the verdict and is reported first.
    if d_attk < d_norm:
        print(f"Test Point {i+1}: ⚠️ ZERO-DAY ATTACK")
        print(attack_line)
        print(normal_line)
    else:
        print(f"Test Point {i+1}: ❓ ANOMALY")
        print(normal_line)
        print(attack_line)
    print(f"   🔍 ANOMALY SCORE: {anomaly_score:.3f}")

    print()

# Legend for the verdicts printed above, followed by the cluster roles.
print("=== SUMMARY ===")
print("✅ NORMAL: Point inside normal cluster")
print("🚨 ATTACK: Point inside attack cluster")
print("⚠️ ZERO-DAY ATTACK: Outside both clusters, closer to attack")
print("❓ ANOMALY: Outside both clusters, closer to normal")
print()
print(f"Identified Clusters:")
print(f"- Attack Cluster: {attack_cluster_id}")
print(f"- Normal Cluster: {normal_cluster_id}")
