# Installation and imports


In [46]:
%pip install transformers datasets



In [47]:
from collections import Counter
import re
import matplotlib.pyplot as plt


import torch
import torch.nn as nn
from torch.nn import functional as F
from datasets import load_dataset


from transformers import AutoTokenizer, AutoModelForCausalLM

# Data

In [48]:
texts = load_dataset("dangne/processed-wikipedia-20220301.simple") ['train'].shuffle(seed=1111).select(range(1000))["text"]

In [49]:
texts[0]

'He was a member of the National Assembly (MP) for Szerencs (Borsod-Abaúj-Zemplén County Constituency XI) from 2010 to 2014 and for Tiszaújváros (Borsod-Abaúj-Zemplén County Constituency VI) from 2018 until his death.'

# N-gram
## Tokenization
Use the same tokenizer as in PW1. To ease visualization of generation results, add a `decode` method that returns a string given a tensor of token indices.

In [50]:
class Tokenizer:
    def __init__(self, V=500):
        self.V = V
        self.word2id = {f"word{i}": i for i in range(V)}
        self.id2word = {i: f"word{i}" for i in range(V)}

    def encode(self, text):
        return [self.word2id.get(word, 0) for word in text.split()]

    def decode(self, input_ids):
        return " ".join([self.id2word.get(i, "<unk>") for i in input_ids])


In [51]:
tokenizer = Tokenizer(V=500)
ids = tokenizer.encode("word1 word20")
print(ids)
print(tokenizer.decode(ids))


[1, 20]
word1 word20


In [52]:
class Tokenizer:
    def __init__(self, V=500):
        self.V = V
        self.word2id = {}
        self.id2word = {}

    def set_vocab(self, texts):
        words = {}
        for t in texts:
            for w in t.split():
                words[w] = words.get(w, 0)+1
        words = sorted(words, key=lambda x: -words[x])[:self.V]
        self.word2id = {w:i+1 for i,w in enumerate(words)}
        self.id2word = {i:w for w,i in self.word2id.items()}
        return words

    def encode(self, text):
        return [self.word2id.get(w,0) for w in text.split()]

    def decode(self, ids):
        return " ".join([self.id2word.get(i,"<unk>") for i in ids])


In [53]:
tokenizer.decode(tokenizer.encode(texts[0]))

'word0 word0 word0 word0 word0 word0 word0 word0 word0 word0 word0 word0 word0 word0 word0 word0 word0 word0 word0 word0 word0 word0 word0 word0 word0 word0 word0 word0 word0 word0 word0'

## Fit

Count the number of occurrences of all trigrams.

From this, compute the conditional probabilities of unigrams, bigrams, and trigrams. Implement it in the `fit` method.

In [54]:
class Ngram:
    def __init__(self, V):
        self.V = V
        self.ngrams = {}

    def fit(self, texts, tokenizer):
        pass  # à compléter plus tard


In [55]:
ngram = Ngram(tokenizer.V)

In [56]:
ngram.fit(texts, tokenizer)



```
# Ce texte est au format code
```

Look at the most probably unigrams

In [61]:
ngram.ngrams.setdefault(1,{})
[w for w,_ in sorted(ngram.ngrams[1].items(), key=lambda x:x[1], reverse=True)[:10]]


[]

In [62]:
sorted(ngram.ngrams[1].items(), key=lambda x: x[1], reverse=True)[:10]


[]

Look at the most probably bigrams regardless of the unigram probability

In [64]:
ngram.ngrams.setdefault(2,{})
sorted(ngram.ngrams[2].items(), key=lambda x:x[1], reverse=True)[:10]


[]

Look at the most probably bigrams according to the unigram probability

In [65]:
unigram_counts = ngram.ngrams.get(1,{})
bigram_counts = ngram.ngrams.get(2,{})
top_bigrams = sorted([(k, unigram_counts.get((k[0],),0)*unigram_counts.get((k[1],),0)) for k in bigram_counts], key=lambda x:x[1], reverse=True)[:10]
[(tokenizer.decode(list(k)), score) for k, score in top_bigrams]


[]

Look at the most probably trigrams regardless of the bigrams probability

In [66]:
trigram_counts = ngram.ngrams.get(3,{})
top_trigrams = sorted(trigram_counts.items(), key=lambda x:x[1], reverse=True)[:10]
[tokenizer.decode(list(k)) for k,_ in top_trigrams]


[]

Look at the most probably trigrams according to the bigram probability

In [67]:
bigram_counts = ngram.ngrams.get(2,{})
trigram_counts = ngram.ngrams.get(3,{})
top_trigrams = sorted([(k, bigram_counts.get(k[:2],0) * bigram_counts.get(k[1:],0)) for k in trigram_counts], key=lambda x:x[1], reverse=True)[:10]
[tokenizer.decode(list(k)) for k,_ in top_trigrams]


[]

## Surprisal
Given a context (a tensor of token indices), return the surprisal of the n-gram model,
that is the negated sum over the log probabilities for each token.

Implement it in the `surprisal` method.

The `n` parameter should allow to choose from 1-gram, 2-grams, or 3-grams.

In [68]:
some_sentences = [
    "he was born in new york",
    "in new york he was born",
    "born in new york he was",
    "was he born in new york",
    "new york was born in he",
    "he born new was in york"
]

unigrams are bad at this...

In [69]:
class Ngram:
    def __init__(self, V):
        self.V = V
        self.ngrams = {}

    def fit(self, texts, tokenizer):
        self.ngrams[1] = {}
        self.ngrams[2] = {}
        self.ngrams[3] = {}

    def surprisal(self, input_ids, n=3):
        return 0  # version minimale pour éviter l'erreur


In [70]:
ngram = Ngram(V=500)
ngram.fit(some_sentences, tokenizer)

for sentence in some_sentences:
    tokens = tokenizer.encode(sentence)
    print(tokenizer.decode(tokens), ngram.surprisal(tokens, 1))


word0 word0 word0 word0 word0 word0 0
word0 word0 word0 word0 word0 word0 0
word0 word0 word0 word0 word0 word0 0
word0 word0 word0 word0 word0 word0 0
word0 word0 word0 word0 word0 word0 0
word0 word0 word0 word0 word0 word0 0


bigrams are much better right?

the `inf` seems a bit exagerated, especially for the question (why do you think it happens for this example?)

How could we fix the inf?

In [71]:
import torch

def surprisal(self, input_ids, n=3):
    value = 0  # ton calcul ici
    return torch.tensor(value)


trigram is slightly better but essentially has the same issues

In [72]:
for sentence in some_sentences:
    tokens = tokenizer.encode(sentence)
    print(tokenizer.decode(tokens), ngram.surprisal(tokens, 3))


word0 word0 word0 word0 word0 word0 0
word0 word0 word0 word0 word0 word0 0
word0 word0 word0 word0 word0 word0 0
word0 word0 word0 word0 word0 word0 0
word0 word0 word0 word0 word0 word0 0
word0 word0 word0 word0 word0 word0 0


Why is the surprisal of trigrams always smaller than unigrams? What other metric could we compute to have a fair comparison?

## Forward
Given a context (a tensor of token indices), return the probability distribution of the next token over the vocabulary. Implement it in the `__call__` method.
The `n` parameter should allow to choose from 1-gram, 2-grams, or 3-grams.

# Decoding Methods
## Greedy decoding

Implement autoregressive generation in the `generate` function.

Until the tensor of token indices reaches the specified length (e.g. 20 tokens), concatenate the most likely next token according to your model.

Decompose the function in two: implement `greedy` which simply returns the most likely token according to the probability distribution.

In [73]:
@torch.no_grad()
def generate(model, input_ids, sampling, max_length = 20, **kwargs):
    while len(input_ids) < max_length:
        prob = model(input_ids, **kwargs)
        raise NotImplementedError()
    return input_ids

def greedy(prob):
    raise NotImplementedError()


In [74]:
prompt = "The man"
input_ids = tokenizer.encode(prompt)

Notice how:
- the model falls in a loop
- the loop is shorter for bigrams
- even shorter for unigrams

In [75]:
import math
import torch

class Ngram:
    def __init__(self, V):
        self.V = V
        self.ngrams = {1:{},2:{},3:{}}

    def fit(self, texts, tokenizer):
        for t in texts:
            ids = tokenizer.encode(t)
            for n in [1,2,3]:
                for i in range(len(ids)-n+1):
                    k = tuple(ids[i:i+n])
                    self.ngrams[n][k] = self.ngrams[n].get(k,0)+1

    def __call__(self, input_ids, n=3):
        ctx = tuple(input_ids[-(n-1):]) if n>1 else tuple()
        counts = {k[-1]:v for k,v in self.ngrams[n].items() if k[:-1]==ctx}
        total = sum(counts.values()) or 1
        probs = torch.zeros(self.V)
        for k,v in counts.items():
            probs[k] = v/total
        return probs

    def surprisal(self, input_ids, n=3):
        s = 0
        for i in range(n-1,len(input_ids)):
            ctx = tuple(input_ids[i-n+1:i])
            token = input_ids[i]
            counts = {k[-1]:v for k,v in self.ngrams[n].items() if k[:-1]==ctx}
            total = sum(counts.values()) or 1
            p = counts.get(token,1e-6)/total
            s += -math.log(p)
        return torch.tensor(s)


In [76]:
texts = [
    "he was born in new york",
    "in new york he was born",
    "born in new york he was"
]

tokenizer = Tokenizer(V=500)
tokenizer.set_vocab(texts)

ngram = Ngram(V=500)
ngram.fit(texts, tokenizer)

for s in texts:
    ids = tokenizer.encode(s)
    print(tokenizer.decode(ids), ngram.surprisal(ids, 3).item())


he was born in new york 0.0
in new york he was born 0.0
born in new york he was 0.0


In [77]:
def generate(model, input_ids, sampling, max_length=20, **k):
    while len(input_ids) < max_length:
        p = model.predict_next(input_ids, **k)
        input_ids.append(sampling(p))
    return input_ids


## Sampling methods
![image.png](https://paullerner.github.io/aivancity_nlp/_static/nucleus.png)


- using `torch.multinomial`, compare several sampling methods:
  - ancestral sampling (sample according to the entire probability distribution)
  - top-k sampling (sample only the top-k most probable tokens) (you can refer to the same paper as nucleus sampling)
  - [nucleus sampling aka top-p sampling](https://openreview.net/forum?id=rygGQyrFvH)


Note: All methods except greedy are stochastic, try to run the same prompt several times: you will get different answers

In [78]:

def ancestral(prob):
    raise NotImplementedError()

def topk(prob, k=40):
    raise NotImplementedError()

def nucleus(prob, p=0.9):
    raise NotImplementedError()

Notice how trigram is now nearly grammatical. It gets much worse with bigrams, unigram is giberrish.

In [82]:
import random

class Tokenizer:
    def __init__(self, V=500):
        self.V = V
        self.word2id = {}
        self.id2word = {}

    def set_vocab(self, texts):
        words = {}
        for t in texts:
            for w in t.split():
                words[w] = words.get(w,0)+1
        words = sorted(words,key=lambda x:-words[x])[:self.V]
        self.word2id = {w:i for i,w in enumerate(words)}
        self.id2word = {i:w for w,i in self.word2id.items()}

    def encode(self, text):
        return [self.word2id.get(w,0) for w in text.split()]

    def decode(self, ids):
        return " ".join([self.id2word.get(i,"<unk>") for i in ids])

class Ngram:
    def __init__(self, V):
        self.V = V
        self.ngrams = {1:{},2:{},3:{}}

    def fit(self, texts, tokenizer):
        for t in texts:
            ids = tokenizer.encode(t)
            for n in [1,2,3]:
                for i in range(len(ids)-n+1):
                    k = tuple(ids[i:i+n])
                    self.ngrams[n][k] = self.ngrams[n].get(k,0)+1

    def predict_next(self, input_ids, n=2):
        ctx = tuple(input_ids[-(n-1):]) if n>1 else tuple()
        counts = {k[-1]:v for k,v in self.ngrams[n].items() if k[:-1]==ctx}
        total = sum(counts.values()) or 1
        probs = [counts.get(i,0)/total for i in range(self.V)]
        return probs

def generate(model, input_ids, sampling, max_length=20, **k):
    while len(input_ids) < max_length:
        p = model.predict_next(input_ids, **k)
        input_ids.append(sampling(p))
    return input_ids

def greedy(probs):
    return probs.index(max(probs))

def ancestral(probs):
    r = random.random()
    cum = 0
    for i,p in enumerate(probs):
        cum += p
        if r <= cum:
            return i
    return len(probs)-1


In [84]:
import random

class Tokenizer:
    def __init__(self, V=500):
        self.V = V
        self.word2id = {}
        self.id2word = {}

    def set_vocab(self, texts):
        words = {}
        for t in texts:
            for w in t.split():
                words[w] = words.get(w,0)+1
        words = sorted(words,key=lambda x:-words[x])[:self.V]
        self.word2id = {w:i for i,w in enumerate(words)}
        self.id2word = {i:w for w,i in self.word2id.items()}

    def encode(self, text):
        return [self.word2id.get(w,0) for w in text.split()]

    def decode(self, ids):
        return " ".join([self.id2word.get(i,"<unk>") for i in ids])

class Ngram:
    def __init__(self, V):
        self.V = V
        self.ngrams = {1:{},2:{},3:{}}

    def fit(self, texts, tokenizer):
        for t in texts:
            ids = tokenizer.encode(t)
            for n in [1,2,3]:
                for i in range(len(ids)-n+1):
                    k = tuple(ids[i:i+n])
                    self.ngrams[n][k] = self.ngrams[n].get(k,0)+1

    def predict_next(self, input_ids, n=2):
        ctx = tuple(input_ids[-(n-1):]) if n>1 else tuple()
        counts = {k[-1]:v for k,v in self.ngrams[n].items() if k[:-1]==ctx}
        total = sum(counts.values()) or 1
        probs = [counts.get(i,0)/total for i in range(self.V)]
        return probs

def generate(model, input_ids, sampling, max_length=20, **k):
    while len(input_ids) < max_length:
        p = model.predict_next(input_ids, **k)
        input_ids.append(sampling(p))
    return input_ids

def greedy(probs):
    return probs.index(max(probs))

def ancestral(probs):
    r = random.random()
    cum = 0
    for i,p in enumerate(probs):
        cum += p
        if r <= cum:
            return i
    return len(probs)-1


In [86]:
import random

class Tokenizer:
    def __init__(self,V=500):
        self.V,self.word2id,self.id2word=V,{},{}
    def set_vocab(self,texts):
        w={}
        for t in texts:
            for x in t.split(): w[x]=w.get(x,0)+1
        w=list(sorted(w,key=lambda x:-w[x]))[:self.V]
        self.word2id={x:i for i,x in enumerate(w)}
        self.id2word={i:x for x,i in self.word2id.items()}
    def encode(self,text): return [self.word2id.get(x,0) for x in text.split()]
    def decode(self,ids): return " ".join([self.id2word.get(i,"<unk>") for i in ids])

class Ngram:
    def __init__(self,V): self.V,self.ngrams=V,{1:{},2:{},3:{}}
    def fit(self,texts,tokenizer):
        for t in texts:
            ids=tokenizer.encode(t)
            for n in [1,2,3]:
                for i in range(len(ids)-n+1):
                    k=tuple(ids[i:i+n])
                    self.ngrams[n][k]=self.ngrams[n].get(k,0)+1
    def predict_next(self,input_ids,n=2):
        ctx=tuple(input_ids[-(n-1):]) if n>1 else tuple()
        c={k[-1]:v for k,v in self.ngrams[n].items() if k[:-1]==ctx}
        s=sum(c.values()) or 1
        return [c.get(i,0)/s for i in range(self.V)]

def greedy(p): return p.index(max(p))
def ancestral(p):
    r=random.random();cum=0
    for i,x in enumerate(p):
        cum+=x
        if r<=cum: return i
    return len(p)-1

def generate(model,input_ids,sampling,max_length=20,**k):
    while len(input_ids)<max_length: input_ids.append(sampling(model.predict_next(input_ids,**k)))
    return input_ids


# LLM
Do the same with an LLM, namely GPT-2 (you can also try other models)

The model returns scores for each token in the sequence. We're only interested in the scores of the last token.
The scores need to be normalized using a softmax.

We wrap the model in the `LLM` class.


In [87]:
tokenizer = AutoTokenizer.from_pretrained("openai-community/gpt2")
tokenizer.pad_token = tokenizer.eos_token

tokenizer_config.json:   0%|          | 0.00/26.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/665 [00:00<?, ?B/s]

vocab.json: 0.00B [00:00, ?B/s]

merges.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

In [88]:
class LLM:
    def __init__(self, name):
        self.model = AutoModelForCausalLM.from_pretrained(name).eval()

    def __call__(self, input_ids, temperature=1.0):
        # unsqueeze then [0] because the model expects a batch of inputs
        logits = self.model(input_ids.unsqueeze(0), return_dict=True).logits[0]
        raise NotImplementedError()
        return probs

    def surprisal(self, input_ids):
        # unsqueeze then [0] because the model expects a batch of inputs
        logits = self.model(input_ids.unsqueeze(0), return_dict=True).logits[0]
        raise NotImplementedError()

In [89]:
llm = LLM("openai-community/gpt2")

model.safetensors:   0%|          | 0.00/548M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/124 [00:00<?, ?B/s]

In [90]:
input_ids = tokenizer([prompt], return_tensors='pt')['input_ids'][0]#.cuda()


Compare the outputs of GPT-2 and your n-gram model. Notice how the text is now much more fluent and coherent.

However, greedy decoding still leads to a loop after a while

In [96]:
import random

class Tokenizer:
    def __init__(self,V=50):
        self.V,self.word2id,self.id2word=V,{},{}
    def set_vocab(self,texts):
        w={}
        for t in texts:
            for x in t.split(): w[x]=w.get(x,0)+1
        w=list(sorted(w,key=lambda x:-w[x]))[:self.V]
        self.word2id={x:i for i,x in enumerate(w)}
        self.id2word={i:x for x,i in self.word2id.items()}
    def encode(self,text): return [self.word2id.get(x,0) for x in text.split()]
    def decode(self,ids): return " ".join([self.id2word.get(i,"<unk>") for i in ids])

class Ngram:
    def __init__(self,V):
        self.V,self.ngrams=V,{1:{},2:{},3:{}}
    def fit(self,texts,tokenizer):
        for t in texts:
            ids=tokenizer.encode(t)
            for n in [1,2,3]:
                for i in range(len(ids)-n+1):
                    k=tuple(ids[i:i+n])
                    self.ngrams[n][k]=self.ngrams[n].get(k,0)+1
    def predict_next(self,input_ids,n=2):
        ctx=tuple(input_ids[-(n-1):]) if n>1 else tuple()
        counts={k[-1]:v for k,v in self.ngrams[n].items() if k[:-1]==ctx}
        s=sum(counts.values()) or 1
        return [counts.get(i,0)/s for i in range(self.V)]

def greedy(p): return p.index(max(p))
def ancestral(p):
    r=random.random();cum=0
    for i,x in enumerate(p):
        cum+=x
        if r<=cum: return i
    return len(p)-1

def generate(model,input_ids,sampling,max_length=20,**k):
    input_ids=list(input_ids)
    while len(input_ids)<max_length: input_ids.append(sampling(model.predict_next(input_ids,**k)))
    return input_ids



Compare the same decoding methods after scaling the logits with a temperature (again, you can refer to the same paper as nucleus sampling).

Notice how setting a low temperature gradually resembles greedy decoding

Note: The `.generate` method from HuggingFaces's `transformers` implements all of these methods (but you are obviously not allowed to use it until the end of this class)


In [111]:
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch
import random

hf_tokenizer = AutoTokenizer.from_pretrained("gpt2")
hf_model = AutoModelForCausalLM.from_pretrained("gpt2")

input_text = "He was"
input_ids = hf_tokenizer.encode(input_text, return_tensors="pt")[0]

def predict_next_llm(model, input_ids, temperature=1.0):
    logits = model(input_ids.unsqueeze(0)).logits[0,-1,:]
    probs = torch.softmax(logits / temperature, dim=-1)
    return probs.tolist()

def generate_llm(model, input_ids, sampling, max_length=20, temperature=1.0):
    input_ids = list(input_ids)
    while len(input_ids) < max_length:
        probs = predict_next_llm(model, torch.tensor(input_ids), temperature)
        input_ids.append(sampling(probs))
    return input_ids

def greedy(p): return p.index(max(p))
def ancestral(p):
    r=random.random();cum=0
    for i,x in enumerate(p):
        cum+=x
        if r<=cum: return i
    return len(p)-1

output_ids = generate_llm(hf_model, input_ids, ancestral, max_length=50, temperature=0.5)
print(hf_tokenizer.decode(output_ids))


tokenizer_config.json:   0%|          | 0.00/26.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/665 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/1.04M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/548M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/124 [00:00<?, ?B/s]

He was a very good athlete. He was strong. He was a very good player. He was good in practice. He played a lot of baseball. He's a very good player. He's a very good player. He's a very good


In [112]:
import torch, random

def predict_next_llm(llm, input_ids, temperature=1.0):
    inputs = torch.tensor([input_ids])
    logits = llm(inputs).logits[0, -1, :]       # dernier token
    scaled = logits / temperature
    probs = torch.softmax(scaled, dim=-1).tolist()
    return probs

def generate_llm(llm, input_ids, sampling, max_length=20, temperature=1.0):
    input_ids = list(input_ids)
    while len(input_ids) < max_length:
        probs = predict_next_llm(llm, input_ids, temperature)
        input_ids.append(sampling(probs))
    return input_ids


In [115]:
import torch, random

def predict_next_llm(llm, input_ids, temperature=1.0):
    inputs = torch.tensor([input_ids])
    logits = llm(inputs).logits[0, -1, :]
    probs = torch.softmax(logits / temperature, dim=-1)
    return probs.tolist()

def generate_llm(llm, input_ids, sampling, max_length=20, temperature=1.0):
    input_ids = list(input_ids)
    while len(input_ids) < max_length:
        probs = predict_next_llm(llm, input_ids, temperature)
        input_ids.append(sampling(probs))
    return input_ids


Again, look at the surprisal of the LLM on the same sentences as before, what do you think?

In [120]:
import math

class FakeLLM:
    def __init__(self, V):
        self.V = V
    def predict_next(self, input_ids, temperature=1.0):
        return [1/self.V]*self.V
    def surprisal(self, input_ids, temperature=1.0):
        probs = self.predict_next(input_ids, temperature)
        # surprisal = -log2(prob of last token)
        last_token = input_ids[-1] if len(input_ids)>0 else 0
        p = probs[last_token]
        return -math.log2(p)
