## Implementation of this paper:
#### https://cs.nju.edu.cn/zhouzh/zhouzh.files/publication/tkdd11.pdf

In [None]:
import numpy as np
import cv2
import glob
import pickle
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt

from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM
from sklearn.neighbors import LocalOutlierFactor
from sklearn.mixture import GaussianMixture
from sklearn.covariance import EllipticEnvelope

from scipy.stats import entropy

from tensorflow.keras.models import Model
from tensorflow.keras import layers, models, losses
from tensorflow.keras.applications import VGG16

from skimage.io import imread_collection
from skimage.feature import hog
from skimage.color import rgb2gray


In [None]:
def normalize_img(image, bins = (4, 6, 3)):
    """
    Takes an image and resuces its dimensionality.
    This function was written as an alternative to using PCA.
    
    Parameters
    ----------
    image: The merged microcopy image in the folder
    bins: Required for histogram and measuring pixel intesitiy
    
    Returns
    ----------
    A flattened image with one dimension
    """
    
    hist = cv2.calcHist([image], [0, 1, 2], None, bins,
                       [0, 180, 0, 256, 0, 256])
    hist = cv2.normalize(hist, hist).flatten()
    return hist

In [None]:
def dataset_preparation(path):
    """
    Reads merged images, transform them to HSV images and then reduces their dimensionality
    
    Parameters
    ----------
    path: Path to the training images with / in the end 
    
    Returns
    ----------
    HSV images with reduced dimensionality
    """
    
    final_path = path + "*.*" 
    colored_imgs = [cv2.cvtColor(cv2.imread(img), cv2.COLOR_BGR2HSV) for img in glob.glob(final_path)]
    return np.array(list(map(normalize_img, colored_imgs)))

In [None]:
data = dataset_preparation(path)
model = IsolationForest(n_estimators = 100, contamination = 0.01,
                       random_state = 42)

model.fit(data)
scores = model.predict(data)

In [None]:
model_file = open("OneClassIF.pkl", "wb")
pickle.dump(model, model_file)

In [None]:
result = pd.DataFrame(index = np.arange(1, len(scores), 1), columns = ["Outlier", "Normal"])

for index, value in enumerate(scores, start = 1):
    if value == 1:
        result.loc[index] = [0, 1]
        
    else:
        result.loc[index] = [-1, 0]

In [None]:
result.to_csv(path, name)

## Implementation of One Class SVM
## Source: https://link.springer.com/article/10.1007/s42979-020-00418-2#Sec10

In [None]:
# Option A: traditional features
def extract_traditional_features(img):
    gray = rgb2gray(img)
    h = hog(gray, pixels_per_cell = (16,16), cells_per_block = (2,2), feature_vector = True)
    return h

# Option B: deep features via pre-trained VGG16
base_vgg = VGG16(weights = 'imagenet', include_top = False, input_shape = (img_h, img_w, 3))
feat_model = Model(inputs=base_vgg.input, outputs = base_vgg.get_layer('block3_conv3').output)

def extract_deep_features(img):
    img = img[np.newaxis]  # batch dimension
    feats = feat_model.predict(img)
    return feats.flatten()

In [None]:
# Train on Set 1
feats = [extract_deep_features(img) for img in X_train]
X_feats = np.stack(feats)

models = {
    'ocsvm': OneClassSVM(kernel = 'rbf', gamma = 'auto', nu = 0.1).fit(X_feats),
    'lof': LocalOutlierFactor(n_neighbors = 20, novelty = True).fit(X_feats),
    'iforest': IsolationForest(contamination = 0.05).fit(X_feats),
    'cov': EllipticEnvelope(contamination = 0.05).fit(X_feats),
}

In [None]:
def ensemble_predict(x):
    votes = [int(m.predict([x])[0] == -1) for m in models.values()]
    return sum(votes) >= 2  # at least 2 models flag → anomaly

# Process Set 2
feat_test = np.stack([extract_deep_features(img) for img in X_test])
anomaly_labels = [ensemble_predict(x) for x in feat_test]

In [None]:
for i, x in enumerate(feat_test):
    if anomaly_labels[i]:
        dists = [euclidean(x, db) for db in X_feats]
        closest = X_feats[np.argmin(dists)]
        # Compute feature-wise difference
        diff = np.abs(x - closest)
        # Identify top contributing features
        top_indices = np.argsort(diff)[-5:]
        print(f"Image {i} anomaly – top diff feature dims: {top_indices}")


## Implementation of Gaussian Mixture Modeling (GMM)
## Source: https://www.mdpi.com/1999-4893/16/4/195

In [None]:
# 1️⃣ Extract features via pre-trained VGG16 block3_conv3
base_vgg = VGG16(weights = 'imagenet', include_top=False)
feat_model = Model(inputs = base_vgg.input, outputs = base_vgg.get_layer('block3_conv3').output)

def extract_features(img):
    img = img[np.newaxis]
    return feat_model.predict(img).reshape(-1)

X_train_feats = np.stack([extract_features(img) for img in X_train])
X_test_feats = np.stack([extract_features(img) for img in X_test])

# 2️⃣ Fit initial GMM on normal features
n_components = 5  # adjust based on data
gmm = GaussianMixture(n_components=n_components, covariance_type='full', random_state=0)
gmm.fit(X_train_feats)

# 3️⃣ Compute entropy of cluster assignments to guide noise init
probs = gmm.predict_proba(X_train_feats)  # shape (n_samples, n_components)
sample_entropy = entropy(probs.T)         # per-sample entropy
noise_data = X_train_feats[np.argsort(sample_entropy)[-int(0.05*len(sample_entropy)):]]

# 4️⃣ Add a dedicated noise component initialized from high-entropy data
X_aug = np.vstack([X_train_feats, noise_data])
gmm2 = GaussianMixture(n_components=n_components+1, covariance_type='full', random_state=0)
gmm2.fit(X_aug)

# 5️⃣ Anomaly detection: test samples with low likelihood cluster
log_probs = gmm2.score_samples(X_test_feats)
threshold = np.percentile(gmm2.score_samples(X_train_feats), 5)
anomalies = log_probs < threshold

print(f"Threshold log-likelihood: {threshold:.2f}")
print(f"Detected {anomalies.sum()} anomalies out of {len(X_test_feats)}")

## Implementation of Deep Perceptual Autoencoders (DPA)
## Source: https://arxiv.org/pdf/2006.13265

In [None]:
# Load Set 1 (normal training images)
train_paths = [...]  # List of 384 paths to normal images
X_train = np.array([cv2.imread(img).astype('float32') / 65535.0 for img in train_paths])  # Normalize 16-bit to [0, 1]

# Load Set 2 (experimental images: control + anomalies)
test_paths = [...]  # List of 384 paths to experimental images
X_test = np.array([cv2.imread(img).astype('float32') / 65535.0 for img in test_paths])

In [None]:
# Load Set 1 (normal training images)
train_paths = [...]  # List of 384 paths to normal images
X_train = np.array([cv2.imread(img).astype('float32') / 65535.0 for img in train_paths])  # Normalize 16-bit to [0, 1]

# Load Set 2 (experimental images: control + anomalies)
test_paths = [...]  # List of 384 paths to experimental images
X_test = np.array([cv2.imread(img).astype('float32') / 65535.0 for img in test_paths])

In [None]:
# Encoder
input_img = layers.Input(shape=(X_train.shape[1], X_train.shape[2], 3))  # BGR input
x = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(input_img)
x = layers.MaxPooling2D((2, 2), padding='same')(x)
x = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(x)
encoded = layers.MaxPooling2D((2, 2), padding='same')(x)

# Decoder
x = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(encoded)
x = layers.UpSampling2D((2, 2))(x)
x = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(x)
x = layers.UpSampling2D((2, 2))(x)
decoded = layers.Conv2D(3, (3, 3), activation='sigmoid', padding='same')(x)

# Autoencoder
autoencoder = models.Model(input_img, decoded)

In [None]:
# Load pre-trained VGG16 for perceptual loss
vgg = VGG16(weights = 'imagenet', include_top = False, input_shape=(X_train.shape[1], X_train.shape[2], 3))
vgg.trainable = False
perceptual_model = models.Model(inputs = vgg.input, outputs=vgg.get_layer('block3_conv3').output)

def perceptual_loss(y_true, y_pred):
    mse = losses.mse(y_true, y_pred)  # Pixel-wise error
    true_features = perceptual_model(y_true)
    pred_features = perceptual_model(y_pred)
    perc_loss = losses.mse(true_features, pred_features)  # Feature-wise error
    return mse + 0.1 * perc_loss  # Combined loss

autoencoder.compile(optimizer = 'adam', loss=perceptual_loss)

In [None]:
history = autoencoder.fit(
    X_train, X_train,  # Input = Target (self-supervised)
    epochs = 50,
    batch_size = 8,
    shuffle = True
)

In [None]:
# Get reconstructions and errors
reconstructions = autoencoder.predict(X_test)
errors = np.mean(np.abs(X_test - reconstructions), axis=(1, 2, 3))  # Per-image error

# Flag anomalies (top 5% highest errors)
threshold = np.percentile(errors, 95)
anomalies = errors > threshold

# Print results
print(f"Anomaly threshold: {threshold:.4f}")
for i in range(len(X_test)):
    if anomalies[i]:
        print(f"Image {i}: ANOMALY (Score: {errors[i]:.4f})")
    else:
        print(f"Image {i}: Normal (Score: {errors[i]:.4f})")

In [None]:
# Plot top 5 most anomalous images
anomaly_indices = np.argsort(errors)[-5:]
for idx in anomaly_indices:
    plt.figure()
    plt.imshow(X_test[idx])
    plt.title(f"Anomaly Score: {errors[idx]:.4f}")
    plt.show()