# Imports

In [1]:
pip install -U bitsandbytes accelerate transformers



In [2]:
import os, time, math
from dataclasses import dataclass
from typing import Optional, Dict, List, Tuple
import pandas as pd
import tempfile, shutil

import numpy as np
import torch
import torch.nn.functional as F
import torch.nn.utils.prune as prune
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from transformers import BitsAndBytesConfig, AutoConfig
from huggingface_hub import hf_hub_download, snapshot_download
from pathlib import Path

try:
    from safetensors.torch import load_file as load_safetensors
    SAFE_AVAILABLE = True
except Exception:
    SAFE_AVAILABLE = False

try:
    from transformers import BitsAndBytesConfig
    BNB_AVAILABLE = True
except Exception:
    BNB_AVAILABLE = False

try:
    from torch.nn.utils import prune
    PRUNE_AVAILABLE = True
except Exception:
    PRUNE_AVAILABLE = False

import torch.nn as nn
try:
    # PyTorch 2.x preferred path
    from torch.ao.quantization import quantize_dynamic as ao_quantize_dynamic
    _HAS_AO_Q = True
except Exception:
    _HAS_AO_Q = False
try:
    # Fallback (older PyTorch)
    from torch.quantization import quantize_dynamic as legacy_quantize_dynamic
    _HAS_LEGACY_Q = True
except Exception:
    _HAS_LEGACY_Q = False

# Generic Compression Class

In [3]:
@dataclass
class CompressConfig:
    model_id: str = "microsoft/deberta-v3-base"
    weights_path: str = "checkpoint_dir_or_file"  # can be FOLDER or FILE
    num_labels: Optional[int] = None
    max_len: int = 128
    batch_size: int = 32
    prune_amount: float = 0.40
    do_quantized: bool = True
    do_pruned: bool = True
    do_kd: bool = True
    kd_student_id: str = "distilbert-base-uncased"
    kd_epochs: int = 2
    kd_alpha: float = 0.7
    kd_T: float = 2.0
    # NEW:
    quantization_backend: str = "dynamic"  # "dynamic" or "bnb"
    force_cpu_for_all: bool = False        # set True to time every model on CPU (fair vs dynamic int8)


class CompressionComparator:
    """
    Compare Original / 8-bit Quantized / Pruned (/ KD student if train_df is provided)
    on (accuracy, weighted F1, params, nonzero params, requires_training, checkpoint size MB).
    Supports:
      - HF folder (config.json + pytorch_model.bin/model.safetensors)
      - Folder with config.json + custom filename (e.g., best_model.pt) via state_dict=
      - Single weight file path (no config) -> builds from self.cfg.model_id and loads state_dict
    """

    def __init__(self, cfg: CompressConfig):
        self.cfg = cfg
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.amp_dtype = (
            torch.bfloat16
            if torch.cuda.is_available() and torch.cuda.is_bf16_supported()
            else torch.float16
        )

    # ------------- FS helpers -------------
    def _file_size_mb(self, path: str) -> Optional[float]:
        return os.path.getsize(path) / (1024**2) if os.path.exists(path) else None

    def _dir_size_mb(self, path: str) -> float:
        total = 0
        for root, _, files in os.walk(path):
            for f in files:
                total += os.path.getsize(os.path.join(root, f))
        return total / (1024**2)

    def _save_pretrained_and_measure_mb(self, model) -> float:
        tmpdir = tempfile.mkdtemp(prefix="ckpt_tmp_")
        try:
            model.save_pretrained(tmpdir, safe_serialization=True)
            return self._dir_size_mb(tmpdir)
        finally:
            shutil.rmtree(tmpdir, ignore_errors=True)

    def _save_state_dict_and_measure_mb(self, model) -> float:
        tmpf = tempfile.NamedTemporaryFile(delete=False, suffix=".pt")
        tmpf.close()
        try:
            model_cpu = model.to("cpu")
            torch.save(model_cpu.state_dict(), tmpf.name)
            return self._file_size_mb(tmpf.name)
        finally:
            try:
                os.remove(tmpf.name)
            except OSError:
                pass

    # ------------- Loading logic (key upgrade) -------------
    def _has_tokenizer_files(self, folder: str) -> bool:
      # Any of these indicate a tokenizer is present
      candidates = [
          "tokenizer.json",
          "tokenizer_config.json",
          "special_tokens_map.json",
          "vocab.txt",          # BERT/WordPiece
          "merges.txt",         # BPE
          "vocab.json",         # BPE/Roberta-style
          "spiece.model",       # SentencePiece
      ]
      return any(os.path.exists(os.path.join(folder, f)) for f in candidates)

    def _is_dir(self, p: str) -> bool:
        return os.path.isdir(p)

    def _has_config(self, folder: str) -> bool:
        return os.path.exists(os.path.join(folder, "config.json"))

    def _find_weight_file_in_dir(self, folder: str) -> Optional[str]:
        # Prefer standard HF names; else common custom ones (like best_model.pt)
        prefs = ["model.safetensors", "pytorch_model.bin", "best_model.safetensors", "best_model.bin", "best_model.pt"]
        for name in prefs:
            cand = os.path.join(folder, name)
            if os.path.exists(cand):
                return cand
        # Fallback: any .safetensors / .bin / .pt
        for fname in os.listdir(folder):
            if fname.endswith((".safetensors", ".bin", ".pt")):
                return os.path.join(folder, fname)
        return None

    def _load_state_dict_from_path(self, path: str) -> Dict[str, torch.Tensor]:
        if path.endswith(".safetensors") and SAFE_AVAILABLE:
            return load_safetensors(path)
        return torch.load(path, map_location="cpu")

    def _build_from_folder_maybe_custom_weights(self, folder: str, num_labels: int):
      # Tokenizer: prefer folder if present; else fall back to base model_id
      if self._has_tokenizer_files(folder):
          tok = AutoTokenizer.from_pretrained(folder)
      else:
          tok = AutoTokenizer.from_pretrained(self.cfg.model_id)

      # If standard HF filenames exist, use from_pretrained directly
      std_files = ["pytorch_model.bin", "model.safetensors"]
      if any(os.path.exists(os.path.join(folder, f)) for f in std_files):
          model = AutoModelForSequenceClassification.from_pretrained(
              folder, num_labels=num_labels, ignore_mismatched_sizes=True
          )
          return model, tok

      # Otherwise, look for a custom weight file (e.g., best_model.pt)
      wpath = self._find_weight_file_in_dir(folder)
      if wpath is None:
          # No weights in folder -> init from config only (random init)
          model = AutoModelForSequenceClassification.from_pretrained(
              folder, num_labels=num_labels, ignore_mismatched_sizes=True
          )
          return model, tok

      # NEW: build from config, then load your custom state_dict
      config = AutoConfig.from_pretrained(folder)
      # ensure num_labels matches what you want to run with
      if getattr(config, "num_labels", None) != num_labels:
          config.num_labels = num_labels

      model = AutoModelForSequenceClassification.from_config(config)
      state = self._load_state_dict_from_path(wpath)
      missing, unexpected = model.load_state_dict(state, strict=False)
      if missing or unexpected:
          print("[Folder custom weights] missing:", missing, "unexpected:", unexpected)

      return model, tok

    def _build_from_single_file(
        self, file_path: str, num_labels: int
    ) -> Tuple[torch.nn.Module, AutoTokenizer]:
        """
        Single weight file: build from base model_id config and load state_dict into it.
        """
        tok = AutoTokenizer.from_pretrained(self.cfg.model_id)
        # Build from config to ensure classifier head dims match num_labels
        config = AutoConfig.from_pretrained(self.cfg.model_id, num_labels=num_labels)
        model = AutoModelForSequenceClassification.from_config(config)
        state = self._load_state_dict_from_path(file_path)
        missing, unexpected = model.load_state_dict(state, strict=False)
        if missing or unexpected:
            print("[Single-file load] missing:", missing, "unexpected:", unexpected)
        return model, tok

    def _build_teacher_any(self, num_labels: int) -> Tuple[torch.nn.Module, AutoTokenizer, Optional[str]]:
        """
        Main entry: returns (model, tokenizer, origin_path_for_size_measurement)
        origin_path is used to report checkpoint size if it’s a single file.
        """
        p = self.cfg.weights_path
        if self._is_dir(p):
            if not self._has_config(p):
                raise FileNotFoundError(f"{p} is a folder but has no config.json")
            model, tok = self._build_from_folder_maybe_custom_weights(p, num_labels)
            return model, tok, None  # folder size measured via save_pretrained later
        else:
            # single file path
            model, tok = self._build_from_single_file(p, num_labels)
            return model, tok, p

    # ------------- Quant builders -------------
    def _build_dynamic_int8_from_model(self, model_fp: torch.nn.Module) -> torch.nn.Module:
        if not (_HAS_AO_Q or _HAS_LEGACY_Q):
            raise RuntimeError("torch dynamic quantization not available in this environment.")
        model_fp.eval().to("cpu")
        quantize_fn = ao_quantize_dynamic if _HAS_AO_Q else legacy_quantize_dynamic
        q_model = quantize_fn(model_fp, {nn.Linear}, dtype=torch.qint8)
        return q_model

    def _build_bnb_from_model_via_tmp(self, model_fp: torch.nn.Module, num_labels: int) -> torch.nn.Module:
        """
        Save FP model to a temp HF folder and reload with bnb 8-bit to stream weights safely.
        """
        if not BNB_AVAILABLE:
            raise RuntimeError("bitsandbytes not available; install it or set do_quantized=False.")
        tmpdir = tempfile.mkdtemp(prefix="tmp_ckpt_")
        try:
            model_fp.save_pretrained(tmpdir, safe_serialization=True)
            bnb_cfg = BitsAndBytesConfig(load_in_8bit=True)
            q_model = AutoModelForSequenceClassification.from_pretrained(
                tmpdir, quantization_config=bnb_cfg, device_map="auto"
            )
        finally:
            shutil.rmtree(tmpdir, ignore_errors=True)
        return q_model

    # ------------- Metrics / Eval -------------
    def _count_params(self, model: torch.nn.Module) -> int:
        return sum(p.numel() for p in model.parameters())

    @torch.no_grad()
    def _count_nonzero_params(self, model: torch.nn.Module) -> int:
        nz = 0
        for p in model.parameters():
            if p is not None:
                nz += (p != 0).sum().item()
        return nz

    def _to_device(self, batch: Dict[str, torch.Tensor], device: str) -> Dict[str, torch.Tensor]:
        return {k: v.to(device) for k, v in batch.items()}

    def _build_batches(self, tokenizer, texts: List[str]):
        for i in range(0, len(texts), self.cfg.batch_size):
            enc = tokenizer(
                texts[i:i+self.cfg.batch_size],
                truncation=True,
                padding=True,
                max_length=self.cfg.max_len,
                return_tensors="pt"
            )
            yield enc

    @torch.no_grad()
    def _evaluate(self, model, tokenizer, texts, labels, device_override: Optional[str] = None):
        from sklearn.metrics import f1_score
        # Decide device per run
        if device_override is not None:
            run_device = device_override
        elif self.cfg.force_cpu_for_all:
            run_device = "cpu"
        else:
            run_device = self.device

        # dynamic-quantized modules must stay on CPU
        if any("Quantized" in type(m).__name__ for m in model.modules()):
            run_device = "cpu"

        # bnb models already placed by device_map; avoid moving them
        if not getattr(model, "is_loaded_in_8bit", False):
            model.to(run_device)

        model.eval()

        # Warmup
        if texts:
            enc = tokenizer(texts[:min(8, len(texts))], truncation=True, padding=True,
                            max_length=self.cfg.max_len, return_tensors="pt")
            enc = self._to_device(enc, run_device)
            if run_device == "cuda":
                with torch.autocast(device_type="cuda", dtype=self.amp_dtype):
                    _ = model(**enc)
            else:
                _ = model(**enc)

        preds_all = []
        for i in range(0, len(texts), self.cfg.batch_size):
            enc = tokenizer(texts[i:i+self.cfg.batch_size], truncation=True, padding=True,
                            max_length=self.cfg.max_len, return_tensors="pt")
            enc = self._to_device(enc, run_device)

            if run_device == "cuda":
                torch.cuda.synchronize()
                t0 = time.time()
                with torch.autocast(device_type="cuda", dtype=self.amp_dtype):
                    logits = model(**enc).logits
                torch.cuda.synchronize()
            else:
                t0 = time.time()
                logits = model(**enc).logits

            preds_all.append(torch.argmax(logits, dim=-1).cpu().numpy())

        preds_all = np.concatenate(preds_all) if preds_all else np.array([])
        labels = np.array(labels[:len(preds_all)])
        acc = float((preds_all == labels).mean()) if len(preds_all) else float("nan")
        try:
            f1 = float(f1_score(labels, preds_all, average="weighted")) if len(preds_all) else float("nan")
        except Exception:
            f1 = float("nan")
        return acc, f1

    # ------------- Transformations -------------
    def _apply_pruning(self, model: torch.nn.Module, amount: float):
        if not PRUNE_AVAILABLE:
            raise RuntimeError("torch pruning utilities not available.")
        linear_modules = []
        for _, m in model.named_modules():
            if isinstance(m, torch.nn.Linear):
                linear_modules.append(m)
        params_to_prune = [(m, "weight") for m in linear_modules]
        prune.global_unstructured(
            params_to_prune,
            pruning_method=prune.L1Unstructured,
            amount=amount,
        )
        # bake masks
        for m in linear_modules:
            try:
                prune.remove(m, "weight")
            except Exception:
                pass
        return model

    # ------------- KD -------------
    def _run_kd(self, teacher_model, tokenizer_t, train_df, test_df, num_labels):
        from torch.utils.data import DataLoader, Dataset
        from transformers import get_linear_schedule_with_warmup

        class DS(Dataset):
            def __init__(self, df, tok, max_len):
                self.texts = df["text"].tolist()
                self.labels = df["label"].tolist()
                self.tok = tok
                self.max_len = max_len
            def __len__(self): return len(self.texts)
            def __getitem__(self, i):
                enc = self.tok(self.texts[i], truncation=True, padding="max_length", max_length=self.max_len, return_tensors="pt")
                item = {k: v.squeeze(0) for k, v in enc.items()}
                item["labels"] = torch.tensor(self.labels[i], dtype=torch.long)
                return item

        teacher = teacher_model.to(self.device).eval()
        teacher.requires_grad_(False)

        s_tok = AutoTokenizer.from_pretrained(self.cfg.kd_student_id)
        student = AutoModelForSequenceClassification.from_pretrained(
            self.cfg.kd_student_id, num_labels=num_labels
        ).to(self.device)

        train_loader = DataLoader(DS(train_df, s_tok, self.cfg.max_len), batch_size=32, shuffle=True)
        T = self.cfg.kd_T; alpha = self.cfg.kd_alpha
        optim = torch.optim.AdamW(student.parameters(), lr=3e-5)
        sched = get_linear_schedule_with_warmup(optim, 0, self.cfg.kd_epochs*len(train_loader))

        student.train()
        for _ in range(self.cfg.kd_epochs):
            for batch in train_loader:
                optim.zero_grad(set_to_none=True)
                ids = batch["input_ids"].to(self.device)
                msk = batch["attention_mask"].to(self.device)
                y   = batch["labels"].to(self.device)

                s_logits = student(input_ids=ids, attention_mask=msk).logits
                with torch.no_grad():
                    texts_batch = s_tok.batch_decode(ids, skip_special_tokens=True)
                    t_enc = tokenizer_t(
                        texts_batch, truncation=True, padding=True,
                        max_length=self.cfg.max_len, return_tensors="pt"
                    )
                    t_enc = {k: v.to(self.device) for k, v in t_enc.items()}
                    t_logits = teacher(**t_enc).logits

                log_p_s = F.log_softmax(s_logits / T, dim=-1)
                p_t     = F.softmax(t_logits / T, dim=-1)
                loss_kd = F.kl_div(log_p_s, p_t, reduction="batchmean") * (T*T)
                loss_ce = F.cross_entropy(s_logits, y)
                loss = alpha * loss_kd + (1 - alpha) * loss_ce
                loss.backward()
                torch.nn.utils.clip_grad_norm_(student.parameters(), 1.0)
                optim.step(); sched.step()

        acc, f1 = self._evaluate(student, s_tok, test_df["text"].tolist(), test_df["label"].tolist())
        return student, acc, f1

    # ------------- Public run -------------
    def run(self, test_df, train_df: Optional[object] = None):
        import pandas as pd
        assert {"text", "label"}.issubset(test_df.columns), "test_df must have columns: text, label"
        num_labels = self.cfg.num_labels or int(test_df["label"].nunique())

        # --- Build teacher/original from folder-or-file (NEW core path) ---
        orig_model, tok, single_file_origin = self._build_teacher_any(num_labels)
        orig_model = orig_model.to(self.device).eval()

        rows = []

        # Evaluate Original
        acc_o, f1_o = self._evaluate(orig_model, tok, test_df["text"].tolist(), test_df["label"].tolist())
        params_o = self._count_params(orig_model)

        # Size reporting:
        # - If we loaded from a single file: report that file size.
        # - If we loaded from a folder: serialize to temp folder for consistent measurement.
        if single_file_origin is not None:
            orig_ckpt_mb = self._file_size_mb(single_file_origin)
        else:
            orig_ckpt_mb = self._save_pretrained_and_measure_mb(orig_model)

        rows.append({
            "Model": "Original (FP)",
            "Requires Training": False,
            "Params (M)": round(params_o/1e6, 2),
            "Nonzero Params (M)": round(params_o/1e6, 2),
            "Accuracy": round(acc_o, 4),
            "F1 (weighted)": round(f1_o, 4),
            "Checkpoint Size (MB)": round(orig_ckpt_mb, 2) if orig_ckpt_mb is not None else None,
        })

        # Quantized (8-bit)
        if self.cfg.do_quantized:
            if self.cfg.quantization_backend == "dynamic":
                q = self._build_dynamic_int8_from_model(orig_model)
                acc_q, f1_q = self._evaluate(q, tok, test_df["text"].tolist(), test_df["label"].tolist(), device_override="cpu")
                # Param count equals FP architecture param count
                params_q = self._count_params(
                    AutoModelForSequenceClassification.from_pretrained(self.cfg.model_id, num_labels=num_labels)
                )
                q_ckpt_mb = self._save_state_dict_and_measure_mb(q)  # dynamic quant: measure via state_dict
                rows.append({
                    "Model": "8-bit Quantized (dynamic, CPU)",
                    "Requires Training": False,
                    "Params (M)": round(params_q/1e6, 2),
                    "Nonzero Params (M)": round(params_q/1e6, 2),
                    "Accuracy": round(acc_q, 4),
                    "F1 (weighted)": round(f1_q, 4),
                    "Checkpoint Size (MB)": round(q_ckpt_mb, 2) if q_ckpt_mb is not None else None,
                })

            elif self.cfg.quantization_backend == "bnb":
                q = self._build_bnb_from_model_via_tmp(orig_model, num_labels)
                acc_q, f1_q = self._evaluate(q, tok, test_df["text"].tolist(), test_df["label"].tolist())
                params_q = self._count_params(q)
                # save_pretrained may serialize dequantized weights; still useful to log.
                q_ckpt_mb = self._save_pretrained_and_measure_mb(q)
                rows.append({
                    "Model": "8-bit Quantized (bnb, GPU)",
                    "Requires Training": False,
                    "Params (M)": round(params_q/1e6, 2),
                    "Nonzero Params (M)": round(params_q/1e6, 2),
                    "Accuracy": round(acc_q, 4),
                    "F1 (weighted)": round(f1_q, 4),
                    "Checkpoint Size (MB)": round(q_ckpt_mb, 2),
                })
            else:
                raise ValueError("quantization_backend must be 'dynamic' or 'bnb'")

        # Pruned (no FT)
        if self.cfg.do_pruned:
            pr = AutoModelForSequenceClassification.from_pretrained(
                self.cfg.model_id, num_labels=num_labels, ignore_mismatched_sizes=True
            )
            # Load the same weights into the fresh FP model (ensures clean copy before pruning)
            if self._is_dir(self.cfg.weights_path) and self._has_config(self.cfg.weights_path):
                wpath = self._find_weight_file_in_dir(self.cfg.weights_path)
                if wpath and not any(os.path.basename(wpath) == s for s in ["pytorch_model.bin", "model.safetensors"]):
                    # custom filename in folder -> use state_dict
                    state = self._load_state_dict_from_path(wpath)
                    pr.load_state_dict(state, strict=False)
                else:
                    # standard HF folder -> load via from_pretrained directly above? (We already built new pr from base model_id)
                    # Reload from folder to be exact:
                    pr = AutoModelForSequenceClassification.from_pretrained(
                        self.cfg.weights_path, num_labels=num_labels, ignore_mismatched_sizes=True
                    )
            else:
                # single file case
                state = self._load_state_dict_from_path(self.cfg.weights_path)
                pr.load_state_dict(state, strict=False)

            pr = self._apply_pruning(pr, self.cfg.prune_amount).to(self.device).eval()
            acc_p, f1_p = self._evaluate(pr, tok, test_df["text"].tolist(), test_df["label"].tolist())
            params_p = self._count_params(pr)
            nonzero_p = self._count_nonzero_params(pr)
            p_ckpt_mb = self._save_pretrained_and_measure_mb(pr)
            rows.append({
                "Model": f"Pruned ({int(self.cfg.prune_amount*100)}% target, no FT)",
                "Requires Training": False,
                "Params (M)": round(params_p/1e6, 2),
                "Nonzero Params (M)": round(nonzero_p/1e6, 2),
                "Accuracy": round(acc_p, 4),
                "F1 (weighted)": round(f1_p, 4),
                "Checkpoint Size (MB)": round(p_ckpt_mb, 2),
            })

        # KD Student (optional)
        if self.cfg.do_kd:
            if (train_df is None) or (not {"text","label"}.issubset(train_df.columns)):
                rows.append({
                    "Model": f"KD Student ({self.cfg.kd_student_id})",
                    "Requires Training": True,
                    "Params (M)": None, "Nonzero Params (M)": None,
                    "Accuracy": None, "F1 (weighted)": None, "Checkpoint Size (MB)": None
                })
            else:
                student, acc_kd, f1_kd = self._run_kd(orig_model, tok, train_df, test_df, num_labels)
                params_kd = self._count_params(student)
                kd_ckpt_mb = self._save_pretrained_and_measure_mb(student)
                rows.append({
                    "Model": f"KD Student ({self.cfg.kd_student_id})",
                    "Requires Training": True,
                    "Params (M)": round(params_kd/1e6, 2),
                    "Nonzero Params (M)": round(params_kd/1e6, 2),
                    "Accuracy": round(acc_kd, 4),
                    "F1 (weighted)": round(f1_kd, 4),
                    "Checkpoint Size (MB)": round(kd_ckpt_mb, 2),
                })

        df = __import__("pandas").DataFrame(rows)

        # Convenience print if original was a single file path
        if self._is_dir(self.cfg.weights_path):
            print(f"\nLoaded from folder: {self.cfg.weights_path}")
        else:
            ckpt_mb = self._file_size_mb(self.cfg.weights_path)
            if ckpt_mb is not None:
                print(f"\nOriginal checkpoint file size: {ckpt_mb:.2f} MB")

        return df

# Loading Original Data
## No need for other datasets since best models were achieved for original training data

In [4]:
DATA_DIR = "/content/drive/MyDrive/ADV_DL/Data"
train_df = pd.read_csv(f"{DATA_DIR}/Corona_NLP_train.csv", encoding="latin-1")
test_df  = pd.read_csv(f"{DATA_DIR}/Corona_NLP_test.csv",  encoding="latin-1")

train_df = train_df.rename(columns={"OriginalTweet":"text", "Sentiment":"label"})
test_df  = test_df.rename(columns={"OriginalTweet":"text", "Sentiment":"label"})

label_map = {'Extremely Negative':0,'Negative':1,'Neutral':2,
             'Positive':3,'Extremely Positive':4}

train_df["label"] = train_df.label.map(label_map).astype(int)
test_df["label"]  = test_df.label.map(label_map).astype(int)

# Model 1: Full-Code Fine-Tuned DeBERTa-v3-base

In [9]:
repo_id = "CarmelKron/Model_Checkpoints"
subdir  = "full_ft_deberta_v3_base"  # the folder inside the repo

repo_dir = snapshot_download(
    repo_id=repo_id,
    repo_type="model",
    allow_patterns=[f"{subdir}/*"],  # only pull this folder
    # optional: local_dir="checkpoints_cache", local_dir_use_symlinks=False,
)

folder_path = str(Path(repo_dir) / subdir)  # <- this contains config.json + best_model.pt
print("Folder path:", folder_path)

Fetching 2 files:   0%|          | 0/2 [00:00<?, ?it/s]

config.json: 0.00B [00:00, ?B/s]

Folder path: /root/.cache/huggingface/hub/models--CarmelKron--Model_Checkpoints/snapshots/51639b8e812643cdc286534a504ca5d5deb7ef91/full_ft_deberta_v3_base


In [10]:
cfg = CompressConfig(
    model_id="microsoft/deberta-v3-base",
    weights_path=folder_path,   # <— key change
    num_labels=None,            # infer from test_df
    max_len=64,
    batch_size=32,
    prune_amount=0.4,
    do_quantized=True,
    do_pruned=True,
    do_kd=True,                 # set True and pass train_df to actually train KD
    quantization_backend="bnb",
    force_cpu_for_all=False
)

cmp = CompressionComparator(cfg)
results = cmp.run(test_df=test_df, train_df=train_df)
print(results.to_string(index=False))

Some weights of DebertaV2ForSequenceClassification were not initialized from the model checkpoint at microsoft/deberta-v3-base and are newly initialized: ['classifier.bias', 'classifier.weight', 'pooler.dense.bias', 'pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Some weights of DistilBertForSequenceClassification were not initialized from the model checkpoint at distilbert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight', 'pre_classifier.bias', 'pre_classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.



Loaded from folder: /root/.cache/huggingface/hub/models--CarmelKron--Model_Checkpoints/snapshots/51639b8e812643cdc286534a504ca5d5deb7ef91/full_ft_deberta_v3_base
                               Model  Requires Training  Params (M)  Nonzero Params (M)  Accuracy  F1 (weighted)  Checkpoint Size (MB)
                       Original (FP)              False      184.43              184.43    0.7038         0.7043                703.55
          8-bit Quantized (bnb, GPU)              False      184.43              184.43    0.7156         0.7169                270.69
          Pruned (40% target, no FT)              False      184.43              150.20    0.7109         0.7122                703.55
KD Student (distilbert-base-uncased)               True       66.96               66.96    0.7962         0.7967                255.43


# Model 2: HF Fine-Tuned DeBERTa-v3-base

In [11]:
repo_id = "CarmelKron/Model_Checkpoints"
subdir  = "hf_ft_deberta_v3_base_orig"  # the folder inside the repo

repo_dir = snapshot_download(
    repo_id=repo_id,
    repo_type="model",
    allow_patterns=[f"{subdir}/*"],  # only pull this folder
    # optional: local_dir="checkpoints_cache", local_dir_use_symlinks=False,
)

folder_path = str(Path(repo_dir) / subdir)  # <- this contains config.json + best_model.pt
print("Folder path:", folder_path)

Fetching 2 files:   0%|          | 0/2 [00:00<?, ?it/s]

Folder path: /root/.cache/huggingface/hub/models--CarmelKron--Model_Checkpoints/snapshots/51639b8e812643cdc286534a504ca5d5deb7ef91/hf_ft_deberta_v3_base_orig


In [12]:
cfg = CompressConfig(
    model_id="microsoft/deberta-v3-base",
    weights_path=folder_path,   # <— key change
    num_labels=None,            # infer from test_df
    max_len=128,
    batch_size=32,
    prune_amount=0.4,
    do_quantized=True,
    do_pruned=True,
    do_kd=True,                 # set True and pass train_df to actually train KD
    quantization_backend="bnb",
    force_cpu_for_all=False
)

cmp = CompressionComparator(cfg)
results = cmp.run(test_df=test_df, train_df=train_df)
print(results.to_string(index=False))

Some weights of DebertaV2ForSequenceClassification were not initialized from the model checkpoint at microsoft/deberta-v3-base and are newly initialized: ['classifier.bias', 'classifier.weight', 'pooler.dense.bias', 'pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Some weights of DistilBertForSequenceClassification were not initialized from the model checkpoint at distilbert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight', 'pre_classifier.bias', 'pre_classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.



Loaded from folder: /root/.cache/huggingface/hub/models--CarmelKron--Model_Checkpoints/snapshots/51639b8e812643cdc286534a504ca5d5deb7ef91/hf_ft_deberta_v3_base_orig
                               Model  Requires Training  Params (M)  Nonzero Params (M)  Accuracy  F1 (weighted)  Checkpoint Size (MB)
                       Original (FP)              False      184.43              184.43    0.8702         0.8700                703.55
          8-bit Quantized (bnb, GPU)              False      184.43              184.43    0.8689         0.8687                270.69
          Pruned (40% target, no FT)              False      184.43              150.21    0.8346         0.8333                703.55
KD Student (distilbert-base-uncased)               True       66.96               66.96    0.8162         0.8164                255.43


# Model 3: Full-Code Fine-Tuned Twitter-RoBERTa

In [5]:
repo_id = "CarmelKron/Model_Checkpoints"
subdir  = "full_ft_twitter_roberta"  # the folder inside the repo

repo_dir = snapshot_download(
    repo_id=repo_id,
    repo_type="model",
    allow_patterns=[f"{subdir}/*"],  # only pull this folder
    # optional: local_dir="checkpoints_cache", local_dir_use_symlinks=False,
)

folder_path = str(Path(repo_dir) / subdir)  # <- this contains config.json + best_model.pt
print("Folder path:", folder_path)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Fetching 2 files:   0%|          | 0/2 [00:00<?, ?it/s]

Folder path: /root/.cache/huggingface/hub/models--CarmelKron--Model_Checkpoints/snapshots/51639b8e812643cdc286534a504ca5d5deb7ef91/full_ft_twitter_roberta


In [6]:
cfg = CompressConfig(
    model_id="cardiffnlp/twitter-roberta-base-sentiment-latest",
    weights_path=folder_path,   # <— key change
    num_labels=None,            # infer from test_df
    max_len=128,
    batch_size=32,
    prune_amount=0.4,
    do_quantized=True,
    do_pruned=True,
    do_kd=True,                 # set True and pass train_df to actually train KD
    quantization_backend="bnb",
    force_cpu_for_all=False
)

cmp = CompressionComparator(cfg)
results = cmp.run(test_df=test_df, train_df=train_df)
print(results.to_string(index=False))

Some weights of the model checkpoint at cardiffnlp/twitter-roberta-base-sentiment-latest were not used when initializing RobertaForSequenceClassification: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
- This IS expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at cardiffnlp/twitter-roberta-base-sentiment-latest and are newly initialized because the shapes did not match:
- classifier.out_proj.weight: found shape torch.Size([3, 768]) in the checkpo


Loaded from folder: /root/.cache/huggingface/hub/models--CarmelKron--Model_Checkpoints/snapshots/51639b8e812643cdc286534a504ca5d5deb7ef91/full_ft_twitter_roberta
                               Model  Requires Training  Params (M)  Nonzero Params (M)  Accuracy  F1 (weighted)  Checkpoint Size (MB)
                       Original (FP)              False      124.65              124.65    0.6438         0.6466                475.52
          8-bit Quantized (bnb, GPU)              False      124.65              124.65    0.6477         0.6507                156.67
          Pruned (40% target, no FT)              False      124.65               90.44    0.3765         0.3059                475.52
KD Student (distilbert-base-uncased)               True       66.96               66.96    0.8004         0.8008                255.43


# Model 4: HF Fine-Tuned Twitter-RoBERTa

In [7]:
repo_id = "CarmelKron/Model_Checkpoints"
subdir  = "hf_ft_twitter_roberta_orig"  # the folder inside the repo

repo_dir = snapshot_download(
    repo_id=repo_id,
    repo_type="model",
    allow_patterns=[f"{subdir}/*"],  # only pull this folder
    # optional: local_dir="checkpoints_cache", local_dir_use_symlinks=False,
)

folder_path = str(Path(repo_dir) / subdir)  # <- this contains config.json + best_model.pt
print("Folder path:", folder_path)

Fetching 2 files:   0%|          | 0/2 [00:00<?, ?it/s]

Folder path: /root/.cache/huggingface/hub/models--CarmelKron--Model_Checkpoints/snapshots/51639b8e812643cdc286534a504ca5d5deb7ef91/hf_ft_twitter_roberta_orig


In [8]:
cfg = CompressConfig(
    model_id="cardiffnlp/twitter-roberta-base-sentiment-latest",
    weights_path=folder_path,   # <— key change
    num_labels=None,            # infer from test_df
    max_len=254,
    batch_size=32,
    prune_amount=0.4,
    do_quantized=True,
    do_pruned=True,
    do_kd=True,                 # set True and pass train_df to actually train KD
    quantization_backend="bnb",
    force_cpu_for_all=False
)

cmp = CompressionComparator(cfg)
results = cmp.run(test_df=test_df, train_df=train_df)
print(results.to_string(index=False))

Some weights of the model checkpoint at cardiffnlp/twitter-roberta-base-sentiment-latest were not used when initializing RobertaForSequenceClassification: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
- This IS expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at cardiffnlp/twitter-roberta-base-sentiment-latest and are newly initialized because the shapes did not match:
- classifier.out_proj.weight: found shape torch.Size([3, 768]) in the checkpo


Loaded from folder: /root/.cache/huggingface/hub/models--CarmelKron--Model_Checkpoints/snapshots/51639b8e812643cdc286534a504ca5d5deb7ef91/hf_ft_twitter_roberta_orig
                               Model  Requires Training  Params (M)  Nonzero Params (M)  Accuracy  F1 (weighted)  Checkpoint Size (MB)
                       Original (FP)              False      124.65              124.65    0.8589         0.8589                475.52
          8-bit Quantized (bnb, GPU)              False      124.65              124.65    0.8610         0.8611                156.67
          Pruned (40% target, no FT)              False      124.65               90.44    0.7562         0.7431                475.52
KD Student (distilbert-base-uncased)               True       66.96               66.96    0.8254         0.8256                255.43
