In [None]:
# Import Required Packages
import torch
import os
import json
import sys
import re
import random
import importlib.util
from typing import *
from tqdm import tqdm 
from typing import List

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from matplotlib.colors import ListedColormap


from transformers import AutoTokenizer, AutoModelForCausalLM, Trainer, TrainingArguments
from peft import get_peft_model, LoraConfig
from datasets import Dataset


from JS_Architects import *

# Summary

We will use the Architects model.

1) Implement a custom pipeline: no prompting, just prediction after shrinking the embedding

2) Automated sequence classification (`AutoModelForSequenceClassification`)
    1) No prompting
    2) Prompt: "Classify with labels encoded in binary"
    3) Prompt: "Classify with list label: ["depth",...]"
    4) Prompt: "Classify with list label: ["depth",...]. Depth represents..., Containment represents...."

# Custom Pipeline 

## 1. Prepare Data

### Load 


In [2]:
# Read the training annotations from disk.
with open("perceptions_training.json", "r") as training_file:
    dic_training = json.load(training_file)

# Read the testing annotations from disk.
with open("perceptions_testing.json", "r") as testing_file:
    dic_testing = json.load(testing_file)

# Ordered label vocabulary; the position of a label here is its index in the
# binary target vector, so the order must not change.
label_list = [
    "containment", "depth", "symmetry", "categorical",
    "spatial-Orientation", "spatial-Ordinal", "similarity", "quantitative",
    "replication", "figure-Ground", "continuity", "size",
    "closure", "centroid", "topological", "motion",
]

### Format

In [3]:
from datasets import Dataset

def prepare_data_for_multilabel_classification(
    dic_training,
    instruction,
    label_list,
    inp_prefix="<I>",
    out_prefix="<O>",
    arr_sep="\n",
    exa_sep="\n---\n",
    bos_token="<|begin_of_text|>",
    eos_token="<|end_of_text|>"
):
    """
    Turn a dict of annotated tasks into a list of records for multi-label classification.

    Args:
        dic_training: mapping of entry id -> content dict. Each content dict is read
            through ``content.get("perceptions", [])`` and
            ``content.get("example", {})`` with ``"train"`` / ``"test"`` lists whose
            items have ``"input"`` and ``"output"`` 2D arrays.
        instruction: value stored (stringified) in every record's ``"instruction"`` field.
        label_list: ordered label names; defines the positions of the binary target vector.
        inp_prefix: text placed before every formatted input grid.
        out_prefix: text placed before every formatted output grid.
        arr_sep: separator between the rows of a grid.
        exa_sep: separator between an input and its output, and between examples.
        bos_token: accepted for interface compatibility; it is NOT inserted into the text.
        eos_token: text appended after every formatted output grid.

    Returns:
        A list with one dict per entry, with keys ``"instruction"``, ``"input"``
        (all train + test examples joined into one string) and ``"output"``
        (binary label vector aligned with ``label_list``).
    """
    llama_data = []

    # Only the contents are needed; the entry ids are not used in the records.
    for content in dic_training.values():
        # Encode the entry's perceptions (labels) as a binary vector
        perceptions = content.get("perceptions", [])
        label_vector = labels_to_binary(label_list, perceptions)

        # Combine train and test examples
        example_groups = content.get("example", {})
        examples = example_groups.get("train", []) + example_groups.get("test", [])

        # Format every example as "<input>{exa_sep}<output>{eos_token}"
        formatted_examples = []
        for example in examples:
            input_data = f"{inp_prefix}{format_array(example['input'], arr_sep)}"
            output_data = f"{out_prefix}{format_array(example['output'], arr_sep)}{eos_token}"
            formatted_examples.append(f"{input_data}{exa_sep}{output_data}")

        # Join all examples into one input text (no BOS token is added here)
        combined_text = exa_sep.join(formatted_examples)

        # Add the structured record
        llama_data.append({
            "instruction": f"{instruction}",
            "input": combined_text,
            "output": label_vector,  # Multi-label as binary vector
        })

    return llama_data

def format_array(array, arr_sep="\n"):
    """
    Render a 2D array as text: the cells of a row are joined with single
    spaces, and the rows are joined with ``arr_sep``.
    """
    rendered_rows = []
    for row in array:
        rendered_rows.append(" ".join(str(cell) for cell in row))
    return arr_sep.join(rendered_rows)

def labels_to_binary(label_list, input_labels):
    """
    Convert perceptions into a binary vector based on the label list.

    Args:
        label_list: ordered label names; the result has one entry per label.
        input_labels: a single label string, an iterable of label strings,
            or ``None`` (treated as "no labels").

    Returns:
        A list of 0/1 ints aligned with ``label_list``; an entry is 1 when the
        label appears in ``input_labels``, compared case-insensitively.
        Input labels that are not in ``label_list`` are ignored.
    """
    # A missing / null annotation means the entry carries no labels
    if input_labels is None:
        input_labels = []
    # Ensure a single string is treated as one label, not as characters
    elif isinstance(input_labels, str):
        input_labels = [input_labels]

    # Lowercased lookup set for case-insensitive matching
    input_set = {label.lower() for label in input_labels}

    # Generate the binary vector
    return [1 if label.lower() in input_set else 0 for label in label_list]


In [None]:
# Candidate instructions; only `instruction0` (empty) is used below.
instruction0 = ""
instruction1 = "Classify the relationship between the input and output sequences based on perceptions"

# Row-oriented records, one per training entry
llama_data = prepare_data_for_multilabel_classification(dic_training, instruction0, label_list)

# Column-oriented view of the same records
llama_data_dict = {
    column: [record[column] for record in llama_data]
    for column in ("instruction", "input", "output")
}

# Hugging Face dataset built from the columns
llama_data_dataset = Dataset.from_dict(llama_data_dict)

llama_data_dataset[0]


In [None]:
# Row-oriented records, one per testing entry
llama_data_list_testing = prepare_data_for_multilabel_classification(dic_testing, instruction0, label_list)

# Column-oriented view of the same records
llama_data_dict_testing = {
    column: [record[column] for record in llama_data_list_testing]
    for column in ("instruction", "input", "output")
}

llama_data_dataset_testing = Dataset.from_dict(llama_data_dict_testing)
llama_data_dataset_testing[0]

## 2. Load Model and Tokenizer

# Tokenizer and causal-LM weights come from the same checkpoint.
llama_checkpoint = "meta-llama/Llama-3.2-3B"
tokenizer = AutoTokenizer.from_pretrained(llama_checkpoint)
model = AutoModelForCausalLM.from_pretrained(llama_checkpoint)


In [None]:
from transformers import AutoTokenizer, AutoModelForCausalLM, AutoModelForSequenceClassification
import torch
from peft import get_peft_model, LoraConfig

# Install bitsandbytes if not already installed
# !pip install bitsandbytes

# Load the tokenizer and the causal-LM weights directly from the checkpoint
tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-3.2-3B")
model = AutoModelForCausalLM.from_pretrained("meta-llama/Llama-3.2-3B")

# Report the predefined special tokens; each label names the token actually printed
print("Predefined Special Tokens:")
print(f"BOS Token: {tokenizer.bos_token}, ID: {tokenizer.bos_token_id}")
print(f"EOS Token: {tokenizer.eos_token}, ID: {tokenizer.eos_token_id}")
print(len(tokenizer.vocab))

## 3. Shrink the tokenizer and embedding 

In [None]:
# Markers used by the data-formatting step plus the BOS/EOS/PAD tokens.
special_tokens_dict = dict(
    input="<I>",
    output="<O>",
    array_sep="\n",
    example_sep="\n---\n",
    eos_token="<|end_of_text|>",
    bos_token="<|begin_of_text|>",
    pad_token="[PAD]",
)

# Register the structural markers as additional special tokens and
# set the EOS/BOS/PAD tokens (same key order as before).
tokenizer.add_special_tokens({
    "additional_special_tokens": [
        special_tokens_dict[marker]
        for marker in ("input", "output", "array_sep", "example_sep")
    ],
    "eos_token": special_tokens_dict["eos_token"],
    "bos_token": special_tokens_dict["bos_token"],
    "pad_token": special_tokens_dict["pad_token"],
})

# Set the pad token explicitly on the tokenizer
tokenizer.pad_token = special_tokens_dict["pad_token"]

# Show the resulting special tokens and vocabulary size
print(f"Special Tokens: {tokenizer.special_tokens_map}")
print(f"Vocabulary Size: {len(tokenizer)}")

# Make the embedding matrix match the tokenizer size
model.resize_token_embeddings(len(tokenizer))


In [None]:
def build_corpus_for_shrinking(hf_dataset):
    """
    Build one text corpus from a dataset so that every token used in it appears.

    For each sample the 'instruction' and 'input' fields (empty string when
    falsy) are joined with a single space; samples are joined with newlines.
    """
    return "\n".join(
        (sample["instruction"] or "") + " " + (sample["input"] or "")
        for sample in hf_dataset
    )

# Corpus is built from the training dataset only
corpus = build_corpus_for_shrinking(llama_data_dataset)

len(corpus)

# `shrink_embeddings` is provided by the JS_Architects star import;
# the corpus is passed so that the tokens it contains are kept.
shrink_embeddings(
    model=model,
    tokenizer=tokenizer,
    corpus=corpus,
    keep_special_tokens=True,
    keep_normalizer=False,
    keep_token_order=True,
)

print("Tokenizer size after shrinking:", len(tokenizer.vocab))

tokenizer.vocab

In [None]:
# Round-trip the first training input through the current tokenizer
test_sequence = llama_data_dataset[0]["input"]
encoded = tokenizer.encode(test_sequence)
decoded = tokenizer.decode(encoded)

print("Encoded:", encoded)
print("Decoded:", decoded)

## 4. Apply LoRA to the Shrunk Model

In [None]:
from peft import get_peft_model, LoraConfig
import torch.nn as nn
import torch

# Configure LoRA
peft_config = LoraConfig(
    r=64,
    target_modules=["q_proj", "k_proj", "v_proj", "o_proj", "gate_proj", "up_proj", "down_proj", "embed_tokens", "lm_head"],
    lora_alpha=16,
    lora_dropout=0.0,
    bias="none"
)


# Wrap the shrunk model with LoRA
model_shrinked = get_peft_model(model, peft_config)

# Make sure to unfreeze embeddings if you want to train them directly 
# (LoRA on embed_tokens will still add ranks; but if you want the base embedding 
#  weights to be trainable, do something like):
for param in model_shrinked.get_input_embeddings().parameters():
    param.requires_grad = True


## 5. Full architecture

### Add Classification head

In [11]:
from transformers import PreTrainedModel
from transformers.modeling_utils import unwrap_model
import torch.nn as nn

class LLMWithClassificationHead(PreTrainedModel):
    """
    Wraps a language model and adds a linear multi-label classification head.

    The hidden state of the last attended (non-padded) token of each sequence,
    taken from the final layer of the wrapped model, is projected to
    ``num_labels`` logits. When labels are given, the loss is
    ``BCEWithLogitsLoss`` (independent sigmoid per label).
    """

    def __init__(self, base_model, config, num_labels):
        """
        Args:
            base_model: the model called in ``forward``; it must return
                ``hidden_states`` when asked to.
            config: configuration passed to ``PreTrainedModel``; its
                ``hidden_size`` sets the input width of the classifier.
            num_labels: number of logits produced per sequence.

        Raises:
            ValueError: if ``base_model`` is itself an ``LLMWithClassificationHead``.
        """
        super().__init__(config)
        if isinstance(base_model, LLMWithClassificationHead):
            raise ValueError("base_model cannot be an instance of LLMWithClassificationHead")

        # Attribute name is kept as-is: it is part of the saved state-dict keys.
        self.base_model_1 = base_model
        self.num_labels = num_labels
        hidden_size = config.hidden_size

        self.classifier = nn.Linear(hidden_size, num_labels)

        # Initialize weights and apply final processing
        self.post_init()

    @staticmethod
    def _pool_last_token(last_hidden, attention_mask):
        """Return, per sequence, the hidden state of the last attended token.

        Without a mask the final position is used. With a mask, the highest
        position whose mask value is 1 is used, so padding on either side is
        skipped. Assumes ``attention_mask`` is (batch, seq) with 1 = real token.
        """
        if attention_mask is None:
            return last_hidden[:, -1, :]

        mask = attention_mask.to(device=last_hidden.device, dtype=torch.long)
        positions = torch.arange(mask.shape[1], device=last_hidden.device)
        # position * mask is 0 on padding, so argmax picks the last real token
        last_index = (positions * mask).argmax(dim=1)
        batch_index = torch.arange(last_hidden.shape[0], device=last_hidden.device)
        return last_hidden[batch_index, last_index]

    def forward(self, input_ids, output_hidden_states=True, return_dict=True, attention_mask=None, labels=None):
        """
        Args:
            input_ids: token ids forwarded to the wrapped model.
            output_hidden_states: whether ``"hidden_states"`` is returned
                (``None`` otherwise). Hidden states are always computed
                internally because the pooling needs them.
            return_dict: kept for interface compatibility; the wrapped model
                is always called with ``return_dict=True`` and a dict is returned.
            attention_mask: optional mask forwarded to the wrapped model and
                used to locate the last non-padded token.
            labels: optional multi-hot targets; cast to float for the loss.

        Returns:
            dict with ``"logits"``, ``"loss"`` (``None`` without labels) and
            ``"hidden_states"``.
        """
        # The pooling below needs attribute access to the hidden states, so
        # both flags are forced on regardless of what the caller passed.
        outputs = self.base_model_1(
            input_ids=input_ids,
            attention_mask=attention_mask,
            output_hidden_states=True,
            return_dict=True
        )
        last_hidden = outputs.hidden_states[-1]
        # Pool the last real token; position -1 would be PAD for right-padded inputs
        pooled = self._pool_last_token(last_hidden, attention_mask)
        logits = self.classifier(pooled)

        loss = None
        if labels is not None:
            loss_fct = nn.BCEWithLogitsLoss()
            loss = loss_fct(logits, labels.float())

        return {
            "logits": logits,
            "loss": loss,
            "hidden_states": outputs.hidden_states if output_hidden_states else None,
        }


In [12]:
from transformers import AutoModelForCausalLM, AutoConfig

# Load configuration and base model
base_config = AutoConfig.from_pretrained("meta-llama/Llama-3.2-3B")

# Define number of labels for classification
num_labels = 16

# Instantiate the custom model
model_classification = LLMWithClassificationHead(
    base_model=model,
    config=base_config,
    num_labels=num_labels
)


### Tokenize data

In [None]:
# Token count of every training input, to choose a sensible max_length
lengths = []
for example in llama_data_dataset:
    lengths.append(len(tokenizer.encode(example["input"])))
print("Max length:", max(lengths), "95th percentile:", np.percentile(lengths, 95))


In [None]:
def tokenize_function(examples):
    """
    Tokenize a batch of examples for the classification model.

    Args:
        examples: batch dict with an "input" column (texts) and an "output"
            column (binary label vectors).

    Returns:
        dict with "input_ids", "attention_mask" and "labels" (the unchanged
        "output" column).
    """
    tokenized = tokenizer(
        examples["input"],
        # No padding here: sequences are padded per batch by the data collator
        # given to the Trainer. Padding inside a batched `map` would store
        # every row at the length of the longest row in its map batch.
        padding=False,
        truncation=True,
        max_length=7000  # inputs longer than 7000 tokens are truncated
    )
    return {
        "input_ids": tokenized["input_ids"],
        "attention_mask": tokenized["attention_mask"],
        "labels": examples["output"]
    }


# Keep only the input output no instructions for now
final_dataset = llama_data_dataset.remove_columns(["instruction"])
final_dataset_testing = llama_data_dataset_testing.remove_columns(["instruction"])

tokenized_final_dataset = final_dataset.map(tokenize_function, batched=True)
tokenized_final_test = final_dataset_testing.map(tokenize_function, batched=True)

tokenized_final_dataset

### Define Data Collator

class DataCollatorWithLabels:
    """Stack already equal-length features into batch tensors.

    Produces long tensors for "input_ids" and "attention_mask" and a
    float32 tensor for "labels".
    """

    def __call__(self, features):
        def stack(field, dtype):
            # One row per feature, in the order given
            return torch.tensor([feature[field] for feature in features], dtype=dtype)

        return {
            "input_ids": stack("input_ids", torch.long),
            "attention_mask": stack("attention_mask", torch.long),
            "labels": stack("labels", torch.float32),
        }

data_collator = DataCollatorWithLabels()


## 6. Train

In [15]:
from transformers import Trainer, TrainingArguments

class CustomTrainer(Trainer):
    """Trainer that reads the loss from the model's dict output."""

    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        """Run the model with the batch labels and return its "loss" entry.

        Raises:
            ValueError: if the batch carries no "labels".
        """
        # Labels are removed from the batch and passed explicitly
        labels = inputs.pop("labels", None)
        if labels is None:
            raise ValueError("Labels are missing in inputs")

        outputs = model(**inputs, labels=labels)
        loss = outputs["loss"]

        if return_outputs:
            return loss, outputs
        return loss


### Training

In [None]:
# %% [markdown]
# ## 6. Train - Optimized

# %%
from transformers import Trainer, TrainingArguments, DataCollatorWithPadding
import torch
import gc

# Clear cache before training
gc.collect()
torch.cuda.empty_cache()

# Training configuration.
# `evaluation_strategy="no"` is intentionally not passed: "no" is the default,
# and the argument name is deprecated in recent transformers releases.
training_args = TrainingArguments(
    output_dir="./JS_finetuned_model",
    learning_rate=1e-4,
    per_device_train_batch_size=1,  # one sequence per step to limit memory
    per_device_eval_batch_size=1,
    num_train_epochs=3,
    fp16=False,
    gradient_accumulation_steps=8,  # 8 accumulated steps per optimizer update
    #gradient_checkpointing=True,  # Enable gradient checkpointing
    save_total_limit=1,
    save_strategy="steps",
    save_steps=500,
    logging_dir="./logs",
    logging_steps=50,
    report_to="wandb",  # Log to W&B
)

# Built-in collator: pads every batch to its longest sequence.
# This replaces any `data_collator` defined earlier in the notebook.
data_collator = DataCollatorWithPadding(tokenizer, padding='longest')

# Initialize Trainer
trainer = CustomTrainer(
    model=model_classification,
    args=training_args,
    train_dataset=tokenized_final_dataset,
    eval_dataset=tokenized_final_test,
    tokenizer=tokenizer,
    data_collator=data_collator
)

# Start Training
trainer.train()

# Clear memory after training
gc.collect()
torch.cuda.empty_cache()


In [None]:
# `Trainer.save_model` takes no `safe_serialization` argument; the format is
# controlled by `TrainingArguments.save_safetensors`. Turn safetensors off on
# the trainer's arguments to get the non-safetensors serialization.
trainer.args.save_safetensors = False
trainer.save_model(output_dir="working/models")


In [None]:
# First 10 test examples, or all of them when the test split is smaller
small_dataset = tokenized_final_test.select(range(min(10, len(tokenized_final_test))))

small_dataset

In [None]:
# Free cached memory before running inference
gc.collect()
torch.cuda.empty_cache()

# Predict on the full tokenized test split
predictions = trainer.predict(tokenized_final_test)

# Model outputs, true labels from the dataset, and evaluation metrics
logits, true_labels, metrics = (
    predictions.predictions,
    predictions.label_ids,
    predictions.metrics,
)

print("Evaluation Metrics:", metrics)

# Free cached memory again once predictions are collected
gc.collect()
torch.cuda.empty_cache()

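Note: `predictions.predictions` contains two outputs rather than one. The model's `forward` returns both `"logits"` and `"hidden_states"`, and both are collected, so the classification logits are at `logits[0]`.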

In [None]:
# `logits[0]` holds the classification logits, one row per evaluated example.
for i in range(len(logits[0])):
    scores = np.asarray(logits[0][i])
    # The head is trained with BCEWithLogitsLoss, so each label has its own
    # sigmoid probability; a softmax across labels would not be meaningful.
    probabilities = 1.0 / (1.0 + np.exp(-scores))
    # Rank on the raw scores: same order as the probabilities, no overflow
    pred_label_indices = np.argsort(scores)[-2:]  # get 2 largest
    print(f"Example {i}")
    print("Predicted Labels:", [label_list[idx] for idx in pred_label_indices])
    print("True Labels:", [label_list[idx] for idx, val in enumerate(true_labels[i]) if val == 1])

    #print("True Labels:", true_labels[i])
    #print("Probabilities:", probabilities)

In [None]:
# NOTE: relies on `i` left over from the loop in the previous cell.
exp_scores = np.exp(logits[0][i])
probabilities = exp_scores / exp_scores.sum()

# Index of the single most likely label
predicted_label_index = probabilities.argmax()

# Indices of the 2 largest probabilities (ascending order)
pred_label_indices = probabilities.argsort()[-2:]
pred_label_indices

In [None]:
import torch
from torch.nn.utils.rnn import pad_sequence

# Convert to PyTorch tensors and pad
# Convert each element of `logits` to a tensor, then pad them to a common length
logit_tensors = [torch.tensor(logit) for logit in logits]
logits_padded = pad_sequence(logit_tensors, batch_first=True)
print("Padded logits shape:", logits_padded.shape)


In [None]:
# `logits` is a tuple when the model returns more than one output (the cells
# above index it as `logits[0]`); a tuple has no `.shape`, so select the
# classification logits first.
class_logits = logits[0] if isinstance(logits, (tuple, list)) else logits
print("Logits shape:", class_logits.shape)
print("Logits:", class_logits)
