In [None]:
import torch

import pandas as pd
import matplotlib.pyplot as plt

import numpy as np
import warnings

In [None]:
import math

import numpy as np
import scipy
import scipy.linalg
from sklearn import preprocessing


def weight_K(K, p=None):
    """Scale a kernel matrix by sample weights.

    Args:
        K: square kernel (similarity) matrix.
        p: optional per-sample weights. When omitted, every entry of ``K`` is
            divided by the number of rows of ``K``.

    Returns:
        ``K / n`` when ``p`` is None, otherwise ``K`` multiplied by the outer
        product of ``sqrt(p)`` with itself.
    """
    if p is not None:
        root_p = np.sqrt(p)
        return K * np.outer(root_p, root_p)
    return K / K.shape[0]


def normalize_K(K):
    """Rescale ``K`` by its diagonal.

    Entry ``(i, j)`` is divided by ``sqrt(K[i, i]) * sqrt(K[j, j])``, so the
    diagonal of the result is 1 wherever the original diagonal is positive.
    """
    diag_root = np.sqrt(np.diagonal(K))
    scale = np.outer(diag_root, diag_root)
    return K / scale


def entropy_q(p, q=1):
    """Entropy of order ``q`` of the positive entries of ``p``.

    Args:
        p: 1-D numpy array of non-negative weights; entries ``<= 0`` are
            ignored for finite ``q``.
        q: order of the entropy. ``1`` gives ``-sum(p * log p)``; the string
            ``"inf"`` or a numeric positive infinity gives ``-log(max(p))``;
            any other value gives ``log(sum(p ** q)) / (1 - q)``.

    Returns:
        The entropy as a float.
    """
    p_ = p[p > 0]
    if q == 1:
        return -(p_ * np.log(p_)).sum()
    # Accept a numeric +inf as well as the "inf" sentinel: the general formula
    # below evaluates to NaN for an infinite q.
    if q == "inf" or q == float("inf"):
        return -np.log(np.max(p))
    return np.log((p_ ** q).sum()) / (1 - q)


def score_K(K, q=1, p=None, normalize=False):
    """Vendi score of a kernel matrix.

    Args:
        K: square kernel (similarity) matrix, dense or scipy-sparse.
        q: order of the entropy passed to ``entropy_q``.
        p: optional per-sample weights passed to ``weight_K``.
        normalize: when True, ``K`` is first rescaled with ``normalize_K``.

    Returns:
        Tuple ``(vendi, w)``: the exponential of the order-``q`` entropy of the
        eigenvalues ``w`` of the weighted kernel, and those eigenvalues.
    """
    # Imported here because the file only imports ``scipy`` and
    # ``scipy.linalg``; ``from ... import`` avoids rebinding ``scipy`` locally.
    from scipy.sparse import issparse

    if normalize:
        K = normalize_K(K)
    K_ = weight_K(K, p)
    if issparse(K_):
        from scipy.sparse.linalg import eigsh

        # NOTE(review): eigsh is called with its defaults, which return only a
        # handful of eigenvalues (k=6 per the SciPy docs), so the sparse path
        # scores a truncated spectrum - confirm this is intended.
        w, _ = eigsh(K_)
    else:
        w = scipy.linalg.eigvalsh(K_)
    vendi = np.exp(entropy_q(w, q=q))
    return vendi, w


def score_X(X, q=1, p=None, normalize=True):
    """Vendi score of a feature matrix using the linear kernel ``X @ X.T``.

    Args:
        X: feature matrix with one sample per row.
        q: order of the entropy, forwarded to ``score_K``.
        p: optional per-sample weights, forwarded to ``score_K``.
        normalize: when True, rows of ``X`` are normalized with
            ``preprocessing.normalize(X, axis=1)`` before building the kernel.

    Returns:
        The ``(vendi, eigenvalues)`` tuple returned by ``score_K``.
    """
    if normalize:
        X = preprocessing.normalize(X, axis=1)
    K = X @ X.T
    # Forward the caller's q; it used to be pinned to 1 regardless of input.
    return score_K(K, q=q, p=p)


def score_dual(X, q=1, normalize=True):
    """Vendi score computed from the feature-space matrix ``X.T @ X / n``.

    Args:
        X: feature matrix with one sample per row.
        q: order of the entropy passed to ``entropy_q``.
        normalize: when True, rows of ``X`` are normalized with
            ``preprocessing.normalize(X, axis=1)`` first.

    Returns:
        Tuple ``(vendi, w)`` where ``w`` are the eigenvalues of
        ``X.T @ X / n`` and ``vendi`` is the exponential of their entropy.
    """
    if normalize:
        X = preprocessing.normalize(X, axis=1)
    n = X.shape[0]
    S = X.T @ X
    w = scipy.linalg.eigvalsh(S / n)
    vendi = np.exp(entropy_q(w, q=q))
    return vendi, w


def score(samples, k, q=1, p=None, normalize=False):
    """Vendi score of ``samples`` under the pairwise similarity function ``k``.

    ``k`` is evaluated once per unordered pair (including each sample with
    itself) and mirrored to fill a symmetric kernel matrix, which is then
    scored with ``score_K``.
    """
    count = len(samples)
    kernel = np.zeros((count, count))
    for row, left in enumerate(samples):
        for col in range(row, count):
            similarity = k(left, samples[col])
            kernel[row, col] = similarity
            kernel[col, row] = similarity
    return score_K(kernel, p=p, q=q, normalize=normalize)


def intdiv_K(K, q=1, p=None):
    """Weighted dissimilarity ``1 - sum_ij p_i p_j K_ij ** q``.

    Args:
        K: square kernel (similarity) matrix.
        q: exponent applied to ``K`` with ``**``.
        p: optional per-sample weights; uniform ``1 / n`` when omitted.
    """
    n = K.shape[0]
    weights = np.ones(n) / n if p is None else p
    powered = K ** q
    return 1 - np.sum(powered * np.outer(weights, weights))


def intdiv_X(X, q=1, p=None, normalize=True):
    """``intdiv_K`` of the linear kernel ``X @ X.T`` of a feature matrix.

    Args:
        X: feature matrix with one sample per row.
        q: exponent forwarded to ``intdiv_K``.
        p: optional per-sample weights forwarded to ``intdiv_K``.
        normalize: when True, rows of ``X`` are normalized with
            ``preprocessing.normalize(X, axis=1)`` first.
    """
    if normalize:
        X = preprocessing.normalize(X, axis=1)
    K = X @ X.T
    # K is already a kernel matrix, so use intdiv_K; intdiv() expects raw
    # samples plus a similarity function and would raise TypeError here.
    return intdiv_K(K, q=q, p=p)


def intdiv(samples, k, q=1, p=None):
    """``intdiv_K`` of ``samples`` under the pairwise similarity function ``k``.

    ``k`` is evaluated once per unordered pair (including each sample with
    itself) and mirrored to fill a symmetric kernel matrix.
    """
    count = len(samples)
    kernel = np.zeros((count, count))
    for row, left in enumerate(samples):
        for col in range(row, count):
            similarity = k(left, samples[col])
            kernel[row, col] = similarity
            kernel[col, row] = similarity
    return intdiv_K(kernel, q=q, p=p)

In [None]:
import itertools

import datasets
from nltk.tokenize import word_tokenize
from nltk.translate import bleu_score
import numpy as np
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.preprocessing import normalize
import torch
from transformers import AutoModel, AutoTokenizer

from vendi_score import data_utils, vendi
from vendi_score.data_utils import Example, Group


def get_tokenizer(model="roberta-base"):
    """Build a string -> token-list function backed by a pretrained tokenizer.

    Args:
        model: name or path passed to ``AutoTokenizer.from_pretrained``.

    Returns:
        A function that encodes a string with the tokenizer and converts the
        resulting ``input_ids`` back to token strings.
    """
    # Honour the requested model; it used to be ignored in favour of a
    # hard-coded "roberta-base".
    tokenizer = AutoTokenizer.from_pretrained(model, use_fast=True)

    def tokenize(s):
        return tokenizer.convert_ids_to_tokens(tokenizer(s).input_ids)

    return tokenize


def sklearn_tokenizer():
    """Return the tokenizer callable built by a default ``CountVectorizer``."""
    vectorizer = CountVectorizer()
    return vectorizer.build_tokenizer()


def get_mnli():
    """Load unique premises from the two ``multi_nli`` validation splits.

    Returns:
        A list of ``Example`` objects, one per distinct premise (first
        occurrence wins), each labelled with the row's ``genre`` under ``"y"``.
    """
    split_names = ("validation_matched", "validation_mismatched")
    rows = itertools.chain(
        *[datasets.load_dataset("multi_nli", split=name) for name in split_names]
    )
    seen_premises = set()
    examples = []
    for row in rows:
        premise = row["premise"]
        if premise not in seen_premises:
            seen_premises.add(premise)
            examples.append(Example(x=premise, labels={"y": row["genre"]}))
    return examples


def get_ngrams(
    sents,
    n=1,
    tokenizer=None,
    return_vectorizer=False,
    lowercase=False,
    **kwargs,
):
    """Count n-grams in ``sents`` with a ``CountVectorizer``.

    Args:
        sents: iterable of strings.
        n: n-gram order, or a ``(min_n, max_n)`` tuple used as-is.
        tokenizer: callable splitting a string into tokens; defaults to
            ``word_tokenize``.
        return_vectorizer: when True, also return the fitted vectorizer.
        lowercase: forwarded to ``CountVectorizer``.
        **kwargs: extra ``CountVectorizer`` keyword arguments.

    Returns:
        The matrix produced by ``fit_transform``, or ``(matrix, vectorizer)``
        when ``return_vectorizer`` is True.
    """
    tokenize = word_tokenize if tokenizer is None else tokenizer
    bounds = n if type(n) is tuple else (n, n)
    vectorizer = CountVectorizer(
        tokenizer=tokenize,
        ngram_range=bounds,
        lowercase=lowercase,
        **kwargs,
    )
    counts = vectorizer.fit_transform(sents)
    return (counts, vectorizer) if return_vectorizer else counts


def add_ngrams_to_examples(
    examples, n=1, tokenizer=None, return_vectorizer=False, **kwargs
):
    """Attach n-gram count rows to each example under ``"<n>-grams"``.

    The texts ``e.x`` are vectorized together with ``get_ngrams`` and each
    resulting row is stored in ``e.features``. ``return_vectorizer`` is
    accepted but not used. Returns the same ``examples`` object.
    """
    texts = [example.x for example in examples]
    counts = get_ngrams(texts, n=n, tokenizer=tokenizer, **kwargs)
    feature_key = f"{n}-grams"
    for example, row in zip(examples, counts):
        example.features[feature_key] = row
    return examples


def get_embeddings(
    sents,
    model=None,
    tokenizer=None,
    batch_size=32,
    device="cpu",
    model_path="bert-base-uncased"
):
    """Embed sentences with a transformer and return one vector per sentence.

    Args:
        sents: sliceable sequence of strings (sliced into batches).
        model: optional preloaded model; loaded from ``model_path`` when None.
            A caller-supplied model is used as-is and is not moved to
            ``device``.
        tokenizer: optional preloaded tokenizer; loaded from ``model_path``
            when None.
        batch_size: number of sentences per forward pass.
        device: ``torch.device``, device string, or None to pick CUDA when
            available and CPU otherwise.
        model_path: name or path used to load whichever of ``model`` /
            ``tokenizer`` was not supplied.

    Returns:
        Numpy array of the per-batch embeddings concatenated along axis 0.
        ``pooler_output`` is used when the model provides it, otherwise the
        first-token vector of ``last_hidden_state``.
    """
    # Handle device
    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    if isinstance(device, str):
        device = torch.device(device)

    # Load the model and the tokenizer independently: a caller-supplied
    # tokenizer must not be overwritten, and a caller-supplied model without a
    # tokenizer must not end up calling None.
    if model is None:
        model = AutoModel.from_pretrained(model_path).eval().to(device)
    if tokenizer is None:
        tokenizer = AutoTokenizer.from_pretrained(model_path, use_fast=True)

    embeddings = []

    # Helper: batching
    def to_batches(data, batch_size):
        for i in range(0, len(data), batch_size):
            yield data[i : i + batch_size]

    for batch in to_batches(sents, batch_size):
        inputs = tokenizer(
            batch,
            return_tensors="pt",
            padding=True,
            truncation=True,
            max_length=512,
        )
        inputs = {k: v.to(device) for k, v in inputs.items()}

        with torch.no_grad():
            output = model(**inputs)

            # Use pooler_output if available; fallback to CLS token
            if hasattr(output, "pooler_output") and output.pooler_output is not None:
                pooled = output.pooler_output  # [batch_size, hidden_dim]
            else:
                pooled = output.last_hidden_state[:, 0]  # [batch_size, hidden_dim]

        # No reshape here - each batch is already 2D
        embeddings.append(pooled.cpu().numpy())

    # Concatenate all batches: [total_samples, hidden_dim]
    return np.concatenate(embeddings, axis=0)


def add_embeddings_to_examples(
    examples,
    model=None,
    tokenizer=None,
    batch_size=32,
    device="cpu",
    model_name="princeton-nlp/unsup-simcse-roberta-base",
    feature_name="unsup_simcse",
):
    """Embed each example's text and store the vector in ``e.features``.

    Args:
        examples: objects exposing a text attribute ``x`` and a ``features``
            mapping.
        model, tokenizer, batch_size, device: forwarded to ``get_embeddings``.
        model_name: model name or path, forwarded to ``get_embeddings`` as its
            ``model_path`` argument.
        feature_name: key under which each embedding is stored.

    Returns:
        The same ``examples`` object, updated in place.
    """
    X = get_embeddings(
        [e.x for e in examples],
        model=model,
        tokenizer=tokenizer,
        batch_size=batch_size,
        device=device,
        # get_embeddings names this parameter model_path; passing model_name=
        # raised TypeError.
        model_path=model_name,
    )
    for e, x in zip(examples, X):
        e.features[feature_name] = x
    return examples


def single_ngram_diversity(sents, n, tokenizer=None, **kwargs):
    """Ratio of distinct n-grams to total n-gram occurrences in ``sents``.

    The numerator is the number of columns of the count matrix returned by
    ``get_ngrams``; the denominator is the sum of all its counts.
    """
    counts = get_ngrams(sents, n=n, tokenizer=tokenizer, **kwargs)
    vocabulary_size = counts.shape[-1]
    occurrences = counts.sum()
    return vocabulary_size / occurrences


def ngram_diversity(sents, ns=[1, 2, 3, 4], tokenizer=None, **kwargs):
    """Mean of ``single_ngram_diversity`` over the n-gram orders in ``ns``."""
    per_order = []
    for order in ns:
        per_order.append(
            single_ngram_diversity(sents, order, tokenizer=tokenizer, **kwargs)
        )
    return np.mean(per_order)


def bleu(hyps, refs, tokenizer=None):
    """Mean smoothed sentence-BLEU of each hypothesis against all references.

    Args:
        hyps: hypotheses, either raw strings or already-tokenized sequences
            (decided from the type of ``hyps[0]``).
        refs: references in the same form as ``hyps``.
        tokenizer: callable used to tokenize raw strings; defaults to
            ``word_tokenize``. Ignored for pre-tokenized input.

    Returns:
        The mean of ``sentence_bleu(refs, hyp)`` over all hypotheses, using
        ``SmoothingFunction().method1``.
    """
    if isinstance(hyps[0], str):
        if tokenizer is None:
            tokenizer = word_tokenize
        hyp_tokens = [tokenizer(s) for s in hyps]
        ref_tokens = [tokenizer(s) for s in refs]
    else:
        hyp_tokens = hyps
        ref_tokens = refs
    smoothing = bleu_score.SmoothingFunction().method1
    # Score the tokenized sequences; the raw strings used to be passed here,
    # so string input was never scored on its tokens.
    return np.mean(
        [
            bleu_score.sentence_bleu(
                ref_tokens, hyp, smoothing_function=smoothing
            )
            for hyp in hyp_tokens
        ]
    )


def self_bleu(sents, tokenizer):
    """Mean smoothed BLEU of each sentence against all the other sentences.

    Each tokenized sentence is the hypothesis once, with every other tokenized
    sentence as its reference set.
    """
    tokenized = [tokenizer(s) for s in sents]
    smooth = bleu_score.SmoothingFunction().method1
    scores = [
        bleu_score.sentence_bleu(
            tokenized[:idx] + tokenized[idx + 1 :],
            hypothesis,
            smoothing_function=smooth,
        )
        for idx, hypothesis in enumerate(tokenized)
    ]
    return np.mean(scores)


def pairwise_bleu(sents, tokenizer):
    """Mean over sentences of the mean single-reference BLEU to each other one.

    For every sentence, smoothed BLEU is computed against each other sentence
    individually and averaged; the per-sentence averages are averaged again.
    """
    tokenized = [tokenizer(s) for s in sents]
    smooth = bleu_score.SmoothingFunction().method1
    per_sentence = []
    for i, hypothesis in enumerate(tokenized):
        pair_scores = [
            bleu_score.sentence_bleu(
                [reference], hypothesis, smoothing_function=smooth
            )
            for j, reference in enumerate(tokenized)
            if j != i
        ]
        per_sentence.append(np.mean(pair_scores))
    return np.mean(per_sentence)


def ngram_vendi_score(sents, ns=[1, 2, 3, 4], tokenizer=None, **kwargs):
    """Vendi score of ``sents`` under an averaged n-gram cosine kernel.

    Args:
        sents: iterable of strings.
        ns: n-gram orders; one kernel is built per order and the kernels are
            averaged.
        tokenizer: forwarded to ``get_ngrams``.
        **kwargs: extra keyword arguments forwarded to ``get_ngrams``.

    Returns:
        Whatever ``vendi.score_K`` returns for the averaged kernel.
    """
    Ks = []
    for n in ns:
        # Forward **kwargs (they used to be dropped silently).
        X = normalize(get_ngrams(sents, n=n, tokenizer=tokenizer, **kwargs))
        # toarray() is the portable spelling of the sparse ``.A`` shorthand.
        Ks.append((X @ X.T).toarray())
    K = np.stack(Ks, axis=0).mean(axis=0)
    return vendi.score_K(K)


def embedding_vendi_score(
    sents,
    model=None,
    tokenizer=None,
    batch_size=32,
    device="cpu",
    model_path="princeton-nlp/unsup-simcse-roberta-base",
):
    """Vendi score of sentence embeddings.

    Args:
        sents: sequence of strings to embed with ``get_embeddings``.
        model, tokenizer, batch_size, device, model_path: forwarded to
            ``get_embeddings``.

    Returns:
        Tuple ``(score, eigenvalues, X)`` where ``X`` is the embedding matrix.
        With fewer samples than embedding dimensions the score comes from
        ``score_X`` (n x n kernel); otherwise from ``score_dual`` (d x d
        matrix).
    """
    X = get_embeddings(
        sents,
        model=model,
        tokenizer=tokenizer,
        batch_size=batch_size,
        device=device,
        model_path=model_path,
    )
    n, d = X.shape
    # Both branches return the same 3-tuple; the n < d branch used to return
    # only (s, w), which broke callers unpacking three values.
    if n < d:
        s, w = score_X(X)
    else:
        s, w = score_dual(X)
    return s, w, X

In [None]:
# Directories (relative to the notebook) holding the prepared CSV files.
_AUG_DIR = 'augment_training'
_EDA_DIR = 'EDA_aug'


def _read_aug(stem):
    """Read ``augment_training/<stem>.csv`` into a DataFrame."""
    return pd.read_csv(f'{_AUG_DIR}/{stem}.csv')


def _read_eda(op, level):
    """Read ``EDA_aug/augmented_<op>_augp<level>_prop<level>.csv``."""
    return pd.read_csv(f'{_EDA_DIR}/augmented_{op}_augp{level}_prop{level}.csv')


# Train / validation / test splits.
df_train = _read_aug('df_train_tc')
df_val = _read_aug('df_val_tc')
df_test = _read_aug('df_test_tc')

# Generated set; its text column is coerced to str (only this frame is).
df_gen = _read_aug('df_gen_tc')
df_gen['text'] = df_gen['text'].astype(str)

# "comb" files at the five ratios 0.1 .. 0.9.
df_gen_01 = _read_aug('df_comb_tc_0.1')
df_gen_03 = _read_aug('df_comb_tc_0.3')
df_gen_05 = _read_aug('df_comb_tc_0.5')
df_gen_07 = _read_aug('df_comb_tc_0.7')
df_gen_09 = _read_aug('df_comb_tc_0.9')

# "_vf" variants of the same files.
df_gen_01_vf = _read_aug('df_comb_tc_0.1_vf')
df_gen_03_vf = _read_aug('df_comb_tc_0.3_vf')
df_gen_05_vf = _read_aug('df_comb_tc_0.5_vf')
df_gen_07_vf = _read_aug('df_comb_tc_0.7_vf')
df_gen_09_vf = _read_aug('df_comb_tc_0.9_vf')
df_gen_vf = _read_aug('df_gen_tc_vf')

# "_add" variants of the same files.
df_gen_01_add = _read_aug('df_comb_tc_0.1_add')
df_gen_03_add = _read_aug('df_comb_tc_0.3_add')
df_gen_05_add = _read_aug('df_comb_tc_0.5_add')
df_gen_07_add = _read_aug('df_comb_tc_0.7_add')
df_gen_09_add = _read_aug('df_comb_tc_0.9_add')
df_gen_add = _read_aug('df_gen_tc_add')

# EDA_aug files (ri / sr / rs / rd) at level 0.3.
df_div_ri_03 = _read_eda('ri', '0.3')
df_div_sr_03 = _read_eda('sr', '0.3')
df_div_rs_03 = _read_eda('rs', '0.3')
df_div_rd_03 = _read_eda('rd', '0.3')

# EDA_aug files (ri / sr / rs / rd) at level 0.5.
df_div_ri_05 = _read_eda('ri', '0.5')
df_div_sr_05 = _read_eda('sr', '0.5')
df_div_rs_05 = _read_eda('rs', '0.5')
df_div_rd_05 = _read_eda('rd', '0.5')

In [None]:
import pandas as pd
from vendi_score import text_utils

# Per-dataset outputs, keyed by dataset name.
vs = {}      # embedding Vendi score
ngm = {}     # unigram Vendi score
eigen = {}   # eigenvalues returned with the embedding score
X = {}       # embedding matrices

results = []

data = [
    ("bs", df_train),
    ("gen_01", df_gen_01),
    ("gen_03", df_gen_03),
    ("gen_05", df_gen_05),
    ("gen_07", df_gen_07),
    ("gen_09", df_gen_09),
    ("gen", df_gen),
    ("gen_01_vf", df_gen_01_vf),
    ("gen_03_vf", df_gen_03_vf),
    ("gen_05_vf", df_gen_05_vf),
    ("gen_07_vf", df_gen_07_vf),
    ("gen_09_vf", df_gen_09_vf),
    ("gen_vf", df_gen_vf),
    ("gen_01_add", df_gen_01_add),
    ("gen_03_add", df_gen_03_add),
    ("gen_05_add", df_gen_05_add),
    ("gen_07_add", df_gen_07_add),
    ("gen_09_add", df_gen_09_add),
    ("gen_add", df_gen_add),
    ("div_ri_03", df_div_ri_03),
    ("div_sr_03", df_div_sr_03),
    ("div_rs_03", df_div_rs_03),
    ("div_rd_03", df_div_rd_03),
    ("div_ri_05", df_div_ri_05),
    ("div_sr_05", df_div_sr_05),
    ("div_rs_05", df_div_rs_05),
    ("div_rd_05", df_div_rd_05),
]

# Use the GPU when one is visible and fall back to CPU otherwise, instead of
# failing on machines without CUDA.
embed_device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Load BERT once and reuse it for every dataset rather than reloading it on
# each iteration.
bert_model = AutoModel.from_pretrained("bert-base-uncased").eval().to(embed_device)
bert_tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased", use_fast=True)

# `frame` (not `df`) so the loop does not clobber a name reused later on.
for name, frame in data:
    print(name)

    texts = frame['text'].tolist()
    pxl = text_utils.ngram_vendi_score(texts, ns=[1])
    emb, egn, x = embedding_vendi_score(
        texts,
        model=bert_model,
        tokenizer=bert_tokenizer,
        device=embed_device,
        batch_size=32,
    )
    eigen[name] = egn
    ngm[name] = pxl
    vs[name] = emb
    X[name] = x

    # `pxl` is the unigram score; the label used to say "Pixel".
    print(f"N-gram Vendi Score: {pxl:.4f}, Embedding Vendi Score: {emb:.4f}")

    results.append({
        "name": name,
        "pxl": pxl,
        "emb": emb
    })

# Convert to DataFrame
df_results = pd.DataFrame(results)
print(df_results)

In [None]:
# Accuracy per dataset name, grouped by experiment family.
bs = {
    'name': ['bs', 'gen', 'gen_01', 'gen_03', 'gen_05', 'gen_07', 'gen_09'],
    'acc': [0.875, 0.835, 0.870, 0.852, 0.884, 0.884, 0.890]
}


div_03 = {
    'name': ['div_sr_03', 'div_rd_03', 'div_ri_03', 'div_rs_03'],
    'acc': [0.887, 0.870, 0.907, 0.899]
}


div_05 = {
    'name': ['div_sr_05', 'div_rd_05', 'div_ri_05', 'div_rs_05'],
    'acc': [0.907, 0.870, 0.893, 0.890]
}


gen_vf = {
    'name': ['gen_01_vf', 'gen_03_vf', 'gen_05_vf', 'gen_07_vf', 'gen_09_vf', 'gen_vf'],
    'acc': [0.867, 0.887, 0.890, 0.890, 0.881, 0.864]
}


gen_add = {
    'name': ['gen_01_add', 'gen_03_add', 'gen_05_add', 'gen_07_add', 'gen_09_add', 'gen_add'],
    'acc': [0.864, 0.896, 0.916, 0.884, 0.907, 0.904]
}

df_bs = pd.DataFrame(bs)
df_div03 = pd.DataFrame(div_03)
df_div05 = pd.DataFrame(div_05)
# Distinct names on purpose: `df_gen_vf` / `df_gen_add` already hold the text
# datasets loaded from CSV, and rebinding them here would break a re-run of
# the scoring cell.
acc_gen_vf = pd.DataFrame(gen_vf)
acc_gen_add = pd.DataFrame(gen_add)

df_acc = pd.concat([df_bs, df_div03, df_div05, acc_gen_vf, acc_gen_add])

In [None]:
# Attach the accuracy of each dataset to its scores; the left join keeps every
# row of df_results.
df = df_results.merge(df_acc, on='name', how='left')

In [None]:
def _flat_cosine(a, b):
    """Cosine similarity of two equally-shaped arrays treated as flat vectors."""
    return np.sum(a * b) / (np.linalg.norm(a) * np.linalg.norm(b))


# Embeddings of the baseline training set; every dataset is compared to it.
base_emb = X['bs']
n_base = base_emb.shape[0]

rho = []
for key, other_emb in X.items():
    if key.endswith('_add'):
        # For "_add" sets only the rows after the first n_base rows are
        # compared with the baseline.
        other_emb = other_emb[n_base:]
    # Truncate both sides to a common number of rows so the element-wise
    # product is always well defined, including when the appended part is
    # longer than the baseline.
    n_rows = min(n_base, other_emb.shape[0])
    rho.append(_flat_cosine(base_emb[:n_rows], other_emb[:n_rows]))

# Assigned positionally: assumes df rows are in the insertion order of X.
df['rho'] = rho

In [None]:
# Positional labels as (label, run length) pairs.
# NOTE(review): assumes df rows are in the order of the `data` list - confirm.
category_runs = [('bs', 1), ('gen', 6), ('gen_vf', 6), ('gen_add', 6), ('div', 8)]
df['category'] = [label for label, count in category_runs for _ in range(count)]

In [None]:
# Weighted Vendi score: the embedding score scaled by rho.
df['wvs'] = df['emb'].mul(df['rho'])

In [None]:
# Per-category means of the numeric columns, rounded to 4 decimals.
df.groupby('category')[['emb', 'acc', 'rho', 'wvs']].mean().round(4)

In [None]:
# Positional `pp` values (used as the marker size in the scatter plot).
# NOTE(review): assumes df rows are in the order of the `data` list - confirm.
pp_descending = [0.9, 0.7, 0.5, 0.3, 0.1, 1]
pp_ascending = [0.1, 0.3, 0.5, 0.7, 0.9, 1]
df['pp'] = [0.5] + pp_descending + pp_descending + pp_ascending + [0.5] * 8

In [None]:
import seaborn as sns
from matplotlib.lines import Line2D

fig, ax = plt.subplots(figsize=(8, 6))

# Scatter of accuracy against weighted Vendi score; seaborn's own legend is
# disabled so that only a hue legend is shown (built by hand below).
sns.scatterplot(
    data=df,
    x='wvs',
    y='acc',
    hue='category',
    size='pp',
    marker='o',
    alpha=0.9,
    sizes=(30, 100),
    linewidth=2,
    legend=False,
    ax=ax,
)

# Hand-built hue legend: one marker per category, coloured from the default
# palette in order of first appearance.
unique_categories = df['category'].unique()
palette = sns.color_palette(n_colors=len(unique_categories))
custom_lines = [Line2D([0], [0], marker='o', color='w', label=cat,
                       markerfacecolor=col, markersize=8)
                for cat, col in zip(unique_categories, palette)]
ax.legend(handles=custom_lines, title='Category')

# Reference lines through the baseline point. Scalars are extracted explicitly
# because axhline/axvline expect a single position, not a 1-element array.
is_baseline = df['category'] == 'bs'
baseline_acc = df.loc[is_baseline, 'acc'].iloc[0]
baseline_wvs = df.loc[is_baseline, 'wvs'].iloc[0]
ax.axhline(baseline_acc, color='black', linestyle='--', linewidth=1)
ax.axvline(baseline_wvs, color='black', linestyle='--', linewidth=1)

# Final styling
ax.set_xlabel("Weighted VS", fontsize=12)
ax.set_ylabel("ACC", fontsize=12)
ax.grid(True)
fig.tight_layout()

plt.show()