<a href="https://colab.research.google.com/github/LeninPA/watermarking-ecc/blob/main/tokenizer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Watermarking usando Códigos Correctores de Errores

## Tokenización

Se usa un algoritmo tipo BPE para realizar la tokenización. Trabajando con `llama-7b` (por los 7 mil millones de parámetros o _billions_ en inglés).

Se necesita cambiar el entorno de ejecución de CPU a GPU o TPU para ejecutar el siguiente notebook.

In [3]:
!pip install torchao

Collecting torchao
  Downloading torchao-0.11.0-cp39-abi3-manylinux_2_28_x86_64.manylinux_2_24_x86_64.whl.metadata (16 kB)
Downloading torchao-0.11.0-cp39-abi3-manylinux_2_28_x86_64.manylinux_2_24_x86_64.whl (5.5 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.5/5.5 MB[0m [31m41.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: torchao
Successfully installed torchao-0.11.0


In [2]:
# Importa el tokenizador de llama
from transformers import LlamaTokenizerFast
tokenizer = LlamaTokenizerFast.from_pretrained("hf-internal-testing/llama-tokenizer")
# Función que usa el BPE de llama para tokenizar
def llama_tokenize(text):
    return tokenizer.encode(text)

tokenizer_config.json:   0%|          | 0.00/1.54k [00:00<?, ?B/s]

tokenizer.model:   0%|          | 0.00/500k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.84M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/411 [00:00<?, ?B/s]

You are using the default legacy behaviour of the <class 'transformers.models.llama.tokenization_llama_fast.LlamaTokenizerFast'>. This is expected, and simply means that the `legacy` (previous) behavior will be used so nothing changes for you. If you want to use the new behaviour, set `legacy=False`. This should only be set if you understand what it means, and thoroughly read the reason why this was added as explained in https://github.com/huggingface/transformers/pull/24565 - if you loaded a llama tokenizer from a GGUF file you can ignore this message.


In [3]:
cadena_1 = "Hello this is a test"
cadena_2 = "psiconeuroinmunoendocrinología"
cadena_3 = "Ken otisemilhwitiwak"

for cadena in [cadena_1, cadena_2, cadena_3]:
    print(f"Vamos a tokenizar a {cadena}")
    tokens = llama_tokenize(cadena)
    print(tokens)
    binary_tokens = [bin(t) for t in tokens]
    print(binary_tokens)
    print("Ahora lo decodificamos token por token:")
    for token in tokens:
        print(tokenizer.decode(token))
    print("---\nTerminamos de decodificar\n")

Vamos a tokenizar a Hello this is a test
[1, 15043, 445, 338, 263, 1243]
['0b1', '0b11101011000011', '0b110111101', '0b101010010', '0b100000111', '0b10011011011']
Ahora lo decodificamos token por token:
<s>
Hello
this
is
a
test
---
Terminamos de decodificar

Vamos a tokenizar a psiconeuroinmunoendocrinología
[1, 6529, 293, 650, 2192, 262, 29885, 9447, 355, 8415, 262, 14046]
['0b1', '0b1100110000001', '0b100100101', '0b1010001010', '0b100010010000', '0b100000110', '0b111010010111101', '0b10010011100111', '0b101100011', '0b10000011011111', '0b100000110', '0b11011011011110']
Ahora lo decodificamos token por token:
<s>
ps
ic
one
uro
in
m
uno
end
ocr
in
ología
---
Terminamos de decodificar

Vamos a tokenizar a Ken otisemilhwitiwak
[1, 10015, 4932, 275, 331, 309, 26828, 4812, 29893, 557]
['0b1', '0b10011100011111', '0b1001101000100', '0b100010011', '0b101001011', '0b100110101', '0b110100011001100', '0b1001011001100', '0b111010011000101', '0b1000101101']
Ahora lo decodificamos token por token

## Implementación _dummy_ de los algoritmos 3,4,5, y 6

In [4]:
from transformers import AutoTokenizer
import numpy as np
from typing import List
from scipy.stats import binom

In [5]:
class LlamaBinaryConverter:
    def __init__(self, tokenizer):
        self.tokenizer = tokenizer
        self.l = 16  # Número típico de bits por token aunque puede ser 18

    def encode(self, tokens: List[str]) -> List[int]:
        """
        Convierte una lista de tokens en una secuencia binaria.

        Parámetros:
        tokens (List[str]): Lista de tokens de entrada.

        Regresa:
        List[int]: Lista de bits (0s y 1s) que representan los tokens.
        """
        bitstream = []
        for token in tokens:
            tid = self.tokenizer.convert_tokens_to_ids(token)
            bits = format(tid, f'0{self.l}b')
            bitstream.extend([int(b) for b in bits])
        return bitstream

    def decode(self, bits: List[int]) -> List[str]:
        """
        Convierte una secuencia de bits en tokens.

        Parámetros:
        bits (List[int]): Lista de bits de entrada.

        Regresa:
        List[str]: Lista de tokens decodificados.
        """
        tokens = []
        for i in range(0, len(bits), self.l):
            segment = bits[i:i+self.l]
            if len(segment) < self.l:
                break
            tid = int("".join(map(str, segment)), 2)
            token = self.tokenizer.convert_ids_to_tokens(tid)
            tokens.append(token)
        return tokens


In [6]:
class DummyECC:
    def __init__(self, n: int, k: int):
        self.n = n
        self.k = k

    def encode(self, m: List[int]) -> List[int]:
        """
        Codifica el mensaje con un código de corrección de errores (ficticio).

        Parámetros:
        m (List[int]): Mensaje binario de longitud k.

        Regresa:
        List[int]: Palabra código de longitud n.
        """
        return m + [0] * (self.n - self.k)

    def decode(self, y: List[int]) -> List[int]:
        """
        Decodifica una palabra código y devuelve el mensaje original.

        Parámetros:
        y (List[int]): Palabra código.

        Regresa:
        List[int]: Mensaje recuperado.
        """
        return y[:self.k]

def pbin(_):
    return 0.5  # Probabilidad ficticia

def CBSC_sample(y, q):
    u = np.random.rand()
    return int(((1 - y) + u) / 2 <= q)

def rbc_watermark(N, win, wout, R, converter, ecc):
    """
    Implementa el algoritmo de marcado RBC.

    Parámetros:
    N (int): Longitud total de generación.
    win (int): Ventana de entrada.
    wout (int): Ventana de salida.
    R (np.array): Cadena binaria aleatoria.
    converter (LlamaBinaryConverter): Conversor binario basado en tokens.
    ecc (DummyECC): Código de corrección de errores.

    Regresa:
    List[str]: Tokens generados.
    """
    X = tokenizer.convert_ids_to_tokens(np.random.randint(0, tokenizer.vocab_size, win).tolist())
    i = win
    while i <= N:
        bits_prev = converter.encode(X[i-win:i])
        M = np.bitwise_xor(bits_prev[:ecc.k], R)
        Y = ecc.encode(M)
        block = gen_block(X[:i], Y, wout, converter)
        X.extend(block)
        i += wout
    return X

def gen_block(X_prev, Y, wout, converter):
    """
    Genera un bloque de salida desde la palabra código Y.

    Parámetros:
    X_prev (List[str]): Tokens anteriores.
    Y (List[int]): Palabra código.
    wout (int): Número de tokens de salida.
    converter (LlamaBinaryConverter): Conversor binario.

    Regresa:
    List[str]: Tokens generados.
    """
    B = []
    for j in range(len(Y)):
        q = pbin(X_prev + converter.decode(B))
        B.append(CBSC_sample(Y[j], q))

    while len(B) < wout * converter.l:
        q = pbin(X_prev + converter.decode(B))
        B.append(int(np.random.rand() <= q))

    return converter.decode(B)

def detect(X, win, wout, alpha, R, converter, ecc):
    """
    Detecta si una secuencia de tokens contiene una marca de agua RBC.

    Parámetros:
    X (List[str]): Tokens de entrada.
    win (int): Ventana de entrada.
    wout (int): Ventana de salida.
    alpha (float): Nivel de significancia.
    R (np.array): Cadena binaria aleatoria.
    converter (LlamaBinaryConverter): Conversor binario.
    ecc (DummyECC): Código de corrección de errores.

    Regresa:
    str: Resultado de la detección.
    """
    N = len(X)
    k = ecc.k
    Z = []

    for i in range(N - wout - win + 1):
        M = converter.encode(X[i:i+win])[:k]
        M = np.bitwise_xor(M, R)
        B_bits = converter.encode(X[i+win:i+win+wout])
        M_hat = ecc.decode(B_bits)
        matches = k - np.sum(np.bitwise_xor(M, M_hat))
        Z.append(matches)

    return binomial_comparison(Z, k, alpha)

def binomial_comparison(Z, k, alpha):
    """
    Prueba estadística basada en la distribución binomial.

    Parámetros:
    Z (List[int]): Coincidencias.
    k (int): Longitud de mensaje.
    alpha (float): Nivel de significancia.

    Regresa:
    str: Resultado ("WATERMARKED" o "NOT WATERMARKED").
    """
    m = len(Z)
    T = sum(Z)
    p_val = 1 - binom.cdf(T, m * k, 0.5)
    return "WATERMARKED" if p_val <= alpha else "NOT WATERMARKED"
