In [1]:
#!/usr/bin/env python3

import argparse, os, numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.svm import OneClassSVM
from sklearn.linear_model import LogisticRegression  # for LogReg
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LinearRegression
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.dummy import DummyClassifier
from TALENT.model.lib.data import dataname_to_numpy
from TALENT.model.lib.models_classical import CLASSICAL_MODELS

def dataname_to_numpy(dataset_name, dataset_path, data_root=None):

    """
    Load the dataset from the numpy files.

    Reads ``{N,C,y}_{train,val,test}.npy`` and ``info.json`` from the
    directory ``<root>/<dataset_path>/<dataset_name>``.

    NOTE(review): this definition shadows the ``dataname_to_numpy`` imported
    from ``TALENT.model.lib.data`` at the top of the file -- confirm that the
    local copy is the one that should be used.

    :param dataset_name: str, name of the dataset directory
    :param dataset_path: str, directory (relative to the root) holding datasets
    :param data_root: optional root directory. When omitted, the module-level
        ``DATA_PATH`` is used; that name is not defined in the visible part of
        this file, so it must be provided elsewhere.
    :return: tuple ``(N, C, y, info)`` where ``N`` and ``C`` are dicts mapping
        'train'/'val'/'test' to arrays (or None when the corresponding
        ``*_train.npy`` file does not exist), ``y`` is such a dict, and
        ``info`` is the parsed content of ``info.json``
    :raises FileNotFoundError: if a required ``.npy`` file or ``info.json``
        is missing
    """
    # Imported locally because neither name is imported at the top of the file.
    import json
    from pathlib import Path

    root = DATA_PATH if data_root is None else data_root
    dir_ = Path(os.path.join(root, dataset_path, dataset_name))

    def load(item):
        # SECURITY: allow_pickle=True lets np.load unpickle arbitrary objects;
        # only point this at dataset files from a trusted source.
        return {
            x: np.load(dir_ / f'{item}_{x}.npy', allow_pickle=True)
            for x in ['train', 'val', 'test']
        }

    def load_info():
        # Standard-library replacement for the `load_json` helper, which is
        # neither defined nor imported in the visible part of this file.
        with open(dir_ / 'info.json', encoding='utf-8') as f:
            return json.load(f)

    return (
        load('N') if dir_.joinpath('N_train.npy').exists() else None,
        load('C') if dir_.joinpath('C_train.npy').exists() else None,
        load('y'),
        load_info(),
    )


# --- Outlier detection with logging and sample extraction ---
def detect_and_log_outliers(split_name, data, dataset_name, method, log_dir="outlier_logs", sample_dir="outlier_samples"):
    """Flag outlier rows of ``data`` and write a count log plus example rows.

    :param split_name: label of the split (used only in output file names)
    :param data: 2-D array-like, one sample per row
    :param dataset_name: dataset label (used only in output file names)
    :param method: one of "IsolationForest", "LocalOutlierFactor",
        "OneClassSVM", "ZScore", "ModifiedZScore", "IQR"
    :param log_dir: directory that receives the "Outliers: k / n" log file
    :param sample_dir: directory that receives up to 10 outlier rows and up
        to 10 non-outlier rows
    :return: 1-D boolean array with one entry per row; True marks an inlier
    :raises ValueError: if ``method`` is not one of the supported names
    """
    data = np.asarray(data)  # ensure array

    # Every branch produces a per-row boolean mask where True means "keep".
    if method == "IsolationForest":
        clf = IsolationForest(contamination=0.05, random_state=42)
        mask = clf.fit_predict(data) == 1  # sklearn labels inliers as 1
    elif method == "LocalOutlierFactor":
        clf = LocalOutlierFactor(n_neighbors=20, contamination=0.05)
        mask = clf.fit_predict(data) == 1
    elif method == "OneClassSVM":
        clf = OneClassSVM(nu=0.05, kernel="rbf", gamma="scale")
        mask = clf.fit_predict(data) == 1
    elif method == "ZScore":
        std = data.std(axis=0)
        # A constant column has std == 0; dividing by it yields NaN, and
        # NaN < 3 is False, which would flag every row as an outlier.
        # Using 1 as the divisor gives such columns a z-score of 0 instead.
        safe_std = np.where(std == 0, 1.0, std)
        z = np.abs((data - data.mean(axis=0)) / safe_std)
        mask = (z < 3).all(axis=1)
    elif method == "ModifiedZScore":
        med = np.median(data, axis=0)
        mad = np.median(np.abs(data - med), axis=0)
        mz = 0.6745 * (data - med) / (mad + 1e-9)  # epsilon guards mad == 0
        # Reduce to one flag per row, like the other branches: a row is an
        # inlier only if every feature is within the threshold.
        mask = (np.abs(mz) < 3.5).all(axis=1)
    elif method == "IQR":
        Q1 = np.percentile(data, 25, axis=0)
        Q3 = np.percentile(data, 75, axis=0)
        IQR = Q3 - Q1
        mask = ((data >= Q1 - 1.5 * IQR) & (data <= Q3 + 1.5 * IQR)).all(axis=1)
    else:
        raise ValueError(f"Unknown method: {method}")

    num_outliers = int(len(data) - mask.sum())

    # Count log: one file per (dataset, split, method).
    os.makedirs(log_dir, exist_ok=True)
    with open(os.path.join(log_dir, f"{dataset_name}__{split_name}__{method}.txt"), "w") as f:
        f.write(f"Outliers: {num_outliers} / {len(data)}\n")

    # Save samples: the first 10 rows of each kind, in row order.
    os.makedirs(sample_dir, exist_ok=True)
    outfile = os.path.join(sample_dir, f"{dataset_name}__{split_name}__{method}.txt")
    with open(outfile, "w") as f:
        f.write("Outlier samples:\n")
        for i in np.flatnonzero(~mask)[:10]:
            f.write(f"{data[i].tolist()}\n")
        f.write("\nNon-Outlier samples:\n")
        for i in np.flatnonzero(mask)[:10]:
            f.write(f"{data[i].tolist()}\n")
    return mask



# Driver: build the command-line parser, parse the arguments and load the
# dataset splits.
# NOTE(review): the loop runs twice and `i` is not used in the visible body;
# the statements of the following cell are indented as if they continue this
# loop body -- confirm that repeating the whole run is intended.
for i in range(2):
    parser = argparse.ArgumentParser()
    parser.add_argument('--dataset', required=True)
    parser.add_argument('--dataset_path', required=True)
    parser.add_argument('--model_type', required=True)
    # When set, outlier rows are dropped from the training split further down.
    parser.add_argument('--remove_outliers', action='store_true')
    parser.add_argument('--outlier_method', choices=["IsolationForest","LocalOutlierFactor","OneClassSVM","ZScore","ModifiedZScore","IQR"], default="IsolationForest")
    # parse_args() with no argument reads sys.argv.
    # NOTE(review): inside a notebook kernel sys.argv presumably holds the
    # kernel's own arguments rather than these flags -- confirm how this cell
    # is meant to be invoked.
    args = parser.parse_args()
    
    # Unpacks (numerical splits or None, categorical splits or None,
    # label splits, info dict) as returned by dataname_to_numpy.
    N, C, y, info = dataname_to_numpy(args.dataset, args.dataset_path)


SyntaxError: expected ':' (3069147771.py, line 92)

In [None]:
    def split_dict(d):
        """Split a splits-dict into (train/val part, test part).

        Returns ``(None, None)`` when ``d`` is None or empty; otherwise a
        dict with whichever of 'train'/'val' are present and a dict holding
        only 'test'.
        """
        if not d:
            return None, None
        return {k: d[k] for k in ("train", "val") if k in d}, {"test": d["test"]}

    N_trval, N_test = split_dict(N)
    C_trval, C_test = split_dict(C)
    y_trval, y_test = split_dict(y)

    # The model below is fitted on the numerical features only, and N is None
    # when the dataset ships no N_*.npy files. Fail with a clear message
    # instead of "'NoneType' object is not subscriptable" further down.
    if N_trval is None:
        raise ValueError(
            f"Dataset {args.dataset} has no numerical features; "
            "this script requires them for training and evaluation"
        )

    if args.remove_outliers:
        def comb(N_arr, C_arr):
            """Stack numerical and categorical columns side by side (numerical only if C_arr is None)."""
            return np.hstack([N_arr, C_arr]) if C_arr is not None else N_arr

        split_inputs = [
            ("train", N_trval["train"], C_trval["train"] if C_trval else None),
            ("val", N_trval["val"], C_trval["val"] if C_trval else None),
            ("test", N_test["test"], C_test["test"] if C_test else None),
        ]
        for split, N_arr, C_arr in split_inputs:
            data = comb(N_arr, C_arr)
            # Every split is scanned and logged ...
            mask = detect_and_log_outliers(split, data, args.dataset, args.outlier_method)
            # ... but rows are only removed from the training split, so the
            # validation and test sets stay untouched.
            if split == "train":
                N_trval["train"] = N_arr[mask]
                if C_trval:
                    C_trval["train"] = C_arr[mask]
                y_trval["train"] = y_trval["train"][mask]

    # Only the numerical block is used as model input.
    X_train = np.hstack((N_trval["train"],))  # extend as needed
    y_train = y_trval["train"]
    X_test = np.hstack((N_test["test"],))
    y_t = y_test["test"]

    model_cls = CLASSICAL_MODELS.get(args.model_type)
    if model_cls is None:
        raise ValueError(f"Model {args.model_type} not in classical models")
    model = model_cls()
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)

    methods = ["IsolationForest","ZScore","ModifiedZScore","IQR"]
    # NOTE(review): log_misclassified_outliers is neither defined nor imported
    # in the visible part of this file -- confirm where it comes from.
    log_misclassified_outliers(args.dataset, X_test, y_t, y_pred, methods)