In [8]:
import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from datetime import datetime
import logging
import json

# Send log records to the console only: a single StreamHandler is attached and
# no file handler is configured.
logging.basicConfig(
    handlers=[logging.StreamHandler()],
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

class ReconciliationAnomalyDetector:
    def __init__(self):
        self.model = None
        self.scaler = None
        self.encoder = None
        
    def safe_read_csv(self, path, dtype_columns=None):
        """Safely read CSV with optional dtype conversion"""
        try:
            if dtype_columns:
                temp_df = pd.read_csv(path, nrows=1)
                valid_columns = {col: dtype for col, dtype in dtype_columns.items() 
                                if col in temp_df.columns}
                return pd.read_csv(path, dtype=valid_columns or None)
            return pd.read_csv(path)
        except Exception as e:
            logging.error(f"Error reading {path}: {str(e)}")
            raise
            
    def load_data(self, history_path, recon_path):
        """Load and merge datasets with validation"""
        try:
            logging.info(f"Loading data from {history_path} and {recon_path}")
            
            history_df = self.safe_read_csv(history_path, {'TRADEID': str})
            recon_df = self.safe_read_csv(recon_path, {'TRADEID': str})
            
            required_cols = ['TRADEID', 'RISKDATE', 'DESKNAME', 'MatchStatus', 
                           'QUANTITYDIFFERENCE', 'PRICEDIFFERENCE', 'COMMENT']
            for col in required_cols:
                if col not in history_df.columns or col not in recon_df.columns:
                    raise ValueError(f"Missing required column: {col}")
            
            if 'Anomaly' not in history_df.columns:
                history_df['Anomaly'] = False
                
            combined_df = pd.concat([history_df, recon_df], ignore_index=True)
            
            logging.info(f"Data loaded successfully. History: {len(history_df)} rows, Recon: {len(recon_df)} rows")
            return history_df, recon_df, combined_df
            
        except Exception as e:
            logging.error(f"Data loading failed: {str(e)}")
            raise

    def preprocess_data(self, df):
        """Clean and transform data for modeling"""
        try:
            logging.info("Starting data preprocessing")
            
            date_cols = ['RISKDATE', 'RECONDATE', 'TRADE_DATE', 'SETTLE_DATE']
            for col in date_cols:
                if col in df.columns:
                    df[col] = pd.to_datetime(df[col], format='%d-%m-%Y', errors='coerce')
                    null_dates = df[col].isnull().sum()
                    if null_dates > 0:
                        logging.warning(f"{null_dates} null values found in {col}")
            
            if 'RECONDATE' in df.columns and 'TRADE_DATE' in df.columns:
                df['TRADE_AGE_DAYS'] = (df['RECONDATE'] - df['TRADE_DATE']).dt.days
            if 'RECONDATE' in df.columns and 'SETTLE_DATE' in df.columns:
                df['SETTLE_AGE_DAYS'] = (df['RECONDATE'] - df['SETTLE_DATE']).dt.days
            
            numeric_cols = ['QUANTITYDIFFERENCE', 'PRICEDIFFERENCE', 'ORIGINALFACEDIFFERENCE']
            for col in numeric_cols:
                if col in df.columns:
                    df[col] = pd.to_numeric(df[col], errors='coerce')
                    df[col].fillna(0, inplace=True)
            
            if 'QUANTITYDIFFERENCE' in df.columns:
                df['ABS_QUANTITY_DIFF'] = abs(df['QUANTITYDIFFERENCE'])
            if 'PRICEDIFFERENCE' in df.columns:
                df['ABS_PRICE_DIFF'] = abs(df['PRICEDIFFERENCE'])
            if 'ORIGINALFACEDIFFERENCE' in df.columns:
                df['ABS_OF_DIFF'] = abs(df['ORIGINALFACEDIFFERENCE'])
            
            if 'COMMENT' in df.columns:
                df['COMMENT_LENGTH'] = df['COMMENT'].apply(lambda x: len(str(x)))
                df['HAS_ROUNDING_NOTE'] = df['COMMENT'].str.contains('rounding', case=False, na=False)
            
            logging.info("Data preprocessing completed")
            return df
            
        except Exception as e:
            logging.error(f"Data preprocessing failed: {str(e)}")
            raise

    def train_model(self, df):
        """Train Isolation Forest model with feature engineering"""
        try:
            logging.info("Starting model training")
            
            numerical_features = [
                'QUANTITYDIFFERENCE', 'PRICEDIFFERENCE', 'ORIGINALFACEDIFFERENCE',
                'TRADE_AGE_DAYS', 'SETTLE_AGE_DAYS', 'ABS_QUANTITY_DIFF',
                'ABS_PRICE_DIFF', 'ABS_OF_DIFF', 'COMMENT_LENGTH'
            ]
            numerical_features = [f for f in numerical_features if f in df.columns]
            
            categorical_features = ['DESKNAME', 'BUY_SELL', 'TRADING_UNIT_NAME', 'MatchStatus']
            categorical_features = [f for f in categorical_features if f in df.columns]
            
            preprocessor = ColumnTransformer(
                transformers=[
                    ('num', StandardScaler(), numerical_features),
                    ('cat', OneHotEncoder(handle_unknown='ignore', sparse_output=False), categorical_features)
                ],
                remainder='drop'
            )
            
            self.model = Pipeline([
                ('preprocessor', preprocessor),
                ('classifier', IsolationForest(
                    n_estimators=150,
                    contamination=0.05,
                    random_state=42,
                    verbose=1
                ))
            ])
            
            X_train = df[df['Anomaly'] == False]
            self.model.fit(X_train)
            
            logging.info("Model training completed successfully")
            return self.model
            
        except Exception as e:
            logging.error(f"Model training failed: {str(e)}")
            raise

    def detect_anomalies(self, df):
        """Detect anomalies and generate report"""
        try:
            logging.info("Starting anomaly detection")
            
            if not self.model:
                raise ValueError("Model not trained. Call train_model() first.")
            
            predictions = self.model.predict(df)
            df['Detected_Anomaly'] = predictions == -1
            df['Anomaly_Score'] = self.model.decision_function(df)
            
            anomalies = df[df['Detected_Anomaly']].copy()
            
            if not anomalies.empty:
                anomalies['Anomaly_Reason'] = anomalies.apply(self._get_anomaly_reason, axis=1)
            else:
                anomalies['Anomaly_Reason'] = ""
            
            logging.info(f"Detected {len(anomalies)} anomalies")
            return df, anomalies
            
        except Exception as e:
            logging.error(f"Anomaly detection failed: {str(e)}")
            raise

    def _get_anomaly_reason(self, row):
        """Generate human-readable anomaly reason"""
        reasons = []
        
        if 'QUANTITYDIFFERENCE' in row and 'QUANTITY_TOLERANCE' in row:
            qty_diff = abs(row['QUANTITYDIFFERENCE'])
            qty_tol = row['QUANTITY_TOLERANCE']
            if qty_diff > qty_tol:
                reasons.append(f"Qty delta {qty_diff} > tolerance {qty_tol}")
        
        if 'PRICEDIFFERENCE' in row and 'PRICE_TOLERANCE' in row:
            price_diff = abs(row['PRICEDIFFERENCE'])
            price_tol = row['PRICE_TOLERANCE']
            if price_diff > price_tol:
                reasons.append(f"Price delta {price_diff} > tolerance {price_tol}")
        
        if 'ORIGINALFACEDIFFERENCE' in row and row['ORIGINALFACEDIFFERENCE'] != 0:
            reasons.append(f"Original face diff {row['ORIGINALFACEDIFFERENCE']}")
        
        if 'TRADE_AGE_DAYS' in row and row['TRADE_AGE_DAYS'] > 7:
            reasons.append(f"Old trade ({row['TRADE_AGE_DAYS']} days)")
        
        if 'HAS_ROUNDING_NOTE' in row and 'COMMENT' in row:
            if not row['HAS_ROUNDING_NOTE'] and pd.notna(row['COMMENT']):
                reasons.append("Unusual comment pattern")
        
        return " | ".join(reasons) if reasons else "Multi-factor anomaly"

    def generate_anomaly_report(self, anomalies):
        """Create detailed anomaly report"""
        try:
            logging.info("Generating anomaly report")
            
            if anomalies.empty:
                logging.info("No anomalies detected - empty report")
                return pd.DataFrame()
            
            report_cols = [
                'TRADEID', 'RISKDATE', 'DESKNAME', 'TRADING_UNIT_NAME',
                'MatchStatus', 'QUANTITYDIFFERENCE', 'PRICEDIFFERENCE',
                'ORIGINALFACEDIFFERENCE', 'Anomaly_Reason', 'COMMENT',
                'Anomaly_Score'
            ]
            report_cols = [col for col in report_cols if col in anomalies.columns]
            
            report = anomalies[report_cols].sort_values('Anomaly_Score', ascending=True)
            
            logging.info("\nSample of anomaly report:")
            logging.info(report.head().to_string())
            
            report.to_csv('detected_anomalies_report.csv', index=False)
            logging.info("Saved anomaly report to detected_anomalies_report.csv")
            
            return report
            
        except Exception as e:
            logging.error(f"Report generation failed: {str(e)}")
            raise

    def fix_anomalies(self, history_df, anomaly_report):
        """Fix anomalies with detailed change logging"""
        try:
            logging.info("\nStarting anomaly fixing process")
            
            fixed_df = history_df.copy()
            change_log = []
            
            if anomaly_report.empty:
                logging.info("No anomalies to fix")
                return fixed_df
            
            for _, row in anomaly_report.iterrows():
                trade_id = row['TRADEID']
                mask = fixed_df['TRADEID'] == trade_id
                
                if not mask.any():
                    logging.warning(f"Trade ID {trade_id} not found in history data")
                    continue
                
                original = fixed_df.loc[mask].iloc[0].to_dict()
                changes = {
                    'TRADEID': trade_id,
                    'RISKDATE': row['RISKDATE'],
                    'DESKNAME': row['DESKNAME'],
                    'ACTION': 'NO_CHANGE',
                    'CHANGES': {},
                    'REASON': row.get('Anomaly_Reason', 'Unknown reason')
                }
                
                comment = str(row.get('COMMENT', '')).lower()
                if 'rounding' in comment:
                    changes['ACTION'] = 'ROUNDING_ADJUSTMENT'
                    for field in ['QUANTITYDIFFERENCE', 'PRICEDIFFERENCE', 'ORIGINALFACEDIFFERENCE']:
                        if field in original and original[field] != 0:
                            changes['CHANGES'][field] = {'old': original[field], 'new': 0}
                            fixed_df.loc[mask, field] = 0
                
                elif 'var update' in comment:
                    changes['ACTION'] = 'VAR_UPDATE_ADJUSTMENT'
                
                elif 'blotter code' in comment:
                    changes['ACTION'] = 'BLOTTER_CODE_ADJUSTMENT'
                
                else:
                    changes['ACTION'] = 'GENERIC_FIX'
                
                if 'Anomaly' in original and original['Anomaly']:
                    changes['CHANGES']['Anomaly'] = {'old': True, 'new': False}
                    fixed_df.loc[mask, 'Anomaly'] = False
                
                if changes['CHANGES'] or changes['ACTION'] != 'NO_CHANGE':
                    change_log.append(changes)
                    logging.info(f"\n=== Fixing Trade ID: {trade_id} ===")
                    logging.info(f"Action Type: {changes['ACTION']}")
                    logging.info(f"Reason: {changes['REASON']}")
                    logging.info("Detailed Changes:")
                    for field, change in changes['CHANGES'].items():
                        logging.info(f"  Column: {field:25} Old Value: {change['old']:15} → New Value: {change['new']}")
            
            if change_log:
                action_counts = pd.DataFrame(change_log)['ACTION'].value_counts()
                logging.info("\n=== FIXING SUMMARY ===")
                logging.info(f"Total anomalies processed: {len(anomaly_report)}")
                logging.info(f"Total trades modified: {len(change_log)}")
                for action, count in action_counts.items():
                    logging.info(f"  {action}: {count} trades")
                
                self._save_change_log(change_log)
            else:
                logging.info("\nNo changes were made to any trades")
            
            fixed_df.to_csv('updated_Catalyst_History_Reconciliation.csv', index=False)
            logging.info("\nSaved fixed data to updated_Catalyst_History_Reconciliation.csv")
            
            return fixed_df
            
        except Exception as e:
            logging.error(f"Anomaly fixing failed: {str(e)}")
            raise

    def _save_change_log(self, change_log):
        """Save detailed change log with statistics"""
        try:
            if not change_log:
                logging.info("No changes were made - empty change log")
                return
                
            log_df = pd.DataFrame(change_log)
            
            changes_expanded = []
            for _, row in log_df.iterrows():
                base_info = {
                    'TRADEID': row['TRADEID'],
                    'RISKDATE': row['RISKDATE'],
                    'DESKNAME': row['DESKNAME'],
                    'ACTION': row['ACTION'],
                    'REASON': row['REASON']
                }
                if row['CHANGES']:
                    for field, change in row['CHANGES'].items():
                        record = base_info.copy()
                        record.update({
                            'FIELD': field,
                            'OLD_VALUE': change['old'],
                            'NEW_VALUE': change['new']
                        })
                        changes_expanded.append(record)
            
            summary = {
                'total_fixes': len(change_log),
                'actions_applied': {},
                'fields_modified': {},
                'desks_affected': {}
            }
            
            if not log_df.empty:
                summary['actions_applied'] = log_df['ACTION'].value_counts().to_dict()
                summary['desks_affected'] = log_df['DESKNAME'].value_counts().to_dict()
                
            if changes_expanded:
                changes_df = pd.DataFrame(changes_expanded)
                changes_df.to_csv('anomaly_fix_details.csv', index=False)
                summary['fields_modified'] = changes_df['FIELD'].value_counts().to_dict()
            else:
                changes_df = pd.DataFrame()
            
            logging.info("\n=== DETAILED FIX STATISTICS ===")
            logging.info(f"Total fixes applied: {summary['total_fixes']}")
            
            if summary['actions_applied']:
                logging.info("\nActions applied:")
                for action, count in summary['actions_applied'].items():
                    logging.info(f"  {action}: {count}")
                
            if summary['fields_modified']:
                logging.info("\nFields modified:")
                for field, count in summary['fields_modified'].items():
                    logging.info(f"  {field}: {count}")
                
            if summary['desks_affected']:
                logging.info("\nDesks affected:")
                for desk, count in summary['desks_affected'].items():
                    logging.info(f"  {desk}: {count}")
            
            with open('fix_summary.json', 'w') as f:
                json.dump(summary, f, indent=2)
            
        except Exception as e:
            logging.error(f"Could not save change log: {str(e)}")
            raise

    def validate_fixes(self, original_df, fixed_df):
        """Validate that fixes were applied correctly"""
        try:
            logging.info("\nValidating fixes...")
            
            if len(original_df) != len(fixed_df):
                logging.warning(f"Row count changed: before {len(original_df)}, after {len(fixed_df)}")
            else:
                logging.info("Row count consistent")
            
            original_anomalies = original_df['Anomaly'].sum() if 'Anomaly' in original_df else 0
            fixed_anomalies = fixed_df['Anomaly'].sum() if 'Anomaly' in fixed_df else 0
            logging.info(f"Anomaly count: before {original_anomalies}, after {fixed_anomalies}")
            
            if 'COMMENT' in fixed_df and 'QUANTITYDIFFERENCE' in fixed_df and 'PRICEDIFFERENCE' in fixed_df:
                rounding_fixes = fixed_df[
                    (fixed_df['COMMENT'].str.contains('rounding', na=False)) &
                    (fixed_df['QUANTITYDIFFERENCE'] == 0) &
                    (fixed_df['PRICEDIFFERENCE'] == 0)
                ]
                logging.info(f"Applied {len(rounding_fixes)} rounding fixes")
                
                if len(rounding_fixes) > 0:
                    sample_fix = rounding_fixes.iloc[0]
                    logging.info("\nExample rounding fix:")
                    logging.info(f"TRADEID: {sample_fix['TRADEID']}")
                    logging.info(f"DESK: {sample_fix['DESKNAME']}")
                    if 'QUANTITYDIFFERENCE' in original_df.columns:
                        original_qty = original_df.loc[original_df['TRADEID'] == sample_fix['TRADEID'], 'QUANTITYDIFFERENCE'].values[0]
                        logging.info(f"Qty before: {original_qty}")
                    logging.info(f"Qty after: 0")
            
            logging.info("Validation completed")
            return True
            
        except Exception as e:
            logging.error(f"Validation failed: {str(e)}")
            raise

def main(history_path='Catalyst_History_Reconciliation.csv',
         recon_path='Catalyst_Reconciliation.csv'):
    """Run the full pipeline: load, preprocess, train, detect, report, fix, validate.

    Args:
        history_path: Path of the history CSV. The default is an example file
            name; pass your own path to use different data.
        recon_path: Path of the reconciliation CSV. The default is an example
            file name; pass your own path to use different data.

    Raises:
        Exception: Any failure in a step is logged and re-raised.
    """
    try:
        detector = ReconciliationAnomalyDetector()

        logging.info("\n=== STEP 1: LOADING DATA ===")
        history_df, recon_df, combined_df = detector.load_data(history_path, recon_path)

        logging.info("\n=== STEP 2: PREPROCESSING DATA ===")
        processed_df = detector.preprocess_data(combined_df)

        logging.info("\n=== STEP 3: TRAINING MODEL ===")
        detector.train_model(processed_df)

        logging.info("\n=== STEP 4: DETECTING ANOMALIES ===")
        scored_df, anomalies = detector.detect_anomalies(processed_df)

        logging.info("\n=== STEP 5: GENERATING REPORT ===")
        report = detector.generate_anomaly_report(anomalies)

        # Fixes are applied to the untouched history frame, not to the
        # preprocessed combined frame used for scoring.
        logging.info("\n=== STEP 6: FIXING ANOMALIES ===")
        fixed_df = detector.fix_anomalies(history_df, report)

        logging.info("\n=== STEP 7: VALIDATING FIXES ===")
        detector.validate_fixes(history_df, fixed_df)

        logging.info("\n=== PROCESS COMPLETED SUCCESSFULLY ===")

    except Exception as e:
        logging.error(f"\n=== PROCESS FAILED: {str(e)} ===")
        raise


if __name__ == "__main__":
    main()

2025-03-26 16:41:03,095 - INFO - 
=== STEP 1: LOADING DATA ===
2025-03-26 16:41:03,101 - INFO - Loading data from Catalyst_History_Reconciliation.csv and Catalyst_Reconciliation.csv
2025-03-26 16:41:03,211 - INFO - Data loaded successfully. History: 20 rows, Recon: 10 rows
2025-03-26 16:41:03,213 - INFO - 
=== STEP 2: PREPROCESSING DATA ===
2025-03-26 16:41:03,214 - INFO - Starting data preprocessing
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  df[col].fillna(0, inplace=True)
2025-03-26 16:41:03,224 - INFO - Data preprocessing completed
2025-03-26 16:41:03,225 - INFO - 
=== STEP 3: TRAINING MODEL ===
2025-03-26 16:41:03,227 - INFO - Starting model train