In [1]:
import numpy as np
import dill as pickle
import tiktoken
from nltk.util import ngrams
from nltk.corpus import brown
from nltk.tokenize import word_tokenize
import tqdm
import nltk
from nltk.corpus import brown
from sklearn.linear_model import LogisticRegression
from collections import Counter, defaultdict
import torch
import torch.nn.functional as F

In [2]:
def normalize(data, mu=None, sigma=None, ret_mu_sigma=False):
    """
    Normalizes data, where data is a matrix where the first dimension is the number of examples
    """
    if mu is None:
        mu = np.mean(data, axis=0)
    if sigma is None:
        raw_std = np.std(data, axis=0)
        sigma = np.ones_like(raw_std)
        sigma[raw_std != 0] = raw_std[raw_std != 0]

    if ret_mu_sigma:
        return (data - mu) / sigma, mu, sigma
    else:
        return (data - mu) / sigma

In [3]:
def get_token_len(tokens):
    """
    Returns a vector of word lengths, in tokens
    """
    tokens_len = []
    curr = 0

    for token in tokens:
        if token[0] == "Ġ":
            tokens_len.append(curr)
            curr = 1
        else:
            curr += 1

    return np.array(tokens_len)

In [4]:
def score_ngram(doc, model, tokenizer, n=3, strip_first=False):
    """
    Returns vector of ngram probabilities given document, model and tokenizer
    """
    scores = []
    if strip_first:
        doc = " ".join(doc.split()[:1000])
    for i in ngrams((n - 1) * [50256] + tokenizer(doc.strip()), n):
        scores.append(model.n_gram_probability(i))

    return np.array(scores)

In [5]:
class NGramModel:
    """
    An n-gram model, where alpha is the laplace smoothing parameter.
    """

    def __init__(self, train_text, n=2, alpha=3e-3, vocab_size=None):
        self.n = n
        if vocab_size is None:
            # Assume GPT tokenizer
            self.vocab_size = 50257

        self.smoothing = alpha
        self.smoothing_f = alpha * self.vocab_size

        self.c = defaultdict(lambda: [0, Counter()])
        for i in tqdm.tqdm(range(len(train_text)-n)):
            n_gram = tuple(train_text[i:i+n])
            self.c[n_gram[:-1]][1][n_gram[-1]] += 1
            self.c[n_gram[:-1]][0] += 1
        self.n_size = len(self.c)

    def n_gram_probability(self, n_gram):
        assert len(n_gram) == self.n
        it = self.c[tuple(n_gram[:-1])]
        prob = (it[1][n_gram[-1]] + self.smoothing)/(it[0] + self.smoothing_f)
        return prob

In [6]:
class DiscountBackoffModel(NGramModel):
    """
    An n-gram model with discounting and backoff. Delta is the discounting parameter.
    """

    def __init__(self, train_text, lower_order_model, n=2, delta=0.9):
        super().__init__(train_text, n=n)
        self.lower_order_model = lower_order_model
        self.discount = delta

    def n_gram_probability(self, n_gram):
        assert len(n_gram) == self.n
        it = self.c[tuple(n_gram[:-1])]

        if it[0] == 0:
            return self.lower_order_model.n_gram_probability(n_gram[1:])

        prob = self.discount * \
            (len(it[1])/it[0]) * \
            self.lower_order_model.n_gram_probability(n_gram[1:])
        if it[1][n_gram[-1]] != 0:
            prob += max(it[1][n_gram[-1]] - self.discount, 0) / it[0]

        return prob

In [7]:
class KneserNeyBaseModel(NGramModel):
    """
    A Kneser-Ney base model, where n=1.
    """

    def __init__(self, train_text, vocab_size=None):
        super().__init__(train_text, n=1, vocab_size=vocab_size)

        base_cnt = defaultdict(set)
        for i in range(1, len(train_text)):
            base_cnt[train_text[i]].add(train_text[i-1])

        cnt = 0
        for word in base_cnt:
            cnt += len(base_cnt[word])

        self.prob = defaultdict(float)
        for word in base_cnt:
            self.prob[word] = len(base_cnt[word]) / cnt

    def n_gram_probability(self, n_gram):
        assert len(n_gram) == 1
        ret_prob = self.prob[n_gram[0]]

        if ret_prob == 0:
            return 1 / self.vocab_size
        else:
            return ret_prob

In [8]:
class TrigramBackoff:
    """
    A trigram model with discounting and backoff. Uses a Kneser-Ney base model.
    """

    def __init__(self, train_text, delta=0.9):
        self.base = KneserNeyBaseModel(train_text)
        self.bigram = DiscountBackoffModel(
            train_text, self.base, n=2, delta=delta)
        self.trigram = DiscountBackoffModel(
            train_text, self.bigram, n=3, delta=delta)

    def n_gram_probability(self, n_gram):
        assert len(n_gram) == 3
        return self.trigram.n_gram_probability(n_gram)

In [9]:
def train_trigram(verbose=True, return_tokenizer=False):
    """
    Trains and returns a trigram model on the brown corpus
    """

    enc = tiktoken.encoding_for_model("davinci")
    tokenizer = enc.encode
    vocab_size = enc.n_vocab

    # We use the brown corpus to train the n-gram model
    sentences = brown.sents()

    if verbose:
        print("Tokenizing corpus...")
    tokenized_corpus = []
    for sentence in tqdm.tqdm(sentences):
        tokens = tokenizer(" ".join(sentence))
        tokenized_corpus += tokens

    if verbose:
        print("\nTraining n-gram model...")

    if return_tokenizer:
        return TrigramBackoff(tokenized_corpus), tokenizer
    else:
        return TrigramBackoff(tokenized_corpus)

In [10]:
def get_words(exp):
    """
    Splits up expression into words, to be individually processed
    """
    return exp.split(" ")

In [11]:
from dataclasses import dataclass
import os

DIR_IGNORE = {"logprobs", "prompts", "headlines"}

@dataclass
class Dataset:
    type: str
    path: str


def get_generate_dataset_normal(path: str, verbose=False):
    files = []
    to_iter = tqdm.tqdm(os.listdir(path)) if verbose else os.listdir(path)

    for file in to_iter:
        if file in DIR_IGNORE:
            continue
        files.append(f"{path}/{file}")

    return files


def get_generate_dataset_author(path: str, author: str, verbose=False):
    files = []

    if author is None:
        authors = sorted(os.listdir(path))
    else:
        authors = [author]

    to_iter = tqdm.tqdm(authors) if verbose else authors

    for author in to_iter:
        for file in sorted(os.listdir(f"{path}/{author}")):
            if file in DIR_IGNORE:
                continue
            files.append(f"{path}/{author}/{file}")

    return files


def get_generate_dataset(*datasets: Dataset):
    def generate_dataset(featurize, split=None, verbose=False, author=None):
        files = []
        for dataset in datasets:
            if dataset.type == "normal":
                files += get_generate_dataset_normal(dataset.path)
            elif dataset.type == "author":
                files += get_generate_dataset_author(dataset.path, author=author)

        if split is not None:
            files = np.array(files)[split]

        data = []
        files = tqdm.tqdm(files) if verbose else files

        for file in files:
            data.append(featurize(file))
        return np.array(data)

    return generate_dataset

In [51]:
wp_dataset = [
    Dataset("normal", r"D:\Stuff\VScode\Workspace\Notebooks\ATML\Project\data\wp\human"),
    Dataset("normal", r"D:\Stuff\VScode\Workspace\Notebooks\ATML\Project\data\wp\gpt"),
]

reuter_dataset = [
    Dataset("author", r"D:\Stuff\VScode\Workspace\Notebooks\ATML\Project\data\reuter\human"),
    Dataset("author", r"D:\Stuff\VScode\Workspace\Notebooks\ATML\Project\data\reuter\gpt"),
]

essay_dataset = [
    Dataset("normal", r"D:\Stuff\VScode\Workspace\Notebooks\ATML\Project\data\essay\human"),
    Dataset("normal", r"D:\Stuff\VScode\Workspace\Notebooks\ATML\Project\data\essay\gpt"),
]

eval_dataset = [
    Dataset("normal", r"D:\Stuff\VScode\Workspace\Notebooks\ATML\Project\data\wp\claude"),
    Dataset("author", r"D:\Stuff\VScode\Workspace\Notebooks\ATML\Project\data\reuter\claude"),
    Dataset("normal", r"D:\Stuff\VScode\Workspace\Notebooks\ATML\Project\data\essay\claude"),
    Dataset("normal", r"D:\Stuff\VScode\Workspace\Notebooks\ATML\Project\data\wp\gpt_prompt1"),
    Dataset("author", r"D:\Stuff\VScode\Workspace\Notebooks\ATML\Project\data\reuter\gpt_prompt1"),
    Dataset("normal", r"D:\Stuff\VScode\Workspace\Notebooks\ATML\Project\data\essay\gpt_prompt1"),
    Dataset("normal", r"D:\Stuff\VScode\Workspace\Notebooks\ATML\Project\data\wp\gpt_prompt2"),
    Dataset("author", r"D:\Stuff\VScode\Workspace\Notebooks\ATML\Project\data\reuter\gpt_prompt2"),
    Dataset("normal", r"D:\Stuff\VScode\Workspace\Notebooks\ATML\Project\data\essay\gpt_prompt2"),
    Dataset("normal", r"D:\Stuff\VScode\Workspace\Notebooks\ATML\Project\data\wp\gpt_writing"),
    Dataset("author", r"D:\Stuff\VScode\Workspace\Notebooks\ATML\Project\data\reuter\gpt_writing"),
    Dataset("normal", r"D:\Stuff\VScode\Workspace\Notebooks\ATML\Project\data\essay\gpt_writing"),
    Dataset("normal", r"D:\Stuff\VScode\Workspace\Notebooks\ATML\Project\data\wp\gpt_semantic"),
    Dataset("author", r"D:\Stuff\VScode\Workspace\Notebooks\ATML\Project\data\reuter\gpt_semantic"),
    Dataset("normal", r"D:\Stuff\VScode\Workspace\Notebooks\ATML\Project\data\essay\gpt_semantic"),
]

In [57]:
datasets = [
    *wp_dataset,
    *reuter_dataset,
    *essay_dataset,
]
author = None
files = []
for dataset in datasets:
    if dataset.type == "normal":
        files += get_generate_dataset_normal(dataset.path)
    elif dataset.type == "author":
        files += get_generate_dataset_author(dataset.path, author=author)

    data = []
    files = tqdm.tqdm(files) if False else files
    for file in files:
        data.append(featurize(file))

In [82]:
# len(files)
files[1000]

'D:\\Stuff\\VScode\\Workspace\\Notebooks\\ATML\\Project\\data\\wp\\gpt/1.txt'

In [13]:
def get_all_logprobs(
    generate_dataset,
    preprocess=lambda x: x.strip(),
    verbose=True,
    trigram=None,
    tokenizer=None,
    num_tokens=2047,
):
    if trigram is None:
        trigram, tokenizer = train_trigram(verbose=verbose, return_tokenizer=True)

    # davinci_logprobs, ada_logprobs = {}, {}
    trigram_logprobs, unigram_logprobs = {}, {}

    if verbose:
        print("Loading logprobs into memory")

    file_names = generate_dataset(lambda file: file, verbose=False)
    to_iter = tqdm.tqdm(file_names) if verbose else file_names

    for file in to_iter:
        with open(file, "r") as f:
            doc = preprocess(f.read())
        # davinci_logprobs[file] = get_logprobs(
        #     convert_file_to_logprob_file(file, "davinci")
        # )[:num_tokens]
        # ada_logprobs[file] = get_logprobs(convert_file_to_logprob_file(file, "ada"))[
            # :num_tokens
        # ]
        trigram_logprobs[file] = score_ngram(doc, trigram, tokenizer, n=3)[:num_tokens]
        unigram_logprobs[file] = score_ngram(doc, trigram.base, tokenizer, n=1)[
            :num_tokens
        ]

    # return davinci_logprobs, ada_logprobs, trigram_logprobs, unigram_logprobs
    return trigram_logprobs, unigram_logprobs

In [14]:
vec_functions = {
    "v-add": lambda a, b: a + b,
    "v-sub": lambda a, b: a - b,
    "v-mul": lambda a, b: a * b,
    "v-div": lambda a, b: np.divide(
        a, b, out=np.zeros_like(a), where=(b != 0), casting="unsafe"
    ),
    "v->": lambda a, b: a > b,
    "v-<": lambda a, b: a < b,
}

scalar_functions = {
    "s-max": max,
    "s-min": min,
    "s-avg": lambda x: sum(x) / len(x),
    "s-avg-top-25": lambda x: sum(sorted(x, reverse=True)[:25])
    / len(sorted(x, reverse=True)[:25]),
    "s-len": len,
    "s-var": np.var,
    "s-l2": np.linalg.norm,
}

In [15]:
def get_exp_featurize(best_features, vector_map):
    def calc_features(file, exp):
        exp_tokens = get_words(exp)
        curr = vector_map[exp_tokens[0]](file)

        for i in range(1, len(exp_tokens)):
            if exp_tokens[i] in vec_functions:
                next_vec = vector_map[exp_tokens[i + 1]](file)
                curr = vec_functions[exp_tokens[i]](curr, next_vec)
            elif exp_tokens[i] in scalar_functions:
                return scalar_functions[exp_tokens[i]](curr)

In [None]:
def get_featurized_data(generate_dataset_fn, best_features):
    # t_data = generate_dataset_fn(t_featurize)

    trigram, unigram = get_all_logprobs(
        generate_dataset_fn, trigram=trigram_model, tokenizer=tokenizer
    )

    vector_map = {
        # "davinci-logprobs": lambda file: davinci[file],
        # "ada-logprobs": lambda file: ada[file],
        "trigram-logprobs": lambda file: trigram[file],
        "unigram-logprobs": lambda file: unigram[file],
    }
    exp_featurize = get_exp_featurize(best_features, vector_map)
    exp_data = generate_dataset_fn(exp_featurize)

    # return np.concatenate([t_data, exp_data], axis=1) This was removed
    return np.concatenate([exp_data], axis=1)

In [28]:
file = r"D:\Stuff\VScode\Workspace\Notebooks\ATML\Project\data\essay\human\4.txt"
MAX_TOKENS = 2048
best_features = open(r"D:\Stuff\VScode\Workspace\Notebooks\ATML\Project\model\features.txt").read().strip().split("\n")

In [None]:
import math
np.random.seed(1)

result_table = [["F1", "Accuracy", "AUC"]]

datasets = [
    *wp_dataset,
    *reuter_dataset,
    *essay_dataset,
]
generate_dataset_fn = get_generate_dataset(*datasets)

labels = generate_dataset_fn(
    lambda file: 1 if any([m in file for m in ["gpt", "claude"]]) else 0 # 0 => Human else 1 => Not Human
)
indices = np.arange(len(labels))
np.random.shuffle(indices)
train, test = (
    indices[: math.floor(0.8 * len(indices))],
    indices[math.floor(0.8 * len(indices)) :],
)
print("Train/Test Split", train, test)
print("Train Size:", len(train), "Valid Size:", len(test))
print(f"Positive Labels: {sum(labels[indices])}, Total Labels: {len(indices)}")

In [19]:
# Load davinci tokenizer
enc = tiktoken.encoding_for_model("davinci")

https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations


In [22]:
# Load data and featurize
with open(file) as f:
    doc = f.read().strip()
    # Strip data to first MAX_TOKENS tokens
    tokens = enc.encode(doc)[:MAX_TOKENS]
    doc = enc.decode(tokens).strip()

    print(f"Input: {doc}")

Input: Autumn is not generally viewed as an appropriate season for having a garage sale, not to mention that one on a scale of a local community. However, what makes my neighbourhood particularly unique and different from others is its consistent inability to meet expectations (Pyrkosz 147). Either due to some event that occurred too long ago for any of the neighbours to remember or care, or out of sheer need to have something extravagant in the midst of one of the least unpredictable seasons ever, my community has a massive garage sale every autumn. Although the weather and the lingering threat of health issues complicated the task, the community garage sale carried out last fall was one of the highlights of its members’ public life due to the opportunity to communicate and share memories, objects, and ideas.
The range and diversity of the items that our numerous neighbours and community members offered as sales items could easily make one feel surprised. Though being admittedly cheap

In [None]:
nltk.download('brown')
# Train trigram
print("Loading Trigram...")

trigram_model = train_trigram()

The cell below is for making a loop with features and their labels will be the indeces of labels variable

In [149]:
pip install datasets

Note: you may need to restart the kernel to use updated packages.


In [150]:
from transformers import GPT2TokenizerFast, GPT2LMHeadModel
from datasets import load_dataset
import torch
import textwrap

In [151]:
##################################################
## helper function (nicer printing)
##################################################

def pretty_print(s):
    print("Output:\n" + 80 * '-')
    print(textwrap.fill(tokenizer.decode(s, skip_special_tokens=True),80))

In [153]:
##################################################
## instantiating LLM & its tokenizer
##################################################

# model_to_use = "gpt2"
model_to_use = "gpt2-large"

print("Using model: ", model_to_use)

# get the tokenizer for the pre-trained LM you would like to use
tokenizer = GPT2TokenizerFast.from_pretrained(model_to_use)

# instantiate a model (causal LM)
model = GPT2LMHeadModel.from_pretrained(model_to_use,
                                        output_scores=True,
                                        pad_token_id=tokenizer.eos_token_id)

# inspecting the (default) model configuration
# (it is possible to created models with different configurations)
print(model.config)

Using model:  gpt2-large


Downloading pytorch_model.bin:   0%|          | 0.00/3.25G [00:00<?, ?B/s]

Downloading generation_config.json:   0%|          | 0.00/124 [00:00<?, ?B/s]

GPT2Config {
  "_name_or_path": "gpt2-large",
  "activation_function": "gelu_new",
  "architectures": [
    "GPT2LMHeadModel"
  ],
  "attn_pdrop": 0.1,
  "bos_token_id": 50256,
  "embd_pdrop": 0.1,
  "eos_token_id": 50256,
  "initializer_range": 0.02,
  "layer_norm_epsilon": 1e-05,
  "model_type": "gpt2",
  "n_ctx": 1024,
  "n_embd": 1280,
  "n_head": 20,
  "n_inner": null,
  "n_layer": 36,
  "n_positions": 1024,
  "output_scores": true,
  "pad_token_id": 50256,
  "reorder_and_upcast_attn": false,
  "resid_pdrop": 0.1,
  "scale_attn_by_inverse_layer_idx": false,
  "scale_attn_weights": true,
  "summary_activation": null,
  "summary_first_dropout": 0.1,
  "summary_proj_to_labels": true,
  "summary_type": "cls_index",
  "summary_use_proj": true,
  "task_specific_params": {
    "text-generation": {
      "do_sample": true,
      "max_length": 50
    }
  },
  "transformers_version": "4.32.1",
  "use_cache": true,
  "vocab_size": 50257
}



In [None]:
essay = open("4.txt").read() # Machine generated
input_tokens = tokenizer(essay, return_tensors="pt").input_ids

##################################################
## retrieving next-word surprisals from GPT-2
##################################################

# NB: we can supply tensors of labels (token ids for next-words, no need to right-shift)
# using -100 in the labels means: "don't compute this one"
labels        = torch.clone(input_tokens)
labels[0,0]   = -100
output_word2  = model(input_tokens[:,0:2], labels= labels[:,0:2])
output_prompt = model(input_tokens, labels=input_tokens)

# negative log-likelihood of provided labels
nll_word2  = output_word2.loss
nll_output = output_prompt.loss * input_tokens.size(1)
print("NLL of second word: ", nll_word2.item())
print("NLL of whole output:", nll_output.item())

In [None]:
final_train_exp_features = []
final_train_labels = []
for i in train:
    # Load data and featurize
    with open(files[i]) as f:
        doc = f.read().strip()
        # Strip data to first MAX_TOKENS tokens
        tokens = enc.encode(doc)[:MAX_TOKENS]
        doc = enc.decode(tokens).strip()
    trigram_model = train_trigram()

    trigram = np.array(score_ngram(doc, trigram_model, enc.encode, n=3, strip_first=False))
    unigram = np.array(score_ngram(doc, trigram_model.base, enc.encode, n=1, strip_first=False))
    
    vector_map = {
    # "gpt-logprobs": a,
    "trigram-logprobs": trigram,
    "unigram-logprobs": unigram
    }
    
    exp_features = []
    for exp in best_features:

        exp_tokens = get_words(exp)
        curr = vector_map[exp_tokens[0]]

        for i in range(1, len(exp_tokens)):
            if exp_tokens[i] in vec_functions:
                next_vec = vector_map[exp_tokens[i+1]]
                curr = vec_functions[exp_tokens[i]](curr, next_vec)
            elif exp_tokens[i] in scalar_functions:
                exp_features.append(scalar_functions[exp_tokens[i]](curr))
                break
    #Adding another feature from GPT-2
    essay = open(files[i]).read() # Machine generated
    input_tokens = tokenizer(essay, return_tensors="pt").input_ids
    
    labels        = torch.clone(input_tokens)
    labels[0,0]   = -100
    output_prompt = model(input_tokens, labels=input_tokens)
    nll_output = output_prompt.loss * input_tokens.size(1)
    exp_features.append(nll_output)
    final_train_exp_features.append(exp_features)
    final_train_labels.append(labels[i])

    # print(f"Input: {doc}")
# files[train[0]]

In [None]:
final_test_exp_features = []
final_test_labels = []
for i in test[:350]:
    # Load data and featurize
    with open(files[i]) as f:
        doc = f.read().strip()
        # Strip data to first MAX_TOKENS tokens
        tokens = enc.encode(doc)[:MAX_TOKENS]
        doc = enc.decode(tokens).strip()
    # trigram_model = train_trigram()

    trigram = np.array(score_ngram(doc, trigram_model, enc.encode, n=3, strip_first=False))
    unigram = np.array(score_ngram(doc, trigram_model.base, enc.encode, n=1, strip_first=False))
    
    vector_map = {
    # "gpt-logprobs": a,
    "trigram-logprobs": trigram,
    "unigram-logprobs": unigram
    }
    
    exp_features = []
    for exp in best_features:

        exp_tokens = get_words(exp)
        curr = vector_map[exp_tokens[0]]

        for i in range(1, len(exp_tokens)):
            if exp_tokens[i] in vec_functions:
                next_vec = vector_map[exp_tokens[i+1]]
                curr = vec_functions[exp_tokens[i]](curr, next_vec)
            elif exp_tokens[i] in scalar_functions:
                exp_features.append(scalar_functions[exp_tokens[i]](curr))
                break
            
    #Adding another feature from GPT-2 the average likelihood
    essay = open(files[i]).read() # Machine generated
    input_tokens = tokenizer(essay, return_tensors="pt").input_ids
    
    labels        = torch.clone(input_tokens)
    labels[0,0]   = -100
    output_prompt = model(input_tokens, labels=input_tokens)
    nll_output = output_prompt.loss * input_tokens.size(1)
    exp_features.append(nll_output)
            
    final_test_exp_features.append(exp_features)
    final_test_labels.append(labels[i])

In [103]:
from sklearn.calibration import CalibratedClassifierCV

base = LogisticRegression()
model = CalibratedClassifierCV(base, cv=5)

In [110]:
for i in range(len(final_train_exp_features)):
    final_train_labels[i] = labels[train[i]]

In [112]:
for i in range(len(final_test_exp_features)):
    final_test_labels[i] = labels[test[i]]

In [None]:
model.fit(final_train_exp_features, final_train_labels)
predictions = model.predict(final_test_exp_features[:])
Accuracy = sum(torch.tensor(predictions.reshape((350))) == torch.tensor(final_test_labels))/len(predictions)

In [147]:
Accuracy = (Accuracy).item()
print("Accuracy of the model is", Accuracy)

Accuracy of the model is 0.7028571367263794
