|<h2>Book:</h2>|<h1><a href="https://open.substack.com/pub/mikexcohen/p/llm-breakdown-16-tokenization-words" target="_blank">50 ML projects to understand LLMs</a></h1>|
|-|:-:|
|<h2>Project:</h2>|<h1><b>[48] "Can" vs. "can't" classification via logistic regression</b></h1>|
|<h2>Author:</h2>|<h1>Mike X Cohen, <a href="https://sincxpress.com" target="_blank">sincxpress.com</a></h1>|

<br>

<i>Using the code without reading the book may lead to confusion or errors.</i>

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.gridspec import GridSpec

from tqdm import tqdm

import statsmodels.api as sm
from sklearn.model_selection import train_test_split

from datasets import load_dataset

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

In [None]:
### matplotlib adjustments (commented lines are for dark mode)

# draw inline figures as svg (higher-res)
import matplotlib_inline.backend_inline
matplotlib_inline.backend_inline.set_matplotlib_formats('svg')

# figure defaults: hide the top/right spines, bold titles and axis labels, 300-dpi saved files
rc_overrides = {
    # 'figure.facecolor': '#282a2c',
    # 'figure.edgecolor': '#282a2c',
    # 'axes.facecolor':   '#282a2c',
    # 'axes.edgecolor':   '#DDE2F4',
    # 'axes.labelcolor':  '#DDE2F4',
    # 'xtick.color':      '#DDE2F4',
    # 'ytick.color':      '#DDE2F4',
    # 'text.color':       '#DDE2F4',
    'axes.spines.right': False,
    'axes.spines.top':   False,
    'axes.titleweight':  'bold',
    'axes.labelweight':  'bold',
    'savefig.dpi':       300,
}
plt.rcParams.update(rc_overrides)

# **Part 1: Create two batches of "can" tokens**

In [None]:
# GPT-2 tokenizer; used below to inspect and to locate the "can" / "can't" tokens
tokenizer = AutoTokenizer.from_pretrained('gpt2')

In [None]:
# how each spelling is tokenized (with and without a preceding space)
words = [ "can"," can","can't"," can't" ]
for word in words:
  token_ids = list(tokenizer.encode(word))
  print(f'"{word}" comprises tokens {token_ids}')

In [None]:
# FYI 1: alternative tokenizations (same word written with different apostrophe characters)
words = [ " can't"," can’t"," can‘t"," canʼt"," can′t", " can՚t" ]
for word in words:
  token_ids = list(tokenizer.encode(word))
  print(f'"{word}" comprises tokens {token_ids}')

In [None]:
# FYI 2: unicode code points of the different apostrophes
apostrophes = [ "'","’","‘","ʼ","′","՚" ]
for apostrophe in apostrophes:
  codepoint = ord(apostrophe)
  print(f'{apostrophe} is U+{codepoint:04X}')

In [None]:
# set up the c4 dataset (English subset, training split) for streaming
dataset = load_dataset(
    'allenai/c4',
    'en',
    split     = 'train',
    streaming = True,
)
dataset

In [None]:
# dataset parameters
context_pre = 10      # number of tokens kept before each target token
min_samplesize = 500  # stop streaming once this many "can't" sequences are collected

# target tokens
cantok = tokenizer.encode(' can')[0]
attok = tokenizer.encode("'t")[0]

# "can" is a noun if these words follow
excluded_words = [' of',' with',' from',' in',' on',' for',' not']
excluded_following = [tokenizer.encode(word)[0] for word in excluded_words]

# initialize empty lists (each entry is context_pre tokens followed by the " can" token)
cant_tokens = []
can_tokens = []

# keep streaming in new samples; the stopping rule is checked once per sample
for sample in dataset:

  if len(cant_tokens)>=min_samplesize:
    break

  # tokenize the text from this sample
  tokens = tokenizer.encode(sample['text'])

  # candidate positions leave room for the preceding context and for one following token
  for ti in range(context_pre,len(tokens)-1):

    # only positions holding the " can" token are of interest
    if tokens[ti]!=cantok:
      continue

    next_token = tokens[ti+1]
    context = tokens[ti-context_pre:ti+1]

    # "can": the next token starts with a space and is not one of the excluded words
    if tokenizer.decode(next_token).startswith(' ') and next_token not in excluded_following:
      can_tokens.append(context)

    # "can't": the next token is "'t"
    if next_token==attok:
      cant_tokens.append(context)

      # print a status update
      if len(cant_tokens)%50==0:
        print(f'Found {len(cant_tokens)} "can\'t" tokens so far...')

len(can_tokens), len(cant_tokens)

In [None]:
# some examples: decode the first five sequences of each category back into text
examples = (
    ('Some "can" sequences:',      can_tokens),
    ('\nSome "can\'t" sequences:', cant_tokens),
)
for header,sequences in examples:
  print(header)
  for i in range(5):
    print(tokenizer.decode(sequences[i]))


In [None]:
# create batches: convert each list of token sequences into a tensor (one row per sequence)
batch_can, batch_cant = (torch.tensor(sequences) for sequences in (can_tokens,cant_tokens))

batch_can, batch_cant

In [None]:
# match length: randomly subsample both categories down to the size of the smaller one
minN = min(len(can_tokens), len(cant_tokens))

# note: np.random.choice(...,replace=False) would also work, but mixes numpy and pytorch (kinda ugly)

# torch has no equivalent of np.random.choice, so you can permute and select the first N.
# Each permutation must span the rows of the batch it indexes: a permutation of only
# minN indices would restrict the larger batch to its first minN rows.
idx = torch.randperm(len(batch_can))[:minN]
batch_can = batch_can[idx,:]

idx = torch.randperm(len(batch_cant))[:minN]
batch_cant = batch_cant[idx,:]

batch_can.shape, batch_cant.shape

# **Part 2: Get MLP activations**

In [None]:
# use the GPU when one is available, otherwise the CPU
if torch.cuda.is_available():
  device = torch.device('cuda:0')
else:
  device = torch.device('cpu')

# load the pretrained GPT2-medium model and move it to that device
model = AutoModelForCausalLM.from_pretrained('gpt2-medium')
model = model.to(device)

# switch to evaluation mode
model.eval()

In [None]:
# activations captured by the hooks, keyed by 'mlp_<layer number>'
activations = {}

def implant_hook(layer_number):
  """Create a forward hook that stores a module's output in `activations`.

  The returned function takes the (module, input, output) arguments of a
  forward hook. It detaches the output, copies it to a numpy array on the CPU,
  and writes it to `activations['mlp_<layer_number>']`, replacing whatever an
  earlier call stored under that key.
  """
  key = f'mlp_{layer_number}'

  def store_output(module, inputs, output):
    activations[key] = output.detach().cpu().numpy()

  return store_output

# put hooks in all layers (on each block's mlp.c_fc module) and keep the returned handles
handles = [
    block.mlp.c_fc.register_forward_hook(implant_hook(layeri))
    for layeri,block in enumerate(model.transformer.h)
]

In [None]:
# push both batches through the model; the hooks overwrite the entries of `activations`
# on every forward pass, so snapshot the dictionary after each one
with torch.no_grad():

  # process the can tokens and copy the activations
  model(batch_can.to(device))
  can_activations = dict(activations)

  # repeat for can't tokens
  model(batch_cant.to(device))
  cant_activations = dict(activations)

In [None]:
# names under which the hooks stored the activations
print(can_activations.keys(),'\n')

# size of the activations array stored for one example layer
can_activations['mlp_5'].shape

# **Part 3: Logistic regression in all neurons from one layer**

In [None]:
# we'll use this vector repeatedly: False for the first minN observations ("can") and
# True for the last minN ("can't"), matching the order in which the activations are stacked
category_labels = np.repeat([False,True],minN)
category_labels

In [None]:
# some definitions

# MLP transformer layer (index of the layer analyzed in Part 3)
whichLayer2use = 3

# for train/test split (proportion of observations held out for testing)
test_prop = .2

# number of expansion neurons (size of the last activations dimension)
# NOTE(review): read from layer 5 rather than from whichLayer2use; presumably every layer
# has the same width (the later per-layer loop relies on that) -- TODO confirm
nneurons = can_activations['mlp_5'].shape[-1]

In [None]:
# initialize matrix to store the classifier results
# (one row per neuron; columns are p-value, beta, TRAIN accuracy, TEST accuracy)
classifierResults = np.full((nneurons,4),np.nan)

# final-token activations of every neuron in the selected layer
layerKey  = f'mlp_{whichLayer2use}'
finalCan  = can_activations[layerKey][:,-1,:]
finalCant = cant_activations[layerKey][:,-1,:]

# fit a separate classifier to each neuron
for neuroni in tqdm(range(nneurons)):

  # design matrix: a constant plus this neuron's activations ("can" first, then "can't")
  catC = finalCan[:,neuroni]
  catT = finalCant[:,neuroni]
  X = sm.add_constant(np.concatenate((catC,catT)))

  # stratified train/test split
  X_train,X_test, y_train,y_test = train_test_split(
      X,category_labels,test_size=test_prop,stratify=category_labels)

  # L1-regularized logistic regression
  logit_model = sm.Logit(y_train,X_train)
  result = logit_model.fit_regularized(method='l1',alpha=.1,maxiter=3000,disp=0)

  # percent correct, thresholding the predicted probability at .5
  acc_train = 100*((result.predict(X_train)>.5) == y_train).mean()
  acc_test  = 100*((result.predict(X_test)>.5) == y_test).mean()

  # store p-value and beta of the activation term (index 0 is the constant), plus accuracies
  classifierResults[neuroni,:] = result.pvalues[1],result.params[1],acc_train,acc_test

In [None]:
# visualization of model significance and sign

# setup the figure: one wide panel on top, three panels below
fig = plt.figure(figsize=(11,7))
gs = GridSpec(2,3,figure=fig)

ax0 = fig.add_subplot(gs[0,:])
ax1,ax2,ax3 = (fig.add_subplot(gs[1,col]) for col in range(3))

# find the negative and positive betas, and the supra-threshold results
negBetas = classifierResults[:,1]<0
posBetas = classifierResults[:,1]>0
pvalThresh = .05/nneurons # p<.05, Bonferroni-corrected
sigBetas = classifierResults[:,0] < pvalThresh

# marker styles: significant neurons get gray-filled circles (semi-transparent in the
# lower panels), non-significant neurons get small crosses
sigStyleTop    = {'markerfacecolor':[.7,.7,.7]}
sigStyleBottom = {'markerfacecolor':[.7,.7,.7,.5]}
nonsigStyle    = {'markersize':3}

# (neuron selection, marker format, legend label, style in panel A, style in panels B-D)
neuronGroups = [
    (posBetas &  sigBetas, 'ro', 'Positive and sig.',     sigStyleTop, sigStyleBottom),
    (posBetas & ~sigBetas, 'rx', 'Positive and non-sig.', nonsigStyle, nonsigStyle),
    (negBetas &  sigBetas, 'go', 'Negative and sig.',     sigStyleTop, sigStyleBottom),
    (negBetas & ~sigBetas, 'gx', 'Negative and non-sig.', nonsigStyle, nonsigStyle),
]

for idx2plot,marker,groupLabel,styleTop,styleBottom in neuronGroups:

  groupBetas   = classifierResults[idx2plot,1]
  groupNegLogP = -np.log10(classifierResults[idx2plot,0])

  # panel A: betas by neuron index
  ax0.plot(np.where(idx2plot)[0],groupBetas,marker,label=groupLabel,**styleTop)

  # panels B-D: betas, TRAIN accuracy, and TEST accuracy against -log10(p)
  ax1.plot(groupBetas,groupNegLogP,marker,**styleBottom)
  ax2.plot(classifierResults[idx2plot,2],groupNegLogP,marker,**styleBottom)
  ax3.plot(classifierResults[idx2plot,3],groupNegLogP,marker,**styleBottom)

ax0.set(ylabel='Beta coefficient',xlabel='Neuron index',xlim=[-10,nneurons+9],
              title='A) Statistical parameters of "can" classification')
ax0.legend(fontsize=8)

# significance threshold in panels B-D, chance level in the two accuracy panels
for ax in (ax1,ax2,ax3):
  ax.axhline(-np.log10(pvalThresh),linestyle='--',color='b',label='Significance threshold')
for ax in (ax2,ax3):
  ax.axvline(50,linestyle='--',color='k',linewidth=.5,label='Chance')
ax2.legend(fontsize=8)

ax1.set(xlabel='Beta coeff',ylabel='$-log_{10}(p)$',title='B) Betas by p-values')
ax2.set(xlabel='Prediction accuracy (%)',ylabel='$-log_{10}(p)$',title='C) TRAIN accuracy by p-values')
ax3.set(xlabel='Prediction accuracy (%)',ylabel='$-log_{10}(p)$',title='D) TEST accuracy by p-values')
ax1.legend(fontsize=8)

plt.tight_layout()
plt.savefig('ch7_proj48_part3a.png')
plt.show()

In [None]:
# distributions of the TRAIN and TEST prediction accuracies over neurons
yTrain,xTrain = np.histogram(classifierResults[:,2],bins='fd',density=True)
yTest,xTest   = np.histogram(classifierResults[:,3],bins='fd',density=True)

# np.histogram returns the bin edges (one more value than there are bins); draw each
# density at its bin center so the curves are not shifted left by half a bin width
xTrainCenters = (xTrain[:-1]+xTrain[1:])/2
xTestCenters  = (xTest[:-1]+xTest[1:])/2

plt.figure(figsize=(8,3))
plt.plot(xTrainCenters,yTrain,linewidth=2,label='TRAIN')
plt.plot(xTestCenters,yTest,linewidth=2,label='TEST')
plt.axvline(50,linestyle='--',color='k',linewidth=.5,label='Chance')

plt.gca().set(xlabel='Prediction accuracy (%)',ylabel='Density',title='Distribution of prediction accuracies')
plt.legend()

plt.tight_layout()
plt.savefig('ch7_proj48_part3b.png')
plt.show()

# **Part 4: Laminar profile of classification**

In [None]:
# initialize matrices to store the classifier results (layers x neurons)
n_layers = model.config.n_layer
pvalues  = np.ones((n_layers,nneurons)) # initialize to 1's to ignore in subsequent mask
betas    = np.zeros((n_layers,nneurons))
accuracy = np.zeros((n_layers,nneurons,2)) # last dimension: TRAIN, TEST


# loop over layers
for layeri in tqdm(range(n_layers)):

  # final-token activations of every neuron in this layer
  finalCan  = can_activations[f'mlp_{layeri}'][:,-1,:]
  finalCant = cant_activations[f'mlp_{layeri}'][:,-1,:]

  # loop over neurons for per-neuron analysis
  for neuroni in range(nneurons):

    # design matrix: a constant plus this neuron's activations ("can" first, then "can't")
    catC = finalCan[:,neuroni]
    catT = finalCant[:,neuroni]
    X = sm.add_constant(np.concatenate((catC,catT)))

    # stratified train/test split
    X_train,X_test, y_train,y_test = train_test_split(
        X,category_labels,test_size=test_prop,stratify=category_labels)

    # L1-regularized logistic regression
    logit_model = sm.Logit(y_train,X_train)
    result = logit_model.fit_regularized(method='l1',alpha=.1,maxiter=3000,disp=0)

    # percent correct, thresholding the predicted probability at .5
    acc_train = 100*((result.predict(X_train)>.5) == y_train).mean()
    acc_test  = 100*((result.predict(X_test)>.5) == y_test).mean()

    # store p-value and beta of the activation term (index 0 is the constant), plus accuracies
    pvalues[layeri,neuroni]    = result.pvalues[1]
    betas[layeri,neuroni]      = result.params[1]
    accuracy[layeri,neuroni,:] = acc_train,acc_test

In [None]:
# create three masks: significance (Bonferroni-corrected over neurons) and the sign of the beta
pvalue_mask = pvalues<.05/nneurons
posbet_mask = betas>0
negbet_mask = betas<0

# get accuracy only from masked neurons (all other entries become NaN)
sigPos = posbet_mask & pvalue_mask
sigNeg = negbet_mask & pvalue_mask

masked_accuracyPosB_train = np.where(sigPos,accuracy[:,:,0],np.nan)
masked_accuracyNegB_train = np.where(sigNeg,accuracy[:,:,0],np.nan)
masked_accuracyPosB_test  = np.where(sigPos,accuracy[:,:,1],np.nan)
masked_accuracyNegB_test  = np.where(sigNeg,accuracy[:,:,1],np.nan)

# per-layer summaries (NaN entries are ignored in the averages)
percentSig   = 100*np.mean(pvalue_mask,axis=1)
meanPosTrain = np.nanmean(masked_accuracyPosB_train,axis=1)
meanNegTrain = np.nanmean(masked_accuracyNegB_train,axis=1)
meanPosTest  = np.nanmean(masked_accuracyPosB_test,axis=1)
meanNegTest  = np.nanmean(masked_accuracyNegB_test,axis=1)


# make the plot
_,axs = plt.subplots(1,2,figsize=(12,3.5))
axSig,axAcc = axs

# panel A: percent of significant neurons in each layer
axSig.plot(percentSig,'kH',markerfacecolor=[.9,.7,.9],markersize=12)
axSig.set(xlabel='Layer',ylabel='Percent significant neurons (%)',title='A) Laminar profile of significance')

# panel B: TRAIN accuracies as large markers, TEST accuracies as small connected markers behind them
axAcc.plot(meanPosTrain,'gs',markerfacecolor=[.7,.9,.7],markersize=10,label='TRAIN $\\beta$s>0')
axAcc.plot(meanNegTrain,'b^',markerfacecolor=[.7,.7,.9],markersize=10,label='TRAIN $\\beta$s<0')

axAcc.plot(meanPosTest,'gs-',markerfacecolor=[.7,.9,.7],markersize=5,zorder=-10,label='TEST $\\beta$s>0')
axAcc.plot(meanNegTest,'b^-',markerfacecolor=[.7,.7,.9],markersize=5,zorder=-10,label='TEST $\\beta$s<0')

axAcc.legend()
axAcc.set(xlabel='Layer',ylabel='Prediction accuracy (%)',title='B) Average prediction accuracies in significant neurons')

plt.tight_layout()
plt.savefig('ch7_proj48_part4.png')
plt.show()