In [1]:
2+3

5

# Cardio Dataset

In [73]:
import pandas as pd

# Load the dataset
file_path = "CTG.xls"
xls = pd.ExcelFile(file_path)
df = pd.read_excel(xls, sheet_name="Raw Data")
df = df.dropna(how="all")
df = df.drop(columns=["FileName", "Date", "SegFile"], errors="ignore")
df = df.dropna().reset_index(drop=True)

df = df[df['NSP'] != 2]  # removing the Suspect class from the cardio dataset
df['NSP'] = df['NSP'].astype(int)

# Print cleaned dataset
print("First few rows of the cleaned dataset:")
print(df.head())


# Convert all columns to numeric (except NSP, which is our target)
X = df.iloc[:, :-1].values  # Features
y = df.iloc[:, -1].values.astype(float)  # Target

# Test/train split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)
scaler = MinMaxScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.fit_transform(X_test)

s = int(len(X_train) / 10)
X_sample, _, y_sample, _ = train_test_split(X_train, y_train, train_size=s, stratify=y_train, random_state=42)

# Print dataset shape
print(f"Final dataset shape: {X.shape}, {y.shape}")


First few rows of the cleaned dataset:
       b       e    LBE     LB   AC   FM    UC  ASTV  MSTV  ALTV  ...    C  \
1    5.0   632.0  132.0  132.0  4.0  0.0   4.0  17.0   2.1   0.0  ...  0.0   
2  177.0   779.0  133.0  133.0  2.0  0.0   5.0  16.0   2.1   0.0  ...  0.0   
3  411.0  1192.0  134.0  134.0  2.0  0.0   6.0  16.0   2.4   0.0  ...  0.0   
4  533.0  1147.0  132.0  132.0  4.0  0.0   5.0  16.0   2.4   0.0  ...  0.0   
5    0.0   953.0  134.0  134.0  1.0  0.0  10.0  26.0   5.9   0.0  ...  0.0   

     D    E   AD   DE   LD   FS  SUSP  CLASS  NSP  
1  0.0  0.0  1.0  0.0  0.0  0.0   0.0    6.0    1  
2  0.0  0.0  1.0  0.0  0.0  0.0   0.0    6.0    1  
3  0.0  0.0  1.0  0.0  0.0  0.0   0.0    6.0    1  
4  0.0  0.0  0.0  0.0  0.0  0.0   0.0    2.0    1  
5  0.0  0.0  0.0  0.0  1.0  0.0   0.0    8.0    3  

[5 rows x 37 columns]
Final dataset shape: (1831, 36), (1831,)


# RandNet

In [74]:
import torch
import torch.nn.functional as F
import torch.nn as nn
import torch.optim as optim
import numpy as np
from statistics import median
from copy import deepcopy
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split

In [75]:
# class Layer(nn.Module):
#     def __init__(self, fan_in, fan_out, connection_prob=1.0, activation=None):
#         """
#         input : number of input nodes
#         output: number of output nodes
#         """
#         super(Layer, self).__init__()  # call for constructor
#         self.fan_in = fan_in
#         self.fan_out = fan_out
#         self.activation = activation

#         #  creating and intializing weights and biases
#         self.weight = nn.Parameter(torch.Tensor(fan_out, fan_in))
#         self.bias = nn.Parameter(torch.Tensor(fan_out))

#         nn.init.kaiming_uniform_(self.weight)
#         nn.init.zeros_(self.bias)   # Setting bias to zero.

#         # Generate a fixed binary mask for the weight matrix based on connection_prob.
#         self.register_buffer('mask', (torch.rand(fan_out, fan_in) < connection_prob).float())

#     def forward(self, input):
#         masked_weight = self.weight * self.mask
#         out = F.linear(input, masked_weight, self.bias)
#         if self.activation:
#                 out = self.activation(out)
#         return out


class Layer(nn.Module):
    def __init__(self, in_features, out_features, connection_prob):
        super(Layer, self).__init__()
        self.connection_mask = (torch.rand(out_features, in_features) < connection_prob).float()
        self.weights = nn.Parameter(torch.randn(out_features, in_features) * self.connection_mask)
        self.bias = nn.Parameter(torch.zeros(out_features))

    def forward(self, x):
        return F.linear(x, self.weights * self.connection_mask, self.bias)

In [76]:
class RandNet(nn.Module):
    def __init__(self, input_dim, num_layers, structure_param=0.5, connection_prob=0.8):
        super(RandNet, self).__init__()

        if num_layers % 2 == 0:
            raise ValueError("num_layers should be odd to have a single bottleneck layer.")

        # Build encoder
        num_neurons = [input_dim]
        for i in range(1, (num_layers // 2) + 1):
            next_neurons = max(3, int(num_neurons[-1] * structure_param))
            num_neurons.append(next_neurons)

        encoder_layers = []
        for i in range(len(num_neurons) - 1):
            encoder_layers.append(Layer(num_neurons[i], num_neurons[i+1], connection_prob))
            encoder_layers.append(nn.Sigmoid() if i == 0 else nn.ReLU())
        self.encoder = nn.Sequential(*encoder_layers)

        # Build decoder
        decoder_neurons = list(reversed(num_neurons))
        decoder_layers = []
        for i in range(len(decoder_neurons) - 1):
            decoder_layers.append(Layer(decoder_neurons[i], decoder_neurons[i+1], connection_prob))
            decoder_layers.append(nn.ReLU() if i < len(decoder_neurons)-2 else nn.Sigmoid())
        self.decoder = nn.Sequential(*decoder_layers)

    def forward(self, x):
        return self.decoder(self.encoder(x))

def train_autoencoder(model, input, epochs, adaptive_factor=1.01, learning_rate=0.01, device='cpu'):
    model.to(device)
    model.train()

    optimizer = optim.RMSprop(model.parameters(), lr=learning_rate, eps=1e-8, alpha=0.9)
    n_samples = input.shape[0]

    for i in range(1, epochs + 1):
        sample_size = int(min(n_samples, max(10, adaptive_factor ** i)))
        indices = np.random.choice(n_samples, sample_size, replace=False)
        batch = input[indices].to(device)

        optimizer.zero_grad()
        output = model(batch)
        loss = F.mse_loss(output, batch)
        loss.backward()
        optimizer.step()

        if i % 200 == 0:
            print(f"Iteration {i}/{epochs}, Loss: {loss.item():.6f}")
    return model

def compute_reconstruction_loss(model, input, device='cpu'):
    model.to(device)
    model.eval()
    with torch.no_grad():
        input = input.to(device)
        output = model(input)
        loss = torch.sum((input - output) ** 2, dim=1)
    return loss.cpu().numpy()

def calculate_outlier_score(ensemble, input, device='cpu'):
    all_scores = []
    for model in ensemble:
        losses = compute_reconstruction_loss(model, input, device)
        if losses.std() > 0:
            norm_loss = (losses - losses.mean()) / losses.std()
        else:
            norm_loss = losses
        all_scores.append(norm_loss)
    all_scores = np.array(all_scores)
    final_scores = np.median(all_scores, axis=0)
    return final_scores


def train_ensemble(input_data, num_models=100, epochs=300, adaptive_factor=1.01,
                   structure_param=0.5, num_layers=7, connection_prob=0.8,
                   learning_rate=0.01, device='cpu'):
    n_samples = input_data.shape[0]
    ensemble = []
    subsample_size = max(10, n_samples // 10)

    for i in range(num_models):
        indices = np.random.choice(n_samples, subsample_size, replace=False)
        data_subset = input_data[indices]

        model = RandNet(input_dim=input_data.shape[1],
                        num_layers=num_layers,
                        structure_param=structure_param,
                        connection_prob=connection_prob)

        print(f"Training model {i + 1}/{num_models}...")
        trained_model = train_autoencoder(model, data_subset, epochs,
                                          adaptive_factor=adaptive_factor,
                                          learning_rate=learning_rate,
                                          device=device)
        ensemble.append(deepcopy(trained_model))
    return ensemble


In [77]:
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)

y_test = (y_test != 1).astype(int)  # Treat class "1" as inlier, others as outlier (e.g. '2')

In [78]:
ensemble = train_ensemble(
    input_data=X_train_tensor,
    num_models=10,               # Start with 10 for speed; increase later if needed
    epochs=1000,
    adaptive_factor=1.01,
    structure_param=0.5,
    num_layers=7,
    connection_prob=0.8,
    learning_rate=0.01,
    device='cpu'  # or 'cuda' if using GPU
)


Training model 1/10...
Iteration 200/1000, Loss: 0.039993
Iteration 400/1000, Loss: 0.043923
Iteration 600/1000, Loss: 0.044185
Iteration 800/1000, Loss: 0.044166
Iteration 1000/1000, Loss: 0.044164
Training model 2/10...
Iteration 200/1000, Loss: 0.046059
Iteration 400/1000, Loss: 0.043322
Iteration 600/1000, Loss: 0.045531
Iteration 800/1000, Loss: 0.045541
Iteration 1000/1000, Loss: 0.045535
Training model 3/10...
Iteration 200/1000, Loss: 0.047845
Iteration 400/1000, Loss: 0.045260
Iteration 600/1000, Loss: 0.045271
Iteration 800/1000, Loss: 0.045275
Iteration 1000/1000, Loss: 0.045274
Training model 4/10...
Iteration 200/1000, Loss: 0.037986
Iteration 400/1000, Loss: 0.025377
Iteration 600/1000, Loss: 0.020514
Iteration 800/1000, Loss: 0.017557
Iteration 1000/1000, Loss: 0.018242
Training model 5/10...
Iteration 200/1000, Loss: 0.031451
Iteration 400/1000, Loss: 0.021235
Iteration 600/1000, Loss: 0.017533
Iteration 800/1000, Loss: 0.016220
Iteration 1000/1000, Loss: 0.015119
Train

In [79]:
from sklearn.metrics import roc_auc_score
outlier_scores = calculate_outlier_score(ensemble, X_test_tensor, device='cpu')
# Assuming y_test (binary labels: 0 for normal, 1 for anomaly) and outlier_scores are computed
auc_score = roc_auc_score(y_test, outlier_scores)
print(f"Final AUC Score on test data by RandNet: {auc_score:.4f}")

Final AUC Score on test data by RandNet: 0.9817


# LOF

In [80]:
from sklearn.neighbors import LocalOutlierFactor
from sklearn.metrics import roc_auc_score


In [81]:
lof = LocalOutlierFactor(n_neighbors=20, novelty=False)
y_pred_lof = lof.fit_predict(X_test)
lof_scores = -lof.negative_outlier_factor_

In [82]:
auc = roc_auc_score(y_test, lof_scores)
print(f"AUC Score (LOF): {auc:.4f}")

AUC Score (LOF): 0.6355
