In [None]:
# Colab-only setup: install the Hugging Face `datasets` package into the runtime.
!pip install datasets
# Mount Google Drive at /content/drive so files stored there can be read by path.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import torch
import torch.nn as nn
from transformers import T5Tokenizer, T5ForConditionalGeneration
from datasets import load_dataset
from torch.utils.data import DataLoader
import random

# Load the IMDB dataset
dataset = load_dataset('imdb')

# Checkpoint file stored on the mounted Google Drive.
model_path = r'/content/drive/MyDrive/LLM Models/FLAN-T5-base x SAMsum/best_model.pth'
# map_location="cpu" lets a checkpoint that was saved from a GPU session load on a
# CPU-only runtime (this notebook falls back to CPU when CUDA is unavailable); the
# model itself is moved to the evaluation device later.
# NOTE(review): torch.load unpickles the file - only load checkpoints you trust.
checkpoint = torch.load(model_path, map_location="cpu")

# Function to filter out examples with text longer than 1000 characters
def filter_long_text(example):
    """Return True when the example's ``text`` field has at most 1000 characters."""
    text_length = len(example['text'])
    return not text_length > 1000

# Apply the filter to the test split only
filtered_test_dataset = dataset['test'].filter(filter_long_text)

# Evaluate on a random 10% sample of the filtered test split
test_size = len(filtered_test_dataset)
num_to_keep = int(0.1 * test_size)

# Shuffle with a dedicated, fixed-seed generator so the same subset is drawn on
# every run: reported metrics become reproducible, and the global `random` state
# is left untouched.
indices = list(range(test_size))
random.Random(42).shuffle(indices)
test_indices = indices[:num_to_keep]

# Select the corresponding examples from the filtered test set
new_test_set = filtered_test_dataset.select(test_indices)

def get_model_size(model):
    """Return the size of a model's stored state in megabytes (1 MB = 1e6 bytes).

    The state dict is serialized into an in-memory buffer and the byte count of
    that buffer is reported. Summing ``model.parameters()`` alone misses
    registered buffers and, for dynamically quantized models, the packed int8
    weights of quantized ``Linear`` modules (they are not exposed as
    ``nn.Parameter``), which makes a quantized model look far smaller than it
    really is.

    Args:
        model: A ``torch.nn.Module``.

    Returns:
        float: Serialized state-dict size in MB. This includes a small amount
        of serialization overhead on top of the raw tensor bytes.

    Note:
        The whole state dict is held in memory a second time while it is being
        measured.
    """
    import io  # local import so the block does not depend on the import cell

    buffer = io.BytesIO()
    torch.save(model.state_dict(), buffer)
    return buffer.getbuffer().nbytes / 1e6  # Convert to MB


# Preprocessing function
def preprocess_function(examples):
    """Tokenize a batch of reviews and their sentiment words.

    Every review in ``examples["text"]`` is prefixed with the instruction
    prompt and tokenized with truncation/padding to 512 tokens. Every entry of
    ``examples["label"]`` is mapped to the word "positive" (label 1) or
    "negative" (any other value), tokenized with truncation/padding to 10
    tokens, and stored under the "labels" key of the returned encoding.

    Relies on the module-level ``tokenizer``.
    """
    prompts = []
    for text in examples["text"]:
        prompts.append(f"Classify sentiment: {text}")
    encoded = tokenizer(prompts, max_length=512, truncation=True, padding="max_length")

    targets = []
    for label in examples["label"]:
        targets.append("positive" if label == 1 else "negative")
    target_encoding = tokenizer(targets, max_length=10, truncation=True, padding="max_length")

    encoded["labels"] = target_encoding.input_ids
    return encoded

# Tokenizer and base architecture both come from the same pretrained FLAN-T5 name
model_name = "google/flan-t5-base"
tokenizer = T5Tokenizer.from_pretrained(model_name)
model = T5ForConditionalGeneration.from_pretrained(model_name)

# Overwrite the pretrained weights with the ones stored in the checkpoint
model.load_state_dict(
    checkpoint['model_state_dict']
)

# Tokenize the sampled test set in batches; the raw columns are dropped so only
# the tokenizer outputs and "labels" remain
tokenized_test_dataset = new_test_set.map(
    preprocess_function,
    batched=True,
    remove_columns=["text", "label"],
)

def collate_fn(batch):
    """Stack the per-example token lists of a batch into tensors.

    Args:
        batch: Sequence of dicts, each holding "input_ids", "attention_mask"
            and "labels" entries that ``torch.tensor`` can convert.

    Returns:
        dict: The same three keys, each mapped to the examples' tensors stacked
        along a new leading dimension.
    """
    def stack_field(key):
        return torch.stack([torch.tensor(example[key]) for example in batch])

    return {key: stack_field(key) for key in ("input_ids", "attention_mask", "labels")}

# Serve the tokenized test set in batches of 32, assembled by collate_fn
eval_dataloader = DataLoader(
    tokenized_test_dataset,
    collate_fn=collate_fn,
    batch_size=32,
)

# Function to evaluate the model and compute validation loss and accuracy
def evaluate_model(model, dataloader, device):
    """Compute the average loss and exact-match accuracy of ``model`` on ``dataloader``.

    Relies on the module-level ``tokenizer`` to decode predictions and labels.

    Args:
        model: Seq2seq model that returns an object with a ``.loss`` attribute
            when called with ``labels``, and that implements ``generate``.
        dataloader: Iterable of batches; each batch is a dict holding
            "input_ids", "attention_mask" and "labels" tensors.
        device: Device the batch tensors are moved to. The model is not moved
            here, so it must already be on this device.

    Returns:
        tuple: ``(avg_eval_loss, accuracy)``. The loss is the mean of the
        per-batch losses weighted by batch size; accuracy is the fraction of
        generated strings equal to the decoded label after stripping
        whitespace and lower-casing.

    Raises:
        ValueError: If the dataloader yields no examples, since both metrics
            are undefined in that case.
    """
    model.eval()
    weighted_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for batch in dataloader:
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            labels = batch["labels"].to(device)
            batch_size = labels.size(0)

            # NOTE(review): labels built by preprocess_function keep the pad token
            # id instead of -100, so padding positions presumably count toward
            # this loss - confirm this matches how the checkpoint was trained.
            outputs = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
            # Weight each batch's loss by its size so that a smaller final batch
            # does not get the same influence as a full one (assumes the model
            # reports a per-batch mean).
            weighted_loss += outputs.loss.item() * batch_size

            # Generate predictions
            predictions = model.generate(input_ids=input_ids, attention_mask=attention_mask, max_length=10)
            decoded_preds = tokenizer.batch_decode(predictions, skip_special_tokens=True)
            decoded_labels = tokenizer.batch_decode(labels, skip_special_tokens=True)

            # Exact string match, ignoring case and surrounding whitespace
            correct += sum(
                pred.strip().lower() == label.strip().lower()
                for pred, label in zip(decoded_preds, decoded_labels)
            )
            total += batch_size

    if total == 0:
        raise ValueError("evaluate_model received an empty dataloader; nothing to evaluate")

    avg_eval_loss = weighted_loss / total
    accuracy = correct / total
    return avg_eval_loss, accuracy

# --- Baseline: evaluate the unquantized model on the GPU when one is available ---
if torch.cuda.is_available():
    gpu_device = torch.device("cuda")
else:
    gpu_device = torch.device("cpu")
model.to(gpu_device)

original_model_size = get_model_size(model)
print(f"Original model size: {original_model_size:.2f} MB")

original_eval_loss, original_accuracy = evaluate_model(model, eval_dataloader, gpu_device)
print(f"Original Model Evaluation Loss: {original_eval_loss:.4f}")
print(f"Original Model Accuracy: {original_accuracy:.4f}")

# --- Dynamic int8 quantization of the Linear layers; the model is moved to CPU first ---
quantized_model = torch.quantization.quantize_dynamic(
    model.cpu(),
    {nn.Linear},
    dtype=torch.qint8,
)

quantized_model_size = get_model_size(quantized_model)
print(f"Quantized model size: {quantized_model_size:.2f} MB")

# --- The quantized model is evaluated on the CPU with the same dataloader ---
cpu_device = torch.device("cpu")
quantized_eval_loss, quantized_accuracy = evaluate_model(
    quantized_model, eval_dataloader, cpu_device
)
print(f"Quantized Model Evaluation Loss: {quantized_eval_loss:.4f}")
print(f"Quantized Model Accuracy: {quantized_accuracy:.4f}")

# --- Relative changes, each expressed as a percentage of the original model's value ---
size_reduction = 100 * (original_model_size - quantized_model_size) / original_model_size
loss_increase = 100 * (quantized_eval_loss - original_eval_loss) / original_eval_loss
accuracy_decrease = 100 * (original_accuracy - quantized_accuracy) / original_accuracy

print(f"Size reduction: {size_reduction:.2f}%")
print(f"Loss increase: {loss_increase:.2f}%")
print(f"Accuracy decrease: {accuracy_decrease:.2f}%")

In [None]:
Original model size: 990.31 MB
Original Model Evaluation Loss: 0.0158
Original Model Accuracy: 0.9466
Quantized model size: 98.89 MB

In [None]:
# Reload the checkpoint for inspection (same file as the evaluation cell).
model_path = r'/content/drive/MyDrive/LLM Models/FLAN-T5-base x SAMsum/best_model.pth'
# map_location="cpu" keeps this working on a CPU-only runtime and avoids placing
# a second copy of the weights on the GPU just to look at them.
# NOTE(review): torch.load unpickles the file - only load checkpoints you trust.
checkpoint = torch.load(model_path, map_location="cpu")

In [None]:
# Show what kind of object torch.load returned for the checkpoint file.
type(checkpoint)

In [None]:
# List the checkpoint's top-level keys. The name `checkpoints` was never defined
# (the loaded object is `checkpoint`), so the cell raised NameError; listing the
# keys also avoids echoing every weight tensor into the cell output.
list(checkpoint.keys())