In this notebook we test Binoculars, a zero-shot approach for detecting machine-generated text, with falcon-7b and falcon-7b-instruct. The dataset contains texts from several domains: Arxiv, Wikipedia, Wikihow and Reddit. For each domain there are texts written by humans as well as texts rewritten by the LLMs 'bloomz', 'chatgpt', 'cohere' and 'davinci'.

In [None]:
%pip install accelerate bitsandbytes transformers

In [36]:
import pandas as pd
import numpy as np
import torch
import json
import os
import torch.nn.functional as F
import transformers
# from google.colab import files

from tqdm import tqdm
from torch import nn
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
from torch.utils.data import Dataset, DataLoader, TensorDataset, SequentialSampler
from timeit import default_timer as timer
from os import walk
from pathlib import Path
from sklearn.model_selection import train_test_split
from sklearn.metrics import recall_score, precision_score, roc_curve, auc, brier_score_loss
from typing import Union
from google.colab import files

In [30]:
# Relative path prefix under which the project's data directory is expected.
ROOT_DATA = '../../'
# Folder holding one sub-directory per domain of the M4 dataset.
# Built with os.path.join so the trailing slash of ROOT_DATA does not produce a doubled separator.
M4_DATA_FOLDER_PATH = os.path.join(ROOT_DATA, 'data', 'raw', 'm4-unified')

In [4]:
# Colab-only setup: mount Google Drive, then extract the M4 archive stored there.
# `unzip` is given no target directory, so `m4-unified/` is created in the
# current working directory.
from google.colab import drive
drive.mount('/content/gdrive')
!unzip /content/gdrive/MyDrive/uni/iioz/llm-detect/m4-unified.zip

Mounted at /content/gdrive
Archive:  /content/gdrive/MyDrive/uni/iioz/llm-detect/m4-unified.zip
   creating: m4-unified/arxiv/
  inflating: m4-unified/arxiv/arxiv_bloomz.jsonl  
  inflating: m4-unified/arxiv/arxiv_chatGPT.jsonl  
  inflating: m4-unified/arxiv/arxiv_cohere.jsonl  
  inflating: m4-unified/arxiv/arxiv_davinci.jsonl  
  inflating: m4-unified/arxiv/arxiv_flant5.jsonl  
  inflating: m4-unified/arxiv/arxiv_human.jsonl  
   creating: m4-unified/reddit/
  inflating: m4-unified/reddit/reddit_bloomz.jsonl  
  inflating: m4-unified/reddit/reddit_chatGPT.jsonl  
  inflating: m4-unified/reddit/reddit_cohere.jsonl  
  inflating: m4-unified/reddit/reddit_davinci.jsonl  
  inflating: m4-unified/reddit/reddit_dolly.jsonl  
  inflating: m4-unified/reddit/reddit_flant5.jsonl  
  inflating: m4-unified/reddit/reddit_human.jsonl  
   creating: m4-unified/wikihow/
  inflating: m4-unified/wikihow/wikihow_bloomz.jsonl  
  inflating: m4-unified/wikihow/wikihow_chatGPT.jsonl  
  inflating: m4-uni

In [31]:
# Run-level constants.
TEST_SET_FRACTION = 0.3
BATCH_SIZE = 32

# Text sources to keep ('human' included). The loading cell compares these
# against the lower-cased part of each dataset file name after the first '_'.
LLMS = [
    'bloomz',
    'chatgpt',
    'cohere',
    'davinci',
    'human',
]

In [None]:
# List the M4 root folder: `dir_names` are the per-domain sub-folders and
# `file_names` are the files that sit directly in the root folder.
dir_path, dir_names, file_names = next(walk(M4_DATA_FOLDER_PATH))

# Columns that are always present, in the order they should appear in `df`.
base_columns = ['text', 'is_llm', 'domain', 'dataset_name']

# Collect one frame per dataset file and concatenate once at the end, so this
# cell does not depend on a `df` created elsewhere and does not re-copy the
# accumulated data on every iteration.
frames = []
for domain_dir in dir_names:
    dataset_folder_path, _, dataset_names = next(walk(os.path.join(dir_path, domain_dir)))
    for dataset_name in dataset_names:
        stem = Path(dataset_name).stem
        name_parts = stem.split('_')
        # File names look like "<domain>_<source>.jsonl"; skip anything that
        # does not follow that pattern or whose source is not in LLMS.
        # Filtering happens before reading, so unused files are never parsed.
        if len(name_parts) < 2 or name_parts[1].lower() not in LLMS:
            continue
        is_llm = 0 if 'human' in dataset_name else 1
        print(dataset_name, is_llm)
        temp_df = pd.read_json(path_or_buf=os.path.join(dataset_folder_path, dataset_name), lines=True)
        frames.append(temp_df.assign(domain=domain_dir, dataset_name=stem, is_llm=is_llm))

if frames:
    df = pd.concat(frames, ignore_index=True)
    # Keep the base columns first; any additional columns from the files follow.
    extra_columns = [col for col in df.columns if col not in base_columns]
    df = df.reindex(columns=base_columns + extra_columns)
else:
    # Nothing matched: fall back to an empty frame with the expected schema.
    df = pd.DataFrame(columns=base_columns)

arxiv_bloomz.jsonl 1
arxiv_chatGPT.jsonl 1
arxiv_cohere.jsonl 1
arxiv_davinci.jsonl 1
arxiv_human.jsonl 0
reddit_bloomz.jsonl 1
reddit_chatGPT.jsonl 1
reddit_cohere.jsonl 1
reddit_davinci.jsonl 1
reddit_human.jsonl 0
wikihow_bloomz.jsonl 1
wikihow_chatGPT.jsonl 1
wikihow_cohere.jsonl 1
wikihow_davinci.jsonl 1
wikihow_human.jsonl 0
wikipedia_bloomz.jsonl 1
wikipedia_chatGPT.jsonl 1
wikipedia_cohere.jsonl 1
wikipedia_davinci.jsonl 1
wikipedia_human.jsonl 0


In [None]:
# Display only the rows labelled as LLM-written (is_llm == 1).
df.loc[df["is_llm"].eq(1)]

Unnamed: 0,text,is_llm,domain,dataset_name
0,The present work is devoted to the study of qu...,1,arxiv,arxiv_bloomz
1,We present the results of our analysis of the ...,1,arxiv,arxiv_bloomz
2,We report on spectroscopic observations made w...,1,arxiv,arxiv_bloomz
3,We present new numerical methods for stochasti...,1,arxiv,arxiv_bloomz
4,The Solar Chromosphere is an important compone...,1,arxiv,arxiv_bloomz
...,...,...,...,...
56326,Adolf Fredrik Church (also known as Adolf Fre...,1,wikipedia,wikipedia_davinci
56327,"""I've Been Doin' Some Thinkin'"" is a single b...",1,wikipedia,wikipedia_davinci
56328,"Michele Lean (born November 13, 1991) is a Ca...",1,wikipedia,wikipedia_davinci
56329,The Speaker of the Provincial Assembly of Pun...,1,wikipedia,wikipedia_davinci


In [None]:
# Cast the label column to a plain 64-bit integer dtype.
# NOTE(review): presumably needed because concatenating onto an empty frame
# leaves `is_llm` as object dtype — confirm.
df = df.astype({'is_llm': 'int64'})

In [32]:
# Create the empty result frame only when no `df` exists yet. In file order this
# cell sits after the loading cell, so an unconditional assignment would replace
# the already-loaded data with an empty frame when the notebook is run top to bottom.
if 'df' not in globals():
    df = pd.DataFrame(columns=['text', 'is_llm', 'domain', 'dataset_name'])

## Metrics

In [None]:
# Shared, stateless loss / activation modules used by the metric functions.
ce_loss_fn = torch.nn.CrossEntropyLoss(reduction="none")
softmax_fn = torch.nn.Softmax(dim=-1)


def perplexity(encoding: "transformers.BatchEncoding",
               logits: torch.Tensor,
               median: bool = False,
               temperature: float = 1.0):
    """Aggregate the per-token cross-entropy of each sequence under `logits`.

    The logits at position t are scored against the token at position t + 1,
    so the last logit and the first token are dropped. Positions whose
    (shifted) attention mask is 0 are excluded from the aggregate.

    Note that the value returned is the aggregated cross-entropy itself; it is
    not exponentiated.

    Args:
        encoding: object exposing `input_ids` and `attention_mask` tensors
            (assumed shape (batch, seq_len) — matches the slicing below).
        logits: model outputs, assumed shape (batch, seq_len, vocab_size).
        median: if True aggregate with the median over non-masked tokens,
            otherwise with the mean.
        temperature: divisor applied to the logits before the loss.

    Returns:
        A float numpy array with one value per sequence. A sequence with no
        non-masked target token yields NaN.
    """
    shifted_logits = logits[..., :-1, :].contiguous() / temperature
    shifted_labels = encoding.input_ids[..., 1:].contiguous()
    shifted_attention_mask = encoding.attention_mask[..., 1:].contiguous()

    # CrossEntropyLoss expects the class dimension second: (batch, vocab, seq_len - 1).
    token_ce = ce_loss_fn(shifted_logits.transpose(1, 2), shifted_labels)

    if median:
        # Masked positions become NaN so that nanmedian ignores them.
        ce_nan = token_ce.masked_fill(~shifted_attention_mask.bool(), float("nan"))
        return np.nanmedian(ce_nan.cpu().float().numpy(), 1)

    ppl = (token_ce * shifted_attention_mask).sum(1) / shifted_attention_mask.sum(1)
    return ppl.to("cpu").float().numpy()


def entropy(p_logits: torch.Tensor,
            q_logits: torch.Tensor,
            encoding: transformers.BatchEncoding,
            pad_token_id: int,
            median: bool = False,
            sample_p: bool = False,
            temperature: float = 1.0):
    """Aggregate the per-token cross-entropy between two models' predictions.

    At every position the softmax of `p_logits` is used as the target
    distribution and `q_logits` as the prediction. With `sample_p=True` a
    single token id is sampled from the p-distribution and used as a hard
    target instead.

    Args:
        p_logits: logits defining the target distribution
            (assumed (batch, seq_len, vocab_size) — matches the reshapes below).
        q_logits: logits being scored, same layout as `p_logits`.
        encoding: object exposing `input_ids`; positions equal to
            `pad_token_id` are excluded from the aggregate.
        pad_token_id: token id treated as padding.
        median: aggregate with the median over kept tokens instead of the mean.
        sample_p: sample one target token per position instead of using the
            full distribution.
        temperature: divisor applied to both sets of logits.

    Returns:
        A float numpy array with one value per sequence.
    """
    n_vocab = p_logits.shape[-1]
    seq_len = q_logits.shape[-2]

    scaled_p = p_logits / temperature
    scaled_q = q_logits / temperature

    # Flatten to one row per token position.
    target = softmax_fn(scaled_p).view(-1, n_vocab)
    if sample_p:
        target = torch.multinomial(target.view(-1, n_vocab), replacement=True, num_samples=1).view(-1)

    flat_q = scaled_q.view(-1, n_vocab)
    token_ce = ce_loss_fn(input=flat_q, target=target).view(-1, seq_len)

    keep_mask = (encoding.input_ids != pad_token_id).type(torch.uint8)

    if not median:
        mean_ce = (token_ce * keep_mask).sum(1) / keep_mask.sum(1)
        return mean_ce.to("cpu").float().numpy()

    # Padding positions become NaN so that nanmedian skips them.
    ce_with_nan = token_ce.masked_fill(~keep_mask.bool(), float("nan"))
    return np.nanmedian(ce_with_nan.cpu().float().numpy(), 1)

In [None]:
def assert_tokenizer_consistency(model_id_1, model_id_2):
    """Raise ValueError unless both model ids resolve to tokenizers with equal vocabularies.

    Each tokenizer is loaded with `AutoTokenizer.from_pretrained` and the two
    `.vocab` mappings are compared for equality.
    """
    first_vocab = AutoTokenizer.from_pretrained(model_id_1).vocab
    second_vocab = AutoTokenizer.from_pretrained(model_id_2).vocab
    if first_vocab != second_vocab:
        raise ValueError(f"Tokenizers are not identical for {model_id_1} and {model_id_2}.")

## Quantization config

## Models

In [None]:
from huggingface_hub import login

# SECURITY: never commit an access token to the notebook. The token that was
# previously hard-coded here must be considered leaked and should be revoked.
# The token is read from the HF_TOKEN environment variable; when it is unset,
# `login` receives None and falls back to its own prompt.
login(token=os.environ.get("HF_TOKEN"))

In [37]:
# Hugging Face model ids used as the default observer / performer pair
# (default arguments of Binoculars.__init__).
OBSERVER = "mistralai/Mistral-7B-v0.1"
PERFORMER = "mistralai/Mistral-7B-Instruct-v0.1"

In [39]:
# Inference only: switch autograd off globally for everything that follows.
torch.set_grad_enabled(False)

huggingface_config = {
    # Only required for private models from Huggingface (e.g. LLaMA models)
    "TOKEN": os.environ.get("HF_TOKEN", None)
}

# selected using Falcon-7B and Falcon-7B-Instruct at bfloat16
# NOTE(review): OBSERVER / PERFORMER in this notebook are Mistral-7B checkpoints and
# Binoculars loads them with load_in_4bit=True, so these Falcon-derived thresholds
# may need re-calibration — confirm.
BINOCULARS_ACCURACY_THRESHOLD = 0.9015310749276843  # optimized for f1-score
BINOCULARS_FPR_THRESHOLD = 0.8536432310785527  # optimized for low-fpr [chosen at 0.01%]

# First GPU when CUDA is available, otherwise CPU; the second device is a second
# GPU when more than one is present and otherwise the same as the first.
DEVICE_1 = "cuda:0" if torch.cuda.is_available() else "cpu"
DEVICE_2 = "cuda:1" if torch.cuda.device_count() > 1 else DEVICE_1


class Binoculars(object):
    """Zero-shot detector that scores text with an observer / performer model pair.

    The score of a text is the aggregated cross-entropy of the text under the
    performer model (`perplexity`) divided by the aggregated cross-entropy
    between the observer's and the performer's next-token distributions
    (`entropy`). Scores below the active threshold are labelled as
    AI-generated.
    """

    def __init__(self,
                 observer_name_or_path: str = OBSERVER,
                 performer_name_or_path: str = PERFORMER,
                 use_bfloat16: bool = True,
                 max_token_observed: int = 512,
                 mode: str = "low-fpr",
                 ) -> None:
        """Load both models (4-bit quantized) and the shared tokenizer.

        Args:
            observer_name_or_path: model id or path of the observer model.
            performer_name_or_path: model id or path of the performer model.
            use_bfloat16: currently unused — both models are always loaded
                4-bit quantized. Kept so existing callers keep working.
            max_token_observed: texts are truncated to this many tokens.
            mode: "low-fpr" or "accuracy"; selects the decision threshold.

        Raises:
            ValueError: if the two tokenizers differ or `mode` is invalid.
        """
        assert_tokenizer_consistency(observer_name_or_path, performer_name_or_path)

        self.change_mode(mode)
        self.observer_model = self._load_model(observer_name_or_path, DEVICE_1)
        self.performer_model = self._load_model(performer_name_or_path, DEVICE_2)
        self.observer_model.eval()
        self.performer_model.eval()

        self.tokenizer = AutoTokenizer.from_pretrained(observer_name_or_path)
        if not self.tokenizer.pad_token:
            # Fall back to the EOS token so that batches can be padded.
            self.tokenizer.pad_token = self.tokenizer.eos_token
        self.max_token_observed = max_token_observed

    @staticmethod
    def _load_model(name_or_path: str, device: str):
        """Load one causal LM in 4-bit precision onto `device`."""
        return AutoModelForCausalLM.from_pretrained(
            name_or_path,
            device_map={"": device},
            # NOTE: allows model code shipped with the checkpoint to run; only use trusted repos.
            trust_remote_code=True,
            token=huggingface_config["TOKEN"],
            # Quantization is requested through an explicit config object rather
            # than the bare `load_in_4bit` keyword argument.
            quantization_config=BitsAndBytesConfig(load_in_4bit=True),
        )

    def change_mode(self, mode: str) -> None:
        """Select the decision threshold: "low-fpr" or "accuracy".

        Raises:
            ValueError: for any other `mode`.
        """
        if mode == "low-fpr":
            self.threshold = BINOCULARS_FPR_THRESHOLD
        elif mode == "accuracy":
            self.threshold = BINOCULARS_ACCURACY_THRESHOLD
        else:
            raise ValueError(f"Invalid mode: {mode}")

    def _tokenize(self, batch: list[str]) -> transformers.BatchEncoding:
        """Tokenize `batch`, truncating to `max_token_observed` tokens.

        Padding to the longest sequence is only applied when the batch holds
        more than one text. The encodings are moved to the observer's device.
        """
        batch_size = len(batch)
        encodings = self.tokenizer(
            batch,
            return_tensors="pt",
            padding="longest" if batch_size > 1 else False,
            truncation=True,
            max_length=self.max_token_observed,
            return_token_type_ids=False).to(self.observer_model.device)
        return encodings

    @torch.inference_mode()
    def _get_logits(self, encodings: transformers.BatchEncoding) -> tuple[torch.Tensor, torch.Tensor]:
        """Run both models on `encodings` and return (observer_logits, performer_logits)."""
        observer_logits = self.observer_model(**encodings.to(DEVICE_1)).logits
        performer_logits = self.performer_model(**encodings.to(DEVICE_2)).logits
        if DEVICE_1 != "cpu":
            torch.cuda.synchronize()
        return observer_logits, performer_logits

    def compute_score(self, input_text: Union[list[str], str]) -> Union[float, list[float]]:
        """Compute the Binoculars score for one text or a list of texts.

        Returns:
            A single float for a `str` input, otherwise a list with one score
            per text (an empty list for an empty input list).
        """
        batch = [input_text] if isinstance(input_text, str) else input_text
        if len(batch) == 0:
            # Nothing to score; avoid handing an empty batch to the tokenizer.
            return []
        encodings = self._tokenize(batch)
        observer_logits, performer_logits = self._get_logits(encodings)
        ppl = perplexity(encodings, performer_logits)
        # Bring everything onto the first device before comparing the two models.
        x_ppl = entropy(observer_logits.to(DEVICE_1), performer_logits.to(DEVICE_1),
                        encodings.to(DEVICE_1), self.tokenizer.pad_token_id)
        binoculars_scores = ppl / x_ppl
        binoculars_scores = binoculars_scores.tolist()
        return binoculars_scores[0] if isinstance(input_text, str) else binoculars_scores

    def predict(self, input_text: Union[list[str], str]) -> Union[list[str], str]:
        """Label each text as AI- or human-generated by comparing its score with the threshold.

        Returns:
            One label string for a `str` input, otherwise a list of labels.
        """
        binoculars_scores = np.array(self.compute_score(input_text))
        pred = np.where(binoculars_scores < self.threshold,
                        "Most likely AI-generated",
                        "Most likely human-generated"
                        ).tolist()
        return pred


## Test

In [None]:
# Build the detector with its default arguments (this loads both models) and
# keep a handle on its tokenizer for the token-count check below.
model = Binoculars()
tokenizer = model.tokenizer

In [None]:
# Texts with fewer tokens than this are not passed to the detector.
MINIMUM_TOKENS = 1


def count_tokens(text, tokenizer):
    """Return how many token ids `tokenizer` produces for `text`."""
    token_ids = tokenizer(text).input_ids
    return len(token_ids)


def run_detector(model, input_str, tokenizer):
    """Return the detector's prediction for `input_str`, formatted as a string.

    When the text has fewer than MINIMUM_TOKENS tokens, an explanatory message
    is returned instead of a prediction.
    """
    n_tokens = count_tokens(input_str, tokenizer)
    if n_tokens >= MINIMUM_TOKENS:
        verdict = model.predict(input_str)
        return f"{verdict}"
    return f"Too short length. Need minimum {MINIMUM_TOKENS} tokens to run Binoculars."


def get_binoculars_score(text):
    """Run the notebook-level `model` / `tokenizer` on a single text.

    NOTE(review): despite the name, this returns the string produced by
    `run_detector` (a prediction label or the "too short" message), not a
    numeric score — confirm this is intended.
    """
    return run_detector(model, text, tokenizer)

In [None]:
# Run the detector on every text, one row at a time, and store the result in a new column.
# NOTE(review): `get_binoculars_score` returns the string from `run_detector`
# (a prediction label or the "too short" message), so this column holds text
# labels rather than numeric scores — confirm before computing metrics on it.
df['binoculars_score'] = df['text'].apply(get_binoculars_score)

In [18]:
# Create a JSON Encoder class
class json_serialize(json.JSONEncoder):
    """JSON encoder that also accepts numpy integers, numpy floats and numpy arrays.

    Usage: ``json.dumps(obj, cls=json_serialize)``.
    """

    # (numpy type, converter to a JSON-serializable Python value)
    _CONVERTERS = (
        (np.integer, int),
        (np.floating, float),
        (np.ndarray, lambda array: array.tolist()),
    )

    def default(self, obj):
        """Convert supported numpy objects; defer everything else to the base encoder."""
        for numpy_type, convert in self._CONVERTERS:
            if isinstance(obj, numpy_type):
                return convert(obj)
        # The base implementation raises TypeError for unsupported objects.
        return super().default(obj)


In [None]:
# Download each listed file through the Colab browser helper.
# NOTE(review): in the visible cells `file_names` is the list of bare file names
# found directly in M4_DATA_FOLDER_PATH (no directory part), so they are
# resolved against the current working directory here — confirm this is the
# intended set of files.
for file_name in file_names:
  files.download(file_name)