In [4]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Dict, List, Tuple, Optional
import warnings
warnings.filterwarnings('ignore')

class AnomalyDetectionComparator:
    """
    Sistem untuk membandingkan efektivitas deteksi anomali 
    berdasarkan berbagai tingkat sensitivitas
    """
    
    def __init__(self):
        self.anomalies_list = None
        self.detection_results = {}
        self.comparison_results = {}
        
    def load_anomalies_list(self, file_path: str) -> pd.DataFrame:
        """
        Load file anomalies_list yang berisi ground truth anomali
        """
        try:
            df = pd.read_csv(file_path, sep=';')
            df['timestamp'] = pd.to_datetime(df['timestamp'])
            self.anomalies_list = df
            print(f"✅ Berhasil load {len(df)} anomali dari ground truth")
            return df
        except Exception as e:
            print(f"❌ Error loading anomalies list: {e}")
            return None
    
    def load_detection_results(self, sensitivity_files: Dict[str, str]) -> Dict[str, pd.DataFrame]:
        """
        Load hasil deteksi dari berbagai tingkat sensitivitas
        
        sensitivity_files: dict dengan format
        {
            'high': 'path/to/anomaly_high_sensitivity.csv',
            'medium': 'path/to/anomaly_medium_sensitivity.csv', 
            'low': 'path/to/anomaly_low_sensitivity.csv'
        }
        """
        for sensitivity, file_path in sensitivity_files.items():
            try:
                df = pd.read_csv(file_path, sep=';')
                df['created_at'] = pd.to_datetime(df['created_at'])
                self.detection_results[sensitivity] = df
                print(f"✅ Berhasil load {len(df)} deteksi untuk sensitivity {sensitivity}")
            except Exception as e:
                print(f"❌ Error loading {sensitivity} sensitivity: {e}")
        
        return self.detection_results
    
    def create_time_windows(self, window_minutes: int = 5) -> List[Tuple]:
        """
        Membuat window waktu berdasarkan anomali yang ada di ground truth
        """
        if self.anomalies_list is None:
            print("❌ Anomalies list belum di-load")
            return []
        
        windows = []
        for _, anomaly in self.anomalies_list.iterrows():
            start_time = anomaly['timestamp'] - timedelta(minutes=window_minutes//2)
            end_time = anomaly['timestamp'] + timedelta(minutes=window_minutes//2)
            windows.append({
                'anomaly_id': anomaly['record_index'],
                'anomaly_timestamp': anomaly['timestamp'],
                'window_start': start_time,
                'window_end': end_time,
                'anomaly_type': anomaly['anomaly_type'],
                'severity': anomaly['severity'],
                'affected_metrics': anomaly['affected_metrics']
            })
        
        return windows
    
    def check_detection_in_window(self, detection_df: pd.DataFrame, window: Dict) -> bool:
        """
        Cek apakah ada deteksi dalam window waktu tertentu
        """
        detections_in_window = detection_df[
            (detection_df['created_at'] >= window['window_start']) &
            (detection_df['created_at'] <= window['window_end'])
        ]
        return len(detections_in_window) > 0
    
    def calculate_detection_rates(self, window_minutes: int = 5) -> Dict:
        """
        Hitung tingkat deteksi untuk setiap sensitivitas
        """
        if self.anomalies_list is None or not self.detection_results:
            print("❌ Data belum lengkap")
            return {}
        
        windows = self.create_time_windows(window_minutes)
        results = {}
        
        for sensitivity, detection_df in self.detection_results.items():
            detected_count = 0
            total_anomalies = len(windows)
            detected_anomalies = []
            missed_anomalies = []
            
            for window in windows:
                is_detected = self.check_detection_in_window(detection_df, window)
                if is_detected:
                    detected_count += 1
                    detected_anomalies.append(window)
                else:
                    missed_anomalies.append(window)
            
            detection_rate = (detected_count / total_anomalies) * 100 if total_anomalies > 0 else 0
            
            results[sensitivity] = {
                'total_anomalies': total_anomalies,
                'detected_count': detected_count,
                'missed_count': total_anomalies - detected_count,
                'detection_rate_percent': detection_rate,
                'detected_anomalies': detected_anomalies,
                'missed_anomalies': missed_anomalies
            }
            
            print(f"📊 {sensitivity.upper()} Sensitivity:")
            print(f"   - Total Anomali: {total_anomalies}")
            print(f"   - Terdeteksi: {detected_count}")
            print(f"   - Terlewat: {total_anomalies - detected_count}")
            print(f"   - Tingkat Deteksi: {detection_rate:.2f}%")
            print()
        
        self.comparison_results = results
        return results
    
    def analyze_by_severity(self) -> Dict:
        """
        Analisis tingkat deteksi berdasarkan severity anomali
        """
        if not self.comparison_results:
            print("❌ Belum ada hasil comparison")
            return {}
        
        severity_analysis = {}
        
        for sensitivity, results in self.comparison_results.items():
            severity_stats = {}
            
            # Analisis detected anomalies
            for anomaly in results['detected_anomalies']:
                severity = anomaly['severity']
                if severity not in severity_stats:
                    severity_stats[severity] = {'detected': 0, 'total': 0}
                severity_stats[severity]['detected'] += 1
            
            # Analisis missed anomalies
            for anomaly in results['missed_anomalies']:
                severity = anomaly['severity']
                if severity not in severity_stats:
                    severity_stats[severity] = {'detected': 0, 'total': 0}
            
            # Hitung total untuk setiap severity
            all_anomalies = results['detected_anomalies'] + results['missed_anomalies']
            for anomaly in all_anomalies:
                severity = anomaly['severity']
                severity_stats[severity]['total'] += 1
            
            # Hitung persentase
            for severity in severity_stats:
                total = severity_stats[severity]['total']
                detected = severity_stats[severity]['detected']
                severity_stats[severity]['detection_rate'] = (detected / total * 100) if total > 0 else 0
            
            severity_analysis[sensitivity] = severity_stats
        
        return severity_analysis
    
    def analyze_by_anomaly_type(self) -> Dict:
        """
        Analisis tingkat deteksi berdasarkan tipe anomali
        """
        if not self.comparison_results:
            print("❌ Belum ada hasil comparison")
            return {}
        
        type_analysis = {}
        
        for sensitivity, results in self.comparison_results.items():
            type_stats = {}
            
            # Analisis detected anomalies
            for anomaly in results['detected_anomalies']:
                anom_type = anomaly['anomaly_type']
                if anom_type not in type_stats:
                    type_stats[anom_type] = {'detected': 0, 'total': 0}
                type_stats[anom_type]['detected'] += 1
            
            # Analisis missed anomalies
            for anomaly in results['missed_anomalies']:
                anom_type = anomaly['anomaly_type']
                if anom_type not in type_stats:
                    type_stats[anom_type] = {'detected': 0, 'total': 0}
            
            # Hitung total untuk setiap type
            all_anomalies = results['detected_anomalies'] + results['missed_anomalies']
            for anomaly in all_anomalies:
                anom_type = anomaly['anomaly_type']
                type_stats[anom_type]['total'] += 1
            
            # Hitung persentase
            for anom_type in type_stats:
                total = type_stats[anom_type]['total']
                detected = type_stats[anom_type]['detected']
                type_stats[anom_type]['detection_rate'] = (detected / total * 100) if total > 0 else 0
            
            type_analysis[sensitivity] = type_stats
        
        return type_analysis
    
    def generate_comparison_report(self) -> str:
        """
        Generate laporan perbandingan lengkap
        """
        if not self.comparison_results:
            return "❌ Belum ada hasil comparison"
        
        report = []
        report.append("=" * 70)
        report.append("LAPORAN PERBANDINGAN DETEKSI ANOMALI")
        report.append("=" * 70)
        report.append()
        
        # Summary tingkat deteksi
        report.append("📊 RINGKASAN TINGKAT DETEKSI:")
        report.append("-" * 40)
        for sensitivity, results in self.comparison_results.items():
            report.append(f"{sensitivity.upper():>10}: {results['detection_rate_percent']:>6.2f}% ({results['detected_count']}/{results['total_anomalies']})")
        report.append()
        
        # Analisis per severity
        severity_analysis = self.analyze_by_severity()
        if severity_analysis:
            report.append("🔥 ANALISIS PER SEVERITY:")
            report.append("-" * 40)
            for sensitivity, stats in severity_analysis.items():
                report.append(f"\n{sensitivity.upper()}:")
                for severity, data in stats.items():
                    report.append(f"  {severity:>8}: {data['detection_rate']:>6.2f}% ({data['detected']}/{data['total']})")
            report.append()
        
        # Analisis per tipe anomali
        type_analysis = self.analyze_by_anomaly_type()
        if type_analysis:
            report.append("🔍 ANALISIS PER TIPE ANOMALI:")
            report.append("-" * 40)
            for sensitivity, stats in type_analysis.items():
                report.append(f"\n{sensitivity.upper()}:")
                for anom_type, data in stats.items():
                    report.append(f"  {anom_type:>15}: {data['detection_rate']:>6.2f}% ({data['detected']}/{data['total']})")
            report.append()
        
        # Rekomendasi
        report.append("💡 REKOMENDASI:")
        report.append("-" * 40)
        best_sensitivity = max(self.comparison_results.keys(), 
                             key=lambda x: self.comparison_results[x]['detection_rate_percent'])
        report.append(f"• Sensitivity terbaik: {best_sensitivity.upper()} ({self.comparison_results[best_sensitivity]['detection_rate_percent']:.2f}%)")
        
        # Cari sensitivity dengan balance terbaik (tidak terlalu banyak false positive)
        report.append("• Pertimbangkan trade-off antara detection rate dan false positive rate")
        report.append("• Monitor performa secara berkala dan sesuaikan threshold")
        
        return "\n".join(report)
    
    def visualize_results(self):
        """
        Buat visualisasi hasil perbandingan
        """
        if not self.comparison_results:
            print("❌ Belum ada hasil comparison")
            return
        
        fig, axes = plt.subplots(2, 2, figsize=(15, 12))
        fig.suptitle('Analisis Perbandingan Deteksi Anomali', fontsize=16, fontweight='bold')
        
        # 1. Bar chart tingkat deteksi
        sensitivities = list(self.comparison_results.keys())
        detection_rates = [self.comparison_results[s]['detection_rate_percent'] for s in sensitivities]
        
        axes[0,0].bar(sensitivities, detection_rates, color=['#ff9999', '#ffcc99', '#99ccff'])
        axes[0,0].set_title('Tingkat Deteksi per Sensitivity')
        axes[0,0].set_ylabel('Detection Rate (%)')
        axes[0,0].set_ylim(0, 100)
        
        # Tambahkan nilai di atas bar
        for i, v in enumerate(detection_rates):
            axes[0,0].text(i, v + 1, f'{v:.1f}%', ha='center', va='bottom')
        
        # 2. Stacked bar untuk detected vs missed
        detected_counts = [self.comparison_results[s]['detected_count'] for s in sensitivities]
        missed_counts = [self.comparison_results[s]['missed_count'] for s in sensitivities]
        
        axes[0,1].bar(sensitivities, detected_counts, label='Detected', color='#90EE90')
        axes[0,1].bar(sensitivities, missed_counts, bottom=detected_counts, label='Missed', color='#FFB6C1')
        axes[0,1].set_title('Detected vs Missed Anomalies')
        axes[0,1].set_ylabel('Jumlah Anomali')
        axes[0,1].legend()
        
        # 3. Analisis per severity
        severity_analysis = self.analyze_by_severity()
        if severity_analysis:
            severity_data = {}
            for sensitivity, stats in severity_analysis.items():
                for severity, data in stats.items():
                    if severity not in severity_data:
                        severity_data[severity] = {}
                    severity_data[severity][sensitivity] = data['detection_rate']
            
            severity_df = pd.DataFrame(severity_data).fillna(0)
            severity_df.plot(kind='bar', ax=axes[1,0], color=['#ff6666', '#ffaa66', '#66aaff'])
            axes[1,0].set_title('Detection Rate per Severity')
            axes[1,0].set_ylabel('Detection Rate (%)')
            axes[1,0].tick_params(axis='x', rotation=45)
            axes[1,0].legend(title='Severity')
        
        # 4. Analisis per tipe anomali
        type_analysis = self.analyze_by_anomaly_type()
        if type_analysis:
            type_data = {}
            for sensitivity, stats in type_analysis.items():
                for anom_type, data in stats.items():
                    if anom_type not in type_data:
                        type_data[anom_type] = {}
                    type_data[anom_type][sensitivity] = data['detection_rate']
            
            type_df = pd.DataFrame(type_data).fillna(0)
            type_df.plot(kind='bar', ax=axes[1,1], color=['#ff6666', '#ffaa66', '#66aaff'])
            axes[1,1].set_title('Detection Rate per Anomaly Type')
            axes[1,1].set_ylabel('Detection Rate (%)')
            axes[1,1].tick_params(axis='x', rotation=45)
            axes[1,1].legend(title='Anomaly Type')
        
        plt.tight_layout()
        plt.show()

# Contoh penggunaan
def main():
    """
    Contoh penggunaan sistem
    """
    # Inisialisasi comparator
    comparator = AnomalyDetectionComparator()
    
    # Load anomalies list (ground truth)
    print("🔄 Loading anomalies list...")
    comparator.load_anomalies_list('anomalies_list.csv')
    # comparator.load_anomalies_list('list_anomali.csv')
    # Load hasil deteksi dari berbagai sensitivity
    print("🔄 Loading detection results...")
    sensitivity_files = {
        'high': 'anomaly_high_sensitivity.csv',
        'medium': 'anomaly_medium_sensitivity.csv', 
        'low': 'anomaly_low_sensitivity.csv'
    }
    comparator.load_detection_results(sensitivity_files)
    
    # Hitung tingkat deteksi
    print("🔄 Calculating detection rates...")
    results = comparator.calculate_detection_rates(window_minutes=5)
    
    # Generate laporan
    # print("📋 Generating comparison report...")
    # report = comparator.generate_comparison_report()
    # print(report)
    
    # Buat visualisasi
    # print("📊 Creating visualizations...")
    # comparator.visualize_results()
    
    return comparator

if __name__ == "__main__":
    # Jalankan sistem
    comparator = main()

🔄 Loading anomalies list...
✅ Berhasil load 100 anomali dari ground truth
🔄 Loading detection results...
✅ Berhasil load 65 deteksi untuk sensitivity high
✅ Berhasil load 2000 deteksi untuk sensitivity medium
✅ Berhasil load 2000 deteksi untuk sensitivity low
🔄 Calculating detection rates...
📊 HIGH Sensitivity:
   - Total Anomali: 100
   - Terdeteksi: 13
   - Terlewat: 87
   - Tingkat Deteksi: 13.00%

📊 MEDIUM Sensitivity:
   - Total Anomali: 100
   - Terdeteksi: 100
   - Terlewat: 0
   - Tingkat Deteksi: 100.00%

📊 LOW Sensitivity:
   - Total Anomali: 100
   - Terdeteksi: 100
   - Terlewat: 0
   - Tingkat Deteksi: 100.00%

