# In [1]:
# Import necessary libraries
import numpy as np  
import pandas as pd  
from sklearn.preprocessing import StandardScaler  
from sklearn.ensemble import IsolationForest  
from sklearn.neighbors import LocalOutlierFactor  
from sklearn.linear_model import SGDOneClassSVM  
from sklearn.covariance import EllipticEnvelope  
import warnings  
warnings.filterwarnings('ignore')  

# Class definition for anomaly detection using different models
class AnomalyDetection:
    """Run four scikit-learn outlier detectors over one semicolon-separated CSV.

    The input file is loaded once at construction time. ``detect_anomalies``
    standardizes the selected feature columns, fits Isolation Forest, Local
    Outlier Factor, Elliptic Envelope and SGD One-Class SVM on them, and writes
    one score column and one label column per detector to ``output_file``.

    Every ``apply_*`` method returns ``(scores, labels)`` for the rows it was
    fitted on. Labels follow the scikit-learn convention: -1 = anomaly,
    1 = normal.
    """

    def __init__(self, input_file, output_file, contamination=0.164):
        """Store the configuration and load the input CSV.

        Args:
            input_file: Path of the ';'-separated CSV to analyse.
            output_file: Path the ';'-separated results CSV is written to.
            contamination: Expected proportion of outliers; passed to every
                detector (as ``nu`` for the SGD One-Class SVM).
        """
        self.input_file = input_file
        self.output_file = output_file
        self.contamination = contamination
        self.df = pd.read_csv(input_file, sep=';')

    def preprocess_data(self, exclude_columns):
        """Return the standardized feature columns as a new DataFrame.

        Args:
            exclude_columns: Column names to leave out of the feature matrix.

        Returns:
            DataFrame holding every remaining column of ``self.df`` after
            ``StandardScaler.fit_transform``, with the original column names
            and a fresh default index.
        """
        columns = [c for c in self.df.columns if c not in exclude_columns]
        X = self.df[columns]

        scaler = StandardScaler()
        X_normalized = scaler.fit_transform(X)
        return pd.DataFrame(X_normalized, columns=X.columns)

    def apply_isolation_forest(self, X_normalized):
        """Fit an Isolation Forest and return ``(scores, labels)`` for the data."""
        # max_samples is the full row count, so every tree sees every row.
        isf = IsolationForest(n_estimators=100, max_samples=len(X_normalized),
                              contamination=self.contamination, random_state=42)
        isf.fit(X_normalized)
        return isf.decision_function(X_normalized), isf.predict(X_normalized)

    def apply_local_outlier_factor(self, X_normalized):
        """Fit Local Outlier Factor and return ``(scores, labels)`` for the data.

        The model is used in its default outlier-detection mode
        (``novelty=False``). scikit-learn documents that a ``novelty=True``
        model must not be asked to score its own training samples, because
        each point would then count itself among its neighbours and the
        results would be wrong. The training-set scores are therefore taken
        from ``negative_outlier_factor_`` and shifted by ``offset_`` so that,
        as with the other detectors, negative scores correspond to label -1.
        """
        lof = LocalOutlierFactor(n_neighbors=20, algorithm='auto', leaf_size=30,
                                 metric='minkowski', p=2,
                                 contamination=self.contamination)
        labels = lof.fit_predict(X_normalized)
        scores = lof.negative_outlier_factor_ - lof.offset_
        return scores, labels

    def apply_elliptic_envelope(self, X_normalized):
        """Fit an Elliptic Envelope and return ``(scores, labels)`` for the data."""
        cov = EllipticEnvelope(contamination=self.contamination, random_state=42)
        cov.fit(X_normalized)
        return cov.decision_function(X_normalized), cov.predict(X_normalized)

    def apply_sgd_one_class_svm(self, X_normalized):
        """Fit an SGD One-Class SVM and return ``(scores, labels)`` for the data."""
        # This estimator has no contamination parameter; the same value is
        # supplied as nu instead.
        svm = SGDOneClassSVM(nu=self.contamination, fit_intercept=True, max_iter=1000,
                             tol=0.001, shuffle=True, verbose=0, random_state=42,
                             learning_rate='optimal', eta0=0.0, power_t=0.5,
                             warm_start=False, average=False)
        svm.fit(X_normalized)
        return svm.decision_function(X_normalized), svm.predict(X_normalized)

    def detect_anomalies(self, exclude_columns):
        """Run all four detectors and write their results to ``output_file``.

        Args:
            exclude_columns: Column names to leave out of the feature matrix.

        Returns:
            The results DataFrame that was written: the ``id`` column of the
            input followed by ``scores-<tag>`` / ``anomaly-<tag>`` columns for
            the tags IsF, Lof, Cov and SVM, in that order.

        Raises:
            KeyError: If the input CSV has no ``id`` column.
        """
        X_normalized = self.preprocess_data(exclude_columns)

        # Independent copy so the added columns never touch self.df.
        df_results = self.df[['id']].copy()

        detectors = (
            ('IsF', self.apply_isolation_forest),
            ('Lof', self.apply_local_outlier_factor),
            ('Cov', self.apply_elliptic_envelope),
            ('SVM', self.apply_sgd_one_class_svm),
        )
        for tag, apply_model in detectors:
            scores, labels = apply_model(X_normalized)
            df_results['scores-' + tag] = scores
            df_results['anomaly-' + tag] = labels

        df_results.to_csv(self.output_file, sep=';', index=False)
        return df_results

# Main function to run the anomaly detection for different strategies
def main():
    """Run the detector three times over the same input CSV.

    Each run builds its own AnomalyDetection (so the input is re-read each
    time), leaves out a different set of columns, and writes to its own
    output file. The runs happen in the order human, social, mixed.
    """
    source_csv = 'data/individuals.csv'

    # (output file, columns left out of the feature matrix) for each run.
    runs = (
        # Human-capital run.
        ('data/human.csv', ['id', 'Dc', 'Bc', 'Cc', 'CC', 'M']),
        # Social-capital run.
        ('data/social.csv', ['id', 'nickname', 'gender', 'skin', 'hair', 'height', 'tattoo',
                             'age', 'weapon', 'arrested', 'convicted', 'rape', 'extortion',
                             'kidnapping', 'theft', 'homicide', 'arms_trafficking', 'drug_trafficking',
                             'faction']),
        # Mixed run: only the identifier is left out.
        ('data/mixed.csv', ['id']),
    )

    for target_csv, skipped_columns in runs:
        detector = AnomalyDetection(source_csv, target_csv)
        detector.detect_anomalies(skipped_columns)

# Run the analyses only when this file is executed as a script; importing it
# as a module defines the class and main() without calling main().
if __name__ == "__main__":
    main()
