In [6]:
import numpy as np
import galois

# Step 1: LDPC Setup
GF2 = galois.GF(2)
n, g, t, r = 100, 50, 3, 50

# Generate sparse parity-check matrix P
P = GF2.Zeros((r, n))
for row in P:
    indices = np.random.choice(n, size=t, replace=False)
    row[indices] = 1

# Compute generator matrix G (null space of P)
null_space = P.null_space()
G = null_space.T  # Shape (n, g) = (100, 50)

# Step 2: Generate pseudorandom codeword x with Bernoulli noise
s = GF2.Random(g)  # Secret vector
e_np = np.random.binomial(1, p=0.1, size=n)  # Noise vector η=0.1
e = GF2(e_np)
x = G @ s + e

# Step 3/4: Generate watermarked text (fixed)
def generate_watermarked(prompt, x):
    t = []
    for i in range(len(x)):
        p_i = 0.5  # Assume model's probability is 0.5 (maximal bias)
        x_i = int(x[i])  # Convert GF2 element to 0/1 integer

        if p_i <= 0.5:
            prob = 2 * x_i * p_i  # e.g., 0 or 1.0 if p_i=0.5
        else:
            prob = 1 - 2 * (1 - x_i) * (1 - p_i)

        t_i = np.random.binomial(1, prob)
        t.append(t_i)
    return GF2(t)  # Convert final list to GF2 array

watermarked_text = generate_watermarked("Prompt", x)

# Step 5: Detect watermark
def detect_watermark(t, P, threshold):
    syndrome = P @ t
    weight = np.sum(syndrome)
    return weight < threshold

threshold = (0.5 - r**-0.25) * r
is_watermarked = detect_watermark(watermarked_text, P, threshold)
print("Watermark detected:", is_watermarked)

Watermark detected: True


In [2]:
!pip install galois

Collecting galois
  Downloading galois-0.4.6-py3-none-any.whl.metadata (14 kB)
Downloading galois-0.4.6-py3-none-any.whl (4.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.2/4.2 MB[0m [31m49.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: galois
Successfully installed galois-0.4.6


In [7]:
!pip install fire

Collecting fire
  Downloading fire-0.7.0.tar.gz (87 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/87.2 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m87.2/87.2 kB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: fire
  Building wheel for fire (setup.py) ... [?25l[?25hdone
  Created wheel for fire: filename=fire-0.7.0-py3-none-any.whl size=114249 sha256=b996cec4700a5115d469dd9d75c9c24c574706af39860b22a479e3500d3b4d4f
  Stored in directory: /root/.cache/pip/wheels/46/54/24/1624fd5b8674eb1188623f7e8e17cdf7c0f6c24b609dfb8a89
Successfully built fire
Installing collected packages: fire
Successfully installed fire-0.7.0


In [9]:
! mv /content/gpt-2/src/* /content/

In [8]:
!git clone https://github.com/openai/gpt-2.git

Cloning into 'gpt-2'...
remote: Enumerating objects: 239, done.[K
remote: Counting objects: 100% (5/5), done.[K
remote: Compressing objects: 100% (5/5), done.[K
remote: Total 239 (delta 1), reused 0 (delta 0), pack-reused 234 (from 2)[K
Receiving objects: 100% (239/239), 4.38 MiB | 8.87 MiB/s, done.
Resolving deltas: 100% (125/125), done.


In [11]:
!pip install tensorflow



In [12]:
import fire
import json
import os
import numpy as np
import tensorflow as tf
import galois
from model import default_hparams
from sample import sample_sequence
from encoder import get_encoder

def generate_watermark_params(length):
    """Generate LDPC matrices and watermark codeword."""
    GF2 = galois.GF(2)
    n, g, t, r = length, length//2, 3, 50  # Adjust parameters

    # Generate parity-check matrix P
    P = GF2.Zeros((r, n))
    for row in P:
        indices = np.random.choice(n, size=t, replace=False)
        row[indices] = 1

    # Compute generator matrix G
    null_space = P.null_space()
    G = null_space.T

    # Generate codeword x with noise
    s = GF2.Random(g)
    e = GF2(np.random.binomial(1, p=0.1, size=n))
    x = G @ s + e
    return P, x

def bias_logits(logits, x_bit, vocab_size):
    """Bias logits based on watermark bit (simplified example)."""
    # Create bias: +δ for even tokens if x_bit=1, +δ for odd tokens if x_bit=0
    bias = np.zeros(vocab_size, dtype=np.float32)
    even_mask = np.array([i % 2 == 0 for i in range(vocab_size)])
    odd_mask = ~even_mask

    if x_bit == 1:
        bias[even_mask] = 0.5  # Adjust bias strength
    else:
        bias[odd_mask] = 0.5
    return logits + bias

def sample_model(
    model_name='124M',
    seed=None,
    nsamples=0,
    batch_size=1,
    length=None,
    temperature=1,
    top_k=0,
    top_p=1,
    models_dir='models',
    watermark=True
):
    models_dir = os.path.expanduser(os.path.expandvars(models_dir))
    enc = get_encoder(model_name, models_dir)
    hparams = default_hparams()

    with open(os.path.join(models_dir, model_name, 'hparams.json')) as f:
        hparams.override_from_dict(json.load(f))

    if length is None:
        length = hparams.n_ctx

    # Generate watermark parameters
    if watermark:
        P, x = generate_watermark_params(length)
        x_bits = x.tolist()
    else:
        x_bits = [0]*length

    with tf.Session(graph=tf.Graph()) as sess:
        context = tf.placeholder(tf.int32, [batch_size, None])
        np.random.seed(seed)
        tf.set_random_seed(seed)

        # Modified sampling with watermark biasing
        def modified_sample():
            logits = sample_sequence(
                hparams=hparams, length=length,
                start_token=enc.encoder['<|endoftext|>'],
                batch_size=batch_size,
                temperature=temperature, top_k=top_k, top_p=top_p
            )

            if watermark:
                # Apply watermark bias to logits
                logits_list = tf.unstack(logits, axis=1)
                for i in range(length):
                    bit = x_bits[i % len(x_bits)]
                    logits_list[i] = tf.py_func(
                        lambda l: bias_logits(l, bit, hparams.n_vocab),
                        [logits_list[i]],
                        tf.float32
                    )
                logits = tf.stack(logits_list, axis=1)

            return logits[:, 1:]

        output = modified_sample()
        saver = tf.train.Saver()
        ckpt = tf.train.latest_checkpoint(os.path.join(models_dir, model_name))
        saver.restore(sess, ckpt)

        generated = 0
        while nsamples == 0 or generated < nsamples:
            out = sess.run(output, feed_dict={context: [[enc.encoder['<|endoftext|>']]]})
            for i in range(batch_size):
                text = enc.decode(out[i])
                print("=" * 40 + " SAMPLE " + str(generated+1) + " " + "=" * 40)
                print(text)
                generated += 1

if __name__ == '__main__':
    fire.Fire(sample_model)

ModuleNotFoundError: No module named 'tensorflow.contrib'

In [2]:
!pip install galois

Collecting galois
  Downloading galois-0.4.6-py3-none-any.whl.metadata (14 kB)
Downloading galois-0.4.6-py3-none-any.whl (4.2 MB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/4.2 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.6/4.2 MB[0m [31m20.2 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m4.2/4.2 MB[0m [31m68.0 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.2/4.2 MB[0m [31m48.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: galois
Successfully installed galois-0.4.6


In [3]:
import numpy as np
import torch
import torch.nn.functional as F
import galois
from transformers import GPT2LMHeadModel, GPT2Tokenizer

# --------------------------
# LDPC Code Utilities
# --------------------------
class LDPCCode:
    def __init__(self, n=32, g=16, r=16, t=3, eta=0.1):
        self.n, self.g, self.r, self.t, self.eta = n, g, r, t, eta
        self.P, self.G = self._generate_ldpc()

    def _generate_ldpc(self):
        """Generate sparse parity-check matrix P and random generator matrix G from its null space."""
        # Step 1: Generate sparse P
        P = np.zeros((self.r, self.n), dtype=int)
        for i in range(self.r):
            ones = np.random.choice(self.n, self.t, replace=False)
            P[i][ones] = 1

        # Step 2: Compute null space basis (columns of B span ker(P))
        GF2 = galois.GF(2)
        P_gf = GF2(P)
        B = P_gf.null_space().T  # Transpose to get basis as columns, shape (n, k)

        # Step 3: Randomly sample g vectors from ker(P) to form G
        k = B.shape[1]  # Dimension of null space
        if k < self.g:
            raise ValueError(f"Null space dimension {k} < required g={self.g}; increase n or reduce g.")

        # Generate a random binary matrix R ∈ F2^(k × g) for linear combinations
        R = GF2.Random((k, self.g))

        # Compute G = B * R over GF(2): each column of G is a random codeword in ker(P)
        G = B @ R  # Matrix multiplication over GF(2)

        return P, G.view(np.ndarray)  # Convert back to NumPy array

    def encode_seed(self):
        """Generate a watermarked seed vector x = G*s ⊕ e."""
        s = np.random.randint(0, 2, self.g)
        e = np.random.binomial(1, self.eta, self.n)
        x = (self.G @ s) % 2  # Matrix multiplication over F2
        x = (x + e) % 2  # Add noise
        return x

    def decode_watermark(self, x_prime):
        """Detect watermark via parity-check weight."""
        GF2 = galois.GF(2)
        P_gf = GF2(self.P)
        x_prime_gf = GF2(x_prime)
        syndrome = (P_gf @ x_prime_gf).tolist()
        weight = np.sum(syndrome)
        threshold = self.r / 2 - self.r**0.75  # Empirical threshold
        return 1 if weight < threshold else 0


# --------------------------
# Token-to-Bit Mapping
# --------------------------
class BitMask:
    def __init__(self, vocab_size, device="cpu"):
        self.vocab_size = vocab_size
        self.bit_mask = self._create_bit_mask()
        self.bit_mask_tensor = torch.tensor(self.bit_mask, dtype=torch.bool, device=device)

    def _create_bit_mask(self):
        """Split vocabulary into two groups for bit assignment."""
        bit_mask = np.zeros(self.vocab_size, dtype=int)
        for token_idx in range(self.vocab_size):
            bit_mask[token_idx] = token_idx % 2  # Even = 0, Odd = 1
        return bit_mask


# --------------------------
# Watermarked Generation
# --------------------------
class WatermarkedGenerator:
    def __init__(self, model, tokenizer, ldpc, bit_mask):
        self.model = model
        self.tokenizer = tokenizer
        self.ldpc = ldpc
        self.bit_mask = bit_mask  # This must be a BitMask instance

    def _watermarked_sample(self, logits, xi):
        probs = F.softmax(logits, dim=-1)

        probs_squeezed = probs.squeeze()
        p_1 = probs_squeezed[self.bit_mask.bit_mask_tensor].sum().item()

        if p_1 <= 0.5:
            b = np.random.binomial(1, 2 * xi * p_1)
        else:
            adjusted_p = 1 - 2 * (1 - xi) * (1 - p_1)
            b = np.random.binomial(1, adjusted_p)

        b_mask = self.bit_mask.bit_mask_tensor == b
        probs_masked = probs_squeezed[b_mask]
        probs_masked /= probs_masked.sum()

        token_ids = torch.arange(self.bit_mask.vocab_size, device=logits.device)[b_mask]
        token_id = token_ids[torch.multinomial(probs_masked, num_samples=1)]

        return token_id  # Shape: (1,)

    def generate(self, prompt, max_length=50):
        input_ids = self.tokenizer.encode(prompt, return_tensors="pt").to(self.model.device)
        generated_text = prompt
        x = self.ldpc.encode_seed()  # Generate seed

        for i in range(min(max_length, len(x))):
            with torch.no_grad():
                outputs = self.model(input_ids)
                logits = outputs.logits[:, -1, :]
                xi = x[i]
                token_id = self._watermarked_sample(logits, xi)

                # Convert token_id from (1,) -> (1, 1)
                input_ids = torch.cat([input_ids, token_id.unsqueeze(0)], dim=1)
                generated_text += self.tokenizer.decode(token_id)

        return generated_text, x


# --------------------------
# Example Usage
# --------------------------
if __name__ == "__main__":
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print("Using device:", device)

    # Load model and tokenizer
    model = GPT2LMHeadModel.from_pretrained("gpt2").to(device)
    tokenizer = GPT2Tokenizer.from_pretrained("gpt2")

    # Initialize components
    vocab_size = tokenizer.vocab_size
    bit_mask = BitMask(vocab_size, device=device)
    ldpc = LDPCCode(n=32, g=16, r=16, t=3)

    w_generator = WatermarkedGenerator(model, tokenizer, ldpc, bit_mask)

    # Generate watermarked text
    prompt = "Once upon a time"
    print("Generating watermarked text...")
    watermarked_text, seed = w_generator.generate(prompt, max_length=32)
    print("Watermarked Text:", watermarked_text)

    # Detect watermark
    print("Detecting watermark...")
    x_prime_bits = []
    for token in tokenizer.encode(watermarked_text):
        x_prime_bits.append(bit_mask.bit_mask[token])
    x_prime = np.array(x_prime_bits[:len(seed)])
    is_watermarked = ldpc.decode_watermark(x_prime)
    print("Watermarked?", "Yes" if is_watermarked else "No")

    # Generate unwatermarked text (baseline)
    print("\nGenerating unwatermarked text...")
    unwatermarked_ids = model.generate(
        tokenizer.encode(prompt, return_tensors="pt").to(device),
        max_length=32
    )
    unwatermarked_text = tokenizer.decode(unwatermarked_ids[0])
    print("Unwatermarked Text:", unwatermarked_text)

Using device: cuda


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/665 [00:00<?, ?B/s]

Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


model.safetensors:   0%|          | 0.00/548M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/124 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/26.0 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/1.04M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

Generating watermarked text...


The attention mask and the pad token id were not set. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.
Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.
The attention mask is not set and cannot be inferred from input because pad token is same as eos token. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.


Watermarked Text: Once upon a time, most Universities and colleges within a five-minute walk from the Capitol would be closed regardless of which big environmental or trash-dumpster pushing the panic button.
Detecting watermark...
Watermarked? No

Generating unwatermarked text...
Unwatermarked Text: Once upon a time, the world was a place of great beauty and great danger. The world was a place of great danger, and the world was a place


In [4]:
import numpy as np
import torch
import torch.nn.functional as F
import galois
from transformers import GPT2LMHeadModel, GPT2Tokenizer

# --------------------------
# LDPC Code Utilities
# --------------------------
class LDPCCode:
    def __init__(self, n=32, g=16, r=16, t=3, eta=0.05):
        self.n, self.g, self.r, self.t, self.eta = n, g, r, t, eta
        self.P, self.G = self._generate_ldpc()

    def _generate_ldpc(self):
        """Generate sparse parity-check matrix P and random generator matrix G from its null space."""
        # Step 1: Generate sparse P
        P = np.zeros((self.r, self.n), dtype=int)
        for i in range(self.r):
            ones = np.random.choice(self.n, self.t, replace=False)
            P[i][ones] = 1

        # Step 2: Compute null space basis (columns of B span ker(P))
        GF2 = galois.GF(2)
        P_gf = GF2(P)
        B = P_gf.null_space().T  # B has shape (n, k)

        # Step 3: Randomly sample g vectors from ker(P) to form G
        k = B.shape[1]  # Dimension of null space
        if k < self.g:
            raise ValueError(f"Null space dimension {k} < required g={self.g}; increase n or reduce g.")

        # Generate a random binary matrix R ∈ F2^(k × g) for linear combinations
        R = GF2.Random((k, self.g))

        # Compute G = B * R over GF(2): each column of G is a random codeword in ker(P)
        G = B @ R  # Matrix multiplication over GF(2)

        return P, G.view(np.ndarray)  # Convert back to NumPy array

    def encode_seed(self):
        """Generate a watermarked seed vector x = G*s ⊕ e."""
        s = np.random.randint(0, 2, self.g)
        e = np.random.binomial(1, self.eta, self.n)
        x = (self.G @ s) % 2  # Matrix multiplication over F2
        x = (x + e) % 2  # Add noise
        return x

    def decode_watermark(self, x_prime):
        """Detect watermark via parity-check weight."""
        GF2 = galois.GF(2)
        P_gf = GF2(self.P)
        x_prime_gf = GF2(x_prime)
        syndrome = (P_gf @ x_prime_gf).tolist()
        weight = np.sum(syndrome)
        threshold = self.r / 2 - self.r**0.5  # Empirical threshold
        return 1 if weight < threshold else 0


# --------------------------
# Token-to-Bit Mapping
# --------------------------
class BitMask:
    def __init__(self, vocab_size, device="cpu"):
        self.vocab_size = vocab_size
        self.bit_mask = self._create_bit_mask()
        self.bit_mask_tensor = torch.tensor(self.bit_mask, dtype=torch.bool, device=device)

    def _create_bit_mask(self):
        """Split vocabulary into two groups for bit assignment."""
        bit_mask = np.zeros(self.vocab_size, dtype=int)
        for token_idx in range(self.vocab_size):
            bit_mask[token_idx] = token_idx % 2  # Even = 0, Odd = 1
        return bit_mask


# --------------------------
# Watermarked Generation
# --------------------------
class WatermarkedGenerator:
    def __init__(self, model, tokenizer, ldpc, bit_mask):
        self.model = model
        self.tokenizer = tokenizer
        self.ldpc = ldpc
        self.bit_mask = bit_mask  # This must be a BitMask instance

    def _watermarked_sample(self, logits, xi):
        """Sample token with watermark bias based on seed bit xi."""
        probs = F.softmax(logits, dim=-1)

        # Remove batch dimension
        probs_squeezed = probs.squeeze()

        # Use bit_mask_tensor for efficient masking
        p_1 = probs_squeezed[self.bit_mask.bit_mask_tensor].sum().item()

        # Bias token selection based on xi and p_1
        if p_1 <= 0.5:
            b = np.random.binomial(1, 2 * xi * p_1)
        else:
            adjusted_p = 1 - 2 * (1 - xi) * (1 - p_1)
            b = np.random.binomial(1, adjusted_p)

        # Sample token from bit group
        b_mask = self.bit_mask.bit_mask_tensor == b
        probs_masked = probs_squeezed[b_mask]
        probs_masked /= probs_masked.sum()

        # Get token indices that match the bit condition
        token_ids = torch.arange(self.bit_mask.vocab_size, device=logits.device)[b_mask]
        token_id = token_ids[torch.multinomial(probs_masked, num_samples=1)]

        return token_id  # Shape: (1,)

    def generate(self, prompt, max_length=50):
        """Generate watermarked text."""
        input_ids = self.tokenizer.encode(prompt, return_tensors="pt").to(self.model.device)
        generated_text = prompt
        x = self.ldpc.encode_seed()  # Generate seed

        for i in range(min(max_length, len(x))):
            with torch.no_grad():
                outputs = self.model(input_ids)
                logits = outputs.logits[:, -1, :]
                xi = x[i]
                token_id = self._watermarked_sample(logits, xi)

                # Convert token_id from (1,) → (1, 1) for concatenation
                input_ids = torch.cat([input_ids, token_id.unsqueeze(0)], dim=1)
                generated_text += self.tokenizer.decode(token_id)

        return generated_text, x


# --------------------------
# Example Usage
# --------------------------
if __name__ == "__main__":
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print("Using device:", device)

    # Load model and tokenizer
    tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
    tokenizer.pad_token = tokenizer.eos_token  # Fix attention mask warning
    model = GPT2LMHeadModel.from_pretrained("gpt2").to(device)
    model.config.pad_token_id = tokenizer.pad_token_id

    vocab_size = tokenizer.vocab_size
    bit_mask = BitMask(vocab_size, device=device)
    ldpc = LDPCCode(n=32, g=16, r=16, t=3, eta=0.05)  # Lower noise for better detection

    w_generator = WatermarkedGenerator(model, tokenizer, ldpc, bit_mask)

    # Generate watermarked text
    prompt = "Once upon a time"
    print("Generating watermarked text...")
    watermarked_text, seed = w_generator.generate(prompt, max_length=32)
    print("Watermarked Text:", watermarked_text)

    # Detect watermark
    print("Detecting watermark...")
    x_prime_bits = []
    tokens = tokenizer.encode(watermarked_text)
    for token in tokens:
        if token < vocab_size:
            x_prime_bits.append(bit_mask.bit_mask[token])
    x_prime = np.array(x_prime_bits[:len(seed)])
    if len(x_prime) < len(seed):
        x_prime = np.pad(x_prime, (0, len(seed) - len(x_prime)), constant_values=0)
    is_watermarked = ldpc.decode_watermark(x_prime)
    print("Watermarked?", "Yes" if is_watermarked else "No")

    # Generate unwatermarked text (baseline)
    print("\nGenerating unwatermarked text...")
    input_ids = tokenizer.encode(prompt, return_tensors="pt").to(device)
    unwatermarked_ids = model.generate(
        input_ids,
        max_length=32,
        pad_token_id=tokenizer.pad_token_id
    )
    unwatermarked_text = tokenizer.decode(unwatermarked_ids[0])
    print("Unwatermarked Text:", unwatermarked_text)

Using device: cuda
Generating watermarked text...
Watermarked Text: Once upon a time people were more or less naked when a horse travelled west at a speed of forty miles per hour and dragged their mount. The riding of horses is considered important in
Detecting watermark...
Watermarked? No

Generating unwatermarked text...
Unwatermarked Text: Once upon a time, the world was a place of great beauty and great danger. The world was a place of great danger, and the world was a place


In [1]:
!pip install galois

Collecting galois
  Downloading galois-0.4.6-py3-none-any.whl.metadata (14 kB)
Downloading galois-0.4.6-py3-none-any.whl (4.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.2/4.2 MB[0m [31m20.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: galois
Successfully installed galois-0.4.6


In [2]:
import numpy as np
import torch
import torch.nn.functional as F
import galois
from transformers import GPT2LMHeadModel, GPT2Tokenizer

# --------------------------
# LDPC Code Utilities
# --------------------------
class LDPCCode:
    def __init__(self, n=32, g=16, r=16, t=3, eta=0.05):
        self.n, self.g, self.r, self.t, self.eta = n, g, r, t, eta
        self.P, self.G = self._generate_ldpc()
        self.threshold = None  # Will be set via calibration

    def _generate_ldpc(self):
        """Generate sparse parity-check matrix P and random generator matrix G from its null space."""
        # Step 1: Generate sparse P
        P = np.zeros((self.r, self.n), dtype=int)
        for i in range(self.r):
            ones = np.random.choice(self.n, self.t, replace=False)
            P[i][ones] = 1

        # Step 2: Compute null space basis (columns of B span ker(P))
        GF2 = galois.GF(2)
        P_gf = GF2(P)
        B = P_gf.null_space().T  # B has shape (n, k)

        # Step 3: Randomly sample g vectors from ker(P) to form G
        k = B.shape[1]  # Dimension of null space
        if k < self.g:
            raise ValueError(f"Null space dimension {k} < required g={self.g}; increase n or reduce g.")

        # Generate a random binary matrix R ∈ F2^(k × g) for linear combinations
        R = GF2.Random((k, self.g))

        # Compute G = B * R over GF(2): each column of G is a random codeword in ker(P)
        G = B @ R  # Matrix multiplication over GF(2)

        return P, G.view(np.ndarray)  # Convert back to NumPy array

    def encode_seed(self):
        """Generate a watermarked seed vector x = G*s ⊕ e."""
        s = np.random.randint(0, 2, self.g)
        e = np.random.binomial(1, self.eta, self.n)
        x = (self.G @ s) % 2  # Matrix multiplication over F2
        x = (x + e) % 2  # Add noise
        return x

    def decode_watermark(self, x_prime, return_weight=False):
        """Detect watermark via parity-check weight."""
        GF2 = galois.GF(2)
        P_gf = GF2(self.P)
        x_prime_gf = GF2(x_prime)
        syndrome = (P_gf @ x_prime_gf).tolist()
        weight = np.sum(syndrome)
        if return_weight:
            return weight
        return 1 if weight < self.threshold else 0


# --------------------------
# Token-to-Bit Mapping
# --------------------------
class BitMask:
    def __init__(self, vocab_size, device="cpu"):
        self.vocab_size = vocab_size
        self.bit_mask = self._create_bit_mask()
        self.bit_mask_tensor = torch.tensor(self.bit_mask, dtype=torch.bool, device=device)

    def _create_bit_mask(self):
        """Split vocabulary into two groups for bit assignment."""
        bit_mask = np.zeros(self.vocab_size, dtype=int)
        for token_idx in range(self.vocab_size):
            bit_mask[token_idx] = token_idx % 2  # Even = 0, Odd = 1
        return bit_mask


# --------------------------
# Watermarked Generation
# --------------------------
class WatermarkedGenerator:
    def __init__(self, model, tokenizer, ldpc, bit_mask):
        self.model = model
        self.tokenizer = tokenizer
        self.ldpc = ldpc
        self.bit_mask = bit_mask  # This must be a BitMask instance

    def _watermarked_sample(self, logits, xi):
        """Sample token with watermark bias based on seed bit xi."""
        probs = F.softmax(logits, dim=-1)

        # Remove batch dimension
        probs_squeezed = probs.squeeze()

        # Use bit_mask_tensor for efficient masking
        p_1 = probs_squeezed[self.bit_mask.bit_mask_tensor].sum().item()

        # Stronger bias: use inverse probability weighting
        if p_1 <= 0.5:
            b = np.random.binomial(1, xi * p_1 * 2)
        else:
            adjusted_p = xi * (1 - 2 * (1 - p_1)) + (1 - xi) * (1 - 2 * p_1)
            adjusted_p = max(0.01, min(0.99, adjusted_p))
            b = np.random.binomial(1, adjusted_p)

        # Sample token from bit group
        b_mask = self.bit_mask.bit_mask_tensor == b
        probs_masked = probs_squeezed[b_mask]
        probs_masked /= probs_masked.sum()

        # Get token indices that match the bit condition
        token_ids = torch.arange(self.bit_mask.vocab_size, device=logits.device)[b_mask]
        token_id = token_ids[torch.multinomial(probs_masked, num_samples=1)]

        return token_id  # Shape: (1,)

    def generate(self, prompt, max_length=50):
        """Generate watermarked text."""
        input_ids = self.tokenizer.encode(prompt, return_tensors="pt").to(self.model.device)
        generated_text = prompt
        x = self.ldpc.encode_seed()  # Generate seed

        for i in range(min(max_length, len(x))):
            with torch.no_grad():
                outputs = self.model(input_ids)
                logits = outputs.logits[:, -1, :]
                xi = x[i]
                token_id = self._watermarked_sample(logits, xi)

                # Convert token_id from (1,) → (1, 1) for concatenation
                input_ids = torch.cat([input_ids, token_id.unsqueeze(0)], dim=1)
                generated_text += self.tokenizer.decode(token_id)

        return generated_text, x


# --------------------------
# Helper Functions
# --------------------------
def extract_bits(text, bit_mask, max_len):
    tokens = bit_mask.tokenizer.encode(text)
    bits = []
    for token in tokens:
        if token < bit_mask.vocab_size:
            bits.append(bit_mask.bit_mask[token])
        if len(bits) >= max_len:
            break
    return bits

def calibrate_threshold(ldpc, bit_mask, w_generator, prompt="Once upon a time", n_samples=20):
    watermarked_weights = []
    unwatermarked_weights = []

    for _ in range(n_samples):
        # Watermarked
        text, seed = w_generator.generate(prompt, max_length=ldpc.n)
        bits = extract_bits(text, bit_mask, ldpc.n)
        x = np.array(bits[:ldpc.n])
        weight = ldpc.decode_watermark(x, return_weight=True)
        watermarked_weights.append(weight)

        # Unwatermarked
        input_ids = w_generator.tokenizer.encode(prompt, return_tensors="pt").to(w_generator.model.device)
        ids = w_generator.model.generate(input_ids, max_length=ldpc.n, pad_token_id=w_generator.tokenizer.pad_token_id)
        text = w_generator.tokenizer.decode(ids[0])
        bits = extract_bits(text, bit_mask, ldpc.n)
        x = np.array(bits[:ldpc.n])
        weight = ldpc.decode_watermark(x, return_weight=True)
        unwatermarked_weights.append(weight)

    threshold = (np.mean(watermarked_weights) + np.mean(unwatermarked_weights)) / 2
    print("Calibrated Threshold:", threshold)
    return threshold


# --------------------------
# Example Usage
# --------------------------
if __name__ == "__main__":
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print("Using device:", device)

    # Load model and tokenizer
    tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
    tokenizer.pad_token = tokenizer.eos_token  # Fix attention mask warning
    model = GPT2LMHeadModel.from_pretrained("gpt2").to(device)
    model.config.pad_token_id = tokenizer.pad_token_id

    vocab_size = tokenizer.vocab_size
    bit_mask = BitMask(vocab_size, device=device)
    bit_mask.tokenizer = tokenizer  # Store for extract_bits
    ldpc = LDPCCode(n=32, g=16, r=16, t=3, eta=0.05)  # Lower noise for better detection

    w_generator = WatermarkedGenerator(model, tokenizer, ldpc, bit_mask)

    # Calibrate threshold using empirical sampling
    print("Calibrating threshold...")
    threshold = calibrate_threshold(ldpc, bit_mask, w_generator)
    ldpc.threshold = threshold

    # Generate watermarked text
    prompt = "Once upon a time"
    print("Generating watermarked text...")
    watermarked_text, seed = w_generator.generate(prompt, max_length=32)
    print("Watermarked Text:", watermarked_text)

    # Detect watermark
    print("Detecting watermark...")
    x_prime_bits = extract_bits(watermarked_text, bit_mask, ldpc.n)
    x_prime = np.array(x_prime_bits[:len(seed)])
    is_watermarked = ldpc.decode_watermark(x_prime)
    print("Watermarked?", "Yes" if is_watermarked else "No")

    # Generate unwatermarked text (baseline)
    print("\nGenerating unwatermarked text...")
    input_ids = tokenizer.encode(prompt, return_tensors="pt").to(device)
    unwatermarked_ids = model.generate(
        input_ids,
        max_length=32,
        pad_token_id=tokenizer.pad_token_id
    )
    unwatermarked_text = tokenizer.decode(unwatermarked_ids[0])
    print("Unwatermarked Text:", unwatermarked_text)

Using device: cuda


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/26.0 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/1.04M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/665 [00:00<?, ?B/s]

Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


model.safetensors:   0%|          | 0.00/548M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/124 [00:00<?, ?B/s]

Calibrating threshold...


The attention mask is not set and cannot be inferred from input because pad token is same as eos token. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.


Calibrated Threshold: 8.275
Generating watermarked text...
Watermarked Text: Once upon a time they had the wild grain wool, and they had fenced them up such that they would always find them upon their farm bring, every time with what they received
Detecting watermark...
Watermarked? Yes

Generating unwatermarked text...
Unwatermarked Text: Once upon a time, the world was a place of great beauty and great danger. The world was a place of great danger, and the world was a place


In [4]:
import random


def extract_bits(text, bit_mask, max_len):
    tokens = bit_mask.tokenizer.encode(text)
    bits = []
    for token in tokens:
        if token < bit_mask.vocab_size:
            bits.append(bit_mask.bit_mask[token])
        if len(bits) >= max_len:
            break
    # Pad with zeros if too short
    if len(bits) < max_len:
        bits += [0] * (max_len - len(bits))
    return bits[:max_len]  # Truncate if too long

# --------------------------
# Attack Functions
# --------------------------

def delete_tokens(text, num_deletions=5):
    tokens = text.split()
    if len(tokens) <= num_deletions:
        return ""  # Avoid empty text
    indices = sorted(random.sample(range(len(tokens)), num_deletions))
    for i in reversed(indices):
        del tokens[i]
    return " ".join(tokens)

def insert_tokens(text, filler_words=["the", "and", "of", "to", "in"], num_insertions=5):
    tokens = text.split()
    for _ in range(num_insertions):
        idx = random.randint(0, len(tokens))
        word = random.choice(filler_words)
        tokens.insert(idx, word)
    return " ".join(tokens)

# --------------------------
# Test Watermark Robustness
# --------------------------

def test_watermark_robustness(w_generator, ldpc, bit_mask, prompt="Once upon a time"):
    print("\n--- Original Watermarked Text ---")
    watermarked_text, seed = w_generator.generate(prompt, max_length=32)
    print("Original Text:", watermarked_text)

    # Detect original
    x_prime = extract_bits(watermarked_text, bit_mask, ldpc.n)
    is_watermarked = ldpc.decode_watermark(np.array(x_prime))
    print("Detected:", "Yes" if is_watermarked else "No")

    print("\n--- Deletion Attack (5 tokens removed) ---")
    deleted_text = delete_tokens(watermarked_text, num_deletions=5)
    print("Deleted Text:", deleted_text)
    x_prime_del = extract_bits(deleted_text, bit_mask, ldpc.n)
    is_watermarked_del = ldpc.decode_watermark(np.array(x_prime_del))
    print("Detected:", "Yes" if is_watermarked_del else "No")

    print("\n--- Insertion Attack (5 filler tokens added) ---")
    inserted_text = insert_tokens(watermarked_text, num_insertions=5)
    print("Inserted Text:", inserted_text)
    x_prime_ins = extract_bits(inserted_text, bit_mask, ldpc.n)
    is_watermarked_ins = ldpc.decode_watermark(np.array(x_prime_ins))
    print("Detected:", "Yes" if is_watermarked_ins else "No")

    print("\n--- Unwatermarked Text (Baseline) ---")
    input_ids = w_generator.tokenizer.encode(prompt, return_tensors="pt").to(w_generator.model.device)
    unwatermarked_ids = w_generator.model.generate(
        input_ids, max_length=32, pad_token_id=w_generator.tokenizer.pad_token_id
    )
    unwatermarked_text = w_generator.tokenizer.decode(unwatermarked_ids[0])
    print("Unwatermarked Text:", unwatermarked_text)
    x_unwatermarked = extract_bits(unwatermarked_text, bit_mask, ldpc.n)
    is_unwatermarked = ldpc.decode_watermark(np.array(x_unwatermarked))
    print("Detected:", "Yes" if is_unwatermarked else "No")

# --------------------------
# Run Test
# --------------------------

if __name__ == "__main__":
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print("Using device:", device)

    # Load model and tokenizer
    tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
    tokenizer.pad_token = tokenizer.eos_token
    model = GPT2LMHeadModel.from_pretrained("gpt2").to(device)
    model.config.pad_token_id = tokenizer.pad_token_id

    vocab_size = tokenizer.vocab_size
    bit_mask = BitMask(vocab_size, device=device)
    bit_mask.tokenizer = tokenizer
    ldpc = LDPCCode(n=32, g=16, r=16, t=3, eta=0.05)
    w_generator = WatermarkedGenerator(model, tokenizer, ldpc, bit_mask)

    # Calibrate threshold
    print("Calibrating threshold...")
    threshold = calibrate_threshold(ldpc, bit_mask, w_generator)
    ldpc.threshold = threshold

    # Run robustness test
    test_watermark_robustness(w_generator, ldpc, bit_mask)

Using device: cuda
Calibrating threshold...
Calibrated Threshold: 8.15

--- Original Watermarked Text ---
Original Text: Once upon a time where the greatest clubs have dominated local music there can happen. In general the greatest national scene can emerge from long distance clubs driven by one party – Manchester Flyer
Detected: Yes

--- Deletion Attack (5 tokens removed) ---
Deleted Text: Once upon time where the greatest clubs have dominated local music there happen. In the greatest national scene can emerge from long distance clubs one party – Manchester Flyer
Detected: Yes

--- Insertion Attack (5 filler tokens added) ---
Inserted Text: Once upon a time where the greatest clubs have dominated local music there can of happen. In general of the in greatest national scene in can emerge of from long distance clubs driven by one party – Manchester Flyer
Detected: Yes

--- Unwatermarked Text (Baseline) ---
Unwatermarked Text: Once upon a time, the world was a place of great beauty and g