# 1. Install MarkLLM

In [None]:
# !pip install -r requirements.txt

download_models = False

opt_path = "/workspace/panleyi/models/facebook/opt-1.3b"
llama_path = "models/llama-7b"
nllb_path = "models/nllb-200-distilled-600M"
bert_large_uncased_path = "models/compositional-bert-large-uncased/"
t5_path = "models/t5-v1_1-xxl"
starcoder_path = "models/starcoder"

if download_models:
    !pip install -U huggingface_hub
    !export HF_ENDPOINT=https://hf-mirror.com
    HUGGINGFACE_TOKEN = ""
    !huggingface-cli login --token $HUGGINGFACE_TOKEN
    !huggingface-cli download --resume-download facebook/opt-1.3b --local-dir {opt_path}
    !huggingface-cli download --resume-download princeton-nlp/Sheared-LLaMA-1.3B --local-dir {llama_path}
    !huggingface-cli download --resume-download facebook/nllb-200-distilled-600M --local-dir {nllb_path}
    !huggingface-cli download --resume-download google-bert/bert-large-uncased --local-dir {bert_large_uncased_path}
    !huggingface-cli download --resume-download google/t5-v1_1-xxl --local-dir {t5_path}
    !huggingface-cli download --resume-download bigcode/starcoder --local-dir {starcoder_path}

# 2. Watermarking Algorithm Invocation

In [None]:
import torch
import json
from watermark.auto_watermark import AutoWatermark
from utils.transformers_config import TransformersConfig
from transformers import AutoModelForCausalLM, AutoTokenizer

# Load data: only the first line (one JSON record) is used, so read just that
# line instead of loading the whole dataset file into memory.
with open('dataset/c4/processed_c4.json', 'r') as f:
    first_line = f.readline()
item = json.loads(first_line)
prompt = item['prompt']
natural_text = item['natural_text']


def test_algorithm(algorithm_name):
    """Generate text with one watermark algorithm and print detection results.

    Loads the model and tokenizer from ``opt_path``, builds the watermark from
    ``config/<algorithm_name>.json``, generates a watermarked and an
    unwatermarked continuation of the module-level ``prompt``, runs detection
    on both plus the module-level ``natural_text``, and prints every text
    followed by its detection result.

    Args:
        algorithm_name: One of the algorithm names accepted by the assertion
            at the top of the function.
    """
    # Check algorithm name
    supported_algorithms = ['KGW', 'Unigram', 'SWEET', 'EWD', 'SIR', 'XSIR', 'UPV', 'EXP', 'EXPEdit', 'SynthID', 'Adaptive']
    assert algorithm_name in supported_algorithms

    # Device
    device = "cuda" if torch.cuda.is_available() else "cpu"

    # Transformers config
    model = AutoModelForCausalLM.from_pretrained(opt_path).to(device)
    tokenizer = AutoTokenizer.from_pretrained(opt_path)
    generation_config = TransformersConfig(
        model=model,
        tokenizer=tokenizer,
        vocab_size=50272,
        device=device,
        max_new_tokens=200,
        min_length=230,
        do_sample=True,
        no_repeat_ngram_size=4,
    )

    # Load watermark algorithm
    watermark = AutoWatermark.load(
        f'{algorithm_name}',
        algorithm_config=f'config/{algorithm_name}.json',
        transformers_config=generation_config,
    )

    # Generate both texts first, then run detection on all three texts
    generated_with_mark = watermark.generate_watermarked_text(prompt)
    generated_without_mark = watermark.generate_unwatermarked_text(prompt)
    texts = [generated_with_mark, generated_without_mark, natural_text]
    results = [watermark.detect_watermark(text) for text in texts]

    headings = [
        "LLM-generated watermarked text:",
        "LLM-generated unwatermarked text:",
        "Natural text:",
    ]
    for position, (heading, text, result) in enumerate(zip(headings, texts, results)):
        if position > 0:
            # Blank-line separator between consecutive sections
            print('\n')
        print(heading)
        print(text)
        print('\n')
        print(result)

In [None]:
test_algorithm('KGW')

In [None]:
test_algorithm('Unigram')

In [None]:
test_algorithm('SWEET')

In [None]:
test_algorithm('UPV')

In [None]:
test_algorithm('EWD')

In [None]:
test_algorithm('SIR')

In [None]:
test_algorithm('XSIR')

In [None]:
test_algorithm('SynthID')

In [None]:
test_algorithm('EXP')

In [None]:
test_algorithm('Adaptive')

# 3. Mechanism Visualization

In [None]:
import torch
import gc
import json
from visualize.font_settings import FontSettings
from watermark.auto_watermark import AutoWatermark
from utils.transformers_config import TransformersConfig
from transformers import AutoModelForCausalLM, AutoTokenizer
from visualize.page_layout_settings import PageLayoutSettings
from visualize.data_for_visualization import DataForVisualization
from visualize.visualizer import DiscreteVisualizer, ContinuousVisualizer
from visualize.legend_settings import DiscreteLegendSettings, ContinuousLegendSettings
from visualize.color_scheme import ColorSchemeForDiscreteVisualization, ColorSchemeForContinuousVisualization
from IPython.display import display
from PIL import Image, ImageOps, ImageFont, ImageDraw

def test_discreet_visualization():
    """Render a fixed token sequence with DiscreteVisualizer, save and show it.

    The sample is two "PREFIX" tokens followed by five repetitions of
    "Hello world this is a test", with matching flags and weights. The image
    is written to ``test1.png`` and displayed.
    """
    repetitions = 5
    tokens = ["PREFIX", "PREFIX"] + ["Hello", "world", "this", "is", "a", "test"] * repetitions
    flags = [-1, -1] + [1, 0, 1, 1, 0, 0] * repetitions
    weights = [0, 0] + [0.1, 0.5, 0.3, 0.8, 0.2, 0.4] * repetitions

    visualizer = DiscreteVisualizer(
        color_scheme=ColorSchemeForDiscreteVisualization(),
        font_settings=FontSettings(),
        page_layout_settings=PageLayoutSettings(),
        legend_settings=DiscreteLegendSettings(),
    )
    sample = DataForVisualization(tokens, flags, weights)
    rendered = visualizer.visualize(
        data=sample,
        show_text=True,
        visualize_weight=True,
        display_legend=True,
    )

    rendered.save("test1.png")
    display(rendered)


def test_continuous_visualization():
    """Render a fixed token sequence with ContinuousVisualizer, save and show it.

    The sample is two "PREFIX" tokens (with ``None`` values) followed by five
    repetitions of "Hello world this is a test". Text and weights are not
    drawn; only the legend is enabled. The image is written to ``test2.png``
    and displayed.
    """
    repetitions = 5
    per_token_values = [0.1, 0.5, 0.3, 0.8, 0.2, 0.4]
    tokens = ["PREFIX", "PREFIX"] + ["Hello", "world", "this", "is", "a", "test"] * repetitions
    values = [None, None] + per_token_values * repetitions
    weights = [0, 0] + per_token_values * repetitions

    visualizer = ContinuousVisualizer(
        color_scheme=ColorSchemeForContinuousVisualization(),
        font_settings=FontSettings(),
        page_layout_settings=PageLayoutSettings(),
        legend_settings=ContinuousLegendSettings(),
    )
    sample = DataForVisualization(tokens, values, weights)
    rendered = visualizer.visualize(
        data=sample,
        show_text=False,
        visualize_weight=False,
        display_legend=True,
    )

    rendered.save("test2.png")
    display(rendered)


def get_data(algorithm_name):
    """Generate text with a watermark algorithm and return visualization data.

    Args:
        algorithm_name: Name passed to ``AutoWatermark.load``; the algorithm
            configuration is read from ``config/<algorithm_name>.json``.

    Returns:
        A ``(watermarked_data, unwatermarked_data)`` tuple holding the results
        of ``get_data_for_visualization`` for the watermarked and the
        unwatermarked generation of the first dataset prompt.
    """
    # Load data: only the first line (one JSON record) is needed, so avoid
    # reading the whole dataset file into memory.
    with open('dataset/c4/processed_c4.json', 'r') as f:
        first_line = f.readline()
    prompt = json.loads(first_line)['prompt']

    # Device
    device = "cuda" if torch.cuda.is_available() else "cpu"

    # Transformers config
    # NOTE(review): unlike the other TransformersConfig calls in this file,
    # do_sample is not passed here -- confirm that this is intentional.
    transformers_config = TransformersConfig(model=AutoModelForCausalLM.from_pretrained(opt_path).to(device),
                                            tokenizer=AutoTokenizer.from_pretrained(opt_path),
                                            vocab_size=50272,
                                            device=device,
                                            max_new_tokens=200,
                                            min_length=230,
                                            no_repeat_ngram_size=4)

    myWatermark = AutoWatermark.load(f'{algorithm_name}',
                                     algorithm_config=f'config/{algorithm_name}.json',
                                     transformers_config=transformers_config)

    watermarked_text = myWatermark.generate_watermarked_text(prompt)
    unwatermarked_text = myWatermark.generate_unwatermarked_text(prompt)
    watermarked_data = myWatermark.get_data_for_visualization(watermarked_text)
    unwatermarked_data = myWatermark.get_data_for_visualization(unwatermarked_text)

    return watermarked_data, unwatermarked_data


def _compose_side_by_side(algorithm_name, watermarked_img, unwatermarked_img):
    """Return one white image showing both renderings side by side under titles."""
    font_size = 22
    font = ImageFont.truetype("./font/times.ttf", font_size)
    title_height = 80

    panels = [
        (f"{algorithm_name} watermarked", watermarked_img),
        (f"{algorithm_name} unwatermarked", unwatermarked_img),
    ]

    total_width = sum(img.size[0] for _, img in panels)
    # Size the canvas for the taller rendering so neither one is cropped.
    max_height = max(img.size[1] for _, img in panels) + title_height
    canvas = Image.new('RGB', (total_width, max_height), (255, 255, 255))
    draw = ImageDraw.Draw(canvas)

    x_offset = 0
    for title, img in panels:
        panel_width = img.size[0]
        left, _, right, _ = draw.textbbox((0, 0), title, font=font)
        text_width = right - left
        # The title starts at 30% of the horizontal space left over by the text.
        title_x = x_offset + int((panel_width - text_width) * 0.3)
        draw.text((title_x, int(title_height * 0.35)), title, fill=(0, 0, 0), font=font)
        canvas.paste(img, (x_offset, title_height))
        x_offset += panel_width

    return canvas


def _render_comparison(algorithm_name, visualizer, watermarked_data, unwatermarked_data, visualize_weight):
    """Render both data sets, save each image, and display them side by side."""
    watermarked_img = visualizer.visualize(data=watermarked_data,
                                           show_text=True,
                                           visualize_weight=visualize_weight,
                                           display_legend=True)

    unwatermarked_img = visualizer.visualize(data=unwatermarked_data,
                                             show_text=True,
                                             visualize_weight=visualize_weight,
                                             display_legend=True)

    watermarked_img.save(f"{algorithm_name}_watermarked.png")
    unwatermarked_img.save(f"{algorithm_name}_unwatermarked.png")

    display(_compose_side_by_side(algorithm_name, watermarked_img, unwatermarked_img))


def test_visualization_without_weight(algorithm_name, visualize_type='discrete'):
    """Visualize watermarked vs. unwatermarked text without token weights.

    Saves ``<algorithm_name>_watermarked.png`` and
    ``<algorithm_name>_unwatermarked.png`` and displays both images side by
    side, each under a title.

    Args:
        algorithm_name: Watermark algorithm; must be in the list allowed for
            the chosen ``visualize_type`` (see the assertions below).
        visualize_type: ``'discrete'`` or ``'continuous'``; selects
            ``DiscreteVisualizer`` or ``ContinuousVisualizer``.

    Raises:
        AssertionError: If ``visualize_type`` or ``algorithm_name`` is not
            one of the accepted values.
    """
    # Validate input
    assert visualize_type in ['discrete', 'continuous']
    if visualize_type == 'discrete':
        assert algorithm_name in ['KGW', 'Unigram', 'SWEET', 'UPV', 'SIR', 'XSIR', 'EWD']
    else:
        assert algorithm_name in ['EXP', 'EXPEdit']

    # Get data for visualization
    watermarked_data, unwatermarked_data = get_data(algorithm_name)

    # Init visualizer
    if visualize_type == 'discrete':
        visualizer = DiscreteVisualizer(color_scheme=ColorSchemeForDiscreteVisualization(),
                                        font_settings=FontSettings(),
                                        page_layout_settings=PageLayoutSettings(),
                                        legend_settings=DiscreteLegendSettings())
    else:
        visualizer = ContinuousVisualizer(color_scheme=ColorSchemeForContinuousVisualization(),
                                          font_settings=FontSettings(),
                                          page_layout_settings=PageLayoutSettings(),
                                          legend_settings=ContinuousLegendSettings())

    # Weights are deliberately not drawn here; that is what distinguishes this
    # function from test_visualization_with_weight.
    _render_comparison(algorithm_name, visualizer, watermarked_data, unwatermarked_data,
                       visualize_weight=False)


def test_visualization_with_weight(algorithm_name):
    """Visualize watermarked vs. unwatermarked text including token weights.

    Saves ``<algorithm_name>_watermarked.png`` and
    ``<algorithm_name>_unwatermarked.png`` and displays both images side by
    side, each under a title.

    Args:
        algorithm_name: ``'SWEET'`` or ``'EWD'``.

    Raises:
        AssertionError: If ``algorithm_name`` is not one of the accepted values.
    """
    # Validate input
    assert algorithm_name in ['SWEET', 'EWD']

    # Get data for visualization
    watermarked_data, unwatermarked_data = get_data(algorithm_name)

    # Init visualizer
    visualizer = DiscreteVisualizer(color_scheme=ColorSchemeForDiscreteVisualization(),
                                    font_settings=FontSettings(),
                                    page_layout_settings=PageLayoutSettings(),
                                    legend_settings=DiscreteLegendSettings())

    _render_comparison(algorithm_name, visualizer, watermarked_data, unwatermarked_data,
                       visualize_weight=True)

## 3.1 Warm Up: Test Visualizer Using Provided Text

In [None]:
test_discreet_visualization()

In [None]:
test_continuous_visualization()

## 3.2 Mechanism Visualization of Watermarking Algorithms

### 3.2.1 KGW Family

In [None]:
test_visualization_without_weight('KGW', 'discrete')

### 3.2.2 Christ Family

In [None]:
test_visualization_without_weight('EXP', 'continuous')

### 3.2.3 Handling Weighted Token Difference in Visualization

In [None]:
test_visualization_with_weight('SWEET')

In [None]:
test_visualization_with_weight('EWD')

# 4. Automated Comprehensive Evaluation

## 4.1 Watermark Detection Pipeline

In [None]:
import torch
import json
from evaluation.dataset import C4Dataset
from watermark.auto_watermark import AutoWatermark
from utils.transformers_config import TransformersConfig
from evaluation.tools.success_rate_calculator import DynamicThresholdSuccessRateCalculator
from transformers import AutoModelForCausalLM, AutoTokenizer, T5Tokenizer, T5ForConditionalGeneration, BertTokenizer, BertForMaskedLM
from evaluation.pipelines.detection import WatermarkedTextDetectionPipeline, UnWatermarkedTextDetectionPipeline, DetectionPipelineReturnType
from evaluation.tools.text_editor import TruncatePromptTextEditor, TruncateTaskTextEditor, WordDeletion, SynonymSubstitution, ContextAwareSynonymSubstitution, GPTParaphraser, DipperParaphraser

# Imported here so this cell also runs when the visualization cell (which is
# the only other place that imports gc) has not been executed.
import gc

device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Collect unreferenced objects and release PyTorch's cached GPU memory before
# the pipelines below load new models.
if torch.cuda.is_available():
    gc.collect()
    torch.cuda.empty_cache()

def test_detection_pipeline(algorithm_name, attack_name):
    """Evaluate watermark detection under a text-editing attack and print the result.

    Watermarked texts are truncated with ``TruncatePromptTextEditor`` and then
    passed through the selected attack; unwatermarked texts are only
    truncated. The detection scores of both pipelines are handed to a
    ``DynamicThresholdSuccessRateCalculator`` (labels ``TPR`` and ``F1``,
    rule ``'best'``) and the outcome is printed.

    Args:
        algorithm_name: Watermark algorithm; its configuration is read from
            ``config/<algorithm_name>.json``.
        attack_name: One of ``'Word-D'``, ``'Word-S'``, ``'Word-S(Context)'``,
            ``'Doc-P(GPT-3.5)'`` or ``'Doc-P(Dipper)'``.

    Raises:
        ValueError: If ``attack_name`` is not one of the supported attacks.
    """
    supported_attacks = ('Word-D', 'Word-S', 'Word-S(Context)', 'Doc-P(GPT-3.5)', 'Doc-P(Dipper)')
    # Fail before any model is loaded; previously an unknown name surfaced
    # later as an unbound ``attack`` variable.
    if attack_name not in supported_attacks:
        raise ValueError(f"Unknown attack_name {attack_name!r}; expected one of {supported_attacks}")

    my_dataset = C4Dataset('dataset/c4/processed_c4.json')
    transformers_config = TransformersConfig(model=AutoModelForCausalLM.from_pretrained(opt_path).to(device),
                                             tokenizer=AutoTokenizer.from_pretrained(opt_path),
                                             vocab_size=50272,
                                             device=device,
                                             max_new_tokens=200,
                                             min_length=230,
                                             do_sample=True,
                                             no_repeat_ngram_size=4)

    my_watermark = AutoWatermark.load(f'{algorithm_name}', algorithm_config=f'config/{algorithm_name}.json', transformers_config=transformers_config)

    if attack_name == 'Word-D':
        attack = WordDeletion(ratio=0.3)
    elif attack_name == 'Word-S':
        attack = SynonymSubstitution(ratio=0.5)
    elif attack_name == 'Word-S(Context)':
        # Use the BERT directory configured (and optionally downloaded) in the
        # setup cell instead of a machine-specific absolute path.
        attack = ContextAwareSynonymSubstitution(ratio=0.5,
                                                 tokenizer=BertTokenizer.from_pretrained(bert_large_uncased_path),
                                                 model=BertForMaskedLM.from_pretrained(bert_large_uncased_path).to(device))
    elif attack_name == 'Doc-P(GPT-3.5)':
        attack = GPTParaphraser(openai_model='gpt-3.5-turbo',
                                prompt='Please rewrite the following text: ')
    else:  # 'Doc-P(Dipper)'
        # The T5 tokenizer comes from the directory configured in the setup
        # cell. No path variable exists for the Dipper weights, so that
        # location remains a local path that must be adjusted per machine.
        attack = DipperParaphraser(tokenizer=T5Tokenizer.from_pretrained(t5_path),
                                   model=T5ForConditionalGeneration.from_pretrained('/data2/shared_model/kalpeshk2011/dipper-paraphraser-xxl/', device_map='auto'),
                                   lex_diversity=60, order_diversity=0, sent_interval=1,
                                   max_new_tokens=100, do_sample=True, top_p=0.75, top_k=None)

    watermarked_pipeline = WatermarkedTextDetectionPipeline(dataset=my_dataset, text_editor_list=[TruncatePromptTextEditor(), attack],
                                                            show_progress=True, return_type=DetectionPipelineReturnType.SCORES)

    unwatermarked_pipeline = UnWatermarkedTextDetectionPipeline(dataset=my_dataset, text_editor_list=[TruncatePromptTextEditor()],
                                                                show_progress=True, return_type=DetectionPipelineReturnType.SCORES)

    calculator = DynamicThresholdSuccessRateCalculator(labels=['TPR', 'F1'], rule='best')
    print(calculator.calculate(watermarked_pipeline.evaluate(my_watermark), unwatermarked_pipeline.evaluate(my_watermark)))



In [None]:
test_detection_pipeline('KGW', 'Word-D')

## 4.2 Text Quality Analysis Pipeline

In [None]:
import torch
import json
from watermark.auto_watermark import AutoWatermark
from utils.transformers_config import TransformersConfig
from evaluation.dataset import C4Dataset, WMT16DE_ENDataset, HumanEvalDataset
from evaluation.tools.success_rate_calculator import DynamicThresholdSuccessRateCalculator
from transformers import AutoModelForCausalLM, AutoModelForSeq2SeqLM, AutoTokenizer, LlamaTokenizer
from evaluation.tools.text_editor import TruncatePromptTextEditor, TruncateTaskTextEditor ,CodeGenerationTextEditor
from evaluation.tools.text_quality_analyzer import PPLCalculator, LogDiversityAnalyzer, BLEUCalculator, PassOrNotJudger, GPTTextDiscriminator
from evaluation.pipelines.detection import WatermarkedTextDetectionPipeline, UnWatermarkedTextDetectionPipeline, DetectionPipelineReturnType
from evaluation.pipelines.quality_analysis import (DirectTextQualityAnalysisPipeline, ReferencedTextQualityAnalysisPipeline, ExternalDiscriminatorTextQualityAnalysisPipeline,
                                                   QualityPipelineReturnType)

# Imported here so this cell also runs when the visualization cell (which is
# the only other place that imports gc) has not been executed.
import gc

device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Collect unreferenced objects and release PyTorch's cached GPU memory before
# the pipelines below load new models.
if torch.cuda.is_available():
    gc.collect()
    torch.cuda.empty_cache()

def test_direct_quality_analysis_pipeline(algorithm_name, quality_analyzer_name):
    """Run a direct text-quality analysis on C4 and print the mean scores.

    Args:
        algorithm_name: Watermark algorithm; its configuration is read from
            ``config/<algorithm_name>.json``.
        quality_analyzer_name: ``'PPL'`` (``PPLCalculator`` backed by the model
            at ``llama_path``) or ``'Log Diversity'`` (``LogDiversityAnalyzer``).

    Raises:
        ValueError: If ``quality_analyzer_name`` is not a supported analyzer.
    """
    supported_analyzers = ('PPL', 'Log Diversity')
    # Fail before any model is loaded; previously an unknown name surfaced
    # later as an unbound ``analyzer`` variable.
    if quality_analyzer_name not in supported_analyzers:
        raise ValueError(
            f"Unknown quality_analyzer_name {quality_analyzer_name!r}; expected one of {supported_analyzers}"
        )

    my_dataset = C4Dataset('dataset/c4/processed_c4.json')
    transformers_config = TransformersConfig(model=AutoModelForCausalLM.from_pretrained(opt_path).to(device),
                                             tokenizer=AutoTokenizer.from_pretrained(opt_path),
                                             vocab_size=50272,
                                             device=device,
                                             max_new_tokens=200,
                                             min_length=230,
                                             do_sample=True,
                                             no_repeat_ngram_size=4)
    my_watermark = AutoWatermark.load(f'{algorithm_name}', algorithm_config=f'config/{algorithm_name}.json', transformers_config=transformers_config)

    if quality_analyzer_name == 'PPL':
        analyzer = PPLCalculator(model=AutoModelForCausalLM.from_pretrained(llama_path, device_map='auto'),
                                 tokenizer=LlamaTokenizer.from_pretrained(llama_path),
                                 device=device)
    else:  # 'Log Diversity'
        analyzer = LogDiversityAnalyzer()

    # Watermarked output is truncated to drop the prompt; the unwatermarked
    # side uses the dataset's natural text unedited.
    quality_pipeline = DirectTextQualityAnalysisPipeline(dataset=my_dataset,
                                                         watermarked_text_editor_list=[TruncatePromptTextEditor()],
                                                         unwatermarked_text_editor_list=[],
                                                         analyzers=[analyzer],
                                                         unwatermarked_text_source='natural', show_progress=True,
                                                         return_type=QualityPipelineReturnType.MEAN_SCORES)

    print(f"{quality_analyzer_name}:")
    print(quality_pipeline.evaluate(my_watermark))


def test_referenced_quality_analysis_pipeline_1(algorithm_name):
    """Evaluate the impact on text quality in the machine translation task.

    Translates the WMT16 de-en validation set with the NLLB model at
    ``nllb_path`` and prints the mean BLEU scores returned by the pipeline.

    Args:
        algorithm_name: Watermark algorithm; its configuration is read from
            ``config/<algorithm_name>.json``.
    """
    my_dataset = WMT16DE_ENDataset('dataset/wmt16_de_en/validation.jsonl')
    tokenizer = AutoTokenizer.from_pretrained(nllb_path, src_lang="deu_Latn")
    # ``tokenizer.lang_code_to_id`` is deprecated/removed in recent
    # transformers releases; looking the language token up via
    # ``convert_tokens_to_ids`` yields the id to force as first output token.
    english_token_id = tokenizer.convert_tokens_to_ids("eng_Latn")
    transformers_config = TransformersConfig(model=AutoModelForSeq2SeqLM.from_pretrained(nllb_path).to(device),
                                             tokenizer=tokenizer,
                                             device=device,
                                             forced_bos_token_id=english_token_id)

    my_watermark = AutoWatermark.load(f'{algorithm_name}', algorithm_config=f'config/{algorithm_name}.json', transformers_config=transformers_config)

    quality_pipeline = ReferencedTextQualityAnalysisPipeline(dataset=my_dataset,
                                                             watermarked_text_editor_list=[],
                                                             unwatermarked_text_editor_list=[],
                                                             analyzer=BLEUCalculator(),
                                                             unwatermarked_text_source='generated', show_progress=True,
                                                             return_type=QualityPipelineReturnType.MEAN_SCORES)

    print("BLEU:")
    print(quality_pipeline.evaluate(my_watermark))


def test_referenced_quality_analysis_pipeline_2(algorithm_name):
    """Evaluate the impact on text quality in the code generation task.

    Generates code for the HumanEval test set with the model at
    ``starcoder_path`` and prints the mean ``PassOrNotJudger`` scores
    returned by the pipeline (reported as pass@1).

    Args:
        algorithm_name: Watermark algorithm; its configuration is read from
            ``config/<algorithm_name>.json``.
    """
    my_dataset = HumanEvalDataset('dataset/human_eval/test.jsonl')
    # ``starcoder_path`` is the directory defined in the setup cell; the
    # previously referenced ``tiny_starcoder_path`` is not defined anywhere in
    # this notebook and raised NameError.
    tokenizer = AutoTokenizer.from_pretrained(starcoder_path)
    transformers_config = TransformersConfig(model=AutoModelForCausalLM.from_pretrained(starcoder_path, device_map='auto'),
                                             tokenizer=tokenizer,
                                             device=device,
                                             min_length=200,
                                             max_length=400)

    my_watermark = AutoWatermark.load(f'{algorithm_name}', algorithm_config=f'config/{algorithm_name}.json', transformers_config=transformers_config)

    quality_pipeline = ReferencedTextQualityAnalysisPipeline(dataset=my_dataset,
                                                             watermarked_text_editor_list=[TruncateTaskTextEditor(), CodeGenerationTextEditor()],
                                                             unwatermarked_text_editor_list=[TruncateTaskTextEditor(), CodeGenerationTextEditor()],
                                                             analyzer=PassOrNotJudger(),
                                                             unwatermarked_text_source='generated', show_progress=True,
                                                             return_type=QualityPipelineReturnType.MEAN_SCORES)

    print("pass@1:")
    print(quality_pipeline.evaluate(my_watermark))


def test_discriminator_quality_analysis_pipeline(algorithm_name):
    """Compare watermarked and unwatermarked translations with a GPT discriminator.

    Translates the WMT16 de-en validation set with the NLLB model at
    ``nllb_path`` and prints the mean scores returned by the pipeline, whose
    analyzer is a ``GPTTextDiscriminator`` using ``gpt-4`` (reported as win
    rate).

    Args:
        algorithm_name: Watermark algorithm; its configuration is read from
            ``config/<algorithm_name>.json``.
    """
    my_dataset = WMT16DE_ENDataset('dataset/wmt16_de_en/validation.jsonl')
    tokenizer = AutoTokenizer.from_pretrained(nllb_path, src_lang="deu_Latn")
    # ``tokenizer.lang_code_to_id`` is deprecated/removed in recent
    # transformers releases; looking the language token up via
    # ``convert_tokens_to_ids`` yields the id to force as first output token.
    english_token_id = tokenizer.convert_tokens_to_ids("eng_Latn")
    transformers_config = TransformersConfig(model=AutoModelForSeq2SeqLM.from_pretrained(nllb_path).to(device),
                                             tokenizer=tokenizer,
                                             device=device,
                                             forced_bos_token_id=english_token_id)

    my_watermark = AutoWatermark.load(f'{algorithm_name}', algorithm_config=f'config/{algorithm_name}.json', transformers_config=transformers_config)

    discriminator = GPTTextDiscriminator(openai_model='gpt-4',
                                         task_description='Translate the following German text to English')
    quality_pipeline = ExternalDiscriminatorTextQualityAnalysisPipeline(dataset=my_dataset,
                                                                        watermarked_text_editor_list=[],
                                                                        unwatermarked_text_editor_list=[],
                                                                        analyzer=discriminator,
                                                                        unwatermarked_text_source='generated', show_progress=True,
                                                                        return_type=QualityPipelineReturnType.MEAN_SCORES)

    print("Win Rate:")
    print(quality_pipeline.evaluate(my_watermark))

In [None]:
test_direct_quality_analysis_pipeline('KGW', 'PPL')

In [None]:
test_direct_quality_analysis_pipeline('Unigram', 'Log Diversity')

In [None]:
test_referenced_quality_analysis_pipeline_1('SIR')

In [None]:
test_referenced_quality_analysis_pipeline_2('SWEET')

In [None]:
import os

import openai

# Take the key from the OPENAI_API_KEY environment variable so it does not have
# to be stored in the notebook. Falls back to the original empty string when
# the variable is unset (replace it with a key to hard-code one locally).
openai.api_key = os.environ.get("OPENAI_API_KEY", "")

test_discriminator_quality_analysis_pipeline('EWD')

# 5. Steal Attack For Watermark

In [None]:
import torch
import json
from watermark.auto_watermark import AutoWatermark
from utils.transformers_config import TransformersConfig
from transformers import AutoModelForCausalLM, AutoTokenizer

# Load data: only the first line (one JSON record) is used, so read just that
# line instead of loading the whole dataset file into memory.
with open("dataset/c4/processed_c4.json", "r") as f:
    first_line = f.readline()
item = json.loads(first_line)
prompt = item["prompt"]
natural_text = item["natural_text"]


def test_steal_algorithm(steal_algorithm_name, watermark_algorithm_name):
    """Generate text with a steal algorithm and check it against a watermark detector.

    Both algorithms are loaded through ``AutoWatermark.load`` with the same
    transformers configuration. Text generated by the steal algorithm, text
    generated by the watermark algorithm, and the module-level
    ``natural_text`` are each passed to the watermark algorithm's detector,
    and every text is printed followed by its detection result.

    Args:
        steal_algorithm_name: Must be ``"STEAL"``.
        watermark_algorithm_name: One of the watermark algorithm names
            accepted by the assertion at the top of the function.
    """
    # Check algorithm name
    supported_watermarks = [
        "KGW", "Unigram", "SWEET", "EWD", "SIR", "XSIR",
        "UPV", "EXP", "EXPEdit", "SynthID", "Adaptive",
    ]
    assert steal_algorithm_name in ["STEAL"]
    assert watermark_algorithm_name in supported_watermarks

    # Device
    device = "cuda" if torch.cuda.is_available() else "cpu"

    # Transformers config
    model = AutoModelForCausalLM.from_pretrained(opt_path).to(device)
    tokenizer = AutoTokenizer.from_pretrained(opt_path)
    shared_config = TransformersConfig(
        model=model,
        tokenizer=tokenizer,
        vocab_size=50272,
        device=device,
        max_new_tokens=200,
        min_length=230,
        do_sample=True,
        no_repeat_ngram_size=4,
    )

    # Load steal algorithm first, then the watermark algorithm
    stealer = AutoWatermark.load(
        f"{steal_algorithm_name}",
        algorithm_config=f"config/{steal_algorithm_name}.json",
        transformers_config=shared_config,
    )
    detector = AutoWatermark.load(
        f"{watermark_algorithm_name}",
        algorithm_config=f"config/{watermark_algorithm_name}.json",
        transformers_config=shared_config,
    )

    # Generate both texts first, then run detection on all three texts
    stolen_text = stealer.generate_watermarked_text(prompt)
    marked_text = detector.generate_watermarked_text(prompt)
    texts = [stolen_text, marked_text, natural_text]
    results = [detector.detect_watermark(text) for text in texts]

    headings = [
        "Stolen Watermarked text:",
        "LLM-generated watermarked text:",
        "Natural text:",
    ]
    for position, (heading, text, result) in enumerate(zip(headings, texts, results)):
        if position > 0:
            # Blank-line separator between consecutive sections
            print("\n")
        print(heading)
        print(text)
        print("\n")
        print(result)

In [None]:
test_steal_algorithm("STEAL", "KGW")