# Setup

In [None]:
!pip -q install transformers accelerate datasets essential-generators bitsandbytes tqdm google-generativeai tiktoken orjson tenacity pandasgui wandb

In [None]:

#@title Clone GitHub repo

import os, shutil, getpass
from google.colab import drive

update_repo_copy = True #@param {type:"boolean"}
REPO_NAME = "shortcut-llm-icl" #@param {type:"string"}

DIR_NAME = 'Tesi Computer Science/ShortcutProject'  #@param {type:"string"}
DRIVE_PATH = '/content/drive/MyDrive/' + DIR_NAME + '/'
TARGET_DIR = os.path.join(DRIVE_PATH, REPO_NAME)

drive.mount('/content/drive')

if update_repo_copy or not os.path.exists(TARGET_DIR):
  GITHUB_USER = input("Enter GitHub username: ").strip()
  GITHUB_TOKEN = getpass.getpass("Enter GitHub token: ").strip()
  GITHUB_URL = f"https://{GITHUB_USER}:{GITHUB_TOKEN}@github.com/{GITHUB_USER}/{REPO_NAME}.git"
  TEMP_CLONE_DIR = f"/content/{REPO_NAME}"

  if os.path.exists(TEMP_CLONE_DIR):
      shutil.rmtree(TEMP_CLONE_DIR)

  print(f"Cloning {REPO_NAME} into Colab RAM...")
  exit_code = os.system(f'git clone "{GITHUB_URL}" "{TEMP_CLONE_DIR}"')

  if exit_code == 0:
      print(f"Copying to Google Drive → {TARGET_DIR}")
      if os.path.exists(TARGET_DIR):
          shutil.rmtree(TARGET_DIR)
      shutil.copytree(TEMP_CLONE_DIR, TARGET_DIR)

      # remove repo from RAM to save space
      shutil.rmtree(TEMP_CLONE_DIR)
      print("✅ Done.")
  else:
      print("❌ Clone failed. Check token, username or repo visibility.")

%cd "{TARGET_DIR}"
!ls

In [None]:
#@title Imports
import getpass
import shlex
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import time, json, math, random, re, hashlib, orjson as oj
from dataclasses import dataclass
from typing import List, Tuple, Dict, Any
from tenacity import retry, wait_exponential_jitter, stop_after_attempt
import google.generativeai as genai
import torch
import torch.nn.functional as F
from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
from extract_activations import DescriptorSchema, Feature, BaseLLM, GeminiLLM, HuggingFaceLLM
import extract_activations as E
from patched_unibias import WB_logging as L
from representation_engineering import repe_pipeline_registry
# Presumably registers the "rep-reading" / "rep-control" pipeline tasks that are
# requested through transformers.pipeline(...) later in this notebook — confirm in representation_engineering.
repe_pipeline_registry()


# Use the GPU when one is available, otherwise fall back to the CPU.
HuggingFaceLLM.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
#@title Login Hugging Face
# Strip whitespace/newlines picked up when pasting, as done for the GitHub credentials.
os.environ["HF_TOKEN"] = getpass.getpass("Enter Hugging Face token: ").strip()

In [None]:
#@title Login Weights&Bias
os.environ["WANDB_API_KEY"] = getpass.getpass("Enter W&B API key: ")
!wandb login $WANDB_API_KEY

# Compute Shortcut Activations

Prompts with and without shortcuts are selected and fed to the Representation Engineering (RepE) framework (https://arxiv.org/pdf/2310.01405) in an attempt to extract a latent direction corresponding to a specific kind of LLM shortcut.

In [None]:
#@title Load target model
TARGET_MODEL_NAME = "mistralai/Mistral-7B-Instruct-v0.1" #@param {type: "string"}
# Wrap the checkpoint in the project's HuggingFaceLLM helper, passing the HF_TOKEN environment variable.
# NOTE(review): quantize=True presumably enables bitsandbytes quantization — confirm in extract_activations.
target = HuggingFaceLLM(TARGET_MODEL_NAME, os.environ["HF_TOKEN"], quantize=True)

## Load dataset from ShortcutSuite

Premise–hypothesis pairs with injected shortcuts for the textual entailment (NLI) task are loaded from the ShortcutSuite datasets (https://github.com/yyhappier/ShortcutSuite).

In [None]:
#@title Setup
# Configuration for prompt selection.
# NOTE: "style_bible" can be chosen here, but select_if() in this notebook raises ValueError for it.
SHORTCUT_TYPE = "negation" #@param ["negation", "position", "style_bible"]
# Clean file and its shortcut-injected counterpart (the latter depends on SHORTCUT_TYPE).
CLEAN_DF_PATH = "data/ShortcutSuite/dev_matched.tsv"
DIRTY_DF_PATH = f"data/ShortcutSuite/dev_matched_{SHORTCUT_TYPE}.tsv"
NUM_SHOT = 0 #@param {type: "slider", min:0, max: 2, step:1}
NUM_SAMPLES = 64 #@param {type: "integer"}
MAX_TOKENS = 5 #@param {type: "integer"}
TEMPERATURE = 0.0
SEED = 20 #@param {type: "integer"}
DEBUG = False #@param {type: "boolean"}

In [None]:
# Load the clean and the shortcut-injected TSV files, then combine them into one paired frame.
# NOTE(review): the paired frame presumably exposes the *_clean / *_dirty columns read by
# format_data_nli — confirm in extract_activations.create_paired_dataset.
df_standard = E.load_nli_shortcuts_from_tsv(CLEAN_DF_PATH)
df_shortcut = E.load_nli_shortcuts_from_tsv(DIRTY_DF_PATH)
df = E.create_paired_dataset(df_standard, df_shortcut)

In [None]:
# Column names of the paired frame; passed to L.log_dataset_artifact when the dataset is logged.
COLUMNS = df.columns.to_list()

In [None]:
# Random selection: NUM_SAMPLES rows drawn with a fixed seed.
# NOTE: `selected_df` is reassigned by the E.select_shortcut_prompts cell if that cell is run afterwards.
selected_df = df.sample(n=NUM_SAMPLES, random_state=SEED)

### Prompt selection

In [None]:
def select_if(task, row, pred_clean, pred_dirty):
    """Decide whether a clean/dirty prompt pair should be kept.

    Reads the notebook-level SHORTCUT_TYPE:
      - "negation": keep when the clean prediction equals the gold label, the gold
        label is "entailment", and the dirty prediction differs from the gold label.
      - "position": keep when the clean prediction equals the gold label and the
        dirty prediction differs from the clean one.

    `task` is accepted but not used. Raises ValueError for any other SHORTCUT_TYPE.
    """
    expected = row["gold_label"]
    if SHORTCUT_TYPE not in ("negation", "position"):
        raise ValueError(f"Unknown shortcut type: {SHORTCUT_TYPE}")
    # Both supported modes require the model to be right on the clean prompt.
    if not (pred_clean == expected):
        return False
    if SHORTCUT_TYPE == "position":
        return pred_dirty != pred_clean
    return expected == "entailment" and pred_dirty != expected

In [None]:
# Model-based selection: E.select_shortcut_prompts is given the target model and select_if as its
# `condition`. This reassigns `selected_df`, replacing any earlier random sample.
selected_df = E.select_shortcut_prompts(df, E.Task.NLI, n_samples=NUM_SAMPLES, model=target, num_shot=NUM_SHOT, temperature=TEMPERATURE,
                                        condition = select_if, max_tokens=MAX_TOKENS, seed=SEED, debug=DEBUG, logits_step=0)

In [None]:
# Record how many rows were actually selected.
# NOTE: this overwrites the NUM_SAMPLES form value, so re-running earlier cells will see the new number.
NUM_SAMPLES = selected_df.shape[0]

In [None]:
# Output CSV path; the file name encodes shortcut type, number of selected rows and seed.
PROMPTS_PATH = f"data/ShortcutSuite/{SHORTCUT_TYPE}_{NUM_SAMPLES}_prompts_seed_{SEED}.csv"

In [None]:
# Write the selected prompts to CSV without the DataFrame index.
selected_df.to_csv(PROMPTS_PATH, index=False)

In [None]:
# Colab-only: offer the saved CSV as a browser download.
from google.colab import files
files.download(PROMPTS_PATH)

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

## Save dataset on W&B

In [None]:
#@title Setup

# Metadata attached to the dataset artifact logged in the next cell.
DATASET_NAME = "ShortcutSuite" #@param {type: "string"}
TASK = "NLI" #@param ["NLI"]

# Label names are taken from the task's reference_gen_to_labels() mapping (values only).
LABELS = list(E.Task[TASK].reference_gen_to_labels().values())
SELECTION = "RANDOM" #@param ["RANDOM", "MODEL_FAILS", "MODEL_FAILS_ON_SPECIFIC_LABELS"]
# The form string is replaced by the matching L.SelectionMethod member (the name changes type here).
SELECTION = L.SelectionMethod[SELECTION]
DESCRIPTION = "Training dataset"  #@param {type: "string"}


In [None]:
# Log the selected prompts and their metadata through the project's W&B helper.
# NOTE(review): argument order follows L.log_dataset_artifact's positional signature — confirm in patched_unibias.
L.log_dataset_artifact(
    selected_df,
    DATASET_NAME,
    TASK,
    NUM_SAMPLES,
    COLUMNS,
    LABELS,
    SHORTCUT_TYPE,
    SELECTION,
    SEED,
    description=DESCRIPTION)

## RepReading

In [None]:
# Instructions that format_data_nli wraps in [INST] ... [/INST] for the clean and the
# shortcut ("dirty") prompt of each pair. Both defaults are the same sentence.
CLEAN_INSTR = "Decide if the hypothesis is entailed by the premise." #@param {type: "string"}
DIRTY_INSTR = "Decide if the hypothesis is entailed by the premise." #@param {type: "string"}
# Whether the two prompts of each contrastive pair are randomly reordered.
SHUFFLE = True #@param {type: "boolean"}

In [None]:
def format_data_nli(df, shuffle, seed=None):
  """Build contrastive (dirty, clean) prompt pairs for RepReading.

  Args:
    df: mapping/DataFrame with the columns 'premise_dirty', 'hypothesis_dirty',
      'premise_clean' and 'hypothesis_clean'.
    shuffle: if true, the order of the two prompts inside each pair is randomized.
    seed: optional seed for a private random generator; when None (default) the
      global `random` module state is used, as before.

  Returns:
    (data, labels): `data` is the flat list of prompts (two consecutive entries per
    pair); `labels` holds one [bool, bool] list per pair where True marks the
    dirty (+shortcut) prompt and False the clean (-shortcut) one.
  """
  c_instr = f"[INST] {CLEAN_INSTR} [/INST] "
  d_instr = f"[INST] {DIRTY_INSTR} [/INST] "
  rng = random if seed is None else random.Random(seed)

  data, labels = [], []
  rows = zip(df['premise_dirty'], df['hypothesis_dirty'], df['premise_clean'], df['hypothesis_clean'])
  for prem_d, hyp_d, prem_c, hyp_c in rows:
    pair = [
      d_instr + f'Premise: {prem_d}\nHypothesis: {hyp_d}',  # index 0: dirty
      c_instr + f'Premise: {prem_c}\nHypothesis: {hyp_c}',  # index 1: clean
    ]
    # Shuffle positions rather than strings: labels stay correct even when the
    # dirty and clean prompts are textually identical.
    order = [0, 1]
    if shuffle:
      rng.shuffle(order)  # shuffling inside contrastive pairs
    data.extend(pair[i] for i in order)
    labels.append([i == 0 for i in order])
  # A plain list also covers an empty frame, where np.concatenate would raise.
  return data, labels



In [None]:
# Seeded random split of the selected prompts.
# With TRAIN_FRAC = 1 every row goes to train_df, so test_df is empty.
TRAIN_FRAC = 1 #@param {type: "slider", min:0.0, max: 1.0, step:0.1}
train_df = selected_df.sample(frac=TRAIN_FRAC, random_state=SEED)
# The test split is whatever was not sampled into the training split.
test_df = selected_df.drop(train_df.index)

In [None]:
# Turn both splits into flat prompt lists plus per-pair labels.
train_data, train_labels = format_data_nli(train_df, SHUFFLE)
if TRAIN_FRAC < 1:
  test_data, test_labels = format_data_nli(test_df, SHUFFLE)
else:
  # No held-out rows: define the names explicitly so that values left over from an
  # earlier run with a different TRAIN_FRAC cannot be picked up silently.
  test_data, test_labels = [], []

In [None]:
rep_token = -1 # consider the last token of the sequence
# Negative layer indices -1 .. -(num_hidden_layers - 1); the range stops before -num_hidden_layers.
hidden_layers = list(range(-1, -target.model.config.num_hidden_layers, -1))
direction_method = 'cluster_mean' #@param ["pca", "cluster_mean"]
if direction_method == 'cluster_mean':
  n_difference = 0
  # cluster_mean is given one flat label per prompt. Flatten only while the labels are
  # still nested per pair, so re-running this cell does not fail on already-flat labels.
  if train_labels and isinstance(train_labels[0], (list, tuple)):
    train_labels = [label for sublist in train_labels for label in sublist]
else:
  # NOTE: if labels were flattened by an earlier cluster_mean run, re-run the
  # formatting cell before switching to "pca".
  n_difference = 1
rep_reading_pipeline =  pipeline("rep-reading", model=target.model, tokenizer=target.tokenizer)

Device set to use cuda:0


In [None]:
#@title Train the RepReader
shortcut_rep_reader = rep_reading_pipeline.get_directions(
    train_data,
    rep_token=rep_token,
    hidden_layers=hidden_layers,
    n_difference=n_difference,
    train_labels=train_labels,
    direction_method=direction_method,
    batch_size=32,
)

## Define activations

In [None]:
# Scaling coefficient multiplied into every layer's direction when the activations are built.
alpha = -0.2 #@param {type: "slider", min:-5.0, max: 5.0, step:0.1}

In [None]:
# Per-layer steering vectors: alpha * direction * sign, moved to the model's device as float16.
activations = {}
for layer in hidden_layers:
    scaled_direction = alpha * shortcut_rep_reader.directions[layer]
    steering_vector = torch.tensor(scaled_direction * shortcut_rep_reader.direction_signs[layer])
    activations[layer] = steering_vector.to(target.model.device).half()

In [None]:
#@title Save activations on Drive
# Name of the dataset artifact these activations were derived from; used to build the file name.
DATASET_ARTIFACT_NAME = "ShortcutSuite_64_negation_random_seed_20" #@param {type: "string"}
ACTIVATIONS_ART_NAME = L.get_activations_artifact_name(
    DATASET_ARTIFACT_NAME,
    alpha,
    direction_method,
    CLEAN_INSTR,
    DIRTY_INSTR,
    SHUFFLE)
ACTIVATIONS_FILE_NAME = f"{ACTIVATIONS_ART_NAME}.pt"
activations_path = os.path.join(DRIVE_PATH, "activations", ACTIVATIONS_FILE_NAME)
# torch.save does not create missing parent directories; make sure the folder exists first.
os.makedirs(os.path.dirname(activations_path), exist_ok=True)
torch.save(activations, activations_path)
print(f"Saved at {activations_path}")

In [None]:
#@title Store activations on W&B
# NOTE(review): this form value replaces the `activations_path` computed in the save cell; keep it in
# sync with the current alpha / direction_method, or the wrong file is uploaded.
activations_path = "/content/drive/MyDrive/Tesi Computer Science/ShortcutProject/activations/coeff_-0.2_cluster_mean_ShortcutSuite_64_negation_random_seed_20.pt" #@param {type:"string"}
# Fail early if the file is missing.
assert(os.path.exists(activations_path))

# Upload the file together with the settings that produced it, via the project's W&B helper.
L.log_activations_artifact(
    activations_path,
    DATASET_ARTIFACT_NAME,
    alpha,
    rep_token,
    hidden_layers,
    direction_method,
    CLEAN_INSTR,
    DIRTY_INSTR,
    SHUFFLE)

## Prompt-level RepControl

In [None]:
#@title Setup
# Layers to intervene on, as negative indices. With the defaults this is -5 .. -17
# (END_LAYER itself is excluded by range).
START_LAYER = -5 #@param {type: "integer"}
END_LAYER = -18 #@param {type: "integer"}
HIDDEN_LAYERS = list(range(START_LAYER, END_LAYER, -1))
# Options forwarded to the "rep-control" pipeline constructor.
block_name = "decoder_block"
control_method = "reading_vec"

In [None]:
# Separate tokenizer instance for the control pipeline (outputs are decoded with target.tokenizer later).
tokenizer = AutoTokenizer.from_pretrained(TARGET_MODEL_NAME, token=os.environ["HF_TOKEN"], truncation=True, padding=True)
# Use id 0 for padding only when the tokenizer defines no pad token.
tokenizer.pad_token_id = 0 if tokenizer.pad_token_id is None else tokenizer.pad_token_id
# NOTE(review): BOS id is forced to 1 — presumably matches this model's vocabulary; confirm when changing TARGET_MODEL_NAME.
tokenizer.bos_token_id = 1

In [None]:
# Build the "rep-control" pipeline on the already-loaded target model, restricted to HIDDEN_LAYERS.
rep_control_pipeline = pipeline(
    "rep-control",
    model=target.model,
    tokenizer=tokenizer,
    layers=HIDDEN_LAYERS,
    block_name=block_name,
    control_method=control_method)

Device set to use cuda:0


In [None]:
user_tag =  "[INST]"
assistant_tag =  "[/INST]"

# With TRAIN_FRAC == 1 there is no held-out split, so fall back to the training prompts
# instead of failing on a missing test split.
eval_prompts = test_data if TRAIN_FRAC < 1 else train_data
inputs = eval_prompts[:2]
if not inputs:
  raise ValueError("No prompts available for the RepControl demo; check TRAIN_FRAC and the selected dataset.")

max_new_tokens=30 #@param {type: "integer"}
# Greedy generation without and with the steering activations.
# NOTE(review): only inputs[0] is passed to the pipeline, so the loop below presumably prints a
# single example — confirm whether `inputs` (the full list) was intended.
baseline_outputs = rep_control_pipeline(inputs[0], max_new_tokens=max_new_tokens, do_sample=False)
control_outputs = rep_control_pipeline(inputs[0], activations=activations, max_new_tokens=max_new_tokens, do_sample=False)

for i,s,p in zip(inputs, baseline_outputs["sequences"], control_outputs["sequences"]):
  print("===== Input =====")
  print(i)
  print("===== No Control =====")
  print(target.tokenizer.decode(s))
  print("===== - Shortcut Reliance =====")
  print(target.tokenizer.decode(p))
  print()

# RepE Evaluation

In [None]:
#@title Set parameters
DATASET_NAME = "mmlu" #@param ["rte", "mnli", "copa", "cr", "sst2", "wic", "arc", "mmlu"]
NUM_SHOT = 1 #@param {type: "integer"}
REPE = False #@param {type: "boolean"}
ACTIVATIONS_ARTIFACT_NAME = "none" #@param {type: "string"}
LAYERS = "-5 -6 -7 -8 -9 -10 -11 -12 -13 -14 -15 -16 -17" #@param {type: "string"}
RESUME = False #@param {type: "boolean"}
LOG_ON_WB = True #@param {type: "boolean"}

# Failure log path: runs with RepE enabled get a "_repe" suffix.
FAIL_PATH = f"fail_examples/{DATASET_NAME}_repe.csv" if REPE else f"fail_examples/{DATASET_NAME}.csv"

# The command line takes lowercase "true"/"false" strings, so the form booleans are converted here.
REPE = "true" if REPE else "false"
RESUME = "true" if RESUME else "false"
LOG_ON_WB = "true" if LOG_ON_WB else "false"

In [None]:
# Run the evaluation script as a shell command; IPython fills each {NAME} placeholder from the
# notebook variables. REPE / RESUME / LOG_ON_WB are the "true"/"false" strings built in the parameters cell.
!python patched_unibias/main.py \
  --dataset_name {DATASET_NAME} \
  --num_shot {NUM_SHOT} \
  --RepE {REPE} \
  --resume {RESUME} \
  --activations {ACTIVATIONS_ARTIFACT_NAME} \
  --intervention_layers {LAYERS} \
  --log_on_WB {LOG_ON_WB}
