In [1]:
import zipfile
import os

# Define the path to the zip file
zip_file_path = "/content/perturbed_unperturbed_split.zip"

# Define the directory to extract files
extract_to = "/content/final_datatset_split"

# Create the extraction directory if it doesn't exist
os.makedirs(extract_to, exist_ok=True)

# Unzip the file
with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
    zip_ref.extractall(extract_to)

print("Unzipped successfully. Files are extracted to:", extract_to)


Unzipped successfully. Files are extracted to: /content/final_datatset_split


In [4]:
import os

# Path to the folder
folder_path = "/content/final_datatset_split"

# Function to count files in a folder
def count_files_in_folder(path):
    file_count = 0
    for dirpath, dirnames, filenames in os.walk(path):
        file_count += len(filenames)
    return file_count

# Get the number of files
total_files = count_files_in_folder(folder_path)

print(f"The total number of files in the folder '{folder_path}' is: {total_files}")

The total number of files in the folder '/content/final_datatset_split' is: 221161


In [3]:
import zipfile

# Path to the ZIP file
zip_file_path = "/content/perturbed_unperturbed_split.zip"

# Function to count files in a ZIP archive
def count_files_in_zip(zip_path):
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        file_count = len(zip_ref.infolist())
    return file_count

# Get the number of files in the ZIP
total_files_in_zip = count_files_in_zip(zip_file_path)

print(f"The total number of files in the ZIP file '{zip_file_path}' is: {total_files_in_zip}")


The total number of files in the ZIP file '/content/perturbed_unperturbed_split.zip' is: 221169


In [5]:
import os
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision.models import efficientnet_v2_s, EfficientNet_V2_S_Weights
from torchvision.transforms import transforms as T
from skimage.feature import local_binary_pattern
from skimage.color import rgb2gray
from skimage.io import imread
from skimage.transform import resize
from tqdm import tqdm

In [6]:


# New custom noise and filter classes
class DoubleLayerGaussianFilter(nn.Module):
    def __init__(self, channels, kernel_size=3, sigma=1.0):
        super(DoubleLayerGaussianFilter, self).__init__()
        self.channels = channels
        self.kernel_size = kernel_size
        self.sigma = sigma
        self.gaussian_filter = self.create_gaussian_filter()

    def create_gaussian_filter(self):
        k = self.kernel_size // 2
        x = torch.arange(-k, k + 1, dtype=torch.float32)
        y = torch.arange(-k, k + 1, dtype=torch.float32)
        xx, yy = torch.meshgrid(x, y)
        kernel = torch.exp(-(xx**2 + yy**2) / (2 * self.sigma**2))
        kernel /= kernel.sum()  # Normalize kernel
        kernel = kernel.expand(self.channels, 1, -1, -1)
        return nn.Parameter(kernel, requires_grad=False)

    def forward(self, x):
        padding = self.kernel_size // 2
        x = nn.functional.conv2d(x, self.gaussian_filter, padding=padding, groups=self.channels)
        x = nn.functional.conv2d(x, self.gaussian_filter, padding=padding, groups=self.channels)
        return x

class AddGaussianNoise:
    def __init__(self, mean=0., std=0.1):
        self.mean = mean
        self.std = std

    def __call__(self, tensor):
        return tensor + torch.randn(tensor.size()) * self.std + self.mean

class ApplyDoubleGaussianFilter:
    def __init__(self, kernel_size=3, sigma=1.0):
        self.filter = DoubleLayerGaussianFilter(channels=5, kernel_size=kernel_size, sigma=sigma)

    def __call__(self, tensor):
        tensor = tensor.unsqueeze(0)  # Add batch dimension
        filtered = self.filter(tensor)
        return filtered.squeeze(0)  # Remove batch dimension

class AddCustomNoise:
    def __init__(self, noise_level=0.1):
        self.noise_level = noise_level

    def __call__(self, tensor):
        noise = torch.randn_like(tensor) * self.noise_level
        return tensor + noise

# Set device
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Define LBP preprocessing function
def lbp_preprocessing(image):
    image = resize(image, (32, 32), anti_aliasing=True)
    if image.ndim == 2:
        image = np.stack([image] * 3, axis=-1)
    elif image.shape[2] == 4:
        image = image[:, :, :3]
    image_rgb = image.astype("float32")

    image_gray = rgb2gray(image_rgb)
    P, R = 8, 1
    lbp = local_binary_pattern(image_gray, P, R, method="uniform")
    fft = np.log(np.abs(np.fft.fft2(image_gray)) + 1)

    processed_image = np.transpose(
        np.concatenate([
            image_rgb,
            np.expand_dims(lbp, axis=-1),
            np.expand_dims(fft, axis=-1)
        ], axis=-1),
        (2, 0, 1)  # Channel-first format
    )

    return processed_image.astype("float32")

# Updated transform pipeline
transform = T.Compose([
    T.Resize(size=(32, 32)),
    T.RandomApply([
        T.RandomHorizontalFlip(p=0.3),
        T.RandomVerticalFlip(p=0.3),
        T.RandomAffine(degrees=0, scale=(0.8, 1.2)),
    ], p=0.3),
    T.RandomApply([
        ApplyDoubleGaussianFilter(kernel_size=3, sigma=1.0),
        AddCustomNoise(noise_level=0.1)
    ], p=0.3)
])

  return _VF.meshgrid(tensors, **kwargs)  # type: ignore[attr-defined]


In [7]:
# Custom dataset
class CustomDataset(Dataset):
    def __init__(self, file_paths, labels, transform=None, preprocess_fn=None):
        self.file_paths = file_paths
        self.labels = labels
        self.transform = transform
        self.preprocess_fn = preprocess_fn

    def __len__(self):
        return len(self.file_paths)

    def __getitem__(self, idx):
        img_path = self.file_paths[idx]
        label = torch.tensor(self.labels[idx], dtype=torch.float32)
        img = imread(img_path)
        img = self.preprocess_fn(img)
        img = torch.from_numpy(img)

        if self.transform:
            img = self.transform(img)

        return img, label

# Prepare dataset paths and labels
def prepare_dataset(base_dir):
    # Check if the directory exists
    if not os.path.isdir(base_dir):
        raise ValueError(f"Directory {base_dir} does not exist.")

    file_paths = []
    labels = []

    # Assuming all images in 'unperturbed' folder are to be labeled as 0 (for unperturbed)
    label = 0

    # Loop through files in the base directory (unperturbed folder)
    for fname in os.listdir(base_dir):
        file_path = os.path.join(base_dir, fname)
        # Only add image files (based on extension)
        if os.path.isfile(file_path) and fname.lower().endswith((".png", ".jpg", ".jpeg", ".bmp", ".tiff")):
            file_paths.append(file_path)
            labels.append(label)  # Label as 0 for unperturbed images

    return file_paths, labels


# Paths to new train directory (Update this to your dataset location on Google Colab)
train_dir = "/content/final_datatset_split/train_final/unperturbed"

# Prepare full dataset
file_paths, labels = prepare_dataset(train_dir)

# Perform an 80/20 train-test split
train_size = int(0.8 * len(file_paths))
val_size = len(file_paths) - train_size

train_paths, val_paths = random_split(file_paths, [train_size, val_size])
train_labels, val_labels = random_split(labels, [train_size, val_size])

# Create datasets
batch_size = 64

train_dataset = CustomDataset(train_paths, train_labels, transform=transform, preprocess_fn=lbp_preprocessing)
val_dataset = CustomDataset(val_paths, val_labels, transform=transform, preprocess_fn=lbp_preprocessing)

# Create DataLoaders
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)


In [8]:
# Custom EfficientNetV2S for binary classification
class CustomEffNetV2S(nn.Module):
    def __init__(self, num_classes=1):
        super(CustomEffNetV2S, self).__init__()
        self.effnet = efficientnet_v2_s(weights=EfficientNet_V2_S_Weights.DEFAULT)

        original_first_conv = self.effnet.features[0][0]
        new_first_conv = nn.Conv2d(
            in_channels=5,
            out_channels=original_first_conv.out_channels,
            kernel_size=original_first_conv.kernel_size,
            stride=original_first_conv.stride,
            padding=original_first_conv.padding,
            bias=original_first_conv.bias
        )
        with torch.no_grad():
            new_first_conv.weight[:, :3, :, :] = original_first_conv.weight
            new_first_conv.weight[:, 3:, :, :] = torch.randn_like(new_first_conv.weight[:, 3:, :, :]) * 0.01

        self.effnet.features[0][0] = new_first_conv
        self.effnet.classifier[1] = nn.Linear(self.effnet.classifier[1].in_features, num_classes)

    def forward(self, x):
        return self.effnet(x)

# Initialize the model
model = CustomEffNetV2S().to(DEVICE)

# Loss and optimizer
criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=3e-4)


Downloading: "https://download.pytorch.org/models/efficientnet_v2_s-dd5fe13b.pth" to /root/.cache/torch/hub/checkpoints/efficientnet_v2_s-dd5fe13b.pth
100%|██████████| 82.7M/82.7M [00:00<00:00, 204MB/s]


In [None]:
# Early stopping class
class EarlyStopping:
    def __init__(self, patience=3, delta=0):
        self.patience = patience
        self.delta = delta
        self.best_loss = float("inf")
        self.counter = 0
        self.stop = False

    def __call__(self, val_loss):
        if val_loss < self.best_loss - self.delta:
            self.best_loss = val_loss
            self.counter = 0
        else:
            self.counter += 1
            if self.counter >= self.patience:
                self.stop = True

# Training and validation loops
def train_epoch(model, dataloader, criterion, optimizer, device):
    model.train()
    running_loss, correct = 0.0, 0

    for images, labels in tqdm(dataloader, desc="Training"):
        images, labels = images.to(device), labels.to(device).unsqueeze(1)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * images.size(0)
        predicted = (torch.sigmoid(outputs) > 0.5).float()
        correct += (predicted == labels).sum().item()

    epoch_loss = running_loss / len(dataloader.dataset)
    accuracy = correct / len(dataloader.dataset)
    return epoch_loss, accuracy

def validate_epoch(model, dataloader, criterion, device):
    model.eval()
    running_loss, correct = 0.0, 0

    with torch.no_grad():
        for images, labels in tqdm(dataloader, desc="Validating"):
            images, labels = images.to(device), labels.to(device).unsqueeze(1)
            outputs = model(images)
            loss = criterion(outputs, labels)

            running_loss += loss.item() * images.size(0)
            predicted = (torch.sigmoid(outputs) > 0.5).float()
            correct += (predicted == labels).sum().item()

    epoch_loss = running_loss / len(dataloader.dataset)
    accuracy = correct / len(dataloader.dataset)
    return epoch_loss, accuracy

# Start training with early stopping
early_stopping = EarlyStopping(patience=5, delta=0.01)
num_epochs = 20
best_val_loss = float("inf")
save_path = '/content/final_model.pth'

for epoch in range(num_epochs):
    print(f"Epoch {epoch+1}/{num_epochs}")

    train_loss, train_accuracy = train_epoch(model, train_loader, criterion, optimizer, DEVICE)
    val_loss, val_accuracy = validate_epoch(model, val_loader, criterion, DEVICE)

    print(f"Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}")
    print(f"Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}")

    early_stopping(val_loss)

    if early_stopping.stop:
        print("Early stopping triggered!")
        break

    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), save_path)

print(f"Model training complete. Best model saved at {save_path}")


Epoch 1/20


Training:   2%|▏         | 20/1267 [07:24<7:41:41, 22.21s/it]