# Family Tree Experiment

## Experiment Overview

The family tree experiment tests whether hierarchical concepts like family relationships intrinsically organize themselves into hierarchical structures in LLM latent spaces.

SMDS allows studying embeddings with a hierarchical nature. Concepts such as family trees may intrinsically take such organization in the latent space. These are functional steps to validate this hypothesis:

A dataset of paragraphs describing a family tree is prepared. Unique names are sampled from a known set (see emailed resources) and organized into a family tree. This family tree is then parsed into a text that describes it. E.g. "Anna's parents are Sofia and Luke. Sofia's parents are Agnes and Robert. Luke's parents are George and Daniela." describes a family tree from a child to its grandparents;
The paragraphs are fed to an LLM and activations in correspondance to the names tokens are recorded (see emailed resources);
Manifold search is applied on these activations, using as features quantities like tree distance (1 for parent-child, 2 for grandparent-child, ...). Several hypothesis manifolds are compared and one is identified as the winner.

### Hypothesis
Concepts such as family trees may intrinsically take hierarchical organization in the latent space of LLMs.

### Methodology
1. **Data Generation**: Create paragraphs describing family relationships (parent-child: distance 1, grandparent-child: distance 2)
2. **Activation Recording**: Feed paragraphs to LLMs and record activations at token positions corresponding to family member names
3. **Manifold Search**: Apply SMDS (Supervised Multidimensional Scaling) on activations using tree distance as features
4. **Analysis**: Compare hypothesis shapes (especially Hierarchical vs. Circular) to identify winning manifold



## Data generation

In [None]:
import os
import random

import pandas as pd


def load_names():
    path = "smds/demos/resources/names.csv"
    if not os.path.exists(path):
        path = os.path.abspath(os.path.join(os.path.dirname(__file__), "../resources/names.csv"))

    df = pd.read_csv(path)
    return df["name"].tolist()


def generate_family_tree_data(names, n_samples=50):
    data = []

    for _ in range(n_samples):
        if len(names) < 7:
            raise ValueError("Not enough names in the dataset to generate a depth-2 tree (needs 7 unique names).")

        family_names = random.sample(names, 7)

        child = family_names[0]
        p1, p2 = family_names[1], family_names[2]
        gp1, gp2 = family_names[3], family_names[4]
        gp3, gp4 = family_names[5], family_names[6]

        base_text = (
            f"{child}'s parents are {p1} and {p2}. "
            f"{p1}'s parents are {gp1} and {gp2}. "
            f"{p2}'s parents are {gp3} and {gp4}. "
            f"Therefore, the family's youngest member is {child}."
        )

        for i in range(7):
            target_name = family_names[i]

            text = base_text + f" The family member is {target_name}."

            if i == 0:
                dist = 0
            elif 1 <= i <= 2:
                dist = 1
            elif 3 <= i <= 6:
                dist = 2

            entry = {"text": text, "names": family_names, "target_map": {target_name: dist}}
            data.append(entry)

    return pd.DataFrame(data)

## Models

In [None]:
import random
import numpy as np
import torch
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm
from transformers import AutoModelForCausalLM, AutoTokenizer

from smds.demos.family_tree.data_generation import generate_family_tree_data, load_names
from smds.shapes.discrete_shapes.hierarchical import HierarchicalShape
from smds.pipeline.discovery_pipeline import discover_manifolds, DEFAULT_SHAPES

random.seed(42)
np.random.seed(42)
torch.manual_seed(42)


def find_last_token_idx(tokenizer, text, target):
    encoding = tokenizer(text, return_offsets_mapping=True, add_special_tokens=True)
    offset_mapping = encoding["offset_mapping"]

    target_start = text.rfind(target)
    if target_start == -1:
        return -1
    target_end = target_start + len(target)

    matched_idx = -1
    for idx, (tok_start, tok_end) in enumerate(offset_mapping):
        if tok_start == tok_end == 0:
            continue

        if not (tok_end <= target_start or tok_start >= target_end):
            matched_idx = idx
            pass

    return matched_idx


def get_activations_all_layers(df, model, tokenizer):
    model.eval()

    layer_data = {}

    print("Recording activations for all layers...")
    for _, row in tqdm(df.iterrows(), total=len(df)):
        text = row["text"]
        target_map = row["target_map"]

        input_ids = tokenizer(text, return_tensors="pt").to(model.device)

        with torch.no_grad():
            outputs = model(**input_ids, output_hidden_states=True)

        for layer_idx, hidden_state_tensor in enumerate(outputs.hidden_states):
            if layer_idx not in layer_data:
                layer_data[layer_idx] = {"activations": [], "distances": []}

            hidden_state = hidden_state_tensor.squeeze(0).cpu().numpy()

            for name, dist in target_map.items():
                idx = find_last_token_idx(tokenizer, text, name)
                if idx != -1 and idx < len(hidden_state):
                    vect = hidden_state[idx]
                    layer_data[layer_idx]["activations"].append(vect)
                    layer_data[layer_idx]["distances"].append(dist)

    final_data = {}
    for layer_idx, data in layer_data.items():
        if len(data["activations"]) > 0:
            final_data[layer_idx] = (
                np.array(data["activations"]),
                np.array(data["distances"])
            )

    return final_data


def main():
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model_name = "gpt2"
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForCausalLM.from_pretrained(model_name).to(device)

    all_names = load_names()
    df = generate_family_tree_data(all_names, n_samples=50)

    all_layers_data = get_activations_all_layers(df, model, tokenizer)

    hierarchical_shape = HierarchicalShape(level_distances=[5.0, 1.0])

    summary_results = []

    for layer_idx in sorted(all_layers_data.keys()):
        print(f"Analyzing Layer {layer_idx}")
        X, y = all_layers_data[layer_idx]
        y = y.astype(np.float64)

        y_hier = np.zeros((len(y), 2), dtype=np.float64)
        y_hier[:, 1] = y

        res_hier, _ = discover_manifolds(
            X,
            y_hier,
            shapes=[hierarchical_shape],
            experiment_name=f"family_tree_layer_{layer_idx}_hier",
            model_name="gpt2",
            n_folds=5,
            n_jobs=-1,
            save_results=False,
            create_visualization=False,
        )

        if not res_hier.empty:
            row = res_hier.iloc[0]
            summary_results.append({
                "layer": layer_idx,
                "shape": "HierarchicalShape",
                "stress": row["mean_scale_normalized_stress"]
            })

        res_shapes, _ = discover_manifolds(
            X,
            y,
            shapes=DEFAULT_SHAPES,
            experiment_name=f"family_tree_layer_{layer_idx}_shapes",
            model_name="gpt2",
            n_folds=5,
            n_jobs=-1,
            save_results=False,
            create_visualization=False,
        )

        if not res_shapes.empty:
            for _, row in res_shapes.iterrows():
                summary_results.append({
                    "layer": layer_idx,
                    "shape": row["shape"],
                    "stress": row["mean_scale_normalized_stress"]
                })

    summary_df = pd.DataFrame(summary_results)
    print(summary_df)

    plt.figure(figsize=(12, 8))
    for shape_name in summary_df["shape"].unique():
        subset = summary_df[summary_df["shape"] == shape_name]
        subset = subset.sort_values("layer")
        plt.plot(subset["layer"], subset["stress"], marker='o', label=shape_name)

    plt.xlabel("Layer")
    plt.ylabel("Stress")
    plt.title("Stress vs Layer by Shape")
    plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
    plt.grid(True)
    plt.tight_layout()
    plt.savefig("stress_vs_layer.png")
    print("Plot saved to stress_vs_layer.png")


main()


In [None]:
import random

import numpy as np
import torch
from tqdm import tqdm
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig

from smds.demos.family_tree.data_generation import generate_family_tree_data, load_names

random.seed(42)
np.random.seed(42)
torch.manual_seed(42)


def find_last_token_idx(tokenizer, text, target):
    encoding = tokenizer(text, return_offsets_mapping=True, add_special_tokens=True)
    offset_mapping = encoding["offset_mapping"]

    target_start = text.rfind(target)
    if target_start == -1:
        return -1
    target_end = target_start + len(target)

    matched_idx = -1
    for idx, (tok_start, tok_end) in enumerate(offset_mapping):
        if tok_start == tok_end == 0:
            continue

        if not (tok_end <= target_start or tok_start >= target_end):
            matched_idx = idx
            pass

    return matched_idx


def get_activations(df, model, tokenizer, layer=-1):
    model.eval()
    activations = []
    distances = []

    print("Recording activations...")
    for _, row in tqdm(df.iterrows(), total=len(df)):
        text = row["text"]
        target_map = row["target_map"]

        input_ids = tokenizer(text, return_tensors="pt").to(model.device)

        with torch.no_grad():
            outputs = model(**input_ids, output_hidden_states=True)

        hidden_state = outputs.hidden_states[layer].squeeze(0).cpu().numpy()

        for name, dist in target_map.items():
            idx = find_last_token_idx(tokenizer, text, name)
            if idx != -1 and idx < len(hidden_state):
                vect = hidden_state[idx]
                activations.append(vect)
                distances.append(dist)
            else:
                pass

    return np.array(activations), np.array(distances)


def main():
    print("Setting up experiment...")
    bnb_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_compute_dtype=torch.float16,
        bnb_4bit_use_double_quant=True,
    )

    model_name = "meta-llama/Llama-3.1-8B-Instruct"
    try:
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        model = AutoModelForCausalLM.from_pretrained(model_name, quantization_config=bnb_config, device_map="auto")
    except Exception as e:
        print(f"Error loading model {model_name}: {e}")
        raise e

    print("Generating data...")
    all_names = load_names()
    df = generate_family_tree_data(all_names, n_samples=50)
    print(f"Generated {len(df)} family trees.")
    print(f"Sample text: {df.iloc[0]['text']}")

    X, y = get_activations(df, model, tokenizer, layer=-1)

    print(f"Collected data shape: X={X.shape}, y={y.shape}")

    print("\nRunning Manifold Discovery Pipeline...")

    from smds.pipeline.discovery_pipeline import discover_manifolds

    y = y.astype(np.float64)
    X = X.astype(np.float64)

    results_df, save_path = discover_manifolds(
        X,
        y,
        experiment_name="family_tree_experiment",
        model_name="llama",
        n_folds=5,
        n_jobs=-1,
        save_results=True,
        create_visualization=True,
    )

    print("\nPipeline Results:")
    print(results_df[["shape", "mean_scale_normalized_stress", "std_scale_normalized_stress", "error"]])

    if not results_df.empty:
        winner = results_df.iloc[0]
        print(f"\nWinner: {winner['shape']} (Mean Score: {winner['mean_scale_normalized_stress']:.4f})")

    print("\nRunning Hierarchical Analysis...")
    from smds.shapes.discrete_shapes.hierarchical import HierarchicalShape

    y_hier = np.zeros((len(y), 2), dtype=np.float64)
    y_hier[:, 1] = y

    hierarchical_shape = HierarchicalShape(level_distances=[5.0, 1.0])

    results_hier, _ = discover_manifolds(
        X,
        y_hier,
        shapes=[hierarchical_shape],
        save_path=save_path,  # Append to existing results
        n_folds=5,
        n_jobs=-1,
        save_results=True,
        create_visualization=True,
    )

    print("\nHierarchical Results:")
    print(results_hier[["shape", "mean_scale_normalized_stress", "std_scale_normalized_stress", "error"]])


main()


In [None]:
import random

import numpy as np
import torch
from tqdm import tqdm
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig

from smds.demos.family_tree.data_generation import generate_family_tree_data, load_names

random.seed(42)
np.random.seed(42)
torch.manual_seed(42)


def find_last_token_idx(tokenizer, text, target):
    encoding = tokenizer(text, return_offsets_mapping=True, add_special_tokens=True)
    offset_mapping = encoding["offset_mapping"]

    target_start = text.rfind(target)
    if target_start == -1:
        return -1
    target_end = target_start + len(target)

    matched_idx = -1
    for idx, (tok_start, tok_end) in enumerate(offset_mapping):
        if tok_start == tok_end == 0:
            continue

        if not (tok_end <= target_start or tok_start >= target_end):
            matched_idx = idx
            pass

    return matched_idx


def get_activations(df, model, tokenizer, layer=-1):
    model.eval()
    activations = []
    distances = []

    print("Recording activations...")
    for _, row in tqdm(df.iterrows(), total=len(df)):
        text = row["text"]
        target_map = row["target_map"]

        input_ids = tokenizer(text, return_tensors="pt").to(model.device)

        with torch.no_grad():
            outputs = model(**input_ids, output_hidden_states=True)

        hidden_state = outputs.hidden_states[layer].squeeze(0).cpu().numpy()

        for name, dist in target_map.items():
            idx = find_last_token_idx(tokenizer, text, name)
            if idx != -1 and idx < len(hidden_state):
                vect = hidden_state[idx]
                activations.append(vect)
                distances.append(dist)
            else:
                pass

    return np.array(activations), np.array(distances)


def main():
    print("Setting up experiment...")
    bnb_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_compute_dtype=torch.float16,
        bnb_4bit_use_double_quant=True,
    )

    model_name = "Qwen/Qwen2.5-7B-Instruct"
    tokenizer = AutoTokenizer.from_pretrained(model_name)

    model = AutoModelForCausalLM.from_pretrained(model_name, quantization_config=bnb_config, device_map="auto")

    print("Generating data...")
    all_names = load_names()
    df = generate_family_tree_data(all_names, n_samples=50)
    print(f"Generated {len(df)} family trees.")
    print(f"Sample text: {df.iloc[0]['text']}")

    X, y = get_activations(df, model, tokenizer, layer=-1)

    print(f"Collected data shape: X={X.shape}, y={y.shape}")

    print("\nRunning Manifold Discovery Pipeline...")

    from smds.pipeline.discovery_pipeline import discover_manifolds

    y = y.astype(np.float64)
    X = X.astype(np.float64)

    results_df, save_path = discover_manifolds(
        X,
        y,
        experiment_name="family_tree_experiment",
        model_name="qwen",
        n_folds=5,
        n_jobs=-1,
        save_results=True,
        create_visualization=True,
    )

    print("\nPipeline Results:")
    print(results_df[["shape", "mean_scale_normalized_stress", "std_scale_normalized_stress", "error"]])

    if not results_df.empty:
        winner = results_df.iloc[0]
        print(f"\nWinner: {winner['shape']} (Mean Score: {winner['mean_scale_normalized_stress']:.4f})")

    print("\nRunning Hierarchical Analysis...")
    from smds.shapes.discrete_shapes.hierarchical import HierarchicalShape

    y_hier = np.zeros((len(y), 2), dtype=np.float64)
    y_hier[:, 1] = y

    hierarchical_shape = HierarchicalShape(level_distances=[5.0, 1.0])

    results_hier, _ = discover_manifolds(
        X,
        y_hier,
        shapes=[hierarchical_shape],
        save_path=save_path,  # Append to existing results
        n_folds=5,
        n_jobs=-1,
        save_results=True,
        create_visualization=True,
    )

    print("\nHierarchical Results:")
    print(results_hier[["shape", "mean_scale_normalized_stress", "std_scale_normalized_stress", "error"]])



main()


### Conclusion
- **GPT-2:** 0.694 (Circular) vs. 0.749 (Hierarchical)
- **Llama:** 0.841 (Circular) vs. 0.878 (Hierarchical)
- **Qwen:** 0.787 (Circular) vs. 0.834 (Hierarchical)
