In [None]:
!pip install sentence-transformers transformers



In [None]:
!pip install sentence-transformers transformers datasets tqdm



In [None]:
import re
import numpy as np
import torch
from tqdm import tqdm
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM

###############################################################################
# 1. Synthetic Dataset
###############################################################################
# Demonstration pool of movie reviews. Each review is written next to its
# sentiment label so that a text and its label cannot get out of step.
_labelled_demos = (
    ("I absolutely loved this film, it was brilliant", "positive"),
    ("The movie was terrible and a waste of time", "negative"),
    ("What a wonderful and uplifting story for cat lovers", "positive"),
    ("I hated every second of this dull flick about soccer", "negative"),
    ("Brilliant performance by the lead actor in a silent drama", "positive"),
    ("A boring script that put me to sleep after 10 minutes", "negative"),
    ("Incredible visuals of outer space and a mesmerizing plot", "positive"),
    ("Lackluster direction with no redeeming qualities in the dialogue", "negative"),
    ("One of the best experiences in cinema for sci-fi enthusiasts", "positive"),
    ("Mediocre acting, nothing special here", "negative"),
    ("Loved the music and the cinematography, especially the violin solos", "positive"),
    ("The dialogue was cheesy and cringe-worthy, especially the romance bits", "negative"),
)

# Parallel lists consumed by the rest of the script (index i of one matches
# index i of the other).
demo_texts = [review for review, _ in _labelled_demos]
demo_labels = [sentiment for _, sentiment in _labelled_demos]

# New, unlabelled movie reviews that the selected demonstrations are used on.
test_inputs = [
    "The film was engaging but the pacing was off.",
    "I found the movie to be an utter disappointment.",
    "An absolute masterpiece with brilliant acting.",
    "The storyline was weak and the dialogues were cringe-worthy.",
]

###############################################################################
# 2. Compute Embeddings for Demonstration Set
###############################################################################
# Sentence encoder used for both the demonstrations and the test inputs
# (768-dim embeddings).
embed_model_name = "sentence-transformers/all-mpnet-base-v2"
embedder = SentenceTransformer(embed_model_name)
demo_embeddings = embedder.encode(demo_texts, convert_to_numpy=True)
print("Demo embeddings computed. Shape:", demo_embeddings.shape)

###############################################################################
# 3. Load the Seq2Seq LLM for In-Context Prediction (T5-Large)
###############################################################################
llm_name = "t5-large"
tokenizer = AutoTokenizer.from_pretrained(llm_name)
# Use the GPU when one is present; otherwise keep the model on the CPU instead
# of failing at load time with an unconditional .cuda() call.
llm_model = AutoModelForSeq2SeqLM.from_pretrained(llm_name).to(
    "cuda" if torch.cuda.is_available() else "cpu"
)

def llm_classify_with_prompt(selected_demos, test_input):
    """
    Build a few-shot sentiment prompt and return the LLM's greedy completion.

    Args:
        selected_demos: iterable of (review_text, label) pairs. They are listed
            in the prompt in the given order, numbered from 1.
        test_input: the movie review to classify.

    Returns:
        A (prompt, output) tuple: the exact prompt string sent to the model and
        the decoded generation with special tokens removed and whitespace
        stripped.

    Prompt format:
      "Below are some movie review examples with their sentiment (output only 'positive' or 'negative'):
       1) <demo_text> => <demo_label>
       ...
       Now classify this new movie review:
       <test_input>
       Answer (positive/negative only):"
    """
    demo_str = "".join(
        f"{i}) {txt} => {lbl}\n" for i, (txt, lbl) in enumerate(selected_demos, 1)
    )
    prompt = (
        "Below are some movie review examples with their sentiment (output only 'positive' or 'negative'):\n"
        f"{demo_str}\n"
        "Now classify this new movie review:\n"
        f"{test_input}\n"
        "Answer (positive/negative only):"
    )
    # NOTE(review): the prompt is cut to 512 tokens. Assuming the tokenizer's
    # default truncation side, the cut removes the END of the prompt (the test
    # review and the answer cue) when the demos are long -- confirm if prompts
    # ever approach that length.
    # Move the inputs to wherever the model actually lives rather than
    # assuming "cuda".
    inputs = tokenizer(
        prompt, return_tensors="pt", truncation=True, max_length=512
    ).to(llm_model.device)
    with torch.no_grad():
        output_ids = llm_model.generate(
            input_ids=inputs.input_ids,
            attention_mask=inputs.attention_mask,
            max_new_tokens=20,
            do_sample=False,  # greedy decoding
        )
    result = tokenizer.decode(output_ids[0], skip_special_tokens=True)
    return prompt, result.strip()

###############################################################################
# 4. CEIL's DPP MAP Inference Method for Demo Selection
###############################################################################
def build_dpp_kernel(demo_embeds, test_embed, scale_factor=1.0):
    """
    Build the test-conditioned DPP kernel L'[i, j] = r[i] * S[i, j] * r[j].

    Here r[i] is the dot product of demo i with the test embedding and
    S = demo_embeds @ demo_embeds.T holds the pairwise demo dot products.

    Args:
        demo_embeds: 2-D array with one demonstration embedding per row.
        test_embed: 1-D embedding of the test input (same width as a row).
        scale_factor: accepted for interface compatibility; not used by the
            computation.

    Returns:
        An (n, n) float64 array, where n is the number of demonstrations.
    """
    relevance = np.einsum("ij,j->i", demo_embeds, test_embed)
    sim_mat = demo_embeds @ demo_embeds.T
    # Broadcasting evaluates (r[i] * S[i, j]) * r[j] for every pair at once,
    # replacing the O(n^2) Python-level double loop.
    Lprime = relevance[:, None] * sim_mat * relevance[None, :]
    return Lprime.astype(np.float64, copy=False)

def dpp_map_inference(Lprime, k_select=4):
    """
    Greedy MAP inference for a DPP: repeatedly add the item that most
    increases the log-determinant of the selected principal submatrix.

    Args:
        Lprime: square (n, n) kernel matrix.
        k_select: maximum number of items to select.

    Returns:
        List of selected row/column indices in selection order. The list is
        shorter than k_select when the kernel has fewer than k_select items or
        when no remaining item yields a strictly positive determinant.
    """
    n = Lprime.shape[0]

    def submatrix_logdet(indices):
        """Log-determinant of Lprime restricted to `indices` (-inf if det <= 0)."""
        if not indices:
            return 0.0
        sign, logabsdet = np.linalg.slogdet(Lprime[np.ix_(indices, indices)])
        # slogdet reports log|det|; a zero or negative determinant is not a
        # valid DPP score, so it must never win the comparison below.
        return logabsdet if sign > 0 else -np.inf

    selected = []
    current_logdet = 0.0
    candidates = list(range(n))  # ascending order: ties go to the lowest index
    for _ in range(min(k_select, n)):
        best_gain = -np.inf
        best_item = None
        for i in candidates:
            gain = submatrix_logdet(selected + [i]) - current_logdet
            if gain > best_gain:
                best_gain = gain
                best_item = i
        if best_item is None:
            # Every remaining candidate makes the submatrix singular/invalid.
            break
        selected.append(best_item)
        candidates.remove(best_item)
        current_logdet += best_gain
    return selected

###############################################################################
# 5. Submodular Method for Demo Selection
###############################################################################
def submodular_select(demo_embeds, test_embed, k=4, lambd=1.0):
    """
    Greedily select demonstrations by the score

        (t^T V^{-1} x)^2 / (1 + x^T V^{-1} x)

    where t is the test embedding, x a candidate demo embedding, and V starts
    as lambd * I and gains the outer product x x^T of every selected demo.

    Args:
        demo_embeds: 2-D array with one demonstration embedding per row.
        test_embed: 1-D embedding of the test input.
        k: maximum number of demonstrations to select.
        lambd: multiplier of the identity matrix V starts from.

    Returns:
        List of selected row indices in selection order (at most
        min(k, number of demonstrations) entries).
    """
    n = demo_embeds.shape[0]
    d = demo_embeds.shape[1]
    selected = []
    V_S = lambd * np.eye(d)
    candidate_indices = list(range(n))  # ascending: ties go to the lowest index
    for _ in range(min(k, n)):
        invV_S = np.linalg.inv(V_S)
        # t^T V^{-1} does not depend on the candidate, so compute it once per
        # round instead of once per candidate.
        test_proj = test_embed @ invV_S
        best_val = -np.inf
        best_idx = None
        for i in candidate_indices:
            x = demo_embeds[i]
            val = (test_proj @ x) ** 2 / (1.0 + (x @ invV_S @ x))
            if val > best_val:
                best_val = val
                best_idx = i
        if best_idx is None:
            # No candidate produced a comparable (non-NaN) score.
            break
        selected.append(best_idx)
        x_sel = demo_embeds[best_idx]
        V_S = V_S + np.outer(x_sel, x_sel)
        candidate_indices.remove(best_idx)
    return selected

###############################################################################
# 6. Compare Both Methods on Test Inputs and Report Final Outputs
###############################################################################
def evaluate_on_synthetic(k_select=4):
    """
    Run both demo-selection methods on every synthetic test review and print
    the selected demo indices together with the LLM's answer.

    The synthetic test reviews carry no ground-truth labels, so nothing is
    scored here; the outputs are printed for manual comparison only.

    Args:
        k_select: number of demonstrations each method is asked to select.
    """
    for test_input in tqdm(test_inputs, desc="Evaluating Synthetic Test Inputs"):
        test_embed = embedder.encode([test_input], convert_to_numpy=True)[0]

        # Method A: CEIL's DPP MAP Inference
        Lprime = build_dpp_kernel(demo_embeddings, test_embed)
        dpp_indices = dpp_map_inference(Lprime, k_select=k_select)
        dpp_demos = [(demo_texts[i], demo_labels[i]) for i in dpp_indices]
        _, dpp_output = llm_classify_with_prompt(dpp_demos, test_input)

        # Method B: Submodular Method
        submod_indices = submodular_select(demo_embeddings, test_embed, k=k_select, lambd=1.0)
        submod_demos = [(demo_texts[i], demo_labels[i]) for i in submod_indices]
        _, submod_output = llm_classify_with_prompt(submod_demos, test_input)

        # No ground truth is available, so just report both final outputs.
        print("------------------------------------------------------------")
        print(f"Test Input: {test_input}")
        print("\n[CEIL DPP Method] Selected Indices:", dpp_indices)
        print("LLM Output:", dpp_output)
        print("\n[Submodular Method] Selected Indices:", submod_indices)
        print("LLM Output:", submod_output)
        print("------------------------------------------------------------\n")

    # (Optionally, add accuracy computations if ground truth is available.)

###############################################################################
# 7. Run Experiment on Synthetic Dataset
###############################################################################
if __name__ == "__main__":
    evaluate_on_synthetic(k_select=4)


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md:   0%|          | 0.00/10.6k [00:00<?, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/571 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/438M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/363 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/239 [00:00<?, ?B/s]

1_Pooling/config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

Demo embeddings computed. Shape: (12, 768)


OSError: google/t5-base is not a local folder and is not a valid model identifier listed on 'https://huggingface.co/models'
If this is a private repository, make sure to pass a token having permission to this repo either by logging in with `huggingface-cli login` or by passing `token=<your_token>`

In [None]:
import re
import numpy as np
import torch
from datasets import load_dataset
from tqdm import tqdm
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM

###############################################################################
# 1. Load GSM8K Dataset and Create Splits
###############################################################################
# Only the first K_total examples of the train split are used: the first
# N_demo of them form the demonstration pool, the rest form the test set.
K_total = 800
N_demo = 400  # demonstration set size
ds = load_dataset("openai/gsm8k", "main")
gsm_subset = ds["train"].select(range(K_total))
print("Total examples used from GSM8K train split:", len(gsm_subset))

_demo_rows, _test_rows = range(N_demo), range(N_demo, K_total)
demo_set = gsm_subset.select(_demo_rows)
test_set = gsm_subset.select(_test_rows)
print("Demo set size:", len(demo_set))
print("Test set size:", len(test_set))

def extract_final_answer(answer_str):
    """
    Return the final answer that follows the '####' marker in a solution.

    Thousands separators are removed from the extracted token (e.g. "1,080"
    becomes "1080") so the result can be passed straight to float().

    Args:
        answer_str: full solution text, expected to contain '#### <answer>'.

    Returns:
        The first whitespace-free token after '####' with commas removed, or
        the whole input stripped of surrounding whitespace when no marker is
        present.
    """
    m = re.search(r"####\s*(\S+)", answer_str)
    if m:
        return m.group(1).replace(",", "")
    return answer_str.strip()

###############################################################################
# 2. Prepare Demonstration Data and Compute Embeddings
###############################################################################
demo_texts = [ex["question"] for ex in demo_set]
demo_answers = [ex["answer"] for ex in demo_set]
demo_final_answers = [extract_final_answer(ans) for ans in demo_answers]

# Sentence encoder used for both the demonstrations and the test questions
# (768-dim embeddings).
embed_model_name = "sentence-transformers/all-mpnet-base-v2"
embedder = SentenceTransformer(embed_model_name)
demo_embeddings = embedder.encode(demo_texts, convert_to_numpy=True)
print("Demo embeddings computed. Shape:", demo_embeddings.shape)

###############################################################################
# 3. Load the Seq2Seq LLM for In-Context Prediction (T5-Large)
###############################################################################
llm_name = "t5-large"
tokenizer = AutoTokenizer.from_pretrained(llm_name)
# Use the GPU when one is present; otherwise keep the model on the CPU instead
# of failing at load time with an unconditional .cuda() call.
llm_model = AutoModelForSeq2SeqLM.from_pretrained(llm_name).to(
    "cuda" if torch.cuda.is_available() else "cpu"
)

def llm_classify_with_prompt(selected_demos, test_question):
    """
    Build a few-shot math prompt and return the LLM's greedy completion.

    Args:
        selected_demos: iterable of (question, final_answer) pairs. They are
            listed in the prompt in the given order, numbered from 1.
        test_question: the problem the model should solve.

    Returns:
        A (prompt, output) tuple: the exact prompt string sent to the model and
        the decoded generation with special tokens removed and whitespace
        stripped.

    Prompt format:
      "Below are some grade school math problems with their solutions (output only a number):
       1) <question> => <final answer>
       ...
       Now solve this problem:
       <test_question>
       Answer (numeric only):"
    """
    demo_str = "".join(
        f"{i}) {q} => {ans}\n" for i, (q, ans) in enumerate(selected_demos, 1)
    )
    prompt = (
        "Below are some grade school math problems with their solutions (output only a number):\n"
        f"{demo_str}\n"
        "Now solve this problem:\n"
        f"{test_question}\n"
        "Answer (numeric only):"
    )
    # NOTE(review): the prompt is cut to 512 tokens. Assuming the tokenizer's
    # default truncation side, the cut removes the END of the prompt (the test
    # question and the answer cue) when the demos are long -- confirm that
    # k demos plus the question stay within the limit.
    # Move the inputs to wherever the model actually lives rather than
    # assuming "cuda".
    inputs = tokenizer(
        prompt, return_tensors="pt", truncation=True, max_length=512
    ).to(llm_model.device)
    with torch.no_grad():
        output_ids = llm_model.generate(
            input_ids=inputs.input_ids,
            attention_mask=inputs.attention_mask,
            max_new_tokens=20,
            do_sample=False,  # greedy decoding
        )
    result = tokenizer.decode(output_ids[0], skip_special_tokens=True)
    return prompt, result.strip()

###############################################################################
# 4. Method (A): CEIL's DPP MAP Inference for Demo Selection
###############################################################################
def build_dpp_kernel(demo_embeds, test_embed, scale_factor=1.0):
    """
    Build the test-conditioned DPP kernel L'[i, j] = r[i] * S[i, j] * r[j].

    Here r[i] is the dot product of demo i with the test embedding and
    S = demo_embeds @ demo_embeds.T holds the pairwise demo dot products.

    Args:
        demo_embeds: 2-D array with one demonstration embedding per row.
        test_embed: 1-D embedding of the test input (same width as a row).
        scale_factor: accepted for interface compatibility; not used by the
            computation.

    Returns:
        An (n, n) float64 array, where n is the number of demonstrations.
    """
    relevance = np.einsum("ij,j->i", demo_embeds, test_embed)
    sim_mat = demo_embeds @ demo_embeds.T
    # Broadcasting evaluates (r[i] * S[i, j]) * r[j] for every pair at once,
    # replacing the O(n^2) Python-level double loop (n = 400 demos here).
    Lprime = relevance[:, None] * sim_mat * relevance[None, :]
    return Lprime.astype(np.float64, copy=False)

def dpp_map_inference(Lprime, k_select=4):
    """
    Greedy MAP inference for a DPP: repeatedly add the item that most
    increases the log-determinant of the selected principal submatrix.

    Args:
        Lprime: square (n, n) kernel matrix.
        k_select: maximum number of items to select.

    Returns:
        List of selected row/column indices in selection order. The list is
        shorter than k_select when the kernel has fewer than k_select items or
        when no remaining item yields a strictly positive determinant.
    """
    n = Lprime.shape[0]

    def submatrix_logdet(indices):
        """Log-determinant of Lprime restricted to `indices` (-inf if det <= 0)."""
        if not indices:
            return 0.0
        sign, logabsdet = np.linalg.slogdet(Lprime[np.ix_(indices, indices)])
        # slogdet reports log|det|; a zero or negative determinant is not a
        # valid DPP score, so it must never win the comparison below.
        return logabsdet if sign > 0 else -np.inf

    selected = []
    current_logdet = 0.0
    candidates = list(range(n))  # ascending order: ties go to the lowest index
    for _ in range(min(k_select, n)):
        best_gain = -np.inf
        best_item = None
        for i in candidates:
            gain = submatrix_logdet(selected + [i]) - current_logdet
            if gain > best_gain:
                best_gain = gain
                best_item = i
        if best_item is None:
            # Every remaining candidate makes the submatrix singular/invalid.
            break
        selected.append(best_item)
        candidates.remove(best_item)
        current_logdet += best_gain
    return selected

###############################################################################
# 5. Method (B): Submodular Method for Demo Selection
###############################################################################
def submodular_select(demo_embeds, test_embed, k=4, lambd=1.0):
    """
    Greedily select demonstrations by the score

        (t^T V^{-1} x)^2 / (1 + x^T V^{-1} x)

    where t is the test embedding, x a candidate demo embedding, and V starts
    as lambd * I and gains the outer product x x^T of every selected demo.

    Args:
        demo_embeds: 2-D array with one demonstration embedding per row.
        test_embed: 1-D embedding of the test input.
        k: maximum number of demonstrations to select.
        lambd: multiplier of the identity matrix V starts from.

    Returns:
        List of selected row indices in selection order (at most
        min(k, number of demonstrations) entries).
    """
    n = demo_embeds.shape[0]
    d = demo_embeds.shape[1]
    selected = []
    V_S = lambd * np.eye(d)
    candidate_indices = list(range(n))  # ascending: ties go to the lowest index
    for _ in range(min(k, n)):
        invV_S = np.linalg.inv(V_S)
        # t^T V^{-1} does not depend on the candidate, so compute it once per
        # round instead of once per candidate.
        test_proj = test_embed @ invV_S
        best_val = -np.inf
        best_idx = None
        for i in candidate_indices:
            x = demo_embeds[i]
            val = (test_proj @ x) ** 2 / (1.0 + (x @ invV_S @ x))
            if val > best_val:
                best_val = val
                best_idx = i
        if best_idx is None:
            # No candidate produced a comparable (non-NaN) score.
            break
        selected.append(best_idx)
        x_sel = demo_embeds[best_idx]
        V_S = V_S + np.outer(x_sel, x_sel)
        candidate_indices.remove(best_idx)
    return selected

###############################################################################
# 6. Evaluate on GSM8K Test Set and Report Accuracy (Non-Verbose)
###############################################################################
# Matches an optionally negative number, with or without thousands separators
# and with an optional decimal part (e.g. "42", "-3.5", "1,080").
_NUMBER_RE = re.compile(r"(?:(?<!\d)-)?(?:\d{1,3}(?:,\d{3})+|\d+)(?:\.\d+)?")


def _to_float(text):
    """Convert a numeric string (commas allowed) to float; None if it is not numeric."""
    try:
        return float(text.replace(",", ""))
    except ValueError:
        return None


def _parse_last_number(text):
    """Return the last number appearing in `text` as a float, or None if there is none."""
    matches = _NUMBER_RE.findall(text)
    if not matches:
        return None
    return float(matches[-1].replace(",", ""))


def _is_correct(pred, gt_answer):
    """True when both values are present and agree within 1e-6."""
    return gt_answer is not None and pred is not None and abs(pred - gt_answer) < 1e-6


def evaluate_on_gsm8k(k_select=4):
    """
    Compare CEIL's DPP selection and the submodular selection on the GSM8K
    test split, printing a per-instance report and the final accuracies.

    For every test question, each method selects demonstrations from the demo
    set, the LLM answers with those demonstrations in the prompt, and the last
    number in the LLM output is compared against the ground-truth final answer.
    Instances whose ground truth cannot be parsed as a number count as
    incorrect for both methods.

    Args:
        k_select: number of demonstrations each method is asked to select.
    """
    demo_texts_gsm = [ex["question"] for ex in demo_set]
    demo_answers_gsm = [extract_final_answer(ex["answer"]) for ex in demo_set]
    demo_embeds_gsm = embedder.encode(demo_texts_gsm, convert_to_numpy=True)

    total_dpp_correct = 0
    total_submod_correct = 0
    total_cases = len(test_set)

    for instance in tqdm(test_set, desc="Evaluating GSM8K"):
        test_question = instance["question"]
        gt_answer_str = extract_final_answer(instance["answer"])
        gt_answer = _to_float(gt_answer_str)
        test_embed = embedder.encode([test_question], convert_to_numpy=True)[0]

        # Method A: CEIL's DPP MAP Inference
        Lprime = build_dpp_kernel(demo_embeds_gsm, test_embed)
        dpp_indices = dpp_map_inference(Lprime, k_select=k_select)
        dpp_demos = [(demo_texts_gsm[i], demo_answers_gsm[i]) for i in dpp_indices]
        _, dpp_output = llm_classify_with_prompt(dpp_demos, test_question)
        dpp_pred = _parse_last_number(dpp_output)

        # Method B: Submodular Method
        submod_indices = submodular_select(demo_embeds_gsm, test_embed, k=k_select, lambd=1.0)
        submod_demos = [(demo_texts_gsm[i], demo_answers_gsm[i]) for i in submod_indices]
        _, submod_output = llm_classify_with_prompt(submod_demos, test_question)
        submod_pred = _parse_last_number(submod_output)

        # Decide correctness once and reuse it for the report and the totals.
        dpp_ok = _is_correct(dpp_pred, gt_answer)
        submod_ok = _is_correct(submod_pred, gt_answer)
        total_dpp_correct += dpp_ok
        total_submod_correct += submod_ok

        dpp_correct_flag = "Yes" if dpp_ok else "No"
        submod_correct_flag = "Yes" if submod_ok else "No"
        print(f"Test Instance: {test_question}")
        print(f"Ground Truth Final Answer: {gt_answer_str}")
        print(f"CEIL DPP Selected Indices: {dpp_indices} -> Correct? {dpp_correct_flag}")
        print(f"Submodular Selected Indices: {submod_indices} -> Correct? {submod_correct_flag}")
        print("------------------------------------------------------------")

    # Guard against an empty test set instead of dividing by zero.
    dpp_acc = 100 * total_dpp_correct / total_cases if total_cases else 0.0
    submod_acc = 100 * total_submod_correct / total_cases if total_cases else 0.0
    print("\n============================================================")
    print(f"Final Accuracy over {total_cases} test cases:")
    print(f"  CEIL's DPP Method Accuracy: {dpp_acc:.2f}%")
    print(f"  Submodular Method Accuracy: {submod_acc:.2f}%")

###############################################################################
# 7. Run Experiment on GSM8K
###############################################################################
if __name__ == "__main__":
    evaluate_on_gsm8k(k_select=4)
