# Assignment 2

## Due: 11:59PM PST, Nov 10, 2023

In this assignment, we will implement (1) depthwise convolution, (2) group convolution, (3) softmax, (4) a scaled dot product attention module, and (5) asymmetric linear quantization.

## Helper code: Please do not modify this cell

In [71]:
# Please do not modify this cell
import torch
import torch.quantization

from typing import Final, Tuple
import random
import math

filter_res = [1, 3, 5]
channel_count = [8, 16]
num_groups_list = [1, 4, 8]


number_of_tests: Final[int] = 10

def test_res_printer(
    test_res: Tuple[int],
    test_name: str = "Test"
) -> None:
    print("<Test {} Results>".format(test_name))
    print("- Correct: {}".format(test_res[0]))
    print("- Incorrect: {}".format(test_res[1]))
    print("-> Passed {}% of tests".format((test_res[0]/test_res[2])*100))

## Group convolution example

Group convolution and its special case, depthwise convolution, are widely used, but few materials describe in detail how they work. The lecture covered the basics at a high level; working through a code example helps build a deeper understanding.

In [72]:
##############################
# Example Tensor dimensions
input_channels = 100
output_channels = 10
input_height = 10
input_width = 10
filter_height = 3
filter_width = 3

# The number of groups must evenly divide input_channels and output_channels
# num_groups = 5 is valid because 100 % 5 == 0 and 10 % 5 == 0
num_groups = 5

##############################



example_group_conv = torch.nn.Conv2d(
    input_channels, # 100
    output_channels, # 10
    [filter_height, filter_width], #[3,3]
    groups = num_groups # 5
)

example_input = torch.rand([input_channels,input_height,input_width]) # [C=100, H=10, W=10], unbatched (no N dimension)
groupd_conv2d_output = example_group_conv(example_input)

print("<Group convolution case>")
print("Input shape: {}".format(example_input.shape))
print("Weight shape: {}".format(example_group_conv.weight.shape))
print("Output shape: {}\n".format(groupd_conv2d_output.shape))

# Now compare with typical CONV2D without any groups
# The only difference is that we will use groups = 1
example_conv2d = torch.nn.Conv2d(
    input_channels, # 100
    output_channels, # 10
    [filter_height, filter_width], #[3,3]
    groups=1 # no splitting into groups
)

conv2d_output = example_conv2d(example_input)

print("<Normal Conv2D case>")
print("Input shape: {}".format(example_input.shape))
print("Weight shape: {}".format(example_conv2d.weight.shape))
print("Output shape: {}\n".format(conv2d_output.shape))

num_weights_group_conv2d = example_group_conv.weight.numel()
num_weights_conv2d = example_conv2d.weight.numel()

print(">> Group Conv2D generates the same-sized output with {}% less number of weights".format(
    (1 - num_weights_group_conv2d / num_weights_conv2d) * 100)
)

print("Please feel free to try this example again modifyng the example tensor dimensions and num_groups above")


<Group convolution case>
Input shape: torch.Size([100, 10, 10])
Weight shape: torch.Size([10, 20, 3, 3])
Output shape: torch.Size([10, 8, 8])

<Normal Conv2D case>
Input shape: torch.Size([100, 10, 10])
Weight shape: torch.Size([10, 100, 3, 3])
Output shape: torch.Size([10, 8, 8])

>> Group Conv2D generates the same-sized output with 80.0% less number of weights
Please feel free to try this example again modifyng the example tensor dimensions and num_groups above
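
The 80% figure printed above follows from the weight shapes: a grouped convolution stores K × (C/G) × R × S weights, while a regular convolution stores K × C × R × S, so the count shrinks by a factor of G. The following is a minimal sketch that checks this closed form against PyTorch for a few group counts. The helper name `conv_weight_count` is hypothetical and introduced only for illustration.

```python
import torch

def conv_weight_count(C: int, K: int, R: int, S: int, G: int) -> int:
    """Closed-form weight count of a Conv2d with G groups (bias excluded)."""
    assert C % G == 0 and K % G == 0, "G must divide both C and K"
    return K * (C // G) * R * S

C, K, R, S = 100, 10, 3, 3
full = conv_weight_count(C, K, R, S, 1)
for G in (1, 2, 5, 10):
    conv = torch.nn.Conv2d(C, K, (R, S), groups=G, bias=False)
    grouped = conv_weight_count(C, K, R, S, G)
    # The closed form should match what PyTorch actually allocates.
    assert conv.weight.numel() == grouped
    saving = 1 - grouped / full
    print(f"G={G:2d}: weight shape {tuple(conv.weight.shape)}, {saving:.0%} fewer weights")
```

With G = 5 this gives 1 - 1/5 = 80%, matching the cell output above.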


## Problem 1 (20 pts)

Please implement depthwise convolution. For simplicity, we use no padding and a stride of 1 in this problem (i.e., padding = [0, 0], stride = [1, 1]). Your implementation should match the behavior of "torch.nn.Conv2d" in PyTorch.
(reference: https://pytorch.org/docs/stable/generated/torch.nn.Conv2d.html).

### Hints

We will use the same dimension convention (notations) used in our lecture slides (N: batch, K: output channel, C: input channel, P/Q: Output height/width, H/W: input height/width, R/S: filter height/width). The depthwise convolution can be described as follows:

$$
  O[n][k][p][q] = \sum_{r=0}^{R-1}\sum_{s=0}^{S-1} W[k][0][r][s] \times I[n][k][p+r][q+s]
$$

The tensor shapes are as follows:
  - Weight: [K, 1, R, S]
  - Input: [N, K, H, W] (depthwise convolution has C = K)
  - Output: [N, K, P, Q]

Please take a detailed look at the weight shape and the depthwise convolution formula in this cell; this will provide you with a good amount of information about the operation.
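
To make the formula concrete, here is a minimal vectorized sketch of it, assuming K = C, no padding, and stride 1. It is not the expected solution, only one possible reading of the formula. The function name `depthwise_conv2d_unfold` is hypothetical; `Tensor.unfold` is used to extract the R × S sliding windows, and the result is compared against `torch.nn.functional.conv2d` with `groups` equal to the channel count.

```python
import torch
import torch.nn.functional as F

def depthwise_conv2d_unfold(weight: torch.Tensor, x: torch.Tensor) -> torch.Tensor:
    """Depthwise conv (no padding, stride 1). weight: [K, 1, R, S], x: [N, K, H, W]."""
    K, _, R, S = weight.shape
    # Sliding windows over H and W: [N, K, H, W] -> [N, K, P, Q, R, S]
    patches = x.unfold(2, R, 1).unfold(3, S, 1)
    # Channel k only ever sees filter k, so broadcast W[k][0] over n, p, and q,
    # then sum over the filter dimensions r and s.
    return (patches * weight[:, 0].reshape(1, K, 1, 1, R, S)).sum(dim=(-2, -1))

torch.manual_seed(0)
w = torch.randn(8, 1, 3, 3)
x = torch.randn(2, 8, 7, 9)
out = depthwise_conv2d_unfold(w, x)
ref = F.conv2d(x, w, groups=8)
print(out.shape)                            # P = 7 - 3 + 1, Q = 9 - 3 + 1
print(torch.allclose(out, ref, atol=1e-4))
```

Note that each output channel reads exactly one input channel, which is why the weight tensor has a channel dimension of size 1.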

### Your Implementation (TODO)

In [83]:
# Implement this cell
def my_depthwise_conv2d(
    weight: torch.Tensor, # [K, 1, R, S];
    input_activation: torch.Tensor, # [1, C, H, W]
) -> torch.Tensor: # [1, K, P, Q]
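    K, _, R, S = weight.shape
    N, C, H, W = input_activation.shape
    assert K % C == 0, "K must be a multiple of C (K == C for plain depthwise)"
    M = K // C  # output channels per input channel (1 when K == C)
    P = H - R + 1  # output height: no padding, stride 1
    Q = W - S + 1  # output width

    output = torch.zeros([N, K, P, Q])

    for n in range(N):
        for c in range(C):
            for m in range(M):
                k = c * M + m  # output channel fed by input channel c
                for p in range(P):
                    for q in range(Q):
                        # Dot product of filter k with the R x S window at (p, q)
                        output[n, k, p, q] = torch.sum(weight[k, 0] * input_activation[n, c, p:p+R, q:q+S])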

    return output

### Test Code

In [81]:
# Test program: Please do not modify this cell
def test_depthwise_conv2d(num_tests = number_of_tests):
    correct = 0
    incorrect = 0

    for test_id in range(num_tests):
        activaiton_resolution = torch.randint(7, 32,[2,])

        K = int(random.choice(channel_count))
        C = K
        H = activaiton_resolution[0]
        W = activaiton_resolution[1]
        R = int(random.choice(filter_res))
        S = R
        G = C

        if H < R:
            H += R

        if W < S:
            W += S

        # Generate random input activation and weight
        weight = torch.randn(K, 1 , R, S)

        input_activation = torch.randn(1,C,H,W)

        # Compute the reference output
        reference_conv = torch.nn.Conv2d(C, K, (R,S), groups = G, bias=False)
        reference_conv.weight = torch.nn.Parameter(weight)

        reference_output = reference_conv(input_activation)
        # print(f'refout in shape {reference_output.shape}')

        # Compute the test output
        test_output = my_depthwise_conv2d(weight, input_activation)
        # print(f'myout in shape {test_output.shape}')

        if torch.allclose(reference_output, test_output,  atol=1e-3):
            correct += 1
        else:
            incorrect += 1
        print("Finished test run {}".format(test_id))

    return (correct, incorrect, num_tests)

In [84]:
# You can run tests by running this cell

test_res = test_depthwise_conv2d(5)
test_res_printer(test_res, "Problem 1")

Finished test run 0
Finished test run 1
Finished test run 2
Finished test run 3
Finished test run 4
<Test Problem 1 Results>
- Correct: 5
- Incorrect: 0
-> Passed 100.0% of tests


## Problem 2 (50 pts)

Please implement group convolution, which is a generalization of depthwise convolution. The behavior of your function should match that of "torch.nn.Conv2d" in PyTorch.

(reference: https://pytorch.org/docs/stable/generated/torch.nn.Conv2d.html)

### Hint


#### High-level overview
Group convolution can be seen as a set of parallel convolutions split from a regular CONV2D. When the number of groups is G, we split the input and output channels into G groups. Each group contains C/G input channels and K/G output channels, so C and K must both be divisible by G.

For example, if K = 20, C = 10, and G = 2, we have two independent CONV2Ds that have K = 10 and C = 5 for each. After computing the individual CONV2Ds, we concatenate the results in output channel dimension.

#### More detailed example

Let's take a look at another more concrete example:

We want to split a CONV2D with dimensions N = 1, K = 4, C = 10, H = 3, W = 3, R = 1, and S = 1, into two groups. Let's name the two sub_conv2D ops as sub_conv2D_0 and sub_conv2D_1, respectively. Also, let's use conv2d_full for the original conv2d before splitting.



Then, the inputs are split as follows:
$$
Input_{sub\_conv2D\_0} = Input_{conv2d\_full}[n=0][\color{red}{c=0,1,2,3,4}][h=0,1,2][w=0,1,2]
$$

$$
Input_{sub\_conv2D\_1} = Input_{conv2d\_full}[n=0][\color{red}{c=5,6,7,8,9}][h=0,1,2][w=0,1,2]
$$



For weights, we do not simply split the full CONV2D weight tensor in two. Instead, we define a new weight tensor, $New\_Weight$, that reflects the reduced number of weights in group convolution. The shape of $New\_Weight$ is [K, C/G, R, S], which is [4, 10/2, 1, 1] == [4, 5, 1, 1] in this example. Each group uses a [K/G, C/G, R, S] == [2, 5, 1, 1] slice of it:

$$
Weight_{sub\_conv2D\_0} = New\_Weight[\color{blue}{k=0,1}][\color{red}{c'=0,1,2,3,4}][r=0][s=0]
$$

$$
Weight_{sub\_conv2D\_1} = New\_Weight[\color{blue}{k=2,3}][\color{red}{c'=0,1,2,3,4}][r=0][s=0]
$$

Note that the input channel dimension of the weights is denoted as c', not c. The size of the c' dimension is C/G, where C is the input channel count of conv2d_full and G is the number of groups.



With these new inputs and weights, we compute independent convolutions and concatenate the results into the output channel of the output activation.

$$
Output_{group\_conv}[n=0][\color{blue}{k=0,1}][p=0,1,2][q=0,1,2] = Output_{sub\_conv2D\_0}
$$

$$
Output_{group\_conv}[n=0][\color{blue}{k=2,3}][p=0,1,2][q=0,1,2] = Output_{sub\_conv2D\_1}
$$
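
The split, convolve, and concatenate steps above can be sketched directly with `torch.chunk`, `torch.nn.functional.conv2d`, and `torch.cat`, using the dimensions of this example (K = 4, C = 10, G = 2). This is an illustrative sketch that reuses PyTorch's regular convolution for each group, not the loop-based implementation the problem asks for. The function name `group_conv2d_by_splitting` is hypothetical.

```python
import torch
import torch.nn.functional as F

def group_conv2d_by_splitting(weight, x, num_groups):
    """weight: [K, C/G, R, S], x: [N, C, H, W] -> [N, K, P, Q]."""
    # Split input channels and output filters into G contiguous chunks.
    x_groups = torch.chunk(x, num_groups, dim=1)       # G x [N, C/G, H, W]
    w_groups = torch.chunk(weight, num_groups, dim=0)  # G x [K/G, C/G, R, S]
    # Each (input chunk, filter chunk) pair is an ordinary CONV2D.
    outs = [F.conv2d(xg, wg) for xg, wg in zip(x_groups, w_groups)]
    # Concatenate the per-group results along the output-channel dimension.
    return torch.cat(outs, dim=1)

torch.manual_seed(0)
N, K, C, H, W, R, S, G = 1, 4, 10, 3, 3, 1, 1, 2
weight = torch.randn(K, C // G, R, S)
x = torch.randn(N, C, H, W)
out = group_conv2d_by_splitting(weight, x, G)
ref = F.conv2d(x, weight, groups=G)
print(out.shape)
print(torch.allclose(out, ref, atol=1e-5))
```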


### Your Implementation (TODO)

In [75]:
# Note: Please read the tensor dimension in the comment carefully
def my_group_conv2d(
    weight: torch.Tensor, # [K, C/G, R, S]
    input_activation: torch.Tensor, # [1, C, H, W]
    num_groups: int,
) -> torch.Tensor: # [1, K, P, Q]

    K, C_, R, S = weight.shape
    N, C, H, W = input_activation.shape
    G = num_groups
    assert C % G == 0, "C must be divisible by G"
    assert K % G == 0, "K must be divisible by G"
    assert C_ == C // G, "weight must have C/G input channels"
    P = H - R + 1  # output height: no padding, stride 1
    Q = W - S + 1  # output width

    output = torch.zeros([N, K, P, Q])

    for g in range(G):
        for n in range(N):
            for k in range(K//G):
                for c in range(C//G):
                    for p in range(P):
                        for q in range(Q):
                            output[n, g*(K//G)+k, p, q] += torch.sum(input_activation[n, g*(C//G)+c, p:p+R, q:q+S] * weight[g*(K//G)+k, c, ...])

    return output

### Test Code

In [78]:
# Test program: Please do not modify this cell
def test_group_conv2d(num_tests = number_of_tests):
    correct = 0
    incorrect = 0

    for test_id in range(num_tests):
        activaiton_resolution = torch.randint(7, 32,[2,])

        K = int(random.choice(channel_count))
        C = int(random.choice(channel_count))
        H = activaiton_resolution[0]
        W = activaiton_resolution[1]
        R = int(random.choice(filter_res))
        S = R
        G = int(random.choice(num_groups_list))

        if H < R:
            H += R

        if W < S:
            W += S

        # Generate random input activation and weight
        weight = torch.randn(K,int(C/G),R,S)

        input_activation = torch.randn(1,C,H,W)

        # Compute the reference output
        reference_conv = torch.nn.Conv2d(C, K, (R,S), groups = G, bias=False)
        reference_conv.weight = torch.nn.Parameter(weight)

        reference_output = reference_conv(input_activation)

        # Compute the test output
        test_output = my_group_conv2d(weight, input_activation, G)

        if torch.allclose(reference_output, test_output,  atol=1e-3):
            correct += 1
        else:
            incorrect += 1
        print("Finished test run {}".format(test_id))

    return (correct, incorrect, num_tests)

In [79]:
# You can run this test to validate your implementation
test_res = test_group_conv2d(5)
test_res_printer(test_res, "Problem 2")

Finished test run 0
Finished test run 1
Finished test run 2
Finished test run 3
Finished test run 4
<Test Problem 2 Results>
- Correct: 5
- Incorrect: 0
-> Passed 100.0% of tests


## Problem 3 (50 pts)

In problems 3 and 4, we will implement building blocks for a self-attention module in Transformer models and combine them to implement our own scaled dot product attention module. As in the previous problems, you are expected to implement your own version of the module, and the generated results will be compared against a reference implementation.

In problem 3, you will implement your own version of softmax. Please read the requirement and tips carefully and implement your softmax.

### Requirements
- You cannot use pre-built softmax functions (e.g., torch.nn.Softmax or torch.nn.functional.softmax)
- You can use PyTorch functions that do not directly implement softmax (e.g., torch.exp)

### Spec
- The softmax function expects a 3D tensor as its input and outputs a 3D tensor that has the same shape as the input tensor
- The shape of the input tensor is [B, L, L], where B and L refer to the batch size and sequence length, respectively. (The dimensionality is based on the Transformer use case.)
- Your softmax should calculate softmax on the last dimension (L), like we do in the Transformer model. To clarify, I mean the right-most L dimension in the shape definition ([B, L, L]).

### Hint

- You should calculate softmax on each row of the given tensor (i.e., for each batch, for each row, compute softmax across the columns within the row).
- If you directly implement softmax following the mathematical definition, you will likely encounter some "issues." Figuring out a solution is the discussion problem; I will not directly tell you the solution. Please utilize all the resources and tools available to find one (imagine you encountered such a situation in your future job; you must address the issue).
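
As a minimal sketch of the kind of issue the hint refers to, the snippet below applies the textbook definition to float32 inputs of two magnitudes. It assumes values of a few hundred, comparable to the test inputs, which are scaled by 256. The helper `naive_softmax` is hypothetical. It also checks the standard identity that softmax is unchanged when a constant is subtracted from every element of a row.

```python
import torch

def naive_softmax(x: torch.Tensor) -> torch.Tensor:
    """Textbook definition: exp(x_i) / sum_j exp(x_j), over the last dimension."""
    e = torch.exp(x)
    return e / e.sum(dim=-1, keepdim=True)

small = torch.tensor([[[1.0, 2.0, 3.0]]])  # [B=1, L=1, 3 columns]
large = small + 200.0                      # same differences, larger values

print(naive_softmax(small))  # finite probabilities
print(torch.exp(large))      # float32 exp overflows to inf
print(naive_softmax(large))  # inf / inf produces nan

# Softmax is unchanged by subtracting a per-row constant c, because
# exp(x_i - c) / sum_j exp(x_j - c) = exp(x_i) / sum_j exp(x_j).
shifted = naive_softmax(large - 200.0)
print(torch.allclose(shifted, naive_softmax(small)))
```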

### Your Implementation (TODO)

In [110]:
# Implement this cell

# B: Batch size
# L: Sequence length

def my_softmax(
    input_tensor: torch.Tensor, # Tensor: [B, L, L]
) -> torch.Tensor:
    # Subtract the per-row maximum so that exp() never overflows;
    # softmax is invariant to a constant shift along the reduced dimension.
    temp = input_tensor - torch.max(input_tensor, dim=-1, keepdim=True).values
    temp = torch.exp(temp)
    return temp / torch.sum(temp, dim=-1, keepdim=True)

### Test Code

In [111]:
import torch.nn.functional as F

# Test program: Please do not modify this cell
def test_softmax(num_tests = number_of_tests):
    correct = 0
    incorrect = 0

    for test_id in range(num_tests):
        batch_size = torch.randint(1,8,[1,])
        tensor_dim = torch.randint(1,1024, [1,])

        # Generate a random tensor
        softmax_input = torch.rand(batch_size, tensor_dim[0], tensor_dim[0]) * 256

        # Compute the reference output
        reference_softmax = torch.nn.Softmax(dim=2)
        reference_output = reference_softmax(softmax_input)
        # Compute the test output
        test_output = my_softmax(softmax_input)

        #print(reference_output)

        if torch.allclose(reference_output, test_output,  atol=1e-3):
            correct += 1
        else:
            incorrect += 1
        print("Finished test run {}".format(test_id))

    return (correct, incorrect, num_tests)


In [113]:
test_res = test_softmax(20)
test_res_printer(test_res, "Problem 3")

Finished test run 0
Finished test run 1
Finished test run 2
Finished test run 3
Finished test run 4
Finished test run 5
Finished test run 6
Finished test run 7
Finished test run 8
Finished test run 9
Finished test run 10
Finished test run 11
Finished test run 12
Finished test run 13
Finished test run 14
Finished test run 15
Finished test run 16
Finished test run 17
Finished test run 18
Finished test run 19
<Test Problem 3 Results>
- Correct: 20
- Incorrect: 0
-> Passed 100.0% of tests


## Problem 4 (30 pts)

In problem 4, you will implement the multi-head scaled dot product attention block in the Transformer model.

### Requirements
- You cannot use pre-built PyTorch classes and functions that directly implement the scaled dot product attention block
- You can use other PyTorch functions (e.g., torch.bmm)
- You are implementing a block for inference; no infrastructure for training is required

### Spec
- Your function receives three tensors (Query, Key, and Value in Transformer) as inputs
- The input tensors are 3D tensors
- The shapes of the input tensors are identical (the common case in Transformer models): [B, L, D], where B, L, and D are the batch size, sequence length, and Q/K/V dimension
- You do not have to multiply Q, K, and V by a weight matrix; assume that the projections have already been applied
- Assume that we are not masking any rows out.
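
For reference, the standard unmasked scaled dot product attention under this spec can be written as follows, with the softmax taken over the last dimension. The intermediate shapes are annotated under the assumption that all three inputs are [B, L, D]:

$$
\mathrm{Attention}(Q, K, V) = \mathrm{softmax}\left(\frac{Q K^{\top}}{\sqrt{D}}\right) V
$$

$$
Q K^{\top}: [B, L, D] \times [B, D, L] \rightarrow [B, L, L], \qquad \mathrm{softmax}(\cdot)\, V: [B, L, L] \times [B, L, D] \rightarrow [B, L, D]
$$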

### Your Implementation (TODO)

In [99]:
def my_scaled_dot_product_attention(
    query: torch.Tensor, # Tensor [B, L, D]
    key: torch.Tensor,   # Tensor [B, L, D]
    value: torch.Tensor, # Tensor [B, L, D]
) -> torch.Tensor:
    # We will target the common case: the shape of
    # Q, K, and V matrices are the same.
    assert(query.shape == key.shape)
    assert(value.shape == key.shape)

    # Calculate the dot product of the query and key
    dot_product = torch.bmm(query, key.transpose(1, 2))

    # Scale the dot product by the square root of the key dimension D
    D = key.size(-1)
    scaled_dot_product = dot_product / math.sqrt(D)

    # No masking is applied (see Spec)

    # Apply the softmax function to the scaled dot product tensor
    attention_weights = torch.nn.functional.softmax(scaled_dot_product, dim=-1)

    # Multiply the attention weights by the value tensor to get the output
    output = torch.bmm(attention_weights, value)

    return output

### Test Code

In [100]:
import torch.nn.functional as F


# Test program: Please do not modify this cell
def test_scaled_dot_product_attention(num_tests = number_of_tests):
    correct = 0
    incorrect = 0

    for test_id in range(num_tests):

        batch_size = torch.randint(1,32,[1,])
        seq_length = torch.randint(32,128,[1,])
        dim_size = torch.randint(128,256,[1,])


        query = torch.rand(batch_size, seq_length, dim_size, dtype=torch.float)
        key = torch.rand(batch_size, seq_length, dim_size, dtype=torch.float)
        value = torch.rand(batch_size, seq_length, dim_size, dtype=torch.float)

        # Compute the reference output
        reference_output = F.scaled_dot_product_attention(query,key,value)
        # Compute the test output
        test_output = my_scaled_dot_product_attention(query,key,value)

        if torch.allclose(reference_output, test_output,  atol=1e-3):
            correct += 1
        else:
            incorrect += 1
        print("Finished test run {}".format(test_id))

    return (correct, incorrect, num_tests)


In [101]:
test_res = test_scaled_dot_product_attention(10)
test_res_printer(test_res, "Problem 4")

Finished test run 0
Finished test run 1
Finished test run 2
Finished test run 3
Finished test run 4
Finished test run 5
Finished test run 6
Finished test run 7
Finished test run 8
Finished test run 9
<Test Problem 4 Results>
- Correct: 10
- Incorrect: 0
-> Passed 100.0% of tests


## Problem 5 (30 pts)

In this problem, we will implement an asymmetric linear quantization function. Your function will receive a 2D tensor as input and quantize it to the UINT8 type (unsigned 8-bit integer). Your function needs to (1) scan the values to identify the minimum and maximum, (2) compute the scale and zero point using the linear quantization method, and (3) quantize the input tensor using the computed scale and zero point.

* Note: This problem is about asymmetric quantization; you don't need to clip (or clamp) values as in symmetric quantization.
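
The three steps can be illustrated with a quantize and dequantize round trip. This is a minimal sketch under stated assumptions, not the reference method: the zero point is kept as a real value rather than rounded to an integer, the input is assumed to have max > min, and both helper names (`quantize_uint8`, `dequantize_uint8`) are hypothetical. The minimum maps to 0, the maximum maps to 255, and the reconstruction error stays within about half a quantization step.

```python
import torch

def quantize_uint8(x: torch.Tensor):
    """Asymmetric linear quantization to UINT8; returns (q, scale, zero_point)."""
    x_min, x_max = x.min(), x.max()
    scale = (x_max - x_min) / 255  # real-valued width of one integer step
    zero_point = -x_min / scale    # offset that maps x_min to 0 (left unrounded here)
    q = torch.round(x / scale + zero_point).to(torch.uint8)
    return q, scale, zero_point

def dequantize_uint8(q, scale, zero_point):
    """Hypothetical inverse mapping, used here only to measure the error."""
    return (q.to(torch.float32) - zero_point) * scale

x = torch.tensor([[-0.9815, 0.0037], [0.5400, 0.9794]])
q, scale, zp = quantize_uint8(x)
x_hat = dequantize_uint8(q, scale, zp)
max_err = (x - x_hat).abs().max()
print(q)
print(bool(max_err <= scale / 2 + 1e-6))
```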

### Your Implementation (TODO)

In [102]:
# Implement this cell

def my_uint8_asymmetric_quantization(input_tensor):
    # UINT8 asymmetric quantization
    min_val, max_val = torch.min(input_tensor), torch.max(input_tensor)

    scale = (max_val - min_val) / 255
    zero = -min_val / scale

    output = torch.round(input_tensor / scale + zero)

    output = output.type(torch.uint8)
    return output

### Test Code

In [103]:
# Test inputs and reference outputs
# Please do not modify this cell

test_input1 = torch.Tensor(
        [[-0.9191,  0.9685,  0.7212, -0.9223],
        [ 0.5400, -0.9494,  0.9794,  0.0037],
        [ 0.9669, -0.8960, -0.5399,  0.3006],
        [-0.9815,  0.9160, -0.2564,  0.2825]]
)

ref_output1 = torch.Tensor(
        [[  8, 254, 222,   8],
        [198,   5, 255, 128],
        [254,  11,  58, 167],
        [  0, 247,  95, 165]]
)

test_input2 = torch.Tensor(
        [[ 0.5587,  0.7415,  0.8307, -0.4854],
        [ 0.5472,  0.1830, -0.0901,  0.3482],
        [-0.5323, -0.3921,  0.1843,  0.1048],
        [-0.7526,  0.0928,  0.3061, -0.2401]]
)

ref_output2 = torch.Tensor(
        [[211, 240, 255,  43],
        [209, 150, 106, 177],
        [ 35,  58, 151, 138],
        [  0, 136, 170,  82]]
)

test_input3 = torch.Tensor(
        [[ 0.9660, -0.4789, -0.7847,  0.1772],
        [-0.0440, -0.6890,  0.7417,  0.1999],
        [ 0.5212,  0.0673,  0.6502,  0.9912],
        [-0.1901,  0.0888,  0.4853,  0.3232]]
)

ref_output3 = torch.Tensor(
        [[252,  44,   0, 138],
        [107,  14, 220, 142],
        [188, 123, 206, 255],
        [ 86, 126, 183, 159]]
)


test_input4 = torch.Tensor(
        [[-0.4374,  0.8469,  0.6904, -0.7379],
        [ 0.4810,  0.7268,  0.1416,  0.0273],
        [ 0.4733,  0.4300, -0.7830,  0.2241],
        [-0.9042,  0.7450, -0.3024,  0.5965]]
)

ref_output4 = torch.Tensor(
        [[ 68, 255, 233,  25],
        [202, 238, 153, 136],
        [201, 195,  18, 165],
        [  0, 240,  88, 219]]
)


test_input5 = torch.Tensor(
        [[ 0.6637, -0.5765, -0.5117,  0.8479],
        [ 0.2494,  0.0126, -0.0602,  0.9708],
        [ 0.4532, -0.6239, -0.2159, -0.3135],
        [ 0.7132,  0.9600, -0.6399, -0.5454]]
)

ref_output5 = torch.Tensor(
        [[206,  10,  20, 235],
        [140, 103,  91, 255],
        [173,   2,  67,  51],
        [214, 253,   0,  15]]
)

test_inputs = [test_input1, test_input2, test_input3, test_input4, test_input5]
ref_outputs = [ref_output1, ref_output2, ref_output3, ref_output4, ref_output5]

In [104]:
# You can validate your implementation using these tests

def test_asymmetric_quantization():
    correct = 0
    incorrect = 0
    num_tests = len(test_inputs)
    for test_id in range(num_tests):
        test_input = test_inputs[test_id]
        ref_output = ref_outputs[test_id]
        test_output = my_uint8_asymmetric_quantization(test_input)

        # Use a tolerance of 1 because a different rounding direction can shift a result by one
        if torch.allclose(ref_output.to(torch.float), test_output.to(torch.float), atol=1):
            correct += 1
        else:
            incorrect += 1

    return (correct, incorrect, num_tests)

test_res = test_asymmetric_quantization()
test_res_printer(test_res, "Problem5")

<Test Problem5 Results>
- Correct: 5
- Incorrect: 0
-> Passed 100.0% of tests
