In [None]:
from PIL import Image
from helper import W8A16LinearLayer
from helper import linear_dequantization, plot_quantization_errors
from helper import linear_q_with_scale_and_zero_point
from helper import plot_quantization_errors
from helper import plot_results
from helper import quantization_error
from huggingface_hub import hf_hub_download
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
from transformers import DetrImageProcessor, DetrForObjectDetection
from transformers import OPTForCausalLM, AutoTokenizer, AutoConfig
from transformers import pipeline
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
from dotenv import load_dotenv, find_dotenv

# L2-A - Linear Quantization I: Quantize and De-quantize a Tensor

In this lesson, you will learn the fundamentals of linear quantization.

The libraries are already installed in the classroom.  If you're running this notebook on your own machine, you can install the following:

```Python
!pip install torch==2.1.1
```

## Quantization with Random `Scale` and `Zero Point`

- Implement Linear Quantization for when the "scale" and the "zero point" are known/randomly selected.

In [None]:
def linear_q_with_scale_and_zero_point(
    tensor, scale, zero_point, dtype = torch.int8):

    scaled_and_shifted_tensor = tensor / scale + zero_point

    rounded_tensor = torch.round(scaled_and_shifted_tensor)

    q_min = torch.iinfo(dtype).min
    q_max = torch.iinfo(dtype).max

    q_tensor = rounded_tensor.clamp(q_min,q_max).to(dtype)
    
    return q_tensor

In [None]:
### a dummy tensor to test the implementation
test_tensor=torch.tensor(
    [[191.6, -13.5, 728.6],
     [92.14, 295.5,  -184],
     [0,     684.6, 245.5]]
)

In [None]:
### these are random values for "scale" and "zero_point"
### to test the implementation
scale = 3.5
zero_point = -70

In [None]:
quantized_tensor = linear_q_with_scale_and_zero_point(
    test_tensor, scale, zero_point)

In [None]:
quantized_tensor

## Dequantization with Random `Scale` and `Zero Point`

- Now, Dequantize the tensor to see how precise the quantization is.

In [None]:
dequantized_tensor = scale * (quantized_tensor.float() - zero_point)

In [None]:
# this was the original tensor
# [[191.6, -13.5, 728.6],
#  [92.14, 295.5,  -184],
#  [0,     684.6, 245.5]]

In [None]:
dequantized_tensor

In [None]:
### without casting to float
scale * (quantized_tensor - zero_point)

In [None]:
def linear_dequantization(quantized_tensor, scale, zero_point):
    return scale * (quantized_tensor.float() - zero_point)

- Calculate `dequantized_tensor` using the function `linear_dequantization`.

In [None]:
dequantized_tensor = linear_dequantization(
    quantized_tensor, scale, zero_point)

- Print the results of the `dequantized_tensor`.

In [None]:
dequantized_tensor

### Quantization Error

- Load the `plot_quantization_errors` from the helper file.
- To access the `helper.py` file, you can click `File --> Open...`, on the top left.

- Plot the quantization results.

In [None]:
plot_quantization_errors(test_tensor, quantized_tensor,
                         dequantized_tensor)

**Note:** For the plot above, `Quantization Error Tensor = abs(Original Tensor - Dequantized Tensor)`

- Calculate an "overall" quantization error by using [Mean Squared Error](https://en.wikipedia.org/wiki/Mean_squared_error) technique.

In [None]:
dequantized_tensor - test_tensor

In [None]:
(dequantized_tensor - test_tensor).square()

In [None]:
(dequantized_tensor - test_tensor).square().mean()

# L2-B - Linear Quantization I: Get the Scale and Zero Point

In this lesson, continue to learn about fundamentals of linear quantization, and implement your own Linear Quantizer.

Run the next cell to import all of the functions you have used before in the previous lesson(s) of `Linear Quantization I` to follow along with the video.

- To access the `helper.py` file, you can click `File --> Open...`, on the top left.

In [None]:
### a dummy tensor to test the implementation
test_tensor=torch.tensor(
    [[191.6, -13.5, 728.6],
     [92.14, 295.5,  -184],
     [0,     684.6, 245.5]]
)

## Finding `Scale` and `Zero Point` for Quantization

In [None]:
q_min = torch.iinfo(torch.int8).min
q_max = torch.iinfo(torch.int8).max

In [None]:
q_min

In [None]:
q_max

In [None]:
# r_min = test_tensor.min()
r_min = test_tensor.min().item()

In [None]:
r_min

In [None]:
r_max = test_tensor.max().item()

In [None]:
r_max

In [None]:
scale = (r_max - r_min) / (q_max - q_min)

In [None]:
scale

In [None]:
zero_point = q_min - (r_min / scale)

In [None]:
zero_point

In [None]:
zero_point = int(round(zero_point))

In [None]:
zero_point

- Now, put all of this in a function.

In [None]:
def get_q_scale_and_zero_point(tensor, dtype=torch.int8):
    
    q_min, q_max = torch.iinfo(dtype).min, torch.iinfo(dtype).max
    r_min, r_max = tensor.min().item(), tensor.max().item()

    scale = (r_max - r_min) / (q_max - q_min)

    zero_point = q_min - (r_min / scale)

    # clip the zero_point to fall in [quantized_min, quantized_max]
    if zero_point < q_min:
        zero_point = q_min
    elif zero_point > q_max:
        zero_point = q_max
    else:
        # round and cast to int
        zero_point = int(round(zero_point))
    
    return scale, zero_point

- Test the implementation using the `test_tensor` defined earlier.
```Python
[[191.6, -13.5, 728.6],
 [92.14, 295.5,  -184],
 [0,     684.6, 245.5]]
```

In [None]:
new_scale, new_zero_point = get_q_scale_and_zero_point(
    test_tensor)

In [None]:
new_scale

In [None]:
new_zero_point

## Quantization and Dequantization with Calculated `Scale` and `Zero Point`

- Use the calculated `scale` and `zero_point` with the functions `linear_q_with_scale_and_zero_point` and `linear_dequantization`.

In [None]:
quantized_tensor = linear_q_with_scale_and_zero_point(
    test_tensor, new_scale, new_zero_point)

In [None]:
dequantized_tensor = linear_dequantization(quantized_tensor,
                                           new_scale, new_zero_point)

- Plot to see how the Quantization Error looks like after using calculated `scale` and `zero_point`.

In [None]:
plot_quantization_errors(test_tensor, quantized_tensor, 
                         dequantized_tensor)

In [None]:
(dequantized_tensor-test_tensor).square().mean()

### Put Everything Together: Your Own Linear Quantizer

- Now, put everything togther to make your own Linear Quantizer.

In [None]:
def linear_quantization(tensor, dtype=torch.int8):
    scale, zero_point = get_q_scale_and_zero_point(tensor, 
                                                   dtype=dtype)
    
    quantized_tensor = linear_q_with_scale_and_zero_point(tensor,
                                                          scale, 
                                                          zero_point, 
                                                          dtype=dtype)
    
    return quantized_tensor, scale , zero_point

- Test your implementation on a random matrix.

In [None]:
r_tensor = torch.randn((4, 4))

**Note:** Since the values are random, what you see in the video might be different than what you will get.

In [None]:
r_tensor

In [None]:
quantized_tensor, scale, zero_point = linear_quantization(r_tensor)

In [None]:
quantized_tensor

In [None]:
scale

In [None]:
zero_point

In [None]:
dequantized_tensor = linear_dequantization(quantized_tensor,
                                           scale, zero_point)

In [None]:
plot_quantization_errors(r_tensor, quantized_tensor,
                         dequantized_tensor)

In [None]:
(dequantized_tensor-r_tensor).square().mean()

# L3-A - Linear Quantization II: Symmetric vs. Asymmetric Mode

In this lesson, you will learn a different way of performing linear quantization, Symmetric Mode.

The libraries are already installed in the classroom.  If you're running this notebook on your own machine, you can install the following:

```Python
!pip install torch==2.1.1
```

## Linear Quantization: Symmetric Mode

- Implement a function which returns the `scale` for Linear Quantization in Symmetric Mode.

In [None]:
def get_q_scale_symmetric(tensor, dtype=torch.int8):
    r_max = tensor.abs().max().item()
    q_max = torch.iinfo(dtype).max

    # return the scale
    return r_max/q_max

In [None]:
### test the implementation on a 4x4 matrix
test_tensor = torch.randn((4, 4))

**Note:** Since the values are random, what you see in the video might be different than what you will get.

In [None]:
test_tensor

In [None]:
get_q_scale_symmetric(test_tensor)

- Perform Linear Quantization in Symmetric Mode.
- `linear_q_with_scale_and_zero_point` is the same function you implemented in the previous lesson.

In [None]:
def linear_q_symmetric(tensor, dtype=torch.int8):
    scale = get_q_scale_symmetric(tensor)
    
    quantized_tensor = linear_q_with_scale_and_zero_point(tensor,
                                                     scale=scale,
                   # in symmetric quantization zero point is = 0    
                                                    zero_point=0,
                                                      dtype=dtype)
    
    return quantized_tensor, scale

In [None]:
quantized_tensor, scale = linear_q_symmetric(test_tensor)

### Dequantization

- Perform Dequantization
- Plot the Quantization error.
- `linear_dequantization` is the same function you implemented in the previous lesson.

In [None]:
dequantized_tensor = linear_dequantization(quantized_tensor,scale,0)

In [None]:
plot_quantization_errors(
    test_tensor, quantized_tensor, dequantized_tensor)

In [None]:
print(f"""Quantization Error : \
{quantization_error(test_tensor, dequantized_tensor)}""")

# L3-B - Linear Quantization II: Finer Granularity for more Precision

In this lesson, you will learn about different granularities of performing linear quantization. You will cover `per tensor` in this notebook.

Run the next cell to import all of the functions you have used before in the previous lesson(s) of `Linear Quantization II` to follow along with the video.

- To access the `helper.py` file, you can click `File --> Open...`, on the top left.

In [None]:
import torch

from helper import linear_q_symmetric, get_q_scale_symmetric, linear_dequantization
from helper import plot_quantization_errors, quantization_error

## Different Granularities for Quantization
- For simplicity, you'll perform these using Symmetric mode.

### Per Tensor
- Perform `Per Tensor` Symmetric Quantization.

In [None]:
# test tensor
test_tensor=torch.tensor(
    [[191.6, -13.5, 728.6],
     [92.14, 295.5,  -184],
     [0,     684.6, 245.5]]
)

In [None]:
quantized_tensor, scale = linear_q_symmetric(test_tensor)

In [None]:
dequantized_tensor = linear_dequantization(quantized_tensor, scale, 0)

In [None]:
plot_quantization_errors(test_tensor, quantized_tensor,
                         dequantized_tensor)

In [None]:
print(f"""Quantization Error : \
{quantization_error(test_tensor, dequantized_tensor)}""")

# L3-C - Linear Quantization II: Per Channel Quantization

In this lesson, you will continue to learn about different granularities of performing linear quantization. You will cover `per channel` in this notebook.

## Different Granularities for Quantization
- For simplicity, you'll perform these using Symmetric mode.

### Per Channel
- Implement `Per Channel` Symmetric Quantization
- `dim` parameter decides if it needs to be along the rows or columns

In [None]:
def linear_q_symmetric_per_channel(tensor,dim,dtype=torch.int8):



    return quantized_tensor, scale

In [None]:
test_tensor=torch.tensor(
    [[191.6, -13.5, 728.6],
     [92.14, 295.5,  -184],
     [0,     684.6, 245.5]]
)

- `dim = 0`, along the rows
- `dim = 1`, along the columns

In [None]:
dim=0
output_dim = test_tensor.shape[dim]

In [None]:
output_dim

In [None]:
scale = torch.zeros(output_dim)

In [None]:
scale

- Iterate through each row to calculate its `scale`.

In [None]:
for index in range(output_dim):
    sub_tensor = test_tensor.select(dim,index)
    # print(sub_tensor)
    scale[index] = get_q_scale_symmetric(sub_tensor)

In [None]:
scale

In [None]:
scale_shape = [1] * test_tensor.dim()

In [None]:
scale_shape

In [None]:
scale_shape[dim] = -1

In [None]:
scale_shape

In [None]:
scale = scale.view(scale_shape)

In [None]:
# copy to be used later
copy_scale = scale

scale

#### Understanding tensor by tensor division using `view` function

In [None]:
m = torch.tensor([[1,2,3],[4,5,6],[7,8,9]])

In [None]:
m

In [None]:
s = torch.tensor([1,5,10])

In [None]:
s

In [None]:
s.shape

In [None]:
s.view(1, 3).shape

In [None]:
# alternate way
s.view(1, -1).shape

In [None]:
s.view(-1,1).shape

##### Along the row division

In [None]:
scale = torch.tensor([[1], [5], [10]])

In [None]:
scale.shape

In [None]:
m / scale

##### Along the column division

In [None]:
scale = torch.tensor([[1, 5, 10]])

In [None]:
scale.shape

In [None]:
m / scale

#### Coming back to quantizing the tensor

In [None]:
# the scale you got earlier
scale = copy_scale

scale

In [None]:
scale.shape

In [None]:
quantized_tensor = linear_q_with_scale_and_zero_point(
    test_tensor, scale=scale, zero_point=0)

In [None]:
quantized_tensor

- Now, put all this in `linear_q_symmetric_per_channel` function defined earlier.

In [None]:
def linear_q_symmetric_per_channel(r_tensor, dim, dtype=torch.int8):
    
    output_dim = r_tensor.shape[dim]
    # store the scales
    scale = torch.zeros(output_dim)

    for index in range(output_dim):
        sub_tensor = r_tensor.select(dim, index)
        scale[index] = get_q_scale_symmetric(sub_tensor, dtype=dtype)

    # reshape the scale
    scale_shape = [1] * r_tensor.dim()
    scale_shape[dim] = -1
    scale = scale.view(scale_shape)
    quantized_tensor = linear_q_with_scale_and_zero_point(
        r_tensor, scale=scale, zero_point=0, dtype=dtype)
   
    return quantized_tensor, scale

In [None]:
test_tensor=torch.tensor(
    [[191.6, -13.5, 728.6],
     [92.14, 295.5,  -184],
     [0,     684.6, 245.5]]
)

In [None]:
### along the rows (dim = 0)
quantized_tensor_0, scale_0 = linear_q_symmetric_per_channel(
    test_tensor, dim=0)

### along the columns (dim = 1)
quantized_tensor_1, scale_1 = linear_q_symmetric_per_channel(
    test_tensor, dim=1)

- Plot the quantization error for along the rows.

In [None]:
dequantized_tensor_0 = linear_dequantization(
    quantized_tensor_0, scale_0, 0)

plot_quantization_errors(
    test_tensor, quantized_tensor_0, dequantized_tensor_0)

In [None]:
print(f"""Quantization Error : \
{quantization_error(test_tensor, dequantized_tensor_0)}""")

- Plot the quantization error for along the columns.

In [None]:
dequantized_tensor_1 = linear_dequantization(
    quantized_tensor_1, scale_1, 0)

plot_quantization_errors(
    test_tensor, quantized_tensor_1, dequantized_tensor_1, n_bits=8)

print(f"""Quantization Error : \
{quantization_error(test_tensor, dequantized_tensor_1)}""")

# L3-D - Linear Quantization II: Per Group Quantization

In this lesson, you will continue to learn about different granularities of performing linear quantization. You will cover `per group` in this notebook.

## Different Granularities for Quantization
- For simplicity, you'll perform these using Symmetric mode.

### Per Group
- For simplicity, you'll quantize a 2D tensor along the rows.

In [None]:
def linear_q_symmetric_per_group(tensor, group_size,
                                 dtype=torch.int8):
    
    t_shape = tensor.shape
    assert t_shape[1] % group_size == 0
    assert tensor.dim() == 2
    
    tensor = tensor.view(-1, group_size)
    
    quantized_tensor, scale = linear_q_symmetric_per_channel(
                                tensor, dim=0, dtype=dtype)
    
    quantized_tensor = quantized_tensor.view(t_shape)
    
    return quantized_tensor, scale

In [None]:
def linear_dequantization_per_group(quantized_tensor, scale, 
                                    group_size):
    
    q_shape = quantized_tensor.shape
    quantized_tensor = quantized_tensor.view(-1, group_size)
    
    dequantized_tensor = linear_dequantization(quantized_tensor, 
                                               scale, 0)
    
    dequantized_tensor = dequantized_tensor.view(q_shape)
    
    return dequantized_tensor

In [None]:
test_tensor = torch.rand((6, 6))

**Note:** Since the values are random, what you see in the video might be different than what you will get.

In [None]:
group_size = 3

In [None]:
quantized_tensor, scale = linear_q_symmetric_per_group(
    test_tensor, group_size=group_size)

dequantized_tensor = linear_dequantization_per_group(
    quantized_tensor, scale, group_size=group_size)

plot_quantization_errors(
    test_tensor, quantized_tensor, dequantized_tensor)

In [None]:
print(f"""Quantization Error : \
{quantization_error(test_tensor, dequantized_tensor)}""")

# L3-E - Linear Quantization II: Quantizing Weights & Activations for Inference

In this lesson, you will continue to learn different ways of performing linear quantization.

Run the next cell to import all of the functions you have used before in the previous lesson(s) of `Linear Quantization II` to follow along with the video.

- To access the `helper.py` file, you can click `File --> Open...`, on the top left.

## Linear Quantization: Inference

- `W8A32` means weights in 8-bits and activations in 32-bits.
- For simplicity, the linear layer will be without bias.

In [None]:
def quantized_linear_W8A32_without_bias(input, q_w, s_w, z_w):
    assert input.dtype == torch.float32
    assert q_w.dtype == torch.int8

    dequantized_weight = q_w.to(torch.float32) * s_w + z_w
    output = torch.nn.functional.linear(input, dequantized_weight)
    
    return output

In [None]:
input = torch.tensor([1, 2, 3], dtype=torch.float32)

In [None]:
weight = torch.tensor([[-2,   -1.13, 0.42],
                       [-1.51, 0.25, 1.62],
                       [0.23,  1.35, 2.15]])

In [None]:
q_w, s_w  = linear_q_symmetric(weight)

In [None]:
q_w

In [None]:
s_w

In [None]:
output = quantized_linear_W8A32_without_bias(input,
                                             q_w,
                                             s_w,
                                             0)

In [None]:
print(f"This is the W8A32 output: {output}")

In [None]:
fp32_output = torch.nn.functional.linear(input, weight)

In [None]:
print(f"This is the output if we don't quantize: {fp32_output}")

# L4-A - Building your own Quantizer: Custom Build an 8-Bit Quantizer

In this lesson, you will learn how to compress any model in 8-bit precision.

## Step 1: class `W8A16LinearLayer`

- Build the target class, `W8A16LinearLayer()`, that will be responsible for quantizing your model.

### 1.1 - `w8_a16_forward` Function

-
```Python
W8A16LinearLayer
                    # 8-bit  # 16-bit         # optional
* w8_a16_forward -> weights, input,   scales, bias=None
                    
```
- Cast the 8-bit `weights` to the same data type as the `input`, "casted weights",
- keeping the "casted weights" in the same range as before, [-128, 127]
- Next, $$(({inputs} \cdot \text{``casted weights''}) * {scale}) + {bias}$$ 

In [None]:
random_int8 = torch.randint(-128, 127, (32, 16)).to(torch.int8)
random_hs = torch.randn((1, 16), dtype=torch.bfloat16)
scales = torch.randn((1, 32), dtype=torch.bfloat16)
bias = torch.randn((1, 32), dtype=torch.bfloat16)

**Note:** Since the values are random, what you see in the video might be different than what you will get.

In [None]:
F.linear(random_hs, random_int8.to(random_hs.dtype))

In [None]:
F.linear(random_hs, random_int8.to(random_hs.dtype)) * scales

In [None]:
(F.linear(random_hs, random_int8.to(random_hs.dtype)) * scales) + bias

- Implement all this as a function, `w8_a16_forward`

In [None]:
def w8_a16_forward(weight, input, scales, bias=None):
    
    casted_weights = weight.to(input.dtype)
    output = F.linear(input, casted_weights) * scales
    
    if bias is not None:
        output = output + bias
      
    return output

In [None]:
print("With bias:\n\n", 
      w8_a16_forward(random_int8, random_hs, scales, bias))

print("\nWithout bias:\n\n", 
      w8_a16_forward(random_int8, random_hs, scales))

### 1.2 - `init` Function of class `W8A16LinearLayer`

- This is how the `init` is of [PyTorch Linear layer](https://pytorch.org/docs/stable/_modules/torch/nn/modules/linear.html#Linear):
```Python
def __init__(self, in_features, out_features, bias=True,
             device=None, dtype=None)

```

In [None]:
### running this will result in an error
class W8A16LinearLayer(nn.Module):
    def __init__(self, in_features, out_features, 
                 bias=True, dtype=torch.float32):
        super().__init__()
        
        self.int8_weights = nn.Parameter(torch.Tensor([0, 1]
                                     ).to(dtype=torch.int8))

try:
    
    W8A16LinearLayer(1, 1)
    
except Exception as error:
    print("\033[91m", type(error).__name__, ": ", error, "\033[0m")

In [None]:
class W8A16LinearLayer(nn.Module):
    def __init__(self, in_features, out_features, 
                 bias=True, dtype=torch.float32):
        super().__init__()
        
        
        self.register_buffer(
            "int8_weights",
            torch.randint(
                -128, 127, (out_features, in_features), dtype=torch.int8
            )
        )
        
        self.register_buffer("scales", 
                             torch.randn((out_features), dtype=dtype))
        
        if bias:
            self.register_buffer("bias", 
                                 torch.randn((1, out_features), 
                                             dtype=dtype))
        
        else:
            self.bias = None

- Test your implementation.

In [None]:
dummy_instance = W8A16LinearLayer(16, 32)

In [None]:
print(dummy_instance.int8_weights.shape)
print(dummy_instance.scales.shape)

### 1.3 - `forward` Function of class `W8A16LinearLayer`

- Use the `w8_a16_forward` defined earlier (Step 1.1) to define the `forward` function.

In [None]:
class W8A16LinearLayer(nn.Module):
    def __init__(self, in_features, out_features, 
                 bias=True, dtype=torch.float32):
        super().__init__()
        
        
        self.register_buffer(
            "int8_weights",
            torch.randint(
                -128, 127, (out_features, in_features), dtype=torch.int8
            )
        )
        
        self.register_buffer("scales", 
                             torch.randn((out_features), dtype=dtype))
        
        if bias:
            self.register_buffer("bias", 
                                 torch.randn((1, out_features), 
                                             dtype=dtype))
        
        else:
            self.bias = None

    def forward(self, input):
        return w8_a16_forward(self.int8_weights, 
                              input, self.scales, self.bias)

In [None]:
module = W8A16LinearLayer(16, 32)
dummy_hidden_states = torch.randn(1, 6, 16)

In [None]:
module(dummy_hidden_states).shape

In [None]:
module(dummy_hidden_states).dtype

### 1.4 - `quantize` Function of class `W8A16LinearLayer`

- `quantize` function will dynamically quantize half-precision weights into `torch.int8`

In [None]:
class W8A16LinearLayer(nn.Module):
    def __init__(self, in_features, out_features, 
                 bias=True, dtype=torch.float32):
        super().__init__()
        
        
        self.register_buffer(
            "int8_weights",
            torch.randint(
                -128, 127, (out_features, in_features), dtype=torch.int8
            )
        )
        
        self.register_buffer("scales", 
                             torch.randn((out_features), dtype=dtype))
        
        if bias:
            self.register_buffer("bias", 
                                 torch.randn((1, out_features), 
                                             dtype=dtype))
        
        else:
            self.bias = None

    def quantize(self, weights):
        w_fp32 = weights.clone().to(torch.float32)

        scales = w_fp32.abs().max(dim=-1).values / 127
        scales = scales.to(weights.dtype)

        int8_weights = torch.round(weights
                        /scales.unsqueeze(1)).to(torch.int8)

        self.int8_weights = int8_weights
        self.scales = scales
    
    def forward(self, input):
        return w8_a16_forward(self.int8_weights, 
                              input, self.scales, self.bias)      

In [None]:
module = W8A16LinearLayer(4, 8)

In [None]:
print("Weights before:\n" , module.int8_weights)

In [None]:
random_matrix = torch.randn((4, 8), dtype=torch.bfloat16)

In [None]:
module.quantize(random_matrix)

In [None]:
print("Weights After:\n" , module.int8_weights)

In [None]:
module.scales

In [None]:
module.scales.shape

In [None]:
module.int8_weights.shape

In [None]:
### dequantized weights
module.int8_weights * module.scales.unsqueeze(1)

In [None]:
### original weights
random_matrix

In [None]:
(random_matrix - module.int8_weights 
 * module.scales.unsqueeze(1)).abs().mean()

# L4-B - Building your own Quantizer: Replace PyTorch layers with Quantized Layers

In this lesson, you will learn about the quantization pipline using your own 8-bit quantizer.

Run the next cell to import all of the functions you have used before in the previous lesson(s) of `Building your own Quantizer` to follow along with the video.

- To access the `helper.py` file, you can click `File --> Open...`, on the top left.

## Step 2: Quantization Pipeline

- Replace all of the `torch.nn.Linear` layers with the `W8A16LinearLayer` layer.
- Call `quantize` on the linear layers using the original weights. 

### 2.1 - Model In-place Linear Layer Replacement
- Implement `replace_linear_with_target`

In [None]:
def replace_linear_with_target(module, 
                               target_class, module_name_to_exclude):
    for name, child in module.named_children():
        if isinstance(child, nn.Linear) and not \
          any([x == name for x in module_name_to_exclude]):
            old_bias = child.bias

            new_module = target_class(child.in_features, 
                                      child.out_features, 
                                      old_bias is not None, 
                                      child.weight.dtype)
            setattr(module, name, new_module)
            if old_bias is not None:
              getattr(module, name).bias = old_bias
        else:
            # Recursively call the function for nested modules
            replace_linear_with_target(
                child, target_class, module_name_to_exclude)

In [None]:
class DummyModel(torch.nn.Module):
  def __init__(self):
    super().__init__()
    self.emb = torch.nn.Embedding(1, 1)
    # Try with bias
    self.linear_1 = nn.Linear(1, 1)
    # Try without bias
    self.linear_2 = nn.Linear(1, 1, bias=False)
    # Lm prediction head
    self.lm_head = nn.Linear(1, 1, bias=False)

In [None]:
model_1 = DummyModel()
model_2 = DummyModel()

In [None]:
replace_linear_with_target(model_1, W8A16LinearLayer, ["lm_head"])
print(model_1)

In [None]:
replace_linear_with_target(model_2, W8A16LinearLayer, [])
print(model_2)

### 2.2 - Linear Layer Replacement + Quantization
- Modify the `replace_linear_with_target` function to also perform quantization.
- Implement `replace_linear_with_target_and_quantize`.

In [None]:
def replace_linear_with_target_and_quantize(module, 
                               target_class, module_name_to_exclude):
    for name, child in module.named_children():
        if isinstance(child, nn.Linear) and not \
        any([x == name for x in module_name_to_exclude]):
            old_bias = child.bias
            old_weight = child.weight

            new_module = target_class(child.in_features, 
                                      child.out_features, 
                                      old_bias is not None, 
                                      child.weight.dtype)
            setattr(module, name, new_module)

            getattr(module, name).quantize(old_weight)
            
            if old_bias is not None:
              getattr(module, name).bias = old_bias
        else:
            # Recursively call the function for nested modules
            replace_linear_with_target_and_quantize(child, 
                     target_class, module_name_to_exclude)

In [None]:
model_3 = DummyModel()

In [None]:
replace_linear_with_target_and_quantize(model_3, W8A16LinearLayer, ["lm_head"])
print(model_3)

# L4-C - Building your own Quantizer: Quantize any Open Source PyTorch Model

In this lesson, you will look at the results of open source models compressed using the custom quantizer you built.

Run the next cell to import all of the functions you have used before in the previous lesson(s) of `Building your own Quantizer` to follow along with the video.

- To access the `helper.py` file, you can click `File --> Open...`, on the top left.

## Step 3: Test the Implementation on Various LLMs

### 3.1 - [Salesforce/codegen-350M-mono](https://huggingface.co/Salesforce/codegen-350M-mono)

In [None]:
model_id = "Salesforce/codegen-350M-mono"

model = AutoModelForCausalLM.from_pretrained(model_id, 
                                    torch_dtype=torch.bfloat16, 
                                             low_cpu_mem_usage=True)
tokenizer = AutoTokenizer.from_pretrained(model_id)

In [None]:
pipe = pipeline("text-generation", model=model, tokenizer=tokenizer)

In [None]:
print(pipe("def hello_world():", max_new_tokens=20, do_sample=False))

In [None]:
print("Model before:\n\n", model)

In [None]:
replace_linear_with_target_and_quantize(model, 
                                        W8A16LinearLayer, ["lm_head"])

In [None]:
pipe.model

In [None]:
print(pipe("def hello_world():", max_new_tokens=20, 
           do_sample=False)[0]["generated_text"])

### 3.2 - [facebook/detr-resnet-50](https://huggingface.co/facebook/detr-resnet-50)

In [None]:
# you can specify the revision tag if you don't want the timm dependency
processor = DetrImageProcessor.from_pretrained(
    "facebook/detr-resnet-50", revision="no_timm")
model = DetrForObjectDetection.from_pretrained(
    "facebook/detr-resnet-50", revision="no_timm")

In [None]:
previous_memory_footprint = model.get_memory_footprint()

In [None]:
print("Footprint of the model in MBs: ", 
      previous_memory_footprint/1e+6)

In [None]:
img_path = "dinner_with_friends.png"
image = Image.open(img_path).convert("RGB")
image

In [None]:
from helper import plot_results

inputs = processor(images=image, return_tensors="pt")
with torch.no_grad():
  outputs = model(**inputs)

# convert outputs (bounding boxes and class logits) to COCO API
# let's only keep detections with score > 0.9
target_sizes = torch.tensor([image.size[::-1]])
results = processor.post_process_object_detection(outputs, target_sizes=target_sizes, threshold=0.9)[0]

In [None]:
plot_results(model, image, results)

In [None]:
model

In [None]:
replace_linear_with_target_and_quantize(model, 
                                        W8A16LinearLayer, 
               ["0", "1", "2", "class_labels_classifier"])

In [None]:
### Model after quantization
model

- Visualize results after quantization.

In [None]:
inputs = processor(images=image, return_tensors="pt")
with torch.no_grad():
  outputs = model(**inputs)

# convert outputs (bounding boxes and class logits) to COCO API
# let's only keep detections with score > 0.9
target_sizes = torch.tensor([image.size[::-1]])
results = processor.post_process_object_detection(outputs, target_sizes=target_sizes, threshold=0.9)[0]

In [None]:
plot_results(model, image, results)

In [None]:
new_footprint = model.get_memory_footprint()

In [None]:
print("Footprint of the model in MBs: ", 
      new_footprint/1e+6)

In [None]:
### Memory saved
print("Memory saved in MBs: ", 
      (previous_memory_footprint - new_footprint)/1e+6)

# L4-D - Building your own Quantizer: Load your Quantized Weights from Hugging Face Hub

In this lesson, you will learn memory efficient model loading.

Run the next cell to import all of the functions you have used before in the previous lesson(s) of `Building your own Quantizer` to follow along with the video.

- To access the `helper.py` file, you can click `File --> Open...`, on the top left.

In [None]:
_ = load_dotenv(find_dotenv())
hf_access_token = os.getenv("HF_ACCESS_TOKEN")
hf_write = os.getenv("HF_WRITE")
print(hf_access_token)
print(hf_write)

## Memory Efficient Model Loading

- Load [facebook/opt-125m](https://huggingface.co/facebook/opt-125m)

In [None]:
model_id = "facebook/opt-125m"

model = AutoModelForCausalLM.from_pretrained(
    model_id, torch_dtype=torch.bfloat16, low_cpu_mem_usage=True)
tokenizer = AutoTokenizer.from_pretrained(model_id)

In [None]:
replace_linear_with_target_and_quantize(model, 
                             W8A16LinearLayer, 
                                   ["lm_head"])

In [None]:
model

In [None]:
quantized_state_dict = model.state_dict()
torch.save(quantized_state_dict, "quantized_state_dict.pth")

- The below code is for demonstration purposes only.
- You'll need your own Hugging Face username in order for it to run.
- You'll add your usernmae in `YOUR_HF_USERNAME = ""` 

```Python
from huggingface_hub import HfApi, create_repo

YOUR_HF_USERNAME = ""
your_repo_id = f"{YOUR_HF_USERNAME}/opt-125m-quantized-dlai"

api = HfApi()

# create_repo(your_repo_id)

api.upload_file(
 path_or_fileobj="quantized_state_dict.pth",
 path_in_repo="quantized_state_dict.pth",
 repo_id=your_repo_id
)
```

In [None]:

from huggingface_hub import HfApi, create_repo

YOUR_HF_USERNAME = "hjerpe"
your_repo_id = f"{YOUR_HF_USERNAME}/opt-125m-quantized-dlai"

# api = HfApi(token=hf_write)

# create_repo(your_repo_id)

# api.upload_file(
#  path_or_fileobj="quantized_state_dict.pth",
#  path_in_repo="quantized_state_dict.pth",
#  repo_id=your_repo_id
# )

### Load the Model in the Meta Device

In [None]:
model_id = "facebook/opt-125m"
config = AutoConfig.from_pretrained(model_id)

with torch.device("meta"):
  model = OPTForCausalLM(config)

tokenizer = AutoTokenizer.from_pretrained(model_id)

In [None]:
for param in model.parameters():
  print(param)

In [None]:
model

In [None]:
replace_linear_with_target(model, W8A16LinearLayer, ["lm_head"])

In [None]:
model

In [None]:
state_dict_cache_path = hf_hub_download(
    f"{YOUR_HF_USERNAME}/opt-125m-quantized-dlai",
    
    "quantized_state_dict.pth"
)

In [None]:
state_dict = torch.load(state_dict_cache_path)

In [None]:
model.load_state_dict(state_dict, strict=True, assign=True)

- Test your model.
- **Note:** Your generated text might be different than what you see in the video.

In [None]:
from transformers import pipeline

pipe = pipeline("text-generation", model=model, tokenizer=tokenizer)
pipe("Hello today I am", max_new_tokens=40)

In [None]:
pipe = pipeline("text-generation", model=model, tokenizer=tokenizer)
pipe("Hello today I am giving a course about", max_new_tokens=10)


# L5-B: Packing 2-bit Weights

In this lesson, you will learn how to store low precision weights through a technique called "packing".

## Packing

**Note:** Younes will explain the below code, and walk through each iteration step. You can go through the comprehensive explaination in the markdown below after first watching Younes's explaination.

```Python
# Example Tensor: [1, 0, 3, 2]
    # 1 0 3 2 - 01 00 11 10

    # Starting point of packed int8 Tensor
    # [0000 0000]
    
    ##### First Iteration Start:
    # packed int8 Tensor State: [0000 0000]
    # 1 = 0000 0001
    # 0000 0001
    # No left shifts in the First Iteration
    # After bit-wise OR operation between 0000 0000 and 0000 0001:
    # packed int8 Tensor State: 0000 0001
    ##### First Iteration End

    ##### Second Iteration Start:
    # packed int8 Tensor State: [0000 0001]
    # 0 = 0000 0000
    # 0000 0000
    # 2 left shifts:
    # [0000 0000] (1 shift)-> 0000 0000 (2 shift)-> 0000 0000
    # After bit-wise OR operation between 0000 0001 and 0000 0000:
    # packed int8 Tensor State: 0000 0001
    ##### Second Iteration End

    ##### Third Iteration Start:
    # packed int8 Tensor State: [0000 0001]
    # 3 = 0000 0011
    # 0000 0011
    # 4 left shifts:
    # [0000 0011] (1 shift)-> 0000 0110 (2 shift)-> 0000 1100
    # 0000 1100 (3 shift)-> 0001 1000 (4 shift)-> 0011 0000
    # After bit-wise OR operation between 0000 0001 and 0011 0000:
    # packed int8 Tensor State: 0011 0001
    ##### Third Iteration End

    ##### Fourth Iteration Start:
    # packed int8 Tensor State: [0011 0001]
    # 2 = 0000 0010
    # 0000 0010
    # 6 left shifts:
    # [0000 0010] (1 shift)-> 0000 0100 (2 shift)-> 0000 1000
    # 0000 1000 (3 shift)-> 0001 0000 (4 shift)-> 0010 0000
    # 0010 0000 (5 shift)-> 0100 0000 (6 shift)-> 1000 0000
    # After bit-wise OR operation between 0011 0001 and 1000 0000:
    # packed int8 Tensor State: 1011 0001
    ##### Fourth Iteration End
    
    # Final packed int8 Tensor State: [1011 0001]
```

In [None]:
def pack_weights(uint8tensor, bits):
    if uint8tensor.shape[0] * bits % 8 != 0:
        raise ValueError(f"The input shape needs to be a mutiple \
        of {8 / bits} - got {uint8tensor.shape[0]}")

    num_values = uint8tensor.shape[0] * bits // 8

    num_steps = 8 // bits

    unpacked_idx = 0

    packed_tensor = torch.zeros((num_values), dtype=torch.uint8)

    # 1 0 3 2 - 01 00 11 10

    # [0000 0000] -> 0000 0001

    # 0000 0001

    # 0000 0000 - 0000 0000

    # 0000 0011 - 0011 0000 - 0011 0001

    # 1011 0001
    
    for i in range(num_values):
        for j in range(num_steps):
            packed_tensor[i] |= uint8tensor[unpacked_idx] << (bits * j)
            unpacked_idx += 1
    return packed_tensor

In [None]:
unpacked_tensor = torch.tensor([1, 0, 3, 2], 
                               dtype=torch.uint8)

In [None]:
pack_weights(unpacked_tensor, 2)

In [None]:
unpacked_tensor = torch.tensor([1, 0, 3, 2, 3, 3, 3, 3], 
                               dtype=torch.uint8)

In [None]:
pack_weights(unpacked_tensor, 2)

# L5-C: Unpacking 2-Bit Weights

In this lesson, you will learn how to "unpack" the stored low precision "packed" weights.

## Unpacking

**Note:** Younes will explain the below code, and walk through each iteration step. You can go through the comprehensive explaination in the markdown below after first watching Younes's explaination.

```Python
# Example Tensor: [10110001]
    # Which was Originally: 1 0 3 2 - 01 00 11 10

    # Starting point of unpacked Tensor
    # [00000000 00000000 00000000 00000000]
    
    ##### First Iteration Start:
    # packed int8 Tensor: [10110001]
    # You want to extract 01 from [101100 01]
    # No right shifts in the First Iteration
    # After bit-wise OR operation between 00000000 and 10110001:
    # [10110001 00000000 00000000 00000000]
    # unpacked Tensor state: [10110001 00000000 00000000 00000000]
    ##### First Iteration End

    ##### Second Iteration Start:
    # packed int8 Tensor: [10110001]
    # You want to extract 00 from [1011 00 01]
    # 2 right shifts:
    # [10110001] (1 shift)-> 01011000 (2 shift)-> 00101100
    # After bit-wise OR operation between 00000000 and 00101100:
    # [10110001 00101100 00000000 00000000]
    # unpacked Tensor state: [10110001 00101100 00000000 00000000]
    ##### Second Iteration End

    ##### Third Iteration Start:
    # packed int8 Tensor: [10110001]
    # You want to extract 11 from [10 11 0001]
    # 4 right shifts:
    # [10110001] (1 shift)-> 01011000 (2 shift)-> 00101100
    # 00101100 (3 shift)-> 00010110 (4 shift)-> 00001011
    # After bit-wise OR operation between 00000000 and 00001011:
    # [10110001 00101100 00001011 00000000]
    # unpacked Tensor state: [10110001 00101100 00001011 00000000]
    ##### Third Iteration End

    ##### Fourth Iteration Start:
    # packed int8 Tensor: [10110001]
    # You want to extract 10 from [10 110001]
    # 6 right shifts:
    # [10110001] (1 shift)-> 01011000 (2 shift)-> 00101100
    # 00101100 (3 shift)-> 00010110 (4 shift)-> 00001011
    # 00001011 (5 shift)-> 00000101 (6 shift)-> 00000010
    # After bit-wise OR operation between 00000000 and 00000010:
    # [10110001 00101100 00001011 00000010]
    # unpacked Tensor state: [10110001 00101100 00001011 00000010]
    ##### Fourth Iteration End
    
    # Last step: Perform masking (bit-wise AND operation)
    # Mask: 00000011
    # Bit-wise AND operation between 
    # unpacked Tensor and 00000011
    # [10110001 00101100 00001011 00000010] <- unpacked tensor
    # [00000011 00000011 00000011 00000011] <- Mask
    # [00000001 00000000 00000011 00000010] <- Result

    # Final
    # unpacked Tensor state: [00000001 00000000 00000011 00000010]

```

In [None]:
def unpack_weights(uint8tensor, bits):
    num_values = uint8tensor.shape[0] * 8 // bits

    num_steps = 8 // bits

    unpacked_tensor = torch.zeros((num_values), dtype=torch.uint8)

    unpacked_idx = 0

    # 1 0 3 2 - 01 00 11 10

    # [00000000 00000000 00000000 00000000]
    # [10110001 00101100 00001011 00000010]
    # [00000001 00000000 00000011 00000010]

    # 10110001
    # 00000011
    
    # 00000001

    # 1: [10110001]
    # 2: [00101100]
    # 3: [00001011]

    mask = 2 ** bits - 1

    for i in range(uint8tensor.shape[0]):
        for j in range(num_steps):
            unpacked_tensor[unpacked_idx] |= uint8tensor[i] >> (bits * j)
            unpacked_idx += 1

    unpacked_tensor &= mask
    return unpacked_tensor

In [None]:
unpacked_tensor = torch.tensor([177, 255], 
                               dtype=torch.uint8)

In [None]:
# Answer should be: torch.tensor([1, 0, 3, 2, 3, 3, 3, 3]
unpack_weights(unpacked_tensor, 2)