# Model Emphasis and Analysis Demo

This notebook demonstrates the key features of the model emphasis, analysis, and generation capabilities.

In [1]:
import mlx.core as mx
import mlx.nn as nn

from mi_experiments.utils.loading import load

import re
from dataclasses import dataclass
from typing import Dict, Optional, Union

from mlx_lm.tokenizer_utils import TokenizerWrapper

import pandas as pd
import numpy as np
from sklearn.decomposition import PCA

import plotly.graph_objects as go


  from .autonotebook import tqdm as notebook_tqdm


In [2]:

# Load the model
# `load` is the project helper imported from mi_experiments.utils.loading; it is
# given a checkpoint identifier and returns a (model, tokenizer) pair that every
# later cell in this notebook relies on.
model, tokenizer = load("mlx-community/Mistral-7B-Instruct-v0.3-4bit")

# Utility functions
def create_additive_causal_mask(N: int, offset: int = 0):
    """Build an additive causal mask.

    Rows stand for positions ``offset .. offset + N - 1`` and columns for
    positions ``0 .. offset + N - 1``. Every cell whose column position is
    strictly greater than its row position holds ``-1e9``; all other cells
    hold zero.

    Args:
        N: Number of rows in the mask.
        offset: Position of the first row; also widens the mask by this many
            columns.

    Returns:
        An ``(N, offset + N)`` array of ``0`` / ``-1e9`` values.
    """
    col_positions = mx.arange(offset + N)
    if offset:
        row_positions = mx.arange(offset, offset + N)
    else:
        # With no offset, rows and columns cover the same positions.
        row_positions = col_positions
    is_future = row_positions[:, None] < col_positions[None]
    return is_future * -1e9

@dataclass
class ModelArgs:
    """Flat record of model hyperparameters.

    The first seven fields are required; the remaining ones carry defaults and
    may be omitted.
    """

    # Required fields.
    model_type: str
    hidden_size: int
    num_hidden_layers: int
    intermediate_size: int
    num_attention_heads: int
    rms_norm_eps: float
    vocab_size: int
    # Optional fields.
    head_dim: Optional[int] = None
    max_position_embeddings: Optional[int] = None
    num_key_value_heads: Optional[int] = None
    attention_bias: bool = False
    mlp_bias: bool = False
    rope_theta: float = 10000
    rope_traditional: bool = False
    rope_scaling: Optional[Dict[str, Union[float, str]]] = None
    tie_word_embeddings: bool = True

    @classmethod
    def from_model(cls, model: nn.Module):
        """Create a ``ModelArgs`` from an already-loaded model.

        Reads ``model_type``, ``head_dim`` and ``n_kv_heads`` from the model
        itself and the remaining values from ``model.args``. Fields that are
        not read here (``max_position_embeddings``, ``attention_bias``,
        ``mlp_bias`` and the ``rope_*`` settings) keep their dataclass defaults.

        Args:
            model: Loaded model exposing the attributes listed above.

        Returns:
            A populated ``ModelArgs`` instance.
        """
        collected = {"model_type": model.model_type}

        config = model.args
        for field_name in (
            "hidden_size",
            "num_hidden_layers",
            "intermediate_size",
            "num_attention_heads",
            "rms_norm_eps",
            "vocab_size",
        ):
            collected[field_name] = getattr(config, field_name)

        # These two live on the model object rather than on its args.
        collected["head_dim"] = model.head_dim
        collected["num_key_value_heads"] = model.n_kv_heads
        collected["tie_word_embeddings"] = config.tie_word_embeddings
        return cls(**collected)

# Extract model arguments
# Snapshot the loaded model's hyperparameters into a ModelArgs record.
model_args = ModelArgs.from_model(model)

# Ensure tokenizer is wrapped
# The PCA cell below calls `tokenizer._tokenizer(...)`, so a bare tokenizer is
# wrapped here (presumably the wrapper exposes the raw tokenizer under that
# attribute -- confirm against mlx_lm's TokenizerWrapper).
if not isinstance(tokenizer, TokenizerWrapper):
    tokenizer = TokenizerWrapper(tokenizer)

def format_prompt(prompt: str) -> str:
    """Wrap ``prompt`` in ``[INST] ... [/INST]`` instruction markers.

    Args:
        prompt: Text to place between the markers.

    Returns:
        The prompt surrounded by ``[INST] `` and `` [/INST]``.
    """
    return "[INST] {} [/INST]".format(prompt)

def extract_generated_text(full_text: str) -> str:
    """Return the text that follows the first ``[/INST]`` marker.

    Args:
        full_text: Text that may contain an ``[/INST]`` marker.

    Returns:
        Everything after the first ``[/INST]`` (newlines included) with
        surrounding whitespace stripped, or ``""`` when no marker is present.
    """
    found = re.search(r'\[/INST\]\s*(.*)', full_text, re.DOTALL)
    if not found:
        return ""
    return found.group(1).strip()

Fetching 7 files: 100%|██████████| 7/7 [00:00<00:00, 127100.12it/s]


In [3]:
# import string
# import random
# from mi_experiments.inference.batch_manager import batch_generate

# capital_letters = string.ascii_uppercase
# distinct_pairs = [(a, b) for i, a in enumerate(capital_letters) for b in capital_letters[i + 1:]]

# num_prompts = 10
# prompt_template = "Think of a real word containing both the letters {l1} and {l2}. Then, say 3 sentences which use the word."
# prompts_raw = [prompt_template.format(l1=p[0], l2=p[1]) for p in random.sample(distinct_pairs, num_prompts)]
# prompt_template_2 = "Come up with a real English word containing both the letters {l1} and {l2}. No acronyms. Then, give 3 complete sentences which use the word."
# prompts_raw_2 = [prompt_template_2.format(l1=p[0], l2=p[1]) for p in random.sample(distinct_pairs, num_prompts)]

# response = batch_generate(model, tokenizer, prompts=prompts_raw[:10]+prompts_raw_2[:10], max_tokens=20, verbose=True, temp=0.0)
# # print(response)

In [4]:
# # Generate with emphasis on specific layers
# emphasis_config = {
#     'layers': {
#         '0': 1.0,  # Boost first layer
#         '1': 1.0,  # Reduce second layer
#         '30': 1.0,  # Zero out the thirty-first layer (index 30)
#     },
#     'heads': {
#         '0': {'0': 1.0, '1': 0.5},  # Modify attention heads in first layer
#     },
#     'neurons': {
#         '1': {'10': 0.0, '20': 2.0}  # Modify neurons in second layer
#     }
# }

# # Apply emphasis configuration
# model.set_emphasis_config(emphasis_config)

# # Generate with modified model
# response_emphasized = batch_generate(
#     model, 
#     tokenizer, 
#     prompts=prompts_raw[:10]+prompts_raw_2[:10],
#     max_tokens=20,
#     verbose=True,
#     temp=0.0
# )


In [5]:


def _pca_project_layer(model, token_batches, layer_num):
    """Project one layer's last-token hidden states onto 3 principal components.

    Args:
        model: Object exposing ``get_layer_output(layer_num, tokens)``.
        token_batches: One token array per entry, already tokenized.
        layer_num: Index of the layer to read.

    Returns:
        ``(projected, explained_variance_ratio)`` where ``projected`` has one
        row per entry and three columns.
    """
    # One row per entry: item 0, last position of the layer output
    # (assumes the output is indexed as (batch, position, feature) -- TODO confirm).
    rows = [
        model.get_layer_output(layer_num, tokens)[0, -1].tolist()
        for tokens in token_batches
    ]
    hidden_states = np.array(rows)
    # Standardize every feature across entries; the epsilon guards against
    # division by zero for features that are constant across entries.
    hidden_states = (hidden_states - np.mean(hidden_states, axis=0)) / (np.std(hidden_states, axis=0) + 1e-8)

    pca = PCA(n_components=3, svd_solver='full')
    projected = pca.fit_transform(hidden_states)
    return projected, pca.explained_variance_ratio_


def get_pca_values_df_aligned(model, tokenizer, entries=None, prompt_template="The day is {entry}"):
    """Compute a per-layer 3-component PCA of entry representations.

    For every layer, each entry is inserted into ``prompt_template``, the
    hidden state at the last token is collected, the states are standardized
    across entries and projected onto three principal components. The sign of
    each component is flipped where needed so that it correlates
    non-negatively with the same component of the last layer's projection,
    which keeps the axes comparable between layers.

    Args:
        model: Object exposing ``layers`` (its length is the layer count) and
            ``get_layer_output(layer_num, tokens)``.
        tokenizer: Wrapper whose ``_tokenizer(prompt, return_tensors="np")``
            returns a mapping with an ``'input_ids'`` entry.
        entries: Strings to analyze. Defaults to the seven weekday names.
            Other sequences work as well, e.g.
            ``["January", ..., "December"]`` or ``["00:00", ..., "23:00"]``.
        prompt_template: ``str.format`` template with an ``{entry}``
            placeholder. The default reproduces the original prompt.

    Returns:
        A ``pandas.DataFrame`` with one row per (layer, entry) and the columns
        ``Layer``, ``Entry``, ``PCA1``-``PCA3`` (rounded to 4 decimals),
        ``PCA1_var``-``PCA3_var`` and ``Cumulative_var`` (percentages rounded
        to 2 decimals).

    Raises:
        ValueError: If fewer than three entries are supplied, since three
            principal components cannot be fitted to fewer than three samples.
    """
    if entries is None:
        entries = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
    entries = list(entries)
    if len(entries) < 3:
        raise ValueError(
            f"At least 3 entries are required for a 3-component PCA, got {len(entries)}"
        )

    # Prompts do not depend on the layer, so tokenize each of them exactly once.
    token_batches = [
        mx.array(tokenizer._tokenizer(prompt_template.format(entry=entry), return_tensors="np")['input_ids'])
        for entry in entries
    ]

    num_layers = len(model.layers)
    last_layer = num_layers - 1

    # The last layer's projection is the sign reference for all other layers.
    reference_proj, reference_var = _pca_project_layer(model, token_batches, last_layer)

    pca_values = []
    for layer_num in range(num_layers):
        if layer_num == last_layer:
            # Reuse the reference instead of running the model a second time;
            # it needs no alignment against itself.
            projected, explained_variance = reference_proj, reference_var
        else:
            projected, explained_variance = _pca_project_layer(model, token_batches, layer_num)
            # PCA component signs are arbitrary: flip any component that is
            # negatively correlated with the reference component.
            for i in range(3):
                corr = np.corrcoef(reference_proj[:, i], projected[:, i])[0, 1]
                if corr < 0:
                    projected[:, i] *= -1

        cumulative_variance = np.cumsum(explained_variance)

        # Store values in DataFrame format
        for i, entry in enumerate(entries):
            pca_values.append({
                'Layer': layer_num,
                'Entry': entry,
                'PCA1': round(projected[i, 0], 4),
                'PCA2': round(projected[i, 1], 4),
                'PCA3': round(projected[i, 2], 4),
                'PCA1_var': round(explained_variance[0] * 100, 2),
                'PCA2_var': round(explained_variance[1] * 100, 2),
                'PCA3_var': round(explained_variance[2] * 100, 2),
                'Cumulative_var': round(cumulative_variance[2] * 100, 2)
            })

    return pd.DataFrame(pca_values)

# Usage
# Build the per-layer PCA table once; the plotting cells below read `df_aligned`.
df_aligned = get_pca_values_df_aligned(model, tokenizer)
print("\nFull DataFrame:")
# Show the whole table through a Styler whose table styles request a 400px-high
# scrollable block, so the long table does not flood the cell output.
display(df_aligned.style.set_table_styles([{'selector': '', 'props': [('max-height', '400px'), ('overflow-y', 'scroll'), ('display', 'block')]}]))


Full DataFrame:


Unnamed: 0,Layer,Entry,PCA1,PCA2,PCA3,PCA1_var,PCA2_var,PCA3_var,Cumulative_var
0,0,Monday,-5.9391,-29.339,-42.1587,22.96,20.04,17.05,60.05
1,0,Tuesday,-24.1455,-20.0585,15.0121,22.96,20.04,17.05,60.05
2,0,Wednesday,-25.2684,-16.6185,21.9364,22.96,20.04,17.05,60.05
3,0,Thursday,-28.7213,8.3444,9.2712,22.96,20.04,17.05,60.05
4,0,Friday,-8.351,57.5687,-28.5657,22.96,20.04,17.05,60.05
5,0,Saturday,37.2715,20.3413,36.7632,22.96,20.04,17.05,60.05
6,0,Sunday,55.1537,-20.2384,-12.2586,22.96,20.04,17.05,60.05
7,1,Monday,-13.975,-28.3967,-28.5221,26.54,19.52,16.42,62.47
8,1,Tuesday,-32.9256,-14.0982,24.8471,26.54,19.52,16.42,62.47
9,1,Wednesday,-26.2712,-5.1338,21.6052,26.54,19.52,16.42,62.47


In [11]:
def plot_pca_from_df(df):
    """Draw every layer's PCA points in one static 3D scatter plot.

    Each (layer, entry) row becomes its own single-point trace. Entries share
    a colour and a legend group across layers, so clicking a legend item
    toggles that entry in all layers at once. Marker opacity is
    ``0.2 + 0.7 * (1 - Cumulative_var / 100)``, i.e. points get more
    transparent as the three components explain more variance.

    NOTE: a later cell in this notebook defines another ``plot_pca_from_df``
    (an animated, slider-driven variant). Whichever cell ran last is the one
    bound to the name.

    Args:
        df: DataFrame with the columns ``Layer``, ``Entry``, ``PCA1``,
            ``PCA2``, ``PCA3`` and ``Cumulative_var``. Layer ids do not have
            to start at 0 or be contiguous, and (layer, entry) combinations
            that are absent from ``df`` are skipped.

    Returns:
        None. The figure is displayed with ``fig.show()``.
    """
    # Iterate over the layer ids that are actually present rather than
    # assuming they are exactly 0..n-1 (a filtered frame would break that).
    layer_ids = sorted(df['Layer'].unique())
    entries = df['Entry'].unique()

    # Generate colors dynamically based on number of entries
    n_colors = len(entries)
    colors = [f'hsl({h},70%,50%)' for h in np.linspace(0, 360, n_colors, endpoint=False)]

    # Create single plot
    fig = go.Figure()

    # Entries that already own a legend item; each entry gets exactly one.
    legend_shown = set()

    # Plot each layer
    for layer_num in layer_ids:
        layer_data = df[df['Layer'] == layer_num]

        for entry, color in zip(entries, colors):
            entry_data = layer_data[layer_data['Entry'] == entry]
            if entry_data.empty:
                continue
            # Scale opacity based on cumulative variance (0.2-0.9 range)
            opacity = 0.2 + (0.7 * (1 - entry_data['Cumulative_var'].iloc[0]/100))
            show_in_legend = entry not in legend_shown
            legend_shown.add(entry)
            fig.add_trace(
                go.Scatter3d(
                    x=[entry_data['PCA1'].iloc[0]],
                    y=[entry_data['PCA2'].iloc[0]],
                    z=[entry_data['PCA3'].iloc[0]],
                    mode='markers',
                    marker=dict(color=color, size=5, opacity=opacity),
                    name=entry,  # Use entry as the name for legend grouping
                    legendgroup=entry,  # Group by entry for filtering
                    showlegend=show_in_legend  # One legend item per entry
                )
            )

    # Update layout
    fig.update_layout(
        height=800,
        width=800,
        title_text="3D PCA Projection of Day Representations Across All Layers",
        scene=dict(
            xaxis_title="PCA1",
            yaxis_title="PCA2",
            zaxis_title="PCA3",
            aspectmode='cube',
            camera=dict(
                up=dict(x=0, y=0, z=1),
                center=dict(x=0, y=0, z=0),
                eye=dict(x=1.5, y=1.5, z=1.5)
            )
        ),
        showlegend=True,
    )

    fig.show()

# Use the function
# Renders the figure for the aligned per-layer PCA table built earlier.
plot_pca_from_df(df_aligned)


In [7]:
def plot_pca_from_df(df):
    """Show the PCA points one layer at a time in an animated 3D scatter plot.

    One animation frame is built per layer present in ``df``. A "Play" button
    steps through the frames and a slider jumps straight to a given layer.
    Axis ranges are fixed to the min/max over the whole table so the view does
    not rescale between layers.

    NOTE: an earlier cell in this notebook defines another
    ``plot_pca_from_df`` (a static, all-layers variant). Whichever cell ran
    last is the one bound to the name.

    Args:
        df: DataFrame with the columns ``Layer``, ``Entry``, ``PCA1``,
            ``PCA2`` and ``PCA3``. Layer ids do not have to start at 0 or be
            contiguous.

    Returns:
        None. The figure is displayed with ``fig.show()``.

    Raises:
        ValueError: If ``df`` has no rows, since there is no frame to show.
    """
    # Iterate over the layer ids that are actually present rather than
    # assuming they are exactly 0..n-1 (a filtered frame would break that).
    layer_ids = sorted(df['Layer'].unique())
    if not layer_ids:
        raise ValueError("df contains no layers to plot")
    entries = df['Entry'].unique()

    # Generate colors dynamically based on number of entries
    n_colors = len(entries)
    colors = [f'hsl({h},70%,50%)' for h in np.linspace(0, 360, n_colors, endpoint=False)]

    # Create figure with slider
    fig = go.Figure()

    # Calculate axis ranges across all layers
    x_min, x_max = df['PCA1'].min(), df['PCA1'].max()
    y_min, y_max = df['PCA2'].min(), df['PCA2'].max()
    z_min, z_max = df['PCA3'].min(), df['PCA3'].max()

    # Create frames for each layer
    frames = []
    for layer_num in layer_ids:
        frame_traces = []
        layer_data = df[df['Layer'] == layer_num]

        for entry, color in zip(entries, colors):
            entry_data = layer_data[layer_data['Entry'] == entry]
            # `.iloc[:1]` yields the single point, or an empty list when this
            # entry is missing for the layer. A trace is still emitted so every
            # frame keeps the same number and order of traces.
            frame_traces.append(
                go.Scatter3d(
                    x=entry_data['PCA1'].iloc[:1].tolist(),
                    y=entry_data['PCA2'].iloc[:1].tolist(),
                    z=entry_data['PCA3'].iloc[:1].tolist(),
                    mode='markers',
                    marker=dict(color=color, size=5, opacity=0.7),
                    name=f"{entry} - Layer {layer_num}",
                    showlegend=True
                )
            )
        frames.append(go.Frame(data=frame_traces, name=str(layer_num)))

    # Add frames to figure
    fig.frames = frames

    # Add first frame's traces to the figure
    for trace in frames[0].data:
        fig.add_trace(trace)

    # Update layout
    fig.update_layout(
        height=800,
        width=800,
        title_text="3D PCA Projection of Day Representations Across All Layers",
        scene=dict(
            xaxis=dict(range=[x_min, x_max], title="PCA1"),
            yaxis=dict(range=[y_min, y_max], title="PCA2"),
            zaxis=dict(range=[z_min, z_max], title="PCA3"),
            aspectmode='cube',
            camera=dict(
                up=dict(x=0, y=0, z=1),
                center=dict(x=0, y=0, z=0),
                eye=dict(x=1.5, y=1.5, z=1.5)
            )
        ),
        showlegend=True,
        updatemenus=[{
            'type': 'buttons',
            'showactive': False,
            'buttons': [{
                'label': 'Play',
                'method': 'animate',
                'args': [None, {'frame': {'duration': 500, 'redraw': True}, 'fromcurrent': True}]
            }]
        }],
        sliders=[{
            'currentvalue': {'prefix': 'Layer: '},
            'steps': [
                {
                    'method': 'animate',
                    'label': str(k),
                    # Slider steps address frames by name, so they must use the
                    # same layer ids the frames were named with.
                    'args': [[str(k)], {
                        'frame': {'duration': 0, 'redraw': True},
                        'mode': 'immediate',
                        'transition': {'duration': 0}
                    }]
                }
                for k in layer_ids
            ]
        }]
    )

    fig.show()

# Use the function
# Renders the figure for the aligned per-layer PCA table built earlier.
plot_pca_from_df(df_aligned)
