In [1]:
import numpy as np
import torch # As Reference

# 1. Convolution

In [50]:
class ConvLayer:
    """2-D convolution layer (forward pass only, no bias) written with NumPy.

    The kernel is slid over the zero-padded input without being flipped, i.e.
    the operation is a cross-correlation.

    Attributes:
        input_channels: required size of axis 1 of both the input and the weight.
        output_channels: required size of axis 0 of the weight.
        kernel_size: stored only; ``forward`` takes the real kernel height and
            width from the weight array it receives.
        stride: step between neighbouring windows, used for both spatial axes.
        padding: number of zero rows/columns added on each side of the input.
    """

    def __init__(self, input_channels: int, output_channels: int, kernel_size: int, stride: int, padding: int):
        self.input_channels: int = input_channels
        self.output_channels: int = output_channels
        self.kernel_size: int = kernel_size
        self.stride: int = stride
        self.padding: int = padding

    def forward(self, x: np.ndarray, weight: np.ndarray):
        """Convolve a batch of images with a bank of kernels.

        Args:
            x: (N, C, H, W) [batchsize, input channels, x_height, x_width]
            weight: (K, C, R, S) [output channels, input channels, w_height, w_width]

        Returns:
            float64 array of shape (N, K, P, Q) with
            P = floor((H + 2 * padding - R) / stride) + 1 and
            Q = floor((W + 2 * padding - S) / stride) + 1.

        Raises:
            AssertionError: if the channel counts of ``x``, ``weight`` and the
                layer configuration disagree.
            ValueError: if ``stride`` is not positive, or the kernel is larger
                than the padded input (there would be no valid window).
        """
        assert x.shape[1] == weight.shape[1], "x and weight have different input channels!"
        N, C, H, W = x.shape
        K, _, R, S = weight.shape

        assert C == self.input_channels, "Invalid Input Channels!"
        assert K == self.output_channels, "Invalid Output Channels!"
        if self.stride < 1:
            raise ValueError(f"stride must be a positive integer, got {self.stride}")

        pad = self.padding
        x_padded = np.pad(
            x, ((0, 0), (0, 0), (pad, pad), (pad, pad)),
            mode='constant'
        )
        padded_h, padded_w = H + 2 * pad, W + 2 * pad
        if R > padded_h or S > padded_w:
            # Without this check a kernel one pixel too large would silently
            # yield an empty output instead of reporting the mismatch.
            raise ValueError(
                f"kernel ({R}, {S}) is larger than the padded input ({padded_h}, {padded_w})"
            )

        # Integer floor division gives the output size directly.
        P = (padded_h - R) // self.stride + 1
        Q = (padded_w - S) // self.stride + 1

        # The result array is float64, so do the multiply-accumulate in float64
        # as well; this avoids losing precision for float32 inputs.
        x_padded = x_padded.astype(np.float64, copy=False)
        kernel = weight.astype(np.float64, copy=False)

        output = np.zeros([N, K, P, Q])
        for p in range(P):
            top = p * self.stride
            for q in range(Q):
                left = q * self.stride
                # (N, C, R, S) window shared by every sample in the batch.
                window = x_padded[:, :, top: top + R, left: left + S]
                # Contract the channel and both kernel axes at once -> (N, K).
                output[:, :, p, q] = np.tensordot(window, kernel, axes=([1, 2, 3], [1, 2, 3]))
        return output

In [51]:
# Test Convolution: compare ConvLayer against torch.nn.Conv2d using the same weights.
conv = ConvLayer(3, 4, 5, 1, 0)
conv_ref = torch.nn.Conv2d(3, 4, 5, 1, 0, bias = False)

# batch_size: 5; input_channels: 3; output_channels: 4
# x_h, x_w: 32, 32; kernel_h, kernel_w: 5, 5; stride: 1; padding: 0
# output shape should be: [5, 4, 28, 28]
# The input is random (with a fixed seed) instead of all ones: with a constant
# input every window is identical, so wrong window offsets would go unnoticed.
x = np.random.default_rng(0).standard_normal((5, 3, 32, 32)).astype(np.float32)
x_torch = torch.tensor(x)
w_torch = conv_ref.weight  # randomly initialised by torch, so the error below varies per run
w = w_torch.detach().numpy()

out = conv.forward(x, w)
ref = conv_ref(x_torch)  # call the module itself rather than .forward()

print(out.shape, ref.shape)
# Sum of squared differences between the two implementations.
mse = torch.sum(torch.square(torch.tensor(out) - ref))
print(mse)
# Fail loudly if the implementations disagree beyond float32 rounding.
np.testing.assert_allclose(out, ref.detach().numpy(), rtol=1e-4, atol=1e-4)

(5, 4, 28, 28) torch.Size([5, 4, 28, 28])
tensor(1.2151e-10, dtype=torch.float64, grad_fn=<SumBackward0>)


# 2. Max Pooling

In [8]:
class MaxPooling2D:
    """2-D max pooling (forward pass only, no padding) written with NumPy.

    Attributes:
        pool_size: (height, width) of the pooling window.
        stride: step between neighbouring windows, used for both spatial axes.
    """

    def __init__(self, pool_size: tuple = (2, 2), stride: int = 2):
        self.pool_size: tuple = pool_size
        self.stride: int = stride

    def forward(self, x: np.ndarray):
        """Take the maximum over every pooling window.

        Args:
            x: (N, C, H, W) [batchsize, input channels, x_height, x_width]

        Returns:
            float64 array of shape (N, C, pooled_height, pooled_width) with
            pooled_height = floor((H - pool_size[0]) / stride) + 1 and
            pooled_width = floor((W - pool_size[1]) / stride) + 1.

        Raises:
            ValueError: if ``stride`` is not positive or the window is larger
                than the input.
        """
        N, C, H, W = x.shape
        pool_h, pool_w = self.pool_size[0], self.pool_size[1]
        if self.stride < 1:
            raise ValueError(f"stride must be a positive integer, got {self.stride}")
        if pool_h > H or pool_w > W:
            raise ValueError(
                f"pooling window ({pool_h}, {pool_w}) is larger than the input ({H}, {W})"
            )

        pooled_height = (H - pool_h) // self.stride + 1
        pooled_width = (W - pool_w) // self.stride + 1

        output = np.zeros((N, C, pooled_height, pooled_width))
        # Walk over output positions, not input offsets: stepping through
        # range(0, H, stride) visits more positions than the output holds
        # whenever the last window would hang over the border.
        for row in range(pooled_height):
            top = row * self.stride
            for col in range(pooled_width):
                left = col * self.stride
                window = x[:, :, top: top + pool_h, left: left + pool_w]
                # Reduce both spatial axes at once for every (sample, channel).
                output[:, :, row, col] = window.max(axis=(2, 3))
        return output

In [9]:
# Test Max Pooling: compare MaxPooling2D against torch.nn.MaxPool2d.
pooling = MaxPooling2D()
pooling_ref = torch.nn.MaxPool2d(kernel_size = (2, 2), stride = 2)

# Build the input here instead of reusing `x` from the convolution cell, so this
# cell does not depend on which cell ran last. Random (seeded) values are used
# because a constant input would make every window maximum trivially equal.
# batch_size: 5; channels: 3; x_h, x_w: 10, 10 -> output shape [5, 3, 5, 5]
x_pool = np.random.default_rng(0).standard_normal((5, 3, 10, 10)).astype(np.float32)
x_pool_torch = torch.tensor(x_pool)

out = pooling.forward(x_pool)
ref = pooling_ref(x_pool_torch)  # call the module itself rather than .forward()
print(out.shape, ref.shape)
# Sum of squared differences between the two implementations.
mse = torch.sum(torch.square(torch.tensor(out) - ref))
print(mse)
# Taking a maximum involves no rounding, so the results must match exactly.
np.testing.assert_array_equal(out, ref.numpy())

(5, 3, 5, 5) torch.Size([5, 3, 5, 5])
tensor(0., dtype=torch.float64)


# 3. MLP (Fully-Connected Layer)

In [26]:
class fclayer:
    """Fully-connected (linear) layer without bias, forward pass only.

    Attributes:
        in_features: required size of the last axis of both input and weight.
        out_features: required size of the first axis of the weight.
    """

    def __init__(self, in_features: int, out_features: int):
        self.in_features: int = in_features
        self.out_features: int = out_features

    def forward(self, x: np.ndarray, weight: np.ndarray):
        """Multiply ``x`` by the transpose of ``weight``.

        Args:
            x: array whose last axis has ``in_features`` entries.
            weight: array whose first axis has ``out_features`` entries and
                whose last axis has ``in_features`` entries.

        Returns:
            ``np.dot(x, weight.T)``.

        Raises:
            AssertionError: if any of the sizes above does not match.
        """
        expected_in = self.in_features
        # Shape checks: input width, weight width, then weight height.
        assert expected_in == x.shape[-1]
        assert expected_in == weight.shape[-1]
        assert self.out_features == weight.shape[0]
        return np.dot(x, weight.T)

In [27]:
# Compare fclayer against torch.nn.Linear (no bias) using the same weights.
mlp = fclayer(64, 3)
mlp_ref = torch.nn.Linear(64, 3, bias = False)

# batch_size: 5; in_features: 64; out_features: 3
inp = np.random.randn(5, 64).astype(np.float32)
inp_torch = torch.tensor(inp)
w = mlp_ref.weight.detach().numpy()

out = mlp.forward(inp, w)
ref = mlp_ref(inp_torch)

print(out.shape, ref.shape)
# Sum of squared differences between the two implementations.
mse = (torch.tensor(out) - ref).square().sum()
print(mse)

(5, 3) torch.Size([5, 3])
tensor(7.8881e-14, grad_fn=<SumBackward0>)
