In [1]:
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.metrics import roc_auc_score
from sklearn.preprocessing import StandardScaler
from scipy.stats import kurtosis
import pandas as pd
from urllib.request import urlretrieve
import os
import arff

In [2]:
def load_dataset(name):
    if name.lower() == 'satellite':
        url = "https://archive.ics.uci.edu/ml/machine-learning-databases/statlog/satimage/sat.trn"
        if not os.path.exists('sat.trn'):
            urlretrieve(url, 'sat.trn')
        
        data = pd.read_csv('sat.trn', sep=' ', header=None)
        
        X = data.iloc[:, :-1].values
        y_orig = data.iloc[:, -1].values
        
        y = np.where(y_orig == 2, -1, 1)
        
    elif name.lower() == 'annthyroid':
        with open('data/Annthyroid_withoutdupl_norm_07.arff', 'r') as f:
            dataset = arff.load(f)
            
        df = pd.DataFrame(dataset['data'], columns=[attr[0] for attr in dataset['attributes']])
        y = df['outlier'].map({'yes': -1, 'no': 1}).values
        X = df.drop(['id', 'outlier'], axis=1).astype(float).values
        
    elif name.lower() == 'pendigits':
        train_data = pd.read_csv('data/pendigits_dyn_train.csv', header=None)
        test_data = pd.read_csv('data/pendigits_dyn_test.csv', header=None)
        train_labels = pd.read_csv('data/pendigits_label_train.csv', header=None)
        test_labels = pd.read_csv('data/pendigits_label_test.csv', header=None)
        
        X = np.vstack([train_data.values, test_data.values])
        y_orig = np.concatenate([train_labels.values.ravel(), test_labels.values.ravel()])
        
        y = np.where(y_orig == 8, -1, 1)
    
    return X, y

X, y = load_dataset('pendigits')
print("pendigits dataset loaded successfully with shape:", X.shape)

pendigits dataset loaded successfully with shape: (10992, 16)


In [3]:
class CustomIsolationForest:
    def __init__(self, n_estimators=100, max_samples='auto', contamination=0.1, 
                 random_state=None, splitting_criterion='random'):
        self.n_estimators = n_estimators
        self.max_samples = max_samples
        self.contamination = contamination
        self.random_state = random_state
        self.splitting_criterion = splitting_criterion
        
    def _calculate_pooled_gain(self, X, feature):
        sorted_x = np.sort(X[:, feature])
        n = len(sorted_x)
        if n <= 1:
            return 0
        
        splits = (sorted_x[1:] + sorted_x[:-1]) / 2
        
        max_gain = 0
        for split in splits:
            left_mask = X[:, feature] <= split
            right_mask = ~left_mask
            
            if np.sum(left_mask) == 0 or np.sum(right_mask) == 0:
                continue
                
            left_std = np.std(X[left_mask], axis=0).mean()
            right_std = np.std(X[right_mask], axis=0).mean()
            total_std = np.std(X, axis=0).mean()
            
            if total_std == 0:
                continue
            
            n_left = np.sum(left_mask)
            n_right = np.sum(right_mask)
            
            gain = (total_std - (n_left * left_std + n_right * right_std) / (n_left + n_right)) / total_std
            max_gain = max(max_gain, gain)
            
        return max_gain

    def fit_predict(self, X):
        if self.splitting_criterion == 'random':
            clf = IsolationForest(
                n_estimators=self.n_estimators,
                max_samples=100,
                contamination=self.contamination,
                random_state=self.random_state
            )
            return clf.fit_predict(X)
        
        elif self.splitting_criterion in ['kurtosis', 'pooled_gain']:
            scores = np.zeros(X.shape[0])
            
            for _ in range(self.n_estimators):
                if isinstance(self.max_samples, str) and self.max_samples == 'auto':
                    sample_size = min(256, X.shape[0])
                else:
                    sample_size = self.max_samples
                    
                indices = np.random.choice(X.shape[0], size=sample_size, replace=False)
                X_sample = X[indices]
                
                feature_scores = []
                for feature in range(X.shape[1]):
                    if self.splitting_criterion == 'kurtosis':
                        try:
                            score = abs(kurtosis(X_sample[:, feature]))
                            if np.isnan(score) or np.isinf(score):
                                score = 0
                        except:
                            score = 0
                    else:  # pooled_gain
                        score = self._calculate_pooled_gain(X_sample, feature)
                    feature_scores.append(score)
                
                if self.splitting_criterion == 'kurtosis':
                    feature_scores = np.array(feature_scores)
                    if np.sum(feature_scores) == 0:
                        probs = np.ones(len(feature_scores)) / len(feature_scores)
                    else:
                        probs = feature_scores + 1e-10
                        probs = probs / probs.sum()
                    selected_feature = np.random.choice(X.shape[1], p=probs)
                else:
                    selected_feature = np.argmax(feature_scores)
                
                feature_data = X[:, [selected_feature]]
                tree = IsolationForest(n_estimators=1, max_samples=sample_size,
                                     random_state=self.random_state)
                scores += tree.fit_predict(feature_data)
            
            scores /= self.n_estimators
            threshold = np.percentile(scores, self.contamination * 100)
            return np.where(scores <= threshold, -1, 1)

In [4]:
import warnings

def run_experiment():
    datasets = ['satellite', 'annthyroid', 'pendigits']
    methods = ['random', 'kurtosis']
    results = []
    
    warnings.filterwarnings('ignore', category=RuntimeWarning, 
                          message='Precision loss occurred in moment calculation.*')
    
    for dataset_name in datasets:
        X, y = load_dataset(dataset_name)
        contamination = np.mean(y == -1)
        
        scaler = StandardScaler()
        X = scaler.fit_transform(X)
        
        for method in methods:
            print(f"Training {method} on {dataset_name} dataset....")
            
            clf = CustomIsolationForest(
                n_estimators=100,
                contamination=contamination,
                random_state=42,
                splitting_criterion=method,
            )
            
            y_pred = clf.fit_predict(X)
            y_pred_scores = -y_pred
            auc = roc_auc_score(y == -1, y_pred_scores)
            
            results.append({
                'Dataset': dataset_name,
                'Method': method,
                'AUC': auc
            })

    warnings.resetwarnings()
    
    results_df = pd.DataFrame(results)
    return results_df

results_df = run_experiment()

pivoted_df = results_df.pivot(index='Dataset', columns='Method', values='AUC')

print("\nResults (AUC scores):")
print(pivoted_df
      .round(3)  
      .to_string())

Training random on satellite dataset....
Training kurtosis on satellite dataset....
Training pooled_gain on satellite dataset....
Training random on annthyroid dataset....
Training kurtosis on annthyroid dataset....
Training pooled_gain on annthyroid dataset....
Training random on pendigits dataset....
Training kurtosis on pendigits dataset....
Training pooled_gain on pendigits dataset....

Results (AUC scores):
Method      kurtosis  random  pooled_gain
Dataset                                  
annthyroid     0.657   0.561        0.573
pendigits      0.516   0.705        0.734
satellite      0.784   0.919        0.801
