# Imports

In [1]:
%load_ext autoreload
%autoreload 2

# This must be first
from dotenv import load_dotenv
load_dotenv(".env")

import os
import string
import pickle

import numpy as np
import matplotlib.pyplot as plt
import torch
from core.utils.misc import seed_everything
from tqdm.auto import tqdm

# Load model

In [3]:
from core.utils.misc import limit_gpus
from core.models.llm_loading import load_model_and_tokenizer, load_tokenizer

# Pass GPU indices 0 and 1 to the project's limit_gpus helper
# (presumably restricts which devices this process may use — confirm in core.utils.misc).
limit_gpus(range(0, 2))

# Model selection: exactly one (model_type, model_variant) pair should be left
# uncommented; the others are kept as quick alternatives.
# model_type, model_variant = "gpt-2", "1.5B"
model_type, model_variant = "llama", "7B"
# model_type, model_variant = "gpt-j", "6B"
# model_type, model_variant = "pythia", "2.8B"
# model_type, model_variant = "pythia", "6.9B"
# model_type, model_variant = "falcon", "7B"
# model_type, model_variant = "mpt", "7B"

# NOTE(review): the output recorded for this cell is `KeyError: 'LLAMA_DIR'`, so the
# "llama" load appears to require a LLAMA_DIR setting (presumably an environment
# variable supplied through the .env file loaded in the first cell) — confirm before re-running.
model, tokenizer = load_model_and_tokenizer(model_type, model_variant)

KeyError: 'LLAMA_DIR'

# In-Context Learning Analysis

In [None]:
# Main Experiment

from core.task_vectors import run_icl, run_task_vector
from core.data.task_helpers import get_task_by_name
from core.analysis.evaluation import calculate_accuracy, calculate_accuracy_on_datasets, print_evaluation_summary

# Seed before any dataset is sampled (presumably so the sampled datasets are
# repeatable across runs — confirm what seed_everything covers).
seed_everything(41)

# Task selection: leave exactly one `task_name = ...` line uncommented.
# NOTE(review): the number pairs trailing some task names look like previously
# recorded accuracies; which number belongs to which method is not stated here.

# task_name = "knowledge_country_capital"
# task_name = "knowledge_person_language"
# task_name = "knowledge_location_continent"
# task_name = "knowledge_location_religion"

# task_name = "algorithmic_prev_letter"  # 0.57,0.40
# task_name = "algorithmic_next_letter"  # 0.91, 0.94
# task_name = "algorithmic_list_first"  # 1.00, 0.99
# task_name = "algorithmic_list_last"  # 0.97, 0.86
# task_name = "algorithmic_to_upper"  # 1.00, 0.96
# task_name = "algorithmic_to_lower"  # 1.00, 0.88

task_name = "translation_fr_en"
# task_name = "translation_es_en"
# task_name = "translation_it_en"
# task_name = "translation_en_fr"
# task_name = "translation_en_es"
# task_name = "translation_en_it"

# task_name = "linguistic_present_simple_gerund"  # 0.96, 0.80
# task_name = "linguistic_present_simple_past_simple"  # 0.95, 0.94
# task_name = "linguistic_present_simple_past_perfect"  # 0.79, 0.61
# task_name = "linguistic_plural_singular"  # 0.90, 0.81
# task_name = "linguistic_antonyms"  # 0.90, 0.88

# task_name = "sentiment"

# Value forwarded as `num_examples` to both create_datasets calls below.
num_examples = 5

task = get_task_by_name(tokenizer, task_name)

# 100 datasets for evaluation and 50 for the dev split handed to run_task_vector.
# NOTE(review): both are drawn from the same task by consecutive calls; whether the
# two sets can overlap depends on create_datasets — confirm.
test_datasets = task.create_datasets(num_datasets=100, num_examples=num_examples)
dev_datasets = task.create_datasets(num_datasets=50, num_examples=num_examples)

# Baseline: in-context-learning predictions on the test datasets.
icl_predictions = run_icl(model, tokenizer, task, test_datasets)
print_evaluation_summary(task, icl_predictions, test_datasets)

# Task-vector run. The three returned values are kept as notebook globals:
# `tv_dev_accuracy_by_layer` and `task_hiddens` are read again by later cells.
tv_predictions, tv_dev_accuracy_by_layer, task_hiddens = run_task_vector(
    model,
    tokenizer,
    task,
    test_datasets,
    dev_datasets,
    multi_context=False
)
print_evaluation_summary(task, tv_predictions, test_datasets)
# print(tv_dev_accuracy_by_layer)

# Alternative reporting through calculate_accuracy_on_datasets (currently disabled):
# icl_accuracy = calculate_accuracy_on_datasets(icl_predictions, test_datasets)
# tv_accuracy = calculate_accuracy_on_datasets(tv_predictions, test_datasets)
# print(f"ICL accuracy: {icl_accuracy:.2f}")
# print(f"TV accuracy: {tv_accuracy:.2f}")

In [None]:
# Overriding Experiment

from typing import Any
from core.task_vectors import run_icl, run_overriding_task_vector
from core.data.task_helpers import get_task_by_name
from core.analysis.evaluation import calculate_accuracy
from core.data.tasks.task import Task
from scripts.experiments.overriding import OVERRIDING_TASK_PAIRS

def is_valid_input(task: "Task", inp: Any) -> bool:
    """Report whether ``task`` can compute an output for ``inp``.

    Args:
        task: Task whose ``calc_output`` method is probed.
        inp: Candidate input passed to ``task.calc_output``.

    Returns:
        True if ``task.calc_output(inp)`` returns without raising, False if it
        raises an ``Exception``.
    """
    try:
        task.calc_output(inp)
    except Exception:
        # Any ordinary failure means the input is unsupported. KeyboardInterrupt and
        # SystemExit are deliberately not caught, so interrupting the kernel while
        # this runs inside a large filter still works.
        return False
    return True

# Seed before any dataset is sampled (presumably so the sampled datasets are
# repeatable across runs — confirm what seed_everything covers).
seed_everything(41)


# NOTE(review): index 3 selects one fixed (task, overriding task) pair; which pair
# that is depends on the ordering of OVERRIDING_TASK_PAIRS in scripts.experiments.overriding.
task_name, overriding_task_name = OVERRIDING_TASK_PAIRS[3]

num_examples = 4

task = get_task_by_name(tokenizer, task_name)
overriding_task = get_task_by_name(tokenizer, overriding_task_name)

# Sample 10x more test datasets than overriding datasets because the filter below
# discards some of them.
test_datasets = task.create_datasets(num_datasets=1000, num_examples=num_examples)
overriding_datasets = overriding_task.create_datasets(num_datasets=100, num_examples=num_examples)

# Keep only the test datasets whose test input is also a valid input for the
# overriding task, then trim to one test dataset per overriding dataset.
test_datasets = [dataset for dataset in test_datasets if is_valid_input(overriding_task, dataset.test_input)]
test_datasets = test_datasets[:len(overriding_datasets)]

# After trimming, the lengths differ only if too few test datasets survived the filter.
assert len(test_datasets) == len(overriding_datasets), (
    f"Only {len(test_datasets)} test datasets are valid inputs for the overriding task, "
    f"but {len(overriding_datasets)} are required."
)

icl_predictions = run_icl(model, tokenizer, task, test_datasets)
# The returned `tv_dev_accuracy_by_layer` and `task_hiddens` overwrite any values
# left by an earlier cell and are read again by later cells.
tv_predictions, tv_dev_accuracy_by_layer, task_hiddens = run_overriding_task_vector(
    model,
    tokenizer,
    task,
    test_datasets,
    overriding_datasets,
    verbose=True,
)

# "original" = the output the original task expects for each test input;
# "patched"  = the output the overriding task computes for that same input.
expected_outputs_original = [dataset.test_output for dataset in test_datasets]
expected_outputs_patched = [overriding_task.calc_output(dataset.test_input) for dataset in test_datasets]

# NOTE(review): `task` (not `overriding_task`) is passed even when scoring against the
# patched outputs; whether that matters depends on how calculate_accuracy uses it — confirm.
icl_accuracy_original = calculate_accuracy(task, icl_predictions, expected_outputs_original)
icl_accuracy_patched = calculate_accuracy(task, icl_predictions, expected_outputs_patched)

tv_accuracy_original = calculate_accuracy(task, tv_predictions, expected_outputs_original)
tv_accuracy_patched = calculate_accuracy(task, tv_predictions, expected_outputs_patched)

print(f"ICL accuracy original: {icl_accuracy_original:.2f}")
print(f"ICL accuracy patched: {icl_accuracy_patched:.2f}")
print(f"TV accuracy original: {tv_accuracy_original:.2f}")
print(f"TV accuracy patched: {tv_accuracy_patched:.2f}")

In [None]:
# Top tokens

from core.analysis.utils import logits_top_tokens, tokens_ranks
from core.models.utils.inference import hidden_to_logits
from itertools import chain

# Maps each layer number to the top-500 token list produced from that layer's
# averaged task hidden state.
tv_ordered_tokens_by_layer = {}

# The mean over the first axis does not depend on the layer, so compute it once
# instead of once per loop iteration.
# NOTE(review): the first axis presumably indexes datasets — confirm.
mean_task_hiddens = task_hiddens.mean(axis=0)

for layer_num in tv_dev_accuracy_by_layer:
    task_hidden = mean_task_hiddens[layer_num]
    logits = hidden_to_logits(model, task_hidden)
    tv_ordered_tokens_by_layer[layer_num] = logits_top_tokens(logits, tokenizer, k=500)
    # Only the first 12 of the 500 stored tokens are printed.
    print("Top tokens for layer", layer_num, ":", tv_ordered_tokens_by_layer[layer_num][:12])

In [None]:
# Check if input+task=output

from core.models.utils.llm_layers import get_lm_head, get_lm_pipeline
from core.models.utils.inference import hidden_to_logits, logits_to_tokens

# Start from the layer with the highest dev accuracy, then shift by 3 layers.
# NOTE(review): the reason for the +3 offset is not recorded in this notebook, and it
# may select an index past the last layer when the best layer is near the top — confirm.
layer_num = max(tv_dev_accuracy_by_layer, key=tv_dev_accuracy_by_layer.get) + 3

task_hidden = task_hiddens.mean(axis=0)[layer_num]

# The LM head weight matrix, cast to float32 and moved to CPU, is indexed by token id
# below to obtain one embedding row per input token.
embeddings = get_lm_head(model).weight.float().cpu()

inputs = [dataset.test_input.strip() for dataset in test_datasets]

tokenized_inputs = tokenizer(inputs, add_special_tokens=False).input_ids

# Keep only inputs that tokenize to exactly one token. `inputs` is filtered in
# lockstep with the token ids so that the final zip pairs every input with the
# output computed from its own embedding (previously only the ids were filtered,
# which misaligned the pairs as soon as one input was multi-token).
single_token_pairs = [(text, ids[0]) for text, ids in zip(inputs, tokenized_inputs) if len(ids) == 1]
inputs = [text for text, _ in single_token_pairs]
inputs_token_ids = [token_id for _, token_id in single_token_pairs]

# `embeddings` is already on CPU, so the indexed rows are too.
input_token_embeddings = embeddings[inputs_token_ids]

# normalize embeddings and task_hidden
input_token_embeddings = input_token_embeddings / input_token_embeddings.norm(dim=-1, keepdim=True)
task_hidden = task_hidden / task_hidden.norm(dim=-1, keepdim=True)

# The 1.0 factor is a scale knob for the task-vector contribution.
# NOTE(review): assumes task_hidden is on the same device as the CPU embeddings — confirm.
input_plus_task_embeddings = input_token_embeddings + task_hidden * 1.0

logits = hidden_to_logits(model, input_plus_task_embeddings)
# logits = embeddings @ input_plus_task_embeddings.T

# Pass the input token ids as `ignore_ids` (presumably so an input token cannot be
# returned as its own output — confirm in logits_to_tokens).
ignore_ids=inputs_token_ids
# ignore_ids=None

outputs = logits_to_tokens(logits, tokenizer, ignore_ids=ignore_ids)

# Displayed as the cell result: (input, output) pairs for the single-token inputs.
list(zip(inputs, outputs))

In [None]:
from core.models.utils.inference import batch_generate, tokenize_prompts

from transformers import TextGenerationPipeline

# Few-shot "country -> capital" prompt. Each active entry becomes one line of the
# prompt; the last active entry ends with "->" and is left for the model to complete.
# Commented-out entries are alternative examples kept for quick experimentation.
prompt_examples = [
    "Canada -> Ottawa",
    # "Australia -> Canberra",
    "France -> Paris",
    "Germany -> Berlin",
    # "Australia -> Sydney",
    "Switzerland ->",
    # "India -> Mumbai",
    # "China -> Shanghai",
    # "Australia ->"
]

# One example per line.
prompt = "\n".join(prompt_examples)

# Wrap the already-loaded model and tokenizer in a Hugging Face text-generation pipeline.
pipeline = TextGenerationPipeline(model, tokenizer)

# Non-sampling decoding (do_sample=False), at most 2 new tokens, one returned sequence.
completion = pipeline(prompt, max_new_tokens=2, num_return_sequences=1, do_sample=False)

print(completion)