In [2]:
import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from scipy import stats
from typing import Dict, List, Tuple
import logging


class FinancialAnomalyDetector:
    """Detect outliers in tabular financial metrics.

    Three complementary detectors are provided: per-column Z-scores, a
    multivariate Isolation Forest, and Z-scores of period-over-period growth
    rates.  ``detect_financial_metric_anomalies`` runs all three and adds a
    summary.
    """

    # Z-score cut-offs above which a metric is listed as high priority.
    _EXTREME_Z_SCORE = 4.0
    _EXTREME_GROWTH_Z_SCORE = 3.0

    def __init__(self, contamination: float = 0.1):
        """Create the detector.

        Args:
            contamination: Expected share of outliers, forwarded to
                ``IsolationForest``.
        """
        self.contamination = contamination
        # A fixed seed keeps the forest's labels reproducible between runs.
        self.isolation_forest = IsolationForest(contamination=contamination, random_state=42)
        self.scaler = StandardScaler()
        self.logger = logging.getLogger(__name__)
        # Set to True once detect_isolation_forest_anomalies has fitted the forest.
        self.is_fitted = False

    def detect_statistical_anomalies(self, data: pd.DataFrame,
                                   columns: List[str] = None,
                                   z_threshold: float = 3.0) -> Dict:
        """Detect anomalies using the Z-score method, one column at a time.

        Args:
            data: Frame holding the metrics.
            columns: Columns to scan; defaults to every numeric column.
                Names missing from ``data`` are skipped.
            z_threshold: Absolute Z-score above which a value is flagged.

        Returns:
            Mapping of column name to a dict with ``anomaly_indices`` (row
            positions within ``data``), ``anomaly_values``, ``z_scores``,
            ``mean``, ``std`` (sample standard deviation) and ``threshold``.
            Columns with fewer than three non-null values are omitted.
        """
        if columns is None:
            columns = data.select_dtypes(include=[np.number]).columns.tolist()

        anomalies = {}

        for column in columns:
            if column not in data.columns:
                continue

            series = data[column]
            values = series.dropna()
            if len(values) < 3:  # Need minimum data points
                continue

            # Positions of the non-null rows inside ``data``; used to report
            # anomalies against the caller's frame rather than the filtered one.
            row_positions = np.flatnonzero(series.notna().to_numpy())

            # Work on a plain float array so every index below is positional,
            # whatever container type scipy hands back for a Series.
            observed = values.to_numpy(dtype=float)
            if observed.min() == observed.max():
                # Zero variance: nothing can deviate, and zscore would divide by 0.
                z_scores = np.zeros(len(observed))
            else:
                z_scores = np.abs(stats.zscore(observed))
            flagged = np.flatnonzero(z_scores > z_threshold)

            anomalies[column] = {
                'anomaly_indices': row_positions[flagged].tolist(),
                'anomaly_values': values.iloc[flagged].tolist(),
                'z_scores': z_scores[flagged].tolist(),
                'mean': float(values.mean()),
                'std': float(values.std()),
                'threshold': z_threshold
            }

        return anomalies

    def detect_isolation_forest_anomalies(self, data: pd.DataFrame,
                                        columns: List[str] = None) -> Dict:
        """Detect multivariate anomalies using Isolation Forest.

        Rows with a missing value in any selected column are excluded.  The
        scaler and the forest are re-fitted on every call.

        Args:
            data: Frame holding the metrics.
            columns: Feature columns; defaults to every numeric column.
                Names missing from ``data`` are skipped.

        Returns:
            ``{'error': ...}`` when fewer than five complete rows (or no
            usable columns) are available; otherwise a dict with
            ``anomaly_indices`` (row positions within ``data``),
            ``anomaly_scores``, ``total_anomalies``, ``anomaly_ratio``,
            ``features_used`` and ``contamination``.
        """
        if columns is None:
            columns = data.select_dtypes(include=[np.number]).columns.tolist()

        # Skip unknown columns, matching the other detectors.
        columns = [column for column in columns if column in data.columns]
        if not columns:
            return {'error': 'Insufficient data for anomaly detection'}

        # Prepare data: keep only rows where every feature is present.
        features = data[columns]
        complete_rows = features.notna().all(axis=1).to_numpy()
        X = features.loc[complete_rows]
        if len(X) < 5:  # Need minimum data points
            return {'error': 'Insufficient data for anomaly detection'}

        # Scale features
        X_scaled = self.scaler.fit_transform(X)

        # Fit and predict
        anomaly_labels = self.isolation_forest.fit_predict(X_scaled)
        anomaly_scores = self.isolation_forest.score_samples(X_scaled)
        self.is_fitted = True

        # Find anomalies (labeled as -1) and map them back to rows of ``data``.
        anomaly_mask = anomaly_labels == -1
        row_positions = np.flatnonzero(complete_rows)

        return {
            'anomaly_indices': row_positions[anomaly_mask].tolist(),
            'anomaly_scores': anomaly_scores[anomaly_mask].tolist(),
            'total_anomalies': int(np.sum(anomaly_mask)),
            'anomaly_ratio': float(np.mean(anomaly_mask)),
            'features_used': columns,
            'contamination': self.contamination
        }

    def detect_financial_metric_anomalies(self, financial_data: List[Dict]) -> Dict:
        """Run every detector over the known financial metrics.

        Args:
            financial_data: Records convertible by ``pd.DataFrame``; only the
                keys revenue, profit, eps, cash_flow, debt and assets are used.

        Returns:
            ``{'error': ...}`` when none of those keys is present; otherwise a
            dict with ``statistical_anomalies``, ``isolation_forest_anomalies``,
            ``growth_rate_anomalies`` and ``summary``.
        """
        # Convert to DataFrame
        df = pd.DataFrame(financial_data)

        financial_columns = ['revenue', 'profit', 'eps', 'cash_flow', 'debt', 'assets']
        available_columns = [col for col in financial_columns if col in df.columns]

        if not available_columns:
            return {'error': 'No financial metrics found for anomaly detection'}

        statistical_anomalies = self.detect_statistical_anomalies(df, available_columns)
        isolation_anomalies = self.detect_isolation_forest_anomalies(df, available_columns)
        growth_anomalies = self._detect_growth_anomalies(df, available_columns)

        return {
            'statistical_anomalies': statistical_anomalies,
            'isolation_forest_anomalies': isolation_anomalies,
            'growth_rate_anomalies': growth_anomalies,
            'summary': self._create_anomaly_summary(statistical_anomalies, isolation_anomalies, growth_anomalies)
        }

    def _detect_growth_anomalies(self, data: pd.DataFrame, columns: List[str],
                                threshold: float = 2.0) -> Dict:
        """Detect anomalies in period-over-period growth rates.

        Growth is computed between consecutive non-null values.  Infinite
        rates (growth from a zero base) are left out of the statistics,
        because a single one would turn every Z-score into NaN and hide all
        other anomalies.

        Args:
            data: Frame holding the metrics, rows in chronological order.
            columns: Columns to scan; names missing from ``data`` are skipped.
            threshold: Absolute Z-score above which a growth rate is flagged.

        Returns:
            Mapping of ``'<column>_growth'`` to a dict with
            ``anomaly_indices`` (positions within the column's growth-rate
            series), ``anomaly_growth_rates``, ``z_scores``,
            ``mean_growth_rate`` and ``std_growth_rate``.  Only columns with
            at least one flagged rate appear.
        """
        growth_anomalies = {}

        for column in columns:
            if column not in data.columns:
                continue

            values = data[column].dropna()
            if len(values) < 2:
                continue

            # Calculate period-over-period growth rates
            growth_rates = values.pct_change().dropna()
            if len(growth_rates) < 2:
                continue

            # Score only the finite rates, but remember where they sit in the
            # full growth-rate series so reported positions stay comparable.
            finite_positions = np.flatnonzero(
                np.isfinite(growth_rates.to_numpy(dtype=float))
            )
            finite_growth = growth_rates.iloc[finite_positions]
            finite_rates = finite_growth.to_numpy(dtype=float)
            if len(finite_rates) < 2 or finite_rates.min() == finite_rates.max():
                # Too few usable rates, or no variation to measure against.
                continue

            growth_z_scores = np.abs(stats.zscore(finite_rates))
            anomaly_mask = growth_z_scores > threshold
            if not np.any(anomaly_mask):
                continue

            anomaly_indices = finite_positions[anomaly_mask]
            growth_anomalies[f'{column}_growth'] = {
                'anomaly_indices': anomaly_indices.tolist(),
                'anomaly_growth_rates': growth_rates.iloc[anomaly_indices].tolist(),
                'z_scores': growth_z_scores[anomaly_mask].tolist(),
                'mean_growth_rate': float(finite_growth.mean()),
                'std_growth_rate': float(finite_growth.std())
            }

        return growth_anomalies

    def _create_anomaly_summary(self, statistical: Dict, isolation: Dict, growth: Dict) -> Dict:
        """Create a summary of all detected anomalies.

        ``overall_anomaly_score`` is the mean anomaly count across the three
        detectors.  An isolation result without ``total_anomalies`` (for
        example an error dict) counts as zero.
        """
        total_statistical = sum(len(v.get('anomaly_indices', [])) for v in statistical.values())
        total_isolation = isolation.get('total_anomalies', 0)
        total_growth = sum(len(v.get('anomaly_indices', [])) for v in growth.values())

        return {
            'total_statistical_anomalies': total_statistical,
            'total_isolation_anomalies': total_isolation,
            'total_growth_anomalies': total_growth,
            'overall_anomaly_score': (total_statistical + total_isolation + total_growth) / 3,
            'high_priority_metrics': self._identify_high_priority_anomalies(statistical, growth)
        }

    def _identify_high_priority_anomalies(self, statistical: Dict, growth: Dict) -> List[str]:
        """Identify metrics with severe anomalies.

        A metric qualifies when its largest flagged Z-score exceeds 4.0
        (statistical) or 3.0 (growth).  Statistical entries are listed first.
        """
        checks = (
            (statistical, self._EXTREME_Z_SCORE, 'extreme statistical deviation'),
            (growth, self._EXTREME_GROWTH_Z_SCORE, 'unusual growth pattern'),
        )

        high_priority = []
        for results, cutoff, reason in checks:
            for metric, details in results.items():
                if not details.get('anomaly_indices'):
                    continue
                # ``default`` guards against a flagged entry with no scores.
                if max(details.get('z_scores', [0]), default=0) > cutoff:
                    high_priority.append(f"{metric} ({reason})")

        return high_priority


class StreamlitAnomalyDetector:
    """Adapter that flattens ``FinancialAnomalyDetector`` output for display."""

    def __init__(self):
        self.anomaly_detector = FinancialAnomalyDetector()

    @staticmethod
    def _error_payload(message):
        """Return the fixed-shape payload used for every failure."""
        return {
            "anomalies_detected": [],
            "anomaly_score": 0.0,
            "risk_level": "Unknown",
            "status": "error",
            "error": message
        }

    @staticmethod
    def _z_score_severity(details, cutoff):
        """Return 'High' when the largest flagged Z-score exceeds ``cutoff``."""
        # ``or [0]`` also covers a present-but-empty list, which max() rejects.
        return 'High' if max(details.get('z_scores') or [0]) > cutoff else 'Medium'

    def detect_for_streamlit(self, financial_data):
        """Run anomaly detection and summarise it for display.

        This method never raises: any failure is logged and reported through
        a payload with ``status == "error"``.

        Args:
            financial_data: Records passed unchanged to
                ``detect_financial_metric_anomalies``.

        Returns:
            On success, a dict with ``anomalies_detected`` (one entry per
            detector/metric that flagged something), ``anomaly_score``
            (clamped to the range 0-1), ``risk_level`` ("High" above 0.7,
            "Medium" above 0.3, otherwise "Low"), ``status``,
            ``total_anomalies``, ``high_priority_metrics`` and
            ``detailed_results``.  On failure, a dict with ``status`` set to
            "error" and an ``error`` message.
        """
        try:
            result = self.anomaly_detector.detect_financial_metric_anomalies(financial_data)

            # Handle error case
            if 'error' in result:
                return self._error_payload(result['error'])

            summary = result.get('summary', {})
            # The raw score is an unbounded count-based figure; scale it and
            # clamp so the value really is within 0-1 as consumers expect.
            scaled_score = summary.get('overall_anomaly_score', 0.0) / 10.0
            anomaly_score = min(max(scaled_score, 0.0), 1.0)

            # Format anomalies for Streamlit display
            anomalies_detected = []

            for metric, details in result.get('statistical_anomalies', {}).items():
                flagged = details.get('anomaly_indices', [])
                if flagged:
                    anomalies_detected.append({
                        'type': 'Statistical',
                        'metric': metric,
                        'count': len(flagged),
                        'severity': self._z_score_severity(details, 4.0)
                    })

            isolation_data = result.get('isolation_forest_anomalies', {})
            if isolation_data.get('total_anomalies', 0) > 0:
                anomalies_detected.append({
                    'type': 'Isolation Forest',
                    'metric': 'Multiple features',
                    'count': isolation_data['total_anomalies'],
                    'severity': 'High' if isolation_data.get('anomaly_ratio', 0) > 0.2 else 'Medium'
                })

            for metric, details in result.get('growth_rate_anomalies', {}).items():
                flagged = details.get('anomaly_indices', [])
                if flagged:
                    anomalies_detected.append({
                        'type': 'Growth Rate',
                        'metric': metric,
                        'count': len(flagged),
                        'severity': self._z_score_severity(details, 3.0)
                    })

            # Determine risk level
            if anomaly_score > 0.7:
                risk_level = "High"
            elif anomaly_score > 0.3:
                risk_level = "Medium"
            else:
                risk_level = "Low"

            total_anomalies = (
                summary.get('total_statistical_anomalies', 0)
                + summary.get('total_isolation_anomalies', 0)
                + summary.get('total_growth_anomalies', 0)
            )

            return {
                "anomalies_detected": anomalies_detected,
                "anomaly_score": float(anomaly_score),
                "risk_level": risk_level,
                "status": "success",
                "total_anomalies": total_anomalies,
                "high_priority_metrics": summary.get('high_priority_metrics', []),
                "detailed_results": result  # Include full results for detailed analysis
            }

        except Exception as e:
            # UI boundary: keep the caller alive, but record the traceback
            # instead of discarding it.
            logging.getLogger(__name__).exception("Financial anomaly detection failed")
            return self._error_payload(str(e))


# Build the module-level wrapper instance. Construction only instantiates the
# underlying FinancialAnomalyDetector with its default settings; nothing is
# fitted at this point, so the "Loading ... models" message is informational.
print("🔍 Loading Financial Anomaly Detection models...")
streamlit_anomaly = StreamlitAnomalyDetector()
print("✅ Financial Anomaly Detection initialized for Streamlit")


🔍 Loading Financial Anomaly Detection models...
✅ Financial Anomaly Detection initialized for Streamlit
