In [13]:
import torchvision
from torchvision import transforms
import torch
from sklearn.metrics import confusion_matrix, f1_score, classification_report
import seaborn as sns
import matplotlib.pyplot as plt
from tqdm import tqdm
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR, ExponentialLR, CosineAnnealingLR
from sklearn.metrics import f1_score
from torch.utils.data import DataLoader

In [14]:
train_transforms = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

test_transforms = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Tải dữ liệu
train_data = torchvision.datasets.ImageFolder(root='D:/DPL302m/project/Abnormal_Behavior/Data/DataSets/SplitData/train', transform=train_transforms)

test_data = torchvision.datasets.ImageFolder(root='D:/DPL302m/project/Abnormal_Behavior/Data/DataSets/SplitData/test', transform=train_transforms)
# Số lượng các lớp
num_classes = len(train_data.classes)
# Tên của các lớp
classes_name = train_data.classes

In [15]:
class SimpleCNNWithDropout(nn.Module):
    def __init__(self, num_classes=10, dropout_prob=0.3):
        super(SimpleCNNWithDropout, self).__init__()
        self.conv1 = nn.Conv2d(3, 16, 3)
        self.pool1 = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(16, 32, 3)
        self.pool2 = nn.MaxPool2d(2, 2)
        self.conv3 = nn.Conv2d(32, 64, 3)
        self.pool3 = nn.MaxPool2d(2, 2)
        self.avgpool = nn.AdaptiveAvgPool2d((16, 16))
        self.fc1 = nn.Linear(64 * 16 * 16, 256)
        self.fc2 = nn.Linear(256, num_classes)
        self.dropout = nn.Dropout(p=dropout_prob)  # Add a dropout layer with the specified dropout probability

    def forward(self, x):
        x = self.conv1(x)
        x = nn.functional.relu(x)
        x = self.pool1(x)
        x = self.conv2(x)
        x = nn.functional.relu(x)
        x = self.pool2(x)
        x = self.conv3(x)
        x = nn.functional.relu(x)
        x = self.pool3(x)
        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        x = self.fc1(x)
        x = nn.functional.relu(x)
        x = self.dropout(x)  # Apply dropout before the final fully connected layer
        x = self.fc2(x)
        return x

class CustomCNN(nn.Module):
    def __init__(self):
        super(CustomCNN, self).__init__()
        self.layers = nn.ModuleList()
        self.layers.append(nn.Conv2d(1, 32, kernel_size=3, padding=1))
        self.layers.append(nn.ReLU())
        self.layers.append(nn.Conv2d(32, 64, kernel_size=3, padding=1))
        self.layers.append(nn.ReLU())
        self.layers.append(nn.MaxPool2d(kernel_size=2, stride=2))
        self.layers.append(nn.Conv2d(64, 128, kernel_size=3, padding=1))  
        self.layers.append(nn.ReLU())
        self.layers.append(nn.MaxPool2d(kernel_size=2, stride=2))
        self.layers.append(nn.Flatten())
        self.layers.append(nn.Linear(128 * 7 * 7, 128)) 
        self.layers.append(nn.ReLU())
        self.layers.append(nn.Linear(128, 10)) 

    def forward(self, x):
        for layer in self.layers:
            x = layer(x)
        return x
    


class VGG19(nn.Module):
    def __init__(self, num_classes=1000):
        super(VGG19, self).__init__()

        # Define the number of channels and layers for each block
        num_channels = [64, 128, 256, 512, 512]
        num_layers = [2, 2, 4, 4, 4]

        self.features = self._make_layers(num_channels, num_layers)
        
        self.classifier = nn.Sequential(
            nn.Linear(512 * 7 * 7, 4096), nn.ReLU(inplace=True), nn.Dropout(),
            nn.Linear(4096, 4096), nn.ReLU(inplace=True), nn.Dropout(),
            nn.Linear(4096, num_classes),
        )

    def _make_layers(self, num_channels, num_layers):
        layers = []
        in_channels = 3  # Initial input channels

        for i, (out_channels, layers_count) in enumerate(zip(num_channels, num_layers)):
            # Add convolutional layers
            for j in range(layers_count):
                layers += [
                    nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
                    nn.ReLU(inplace=True),
                ]
                in_channels = out_channels

            # Add max-pooling layer after each block
            if i < len(num_channels) - 1:
                layers.append(nn.MaxPool2d(kernel_size=2, stride=2))

        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.features(x)
        x = x.view(x.size(0), -1)
        x = self.classifier(x)
        return x

# Create an instance of the VGG19 model

# model = VGG19()


# Define the ResNet class
class BasicBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.downsample = None
        
        # If the input and output dimensions do not match, use a convolutional layer to match them
        if stride != 1 or in_channels != out_channels:
            self.downsample = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )

    def forward(self, x):
        residual = x
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        
        if self.downsample is not None:
            residual = self.downsample(x)
        
        out += residual
        out = self.relu(out)
        return out


class ResNet(nn.Module):
    def __init__(self, block, num_blocks, num_classes=2):
        super(ResNet, self).__init__()
        self.in_channels = 64
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(128, num_classes)

    def _make_layer(self, block, out_channels, num_blocks, stride):
        layers = []
        layers.append(block(self.in_channels, out_channels, stride))
        self.in_channels = out_channels
        for _ in range(1, num_blocks):
            layers.append(block(out_channels, out_channels, stride=1))
        return nn.Sequential(*layers)

    def forward(self, x):
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.avg_pool(out)
        out = out.view(out.size(0), -1)
        out = self.fc(out)
        return out
# resnet = ResNet(BasicBlock, [2, 2])
# print(resnet)

#FaceNet
class FaceNetBinary(nn.Module):
    def __init__(self, embedding_size):
        super(FaceNetBinary, self).__init__()
        
        # Convolutional layers
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3)
        self.conv2 = nn.Conv2d(64, 128, kernel_size=5, stride=2, padding=2)
        self.conv3 = nn.Conv2d(128, 256, kernel_size=3, stride=2, padding=1)
        
        # Fully connected layers
        self.fc1 = nn.Linear(256 * 6 * 6, 512)
        self.fc2 = nn.Linear(512, embedding_size)
        self.fc3 = nn.Linear(embedding_size, 2)  # Output for binary classification
        
        # Normalization layers
        self.bn1 = nn.BatchNorm2d(64)
        self.bn2 = nn.BatchNorm2d(128)
        self.bn3 = nn.BatchNorm2d(256)
        self.bn4 = nn.BatchNorm1d(512)
        
        # Dropout layer (optional)
        self.dropout = nn.Dropout(p=0.5)
        
    def forward(self, x):
        x = nn.functional.relu(self.bn1(self.conv1(x)))
        x = nn.functional.max_pool2d(x, kernel_size=3, stride=2, padding=1)
        
        x = nn.functional.relu(self.bn2(self.conv2(x)))
        x = nn.functional.max_pool2d(x, kernel_size=3, stride=2, padding=1)
        
        x = nn.functional.relu(self.bn3(self.conv3(x)))
        
        x = x.view(x.size(0), -1)  # Flatten the tensor
        x = nn.functional.relu(self.bn4(self.fc1(x)))
        x = self.dropout(x)
        x = self.fc2(x)
        
        # L2 normalization of the embeddings
        x = nn.functional.normalize(x, p=2, dim=1)
        
        # Classification layer for binary classification
        x = self.fc3(x)
        
        return x
    
class EfficientNetLike(nn.Module):
    def __init__(self, num_classes=2, width_coefficient=1.0, depth_coefficient=1.0, dropout_rate=0.2):
        super(EfficientNetLike, self).__init__()

        # Define the scaling factors for width and depth
        self.width_coefficient = width_coefficient
        self.depth_coefficient = depth_coefficient

        # Define the number of channels for each block
        num_channels = [32, 16, 24, 40, 80, 112, 192, 320, 1280]

        # Define the number of layers for each block
        num_layers = [1, 2, 2, 3, 3, 4, 1]

        # Initial Convolution Layer
        self.features = [nn.Conv2d(3, int(32 * width_coefficient), 3, stride=2, padding=1, bias=False),
                         nn.BatchNorm2d(int(32 * width_coefficient)),
                         nn.ReLU6(inplace=True)]

        # Building the model blocks
        for i in range(7):
            for j in range(num_layers[i]):
                self.features.append(self._build_block(num_channels[i], num_channels[i+1], 6, dropout_rate))

        self.features = nn.Sequential(*self.features)

        # Classification head
        self.classifier = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Flatten(),
            nn.Dropout(dropout_rate),
            nn.Linear(int(1280 * width_coefficient), num_classes)
        )

    def _build_block(self, in_channels, out_channels, expand_ratio, dropout_rate):
        layers = []
        if expand_ratio != 1:
            # Pointwise Convolution
            layers.append(nn.Conv2d(in_channels, int(in_channels * expand_ratio), 1, bias=False))
            layers.append(nn.BatchNorm2d(int(in_channels * expand_ratio)))
            layers.append(nn.ReLU6(inplace=True))

        # Depthwise Convolution
        layers.append(nn.Conv2d(int(in_channels * expand_ratio), int(in_channels * expand_ratio), 3, stride=1, padding=1, groups=int(in_channels * expand_ratio), bias=False))
        layers.append(nn.BatchNorm2d(int(in_channels * expand_ratio)))
        layers.append(nn.ReLU6(inplace=True))

        # Pointwise Convolution
        layers.append(nn.Conv2d(int(in_channels * expand_ratio), out_channels, 1, bias=False))
        layers.append(nn.BatchNorm2d(out_channels))
        layers.append(nn.ReLU6(inplace=True))

        if dropout_rate > 0:
            layers.append(nn.Dropout(dropout_rate))

        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.features(x)
        x = self.classifier(x)
        return x




In [17]:
# Init Model
model = SimpleCNNWithDropout(num_classes=num_classes)
model.to(DEVICE)

# Loss
criterion = nn.CrossEntropyLoss()

# Optimizer and Scheduler
optimizer = optim.Adam(model.parameters(), lr=0.001)# Choose the optimizer
scheduler = CosineAnnealingLR(optimizer, T_max=5)  # Choose the scheduler


In [18]:
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
EPOCHS = 1
BATCH_SIZE = 64  # You can adjust this based on your needs
# Create data loaders
TRAINLOADER = DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True)
TESTLOADER = DataLoader(test_data, batch_size=BATCH_SIZE, shuffle=False)
DEVICE

device(type='cpu')

In [None]:
import numpy as np

# Initialize variables for early stopping and best model
early_stopping_counter = 0
best_test_loss = float('inf')
best_model_weights = None
loss_train = []
loss_test = []
f1_train = []
f1_test = []
for epoch in range(EPOCHS):
    model.train()  # Set the model to training mode
    running_loss = 0.0
    predictions_train = []
    true_labels_train = []

    for i, data in tqdm(enumerate(TRAINLOADER), desc='train'):
        inputs, labels = data
        inputs = inputs.to(DEVICE)
        labels = labels.to(DEVICE)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        _, predicted = torch.max(outputs.data, 1)
        predictions_train.extend(predicted.tolist())
        true_labels_train.extend(labels.tolist())

    if scheduler is not None:
        scheduler.step()  # Update learning rate with scheduler

    train_loss = running_loss / len(TRAINLOADER)
    train_f1 = f1_score(true_labels_train, predictions_train, average='weighted')

    loss_train.append(train_loss)
    f1_train.append(train_f1)

    # Evaluation on the test set
    model.eval()  # Set the model to evaluation mode
    test_loss_val = 0.0
    predictions = []
    true_labels = []

    with torch.no_grad():
        for data in TESTLOADER:
            inputs, labels = data
            inputs = inputs.to(DEVICE)
            labels = labels.to(DEVICE)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            test_loss_val += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            predictions.extend(predicted.tolist())
            true_labels.extend(labels.tolist())

    test_loss_val /= len(TESTLOADER)
    test_f1_val = f1_score(true_labels, predictions, average='weighted')

    loss_test.append(test_loss_val)
    f1_test.append(test_f1_val)
    print(f'Epoch [{epoch + 1}/{EPOCHS}]  - Train Loss: {train_loss:.4f} - Train F1: {train_f1:.4f} - Test Loss: {test_loss_val:.4f} - Test F1: {test_f1_val:.4f}')

    # Check if the test loss has improved
    if test_loss_val < best_test_loss:
        best_test_loss = test_loss_val
        early_stopping_counter = 0
        # Save the weights of the best model
        best_model_weights = model.state_dict()
    else:
        early_stopping_counter += 1

    # Check for early stopping
    if early_stopping_counter >= 5:
        print("Early stopping triggered. No improvement for 5 consecutive epochs.")
        break

print('Finished Training')

# Load the best model weights before saving
if best_model_weights is not None:
    model.load_state_dict(best_model_weights)

# Save the weights of the best model
torch.save(model.state_dict(), 'best_model_weights.pth')
print('Best model weights saved as best_model_weights.pth')


In [19]:
import numpy as np

# Initialize variables for early stopping and best model
early_stopping_counter = 0
best_test_loss = float('inf')
best_model_weights = None
loss_train = []
loss_test = []
f1_train = []
f1_test = []
MAX_ITERATIONS = 10 
for epoch in range(MAX_ITERATIONS):
    model.train()  # Set the model to training mode
    running_loss = 0.0
    predictions_train = []
    true_labels_train = []

    for i, data in tqdm(enumerate(TRAINLOADER), desc='train'):
        inputs, labels = data
        inputs = inputs.to(DEVICE)
        labels = labels.to(DEVICE)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        _, predicted = torch.max(outputs.data, 1)
        predictions_train.extend(predicted.tolist())
        true_labels_train.extend(labels.tolist())

    if scheduler is not None:
        scheduler.step()  # Update learning rate with scheduler

    train_loss = running_loss / len(TRAINLOADER)
    train_f1 = f1_score(true_labels_train, predictions_train, average='weighted')

    loss_train.append(train_loss)
    f1_train.append(train_f1)

    # Evaluation on the test set
    model.eval()  # Set the model to evaluation mode
    test_loss_val = 0.0
    predictions = []
    true_labels = []

    with torch.no_grad():
        for data in TESTLOADER:
            inputs, labels = data
            inputs = inputs.to(DEVICE)
            labels = labels.to(DEVICE)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            test_loss_val += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            predictions.extend(predicted.tolist())
            true_labels.extend(labels.tolist())

    test_loss_val /= len(TESTLOADER)
    test_f1_val = f1_score(true_labels, predictions, average='weighted')

    loss_test.append(test_loss_val)
    f1_test.append(test_f1_val)
    print(f'Epoch [{epoch + 1}/{EPOCHS}]  - Train Loss: {train_loss:.4f} - Train F1: {train_f1:.4f} - Test Loss: {test_loss_val:.4f} - Test F1: {test_f1_val:.4f}')

    # Check if the test loss has improved
    if test_loss_val < best_test_loss:
        best_test_loss = test_loss_val
        early_stopping_counter = 0
        # Save the weights of the best model
        best_model_weights = model.state_dict()
    else:
        early_stopping_counter += 1

    # Check for early stopping
    if early_stopping_counter >= 5:
        print("Early stopping triggered. No improvement for 5 consecutive epochs.")
        break

print('Finished Training')

# Load the best model weights before saving
if best_model_weights is not None:
    model.load_state_dict(best_model_weights)

# Save the weights of the best model
torch.save(model.state_dict(), 'best_model_weights.pth')
print('Best model weights saved as best_model_weights.pth')


train: 1it [00:00,  1.40it/s]


Epoch [1/1]  - Train Loss: 0.6906 - Train F1: 0.4471 - Test Loss: 0.9699 - Test F1: 0.3333


train: 1it [00:00,  1.98it/s]


Epoch [2/1]  - Train Loss: 0.9212 - Train F1: 0.3769 - Test Loss: 0.9748 - Test F1: 0.3333


train: 1it [00:00,  2.15it/s]


Epoch [3/1]  - Train Loss: 1.0236 - Train F1: 0.2915 - Test Loss: 0.6940 - Test F1: 0.3333


train: 1it [00:00,  2.05it/s]


Epoch [4/1]  - Train Loss: 0.6511 - Train F1: 0.5344 - Test Loss: 0.6303 - Test F1: 0.8730


train: 1it [00:00,  2.14it/s]


Epoch [5/1]  - Train Loss: 0.5691 - Train F1: 0.8848 - Test Loss: 0.6278 - Test F1: 0.5636


train: 1it [00:00,  2.15it/s]


Epoch [6/1]  - Train Loss: 0.5460 - Train F1: 0.7970 - Test Loss: 0.6278 - Test F1: 0.5636


train: 1it [00:00,  2.10it/s]


Epoch [7/1]  - Train Loss: 0.5177 - Train F1: 0.8462 - Test Loss: 0.6281 - Test F1: 0.3333


train: 1it [00:00,  2.02it/s]


Epoch [8/1]  - Train Loss: 0.5640 - Train F1: 0.6808 - Test Loss: 0.6477 - Test F1: 0.3333


train: 1it [00:00,  2.16it/s]


Epoch [9/1]  - Train Loss: 0.5341 - Train F1: 0.6496 - Test Loss: 0.7367 - Test F1: 0.3333


train: 1it [00:00,  2.17it/s]

Epoch [10/1]  - Train Loss: 0.5473 - Train F1: 0.5286 - Test Loss: 0.6319 - Test F1: 0.3333
Early stopping triggered. No improvement for 5 consecutive epochs.
Finished Training
Best model weights saved as best_model_weights.pth



