# Mettere Watermark a testi generati da (Large) Language Models
Ma prima, una minima introduzione ad un concetto di base.

## L'output di GPT
La prendo leggermente alla larga.

Cos'è GPT? L'"[architettura](https://cdn.openai.com/research-covers/language-unsupervised/language_understanding_paper.pdf)" su cui si basano i famosi GPT-qualcosa, tipo [GPT-2](https://d4mucfpksywv.cloudfront.net/better-language-models/language_models_are_unsupervised_multitask_learners.pdf). (ho messo due link ai paper, ma più che altro per le immagini)

**Cosa c'è di importante da sapere di questa architettura?** Ciò che prende in input e ciò che restituisce in output.

Forse, la parte di input, è banale: viene alimentata con una lista di parole (che è una semplificazione, ma non si perde troppo del significato per quello che cercherò di spiegare).

Ma è importante complicare leggermente la descrizione dell'output. **GPT restituisce, per ogni parola che "conosce", la probabilità che questa sia quella che meglio prosegue la frase che ha ricevuto in input.** I parametri `temperature` e `top_p`, servono ad influenzare come le parole vengono scelte, ma non serve capire cosa fanno.

![per rendere l'idea](./assets/output-gpt.png)

## Ora un piccolo esempio di generazione (senza watermark)

Prima di tutto, serve che "tiro fuori" il mio mini modello, dando la posizone di iperparametri e pesi.

Per una serie di ragioni (mi diverte, volevo esempi italiani, non volevo legarmi ad Hugging Face) ho addestrato io un piccolo GPT (0.1*GPT2)

In [51]:
out_dir = 'out-model' # cartella dalla quale caricare il modello
ckpt_file='ckpt3450.pt' # ultimi pesi+iperparametri del modello
tokenizer_folder='tokenizer' # cartella contenente il tokenizer

Poi si inizializzano alcune cose di torch e il modello GPT vero e proprio.

> Io mi sono schierato dal lato oscuro, quello di [Torch](https://pytorch.org/), ma rifare in Keras non dovrebbe essere troppo complicato.

Colgo l'occasione anche per una brave menzione d'onore al buon **[Andrej Karpathy](https://karpathy.ai/)**, che ha fatto un fantastico "corso" su [Youtube](https://youtu.be/kCc8FmEb1nY) dove spiega in maniera ✨meravigliosa✨ come funziona GPT, entrando molto chiaramente anche nel meccanismo dell'attenzione. Molto gentilmente ha messo su [Github il codice](https://github.com/karpathy/nanoGPT) del "suo" GPT e *io l'ho rubato*.

Secondo me è un ottimo punto per partire, o anche solo per incominciare a costruire un intuizione di come questi modelli funzionano.

Per brevità non scendo nei dettagli, al momento il punto e generare del testo d'esempio, senza watermark.

In [52]:
import torch
import os
import random
from model import GPTConfig, GPT
from tokenizers import ByteLevelBPETokenizer
import numpy as np

seed = random.randint(1,9999) # un numero casuale
device = 'cpu' # per semplicita
dtype = 'bfloat16' # per device

torch.manual_seed(seed)

# istanzio il modello
checkpoint = torch.load(os.path.join(out_dir, ckpt_file), map_location=device)
model = GPT(GPTConfig(**checkpoint['model_args']))
state_dict = checkpoint['model']
unwanted_prefix = '_orig_mod.'
for k,v in list(state_dict.items()):
    if k.startswith(unwanted_prefix):
        state_dict[k[len(unwanted_prefix):]] = state_dict.pop(k)
model.load_state_dict(state_dict)
model.eval()
model.to(device)

# istanzio il tokenizer

tokenizer = ByteLevelBPETokenizer(f"{tokenizer_folder}/ttookk-vocab.json", f"{tokenizer_folder}/ttookk-merges.txt")
encode = lambda s: np.array(tokenizer.encode(s).ids)
decode = lambda l: tokenizer.decode(l)

number of parameters: 33.61M


Variabili e funzioni per generazione

In [53]:
from torch.nn import functional as F

max_new_tokens = 30 # quante ""parole"" generare
temperature = 0.65
top_k = 70
initial_prompt = "per simulare"

def encode_input(text):
    tokenized_text = encode(text)
    return torch.tensor(tokenized_text, dtype=torch.long, device=device)[None, ...]

@torch.no_grad()
def get_logits(gpt_model, tokenized_input, temperature=1.0, top_k=None):
    """
    funzione per generare gli score associati ad ogni token
    
    temperature: float, piu alto piu casuale
    top_k: int, taglia la coda delle probabilità piu basse
    """
    # taglio l input se non sta nel contesto
    ctx_len = gpt_model.config.block_size 
    idx_cond = tokenized_input if tokenized_input.size(1) <= ctx_len else tokenized_input[:, -ctx_len:]
    
    # eseguo il modello
    logits, _ = gpt_model(idx_cond)
    
    # applico temperature e top_k alle probabilità
    logits = logits[:, -1, :] / temperature
    # optionally crop the logits to only the top k options
    if top_k is not None:
        v, _ = torch.topk(logits, min(top_k, logits.size(-1)))
        logits[logits < v[:, [-1]]] = -float('Inf')

    return logits


Ora che ci sono tutti gli ingredienti di base, si può effettivamente provare a generare qualcosa

In [54]:
x_not_watermarked = encode_input(initial_prompt)
for _ in range(max_new_tokens):
    logits = get_logits(model, x_not_watermarked, temperature, top_k)
    probs = F.softmax(logits, dim=-1)
    idx_next = torch.multinomial(probs, num_samples=1)
    x_not_watermarked = torch.cat((x_not_watermarked, idx_next), dim=1)

print("input text:", initial_prompt)
print("generated text:", decode(x_not_watermarked[0].tolist()))
print(x_not_watermarked[0].tolist())

input text: per simulare
generated text: per simulare la propria capacità di generare un'economia in larga scala. L'economia del Paese ha un carattere di ricerca e di sviluppo sostenibile. Il governo Popolare
[549, 11705, 607, 298, 1791, 2524, 271, 10789, 296, 6, 4118, 286, 5258, 4031, 13, 347, 6, 4118, 279, 3991, 536, 296, 3513, 271, 1834, 277, 271, 1748, 15996, 13, 508, 1316, 8446]


Ho stampato una serie di numeri, perché all inizio ho semplificato e il codice mi costiringe a spiegare qualcosaltro.

In realtà, tra le string in input e il nostro GPT, c'è un altro pezzo, si chiama Tokenizer ed ha il compito di prendere la stringa tipo "mondo" e trasformarla in una lista di numeri.

In [55]:
print(f"{initial_prompt} -> ", encode(initial_prompt))

encoded = encode(initial_prompt)
for i in range(len(encoded)):
    print(f"{encoded[i]} -> {decode([encoded[i]])}")

per simulare ->  [  549 11705   607]
549 -> per
11705 ->  simu
607 -> lare


Per applicare il nostro watermark, ci inseriremo al livello delle probabilità che il modello assegna ad ogni token.

In [56]:
print_top_k = 15

def print_next_token_stats(esempio):
    x = encode_input(esempio)
    logits = get_logits(model, x, temperature, top_k)
    probs = F.softmax(logits, dim=-1)

    # prints the logits of 10 most probable tokens
    logits_x_token = logits[0].topk(print_top_k).values.tolist()
    tokens = logits[0].topk(print_top_k).indices.tolist()
    probs_x_token = probs[0].topk(print_top_k).values.tolist()

    print(f"input text: {esempio}")
    print("tok \t   dec \t   punteg \t   prob")
    for i in range(len(tokens)):
        ti = tokens[i]
        dti = decode([ti])
        lti = logits_x_token[i]
        pti = round(probs_x_token[i]*100, 2)
        print(f"{ti} \t | {dti} \t | {lti:.2f} \t | {pti}%")

print_next_token_stats(initial_prompt)

input text: per simulare
tok 	   dec 	   punteg 	   prob
298 	 |  la 	 | 13.05 	 | 26.32%
305 	 |  il 	 | 12.72 	 | 18.86%
325 	 |  l 	 | 12.28 	 | 12.21%
350 	 |  le 	 | 12.08 	 | 10.01%
281 	 |  i 	 | 12.00 	 | 9.21%
296 	 |  un 	 | 11.98 	 | 9.01%
376 	 |  una 	 | 11.40 	 | 5.04%
507 	 |  gli 	 | 10.26 	 | 1.62%
11 	 | , 	 | 10.02 	 | 1.27%
277 	 |  e 	 | 9.59 	 | 0.83%
448 	 |  lo 	 | 9.33 	 | 0.64%
661 	 |  questo 	 | 8.90 	 | 0.41%
1377 	 |  immagini 	 | 8.75 	 | 0.36%
499 	 |  anche 	 | 8.57 	 | 0.3%
286 	 |  in 	 | 8.57 	 | 0.3%


## OK ma come si fa il watermark?

Tornando all'azzurrissimo esempio di cui sopra, l'idea è quella di manipolare le probabilità (o meglio, i "punteggi" prima che vengano trasformati in probabilità).

Per farlo, ciò che viene suggerito da [alcuni più svegli di me](https://arxiv.org/abs/2301.10226) è di creare una algoritmo che a partire dall'input, genera una maschera, in corrispondenza della quale si modifichino i punteggi associati ai vari token nel vocabolario del GPT.

![masking](./assets/masking.png)

Questo processo è da ripetere per ogni parola.

Siccome noi conosciamo e possiamo ripetere l'algoritmo che genera la maschera, potremo in futuro controllare se un qualsiasi input è stato generato che il LLM al quale abbiamo applicato il nostro watermark.

### L'algoritmo piu semplice?

In [57]:
class WatermarkBase:
    def __init__(
        self,
        vocab: list[int] = None,
        gamma: float = 0.5, # quanto del vocabolario alterare
        delta: float = 2, # quanto perturberemo la probabilità
        hash_key: int = 15485863,  # deve solo essere un numero primo grandino
    ):

        # watermarking parameters
        self.vocab_size = len(vocab)
        self.gamma = gamma
        self.delta = delta
        self.device = 'cpu'
        self.rng = torch.Generator(device=self.device)
        self.hash_key = hash_key

    def _seed_rng(self, prev_token: int) -> None:
        # per inizilaizzare il generatore di numeri casuali
        self.rng.manual_seed(self.hash_key * prev_token)

    def _get_greenlist_ids(self, input_ids: torch.LongTensor) -> list[int]:
        # inizializzo il generatore di numeri casuali
        # prendo l'ultimo token
        calc_seed = lambda x: x[-1].item() if type(x) == torch.Tensor else x[-1]
        self._seed_rng(calc_seed(input_ids))

        # mescolo il vocabolario
        vocab_permutation = torch.randperm(self.vocab_size, device=self.device, generator=self.rng)
        # decido dove tagliare
        greenlist_size = int(self.gamma * self.vocab_size)
        # prendo i primi
        # avrò una lista casuale di id del vocabolario, la maschera
        greenlist_ids = vocab_permutation[:greenlist_size]
        return greenlist_ids   

Una volta preparato l'algoritmo di base per la generazione degli id da alterare, serve qualcosa per **applicare** il watermark e qualcosa per **riconoscerlo** 

In [58]:
class LLMWatermarker(WatermarkBase):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def _calc_greenlist_mask(self, scores: torch.FloatTensor, green_token_ids) -> torch.BoolTensor:
        """
        crea un vettore come il vocabolario
        con 1 dove ci sono i token da pompare
        """
        green_tokens_mask = torch.zeros_like(scores)
        green_tokens_mask[0][green_token_ids] = 1
        final_mask = green_tokens_mask.bool()
        return final_mask

    def watermark(self, input_ids: torch.LongTensor, scores: torch.FloatTensor) -> torch.FloatTensor:
        seed_token = input_ids[-1]
        greenlist_ids = self._get_greenlist_ids(seed_token)
        green_tokens_mask = self._calc_greenlist_mask(scores, greenlist_ids)
        scores[green_tokens_mask] = scores[green_tokens_mask] + self.delta
        return scores
    
class WatermarkDetector(WatermarkBase):
    def __init__( self, *args, **kwargs, ):
        super().__init__(*args, **kwargs)
        self.min_prefix_len = 1
     
    def score_sequence( self, input_ids: torch.Tensor):
        num_tokens_scored = len(input_ids) - self.min_prefix_len
        green_token_count = 0
        green_token_mask = []
        for idx in range(self.min_prefix_len, len(input_ids)):
            curr_token = input_ids[idx]
            greenlist_ids = self._get_greenlist_ids(input_ids[:idx])
            if curr_token in greenlist_ids:
                green_token_count += 1
                green_token_mask.append(True)
            else:
                green_token_mask.append(False)

        score_dict = dict(
            num_tokens_scored=num_tokens_scored,
            num_green_tokens=green_token_count,
            green_fraction=(green_token_count / num_tokens_scored),
            green_token_mask=green_token_mask
        )

        return score_dict
    
watermarker = LLMWatermarker(vocab = tokenizer.get_vocab())
detector = WatermarkDetector(vocab = tokenizer.get_vocab())

Ora ringenero dallo spesso prompt ma aggiungendo il watermark

In [59]:
xw = encode_input(initial_prompt)
for _ in range(max_new_tokens):
    logits = get_logits(model, xw, temperature, top_k)
    logits = watermarker.watermark(xw, logits)
    probs = F.softmax(logits, dim=-1)
    idx_next = torch.multinomial(probs, num_samples=1)
    xw = torch.cat((xw, idx_next), dim=1)


print("input text:", initial_prompt)
print("generated text:", decode(xw[0].tolist()))
print(xw[0].tolist())

input text: per simulare
generated text: per simulare un ruolo importante, i modelli di questo tipo di produzione che lo hanno rimpatricando come la maggior parte dei siti di comunicazione, di cui si
[549, 11705, 607, 296, 2041, 1928, 11, 281, 3770, 271, 661, 1276, 271, 1636, 315, 448, 799, 329, 1484, 427, 2343, 452, 298, 1490, 567, 424, 6135, 271, 4526, 11, 271, 554, 352]


In [60]:
tokenized_text_not_watermarked = x_not_watermarked[0].tolist()
tokenized_text_with_watermark = xw[0].tolist()
not_watermarked_stats = detector.score_sequence(tokenized_text_not_watermarked)
completion_stats =  detector.score_sequence(tokenized_text_with_watermark)

not_wm_green_fraction = not_watermarked_stats['green_fraction']
wm_green_fraction = completion_stats['green_fraction']

print(f"green fraction without watermark: {not_wm_green_fraction:.2f}")
print(f"green fraction with watermark: {wm_green_fraction:.2f}")


green fraction without watermark: 0.44
green fraction with watermark: 0.78


### Pro?
- Veloce
- Preserva varieta
- Non segreto
- Non serve pubblicare i pesi del modello
- "Resistente"

### Contro?
- Richiede accesso alle probabilità calcolate dal LLM
    - il modello deve essere proprietario
    - non può essere usato post-generazione