In [None]:
%matplotlib inline
import numpy as np
from tqdm.notebook import tqdm
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from torch.utils.data import random_split
import matplotlib.pyplot as plt

In [None]:
# Choose the compute device first, then report which one was selected.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
if device.type == 'cuda':
    print("The code will run on GPU. This is important so things run faster.")
else:
    print("The code will run on CPU. You should probably not do this.")

In [None]:
class dataset(Dataset):
    """Map-style dataset that loads one sample from disk per lookup.

    ``data`` is an indexable collection whose entries are handed to ``np.load``
    (presumably file paths -- confirm against the preprocessing step), and
    ``target`` is the matching indexable collection of labels.
    """

    def __init__(self,data,target):
        self.data, self.target = data, target

    def __len__(self):
        # One sample per entry in ``data``.
        return len(self.data)

    def __getitem__(self,idx):
        """Return ``(X, y)``: the loaded array as float64 and the label as int64."""
        # from_numpy always yields a CPU tensor, so .double() gives a CPU DoubleTensor.
        sample = torch.from_numpy(np.load(self.data[idx])).double()
        label = torch.tensor(self.target[idx]).type(torch.LongTensor)
        return sample, label

In [None]:
# Load the sample array and the matching labels from disk.
data = np.load("data_preprocessed_path.npy")
labels = np.load("labels.npy")

# Seed torch's global RNG so the partition is reproducible; random_split is
# used here only to obtain the shuffled index lists of the two subsets.
torch.manual_seed(0)
split = random_split(data, (50000, len(data) - 50000))
train_idx, test_idx = split[0].indices, split[1].indices

data_train, labels_train = data[train_idx], labels[train_idx]
data_test, labels_test = data[test_idx], labels[test_idx]

train_set = dataset(data_train, labels_train)
test_set = dataset(data_test, labels_test)

# Same batching for both loaders; only the training batches are reshuffled.
loader_kwargs = dict(batch_size=64, num_workers=0)
train_loader = DataLoader(train_set, shuffle=True, **loader_kwargs)
test_loader = DataLoader(test_set, shuffle=False, **loader_kwargs)

In [None]:
class ResNetBlock(nn.Module):
    """Residual block: two 3x3 convolutions with an identity skip connection.

    Both convolutions map ``n_features`` channels to ``n_features`` channels
    with stride 1 and padding 1, so the output has the same shape as the input
    and the input can be added to it directly.

    Args:
        n_features: number of input (and output) channels.
    """
    def __init__(self, n_features):
        super(ResNetBlock, self).__init__()

        self.w1 = nn.Conv2d(in_channels=n_features,out_channels=n_features,kernel_size=3,stride=1,padding=1)
        self.w2 = nn.Conv2d(in_channels=n_features,out_channels=n_features,kernel_size=3,stride=1,padding=1)
        self.activation = nn.ReLU()

    def forward(self, x):
        """Return ``relu(w2(relu(w1(x))) + x)``; ``x`` itself is not modified."""
        # No x.clone() is needed for the skip path: the convolutions and the
        # (non-inplace) ReLU allocate new tensors, so nothing writes to x and
        # copying it would only cost an extra activation-sized allocation.
        out = self.activation(self.w1(x))
        out = self.w2(out)
        return self.activation(out + x)
    
class SE_ResNetBlock(nn.Module):
    """Residual block whose convolutional branch is rescaled per channel by a
    squeeze-and-excitation gate before the skip connection is added.

    Args:
        n_features: number of input (and output) channels.
        r: reduction ratio; the gate's hidden layer has ``n_features // r`` channels.
    """
    def __init__(self, n_features,r):
        super(SE_ResNetBlock, self).__init__()

        self.conv1 = nn.Conv2d(in_channels=n_features,out_channels=n_features,kernel_size=3,stride=1,padding=1)
        self.conv2 = nn.Conv2d(in_channels=n_features,out_channels=n_features,kernel_size=3,stride=1,padding=1)
        self.activation = nn.ReLU()
        self.globalpool = nn.AdaptiveAvgPool2d(output_size=(1,1))
        # The two gate layers are 1x1 convolutions rather than nn.Linear so they
        # can be applied directly to the pooled (N, C, 1, 1) tensor and the
        # resulting gate broadcasts over the feature map without any reshaping.
        self.fc = nn.Conv2d(in_channels=n_features,out_channels=n_features//r,kernel_size=1,stride=1,padding=0)
        self.fc2 = nn.Conv2d(in_channels=n_features//r,out_channels=n_features,kernel_size=1,stride=1,padding=0)
        self.gate = nn.Sigmoid()

    def forward(self, x):
        """Return ``relu(features * gate + x)`` with the same shape as ``x``."""
        # Convolutional branch. x is never written to in place, so it can serve
        # as the skip connection directly (no clone required).
        out = self.activation(self.conv1(x))
        out = self.conv2(out)

        # Squeeze: global average per channel. Excite: bottleneck + sigmoid,
        # giving one weight in (0, 1) per channel.
        se = self.globalpool(out)
        se = self.activation(self.fc(se))
        se = self.gate(self.fc2(se))

        return self.activation(out * se + x)

In [None]:
class SE_ResNet(nn.Module):
    """Convolutional classifier built from squeeze-and-excitation residual blocks.

    Layout: a convolutional stem (with one 2x2 max-pool), then three groups of
    ``num_blocks`` SE residual blocks at 2x, 4x and 8x ``n_features`` channels.
    A 2x2 max-pool plus a channel-doubling convolution sits between consecutive
    groups, and a three-layer fully connected head ends in a softmax.

    Args:
        n_in: number of channels of the input.
        n_features: base channel width of the network.
        num_blocks: SE residual blocks per group.
        r: reduction ratio passed to every SE block.
        input_size: ``(height, width)`` of the input. It is only used to size the
            first fully connected layer; the default (320, 100) gives the
            original 40*12 feature map after the three 2x2 poolings.
        n_classes: number of output classes.

    Raises:
        ValueError: if either side of ``input_size`` is smaller than 8, i.e.
            nothing would be left after the three poolings.
    """
    def __init__(self, n_in, n_features, num_blocks=2,r=8, input_size=(320, 100), n_classes=5):
        super(SE_ResNet, self).__init__()

        height, width = input_size
        # Three MaxPool2d(2,2) layers with floor rounding shrink each side by 8.
        pooled_h, pooled_w = height // 8, width // 8
        if pooled_h < 1 or pooled_w < 1:
            raise ValueError("input_size must be at least 8x8, got {}".format(tuple(input_size)))

        # Stem: the first conv layer needs to output the desired number of features.
        # (Spatial sizes in the comments below are for the default 320x100 input.)
        conv_layers = [nn.Conv2d(n_in, n_features, kernel_size=3, stride=1, padding=1),
                       nn.ReLU(),
                       nn.Conv2d(n_features,n_features,3,1,1),
                       nn.ReLU(),
                       nn.MaxPool2d(2,2), # 160x50
                       nn.Conv2d(n_features,2*n_features,3,1,1),
                       nn.ReLU()]
        conv_layers.extend(SE_ResNetBlock(2*n_features,r) for _ in range(num_blocks))

        conv_layers.append(nn.Sequential(nn.MaxPool2d(2,2),
                            nn.Conv2d(2*n_features, 4*n_features, kernel_size=3, stride=1, padding=1),
                            nn.ReLU())) # 80x25
        conv_layers.extend(SE_ResNetBlock(4*n_features,r) for _ in range(num_blocks))

        conv_layers.append(nn.Sequential(nn.MaxPool2d(2,2),
                            nn.Conv2d(4*n_features, 8*n_features, kernel_size=3, stride=1, padding=1),
                            nn.ReLU())) # 40x12 (25 is odd, so the pooling rounds down)
        conv_layers.extend(SE_ResNetBlock(8*n_features,r) for _ in range(num_blocks))

        self.blocks = nn.Sequential(*conv_layers)

        self.fc = nn.Sequential(nn.Linear(pooled_h*pooled_w*8*n_features, 2048),
                                nn.ReLU(),
                                nn.Linear(2048, 512),
                                nn.ReLU(),
                                nn.Linear(512,n_classes),
                                nn.Softmax(dim=1))

    def forward(self, x):
        """Return class probabilities (each row sums to 1) for a batch ``x``."""
        x = self.blocks(x)
        # Flatten everything except the first (minibatch) dimension.
        x = x.flatten(1)
        return self.fc(x)

In [None]:
#We define the training as a function so we can easily re-use it.
def train(model, optimizer, num_epochs=10):
    """Train ``model`` and measure its accuracy after every epoch.

    Uses the notebook-level ``train_loader``, ``test_loader`` and ``device``.
    The model output is treated as class probabilities: the loss is the negative
    log-likelihood of ``log(output)``.

    Args:
        model: network to optimise; called as ``model(batch)``.
        optimizer: optimizer already bound to ``model``'s parameters.
        num_epochs: number of passes over ``train_loader``.

    Returns:
        ``(test_acc_all, train_acc_all)``: two lists holding one accuracy
        (fraction in [0, 1]) per epoch. Note that the test list comes first.
    """
    # Take the sample counts from the loaders themselves so the accuracy
    # denominators always match the data that was actually iterated.
    n_train = len(train_loader.dataset)
    n_test = len(test_loader.dataset)

    train_acc_all = []
    test_acc_all = []

    for _ in tqdm(range(num_epochs), unit='epoch'):
        model.train()
        train_correct = 0
        for data, target in tqdm(train_loader, total=len(train_loader)):
            data, target = data.to(device), target.to(device)
            # Zero the gradients computed for each weight
            optimizer.zero_grad()
            # Forward pass through the network
            output = model(data)
            # A probability that underflowed to exactly 0 would give log(0) = -inf
            # and a NaN gradient; clamping to the smallest positive normal float
            # leaves every other value untouched.
            log_prob = torch.log(output.clamp_min(torch.finfo(output.dtype).tiny))
            loss = F.nll_loss(log_prob, target)
            # Backward pass and weight update
            loss.backward()
            optimizer.step()

            # Count how many were correctly classified
            train_correct += (output.argmax(1) == target).sum().item()

            # Drop the references first: empty_cache() can only release memory
            # that no live tensor is still holding.
            del data, target, output, log_prob, loss
            torch.cuda.empty_cache()

        # Compute the test accuracy
        model.eval()
        test_correct = 0
        with torch.no_grad():
            for data, target in test_loader:
                predicted = model(data.to(device)).argmax(1).cpu()
                test_correct += (target == predicted).sum().item()

        train_acc = train_correct / n_train
        test_acc = test_correct / n_test
        train_acc_all.append(train_acc)
        test_acc_all.append(test_acc)
        print("Accuracy train: {train:.1f}%\t test: {test:.1f}%".format(test=100*test_acc, train=100*train_acc))
    return test_acc_all, train_acc_all

In [None]:
# Build the network in float64 (the dataset yields double tensors) and place it
# on the selected device; Module.to() returns the module itself.
model = SE_ResNet(n_in=7, n_features=8).double().to(device)
optimizer = optim.SGD(model.parameters(), lr=1e-3)
# Kept as the cell's last expression so the returned accuracy lists are displayed.
train(model, optimizer, num_epochs=1)