In [0]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.nn import CrossEntropyLoss
from torch.utils.data import Dataset,DataLoader,TensorDataset
from torch.autograd import Variable
import csv
import cv2
# import matplotlib.pyplot as plt 
# import matplotlib.image as mpimg 
import pandas as pd
import time

# Configuration parameters
# CSV files listing image file names (column 0) and, for training, integer categories (column 1)
train_file='/content/drive/My Drive/data/train_truth.csv'
test_file='/content/drive/My Drive/data/test_truth.csv'
# Directory prefixes prepended to the file names read from the CSV files above
train_path=r'/content/drive/My Drive/data/music_train/'
test_path=r'/content/drive/My Drive/data/music_test/'

batch_size = 8  # mini-batch size passed to every DataLoader
num_workers = 0  # not referenced by the code visible in this notebook
num_epoches = 1  # training epochs run for each cross-validation fold
learning_rate = 0.001  # learning rate of the SGD optimizers
num_classes = 88  # output width of the network's last Linear layer
weight_decay = 0.1  # only referenced by the commented-out Adam optimizer
k=5  # number of cross-validation folds
PATH='/content/drive/My Drive/data/cnn.pth'  # checkpoint of the best model found during cross-validation
final_model='/content/drive/My Drive/data/final_model.pth'  # checkpoint written after retraining on all training data


#定義函數及網路模型

def read_csv(file_name,img_path,train_yn=True):
    """Load the images listed in a CSV file as channel-first float tensors.

    Each data row holds an image file name in column 0 and, for training
    data, an integer category in column 1. The header row (column 1 equal to
    'category') and blank rows are skipped.

    Args:
        file_name: Path of the CSV file to read.
        img_path: Prefix prepended to every image file name by plain string
            concatenation, so it should end with a path separator.
        train_yn: When True, the integer category of each row is collected as
            its label; when False, the image file name is collected instead.

    Returns:
        A tuple (img_data, img_lab): a list of float32 tensors laid out as
        (channels, height, width) with values scaled by 1/255, and a parallel
        list of labels (0-d numpy integer arrays when train_yn is True,
        file-name strings otherwise).

    Raises:
        FileNotFoundError: If cv2.imread cannot read one of the listed images.
    """
    img_data = []
    img_lab = []
    # newline='' is the csv module's recommended way to open files for csv.reader
    with open(file_name, newline='') as csvfile:
        for row in csv.reader(csvfile):
            if not row:
                continue  # tolerate blank lines
            if len(row) > 1 and row[1] == 'category':
                continue  # header row
            full_name = img_path + row[0]
            image = cv2.imread(full_name)
            if image is None:
                # cv2.imread signals failure by returning None instead of raising
                raise FileNotFoundError('Cannot read image file: ' + full_name)
            image = image.astype(np.float32) / 255
            # cv2 returns (height, width, channels). The axes have to be
            # transposed to get channel-first data; a plain reshape keeps the
            # memory order and would interleave the colour channels.
            channel_first = np.ascontiguousarray(image.transpose(2, 0, 1))
            img_data.append(torch.from_numpy(channel_first))
            if train_yn:
                img_lab.append(np.array(int(row[1])))
            else:
                img_lab.append(row[0])
    return img_data, img_lab

class Mydataset(Dataset):
    """In-memory dataset that pairs each sample with the label at the same index."""

    def __init__(self, data, label):
        # Only references are stored; the caller's sequences are not copied.
        self.data, self.label = data, label

    def __len__(self):
        # The dataset length is the number of samples.
        return len(self.data)

    def __getitem__(self, item):
        # Return the (sample, label) pair stored at position `item`.
        return self.data[item], self.label[item]

class My_Net(nn.Module):
    """Five conv / batch-norm / ReLU / max-pool stages followed by a two-layer classifier.

    The stages are registered as layer1..layer5 and the classifier as layer9;
    these names (and the names of their sub-modules) form the state_dict keys
    of saved checkpoints. fc1 expects 4480 flattened features, which is what
    the convolution stack produces for a 3 x 394 x 520 input (32 x 10 x 14).
    """

    def __init__(self):
        super(My_Net, self).__init__()
        # (stage index, input channels, output channels, kernel size); stride is always 1
        stage_specs = (
            (1, 3, 256, 3),
            (2, 256, 128, 3),
            (3, 128, 128, 1),
            (4, 128, 64, 3),
            (5, 64, 32, 3),
        )
        for idx, c_in, c_out, ksize in stage_specs:
            stage = nn.Sequential()
            stage.add_module('conv%d' % idx, nn.Conv2d(c_in, c_out, ksize, 1))
            stage.add_module('nb%d' % idx, nn.BatchNorm2d(c_out))
            stage.add_module('relu%d' % idx, nn.ReLU())
            stage.add_module('pool%d' % idx, nn.MaxPool2d(2, 2))
            # Assigning through setattr registers the stage as layer<idx>
            setattr(self, 'layer%d' % idx, stage)

        classifier = nn.Sequential()
        classifier.add_module('fc1', nn.Linear(4480, 512))
        classifier.add_module('fc1_relu', nn.ReLU())
        classifier.add_module('fc2', nn.Linear(512, num_classes))
        self.layer9 = classifier

    def forward(self, x):
        """Run the five stages in order, flatten per sample, and return the classifier output."""
        features = x
        for stage in (self.layer1, self.layer2, self.layer3, self.layer4, self.layer5):
            features = stage(features)
        flat = features.view(features.size(0), -1)
        return self.layer9(flat)


      

In [0]:
# Load the training data: a list of image tensors and the parallel list of integer labels
tr_datas,tr_labels = read_csv(train_file,img_path=train_path)

In [0]:
# Run k-fold cross-validation and checkpoint the best-scoring model to PATH
if __name__=='__main__':
    # Choose the device once so each model is built a single time and moved there.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # K-fold split; samples beyond k * fold_size are never held out for validation.
    fold_size = len(tr_datas) // k
    if fold_size == 0:
        raise ValueError('Need at least k=%d training samples, got %d' % (k, len(tr_datas)))
    loss_func = nn.CrossEntropyLoss()
    # Start below any reachable count so a checkpoint is always written, even
    # when every fold scores zero correct; PATH is loaded unconditionally later.
    correct_K = -1
    for fold in range(k):
        val_begin = fold * fold_size
        val_end = val_begin + fold_size
        val_data = tr_datas[val_begin:val_end]
        val_targets = tr_labels[val_begin:val_end]
        partial_train_data = tr_datas[:val_begin] + tr_datas[val_end:]
        partial_train_targets = tr_labels[:val_begin] + tr_labels[val_end:]

        tr_dataset = Mydataset(partial_train_data, partial_train_targets)
        val_dataset = Mydataset(val_data, val_targets)
        # Training batches must be shuffled: in testing, the training loss did
        # not decrease without shuffle=True.
        train_loader = DataLoader(dataset=tr_dataset, batch_size=batch_size, shuffle=True)
        # Accuracy does not depend on sample order, so validation is not shuffled.
        val_loader = DataLoader(dataset=val_dataset, batch_size=batch_size, shuffle=False)

        # A fresh model and optimizer for every fold.
        net = My_Net().to(device)
        optimizer = optim.SGD(net.parameters(), lr=learning_rate, momentum=0.9)
        print('-' * 25, '第', fold + 1, '折', '-' * 25)

        for epoch in range(num_epoches):
            # Training phase
            net.train()
            iter_no = len(train_loader)
            for step, (x, y) in enumerate(train_loader, 1):
                img = x.to(device)
                label = y.to(device).long()

                out = net(img)
                loss = loss_func(out, label)

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                # step is 1-based, so this reports exactly every 20th batch
                if step % 20 == 0:
                    print('Epoch [%d/%d], Iter [%d/%d] Loss: %.6f' % (epoch + 1, num_epoches, step, iter_no, loss.item()))

            # Validation phase
            net.eval()
            correct = 0
            total = 0
            start = time.time()
            with torch.no_grad():
                for x, y in val_loader:
                    img = x.to(device)
                    label = y.to(device).long()
                    pred = net(img).argmax(dim=1)
                    total += label.size(0)
                    correct += (pred == label).sum().item()
            stop = time.time()

            # Keep the weights with the highest number of correct validation
            # predictions (every fold validates on fold_size samples).
            if correct > correct_K:
                correct_K = correct
                torch.save(net.state_dict(), PATH)
            print('EVAL Accuracy: {:.3f} %, Time: {:.2f}s'.format(100 * correct / total, stop - start))
            print()



In [0]:
# Load the best cross-validation checkpoint and train it for one more pass over all training data
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
net = My_Net().to(device)
# map_location lets a checkpoint saved on a GPU be loaded on a CPU-only runtime as well
net.load_state_dict(torch.load(PATH, map_location=device))
optimizer = optim.SGD(net.parameters(), lr=learning_rate, momentum=0.9)
# Created here so this cell does not rely on a variable left behind by the cross-validation cell
loss_func = nn.CrossEntropyLoss()
net.train()

train_dataset = Mydataset(tr_datas,tr_labels)
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size,shuffle=True)
for step, (x, y) in enumerate(train_loader, 1):
    img = x.to(device)
    label = y.to(device).long()

    out = net(img)
    loss = loss_func(out, label)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    # step is 1-based, so this reports exactly every 20th batch
    if step % 20 == 0:
        print(' Iter [%d/%d] Loss: %.6f' %(step, len(train_loader), loss.item()))

# Persist the weights used by the prediction cell
torch.save(net.state_dict(), final_model)


In [0]:
# Load the test data and predict a category for every test image
tx_datas, file_name = read_csv(test_file, img_path=test_path,train_yn=False)
test_dataset = Mydataset(tx_datas,file_name)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
net = My_Net().to(device)
# Load the weights saved after the final retraining pass; map_location lets a
# checkpoint saved on a GPU be loaded on a CPU-only runtime as well.
net.load_state_dict(torch.load(final_model, map_location=device))
net.eval()

outputs=[]
# Without no_grad() the autograd graph is kept alive and memory use blows up
with torch.no_grad():
    for img, _ in test_loader:
        test_pred = net(img.to(device))
        labelout = torch.argmax(test_pred, dim=1)
        outputs.append(labelout.cpu())

output=torch.cat(outputs).int()
# Convert the predictions and write them to a CSV file (the file uploaded to Kaggle)
print('測試完成，預測結果寫入 CSV檔')
# Hand pandas a NumPy array: depending on the pandas version, a torch tensor can
# end up as a column of 0-d tensor objects instead of plain integers.
res = pd.DataFrame({'filename': file_name, 'category': output.numpy()})
res.to_csv('./output.csv', index=False)