# 0. Settings

In [None]:
# Hugging Face organisation and model name; together they form the repo id.
model_distributer = "meta-llama"
model_name = "Llama-3.1-8B"

# All inputs and outputs live under this directory.
root_path = "./data"

# Dataset files found under `root_path`; only the first entry is loaded.
dataset = ["CommonsenseQA_test.jsonl"]

# Per-model output directory (lower-cased model name) and the dataset to read.
save_path = f"{root_path}/{model_name.lower()}"
dataset_path = f"{root_path}/{dataset[0]}"


In [None]:
# Echo the output directory computed in the settings cell.
save_path

'/content/drive/MyDrive/KAIRI_Experiment/Prompt_Bias/llama-3.1-8b'

# 1. 🗞 Data and Prompts

## Import Data

In [None]:
# from google.colab import drive
# drive.mount("/content/drive")

In [None]:
!pip install jsonlines

Collecting jsonlines
  Downloading jsonlines-4.0.0-py3-none-any.whl.metadata (1.6 kB)
Downloading jsonlines-4.0.0-py3-none-any.whl (8.7 kB)
Installing collected packages: jsonlines
Successfully installed jsonlines-4.0.0


In [None]:
import jsonlines

In [None]:
# Read every record of the JSON Lines dataset into memory, in file order.
data = []
with jsonlines.open(dataset_path) as reader:
  data.extend(reader)

## Make prompts

In [None]:
# Six surface variants of the same question/options/answer template:
# three capitalisations (lower, Title, UPPER) x two spacing styles.
# Each template expects `{question}` and `{options}` placeholders.
prompts = [
    # Compact style: no leading space, no space after the colon.
    "question:{question}\n"
    "options:{options}\n"
    "answer:",
    "Question:{question}\n"
    "Options:{options}\n"
    "Answer:",
    "QUESTION:{question}\n"
    "OPTIONS:{options}\n"
    "ANSWER:",
    # Spaced style: one leading space per line and a space after the colon
    # (the final "answer:" line keeps no trailing space).
    " question: {question}\n"
    " options: {options}\n"
    " answer:",
    " Question: {question}\n"
    " Options: {options}\n"
    " Answer:",
    " QUESTION: {question}\n"
    " OPTIONS: {options}\n"
    " ANSWER:",
]

# 2. 👽 Model

In [None]:
import transformers
from transformers import AutoTokenizer, LlamaForCausalLM, AutoModelForCausalLM
import torch

In [None]:
# Prefer the GPU when one is visible to PyTorch; otherwise fall back to CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Hugging Face repo id, e.g. "meta-llama/Llama-3.1-8B".
model_repo_id = model_distributer + "/" + model_name

# Weights are loaded in float16 in every branch.
if "Llama" in model_name:
  model = LlamaForCausalLM.from_pretrained(model_repo_id, torch_dtype=torch.float16)
elif "gemma" in model_name:
  model = AutoModelForCausalLM.from_pretrained(model_repo_id, torch_dtype=torch.float16)
else:
  # Unknown model family: try the generic auto class and report a failure as
  # "not supported". Catch `Exception` (not a bare `except`) so that
  # KeyboardInterrupt/SystemExit still propagate, and chain the original
  # error so the real cause (missing repo, auth, OOM, ...) stays visible.
  try:
    model = AutoModelForCausalLM.from_pretrained(model_repo_id, torch_dtype=torch.float16)
  except Exception as err:
    raise ValueError("The model is not supported.") from err
tokenizer = AutoTokenizer.from_pretrained(model_repo_id)

In [None]:
# Move the model onto the device selected above (CUDA when available, else CPU).
model.to(device)

In [None]:
# Keep a handle on the model's `lm_head` module; later cells call it on
# hidden-state vectors to obtain vocabulary logits.
model_unembed = model.lm_head

# 3. Functions

In [None]:
from typing import Any, Dict, List, Union, Tuple

In [None]:
def apply_content(template: str, content: dict) -> str:
  """
  Render one prompt by filling a template with a single dataset record.

  Args:
      template (str): Prompt template containing `{question}` and `{options}`
                      placeholders (filled via `str.format`).
      content (dict): One record of the dataset. The question text is read from
                      `content["questions"]["original"]`, and every entry of
                      `content["options"]` must provide `label` and `text`.

  Returns:
      str: The template with both placeholders substituted. Each option is
           rendered on its own line as "<label>. <text>", and every option
           line starts with a newline (e.g. "\\nA. first\\nB. second").

  Raises:
      KeyError: If an expected key is missing from `content` or an option.
  """
  question_text = content["questions"]["original"]

  # One "\n<label>. <text>" fragment per option, kept in dataset order.
  option_fragments = []
  for choice in content["options"]:
    option_fragments.append(f"\n{choice['label']}. {choice['text']}")

  return template.format(
      question=question_text,
      options="".join(option_fragments),
  )

In [None]:
# Sanity check: render the first three templates for one known record.
preview_id = 'edd1634d911614590c6b8ca730df95fe'
preview_record = next((entry for entry in data if entry['id'] == preview_id), None)
if preview_record is not None:
  for template_idx in range(3):
    print(apply_content(template=prompts[template_idx], content=preview_record))

In [None]:
def make_tokenized_prompts(
    templates: List[str],
    json_data: Dict,
    tokenizer: AutoTokenizer = tokenizer,
    device: torch.device = device
) -> List[Dict]:
    """
    Build and tokenize one prompt per template for a single dataset record.

    Each template is filled with `apply_content`, tokenized with
    `return_tensors="pt"`, and the tokenizer output is moved to `device`.

    Note: the `tokenizer` and `device` defaults are the notebook-level objects
    that exist when this function is defined; redefine the function (or pass
    them explicitly) after changing those globals.

    Args:
        templates (List[str]): Prompt templates to fill.
        json_data (Dict): The dataset record used to fill every template.
        tokenizer (AutoTokenizer): Tokenizer used to encode each prompt.
        device (torch.device): Device the encoded tensors are moved to.

    Returns:
        List[Dict]: One tokenizer output per template, in template order.
    """
    tokenized = []
    for template in templates:
        prompt_text = apply_content(template, json_data)
        encoded = tokenizer(prompt_text, return_tensors = "pt")
        tokenized.append(encoded.to(device))
    return tokenized

In [None]:
def process_data(
    model: Any, # PreTrainedModel
    tokenized_prompts: List[Dict[str, Any]], # torch.Tensor
) -> Dict[str, Dict[int, torch.Tensor]]:
  """
  Runs the model on every tokenized prompt and collects the hidden states
  it reports for each layer.

  The FULL hidden-state tensor of each layer is stored, not just the last
  token: selecting the last position is left to the consumer (in this
  notebook `_process_layer` indexes `[0, -1]`).

  Args:
      model (transformers.PreTrainedModel):
          A model that can be called as
          `model(input_ids, output_hidden_states=True)` and whose output
          exposes a `hidden_states` sequence.

      tokenized_prompts (List[Dict[str, torch.Tensor]]):
          Tokenized prompts; only the `'input_ids'` entry of each element is
          passed to the model.
          - Example: `[{'input_ids': tensor([[101, 2054, ...]])}, ...]`

  Returns:
      Dict[str, Dict[int, torch.Tensor]]:
          Hidden states organized by prompt and by position in
          `outputs.hidden_states`. Prompt keys are 1-based
          ("Prompt_1", "Prompt_2", ...) and follow the input order.
          - Structure:
            {
                "Prompt_1": {
                    0: tensor(...),  # outputs.hidden_states[0]
                    1: tensor(...),  # outputs.hidden_states[1]
                    ...
                },
                "Prompt_2": { ... }
            }
          Each tensor is assumed to be shaped (1, seq_len, dim) for a single
          prompt -- TODO confirm for the model in use. The tensors are
          returned as produced by the model (same device and dtype).
  """
  hidden_states_contents = {}
  # Inference only: no autograd graph is needed for the stored activations.
  with torch.no_grad():
    for prompt_number, encoded in enumerate(tokenized_prompts, start=1):
      outputs = model(encoded['input_ids'], output_hidden_states=True)

      # Key every entry of the hidden-state tuple by its index.
      hidden_states_contents[f"Prompt_{prompt_number}"] = dict(
          enumerate(outputs.hidden_states)
      )
  return hidden_states_contents

In [None]:
def _process_layer(
    layer_hidden_state: torch.Tensor,
    model_unembed: Any,
    tokenizer: Any
) -> Tuple[int, float, str]:
    """
    Decode the highest-scoring vocabulary token at the final sequence position.

    Args:
        layer_hidden_state: Hidden states of one layer; indexed as `[0, -1]`,
            i.e. assumed to be (batch, seq_len, dim) with the first batch
            element being the prompt of interest.
        model_unembed: Callable mapping a hidden vector to vocabulary logits.
        tokenizer: Object providing `convert_ids_to_tokens`.

    Returns:
        Tuple[int, float, str]: (token id, its logit value, token string).
    """
    final_position = layer_hidden_state[0, -1]
    vocab_logits = model_unembed(final_position)

    # torch.max over the last dim yields both the best value and its index.
    best = torch.max(vocab_logits, dim=-1)
    token_id = best.indices.item()

    return (token_id, best.values.item(), tokenizer.convert_ids_to_tokens(token_id))

def find_argmax(
    hidden_states_by_prompt: List[List[torch.Tensor]],
    model_unembed: Any,
    tokenizer: Any
) -> Tuple[List[List[int]], List[List[float]], List[List[str]]]:
    """
    Find the top token predicted from every layer of every prompt.

    Each layer's hidden state is passed through `_process_layer`, which
    applies `model_unembed` to the last position and takes the argmax.

    Args:
        hidden_states_by_prompt: Outer list = prompts, inner list = that
            prompt's per-layer hidden-state tensors.
        model_unembed: Callable mapping a hidden vector to vocabulary logits.
        tokenizer: Object providing `convert_ids_to_tokens`.

    Returns:
        Three parallel nested lists indexed as [prompt][layer]: token ids,
        logit values, and token strings. A prompt with no layers contributes
        an empty list to each of the three.
    """
    per_prompt = []

    for layers in hidden_states_by_prompt:
        if layers:
            decoded = [
                _process_layer(layer_hs, model_unembed, tokenizer)
                for layer_hs in layers
            ]
            # Transpose [(id, logit, token), ...] into three per-layer lists.
            token_ids, logit_values, token_strings = map(list, zip(*decoded))
        else:
            token_ids, logit_values, token_strings = [], [], []
        per_prompt.append((token_ids, logit_values, token_strings))

    return (
        [row[0] for row in per_prompt],
        [row[1] for row in per_prompt],
        [row[2] for row in per_prompt],
    )

In [None]:
import json
import os
from tqdm import tqdm

def _summarize_item_prompts(item, prompts, model, tokenizer, model_unembed, device):
    """Run every prompt variant for one item and collect per-layer argmax info."""
    prompt_list = make_tokenized_prompts(prompts, item, tokenizer, device)
    hidden_states_data = process_data(model, prompt_list)
    indices, logits, tokens = find_argmax(
        [list(prompt_layers.values()) for prompt_layers in hidden_states_data.values()],
        model_unembed,
        tokenizer,
    )
    return {
        prompt_key: {"index": index_list, "logit": logit_list, "token": token_list}
        for prompt_key, index_list, logit_list, token_list in zip(
            hidden_states_data, indices, logits, tokens
        )
    }


def _with_answers_first(prompts_data):
    """Return one item's results with an 'answers' list inserted as the first key."""
    prompt_keys_sorted = sorted(
        (key for key in prompts_data if key.startswith("Prompt_")),
        key=lambda name: int(name.split('_')[1]),
    )
    answers = []
    for p_key in prompt_keys_sorted:
        token_list = prompts_data.get(p_key, {}).get("token", [])
        # The last entry of the per-layer token list is taken as the answer.
        answers.append(token_list[-1] if token_list else None)
    return {'answers': answers, **prompts_data}


def process_and_save_results(
    data,
    prompts,
    model,
    tokenizer,
    model_unembed,
    device,
    output_filepath
):
    """
    Processes data to extract key information from hidden states,
    adds an 'answers' key, and saves the final results to a JSON file.

    The output directory is created (if needed) BEFORE inference starts, so
    an unusable output path is reported immediately instead of after the
    whole dataset has been processed. Failures while preparing the directory
    or writing the file are printed, not raised; the function always returns
    None.

    Output JSON structure:
        {
            "<item id>": {
                "answers": [<last-layer token of Prompt_1>, ...],
                "Prompt_1": {"index": [...], "logit": [...], "token": [...]},
                ...
            },
            ...
        }

    Args:
        data (list): Items to process; each must have an 'id' key and the
            fields required by `apply_content`.
        prompts (list): A list of prompt templates to use.
        model: A Hugging Face transformer model.
        tokenizer: The tokenizer corresponding to the model.
        model_unembed: The unembedding layer of the model.
        device: The device the tokenized prompts are moved to (forwarded to
            `make_tokenized_prompts`).
        output_filepath (str): The file path to save the final JSON results.
    """
    # 0. Prepare the output directory up front (fail fast on a bad path).
    try:
        output_dir = os.path.dirname(output_filepath)
        if output_dir:
            if not os.path.exists(output_dir):
                print(f"INFO: Output directory '{output_dir}' does not exist. Creating it.")
            # exist_ok avoids failing if the directory appears between the
            # check above and this call.
            os.makedirs(output_dir, exist_ok=True)
    except Exception as e:
        print(f"❌ An error occurred while saving the file: {e}")
        return

    final_results = {}

    print("Starting data processing...")
    # 1. Run model inference and information extraction for each item
    for item in tqdm(data, desc="Processing data"):
        final_results[item['id']] = _summarize_item_prompts(
            item, prompts, model, tokenizer, model_unembed, device
        )

    print("All data processing is complete.")
    print("Now adding the 'answers' key and saving to file...")

    try:
        # 2. Add the 'answers' key to the top level of each ID
        final_results = {
            id_key: _with_answers_first(prompts_data)
            for id_key, prompts_data in final_results.items()
        }

        # 3. Save the final results to the output file
        with open(output_filepath, 'w', encoding='utf-8') as f:
            json.dump(final_results, f, ensure_ascii=False, indent=4)

        print(f"✅ Final results successfully saved to '{output_filepath}'.")

    except Exception as e:
        # Deliberate best-effort: report the failure instead of raising.
        print(f"❌ An error occurred while saving the file: {e}")

# 4. ✅ Save

In [None]:
# Run
process_and_save_results(
        data=data,
        prompts=prompts,
        model=model,
        tokenizer=tokenizer,
        model_unembed=model_unembed,
        device=device,
        output_filepath=save_path + "/data_ilt.json"
)

데이터 처리를 시작합니다...


데이터 처리 중:  99%|█████████▉| 970/977 [20:56<00:09,  1.30s/it]