# CCS
Implemented from this paper: https://openreview.net/pdf?id=ETKGuby0hcs

https://github.com/collin-burns/discovering_latent_knowledge/blob/main/CCS.ipynb


In [6]:
!pip install scikit-learn


Collecting scikit-learn
  Downloading scikit_learn-1.6.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (18 kB)
Collecting scipy>=1.6.0 (from scikit-learn)
  Downloading scipy-1.15.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (61 kB)
Collecting joblib>=1.2.0 (from scikit-learn)
  Downloading joblib-1.4.2-py3-none-any.whl.metadata (5.4 kB)
Collecting threadpoolctl>=3.1.0 (from scikit-learn)
  Downloading threadpoolctl-3.5.0-py3-none-any.whl.metadata (13 kB)
Downloading scikit_learn-1.6.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (13.1 MB)
[2K   [38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.1/13.1 MB[0m [31m14.4 MB/s[0m eta [36m0:00:00[0m MB/s[0m eta [36m0:00:01[0m:01[0m
[?25hDownloading joblib-1.4.2-py3-none-any.whl (301 kB)
Downloading scipy-1.15.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (40.2 MB)
[2K   [38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [3

In [7]:
from tqdm import tqdm
import copy
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

from datasets import load_dataset
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, AutoModelForMaskedLM, AutoModelForCausalLM
from sklearn.linear_model import LogisticRegression

data = load_dataset("amazon_polarity")["test"]

README.md:   0%|          | 0.00/6.81k [00:00<?, ?B/s]

train-00000-of-00004.parquet:   0%|          | 0.00/260M [00:00<?, ?B/s]

train-00001-of-00004.parquet:   0%|          | 0.00/258M [00:00<?, ?B/s]

train-00002-of-00004.parquet:   0%|          | 0.00/255M [00:00<?, ?B/s]

train-00003-of-00004.parquet:   0%|          | 0.00/254M [00:00<?, ?B/s]

test-00000-of-00001.parquet:   0%|          | 0.00/117M [00:00<?, ?B/s]

Generating train split:   0%|          | 0/3600000 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/400000 [00:00<?, ? examples/s]

In [10]:
# Here are a few different model options you can play around with:
model_name = "llama-2"
# model_name = "gpt-j"
# model_name = "t5"

# if you want to cache the model weights somewhere, you can specify that here
cache_dir = None

if model_name == "deberta":
    model_type = "encoder"
    tokenizer = AutoTokenizer.from_pretrained("microsoft/deberta-v2-xxlarge", cache_dir=cache_dir)
    model = AutoModelForMaskedLM.from_pretrained("microsoft/deberta-v2-xxlarge", cache_dir=cache_dir)
    model.cuda()
elif model_name == "gpt-j":
    model_type = "decoder"
    tokenizer = AutoTokenizer.from_pretrained("EleutherAI/gpt-j-6B", cache_dir=cache_dir)
    model = AutoModelForCausalLM.from_pretrained("EleutherAI/gpt-j-6B", cache_dir=cache_dir)
    model.cuda()
elif model_name == "t5":
    model_type = "encoder_decoder"
    tokenizer = AutoTokenizer.from_pretrained("t5-11b", cache_dir=cache_dir)
    model = AutoModelForSeq2SeqLM.from_pretrained("t5-11b", cache_dir=cache_dir)
    model.parallelize()  # T5 is big enough that we may need to run it on multiple GPUs
elif model_name == "llama-2":
    model_type = "decoder"
    tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-2-7b-hf", cache_dir=cache_dir)
    model = AutoModelForCausalLM.from_pretrained("meta-llama/Llama-2-7b-hf", cache_dir=cache_dir)
    model.cuda()
elif model_name == "llama-2-chat":
    model_type = "decoder"
    tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-2-7b-chat-hf", cache_dir=cache_dir)
    model = AutoModelForCausalLM.from_pretrained("meta-llama/Llama-2-7b-chat-hf", cache_dir=cache_dir)
    model.cuda()
else:
    print("Not implemented!")

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

# Extract hidden states

In [11]:
def get_encoder_hidden_states(model, tokenizer, input_text, layer=-1):
    """
    Get the hidden states of the encoder for a given input text.

    Return a numpy array of shape (hidden_dim,)
    """

    # tokenizing
    encoder_text_ids = tokenizer(input_text, return_tensors="pt", truncation=True).input_ids.to(model.device)

    # forward pass
    with torch.no_grad():
        output = model(encoder_text_ids, output_hidden_states=True)

    # get the appropriate hidden states
    hs_tuple = output["hidden_states"]

    hs = hs_tuple[layer][0, -1].detach().cpu().numpy()

    return hs

In [12]:
def get_encoder_decoder_hidden_states(model, tokenizer, input_text, layer=-1):
    """
    Given an encoder-decoder model and some text, gets the encoder hidden states
    on that input text (where the full text is given to the encoder).

    Returns a numpy array of shape (hidden_dim,)
    """

    # tokenizing
    encoder_text_ids = tokenizer(input_text, return_tensors="pt").input_ids.to(model.device)
    decoder_text_ids = tokenizer("", return_tensors="pt").input_ids.to(model.device)

    # forward pass
    with torch.no_grad():
        output = model(encoder_text_ids, decoder_text_ids, output_hidden_states=True)

    # get the appropriate hidden states
    hs_tuple = output["encoder_hidden_states"]
    hs = hs_tuple[layer][0, -1].detach().cpu().numpy()

    return hs

In [13]:
def get_decoder_hidden_states(model, tokenizer, input_text, layer=-1):
    """
    Get the hidden states of the decoder for a given input text.

    Return a numpy array of shape (hidden_dim,)
    """

    # tokenizing
    input_ids = tokenizer(input_text + tokenizer.eos_token, return_tensors="pt")

    # forward pass
    with torch.no_grad():
        output = model(input_ids, output_hidden_states=True)

    # get the last layer, last token hidden states
    hs_tuple = output["hidden_states"]
    hs = hs_tuple[layer][0, -1].detach().cpu().numpy()

    return hs

In [14]:
def get_hidden_states(model, tokenizer, input_text, layer=-1, model_type="decoder"):
    """
    Get the hidden states of the model for a given input text.

    Return a numpy array of shape (hidden_dim,)
    """

    if model_type == "encoder":
        return get_encoder_hidden_states(model, tokenizer, input_text, layer)
    elif model_type == "decoder":
        return get_decoder_hidden_states(model, tokenizer, input_text, layer)
    elif model_type == "encoder_decoder":
        return get_encoder_decoder_hidden_states(model, tokenizer, input_text, layer)
    else:
        print("Not implemented!")

# Code formatting for getting all hidden states