In [3]:
import pandas as pd
import numpy as np
from datetime import datetime, time
import logging
from typing import Dict, List, Tuple, Any
import warnings
warnings.filterwarnings('ignore')

class RuleBasedAnomalyDetector:
    """Flag suspicious transactions using fixed, hand-written rules.

    Two rule families are evaluated for every transaction:

    * amount rules - an absolute threshold plus, when a per-user profile is
      supplied, comparisons against that user's 95th percentile and against a
      multiple of the user's average amount;
    * hour/type rules - known transaction types are flagged when they occur
      outside the hour window configured for their risk category.

    Every rule yields a score in [0, 1]. The overall confidence is the plain
    mean of all scores produced for the transaction.

    NOTE(review): rules that cannot be applied (no user profile) still
    contribute 0.0 to that mean, so a transaction that violates every
    applicable rule can end up with a confidence of only 0.5. This is kept
    as-is because downstream thresholds depend on it.
    """

    # Hour assumed when a timestamp is missing or cannot be parsed.
    DEFAULT_HOUR = 12

    # Inclusive hour range in which low-risk transaction types are still flagged.
    LOW_RISK_FLAGGED_HOURS = (2, 4)

    # Column order of the frame returned by batch_detect_anomalies.
    RESULT_COLUMNS = [
        'row_index', 'transaction_id', 'user_id', 'is_anomaly', 'confidence',
        'anomaly_count', 'anomalies', 'amount_score', 'user_percentile_score',
        'hour_risk_score',
    ]

    def __init__(self):
        """Initialize rule-based anomaly detector with configurable rules."""

        # Transaction types that are suspicious during odd hours.
        # Types are matched after normalize_transaction_type(), which turns
        # '-' and ' ' into '_', so the hyphenated spellings below can only
        # match when a caller passes an un-normalized type directly.
        self.suspicious_night_transactions = {
            'withdraw', 'withdrawal', 'atm_withdrawal', 'cash_withdrawal',
            'top_up', 'topup', 'top-up', 'reload', 'add_funds',
            'cashout', 'cash_out', 'cash-out', 'redeem', 'withdraw_cash',
            'purchase', 'buy', 'payment', 'shop', 'retail',
            'transfer_out', 'send_money', 'wire_transfer'
        }

        # Hour windows per risk category. 'normal_hours' is inclusive on both
        # ends and compared against the integer hour, so (6, 22) covers
        # 06:00 through 22:59.
        self.transaction_hour_rules = {
            # High-risk transactions (should mostly happen during business hours)
            'high_risk_hours': {
                'types': self.suspicious_night_transactions,
                'normal_hours': (6, 22),
                'risk_level': 'HIGH'
            },

            # Medium-risk transactions (can happen later but still suspicious very late)
            'medium_risk_hours': {
                'types': {'deposit', 'receive', 'refund', 'credit', 'income'},
                'normal_hours': (5, 23),
                'risk_level': 'MEDIUM'
            },

            # Low-risk transactions: allowed at any hour, but hours inside
            # LOW_RISK_FLAGGED_HOURS are still flagged.
            'low_risk_hours': {
                'types': {'balance_check', 'inquiry', 'statement', 'view'},
                'normal_hours': (0, 24),
                'risk_level': 'LOW'
            }
        }

        # Amount-based rules (percentiles and absolute thresholds).
        # NOTE: 'user_percentile_threshold' is not read by any rule in this
        # class; the percentile rule uses the profile's 'percentile_95' value.
        self.amount_rules = {
            'high_amount_threshold': 5000,      # Absolute high amount
            'user_percentile_threshold': 0.95,   # 95th percentile of user's history
            'user_multiplier_threshold': 5.0,    # 5x user's average
            'minimum_history_required': 10       # Minimum transactions to calculate user profile
        }

        # user_id -> profile dict, filled by batch_detect_anomalies()
        self.user_profiles = {}

        # Setup logging
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    # ------------------------------------------------------------------
    # Input normalisation helpers
    # ------------------------------------------------------------------
    def extract_hour_from_datetime(self, datetime_str: str) -> int:
        """Return the hour (0-23) of a timestamp.

        Args:
            datetime_str: A string parsed with ``pd.to_datetime`` or any
                object exposing an ``hour`` attribute.

        Returns:
            The integer hour, or ``DEFAULT_HOUR`` (noon) when the value is
            missing, empty, NaT or unparseable. A warning is logged in that
            case.
        """
        try:
            if isinstance(datetime_str, str):
                parsed = pd.to_datetime(datetime_str)
            else:
                parsed = datetime_str
            hour = parsed.hour
            # NaT (e.g. from an empty string) exposes hour as NaN, which
            # would later break integer formatting of the hour.
            if pd.isna(hour):
                raise ValueError("timestamp is missing")
            return int(hour)
        except Exception:
            self.logger.warning(f"Could not parse datetime: {datetime_str}")
            return self.DEFAULT_HOUR  # Default to noon if parsing fails

    def normalize_transaction_type(self, txn_type: str) -> str:
        """Lower-case a transaction type and unify its separators.

        Spaces and hyphens become underscores. Missing, NaN or blank input
        yields ``'unknown'``.
        """
        if txn_type is None or pd.isna(txn_type):
            return 'unknown'
        normalized = str(txn_type).lower().strip().replace(' ', '_').replace('-', '_')
        return normalized or 'unknown'

    def _coerce_amount(self, raw_amount: Any) -> float:
        """Convert an amount to float, treating missing/invalid values as 0.0."""
        try:
            amount = float(raw_amount)
        except (TypeError, ValueError):
            amount = float('nan')
        if np.isnan(amount):
            self.logger.warning(f"Missing or invalid amount: {raw_amount!r}; treating as 0")
            return 0.0
        return amount

    @staticmethod
    def _ratio_score(amount: float, threshold: float) -> float:
        """Score amount/threshold clamped to [0, 1]; 0.0 for unusable thresholds."""
        if pd.isna(threshold) or threshold <= 0:
            return 0.0
        return float(min(max(amount / threshold, 0.0), 1.0))

    # ------------------------------------------------------------------
    # User profiling
    # ------------------------------------------------------------------
    def build_user_profile(self, user_transactions: pd.DataFrame) -> Dict[str, Any]:
        """Build a statistical profile from one user's transactions.

        Args:
            user_transactions: Rows for a single user. Must contain the
                columns ``amount``, ``hour`` and
                ``transaction_type_normalized``.

        Returns:
            A dict of amount statistics plus hour/type distributions, or
            ``None`` when fewer than ``minimum_history_required`` usable
            (non-null) amounts are available.
        """
        amounts = user_transactions['amount'].dropna()
        # Counting non-null amounts (not rows) avoids profiles whose
        # statistics are computed from a handful of values or are all NaN.
        if len(amounts) < self.amount_rules['minimum_history_required']:
            return None

        hours = user_transactions['hour']
        types = user_transactions['transaction_type_normalized']

        profile = {
            'transaction_count': len(user_transactions),
            'avg_amount': amounts.mean(),
            'median_amount': amounts.median(),
            'std_amount': amounts.std(),
            'min_amount': amounts.min(),
            'max_amount': amounts.max(),
            'percentile_75': amounts.quantile(0.75),
            'percentile_90': amounts.quantile(0.90),
            'percentile_95': amounts.quantile(0.95),
            'percentile_99': amounts.quantile(0.99),

            # Hourly patterns
            'common_hours': hours.mode().tolist(),
            'hour_distribution': hours.value_counts().to_dict(),

            # Transaction type patterns
            'common_types': types.mode().tolist(),
            'type_distribution': types.value_counts().to_dict()
        }

        return profile

    # ------------------------------------------------------------------
    # Rule checks
    # ------------------------------------------------------------------
    def check_amount_anomalies(self, transaction: Dict[str, Any], user_profile: Dict[str, Any] = None) -> Tuple[List[str], Dict[str, float]]:
        """Check for amount-based anomalies.

        Args:
            transaction: Dict with an ``amount`` entry (missing/NaN/invalid
                amounts are treated as 0).
            user_profile: Optional profile from ``build_user_profile``; only
                ``percentile_95`` and ``avg_amount`` are read.

        Returns:
            ``(anomalies, scores)`` where ``scores`` always contains the keys
            ``high_amount_absolute``, ``user_percentile_95`` and
            ``user_multiplier``, each in [0, 1].
        """
        anomalies = []
        scores = {}
        amount = self._coerce_amount(transaction.get('amount', 0))

        # Rule 1: Absolute high amount threshold
        absolute_limit = self.amount_rules['high_amount_threshold']
        if amount > absolute_limit:
            anomalies.append(f"Amount exceeds absolute threshold (£{amount:,.0f} > £{absolute_limit:,.0f})")
            scores['high_amount_absolute'] = 1.0
        else:
            scores['high_amount_absolute'] = self._ratio_score(amount, absolute_limit)

        if not user_profile:
            # No history for this user: the profile rules cannot fire.
            scores['user_percentile_95'] = 0.0
            scores['user_multiplier'] = 0.0
            return anomalies, scores

        # Rule 2: Exceeds user's 95th percentile
        user_95th = user_profile['percentile_95']
        if amount > user_95th:
            anomalies.append(f"Amount exceeds user's 95th percentile (£{amount:,.0f} > £{user_95th:,.0f})")
            scores['user_percentile_95'] = 1.0
        else:
            # _ratio_score guards the zero/NaN percentile that previously
            # caused a division by zero or a NaN score.
            scores['user_percentile_95'] = self._ratio_score(amount, user_95th)

        # Rule 3: Exceeds user's average by multiplier.
        # A multiple of a zero, negative or NaN average is meaningless (and
        # the ratio in the message would divide by zero), so the rule is
        # skipped in that case.
        user_avg = user_profile['avg_amount']
        if pd.isna(user_avg) or user_avg <= 0:
            scores['user_multiplier'] = 0.0
        else:
            multiplier_threshold = user_avg * self.amount_rules['user_multiplier_threshold']
            if amount > multiplier_threshold:
                anomalies.append(f"Amount is {amount/user_avg:.1f}x user's average (£{amount:,.0f} vs avg £{user_avg:,.0f})")
                scores['user_multiplier'] = 1.0
            else:
                scores['user_multiplier'] = self._ratio_score(amount, multiplier_threshold)

        return anomalies, scores

    def check_hour_transaction_type_anomalies(self, transaction: Dict[str, Any]) -> Tuple[List[str], Dict[str, float]]:
        """Check for hour + transaction type combination anomalies.

        Args:
            transaction: Dict with ``hour`` (defaults to noon when absent or
                NaN) and ``transaction_type_normalized``.

        Returns:
            ``(anomalies, scores)``. ``scores`` holds a single
            ``hour_risk_<rule_name>`` entry for the first rule category whose
            type set contains the transaction type, and is empty when the
            type matches no category.
        """
        anomalies = []
        scores = {}

        hour = transaction.get('hour', self.DEFAULT_HOUR)
        if hour is None or pd.isna(hour):
            hour = self.DEFAULT_HOUR
        hour = int(hour)  # the message below formats the hour with ':02d'
        txn_type = transaction.get('transaction_type_normalized', 'unknown')

        for rule_name, rule_config in self.transaction_hour_rules.items():
            if txn_type not in rule_config['types']:
                continue

            normal_start, normal_end = rule_config['normal_hours']
            risk_level = rule_config['risk_level']
            outside_normal = not (normal_start <= hour <= normal_end)

            severity = ""
            score_value = 0.0

            if risk_level == 'HIGH':
                if outside_normal:
                    if hour <= 4 or hour >= 23:  # Very late/early
                        severity, score_value = "CRITICAL", 1.0
                    else:
                        severity, score_value = "HIGH", 0.8
            elif risk_level == 'MEDIUM':
                if outside_normal:
                    # 'hour >= 24' can only be true for out-of-range input.
                    if hour <= 3 or hour >= 24:
                        severity, score_value = "HIGH", 0.8
                    else:
                        severity, score_value = "MEDIUM", 0.6
            else:  # LOW risk
                # Evaluated independently of the (0, 24) normal window;
                # nesting it under "outside normal hours" made it unreachable.
                flag_start, flag_end = self.LOW_RISK_FLAGGED_HOURS
                if flag_start <= hour <= flag_end:
                    severity, score_value = "MEDIUM", 0.5

            if score_value > 0:
                time_desc = self.get_time_description(hour)
                anomalies.append(f"{severity} RISK: {txn_type.title()} transaction at {time_desc} ({hour:02d}:00)")
            scores[f'hour_risk_{rule_name}'] = score_value
            break  # Found matching rule, don't check others

        return anomalies, scores

    def get_time_description(self, hour: int) -> str:
        """Get descriptive text for time of day."""
        if 0 <= hour <= 4:
            return "very late night/early morning"
        elif 5 <= hour <= 6:
            return "early morning"
        elif 7 <= hour <= 11:
            return "morning"
        elif 12 <= hour <= 17:
            return "afternoon"
        elif 18 <= hour <= 21:
            return "evening"
        else:
            return "late evening"

    # ------------------------------------------------------------------
    # Public entry points
    # ------------------------------------------------------------------
    def detect_rule_based_anomalies(self, transaction: Dict[str, Any], user_profile: Dict[str, Any] = None) -> Dict[str, Any]:
        """Main method to detect rule-based anomalies for a single transaction.

        Args:
            transaction: Raw transaction dict. Reads ``amount``, ``datetime``,
                ``transaction_type``, ``user_id`` and ``transaction_id``; all
                are optional.
            user_profile: Optional profile enabling the per-user amount rules.

        Returns:
            Dict with ``is_anomaly`` (bool), ``confidence`` (mean of all rule
            scores, rounded to 3 decimals), ``anomalies`` (messages),
            ``scores``, ``transaction_data`` (the normalised transaction) and
            ``method``.
        """
        # Prepare transaction data
        processed_transaction = {
            'amount': self._coerce_amount(transaction.get('amount', 0)),
            'hour': self.extract_hour_from_datetime(transaction.get('datetime', '')),
            'transaction_type_normalized': self.normalize_transaction_type(transaction.get('transaction_type', '')),
            'user_id': transaction.get('user_id', 'unknown'),
            'transaction_id': transaction.get('transaction_id', 'unknown')
        }

        amount_anomalies, amount_scores = self.check_amount_anomalies(processed_transaction, user_profile)
        hour_anomalies, hour_scores = self.check_hour_transaction_type_anomalies(processed_transaction)

        # Combine all anomalies and scores
        all_anomalies = amount_anomalies + hour_anomalies
        all_scores = {**amount_scores, **hour_scores}

        # Overall confidence: unweighted mean over every score produced.
        if all_scores:
            confidence = float(np.mean(list(all_scores.values())))
        else:
            confidence = 0.0

        # Any single rule violation marks the transaction as anomalous,
        # regardless of the averaged confidence.
        is_anomaly = bool(confidence > 0.5 or len(all_anomalies) > 0)

        return {
            'is_anomaly': is_anomaly,
            'confidence': round(confidence, 3),
            'anomalies': all_anomalies,
            'scores': all_scores,
            'transaction_data': processed_transaction,
            'method': 'rule_based'
        }

    def batch_detect_anomalies(self, transactions_df: pd.DataFrame, build_user_profiles: bool = True) -> pd.DataFrame:
        """Detect anomalies for a batch of transactions.

        Args:
            transactions_df: Frame with the columns ``datetime``,
                ``transaction_type`` and ``user_id`` (``amount`` is also
                required when profiles are built). The input is not modified.
            build_user_profiles: When True, a profile is (re)built for every
                user with enough history and stored in ``self.user_profiles``.
                Profiles from earlier calls are kept.

        Returns:
            One row per input transaction with the columns listed in
            ``RESULT_COLUMNS``; an empty input yields an empty frame that
            still has those columns.

        NOTE(review): profiles are built from the same rows that are then
        scored, so an outlier inflates the very statistics it is compared
        against.
        """
        # Prepare dataframe
        df = transactions_df.copy()
        df['hour'] = df['datetime'].apply(self.extract_hour_from_datetime)
        df['transaction_type_normalized'] = df['transaction_type'].apply(self.normalize_transaction_type)

        # Build user profiles if requested
        if build_user_profiles:
            self.logger.info("Building user profiles...")
            # A single groupby pass replaces one boolean-mask scan per user.
            for user_id, user_data in df.groupby('user_id', sort=False):
                profile = self.build_user_profile(user_data)
                if profile:
                    self.user_profiles[user_id] = profile

        # Detect anomalies for each transaction
        records = []
        for idx, row in df.iterrows():
            transaction = row.to_dict()
            user_profile = self.user_profiles.get(transaction['user_id'])
            result = self.detect_rule_based_anomalies(transaction, user_profile)

            scores = result['scores']
            records.append({
                'row_index': idx,
                'transaction_id': result['transaction_data']['transaction_id'],
                'user_id': result['transaction_data']['user_id'],
                'is_anomaly': result['is_anomaly'],
                'confidence': result['confidence'],
                'anomaly_count': len(result['anomalies']),
                'anomalies': '; '.join(result['anomalies']) if result['anomalies'] else '',
                'amount_score': scores.get('high_amount_absolute', 0),
                'user_percentile_score': scores.get('user_percentile_95', 0),
                'hour_risk_score': max([v for k, v in scores.items() if 'hour_risk' in k], default=0),
            })

        # Explicit columns keep the schema stable even when there are no rows.
        return pd.DataFrame(records, columns=self.RESULT_COLUMNS)

    def generate_anomaly_report(self, result: Dict[str, Any]) -> str:
        """Generate a formatted report for a detected anomaly.

        Args:
            result: A dict as returned by ``detect_rule_based_anomalies``.

        Returns:
            A one-line "normal" message when ``is_anomaly`` is false,
            otherwise a multi-line report whose severity band and recommended
            action are derived from ``confidence``.
        """
        if not result['is_anomaly']:
            return "✅ NORMAL TRANSACTION - No rule violations detected"

        txn = result['transaction_data']
        confidence = result['confidence']

        # Determine severity
        if confidence >= 0.8:
            severity = "🚨 CRITICAL"
            action = "IMMEDIATE INVESTIGATION REQUIRED"
        elif confidence >= 0.6:
            severity = "⚠️  HIGH RISK"
            action = "REVIEW RECOMMENDED"
        elif confidence >= 0.4:
            severity = "👀 MEDIUM RISK"
            action = "MONITOR"
        else:
            severity = "ℹ️  LOW RISK"
            action = "LOG FOR PATTERN ANALYSIS"

        report = f"""
{severity} RULE-BASED ANOMALY DETECTED
Transaction ID: {txn['transaction_id']}
User ID: {txn['user_id']}
Rule Confidence: {confidence:.3f}

Transaction Details:
- Amount: £{txn['amount']:,.2f}
- Type: {txn['transaction_type_normalized'].title()}
- Time: {txn['hour']:02d}:00 ({self.get_time_description(txn['hour'])})

Rule Violations:
"""

        for i, anomaly in enumerate(result['anomalies'], 1):
            report += f"{i}. {anomaly}\n"

        # Only non-zero scores are listed to keep the report short.
        report += "\nDetailed Scores:\n"
        for score_name, score_value in result['scores'].items():
            if score_value > 0:
                score_desc = score_name.replace('_', ' ').title()
                report += f"- {score_desc}: {score_value:.3f}\n"

        report += f"\nRecommended Action: {action}"

        return report.strip()

# =============================================================================
# EXAMPLE USAGE AND TESTING
# =============================================================================

if __name__ == "__main__":
    # Demo: run the detector on four hand-written transactions one at a time,
    # then on a batch that also contains synthetic history for user_123 so
    # that a user profile can be built.

    # Initialize detector
    detector = RuleBasedAnomalyDetector()

    # Local, seeded generator so the synthetic history (and therefore every
    # printed result below) is identical on each Restart & Run All.
    rng = np.random.default_rng(42)

    # Create sample data
    sample_transactions = [
        {
            'transaction_id': 'TXN_001',
            'user_id': 'user_123',
            'amount': 50.0,
            'transaction_type': 'purchase',
            'datetime': '2023-05-14 14:30:00'  # Normal afternoon purchase
        },
        {
            'transaction_id': 'TXN_002',
            'user_id': 'user_123',
            'amount': 8000.0,  # Very high amount
            'transaction_type': 'withdrawal',
            'datetime': '2023-05-14 02:15:00'  # Very late night withdrawal
        },
        {
            'transaction_id': 'TXN_003',
            'user_id': 'user_456',
            'amount': 25.0,
            'transaction_type': 'top_up',
            'datetime': '2023-05-14 23:45:00'  # Late night top-up
        },
        {
            'transaction_id': 'TXN_004',
            'user_id': 'user_123',
            'amount': 1200.0,  # High for this user
            'transaction_type': 'cashout',
            'datetime': '2023-05-14 03:30:00'  # Very early morning cashout
        }
    ]

    # Test single transaction detection (no user profile is passed here, so
    # only the absolute-amount and hour/type rules can fire)
    print("="*70)
    print("TESTING INDIVIDUAL TRANSACTIONS")
    print("="*70)

    for txn in sample_transactions:
        result = detector.detect_rule_based_anomalies(txn)
        report = detector.generate_anomaly_report(result)
        print(f"\n{report}\n")
        print("-" * 50)

    # Test batch detection
    print("\n" + "="*70)
    print("TESTING BATCH PROCESSING")
    print("="*70)

    # Create DataFrame with more transactions for user profiling
    extended_data = []

    # Add normal transactions for user_123 to build profile:
    # 15 rows spread three per day from 2023-05-10, at a random hour 09-17.
    for i in range(15):
        extended_data.append({
            'transaction_id': f'TXN_HIST_{i}',
            'user_id': 'user_123',
            'amount': float(rng.normal(75, 20)),  # Average £75, std £20
            'transaction_type': str(rng.choice(['purchase', 'deposit', 'transfer'])),
            'datetime': f'2023-05-{10+i//3:02d} {int(rng.integers(9, 18)):02d}:30:00'
        })

    # Add the test transactions
    extended_data.extend(sample_transactions)

    df = pd.DataFrame(extended_data)
    results_df = detector.batch_detect_anomalies(df)

    print("\nBatch Results Summary:")
    print(f"Total transactions: {len(results_df)}")
    print(f"Anomalies detected: {results_df['is_anomaly'].sum()}")
    print(f"Average confidence: {results_df['confidence'].mean():.3f}")

    print("\nDetailed Anomalies:")
    anomalies = results_df[results_df['is_anomaly'].astype(bool)]
    for _, row in anomalies.iterrows():
        print(f"- {row['transaction_id']}: {row['confidence']:.3f} confidence")
        print(f"  Violations: {row['anomalies']}")
        print()

    print("🎯 Rule-based detector setup complete!")

INFO:__main__:Building user profiles...


TESTING INDIVIDUAL TRANSACTIONS

✅ NORMAL TRANSACTION - No rule violations detected

--------------------------------------------------

👀 MEDIUM RISK RULE-BASED ANOMALY DETECTED
Transaction ID: TXN_002
User ID: user_123
Rule Confidence: 0.500

Transaction Details:
- Amount: £8,000.00
- Type: Withdrawal
- Time: 02:00 (very late night/early morning)

Rule Violations:
1. Amount exceeds absolute threshold (£8,000 > £5,000)
2. CRITICAL RISK: Withdrawal transaction at very late night/early morning (02:00)

Detailed Scores:
- High Amount Absolute: 1.000
- Hour Risk High Risk Hours: 1.000

Recommended Action: MONITOR

--------------------------------------------------

ℹ️  LOW RISK RULE-BASED ANOMALY DETECTED
Transaction ID: TXN_003
User ID: user_456
Rule Confidence: 0.251

Transaction Details:
- Amount: £25.00
- Type: Top_Up
- Time: 23:00 (late evening)

Rule Violations:
1. CRITICAL RISK: Top_Up transaction at late evening (23:00)

Detailed Scores:
- High Amount Absolute: 0.005
- Hour Risk H

In [4]:
# Preview the first rows of `df`, the batch input frame built in the demo cell above.
df.head()

Unnamed: 0,transaction_id,user_id,amount,transaction_type,datetime
0,TXN_HIST_0,user_123,74.619676,purchase,2023-05-10 10:30:00
1,TXN_HIST_1,user_123,54.949413,deposit,2023-05-10 12:30:00
2,TXN_HIST_2,user_123,100.422765,purchase,2023-05-10 17:30:00
3,TXN_HIST_3,user_123,90.417262,transfer,2023-05-11 09:30:00
4,TXN_HIST_4,user_123,76.148259,purchase,2023-05-11 14:30:00


In [5]:
# Preview the last rows of `df`, where the hand-written test transactions were appended.
df.tail()

Unnamed: 0,transaction_id,user_id,amount,transaction_type,datetime
14,TXN_HIST_14,user_123,59.035296,transfer,2023-05-14 14:30:00
15,TXN_001,user_123,50.0,purchase,2023-05-14 14:30:00
16,TXN_002,user_123,8000.0,withdrawal,2023-05-14 02:15:00
17,TXN_003,user_456,25.0,top_up,2023-05-14 23:45:00
18,TXN_004,user_123,1200.0,cashout,2023-05-14 03:30:00


In [6]:
import pandas as pd
import numpy as np

# Sample data: the third row for user 1 is deliberately out of chronological order
data = {
    'user_id': [1, 1, 1, 2, 2, 1],
    'datetime': [
        '2025-06-01 12:03:31',
        '2025-06-03 19:19:50',
        '2025-06-01 19:52:44',  # Out of order
        '2025-06-02 10:11:53',
        '2025-06-05 21:23:30',
        '2025-06-10 08:15:00'
    ],
    'amount': [100, 200, 150, 50, 300, 80]
}

df = pd.DataFrame(data)

# Parse the timestamps and order each user's rows chronologically, so that
# diff() compares every transaction with the one immediately before it.
df['datetime'] = pd.to_datetime(df['datetime'])
df = df.sort_values(['user_id', 'datetime'])

# Whole days elapsed since the same user's previous transaction.
# - `.dt.days` keeps only the day component of the gap, so a gap of
#   1 day 23 hours is reported as 1.
# - The first transaction of each user has no predecessor; the grouped diff()
#   already yields NaN there, so no separate clean-up step is needed.
df['days_since_last_transaction'] = (
    df.groupby('user_id')['datetime'].diff().dt.days
)

print(df[['user_id', 'datetime', 'days_since_last_transaction']])

   user_id            datetime  days_since_last_transaction
0        1 2025-06-01 12:03:31                          NaN
2        1 2025-06-01 19:52:44                          0.0
1        1 2025-06-03 19:19:50                          1.0
5        1 2025-06-10 08:15:00                          6.0
3        2 2025-06-02 10:11:53                          NaN
4        2 2025-06-05 21:23:30                          3.0
