In [None]:
# https://discuss.pytorch.org/t/custom-a-new-convolution-layer-in-cnn/43682/19

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms

In [2]:
transform = transforms.Compose(
    [transforms.ToTensor(),
     transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

batch_size = 2

trainset = torchvision.datasets.CIFAR10(root='../data', train=True,
                                        download=True, transform=transform)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size,
                                          shuffle=True, num_workers=2)

testset = torchvision.datasets.CIFAR10(root='../data', train=False,
                                       download=True, transform=transform)
testloader = torch.utils.data.DataLoader(testset, batch_size=batch_size,
                                         shuffle=False, num_workers=2)

classes = ('plane', 'car', 'bird', 'cat',
           'deer', 'dog', 'frog', 'horse', 'ship', 'truck')

Files already downloaded and verified
Files already downloaded and verified


In [45]:
# next(iter(trainloader))
# iter(trainloader).next()

In [46]:
class convZ(torch.autograd.Function):
    
#     def __init__ (self, in_channel, out_channel, kernel_size=3, stride=1, padding=1) :
#         self.in_channel = in_channel
#         self.out_channel = out_channel
#         self.kernel_size = kernel_size
#         self.stride = stride
#         self.padding = padding
#         self.padding_mode = 'zeros'
#         super().__init__()

    # Note that both forward and backward are @staticmethods
    @staticmethod
    # bias is an optional argument
    def forward(ctx, input, weight, bias=None):
        ctx.save_for_backward(input, weight, bias)

        '''Output: (N, C_out, H_out, W_out) where N = batch size = 2, c_out = 1, H=height, W=width
        H_out = (H_in+2×padding[0]−dilation[0]×(kernel_size[0]−1)−1)/stride[0] +1
        W_out = (W_in+2×padding[1]−dilation[1]×(kernel_size[1]−1)−1)/stride[1] +1
        stride = 1
        padding = 0
        dilation = 0
        h_in = 1
        '''
        batch_size = len(input)
        c_out = 1 #len(input[0][0])-1
        h_out = 1
        w_out = len(input[0][0][0]) -1
        output = torch.empty(batch_size, c_out, h_out, w_out)

        start_col_indx = 0
        end_col_indx = 2

        for j in range(len(input)): # batch size
            out_col_indx = 0
            for i in range(len(input[j][0][0]) - 1): # nb of cols in each sample data
                conv_mul = input[j][0][:, start_col_indx:end_col_indx] * weight
                start_col_indx += 1
                end_col_indx += 1
                conv_sum = torch.sum(conv_mul)
                output[j][0][0][out_col_indx] = conv_sum
                out_col_indx +=1

        # output = input.mm(weight.t())
        # output = input + weight
        if bias is not None:
            output += bias.unsqueeze(0).expand_as(output)
        return output
#         return nn.Conv2d(self.in_channel , self.out_channel, self.kernel_size, 
#                          self.stride, self.padding, bias=False)(input)

    # This function has only a single output, so it gets only one gradient
    @staticmethod
    def backward(ctx, grad_output):
        # This is a pattern that is very convenient - at the top of backward
        # unpack saved_tensors and initialize all gradients w.r.t. inputs to
        # None. Thanks to the fact that additional trailing Nones are
        # ignored, the return statement is simple even when the function has
        # optional inputs.
        input, weight, bias = ctx.saved_tensors
        grad_input = grad_weight = grad_bias = None

        # These needs_input_grad checks are optional and there only to
        # improve efficiency. If you want to make your code simpler, you can
        # skip them. Returning gradients for inputs that don't require it is
        # not an error.
        # if ctx.needs_input_grad[0]:
        #     grad_input = grad_output.mm(weight)
        # if ctx.needs_input_grad[1]:
        #     grad_weight = grad_output.t().mm(input)
        # if bias is not None and ctx.needs_input_grad[2]:
        #     grad_bias = grad_output.sum(0).squeeze(0)

        return grad_input, grad_weight, grad_bias

In [47]:
# torch.randn(1, 1, 2, 2).size()
# image, label = next(iter(trainloader))
# image.shape
# weight = torch.randn(3,6,3)
# weight.size()
# myfn = convZ(3,6).apply
# myfn(image, weight)


In [48]:
class MyConvZ(nn.Module):
    def __init__(self):
        super(MyConvZ, self).__init__()
        self.fn = convZ.apply
        # weight tensor = out_channels× in_channels/groups ×kH×kW
        self.weight = nn.Parameter(torch.randn(1, 1, 2, 2)) # when groups=1

    def forward(self, x):
        x = self.fn(x, self.weight)
        '''How to initialize weight with arbitrary tensor:
            https://discuss.pytorch.org/t/how-to-initialize-weight-with-arbitrary-tensor/3432'''
        return x

In [55]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.convZ = MyConvZ()
        self.pool = nn.AvgPool1d(2)

    def forward(self, x):
        x = F.relu(self.convZ(x))
        temp2 = []
        for i in range(len(x)):
            temp2.append(x[i])
        temp3 = [[temp2]]
        temp4 = torch.Tensor(temp3)
        x = self.pool(temp4)
        return x

In [56]:
net = Net()

# Define a Loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

In [57]:
input1 = torch.Tensor([
                        [[[1, 0, 0, 1],
                         [0, 1, 0, 1]]],

                        [[[1, 0, 0, 1],
                         [0, 1, 0, 1]]]
                      ])

label1 = torch.LongTensor([0., 1.])

net = Net()

In [58]:
# Define a Loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

# zero the parameter gradients
optimizer.zero_grad()

# forward + backward + optimize
outputs = net(input1)
loss = criterion(outputs, label1)
loss.backward()
optimizer.step()

# print statistics
running_loss = 0.0
running_loss += loss.item()

print(running_loss)

print('Finished Training')

# Test the network on the test data
test = torch.Tensor([
                            [[[1, 1, 1, 1],
                             [0, 1, 0, 1]]],

                            [[[0, 0, 0, 0],
                             [0, 1, 0, 1]]]
                          ])

classes = [0, 1]
test_output = net(test)
print(test_output)
_, predicted = torch.max(test_output, 1)

print('Predicted: ', ' '.join('%5s' % classes[predicted[j]]
                              for j in range(2)))

print('Finished Testing')

RuntimeError: The size of tensor a (0) must match the size of tensor b (2) at non-singleton dimension 3

In [26]:
for i, data in enumerate(trainloader, 0):
    # get the inputs; data is a list of [inputs, labels]
    inputs, labels = data

    # zero the parameter gradients
    optimizer.zero_grad()   
    # forward + backward + optimize
    outputs = net(inputs)
    print(outputs.shape)
    loss = criterion(outputs, labels)
    loss.backward()
    optimizer.step()   
    
    # print statistics
    running_loss += loss.item()
    if i % 2000 == 1999:    # print every 2000 mini-batches
        print('[%d, %5d] loss: %.3f' %
              (epoch + 1, i + 1, running_loss / 2000))
        running_loss = 0.0    

RuntimeError: Given groups=1, weight of size [1, 1, 3, 3], expected input[2, 3, 32, 32] to have 1 channels, but got 3 channels instead

In [18]:
# zero the parameter gradients
optimizer.zero_grad()

# forward + backward + optimize
outputs = net(input1)
print(outputs.shape)
loss = criterion(outputs, label1)
loss.backward()
optimizer.step()

# print statistics
running_loss = 0.0
running_loss += loss.item()

print(running_loss)

print('Finished Training')


torch.Size([4, 2])


ValueError: Expected input batch_size (4) to match target batch_size (2).

In [None]:
# Test the network on the test data
test = torch.Tensor([
                            [[[1, 1, 1, 1],
                             [0, 1, 0, 1]]],

                            [[[0, 0, 0, 0],
                             [0, 1, 0, 1]]]
                          ])

classes = [0, 1]
test_output = net(test)
print(test_output)
_, predicted = torch.max(test_output, 1)

print('Predicted: ', ' '.join('%5s' % classes[predicted[j]]
                              for j in range(2)))

print('Finished Testing')

In [None]:
# if __name__ == '__main__':


#     ''' Conv2D when we have just 1 channel(1 input matrix) and the size of the kernel = height of input matrix
#     a 2D convolution where the kernel height is equal to the input height:
#     batch_size = 2 => nb_input in each batch = 2
#     channels = 1 => in RGB photo, each photo includes 3 matrix, so nb_channel = 3, here each input is just a matrix
#     height = 2 => nb_row in each input
#     width = 4 => nb_col in each input
#     kernel_size = (height, 2)
#     (N,C_in,H,W) N is a batch size, C_in denotes a number of channels, H is a height of input planes in pixels, and W is width in pixels.
    
#     # an example is in "https://discuss.pytorch.org/t/2d-input-with-1d-convolution/20331/2" for conv2d standard for 1 input matrix    
#     '''

#     input1 = torch.Tensor([
#                             [[[1, 0, 0, 1],
#                              [0, 1, 0, 1]]],

#                             [[[1, 0, 0, 1],
#                              [0, 1, 0, 1]]]
#                           ])

#     label1 = torch.LongTensor([0., 1.])

#     net = Net()

#     # Define a Loss function and optimizer
#     criterion = nn.CrossEntropyLoss()
#     optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

#     # zero the parameter gradients
#     optimizer.zero_grad()

#     # forward + backward + optimize
#     outputs = net(input1)
#     loss = criterion(outputs, label1)
#     loss.backward()
#     optimizer.step()

#     # print statistics
#     running_loss = 0.0
#     running_loss += loss.item()

#     print(running_loss)

# print('Finished Training')

# # Test the network on the test data
# test = torch.Tensor([
#                             [[[1, 1, 1, 1],
#                              [0, 1, 0, 1]]],

#                             [[[0, 0, 0, 0],
#                              [0, 1, 0, 1]]]
#                           ])

# classes = [0, 1]
# test_output = net(test)
# print(test_output)
# _, predicted = torch.max(test_output, 1)

# print('Predicted: ', ' '.join('%5s' % classes[predicted[j]]
#                               for j in range(2)))

# print('Finished Testing')