In [1]:
import torch.nn as nn
import torch
from torch.autograd import Variable
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch
import numpy as np
import cv2
import matplotlib.pyplot as plt
from numpy.random import rand, randn
import torchvision.datasets as dset
import torchvision.transforms as transforms

In [2]:
import torch
from torch.nn.parameter import Parameter
from torch.nn import functional as F
from torch.nn.modules.conv import _ConvNd
from torch.nn.modules.utils import _pair

class MyGabor(_ConvNd):
    
    def __init__(self, in_channels, out_channels, kernel_size, stride=1,
                 padding=0, dilation=1, groups=1, bias=False):
        kernel_size = _pair(kernel_size)
        stride = _pair(stride)
        padding = _pair(padding)
        dilation = _pair(dilation)
        
        super(MyGabor, self).__init__(in_channels, out_channels, kernel_size, stride, padding, dilation,False, _pair(0), groups, bias)
        
        # TODO: подумать над инициализацией параметров
        self.sigma_x = nn.Parameter(kernel_size[0]/3*torch.rand(in_channels,out_channels))
        self.sigma_y = nn.Parameter(kernel_size[0]/3*torch.rand(in_channels,out_channels))
        self.freq = nn.Parameter(kernel_size[0]*torch.rand(in_channels,out_channels))
        self.theta = nn.Parameter(3.14*torch.rand(in_channels,out_channels))
        self.psi = nn.Parameter(3.14*torch.rand(in_channels,out_channels))
        #self.evaluate = False
    
    #def eval(self):
        #self.evaluate = True
        
    #def train(self):
        #self.evaluate = False
        
    def forward(self, input):
        
        '''if self.evaluate == True:
            return F.conv2d(input, self.weight, self.bias, self.stride, self.padding, self.dilation, self.groups)'''
        
        x0 = torch.ceil(torch.Tensor([self.kernel_size[0]/2]))[0]
        y0 = torch.ceil(torch.Tensor([self.kernel_size[1]/2]))[0]
        y, x = torch.meshgrid([torch.arange(-y0+1,y0), torch.arange(-x0+1,x0)])
        x = x.to(device)
        y = y.to(device)
        weight = torch.empty(self.weight.shape, requires_grad=False).to(device)
        for i in range(self.in_channels):
            for j in range(self.out_channels):
                sigma_x = self.sigma_x[i,j].expand_as(y)
                sigma_y = self.sigma_y[i,j].expand_as(y)
                freq = self.freq[i,j].expand_as(y)
                theta = self.theta[i,j].expand_as(y)
                psi = self.psi[i,j].expand_as(y)
                
                rotx = x * torch.cos(theta) + y * torch.sin(theta)
                roty = -x * torch.sin(theta) + y * torch.cos(theta) 
                g = torch.zeros(y.shape)
                g = torch.exp(-0.5 * (rotx ** 2 / (sigma_x + 1e-3) ** 2 + roty ** 2 / (sigma_y + 1e-3) ** 2))
                g = g * torch.cos(2 * 3.14 * freq * rotx + psi) #/ (2*3.14*sigma_x*sigma_y)
                weight[j,i] = g
                self.weight.data[j,i] = g    
        return F.conv2d(input, weight, self.bias, self.stride, self.padding, self.dilation, self.groups)

In [3]:
import os
from random import choice
from PIL import Image
import numpy as np

class DatasetLoader():
    def __init__(self, source_path = 'D:\!EMONEW_cropped', classes = 7, train = 0.66):
        self.source_path = os.path.normpath(source_path)
        self.data = []
        self.test_data = []
        self.num_classes = classes
        self.train = train
        
        for path in os.listdir(path=source_path):
            source_path = os.path.join(self.source_path, path)
            if os.path.isdir(source_path):
                classes = classes - 1
                target = [0]*self.num_classes
                target[classes] = 1
                self._list_to_jpeg(source_path, target)
            
    def _list_to_jpeg(self, path, target):
        for i in os.walk(path):
            for j in i[2]:
                if '.jpg' in j or '.png' in j:
                    if np.random.rand() <= self.train:
                        self.data.append([[os.path.join(i[0], j)], target])
                    else:
                        self.test_data.append([[os.path.join(i[0], j)], target])
    def get_batch(self, len_batch, train = True):
        batch_data = []
        batch_target = []
        for i in range(len_batch):
            if train == True:
                d, t = choice(self.data)
            else:
                d, t = choice(self.test_data)
            batch_target.append(t)
            batch_data.append(np.asarray(Image.open(d[0])).transpose(2,0,1))
        return np.array(batch_data), np.array(batch_target)
    
    def lenght(self, train = True):
        if train == True:
            return len(self.data)
        else:
            return len(self.test_data)

In [4]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.g1 = MyGabor(3, 96, kernel_size=(17,17))
        self.bn1 = nn.BatchNorm2d(96)
        self.c1 = nn.Conv2d(96, 128, kernel_size=(3,3))
        self.bn2 = nn.BatchNorm2d(128)
        self.c2 = nn.Conv2d(128, 128, kernel_size=(3,3))
        self.bn3 = nn.BatchNorm2d(128)
        self.c3 = nn.Conv2d(128, 192, kernel_size=(3,3))
        
        self.fc1 = nn.Linear(192*14*13, 32)
        self.fc2 = nn.Linear(32, 7)

    def forward(self, x):
        x = x/255
        x = F.max_pool2d(F.relu(self.bn1(self.g1(x))), kernel_size=4)
        x = F.max_pool2d(F.relu(F.dropout2d(self.bn2(self.c1(x)), inplace=True)), kernel_size=2)
        x = F.max_pool2d(F.relu(F.dropout2d(self.bn3(self.c2(x)), inplace=True)), kernel_size=2)
        x = F.relu(self.c3(x))
        x = x.view(-1, 192*14*13)
        x = F.relu(F.dropout(self.fc1(x), inplace=True))
        x = self.fc2(x)
        return x

In [5]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)
net = Net().to(device)
print(net)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.1, momentum=0, weight_decay = 0.9999)
EMONEW = DatasetLoader()

cuda:0
Net(
  (g1): MyGabor(3, 96, kernel_size=(17, 17), stride=(1, 1), bias=False)
  (bn1): BatchNorm2d(96, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (c1): Conv2d(96, 128, kernel_size=(3, 3), stride=(1, 1))
  (bn2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (c2): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1))
  (bn3): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (c3): Conv2d(128, 192, kernel_size=(3, 3), stride=(1, 1))
  (fc1): Linear(in_features=34944, out_features=32, bias=True)
  (fc2): Linear(in_features=32, out_features=7, bias=True)
)


In [None]:
BATCH_SIZE = 16

for epoch in range(250):  # loop over the dataset multiple times
    running_loss = 0.0
    for i in range(int(EMONEW.lenght()/BATCH_SIZE)):
        net.train()
        # get the inputs
        inputs, labels = EMONEW.get_batch(BATCH_SIZE)
        inputs = torch.Tensor(inputs).to(device)
        labels = torch.Tensor(labels).max(1)[1].type(torch.LongTensor).to(device)
        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = net(inputs)
        loss = criterion(outputs, labels)
        loss.backward()

        # print statistics
        running_loss += loss.item()
        if i%20 == 19:
            print('[%d] loss: %.3f' %(epoch + 1, running_loss / (20*BATCH_SIZE)))
            running_loss = 0.0
            
        optimizer.step()
        
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for i in range(int(EMONEW.lenght(False)/BATCH_SIZE)):
            inputs, labels = EMONEW.get_batch(BATCH_SIZE, False)
            inputs = torch.Tensor(inputs).to(device)
            outputs = net(inputs)
            labels = torch.Tensor(labels).max(1)[1].type(torch.LongTensor).to(device)
            test_loss += criterion(outputs, labels) # sum up batch loss
            pred = outputs.max(1, keepdim=True)[1] # get the index of the max log-probability
            correct += pred.eq(labels.view_as(pred)).sum().item()

    test_loss /= EMONEW.lenght(False)
    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, EMONEW.lenght(False),
        100. * correct / EMONEW.lenght(False)))

print('Finished Training')

In [None]:
'''del outputs
del inputs
del net
torch.cuda.empty_cache()'''

In [None]:
class ConvNet(nn.Module):
    def __init__(self):
        super(ConvNet, self).__init__()
        self.c0 = nn.Conv2d(3, 96, kernel_size=(5,5))
        self.bn1 = nn.BatchNorm2d(96)
        self.c1 = nn.Conv2d(96, 128, kernel_size=(3,3))
        self.bn2 = nn.BatchNorm2d(128)
        self.c2 = nn.Conv2d(128, 128, kernel_size=(3,3))
        self.bn3 = nn.BatchNorm2d(128)
        self.c3 = nn.Conv2d(128, 192, kernel_size=(3,3))
        
        self.fc1 = nn.Linear(192*15*14, 32)
        self.fc2 = nn.Linear(32, 7)

    def forward(self, x):
        x = x/255
        x = F.max_pool2d(F.relu(self.bn1(self.c0(x))), kernel_size=4)
        x = F.max_pool2d(F.relu(F.dropout2d(self.bn2(self.c1(x)), inplace=True)), kernel_size=2)
        x = F.max_pool2d(F.relu(F.dropout2d(self.bn3(self.c2(x)), inplace=True)), kernel_size=2)
        x = F.relu(self.c3(x))
        x = x.view(-1, 192*15*14)
        x = F.relu(F.dropout(self.fc1(x), inplace=True))
        x = self.fc2(x)
        return x

In [None]:
conv = ConvNet().to(device)
print(conv)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(conv.parameters(), lr=0.1, momentum=0, weight_decay = 0.9999)
EMONEW = DatasetLoader()

In [None]:
BATCH_SIZE = 16

for epoch in range(250):  # loop over the dataset multiple times
    running_loss = 0.0
    for i in range(int(EMONEW.lenght()/BATCH_SIZE)):
        conv.train()
        # get the inputs
        inputs, labels = EMONEW.get_batch(BATCH_SIZE)
        inputs = torch.Tensor(inputs).to(device)
        labels = torch.Tensor(labels).max(1)[1].type(torch.LongTensor).to(device)
        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = conv(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # print statistics
        running_loss += loss.item()
        if i%20 == 19:
            print('[%d] loss: %.3f' %(epoch + 1, running_loss / (20*BATCH_SIZE)))
            running_loss = 0.0
            
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for i in range(int(EMONEW.lenght(False)/BATCH_SIZE)):
            inputs, labels = EMONEW.get_batch(BATCH_SIZE, False)
            inputs = torch.Tensor(inputs).to(device)
            outputs = conv(inputs)
            labels = torch.Tensor(labels).max(1)[1].type(torch.LongTensor).to(device)
            test_loss += criterion(outputs, labels) # sum up batch loss
            pred = outputs.max(1, keepdim=True)[1] # get the index of the max log-probability
            correct += pred.eq(labels.view_as(pred)).sum().item()

    test_loss /= EMONEW.lenght(False)
    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, EMONEW.lenght(False),
        100. * correct / EMONEW.lenght(False)))

print('Finished Training')

In [None]:
del outputs
del inputs
del conv
torch.cuda.empty_cache()