|<h2>Course:</h2>|<h1><a href="https://udemy.com/course/dulm_x/?couponCode=202509" target="_blank">A deep understanding of AI language model mechanisms</a></h1>|
|-|:-:|
|<h2>Part 5:</h2>|<h1>Observation (non-causal) mech interp</h1>|
|<h2>Section:</h2>|<h1>Investigating neurons and dimensions</h1>|
|<h2>Lecture:</h2>|<h1><b>CodeChallenge: Negation tuning in MLP neurons</b></h1>|

<br>

<h5><b>Teacher:</b> Mike X Cohen, <a href="https://sincxpress.com" target="_blank">sincxpress.com</a></h5>
<h5><b>Course URL:</b> <a href="https://udemy.com/course/dulm_x/?couponCode=202509" target="_blank">udemy.com/course/dulm_x/?couponCode=202509</a></h5>
<i>Using the code without the course may lead to confusion or errors.</i>

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib as mpl
from matplotlib.gridspec import GridSpec

import statsmodels.api as sm

import torch
import torch.nn.functional as F
from transformers import AutoModelForCausalLM, GPT2Tokenizer

import requests

# vector plots
import matplotlib_inline.backend_inline
matplotlib_inline.backend_inline.set_matplotlib_formats('svg')

# Exercise 1: Import the model and implant MLP hooks

In [None]:
# pick the compute device first: the GPU when one is visible, otherwise the CPU
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

# GPT2-large weights, moved straight onto the chosen device
model = AutoModelForCausalLM.from_pretrained('gpt2-large').to(device)

# NOTE(review): the tokenizer comes from the base 'gpt2' checkpoint while the model
# is 'gpt2-large' -- presumably both use the same vocabulary; confirm
tokenizer = GPT2Tokenizer.from_pretrained('gpt2')

# switch to evaluation mode (as the last expression, this also displays the model)
model.eval()

In [None]:
# this time, hook the output instead of re-calculating the linear layer
# (the hooks fill this dictionary: 'mlp_<layer>' -> most recent output of the hooked module)
activations = {}

def implant_hook(layer_number):
  """Build a forward hook that records the hooked module's output.

  Parameters
  ----------
  layer_number : used to form the dictionary key 'mlp_<layer_number>'.

  Returns
  -------
  A callable taking (module, inputs, output) that stores `output`, detached
  from the graph and moved to the CPU, in the global `activations` dict.
  """

  # the key only depends on the layer, so build it once
  storage_key = f'mlp_{layer_number}'

  def hook(module, inputs, output):
    # each call replaces the previous entry for this layer
    activations[storage_key] = output.detach().cpu()

  return hook

# attach one hook to the c_fc module inside the MLP of every transformer block
# NOTE(review): the handles returned by register_forward_hook are not kept, so
# re-running this cell presumably stacks additional hooks -- confirm
for layeri,block in enumerate(model.transformer.h):
  block.mlp.c_fc.register_forward_hook(implant_hook(layeri))

In [None]:
# shape of the c_fc weight matrix in the first block;
# its last dimension is taken as the number of MLP neurons
fc_weight_shape = model.transformer.h[0].mlp.c_fc.weight.shape
nneurons = fc_weight_shape[-1]
fc_weight_shape

# Exercise 2: Get text, find negations and affirmations

In [None]:
# https://gutenberg.org/ebooks/32154
# (timeout so that the cell cannot hang forever on a stalled connection)
response = requests.get('https://gutenberg.org/cache/epub/32154/pg32154.txt',timeout=60)

# stop here on an HTTP error instead of silently tokenizing an error page
response.raise_for_status()
text = response.text

# tokenize the whole text as one sequence; the token ids are in the first row
tokens = tokenizer.encode(text,return_tensors='pt')
num_tokens = len(tokens[0])
print(f'There are {num_tokens:,} tokens, of which {len(np.unique(tokens[0])):,} are unique.')

In [None]:
# number of tokens kept before (pre) and after (pst) each target token
context_pre,context_pst = 90,10

In [None]:
# list of negation words to match exactly
negation_words = ['not','cannot','nor','never']

# initialize vector (set to 1 where a token is classified as a negation)
isNegation = np.zeros(num_tokens,dtype=int)

# loop over all tokens that have a full context window around them
for ti in range(context_pre,num_tokens-context_pst):

  # current token (surrounding whitespace removed, lower-cased)
  currtok = tokenizer.decode(tokens[0,ti]).strip().lower()

  # token contains a 't contraction (any token containing n't also contains 't)
  condA = "'t" in currtok

  # word needs to match completely (c.f., not->noted, nor->enormous)
  condB = currtok in negation_words

  # cheap tests first: only decode the next token when this one qualifies
  if not (condA or condB):
    continue

  # next token starts with a space (c.f., not->connotative)
  # (startswith() is also safe if a token ever decodes to an empty string)
  if tokenizer.decode(tokens[0,ti+1]).startswith(' '):
    isNegation[ti] = 1


# count the targets (vectorized; a plain int so it can be used as a size)
numNegationTokens = int(isNegation.sum())

In [None]:
# positions of all tokens flagged as negations
negationsIdx = np.flatnonzero(isNegation)

# number of tokens shown on each side of an example
context_win = 15

# report the count, then show the first few negations in their context
print(f'There are {numNegationTokens} "negation" tokens in the dataset.\n')
for t in negationsIdx[:10]:
  print(f'Example {t}:\n{tokenizer.decode(tokens[0,t-context_win:t+context_win])}\n')

In [None]:
# gather the ids of every flagged negation token and decode them into one string
negation_token_ids = tokens[0,negationsIdx]
tokenizer.decode(negation_token_ids)

In [None]:
# list of affirmation words to match exactly
affirmation_words = ['agree','always','allow','can','certainly','could','definitely','may','might','shall','should']

# initialize vector (set to 1 where a token is classified as an affirmation)
isAffirmation = np.zeros(num_tokens,dtype=int)

# loop over all tokens that have a full context window around them
for ti in range(context_pre,num_tokens-context_pst):

  # current token (surrounding whitespace removed, lower-cased)
  currtok = tokenizer.decode(tokens[0,ti]).strip().lower()

  # the token needs to match one of the listed words completely;
  # skip early so the next token is only decoded for real candidates
  condB = currtok in affirmation_words
  if not condB:
    continue

  # decode the following token once; both remaining tests use it
  nexttok = tokenizer.decode(tokens[0,ti+1])

  # next token can't be 'not' (that would turn the phrase into a negation)
  condA = nexttok != ' not'

  # next token starts with a space, i.e., the target token is the end of its word
  condC = nexttok.startswith(' ')

  # test
  if condA and condC:
    isAffirmation[ti] = 1

# count the number of target tokens (vectorized; a plain int so it can be used as a size)
numAffirmationTokens = int(isAffirmation.sum())

In [None]:
# positions of all tokens flagged as affirmations
affirmationsIdx = np.flatnonzero(isAffirmation)

# report the count, then show the first few affirmations in their context
# (context_win is the snippet half-width defined in an earlier cell)
print(f'There are {numAffirmationTokens} "affirmation" tokens in the dataset.\n')
for t in affirmationsIdx[:10]:
  print(f'Example {t}:\n{tokenizer.decode(tokens[0,t-context_win:t+context_win])}\n')

# Exercise 3: Create batches and get activations

In [None]:
# position of every window element relative to its target token:
# -context_pre, ..., 0 (the target), ..., +context_pst
window_offsets = torch.arange(-context_pre,context_pst+1)

def _gather_windows(target_locations):
  """Return one row of token ids per target location.

  Row b holds tokens[0, loc-context_pre : loc+context_pst+1] for the b-th
  location, so the target token itself sits in column `context_pre`.
  """
  centers = torch.as_tensor(target_locations,dtype=torch.long)

  # (targets x 1) + (window length) broadcasts to one index per window element
  return tokens[0,centers[:,None]+window_offsets]


# negation and affirmation sequences
batch_negations = _gather_windows(negationsIdx)
batch_affirmations = _gather_windows(affirmationsIdx)

# confirm the sizes (sequences x window length)
print('Shape of negations batch:',batch_negations.shape)
print('Shape of affirmations batch:',batch_affirmations.shape)

In [None]:
def _run_and_snapshot(token_batch):
  """Push a batch of token ids through the model and return the hooked activations.

  The model output itself is discarded; the forward pass is only needed to
  trigger the hooks that fill the global `activations` dict. A shallow copy
  is returned: the hooks assign a new tensor to each key on every forward
  pass, so tensors captured in an earlier snapshot are not overwritten.
  """
  with torch.no_grad():
    model(token_batch.to(device))
  return activations.copy()


# target (negation) tokens first, then the comparison (affirmation) tokens
negations_activations = _run_and_snapshot(batch_negations)
affirmations_activations = _run_and_snapshot(batch_affirmations)

In [None]:
# list the stored layer keys...
layer_keys = affirmations_activations.keys()
print(layer_keys,'\n')

# ...and show the size of the activations tensor of one example layer
affirmations_activations['mlp_5'].shape

# Exercise 4: Logistic regression in all neurons from one layer

In [None]:
# we'll use this vector repeatedly:
# affirmations come first and are coded 0, negations follow and are coded 1
affirmation_labels = np.zeros(numAffirmationTokens)
negation_labels = np.ones(numNegationTokens)
category_labels = np.concatenate((affirmation_labels,negation_labels))

In [None]:
# index of the transformer layer whose hooked MLP activations ('mlp_<index>')
# are used in the single-layer analyses below
whichLayer2use = 13

In [None]:
# confirm getting the right token: column `context_pre` of each batch holds the target,
# so decoding it for one example row should give a negation and an affirmation
example_row = 3
negation_target = tokenizer.decode(batch_negations[example_row,context_pre])
affirmation_target = tokenizer.decode(batch_affirmations[example_row,context_pre])
negation_target, affirmation_target

In [None]:
# initialize matrix to store the classifier results
# (neurons x [p-value, beta]; stays NaN for neurons whose model could not be fit)
classifierResults = np.full((nneurons,2),np.nan)

# activations at the target token for the selected layer, sliced once (sequences x neurons)
targsLayer = negations_activations[f'mlp_{whichLayer2use}'][:,context_pre,:]
compsLayer = affirmations_activations[f'mlp_{whichLayer2use}'][:,context_pre,:]

# keep track of fits that failed, so that they are not silently lost
numFailedFits = 0

# loop over neurons for per-neuron analysis
for neuroni in range(nneurons):

  # vectorize the activations over batches
  targs = targsLayer[:,neuroni]
  comps = compsLayer[:,neuroni]

  # build and run the model; the predictor order (comps, then targs) matches category_labels
  try:
    result = sm.Logit(
        category_labels,
        sm.add_constant(np.hstack((comps,targs)))
        ).fit(maxiter=3000,disp=0)

    # extract the results (p-value and beta)
    classifierResults[neuroni,0] = result.pvalues[1]
    classifierResults[neuroni,1] = result.params[1]

  # best-effort: skip neurons whose fit fails, but catch only Exception so that
  # interrupting the kernel (KeyboardInterrupt) still stops this long loop
  except Exception:
    numFailedFits += 1

if numFailedFits>0:
  print(f'{numFailedFits} of {nneurons} models could not be fit (results left as NaN).')

In [None]:
# visualization of model significance and sign

# setup the figure: a wide axis (3/4 of the width) and a narrow one (1/4)
fig = plt.figure(figsize=(12,4))
gs = GridSpec(1,4,figure=fig)

ax0 = fig.add_subplot(gs[:3])
ax1 = fig.add_subplot(gs[3])

# supra-threshold results, and the negative and positive betas
pvalThresh = .05/nneurons # p<.05, Bonferroni-corrected
sigBetas = classifierResults[:,0] < pvalThresh
posBetas = classifierResults[:,1]>0
negBetas = classifierResults[:,1]<0

# the four groups to draw: (mask, marker format, legend label)
gray = [.7,.7,.7]
plotGroups = (
    (posBetas &  sigBetas, 'ro', 'Positive and sig.'),
    (posBetas & ~sigBetas, 'rx', 'Positive and non-sig.'),
    (negBetas &  sigBetas, 'go', 'Negative and sig.'),
    (negBetas & ~sigBetas, 'gx', 'Negative and non-sig.'),
)

for idx2plot,markerFormat,groupLabel in plotGroups:

  # significant results are gray-filled circles (semi-transparent in the right panel);
  # non-significant results are small crosses
  if markerFormat.endswith('o'):
    opts0 = dict(markerfacecolor=gray)
    opts1 = dict(markerfacecolor=gray+[.5])
  else:
    opts0 = dict(markersize=3)
    opts1 = dict(markersize=3)

  # left: beta by neuron index; right: beta by -log(p)
  ax0.plot(np.where(idx2plot)[0],classifierResults[idx2plot,1],markerFormat,label=groupLabel,**opts0)
  ax1.plot(classifierResults[idx2plot,1],-np.log(classifierResults[idx2plot,0]),markerFormat,**opts1)

# finalize the left panel
ax0.set(ylabel='Beta coefficient',xlabel='Neuron index',xlim=[-10,nneurons+9],
              title='Statistical parameters of negation-term classification')
ax0.legend(fontsize=8)

# finalize the right panel, including the corrected significance threshold
ax1.axhline(-np.log(pvalThresh),linestyle='--',color='b',label='Significance threshold')
ax1.set(xlabel='Beta coeff',ylabel='-log(p)',title='Betas by p-values')
ax1.legend(fontsize=8)

plt.tight_layout()
plt.show()

In [None]:
# find the significant neuron with the largest (most positive) beta
# (searching only among significant neurons guarantees the selected neuron is
#  one of them, and avoids matching a floating-point value by equality)
sigNeurons = np.where(sigBetas)[0]
if sigNeurons.size==0:
  raise ValueError('No neuron passed the corrected significance threshold, so there is no neuron to select.')

maxBetaNeuron = sigNeurons[np.argmax(classifierResults[sigNeurons,1])]
maxBeta = classifierResults[maxBetaNeuron,1]
maxBetaNeuron

In [None]:
# show the prediction probabilities for the max neuron

# need to re-run the model for that neuron (the per-neuron loop did not keep the fitted models)
targs = negations_activations[f'mlp_{whichLayer2use}'][:,context_pre,maxBetaNeuron]
comps = affirmations_activations[f'mlp_{whichLayer2use}'][:,context_pre,maxBetaNeuron]

# same fitting options as in the per-neuron loop (maxiter=3000), so that this
# re-fit is not cut short by the much smaller default iteration limit
result = sm.Logit(category_labels,sm.add_constant(np.hstack((comps,targs)))).fit(maxiter=3000,disp=0)

# per-token accuracy (boolean: predicted category matches the label)
accuracy = (result.predict()>.5)==category_labels

In [None]:
# visualization
plt.figure(figsize=(10,4))

# true categories (red crosses) and predicted probabilities (circles)
plt.plot(category_labels,'rx')
plt.plot(result.predict(),'ko',markerfacecolor=[.7,.9,.7,.5],markersize=8)

# divider between the categories: affirmations are samples 0..numAffirmationTokens-1,
# so the boundary lies halfway between the last affirmation and the first negation
plt.axvline(numAffirmationTokens-.5,linestyle='--',color='k')

# decision threshold on the predicted probability
plt.axhline(.5,linestyle='--',color='k')

plt.gca().set(xlabel='Data sample',ylabel='Probability or category',
              xlim=[-3,numAffirmationTokens+numNegationTokens+2],title=f'Accuracy from neuron #{maxBetaNeuron} = {100*accuracy.mean():.2f}%')

plt.show()

# Exercise 5: Text heatmap of “best” neuron’s activations

In [None]:
# activations of the selected neuron for all negation sequences and all token positions
negActs = negations_activations[f'mlp_{whichLayer2use}'][...,maxBetaNeuron]

# min-max scale to the range [0,1] for colormapping
actMin,actMax = negActs.min(),negActs.max()
negActsNorm = (negActs-actMin) / (actMax-actMin)

In [None]:
# measure the width of one monospace character in axis coordinates
# (all of this code is copied from several previous code files)

# draw a single letter in a throw-away figure
fig,ax = plt.subplots(figsize=(10,2))
temp_text = ax.text(0,0,'n',fontsize=12,fontfamily='monospace')

# bounding box of the letter in display (pixel) coordinates
renderer = fig.canvas.get_renderer()
bbox = temp_text.get_window_extent(renderer=renderer)

# convert the two box corners [(x0,y0),(x1,y1)] into axis coordinates
inv = ax.transAxes.inverted()
bbox_axes = inv.transform([[bbox.x0,bbox.y0], [bbox.x1,bbox.y1]])

# the width is the horizontal distance between the corners
(x_left,_),(x_right,_) = bbox_axes
en_width = x_right - x_left

# the figure was only needed for the measurement
plt.close(fig)

In [None]:
x_pos = 0  # starting x position (in axis coordinates)
y_pos = 1  # y position of the first row of text (in axis coordinates)

fig, ax = plt.subplots(figsize=(10,2))
ax.axis('off')

# just the first 20 sequences (or all of them, if the batch has fewer than 20)
numSeqs2show = min(20,batch_negations.shape[0])

# loop over rows in the batch
for batchi in range(numSeqs2show):

  # from 5 tokens before the target token until the end of the sequence
  for toki in range(context_pre-5,batch_negations.shape[1]):

    # text of this token
    toktext = tokenizer.decode([batch_negations[batchi,toki]])

    # width of the token (number of characters times the width of one character)
    token_width = en_width*len(toktext)

    # text object with background color matching the (squared) normalized activation
    ax.text(x_pos+token_width/2, y_pos, toktext, fontsize=12, ha='center', va='center',fontfamily='monospace',
            bbox = dict(boxstyle='round,pad=.3', facecolor=mpl.cm.Reds(negActsNorm[batchi,toki]**2), edgecolor='none', alpha=.8))

    # move x_pos to where the next token starts
    x_pos += token_width + .01 # plus a small gap

  # end of the line; move down one row and back to the left edge
  y_pos -= .17
  x_pos = 0

plt.show()

# Exercise 6: Laminar profile of classification

In [None]:
# initialize matrices to store the classifier results (layers x neurons)
# models that cannot be fit keep these initial values (p=1, beta=0, accuracy=0)
# NOTE: this re-uses the name `accuracy`, which an earlier cell assigned the
# per-token accuracy of the single best neuron
pvalues  = np.ones((model.config.n_layer,nneurons)) # initialize to 1's to ignore in subsequent mask
betas    = np.zeros((model.config.n_layer,nneurons))
accuracy = np.zeros((model.config.n_layer,nneurons))

# keep track of fits that failed, so that they are not silently lost
numFailedFits = 0

# loop over layers
for layeri in range(model.config.n_layer):

  # activations at the target token for this layer, sliced once (sequences x neurons)
  targsLayer = negations_activations[f'mlp_{layeri}'][:,context_pre,:]
  compsLayer = affirmations_activations[f'mlp_{layeri}'][:,context_pre,:]

  # loop over neurons for per-neuron analysis
  for neuroni in range(nneurons):

    # vectorize the activations over batches
    targs = targsLayer[:,neuroni]
    comps = compsLayer[:,neuroni]

    # build and run the model
    try: # sometimes crashes for linear-algebra reasons
      result = sm.Logit(category_labels,sm.add_constant(np.hstack((comps,targs)))).fit(maxiter=3000,disp=0)

      # extract the results (p-value, beta, and accuracy)
      pvalues[layeri,neuroni]  = result.pvalues[1]
      betas[layeri,neuroni]    = result.params[1]
      accuracy[layeri,neuroni] = 100*((result.predict()>.5)==category_labels).mean()

    # best-effort: skip neurons whose fit fails, but catch only Exception so that
    # interrupting the kernel (KeyboardInterrupt) still stops this long loop
    except Exception:
      numFailedFits += 1

  print(f'Finished layer {layeri+1:2}/{model.config.n_layer}')

if numFailedFits>0:
  print(f'{numFailedFits} models could not be fit (initial values kept).')

In [None]:
# two masks: positive betas, and significant tests (p<.05, corrected for the number of neurons)
posbet_mask = betas>0
pvalue_mask = pvalues<.05/nneurons

# get accuracy only from neurons that pass both masks
# np.ma.masked_where() masks *out* the entries where its condition is True,
# which is why the combined "keep" mask is inverted
keep_mask = pvalue_mask & posbet_mask
masked_accuracy = np.ma.masked_where(~keep_mask,accuracy)

# per-layer summaries
pct_significant = 100*np.mean(pvalue_mask,axis=1)
mean_acc_all    = np.mean(accuracy,axis=1)
mean_acc_masked = np.mean(masked_accuracy,axis=1)

# make the plot
_,axs = plt.subplots(1,2,figsize=(12,4))
ax_sig,ax_acc = axs

# left panel: percent of significant tests in each layer
ax_sig.plot(pct_significant,'kH',markerfacecolor=[.7,.7,.7],markersize=9)
ax_sig.set(xlabel='Layer',ylabel='Percent significant tests (%)',title='Laminar profile of significance')

# right panel: average accuracy in each layer, for all neurons and for the masked subset
ax_acc.plot(mean_acc_all,'ko',markerfacecolor=[.9,.7,.7],markersize=9,label='All tests')
ax_acc.plot(mean_acc_masked,'ks',markerfacecolor=[.7,.9,.7],markersize=9,label='Only sig. $\\beta$s>0')
ax_acc.legend()

ax_acc.set(xlabel='Layer',ylabel='Prediction accuracy (%)',title='Laminar profile of prediction accuracy')
plt.show()