In [None]:
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from torchvision.datasets import MNIST
from torch.utils.data import DataLoader
import time
import os.path as osp

In [None]:
# Mount Google Drive at /content/gdrive (Colab only); the training log path
# used later in this notebook lives under this mount point.
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
# Define the GhostModule class
class GhostModule(nn.Module):
    """Ghost module: a primary convolution followed by a cheap grouped convolution.

    The primary branch produces ``output_channels // ratio`` feature maps. The
    cheap branch derives further maps from them with a grouped convolution.
    Both results are concatenated along the channel axis and trimmed so the
    module returns exactly ``output_channels`` channels.

    Args:
        input_channels: Number of channels of the incoming tensor.
        output_channels: Number of channels of the returned tensor. Must be a
            multiple of ``output_channels // ratio`` (required by the grouped
            convolution).
        kernel_size: Kernel size of both convolutions. ``kernel_size // 2``
            padding is applied, so odd kernel sizes keep the spatial size.
        ratio: Reduction factor for the primary branch.
    """

    def __init__(self, input_channels, output_channels, kernel_size=1, ratio=2):
        super(GhostModule, self).__init__()
        self.output_channels = output_channels
        primary_channels = output_channels // ratio
        # Without padding, a kernel larger than 1 shrinks x1 and x2 by
        # different amounts and the concatenation in forward() fails.
        padding = kernel_size // 2

        self.primary_conv = nn.Sequential(
            nn.Conv2d(input_channels, primary_channels, kernel_size, padding=padding, bias=False),
            nn.BatchNorm2d(primary_channels),
            nn.ReLU(inplace=True)
        )
        # The layer shapes are kept as in the original definition so that
        # previously saved state dicts still load.
        self.cheap_operation = nn.Sequential(
            nn.Conv2d(primary_channels, output_channels, kernel_size, padding=padding,
                      groups=primary_channels, bias=False),
            nn.BatchNorm2d(output_channels),
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        """Return a tensor with ``output_channels`` channels."""
        x1 = self.primary_conv(x)  # intrinsic feature maps
        x2 = self.cheap_operation(x1)  # "ghost" feature maps derived from x1
        out = torch.cat([x1, x2], dim=1)
        # Trim to the requested output width (not to the input width).
        return out[:, :self.output_channels, :, :]

In [None]:
class SELayer(nn.Module):
    """Squeeze-and-Excitation channel attention.

    Each channel is globally average-pooled to a single value, passed through
    a two-layer bottleneck MLP ending in a sigmoid, and the resulting
    per-channel gate is multiplied onto the input.

    Args:
        channels: Number of channels of the input tensor.
        reduction: Bottleneck reduction factor. The hidden width is
            ``channels // reduction`` but never less than 1, so the layer
            stays usable when ``channels < reduction``.
    """

    def __init__(self, channels, reduction=16):
        super(SELayer, self).__init__()
        # Guard against a zero-width bottleneck for narrow inputs.
        hidden = max(1, channels // reduction)
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Sequential(
            nn.Linear(channels, hidden),
            nn.ReLU(inplace=True),
            nn.Linear(hidden, channels),
            nn.Sigmoid()
        )

    def forward(self, x):
        """Scale ``x`` channel-wise by the learned gate; the shape is unchanged."""
        b, c, _, _ = x.size()
        y = self.avg_pool(x).view(b, c)  # squeeze: one value per channel
        y = self.fc(y).view(b, c, 1, 1)  # excitation: gate, broadcast over H and W
        return x * y

In [None]:
class GhostBottleneck(nn.Module):
    """Bottleneck block: 1x1 conv -> GhostModule -> 1x1 conv, optional SE gate.

    The input is added back onto the result only when ``stride == 1`` and the
    input and output channel counts match.

    NOTE(review): ``kernel_size`` is accepted but not used by any layer, and
    ``stride`` is only stored to decide whether the shortcut is added; no
    layer in this block changes the spatial resolution. Confirm whether
    spatial downsampling was intended.
    """

    def __init__(self, input_channels, hidden_channels, output_channels, kernel_size, stride, use_se=True):
        super(GhostBottleneck, self).__init__()
        self.stride = stride
        self.use_se = use_se

        # Pointwise reduction to the hidden width.
        self.conv1 = nn.Conv2d(input_channels, hidden_channels, 1, bias=False)
        self.bn1 = nn.BatchNorm2d(hidden_channels)
        # Ghost feature generation at the hidden width.
        self.ghost = GhostModule(hidden_channels, hidden_channels, kernel_size=1)
        # Pointwise projection to the output width.
        self.conv2 = nn.Conv2d(hidden_channels, output_channels, 1, bias=False)
        self.bn2 = nn.BatchNorm2d(output_channels)
        self.relu = nn.ReLU(inplace=True)

        if use_se:
            self.se = SELayer(output_channels)

    def forward(self, x):
        """Run the block on ``x`` and return the (optionally residual) output."""
        shortcut = x

        out = self.relu(self.bn1(self.conv1(x)))
        out = self.ghost(out)
        out = self.bn2(self.conv2(out))

        if self.use_se:
            out = self.se(out)

        shortcut_applies = self.stride == 1 and shortcut.shape[1] == out.shape[1]
        if not shortcut_applies:
            return out

        # Resize the shortcut to the output's spatial size before adding it.
        shortcut = nn.functional.interpolate(shortcut, size=out.shape[2:], mode='nearest')
        out += shortcut
        return out

In [None]:
class GhostNetV2(nn.Module):
    """GhostNet-style classifier for single-channel (grayscale) images.

    Layout: 3x3 stem conv -> seven stages of GhostBottleneck blocks -> 1x1
    conv to 1280 channels -> global average pooling -> linear classifier.

    Args:
        num_classes: Size of the output logit vector.
    """

    # One row per stage: (input_channels, output_channels, num_blocks, stride, use_se).
    _STAGE_SPECS = (
        (16, 16, 1, 1, False),
        (16, 24, 2, 2, False),
        (24, 40, 3, 2, True),
        (40, 80, 3, 2, True),
        (80, 96, 2, 1, True),
        (96, 192, 4, 2, True),
        (192, 320, 1, 1, True),
    )

    def __init__(self, num_classes=10):
        super(GhostNetV2, self).__init__()

        # The stem takes a 1-channel input.
        stem_layers = [
            nn.Conv2d(1, 16, 3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(16),
            nn.ReLU(inplace=True),
        ]
        self.stem = nn.Sequential(*stem_layers)

        # Register stage1 .. stage7 in order so the attribute names and the
        # parameter ordering stay the same.
        for index, (in_ch, out_ch, blocks, stride, use_se) in enumerate(self._STAGE_SPECS, start=1):
            setattr(self, f"stage{index}", self._make_stage(in_ch, out_ch, blocks, stride=stride, use_se=use_se))

        head_layers = [
            nn.Conv2d(320, 1280, 1, bias=False),
            nn.BatchNorm2d(1280),
            nn.ReLU(inplace=True),
        ]
        self.conv9 = nn.Sequential(*head_layers)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(1280, num_classes)

    def _make_stage(self, input_channels, output_channels, num_blocks, stride, use_se=False):
        """Build one stage of ``num_blocks`` GhostBottleneck blocks.

        The first block maps ``input_channels`` to ``output_channels`` and
        receives ``stride``; the remaining blocks keep ``output_channels`` and
        use stride 1. Every block uses half of its input width as hidden width.
        """
        first_block = GhostBottleneck(input_channels, input_channels // 2, output_channels, 3, stride, use_se)
        following_blocks = [
            GhostBottleneck(output_channels, output_channels // 2, output_channels, 3, 1, use_se)
            for _ in range(1, num_blocks)
        ]
        return nn.Sequential(first_block, *following_blocks)

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)``."""
        features = self.stem(x)
        for index in range(1, len(self._STAGE_SPECS) + 1):
            features = getattr(self, f"stage{index}")(features)
        features = self.conv9(features)
        pooled = self.avgpool(features)
        flat = pooled.view(pooled.size(0), -1)
        return self.fc(flat)

In [None]:
# Instantiate the network with 10 output classes (one per MNIST digit)
ghostnet = GhostNetV2(num_classes=10)

In [None]:
# Append one entry to the log file
def write_log(log_file_path, content):
    """Append ``content`` followed by a newline to the file at ``log_file_path``.

    The file is opened in append mode, so it is created if it does not exist
    and existing entries are kept.
    """
    entry = content + '\n'
    with open(log_file_path, 'a') as handle:
        handle.write(entry)

In [None]:
# Read the training log to find out how far a previous run got
def read_log(log_file_path):
    """Parse the training log and report the progress of a previous run.

    The log is expected to contain, per epoch, a header line ``Epoch <n>:``
    followed by a train-metrics line of the form
    ``Loss: <loss>, Accuracy: <acc>%`` (``Train loss: ..., Train Accuracy: ...%``
    is accepted as well). ``Test Loss`` and elapsed-time lines are ignored.

    Args:
        log_file_path: Path of the log file.

    Returns:
        A tuple ``(num_epochs_completed, last_epoch_info)``.
        ``num_epochs_completed`` is the highest epoch number that has a
        parsable train-metrics line (0 if there is none). ``last_epoch_info``
        is ``(train_loss, train_accuracy)`` of the last such record in the
        file, or ``None`` when no record could be read.

    A missing or empty file is treated as "nothing trained yet" and yields
    ``(0, None)`` without an error message.
    """
    try:
        with open(log_file_path, 'r') as log_file:
            lines = log_file.readlines()
    except FileNotFoundError:
        # First run: there is no log yet.
        return 0, None

    if len(lines) == 0:
        return 0, None

    num_epochs_completed = 0
    last_epoch_info = None
    pending_epoch = None  # epoch number whose metrics line has not been seen yet
    malformed = False

    for raw_line in lines:
        text = raw_line.strip()

        if text.startswith("Epoch ") and text.endswith(":"):
            try:
                pending_epoch = int(text[len("Epoch "):-1])
            except ValueError:
                pending_epoch = None
                malformed = True
            continue

        if pending_epoch is None:
            continue

        # "Test Loss: ..." does not start with either prefix, so only the
        # train-metrics line is picked up here.
        if text.startswith("Loss:") or text.startswith("Train loss:"):
            try:
                loss_part, accuracy_part = text.split(",", 1)
                train_loss = float(loss_part.split(":", 1)[1])
                train_accuracy = float(accuracy_part.split(":", 1)[1].strip().rstrip("%"))
            except (ValueError, IndexError):
                malformed = True
            else:
                num_epochs_completed = max(num_epochs_completed, pending_epoch)
                last_epoch_info = (train_loss, train_accuracy)
            pending_epoch = None

    # A header without metrics, or a non-empty file without any record,
    # means the log is incomplete or not in the expected format.
    if pending_epoch is not None or last_epoch_info is None:
        malformed = True

    if malformed:
        print("Error: File log contains incomplete or invalid information.")

    return num_epochs_completed, last_epoch_info

In [None]:
# Load the MNIST dataset
# Training pipeline: random-crop augmentation on the padded 28x28 image.
train_transform = transforms.Compose([
    transforms.RandomCrop(28, padding=4),  # MNIST images are 28x28
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))  # Normalize for single-channel images
])

# Evaluation pipeline: no random augmentation, so test metrics are
# deterministic and measured on the unmodified images.
test_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])

train_dataset = MNIST(root='./data_mnist', train=True, download=True, transform=train_transform)
test_dataset = MNIST(root='./data_mnist', train=False, download=True, transform=test_transform)

batch_size = 64
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
# Shuffling has no effect on aggregate test metrics; keep the order fixed.
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
# Define the loss function and the optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(ghostnet.parameters(), lr=0.01, momentum=0.9, weight_decay=5e-4)

In [None]:
# Initialize the list that stores the training time of each epoch;
# train_model appends one (epoch number, elapsed seconds) pair per epoch.
epoch_times_list = []

In [None]:
# Record the wall-clock time at this point.
# NOTE(review): this module-level value is not read by train_model or by any
# other cell shown here.
start_time = time.time()

In [None]:
def _format_elapsed(seconds):
    """Format a duration given in seconds as ``"<h>h <m>m <s>s"``."""
    return f"{int(seconds // 3600)}h {int((seconds % 3600) // 60)}m {int(seconds % 60)}s"


def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Run one pass over ``loader`` and return ``(mean loss, accuracy in %)``.

    With an ``optimizer`` the model is put in train mode and updated after
    every batch; without one it is put in eval mode and gradients are off.
    """
    training = optimizer is not None
    if training:
        model.train()
    else:
        model.eval()

    loss_sum = 0.0
    correct = 0
    seen = 0

    with torch.set_grad_enabled(training):
        for images, labels in loader:
            images = images.to(device)
            labels = labels.to(device)

            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)

            if training:
                # Backward pass and optimize
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            # Weight the batch-mean loss by the batch size so the final
            # average is per sample even when the last batch is smaller.
            batch_count = images.size(0)
            loss_sum += loss.item() * batch_count
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == labels).sum().item()
            seen += batch_count

    if seen == 0:
        return 0.0, 0.0
    return loss_sum / seen, correct / seen * 100


def train_model(num_epochs, ghostnet, train_loader, test_loader, criterion, optimizer, device, log_file_path):
    """Train and evaluate ``ghostnet``, resuming after the last logged epoch.

    Args:
        num_epochs: Total number of epochs the run should reach.
        ghostnet: Model to train; must already be on ``device``.
        train_loader: Iterable of ``(images, labels)`` training batches.
        test_loader: Iterable of ``(images, labels)`` evaluation batches.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        optimizer: Optimizer over the model parameters.
        device: Device the batches are moved to.
        log_file_path: Log file that is read to find the number of completed
            epochs and appended to after every epoch.

    Side effects: saves the model state dict to ``ghostnetv2_mnist_model.pth``
    after every epoch, appends to the log file and to the module-level
    ``epoch_times_list``, and prints the train metrics of every epoch.

    Epochs are logged 1-based. With ``k`` epochs already in the log, training
    continues with epoch ``k + 1`` and stops after epoch ``num_epochs``.
    Only the model weights are restored on resume; the optimizer state is not
    saved and therefore starts fresh.
    """
    checkpoint_path = "ghostnetv2_mnist_model.pth"

    # A missing log simply means this is the first run.
    if osp.isfile(log_file_path):
        num_epochs_completed, _ = read_log(log_file_path)
    else:
        num_epochs_completed = 0

    # The loop index is 0-based and logged as epoch + 1, so the first epoch
    # still to run has index num_epochs_completed.
    start_epoch = num_epochs_completed

    # Resuming the epoch counter is only meaningful together with the weights
    # saved by the previous run.
    if start_epoch > 0:
        if osp.isfile(checkpoint_path):
            ghostnet.load_state_dict(torch.load(checkpoint_path, map_location=device))
        else:
            print(f"Warning: {checkpoint_path} not found; continuing from epoch {start_epoch + 1} with the current weights.")

    for epoch in range(start_epoch, num_epochs):
        # Start time for the current epoch
        epoch_start_time = time.time()

        train_loss, train_accuracy = _run_epoch(ghostnet, train_loader, criterion, device, optimizer)
        test_loss, test_accuracy = _run_epoch(ghostnet, test_loader, criterion, device)

        # Save the model before logging, so every logged epoch has a checkpoint.
        torch.save(ghostnet.state_dict(), checkpoint_path)

        # Write the results to the log file
        write_log(log_file_path, f"GhostNet V2\n{'-'*50}\nEpoch {epoch+1}:\n\tLoss: {train_loss:.3f}, Accuracy: {train_accuracy:.2f}%")
        write_log(log_file_path, f"\tTest Loss: {test_loss:.3f}, Test Accuracy: {test_accuracy:.2f}%")

        elapsed_time = time.time() - epoch_start_time
        epoch_times_list.append((epoch + 1, elapsed_time))

        # Total over every epoch recorded in epoch_times_list so far
        total_elapsed_time = sum(seconds for _, seconds in epoch_times_list)
        write_log(log_file_path, f"\tElapsed Time: {_format_elapsed(elapsed_time)}")
        write_log(log_file_path, f"\tTotal Elapsed Time: {_format_elapsed(total_elapsed_time)}\n{'-'*50}")

        print(f"GhostNet V2\n{'-'*50}\nEpoch {epoch+1}:\n\tLoss: {train_loss:.3f}, Accuracy: {train_accuracy:.2f}%")


In [None]:
# Training configuration: number of epochs and the device to run on
# (CUDA when available, otherwise CPU); then move the model to that device
num_epochs = 100
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
ghostnet.to(device)

GhostNetV2(
  (stem): Sequential(
    (0): Conv2d(1, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
  )
  (stage1): Sequential(
    (0): GhostBottleneck(
      (conv1): Conv2d(16, 8, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(8, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (ghost): GhostModule(
        (primary_conv): Sequential(
          (0): Conv2d(8, 4, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (1): BatchNorm2d(4, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (2): ReLU(inplace=True)
        )
        (cheap_operation): Sequential(
          (0): Conv2d(4, 8, kernel_size=(1, 1), stride=(1, 1), groups=4, bias=False)
          (1): BatchNorm2d(8, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (2): ReLU(inplace=True)
        )
  

In [None]:
# Training log location, under the Google Drive mount point /content/gdrive
log_file_path = "/content/gdrive/MyDrive/Project_II/model_logging/ghost_net_V2_mnist.log"

In [None]:
# Read the log file to see how many epochs have already been completed.
# NOTE(review): start_epoch assigned here is not passed to train_model, which
# calls read_log itself to decide where to continue.
num_epochs_completed, last_epoch_info = read_log(log_file_path)
start_epoch = num_epochs_completed

Error: File log contains incomplete or invalid information.


In [None]:
# Run training and evaluation; per-epoch results are appended to log_file_path
train_model(num_epochs, ghostnet, train_loader, test_loader, criterion, optimizer, device, log_file_path)

GhostNet V2
--------------------------------------------------
Epoch 2:
	Loss: 0.524, Accuracy: 82.31%
GhostNet V2
--------------------------------------------------
Epoch 3:
	Loss: 0.516, Accuracy: 82.51%
GhostNet V2
--------------------------------------------------
Epoch 4:
	Loss: 0.504, Accuracy: 83.01%
GhostNet V2
--------------------------------------------------
Epoch 5:
	Loss: 0.497, Accuracy: 83.26%
GhostNet V2
--------------------------------------------------
Epoch 6:
	Loss: 0.493, Accuracy: 83.35%
GhostNet V2
--------------------------------------------------
Epoch 7:
	Loss: 0.485, Accuracy: 83.69%
GhostNet V2
--------------------------------------------------
Epoch 8:
	Loss: 0.481, Accuracy: 83.82%
GhostNet V2
--------------------------------------------------
Epoch 9:
	Loss: 0.478, Accuracy: 83.85%
GhostNet V2
--------------------------------------------------
Epoch 10:
	Loss: 0.470, Accuracy: 84.11%
GhostNet V2
--------------------------------------------------
Epoch 11: