# Quantization Demo Notebook

In [None]:
!pip install -r requirements.txt

In [None]:
# What version of Python do you have?
import sys
import platform
import torch
import pandas as pd
import sklearn as sk

# Probe the accelerators once and reuse the results for both the device
# choice and the report printed below.
has_gpu = torch.cuda.is_available()
# is_built() only reports that this PyTorch binary was compiled with MPS
# support; is_available() additionally checks that the current machine can
# actually use an MPS device, which is what selecting a device requires.
has_mps = torch.backends.mps.is_built() and torch.backends.mps.is_available()
# Preference order: Apple Metal, then CUDA, then CPU.
device = "mps" if has_mps else "cuda" if has_gpu else "cpu"

print(f"Python Platform: {platform.platform()}")
print(f"PyTorch Version: {torch.__version__}")
print()
print(f"Python {sys.version}")
print(f"Pandas {pd.__version__}")
print(f"Scikit-Learn {sk.__version__}")
print("NVIDIA/CUDA GPU is", "available" if has_gpu else "NOT AVAILABLE")
print("MPS (Apple Metal) is", "AVAILABLE" if has_mps else "NOT AVAILABLE")
print(f"Target device is {device}")

In [None]:
import os
import pandas as pd
import time
import torch

from datasets import Dataset, DatasetDict, load_dataset
from dotenv import dotenv_values
from huggingface_hub import HfApi, HfFolder
from peft import LoraConfig, AutoPeftModelForCausalLM, prepare_model_for_kbit_training, get_peft_model
from pympler import asizeof
from sklearn.model_selection import train_test_split
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig, DataCollatorForLanguageModeling, GPTQConfig, TrainingArguments, Trainer, QuantoConfig

# Read the Hugging Face access token from the local .env file and store it
# through HfFolder so later hub calls can pick it up.
secrets = dotenv_values(".env")
HUGGINGFACE_TOKEN = secrets.get("HUGGINGFACE_TOKEN")
if not HUGGINGFACE_TOKEN:
    # Fail early with an actionable message instead of a bare KeyError or
    # handing an empty token to save_token.
    raise KeyError("HUGGINGFACE_TOKEN is missing or empty in .env")
HfFolder.save_token(HUGGINGFACE_TOKEN)
print("saved")

### Analyzing Memory Requirements and Inference Quality 
### For Various Quantized Models

In [None]:
# Set TOKENIZERS_PARALLELISM to "false" for this process.
# NOTE(review): presumably to silence the tokenizers fork warning - confirm.
os.environ["TOKENIZERS_PARALLELISM"] = "false"

# Short free-form prompt wrapped in [INST] ... [/INST] markers.
test_question = "[INST] How much wood could a woodchuck chuck if a woodchuck could chuck wood? [/INST]"

# Single hand-written classification prompt used by check_prediction below;
# the expected answer for it is stored in `label`.
stackoverflow_question = """[INST]
Consider the following stackoverflow question:

Title: Java: Repeat Task Every Random Seconds

Body: <p>I'm already familiar with repeating tasks every n seconds by using Java.util.Timer and Java.util.TimerTask. 
But lets say I want to print "Hello World" to the console every random seconds from 1-5. 
Unfortunately I'm in a bit of a rush and don't have any code to show so far. Any help would be apriciated.  </p>

Tags: <java><repeat>

Choose between one of these three tags: HQ, LQ_EDIT, and LQ_CLOSE.

HQ: High-quality posts without a single edit.
LQ_EDIT: Low-quality posts with a negative score, and multiple community edits. However, they remain open.
LQ_CLOSE: Low-quality posts that were closed by the community without a single edit.

Only respond with either HQ, LQ_EDIT, or LQ_CLOSE.
[/INST]
"""
# Expected label for `stackoverflow_question`.
label = "LQ_EDIT"

def print_size_of_model(_model, name):
    """Print the on-disk size, in GB, of the model's serialized state dict.

    The state dict is written to a uniquely named scratch file in the current
    working directory, measured, and then removed.

    Args:
        _model: Object exposing ``state_dict()`` whose result ``torch.save``
            can serialize.
        name: Human-readable label used as the prefix of the printed line.
    """
    import tempfile

    # A unique name avoids clobbering an existing "temp.p"; staying in the
    # working directory keeps the same disk as before for large checkpoints.
    fd, checkpoint_path = tempfile.mkstemp(suffix=".p", dir=os.getcwd())
    os.close(fd)
    try:
        torch.save(_model.state_dict(), checkpoint_path)
        size_gb = os.path.getsize(checkpoint_path) / 1e9
    finally:
        # Always clean up, even when serialization fails part-way through.
        os.remove(checkpoint_path)
    print(f'{name} size (GB): {round(size_gb, 2)}')

def check_prediction(_model, _device, name):
    """Run the sample Stack Overflow prompt through a model and report the result.

    Tokenizes the module-level ``stackoverflow_question`` with the module-level
    ``tokenizer``, generates up to 6 new tokens, prints the generated text and
    then prints whether the module-level ``label`` appears in it.

    Args:
        _model: Model exposing ``generate``; this is the model that is queried.
        _device: Device the tokenized inputs are moved to before generation.
        name: Human-readable label used as the prefix of the printed response.
    """
    inputs = tokenizer(stackoverflow_question, return_tensors="pt").to(_device)
    # Query the model that was passed in, not whatever the global `model`
    # happens to be bound to at call time.
    outputs = _model.generate(**inputs, max_new_tokens=6)

    # Decode only the tokens produced after the prompt. Slicing the decoded
    # string by the prompt's character length is fragile because decoding does
    # not have to reproduce the prompt text character-for-character.
    prompt_token_count = inputs["input_ids"].shape[1]
    text_response = tokenizer.decode(
        outputs[0][prompt_token_count:],
        skip_special_tokens=True,
    )

    print(f"{name} raw text response: {text_response}\n")

    if label in text_response:
        print(f"{label} ✅ \n")
    else:
        print(f"{label} ❌ \n")


# See model card: https://huggingface.co/mistralai/Mistral-7B-Instruct-v0.2 
# The same checkpoint id is reused by every model-loading cell below.
model_id = "mistralai/Mistral-7B-Instruct-v0.2"
tokenizer = AutoTokenizer.from_pretrained(model_id)
# Reuse the end-of-sequence token for padding.
# NOTE(review): presumably because this tokenizer has no pad token - confirm.
tokenizer.pad_token = tokenizer.eos_token

### Mistral-7B-Instruct float32 non-quantized

In [None]:
# Release cached CUDA memory before loading the next model.
torch.cuda.empty_cache()

# Baseline: float32 weights placed on the CPU.
model = AutoModelForCausalLM.from_pretrained(model_id, torch_dtype=torch.float32, device_map="cpu")
print("============ Mistral-7B-Instruct float32 non-quantized ==================")
# Report the serialized size, then run the sample classification prompt.
print_size_of_model(model, "Mistral-7B-Instruct float32 non-quantized")
check_prediction(model, "cpu", "Mistral-7B-Instruct float32 non-quantized")

### Mistral-7B-Instruct float16 non-quantized

In [None]:
# Drop the reference to any previously loaded model first, as the quantized
# cells below do, so its memory can be reclaimed before the new load.
model = None
torch.cuda.empty_cache()

# float16 weights placed on the first CUDA device.
model = AutoModelForCausalLM.from_pretrained(model_id, torch_dtype=torch.float16, device_map="cuda:0")
print("============ Mistral-7B-Instruct float16 non-quantized ==================")
# Report the serialized size, then run the sample classification prompt.
print_size_of_model(model, "Mistral-7B-Instruct float16 non-quantized")
check_prediction(model, "cuda:0", "Mistral-7B-Instruct float16 non-quantized")


### Mistral-7B-Instruct float16 8bit quantized

In [None]:
model = None # ensure memory isn't dedicated to a prior instance
torch.cuda.empty_cache()

# BitsAndBytesConfig exposes quant-type, double-quant and compute-dtype
# options only for 4-bit loading (the bnb_4bit_* arguments). The former
# bnb_8bit_* keyword arguments are not part of its signature and had no
# effect, so only the 8-bit switch is passed.
nf8_config = BitsAndBytesConfig(
   load_in_8bit=True,
)

model = AutoModelForCausalLM.from_pretrained(
    model_id, 
    torch_dtype=torch.float16, 
    quantization_config=nf8_config,
    device_map="cuda:0"
)
print("============ Mistral-7B-Instruct float16 8bit quantized ==================")
# Report the serialized size, then run the sample classification prompt.
print_size_of_model(model, "Mistral-7B-Instruct float16 8bit quantized")
check_prediction(model, "cuda:0", "Mistral-7B-Instruct float16 8bit quantized")


### Mistral-7B-Instruct float16 4bit quantized

In [None]:
# Drop the previous model and release cached CUDA memory before loading.
model = None
torch.cuda.empty_cache()

nf4_config = BitsAndBytesConfig(
   load_in_4bit=True,
   bnb_4bit_quant_type="nf4",
   # Double quantization trades some inference speed for extra memory
   # savings, since the linear layers are quantized twice. It is left
   # DISABLED here.
   bnb_4bit_use_double_quant=False,
   bnb_4bit_compute_dtype=torch.bfloat16
)

model = AutoModelForCausalLM.from_pretrained(
    model_id, 
    torch_dtype=torch.float16, 
    quantization_config=nf4_config,
    device_map="cuda:0"
)
print("============ Mistral-7B-Instruct float16 4bit quantized ==================")
# Report the serialized size, then run the sample classification prompt.
print_size_of_model(model, "Mistral-7B-Instruct float16 4bit quantized")
check_prediction(model, "cuda:0", "Mistral-7B-Instruct float16 4bit quantized")


### Prepare Open-Source Stack Overflow Dataset

In [None]:
# Cap on the number of rows loaded for this demo.
MAX_ROWS = 1000
# Let the parser stop after MAX_ROWS rows instead of reading the whole file
# and discarding the remainder.
stackoverflow_df = pd.read_csv('train.csv', nrows=MAX_ROWS)
stackoverflow_df

In [None]:
# Constants
SEED = 999  # random_state passed to both train_test_split calls below
TEST_SIZE = 0.2
VALIDATION_SIZE = 0.25  # This is 0.25 of the 80% after the initial split

# Stack Overflow prompt template
# Placeholders {title}, {body} and {tags} are filled per row by the
# create_mistral_* helpers below.
stackoverflow_prompt_template = """[INST]
Consider the following stackoverflow question:

Title: {title}

Body: {body}

Tags: {tags}

Choose between one of these three tags: HQ, LQ_EDIT, and LQ_CLOSE.

HQ: High-quality posts without a single edit.
LQ_EDIT: Low-quality posts with a negative score, and multiple community edits. However, they remain open.
LQ_CLOSE: Low-quality posts that were closed by the community without a single edit.

Only respond with either HQ, LQ_EDIT, or LQ_CLOSE. [/INST]
"""


# Maximum number of characters of the post body kept in a prompt.
MAX_BODY_CHAR_LEN = 1000

# Columns
# Body_short holds the first MAX_BODY_CHAR_LEN characters of each post body.
stackoverflow_df['Body_short'] = stackoverflow_df['Body'].str.slice(0, MAX_BODY_CHAR_LEN)

def create_mistral_prompt(row):
    """Build the inference prompt for one Stack Overflow row.

    Args:
        row: Mapping-like row providing 'Title', 'Body_short' and 'Tags'.

    Returns:
        ``stackoverflow_prompt_template`` with title, body and tags filled in.
    """
    # The template has no {correct_label} placeholder, so the label is not
    # passed; the inference prompt does not need the 'Y' column at all.
    return stackoverflow_prompt_template.format(
        title=row['Title'],
        body=row['Body_short'],
        tags=row['Tags'],
    )
def create_mistral_training_prompt(row):
    """Build the training example for one Stack Overflow row.

    The filled-in prompt template is wrapped as ``<s>`` + prompt + newline +
    the row's 'Y' label + ``</s>``.

    Args:
        row: Mapping-like row providing 'Title', 'Body_short', 'Tags' and 'Y'.

    Returns:
        The complete training string.
    """
    filled_prompt = stackoverflow_prompt_template.format(
        title=row['Title'],
        body=row['Body_short'],
        tags=row['Tags'],
    )
    completion = f"\n{row['Y']}</s>"
    return "<s>" + filled_prompt + completion

# Build both prompt variants for every row: the inference prompt and the
# training string that also carries the label.
stackoverflow_df['mistral_prompt'] = stackoverflow_df.apply(create_mistral_prompt, axis=1)
stackoverflow_df['mistral_training_prompt'] = stackoverflow_df.apply(create_mistral_training_prompt, axis=1)

# Feature columns and label column used for the splits below.
train_cols = ['mistral_prompt', 'mistral_training_prompt']
y_col = ['Y']

# Initial split to get test set
X_train_full, X_test, y_train_full, y_test = train_test_split(
    stackoverflow_df[train_cols], 
    stackoverflow_df[y_col], 
    test_size=TEST_SIZE, 
    random_state=SEED
)

# Further split the training set to get validation set
X_train, X_val, y_train, y_val = train_test_split(
    X_train_full, 
    y_train_full, 
    test_size=VALIDATION_SIZE, 
    random_state=SEED
)

# Output shapes to verify the split
print(X_train.shape, X_val.shape, X_test.shape)
X_train.head(3)

### Assess Performance of Non Fine-Tuned Quantized Model

In [None]:
import numpy as np
from sklearn.metrics import accuracy_score, precision_recall_fscore_support

# Maximum number of prompt tokens passed to the model; longer prompts are
# truncated by the tokenizer.
MAX_TOKEN_LEN = 1024

# Prepare the evaluation function
def evaluate_model(model, tokenizer, _X_test, _y_test):
    """Classify every test prompt with the model and compute summary metrics.

    For each row, the 'mistral_prompt' text is tokenized (truncated to
    MAX_TOKEN_LEN tokens), up to 6 new tokens are generated, and the generated
    text is mapped to "HQ", "LQ_EDIT", "LQ_CLOSE" or "UNKNOWN".

    Args:
        model: Model exposing ``eval()`` and ``generate``. Inputs are moved to
            ``model.device`` when that attribute exists, otherwise "cuda:0".
        tokenizer: Tokenizer used to encode prompts and decode generations.
        _X_test: DataFrame with a 'mistral_prompt' column.
        _y_test: DataFrame with a 'Y' column, indexed like ``_X_test``.

    Returns:
        dict with 'accuracy' and macro/micro 'precision_*', 'recall_*' and
        'f1_*' values.
    """
    print(f"evaluating {len(_X_test)} rows of test data")
    model.eval()
    # Follow the model's own placement rather than assuming "cuda:0"; keep the
    # old value as a fallback for objects without a `device` attribute.
    target_device = getattr(model, "device", "cuda:0")
    # Checked in this order; the first label found in the response wins.
    known_labels = ("HQ", "LQ_EDIT", "LQ_CLOSE")
    predictions = []
    true_labels = []
    num_unknowns = 0

    for _, row in _X_test.iterrows():
        inputs = tokenizer(
            row['mistral_prompt'],
            return_tensors="pt",
            truncation=True,
            padding=True,
            max_length=MAX_TOKEN_LEN
        ).to(target_device)

        outputs = model.generate(**inputs, max_new_tokens=6)
        # Decode only the tokens generated after the prompt. Slicing the
        # decoded text by the original prompt's character length drops the
        # answer whenever the prompt was truncated to MAX_TOKEN_LEN.
        prompt_token_count = inputs["input_ids"].shape[1]
        text_response = tokenizer.decode(
            outputs[0][prompt_token_count:],
            skip_special_tokens=True,
        ).strip()

        predicted = next(
            (candidate for candidate in known_labels if candidate in text_response),
            None,
        )
        if predicted is None:
            # Characters 55-105 of the prompt start at the "Title: " line of
            # the template, which is enough to identify the row in the log.
            title_50 = row['mistral_prompt'][55:105]
            print(f"WARNING: unknown found for {title_50}")
            predicted = "UNKNOWN"
            num_unknowns += 1
        predictions.append(predicted)

        true_labels.append(_y_test.loc[row.name, 'Y'])

    print(f"Found {num_unknowns} unknown labels")

    accuracy = accuracy_score(true_labels, predictions)
    precision_macro, recall_macro, f1_macro, _ = precision_recall_fscore_support(true_labels, predictions, average='macro', zero_division=0)
    precision_micro, recall_micro, f1_micro, _ = precision_recall_fscore_support(true_labels, predictions, average='micro', zero_division=0)

    return {
        'accuracy': accuracy,
        'precision_macro': precision_macro,
        'recall_macro': recall_macro,
        'f1_macro': f1_macro,
        'precision_micro': precision_micro,
        'recall_micro': recall_micro,
        'f1_micro': f1_micro
    }

# Evaluate whichever model is currently bound to `model` on the held-out
# test split; the resulting dict is printed in the next cell.
metrics = evaluate_model(
    model, 
    tokenizer, 
    X_test, 
    y_test
)

### Performance Metrics Over Test Set

##### **Accuracy:** the ratio of correctly predicted observations to the total number of observations. It represents the overall effectiveness of a classification model across multiple classes by measuring the proportion of correct results (cases where the predicted label matches the true label) among all cases examined.

##### **Precision Macro:** the average precision (the ratio of correctly predicted positive observations to all predicted positives) across all classes, ensuring each class is given equal importance regardless of its size or frequency in the data.

##### **Recall Macro:** the average recall (the ratio of correctly predicted positive observations to all actual positives) across all classes, treating each class equally.

##### **F1 Score Macro:** the harmonic mean of precision and recall computed for each class independently and then averaged, so that each class contributes equally to the overall metric.

##### **F1 Score Micro:** aggregates the contributions of all classes to compute the overall precision and recall, and then calculates their harmonic mean, effectively giving equal weight to each individual instance rather than to each class.


In [None]:
# Print the headline metrics, one per line, to four decimal places.
for _metric_title, _metric_key in (
    ("Accuracy", "accuracy"),
    ("Precision Macro", "precision_macro"),
    ("Recall Macro", "recall_macro"),
    ("F1 Score Macro", "f1_macro"),
    ("F1 Score Micro", "f1_micro"),
):
    print(f"{_metric_title}: {metrics[_metric_key]:.4f}")