In [None]:
from pathlib import Path
import numpy as np
import os, shutil
import matplotlib.pyplot as plt
from PIL import Image
from tqdm.auto import tqdm
import torch
import torchvision
from torch.utils.data import DataLoader
from torchvision.datasets import ImageFolder
import matplotlib as plt

from torchvision.transforms import transforms
import torch.optim as optim
from torchvision.models import efficientnet_b6, EfficientNet_B6_Weights

In [None]:
from pathlib import Path
from torchvision.datasets import ImageFolder
from torchvision import transforms
from torch.utils.data import DataLoader
import numpy as np

# Shared preprocessing for every split: resize to the fixed 512x512 working
# resolution and convert the PIL image to a float tensor.
# NOTE(review): no ImageNet mean/std normalisation is applied here, while the
# PCA cell further down feeds the backbone through `weights.transforms()` --
# confirm which input statistics the pipeline is meant to use.
transform = transforms.Compose([
    transforms.Resize((512, 512)),
    transforms.ToTensor()
])

# Dataset roots; ImageFolder treats each sub-directory as one class.
train_image_path = Path('Train_40_50_60/Train')
validation_image_path = Path('Val_456')
test_image_path = Path('Final_test_40_50_60/Test_40_50_60')

train_dataset = ImageFolder(root=train_image_path, transform=transform)
validation_dataset = ImageFolder(root=validation_image_path, transform=transform)
test_dataset = ImageFolder(root=test_image_path, transform=transform)

BS = 4
# Only the training split is shuffled; evaluation splits keep a fixed order so
# that per-epoch validation numbers are computed over identically ordered batches.
train_loader = DataLoader(train_dataset, batch_size=BS, shuffle=True)
validation_loader = DataLoader(validation_dataset, batch_size=BS, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=BS, shuffle=False)




In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision.models import efficientnet_b6, EfficientNet_B6_Weights

class efficientnet_feature_extractor(nn.Module):
    """Frozen EfficientNet-B6 that returns multi-scale features on one common grid.

    Forward hooks capture the outputs of backbone stages ``features[1]`` to
    ``features[6]``. Every captured map is bilinearly upsampled to
    ``input_size x input_size``, average-pooled with a non-overlapping
    ``pool_size`` window, and the results are concatenated along the channel
    axis.

    NOTE(review): a class with the same name is defined again in a later cell
    and replaces this one once that cell has run.

    Args:
        input_size: side length every hooked feature map is upsampled to.
        pool_size: kernel size and stride of the average pooling applied after
            upsampling.
    """

    def __init__(self, input_size=512, pool_size=3):
        super().__init__()
        self.model = efficientnet_b6(weights=EfficientNet_B6_Weights.DEFAULT)
        self.model.eval()
        for param in self.model.parameters():
            param.requires_grad = False

        # Buffers written by the hooks. They are created here so that a hook
        # firing outside forward() (e.g. calling self.model directly) cannot
        # fail with AttributeError.
        self.features = []
        self.feature_maps = []

        def get_features(module, input, output):
            self.features.append(output)

        # features[1] is hooked as a whole stage, stages 2..6 on their last block.
        self.model.features[1].register_forward_hook(get_features)
        for stage in range(2, 7):
            self.model.features[stage][-1].register_forward_hook(get_features)

        self.pool = nn.AvgPool2d(kernel_size=pool_size, stride=pool_size)
        self.input_size = input_size
        # Spatial size after pooling (170 for the default 512 / 3).
        self.final_size = input_size // pool_size

    def train(self, mode=True):
        """Set the wrapper's mode while keeping the frozen backbone in eval mode.

        Without this override, ``.train()`` on the wrapper would also switch
        the backbone's batch-norm/dropout layers to training behaviour.
        """
        super().train(mode)
        self.model.eval()
        return self

    def forward(self, input):
        """Return the concatenated, pooled feature maps for a batch of images.

        Args:
            input: image batch passed unchanged to the backbone.

        Returns:
            Tensor with the hooked stage outputs concatenated on ``dim=1``.
        """
        self.features = []  # reset the hook buffer for this call
        with torch.no_grad():
            _ = self.model(input)  # only the hook side effects are needed
        self.feature_maps = self.features  # raw stage outputs of the last call

        target_size = (self.input_size, self.input_size)
        agg_features = [
            self.pool(F.interpolate(fmap, size=target_size,
                                    mode='bilinear', align_corners=False))
            for fmap in self.features
        ]
        return torch.cat(agg_features, dim=1)

if __name__ == '__main__':
    # Smoke test: run one random 512x512 three-channel tensor through the
    # extractor and report the shape of the merged feature tensor.
    extractor = efficientnet_feature_extractor()
    x = torch.randn((1, 3, 512, 512))
    out = extractor(x)
    print("Output:", out.size())


In [None]:
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from PIL import Image
from sklearn.decomposition import PCA
from torchvision.models import efficientnet_b6, EfficientNet_B6_Weights

class efficientnet_feature_extractor(nn.Module):
    """Frozen EfficientNet-B6 multi-scale feature extractor with bundled preprocessing.

    Forward hooks on the last block of backbone stages ``features[1]`` to
    ``features[6]`` capture their outputs. Every captured map is bilinearly
    upsampled to ``input_size x input_size``, average-pooled with a
    non-overlapping ``pool_size`` window, and concatenated along the channel
    axis. ``self.preprocess`` holds the transform shipped with the pretrained
    weights.

    NOTE(review): this redefines the class of the same name from the previous
    cell; when cells run in order, later cells instantiate this definition.

    Args:
        input_size: side length every hooked feature map is upsampled to.
        pool_size: kernel size and stride of the average pooling applied after
            upsampling.
    """

    def __init__(self, input_size=512, pool_size=3):
        super().__init__()

        weights = EfficientNet_B6_Weights.DEFAULT
        self.preprocess = weights.transforms()
        self.model = efficientnet_b6(weights=weights)
        self.model.eval()
        for param in self.model.parameters():
            param.requires_grad = False

        # Buffer written by the hooks. It is created here so that a hook firing
        # outside forward() (e.g. calling self.model directly) cannot fail
        # with AttributeError.
        self.features = []

        def get_features(module, input, output):
            self.features.append(output)

        for idx in range(1, 7):
            self.model.features[idx][-1].register_forward_hook(get_features)

        self.input_size = input_size
        self.pool = nn.AvgPool2d(kernel_size=pool_size, stride=pool_size)

    def train(self, mode=True):
        """Set the wrapper's mode while keeping the frozen backbone in eval mode.

        Without this override, ``.train()`` on the wrapper would also switch
        the backbone's batch-norm/dropout layers to training behaviour.
        """
        super().train(mode)
        self.model.eval()
        return self

    def forward(self, input):
        """Return the concatenated, pooled feature maps for a batch of images.

        Args:
            input: image batch passed unchanged to the backbone.

        Returns:
            Tensor with the hooked stage outputs concatenated on ``dim=1``.
        """
        self.features = []  # reset the hook buffer for this call
        with torch.no_grad():
            _ = self.model(input)  # only the hook side effects are needed

        target_size = (self.input_size, self.input_size)
        agg = [
            # align_corners=False spells out the default behaviour explicitly.
            self.pool(F.interpolate(fmap, size=target_size,
                                    mode='bilinear', align_corners=False))
            for fmap in self.features
        ]
        return torch.cat(agg, dim=1)

def compute_latent_dim(feature_matrix, threshold=0.90):
    """Return how many principal components are needed to reach ``threshold``.

    Args:
        feature_matrix: 2-D array of shape ``(n_samples, n_features)``.
        threshold: target cumulative explained-variance ratio.

    Returns:
        The smallest number of leading components whose cumulative explained
        variance ratio is ``>= threshold``. If the threshold is never reached
        (e.g. ``threshold > 1`` or floating-point round-off just below 1.0),
        the total number of fitted components is returned.
    """
    n_samples, n_features = feature_matrix.shape
    # PCA cannot fit more components than min(n_samples, n_features); asking
    # for n_features unconditionally fails on short feature matrices.
    pca = PCA(n_components=min(n_samples, n_features))
    pca.fit(feature_matrix)
    cumvar = np.cumsum(pca.explained_variance_ratio_)

    reached = np.flatnonzero(cumvar >= threshold)
    if reached.size == 0:
        # np.argmax on an all-False mask would report index 0, i.e. claim that
        # a single component is enough; fall back to keeping every component.
        return int(cumvar.size)
    return int(reached[0]) + 1


if __name__ == '__main__':
    # Estimate the auto-encoder's latent size: extract backbone features for a
    # small set of training images and run PCA over the per-location vectors.
    feat_ext = efficientnet_feature_extractor()
    feat_ext.eval()

    image_folder = 'Train_40_50_60/Train/Train_40_50_60'
    image_exts = ('.jpg', '.jpeg', '.png', '.bmp', '.tif', '.tiff')
    # os.listdir order is arbitrary: sort so the same 30 images are used on
    # every run, and skip anything that is not an image file.
    image_paths = [
        os.path.join(image_folder, fn)
        for fn in sorted(os.listdir(image_folder))
        if fn.lower().endswith(image_exts)
    ][:30]
    if not image_paths:
        raise FileNotFoundError(f"No image files found in {image_folder!r}")

    feats_list = []
    with torch.no_grad():
        for p in image_paths:
            # The context manager closes the file handle once the pixel data
            # has been copied by convert().
            with Image.open(p) as img_file:
                img = img_file.convert('RGB')
            inp = feat_ext.preprocess(img).unsqueeze(0)
            fmap = feat_ext(inp)
            # (1, C, H, W) -> (H*W, C): one row per spatial location.
            arr = (fmap.cpu()
                       .numpy()
                       .transpose(0, 2, 3, 1)
                       .reshape(-1, fmap.size(1)))
            feats_list.append(arr)

    X = np.vstack(feats_list)
    print("Running PCA on feature matrix of shape:", X.shape)
    latent_dim = compute_latent_dim(X, threshold=0.90)
    print("→ Selected latent_dim =", latent_dim)


In [None]:
import torch
import torch.nn as nn

class DFR_FeatureCAE(nn.Module):
    """Convolutional auto-encoder built only from 1x1 convolutions.

    The encoder maps ``in_channels -> mid -> 2 * latent_dim -> latent_dim`` and
    the decoder mirrors it back to ``in_channels``, where
    ``mid = (in_channels + 2 * latent_dim) // 2``. Every convolution except the
    last one of each half is followed by an optional BatchNorm2d and a ReLU.

    Args:
        in_channels: channel count of the input (and of the reconstruction).
        latent_dim: channel count of the bottleneck.
        is_bn: insert BatchNorm2d after each hidden convolution when True.
    """

    def __init__(self, in_channels=832, latent_dim=260, is_bn=True):
        super().__init__()
        hidden = (in_channels + 2 * latent_dim) // 2
        wide = 2 * latent_dim

        def pointwise_stack(widths):
            # One 1x1 conv per consecutive width pair; BN/ReLU follow every
            # conv except the final one.
            layers = []
            final_idx = len(widths) - 2
            for idx, (c_in, c_out) in enumerate(zip(widths[:-1], widths[1:])):
                layers.append(nn.Conv2d(c_in, c_out, kernel_size=1, stride=1, padding=0))
                if idx == final_idx:
                    continue
                if is_bn:
                    layers.append(nn.BatchNorm2d(c_out))
                layers.append(nn.ReLU())
            return nn.Sequential(*layers)

        # Encoder is built before the decoder, matching layer creation order.
        self.encoder = pointwise_stack([in_channels, hidden, wide, latent_dim])
        self.decoder = pointwise_stack([latent_dim, wide, hidden, in_channels])

    def forward(self, x):
        """Encode ``x`` to the bottleneck and return its reconstruction."""
        return self.decoder(self.encoder(x))

if __name__ == '__main__':
    # Smoke test: reconstruct a random batch shaped like the extractor output
    # (4 x 832 x 170 x 170) and print the mean-squared reconstruction error.
    cae = DFR_FeatureCAE(in_channels=832, latent_dim=260, is_bn=True)
    x = torch.randn((4, 832, 170, 170))
    recon = cae(x)
    loss = nn.functional.mse_loss(recon, x)
    print("Reconstruction Loss:", loss.item())


In [None]:
# Trainable feature auto-encoder, moved to the GPU before the optimizer is built.
model = DFR_FeatureCAE(in_channels=832, latent_dim=260, is_bn=True)
model = model.cuda()

# Feature extractor (its parameters are frozen inside the class), also on the GPU.
backbone = efficientnet_feature_extractor().cuda()

# Reconstruction objective and optimizer; only the auto-encoder's parameters
# are handed to Adam.
criterion = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

In [None]:
# The notebook's first cell rebinds `plt` with `import matplotlib as plt`, and
# the bare matplotlib package has no `plot`; bind pyplot explicitly here.
import matplotlib.pyplot as plt

Loss = []             # mean training loss per epoch
Validation_Loss = []  # mean validation loss per epoch

num_epochs = 150
for epoch in tqdm(range(num_epochs)):
    model.train()
    train_loss_sum = 0.0
    num_train_batches = 0
    for data, _ in train_loader:
        # The backbone only provides fixed targets, so no graph is built for it.
        with torch.no_grad():
            features = backbone(data.cuda())
        # Forward pass
        output = model(features)
        # Compute the loss
        loss = criterion(output, features)
        # Backpropagation and optimization step
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        train_loss_sum += loss.item()
        num_train_batches += 1
    # Log the epoch mean rather than the last batch only, which is noisy.
    # max(..., 1) avoids a division by zero on an empty loader.
    train_loss_avg = train_loss_sum / max(num_train_batches, 1)
    Loss.append(train_loss_avg)

    # Validation loss on the held-out validation split. Monitoring on
    # test_loader would leak the test set into model/epoch selection.
    model.eval()
    val_loss_sum = 0.0
    num_batches = 0
    with torch.no_grad():
        for data, _ in validation_loader:
            features = backbone(data.cuda())
            output = model(features)
            val_loss_sum += criterion(output, features).item()
            num_batches += 1
    val_loss_avg = val_loss_sum / max(num_batches, 1)
    Validation_Loss.append(val_loss_avg)
    print('Epoch [{}/{}], Loss: {:.4f}, Validation Loss: {:.4f}'.format(epoch + 1, num_epochs, train_loss_avg, val_loss_avg))

# Persist the weights before plotting so that a plotting failure cannot cost
# the result of the training run.
torch.save(model.state_dict(), 'Eff_transform_40_60-170-1-6_avg_832-260.pth')

plt.plot(Loss, label='Training Loss')
plt.plot(Validation_Loss, label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.show()

# Loss[-5:] is the whole list when fewer than five epochs were run.
safe_train_loss = min(Loss[-5:])
print(f"Safe Training Loss (min of last 5 epochs): {safe_train_loss:.4f}")


In [None]:
def decision_function(segm_map, top_k=20):
    """Score each anomaly map by the mean of its ``top_k`` largest values.

    Args:
        segm_map: iterable of anomaly maps, typically a tensor whose first
            dimension indexes the maps.
        top_k: number of largest values averaged per map. Maps with fewer
            elements are averaged over all of their elements.

    Returns:
        1-D tensor with one score per map; an empty tensor for an empty batch.

    Raises:
        ValueError: if ``top_k`` is smaller than 1.
    """
    if top_k < 1:
        raise ValueError(f"top_k must be >= 1, got {top_k}")

    scores = []
    for anomaly_map in segm_map:
        flat = anomaly_map.reshape(-1)
        # topk avoids fully sorting the map just to read its largest entries.
        k = min(top_k, flat.numel())
        scores.append(flat.topk(k).values.mean())

    if not scores:
        # torch.stack rejects an empty list; return an empty score vector.
        return segm_map.new_empty(0) if torch.is_tensor(segm_map) else torch.empty(0)
    return torch.stack(scores)

In [None]:
# Image-level evaluation: one anomaly score and one thresholded prediction per
# test image. The label comes from the parent folder name ('OK' -> 0, else 1).
y_true = []
y_pred = []
y_score = []

model.eval()
backbone.eval()

test_path = Path('Final_test_40_50_60/Test_40_50_60')
best_threshold = 0.1  # provisional cut-off on the image score
border = 20           # rows/columns dropped on each side of the error map

# Sorted so the order of y_true / y_score is the same on every run.
for path in sorted(test_path.glob('*/*.jpeg')):
    fault_type = path.parent.name
    # Force three channels: grayscale or RGBA files would otherwise reach the
    # backbone with the wrong channel count. The context manager closes the file.
    with Image.open(path) as image_file:
        test_image = transform(image_file.convert('RGB')).unsqueeze(0).cuda()

    with torch.no_grad():
        features = backbone(test_image)
        recon = model(features)
        # Per-location squared reconstruction error, averaged over channels.
        segm_map = ((features - recon) ** 2).mean(dim=1)[:, border:-border, border:-border]
        # A single image is scored, so the result is stored as a plain float;
        # this keeps y_score / y_pred one-dimensional.
        y_score_image = decision_function(segm_map=segm_map).item()

    y_true.append(0 if fault_type == 'OK' else 1)
    y_score.append(y_score_image)
    y_pred.append(int(y_score_image >= best_threshold))

y_true = np.array(y_true)
y_pred = np.array(y_pred)
y_score = np.array(y_score)
    

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import roc_auc_score, roc_curve, confusion_matrix,ConfusionMatrixDisplay, f1_score, accuracy_score, precision_score, recall_score
import seaborn as sns


# Image-level metrics. Labels and scores are flattened first so that (N, 1)
# arrays are handled the same way as (N,) arrays.
labels_1d = np.asarray(y_true).ravel()
scores_1d = np.asarray(y_score).ravel()

auc_roc_score = roc_auc_score(labels_1d, scores_1d)
print("AUC-ROC Score:", auc_roc_score)


fpr, tpr, thresholds = roc_curve(labels_1d, scores_1d)
plt.figure()
plt.plot(fpr, tpr, color='darkorange', lw=2, label='ROC curve (area = %0.2f)' % auc_roc_score)
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curve')
plt.legend(loc="lower right")
plt.show()


# Pick the ROC threshold with the best F1. zero_division=0 scores thresholds
# that predict no positives as 0 instead of emitting a warning.
# NOTE(review): the threshold is tuned on the same data the metrics below are
# reported on, so those metrics are optimistic.
f1_scores = [f1_score(labels_1d, scores_1d >= threshold, zero_division=0) for threshold in thresholds]
best_threshold = thresholds[np.argmax(f1_scores)]

print('Threshold: ',best_threshold)
y_pred = (scores_1d >= best_threshold).astype(int)

cm = confusion_matrix(labels_1d, y_pred)
disp = ConfusionMatrixDisplay(confusion_matrix=cm,display_labels=['OK','NOK'])
disp.plot(cmap='Blues', values_format='d')

plt.show()

acc = accuracy_score(labels_1d, y_pred)
pre = precision_score(labels_1d, y_pred, zero_division=0)
rec = recall_score(labels_1d, y_pred, zero_division=0)
f1 = f1_score(labels_1d, y_pred, zero_division=0)
print(f"Accuracy:  {acc:.4f}")
print(f"Precision: {pre:.4f}")
print(f"Recall:    {rec:.4f}")
print(f"F1-Score:  {f1:.4f}")


In [None]:
import os, glob, random
import numpy as np
import torch
import cv2
from PIL import Image
import matplotlib.pyplot as plt
from sklearn.metrics import roc_auc_score, f1_score, precision_score, recall_score, accuracy_score, roc_curve
from torchvision import transforms
from torchvision.transforms import InterpolationMode


# Pixel-level evaluation: compare the upsampled, per-image normalised
# reconstruction-error map of each NOK image against its ground-truth mask.
image_folder = "Final_test_40_50_60/Test_40_50_60/NOK"
mask_folder = "Masks_40_50_60/Masks_40_50_60"
resize_to = (512, 512)  # (height, width), as expected by transforms.Resize
crop_border = 0
max_samples = 250       # upper bound on the number of evaluated images
sample_seed = 0         # fixed seed so the evaluated subset is reproducible

all_image_paths = sorted(glob.glob(os.path.join(image_folder, "*.jpeg")))
valid_paths = [p for p in all_image_paths if os.path.exists(os.path.join(mask_folder, os.path.splitext(os.path.basename(p))[0] + ".png"))]
if not valid_paths:
    raise FileNotFoundError(
        f"No image in {image_folder!r} has a matching mask in {mask_folder!r}")
# random.sample raises when asked for more items than exist; cap the request.
selected_paths = random.Random(sample_seed).sample(
    valid_paths, min(max_samples, len(valid_paths)))

transform_img = transforms.Compose([
    transforms.Resize(resize_to),
    transforms.ToTensor()
])
# Nearest-neighbour interpolation avoids blending mask values while resizing.
transform_mask = transforms.Compose([
    transforms.Resize(resize_to, interpolation=InterpolationMode.NEAREST),
    transforms.ToTensor()
])

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.eval()
backbone.eval()

# Per-image arrays are collected and concatenated once at the end; extending a
# Python list pixel by pixel would create one Python object per pixel.
score_chunks = []
gt_chunks = []
error_maps = []

for path in selected_paths:
    base = os.path.splitext(os.path.basename(path))[0]
    mask_path = os.path.join(mask_folder, f"{base}.png")

    # Context managers close the file handles once the pixels are copied.
    with Image.open(path) as image_file:
        img = image_file.convert("RGB")
    with Image.open(mask_path) as mask_file:
        mask = mask_file.convert("L")

    img_tensor = transform_img(img).unsqueeze(0).to(device)
    mask_tensor = transform_mask(mask).squeeze(0).numpy()

    with torch.no_grad():
        features = backbone(img_tensor)
        recon = model(features)

        if isinstance(recon, tuple):
            recon = recon[0]

        # Squared reconstruction error averaged over channels.
        err_map = ((features - recon) ** 2).mean(dim=1).squeeze().cpu().numpy()

    # cv2.resize takes dsize as (width, height), the reverse of resize_to.
    err_map_up = cv2.resize(err_map, (resize_to[1], resize_to[0]), interpolation=cv2.INTER_LINEAR)
    if crop_border > 0:
        err_map_up = err_map_up[crop_border:-crop_border, crop_border:-crop_border]
        mask_tensor = mask_tensor[crop_border:-crop_border, crop_border:-crop_border]

    # Min-max normalisation per image; the epsilon guards a constant map.
    norm_err = (err_map_up - err_map_up.min()) / (err_map_up.max() - err_map_up.min() + 1e-8)
    score_chunks.append(norm_err.ravel())
    gt_chunks.append((mask_tensor > 0.5).astype(np.uint8).ravel())
    error_maps.append(norm_err)


all_scores = np.concatenate(score_chunks)
all_gts = np.concatenate(gt_chunks)


# zero_division=0 scores thresholds that predict no positive pixel as 0
# instead of emitting a warning.
thresholds = np.linspace(0, 1, 200)
f1s = [f1_score(all_gts, all_scores >= t, zero_division=0) for t in thresholds]
best_threshold = thresholds[np.argmax(f1s)]
print(f"Best threshold (by F1): {best_threshold:.4f}")


fpr, tpr, _ = roc_curve(all_gts, all_scores)
auc = roc_auc_score(all_gts, all_scores)
print(f"AUROC:{auc}")
plt.figure(figsize=(6,6))
plt.plot(fpr, tpr, label=f"AUROC = {auc:.3f}")
plt.plot([0,1],[0,1],'--')
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("Pixel-wise ROC Curve (Features AE)")
plt.legend()
plt.grid()
plt.show()
