In [None]:
# === Setup ===
import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import AutoModelForCausalLM, AutoTokenizer
from safetensors.torch import save_file
from gptq import GPTQ
import math

# === CONFIG ===
MODEL_NAME = "facebook/opt-125m"
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
BATCH_SIZE = 2
N_BATCHES = 5
SEQ_LEN = 32
NUM_BITS = 4
BLOCK_SIZE = 128
FIXED_T = 1000.0
LR = 0.001
NUM_ITERATIONS = 50



# === Load model and tokenizer ===
model = AutoModelForCausalLM.from_pretrained(MODEL_NAME).to(DEVICE).eval()
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
tokenizer.pad_token = tokenizer.eos_token
# === Calibration Setup using TinyStories CSV ===
import pandas as pd

CSV_PATH = "validation.csv"        # Path to your TinyStories CSV
TEXT_COLUMN = "text"               # Column containing stories
N_CALIB_SAMPLES = 1000              # Number of samples to use

# Load and preprocess CSV
print("📖 Loading TinyStories from CSV...")
df = pd.read_csv(CSV_PATH)
assert TEXT_COLUMN in df.columns, f"'{TEXT_COLUMN}' column not found in CSV."
texts = df[TEXT_COLUMN].dropna().tolist()[:BATCH_SIZE * N_BATCHES]

# Tokenize
print("🔠 Tokenizing TinyStories for calibration...")
encodings = tokenizer(
    texts,
    padding="max_length",
    truncation=True,
    max_length=SEQ_LEN,
    return_tensors="pt"
)
input_ids = encodings["input_ids"].to(DEVICE)
attention_mask = encodings["attention_mask"].to(DEVICE)
input_batches = input_ids.split(BATCH_SIZE)
mask_batches = attention_mask.split(BATCH_SIZE)


def get_power_bins(a=0.5, num_bits=4, device="cpu"):
    q_levels = 2 ** num_bits
    lin = torch.linspace(0, 1, q_levels, device=device)
    scaled = (lin ** (1 / a)) * 0.5
    bins = 0.5 + torch.cat([-scaled.flip(0), scaled[1:]])
    return bins


# === Define BlockwiseQuantizationOptim with GPTQ weight ===
class BlockwiseQuantizationOptim(nn.Module):
    def __init__(self, weight, block_size=128, num_bits=4, fixed_T=100.0,
                 gptq_scale=None, gptq_zero=None, gptq_g_idx=None):
        super().__init__()
        self.block_size = block_size
        self.num_bits = num_bits
        self.fixed_T = fixed_T
        self.original_shape = weight.shape
        self.num_levels = 2 ** num_bits

        padded_rows = math.ceil(weight.size(0) / block_size) * block_size
        padded_cols = math.ceil(weight.size(1) / block_size) * block_size
        self.padded_weight = torch.zeros((padded_rows, padded_cols), device=weight.device)
        self.padded_weight[:weight.size(0), :weight.size(1)] = weight

        self.blocks = []
        self.block_metadata = []
        for i in range(0, padded_rows, block_size):
            for j in range(0, padded_cols, block_size):
                block = self.padded_weight[i:i+block_size, j:j+block_size]
                self.blocks.append(block)
                self.block_metadata.append((i, j))

        self.w_min = nn.ParameterList()
        self.w_max = nn.ParameterList()

        for _, (i, j) in enumerate(self.block_metadata):
            if gptq_scale is not None and gptq_zero is not None and gptq_g_idx is not None:
                # Compute group indices for this block of columns
                col_start = j
                col_end = min(j + block_size, gptq_g_idx.shape[0])
                block_g_idx = gptq_g_idx[col_start:col_end]  # Shape: [block_cols]

                # Take the mean scale and zero for this block's group mapping
                scale_block = gptq_scale[0, block_g_idx].mean().detach()
                zero_block = gptq_zero[0, block_g_idx].mean().detach()

                # Derive min and max from scale/zero
                w_min = (-zero_block * scale_block)
                w_max = ((2 ** self.num_bits - 1 - zero_block) * scale_block)
            else:
                # Fallback to naive initialization
                block = self.padded_weight[i:i+block_size, j:j+block_size]
                w_min = block.min().detach()
                w_max = block.max().detach()

            self.w_min.append(nn.Parameter(w_min.view(1)))
            self.w_max.append(nn.Parameter(w_max.view(1)))


    def forward(self):
        eps = 1e-6
        q_blocks = []
        total_entropy = 0.0
        for idx, block in enumerate(self.blocks):
            w_min = self.w_min[idx].clamp(max=self.w_max[idx].item() - eps)
            w_max = self.w_max[idx].clamp(min=w_min.item() + eps)
            w_norm = (block - w_min) / (w_max - w_min + eps)
            #q_levels = torch.linspace(0, 1, self.num_levels, device=block.device)
            q_levels = get_power_bins(a=0.5, num_bits=self.num_bits, device=block.device)
            dists = -torch.abs(w_norm.unsqueeze(-1) - q_levels)
            soft_probs = torch.softmax(dists * self.fixed_T, dim=-1)
            w_q = (soft_probs * q_levels).sum(dim=-1)
            w_deq = w_q * (w_max - w_min) + w_min

            q_blocks.append(w_deq)

            bin_mass = soft_probs.sum(dim=0)
            bin_probs = bin_mass / (bin_mass.sum() + eps)
            entropy = -(bin_probs * (bin_probs + eps).log()).sum()
            total_entropy += entropy

        padded_out = torch.zeros_like(self.padded_weight)
        for idx, (i, j) in enumerate(self.block_metadata):
            padded_out[i:i+self.block_size, j:j+self.block_size] = q_blocks[idx]
        return padded_out[:self.original_shape[0], :self.original_shape[1]], total_entropy

    def export(self):
        eps = 1e-6
        q_blocks = []
        for idx, block in enumerate(self.blocks):
            w_min = self.w_min[idx].clamp(max=self.w_max[idx].item() - eps)
            w_max = self.w_max[idx].clamp(min=w_min.item() + eps)
            w_norm = (block - w_min) / (w_max - w_min + eps)
            #q_levels = torch.linspace(0, 1, self.num_levels, device=block.device)
            q_levels = get_power_bins(a=0.5, num_bits=self.num_bits, device=block.device)
            dists = -torch.abs(w_norm.unsqueeze(-1) - q_levels)
            q_idx = torch.argmax(dists, dim=-1).to(torch.int32)
            w_q = q_levels[q_idx]
            w_deq = w_q * (w_max - w_min) + w_min
            q_blocks.append(w_deq)

        padded_out = torch.zeros_like(self.padded_weight)
        for idx, (i, j) in enumerate(self.block_metadata):
            padded_out[i:i+self.block_size, j:j+self.block_size] = q_blocks[idx]
        return padded_out[:self.original_shape[0], :self.original_shape[1]].cpu()

# === Quantization Loop for all Linear Layers ===
safetensor_dict = {}
flag = 0
for name, module in model.named_modules():
    if not isinstance(module, nn.Linear):
        continue
    if flag == 4:
      break
    flag += 1
    print(f"\n🔧 GPTQ + Blockwise Quantizing Layer: {name} | Shape: {module.weight.shape}")
    original_weight = module.weight.data.clone()
    activation_batches = []
    def hook_fn(mod, inp, out):
        activation_batches.append(inp[0].detach())
    hook = module.register_forward_hook(hook_fn)

    with torch.no_grad():
        for x, m in zip(input_batches, mask_batches):
            model(input_ids=x, attention_mask=m)
    hook.remove()

    if not activation_batches:
        continue

    gptq = GPTQ(module)
    for act in activation_batches:
        gptq.add_batch(act, module(act))
    scale, zero, g_idx = gptq.fasterquant(
        blocksize=BLOCK_SIZE,
        percdamp=0.01,
        group_size=128,
        actorder=True,
    )
    q_weight = module.weight.data.clone()

    # Init BlockwiseQuantizationOptim using GPTQ parameters
    quant_layer = BlockwiseQuantizationOptim(
        weight=original_weight,
        block_size=BLOCK_SIZE,
        num_bits=NUM_BITS,
        fixed_T=FIXED_T,
        gptq_scale=scale,
        gptq_zero=zero,
        gptq_g_idx=g_idx
    ).to(DEVICE)
    optimizer = torch.optim.Adam(quant_layer.parameters(), lr=LR)
    mse_loss = nn.MSELoss()

    #original_weight = module.weight.data.clone()

    for it in range(NUM_ITERATIONS):
        for act in activation_batches:
            optimizer.zero_grad()
            w_q, entropy = quant_layer()
            recon = F.linear(act.to(DEVICE), w_q)
            target = F.linear(act.to(DEVICE), q_weight)
            loss = mse_loss(recon, target) + mse_loss(q_weight, w_q)
            print(f"Iteration {it + 1}/{NUM_ITERATIONS}, Entropy: {entropy.item():.4f}, Loss: {loss.item():.8f}")
            loss.backward()
            optimizer.step()

    with torch.no_grad():
        final_weight = quant_layer.export().to(module.weight.device)
        loss = mse_loss(q_weight, final_weight)
        print("weight diff",loss)
        module.weight.copy_(final_weight)
        safetensor_dict[name.replace(".", "_") + ".dequant"] = final_weight

# === Save Final Weights ===
#save_file(safetensor_dict, "quantized_blockwise_gptq.safetensors")
print("\n✅ Finished GPTQ-initialized blockwise quantization for all layers.")


📖 Loading TinyStories from CSV...
🔠 Tokenizing TinyStories for calibration...

🔧 GPTQ + Blockwise Quantizing Layer: model.decoder.layers.0.self_attn.k_proj | Shape: torch.Size([768, 768])
Iteration 1/50, Entropy: 265.4024, Loss: 0.00612533
Iteration 1/50, Entropy: 265.4069, Loss: 0.00573260
Iteration 1/50, Entropy: 265.4255, Loss: 0.00571611
Iteration 1/50, Entropy: 265.4567, Loss: 0.00554545
Iteration 1/50, Entropy: 265.4746, Loss: 0.00559397
Iteration 2/50, Entropy: 265.4818, Loss: 0.00585293
Iteration 2/50, Entropy: 265.4867, Loss: 0.00550320
Iteration 2/50, Entropy: 265.4975, Loss: 0.00556068
Iteration 2/50, Entropy: 265.5141, Loss: 0.00537909
Iteration 2/50, Entropy: 265.5258, Loss: 0.00546224
Iteration 3/50, Entropy: 265.5404, Loss: 0.00576066
Iteration 3/50, Entropy: 265.5574, Loss: 0.00543055
Iteration 3/50, Entropy: 265.5699, Loss: 0.00548551
Iteration 3/50, Entropy: 265.5745, Loss: 0.00532124
Iteration 3/50, Entropy: 265.5700, Loss: 0.00540359
Iteration 4/50, Entropy: 265.563

In [None]:
#test_model = AutoModelForCausalLM.from_pretrained(MODEL_NAME).to(DEVICE).eval()
# ==== Test quantized model ====
prompt = "travel while"
inputs = tokenizer(prompt, return_tensors="pt").to(DEVICE)

with torch.no_grad():
    output = model.generate(**inputs, max_length=10)

print("Sample Output:", tokenizer.decode(output[0], skip_special_tokens=True))

Setting `pad_token_id` to `eos_token_id`:0 for open-end generation.


Sample Output: travel while you were sleeping.

The following


In [None]:
# === Setup ===
import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import AutoModelForCausalLM, AutoTokenizer
from safetensors.torch import save_file
from gptq import GPTQ
import math

# === CONFIG ===
MODEL_NAME = "facebook/opt-125m"
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
BATCH_SIZE = 2
N_BATCHES = 5
SEQ_LEN = 32
NUM_BITS = 4
BLOCK_SIZE = 128
FIXED_T = 1000.0
LR = 0.001
NUM_ITERATIONS = 10

# === Load model and tokenizer ===
model = AutoModelForCausalLM.from_pretrained(MODEL_NAME).to(DEVICE).eval()
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
tokenizer.pad_token = tokenizer.eos_token
# === Calibration Setup using TinyStories CSV ===
import pandas as pd

CSV_PATH = "validation.csv"        # Path to your TinyStories CSV
TEXT_COLUMN = "text"               # Column containing stories
N_CALIB_SAMPLES = 1000              # Number of samples to use

# Load and preprocess CSV
print("📖 Loading TinyStories from CSV...")
df = pd.read_csv(CSV_PATH)
assert TEXT_COLUMN in df.columns, f"'{TEXT_COLUMN}' column not found in CSV."
texts = df[TEXT_COLUMN].dropna().tolist()[:BATCH_SIZE * N_BATCHES]

# Tokenize
print("🔠 Tokenizing TinyStories for calibration...")
encodings = tokenizer(
    texts,
    padding="max_length",
    truncation=True,
    max_length=SEQ_LEN,
    return_tensors="pt"
)
input_ids = encodings["input_ids"].to(DEVICE)
attention_mask = encodings["attention_mask"].to(DEVICE)
input_batches = input_ids.split(BATCH_SIZE)
mask_batches = attention_mask.split(BATCH_SIZE)
def get_power_bins(a=0.5, num_bits=4, device="cpu"):
    q_levels = 2 ** num_bits
    lin = torch.linspace(0, 1, q_levels, device=device)
    scaled = (lin ** (1 / a)) * 0.5
    bins = 0.5 + torch.cat([-scaled.flip(0), scaled[1:]])
    return bins
# === Define BlockwiseQuantizationOptim with GPTQ weight ===
class BlockwiseQuantizationOptim(nn.Module):
    def __init__(self, gptq_weight, block_size=BLOCK_SIZE, num_bits=NUM_BITS, fixed_T=FIXED_T):
        super().__init__()
        self.block_size = block_size
        self.num_bits = num_bits
        self.fixed_T = fixed_T
        self.original_shape = gptq_weight.shape
        self.num_levels = 2 ** num_bits

        padded_rows = math.ceil(gptq_weight.size(0) / block_size) * block_size
        padded_cols = math.ceil(gptq_weight.size(1) / block_size) * block_size
        self.padded_weight = torch.zeros((padded_rows, padded_cols), device=gptq_weight.device)
        self.padded_weight[:gptq_weight.size(0), :gptq_weight.size(1)] = gptq_weight

        self.blocks = []
        self.block_metadata = []
        self.w_min = nn.ParameterList()
        self.w_max = nn.ParameterList()

        for i in range(0, padded_rows, block_size):
            for j in range(0, padded_cols, block_size):
                block = self.padded_weight[i:i+block_size, j:j+block_size]
                self.blocks.append(block)
                self.block_metadata.append((i, j))
                w_min = block.min().detach()
                w_max = block.max().detach()
                self.w_min.append(nn.Parameter(w_min.view(1)))
                self.w_max.append(nn.Parameter(w_max.view(1)))

    def forward(self):
        eps = 1e-6
        q_blocks = []
        total_entropy = 0.0
        for idx, block in enumerate(self.blocks):
            w_min = self.w_min[idx].clamp(max=self.w_max[idx].item() - eps)
            w_max = self.w_max[idx].clamp(min=w_min.item() + eps)
            w_norm = (block - w_min) / (w_max - w_min + eps)
            #q_levels = torch.linspace(0, 1, self.num_levels, device=block.device)
            q_levels = get_power_bins(a=0.55, num_bits=self.num_bits, device=block.device)
            dists = -torch.abs(w_norm.unsqueeze(-1) - q_levels)
            soft_probs = torch.softmax(dists * self.fixed_T, dim=-1)
            w_q = (soft_probs * q_levels).sum(dim=-1)
            w_deq = w_q * (w_max - w_min) + w_min

            q_blocks.append(w_deq)

            bin_mass = soft_probs.sum(dim=0)
            bin_probs = bin_mass / (bin_mass.sum() + eps)
            entropy = -(bin_probs * (bin_probs + eps).log()).sum()
            total_entropy += entropy

        padded_out = torch.zeros_like(self.padded_weight)
        for idx, (i, j) in enumerate(self.block_metadata):
            padded_out[i:i+self.block_size, j:j+self.block_size] = q_blocks[idx]
        return padded_out[:self.original_shape[0], :self.original_shape[1]], total_entropy

    def export(self):
        eps = 1e-6
        q_blocks = []
        for idx, block in enumerate(self.blocks):
            w_min = self.w_min[idx].clamp(max=self.w_max[idx].item() - eps)
            w_max = self.w_max[idx].clamp(min=w_min.item() + eps)
            w_norm = (block - w_min) / (w_max - w_min + eps)
            #q_levels = torch.linspace(0, 1, self.num_levels, device=block.device)
            q_levels = get_power_bins(a=0.55, num_bits=self.num_bits, device=block.device)
            dists = -torch.abs(w_norm.unsqueeze(-1) - q_levels)
            q_idx = torch.argmax(dists, dim=-1).to(torch.int32)
            w_q = q_levels[q_idx]
            w_deq = w_q * (w_max - w_min) + w_min
            q_blocks.append(w_deq)

        padded_out = torch.zeros_like(self.padded_weight)
        for idx, (i, j) in enumerate(self.block_metadata):
            padded_out[i:i+self.block_size, j:j+self.block_size] = q_blocks[idx]
        return padded_out[:self.original_shape[0], :self.original_shape[1]].cpu()

# === Quantization Loop for all Linear Layers ===
safetensor_dict = {}
flag = 0
for name, module in model.named_modules():
    if not isinstance(module, nn.Linear):
        continue
    if "lm_head" in name:
        continue
    if "fc2" in name:
        continue

    print(f"\n🔧 GPTQ + Blockwise Quantizing Layer: {name} | Shape: {module.weight.shape}")

    activation_batches = []
    def hook_fn(mod, inp, out):
        activation_batches.append(inp[0].detach())
    hook = module.register_forward_hook(hook_fn)

    with torch.no_grad():
        for x, m in zip(input_batches, mask_batches):
            model(input_ids=x, attention_mask=m)
    hook.remove()

    if not activation_batches:
        continue
    original_weight = module.weight.data.clone()
    gptq = GPTQ(module)
    for act in activation_batches:
        gptq.add_batch(act, module(act))
    gptq.fasterquant(
        blocksize=BLOCK_SIZE,
        percdamp=0.01,
        group_size=128,
        actorder=True,
    )
    q_weight = module.weight.data.clone()

    quant_layer = BlockwiseQuantizationOptim(original_weight).to(DEVICE)
    optimizer = torch.optim.Adam(quant_layer.parameters(), lr=LR)
    mse_loss = nn.MSELoss()

    #original_weight = module.weight.data.clone()

    for it in range(NUM_ITERATIONS):
        for act in activation_batches:
            optimizer.zero_grad()
            w_q, entropy = quant_layer()
            recon = F.linear(act.to(DEVICE), w_q)
            target = F.linear(act.to(DEVICE), original_weight)
            loss = mse_loss(recon, target) + mse_loss(original_weight, w_q)
            print(f"Iteration {it + 1}/{NUM_ITERATIONS}, Entropy: {entropy.item():.4f}, Loss: {loss.item():.8f}")
            loss.backward()
            optimizer.step()

    with torch.no_grad():
        final_weight = quant_layer.export().to(module.weight.device)
        loss = mse_loss(original_weight, final_weight)
        print("weight diff",loss)
        module.weight.copy_(final_weight)
        safetensor_dict[name.replace(".", "_") + ".dequant"] = final_weight

# === Save Final Weights ===
#save_file(safetensor_dict, "quantized_blockwise_gptq.safetensors")
print("\n✅ Finished GPTQ-initialized blockwise quantization for all layers.")


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/651 [00:00<?, ?B/s]

Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


pytorch_model.bin:   0%|          | 0.00/251M [00:00<?, ?B/s]

Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


generation_config.json:   0%|          | 0.00/137 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/251M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/685 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/899k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/441 [00:00<?, ?B/s]

📖 Loading TinyStories from CSV...
🔠 Tokenizing TinyStories for calibration...

🔧 GPTQ + Blockwise Quantizing Layer: model.decoder.layers.0.self_attn.k_proj | Shape: torch.Size([768, 768])
Iteration 1/10, Entropy: 267.9043, Loss: 0.00306232
Iteration 1/10, Entropy: 267.8928, Loss: 0.00295261
Iteration 1/10, Entropy: 267.9056, Loss: 0.00294262
Iteration 1/10, Entropy: 267.9290, Loss: 0.00297390
Iteration 1/10, Entropy: 267.9485, Loss: 0.00291151
Iteration 2/10, Entropy: 267.9654, Loss: 0.00291848
Iteration 2/10, Entropy: 267.9790, Loss: 0.00284403
Iteration 2/10, Entropy: 267.9899, Loss: 0.00286282
Iteration 2/10, Entropy: 267.9994, Loss: 0.00291082
Iteration 2/10, Entropy: 268.0043, Loss: 0.00284972
Iteration 3/10, Entropy: 268.0102, Loss: 0.00285881
Iteration 3/10, Entropy: 268.0182, Loss: 0.00281264
Iteration 3/10, Entropy: 268.0255, Loss: 0.00282637
Iteration 3/10, Entropy: 268.0319, Loss: 0.00286619
Iteration 3/10, Entropy: 268.0364, Loss: 0.00282210
Iteration 4/10, Entropy: 268.040

KeyboardInterrupt: 

In [None]:
# === Setup ===
import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import AutoModelForCausalLM, AutoTokenizer
from safetensors.torch import save_file
#from gptq import GPTQ
import math

# === CONFIG ===
MODEL_NAME = "facebook/opt-125m"
MODEL_NAME = "databricks/dolly-v2-3b"
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
BATCH_SIZE = 2
N_BATCHES = 5
SEQ_LEN = 32
NUM_BITS = 4
BLOCK_SIZE = 128
FIXED_T = 1000.0
LR = 0.001
NUM_ITERATIONS = 0

# === Load model and tokenizer ===
#model = AutoModelForCausalLM.from_pretrained(MODEL_NAME).to(DEVICE).eval()
model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    device_map="auto",                # Automatically split layers across available GPUs/CPU
    torch_dtype="float32",               # Use float16 where possible
    low_cpu_mem_usage=True            # Efficient weight loading
)
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
tokenizer.pad_token = tokenizer.eos_token
# === Calibration Setup using TinyStories CSV ===
import pandas as pd

CSV_PATH = "validation.csv"        # Path to your TinyStories CSV
TEXT_COLUMN = "text"               # Column containing stories
N_CALIB_SAMPLES = 1000              # Number of samples to use

# Load and preprocess CSV
print("📖 Loading TinyStories from CSV...")
df = pd.read_csv(CSV_PATH)
assert TEXT_COLUMN in df.columns, f"'{TEXT_COLUMN}' column not found in CSV."
texts = df[TEXT_COLUMN].dropna().tolist()[:BATCH_SIZE * N_BATCHES]

# Tokenize
print("🔠 Tokenizing TinyStories for calibration...")
encodings = tokenizer(
    texts,
    padding="max_length",
    truncation=True,
    max_length=SEQ_LEN,
    return_tensors="pt"
)
input_ids = encodings["input_ids"].to(DEVICE)
attention_mask = encodings["attention_mask"].to(DEVICE)
input_batches = input_ids.split(BATCH_SIZE)
mask_batches = attention_mask.split(BATCH_SIZE)
def get_power_bins(a=0.5, num_bits=4, device="cpu"):
    q_levels = 2 ** num_bits
    lin = torch.linspace(0, 1, q_levels, device=device)
    scaled = (lin ** (1 / a)) * 0.5
    bins = 0.5 + torch.cat([-scaled.flip(0), scaled[1:]])
    return bins
# === Define BlockwiseQuantizationOptim with GPTQ weight ===
class BlockwiseQuantizationOptim(nn.Module):
    def __init__(self, weight, block_size=128, num_bits=4, fixed_T=100.0, use_blockwise=True, a=0.5):
        super().__init__()
        self.block_size = block_size
        self.num_bits = num_bits
        self.fixed_T = fixed_T
        self.a = a
        self.original_shape = weight.shape
        self.use_blockwise = use_blockwise
        self.num_levels = 2 ** num_bits

        if use_blockwise:
            padded_rows = math.ceil(weight.size(0) / block_size) * block_size
            padded_cols = math.ceil(weight.size(1) / block_size) * block_size
            self.padded_weight = torch.zeros((padded_rows, padded_cols), device=weight.device)
            self.padded_weight[:weight.size(0), :weight.size(1)] = weight

            self.blocks = []
            self.block_metadata = []
            for i in range(0, padded_rows, block_size):
                for j in range(0, padded_cols, block_size):
                    block = self.padded_weight[i:i+block_size, j:j+block_size]
                    self.blocks.append(block)
                    self.block_metadata.append((i, j))

            self.w_min = nn.ParameterList()
            self.w_max = nn.ParameterList()
            self.learnable_bins = nn.ParameterList()
            for block in self.blocks:
                w_min, w_max = block.min().detach(), block.max().detach()
                pad = 0.05 * (w_max - w_min)
                self.w_min.append(nn.Parameter((w_min - pad).view(1)))
                self.w_max.append(nn.Parameter((w_max + pad).view(1)))
                init_bins = self._get_power_bins().detach()
                self.learnable_bins.append(nn.Parameter(init_bins.clone()))
        else:
            self.blocks = [weight]
            self.block_metadata = [(0, 0)]
            w_min, w_max = weight.min().detach(), weight.max().detach()
            pad = 0.05 * (w_max - w_min)
            self.w_min = nn.Parameter((w_min - pad).view(1))
            self.w_max = nn.Parameter((w_max + pad).view(1))
            init_bins = self._get_power_bins().detach()
            self.learnable_bins = nn.Parameter(init_bins.clone())

    def _get_power_bins(self):
        lin = torch.linspace(0, 1, 2 ** self.num_bits)
        scaled = (lin ** (1 / self.a)) * 0.5
        bins = 0.5 + torch.cat([-scaled.flip(0), scaled[1:]])
        return bins

    def forward(self):
        eps = 1e-6
        q_blocks = []
        total_entropy = 0.0

        if not self.use_blockwise:
            blocks = [self.blocks[0]]
            w_mins = [self.w_min]
            w_maxs = [self.w_max]
            bin_lists = [self.learnable_bins]
        else:
            blocks = self.blocks
            w_mins = self.w_min
            w_maxs = self.w_max
            bin_lists = self.learnable_bins

        for idx, block in enumerate(blocks):
            w_min = w_mins[idx].clamp(max=w_maxs[idx].item() - eps)
            w_max = w_maxs[idx].clamp(min=w_min.item() + eps)
            w_norm = (block - w_min) / (w_max - w_min + eps)

            bins = bin_lists[idx].to(block.device)
            dists = -torch.abs(w_norm.unsqueeze(-1) - bins)
            soft_probs = torch.softmax(dists * self.fixed_T, dim=-1)
            w_q = (soft_probs * bins).sum(dim=-1)
            w_deq = w_q * (w_max - w_min) + w_min
            q_blocks.append(w_deq)

            # Entropy penalty
            bin_mass = soft_probs.sum(dim=0)
            bin_probs = bin_mass / (bin_mass.sum() + eps)
            entropy = -(bin_probs * (bin_probs + eps).log()).sum()
            total_entropy += entropy

        if self.use_blockwise:
            padded_out = torch.zeros_like(self.padded_weight)
            for idx, (i, j) in enumerate(self.block_metadata):
                padded_out[i:i+self.block_size, j:j+self.block_size] = q_blocks[idx]
            return padded_out[:self.original_shape[0], :self.original_shape[1]], total_entropy
        else:
            return q_blocks[0], total_entropy

    def export(self):
        eps = 1e-6
        q_blocks = []

        if not self.use_blockwise:
            blocks = [self.blocks[0]]
            w_mins = [self.w_min]
            w_maxs = [self.w_max]
            bin_lists = [self.learnable_bins]
        else:
            blocks = self.blocks
            w_mins = self.w_min
            w_maxs = self.w_max
            bin_lists = self.learnable_bins

        for idx, block in enumerate(blocks):
            w_min = w_mins[idx].clamp(max=w_maxs[idx].item() - eps)
            w_max = w_maxs[idx].clamp(min=w_min.item() + eps)
            w_norm = (block - w_min) / (w_max - w_min + eps)

            bins = bin_lists[idx].to(block.device)
            dists = -torch.abs(w_norm.unsqueeze(-1) - bins)
            q_idx = torch.argmax(dists, dim=-1).to(torch.int32)
            w_q = bins[q_idx]
            w_deq = w_q * (w_max - w_min) + w_min
            q_blocks.append(w_deq)

        if self.use_blockwise:
            padded_out = torch.zeros_like(self.padded_weight)
            for idx, (i, j) in enumerate(self.block_metadata):
                padded_out[i:i+self.block_size, j:j+self.block_size] = q_blocks[idx]
            return padded_out[:self.original_shape[0], :self.original_shape[1]].cpu()
        else:
            return q_blocks[0].cpu()


# === Quantization Loop for all Linear Layers ===
safetensor_dict = {}
flag = 0
for name, module in model.named_modules():
    if not isinstance(module, nn.Linear):
        continue
    # if "lm_head" in name:
    #     continue
    # if "fc1" in name:
    #     continue

    if "embed_out" in name:
        continue


    print(f"\n🔧 GPTQ + Blockwise Quantizing Layer: {name} | Shape: {module.weight.shape}")

    activation_batches = []
    def hook_fn(mod, inp, out):
        activation_batches.append(inp[0].detach())
    hook = module.register_forward_hook(hook_fn)

    with torch.no_grad():
        for x, m in zip(input_batches, mask_batches):
            model(input_ids=x, attention_mask=m)
    hook.remove()

    if not activation_batches:
        continue
    original_weight = module.weight.data.clone()
    # gptq = GPTQ(module)
    # for act in activation_batches:
    #     gptq.add_batch(act, module(act))
    # gptq.fasterquant(
    #     blocksize=BLOCK_SIZE,
    #     percdamp=0.01,
    #     group_size=128,
    #     actorder=True,
    # )
    q_weight = module.weight.data.clone()

    quant_layer = BlockwiseQuantizationOptim(original_weight,num_bits = 4, use_blockwise=True,a=0.52)#.to(DEVICE)

    optimizer = torch.optim.Adam(quant_layer.parameters(), lr=LR)
    mse_loss = nn.MSELoss()

    #original_weight = module.weight.data.clone()

    for it in range(NUM_ITERATIONS):
        for act in activation_batches:
            optimizer.zero_grad()
            w_q, entropy = quant_layer()
            recon = F.linear(act.to(DEVICE), w_q)
            target = F.linear(act.to(DEVICE), original_weight)
            loss = mse_loss(recon, target) + mse_loss(original_weight, w_q)
            print(f"Iteration {it + 1}/{NUM_ITERATIONS}, Entropy: {entropy.item():.4f}, Loss: {loss.item():.8f}")
            loss.backward()
            optimizer.step()

    with torch.no_grad():
        final_weight = quant_layer.export().to(module.weight.device)
        loss = mse_loss(original_weight, final_weight)
        print("weight diff",loss)
        module.weight.copy_(final_weight)
        #safetensor_dict[name.replace(".", "_") + ".dequant"] = final_weight
    del quant_layer, optimizer, activation_batches
    torch.cuda.empty_cache()

# === Save Final Weights ===
#save_file(safetensor_dict, "quantized_blockwise_gptq.safetensors")
print("\n✅ Finished GPTQ-initialized blockwise quantization for all layers.")




📖 Loading TinyStories from CSV...
🔠 Tokenizing TinyStories for calibration...

🔧 GPTQ + Blockwise Quantizing Layer: gpt_neox.layers.0.attention.query_key_value | Shape: torch.Size([7680, 2560])
weight diff tensor(3.4007e-06, device='cuda:0')

🔧 GPTQ + Blockwise Quantizing Layer: gpt_neox.layers.0.attention.dense | Shape: torch.Size([2560, 2560])
weight diff tensor(1.7337e-06, device='cuda:0')

🔧 GPTQ + Blockwise Quantizing Layer: gpt_neox.layers.0.mlp.dense_h_to_4h | Shape: torch.Size([10240, 2560])
weight diff tensor(1.7251e-06, device='cuda:0')

🔧 GPTQ + Blockwise Quantizing Layer: gpt_neox.layers.0.mlp.dense_4h_to_h | Shape: torch.Size([2560, 10240])
weight diff tensor(1.5286e-06, device='cuda:0')

🔧 GPTQ + Blockwise Quantizing Layer: gpt_neox.layers.1.attention.query_key_value | Shape: torch.Size([7680, 2560])
weight diff tensor(3.3509e-06, device='cuda:0')

🔧 GPTQ + Blockwise Quantizing Layer: gpt_neox.layers.1.attention.dense | Shape: torch.Size([2560, 2560])
weight diff tensor(1

RuntimeError: Tensor.item() cannot be called on meta tensors

In [None]:
#test_model = AutoModelForCausalLM.from_pretrained(MODEL_NAME).to(DEVICE).eval()
# ==== Test quantized model ====
#model.eval()
prompt = "I like travelling to my"
inputs = tokenizer(prompt, return_tensors="pt").to(DEVICE)

with torch.no_grad():
    output = model.generate(**inputs, max_length=30)

print("Sample Output:", tokenizer.decode(output[0], skip_special_tokens=True))

Setting `pad_token_id` to `eos_token_id`:0 for open-end generation.


Sample Output: I like travelling to my friends' houses. I like the idea of being able to go to my friends' houses and not having to worry about anything


In [None]:
#test_model = AutoModelForCausalLM.from_pretrained(MODEL_NAME).to(DEVICE).eval()
# ==== Test quantized model ====
#model.eval()
prompt = "once upon a time in"
inputs = tokenizer(prompt, return_tensors="pt").to(DEVICE)

with torch.no_grad():
    output = model.generate(**inputs, max_length=30)

print("Sample Output:", tokenizer.decode(output[0], skip_special_tokens=True))

Setting `pad_token_id` to `eos_token_id`:0 for open-end generation.


Sample Output: once upon a time in the land of Oz, there was a witch who had a beautiful daughter. The daughter was so beautiful that she was known as


In [None]:
model.eval()

GPTNeoXForCausalLM(
  (gpt_neox): GPTNeoXModel(
    (embed_in): Embedding(50280, 2560)
    (emb_dropout): Dropout(p=0.0, inplace=False)
    (layers): ModuleList(
      (0-31): 32 x GPTNeoXLayer(
        (input_layernorm): LayerNorm((2560,), eps=1e-05, elementwise_affine=True)
        (post_attention_layernorm): LayerNorm((2560,), eps=1e-05, elementwise_affine=True)
        (post_attention_dropout): Dropout(p=0.0, inplace=False)
        (post_mlp_dropout): Dropout(p=0.0, inplace=False)
        (attention): GPTNeoXAttention(
          (query_key_value): Linear(in_features=2560, out_features=7680, bias=True)
          (dense): Linear(in_features=2560, out_features=2560, bias=True)
        )
        (mlp): GPTNeoXMLP(
          (dense_h_to_4h): Linear(in_features=2560, out_features=10240, bias=True)
          (dense_4h_to_h): Linear(in_features=10240, out_features=2560, bias=True)
          (act): GELUActivation()
        )
      )
    )
    (final_layer_norm): LayerNorm((2560,), eps=1e-05

In [13]:
pip install datasets


Collecting datasets
  Downloading datasets-3.5.1-py3-none-any.whl.metadata (19 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.5.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess<0.70.17 (from datasets)
  Downloading multiprocess-0.70.16-py311-none-any.whl.metadata (7.2 kB)
Collecting fsspec<=2025.3.0,>=2023.1.0 (from fsspec[http]<=2025.3.0,>=2023.1.0->datasets)
  Downloading fsspec-2025.3.0-py3-none-any.whl.metadata (11 kB)
Downloading datasets-3.5.1-py3-none-any.whl (491 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m491.4/491.4 kB[0m [31m33.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dill-0.3.8-py3-none-any.whl (116 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m116.3/116.3 kB[0m [31m11.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading fsspec-2025.3.0-py3-none-any.whl 

In [17]:
test_model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    device_map="auto",                # Automatically split layers across available GPUs/CPU
    torch_dtype="float32",               # Use float16 where possible
    low_cpu_mem_usage=True            # Efficient weight loading
)
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
tokenizer.pad_token = tokenizer.eos_token

In [18]:
import random
import re
import torch
from datasets import load_dataset
from transformers import AutoModelForCausalLM, AutoTokenizer

def evaluate_hellaswag_one_sample(model,tokeniser, sample_index=None, max_new_tokens=30):
    # Load one sample from HellaSwag
    hellaswag = load_dataset("hellaswag", split="validation")
    sample = hellaswag[sample_index if sample_index is not None else random.randint(0, len(hellaswag) - 1)]

    ctx = sample["ctx"]
    options = sample["endings"]
    label = int(sample["label"])  # Ground truth index (0-3)

    # Build prompt
    prompt = f"""Context:
{ctx}

Which of the following is the most plausible continuation?

Options:
A. {options[0]}
B. {options[1]}
C. {options[2]}
D. {options[3]}

Answer:"""

    # Load quantized model and tokenizer
    # tokenizer = AutoTokenizer.from_pretrained(model_path)
    # model = AutoModelForCausalLM.from_pretrained(
    #     model_path,
    #     torch_dtype=torch.float16,
    #     device_map="auto"
    # )
    # model.eval()

    # Tokenize and generate
    inputs = tokenizer(prompt, return_tensors="pt").to("cuda")
    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_length=inputs['input_ids'].shape[1] + max_new_tokens,
            do_sample=False
        )
        response = tokenizer.decode(outputs[0], skip_special_tokens=True)

    # Try to extract answer letter (A/B/C/D)
    match = re.search(r"Answer[:\- ]*\s*([A-D])", response)
    predicted_letter = match.group(1) if match else "Unknown"

    # Report
    print("🧠 Prompt:\n", prompt)
    print("🗣️ Model Output:\n", response)
    print(f"\n✅ Predicted Answer: {predicted_letter}")
    print(f"🎯 Ground Truth Answer: {chr(65 + label)} ({label})")

    return {
        "context": ctx,
        "options": options,
        "prompt": prompt,
        "model_output": response,
        "predicted": predicted_letter,
        "ground_truth": chr(65 + label)
    }
evaluate_hellaswag_one_sample(test_model,tokenizer)

Setting `pad_token_id` to `eos_token_id`:0 for open-end generation.


🧠 Prompt:
 Context:
[header] How to measure a saddle [title] Learn what the bars are. [step] When you're looking at new saddles, one of the things you'll need to pay attention to is the bars. The bars are the weight-distributing foundation for the saddle; the part of the saddle that rests on the horse and holds you up.

Which of the following is the most plausible continuation?

Options:
A. There are two' bars' that distribute weight evenly on either side of the spine. If your saddle is well-fitting, the horse's back will be in contact with the entire length of the bars.
B. Most new saddles have bars on their backs that hang to the sides. [substeps] If your horse's saddle only has bars, flip the bars over so that the back is facing away from you.
C. The bars are located at the top of the horse and at either end of the saddle, which are the sides that you'll almost touch. [substeps] The bars range from two-hundred-200.
D. While the bars are peerequisite for the saddle, they're also usef

{'context': "[header] How to measure a saddle [title] Learn what the bars are. [step] When you're looking at new saddles, one of the things you'll need to pay attention to is the bars. The bars are the weight-distributing foundation for the saddle; the part of the saddle that rests on the horse and holds you up.",
 'options': ["There are two' bars' that distribute weight evenly on either side of the spine. If your saddle is well-fitting, the horse's back will be in contact with the entire length of the bars.",
  "Most new saddles have bars on their backs that hang to the sides. [substeps] If your horse's saddle only has bars, flip the bars over so that the back is facing away from you.",
  "The bars are located at the top of the horse and at either end of the saddle, which are the sides that you'll almost touch. [substeps] The bars range from two-hundred-200.",
  "While the bars are peerequisite for the saddle, they're also useful if you want an added comfort. [substeps] For a standard

In [None]:
import random
import re
import torch
from datasets import load_dataset
from transformers import AutoModelForCausalLM, AutoTokenizer

def evaluate_piqa_one_sample(model, tokenizer, sample_index=None, max_new_tokens=30): # Added tokenizer argument
    # Load PIQA dataset
    dataset = load_dataset("piqa", split="validation")
    sample = dataset[sample_index if sample_index is not None else random.randint(0, len(dataset) - 1)]

    goal = sample["goal"]
    sol1 = sample["sol1"]
    sol2 = sample["sol2"]
    label = int(sample["label"])  # correct answer (0 or 1)

    # Format prompt for LLM
    prompt = f"""Task: Choose the most physically plausible solution to the following goal.

Goal: {goal}

Options:
A. {sol1}
B. {sol2}

Answer:"""

    # Inference
    inputs = tokenizer(prompt, return_tensors="pt").to("cuda") # Use the passed tokenizer
    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_length=inputs["input_ids"].shape[1] + max_new_tokens,
            do_sample=False
        )
        response = tokenizer.decode(outputs[0], skip_special_tokens=True) # Use the passed tokenizer

    # Extract A/B prediction
    match = re.search(r"Answer[:\- ]*\s*([A-B])", response)
    predicted = match.group(1) if match else "Unknown"

    print("🧠 Prompt:\n", prompt)
    print("🗣️ Model Output:\n", response)
    print(f"\n✅ Predicted Answer: {predicted}")
    print(f"🎯 Ground Truth Answer: {'A' if label == 0 else 'B'}")

    return {
        "goal": goal,
        "option_A": sol1,
        "option_B": sol2,
        "prompt": prompt,
        "model_output": response,
        "predicted": predicted,
        "ground_truth": "A" if label == 0 else "B"
    }

# Call the function with the model and tokenizer objects
evaluate_piqa_one_sample(model, tokenizer) # Pass the tokenizer object

Setting `pad_token_id` to `eos_token_id`:0 for open-end generation.


🧠 Prompt:
 Task: Choose the most physically plausible solution to the following goal.

Goal: How can you keep your taco shell crispy when building your tacos?

Options:
A. Place cheese in the taco before any of the other fillings, it will form a shield over the shell and protect it from moisture in other fillings.
B. Place sauce in the taco before any of the other fillings, it will form a shield over the shell and protect it from moisture in other fillings.

Answer:
🗣️ Model Output:
 Task: Choose the most physically plausible solution to the following goal.

Goal: How can you keep your taco shell crispy when building your tacos?

Options:
A. Place cheese in the taco before any of the other fillings, it will form a shield over the shell and protect it from moisture in other fillings.
B. Place sauce in the taco before any of the other fillings, it will form a shield over the shell and protect it from moisture in other fillings.

Answer: B. Place sauce in the taco before any of the other 

{'goal': 'How can you keep your taco shell crispy when building your tacos?',
 'option_A': 'Place cheese in the taco before any of the other fillings, it will form a shield over the shell and protect it from moisture in other fillings.',
 'option_B': 'Place sauce in the taco before any of the other fillings, it will form a shield over the shell and protect it from moisture in other fillings.',
 'prompt': 'Task: Choose the most physically plausible solution to the following goal.\n\nGoal: How can you keep your taco shell crispy when building your tacos?\n\nOptions:\nA. Place cheese in the taco before any of the other fillings, it will form a shield over the shell and protect it from moisture in other fillings.\nB. Place sauce in the taco before any of the other fillings, it will form a shield over the shell and protect it from moisture in other fillings.\n\nAnswer:',
 'model_output': 'Task: Choose the most physically plausible solution to the following goal.\n\nGoal: How can you keep yo

In [None]:
# === Setup ===
import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import AutoModelForCausalLM, AutoTokenizer
from safetensors.torch import save_file
#from gptq import GPTQ
import math

# === CONFIG ===
MODEL_NAME = "facebook/opt-125m"
MODEL_NAME = "databricks/dolly-v2-3b"
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
BATCH_SIZE = 32
N_BATCHES = 8
SEQ_LEN = 128
NUM_BITS = 3
BLOCK_SIZE = 128
FIXED_T = 1000.0
LR = 0.001
NUM_ITERATIONS = 2

# === Load model and tokenizer ===
#model = AutoModelForCausalLM.from_pretrained(MODEL_NAME).to(DEVICE).eval()
model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    device_map="auto",                # Automatically split layers across available GPUs/CPU
    torch_dtype="float32",               # Use float16 where possible
    low_cpu_mem_usage=True            # Efficient weight loading
)
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
tokenizer.pad_token = tokenizer.eos_token
# === Calibration Setup using TinyStories CSV ===
import pandas as pd

CSV_PATH = "validation.csv"        # Path to your TinyStories CSV
TEXT_COLUMN = "text"               # Column containing stories
N_CALIB_SAMPLES = 1000              # Number of samples to use

# Load and preprocess CSV
print("📖 Loading TinyStories from CSV...")
df = pd.read_csv(CSV_PATH)
assert TEXT_COLUMN in df.columns, f"'{TEXT_COLUMN}' column not found in CSV."
texts = df[TEXT_COLUMN].dropna().tolist()[:BATCH_SIZE * N_BATCHES]

# Tokenize
print("🔠 Tokenizing TinyStories for calibration...")
encodings = tokenizer(
    texts,
    padding="max_length",
    truncation=True,
    max_length=SEQ_LEN,
    return_tensors="pt"
)
input_ids = encodings["input_ids"].to(DEVICE)
attention_mask = encodings["attention_mask"].to(DEVICE)
input_batches = input_ids.split(BATCH_SIZE)
mask_batches = attention_mask.split(BATCH_SIZE)
def get_power_bins(a=0.5, num_bits=4, device="cpu"):
    q_levels = 2 ** num_bits
    lin = torch.linspace(0, 1, q_levels, device=device)
    scaled = (lin ** (1 / a)) * 0.5
    bins = 0.5 + torch.cat([-scaled.flip(0), scaled[1:]])
    return bins
# === Define BlockwiseQuantizationOptim with GPTQ weight ===
class BlockwiseQuantizationOptim(nn.Module):
    def __init__(self, weight, block_size=128, num_bits=4, fixed_T=100.0, use_blockwise=True, a=0.5):
        super().__init__()
        self.block_size = block_size
        self.num_bits = num_bits
        self.fixed_T = fixed_T
        self.a = a
        self.original_shape = weight.shape
        self.use_blockwise = use_blockwise
        self.num_levels = 2 ** num_bits

        if use_blockwise:
            padded_rows = math.ceil(weight.size(0) / block_size) * block_size
            padded_cols = math.ceil(weight.size(1) / block_size) * block_size
            self.padded_weight = torch.zeros((padded_rows, padded_cols), device=weight.device)
            self.padded_weight[:weight.size(0), :weight.size(1)] = weight

            self.blocks = []
            self.block_metadata = []
            for i in range(0, padded_rows, block_size):
                for j in range(0, padded_cols, block_size):
                    block = self.padded_weight[i:i+block_size, j:j+block_size]
                    self.blocks.append(block)
                    self.block_metadata.append((i, j))

            self.w_min = nn.ParameterList()
            self.w_max = nn.ParameterList()
            self.learnable_bins = nn.ParameterList()
            for block in self.blocks:
                w_min, w_max = block.min().detach(), block.max().detach()
                pad = 0.05 * (w_max - w_min)
                self.w_min.append(nn.Parameter((w_min - pad).view(1)))
                self.w_max.append(nn.Parameter((w_max + pad).view(1)))
                init_bins = self._get_power_bins().detach()
                self.learnable_bins.append(nn.Parameter(init_bins.clone()))
        else:
            self.blocks = [weight]
            self.block_metadata = [(0, 0)]
            w_min, w_max = weight.min().detach(), weight.max().detach()
            pad = 0.05 * (w_max - w_min)
            self.w_min = nn.Parameter((w_min - pad).view(1))
            self.w_max = nn.Parameter((w_max + pad).view(1))
            init_bins = self._get_power_bins().detach()
            self.learnable_bins = nn.Parameter(init_bins.clone())

    def _get_power_bins(self):
        lin = torch.linspace(0, 1, 2 ** self.num_bits)
        scaled = (lin ** (1 / self.a)) * 0.5
        bins = 0.5 + torch.cat([-scaled.flip(0), scaled[1:]])
        return bins

    def forward(self):
        eps = 1e-6
        q_blocks = []
        total_entropy = 0.0

        if not self.use_blockwise:
            blocks = [self.blocks[0]]
            w_mins = [self.w_min]
            w_maxs = [self.w_max]
            bin_lists = [self.learnable_bins]
        else:
            blocks = self.blocks
            w_mins = self.w_min
            w_maxs = self.w_max
            bin_lists = self.learnable_bins

        for idx, block in enumerate(blocks):
            w_min = w_mins[idx].clamp(max=w_maxs[idx].item() - eps)
            w_max = w_maxs[idx].clamp(min=w_min.item() + eps)
            w_norm = (block - w_min) / (w_max - w_min + eps)

            bins = bin_lists[idx].to(block.device)
            dists = -torch.abs(w_norm.unsqueeze(-1) - bins)
            soft_probs = torch.softmax(dists * self.fixed_T, dim=-1)
            w_q = (soft_probs * bins).sum(dim=-1)
            w_deq = w_q * (w_max - w_min) + w_min
            q_blocks.append(w_deq)

            # Entropy penalty
            bin_mass = soft_probs.sum(dim=0)
            bin_probs = bin_mass / (bin_mass.sum() + eps)
            entropy = -(bin_probs * (bin_probs + eps).log()).sum()
            total_entropy += entropy

        if self.use_blockwise:
            padded_out = torch.zeros_like(self.padded_weight)
            for idx, (i, j) in enumerate(self.block_metadata):
                padded_out[i:i+self.block_size, j:j+self.block_size] = q_blocks[idx]
            return padded_out[:self.original_shape[0], :self.original_shape[1]], total_entropy
        else:
            return q_blocks[0], total_entropy

    def export(self):
        eps = 1e-6
        q_blocks = []

        if not self.use_blockwise:
            blocks = [self.blocks[0]]
            w_mins = [self.w_min]
            w_maxs = [self.w_max]
            bin_lists = [self.learnable_bins]
        else:
            blocks = self.blocks
            w_mins = self.w_min
            w_maxs = self.w_max
            bin_lists = self.learnable_bins

        for idx, block in enumerate(blocks):
            w_min = w_mins[idx].clamp(max=w_maxs[idx].item() - eps)
            w_max = w_maxs[idx].clamp(min=w_min.item() + eps)
            w_norm = (block - w_min) / (w_max - w_min + eps)

            bins = bin_lists[idx].to(block.device)
            dists = -torch.abs(w_norm.unsqueeze(-1) - bins)
            q_idx = torch.argmax(dists, dim=-1).to(torch.int32)
            w_q = bins[q_idx]
            w_deq = w_q * (w_max - w_min) + w_min
            q_blocks.append(w_deq)

        if self.use_blockwise:
            padded_out = torch.zeros_like(self.padded_weight)
            for idx, (i, j) in enumerate(self.block_metadata):
                padded_out[i:i+self.block_size, j:j+self.block_size] = q_blocks[idx]
            return padded_out[:self.original_shape[0], :self.original_shape[1]].cpu()
        else:
            return q_blocks[0].cpu()


# === Quantization Loop for all Linear Layers ===
safetensor_dict = {}
flag = 0
for name, module in model.named_modules():
    if not isinstance(module, nn.Linear):
        continue
    # if "lm_head" in name:
    #     continue
    # if "fc1" in name:
    #     continue

    if "embed_out" in name:
        continue


    print(f"\n🔧 GPTQ + Blockwise Quantizing Layer: {name} | Shape: {module.weight.shape}")

    activation_batches = []
    def hook_fn(mod, inp, out):
        activation_batches.append(inp[0].detach())
    hook = module.register_forward_hook(hook_fn)

    with torch.no_grad():
        for x, m in zip(input_batches, mask_batches):
            model(input_ids=x, attention_mask=m)
    hook.remove()

    if not activation_batches:
        continue
    original_weight = module.weight.data.clone()
    # gptq = GPTQ(module)
    # for act in activation_batches:
    #     gptq.add_batch(act, module(act))
    # gptq.fasterquant(
    #     blocksize=BLOCK_SIZE,
    #     percdamp=0.01,
    #     group_size=128,
    #     actorder=True,
    # )
    q_weight = module.weight.data.clone()

    quant_layer = BlockwiseQuantizationOptim(original_weight,num_bits = 4, use_blockwise=True,a=0.52)#.to(DEVICE)

    optimizer = torch.optim.Adam(quant_layer.parameters(), lr=LR)
    mse_loss = nn.MSELoss()

    for it in range(NUM_ITERATIONS):
        for x, m in zip(input_batches, mask_batches):
            # CPU capture, per-batch
            activation_holder = []
            def hook_fn(mod, inp, out):
                activation_holder.append(inp[0].detach().to('cpu'))
            hook = module.register_forward_hook(hook_fn)

            with torch.no_grad():
                model(input_ids=x, attention_mask=m)
            hook.remove()

            act = activation_holder[0].to(DEVICE)
            optimizer.zero_grad()
            w_q, entropy = quant_layer()
            recon = F.linear(act, w_q)
            target = F.linear(act, original_weight)
            loss = mse_loss(recon, target) + mse_loss(original_weight, w_q)
            print(f"Iter {it+1}: Entropy={entropy.item():.4f}, Loss={loss.item():.6f}")
            loss.backward()
            optimizer.step()

            del act, recon, target, w_q, activation_holder
            torch.cuda.empty_cache()

    with torch.no_grad():
        final_weight = quant_layer.export().to(module.weight.device)
        loss = mse_loss(original_weight, final_weight)
        print("weight diff",loss)
        module.weight.copy_(final_weight)
        #safetensor_dict[name.replace(".", "_") + ".dequant"] = final_weight
    del quant_layer, optimizer, activation_batches
    torch.cuda.empty_cache()

# === Save Final Weights ===
#save_file(safetensor_dict, "quantized_blockwise_gptq.safetensors")
print("\n✅ Finished GPTQ-initialized blockwise quantization for all layers.")


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


📖 Loading TinyStories from CSV...
🔠 Tokenizing TinyStories for calibration...

🔧 GPTQ + Blockwise Quantizing Layer: gpt_neox.layers.0.attention.query_key_value | Shape: torch.Size([7680, 2560])
Iter 1: Entropy=9166.7119, Loss=0.000596
Iter 1: Entropy=9166.1787, Loss=0.000502
Iter 1: Entropy=9165.9590, Loss=0.000441
Iter 1: Entropy=9166.7803, Loss=0.000401
Iter 1: Entropy=9168.5947, Loss=0.000371
Iter 1: Entropy=9171.1123, Loss=0.000345
Iter 1: Entropy=9174.0664, Loss=0.000320
Iter 1: Entropy=9177.1309, Loss=0.000303
Iter 2: Entropy=9180.1807, Loss=0.000285
Iter 2: Entropy=9183.1104, Loss=0.000273
Iter 2: Entropy=9185.8311, Loss=0.000259
Iter 2: Entropy=9188.2900, Loss=0.000248
Iter 2: Entropy=9190.4863, Loss=0.000236
Iter 2: Entropy=9192.3740, Loss=0.000224
Iter 2: Entropy=9193.9043, Loss=0.000213
Iter 2: Entropy=9195.1504, Loss=0.000206
weight diff tensor(2.8527e-06, device='cuda:0')

🔧 GPTQ + Blockwise Quantizing Layer: gpt_neox.layers.0.attention.dense | Shape: torch.Size([2560, 256

KeyboardInterrupt: 

In [1]:
# === Setup ===
import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import AutoModelForCausalLM, AutoTokenizer
from safetensors.torch import save_file
#from gptq import GPTQ
import math

# === CONFIG ===
MODEL_NAME = "facebook/opt-125m"
MODEL_NAME = "databricks/dolly-v2-3b"
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
BATCH_SIZE = 32
N_BATCHES = 8
SEQ_LEN = 128
NUM_BITS = 3
BLOCK_SIZE = 128
FIXED_T = 1000.0
LR = 0.001
NUM_ITERATIONS = 0

# === Load model and tokenizer ===
#model = AutoModelForCausalLM.from_pretrained(MODEL_NAME).to(DEVICE).eval()
model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    device_map="auto",                # Automatically split layers across available GPUs/CPU
    torch_dtype="float32",               # Use float16 where possible
    low_cpu_mem_usage=True            # Efficient weight loading
)
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
tokenizer.pad_token = tokenizer.eos_token
# === Calibration Setup using TinyStories CSV ===
import pandas as pd

CSV_PATH = "validation.csv"        # Path to your TinyStories CSV
TEXT_COLUMN = "text"               # Column containing stories
N_CALIB_SAMPLES = 1000              # Number of samples to use

# Load and preprocess CSV
print("📖 Loading TinyStories from CSV...")
df = pd.read_csv(CSV_PATH)
assert TEXT_COLUMN in df.columns, f"'{TEXT_COLUMN}' column not found in CSV."
texts = df[TEXT_COLUMN].dropna().tolist()[:BATCH_SIZE * N_BATCHES]

# Tokenize
print("🔠 Tokenizing TinyStories for calibration...")
encodings = tokenizer(
    texts,
    padding="max_length",
    truncation=True,
    max_length=SEQ_LEN,
    return_tensors="pt"
)
input_ids = encodings["input_ids"].to(DEVICE)
attention_mask = encodings["attention_mask"].to(DEVICE)
input_batches = input_ids.split(BATCH_SIZE)
mask_batches = attention_mask.split(BATCH_SIZE)
def get_power_bins(a=0.5, num_bits=4, device="cpu"):
    q_levels = 2 ** num_bits
    lin = torch.linspace(0, 1, q_levels, device=device)
    scaled = (lin ** (1 / a)) * 0.5
    bins = 0.5 + torch.cat([-scaled.flip(0), scaled[1:]])
    return bins
class ChannelwiseQuantizationOptim(nn.Module):
    def __init__(self, weight, num_bits=4, fixed_T=100.0, a=0.5):
        super().__init__()
        self.num_bits = num_bits
        self.fixed_T = fixed_T
        self.a = a
        self.original_shape = weight.shape
        self.num_levels = 2 ** num_bits

        # Store rows (channels)
        self.rows = [weight[i, :].detach() for i in range(weight.shape[0])]

        # Create one min, max, and learnable bin set per channel
        self.w_min = nn.ParameterList()
        self.w_max = nn.ParameterList()
        self.learnable_bins = nn.ParameterList()
        for row in self.rows:
            w_min, w_max = row.min(), row.max()
            pad = 0.05 * (w_max - w_min)
            self.w_min.append(nn.Parameter((w_min - pad).view(1)))
            self.w_max.append(nn.Parameter((w_max + pad).view(1)))
            init_bins = self._get_power_bins().detach()
            self.learnable_bins.append(nn.Parameter(init_bins.clone()))

    def _get_power_bins(self):
        lin = torch.linspace(0, 1, self.num_levels)
        scaled = (lin ** (1 / self.a)) * 0.5
        bins = 0.5 + torch.cat([-scaled.flip(0), scaled[1:]])
        return bins

    def forward(self):
        eps = 1e-6
        quantized_rows = []
        total_entropy = 0.0

        for idx, row in enumerate(self.rows):
            w_min = self.w_min[idx].clamp(max=self.w_max[idx].item() - eps)
            w_max = self.w_max[idx].clamp(min=w_min.item() + eps)
            w_norm = (row - w_min) / (w_max - w_min + eps)

            bins = self.learnable_bins[idx].to(row.device)
            dists = -torch.abs(w_norm.unsqueeze(-1) - bins)
            soft_probs = torch.softmax(dists * self.fixed_T, dim=-1)
            w_q = (soft_probs * bins).sum(dim=-1)
            w_deq = w_q * (w_max - w_min) + w_min
            quantized_rows.append(w_deq)

            bin_mass = soft_probs.sum(dim=0)
            bin_probs = bin_mass / (bin_mass.sum() + eps)
            entropy = -(bin_probs * (bin_probs + eps).log()).sum()
            total_entropy += entropy

        return torch.stack(quantized_rows, dim=0), total_entropy

    def export(self):
        eps = 1e-6
        quantized_rows = []

        for idx, row in enumerate(self.rows):
            w_min = self.w_min[idx].clamp(max=self.w_max[idx].item() - eps)
            w_max = self.w_max[idx].clamp(min=w_min.item() + eps)
            w_norm = (row - w_min) / (w_max - w_min + eps)

            bins = self.learnable_bins[idx].to(row.device)
            dists = -torch.abs(w_norm.unsqueeze(-1) - bins)
            q_idx = torch.argmax(dists, dim=-1).to(torch.int32)
            w_q = bins[q_idx]
            w_deq = w_q * (w_max - w_min) + w_min
            quantized_rows.append(w_deq)

        return torch.stack(quantized_rows, dim=0).cpu()



# === Quantization Loop for all Linear Layers ===
safetensor_dict = {}
flag = 0
for name, module in model.named_modules():
    if not isinstance(module, nn.Linear):
        continue
    # if "lm_head" in name:
    #     continue
    # if "fc1" in name:
    #     continue

    if "embed_out" in name:
        continue


    print(f"\n🔧 GPTQ + Blockwise Quantizing Layer: {name} | Shape: {module.weight.shape}")

    activation_batches = []
    def hook_fn(mod, inp, out):
        activation_batches.append(inp[0].detach())
    hook = module.register_forward_hook(hook_fn)

    with torch.no_grad():
        for x, m in zip(input_batches, mask_batches):
            model(input_ids=x, attention_mask=m)
    hook.remove()

    if not activation_batches:
        continue
    original_weight = module.weight.data.clone()
    # gptq = GPTQ(module)
    # for act in activation_batches:
    #     gptq.add_batch(act, module(act))
    # gptq.fasterquant(
    #     blocksize=BLOCK_SIZE,
    #     percdamp=0.01,
    #     group_size=128,
    #     actorder=True,
    # )
    q_weight = module.weight.data.clone()

    quant_layer = ChannelwiseQuantizationOptim(original_weight,num_bits = 3,a=0.52)#.to(DEVICE)

    optimizer = torch.optim.Adam(quant_layer.parameters(), lr=LR)
    mse_loss = nn.MSELoss()

    for it in range(NUM_ITERATIONS):
        for x, m in zip(input_batches, mask_batches):
            # CPU capture, per-batch
            activation_holder = []
            def hook_fn(mod, inp, out):
                activation_holder.append(inp[0].detach().to('cpu'))
            hook = module.register_forward_hook(hook_fn)

            with torch.no_grad():
                model(input_ids=x, attention_mask=m)
            hook.remove()

            act = activation_holder[0].to(DEVICE)
            optimizer.zero_grad()
            w_q, entropy = quant_layer()
            recon = F.linear(act, w_q)
            target = F.linear(act, original_weight)
            loss = mse_loss(recon, target) + mse_loss(original_weight, w_q)
            print(f"Iter {it+1}: Entropy={entropy.item():.4f}, Loss={loss.item():.6f}")
            loss.backward()
            optimizer.step()

            del act, recon, target, w_q, activation_holder
            torch.cuda.empty_cache()

    with torch.no_grad():
        final_weight = quant_layer.export().to(module.weight.device)
        loss = mse_loss(original_weight, final_weight)
        print("weight diff",loss)
        module.weight.copy_(final_weight)
        #safetensor_dict[name.replace(".", "_") + ".dequant"] = final_weight
    del quant_layer, optimizer, activation_batches
    torch.cuda.empty_cache()

# === Save Final Weights ===
#save_file(safetensor_dict, "quantized_blockwise_gptq.safetensors")
print("\n✅ Finished GPTQ-initialized blockwise quantization for all layers.")


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


📖 Loading TinyStories from CSV...
🔠 Tokenizing TinyStories for calibration...

🔧 GPTQ + Blockwise Quantizing Layer: gpt_neox.layers.0.attention.query_key_value | Shape: torch.Size([7680, 2560])
weight diff tensor(8.2316e-06, device='cuda:0')

🔧 GPTQ + Blockwise Quantizing Layer: gpt_neox.layers.0.attention.dense | Shape: torch.Size([2560, 2560])
weight diff tensor(4.1550e-06, device='cuda:0')

🔧 GPTQ + Blockwise Quantizing Layer: gpt_neox.layers.0.mlp.dense_h_to_4h | Shape: torch.Size([10240, 2560])
weight diff tensor(6.8068e-06, device='cuda:0')

🔧 GPTQ + Blockwise Quantizing Layer: gpt_neox.layers.0.mlp.dense_4h_to_h | Shape: torch.Size([2560, 10240])
weight diff tensor(3.9587e-06, device='cuda:0')

🔧 GPTQ + Blockwise Quantizing Layer: gpt_neox.layers.1.attention.query_key_value | Shape: torch.Size([7680, 2560])
weight diff tensor(9.8122e-06, device='cuda:0')

🔧 GPTQ + Blockwise Quantizing Layer: gpt_neox.layers.1.attention.dense | Shape: torch.Size([2560, 2560])
weight diff tensor(4

In [24]:
#test_model = AutoModelForCausalLM.from_pretrained(MODEL_NAME).to(DEVICE).eval()
# ==== Test quantized model ====
#model.eval()
prompt = "I like travelling"
inputs = tokenizer(prompt, return_tensors="pt").to(DEVICE)

with torch.no_grad():
    output = model.generate(**inputs, max_length=30)

print("Sample Output:", tokenizer.decode(output[0], skip_special_tokens=True))

Setting `pad_token_id` to `eos_token_id`:0 for open-end generation.


Sample Output: I like travelling with a companion, and I like to be alone at times. I'm a very social person, but I need alone time to recharge
