In [None]:
import os
import sys
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig, GenerationConfig
from sklearn.linear_model import LinearRegression, LogisticRegression
from sklearn.metrics import mean_squared_error, r2_score, accuracy_score, f1_score
from sklearn.model_selection import train_test_split
from huggingface_hub import notebook_login

# Suppress terminal output for pip install commands
sys.stdout = open(os.devnull, 'w')
sys.stderr = open(os.devnull, 'w')

# Install necessary libraries
!pip install transformers[torch] datasets
!pip install -U bitsandbytes
!pip install -U accelerate

from datasets import load_dataset

# Restore terminal output after installation
sys.stdout = sys.__stdout__
sys.stderr = sys.__stderr__

# Hugging Face login
notebook_login()

VBox(children=(HTML(value='<center> <img\nsrc=https://huggingface.co/front/assets/huggingface_logo-noborder.sv…

In [None]:
# Model
MODEL_NAME = "meta-llama/Meta-Llama-3-8B-Instruct"

def load_model(model_name):
  quantization_config = BitsAndBytesConfig(load_in_4bit=True,
                                          bnb_4bit_compute_dtype=torch.bfloat16,
                                          bnb_4bit_use_double_quant=True,
                                          bnb_4bit_quant_type= "nf4"
                                          )

  quantized_model = AutoModelForCausalLM.from_pretrained(
                    model_name,
                    device_map="auto",
                    torch_dtype=torch.bfloat16,
                    quantization_config=quantization_config
                    )

  tokenizer = AutoTokenizer.from_pretrained(model_name)
  tokenizer.pad_token = tokenizer.eos_token
  return quantized_model, tokenizer

quantized_model, tokenizer = load_model(MODEL_NAME)
quantized_model.eval()

def get_response(prompt):
  # Hyper parameters
  temperature = 0.6
  top_p = 0.9
  top_k = 50
  repetition_penalty = 1.2
  max_new_tokens = 256

  generation_config = GenerationConfig(
      temperature=temperature,
      top_p=top_p,
      top_k=top_k,
      repetition_penalty=repetition_penalty,
      do_sample=True,
      max_new_tokens=max_new_tokens
  )

  inputs = tokenizer(prompt, return_tensors="pt").to("cuda")
  outputs = quantized_model.generate(**inputs, generation_config=generation_config)
  response = tokenizer.decode(outputs[0], skip_special_tokens=True)

  return response


Loading checkpoint shards:   0%|          | 0/4 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/187 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/51.0k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/9.09M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/73.0 [00:00<?, ?B/s]

In [None]:
import pandas as pd

def design_prompt(example, task_type):
    if task_type == "regression":
        return f"What are the charges of health insurance for a person with age: {example['age']}, sex: {example['sex']}, bmi: {example['bmi']}, children: {example['children']}, smoker: {example['smoker']}, region: {example['region']}?"
    elif task_type == "classification":
        return f"This is the movie review: {example['text']}, how is the sentiment of this movie, 0 (negative) or 1 (positive)?"

def extract_embeddings(prompt, model, tokenizer, layer):
    inputs = tokenizer(prompt, return_tensors="pt").to("cuda")
    with torch.no_grad():
        outputs = model(**inputs, output_hidden_states=True)
    hidden_states = outputs.hidden_states
    selected_layer_embedding = hidden_states[layer][:, -1, :].squeeze().float().cpu().numpy()
    return selected_layer_embedding

def prepare_data(dataset, task_type, model, tokenizer):
    embeddings = []
    targets = []

    # Check if the dataset is a pandas DataFrame or a list of dictionaries
    if isinstance(dataset, pd.DataFrame):
        # Iterate through the DataFrame for regression
        for _, row in dataset.iterrows():
            example = row.to_dict()  # Convert row to dictionary-like structure

            # Create prompt for each example
            prompt = design_prompt(example, task_type)

            # Extract embeddings
            final_layer_embedding = extract_embeddings(prompt, model, tokenizer, layer=-1)
            embeddings.append(final_layer_embedding)

            if task_type == "regression":
                targets.append(float(example["charges"]))  # Regression target
    else:
        # Assume it is a dictionary-like dataset for classification
        for example in dataset:
            prompt = design_prompt(example, task_type)

            # Extract embeddings
            final_layer_embedding = extract_embeddings(prompt, model, tokenizer, layer=-1)
            embeddings.append(final_layer_embedding)

            if task_type == "classification":
                targets.append(example["label"])  # Classification target

    return embeddings, targets


# Example usage:

# For classification dataset (dictionary-like structure)
classification_dataset = load_dataset("stanfordnlp/imdb")["train"].shuffle(seed=42).select(range(200))
clf_embeddings, clf_targets = prepare_data(classification_dataset, "classification", quantized_model, tokenizer)
clf_train_embeddings, clf_test_embeddings, clf_train_targets, clf_test_targets = train_test_split(clf_embeddings, clf_targets, test_size=0.2, random_state=42)
clf_model = LogisticRegression(max_iter=1000)
clf_model.fit(clf_train_embeddings, clf_train_targets)

# For regression dataset (pandas DataFrame)
import pandas as pd
from google.colab import drive
drive.mount('/content/drive')
df = pd.read_csv('/content/drive/My Drive/insurance.csv')
df = df.sample(frac=1, random_state=42).iloc[:200]
reg_embeddings, reg_targets = prepare_data(df, "regression", quantized_model, tokenizer)
reg_train_embeddings, reg_test_embeddings, reg_train_targets, reg_test_targets = train_test_split(
    reg_embeddings, reg_targets, test_size=0.2, random_state=42)

# Train a Linear Regression model
reg_model = LinearRegression()
reg_model.fit(reg_train_embeddings, reg_train_targets)


In [None]:
# Step 8: Evaluate the Models
def evaluate_regression_model(model, embeddings, targets):
    predictions = model.predict(embeddings)
    mse = mean_squared_error(targets, predictions)
    r2 = r2_score(targets, predictions)
    print(f"Regression Model - Mean Squared Error: {mse}, R² Score: {r2}")
    return mse, r2

def evaluate_classification_model(model, embeddings, targets):
    predictions = model.predict(embeddings)
    accuracy = accuracy_score(targets, predictions)
    f1 = f1_score(targets, predictions, average='weighted')
    print(f"Classification Model - Accuracy: {accuracy}, F1 Score: {f1}")
    return accuracy, f1

# acc, f1 = evaluate_classification_model(clf_model, clf_test_embeddings, clf_test_targets)
# mse, r2 = evaluate_regression_model(reg_model, reg_test_embeddings, reg_test_targets)

In [None]:
# acc, f1

In [None]:
# mse, r2

In [None]:
def compare_layers(dataset, model, tokenizer, task_type):
    first_layer_results = []
    mid_layer_results = []
    final_layer_results = []

    num_layers = model.config.num_hidden_layers
    mid_layer = num_layers // 2

    # Check if the dataset is a pandas DataFrame or dictionary-like
    if isinstance(dataset, pd.DataFrame):
        # If DataFrame, iterate over rows and convert them to dict
        for _, row in dataset.iterrows():
            example = row.to_dict()  # Convert row to dictionary-like structure

            # Generate prompt from the example
            prompt = design_prompt(example, task_type)

            # Extract embeddings from different layers
            first_layer_embedding = extract_embeddings(prompt, model, tokenizer, layer=0)
            mid_layer_embedding = extract_embeddings(prompt, model, tokenizer, layer=mid_layer)
            final_layer_embedding = extract_embeddings(prompt, model, tokenizer, layer=-1)

            # Append the results
            first_layer_results.append(first_layer_embedding)
            mid_layer_results.append(mid_layer_embedding)
            final_layer_results.append(final_layer_embedding)

    else:
        # If it's a dictionary-like dataset
        for example in dataset:
            # Generate prompt
            prompt = design_prompt(example, task_type)

            # Extract embeddings from different layers
            first_layer_embedding = extract_embeddings(prompt, model, tokenizer, layer=0)
            mid_layer_embedding = extract_embeddings(prompt, model, tokenizer, layer=mid_layer)
            final_layer_embedding = extract_embeddings(prompt, model, tokenizer, layer=-1)

            # Append the results
            first_layer_results.append(first_layer_embedding)
            mid_layer_results.append(mid_layer_embedding)
            final_layer_results.append(final_layer_embedding)

    return first_layer_results, mid_layer_results, final_layer_results


In [None]:
results = {}

# Comparing embeddings across layers for classification
first_layer_embeddings, mid_layer_embeddings, final_layer_embeddings = compare_layers(classification_dataset, quantized_model, tokenizer, "classification")

# Re-train and evaluate classification model on different layers
print("\nEvaluating Classification Model Across Layers:")
clf_train_embeddings, clf_test_embeddings, clf_train_targets, clf_test_targets = train_test_split(first_layer_embeddings, clf_targets, test_size=0.2, random_state=42)
clf_model.fit(clf_train_embeddings, clf_train_targets)
evaluate_classification_model(clf_model, clf_test_embeddings, clf_test_targets)
acc, f1 = evaluate_classification_model(clf_model, clf_test_embeddings, clf_test_targets)
results["first_layer"] = {"accuracy": acc, "f1": f1}

clf_train_embeddings, clf_test_embeddings, clf_train_targets, clf_test_targets = train_test_split(mid_layer_embeddings, clf_targets, test_size=0.2, random_state=42)
clf_model.fit(clf_train_embeddings, clf_train_targets)
evaluate_classification_model(clf_model, clf_test_embeddings, clf_test_targets)
acc, f1 = evaluate_classification_model(clf_model, clf_test_embeddings, clf_test_targets)
results["mid_layer"] = {"accuracy": acc, "f1": f1}

clf_train_embeddings, clf_test_embeddings, clf_train_targets, clf_test_targets = train_test_split(final_layer_embeddings, clf_targets, test_size=0.2, random_state=42)
clf_model.fit(clf_train_embeddings, clf_train_targets)
evaluate_classification_model(clf_model, clf_test_embeddings, clf_test_targets)
acc, f1 = evaluate_classification_model(clf_model, clf_test_embeddings, clf_test_targets)
results["final_layer"] = {"accuracy": acc, "f1": f1}

print("\nResults across layers:", results)

In [None]:
results

{'first_layer': {'accuracy': 0.5, 'f1': 0.3333333333333333},
 'mid_layer': {'accuracy': 0.975, 'f1': 0.9749843652282676},
 'final_layer': {'accuracy': 0.975, 'f1': 0.9749843652282676}}

In [None]:
results_reg = {}
# Comparing embeddings across layers for regression
first_layer_embeddings, mid_layer_embeddings, final_layer_embeddings = compare_layers(df, quantized_model, tokenizer, "regression")

# Re-train and evaluate regression model on different layers
print("\nEvaluating Regression Model Across Layers:")
reg_train_embeddings, reg_test_embeddings, reg_train_targets, reg_test_targets = train_test_split(first_layer_embeddings, reg_targets, test_size=0.2, random_state=42)
reg_model.fit(reg_train_embeddings, reg_train_targets)
evaluate_regression_model(reg_model, reg_test_embeddings, reg_test_targets)
mse, r2 = evaluate_regression_model(reg_model, reg_test_embeddings, reg_test_targets)
results_reg["first_layer"] = {"mse": mse, "r2": r2}

reg_train_embeddings, reg_test_embeddings, reg_train_targets, reg_test_targets = train_test_split(mid_layer_embeddings, reg_targets, test_size=0.2, random_state=42)
reg_model.fit(reg_train_embeddings, reg_train_targets)
evaluate_regression_model(reg_model, reg_test_embeddings, reg_test_targets)
mse, r2 = evaluate_regression_model(reg_model, reg_test_embeddings, reg_test_targets)
results_reg["mid_layer"] = {"mse": mse, "r2": r2}

reg_train_embeddings, reg_test_embeddings, reg_train_targets, reg_test_targets = train_test_split(final_layer_embeddings, reg_targets, test_size=0.2, random_state=42)
reg_model.fit(reg_train_embeddings, reg_train_targets)
evaluate_regression_model(reg_model, reg_test_embeddings, reg_test_targets)
mse, r2 = evaluate_regression_model(reg_model, reg_test_embeddings, reg_test_targets)
results_reg["final_layer"] = {"mse": mse, "r2": r2}

print("\nResults across layers:", results_reg)


In [None]:
results_reg

{'first_layer': {'mse': 138580525.58468077, 'r2': -0.0024144008664006567},
 'mid_layer': {'mse': 39278848.786163576, 'r2': 0.715878666886391},
 'final_layer': {'mse': 44187610.1427289, 'r2': 0.6803714189994519}}