Thêm các thư viện

In [1]:
import torch
from torch import nn
import torch.nn.functional as F
import math

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
import zipfile
import io
from torch.utils.data import Dataset
from PIL import Image
import torchvision.transforms as transforms
import multiprocessing
from torch.utils.data import DataLoader

class CustomDataset(Dataset):
    def __init__(self, zip_path, mode='train', transform=None):
        """
        Args:
            zip_path (str): Đường dẫn tới file zip chứa các hình ảnh.
            mode (str): 'train' hoặc 'test', để chỉ định thư mục nào sẽ được sử dụng.
            transform (callable, optional): Các phép biến đổi được áp dụng lên ảnh.
        """
        self.zip_path = zip_path
        self.transform = transform
        self.mode = mode

        # Mở file zip
        self.zip_file = zipfile.ZipFile(zip_path, 'r')

        # Tạo danh sách các file ảnh và nhãn từ thư mục tương ứng
        self.image_labels = []
        for file_name in self.zip_file.namelist():
            if file_name.startswith(f"{mode}_img/{mode}_img/color") and file_name.endswith(".jpg"):
                # Trích xuất nhãn từ tên file (giả sử nhãn nằm sau dấu gạch dưới cuối cùng)
                label = 1 if file_name.split('_')[-1].split('.')[0] == "real" else 0
                self.image_labels.append((file_name, label))

    def __len__(self):
        """Trả về số lượng mẫu trong dataset."""
        return len(self.image_labels)

    def __getitem__(self, idx):
        """
        Tải và trả về một mẫu từ dataset.
        Args:
            idx (int): Chỉ số của mẫu cần tải.

        Returns:
            tuple: (image, label) tương ứng với ảnh và nhãn của nó.
        """
        img_name, label = self.image_labels[idx]

        # Đọc dữ liệu hình ảnh từ file zip
        with self.zip_file.open(img_name) as img_file:
            image = Image.open(io.BytesIO(img_file.read())).convert('RGB')

        if self.transform:
            image = self.transform(image)

        # Chuyển đổi label thành tensor
        label = torch.tensor(label)

        return image, label

In [4]:
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

In [5]:
zip_path = '/content/drive/MyDrive/MyData/Casia-fasd.zip'
train_dataset = CustomDataset(zip_path=zip_path, mode='train', transform=transform)
test_dataset = CustomDataset(zip_path=zip_path, mode='test', transform=transform)
print(len(train_dataset), len(test_dataset))

# Tính toán số lượng luồng dựa trên số lượng CPU cores
num_workers = multiprocessing.cpu_count()
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=0)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=0)

1655 2408


In [6]:
class Bottleneck(nn.Module):
    expansion = 4
    def __init__(self, in_channels, out_channels, i_downsample=None, stride=1):
        super(Bottleneck, self).__init__()

        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1, padding=0)
        self.batch_norm1 = nn.BatchNorm2d(out_channels)

        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=stride, padding=1)
        self.batch_norm2 = nn.BatchNorm2d(out_channels)

        self.conv3 = nn.Conv2d(out_channels, out_channels*self.expansion, kernel_size=1, stride=1, padding=0)
        self.batch_norm3 = nn.BatchNorm2d(out_channels*self.expansion)

        self.i_downsample = i_downsample
        self.stride = stride
        self.relu = nn.ReLU()

    def forward(self, x):
        identity = x.clone()
        x = self.relu(self.batch_norm1(self.conv1(x)))

        x = self.relu(self.batch_norm2(self.conv2(x)))

        x = self.conv3(x)
        x = self.batch_norm3(x)

        #downsample if needed
        if self.i_downsample is not None:
            identity = self.i_downsample(identity)
        #add identity
        x+=identity
        x=self.relu(x)

        return x

class Block(nn.Module):
    expansion = 1
    def __init__(self, in_channels, out_channels, i_downsample=None, stride=1):
        super(Block, self).__init__()


        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1, stride=stride, bias=False)
        self.batch_norm1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1, stride=stride, bias=False)
        self.batch_norm2 = nn.BatchNorm2d(out_channels)

        self.i_downsample = i_downsample
        self.stride = stride
        self.relu = nn.ReLU()

    def forward(self, x):
      identity = x.clone()

      x = self.relu(self.batch_norm2(self.conv1(x)))
      x = self.batch_norm2(self.conv2(x))

      if self.i_downsample is not None:
          identity = self.i_downsample(identity)
      print(x.shape)
      print(identity.shape)
      x += identity
      x = self.relu(x)
      return x
class ResNet(nn.Module):
    def __init__(self, ResBlock, layer_list, num_classes, num_channels=3):
        super(ResNet, self).__init__()
        self.in_channels = 64

        self.conv1 = nn.Conv2d(num_channels, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.batch_norm1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU()
        self.max_pool = nn.MaxPool2d(kernel_size = 3, stride=2, padding=1)

        self.layer1 = self._make_layer(ResBlock, layer_list[0], planes=64)
        self.layer2 = self._make_layer(ResBlock, layer_list[1], planes=128, stride=2)
        self.layer3 = self._make_layer(ResBlock, layer_list[2], planes=256, stride=2)
        self.layer4 = self._make_layer(ResBlock, layer_list[3], planes=512, stride=2)

        self.avgpool = nn.AdaptiveAvgPool2d((1,1))
        self.fc = nn.Linear(512*ResBlock.expansion, num_classes)

    def forward(self, x):
        x = self.relu(self.batch_norm1(self.conv1(x)))
        x = self.max_pool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = x.reshape(x.shape[0], -1)
        x = self.fc(x)

        return x

    def _make_layer(self, ResBlock, blocks, planes, stride=1):
        ii_downsample = None
        layers = []

        if stride != 1 or self.in_channels != planes*ResBlock.expansion:
            ii_downsample = nn.Sequential(
                nn.Conv2d(self.in_channels, planes*ResBlock.expansion, kernel_size=1, stride=stride),
                nn.BatchNorm2d(planes*ResBlock.expansion)
            )

        layers.append(ResBlock(self.in_channels, planes, i_downsample=ii_downsample, stride=stride))
        self.in_channels = planes*ResBlock.expansion

        for i in range(blocks-1):
            layers.append(ResBlock(self.in_channels, planes))

        return nn.Sequential(*layers)



def ResNet50(num_classes, channels=3):
    return ResNet(Bottleneck, [3,4,6,3], num_classes, channels)

In [7]:
import torch.optim as optim

# Hyperparameters
num_epochs = 50
learning_rate = 0.001
patience = 10  # Số epoch chờ đợi trước khi dừng sớm nếu không có cải thiện
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [8]:
# Early stopping variables
best_loss = float('inf')
early_stop_counter = 0
best_model_path = "best_model.pth"
train_losses = []
val_losses = []

In [9]:
model = ResNet50(num_classes=2, channels=3)
# model = MobileNetV3(model_mode="SMALL", num_classes=2)
model.to(device)

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

In [10]:
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0

    # Training phase
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Backward pass and optimize
        loss.backward()
        optimizer.step()

        # Update running loss
        running_loss += loss.item() * inputs.size(0)

    epoch_loss = running_loss / len(train_loader.dataset)
    train_losses.append(epoch_loss)  # Lưu lại train loss

    # Validation phase
    model.eval()  # Set model to evaluation mode
    val_running_loss = 0.0

    with torch.no_grad():
        for val_inputs, val_labels in test_loader:
            val_inputs, val_labels = val_inputs.to(device), val_labels.to(device)
            val_outputs = model(val_inputs)
            val_loss = criterion(val_outputs, val_labels)
            val_running_loss += val_loss.item() * val_inputs.size(0)

    val_epoch_loss = val_running_loss / len(test_loader.dataset)
    val_losses.append(val_epoch_loss)  # Lưu lại validation loss
    print(f'Epoch [{epoch+1}/{num_epochs}], Training Loss: {epoch_loss:.4f}, Validation Loss: {val_epoch_loss:.4f}')

    # Save the best model based on validation loss
    torch.save(model.state_dict(), 'last_model.pth')
    if val_epoch_loss < best_loss:
        best_loss = val_epoch_loss
        early_stop_counter = 0  # Reset early stopping counter
        torch.save(model.state_dict(), best_model_path)
        print(f'Model saved with validation loss: {best_loss:.4f}')
    else:
        early_stop_counter += 1

    # Check for early stopping
    # if early_stop_counter >= patience:
        # print(f'Early stopping at epoch {epoch+1}')
        # break

print('Training finished.')

Epoch [1/50], Training Loss: 0.4145, Validation Loss: 0.8818
Model saved with validation loss: 0.8818
Epoch [2/50], Training Loss: 0.2508, Validation Loss: 0.5217
Model saved with validation loss: 0.5217
Epoch [3/50], Training Loss: 0.1778, Validation Loss: 0.4456
Model saved with validation loss: 0.4456
Epoch [4/50], Training Loss: 0.1920, Validation Loss: 0.3124
Model saved with validation loss: 0.3124
Epoch [5/50], Training Loss: 0.1153, Validation Loss: 0.4969
Epoch [6/50], Training Loss: 0.1457, Validation Loss: 0.6294
Epoch [7/50], Training Loss: 0.1338, Validation Loss: 0.8735
Epoch [8/50], Training Loss: 0.1060, Validation Loss: 0.4661
Epoch [9/50], Training Loss: 0.0871, Validation Loss: 0.5357
Epoch [10/50], Training Loss: 0.0751, Validation Loss: 0.9665
Epoch [11/50], Training Loss: 0.0779, Validation Loss: 0.4881
Epoch [12/50], Training Loss: 0.0904, Validation Loss: 0.3349
Epoch [13/50], Training Loss: 0.0719, Validation Loss: 0.5974
Epoch [14/50], Training Loss: 0.0366, V

In [11]:
from sklearn.metrics import accuracy_score
import numpy as np
import time

def evaluate_model(model, test_loader, device):
    model.eval()  # Chuyển mô hình sang chế độ đánh giá
    all_preds = []
    all_labels = []
    total_time = 0.0
    total_batches = len(test_loader)

    with torch.no_grad():  # Không tính toán gradient trong quá trình đánh giá
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            start_time = time.time()

            # Forward pass
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)  # Lấy nhãn dự đoán

            end_time = time.time()
            total_time += (end_time - start_time)

            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    # Chuyển danh sách dự đoán và nhãn thành numpy array
    all_preds = np.array(all_preds)
    all_labels = np.array(all_labels)

    # Tính accuracy
    accuracy = accuracy_score(all_labels, all_preds)

    # Tính các chỉ số FAR, FRR
    tp = np.sum((all_labels == 1) & (all_preds == 1))  # True Positives
    tn = np.sum((all_labels == 0) & (all_preds == 0))  # True Negatives
    fp = np.sum((all_labels == 0) & (all_preds == 1))  # False Positives
    fn = np.sum((all_labels == 1) & (all_preds == 0))  # False Negatives

    # Tổng số lượng mẫu
    num_fake = np.sum(all_labels == 0)  # Tổng số mẫu giả
    num_real = np.sum(all_labels == 1)  # Tổng số mẫu thật

    # Tính FAR, FRR
    FAR = fp / num_fake if num_fake > 0 else 0
    FRR = fn / num_real if num_real > 0 else 0

    # Tính HTER
    HTER = 0.5 * (FAR + FRR)

    # Tính số lượng tham số của mô hình
    num_parameters = sum(p.numel() for p in model.parameters())

    # Tính thời gian thực hiện trung bình cho mỗi batch
    avg_time_per_batch = total_time / total_batches

    return accuracy, HTER, num_parameters, avg_time_per_batch

In [13]:
model = ResNet50(num_classes=2, channels=3)

model.load_state_dict(torch.load('best_model.pth'))  # Tải mô hình đã huấn luyện
model.to(device)

accuracy, HTER, num_parameters, avg_time_per_batch = evaluate_model(model, test_loader, device)

print(f'Accuracy: {accuracy:.4f}')
print(f'HTER: {HTER:.4f}')
print(f'Prameters: {num_parameters}')
print(f'Time: {avg_time_per_batch:.4f} s')

  model.load_state_dict(torch.load('best_model.pth'))  # Tải mô hình đã huấn luyện


Accuracy: 0.9431
HTER: 0.0588
Prameters: 23538626
Time: 0.0143 s
