In [1]:
import torch
from transformers import BertForSequenceClassification, BertTokenizer
from transformers import AutoModelForSequenceClassification
from datasets import load_dataset
from torch.utils.data import DataLoader
from tqdm import tqdm
import torch
import re
import numpy as np
from transformers import BertTokenizer, BertForSequenceClassification
from datasets import load_dataset
from torch.utils.data import DataLoader
import torch.nn as nn
from sklearn.cluster import KMeans

import warnings
warnings.filterwarnings("ignore")

model = AutoModelForSequenceClassification.from_pretrained("training/output_bert")
tokenizer = BertTokenizer.from_pretrained('training/output_bert')

if torch.backends.mps.is_available():
    device = torch.device("mps")
elif torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")


model = model.to(device)


Using device: mps


In [2]:
# torch.save(model.state_dict(), "new_quantization/original_bert.pth")

In [3]:
# 1. Configuration: model and dataset
model_name = "bert-base-uncased"
dataset_name = "glue"
dataset_config = "sst2"
split = "validation"  # SST-2 test split is unlabeled, so we use validation

In [None]:
# Create a custom model class that replicates the quantized structure
import os
import json
import torch
import torch.nn as nn
from transformers import AutoConfig, AutoModelForSequenceClassification, AutoTokenizer

class QuantizedLinear(nn.Module):
    def __init__(self, in_features, out_features, bit_precision=8):
        super(QuantizedLinear, self).__init__()
        self.base_layer = nn.Linear(in_features, out_features)
        self.bit_precision = bit_precision
        
    def forward(self, input_tensor):
        return self.base_layer(input_tensor)
        
    def quantize(self, tensor, bits):
        # Quantization logic (not needed for loading)
        return tensor

# Function to create a model with the same structure as the quantized model
def create_quantized_model_structure(config, layer_precision_map=None):
    # Start with a standard model
    model = AutoModelForSequenceClassification.from_config(config)
    
    # Replace linear layers with QuantizedLinear layers
    for name, module in list(model.named_modules()):
        if isinstance(module, nn.Linear):
            parent_name = name.rsplit('.', 1)[0] if '.' in name else ''
            layer_name = name.rsplit('.', 1)[1] if '.' in name else name
            
            # Extract layer number if it exists
            layer_num = None
            if 'layer.' in parent_name:
                try:
                    layer_num = int(parent_name.split('layer.')[1].split('.')[0])
                except:
                    pass
            
            # Get precision for this layer
            bits = 8  # Default
            if layer_precision_map and layer_num is not None:
                bits = layer_precision_map.get(layer_num, 8)
            
            # Create quantized layer
            in_features = module.in_features
            out_features = module.out_features
            
            # Get parent module
            parent = model
            if parent_name:
                for part in parent_name.split('.'):
                    parent = getattr(parent, part)
            
            # Replace the layer
            quant_layer = QuantizedLinear(in_features, out_features, bits)
            setattr(parent, layer_name, quant_layer)
    
    return model

def load_quantized_model(model_dir="pi_bert_new_method"):
    """
    Load a quantized model from the given directory.
    """
    # Load model configuration
    config_path = os.path.join(model_dir, "model_config", "config.json")
    config = AutoConfig.from_pretrained(config_path)
    
    # Load layer precision map if available
    layer_precision_map = {}
    precision_map_path = os.path.join(model_dir, "layer_precision_map.json")
    if os.path.exists(precision_map_path):
        with open(precision_map_path, "r") as f:
            layer_precision_map = json.load(f)
            # Convert string keys back to integers
            layer_precision_map = {int(k): v for k, v in layer_precision_map.items()}
    
    # Create model with quantized structure
    model = create_quantized_model_structure(config, layer_precision_map)
    
    # Load the saved state dict
    model_path = os.path.join(model_dir, "quantized_model.pth")
    model.load_state_dict(torch.load(model_path))
    
    # Load tokenizer
    tokenizer_dir = os.path.join(model_dir, "tokenizer")
    tokenizer = AutoTokenizer.from_pretrained(tokenizer_dir)
    
    # Move model to the appropriate device
    if torch.backends.mps.is_available():
        device = torch.device("mps")
    elif torch.cuda.is_available():
        device = torch.device("cuda")
    else:
        device = torch.device("cpu")
    model = model.to(device)
    
    return model, tokenizer, layer_precision_map

# Test loading the model
try:
    loaded_model, loaded_tokenizer, loaded_precision_map = load_quantized_model()
    print("Model loaded successfully!")
    print(f"Layer precision map: {loaded_precision_map}")
    
    # Test a prediction with the loaded model
    def predict_with_loaded_model(text, model=loaded_model, tokenizer=loaded_tokenizer):
        # Get the device the model is on
        device = next(model.parameters()).device
        
        inputs = tokenizer(text, return_tensors="pt")
        inputs = {k: v.to(device) for k, v in inputs.items()}
        
        with torch.no_grad():
            outputs = model(**inputs)
        
        scores = torch.nn.functional.softmax(outputs.logits, dim=-1)
        pred_idx = torch.argmax(scores, dim=-1).item()
        confidence = scores[0][pred_idx].item()
        label = "Positive" if pred_idx == 1 else "Negative"
        
        return label, confidence
    
    # Test with the same example as before
    test_text = "This movie was fantastic! I really enjoyed it."
    label, confidence = predict_with_loaded_model(test_text)
    print(f"Prediction: {label} (confidence: {confidence:.4f})")
except Exception as e:
    print(f"Error loading model: {e}")


In [None]:
# Create a standalone script for loading the quantized model
standalone_script = '''
import os
import json
import torch
import torch.nn as nn
from transformers import AutoConfig, AutoModelForSequenceClassification, AutoTokenizer

class QuantizedLinear(nn.Module):
    """
    Custom module that wraps a linear layer for quantization.
    This structure matches how the model was saved.
    """
    def __init__(self, in_features, out_features, bit_precision=8):
        super(QuantizedLinear, self).__init__()
        self.base_layer = nn.Linear(in_features, out_features)
        self.bit_precision = bit_precision
        
    def forward(self, input_tensor):
        return self.base_layer(input_tensor)
        
    def quantize(self, tensor, bits):
        # Quantization logic (not needed for loading)
        return tensor

def create_quantized_model_structure(config, layer_precision_map=None):
    """
    Create a model with the same structure as the quantized model.
    
    Args:
        config: Model configuration
        layer_precision_map: Dictionary mapping layer numbers to bit precision
        
    Returns:
        Model with quantized structure
    """
    # Start with a standard model
    model = AutoModelForSequenceClassification.from_config(config)
    
    # Replace linear layers with QuantizedLinear layers
    for name, module in list(model.named_modules()):
        if isinstance(module, nn.Linear):
            parent_name = name.rsplit('.', 1)[0] if '.' in name else ''
            layer_name = name.rsplit('.', 1)[1] if '.' in name else name
            
            # Extract layer number if it exists
            layer_num = None
            if 'layer.' in parent_name:
                try:
                    layer_num = int(parent_name.split('layer.')[1].split('.')[0])
                except:
                    pass
            
            # Get precision for this layer
            bits = 8  # Default
            if layer_precision_map and layer_num is not None:
                bits = layer_precision_map.get(layer_num, 8)
            
            # Create quantized layer
            in_features = module.in_features
            out_features = module.out_features
            
            # Get parent module
            parent = model
            if parent_name:
                for part in parent_name.split('.'):
                    parent = getattr(parent, part)
            
            # Replace the layer
            quant_layer = QuantizedLinear(in_features, out_features, bits)
            setattr(parent, layer_name, quant_layer)
    
    return model

def load_quantized_model(model_dir="pi_bert"):
    """
    Load a quantized model from the given directory.
    
    Args:
        model_dir (str): Directory containing the saved model
        
    Returns:
        tuple: (model, tokenizer, layer_precision_map)
    """
    # Load model configuration
    config_path = os.path.join(model_dir, "model_config", "config.json")
    config = AutoConfig.from_pretrained(config_path)
    
    # Load layer precision map if available
    layer_precision_map = {}
    precision_map_path = os.path.join(model_dir, "layer_precision_map.json")
    if os.path.exists(precision_map_path):
        with open(precision_map_path, "r") as f:
            layer_precision_map = json.load(f)
            # Convert string keys back to integers
            layer_precision_map = {int(k): v for k, v in layer_precision_map.items()}
    
    # Create model with quantized structure
    model = create_quantized_model_structure(config, layer_precision_map)
    
    # Load the saved state dict
    model_path = os.path.join(model_dir, "quantized_model.pth")
    model.load_state_dict(torch.load(model_path))
    
    # Load tokenizer
    tokenizer_dir = os.path.join(model_dir, "tokenizer")
    tokenizer = AutoTokenizer.from_pretrained(tokenizer_dir)
    
    # Move model to the appropriate device
    if torch.backends.mps.is_available():
        device = torch.device("mps")
    elif torch.cuda.is_available():
        device = torch.device("cuda")
    else:
        device = torch.device("cpu")
    model = model.to(device)
    
    return model, tokenizer, layer_precision_map

def predict(text, model, tokenizer):
    """
    Make a sentiment prediction using the loaded model.
    
    Args:
        text (str): Input text to classify
        model: The loaded model
        tokenizer: The loaded tokenizer
        
    Returns:
        tuple: (label, confidence)
    """
    # Get the device the model is on
    device = next(model.parameters()).device
    
    inputs = tokenizer(text, return_tensors="pt")
    inputs = {k: v.to(device) for k, v in inputs.items()}
    
    with torch.no_grad():
        outputs = model(**inputs)
    
    scores = torch.nn.functional.softmax(outputs.logits, dim=-1)
    pred_idx = torch.argmax(scores, dim=-1).item()
    confidence = scores[0][pred_idx].item()
    label = "Positive" if pred_idx == 1 else "Negative"
    
    return label, confidence

if __name__ == "__main__":
    # Example usage
    try:
        model, tokenizer, precision_map = load_quantized_model()
        print("Model loaded successfully!")
        
        # Test with some examples
        examples = [
            "This movie was fantastic! I really enjoyed it.",
            "The acting was terrible and the plot made no sense.",
            "It was okay, but I wouldn't watch it again."
        ]
        
        print("\\nMaking predictions with the quantized model:")
        for text in examples:
            label, confidence = predict(text, model, tokenizer)
            print(f"Text: {text}")
            print(f"Prediction: {label} (confidence: {confidence:.4f})")
            print("-" * 50)
    except Exception as e:
        print(f"Error loading model: {e}")
'''

# Write the standalone script to a file
with open("load_quantized_model.py", "w") as f:
    f.write(standalone_script)

print("Standalone script created: load_quantized_model.py")
print("You can now use this script to load and use the quantized model from scratch.")
print("\nExample usage:")
print("```python")
print("from load_quantized_model import load_quantized_model, predict")
print("")
print("# Load the model")
print("model, tokenizer, _ = load_quantized_model()")
print("")
print("# Make a prediction")
print("text = \"This movie was fantastic!\"")
print("label, confidence = predict(text, model, tokenizer)")
print("print(f\"Prediction: {label} (confidence: {confidence:.4f})\")")
print("```")


In [4]:
def tokenize_function(examples):
    return tokenizer(
        examples['sentence'],
        padding="max_length",
        truncation=True,
        max_length=128
    )

In [None]:
# Create a proper standalone script for loading the quantized model
proper_script = '''
import os
import json
import torch
import torch.nn as nn
from transformers import AutoConfig, AutoModelForSequenceClassification, AutoTokenizer
from collections import OrderedDict

def load_quantized_model(model_dir="pi_bert_new_method"):
    """
    Load a quantized model from the given directory.
    
    Args:
        model_dir (str): Directory containing the saved model
        
    Returns:
        tuple: (model, tokenizer, layer_precision_map)
    """
    # Load model configuration
    config_path = os.path.join(model_dir, "model_config", "config.json")
    config = AutoConfig.from_pretrained(config_path)
    
    # Create a model with the same architecture as the original
    model = AutoModelForSequenceClassification.from_pretrained("training/output_bert")
    
    # Load the quantized state dict
    model_path = os.path.join(model_dir, "quantized_model.pth")
    state_dict = torch.load(model_path)
    
    # Process the state dict to match the model structure
    # This handles the case where the saved model has different layer structure
    # (e.g., base_layer components) than the loaded model
    new_state_dict = OrderedDict()
    
    for key, value in state_dict.items():
        if ".base_layer." in key:
            # Remove the base_layer part from keys
            new_key = key.replace(".base_layer", "")
            new_state_dict[new_key] = value
        else:
            new_state_dict[key] = value
    
    # Load the processed state dict into the model
    model.load_state_dict(new_state_dict, strict=False)
    
    # Load layer precision map if available
    layer_precision_map = {}
    precision_map_path = os.path.join(model_dir, "layer_precision_map.json")
    if os.path.exists(precision_map_path):
        with open(precision_map_path, "r") as f:
            layer_precision_map = json.load(f)
            # Convert string keys back to integers
            layer_precision_map = {int(k): v for k, v in layer_precision_map.items()}
    
    # Load tokenizer
    tokenizer_dir = os.path.join(model_dir, "tokenizer")
    tokenizer = AutoTokenizer.from_pretrained(tokenizer_dir)
    
    # Move model to the appropriate device
    if torch.backends.mps.is_available():
        device = torch.device("mps")
    elif torch.cuda.is_available():
        device = torch.device("cuda")
    else:
        device = torch.device("cpu")
    model = model.to(device)
    
    return model, tokenizer, layer_precision_map

def predict(text, model, tokenizer):
    """
    Make a sentiment prediction using the loaded model.
    
    Args:
        text (str): Input text to classify
        model: The loaded model
        tokenizer: The loaded tokenizer
        
    Returns:
        tuple: (label, confidence)
    """
    # Get the device the model is on
    device = next(model.parameters()).device
    
    inputs = tokenizer(text, return_tensors="pt")
    inputs = {k: v.to(device) for k, v in inputs.items()}
    
    with torch.no_grad():
        outputs = model(**inputs)
    
    scores = torch.nn.functional.softmax(outputs.logits, dim=-1)
    pred_idx = torch.argmax(scores, dim=-1).item()
    confidence = scores[0][pred_idx].item()
    label = "Positive" if pred_idx == 1 else "Negative"
    
    return label, confidence

if __name__ == "__main__":
    # Example usage
    try:
        model, tokenizer, precision_map = load_quantized_model()
        print("Model loaded successfully!")
        print(f"Layer precision map: {precision_map}")
        
        # Test with some examples
        examples = [
            "This movie was fantastic! I really enjoyed it.",
            "The acting was terrible and the plot made no sense.",
            "It was okay, but I wouldn\'t watch it again."
        ]
        
        print("\\nMaking predictions with the quantized model:")
        for text in examples:
            label, confidence = predict(text, model, tokenizer)
            print(f"Text: {text}")
            print(f"Prediction: {label} (confidence: {confidence:.4f})")
            print("-" * 50)
    except Exception as e:
        print(f"Error loading model: {e}")
'''

# Write the proper standalone script to a file
with open("load_quantized_model_proper.py", "w") as f:
    f.write(proper_script)

print("Proper standalone script created: load_quantized_model_proper.py")
print("This script properly loads the quantized weights and handles the base_layer structure.")
print("\nExample usage:")
print("```python")
print("from load_quantized_model_proper import load_quantized_model, predict")
print("")
print("# Load the model")
print("model, tokenizer, precision_map = load_quantized_model()")
print("")
print("# Make a prediction")
print("text = \"This movie was fantastic!\"")
print("label, confidence = predict(text, model, tokenizer)")
print("print(f\"Prediction: {label} (confidence: {confidence:.4f})\")")
print("```")


In [None]:
# Test the proper loading script
import os
import json
import torch
import torch.nn as nn
from transformers import AutoConfig, AutoModelForSequenceClassification, AutoTokenizer
from collections import OrderedDict

def load_quantized_model_test(model_dir="pi_bert_new_method"):
    """
    Load a quantized model from the given directory.
    
    Args:
        model_dir (str): Directory containing the saved model
        
    Returns:
        tuple: (model, tokenizer, layer_precision_map)
    """
    # Load model configuration
    config_path = os.path.join(model_dir, "model_config", "config.json")
    config = AutoConfig.from_pretrained(config_path)
    
    # Create a model with the same architecture as the original
    model = AutoModelForSequenceClassification.from_pretrained("training/output_bert")
    
    # Load the quantized state dict
    model_path = os.path.join(model_dir, "quantized_model.pth")
    state_dict = torch.load(model_path)
    
    # Process the state dict to match the model structure
    # This handles the case where the saved model has different layer structure
    # (e.g., base_layer components) than the loaded model
    new_state_dict = OrderedDict()
    
    for key, value in state_dict.items():
        if ".base_layer." in key:
            # Remove the base_layer part from keys
            new_key = key.replace(".base_layer", "")
            new_state_dict[new_key] = value
        else:
            new_state_dict[key] = value
    
    # Load the processed state dict into the model
    model.load_state_dict(new_state_dict, strict=False)
    
    # Load layer precision map if available
    layer_precision_map = {}
    precision_map_path = os.path.join(model_dir, "layer_precision_map.json")
    if os.path.exists(precision_map_path):
        with open(precision_map_path, "r") as f:
            layer_precision_map = json.load(f)
            # Convert string keys back to integers
            layer_precision_map = {int(k): v for k, v in layer_precision_map.items()}
    
    # Load tokenizer
    tokenizer_dir = os.path.join(model_dir, "tokenizer")
    tokenizer = AutoTokenizer.from_pretrained(tokenizer_dir)
    
    # Move model to the appropriate device
    if torch.backends.mps.is_available():
        device = torch.device("mps")
    elif torch.cuda.is_available():
        device = torch.device("cuda")
    else:
        device = torch.device("cpu")
    model = model.to(device)
    
    return model, tokenizer, layer_precision_map

def predict_test(text, model, tokenizer):
    """
    Make a sentiment prediction using the loaded model.
    """
    # Get the device the model is on
    device = next(model.parameters()).device
    
    inputs = tokenizer(text, return_tensors="pt")
    inputs = {k: v.to(device) for k, v in inputs.items()}
    
    with torch.no_grad():
        outputs = model(**inputs)
    
    scores = torch.nn.functional.softmax(outputs.logits, dim=-1)
    pred_idx = torch.argmax(scores, dim=-1).item()
    confidence = scores[0][pred_idx].item()
    label = "Positive" if pred_idx == 1 else "Negative"
    
    return label, confidence

# Test the loading function
try:
    print("Testing the proper loading function...")
    model, tokenizer, precision_map = load_quantized_model_test()
    print("Model loaded successfully!")
    print(f"Layer precision map: {precision_map}")
    
    # Test with an example
    test_text = "This movie was fantastic! I really enjoyed it."
    label, confidence = predict_test(test_text, model, tokenizer)
    print(f"Prediction: {label} (confidence: {confidence:.4f})")
except Exception as e:
    print(f"Error loading model: {e}")


In [5]:
# 5. Load and tokenize the SST-2 dataset
dataset = load_dataset(dataset_name, dataset_config, split=f"{split}")  # first 10% for speed
dataset = dataset.map(tokenize_function, batched=True)
dataset.set_format(type='torch', columns=['input_ids', 'attention_mask', 'label'])

# 6. DataLoader
dataloader = DataLoader(dataset, batch_size=1, shuffle=True)

In [6]:
# Function to introduce sparsity to a given layer
def introduce_layer_sparsity(layer, sparsity_ratio):
    """Apply sparsity to layer parameters by zeroing out values below threshold."""
    with torch.no_grad():
        for param in layer.parameters():
            flat_param = param.data.view(-1)
            threshold = torch.quantile(torch.abs(flat_param), sparsity_ratio)
            mask = torch.abs(flat_param) > threshold
            param.data *= mask.float().view(param.data.shape)

# Function to evaluate the model's accuracy
def calculate_model_accuracy(model):
    """Evaluate model accuracy on the dataset."""
    model.eval()
    correct_predictions = 0
    total_samples = 0
    
    for batch in tqdm(dataloader, desc="Evaluating"):
        inputs = {k: v.to(device) for k, v in batch.items() if k != 'label'}
        labels = batch['label'].to(device)
        
        with torch.no_grad():
            outputs = model(**inputs)
            predictions = torch.argmax(outputs.logits, dim=-1)
            correct_predictions += (predictions == labels).sum().item()
            total_samples += labels.size(0)
            
    return correct_predictions / total_samples

In [7]:
layer_sensitivity = {}
base_accuracy = calculate_model_accuracy(model)
print(f"Base Accuracy: {base_accuracy}")

Evaluating: 100%|██████████| 872/872 [00:11<00:00, 73.51it/s]

Base Accuracy: 0.9277522935779816





In [8]:
sparsity_ratio = 0.3
sensitivities_values = []
for layer_num, layer in enumerate(model.bert.encoder.layer):
    layer_name = f"Encoder_Layer {layer_num}"
    exported_model = AutoModelForSequenceClassification.from_pretrained("training/output_bert").to(device)
    exported_model.load_state_dict(model.state_dict())

    introduce_layer_sparsity(exported_model.bert.encoder.layer[layer_num], sparsity_ratio)
    accuracy = calculate_model_accuracy(exported_model)
    sensitivity_value = base_accuracy - accuracy
    sensitivities_values.append(sensitivity_value)
    del exported_model
    
    if torch.backends.mps.is_available():
        torch.mps.empty_cache()
    else:
        torch.cuda.empty_cache()


Evaluating: 100%|██████████| 872/872 [00:12<00:00, 71.06it/s]
Evaluating: 100%|██████████| 872/872 [00:11<00:00, 78.88it/s]
Evaluating: 100%|██████████| 872/872 [00:12<00:00, 71.85it/s]
Evaluating: 100%|██████████| 872/872 [00:11<00:00, 76.81it/s]
Evaluating: 100%|██████████| 872/872 [00:10<00:00, 80.99it/s]
Evaluating: 100%|██████████| 872/872 [00:11<00:00, 78.89it/s]
Evaluating: 100%|██████████| 872/872 [00:11<00:00, 79.04it/s]
Evaluating: 100%|██████████| 872/872 [00:11<00:00, 77.36it/s]
Evaluating: 100%|██████████| 872/872 [00:10<00:00, 81.27it/s]
Evaluating: 100%|██████████| 872/872 [00:11<00:00, 78.85it/s]
Evaluating: 100%|██████████| 872/872 [00:11<00:00, 73.16it/s]
Evaluating: 100%|██████████| 872/872 [00:11<00:00, 76.96it/s]


In [9]:
layer_sensitivity_values = {}

for index, layer_value in enumerate(sensitivities_values):
    layer_sensitivity_values["layer_"+str(index)] = layer_value

print(layer_sensitivity_values)

{'layer_0': 0.022935779816513735, 'layer_1': 0.011467889908256867, 'layer_2': 0.013761467889908174, 'layer_3': 0.04816513761467889, 'layer_4': 0.04013761467889909, 'layer_5': 0.016055045871559592, 'layer_6': 0.010321100917431103, 'layer_7': 0.002293577981651307, 'layer_8': 0.008027522935779796, 'layer_9': 0.002293577981651307, 'layer_10': 0.0, 'layer_11': 0.0011467889908256534}


In [10]:
class QuantizedLinear(nn.Module):
    def __init__(self, base_layer, bit_precision):
        super(QuantizedLinear, self).__init__()
        self.base_layer = base_layer
        self.bit_precision = bit_precision
        self.stored_weight = base_layer.weight.detach().clone()
    def calculate_memory_reduction(self):
        full_memory = self.stored_weight.nelement() * 32  # Assuming original weights are 32-bit floats
        reduced_memory = self.stored_weight.nelement() * self.bit_precision
        savings_percentage = 100 * (1 - reduced_memory / full_memory)
        memory_ratio = full_memory / reduced_memory
        return full_memory, reduced_memory, savings_percentage, memory_ratio

    def quantize(self, tensor, bits):
        lower_bound = -(2 ** (bits - 1))
        upper_bound = (2 ** (bits - 1)) - 1
        tensor_min, tensor_max = tensor.min(), tensor.max()
        scaling_factor = (tensor_max - tensor_min) / (upper_bound - lower_bound)
        scaling_factor = max(scaling_factor, 1e-8)
        offset = lower_bound - tensor_min / scaling_factor
        quantized_tensor = torch.round(tensor / scaling_factor + offset)
        quantized_tensor.clamp_(lower_bound, upper_bound)
        quantized_tensor = (quantized_tensor - offset) * scaling_factor
        return quantized_tensor

    def forward(self, input_tensor):
        processed_weight = self.quantize(self.base_layer.weight, self.bit_precision)
        self.base_layer.weight = nn.Parameter(processed_weight)
        result = self.base_layer(input_tensor)
        self.base_layer.weight = nn.Parameter(self.stored_weight)
        return result

In [11]:
def quantize_model_layers(model, precision_map):
    """
    Quantize model layers based on precision map and calculate memory savings.
    
    Args:
        model: The PyTorch model to quantize
        precision_map: Dictionary mapping layer numbers to bit precision
        
    Returns:
        tuple: (quantized_model, original_memory, quantized_memory, compression_ratio)
    """
    total_original_mem = 0
    total_quantized_mem = 0
    
    # Collect all layers first to avoid OrderedDict mutation issues
    all_modules = []
    for idx, (name, module) in enumerate(model.named_modules()):
        all_modules.append((idx, name, module))
    
    # Process each module
    for _, name, module in all_modules:
        # Extract layer number from name using regex
        layer_num_match = re.findall(r'\d+', name)
        layer_num = int(layer_num_match[0]) if layer_num_match else 0
        
        # Get precision for this layer (default to 8-bit if not specified)
        bits = precision_map.get(layer_num, 8)
        
        # Handle different module types
        if isinstance(module, nn.Linear):
            # Quantize linear layer
            quant_layer = QuantizedLinear(module, bits)
            setattr(model, name, quant_layer)
            model._modules[name] = quant_layer
            
            # Calculate memory usage
            orig_mem, quant_mem, _, compression_ratio = quant_layer.calculate_memory_reduction()
            total_original_mem += orig_mem
            total_quantized_mem += quant_mem
            
        elif isinstance(module, nn.LayerNorm):
            # Quantize layer normalization
            quant_layer = QuantizedLinear(module, bits)
            setattr(model, name, quant_layer)
            model._modules[name] = quant_layer
            
            # Calculate memory usage
            orig_mem, quant_mem, _, compression_ratio = quant_layer.calculate_memory_reduction()
            total_original_mem += orig_mem
            total_quantized_mem += quant_mem
            
        elif isinstance(module, nn.MultiheadAttention):
            # Quantize attention components
            for component_name in ['in_proj_weight', 'in_proj_bias', 'out_proj.weight', 'out_proj.bias']:
                param = getattr(module, component_name, None)
                if param is not None:
                    quant_layer = QuantizedLinear(param, bits)
                    setattr(module, component_name, quant_layer.param)
                    
                    # Calculate memory usage
                    orig_mem = param.nelement() * 32  # FP32
                    quant_mem = param.nelement() * bits
                    total_original_mem += orig_mem
                    total_quantized_mem += quant_mem
                    
    total_orig_mem_mb = total_original_mem / (8 * 1024 * 1024)
    total_quant_mem_mb = total_quantized_mem / (8 * 1024 * 1024)

    return model, total_orig_mem_mb, total_quant_mem_mb, compression_ratio


In [12]:
calculate_model_accuracy(model)

Evaluating: 100%|██████████| 872/872 [00:11<00:00, 77.90it/s]


0.9277522935779816

In [13]:
sensitivities = np.array(list(layer_sensitivity_values.values())).reshape(-1, 1)
kmeans = KMeans(n_clusters=3, random_state=0).fit(sensitivities)
clusters = kmeans.labels_

In [14]:
clusters

array([0, 0, 0, 1, 1, 0, 0, 2, 0, 2, 2, 2], dtype=int32)

In [15]:
sensitivities = np.array(list(layer_sensitivity_values.values())).reshape(-1, 1)
kmeans = KMeans(n_clusters=3, random_state=0).fit(sensitivities)
clusters = kmeans.labels_

cluster_means = {}
for cluster_id in range(3):
    cluster_indices = np.where(clusters == cluster_id)[0]
    cluster_sensitivities = [list(layer_sensitivity_values.values())[i] for i in cluster_indices]
    cluster_means[cluster_id] = np.mean(cluster_sensitivities)

sorted_clusters = sorted(cluster_means.items(), key=lambda x: x[1], reverse=True)
most_sensitive_cluster = sorted_clusters[0][0]
medium_sensitive_cluster = sorted_clusters[1][0]
least_sensitive_cluster = sorted_clusters[2][0]

layer_precision_map = {}
for i, (layer_name, sensitivity) in enumerate(layer_sensitivity_values.items()):
    if clusters[i] == most_sensitive_cluster:
        layer_precision_map[i] = 12  # Highest precision
    elif clusters[i] == medium_sensitive_cluster:
        layer_precision_map[i] = 8   # Medium precision
    else:  # least sensitive cluster
        layer_precision_map[i] = 4   # Lowest precision

layer_precision_map

{0: 8, 1: 8, 2: 8, 3: 12, 4: 12, 5: 8, 6: 8, 7: 4, 8: 8, 9: 4, 10: 4, 11: 4}

In [16]:
quantized_model, total_orig_memory, total_quant_memory, compression_ratio = quantize_model_layers(model, layer_precision_map)

In [17]:
# Convert from bits to megabytes (8 bits = 1 byte, 1024*1024 bytes = 1 MB)
total_orig_mem_mb = total_orig_memory 
total_quant_mem_mb = total_quant_memory
print(f"Original memory: {total_orig_mem_mb:.2f} MB")
print(f"Quantized memory: {total_quant_mem_mb:.2f} MB")
print(f"Compression ratio: {compression_ratio:.2f}x")

Original memory: 326.33 MB
Quantized memory: 74.83 MB
Compression ratio: 4.00x


In [18]:
calculate_model_accuracy(quantized_model)

Evaluating: 100%|██████████| 872/872 [00:12<00:00, 70.82it/s]


0.9277522935779816

In [19]:
total_reduction_percent = 100 * (1 - total_quant_mem_mb / total_orig_mem_mb)
print(f"Total Memory Reduction: Original Memory = {total_orig_mem_mb} MB, Quantized Memory = {total_quant_mem_mb} MB, Reduction = {total_reduction_percent:.2f}%")
print(f"Compression ratio = {compression_ratio:.2f}")

Total Memory Reduction: Original Memory = 326.3291015625 MB, Quantized Memory = 74.830810546875 MB, Reduction = 77.07%
Compression ratio = 4.00


In [20]:
import os
import pathlib

# Create directory if it doesn't exist
if not os.path.exists("new_quantization"):
    os.mkdir("new_quantization")
    
torch.save(quantized_model.state_dict(), "new_quantization/quantized_bert.pth")

In [21]:
torch.save(model.state_dict(), "new_quantization/original_bert.pth")

In [22]:
def predict(text):
    inputs = tokenizer(text, return_tensors="pt")
    # Move inputs to the same device as the model
    inputs = {k: v.to(device) for k, v in inputs.items()}
    with torch.no_grad():
        outputs = quantized_model(**inputs)
    scores = torch.nn.functional.softmax(outputs.logits, dim=-1)
    pred_idx = torch.argmax(scores, dim=-1).item()
    confidence = scores[0][pred_idx].item()
    label = "Positive" if pred_idx == 1 else "Negative"

    return label, confidence

text1  = "This movie was fantastic! I really enjoyed it."
predict(text1)

('Positive', 0.9999507665634155)

In [23]:
# Save the model, tokenizer, and configuration for easy loading
import os
import json

# Create directory structure for the quantized model
save_dir = "pi_bert_new_method"
model_config_dir = os.path.join(save_dir, "model_config")
tokenizer_dir = os.path.join(save_dir, "tokenizer")

# Create directories if they don't exist
os.makedirs(model_config_dir, exist_ok=True)
os.makedirs(tokenizer_dir, exist_ok=True)

# Save the model state dict
torch.save(quantized_model.state_dict(), os.path.join(save_dir, "quantized_model.pth"))

# Save the model configuration
model_config = model.config.to_dict()
with open(os.path.join(model_config_dir, "config.json"), "w") as f:
    json.dump(model_config, f, indent=2)

# Save tokenizer files
tokenizer.save_pretrained(tokenizer_dir)

# Save layer precision map for reference
with open(os.path.join(save_dir, "layer_precision_map.json"), "w") as f:
    json.dump({str(k): v for k, v in layer_precision_map.items()}, f, indent=2)

print(f"Model saved to {save_dir}/")
print(f"- Model weights: {save_dir}/quantized_model.pth")
print(f"- Model config: {save_dir}/model_config/config.json")
print(f"- Tokenizer: {save_dir}/tokenizer/")
print(f"- Layer precision map: {save_dir}/layer_precision_map.json")


Model saved to pi_bert_new_method/
- Model weights: pi_bert_new_method/quantized_model.pth
- Model config: pi_bert_new_method/model_config/config.json
- Tokenizer: pi_bert_new_method/tokenizer/
- Layer precision map: pi_bert_new_method/layer_precision_map.json


In [None]:
import os
import json
import torch
import torch.nn as nn
from transformers import AutoConfig, AutoModelForSequenceClassification, AutoTokenizer
import os
import json
import torch
import torch.nn as nn
from transformers import AutoConfig, AutoModelForSequenceClassification, AutoTokenizer
from collections import OrderedDict
import time
import psutil

def load_quantized_model(model_dir="output_bert", quantization_method=True):
    """
    Load a quantized model from the given directory.
    
    Args:
        model_dir (str): Directory containing the saved model
        
    Returns:
        tuple: (model, tokenizer, layer_precision_map)
    """
    if quantization_method:
        model_dir = "pi_bert_new_method"
    else:
        model_dir = "output_bert"

    # Load model configuration
    config_path = os.path.join(model_dir, "model_config", "config.json")
    try:
        config = AutoConfig.from_pretrained(config_path)
    except Exception as e:
        config = AutoConfig.from_pretrained(model_dir)
    
    # Create a model with the same architecture as the original
    model = AutoModelForSequenceClassification.from_pretrained("output_bert")
    
    if quantization_method:
    
        # Load the quantized state dict
        model_path = os.path.join(model_dir, "quantized_model.pth")
        state_dict = torch.load(model_path, map_location=torch.device('cpu'))
        
        # Process the state dict to match the model structure
        # This handles the case where the saved model has different layer structure
        # (e.g., base_layer components) than the loaded model
        new_state_dict = OrderedDict()
        
        for key, value in state_dict.items():
            if ".base_layer." in key:
                # Remove the base_layer part from keys
                new_key = key.replace(".base_layer", "")
                new_state_dict[new_key] = value
            else:
                new_state_dict[key] = value
        
        # Load the processed state dict into the model
        model.load_state_dict(new_state_dict, strict=False)
    
    # Ensure model is on CPU
    model = model.cpu()
    
    # Load layer precision map if available
    layer_precision_map = {}
    precision_map_path = os.path.join(model_dir, "layer_precision_map.json")
    if os.path.exists(precision_map_path):
        with open(precision_map_path, "r") as f:
            layer_precision_map = json.load(f)
            # Convert string keys back to integers
            layer_precision_map = {int(k): v for k, v in layer_precision_map.items()}
    
    # Load tokenizer
    tokenizer_dir = os.path.join(model_dir, "tokenizer")
    try:
        tokenizer = AutoTokenizer.from_pretrained(tokenizer_dir)
    except Exception as e:
        tokenizer = AutoTokenizer.from_pretrained(model_dir)
    
    
    return model, tokenizer, layer_precision_map

def predict(text, model, tokenizer):
    """
    Make a sentiment prediction using the loaded model.
    
    Args:
        text (str): Input text to classify
        model: The loaded model
        tokenizer: The loaded tokenizer
        
    Returns:
        tuple: (label, confidence)
    """
    # Ensure model is on CPU
    model = model.to('cpu')
    device = torch.device('cpu')
    
    inputs = tokenizer(text, return_tensors="pt")
    inputs = {k: v.to(device) for k, v in inputs.items()}
    start_time = time.time()
    with torch.no_grad():
        outputs = model(**inputs)
    inference_time = (time.time() - start_time) * 1000  # ms
    
    scores = torch.nn.functional.softmax(outputs.logits, dim=-1)
    pred_idx = torch.argmax(scores, dim=-1).item()
    confidence = scores[0][pred_idx].item()
    label = "Positive" if pred_idx == 1 else "Negative"
    
    return label, confidence, inference_time

def print_memory_metrics():
    # System memory
    vm = psutil.virtual_memory()
    swap = psutil.swap_memory()
    print(f"System RAM: {vm.total//2**20} MB, Used: {vm.used//2**20} MB ({vm.percent}%)")
    print(f"Swap used: {swap.used//2**20} MB ({swap.percent}%)")
    # Process memory
    p = psutil.Process(os.getpid())
    mi = p.memory_info()
    print(f"Process RSS: {mi.rss//2**20} MB, VMS: {mi.vms//2**20} MB")

if __name__ == "__main__":
    # Example usage
    try:
        model, tokenizer, precision_map = load_quantized_model(quantization_method=True)
        print("Model loaded successfully!")
        print(f"Layer precision map: {precision_map}")
        
        # Test with some examples
        examples = [
        "This movie was fantastic! I really enjoyed it.",
        "This movie was terrible. I hated every minute of it."
    ]
        
        print("\nMaking predictions with the quantized model:")
        for text in examples:
            label, confidence, inference_time = predict(text, model, tokenizer)
            print(f"Text: {text}")
            print(f"Prediction: {label} (confidence: {confidence:.4f})")
            print(f"Inference time: {inference_time:.2f} ms")
            print("-" * 50)
    except Exception as e:
        print(f"Error loading model: {e}")

Model loaded successfully!
Layer precision map: {0: 8, 1: 8, 2: 8, 3: 12, 4: 12, 5: 8, 6: 8, 7: 4, 8: 8, 9: 4, 10: 4, 11: 4}

Making predictions with the quantized model:
Text: This movie was fantastic! I really enjoyed it.
Prediction: Positive (confidence: 1.0000)
--------------------------------------------------
Text: The acting was terrible and the plot made no sense.
Prediction: Negative (confidence: 0.9999)
--------------------------------------------------
Text: It was okay, but I wouldn't watch it again.
Prediction: Negative (confidence: 0.9544)
--------------------------------------------------
