In [3]:
# Disabled experiment: the whole snippet below is wrapped in a triple-quoted
# string, so running this cell executes none of it and only echoes the string's
# repr as the cell output.
# The snippet loads "mistralai/Mistral-7B-v0.1" with load_in_8bit=True and
# generates up to 50 new tokens from a fixed prompt.
# NOTE(review): `bnb` is imported inside the snippet but never referenced there.
"""from transformers import AutoTokenizer, AutoModelForCausalLM
import torch
import bitsandbytes as bnb

model_name = "mistralai/Mistral-7B-v0.1" 
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    load_in_8bit=True,   
    device_map="auto",       
    torch_dtype=torch.float16  
)

input_text = "Artificial intelligence is revolutionizing"
inputs = tokenizer(input_text, return_tensors="pt").to("cuda")

with torch.no_grad():
    outputs = model.generate(inputs["input_ids"], max_new_tokens=50)
    generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
    
print("Generated text:", generated_text)
"""

'from transformers import AutoTokenizer, AutoModelForCausalLM\nimport torch\nimport bitsandbytes as bnb\n\nmodel_name = "mistralai/Mistral-7B-v0.1" \ntokenizer = AutoTokenizer.from_pretrained(model_name)\nmodel = AutoModelForCausalLM.from_pretrained(\n    model_name,\n    load_in_8bit=True,   \n    device_map="auto",       \n    torch_dtype=torch.float16  \n)\n\ninput_text = "Artificial intelligence is revolutionizing"\ninputs = tokenizer(input_text, return_tensors="pt").to("cuda")\n\nwith torch.no_grad():\n    outputs = model.generate(inputs["input_ids"], max_new_tokens=50)\n    generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)\n    \nprint("Generated text:", generated_text)\n'

In [4]:
# Disabled helper: kept as a triple-quoted string, so nothing is defined when
# this cell runs and the cell output is just the string's repr.
# The snippet deep-copies the given model and calls .half() on the copy.
# NOTE(review): it references `copy`, which is only imported in a later cell.
"""def quantize_model_to_fp16(model):
    model_fp16 = copy.deepcopy(model)
    model_fp16.half()
    return model_fp16
"""

'def quantize_model_to_fp16(model):\n    model_fp16 = copy.deepcopy(model)\n    model_fp16.half()\n    return model_fp16\n'

In [None]:
import copy
import os

import torch
import torch.nn as nn
from transformers import (
    AutoModelForSequenceClassification, 
    AutoTokenizer
)
from datasets import load_dataset
import numpy as np
from torch.utils.data import DataLoader
from sklearn.metrics import accuracy_score
import torch.nn.functional as F
import torch.nn.utils.prune as prune
import onnx
import onnxruntime as ort
from onnxruntime.quantization import quantize_dynamic, QuantType

class ModelCompression:
    """Compression experiments (pruning, ONNX quantization, distillation) on one classifier.

    The constructor loads a Hugging Face sequence-classification model together
    with its tokenizer. The methods fine-tune that model, prune it in place,
    export/quantize it through ONNX, distill it into a small student, and
    evaluate the results.

    Attributes:
        device: ``cuda`` when available, otherwise ``cpu``.
        model_name: Checkpoint identifier passed to ``from_pretrained``.
        model: The (teacher/base) sequence-classification model.
        tokenizer: Tokenizer matching ``model_name``; every dataloader produced
            by :meth:`prepare_data` is tokenized with it.
    """

    def __init__(self, model_name="Qwen/Qwen2.5-7B-Instruct", num_labels=2):
        """Load model and tokenizer and move the model to ``self.device``.

        Args:
            model_name: Hugging Face checkpoint to load.
            num_labels: Number of output classes of the classification head.
        """
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model_name = model_name
        self.model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=num_labels)
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)

        # prepare_data() pads every example to max_length, so a pad token must be
        # known on both sides. Decoder-style checkpoints may not define one in the
        # model config, and their classification heads may refuse padded batches
        # larger than 1 without it.
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token
        if self.model.config.pad_token_id is None:
            self.model.config.pad_token_id = self.tokenizer.pad_token_id

        self.model.to(self.device)

    def prepare_data(self, max_length=128, batch_size=16, train_samples=1000, eval_samples=200):
        """Build train/eval dataloaders from the GLUE SST-2 sentiment dataset.

        Only the first ``train_samples`` training rows and the first
        ``eval_samples`` validation rows are used. The subsets are selected
        *before* tokenization so that only the rows actually used are tokenized.

        Args:
            max_length: Every sentence is padded/truncated to this many tokens.
            batch_size: Batch size of both dataloaders.
            train_samples: Maximum number of training examples to keep.
            eval_samples: Maximum number of validation examples to keep.

        Returns:
            ``(train_dataloader, eval_dataloader)``; the training loader shuffles.
        """
        dataset = load_dataset("glue", "sst2")

        def tokenize_function(examples):
            return self.tokenizer(
                examples["sentence"],
                padding="max_length",
                truncation=True,
                max_length=max_length
            )

        def build_split(split, limit):
            # Clamp so a limit larger than the split does not raise.
            subset = split.select(range(min(limit, len(split))))
            subset = subset.map(tokenize_function, batched=True)
            subset = subset.remove_columns(["sentence", "idx"])
            subset = subset.rename_column("label", "labels")
            subset.set_format("torch")
            return subset

        train_dataset = build_split(dataset["train"], train_samples)
        eval_dataset = build_split(dataset["validation"], eval_samples)

        train_dataloader = DataLoader(
            train_dataset,
            shuffle=True,
            batch_size=batch_size
        )
        eval_dataloader = DataLoader(
            eval_dataset,
            batch_size=batch_size
        )

        return train_dataloader, eval_dataloader

    def custom_prune_model(self, amount=0.3):
        """Zero the smallest-magnitude weights of every ``nn.Linear`` in place.

        For each linear layer, the ``int(amount * numel)`` weights with the
        smallest absolute value (ties at the threshold included) are set to
        zero, and the 0/1 mask that was applied is stored on the layer as the
        buffer ``weight_mask``.

        Args:
            amount: Fraction of weights to remove per layer, within ``[0, 1]``.

        Returns:
            ``self.model`` (the same object, modified in place).

        Raises:
            ValueError: If ``amount`` is outside ``[0, 1]``.
        """
        if not 0.0 <= amount <= 1.0:
            raise ValueError(f"amount must be within [0, 1], got {amount}")

        for _, module in self.model.named_modules():
            if not isinstance(module, nn.Linear):
                continue

            weight = module.weight.data
            magnitude = weight.abs()

            k = int(amount * weight.numel())
            if k == 0:
                # kthvalue requires k >= 1; nothing to prune for this layer.
                keep = torch.ones_like(weight, dtype=torch.bool)
            else:
                # .float() is a no-op for fp32 weights and keeps kthvalue usable
                # for reduced-precision weights.
                threshold = torch.kthvalue(magnitude.reshape(-1).float(), k).values
                keep = magnitude > threshold

            mask = keep.float()
            weight.mul_(mask)
            module.register_buffer("weight_mask", mask)

        return self.model

    def export_to_onnx(self, onnx_model_path, max_length=128):
        """Export ``self.model`` to ONNX with dynamic batch/sequence axes.

        The export is traced on CPU in eval mode. Afterwards the model is moved
        back to the device it was on, so later training/evaluation on
        ``self.device`` keeps working.

        Args:
            onnx_model_path: Destination path of the ``.onnx`` file.
            max_length: Sequence length of the dummy input used for tracing.
        """
        first_param = next(self.model.parameters(), None)
        original_device = first_param.device if first_param is not None else self.device

        self.model.cpu().eval()
        dummy_input_ids = torch.zeros(1, max_length, dtype=torch.long)
        # Trace with every position attended to; an all-zero mask would trace a
        # fully masked (degenerate) attention pattern.
        dummy_attention_mask = torch.ones(1, max_length, dtype=torch.long)

        try:
            with torch.no_grad():
                torch.onnx.export(
                    self.model,
                    (dummy_input_ids, dummy_attention_mask),
                    onnx_model_path,
                    input_names=['input_ids', 'attention_mask'],
                    output_names=['logits'],
                    dynamic_axes={
                        'input_ids': {0: 'batch_size', 1: 'sequence_length'},
                        'attention_mask': {0: 'batch_size', 1: 'sequence_length'},
                        'logits': {0: 'batch_size'}
                    },
                    opset_version=14
                )
        finally:
            self.model.to(original_device)
        print(f"Model exported to {onnx_model_path}")

    def quantize_onnx_model(self, onnx_model_path, quantized_model_path, use_external_data_format=False):
        """Apply ONNX Runtime dynamic quantization (int8 weights) to an ONNX file.

        Args:
            onnx_model_path: Path of the float ONNX model.
            quantized_model_path: Path to write the quantized model to.
            use_external_data_format: Forwarded to ``quantize_dynamic``; intended
                for models too large to be stored in a single protobuf file.
        """
        quantize_dynamic(
            onnx_model_path,
            quantized_model_path,
            weight_type=QuantType.QInt8,
            use_external_data_format=use_external_data_format
        )
        print(f"Model quantized and saved to {quantized_model_path}")

    def evaluate_onnx_model(self, quantized_model_path, eval_dataloader, max_length=128):
        """Compute classification accuracy of an ONNX model with ONNX Runtime.

        Args:
            quantized_model_path: Path of the ONNX model to load.
            eval_dataloader: Yields dict batches with ``input_ids``,
                ``attention_mask`` and ``labels`` tensors.
            max_length: Unused; kept for backward compatibility.

        Returns:
            Accuracy as a float, or ``nan`` if the dataloader yields no batches.
        """
        session = ort.InferenceSession(quantized_model_path)
        # Feed only the inputs the exported graph actually declares.
        expected_inputs = {node.name for node in session.get_inputs()}

        predictions = []
        references = []

        for batch in eval_dataloader:
            candidates = {
                'input_ids': batch['input_ids'].cpu().numpy().astype(np.int64),
                'attention_mask': batch['attention_mask'].cpu().numpy().astype(np.int64)
            }
            ort_inputs = {name: value for name, value in candidates.items() if name in expected_inputs}

            logits = session.run(None, ort_inputs)[0]
            predictions.extend(np.argmax(logits, axis=1).tolist())
            references.extend(batch['labels'].cpu().numpy().tolist())

        if not references:
            return float("nan")
        return accuracy_score(references, predictions)

    def knowledge_distillation(self, train_dataloader, num_epochs=3, temperature=2.0, alpha=0.7,
                               student_name="prajjwal1/bert-tiny"):
        """Distill ``self.model`` (teacher) into a small student classifier.

        The loss is ``alpha * cross_entropy(student, labels) +
        (1 - alpha) * T^2 * KL(student_T || teacher_T)`` with temperature ``T``.

        Batches are tokenized with the *teacher's* tokenizer, so the student's
        token-embedding table is enlarged to cover the teacher vocabulary.
        Without this the student would index past the end of its own embedding
        table. The student's pretrained embedding rows therefore no longer
        correspond to the ids they are indexed with; its embeddings are
        effectively learned during distillation.

        Args:
            train_dataloader: Yields dict batches containing ``labels`` plus the
                model inputs.
            num_epochs: Number of passes over ``train_dataloader``.
            temperature: Softmax temperature for the soft targets.
            alpha: Weight of the hard-label loss; ``1 - alpha`` weights the
                soft-target loss.
            student_name: Checkpoint of the student model.

        Returns:
            The trained student in eval mode, or ``None`` if anything fails
            (the error is printed rather than raised).
        """
        try:
            student_model = AutoModelForSequenceClassification.from_pretrained(
                student_name,
                # Match the teacher's head instead of assuming two classes.
                num_labels=self.model.config.num_labels,
                ignore_mismatched_sizes=True
            )

            teacher_vocab_size = len(self.tokenizer)
            if teacher_vocab_size > student_model.config.vocab_size:
                student_model.resize_token_embeddings(teacher_vocab_size)
            student_model.to(self.device)

            head = getattr(student_model, "classifier", None)
            if isinstance(head, nn.Linear):
                head.weight.data.normal_(mean=0.0, std=0.02)
                head.bias.data.zero_()

            teacher_model = self.model.to(self.device)
            teacher_model.eval()

            optimizer = torch.optim.AdamW(student_model.parameters(), lr=1e-4)

            for epoch in range(num_epochs):
                student_model.train()
                total_loss = 0.0
                num_batches = 0

                for batch in train_dataloader:
                    batch = {k: v.to(self.device) for k, v in batch.items()}
                    labels = batch['labels']
                    # Labels are withheld from both forward passes: the built-in
                    # losses are not used, only the logits.
                    inputs = {k: v for k, v in batch.items() if k != 'labels'}

                    with torch.no_grad():
                        teacher_logits = teacher_model(**inputs).logits

                    student_logits = student_model(**inputs).logits

                    soft_targets = F.softmax(teacher_logits / temperature, dim=-1)
                    soft_prob = F.log_softmax(student_logits / temperature, dim=-1)
                    soft_loss = F.kl_div(
                        soft_prob,
                        soft_targets,
                        reduction='batchmean'
                    ) * (temperature ** 2)

                    hard_loss = F.cross_entropy(student_logits, labels)
                    loss = (alpha * hard_loss) + ((1.0 - alpha) * soft_loss)

                    loss.backward()
                    optimizer.step()
                    optimizer.zero_grad()

                    total_loss += loss.item()
                    num_batches += 1

                # An empty dataloader yields nan instead of a ZeroDivisionError.
                avg_loss = total_loss / num_batches if num_batches else float("nan")
                print(f"Epoch {epoch+1}/{num_epochs}, Average Loss: {avg_loss:.4f}")

            return student_model.eval()

        except Exception as e:
            # Deliberate best-effort: callers check for None and carry on.
            print(f"Distillation 중 에러 발생: {str(e)}")
            return None

    def train_model(self, train_dataloader, num_epochs=3):
        """Fine-tune ``self.model`` with AdamW (lr=1e-4) using its built-in loss.

        The model is moved to ``self.device`` first and left in eval mode
        afterwards.

        Args:
            train_dataloader: Yields dict batches that include ``labels``.
            num_epochs: Number of passes over ``train_dataloader``.
        """
        self.model.to(self.device)
        optimizer = torch.optim.AdamW(self.model.parameters(), lr=1e-4)
        self.model.train()

        for epoch in range(num_epochs):
            total_loss = 0.0
            num_batches = 0

            for batch in train_dataloader:
                batch = {k: v.to(self.device) for k, v in batch.items()}

                loss = self.model(**batch).loss

                loss.backward()
                optimizer.step()
                optimizer.zero_grad()

                total_loss += loss.item()
                num_batches += 1

            # An empty dataloader yields nan instead of a ZeroDivisionError.
            avg_loss = total_loss / num_batches if num_batches else float("nan")
            print(f"Epoch {epoch+1}/{num_epochs}, Average Loss: {avg_loss:.4f}")

        self.model.eval()

    def evaluate_model(self, model, eval_dataloader):
        """Evaluate a PyTorch classifier on a dataloader.

        Batches are moved to the device the model's parameters live on (falling
        back to ``self.device`` for a parameter-less model), so a model that is
        currently on another device than ``self.device`` can still be evaluated.

        Args:
            model: Model called as ``model(**batch)`` and returning an object
                with ``.loss`` and ``.logits``.
            eval_dataloader: Yields dict batches that include ``labels``.

        Returns:
            ``(accuracy, average_loss)``; both are ``nan`` if the dataloader
            yields no batches.
        """
        model.eval()
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else self.device

        predictions = []
        references = []
        total_loss = 0.0
        num_batches = 0

        with torch.no_grad():
            for batch in eval_dataloader:
                batch = {k: v.to(device) for k, v in batch.items()}
                outputs = model(**batch)

                predictions.extend(outputs.logits.argmax(-1).cpu().numpy())
                references.extend(batch["labels"].cpu().numpy())
                total_loss += outputs.loss.item()
                num_batches += 1

        if num_batches == 0:
            return float("nan"), float("nan")

        accuracy = accuracy_score(references, predictions)
        avg_loss = total_loss / num_batches
        return accuracy, avg_loss

# End-to-end driver: fine-tune the base model, prune it, quantize it through
# ONNX, distill a student, then print a summary of size and accuracy per variant.
print("Starting compression experiments...")

# Fraction of each Linear layer's weights zeroed in step 2; also used in the
# summary so the reported percentage cannot drift from the value applied.
PRUNE_AMOUNT = 0.3

compressor = ModelCompression()
train_dataloader, eval_dataloader = compressor.prepare_data()

# Pre-initialised so the summary can tell whether distillation succeeded.
student_accuracy = None
student_size = None
quantized_size = None

print("\n1. Training and evaluating base model...")
compressor.train_model(train_dataloader)
base_accuracy, base_loss = compressor.evaluate_model(compressor.model, eval_dataloader)
print(f"Base Model - Accuracy: {base_accuracy:.4f}, Loss: {base_loss:.4f}")
# In-memory parameter size in MiB (parameters only).
base_size = sum(p.numel() * p.element_size() for p in compressor.model.parameters()) / (1024 * 1024)
print(f"Base Model Size: {base_size:.2f} MB")

print("\n2. Pruning model...")
# custom_prune_model modifies compressor.model in place and returns that same
# object, so every later step operates on the pruned weights.
pruned_model = compressor.custom_prune_model(amount=PRUNE_AMOUNT)
pruned_accuracy, pruned_loss = compressor.evaluate_model(pruned_model, eval_dataloader)
print(f"Pruned Model - Accuracy: {pruned_accuracy:.4f}, Loss: {pruned_loss:.4f}")

print("\n3. Quantizing model using ONNX...")
# NOTE: the exported model is the already-pruned one, so the "quantized" numbers
# below describe a pruned + quantized model.
onnx_model_path = "model.onnx"
quantized_model_path = "model_quantized.onnx"
compressor.export_to_onnx(onnx_model_path)
compressor.quantize_onnx_model(onnx_model_path, quantized_model_path)
quantized_accuracy = compressor.evaluate_onnx_model(quantized_model_path, eval_dataloader)
print(f"Quantized Model - Accuracy: {quantized_accuracy:.4f}")
# Size on disk of the quantized .onnx file, in MiB.
quantized_size = os.path.getsize(quantized_model_path) / (1024 * 1024)
print(f"Quantized Model Size: {quantized_size:.2f} MB")

print("\n4. Training student model with distillation...")
# knowledge_distillation returns None when it fails (it prints the error itself).
student_model = compressor.knowledge_distillation(train_dataloader)
if student_model is not None:
    student_accuracy, student_loss = compressor.evaluate_model(student_model, eval_dataloader)
    student_size = sum(p.numel() * p.element_size() for p in student_model.parameters()) / (1024 * 1024)
    print(f"Student Model - Accuracy: {student_accuracy:.4f}, Loss: {student_loss:.4f}")
    print(f"Student Model Size: {student_size:.2f} MB")
else:
    print("Student Model  - Training failed")

print("\nCompression Results Summary:")
print(f"Base Model     - Size: {base_size:.2f} MB, Accuracy: {base_accuracy:.4f}")
# Pruning only zeroes weights; the dense tensors keep their size, hence base_size.
print(f"Pruned Model   - Size: {base_size:.2f} MB ({PRUNE_AMOUNT:.0%} weights removed), Accuracy: {pruned_accuracy:.4f}")
print(f"Quantized Model - Size: {quantized_size:.2f} MB, Accuracy: {quantized_accuracy:.4f}")
if student_accuracy is not None and student_size is not None:
    print(f"Student Model  - Size: {student_size:.2f} MB, Accuracy: {student_accuracy:.4f}")
else:
    print("Student Model  - Training failed")



Starting compression experiments...


config.json:   0%|          | 0.00/663 [00:00<?, ?B/s]

To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development


model.safetensors.index.json:   0%|          | 0.00/27.8k [00:00<?, ?B/s]

Downloading shards:   0%|          | 0/4 [00:00<?, ?it/s]

model-00001-of-00004.safetensors:   0%|          | 0.00/3.95G [00:00<?, ?B/s]

model-00002-of-00004.safetensors:   0%|          | 0.00/3.86G [00:00<?, ?B/s]

model-00003-of-00004.safetensors:   0%|          | 0.00/3.86G [00:00<?, ?B/s]

model-00004-of-00004.safetensors:   0%|          | 0.00/3.56G [00:00<?, ?B/s]

Loading checkpoint shards:   0%|          | 0/4 [00:00<?, ?it/s]

Some weights of Qwen2ForSequenceClassification were not initialized from the model checkpoint at Qwen/Qwen2.5-7B-Instruct and are newly initialized: ['score.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


tokenizer_config.json:   0%|          | 0.00/7.30k [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/2.78M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/1.67M [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/7.03M [00:00<?, ?B/s]

Map:   0%|          | 0/67349 [00:00<?, ? examples/s]