<a href="https://colab.research.google.com/github/Pandit-C/DS-03/blob/main/Isolation_Forest.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:


import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.ensemble import IsolationForest
from sklearn.metrics import confusion_matrix, precision_score, recall_score, f1_score, ConfusionMatrixDisplay
from IPython.display import display, clear_output
import ipywidgets as widgets

# === Simulate Dataset A (Training) ===
# Six consecutive one-minute readings, one tuple per reading in the order
# (CPU, Memory, Disk, Network). Every row is labelled as non-anomalous.
data_a = pd.DataFrame(
    [
        (23.5, 1024, 45.2, 12.3),
        (25.1, 1035, 46.0, 11.8),
        (22.8, 1018, 44.8, 13.1),
        (24.3, 1027, 45.5, 12.6),
        (23.9, 1022, 45.0, 12.9),
        (24.7, 1030, 45.7, 12.4),
    ],
    columns=['CPU', 'Memory', 'Disk', 'Network'],
)
data_a = data_a.assign(
    Timestamp=pd.date_range(start='2024-08-15 14:00:00', periods=len(data_a), freq='min'),
    Set='A (Training)',
    # The column name contains a space, so it has to be passed via ** unpacking.
    **{'True Anomaly': 'No'},
)

# === Dataset B with anomalies ===
# 17 rows drawn around the same operating point as Dataset A plus 5
# hand-written extreme rows, shuffled into a single test frame.
np.random.seed(42)
normal_data = pd.DataFrame({
    'CPU': np.random.normal(24, 0.5, 17),
    'Memory': np.random.normal(1025, 5, 17).astype(int),
    'Disk': np.random.normal(45, 0.5, 17),
    'Network': np.random.normal(12.5, 0.5, 17)
})
anomaly_data = pd.DataFrame({
    'CPU': [5, 95, 2, 88, 3],
    'Memory': [100, 15000, 90, 13000, 120],
    'Disk': [10, 85, 5, 90, 8],
    'Network': [1, 70, 0.5, 55, 2]
})
# Attach the ground-truth label BEFORE shuffling so it travels with its row.
# Assigning ['No'] * 17 + ['Yes'] * 5 after the shuffle would label rows by
# their shuffled position, not by which frame they came from.
data_b_updated = (
    pd.concat(
        [
            normal_data.assign(**{'True Anomaly': 'No'}),
            anomaly_data.assign(**{'True Anomaly': 'Yes'}),
        ],
        ignore_index=True,
    )
    .sample(frac=1, random_state=2)
    .reset_index(drop=True)
)
data_b_updated['Set'] = 'B (Test)'
# Timestamps continue one minute after the last Dataset A reading.
data_b_updated['Timestamp'] = pd.date_range('2024-08-15 14:06:00', periods=len(data_b_updated), freq='min')

# === Train model ===
features = ['CPU', 'Memory', 'Disk', 'Network']
full_data = pd.concat([data_a, data_b_updated], ignore_index=True)
# The forest is fitted on Dataset A only; both sets are scored afterwards.
clf = IsolationForest(contamination=0.227, random_state=0)
clf.fit(data_a[features])
# decision_function follows the scikit-learn convention: the lower the value,
# the more abnormal the sample, and negative values are outliers. The score is
# stored un-negated because the rest of this file labels a row 'Anomaly' when
# its score is BELOW the selected threshold and offers only non-positive
# thresholds; negating it here would flag the least abnormal rows instead.
full_data['Anomaly Score'] = clf.decision_function(full_data[features])
# Pristine snapshot that the interactive callback copies on every redraw.
base_full = full_data.copy()

# === Slack-style alert function ===
def send_slack_alert(timestamp, cpu, mem, disk, net, score):
    """Print a Slack-formatted anomaly alert to stdout.

    Nothing is sent over the network; the message is only printed.

    Args:
        timestamp: When the anomalous reading was taken.
        cpu: CPU reading of the flagged row.
        mem: Memory reading of the flagged row.
        disk: Disk reading of the flagged row.
        net: Network reading of the flagged row.
        score: Anomaly score of the flagged row (shown with 4 decimals).
    """
    # One print call with a newline separator emits the same five lines as
    # five separate print calls would.
    print(
        "🔔 [SLACK ALERT]",
        f"*Anomaly Detected* at `{timestamp}`",
        f"> CPU: `{cpu}`, Memory: `{mem}`, Disk: `{disk}`, Network: `{net}`",
        f"> Anomaly Score: `{score:.4f}`",
        "Posted to #anomaly-alerts 🚨",
        sep="\n",
    )

# === Widgets ===
# Selector for the score cutoff passed to the update callback; starts at -0.2.
threshold_dropdown = widgets.Dropdown(options=[0.0, -0.1, -0.2, -0.3], value=-0.2, description='Threshold:')
# Selector for the feature plotted on the scatter plot's X axis.
x_feature_dropdown = widgets.Dropdown(
    options=features,
    value='CPU',
    description='X-axis',
)

def update_y_options(*args):
    """Keep the Y-axis choices disjoint from the selected X-axis feature.

    Registered as an observer on the X-axis dropdown. The Y-axis dropdown is
    rebuilt to offer every feature except the one currently chosen for X.
    The previous Y selection is kept whenever it is still a valid choice;
    otherwise the first remaining feature is selected.

    Args:
        *args: Change notification supplied by the widget framework (unused).
    """
    previous_y = y_feature_dropdown.value
    x_value = x_feature_dropdown.value
    y_choices = [f for f in features if f != x_value]

    # NOTE(review): ipywidgets selection widgets appear to reset the selection
    # to the first option whenever `options` is reassigned - confirm for the
    # installed version. The user's choice is therefore restored explicitly.
    y_feature_dropdown.options = y_choices
    if previous_y in y_choices:
        y_feature_dropdown.value = previous_y
    elif y_feature_dropdown.value not in y_choices:
        y_feature_dropdown.value = y_choices[0]

# Y-axis selector: offers every feature except the initial X-axis choice.
y_feature_dropdown = widgets.Dropdown(
    options=[name for name in features if name != 'CPU'],
    value='Memory',
    description='Y-axis',
)
# Rebuild the Y-axis choices whenever the X-axis selection changes.
x_feature_dropdown.observe(update_y_options, names='value')

# === Main update function ===
def _plot_confusion_matrix(cm):
    """Draw the 2x2 confusion matrix with Normal/Anomaly tick labels."""
    _, ax = plt.subplots(figsize=(4, 4))
    ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=['Normal', 'Anomaly']).plot(
        cmap='Blues', values_format='d', ax=ax, colorbar=False)
    plt.tight_layout()
    plt.show()


def _plot_score_histogram(scores, threshold):
    """Draw the anomaly-score histogram with the threshold as a dashed line."""
    plt.figure(figsize=(8, 4))
    plt.hist(scores, bins=20, color='gray', edgecolor='black')
    plt.axvline(threshold, color='red', linestyle='--', label=f'Threshold = {threshold:.2f}')
    plt.title("Anomaly Score Distribution")
    plt.xlabel("Anomaly Score")
    plt.ylabel("Frequency")
    plt.legend()
    plt.tight_layout()
    plt.show()


def _plot_feature_scatter(df, x_feature, y_feature):
    """Scatter two features, coloured by predicted label, with true anomalies outlined."""
    plt.figure(figsize=(8, 6))
    for label in ['Normal', 'Anomaly']:
        subset = df[df['Predicted Label'] == label]
        is_normal = label == 'Normal'
        plt.scatter(subset[x_feature], subset[y_feature],
                    label=label,
                    c='green' if is_normal else 'red',
                    s=100,
                    # Only the filled 'o' marker gets an edge colour.
                    edgecolors='black' if is_normal else None,
                    marker='o' if is_normal else 'x')

    # Tag each predicted anomaly A1, A2, ... in row order.
    flagged = df[df['Predicted Label'] == 'Anomaly']
    for tag, (x_val, y_val) in enumerate(zip(flagged[x_feature], flagged[y_feature]), start=1):
        plt.text(x_val, y_val, f"A{tag}", fontsize=9, color='darkred', weight='bold')

    # Hollow black ring around every ground-truth anomaly.
    true_anoms = df[df['True Anomaly'] == 'Yes']
    plt.scatter(true_anoms[x_feature], true_anoms[y_feature],
                facecolors='none', edgecolors='black',
                s=150, linewidths=1.5, label='True Anomaly (Outline)')

    plt.title(f"Scatter Plot: {x_feature} vs {y_feature} (Tagged Anomalies)")
    plt.xlabel(x_feature)
    plt.ylabel(y_feature)
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()


def update_with_features(threshold, x_feature, y_feature):
    """Re-label the scored data at ``threshold`` and redraw the whole report.

    Works on a fresh copy of the module-level ``base_full`` frame. A row is
    labelled 'Anomaly' when its 'Anomaly Score' is strictly below
    ``threshold`` and 'Normal' otherwise. The function then shows, in order:
    the controls, the metrics, a table of predicted or true anomalies, the
    rows breaking the fixed resource rules, a Slack-style alert for the most
    recent predicted anomaly in set 'B (Test)', and three plots.

    Args:
        threshold: Score cutoff; rows scoring below it are labelled 'Anomaly'.
        x_feature: Column plotted on the scatter plot's X axis.
        y_feature: Column plotted on the scatter plot's Y axis.

    Returns:
        None. All results are displayed or printed.
    """
    df = base_full.copy()
    df['Predicted Label'] = np.where(df['Anomaly Score'] < threshold, 'Anomaly', 'Normal')
    y_true = df['True Anomaly'].map({'Yes': 1, 'No': 0})
    y_pred = df['Predicted Label'].map({'Anomaly': 1, 'Normal': 0})

    precision = precision_score(y_true, y_pred, zero_division=0)
    recall = recall_score(y_true, y_pred, zero_division=0)
    f1 = f1_score(y_true, y_pred, zero_division=0)
    # Pin both classes so the matrix is always 2x2 and the four-way unpack
    # below cannot fail when only one class occurs in the data.
    cm = confusion_matrix(y_true, y_pred, labels=[0, 1])
    tn, fp, fn, tp = cm.ravel()

    # Rows that were flagged or are true anomalies, lowest score first.
    anomalies_df = df[(df['Predicted Label'] == 'Anomaly') | (df['True Anomaly'] == 'Yes')].copy()
    anomalies_df = anomalies_df[['Timestamp', 'CPU', 'Memory', 'Disk', 'Network',
                                 'Anomaly Score', 'True Anomaly', 'Predicted Label']]
    anomalies_df = anomalies_df.sort_values(by='Anomaly Score', ascending=True).reset_index(drop=True)

    clear_output(wait=True)
    display(widgets.HTML("<b>Contamination Level Used:</b> 22.7%"))
    display(widgets.HBox([threshold_dropdown, x_feature_dropdown, y_feature_dropdown]))

    print(f"\nThreshold: {threshold:.4f}")
    print(f"Anomaly Score Range: {df['Anomaly Score'].min():.4f} to {df['Anomaly Score'].max():.4f}")
    print(f"Confusion Matrix: TP={tp}, FP={fp}, TN={tn}, FN={fn}")
    print(f"Precision: {precision:.2f} | Recall: {recall:.2f} | F1 Score: {f1:.2f}")
    display(anomalies_df.head(10).style.format(precision=2))

    # Fixed-limit rules, evaluated independently of the model's prediction.
    rule_violations = df[(df['CPU'] > 80) | (df['Memory'] > 10000) | (df['Disk'] > 80) | (df['Network'] > 50)]
    if not rule_violations.empty:
        print(f"\n⚠️ Rule violations: {len(rule_violations)}")
        display(rule_violations[['Timestamp', 'CPU', 'Memory', 'Disk', 'Network']].style.format(precision=2))

    # Alert only on the newest predicted anomaly within the test set.
    recent_anomalies = df[(df['Predicted Label'] == 'Anomaly') & (df['Set'] == 'B (Test)')]
    if not recent_anomalies.empty:
        latest = recent_anomalies.sort_values(by='Timestamp', ascending=False).iloc[0]
        send_slack_alert(
            latest['Timestamp'], latest['CPU'], latest['Memory'],
            latest['Disk'], latest['Network'], latest['Anomaly Score']
        )
    else:
        print("\n✅ No current anomalies detected.")

    # Plot 1: Confusion Matrix
    _plot_confusion_matrix(cm)

    # Plot 2: Anomaly Score Histogram
    _plot_score_histogram(df['Anomaly Score'], threshold)

    # Plot 3: Scatter
    _plot_feature_scatter(df, x_feature, y_feature)

# === Bind interactive output ===
# Each dropdown feeds the callback parameter of the same name; the callback's
# output is captured in `interactive_out` and shown below the controls.
interactive_out = widgets.interactive_output(
    update_with_features,
    dict(
        threshold=threshold_dropdown,
        x_feature=x_feature_dropdown,
        y_feature=y_feature_dropdown,
    ),
)
display(
    widgets.HBox([threshold_dropdown, x_feature_dropdown, y_feature_dropdown]),
    interactive_out,
)


HBox(children=(Dropdown(description='Threshold:', index=2, options=(0.0, -0.1, -0.2, -0.3), value=-0.2), Dropd…

Output()