In [1]:
import os
import random
from itertools import chain
from pathlib import Path
from timeit import default_timer as Timer

import matplotlib.pyplot as plt
import onnx
import torch
import torchvision
import wandb
from PIL import Image
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay
from torch import nn
from torch.utils.data import Subset, DataLoader, ConcatDataset, Dataset
from torchinfo import summary
from torchvision import datasets, transforms
from tqdm.auto import tqdm

from Scripts import engine

print(torch.__version__)
print(torchvision.__version__)

  from .autonotebook import tqdm as notebook_tqdm


2.6.0+cpu
0.21.0+cpu


In [2]:
device = "cuda" if torch.cuda.is_available() else "cpu"
device

'cpu'

In [None]:
data_path = Path("data/")
image_path = data_path/"CIFAKE"

def walk_trough_dir(dir_path):
    """Print one summary line per directory found under ``dir_path``.

    Args:
        dir_path: Root directory to traverse with ``os.walk``.

    Each line reports how many sub-directories and how many files the
    visited directory contains (every file is reported as an "image").
    """
    for current_dir, sub_dirs, files in os.walk(dir_path):
        n_dirs = len(sub_dirs)
        n_files = len(files)
        print(f"There are {n_dirs} directories & {n_files} images in {current_dir}.")

walk_trough_dir(image_path)

In [4]:
from Scripts import download_data

train_dir, test_dir = download_data.download_data("https://www.kaggle.com/datasets/birdy654/cifake-real-and-ai-generated-synthetic-images")

Downloading data...
Please provide your Kaggle credentials to download this dataset. Learn more: http://bit.ly/kaggle-creds
Your Kaggle username:Your Kaggle Key:Dataset URL: https://www.kaggle.com/datasets/birdy654/cifake-real-and-ai-generated-synthetic-images


KeyboardInterrupt: 

In [None]:
train_dir = image_path/"train"
test_dir = image_path/"test"
train_dir, test_dir

In [None]:
# torch.manual_seed only seeds PyTorch's generator; the image below is picked
# with the standard-library `random` module, so that generator is seeded too.
torch.manual_seed(42)
random.seed(42)

# Sort the paths: glob() yields them in an unspecified order, so without
# sorting the same seed could select a different file on another machine.
random_image_list = sorted(image_path.glob("*/*/*.jpg"))
if not random_image_list:
    raise FileNotFoundError(f"No .jpg files found under {image_path} (pattern */*/*.jpg)")

random_img_path = random.choice(random_image_list)
# The class label is taken from the name of the folder that holds the image.
image_class = random_img_path.parent.stem
random_img = Image.open(random_img_path)

print(image_class)
random_img

In [None]:
# Training-time preprocessing pipeline:
#   1. resize to 256x256,
#   2. take a random crop covering 10%-100% of the image area and resize it to 224x224,
#   3. convert the PIL image to a tensor.
transform = transforms.Compose([
    transforms.Resize((256,256)),
    transforms.RandomResizedCrop((224, 224), scale=(0.1,1)),
    transforms.ToTensor()
])

In [None]:
# Named preprocessing pipelines, keyed by augmentation strategy.
augmentation_transforms = {
    # NOTE(review): despite the name, this pipeline still applies a random
    # resized crop (scale 0.1-1), i.e. it is not augmentation-free — confirm intent.
    "No_Augmentation" : transforms.Compose([
        transforms.Resize((256,256)),
        transforms.RandomResizedCrop((224,224), scale = (0.1, 1)),
        transforms.ToTensor()
    ]),
    # Blur (applied with probability 0.5), then random horizontal flip,
    # then a random resized crop to 224, then tensor conversion.
    "Gaussian_Blur" : transforms.Compose([
        transforms.RandomApply([
            transforms.GaussianBlur(kernel_size= 3, sigma= (0.1, 0.3))
        ], p= 0.5),
        transforms.RandomHorizontalFlip(),
        transforms.RandomResizedCrop(224),
        transforms.ToTensor(),
    ])
}

In [None]:
transformed_image = transform(random_img)

plt.figure()
plt.imshow(transformed_image.permute(1,2,0))
plt.title(f"Image class : {image_class} & shape : {transformed_image.shape}")
plt.axis(False);

In [None]:
class_names = ["FAKE", "REAL"]
class_dict = {"FAKE": 0,
              "REAL": 1}

In [None]:
test_transform = transforms.Compose([
    transforms.Resize((224,224)),
    transforms.ToTensor()
])

In [None]:
train_data = datasets.ImageFolder(root = train_dir,
                                  transform= transform,
                                  target_transform= None)
test_data = datasets.ImageFolder(root = test_dir,
                                 transform= test_transform)

In [None]:
len(train_data), len(test_data)

In [None]:
# Draw an index that is always valid for the dataset. The previous
# random.randint(0, 20000) is inclusive on both ends and hard-codes the size,
# so it could request an index one past the end (or beyond a smaller dataset).
random_idx = random.randrange(len(test_data))

# Fetch the sample once instead of re-loading the image for every use.
sample_img, sample_label = test_data[random_idx]
sample_img_last = sample_img.permute(1,2,0)  # move the first axis last for imshow

plt.imshow(sample_img_last)
plt.title(f"Image class: {class_names[sample_label]} & Image shape : {sample_img_last.shape}")
plt.axis(False);

In [None]:
ss = list(range(0, 1250))
len(ss)

In [None]:
ss1 = list(range(50000, 51250))
len(ss1)

In [None]:
def create_subset(dataset, num_of_datasets, size_of_datasets):
    """
    Split ``dataset`` into ``num_of_datasets`` non-overlapping, shuffled subsets.

    Subset ``i`` is built from ``size_of_datasets / 2`` consecutive indices taken
    from the first half of ``dataset`` plus the indices at the same offsets in
    the second half. The subsets are class-balanced only if each half of the
    dataset holds a single class (assumption about the caller's data — confirm).

    Args:
        dataset: The original dataset (anything supporting ``len`` and indexing).
        num_of_datasets: Number of subsets to create.
        size_of_datasets: Number of samples in each subset; half of them are
            drawn from each half of ``dataset`` (an odd value is rounded down
            by one sample).

    Returns:
        A list of ``num_of_datasets`` Subset objects.

    Raises:
        ValueError: If the requested subsets do not fit into one half of the
            dataset, which would make them overlap the other half or index
            past the end of the dataset.
    """
    half_size = int(size_of_datasets / 2)
    second_half_offset = int(len(dataset) / 2)

    if num_of_datasets * half_size > second_half_offset:
        raise ValueError(
            f"{num_of_datasets} subsets of {size_of_datasets} samples need "
            f"{num_of_datasets * half_size} samples per half, but each half of "
            f"the dataset only has {second_half_offset}"
        )

    subsets = []
    for i in range(num_of_datasets):
        start_idx = i * half_size
        end_idx = start_idx + half_size

        # Same window in both halves of the dataset.
        subset_indices = [
            *range(start_idx, end_idx),
            *range(start_idx + second_half_offset, end_idx + second_half_offset),
        ]
        # Mix the two halves so a sequential reader does not see them in blocks.
        random.shuffle(subset_indices)

        subsets.append(Subset(dataset, subset_indices))

    return subsets

In [None]:
train_subset = create_subset(train_data, 40, 2500)
test_subset = create_subset(test_data, 40, 500)

img, label = next(iter(test_subset[0]))
img

In [None]:
len(train_subset), len(train_subset[0]),len(train_subset[0][0]), len(train_subset[0][0][0]), len(train_subset[0][0][0][0])

In [None]:
BATCH_SIZE = 50

train_dataloader_subsets = [DataLoader(subset, BATCH_SIZE, shuffle= True) for subset in train_subset]
test_dataloader_subsets = [DataLoader(subset, BATCH_SIZE) for subset in test_subset]

train_dataloader_subsets[0]

In [None]:
for batch, (img, label) in enumerate(test_dataloader_subsets[0]):
    print(batch)

In [None]:
# test_dataloader_subsets is a list of DataLoaders; iterating the list itself
# only returns the first DataLoader object. Index a loader first to get a batch.
next(iter(test_dataloader_subsets[0]))

In [None]:
fig = plt.figure(figsize=(8,8))

fig.suptitle("Batch Images", fontsize=32)
rows, columns = 5, 10

# Only the first batch is drawn, so fetch just that batch instead of looping
# over (and loading) every batch of the loader.
img, label = next(iter(test_dataloader_subsets[0]))

# Never index past the batch if it holds fewer than rows * columns images.
for i in range(1, min(rows * columns, len(img)) + 1):
    fig.add_subplot(rows, columns, i)
    plt.imshow(img[i-1].permute(1,2,0))
    plt.title(class_names[int(label[i-1])], fontsize=12)
    plt.axis(False);


In [None]:
model_name = "efficientnet_b0"
model_weights_name  = "EfficientNet_B0_Weights"

In [None]:
model_class = getattr(torchvision.models, model_name)

model_weights = getattr(torchvision.models, model_weights_name).DEFAULT

model_weights

In [None]:
efficientb0_model = torchvision.models.efficientnet_b0(weights= model_weights).to(device)
efficientb0_model

In [None]:
# Look up the model constructor by name and build it with the selected weights.
# NOTE(review): the variable is called `resnet_model`, but `model_name` is set to
# "efficientnet_b0" earlier in this notebook, so this is an EfficientNet-B0
# constructor, not a ResNet.
resnet_model = getattr(torchvision.models, model_name)
model = resnet_model(weights = model_weights).to(device)
model

In [None]:
layers = [name for name, param in model.named_parameters() if param.requires_grad and "weight" in name]
layers

In [None]:
# layers[-1] is the dotted name of the last trainable weight; its first
# component is the attribute on `model` that holds the classification head.
sequence = getattr(model, layers[-1].split(".")[0])

# Assigning `out_features` on the container (or on an existing nn.Linear) only
# sets a Python attribute and leaves the weight tensor untouched. To get a
# 2-class output the final linear layer has to be replaced by a new one.
old_linear = sequence[-1]
if not isinstance(old_linear, nn.Linear):
    raise TypeError(f"Expected the last layer of the head to be nn.Linear, got {type(old_linear).__name__}")
sequence[-1] = nn.Linear(
    in_features=old_linear.in_features,
    out_features=2,
    bias=old_linear.bias is not None,
).to(device)
sequence

In [None]:
model.features

In [None]:
summary(model= efficientb0_model,
        input_size= (25,3,224,224),
        col_names= ["input_size", "output_size","num_params", "trainable"])

In [None]:
efficientb0_model.classifier

In [None]:
# Freeze every feature block except the last three.
for parameter in efficientb0_model.features[:-3].parameters():
    parameter.requires_grad = False

# Replace the classification head with a 2-class one. The new layers are
# created on the CPU, so they are moved to `device` explicitly; otherwise the
# head and the backbone would live on different devices when CUDA is used.
efficientb0_model.classifier = nn.Sequential(
    nn.Dropout(p = 0.2, inplace= True),
    nn.Linear(in_features=1280, out_features= 2, bias= True)
).to(device)

summary(model= efficientb0_model,
        input_size= (25,3,224,224),
        col_names= ["input_size", "output_size","num_params", "trainable"])

In [None]:
from itertools import chain

# Walk the batches of the first four test loaders back to back, as if they
# were a single loader.
combined_iterator = chain(*(iter(test_dataloader_subsets[k]) for k in range(4)))

for batch, (img, label) in enumerate(combined_iterator):
    print(batch)

In [None]:
# Chain the batches of the first 11 test loaders into one stream. The previous
# version extended `iterator` inside the loop without ever initialising it,
# which raises NameError on a fresh kernel.
iterator = chain.from_iterable(test_dataloader_subsets[:11])
for batch, (img, label) in enumerate(iterator):
    print(batch)

In [None]:
len(train_dataloader_subsets)

In [None]:
class empty_dataset(Dataset):
    """A Dataset with no samples: its length is 0 and every index is invalid."""

    def __init__(self):
        pass

    def __len__(self):
        """Return the number of samples, which is always 0."""
        return 0

    def __getitem__(self, idx):
        """Reject every index.

        Raises:
            IndexError: Always, because the dataset holds no samples.
        """
        # The exception must be raised; returning it would hand the caller an
        # exception object as if it were a sample.
        raise IndexError("This dataset is empty!")

In [13]:
import opendatasets as od 
 
dataset = "https://www.kaggle.com/datasets/birdy654/cifake-real-and-ai-generated-synthetic-images"
od.download(dataset)

Please provide your Kaggle credentials to download this dataset. Learn more: http://bit.ly/kaggle-creds
Your Kaggle username:Your Kaggle Key:Dataset URL: https://www.kaggle.com/datasets/birdy654/cifake-real-and-ai-generated-synthetic-images


In [16]:
from urllib.parse import urlparse

# Derive the dataset's directory name from the last segment of its URL path.
parsed_url = urlparse(
    "https://www.kaggle.com/datasets/birdy654/cifake-real-and-ai-generated-synthetic-images"
)
path = parsed_url.path
data_dir = path.rpartition('/')[2]
data_dir

'cifake-real-and-ai-generated-synthetic-images'

In [11]:
import requests
import zipfile

data_path = Path("data/")
image_path = data_path / "CIFAKE"
zip_path = data_path / "cifake-real-and-ai-generated-synthetic-images.zip"

if image_path.is_dir():
    print(f"{image_path} directory exists")
else:
    print(f"Creating {image_path} directory...")
    image_path.mkdir(parents= True, exist_ok= True)

try:
    print("Downloading data...")
    response = requests.get(
        "https://www.kaggle.com/datasets/birdy654/cifake-real-and-ai-generated-synthetic-images/cifake-real-and-ai-generated-synthetic-images.zip",
        timeout=60,
    )
    # Fail here on an HTTP error instead of writing an error page to disk.
    response.raise_for_status()
    zip_path.write_bytes(response.content)

    # The recorded run of this cell ended in BadZipFile, i.e. the response body
    # was not a zip archive. Check explicitly so the error says what went wrong.
    if not zipfile.is_zipfile(zip_path):
        raise zipfile.BadZipFile(
            f"Response from {response.url} is not a zip archive "
            f"(Content-Type: {response.headers.get('Content-Type')})"
        )

    with zipfile.ZipFile(zip_path, "r") as zip_ref:
        print("Unzipping  data...")
        zip_ref.extractall(image_path)
finally:
    # Remove the temporary archive whether or not the download/extraction worked.
    if zip_path.exists():
        os.remove(zip_path)

data\CIFAKE directory exists
Downloading data...


BadZipFile: File is not a zip file

In [15]:
from Scripts import model

effnet_model = model.model_builder(model_weights="EfficientNet_B0_Weights",
                                     model_name= "efficientnet_b0",
                                     unfreeze_layers=3,
                                     num_classes= 2,
                                     layer_name= "classifier",
                                     device= device)

summary(model= effnet_model,
        input_size= (25,3,224,224),
        col_names= ["input_size", "output_size","num_params", "trainable"])

Layer (type:depth-idx)                                  Input Shape               Output Shape              Param #                   Trainable
EfficientNet                                            [25, 3, 224, 224]         [25, 2]                   --                        Partial
├─Sequential: 1-1                                       [25, 3, 224, 224]         [25, 1280, 7, 7]          --                        Partial
│    └─Conv2dNormActivation: 2-1                        [25, 3, 224, 224]         [25, 32, 112, 112]        --                        False
│    │    └─Conv2d: 3-1                                 [25, 3, 224, 224]         [25, 32, 112, 112]        (864)                     False
│    │    └─BatchNorm2d: 3-2                            [25, 32, 112, 112]        [25, 32, 112, 112]        (64)                      False
│    │    └─SiLU: 3-3                                   [25, 32, 112, 112]        [25, 32, 112, 112]        --                        --
│    └─Sequenti

In [16]:
leaf_modules = [
        module for module in effnet_model.modules() if not list(module.children()) and list(module.parameters())  
    ]
leaf_modules

[Conv2d(3, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False),
 BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True),
 Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=32, bias=False),
 BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True),
 Conv2d(32, 8, kernel_size=(1, 1), stride=(1, 1)),
 Conv2d(8, 32, kernel_size=(1, 1), stride=(1, 1)),
 Conv2d(32, 16, kernel_size=(1, 1), stride=(1, 1), bias=False),
 BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True),
 Conv2d(16, 96, kernel_size=(1, 1), stride=(1, 1), bias=False),
 BatchNorm2d(96, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True),
 Conv2d(96, 96, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), groups=96, bias=False),
 BatchNorm2d(96, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True),
 Conv2d(96, 4, kernel_size=(1, 1), stride=(1, 1)),
 Conv2d(4, 96, kernel_size=(1, 1), stride=(1, 1)

In [23]:
# Collect parameters for two optimizer groups:
#   data  - parameters of the second- and third-from-last leaf modules
#   data1 - parameters of the last leaf module (the displayed output shows a
#           2-row weight and a 2-element bias)
data = []
data1 = []
# leaf_modules[::-1] walks the leaf modules from last to first; [1:3] skips the
# very last one and takes the next two.
for layer in leaf_modules[::-1][1:3]:
    for param in layer.parameters():
        data.append(param)

for param in leaf_modules[::-1][0].parameters():
    data1.append(param)

data1

[Parameter containing:
 tensor([[ 0.0277,  0.0010,  0.0216,  ...,  0.0068,  0.0229, -0.0169],
         [ 0.0093,  0.0132, -0.0184,  ..., -0.0279,  0.0126,  0.0270]],
        requires_grad=True),
 Parameter containing:
 tensor([5.9206e-05, 1.6641e-02], requires_grad=True)]

In [26]:
optimizer = torch.optim.Adam([
    {"params": data1, "lr": 0.01},
    {"params": data, "lr": 0.001}
], weight_decay= 1e-4)
optimizer

Adam (
Parameter Group 0
    amsgrad: False
    betas: (0.9, 0.999)
    capturable: False
    differentiable: False
    eps: 1e-08
    foreach: None
    fused: None
    lr: 0.01
    maximize: False
    weight_decay: 0.0001

Parameter Group 1
    amsgrad: False
    betas: (0.9, 0.999)
    capturable: False
    differentiable: False
    eps: 1e-08
    foreach: None
    fused: None
    lr: 0.001
    maximize: False
    weight_decay: 0.0001
)

In [None]:
def amount_of_data(dataloader_subsets,
                   multiple,
                   batch_size):
    """
    Build one shuffled DataLoader over the datasets of the first ``multiple`` loaders.

    Args:
        dataloader_subsets: Sequence of DataLoader objects; only the
            ``.dataset`` attribute of each is read.
        multiple: How many leading entries of ``dataloader_subsets`` to merge.
        batch_size: Batch size of the returned DataLoader.

    Returns:
        A ``DataLoader`` with ``shuffle=True`` over the concatenation (in list
        order) of the selected datasets.

    Raises:
        ValueError: If ``multiple`` is larger than ``len(dataloader_subsets)``.
    """
    total_subsets = len(dataloader_subsets)
    if multiple > total_subsets:
        raise ValueError(
            "multiple greater than the number of subsets"
        )

    selected_datasets = [dataloader_subsets[i].dataset for i in range(multiple)]

    # One flat ConcatDataset instead of a chain nested one level per subset:
    # same sample order, but each lookup goes through a single index table.
    # With nothing selected, fall back to the empty placeholder dataset.
    if selected_datasets:
        concatdataset = ConcatDataset(selected_datasets)
    else:
        concatdataset = empty_dataset()

    return DataLoader(concatdataset, batch_size= batch_size, shuffle= True)

In [None]:
len(train_dataloader_subsets[0])

In [None]:
subset_2_dataloader = amount_of_data(test_dataloader_subsets, 2, 50)
len(subset_2_dataloader)

In [25]:
optimizer = torch.optim.Adam([
        {"params": effnet_model.classifier.parameters(), "lr" : 0.01},
        {"params": effnet_model.features[-2:].parameters(), "lr" : 0.001}
    ], weight_decay= 1e-4)
optimizer

Adam (
Parameter Group 0
    amsgrad: False
    betas: (0.9, 0.999)
    capturable: False
    differentiable: False
    eps: 1e-08
    foreach: None
    fused: None
    lr: 0.01
    maximize: False
    weight_decay: 0.0001

Parameter Group 1
    amsgrad: False
    betas: (0.9, 0.999)
    capturable: False
    differentiable: False
    eps: 1e-08
    foreach: None
    fused: None
    lr: 0.001
    maximize: False
    weight_decay: 0.0001
)

In [None]:
with wandb.init(project="AI_Image_Classification", name="50_0.01-0.001_10_subs_2_last_3_layer_unfreeze", settings=wandb.Settings(symlink=False)) as run:
    # Hyperparameters of this run; they are mirrored into the W&B config below.
    learning_rate_classifier = 0.01
    learning_rate_unfrozenlayer = 0.001
    batch_size = 50
    epochs = 10
    subsetdata_amount = 2  # number of leading subsets merged by amount_of_data

    run.config.learning_rate = learning_rate_classifier
    run.config.learning_rate_unfrozenlayer = learning_rate_unfrozenlayer
    run.config.batch_size = batch_size
    run.config.epochs = epochs
    run.config.subsetdata_amount = subsetdata_amount
    run.config.ARCHITECHTURE = "EfficientNet_B0_unfreezed_last_3_layer"

    loss_fn = nn.CrossEntropyLoss()
    # Two parameter groups: the new classifier head trains with the larger
    # learning rate, the unfrozen feature blocks with the smaller one.
    # NOTE(review): the freezing cell leaves features[-3:] trainable, but only
    # features[-2:] is handed to the optimizer, so features[-3] receives
    # gradients without ever being updated — confirm which one is intended.
    optimizer = torch.optim.Adam([
        {"params": efficientb0_model.classifier.parameters(), "lr" : learning_rate_classifier},
        {"params": efficientb0_model.features[-2:].parameters(), "lr" : learning_rate_unfrozenlayer}
    ], weight_decay= 1e-4)

    # Per-epoch metric history.
    results = {
            "train loss": [],
            "train acc": [],
            "test loss": [],
            "test acc": []
        }

    train_subsetdata_amount_dataloader = amount_of_data(train_dataloader_subsets, subsetdata_amount, batch_size)
    test_subsetdata_amount_dataloader = amount_of_data(test_dataloader_subsets, subsetdata_amount, batch_size)

    for epoch in tqdm(range(epochs)):

        # Training Loop
        train_loss, train_acc, y_train_actual, y_train_predicted = engine.train_loop(model= efficientb0_model,
                                                                                     train_dataloader= train_subsetdata_amount_dataloader,
                                                                                     loss_fn= loss_fn,
                                                                                     optimizer= optimizer,
                                                                                     device= device)

        # Testing Loop
        test_loss, test_acc, y_test_actual, y_test_predicted = engine.test_loop(model= efficientb0_model,
                                                                                test_dataloader= test_subsetdata_amount_dataloader,
                                                                                loss_fn= loss_fn,
                                                                                device= device)

        # Convert the metrics to plain Python numbers once, so the history, the
        # W&B log and the printed line all use the same values.
        train_loss, train_acc, test_loss, test_acc = (
            metric.item() if isinstance(metric, torch.Tensor) else metric
            for metric in (train_loss, train_acc, test_loss, test_acc)
        )

        results["train loss"].append(train_loss)
        results["train acc"].append(train_acc)
        results["test loss"].append(test_loss)
        results["test acc"].append(test_acc)

        run.log({
            "epoch" : epoch + 1,
            "train_loss" : train_loss,
            "train_accuracy" : train_acc,
            "test_loss" : test_loss,
            "test_accuracy" : test_acc,
        })

        print(f"Epoch {epoch + 1}/{epochs}: train loss: {train_loss:.4f} |\ntrain accuracy: {train_acc:.4f} |\ntest loss: {test_loss:.4f} |\ntest accuracy: {test_acc:.4f}")

    # Export once, after the last epoch: only the final file is logged as an
    # artifact, so re-exporting on every epoch was wasted work. The dummy input
    # is created on the model's device so the export also works on CUDA.
    efficientb0_model.eval()
    torch.onnx.export(
        efficientb0_model,
        torch.randn(1,3,224,224, device= device),
        "efficientnetb0.onnx",
        input_names = ["input"],
        output_names = ["output"],
    )

    run.log_artifact("efficientnetb0.onnx", name= "50_0.01-0.001_10_subs_2_last_3_layer_unfreeze", type= "model")

    print("Model training completed.")

In [None]:
classification_report(y_train_actual, y_train_predicted)

In [None]:
classification_report(y_test_actual, y_test_predicted)

In [None]:
cr = classification_report(y_test_actual, y_test_predicted, output_dict= True)
cr["0"]["recall"]

In [None]:
cm = confusion_matrix(y_train_actual, y_train_predicted)
cm

In [None]:
cm_test = confusion_matrix(y_test_actual, y_test_predicted)
cm_test

In [None]:
disp = ConfusionMatrixDisplay(cm, display_labels=class_names)
disp.plot();

In [None]:
disp = ConfusionMatrixDisplay(cm_test, display_labels= class_names)
disp.plot();

In [None]:
from Scripts import datapreprocess

preprocess = datapreprocess.DataPreprocessor(train_dir, test_dir)
preprocess

In [None]:
train_loader, test_loader = preprocess.Build_Dataloaders(train_augmentation= "No_Augmentation",
                                                         test_augmentation= "No_Augmentation",
                                                         num_subsets= 40,
                                                         batch_size= 50,
                                                         percentage_data= 20)

len(train_loader), len(test_loader)