In [1]:
import torch
import torchvision.transforms as transforms
from torchvision import datasets, models
import torch.nn as nn
import torch.optim as optim
import time
import copy
import os
import numpy as np
from PIL import Image
import cv2
import dlib
from tqdm import tqdm
from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Initialize Dlib's face detector
detector = dlib.get_frontal_face_detector()

# Define a custom dataset class to include face detection and cropping
class FaceCroppingDataset(datasets.ImageFolder):
    def __init__(self, root, transform=None):
        super(FaceCroppingDataset, self).__init__(root, transform)
        self.transform = transform

    def __getitem__(self, index):
        path, target = self.samples[index]
        image = cv2.imread(path)
        if image is None:
            return None, None

        # Convert BGR (OpenCV) to RGB
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Detect faces
        faces = detector(image, 1)
        if len(faces) > 0:
            # Crop the first detected face
            x, y, w, h = (faces[0].left(), faces[0].top(), faces[0].width(), faces[0].height())
            image = image[y:y+h, x:x+w]
        else:
            return None, None

        # Convert image to PIL format and apply transformations
        image = Image.fromarray(image)
        if self.transform:
            image = self.transform(image)

        return image, target

# Data Preparation
data_transforms = {
    'train': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(20),
        transforms.ColorJitter(brightness=0.2, contrast=0.2),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]),
    'val': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ColorJitter(brightness=0.2, contrast=0.2),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]),
    'test': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ColorJitter(brightness=0.2, contrast=0.2),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]),
}

# Initialize datasets and dataloaders
train_dataset = FaceCroppingDataset('C:/Users/91970/Desktop/TruthLens/New_dataset/Train', transform=data_transforms['train'])
val_dataset = FaceCroppingDataset('C:/Users/91970/Desktop/TruthLens/New_dataset/Validation', transform=data_transforms['val'])
test_dataset = FaceCroppingDataset('C:/Users/91970/Desktop/TruthLens/New_dataset/Test', transform=data_transforms['test'])

def collate_fn(batch):
    batch = [b for b in batch if b[0] is not None and b[1] is not None]
    return torch.utils.data.dataloader.default_collate(batch)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=32, shuffle=False, collate_fn=collate_fn)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=32, shuffle=False, collate_fn=collate_fn)

# Load pre-trained models
resnet = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
efficientnet = models.efficientnet_b0(weights=models.EfficientNet_B0_Weights.DEFAULT)
mobilenet = models.mobilenet_v2(weights=models.MobileNet_V2_Weights.DEFAULT)

resnet.fc = nn.Identity()
efficientnet.classifier[1] = nn.Identity()
mobilenet.classifier[1] = nn.Identity()

resnet = resnet.to(device)
efficientnet = efficientnet.to(device)
mobilenet = mobilenet.to(device)

# Unfreeze the top layers for fine-tuning
for model in [resnet, efficientnet, mobilenet]:
    for name, param in model.named_parameters():
        if "layer4" in name or "blocks" in name or "features" in name:
            param.requires_grad = True
        else:
            param.requires_grad = False

# Define Ensemble Model
class EnsembleModel(nn.Module):
    def __init__(self, resnet, efficientnet, mobilenet):
        super(EnsembleModel, self).__init__()
        self.resnet = resnet
        self.efficientnet = efficientnet
        self.mobilenet = mobilenet
        self.fc = nn.Linear(2048 + 1280 + 1280, 2)

    def forward(self, x):
        resnet_features = self.resnet(x)
        efficientnet_features = self.efficientnet(x)
        mobilenet_features = self.mobilenet(x)
        combined_features = torch.cat((resnet_features, efficientnet_features, mobilenet_features), dim=1)
        output = self.fc(combined_features)
        return output

ensemble_model = EnsembleModel(resnet, efficientnet, mobilenet).to(device)

# Loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(filter(lambda p: p.requires_grad, ensemble_model.parameters()), lr=0.0001)

# Save checkpoint
def save_checkpoint(epoch, model, optimizer, best_acc, path="checkpoint.pth"):
    checkpoint = {
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'best_acc': best_acc
    }
    torch.save(checkpoint, path)
    print(f"Checkpoint saved at epoch {epoch + 1}.")

# Train function with early stopping
def train_model_with_early_stopping(
    model, criterion, optimizer, num_epochs=20, patience=3, checkpoint_path="best_model.pth", resume=False
):
    best_model_wts = copy.deepcopy(model.state_dict())
    best_loss = float('inf')
    start_epoch = 0
    no_improvement_epochs = 0

    if resume and os.path.exists(checkpoint_path):
        print("Loading checkpoint...")
        checkpoint = torch.load(checkpoint_path)
        model.load_state_dict(checkpoint['model_state_dict'])
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        best_loss = checkpoint['best_acc']
        start_epoch = checkpoint['epoch'] + 1
        print(f"Resumed from epoch {start_epoch}.")
    else:
        print("Starting training from scratch...")

    for epoch in range(start_epoch, num_epochs):
        print(f'Epoch {epoch + 1}/{num_epochs}')
        print('-' * 10)

        # Training phase
        model.train()
        running_loss = 0.0
        running_corrects = 0

        for inputs, labels in tqdm(train_loader, desc="Training"):
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            _, preds = torch.max(outputs, 1)
            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels.data)

        train_loss = running_loss / len(train_dataset)
        train_acc = running_corrects.double() / len(train_dataset)
        print(f'Train Loss: {train_loss:.4f} Acc: {train_acc:.4f}')

        # Validation phase
        model.eval()
        running_loss = 0.0
        running_corrects = 0

        with torch.no_grad():
            for inputs, labels in tqdm(val_loader, desc="Validating"):
                inputs, labels = inputs.to(device), labels.to(device)

                outputs = model(inputs)
                loss = criterion(outputs, labels)

                _, preds = torch.max(outputs, 1)
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

        val_loss = running_loss / len(val_dataset)
        val_acc = running_corrects.double() / len(val_dataset)
        print(f'Validation Loss: {val_loss:.4f} Acc: {val_acc:.4f}')

        if val_loss < best_loss:
            best_loss = val_loss
            best_model_wts = copy.deepcopy(model.state_dict())
            save_checkpoint(epoch, model, optimizer, val_acc, checkpoint_path)
            no_improvement_epochs = 0
        else:
            no_improvement_epochs += 1

        if no_improvement_epochs >= patience:
            print(f"Early stopping triggered after {epoch + 1} epochs.")
            break

    model.load_state_dict(best_model_wts)
    return model

# Train the model
trained_model = train_model_with_early_stopping(
    ensemble_model, criterion, optimizer, num_epochs=20, patience=3, checkpoint_path="new_model.pth", resume=True
)

# Testing phase
def evaluate_model(model, dataloader, criterion):
    model.eval()
    running_loss = 0.0
    running_corrects = 0
    all_labels = []
    all_preds = []

    with torch.no_grad():
        for inputs, labels in tqdm(dataloader, desc="Evaluating"):
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            _, preds = torch.max(outputs, 1)
            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels.data)

            all_labels.extend(labels.cpu().numpy())
            all_preds.extend(preds.cpu().numpy())

    loss = running_loss / len(dataloader.dataset)
    acc = running_corrects.double() / len(dataloader.dataset)
    precision = precision_score(all_labels, all_preds, average='weighted')
    recall = recall_score(all_labels, all_preds, average='weighted')
    f1 = f1_score(all_labels, all_preds, average='weighted')
    roc_auc = roc_auc_score(all_labels, all_preds)

    print(f'Test Loss: {loss:.4f} Acc: {acc:.4f}')
    print(f'Precision: {precision:.4f} Recall: {recall:.4f} F1 Score: {f1:.4f}')
    print(f'ROC-AUC Score: {roc_auc:.4f}')

    return loss, acc, precision, recall, f1, roc_auc

test_loss, test_acc, test_precision, test_recall, test_f1, test_roc_auc = evaluate_model(trained_model, test_loader, criterion)


Loading checkpoint...
Resumed from epoch 4.
Epoch 5/20
----------


Training: 100%|██████████| 401/401 [44:39<00:00,  6.68s/it] 


Train Loss: 0.0876 Acc: 0.9267


Validating: 100%|██████████| 63/63 [02:12<00:00,  2.11s/it]


Validation Loss: 0.1000 Acc: 0.9300
Checkpoint saved at epoch 5.
Epoch 6/20
----------


Training: 100%|██████████| 401/401 [40:09<00:00,  6.01s/it]


Train Loss: 0.0696 Acc: 0.9327


Validating: 100%|██████████| 63/63 [02:11<00:00,  2.08s/it]


Validation Loss: 0.1150 Acc: 0.9220
Epoch 7/20
----------


Training: 100%|██████████| 401/401 [40:13<00:00,  6.02s/it]


Train Loss: 0.0644 Acc: 0.9354


Validating: 100%|██████████| 63/63 [02:11<00:00,  2.09s/it]


Validation Loss: 0.1674 Acc: 0.9075
Epoch 8/20
----------


Training:   1%|          | 5/401 [00:26<35:37,  5.40s/it]


KeyboardInterrupt: 

In [1]:
import torch
import torchvision.transforms as transforms
from torchvision import datasets, models
import torch.nn as nn
import torch.optim as optim
import time
import copy
import os
import numpy as np
from PIL import Image
import cv2
import dlib
from tqdm import tqdm
from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Initialize Dlib's face detector
detector = dlib.get_frontal_face_detector()

# Define a custom dataset class to include face detection and cropping
class FaceCroppingDataset(datasets.ImageFolder):
    def __init__(self, root, transform=None):
        super(FaceCroppingDataset, self).__init__(root, transform)
        self.transform = transform

    def __getitem__(self, index):
        path, target = self.samples[index]
        image = cv2.imread(path)
        if image is None:
            return None, None

        # Convert BGR (OpenCV) to RGB
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Detect faces
        faces = detector(image, 1)
        if len(faces) > 0:
            # Crop the first detected face
            x, y, w, h = (faces[0].left(), faces[0].top(), faces[0].width(), faces[0].height())
            image = image[y:y+h, x:x+w]
        else:
            return None, None

        # Convert image to PIL format and apply transformations
        image = Image.fromarray(image)
        if self.transform:
            image = self.transform(image)

        return image, target

# Data Preparation
data_transforms = {
    'train': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(20),
        transforms.ColorJitter(brightness=0.2, contrast=0.2),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]),
    'val': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ColorJitter(brightness=0.2, contrast=0.2),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]),
    'test': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ColorJitter(brightness=0.2, contrast=0.2),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]),
}

# Initialize datasets and dataloaders
train_dataset = FaceCroppingDataset('C:/Users/91970/Desktop/TruthLens/New_dataset/Train', transform=data_transforms['train'])
val_dataset = FaceCroppingDataset('C:/Users/91970/Desktop/TruthLens/New_dataset/Validation', transform=data_transforms['val'])
test_dataset = FaceCroppingDataset('C:/Users/91970/Desktop/TruthLens/New_dataset/Test', transform=data_transforms['test'])

def collate_fn(batch):
    batch = [b for b in batch if b[0] is not None and b[1] is not None]
    return torch.utils.data.dataloader.default_collate(batch)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=32, shuffle=False, collate_fn=collate_fn)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=32, shuffle=False, collate_fn=collate_fn)

# Load pre-trained models
resnet = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
efficientnet = models.efficientnet_b0(weights=models.EfficientNet_B0_Weights.DEFAULT)
mobilenet = models.mobilenet_v2(weights=models.MobileNet_V2_Weights.DEFAULT)

resnet.fc = nn.Identity()
efficientnet.classifier[1] = nn.Identity()
mobilenet.classifier[1] = nn.Identity()

resnet = resnet.to(device)
efficientnet = efficientnet.to(device)
mobilenet = mobilenet.to(device)

# Unfreeze the top layers for fine-tuning
for model in [resnet, efficientnet, mobilenet]:
    for name, param in model.named_parameters():
        if "layer4" in name or "blocks" in name or "features" in name:
            param.requires_grad = True
        else:
            param.requires_grad = False

# Define Ensemble Model
class EnsembleModel(nn.Module):
    def __init__(self, resnet, efficientnet, mobilenet):
        super(EnsembleModel, self).__init__()
        self.resnet = resnet
        self.efficientnet = efficientnet
        self.mobilenet = mobilenet
        self.fc = nn.Linear(2048 + 1280 + 1280, 2)

    def forward(self, x):
        resnet_features = self.resnet(x)
        efficientnet_features = self.efficientnet(x)
        mobilenet_features = self.mobilenet(x)
        combined_features = torch.cat((resnet_features, efficientnet_features, mobilenet_features), dim=1)
        output = self.fc(combined_features)
        return output

ensemble_model = EnsembleModel(resnet, efficientnet, mobilenet).to(device)

# Loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(filter(lambda p: p.requires_grad, ensemble_model.parameters()), lr=0.0001)

# Learning rate scheduler
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode='min', factor=0.5, patience=2, verbose=True
)

# Save checkpoint
def save_checkpoint(epoch, model, optimizer, best_acc, path="checkpoint.pth"):
    checkpoint = {
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'best_acc': best_acc
    }
    torch.save(checkpoint, path)
    print(f"Checkpoint saved at epoch {epoch + 1}.")

# Train function with early stopping
def train_model_with_early_stopping(
    model, criterion, optimizer, scheduler, num_epochs=20, patience=3, checkpoint_path="best_model.pth", resume=False
):
    best_model_wts = copy.deepcopy(model.state_dict())
    best_loss = float('inf')
    start_epoch = 0
    no_improvement_epochs = 0

    if resume and os.path.exists(checkpoint_path):
        print("Loading checkpoint...")
        checkpoint = torch.load(checkpoint_path)
        model.load_state_dict(checkpoint['model_state_dict'])
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        best_loss = checkpoint['best_acc']
        start_epoch = checkpoint['epoch'] + 1
        print(f"Resumed from epoch {start_epoch}.")
    else:
        print("Starting training from scratch...")

    for epoch in range(start_epoch, num_epochs):
        print(f'Epoch {epoch + 1}/{num_epochs}')
        print('-' * 10)

        # Training phase
        model.train()
        running_loss = 0.0
        running_corrects = 0

        for inputs, labels in tqdm(train_loader, desc="Training"):
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            _, preds = torch.max(outputs, 1)
            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels.data)

        train_loss = running_loss / len(train_dataset)
        train_acc = running_corrects.double() / len(train_dataset)
        print(f'Train Loss: {train_loss:.4f} Acc: {train_acc:.4f}')

        # Validation phase
        model.eval()
        running_loss = 0.0
        running_corrects = 0

        with torch.no_grad():
            for inputs, labels in tqdm(val_loader, desc="Validating"):
                inputs, labels = inputs.to(device), labels.to(device)

                outputs = model(inputs)
                loss = criterion(outputs, labels)

                _, preds = torch.max(outputs, 1)
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

        val_loss = running_loss / len(val_dataset)
        val_acc = running_corrects.double() / len(val_dataset)
        print(f'Validation Loss: {val_loss:.4f} Acc: {val_acc:.4f}')

        # Step the scheduler with the validation loss
        scheduler.step(val_loss)

        if val_loss < best_loss:
            best_loss = val_loss
            best_model_wts = copy.deepcopy(model.state_dict())
            save_checkpoint(epoch, model, optimizer, val_acc, checkpoint_path)
            no_improvement_epochs = 0
        else:
            no_improvement_epochs += 1

        if no_improvement_epochs >= patience:
            print(f"Early stopping triggered after {epoch + 1} epochs.")
            break

    model.load_state_dict(best_model_wts)
    return model

# Train the model with early stopping and learning rate scheduler
ensemble_model = train_model_with_early_stopping(
    ensemble_model, criterion, optimizer, scheduler, num_epochs=20, patience=3, checkpoint_path="new_model.pth", resume=True
)

# Evaluate the model on the test set
ensemble_model.eval()
all_preds = []
all_labels = []

with torch.no_grad():
    for inputs, labels in tqdm(test_loader, desc="Testing"):
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = ensemble_model(inputs)
        _, preds = torch.max(outputs, 1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

# Calculate metrics
precision = precision_score(all_labels, all_preds)
recall = recall_score(all_labels, all_preds)
f1 = f1_score(all_labels, all_preds)
roc_auc = roc_auc_score(all_labels, all_preds)

print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")
print(f"ROC AUC: {roc_auc:.4f}")


Loading checkpoint...
Resumed from epoch 5.
Epoch 6/20
----------


Training: 100%|██████████| 401/401 [41:52<00:00,  6.26s/it]


Train Loss: 0.0696 Acc: 0.9340


Validating: 100%|██████████| 63/63 [02:31<00:00,  2.40s/it]


Validation Loss: 0.1149 Acc: 0.9270
Checkpoint saved at epoch 6.
Epoch 7/20
----------


Training: 100%|██████████| 401/401 [41:36<00:00,  6.23s/it]


Train Loss: 0.0709 Acc: 0.9322


Validating: 100%|██████████| 63/63 [02:09<00:00,  2.06s/it]


Validation Loss: 0.1146 Acc: 0.9185
Checkpoint saved at epoch 7.
Epoch 8/20
----------


Training: 100%|██████████| 401/401 [41:29<00:00,  6.21s/it] 


Train Loss: 0.0586 Acc: 0.9385


Validating: 100%|██████████| 63/63 [02:16<00:00,  2.17s/it]


Validation Loss: 0.1208 Acc: 0.9275
Epoch 9/20
----------


Training: 100%|██████████| 401/401 [41:19<00:00,  6.18s/it]


Train Loss: 0.0505 Acc: 0.9400


Validating: 100%|██████████| 63/63 [02:19<00:00,  2.22s/it]


Validation Loss: 0.1123 Acc: 0.9345
Checkpoint saved at epoch 9.
Epoch 10/20
----------


Training: 100%|██████████| 401/401 [40:23<00:00,  6.04s/it]


Train Loss: 0.0443 Acc: 0.9415


Validating: 100%|██████████| 63/63 [03:21<00:00,  3.20s/it]


Validation Loss: 0.1239 Acc: 0.9255
Epoch 11/20
----------


Training: 100%|██████████| 401/401 [46:22<00:00,  6.94s/it]  


Train Loss: 0.0427 Acc: 0.9421


Validating: 100%|██████████| 63/63 [02:05<00:00,  2.00s/it]


Validation Loss: 0.1369 Acc: 0.9265
Epoch 12/20
----------


Training: 100%|██████████| 401/401 [40:50<00:00,  6.11s/it]


Train Loss: 0.0391 Acc: 0.9436


Validating: 100%|██████████| 63/63 [02:06<00:00,  2.01s/it]


Validation Loss: 0.1399 Acc: 0.9275
Epoch 00007: reducing learning rate of group 0 to 5.0000e-05.
Early stopping triggered after 12 epochs.


Testing: 100%|██████████| 63/63 [02:15<00:00,  2.15s/it]

Precision: 0.9373
Recall: 0.9917
F1 Score: 0.9637
ROC AUC: 0.9617





In [None]:
import torch
import torchvision.transforms as transforms
from torchvision import datasets, models
import torch.nn as nn
import torch.optim as optim
import time
import copy
import os
import numpy as np
from PIL import Image
import cv2
import dlib
from tqdm import tqdm
from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Initialize Dlib's face detector
detector = dlib.get_frontal_face_detector()

# Define a custom dataset class to include face detection and cropping
class FaceCroppingDataset(datasets.ImageFolder):
    def __init__(self, root, transform=None):
        super(FaceCroppingDataset, self).__init__(root, transform)
        self.transform = transform

    def __getitem__(self, index):
        path, target = self.samples[index]
        image = cv2.imread(path)
        if image is None:
            return None, None

        # Convert BGR (OpenCV) to RGB
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Detect faces
        faces = detector(image, 1)
        if len(faces) > 0:
            # Crop the first detected face
            x, y, w, h = (faces[0].left(), faces[0].top(), faces[0].width(), faces[0].height())
            image = image[y:y+h, x:x+w]
        else:
            return None, None

        # Convert image to PIL format and apply transformations
        image = Image.fromarray(image)
        if self.transform:
            image = self.transform(image)

        return image, target

# Data Preparation
data_transforms = {
    'train': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(20),
        transforms.ColorJitter(brightness=0.2, contrast=0.2),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]),
    'val': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ColorJitter(brightness=0.2, contrast=0.2),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]),
    'test': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ColorJitter(brightness=0.2, contrast=0.2),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]),
}

# Initialize datasets and dataloaders
train_dataset = FaceCroppingDataset('C:/Users/91970/Desktop/TruthLens/New_dataset/Train', transform=data_transforms['train'])
val_dataset = FaceCroppingDataset('C:/Users/91970/Desktop/TruthLens/New_dataset/Validation', transform=data_transforms['val'])
test_dataset = FaceCroppingDataset('C:/Users/91970/Desktop/TruthLens/New_dataset/Test', transform=data_transforms['test'])

def collate_fn(batch):
    batch = [b for b in batch if b[0] is not None and b[1] is not None]
    return torch.utils.data.dataloader.default_collate(batch)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=32, shuffle=False, collate_fn=collate_fn)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=32, shuffle=False, collate_fn=collate_fn)

# Load pre-trained models
resnet = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
efficientnet = models.efficientnet_b0(weights=models.EfficientNet_B0_Weights.DEFAULT)
mobilenet = models.mobilenet_v2(weights=models.MobileNet_V2_Weights.DEFAULT)

resnet.fc = nn.Identity()
efficientnet.classifier[1] = nn.Identity()
mobilenet.classifier[1] = nn.Identity()

resnet = resnet.to(device)
efficientnet = efficientnet.to(device)
mobilenet = mobilenet.to(device)

# Unfreeze the top layers for fine-tuning
for model in [resnet, efficientnet, mobilenet]:
    for name, param in model.named_parameters():
        if "layer4" in name or "blocks" in name or "features" in name:
            param.requires_grad = True
        else:
            param.requires_grad = False

# Define Ensemble Model
class EnsembleModel(nn.Module):
    def __init__(self, resnet, efficientnet, mobilenet):
        super(EnsembleModel, self).__init__()
        self.resnet = resnet
        self.efficientnet = efficientnet
        self.mobilenet = mobilenet
        self.fc = nn.Linear(2048 + 1280 + 1280, 2)

    def forward(self, x):
        resnet_features = self.resnet(x)
        efficientnet_features = self.efficientnet(x)
        mobilenet_features = self.mobilenet(x)
        combined_features = torch.cat((resnet_features, efficientnet_features, mobilenet_features), dim=1)
        output = self.fc(combined_features)
        return output

ensemble_model = EnsembleModel(resnet, efficientnet, mobilenet).to(device)

# Loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(filter(lambda p: p.requires_grad, ensemble_model.parameters()), lr=0.0001)

# Learning rate scheduler
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode='min', factor=0.5, patience=2, verbose=True
)

# Save checkpoint
def save_checkpoint(epoch, model, optimizer, best_acc, path="checkpoint.pth"):
    checkpoint = {
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'best_acc': best_acc
    }
    torch.save(checkpoint, path)
    print(f"Checkpoint saved at epoch {epoch + 1}.")

# Train function with early stopping
def train_model_with_early_stopping(
    model, criterion, optimizer, scheduler, num_epochs=20, patience=3, checkpoint_path="best_model.pth", resume=False
):
    best_model_wts = copy.deepcopy(model.state_dict())
    best_loss = float('inf')
    start_epoch = 0
    no_improvement_epochs = 0

    if resume and os.path.exists(checkpoint_path):
        print("Loading checkpoint...")
        checkpoint = torch.load(checkpoint_path)
        model.load_state_dict(checkpoint['model_state_dict'])
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        best_loss = checkpoint['best_acc']
        start_epoch = checkpoint['epoch'] + 1
        print(f"Resumed from epoch {start_epoch}.")
    else:
        print("Starting training from scratch...")

    for epoch in range(start_epoch, num_epochs):
        print(f'Epoch {epoch + 1}/{num_epochs}')
        print('-' * 10)

        # Training phase
        model.train()
        running_loss = 0.0
        running_corrects = 0

        for inputs, labels in tqdm(train_loader, desc="Training"):
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            _, preds = torch.max(outputs, 1)
            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels.data)

        train_loss = running_loss / len(train_dataset)
        train_acc = running_corrects.double() / len(train_dataset)
        print(f'Train Loss: {train_loss:.4f} Acc: {train_acc:.4f}')

        # Validation phase
        model.eval()
        running_loss = 0.0
        running_corrects = 0

        with torch.no_grad():
            for inputs, labels in tqdm(val_loader, desc="Validating"):
                inputs, labels = inputs.to(device), labels.to(device)

                outputs = model(inputs)
                loss = criterion(outputs, labels)

                _, preds = torch.max(outputs, 1)
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

        val_loss = running_loss / len(val_dataset)
        val_acc = running_corrects.double() / len(val_dataset)
        print(f'Validation Loss: {val_loss:.4f} Acc: {val_acc:.4f}')

        # Step the scheduler with the validation loss
        scheduler.step(val_loss)

        if val_loss < best_loss:
            best_loss = val_loss
            best_model_wts = copy.deepcopy(model.state_dict())
            save_checkpoint(epoch, model, optimizer, val_acc, checkpoint_path)
            no_improvement_epochs = 0
        else:
            no_improvement_epochs += 1

        if no_improvement_epochs >= patience:
            print(f"Early stopping triggered after {epoch + 1} epochs.")
            break

    model.load_state_dict(best_model_wts)
    return model

# Train the model with early stopping and learning rate scheduler
ensemble_model = train_model_with_early_stopping(
    ensemble_model, criterion, optimizer, scheduler, num_epochs=20, patience=3, checkpoint_path="new_model.pth", resume=True
)

# Evaluate the model on the test set
ensemble_model.eval()
all_preds = []
all_labels = []

with torch.no_grad():
    for inputs, labels in tqdm(test_loader, desc="Testing"):
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = ensemble_model(inputs)
        _, preds = torch.max(outputs, 1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

# Calculate metrics
precision = precision_score(all_labels, all_preds)
recall = recall_score(all_labels, all_preds)
f1 = f1_score(all_labels, all_preds)
roc_auc = roc_auc_score(all_labels, all_preds)

print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")
print(f"ROC AUC: {roc_auc:.4f}")


Loading checkpoint...
Resumed from epoch 9.
Epoch 10/20
----------


Training: 100%|██████████| 401/401 [41:56<00:00,  6.28s/it]


Train Loss: 0.0457 Acc: 0.9407


Validating:  81%|████████  | 51/63 [03:18<00:55,  4.64s/it]

In [1]:
import torch
import torchvision.transforms as transforms
from torchvision import datasets, models
import torch.nn as nn
import torch.optim as optim
import time
import copy
import os
import numpy as np
from PIL import Image
import cv2
import dlib
from tqdm import tqdm
from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Initialize Dlib's face detector
detector = dlib.get_frontal_face_detector()

# Define a custom dataset class to include face detection and cropping
class FaceCroppingDataset(datasets.ImageFolder):
    def __init__(self, root, transform=None):
        super(FaceCroppingDataset, self).__init__(root, transform)
        self.transform = transform

    def __getitem__(self, index):
        path, target = self.samples[index]
        image = cv2.imread(path)
        if image is None:
            return None, None

        # Convert BGR (OpenCV) to RGB
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Detect faces
        faces = detector(image, 1)
        if len(faces) > 0:
            # Crop the first detected face
            x, y, w, h = (faces[0].left(), faces[0].top(), faces[0].width(), faces[0].height())
            image = image[y:y+h, x:x+w]
        else:
            return None, None

        # Convert image to PIL format and apply transformations
        image = Image.fromarray(image)
        if self.transform:
            image = self.transform(image)

        return image, target

# Data Preparation
data_transforms = {
    'train': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(20),
        transforms.ColorJitter(brightness=0.2, contrast=0.2),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]),
    'val': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ColorJitter(brightness=0.2, contrast=0.2),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]),
    'test': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ColorJitter(brightness=0.2, contrast=0.2),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]),
}

# Initialize datasets and dataloaders
train_dataset = FaceCroppingDataset('C:/Users/91970/Desktop/TruthLens/New_dataset/Train', transform=data_transforms['train'])
val_dataset = FaceCroppingDataset('C:/Users/91970/Desktop/TruthLens/New_dataset/Validation', transform=data_transforms['val'])
test_dataset = FaceCroppingDataset('C:/Users/91970/Desktop/TruthLens/New_dataset/Test', transform=data_transforms['test'])

def collate_fn(batch):
    batch = [b for b in batch if b[0] is not None and b[1] is not None]
    return torch.utils.data.dataloader.default_collate(batch)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=32, shuffle=False, collate_fn=collate_fn)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=32, shuffle=False, collate_fn=collate_fn)

# Load pre-trained models
resnet = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
efficientnet = models.efficientnet_b0(weights=models.EfficientNet_B0_Weights.DEFAULT)
mobilenet = models.mobilenet_v2(weights=models.MobileNet_V2_Weights.DEFAULT)

resnet.fc = nn.Identity()
efficientnet.classifier[1] = nn.Identity()
mobilenet.classifier[1] = nn.Identity()

resnet = resnet.to(device)
efficientnet = efficientnet.to(device)
mobilenet = mobilenet.to(device)

# Unfreeze the top layers for fine-tuning
for model in [resnet, efficientnet, mobilenet]:
    for name, param in model.named_parameters():
        if "layer4" in name or "blocks" in name or "features" in name:
            param.requires_grad = True
        else:
            param.requires_grad = False

# Define Ensemble Model
class EnsembleModel(nn.Module):
    def __init__(self, resnet, efficientnet, mobilenet):
        super(EnsembleModel, self).__init__()
        self.resnet = resnet
        self.efficientnet = efficientnet
        self.mobilenet = mobilenet
        self.fc = nn.Linear(2048 + 1280 + 1280, 2)

    def forward(self, x):
        resnet_features = self.resnet(x)
        efficientnet_features = self.efficientnet(x)
        mobilenet_features = self.mobilenet(x)
        combined_features = torch.cat((resnet_features, efficientnet_features, mobilenet_features), dim=1)
        output = self.fc(combined_features)
        return output

ensemble_model = EnsembleModel(resnet, efficientnet, mobilenet).to(device)

# Loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(filter(lambda p: p.requires_grad, ensemble_model.parameters()), lr=0.0001)

# Learning rate scheduler
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode='min', factor=0.5, patience=2, verbose=True
)

# Save checkpoint
def save_checkpoint(epoch, model, optimizer, best_acc, path="checkpoint.pth"):
    checkpoint = {
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'best_acc': best_acc
    }
    torch.save(checkpoint, path)
    print(f"Checkpoint saved at epoch {epoch + 1}.")

# Train function with early stopping
def train_model_with_early_stopping(
    model, criterion, optimizer, scheduler, num_epochs=20, patience=3, checkpoint_path="best_model.pth", resume=False
):
    best_model_wts = copy.deepcopy(model.state_dict())
    best_loss = float('inf')
    start_epoch = 0
    no_improvement_epochs = 0

    if resume and os.path.exists(checkpoint_path):
        print("Loading checkpoint...")
        checkpoint = torch.load(checkpoint_path)
        model.load_state_dict(checkpoint['model_state_dict'])
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        best_loss = checkpoint['best_acc']
        start_epoch = checkpoint['epoch'] + 1
        print(f"Resumed from epoch {start_epoch}.")
    else:
        print("Starting training from scratch...")

    for epoch in range(start_epoch, num_epochs):
        print(f'Epoch {epoch + 1}/{num_epochs}')
        print('-' * 10)

        # Training phase
        model.train()
        running_loss = 0.0
        running_corrects = 0

        for inputs, labels in tqdm(train_loader, desc="Training"):
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            _, preds = torch.max(outputs, 1)
            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels.data)

        train_loss = running_loss / len(train_dataset)
        train_acc = running_corrects.double() / len(train_dataset)
        print(f'Train Loss: {train_loss:.4f} Acc: {train_acc:.4f}')

        # Validation phase
        model.eval()
        running_loss = 0.0
        running_corrects = 0

        with torch.no_grad():
            for inputs, labels in tqdm(val_loader, desc="Validating"):
                inputs, labels = inputs.to(device), labels.to(device)

                outputs = model(inputs)
                loss = criterion(outputs, labels)

                _, preds = torch.max(outputs, 1)
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

        val_loss = running_loss / len(val_dataset)
        val_acc = running_corrects.double() / len(val_dataset)
        print(f'Validation Loss: {val_loss:.4f} Acc: {val_acc:.4f}')

        # Step the scheduler with the validation loss
        scheduler.step(val_loss)

        if val_loss < best_loss:
            best_loss = val_loss
            best_model_wts = copy.deepcopy(model.state_dict())
            save_checkpoint(epoch, model, optimizer, val_acc, checkpoint_path)
            no_improvement_epochs = 0
        else:
            no_improvement_epochs += 1

        if no_improvement_epochs >= patience:
            print(f"Early stopping triggered after {epoch + 1} epochs.")
            break

    model.load_state_dict(best_model_wts)
    return model

# Train the model with early stopping and learning rate scheduler
ensemble_model = train_model_with_early_stopping(
    ensemble_model, criterion, optimizer, scheduler, num_epochs=20, patience=3, checkpoint_path="new_model.pth", resume=True
)

# Evaluate the model on the test set
ensemble_model.eval()
all_preds = []
all_labels = []

with torch.no_grad():
    for inputs, labels in tqdm(test_loader, desc="Testing"):
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = ensemble_model(inputs)
        _, preds = torch.max(outputs, 1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

# Calculate metrics
precision = precision_score(all_labels, all_preds)
recall = recall_score(all_labels, all_preds)
f1 = f1_score(all_labels, all_preds)
roc_auc = roc_auc_score(all_labels, all_preds)

print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")
print(f"ROC AUC: {roc_auc:.4f}")


Loading checkpoint...
Resumed from epoch 10.
Epoch 11/20
----------


Training: 100%|██████████| 401/401 [40:47<00:00,  6.10s/it]


Train Loss: 0.0406 Acc: 0.9428


Validating: 100%|██████████| 63/63 [02:19<00:00,  2.21s/it]


Validation Loss: 0.1359 Acc: 0.9335
Checkpoint saved at epoch 11.
Epoch 12/20
----------


Training: 100%|██████████| 401/401 [46:56<00:00,  7.02s/it]


Train Loss: 0.0411 Acc: 0.9427


Validating: 100%|██████████| 63/63 [04:27<00:00,  4.25s/it]


Validation Loss: 0.1231 Acc: 0.9320
Checkpoint saved at epoch 12.
Epoch 13/20
----------


Training:   9%|▊         | 35/401 [04:44<49:31,  8.12s/it]  


KeyboardInterrupt: 

In [1]:
import torch
import torchvision.transforms as transforms
from torchvision import datasets, models
import torch.nn as nn
import torch.optim as optim
import time
import copy
import os
import numpy as np
from PIL import Image
import cv2
import dlib
from tqdm import tqdm
from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Initialize Dlib's face detector
detector = dlib.get_frontal_face_detector()

# Define a custom dataset class to include face detection and cropping
class FaceCroppingDataset(datasets.ImageFolder):
    def __init__(self, root, transform=None):
        super(FaceCroppingDataset, self).__init__(root, transform)
        self.transform = transform

    def __getitem__(self, index):
        path, target = self.samples[index]
        image = cv2.imread(path)
        if image is None:
            return None, None

        # Convert BGR (OpenCV) to RGB
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Detect faces
        faces = detector(image, 1)
        if len(faces) > 0:
            # Crop the first detected face
            x, y, w, h = (faces[0].left(), faces[0].top(), faces[0].width(), faces[0].height())
            image = image[y:y+h, x:x+w]
        else:
            return None, None

        # Convert image to PIL format and apply transformations
        image = Image.fromarray(image)
        if self.transform:
            image = self.transform(image)

        return image, target

# Data Preparation
data_transforms = {
    'train': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(20),
        transforms.ColorJitter(brightness=0.2, contrast=0.2),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]),
    'val': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ColorJitter(brightness=0.2, contrast=0.2),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]),
    'test': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ColorJitter(brightness=0.2, contrast=0.2),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]),
}

# Initialize datasets and dataloaders
train_dataset = FaceCroppingDataset('C:/Users/91970/Desktop/TruthLens/New_dataset/Train', transform=data_transforms['train'])
val_dataset = FaceCroppingDataset('C:/Users/91970/Desktop/TruthLens/New_dataset/Validation', transform=data_transforms['val'])
test_dataset = FaceCroppingDataset('C:/Users/91970/Desktop/TruthLens/New_dataset/Test', transform=data_transforms['test'])

def collate_fn(batch):
    batch = [b for b in batch if b[0] is not None and b[1] is not None]
    return torch.utils.data.dataloader.default_collate(batch)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=32, shuffle=False, collate_fn=collate_fn)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=32, shuffle=False, collate_fn=collate_fn)

# Load pre-trained models
resnet = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
efficientnet = models.efficientnet_b0(weights=models.EfficientNet_B0_Weights.DEFAULT)
mobilenet = models.mobilenet_v2(weights=models.MobileNet_V2_Weights.DEFAULT)

resnet.fc = nn.Identity()
efficientnet.classifier[1] = nn.Identity()
mobilenet.classifier[1] = nn.Identity()

resnet = resnet.to(device)
efficientnet = efficientnet.to(device)
mobilenet = mobilenet.to(device)

# Unfreeze the top layers for fine-tuning
for model in [resnet, efficientnet, mobilenet]:
    for name, param in model.named_parameters():
        if "layer4" in name or "blocks" in name or "features" in name:
            param.requires_grad = True
        else:
            param.requires_grad = False

# Define Ensemble Model
class EnsembleModel(nn.Module):
    def __init__(self, resnet, efficientnet, mobilenet):
        super(EnsembleModel, self).__init__()
        self.resnet = resnet
        self.efficientnet = efficientnet
        self.mobilenet = mobilenet
        self.fc = nn.Linear(2048 + 1280 + 1280, 2)

    def forward(self, x):
        resnet_features = self.resnet(x)
        efficientnet_features = self.efficientnet(x)
        mobilenet_features = self.mobilenet(x)
        combined_features = torch.cat((resnet_features, efficientnet_features, mobilenet_features), dim=1)
        output = self.fc(combined_features)
        return output

ensemble_model = EnsembleModel(resnet, efficientnet, mobilenet).to(device)

# Loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(filter(lambda p: p.requires_grad, ensemble_model.parameters()), lr=0.0001)

# Learning rate scheduler
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode='min', factor=0.5, patience=2, verbose=True
)

# Save checkpoint
def save_checkpoint(epoch, model, optimizer, best_acc, path="checkpoint.pth"):
    checkpoint = {
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'best_acc': best_acc
    }
    torch.save(checkpoint, path)
    print(f"Checkpoint saved at epoch {epoch + 1}.")

# Train function with early stopping
def train_model_with_early_stopping(
    model, criterion, optimizer, scheduler, num_epochs=20, patience=3, checkpoint_path="best_model.pth", resume=False
):
    best_model_wts = copy.deepcopy(model.state_dict())
    best_loss = float('inf')
    start_epoch = 0
    no_improvement_epochs = 0

    if resume and os.path.exists(checkpoint_path):
        print("Loading checkpoint...")
        checkpoint = torch.load(checkpoint_path)
        model.load_state_dict(checkpoint['model_state_dict'])
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        best_loss = checkpoint['best_acc']
        start_epoch = checkpoint['epoch'] + 1
        print(f"Resumed from epoch {start_epoch}.")
    else:
        print("Starting training from scratch...")

    for epoch in range(start_epoch, num_epochs):
        print(f'Epoch {epoch + 1}/{num_epochs}')
        print('-' * 10)

        # Training phase
        model.train()
        running_loss = 0.0
        running_corrects = 0

        for inputs, labels in tqdm(train_loader, desc="Training"):
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            _, preds = torch.max(outputs, 1)
            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels.data)

        train_loss = running_loss / len(train_dataset)
        train_acc = running_corrects.double() / len(train_dataset)
        print(f'Train Loss: {train_loss:.4f} Acc: {train_acc:.4f}')

        # Validation phase
        model.eval()
        running_loss = 0.0
        running_corrects = 0

        with torch.no_grad():
            for inputs, labels in tqdm(val_loader, desc="Validating"):
                inputs, labels = inputs.to(device), labels.to(device)

                outputs = model(inputs)
                loss = criterion(outputs, labels)

                _, preds = torch.max(outputs, 1)
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

        val_loss = running_loss / len(val_dataset)
        val_acc = running_corrects.double() / len(val_dataset)
        print(f'Validation Loss: {val_loss:.4f} Acc: {val_acc:.4f}')

        # Step the scheduler with the validation loss
        scheduler.step(val_loss)

        if val_loss < best_loss:
            best_loss = val_loss
            best_model_wts = copy.deepcopy(model.state_dict())
            save_checkpoint(epoch, model, optimizer, val_acc, checkpoint_path)
            no_improvement_epochs = 0
        else:
            no_improvement_epochs += 1

        if no_improvement_epochs >= patience:
            print(f"Early stopping triggered after {epoch + 1} epochs.")
            break

    model.load_state_dict(best_model_wts)
    return model

# Train the model with early stopping and learning rate scheduler
ensemble_model = train_model_with_early_stopping(
    ensemble_model, criterion, optimizer, scheduler, num_epochs=20, patience=3, checkpoint_path="new_model.pth", resume=True
)

# Evaluate the model on the test set
ensemble_model.eval()
all_preds = []
all_labels = []

with torch.no_grad():
    for inputs, labels in tqdm(test_loader, desc="Testing"):
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = ensemble_model(inputs)
        _, preds = torch.max(outputs, 1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

# Calculate metrics
precision = precision_score(all_labels, all_preds)
recall = recall_score(all_labels, all_preds)
f1 = f1_score(all_labels, all_preds)
roc_auc = roc_auc_score(all_labels, all_preds)

print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")
print(f"ROC AUC: {roc_auc:.4f}")


Loading checkpoint...
Resumed from epoch 12.
Epoch 13/20
----------


Training: 100%|██████████| 401/401 [41:34<00:00,  6.22s/it]


Train Loss: 0.0379 Acc: 0.9446


Validating: 100%|██████████| 63/63 [02:26<00:00,  2.33s/it]


Validation Loss: 0.1833 Acc: 0.9215
Checkpoint saved at epoch 13.
Epoch 14/20
----------


Training: 100%|██████████| 401/401 [41:35<00:00,  6.22s/it]


Train Loss: 0.0293 Acc: 0.9482


Validating: 100%|██████████| 63/63 [02:26<00:00,  2.32s/it]


Validation Loss: 0.1874 Acc: 0.9145
Epoch 15/20
----------


Training: 100%|██████████| 401/401 [41:27<00:00,  6.20s/it]


Train Loss: 0.0349 Acc: 0.9468


Validating: 100%|██████████| 63/63 [02:26<00:00,  2.33s/it]


Validation Loss: 0.1980 Acc: 0.9225
Epoch 16/20
----------


Training: 100%|██████████| 401/401 [41:23<00:00,  6.19s/it]


Train Loss: 0.0273 Acc: 0.9494


Validating: 100%|██████████| 63/63 [02:27<00:00,  2.33s/it]


Validation Loss: 0.1355 Acc: 0.9300
Checkpoint saved at epoch 16.
Epoch 17/20
----------


Training: 100%|██████████| 401/401 [41:50<00:00,  6.26s/it]


Train Loss: 0.0278 Acc: 0.9480


Validating: 100%|██████████| 63/63 [02:40<00:00,  2.54s/it]


Validation Loss: 0.1556 Acc: 0.9270
Epoch 18/20
----------


Training:   1%|          | 3/401 [00:25<57:23,  8.65s/it]


KeyboardInterrupt: 

In [1]:
import torch
import torch.nn as nn
from torchvision import models

# Define the ensemble model architecture
class EnsembleModel(nn.Module):
    def __init__(self, resnet, efficientnet, mobilenet):
        super(EnsembleModel, self).__init__()
        self.resnet = resnet
        self.efficientnet = efficientnet
        self.mobilenet = mobilenet
        self.fc = nn.Linear(2048 + 1280 + 1280, 2)

    def forward(self, x):
        resnet_features = self.resnet(x)
        efficientnet_features = self.efficientnet(x)
        mobilenet_features = self.mobilenet(x)
        combined_features = torch.cat((resnet_features, efficientnet_features, mobilenet_features), dim=1)
        output = self.fc(combined_features)
        return output

# Load the individual pre-trained base models
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
resnet = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
efficientnet = models.efficientnet_b0(weights=models.EfficientNet_B0_Weights.DEFAULT)
mobilenet = models.mobilenet_v2(weights=models.MobileNet_V2_Weights.DEFAULT)

# Update the output layers to Identity
resnet.fc = nn.Identity()
efficientnet.classifier[1] = nn.Identity()
mobilenet.classifier[1] = nn.Identity()

# Send models to device
resnet = resnet.to(device)
efficientnet = efficientnet.to(device)
mobilenet = mobilenet.to(device)

# Initialize the ensemble model
ensemble_model = EnsembleModel(resnet, efficientnet, mobilenet).to(device)

# Load the checkpoint
checkpoint_path = r"C:\Users\91970\Desktop\TruthLens\New_dataset\new_model.pth"
checkpoint = torch.load(checkpoint_path)

# Load model state from checkpoint
ensemble_model.load_state_dict(checkpoint['model_state_dict'])
print("Checkpoint loaded successfully.")

# Save the model state dictionary separately
model_save_path = r"C:\Users\91970\Desktop\TruthLens\New_dataset\ensemble_model_state.pth"
torch.save(ensemble_model.state_dict(), model_save_path)
print(f"Model state saved separately to '{model_save_path}'.")


Checkpoint loaded successfully.
Model state saved separately to 'C:\Users\91970\Desktop\TruthLens\New_dataset\ensemble_model_state.pth'.


In [2]:
import torch
import torchvision.transforms as transforms
from torchvision import datasets, models
import torch.nn as nn
import torch.optim as optim
import time
import copy
import os
import numpy as np
from PIL import Image
import cv2
import dlib
from tqdm import tqdm
from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Initialize Dlib's face detector
detector = dlib.get_frontal_face_detector()

# Define a custom dataset class to include face detection and cropping
class FaceCroppingDataset(datasets.ImageFolder):
    def __init__(self, root, transform=None):
        super(FaceCroppingDataset, self).__init__(root, transform)
        self.transform = transform

    def __getitem__(self, index):
        path, target = self.samples[index]
        image = cv2.imread(path)
        if image is None:
            return None, None

        # Convert BGR (OpenCV) to RGB
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Detect faces
        faces = detector(image, 1)
        if len(faces) > 0:
            # Crop the first detected face
            x, y, w, h = (faces[0].left(), faces[0].top(), faces[0].width(), faces[0].height())
            image = image[y:y+h, x:x+w]
        else:
            return None, None

        # Convert image to PIL format and apply transformations
        image = Image.fromarray(image)
        if self.transform:
            image = self.transform(image)

        return image, target

# Data Preparation
data_transforms = {
    'train': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(20),
        transforms.ColorJitter(brightness=0.2, contrast=0.2),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]),
    'val': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ColorJitter(brightness=0.2, contrast=0.2),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]),
    'test': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ColorJitter(brightness=0.2, contrast=0.2),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]),
}

# Initialize datasets and dataloaders
train_dataset = FaceCroppingDataset('C:/Users/91970/Desktop/TruthLens/New_dataset/Train', transform=data_transforms['train'])
val_dataset = FaceCroppingDataset('C:/Users/91970/Desktop/TruthLens/New_dataset/Validation', transform=data_transforms['val'])
test_dataset = FaceCroppingDataset('C:/Users/91970/Desktop/TruthLens/New_dataset/Test', transform=data_transforms['test'])

def collate_fn(batch):
    batch = [b for b in batch if b[0] is not None and b[1] is not None]
    return torch.utils.data.dataloader.default_collate(batch)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=32, shuffle=False, collate_fn=collate_fn)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=32, shuffle=False, collate_fn=collate_fn)

# Load pre-trained models
resnet = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
efficientnet = models.efficientnet_b0(weights=models.EfficientNet_B0_Weights.DEFAULT)
mobilenet = models.mobilenet_v2(weights=models.MobileNet_V2_Weights.DEFAULT)

resnet.fc = nn.Identity()
efficientnet.classifier[1] = nn.Identity()
mobilenet.classifier[1] = nn.Identity()

resnet = resnet.to(device)
efficientnet = efficientnet.to(device)
mobilenet = mobilenet.to(device)

# Unfreeze the top layers for fine-tuning
for model in [resnet, efficientnet, mobilenet]:
    for name, param in model.named_parameters():
        if "layer4" in name or "blocks" in name or "features" in name:
            param.requires_grad = True
        else:
            param.requires_grad = False

# Define Ensemble Model
class EnsembleModel(nn.Module):
    def __init__(self, resnet, efficientnet, mobilenet):
        super(EnsembleModel, self).__init__()
        self.resnet = resnet
        self.efficientnet = efficientnet
        self.mobilenet = mobilenet
        self.fc = nn.Linear(2048 + 1280 + 1280, 2)

    def forward(self, x):
        resnet_features = self.resnet(x)
        efficientnet_features = self.efficientnet(x)
        mobilenet_features = self.mobilenet(x)
        combined_features = torch.cat((resnet_features, efficientnet_features, mobilenet_features), dim=1)
        output = self.fc(combined_features)
        return output

ensemble_model = EnsembleModel(resnet, efficientnet, mobilenet).to(device)

# Loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(filter(lambda p: p.requires_grad, ensemble_model.parameters()), lr=0.0001)

# Learning rate scheduler
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode='min', factor=0.5, patience=2, verbose=True
)

# Save checkpoint
def save_checkpoint(epoch, model, optimizer, best_acc, path="checkpoint.pth"):
    checkpoint = {
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'best_acc': best_acc
    }
    torch.save(checkpoint, path)
    print(f"Checkpoint saved at epoch {epoch + 1}.")

# Train function with early stopping
def train_model_with_early_stopping(
    model, criterion, optimizer, scheduler, num_epochs=20, patience=3, checkpoint_path="best_model.pth", resume=False
):
    best_model_wts = copy.deepcopy(model.state_dict())
    best_loss = float('inf')
    start_epoch = 0
    no_improvement_epochs = 0

    if resume and os.path.exists(checkpoint_path):
        print("Loading checkpoint...")
        checkpoint = torch.load(checkpoint_path)
        model.load_state_dict(checkpoint['model_state_dict'])
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        best_loss = checkpoint['best_acc']
        start_epoch = checkpoint['epoch'] + 1
        print(f"Resumed from epoch {start_epoch}.")
    else:
        print("Starting training from scratch...")

    for epoch in range(start_epoch, num_epochs):
        print(f'Epoch {epoch + 1}/{num_epochs}')
        print('-' * 10)

        # Training phase
        model.train()
        running_loss = 0.0
        running_corrects = 0

        for inputs, labels in tqdm(train_loader, desc="Training"):
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            _, preds = torch.max(outputs, 1)
            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels.data)

        train_loss = running_loss / len(train_dataset)
        train_acc = running_corrects.double() / len(train_dataset)
        print(f'Train Loss: {train_loss:.4f} Acc: {train_acc:.4f}')

        # Validation phase
        model.eval()
        running_loss = 0.0
        running_corrects = 0

        with torch.no_grad():
            for inputs, labels in tqdm(val_loader, desc="Validating"):
                inputs, labels = inputs.to(device), labels.to(device)

                outputs = model(inputs)
                loss = criterion(outputs, labels)

                _, preds = torch.max(outputs, 1)
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

        val_loss = running_loss / len(val_dataset)
        val_acc = running_corrects.double() / len(val_dataset)
        print(f'Validation Loss: {val_loss:.4f} Acc: {val_acc:.4f}')

        # Step the scheduler with the validation loss
        scheduler.step(val_loss)

        if val_loss < best_loss:
            best_loss = val_loss
            best_model_wts = copy.deepcopy(model.state_dict())
            save_checkpoint(epoch, model, optimizer, val_acc, checkpoint_path)
            no_improvement_epochs = 0
        else:
            no_improvement_epochs += 1

        if no_improvement_epochs >= patience:
            print(f"Early stopping triggered after {epoch + 1} epochs.")
            break

    model.load_state_dict(best_model_wts)
    return model

# Train the model with early stopping and learning rate scheduler
ensemble_model = train_model_with_early_stopping(
    ensemble_model, criterion, optimizer, scheduler, num_epochs=20, patience=3, checkpoint_path="new_model.pth", resume=True
)

# Evaluate the model on the test set
ensemble_model.eval()
all_preds = []
all_labels = []

with torch.no_grad():
    for inputs, labels in tqdm(test_loader, desc="Testing"):
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = ensemble_model(inputs)
        _, preds = torch.max(outputs, 1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

# Calculate metrics
precision = precision_score(all_labels, all_preds)
recall = recall_score(all_labels, all_preds)
f1 = f1_score(all_labels, all_preds)
roc_auc = roc_auc_score(all_labels, all_preds)

print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")
print(f"ROC AUC: {roc_auc:.4f}")


Loading checkpoint...
Resumed from epoch 16.
Epoch 17/20
----------


Training: 100%|██████████| 401/401 [44:32<00:00,  6.67s/it] 


Train Loss: 0.0287 Acc: 0.9481


Validating: 100%|██████████| 63/63 [02:30<00:00,  2.39s/it]


Validation Loss: 0.1180 Acc: 0.9310
Checkpoint saved at epoch 17.
Epoch 18/20
----------


Training: 100%|██████████| 401/401 [45:50<00:00,  6.86s/it]


Train Loss: 0.0268 Acc: 0.9485


Validating: 100%|██████████| 63/63 [02:26<00:00,  2.33s/it]


Validation Loss: 0.1310 Acc: 0.9275
Epoch 19/20
----------


Training: 100%|██████████| 401/401 [45:50<00:00,  6.86s/it] 


Train Loss: 0.0266 Acc: 0.9488


Validating: 100%|██████████| 63/63 [02:46<00:00,  2.64s/it]


Validation Loss: 0.1092 Acc: 0.9320
Checkpoint saved at epoch 19.
Epoch 20/20
----------


Training: 100%|██████████| 401/401 [44:08<00:00,  6.60s/it]


Train Loss: 0.0245 Acc: 0.9492


Validating: 100%|██████████| 63/63 [02:28<00:00,  2.36s/it]


Validation Loss: 0.1365 Acc: 0.9300


Testing: 100%|██████████| 63/63 [02:31<00:00,  2.40s/it]

Precision: 0.9398
Recall: 0.9710
F1 Score: 0.9551
ROC AUC: 0.9534





In [2]:
import torch
import torch.nn as nn
from torchvision import models

# Define the ensemble model architecture
class EnsembleModel(nn.Module):
    def __init__(self, resnet, efficientnet, mobilenet):
        super(EnsembleModel, self).__init__()
        self.resnet = resnet
        self.efficientnet = efficientnet
        self.mobilenet = mobilenet
        self.fc = nn.Linear(2048 + 1280 + 1280, 2)

    def forward(self, x):
        resnet_features = self.resnet(x)
        efficientnet_features = self.efficientnet(x)
        mobilenet_features = self.mobilenet(x)
        combined_features = torch.cat((resnet_features, efficientnet_features, mobilenet_features), dim=1)
        output = self.fc(combined_features)
        return output

# Load the individual pre-trained base models
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
resnet = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
efficientnet = models.efficientnet_b0(weights=models.EfficientNet_B0_Weights.DEFAULT)
mobilenet = models.mobilenet_v2(weights=models.MobileNet_V2_Weights.DEFAULT)

# Update the output layers to Identity
resnet.fc = nn.Identity()
efficientnet.classifier[1] = nn.Identity()
mobilenet.classifier[1] = nn.Identity()

# Send models to device
resnet = resnet.to(device)
efficientnet = efficientnet.to(device)
mobilenet = mobilenet.to(device)

# Initialize the ensemble model
ensemble_model = EnsembleModel(resnet, efficientnet, mobilenet).to(device)

# Load the checkpoint
checkpoint_path = r"C:\Users\91970\Desktop\TruthLens_Deepfake_Detector\new_model.pth"
checkpoint = torch.load(checkpoint_path)

# Load model state from checkpoint
ensemble_model.load_state_dict(checkpoint['model_state_dict'])
print("Checkpoint loaded successfully.")

# Save the model state dictionary separately
model_save_path = r"C:\Users\91970\Desktop\TruthLens_Deepfake_Detector"
torch.save(ensemble_model.state_dict(), model_save_path)
print(f"Model state saved separately to '{model_save_path}'.")


Checkpoint loaded successfully.


RuntimeError: File C:\Users\91970\Desktop\TruthLens_Deepfake_Detector cannot be opened.

In [3]:
import torch
import torchvision.transforms as transforms
from torchvision import datasets, models
import torch.nn as nn
import torch.optim as optim
import time
import copy
import os
import numpy as np
from PIL import Image
import cv2
import dlib
from tqdm import tqdm
from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Initialize Dlib's face detector
detector = dlib.get_frontal_face_detector()

# Define a custom dataset class to include face detection and cropping
class FaceCroppingDataset(datasets.ImageFolder):
    def __init__(self, root, transform=None):
        super(FaceCroppingDataset, self).__init__(root, transform)
        self.transform = transform

    def __getitem__(self, index):
        path, target = self.samples[index]
        image = cv2.imread(path)
        if image is None:
            return None, None

        # Convert BGR (OpenCV) to RGB
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Detect faces
        faces = detector(image, 1)
        if len(faces) > 0:
            # Crop the first detected face
            x, y, w, h = (faces[0].left(), faces[0].top(), faces[0].width(), faces[0].height())
            image = image[y:y+h, x:x+w]
        else:
            return None, None

        # Convert image to PIL format and apply transformations
        image = Image.fromarray(image)
        if self.transform:
            image = self.transform(image)

        return image, target

# Data Preparation
data_transforms = {
    'train': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(20),
        transforms.ColorJitter(brightness=0.2, contrast=0.2),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]),
    'val': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ColorJitter(brightness=0.2, contrast=0.2),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]),
    'test': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ColorJitter(brightness=0.2, contrast=0.2),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]),
}

# Initialize datasets and dataloaders
train_dataset = FaceCroppingDataset("C:/Users/91970/Desktop/TruthLens_Deepfake_Detector/New_dataset/Train", transform=data_transforms['train'])
val_dataset = FaceCroppingDataset('C:/Users/91970/Desktop/TruthLens_Deepfake_Detector/New_dataset/Validation', transform=data_transforms['val'])
test_dataset = FaceCroppingDataset('C:/Users/91970/Desktop/TruthLens_Deepfake_Detector/New_dataset/Test', transform=data_transforms['test'])

def collate_fn(batch):
    batch = [b for b in batch if b[0] is not None and b[1] is not None]
    return torch.utils.data.dataloader.default_collate(batch)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=32, shuffle=False, collate_fn=collate_fn)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=32, shuffle=False, collate_fn=collate_fn)

# Load pre-trained models
resnet = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
efficientnet = models.efficientnet_b0(weights=models.EfficientNet_B0_Weights.DEFAULT)
mobilenet = models.mobilenet_v2(weights=models.MobileNet_V2_Weights.DEFAULT)

resnet.fc = nn.Identity()
efficientnet.classifier[1] = nn.Identity()
mobilenet.classifier[1] = nn.Identity()

resnet = resnet.to(device)
efficientnet = efficientnet.to(device)
mobilenet = mobilenet.to(device)

# Unfreeze the top layers for fine-tuning
for model in [resnet, efficientnet, mobilenet]:
    for name, param in model.named_parameters():
        if "layer4" in name or "blocks" in name or "features" in name:
            param.requires_grad = True
        else:
            param.requires_grad = False

# Define Ensemble Model
class EnsembleModel(nn.Module):
    def __init__(self, resnet, efficientnet, mobilenet):
        super(EnsembleModel, self).__init__()
        self.resnet = resnet
        self.efficientnet = efficientnet
        self.mobilenet = mobilenet
        self.fc = nn.Linear(2048 + 1280 + 1280, 2)

    def forward(self, x):
        resnet_features = self.resnet(x)
        efficientnet_features = self.efficientnet(x)
        mobilenet_features = self.mobilenet(x)
        combined_features = torch.cat((resnet_features, efficientnet_features, mobilenet_features), dim=1)
        output = self.fc(combined_features)
        return output

ensemble_model = EnsembleModel(resnet, efficientnet, mobilenet).to(device)

# Loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(filter(lambda p: p.requires_grad, ensemble_model.parameters()), lr=0.0001)

# Learning rate scheduler
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode='min', factor=0.5, patience=2, verbose=True
)

# Save checkpoint
def save_checkpoint(epoch, model, optimizer, best_acc, path="checkpoint.pth"):
    checkpoint = {
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'best_acc': best_acc
    }
    torch.save(checkpoint, path)
    print(f"Checkpoint saved at epoch {epoch + 1}.")

# Train function with early stopping
def train_model_with_early_stopping(
    model, criterion, optimizer, scheduler, num_epochs=20, patience=3, checkpoint_path="best_model.pth", resume=False
):
    best_model_wts = copy.deepcopy(model.state_dict())
    best_loss = float('inf')
    start_epoch = 0
    no_improvement_epochs = 0

    if resume and os.path.exists(checkpoint_path):
        print("Loading checkpoint...")
        checkpoint = torch.load(checkpoint_path)
        model.load_state_dict(checkpoint['model_state_dict'])
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        best_loss = checkpoint['best_acc']
        start_epoch = checkpoint['epoch'] + 1
        print(f"Resumed from epoch {start_epoch}.")
    else:
        print("Starting training from scratch...")

    for epoch in range(start_epoch, num_epochs):
        print(f'Epoch {epoch + 1}/{num_epochs}')
        print('-' * 10)

        # Training phase
        model.train()
        running_loss = 0.0
        running_corrects = 0

        for inputs, labels in tqdm(train_loader, desc="Training"):
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            _, preds = torch.max(outputs, 1)
            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels.data)

        train_loss = running_loss / len(train_dataset)
        train_acc = running_corrects.double() / len(train_dataset)
        print(f'Train Loss: {train_loss:.4f} Acc: {train_acc:.4f}')

        # Validation phase
        model.eval()
        running_loss = 0.0
        running_corrects = 0

        with torch.no_grad():
            for inputs, labels in tqdm(val_loader, desc="Validating"):
                inputs, labels = inputs.to(device), labels.to(device)

                outputs = model(inputs)
                loss = criterion(outputs, labels)

                _, preds = torch.max(outputs, 1)
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

        val_loss = running_loss / len(val_dataset)
        val_acc = running_corrects.double() / len(val_dataset)
        print(f'Validation Loss: {val_loss:.4f} Acc: {val_acc:.4f}')

        # Step the scheduler with the validation loss
        scheduler.step(val_loss)

        if val_loss < best_loss:
            best_loss = val_loss
            best_model_wts = copy.deepcopy(model.state_dict())
            save_checkpoint(epoch, model, optimizer, val_acc, checkpoint_path)
            no_improvement_epochs = 0
        else:
            no_improvement_epochs += 1

        if no_improvement_epochs >= patience:
            print(f"Early stopping triggered after {epoch + 1} epochs.")
            break

    model.load_state_dict(best_model_wts)
    return model

# Train the model with early stopping and learning rate scheduler
ensemble_model = train_model_with_early_stopping(
    ensemble_model, criterion, optimizer, scheduler, num_epochs=20, patience=3, checkpoint_path="C:\\Users\\91970\\Desktop\\TruthLens_Deepfake_Detector\\new_model.pth", resume=True
)

# Evaluate the model on the test set
ensemble_model.eval()
all_preds = []
all_labels = []

with torch.no_grad():
    for inputs, labels in tqdm(test_loader, desc="Testing"):
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = ensemble_model(inputs)
        _, preds = torch.max(outputs, 1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

# Calculate metrics
precision = precision_score(all_labels, all_preds)
recall = recall_score(all_labels, all_preds)
f1 = f1_score(all_labels, all_preds)
roc_auc = roc_auc_score(all_labels, all_preds)

print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")
print(f"ROC AUC: {roc_auc:.4f}")


Loading checkpoint...
Resumed from epoch 19.
Epoch 20/20
----------


Training: 100%|██████████| 402/402 [41:28<00:00,  6.19s/it] 


Train Loss: 0.0295 Acc: 0.9494


Validating: 100%|██████████| 63/63 [02:18<00:00,  2.20s/it]


Validation Loss: 0.1453 Acc: 0.9290
Checkpoint saved at epoch 20.


Testing: 100%|██████████| 63/63 [02:13<00:00,  2.13s/it]

Precision: 0.9489
Recall: 0.9813
F1 Score: 0.9648
ROC AUC: 0.9634





In [5]:
import os
import torch
import torch.nn as nn
from torchvision import models

# Define the ensemble model architecture
class EnsembleModel(nn.Module):
    def __init__(self, resnet, efficientnet, mobilenet):
        super(EnsembleModel, self).__init__()
        self.resnet = resnet
        self.efficientnet = efficientnet
        self.mobilenet = mobilenet
        self.fc = nn.Linear(2048 + 1280 + 1280, 2)

    def forward(self, x):
        resnet_features = self.resnet(x)
        efficientnet_features = self.efficientnet(x)
        mobilenet_features = self.mobilenet(x)
        combined_features = torch.cat((resnet_features, efficientnet_features, mobilenet_features), dim=1)
        output = self.fc(combined_features)
        return output

# Load the individual pre-trained base models
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
resnet = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
efficientnet = models.efficientnet_b0(weights=models.EfficientNet_B0_Weights.DEFAULT)
mobilenet = models.mobilenet_v2(weights=models.MobileNet_V2_Weights.DEFAULT)

# Update the output layers to Identity
resnet.fc = nn.Identity()
efficientnet.classifier[1] = nn.Identity()
mobilenet.classifier[1] = nn.Identity()

# Send models to device
resnet = resnet.to(device)
efficientnet = efficientnet.to(device)
mobilenet = mobilenet.to(device)

# Initialize the ensemble model
ensemble_model = EnsembleModel(resnet, efficientnet, mobilenet).to(device)

# Load the checkpoint
checkpoint_path = r"C:\Users\91970\Desktop\TruthLens_Deepfake_Detector\new_model.pth"
checkpoint = torch.load(checkpoint_path)

# Load model state from checkpoint
ensemble_model.load_state_dict(checkpoint['model_state_dict'])
print("Checkpoint loaded successfully.")

# Specify the save path
model_save_path = r"C:\Users\91970\Desktop\TruthLens_Deepfake_Detector\ensemble_model_state.pth"

# Save the model state dictionary
torch.save(ensemble_model.state_dict(), model_save_path)
print(f"Model state saved separately to '{model_save_path}'.")


Checkpoint loaded successfully.
Model state saved separately to 'C:\Users\91970\Desktop\TruthLens_Deepfake_Detector\ensemble_model_state.pth'.
