# GPTQ

This notebook shows a working code example of how to use AIMET to perform post-training quantization (PTQ) using GPTQ.

GPTQ performs layer-wise quantization and applies a series of optimizations to the OBQ (Optimal Brain Quantization) method, which uses second-order information for updating the weights.

Using GPTQ PTQ, a model is able to achieve an accuracy closer to the FP32 model, while using low bit-width integer quantization. We observe considerable speedup compared to another PTQ method, AdaRound, while achieving similar performance.

#### Overall flow
This notebook covers the following:
1. Define helper functions and the configurations needed to create quantsim and apply GPTQ
2. Load the FP32 model and evaluate the model to find the baseline FP32 accuracy
3. Create a quantization simulation model and determine quantized accuracy
4. Apply GPTQ technique and evaluate the PPL on the quantsim

  #### 1. Define helper functions and the configurations needed to create quantsim and apply GPTQ

First, define the path to the checkpoints config file, which specifies the model architecture and identifies the blocks to be used to perform the sampling time optimization.

We then define the config file needed to create the quantsim of the model; for GPTQ, we specify per-channel quantization for the params.

Next, we define helper functions that convert the Conv1D layers to linear layers, as the current implementation applies the optimization on linear layers.

In [None]:
import os
# Both JSON files are looked up under the GPTQ/ folder of the current working directory.
root = os.getcwd()
gptq_config_path, checkpoints_config_path = (
    os.path.join(root, relative_path)
    for relative_path in (
        'GPTQ/quantsim_config.json',
        'GPTQ/gpt2-small_checkpoints_config.json',
    )
)

In [None]:
import json

import torch
import torch.nn as nn
import transformers

def convert_conv1d_to_linear(model: torch.nn.Module) -> torch.nn.Module:
    """Replace every ``transformers.Conv1D`` submodule of ``model`` with an ``nn.Linear``.

    The Conv1D weight is read as ``(in_features, out_features)`` and copied
    transposed, because ``nn.Linear`` stores its weight as
    ``(out_features, in_features)``. Each replacement is created on the same
    device and with the same dtype as the layer it replaces, and it only gets
    a bias when the original layer has one.

    :param model: Model whose Conv1D layers should be swapped; modified in place.
    :return: The same ``model`` object, for call chaining.
    """
    replacements = {}
    for name, module in model.named_modules():
        if not isinstance(module, transformers.Conv1D):
            continue
        in_features, out_features = module.weight.shape
        has_bias = module.bias is not None
        # Build the layer bias-less when the source has no bias; otherwise the
        # randomly initialised nn.Linear bias would silently change the output.
        new_layer = nn.Linear(in_features, out_features, bias=has_bias).to(
            device=module.weight.device, dtype=module.weight.dtype
        )
        with torch.no_grad():
            new_layer.weight.copy_(module.weight.T)
            if has_bias:
                new_layer.bias.copy_(module.bias)
        replacements[name] = new_layer

    # Swap after the traversal so the module tree is not mutated while iterating it.
    for name, new_layer in replacements.items():
        parent_name, _, child_name = name.rpartition(".")
        setattr(model.get_submodule(parent_name), child_name, new_layer)
    return model


In [None]:

def get_dummy_input(data_loader, device, return_tuple=False):
    """Return the input tensor of the first batch in ``data_loader``, moved to ``device``.

    :param data_loader: Iterable of batches; element 0 of a batch is the input tensor.
    :param device: Target device for the returned tensor.
    :param return_tuple: When true the bare tensor is returned, otherwise it is
        wrapped in a dict under the key ``'input'``.
    :return: The tensor or dict described above, or ``None`` if the loader yields nothing.
    """
    for first_batch in data_loader:
        sample = first_batch[0].to(device)
        return sample if return_tuple else {'input': sample}
    return None


We define a class that constructs the datasets and provides the evaluate function.

We take inspiration from the huggingface implementation of perplexity evaluation

https://huggingface.co/docs/transformers/perplexity



In [None]:

from tqdm import tqdm
from datasets import load_dataset
from transformers import GPT2LMHeadModel, AutoTokenizer
from torch.utils.data import DataLoader
from aimet_torch.utils import get_all_quantizers, in_eval_mode
import random

class WikiTextDataPipeline:
    """WikiText-2 data loading and perplexity evaluation helpers.

    Every member is a static method; the class is only used as a namespace.
    """

    @staticmethod
    def collate_fn(batch):
        """Stack a list of ``(input, target)`` pairs into two batch tensors.

        The samples built by the loaders below are ``[:, i:j]`` slices with a
        leading dimension of size one; after stacking, that dimension sits at
        index 1 and is squeezed away.

        :param batch: List of ``(input_tensor, target_tensor)`` pairs.
        :return: Tuple ``(input_batch, target_batch)``.
        """
        input_batch = torch.stack([item[0] for item in batch]).squeeze(dim=1)
        target_batch = torch.stack([item[1] for item in batch]).squeeze(dim=1)
        return input_batch, target_batch

    @staticmethod
    def _encode_split(model, split):
        """Tokenize one WikiText-2 split as a single long sequence."""
        dataset = load_dataset('wikitext', 'wikitext-2-raw-v1', split=split)
        tokenizer = AutoTokenizer.from_pretrained(model, use_fast=False)
        return tokenizer("\n\n".join(dataset['text']), return_tensors='pt')

    @staticmethod
    def _max_sequence_length(model):
        """Return ``n_positions`` from the model's config without loading any weights."""
        # config_class is the configuration class paired with GPT2LMHeadModel;
        # reading only the config avoids instantiating the whole network here.
        config = GPT2LMHeadModel.config_class.from_pretrained(
            model, cache_dir='/local/mnt/workspace/juhimitt/remote_dev/aimet_main'
        )
        return config.n_positions

    @staticmethod
    def get_train_dataloader(model: str, batch_size = 1, nsamples=128) -> torch.utils.data.DataLoader:
        """Build a loader of randomly positioned, full-length training windows.

        :param model: Model name or path handed to the tokenizer/config ``from_pretrained``.
        :param batch_size: Batch size of the returned loader.
        :param nsamples: Number of random windows to draw from the train split.
        :return: DataLoader yielding ``(input_ids, target_ids)`` batches.
        """
        trainenc = WikiTextDataPipeline._encode_split(model, 'train')
        max_length = WikiTextDataPipeline._max_sequence_length(model)
        last_start = trainenc.input_ids.shape[1] - max_length - 1
        trainloader = []
        for _ in range(nsamples):
            start = random.randint(0, last_start)
            inp = trainenc.input_ids[:, start:start + max_length]
            tar = inp.clone()
            # Every position except the last one is set to -100 in the targets.
            tar[:, :-1] = -100
            trainloader.append((inp, tar))

        print(f'length of the train loader is {len(trainloader)}')
        return DataLoader(trainloader, batch_size=batch_size, collate_fn=WikiTextDataPipeline.collate_fn)

    @staticmethod
    def get_val_dataloader(model: str, batch_size=1, stride=512) ->  torch.utils.data.DataLoader:
        """Build a loader of overlapping sliding windows over the test split.

        Windows start every ``stride`` tokens and are at most ``n_positions``
        long. Tokens that an earlier window already covered are set to -100 in
        the targets, so only the new tokens of each window keep their label.

        :param model: Model name or path handed to the tokenizer/config ``from_pretrained``.
        :param batch_size: Batch size of the returned loader.
        :param stride: Distance in tokens between consecutive window starts.
        :return: DataLoader yielding ``(input_ids, target_ids)`` batches.
        """
        testenc = WikiTextDataPipeline._encode_split(model, 'test')
        max_length = WikiTextDataPipeline._max_sequence_length(model)
        max_len = testenc.input_ids.size(1)
        testloader = []
        prev_end_loc = 0
        for begin_loc in tqdm(range(0, max_len, stride)):
            end_loc = min(begin_loc + max_length, max_len)
            trg_len = end_loc - prev_end_loc  # may be different from stride on last loop
            input_ids = testenc.input_ids[:, begin_loc:end_loc]
            target_ids = input_ids.clone()
            target_ids[:, :-trg_len] = -100
            testloader.append((input_ids, target_ids))
            prev_end_loc = end_loc
            if end_loc == max_len:
                break
        return DataLoader(testloader, batch_size=batch_size, collate_fn=WikiTextDataPipeline.collate_fn)

    @staticmethod
    def evaluate_model(model: torch.nn.Module, data_loader) -> torch.Tensor:
        """Compute the perplexity of ``model`` over ``data_loader``.

        The model config is switched to tuple outputs (``return_dict`` and
        ``return_past`` set to False) so the first output element is the loss.
        The result is ``exp`` of the mean per-batch loss.

        :param model: Model exposing ``config`` and ``device`` and accepting ``labels``.
        :param data_loader: Iterable of ``(input_ids, target_ids)`` batches.
        :return: Zero-dimensional tensor holding the perplexity.
        """
        model.config.return_dict = False
        model.config.return_past = False
        device = model.device  # loop-invariant, so read it once
        nlls = []
        with torch.no_grad(), in_eval_mode(model):
            for idx, data in enumerate(data_loader):
                if idx % 5 == 0:
                    print(f'{idx} \n')
                neg_log_likelihood, *_ = model(data[0].to(device), labels=data[1].to(device))
                nlls.append(neg_log_likelihood)

        return torch.exp(torch.stack(nlls).mean())



### 2. Load the model and evaluate to get a baseline FP32 perplexity score


In [None]:
# Build the FP32 reference model: load GPT-2, swap Conv1D for nn.Linear, switch to eval mode.
model_name = 'gpt2'
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model = convert_conv1d_to_linear(
    GPT2LMHeadModel.from_pretrained(
        model_name, cache_dir='/local/mnt/workspace/juhimitt/remote_dev/aimet_main'
    ).to(device)
).eval()

# The score of the unquantized model on the validation loader is the baseline.
val_loader = WikiTextDataPipeline.get_val_dataloader(model_name)
fp32_score = WikiTextDataPipeline.evaluate_model(model, val_loader)
print(f'full precision model score is {fp32_score}')



### 3. Create a quantization simulation model and determine quantized perplexity score


In [None]:
import os
from aimet_torch.qc_quantize_op import QcQuantizeWrapper
from aimet_torch.quantsim import QuantizationSimModel
from aimet_torch.adaround.adaround_weight import Adaround, AdaroundParameters


# Start from a fresh FP32 model so this cell does not depend on earlier model state.
model_name = 'gpt2'
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model = GPT2LMHeadModel.from_pretrained(
    model_name, cache_dir='/local/mnt/workspace/juhimitt/remote_dev/aimet_main'
).to(device)
model = convert_conv1d_to_linear(model).eval()
model.config.return_dict = False
model.config.return_past = False

val_loader = WikiTextDataPipeline.get_val_dataloader(model_name)
dummy_input = get_dummy_input(val_loader, device, return_tuple=True)

quant_sim = QuantizationSimModel(
    model,
    dummy_input=dummy_input,
    default_param_bw=4,
    config_file=gptq_config_path,
)

# Disable every quantizer on wrappers whose wrapped module is not an nn.Linear,
# so that only the linear layers stay quantized.
for wrapper in quant_sim.model.modules():
    # pylint: disable=protected-access
    if isinstance(wrapper, QcQuantizeWrapper) and not isinstance(wrapper._module_to_wrap, torch.nn.Linear):
        param_quantizers, input_quantizers, output_quantizers = get_all_quantizers(wrapper)
        for quantizer in (*param_quantizers, *input_quantizers, *output_quantizers):
            quantizer.enabled = False

# lm_head is handed to Adaround's private helper as a module to ignore
# (presumably this turns its quantizers off - confirm against AIMET).
ignore_quant_ops_list = [model.lm_head]
Adaround._exclude_modules(model, quant_sim, ignore_quant_ops_list)

quantsim_score = WikiTextDataPipeline.evaluate_model(quant_sim.model, val_loader)
print(f'Quant sim score is {quantsim_score}')



### 4. Apply GPTQ

First we create the model; then we define the relevant parameters and the output directory path used to store the model with updated weights and the encodings.

In [None]:
from aimet_torch.GPTQ.gptq_weight import GPTQ, GPTQParameters
import os

# Fresh FP32 GPT-2 with Conv1D layers replaced by nn.Linear; GPTQ is applied to this model.
model_name = 'gpt2'
device = 'cpu'
if torch.cuda.is_available():
    device = 'cuda'

model = convert_conv1d_to_linear(
    GPT2LMHeadModel.from_pretrained(
        model_name,
        cache_dir='/local/mnt/workspace/juhimitt/remote_dev/aimet_main',
    ).to(device)
)
# Tuple outputs instead of dict outputs.
model.config.return_dict = False
model.config.return_past = False
model = model.eval()

def forward_fn(model, batch):
    """Run ``model`` on the first element of ``batch`` and return its output.

    The tensor is moved to the notebook-level ``device`` before the call;
    the remaining elements of ``batch`` are not used.
    """
    input_ids = batch[0].to(device)
    return model(input_ids)

# GPTQ settings, passed unchanged to GPTQParameters below.
# num_batches matches the train loader defaults (nsamples=128, batch_size=1).
num_batches = 128
block_size = 128
percdamp = 0.01
reordering = False

train_loader = WikiTextDataPipeline.get_train_dataloader(model_name)
val_loader = WikiTextDataPipeline.get_val_dataloader(model_name)

# A single validation input tensor, used as the dummy input for GPTQ and quantsim.
dummy_input = get_dummy_input(val_loader, device, return_tuple=True)


params = GPTQParameters(
        data_loader=train_loader,
        num_batches=num_batches,
        block_size = block_size,
        forward_fn=forward_fn,
        percdamp = percdamp,
        reordering = reordering
    )

output_dir = './'
output_path = os.path.join(output_dir, "gptq")
fname = os.path.join(output_path, "gptq_model.pt")
# exist_ok avoids the check-then-create race of testing os.path.exists first.
os.makedirs(output_path, exist_ok=True)


In [None]:
# Apply GPTQ to the FP32 model. The 4-bit 'tf' settings and the config file match
# the quantsim created later; lm_head is passed as a module to ignore, and files
# are written under output_path with the "parameter" prefix.
gptq_model = GPTQ.apply_gptq_with_cache(
    model,
    dummy_input,
    params=params,
    path=output_path,
    filename_prefix="parameter",
    default_param_bw=4,
    default_quant_scheme='tf',
    default_config_file=gptq_config_path,
    checkpoints_config=checkpoints_config_path,
    ignore_quant_ops_list=[model.lm_head],
)

The returned GPTQ model has the updated weights. We then create a quantsim on it, set and freeze the parameter encodings saved by GPTQ, and evaluate the model.

In [None]:
# Quantsim over the GPTQ-updated model, created with the same settings as the baseline quantsim.
quant_sim = QuantizationSimModel(gptq_model, dummy_input=dummy_input,
                                     default_param_bw=4,
                                     config_file=gptq_config_path)

# Disable every quantizer on wrappers whose wrapped module is not an nn.Linear.
for wrapper in quant_sim.model.modules():
    # pylint: disable=protected-access
    if isinstance(wrapper, QcQuantizeWrapper) and not isinstance(wrapper._module_to_wrap, torch.nn.Linear):
        param_quantizers, input_quantizers, output_quantizers = get_all_quantizers(wrapper)
        for q in param_quantizers + input_quantizers + output_quantizers:
            q.enabled = False

Adaround._exclude_modules(gptq_model, quant_sim, [gptq_model.lm_head])

# Derive the encodings file from output_path instead of repeating './gptq';
# the 'parameter' stem must match the filename_prefix given to GPTQ.
quant_sim.set_and_freeze_param_encodings(
    encoding_path=os.path.join(output_path, 'parameter.encodings')
)
# Note: this rebinds `model` from the FP32 model to the quantsim model.
model = quant_sim.model

gptq_score = WikiTextDataPipeline.evaluate_model(model, val_loader)
print(f'gptq score is {gptq_score}')