In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import transforms
from torch.utils.data import TensorDataset, DataLoader
import torch.optim as optim
from torchvision import models

import numpy as np
from numpy import linalg as LA
import cv2
import matplotlib.pyplot as plt
from tqdm import tqdm

In [2]:

def load_train_img(file_path):
    """Load the images and labels listed in an annotation file.

    Every non-blank line of ``file_path`` must hold an image path followed by
    an integer class label, separated by whitespace.

    Args:
        file_path: Path of the annotation text file.

    Returns:
        A tuple ``(imgs_tensor, labels_tensor)``. ``imgs_tensor`` stacks all
        images (each resized to 128x128 and transposed to channel-first
        layout) along a new leading dimension; ``labels_tensor`` is a 1-D
        tensor holding the integer labels in the same order.

    Raises:
        FileNotFoundError: If ``cv2.imread`` cannot read a listed image.
        ValueError: If a line does not contain both a path and an integer label.
    """
    target_size = (128, 128)
    with open(file_path) as f:
        # Drop blank lines (e.g. a trailing newline at the end of the file);
        # they would otherwise break the "path label" unpacking below.
        lines = [line.strip() for line in f if line.strip()]
    print('Total train images:', len(lines))

    imgs, labels = [], []
    for line in tqdm(lines, desc="Loading images"):
        # Split on the LAST whitespace run so paths that contain spaces still parse.
        fn, label = line.rsplit(None, 1)
        im1 = cv2.imread(fn)
        if im1 is None:
            # cv2.imread signals failure by returning None instead of raising;
            # fail here with the offending path rather than inside cv2.resize.
            raise FileNotFoundError(f"Could not read image: {fn}")
        # Add any extra image preprocessing here.
        im1 = cv2.resize(im1, target_size)
        # Convert to a PyTorch tensor, moving the channel axis to the front.
        im1_tensor = torch.from_numpy(im1.transpose(2, 0, 1))

        imgs.append(im1_tensor)
        labels.append(int(label))

    imgs_tensor = torch.stack(imgs)  # stack the per-image tensors into one tensor
    labels_tensor = torch.tensor(labels)

    return imgs_tensor, labels_tensor

def load_img(f):
    """Load the images and labels listed in an annotation file.

    Every non-blank line of the file must hold an image path followed by an
    integer class label, separated by whitespace.

    Args:
        f: Path of the annotation text file.

    Returns:
        A tuple ``(imgs_tensor, labels_tensor)``. ``imgs_tensor`` stacks all
        images (each resized to 128x128 and transposed to channel-first
        layout) along a new leading dimension; ``labels_tensor`` is a 1-D
        tensor holding the integer labels in the same order.

    Raises:
        FileNotFoundError: If ``cv2.imread`` cannot read a listed image.
        ValueError: If a line does not contain both a path and an integer label.
    """
    target_size = (128, 128)
    # Use a context manager so the file handle is always closed (the previous
    # version rebound ``f`` to the open handle and never closed it).
    with open(f) as list_file:
        # Drop blank lines (e.g. a trailing newline at the end of the file).
        lines = [line.strip() for line in list_file if line.strip()]
    print('total images:', len(lines))

    imgs, labels = [], []
    for line in tqdm(lines, desc="Loading images"):
        # Split on the LAST whitespace run so paths that contain spaces still parse.
        fn, label = line.rsplit(None, 1)

        # Original image
        im1 = cv2.imread(fn)
        if im1 is None:
            # cv2.imread signals failure by returning None instead of raising;
            # fail here with the offending path rather than inside cv2.resize.
            raise FileNotFoundError(f"Could not read image: {fn}")
        im1 = cv2.resize(im1, target_size)
        # Convert to a PyTorch tensor, moving the channel axis to the front.
        im1_tensor = torch.from_numpy(im1.transpose(2, 0, 1))

        imgs.append(im1_tensor)
        labels.append(int(label))

    imgs_tensor = torch.stack(imgs)  # stack the per-image tensors into one tensor
    labels_tensor = torch.tensor(labels)

    return imgs_tensor, labels_tensor

# Load the three dataset splits from their annotation files.
# Each call returns (stacked image tensor, integer label tensor).
# Training split:
x, y = load_train_img('train.txt')
# Validation split:
val_x, val_y = load_img('val.txt')
# Test split:
tx, ty = load_img('test.txt')

Total train images: 63325


Loading images: 100%|██████████| 63325/63325 [00:24<00:00, 2618.06it/s]


total images: 450


Loading images: 100%|██████████| 450/450 [00:00<00:00, 2782.87it/s]


total images: 450


Loading images: 100%|██████████| 450/450 [00:00<00:00, 2813.66it/s]


In [3]:
# One-hot encode the integer labels (50 classes). The training and evaluation
# loops in this notebook recover class indices with ``labels.argmax(dim=1)``.
y_tensor = torch.nn.functional.one_hot(y, num_classes=50)
val_y_tensor = torch.nn.functional.one_hot(val_y, num_classes=50)
test_y_tensor = torch.nn.functional.one_hot(ty, num_classes=50)

# Pair every image tensor with its one-hot label.
train_dataset = TensorDataset(x, y_tensor)
val_dataset = TensorDataset(val_x, val_y_tensor)
test_dataset = TensorDataset(tx, test_y_tensor)

batch_size = 128  # batch size; adjust as needed
shuffle = True    # whether to shuffle the training data every epoch
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=shuffle)
# Evaluation loaders keep a fixed order: shuffling cannot change the accuracy
# and only makes per-sample predictions harder to reproduce and inspect.
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)


In [4]:
class attention2d(nn.Module):
    """Attention branch producing K softmax weights per input sample.

    The input is globally average-pooled, passed through two 1x1 convolutions
    with a ReLU in between, and the K resulting logits are divided by a
    temperature before the softmax.

    Args:
        in_planes: Number of input channels.
        ratios: Reduction ratio used to size the hidden 1x1 convolution
            (ignored when ``in_planes`` is 3, where the hidden width is ``K``).
        K: Number of attention weights to produce.
        temperature: Softmax temperature; must satisfy ``temperature % 3 == 1``.
        init_weight: When true, run ``_initialize_weights`` after construction.
    """

    def __init__(self, in_planes, ratios, K, temperature, init_weight=True):
        super().__init__()
        # The temperature is lowered in steps of 3 and must be able to reach 1.
        assert temperature % 3 == 1

        # A 3-channel input gets a hidden width of K; anything else is
        # scaled by ``ratios``.
        hidden_planes = K if in_planes == 3 else int(in_planes * ratios) + 1

        self.avgpool = nn.AdaptiveAvgPool2d(1)
        self.fc1 = nn.Conv2d(in_planes, hidden_planes, kernel_size=1, bias=False)
        self.fc2 = nn.Conv2d(hidden_planes, K, kernel_size=1, bias=True)
        self.temperature = temperature

        if init_weight:
            self._initialize_weights()

    def _initialize_weights(self):
        """Kaiming-normal init for conv weights; constant init for biases and batch norm."""
        for module in self.modules():
            if isinstance(module, nn.Conv2d):
                nn.init.kaiming_normal_(module.weight, mode='fan_out', nonlinearity='relu')
                if module.bias is not None:
                    nn.init.constant_(module.bias, 0)
            elif isinstance(module, nn.BatchNorm2d):
                nn.init.constant_(module.weight, 1)
                nn.init.constant_(module.bias, 0)

    def updata_temperature(self):
        """Lower the temperature by 3 (and report it) unless it is already 1."""
        if self.temperature == 1:
            return
        self.temperature -= 3
        print('Change temperature to:', str(self.temperature))

    def forward(self, x):
        """Return a ``(batch, K)`` tensor of attention weights; each row sums to 1."""
        pooled = self.avgpool(x)
        hidden = F.relu(self.fc1(pooled))
        # Flatten the (batch, K, 1, 1) conv output into (batch, K) logits.
        logits = self.fc2(hidden).view(hidden.size(0), -1)
        return F.softmax(logits / self.temperature, dim=1)


class Dynamic_conv2d(nn.Module):
    """2-D convolution whose kernel is an input-dependent mixture of K kernels.

    For every sample an ``attention2d`` branch produces K softmax weights. The
    layer's K kernels (and K biases) are combined with those weights into one
    kernel per sample, and all per-sample convolutions are executed as a
    single grouped convolution.

    Args:
        in_planes: Number of input channels; must be divisible by ``groups``.
        out_planes: Number of output channels.
        kernel_size: Side length of the square kernel (an int).
        ratio: Reduction ratio forwarded to the attention branch.
        stride, padding, dilation, groups: Forwarded to ``F.conv2d``.
        bias: Whether to learn a bias for each of the K kernels.
        K: Number of candidate kernels to mix.
        temperature: Initial softmax temperature of the attention branch.
        init_weight: When true, Kaiming-uniform initialise each of the K kernels.
    """

    def __init__(self, in_planes, out_planes, kernel_size, ratio=0.25, stride=1, padding=0, dilation=1, groups=1, bias=True, K=4,temperature=34, init_weight=True):
        super(Dynamic_conv2d, self).__init__()
        assert in_planes%groups==0
        self.in_planes = in_planes
        self.out_planes = out_planes
        self.kernel_size = kernel_size
        self.stride = stride
        self.padding = padding
        self.dilation = dilation
        self.groups = groups
        self.K = K
        self.attention = attention2d(in_planes, ratio, K, temperature)

        # One full kernel bank per mixture component: (K, out, in/groups, k, k).
        self.weight = nn.Parameter(torch.randn(K, out_planes, in_planes//groups, kernel_size, kernel_size), requires_grad=True)
        if bias:
            self.bias = nn.Parameter(torch.zeros(K, out_planes))
        else:
            self.bias = None
        if init_weight:
            self._initialize_weights()

    def _initialize_weights(self):
        """Kaiming-uniform initialise each of the K kernels independently."""
        for i in range(self.K):
            nn.init.kaiming_uniform_(self.weight[i])

    def update_temperature(self):
        """Lower the attention branch's softmax temperature by one step."""
        self.attention.updata_temperature()

    def forward(self, x):
        """Apply the per-sample mixed kernels to ``x``.

        Args:
            x: Input tensor of shape ``(batch, in_planes, height, width)``.

        Returns:
            Tensor of shape ``(batch, out_planes, out_height, out_width)``.
        """
        # The batch is folded into the channel axis and processed as one
        # grouped convolution, so that every sample gets its own kernel.
        softmax_attention = self.attention(x)
        batch_size, in_planes, height, width = x.size()
        # reshape (not view): view raises on non-contiguous inputs such as
        # channels_last or permuted tensors, reshape copies only when needed.
        x = x.reshape(1, -1, height, width)
        weight = self.weight.view(self.K, -1)

        # Mix the K kernels with the attention weights: one kernel set per
        # sample, stacked along the output-channel axis.
        aggregate_weight = torch.mm(softmax_attention, weight).view(batch_size*self.out_planes, self.in_planes//self.groups, self.kernel_size, self.kernel_size)
        aggregate_bias = None
        if self.bias is not None:
            aggregate_bias = torch.mm(softmax_attention, self.bias).view(-1)

        output = F.conv2d(x, weight=aggregate_weight, bias=aggregate_bias, stride=self.stride, padding=self.padding,
                          dilation=self.dilation, groups=self.groups*batch_size)

        # Unfold the batch from the channel axis again.
        output = output.view(batch_size, self.out_planes, output.size(-2), output.size(-1))
        return output

In [40]:
class CNNModel(nn.Module):
    """Image classifier built from four dynamic-convolution stages.

    Every stage is ``Dynamic_conv2d -> BatchNorm2d -> ReLU -> 2x2 max-pool``.
    The flattened features then pass through two fully connected layers.

    ``fc1`` expects ``256 * 8 * 8`` input features. The 3x3 / padding-1
    convolutions keep the spatial size and each of the four pools halves it,
    so this matches 128x128 inputs.

    Args:
        num_classes: Number of output logits (defaults to 50).
    """

    def __init__(self, num_classes=50):
        super(CNNModel, self).__init__()

        # Four dynamic convolution layers (3 -> 32 -> 64 -> 128 -> 256 channels).
        self.conv1 = Dynamic_conv2d(in_planes=3, out_planes=32, kernel_size=3, stride=1, padding=1)
        self.conv2 = Dynamic_conv2d(in_planes=32, out_planes=64, kernel_size=3, stride=1, padding=1)
        self.conv3 = Dynamic_conv2d(in_planes=64, out_planes=128, kernel_size=3, stride=1, padding=1)
        self.conv4 = Dynamic_conv2d(in_planes=128, out_planes=256, kernel_size=3, stride=1, padding=1)

        # Batch normalisation, one per convolution stage.
        self.bn1 = nn.BatchNorm2d(32)
        self.bn2 = nn.BatchNorm2d(64)
        self.bn3 = nn.BatchNorm2d(128)
        self.bn4 = nn.BatchNorm2d(256)

        # Shared 2x2 max-pool that halves the spatial resolution.
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)

        self.flatten = nn.Flatten()
        # Two fully connected layers; 256 * 8 * 8 matches 128x128 inputs.
        self.fc1 = nn.Linear(256 * 8 * 8, 1024)
        self.fc2 = nn.Linear(1024, num_classes)

    def forward(self, x):
        """Return the raw class logits for a batch of images."""
        # conv -> batch norm -> ReLU -> pool, four times.
        x = self.pool(F.relu(self.bn1(self.conv1(x))))
        x = self.pool(F.relu(self.bn2(self.conv2(x))))
        x = self.pool(F.relu(self.bn3(self.conv3(x))))
        x = self.pool(F.relu(self.bn4(self.conv4(x))))

        # Flatten, then classify.
        x = self.flatten(x)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)

        return x

In [41]:
# model = ResModel()
# Choose the compute device: the first CUDA GPU when one is available, else the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

num_classes = 50
model = CNNModel()
# Move the model to the chosen device; as the cell's last expression this
# also displays the module structure.
model.to(device)

CNNModel(
  (conv1): Dynamic_conv2d(
    (attention): attention2d(
      (avgpool): AdaptiveAvgPool2d(output_size=1)
      (fc1): Conv2d(3, 4, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (fc2): Conv2d(4, 4, kernel_size=(1, 1), stride=(1, 1))
    )
  )
  (conv2): Dynamic_conv2d(
    (attention): attention2d(
      (avgpool): AdaptiveAvgPool2d(output_size=1)
      (fc1): Conv2d(32, 9, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (fc2): Conv2d(9, 4, kernel_size=(1, 1), stride=(1, 1))
    )
  )
  (conv3): Dynamic_conv2d(
    (attention): attention2d(
      (avgpool): AdaptiveAvgPool2d(output_size=1)
      (fc1): Conv2d(64, 17, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (fc2): Conv2d(17, 4, kernel_size=(1, 1), stride=(1, 1))
    )
  )
  (conv4): Dynamic_conv2d(
    (attention): attention2d(
      (avgpool): AdaptiveAvgPool2d(output_size=1)
      (fc1): Conv2d(128, 33, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (fc2): Conv2d(33, 4, kernel_size=(1, 1), str

In [42]:

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
epochs = 10  # number of training epochs

# Per-epoch accuracy histories (in percent) used for the plot below.
traing_acc = []
val_acc = []

# NOTE(review): Dynamic_conv2d.update_temperature() is never called in this
# loop, so the attention temperature stays at its initial value for the whole
# run; confirm whether temperature annealing was intended.
for epoch in range(epochs):
    model.train()  # training mode
    running_loss = 0.0

    train_correct = 0
    train_total = 0

    for inputs, labels in tqdm(train_loader, desc=f"Epoch {epoch + 1}/{epochs}"):
        # Move the batch to the device; labels are one-hot and are passed to
        # the loss as float class probabilities.
        inputs, labels = inputs.float().to(device), labels.float().to(device)

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Backward pass and optimisation step
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * inputs.size(0)

        # Derive the predictions from THIS batch's outputs. Previously
        # `predicted` was never assigned in the training loop, so this line
        # either raised NameError on a fresh kernel or silently compared
        # against stale predictions left over from an evaluation cell.
        _, predicted = torch.max(outputs, 1)
        labels = labels.argmax(dim=1)
        train_correct += (predicted == labels).sum().item()
        train_total += labels.size(0)

    traing_acc.append(100 * train_correct / train_total)

    epoch_loss = running_loss / len(train_loader.dataset)
    print(f"Epoch {epoch + 1}/{epochs}, Loss: {epoch_loss:.4f}")

    # Evaluate on the validation split
    model.eval()  # evaluation mode
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in tqdm(val_loader, desc="Evaluating"):
            inputs, labels = inputs.float().to(device), labels.float().to(device)
            labels = labels.argmax(dim=1)  # one-hot -> class index
            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)

            correct += (predicted == labels).sum().item()

    val_acc.append(100 * correct / total)

    print(f"Epoch {epoch + 1}/{epochs}, Loss: {epoch_loss:.4f}, channel = 3, Training Accuracy: {100 * train_correct / train_total:.2f}%, Validation Accuracy: {100 * correct / total:.2f}%")

# Plot the accuracy curves and save the figure next to the notebook.
plt.plot(traing_acc, label='Training Accuracy')
plt.plot(val_acc, label='Validation Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.savefig("accuracy_plot.png")
plt.show()


Epoch 1/10: 100%|██████████| 495/495 [00:16<00:00, 30.07it/s]


Epoch 1/10, Loss: 3.5857


Evaluating: 100%|██████████| 4/4 [00:00<00:00, 106.92it/s]


Validation Accuracy: 0.2200


Epoch 2/10: 100%|██████████| 495/495 [00:15<00:00, 31.31it/s]


Epoch 2/10, Loss: 2.6828


Evaluating: 100%|██████████| 4/4 [00:00<00:00, 95.86it/s]


Validation Accuracy: 0.2533


Epoch 3/10: 100%|██████████| 495/495 [00:15<00:00, 31.10it/s]


Epoch 3/10, Loss: 2.3606


Evaluating: 100%|██████████| 4/4 [00:00<00:00, 103.77it/s]


Validation Accuracy: 0.2822


Epoch 4/10: 100%|██████████| 495/495 [00:16<00:00, 30.13it/s]


Epoch 4/10, Loss: 2.1043


Evaluating: 100%|██████████| 4/4 [00:00<00:00, 93.57it/s]


Validation Accuracy: 0.3511


Epoch 5/10: 100%|██████████| 495/495 [00:16<00:00, 30.05it/s]


Epoch 5/10, Loss: 1.8859


Evaluating: 100%|██████████| 4/4 [00:00<00:00, 107.98it/s]


Validation Accuracy: 0.3933


Epoch 6/10: 100%|██████████| 495/495 [00:16<00:00, 30.53it/s]


Epoch 6/10, Loss: 1.6796


Evaluating: 100%|██████████| 4/4 [00:00<00:00, 44.61it/s]


Validation Accuracy: 0.4044


Epoch 7/10: 100%|██████████| 495/495 [00:16<00:00, 30.48it/s]


Epoch 7/10, Loss: 1.4788


Evaluating: 100%|██████████| 4/4 [00:00<00:00, 105.75it/s]


Validation Accuracy: 0.4356


Epoch 8/10: 100%|██████████| 495/495 [00:16<00:00, 30.22it/s]


Epoch 8/10, Loss: 1.2778


Evaluating: 100%|██████████| 4/4 [00:00<00:00, 100.56it/s]


Validation Accuracy: 0.4156


Epoch 9/10: 100%|██████████| 495/495 [00:16<00:00, 30.50it/s]


Epoch 9/10, Loss: 1.0749


Evaluating: 100%|██████████| 4/4 [00:00<00:00, 104.18it/s]


Validation Accuracy: 0.4600


Epoch 10/10: 100%|██████████| 495/495 [00:16<00:00, 30.07it/s]


Epoch 10/10, Loss: 0.8613


Evaluating: 100%|██████████| 4/4 [00:00<00:00, 91.64it/s]

Validation Accuracy: 0.4289





In [43]:
# Final evaluation on the test split.
model.eval()  # evaluation mode
correct, total = 0, 0

with torch.no_grad():
    for inputs, labels in tqdm(test_loader, desc="Evaluating"):
        inputs = inputs.float().to(device)
        labels = labels.float().to(device)
        # Labels are stored one-hot; recover the class index.
        labels = labels.argmax(dim=1)
        outputs = model(inputs)
        # Index of the largest logit is the predicted class.
        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()

accuracy = correct / total
print(f"test Accuracy: {accuracy:.4f}")

Evaluating: 100%|██████████| 4/4 [00:00<00:00, 70.87it/s]

test Accuracy: 0.4978



