In [2]:
from datetime import datetime
import torchvision.models as models
import torch.nn.functional as F
import scipy.io
from PIL import Image 
import os
from torch.optim.lr_scheduler import ReduceLROnPlateau
from tqdm import tqdm
from torch.utils.data import TensorDataset
import random
from torch.utils.data import DataLoader, Subset
import numpy as np
from torch import optim, nn
import time
import torch

In [5]:

def load_data(path_X, path_Y, num_data, data_mode="train"):
    """Load images and labels from MATLAB ``.mat`` files.

    Args:
        path_X: Path of the ``.mat`` file holding the images. The variable read
            from it depends on ``data_mode``: ``'x'`` (flattened to 1-D) for
            ``"train"``/``"test"``, ``'denoise_x'`` for ``"dn"`` and
            ``'denoise2_x'`` for ``"dn2"``.
        path_Y: Path of the ``.mat`` file holding ``'y'`` and ``'y_onehot'``.
        num_data: Number of samples; used to reshape both label arrays.
        data_mode: One of ``"train"``, ``"test"``, ``"dn"``, ``"dn2"``.

    Returns:
        Tuple ``(origin_X, origin_Y, origin_Y_onehot)`` of numpy arrays, where
        ``origin_Y`` is reshaped to ``(num_data, -1)`` and ``origin_Y_onehot``
        to ``(num_data, 4, 19)``.

    Raises:
        ValueError: If ``data_mode`` is not one of the supported modes.
    """
    x_keys = {"train": "x", "test": "x", "dn": "denoise_x", "dn2": "denoise2_x"}
    # Fail fast: previously an unknown mode left origin_X unbound and crashed
    # later with an unrelated-looking UnboundLocalError.
    if data_mode not in x_keys:
        raise ValueError(
            f"Unsupported data_mode {data_mode!r}; expected one of {sorted(x_keys)}"
        )

    x_mat = scipy.io.loadmat(path_X)
    print(x_mat.keys())
    if data_mode in ("train", "test"):
        # 'x' is flattened so that each element is one sample.
        origin_X = np.array(x_mat['x'].flat)
    else:
        origin_X = np.array(x_mat[x_keys[data_mode]])

    y_mat = scipy.io.loadmat(path_Y)
    origin_Y = y_mat['y'][0].reshape(num_data, -1)
    origin_Y_onehot = y_mat['y_onehot'].reshape(num_data, 4, 19)

    print("origin_X shape: " + str(origin_X.shape))
    print("origin_Y shape: " + str(origin_Y.shape))
    print("origin_Y_onehot shape: " + str(origin_Y_onehot.shape))

    return origin_X, origin_Y, origin_Y_onehot

In [6]:
def resize_img(o_data, write, save):
    """Resize every image in ``o_data`` to 130x50 (width x height) RGB.

    Args:
        o_data: Iterable of image arrays accepted by ``Image.fromarray(..., 'RGB')``.
        write: If true, collect the resized images and return them.
        save: If true, write each resized image to
            ``resize_data_image/resize_x_<index>.jpg`` unless that file
            already exists (existing files are reported and left untouched).

    Returns:
        ``np.ndarray`` stacking the resized images when ``write`` is true,
        otherwise an empty array.
    """
    out_dir = 'resize_data_image'
    if save:
        # img.save() does not create missing directories; make sure it exists.
        os.makedirs(out_dir, exist_ok=True)

    p_data = []
    for index, pixels in enumerate(o_data):
        name = out_dir + '/resize_x_' + str(index) + '.jpg'
        img = Image.fromarray(pixels, 'RGB')
        img = img.resize((130, 50))
        if save:
            if os.path.isfile(name):
                print(name + " is existed")
            else:
                img.save(name)
        if write:
            p_data.append(np.array(img))

    p_data = np.array(p_data)
    print(p_data.shape)
    return p_data

# Load Train Data

In [7]:


def get_data(mode="train", data_dir=None):
    """Build a ``TensorDataset`` of images and per-position class indices.

    Args:
        mode: Which data to load:
            ``"train"`` - raw training images (resized to 130x50),
            ``"dn"`` / ``"dn2"`` - pre-denoised training images,
            ``"test"`` - raw test images (resized to 130x50).
        data_dir: Directory containing the ``.mat`` files. Defaults to the
            original hard-coded location.

    Returns:
        ``TensorDataset(X, Y)`` where ``X`` is float32 with shape
        ``(N, C, 50, 130)`` scaled by 1/255, and ``Y`` holds the argmax of the
        one-hot labels along the last axis, shape ``(N, 4)``.

    Raises:
        ValueError: If ``mode`` is not a supported mode.
    """
    # mode -> (image file, label file, number of samples)
    mat_files = {
        "train": ("train.mat", "train.mat", 5000),
        "dn": ("denoise_train.mat", "train.mat", 5000),
        "dn2": ("denoise_train2.mat", "train.mat", 5000),
        "test": ("test.mat", "test.mat", 3000),
    }
    # Previously an unknown mode left the image path unbound and crashed with
    # an UnboundLocalError; reject it explicitly instead.
    if mode not in mat_files:
        raise ValueError(
            f"Unsupported mode {mode!r}; expected one of {sorted(mat_files)}"
        )
    if data_dir is None:
        data_dir = "D:\\Casper\\OTHER\\Data\\identification code_database"

    x_file, y_file, num_data = mat_files[mode]
    path_x = os.path.join(data_dir, x_file)
    path_y = os.path.join(data_dir, y_file)

    train_rate = 1  # change to 0.9
    origin_X, origin_Y, origin_Y_onehot = load_data(path_x, path_y, num_data, mode)
    num_train_data = int(num_data * train_rate)
    print(origin_X.shape)

    if mode in ("train", "test"):
        resize_x = resize_img(origin_X, True, False)
    else:
        # Denoised data is used as stored, without resizing.
        resize_x = origin_X
    print(num_data)

    train_x_orig = resize_x.reshape(num_data, 50, 130, -1)[0:num_train_data]
    x_train = train_x_orig.astype('float32') / 255
    y_train_onehot = origin_Y_onehot[0:num_train_data]

    origin_X_tensor = torch.tensor(x_train, dtype=torch.float32)
    origin_Y_tensor = torch.tensor(y_train_onehot, dtype=torch.float32)

    # Channels-last (N, H, W, C) -> channels-first (N, C, H, W) for Conv2d.
    train_X = origin_X_tensor.permute(0, 3, 1, 2)
    # One-hot (N, 4, 19) -> class indices (N, 4).
    train_Y = torch.argmax(origin_Y_tensor, dim=-1)

    return TensorDataset(train_X, train_Y)

In [29]:
class SimpleCNN(nn.Module):
    """Small CNN: two 5x5 conv + max-pool stages followed by two linear layers.

    The first linear layer expects 24576 flattened features (64 channels x
    12 x 32, which is what a 50x130 input yields after two 2x poolings).
    The network emits 76 values per sample.
    """

    def __init__(self, input_channel):
        super().__init__()
        self.conv1 = nn.Conv2d(input_channel, 32, kernel_size=5, padding=2)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=5, padding=2)
        self.fc1 = nn.Linear(24576, 512)
        self.fc2 = nn.Linear(512, 76)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

    def forward(self, x):
        """Return the 76 raw output scores for each sample in ``x``."""
        features = self.pool(F.relu(self.conv1(x)))
        features = self.pool(F.relu(self.conv2(features)))
        hidden = F.relu(self.fc1(features.flatten(1)))
        return self.fc2(hidden)


In [9]:


class BASELINE(nn.Module):
    """VGG-like CNN with four independent 19-way classification heads.

    Four conv stages (each ending in a 2x2 max-pool) feed a shared flattened
    feature vector of 3072 values (512 channels x 1 x 6 for a 50x130 input),
    which is passed to four separate fully connected branches.

    The heads return raw logits. The training loop uses
    ``nn.CrossEntropyLoss``, which applies log-softmax internally, so a
    trailing ``nn.Softmax`` would squash the scores twice and flatten the
    gradients. Apply ``softmax(dim=-1)`` to the output if probabilities are
    needed; argmax predictions are unaffected.

    Args:
        input_channel: Number of channels of the input images.
    """

    def __init__(self, input_channel=1):
        super(BASELINE, self).__init__()

        # Stage 1
        self.conv11_W1 = nn.Conv2d(in_channels=input_channel, out_channels=64, kernel_size=5, stride=1, padding='same')
        self.conv12_W1 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1)
        self.max_pool1_W1 = nn.MaxPool2d(kernel_size=2, stride=2)

        # Stage 2
        self.conv23_W1 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=5, stride=1, padding='same')
        self.conv24_W1 = nn.Conv2d(in_channels=128, out_channels=128, kernel_size=3, stride=1)
        self.conv25_W1 = nn.Conv2d(in_channels=128, out_channels=128, kernel_size=3, stride=1)
        self.bn1_W1 = nn.BatchNorm2d(128)
        self.max_pool2_W1 = nn.MaxPool2d(kernel_size=2, stride=2)

        # Stage 3
        self.conv36_W1 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=1, stride=1, padding='same')
        self.conv37_W1 = nn.Conv2d(in_channels=256, out_channels=256, kernel_size=3, stride=1)
        self.conv38_W1 = nn.Conv2d(in_channels=256, out_channels=256, kernel_size=3, stride=1)
        self.max_pool3_W1 = nn.MaxPool2d(kernel_size=2, stride=2)

        # Stage 4
        self.conv49_W1 = nn.Conv2d(in_channels=256, out_channels=512, kernel_size=3, stride=1, padding='same')
        self.conv410_W1 = nn.Conv2d(in_channels=512, out_channels=512, kernel_size=1, stride=1)
        self.bn2_W1 = nn.BatchNorm2d(512)
        self.max_pool4_W1 = nn.MaxPool2d(kernel_size=2, stride=2)

        self.flatten = nn.Flatten()
        self.dropout = nn.Dropout(0.5)

        # One dense head per output position. No Softmax at the end: the heads
        # emit logits (see class docstring). Layer indices 0/3/5 are kept so
        # existing state_dict keys remain valid.
        self.fc_branches = nn.ModuleList([
            nn.Sequential(
                nn.Linear(in_features=3072, out_features=128),
                nn.ReLU(),
                nn.Dropout(0.25),
                nn.Linear(in_features=128, out_features=128),
                nn.ReLU(),
                nn.Linear(in_features=128, out_features=19),
            ) for _ in range(4)
        ])

    def forward(self, x):
        """Return logits of shape ``(batch, 4, 19)``."""
        x = F.relu(self.conv11_W1(x))
        x = F.relu(self.conv12_W1(x))
        x = self.max_pool1_W1(x)

        x = F.relu(self.conv23_W1(x))
        x = F.relu(self.conv24_W1(x))
        x = F.relu(self.conv25_W1(x))
        x = self.bn1_W1(x)
        x = self.max_pool2_W1(x)

        x = F.relu(self.conv36_W1(x))
        x = F.relu(self.conv37_W1(x))
        x = F.relu(self.conv38_W1(x))
        x = self.max_pool3_W1(x)

        x = F.relu(self.conv49_W1(x))
        x = F.relu(self.conv410_W1(x))
        x = self.bn2_W1(x)
        x = self.max_pool4_W1(x)

        x = self.flatten(x)
        x = self.dropout(x)

        # Run the shared features through each head and stack along dim 1.
        outputs = torch.stack([branch(x) for branch in self.fc_branches], dim = 1)

        return outputs

In [10]:

def get_dataloaders(dataset, train_ratio, val_ratio, batch_size):
    """Randomly split ``dataset`` into train/val/test ``DataLoader`` objects.

    The sample indices are shuffled once with ``random.shuffle``. The first
    ``floor(train_ratio * N)`` go to training, ``floor(val_ratio * remainder)``
    of what is left go to validation, and the rest go to test.

    Returns:
        Dict with keys ``"train"``, ``"val"`` and ``"test"``.
    """
    total = len(dataset)
    order = list(range(total))
    print("--------- INDEX checking ---------")
    print(f"Original: {order[:5]}")
    random.shuffle(order)
    print(f"Shuffled: {order[:5]}")
    print("--------- INDEX shuffled ---------\n")

    train_end = int(np.floor(train_ratio * total))
    val_end = train_end + int(np.floor(val_ratio * (total - train_end)))
    partitions = {
        "train": order[:train_end],
        "val": order[train_end:val_end],
        "test": order[val_end:],
    }

    dataloaders = {
        split: DataLoader(Subset(dataset, split_indices), batch_size=batch_size)
        for split, split_indices in partitions.items()
    }

    # Summary of the resulting split sizes.
    print(f"Total number of samples: {total} datapoints")
    for split, loader in dataloaders.items():
        print(f"Number of {split} samples: {len(loader)} batches/ {len(loader.dataset)} datapoints")
    print("")

    return dataloaders

In [11]:
def pprint(output = '\n', show_time = False, filename = "hw2-2-MAR27.txt"):
    """Print ``output`` to stdout and append it to a log file.

    Args:
        output: Object to log; written to the file via ``str(output)``.
        show_time: If true, prefix the line in the file (not on stdout)
            with a ``[YYYY-mm-dd HH:MM:SS] `` timestamp.
        filename: Path of the log file to append to. Defaults to the
            original hard-coded log name.
    """
    print(output)
    with open(filename, 'a') as f:
        if show_time:
            f.write(datetime.now().strftime("[%Y-%m-%d %H:%M:%S] "))

        f.write(str(output))
        f.write('\n')
# Write a timestamped marker line so the start of this run is visible in the log file.
pprint("START function", True)

START function


In [12]:
def count_parameters(model):
    """Return the total number of elements in ``model``'s trainable parameters.

    Only parameters with ``requires_grad`` set are counted.
    """
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

In [27]:
def train(model_lists, model_name, loaders, phases=('train',), reshape=True, save_weight=False,
          epochs=25, lr=0.001, device=None):
    """Train (and optionally validate/test) a 4-target classification model.

    Args:
        model_lists: Mapping from model name to a zero-argument factory.
        model_name: Key into ``model_lists``. If it contains ``"res"`` the
            model's ``fc`` layer is replaced by a fresh 76-output linear layer.
        loaders: Mapping from phase name to an iterable of ``(inputs, labels)``
            batches; ``labels[:, i]`` is the class index of target ``i``.
        phases: Phases run in order every epoch. Gradients and optimizer
            steps are only enabled for ``'train'``.
        reshape: If true, reshape model outputs to ``(batch, 4, -1)``.
        save_weight: If true, export the model with TorchScript to
            ``<model_name>.pt`` after training.
        epochs: Number of epochs.
        lr: Adam learning rate.
        device: Device to run on; defaults to CUDA when available, else CPU.

    Returns:
        The trained model (on ``device``).
    """
    num_targets = 4
    if device is None:
        # Previously hard-coded .cuda(), which fails on CPU-only machines.
        device = "cuda" if torch.cuda.is_available() else "cpu"
    device = torch.device(device)

    model = model_lists[model_name]()
    if "res" in model_name:
        num_features = model.fc.in_features
        model.fc = torch.nn.Linear(num_features, 76)

    pprint(f"Training model: {model_name}")
    model_parameters_amount = count_parameters(model)
    pprint(f"Total parameters: {model_parameters_amount:,}")

    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    pprint(f"Learning rate={lr}")

    start = time.time()
    for epoch in range(epochs):
        for phase in phases:
            is_training = phase == 'train'
            model.train(is_training)

            running_loss = 0.0
            correct_predictions = [0] * num_targets  # correct count per target
            total_samples = 0

            for inputs, labels in tqdm(loaders[phase]):
                inputs, labels = inputs.to(device), labels.to(device)
                batch_size = labels.size(0)

                with torch.set_grad_enabled(is_training):
                    outputs = model(inputs)
                    if reshape:
                        outputs = outputs.reshape(batch_size, num_targets, -1)
                    # Sum of the per-target (batch-mean) cross-entropy losses.
                    loss = sum(criterion(outputs[:, i, :], labels[:, i]) for i in range(num_targets))

                    if is_training:
                        optimizer.zero_grad()
                        loss.backward()
                        optimizer.step()

                # criterion returns a batch mean, so weight it by the batch
                # size; dividing by total_samples below then yields a true
                # per-sample average (it used to be too small by ~batch_size).
                running_loss += loss.item() * batch_size
                predicted = outputs.argmax(dim=2)
                for i in range(num_targets):
                    correct_predictions[i] += (predicted[:, i] == labels[:, i]).sum().item()

                total_samples += batch_size

            if total_samples == 0:
                # An empty loader (e.g. a 0-sample split) would otherwise
                # raise ZeroDivisionError below.
                pprint(f"Epoch [{epoch+1}/{epochs}], phase: {phase}, samples: 0 (empty loader, skipped)")
                continue

            avg_loss = running_loss / total_samples
            top1_accuracy = [cp / total_samples * 100 for cp in correct_predictions]
            accuracy_text = [f"{acc:.2f}%" for acc in top1_accuracy]
            pprint(f"Epoch [{epoch+1}/{epochs}], phase: {phase}, samples: {total_samples}, Loss: {avg_loss:.4f}, "
                  f"Top-1 Accuracies: {accuracy_text}")

    end = time.time()
    pprint(f"Elapsed time: {end - start} seconds")

    if save_weight:
        model_scripted = torch.jit.script(model)  # Export to TorchScript
        model_scripted.save(f'{model_name}.pt')
        pprint(f"weight saved as: {model_name}.pt")

    return model

In [31]:
# Experiment driver: trains the selected model/data-mode combinations.
# NOTE(review): mod_resnet and BasicBlock are defined in a later cell of this
# file; that cell must have been executed before the resnet factories are called.

# Data variants; indexed with ii % 3 in the loop below.
data_lists = [
    'train',
    'dn',
    'dn2',
]
test_dataset = get_data('test')
# train_ratio=1 puts every test sample into the 'train' split, so that split
# is what gets used as the held-out test loader.
test_loaders = get_dataloaders(test_dataset, 1, 0.5, 32)
test_loader = test_loaders['train']

# Read by the lambdas below at call time (late binding), so the value assigned
# inside the loop is the one the model is actually built with.
input_channel_op = 1

model_list ={
    "BASELINE": lambda: BASELINE(input_channel=input_channel_op),
    "SimpleCNN": lambda: SimpleCNN(input_channel=input_channel_op),
    "resnet18_mod": lambda: mod_resnet(BasicBlock, [2, 2, 2, 2], channel_num_list=[16, 16, 16, 32, 32], num_classes=76, input_channel=input_channel_op),
    "resnet18": lambda: mod_resnet(BasicBlock, [2, 2, 2, 2], channel_num_list=[64, 64, 128, 256, 512], num_classes=76, input_channel=input_channel_op),
}
# Model per experiment group; indexed with ii // 3 ("resnet18" is listed twice).
model_names = [
    "BASELINE",
    "SimpleCNN",
    "resnet18",
    "resnet18",
]
# Whether train() must reshape the model output to (batch, 4, -1). BASELINE
# already stacks its four heads along dim 1, so it skips the reshape.
reshape_ops = [
    False,
    True,
    True,
    True,
]

phases = ['train', 'val', 'test']


# ii in 9..11 selects model index 3 ("resnet18") with each of the three data variants.
for ii in range(9, 12):
    data_mode = data_lists[ii%3]
    model_name = model_names[ii//3]
    reshape_op = reshape_ops[ii//3]

    train_dataset = get_data(data_mode)
    # 80% train; val_ratio=1 assigns all of the remaining 20% to validation,
    # leaving the 'test' split of this dataset empty.
    loaders = get_dataloaders(train_dataset, 0.8, 1, 32)
    
    if data_mode == 'train':
        # Raw images: 3 input channels; also evaluate on the test set and save weights.
        input_channel_op = 3
        loaders['test'] = test_loader
        phases = ['train', 'val', 'test']
        saving_weight = True
    else:
        # Denoised variants: 1 input channel, no test phase, nothing saved.
        input_channel_op = 1
        phases = ['train', 'val']
        saving_weight = False

    train(model_list, model_name, loaders, phases, reshape_op, saving_weight)

dict_keys(['__header__', '__version__', '__globals__', 'y_onehot', 'x', 'y'])
origin_X shape: (3000,)
origin_Y shape: (3000, 4)
origin_Y_onehot shape: (3000, 4, 19)
(3000,)
(3000, 50, 130, 3)
3000
--------- INDEX checking ---------
Original: [0, 1, 2, 3, 4]
Shuffled: [908, 110, 2493, 866, 1720]
--------- INDEX shuffled ---------

Total number of samples: 3000 datapoints
Number of train samples: 94 batches/ 3000 datapoints
Number of val samples: 0 batches/ 0 datapoints
Number of test samples: 0 batches/ 0 datapoints

dict_keys(['__header__', '__version__', '__globals__', 'y_onehot', 'x', 'y'])
origin_X shape: (5000,)
origin_Y shape: (5000, 4)
origin_Y_onehot shape: (5000, 4, 19)
(5000,)


In [23]:
# model_list ={
#     "SimpleCharCNN": lambda: SimpleCharCNN(),
#     "resnet18": lambda: models.resnet18(weights = models.ResNet18_Weights.DEFAULT),
#     "resnet152": lambda: models.resnet152(weights = models.ResNet152_Weights.DEFAULT),
#     # "r6_btnk": lambda: mod_resnet(Bottleneck, [2, 2, 0, 0], channel_num_list=[16, 16, 16], num_classes=76)
#     "r6_btnk": lambda: mod_resnet(Bottleneck, [2, 2, 0, 0], channel_num_list=[8, 4, 8], num_classes=76)
# }
# model_name = "r6_btnk"
# phases = ['train', 'val']
# loaders = get_dataloaders(train_dataset, 0.8, 0.5, 32)
# train(model_list, model_name, loaders, phases, reshape=True)

In [25]:
class BasicBlock(nn.Module):
    """Residual block built from two 3x3 conv + batch-norm layers.

    The (optionally downsampled) input is added to the output of the second
    batch-norm before the final ReLU.

    Args:
        in_channels: Channels of the incoming tensor.
        out_channels: Channels produced by both convolutions.
        stride: Stride of the first convolution.
        downsample: Optional module applied to the input on the skip path.
    """

    expansion = 1

    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super().__init__()
        self.conv1 = nn.Conv2d(
            in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False
        )
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(
            out_channels, out_channels, kernel_size=3, padding=1, bias=False
        )
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.downsample = downsample

    def forward(self, x):
        """Return ``relu(main_path(x) + skip(x))``."""
        shortcut = x

        residual = F.relu(self.bn1(self.conv1(x)))
        residual = self.bn2(self.conv2(residual))

        if self.downsample is not None:
            shortcut = self.downsample(x)

        residual += shortcut
        return F.relu(residual)
class Bottleneck(nn.Module):
    """Residual bottleneck block: 1x1 -> 3x3 -> 1x1 convolutions.

    The last convolution outputs ``expansion * planes`` channels. The
    (optionally downsampled) input is added before the final ReLU.

    Args:
        in_planes: Channels of the incoming tensor.
        planes: Channels of the two inner convolutions.
        stride: Stride of the 3x3 convolution.
        downsample: Optional module applied to the input on the skip path.
    """

    expansion = 4

    def __init__(self, in_planes, planes, stride=1, downsample=None):
        super().__init__()
        wide_planes = self.expansion * planes
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(
            planes, planes, kernel_size=3, stride=stride, padding=1, bias=False
        )
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv3 = nn.Conv2d(planes, wide_planes, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(wide_planes)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        """Return ``relu(main_path(x) + skip(x))``."""
        shortcut = x

        residual = self.relu(self.bn1(self.conv1(x)))
        residual = self.relu(self.bn2(self.conv2(residual)))
        residual = self.bn3(self.conv3(residual))

        if self.downsample is not None:
            shortcut = self.downsample(x)

        residual += shortcut
        return self.relu(residual)
class ResNet(nn.Module):
    """ResNet-style classifier for 3-channel inputs.

    A 7x7 stride-2 stem and max-pool are followed by four residual stages of
    64/128/256/512 base channels, global average pooling and a linear layer.

    Args:
        block: Residual block class exposing an ``expansion`` attribute and
            accepting ``(in_channels, out_channels, stride, downsample)``.
        layers: Number of blocks in each of the four stages.
        num_classes: Size of the output layer.
    """

    def __init__(self, block, layers, num_classes=1000):
        super().__init__()
        # Tracks the channel count entering the next stage; updated by _make_layer.
        self.in_channels = 64
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2)
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

    def _make_layer(self, block, out_channels, blocks, stride=1):
        """Build one stage; only its first block gets ``stride`` and the projection."""
        expanded = out_channels * block.expansion
        projection = None
        if not (stride == 1 and self.in_channels == expanded):
            # Skip path must match the main path in resolution and channels.
            projection = nn.Sequential(
                nn.Conv2d(self.in_channels, expanded, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(expanded),
            )

        stage = [block(self.in_channels, out_channels, stride, projection)]
        self.in_channels = expanded
        stage.extend(block(self.in_channels, out_channels) for _ in range(blocks - 1))

        return nn.Sequential(*stage)

    def forward(self, x):
        """Return class scores of shape ``(batch, num_classes)``."""
        x = self.maxpool(F.relu(self.bn1(self.conv1(x))))

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        pooled = torch.flatten(self.avgpool(x), 1)
        return self.fc(pooled)
class mod_resnet(nn.Module):
    """ResNet-style classifier with configurable widths and input channels.

    Args:
        block: Residual block class exposing an ``expansion`` attribute and
            accepting ``(in_channels, out_channels, stride, downsample)``.
        layers: Number of blocks in each of the four stages.
        channel_num_list: Channel widths; index 0 is the stem, indices 1-4 are
            the four stages, and the last entry sizes the final linear layer.
        num_classes: Size of the output layer.
        input_channel: Number of channels of the input images.
    """

    def __init__(self, block, layers, channel_num_list, num_classes=1000, input_channel=3):
        super().__init__()
        stem_width = channel_num_list[0]
        # Tracks the channel count entering the next stage; updated by _make_layer.
        self.in_channels = stem_width
        self.conv1 = nn.Conv2d(input_channel, stem_width, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(stem_width)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        self.layer1 = self._make_layer(block, channel_num_list[1], layers[0])
        self.layer2 = self._make_layer(block, channel_num_list[2], layers[1], stride=2)
        self.layer3 = self._make_layer(block, channel_num_list[3], layers[2], stride=2)
        self.layer4 = self._make_layer(block, channel_num_list[4], layers[3], stride=2)
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(channel_num_list[-1] * block.expansion, num_classes)

    def _make_layer(self, block, out_channels, blocks, stride=1):
        """Build one stage; only its first block gets ``stride`` and the projection."""
        expanded = out_channels * block.expansion
        projection = None
        if not (stride == 1 and self.in_channels == expanded):
            # Skip path must match the main path in resolution and channels.
            projection = nn.Sequential(
                nn.Conv2d(self.in_channels, expanded, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(expanded),
            )

        stage = [block(self.in_channels, out_channels, stride, projection)]
        self.in_channels = expanded
        stage.extend(block(self.in_channels, out_channels) for _ in range(blocks - 1))

        return nn.Sequential(*stage)

    def forward(self, x):
        """Return class scores of shape ``(batch, num_classes)``."""
        x = self.maxpool(F.relu(self.bn1(self.conv1(x))))

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        pooled = torch.flatten(self.avgpool(x), 1)
        return self.fc(pooled)