# COLA

In [None]:
import os
import csv
import json
import time
import openai
import pickle
import numpy as np
import pandas as pd
from sklearn import metrics

In [None]:
# Number of text pairs evaluated per experiment run (sampler_fn draws 5x this many by default).
DATA_SIZE = 200
# NOTE(review): random_seed is not referenced by any cell visible in this notebook —
# confirm whether the sampling cells were meant to use it.
random_seed = 7
# Route the openai client through the Azure API; key and endpoint are set per model later on.
openai.api_type = "azure"
openai.api_version = "2023-05-15"

In [None]:
def language_check(df):
    """Keep only the rows whose `lang` column equals 'en' and drop that column.

    Prints the number of English rows and their share of the input.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame with a `lang` column holding a language code per row.

    Returns
    -------
    pandas.DataFrame
        A new frame with the English rows and without the `lang` column.
        The frame passed in is left untouched.
    """
    is_english = df.lang == 'en'
    n_english = int(is_english.sum())
    # An empty frame would otherwise raise ZeroDivisionError (0 / 0) in the report line.
    share = n_english / df.shape[0] if df.shape[0] else 0.0
    print('# en rows:', f"{n_english:,}", ' % of English text:', f"{share}")
    # A non-inplace drop on the filtered frame avoids the SettingWithCopyWarning that
    # an inplace drop on a boolean-indexed slice can trigger.
    return df[is_english].drop(columns='lang')


def eval_fn(df):
    """Print and return accuracy, precision, recall and F1 (as percentages).

    The ground truth is derived by comparing `author_id1` with `author_id2`; it is
    stored on `df` as a new `same` column (the frame is modified in place). The
    predictions are read from the `binary` column.

    Returns
    -------
    tuple
        (accuracy, precision, recall, f1), each multiplied by 100.
    """
    print('Input shape:', df.shape)
    df['same'] = df.author_id1 == df.author_id2
    truth, predicted = df.same, df.binary

    scorers = (
        metrics.accuracy_score,
        metrics.precision_score,
        metrics.recall_score,
        metrics.f1_score,
    )
    acc, precision, recall, f1 = (scorer(truth, predicted) * 100 for scorer in scorers)

    print('# same authors:', df['same'].sum())
    for label, value in (("Accuracy", acc), ("Precision", precision), ("Recall", recall)):
        print("%s: %.2f" % (label, value), '%', end=' | ')
    print("F1: %.2f" % (f1), '%\n')
    return acc, precision, recall, f1

## Chunked data

In [None]:
# Directory holding the dataset CSV (an absolute path; adjust when running elsewhere).
file_path = '/data//dataset/'
df = pd.read_csv(file_path+"blog_test_en.csv")
# (rows, columns) of the raw frame.
df.shape

In [None]:
# Display the loaded frame for a visual sanity check.
df

In [None]:
# The rest of the notebook reads the documents from a column named 'text'.
df.rename(columns={'decoded_text': 'text'}, inplace=True)
# Trim surrounding whitespace before the duplicate checks in the following cells.
df.text = df.text.str.strip()
# Per column: does it contain any null value?
df.isnull().any()

In [None]:
# Show every row whose text occurs more than once (keep=False marks all copies),
# sorted by text so the copies sit next to each other. Nothing is removed in this cell.
df[df[['text']].duplicated(keep=False)].sort_values('text')

In [None]:
# Count the surplus copies two ways; both numbers should agree.
print('# duplicates:', df.text.duplicated().sum(), 'sanity check:', df.shape[0] - len(set(df.text)))
print('Before removing duplicates, df.shape:', df.shape)
# Keep the first occurrence of each text and renumber the index from 0.
df = df.drop_duplicates(subset=['text'], keep='first').reset_index(drop=True)
print('New df.shape:', df.shape)

In [None]:
%%time
import py3langid
# Store the first element of py3langid.classify's result for every text;
# language_check later compares this column against 'en'.
df['lang'] = df['text'].apply(lambda x: py3langid.classify(x)[0])
print(f"{df.shape[0]:,}")

In [None]:
# Keep the rows tagged 'en' and drop the helper 'lang' column, then report the new row count.
df = language_check(df)
print(f"{df.shape[0]:,}") 

In [None]:
# Number of rows per id (the sampling code below treats `id` as the author identifier).
v = df.id.value_counts()
# Keep only the ids that have more than 10 rows.
df = df[df.id.isin(v[v > 10].index)]
print('# unique authors', len(df.id.unique()))
df.shape

In [None]:
# Rows per id after the >10 filter.
df.id.value_counts()

In [None]:
# check # of tokens
import tiktoken

def num_tokens_from_string(string: str, encoding_name: str) -> int:
    """Return how many tokens `string` encodes to.

    Despite its name, `encoding_name` is handed to `tiktoken.encoding_for_model`,
    so it is looked up as a model name (the notebook passes "gpt-3.5-turbo").
    """
    tokenizer = tiktoken.encoding_for_model(encoding_name)
    return len(tokenizer.encode(string))

# Spot check: token count of two randomly drawn texts concatenated, repeated 10 times.
for i in range(10):
    text1, text2 = df.sample(2).text.values
    print(num_tokens_from_string(text1 + text2, "gpt-3.5-turbo"))

Consider if we use all different authors

In [None]:
def sampler_fn(df, size=DATA_SIZE*5, random_state=None, max_tries=10000):
    """Sample a subset of df in a balanced way.

    Builds `size` text pairs, alternating between a pair written by two different
    authors (even iterations) and a pair written by one author (odd iterations).

    Parameters
    ----------
    df : pandas.DataFrame
        Must provide an `id` column (author) and a `text` column.
    size : int
        Number of pairs to draw.
    random_state : int, optional
        Seed for reproducible sampling. With the default None each draw uses
        pandas' unseeded default, exactly as before this parameter existed.
    max_tries : int
        Maximum number of re-draws for a single pair.

    Returns
    -------
    pandas.DataFrame
        Columns text1, text2, aut_id1, aut_id2 and the boolean `same`.

    Raises
    ------
    RuntimeError
        If no admissible pair is found within `max_tries` draws (for example when
        every author is already used by a different-author pair).
    """
    # One shared generator so successive draws differ while the whole run stays reproducible.
    rng = None if random_state is None else np.random.RandomState(random_state)
    dict_to_df = []
    text_set = set()
    author_set = set()

    for i in range(size):
        if i % 2 == 0:  # sample documents from different authors
            for _ in range(max_tries):
                df_tmp = df.sample(2, random_state=rng)
                aut_id1, aut_id2 = df_tmp.id.tolist()
                # Two random rows can share an author; such a draw would be labelled
                # "same" and break the intended balance, so draw again.
                if aut_id1 != aut_id2:
                    break
            else:
                raise RuntimeError(f"No different-author pair found in {max_tries} draws.")
            text1, text2 = df_tmp.text.tolist()
            author_set.add(aut_id1)
            author_set.add(aut_id2)
        else:  # sample documents from same authors to make it balance
            for _ in range(max_tries):
                same_auth_id = df.sample(1, random_state=rng).id.values[0]
                # Authors already used in a different-author pair are not reused here.
                if same_auth_id in author_set:
                    continue
                text1, text2 = df[df.id == same_auth_id].sample(2, random_state=rng).text.tolist()
                # Neither text may already appear in an earlier pair.
                if text1 not in text_set and text2 not in text_set:
                    break
            else:
                raise RuntimeError(f"No unused same-author pair found in {max_tries} draws.")
            aut_id1, aut_id2 = same_auth_id, same_auth_id

        dict_to_df.append({"text1": text1, "text2": text2, "aut_id1": aut_id1, "aut_id2": aut_id2})
        text_set.add(text1)
        text_set.add(text2)

    # Explicit columns keep the frame well-formed even when size == 0.
    df_sub = pd.DataFrame(dict_to_df, columns=["text1", "text2", "aut_id1", "aut_id2"])
    df_sub['same'] = df_sub.aut_id1 == df_sub.aut_id2
    print('# same authors:', df_sub['same'].sum(), '# different authors:', len(np.unique(df_sub.aut_id1)))
    return df_sub
    

# Draw the pair sample used by all experiments below (default size: DATA_SIZE*5 pairs).
df_sub = sampler_fn(df)
df_sub.head()

In [None]:
# List the sampled pairs whose text1 appears in more than one pair.
df_sub[df_sub[['text1']].duplicated(keep=False)].sort_values('text1')

## Zero-shot Prompting

In [None]:
# NOTE: all prompts are raw strings, so a trailing backslash stays in the prompt text
# instead of acting as a line continuation.

# Prompt 1: task description and output format only, no linguistic guidance.
prompt1 = r"""
Your task is to verify if 2 input texts are written by a same author. Create a valid JSON object following this format: \
{
    "binary": "a boolean (True/False) indicating whether the input texts are written by a same author.",
    "analysis": "an analysis to support the binary output above",
    "confidence": "an integer score on a scale of 1-10 indicating how confident are you about the binary output.",
    "similarity": "an integer score on a scale of 1-10 indicating the similarity between two texts.",
}
"""

# Prompt 2: linguist persona; asks for a writing-style analysis that ignores topic.
prompt2 = r"""
You are a linguist. Your task is to verify whether two input texts are written by the same author based on writing styles. Do not consider topic differences. 

Create a valid JSON object following this format: \
{
    "binary": "a boolean (True/False) indicating whether the input texts are written by a same author, make decision based on the linguistic analysis above.",
    "analysis": "an analysis of linguistic similarities and differences indicating whether the input texts are written by a same author. Be specific about which linguistic features you analyze.",
    "confidence": "an integer score on a scale of 1-10 indicating how confident are you about the binary output.",
    "similarity": "an integer score on a scale of 1-10 indicating the stylistic similarity between the two texts; a high score suggests that the texts were written by the same author.",
}
"""

# Prompt 3: linguist persona; the analysis is directed at grammar style.
prompt3 = r"""
You are a linguist. Your task is to verify whether two input texts are written by the same author based on writing styles. Do not consider topic differences.

Create a valid JSON object for authorship verification following this format:
{
    "binary": " a boolean (True/False) indicating whether the input texts are written by the same author. Decisions should be made based on analysis above and not on content differences.",
    "analysis": "an analysis of the grammer style of the input texts that support the binary output. Be specific and in detail.",
    "confidence": "an integer score on a scale of 1-10, representing how confident you are about the binary output.",
    "similarity": "an integer score on a scale of 1-10 indicating the style similarity between two texts; a high score suggests that the texts were written by the same author."
}
"""





In [None]:
def run_verfication(df_sub, model_name, prompt_prefix, data_size, prompt_postfix="", deployment_id="test", temperature=0):
    """Run zero-shot authorship verification on the first `data_size` pairs of `df_sub`.

    For each pair a prompt is built from `prompt_prefix`, the two texts and
    `prompt_postfix`, sent as a single user message to the chat deployment, and the
    JSON reply is parsed into one record. Replies that cannot be parsed are skipped
    and counted as errors. The parsed records are scored with `eval_fn`.

    Parameters
    ----------
    df_sub : pandas.DataFrame
        Pairs with columns text1, text2, aut_id1, aut_id2.
    model_name : str
        'gpt-4' or 'gpt-35-turbo-16k' selects which credentials are loaded from the
        environment (AZURE_OPENAI_GPT4_KEY / _ENDPOINT or AZURE_OPENAI_GPT35_KEY /
        _ENDPOINT). Any other value leaves the current openai configuration as is.
    prompt_prefix : str
        Instruction text placed before the input texts.
    data_size : int
        Number of leading rows of `df_sub` to process.
    prompt_postfix : str
        Text appended after the input texts (e.g. a chain-of-thought cue).
    deployment_id : str
        Azure deployment to call.
    temperature : float
        Sampling temperature passed to the API.

    Returns
    -------
    pandas.DataFrame
        One row per parsed reply: the reply's JSON fields plus text1, text2,
        author_id1, author_id2, model and tokens (and `same`, added by eval_fn).

    Raises
    ------
    RuntimeError
        If the credentials for a known `model_name` are missing from the environment.
    """
    ls = []
    err_ls = []
    # Credentials come from the environment instead of literals in the notebook.
    env_prefix = {'gpt-4': 'AZURE_OPENAI_GPT4', 'gpt-35-turbo-16k': 'AZURE_OPENAI_GPT35'}.get(model_name)
    if env_prefix is not None:
        api_key = os.environ.get(env_prefix + '_KEY')
        api_base = os.environ.get(env_prefix + '_ENDPOINT')
        if not api_key or not api_base:
            raise RuntimeError(
                f"Set {env_prefix}_KEY and {env_prefix}_ENDPOINT before calling with model_name={model_name!r}."
            )
        openai.api_key = api_key
        openai.api_base = api_base

    for i0, i in enumerate(df_sub.index[:data_size]):
        print('zero-indexed:', i0, ' index:', i)
        aut_id1, aut_id2 = df_sub.loc[i, 'aut_id1'], df_sub.loc[i, 'aut_id2']
        text1, text2 = df_sub.loc[i, 'text1'], df_sub.loc[i, 'text2']

        prompt = prompt_prefix + f"""The input texts (text1 and text2) are delimited with triple backticks. Input texts: ```text1: {text1}, text2: {text2}```\
        Do not generate other word""" + prompt_postfix
        raw_response = openai.ChatCompletion.create(deployment_id=deployment_id, messages=[{"role": "user", "content": prompt}], temperature=temperature)
        response_str = raw_response.choices[0].message["content"]
        print('Raw response content:', response_str, '\n')

        # To avoid JSONDecodeError: Expecting ',' delimiter — unescaped double quotes
        # inside the "analysis" value are escaped before parsing. The value is located
        # between its opening marker and the start of the "confidence" key.
        sub_str1, sub_str2 = '"analysis": "', '",\n    "confidence"'
        if sub_str2 not in response_str:
            sub_str2 = '", "confidence"'
        try:
            # str.index raises ValueError when a marker is missing.
            idx1, idx2 = response_str.index(sub_str1), response_str.index(sub_str2)
            evidence_mod = response_str[idx1+len(sub_str1):idx2].replace('"', '\\"')
            str_mod = response_str[:idx1+len(sub_str1)] + evidence_mod + response_str[idx2:]
            # strict=False accepts raw control characters (e.g. newlines) inside strings.
            response = json.loads(str_mod, strict=False)
            if not isinstance(response, dict):
                # Extra fields are attached by key below, so only a JSON object is usable.
                raise ValueError("response is not a JSON object")
        except json.JSONDecodeError:
            print(f"===== JSONDecodeError =====\n")
            err_ls.append(i)
            continue
        except ValueError:
            print(f"ValueError.\n")
            err_ls.append(i)
            continue

        response["text1"], response["text2"] = text1, text2
        response["author_id1"], response["author_id2"] = aut_id1, aut_id2
        response["model"] = raw_response["model"]
        response["tokens"] = raw_response["usage"]["total_tokens"]
        ls.append(response)
    print('# errors', len(err_ls), '\n')
    df_res = pd.DataFrame(ls)
    if df_res.empty:
        # eval_fn reads columns that do not exist when no reply was parsed.
        print('No response could be parsed; skipping evaluation.\n')
    else:
        eval_fn(df_res)
    return df_res

### No guidance at all

In [None]:
%%time
# prompt1 (no guidance) on the GPT-3.5 deployment, first DATA_SIZE pairs.
df1_gpt35 = run_verfication(df_sub, model_name='gpt-35-turbo-16k', prompt_prefix=prompt1, data_size=DATA_SIZE)

In [None]:
%%time
# prompt1 (no guidance) on the GPT-4 deployment, first DATA_SIZE pairs.
df1_gpt4 = run_verfication(df_sub, model_name='gpt-4', prompt_prefix=prompt1, data_size=DATA_SIZE)

### No guidance + CoT

In [None]:
%%time
# prompt1 plus a chain-of-thought cue appended after the input texts, GPT-3.5.
# df_sub[:] passes a full slice, i.e. the same rows as df_sub.
df1_gpt35_cot = run_verfication(df_sub[:], model_name='gpt-35-turbo-16k', prompt_prefix=prompt1, data_size=DATA_SIZE, prompt_postfix=" Let's think step by step.")

In [None]:
%%time
# prompt1 plus a chain-of-thought cue appended after the input texts, GPT-4.
df1_gpt4_cot = run_verfication(df_sub, model_name='gpt-4', prompt_prefix=prompt1, data_size=DATA_SIZE, prompt_postfix=" Let's think step by step.")

### V2: some high-level guidance

In [None]:
%%time
# prompt2 (high-level writing-style guidance) on the GPT-3.5 deployment.
df2_gpt35 = run_verfication(df_sub, model_name='gpt-35-turbo-16k', prompt_prefix=prompt2, data_size=DATA_SIZE)

In [None]:
%%time
# prompt2 (high-level writing-style guidance) on the GPT-4 deployment.
df2_gpt4 = run_verfication(df_sub, model_name='gpt-4', prompt_prefix=prompt2, data_size=DATA_SIZE)

### Prompt V3: grammar as guidance

In [None]:
%%time
# prompt3 (grammar-style guidance) on the GPT-3.5 deployment.
df3_gpt35 = run_verfication(df_sub, model_name='gpt-35-turbo-16k', prompt_prefix=prompt3, data_size=DATA_SIZE)

In [None]:
%%time
# prompt3 (grammar-style guidance) on the GPT-4 deployment.
df3_gpt4 = run_verfication(df_sub, model_name='gpt-4', prompt_prefix=prompt3, data_size=DATA_SIZE)

### COLA Prompt V4: with more explicit linguistic guidance

In [None]:
%%time
df4_gpt35 = run_verfication(df_sub[:], model_name='gpt-35-turbo-16k', prompt_prefix=prompt4, data_size=DATA_SIZE)

In [None]:
%%time
df4_gpt4 = run_verfication(df_sub, model_name='gpt-4', prompt_prefix=prompt4, data_size=DATA_SIZE)

### Prompt V5: linguistic feature list + CoT

In [None]:
%%time
df4_gpt35_cot = run_verfication(df_sub, model_name='gpt-35-turbo-16k', prompt_prefix=prompt4, data_size=DATA_SIZE, prompt_postfix=" Let's think step by step. Only output the JSON string")

In [None]:
%%time
df4_gpt4_cot = run_verfication(df_sub, model_name='gpt-4', prompt_prefix=prompt4, data_size=DATA_SIZE, prompt_postfix=" Let's think step by step. Only output the JSON string")

In [None]:
prompt4 = r"""
You are a linguist and an authorship attribution expert. Your task is to verify whether two input texts are written by the same author based on their writing styles.
Analyze the writing style based on the following linguistic feature list: Phrasal verbs (e.g., blow up), Modal verbs (e.g., might, may), Punctuation, Rare words, Affixes \
(e.g., -ation, -ification), Quantities (e.g., a lot, many), Humor, Sarcasm, Typographical errors, and Misspellings. Decisions should be based on these linguistic \
features and not on content or discourse differences, as the same author can write about different topics and employ different discourse types.
Create a valid JSON object for authorship verification using this format:

{
    "analysis": "An analysis of the features from the linguistic feature list to support your claim. Be specific about which part of the text indicates which linguistic feature, suggesting different authorship. Consider each linguistic feature individually.",
    "binary": "A boolean (True/False) indicating whether the input texts are written by the same author. Decisions should be made based on the analysis above and not on content or discourse differences.",
    "confidence": "An integer score on a scale of 1-10, representing how confident you are about the binary output.",
    "similarity": "An integer score on a scale of 1-10 indicating the stylistic similarity between the two texts; a high score suggests that the texts were written by the same author.",
}
"""

In [None]:
def run_few_shot(df_sub, model_name, prompt_prefix, data_size, k, deployment_id="test", temperature=0):
    """Run k-shot authorship verification on the first `data_size` pairs of `df_sub`.

    text1 of each pair, together with up to `k` further texts by the author of text1,
    is presented as example material; text2 is the query text. The extra texts are
    taken from the notebook-level frame `df` (columns `id` and `text`).

    Parameters
    ----------
    df_sub : pandas.DataFrame
        Pairs with columns text1, text2, aut_id1, aut_id2.
    model_name : str
        'gpt-4' or 'gpt-35-turbo-16k' selects which API key is read from the
        environment (AZURE_OPENAI_GPT4_KEY or AZURE_OPENAI_GPT35_KEY). The endpoint
        can be overridden with the matching *_ENDPOINT variable. Any other value
        leaves the current openai configuration as is.
    prompt_prefix : str
        Instruction text placed before the input texts.
    data_size : int
        Number of leading rows of `df_sub` to process.
    k : int
        number of extra example from the same author for k-shot prompting
    deployment_id : str
        Azure deployment to call.
    temperature : float
        Sampling temperature passed to the API.

    Returns
    -------
    pandas.DataFrame
        One row per parsed reply: the reply's JSON fields plus text1, text2,
        author_id_text1, author_id_text2, model and tokens.

    Raises
    ------
    RuntimeError
        If the API key for a known `model_name` is missing from the environment.
    """
    ls = []
    err_ls = []
    # SECURITY: API keys used to be hard-coded here. They are now read from the
    # environment; the keys that were committed should be treated as leaked and rotated.
    endpoints = {
        'gpt-4': ('AZURE_OPENAI_GPT4', "https://test-gpt-4-ks.openai.azure.com"),
        'gpt-35-turbo-16k': ('AZURE_OPENAI_GPT35', "https://iarpa.openai.azure.com"),
    }
    if model_name in endpoints:
        env_prefix, default_endpoint = endpoints[model_name]
        api_key = os.environ.get(env_prefix + '_KEY')
        if not api_key:
            raise RuntimeError(f"Set {env_prefix}_KEY before calling with model_name={model_name!r}.")
        openai.api_key = api_key
        openai.api_base = os.environ.get(env_prefix + '_ENDPOINT', default_endpoint)

    separator = ". Example text from same author "
    # Accepted layouts of the boundary between the "analysis" value and the "binary" key,
    # tried in this order; the last one is used when none matches (and then fails below).
    closers = ('", "binary"', '",\n  "binary"', '",\n"binary"', '",\n    "binary"')

    for i0, i in enumerate(df_sub.index[:data_size]):
        print('zero-indexed:', i0, ' index:', i)
        aut_id1, aut_id2 = df_sub.loc[i, 'aut_id1'], df_sub.loc[i, 'aut_id2']
        text1, text2 = df_sub.loc[i, 'text1'], df_sub.loc[i, 'text2']

        # Exclude the pair's own texts from the examples: repeating text1 adds nothing,
        # and showing text2 as a known same-author example would give the answer away.
        candidates = df[(df.id == aut_id1) & ~df.text.isin([text1, text2])]
        k_texts = candidates.text.tolist()[:k]
        # Join text1 WITH the extra examples so the separator also sits between text1
        # and the first example (previously they were concatenated back to back).
        examples = separator.join([text1] + k_texts)

        prompt = prompt_prefix + f"""The input texts are delimited with triple backticks: ```Example text: {examples}, query text: {text2}```\
        Do not generate other word"""
        raw_response = openai.ChatCompletion.create(deployment_id=deployment_id, messages=[{"role": "user", "content": prompt}], temperature=temperature)
        response_str = raw_response.choices[0].message["content"]
        print(response_str, '\n')

        # To avoid JSONDecodeError: Expecting ',' delimiter — unescaped double quotes
        # inside the "analysis" value are escaped before parsing.
        sub_str1 = '"analysis": "'
        sub_str2 = next((closer for closer in closers if closer in response_str), closers[-1])

        try:
            # str.index raises ValueError when a marker is missing; json.JSONDecodeError
            # is a ValueError subclass, so one handler covers both failure modes.
            idx1, idx2 = response_str.index(sub_str1), response_str.index(sub_str2)
            evidence_mod = response_str[idx1+len(sub_str1):idx2].replace('"', '\\"')
            str_mod = response_str[:idx1+len(sub_str1)] + evidence_mod + response_str[idx2:]
            # strict=False (as in run_verfication) accepts raw newlines inside strings.
            response = json.loads(str_mod, strict=False)
            if not isinstance(response, dict):
                raise ValueError("response is not a JSON object")
        except ValueError:
            print(f"ValueError.\n")
            err_ls.append(i)
            continue

        response["text1"], response["text2"] = text1, text2
        response["author_id_text1"], response["author_id_text2"] = aut_id1, aut_id2
        response["model"] = raw_response["model"]
        response["tokens"] = raw_response["usage"]["total_tokens"]
        ls.append(response)
    # Report skipped replies, matching run_verfication.
    print('# errors', len(err_ls), '\n')
    return pd.DataFrame(ls)

## Baselines
https://github.com/pan-webis-de/pan-code/blob/master/clef23/authorship-verification/pan23-verif-baseline-compressor.py

In [None]:
# Build 100 pairs by drawing two random rows each time, without any balancing, to see
# how often a random pair shares an author.
dict_to_df = []

for i in range(100):
    df_tmp = df.sample(2)  # should not use random_seed because we want different samples in each iteration
    aut_id1, aut_id2 = df_tmp.id.tolist()
    text1, text2 = df_tmp.text.tolist()
    dict_row = {}
    dict_row["text1"], dict_row["text2"] = text1, text2
    dict_row["aut_id1"], dict_row["aut_id2"] = aut_id1, aut_id2
    dict_to_df.append(dict_row)

df_imbalanced = pd.DataFrame(dict_to_df)
df_imbalanced['same'] = df_imbalanced.aut_id1 == df_imbalanced.aut_id2
# The second number is the count of distinct aut_id1 values.
print('# same authors:', df_imbalanced['same'].sum(), '# different authors:', len(np.unique(df_imbalanced.aut_id1)))
# Accuracy obtained by always answering "different author" on this sample.
print('Null Accuracy:', 1 - df_imbalanced['same'].sum() / df_imbalanced.shape[0])

In [None]:
# Share of same-author pairs in the balanced sample.
df_sub.same.sum() / df_sub.shape[0]

In [None]:
import pickle
from pathlib import Path
from itertools import combinations
from pan_verify_evaluator import evaluate_all
from sklearn.feature_extraction.text import TfidfVectorizer

In [None]:
import torch
import random
import numpy as np
import pandas as pd
import lightning as L
import torch.nn.functional as F

from torch import nn
from tqdm import tqdm
from datetime import datetime
from torch.utils.data import Dataset, DataLoader
from pytorch_lightning.callbacks import ModelCheckpoint, LearningRateMonitor
from transformers import AutoModel, AutoTokenizer, AdamW, get_linear_schedule_with_warmup

In [None]:
def binarize(y, threshold=0.5, triple_valued=False):
    """Turn scores into hard decisions around `threshold`.

    Invalid entries (NaN/inf) are first replaced by `threshold` via
    `np.ma.fix_invalid`. Scores below the threshold become 0 and scores above it
    become 1. A score equal to the threshold becomes 1, unless `triple_valued` is
    set, in which case it is left unchanged.

    Returns the resulting masked array; the input is not modified.
    """
    scores = np.ma.fix_invalid(np.array(y), fill_value=threshold)
    positive = (scores > threshold) if triple_valued else (scores >= threshold)
    scores[positive] = 1
    scores[scores < threshold] = 0
    return scores

In [None]:
def cosine_sim(a, b):
    """Cosine similarity of two 1-D vectors.

    Returns 0.0 when either vector has zero norm (e.g. the TF-IDF vector of an empty
    text) instead of dividing by zero and yielding nan.
    """
    denominator = np.linalg.norm(a) * np.linalg.norm(b)
    if denominator == 0:
        return 0.0
    return np.dot(a, b) / denominator


def rescale(value, orig_min, orig_max, new_min, new_max):
    """
    Map `value` linearly from the range [`orig_min`, `orig_max`]
    onto the range [`new_min`, `new_max`]. Assumes that
    `orig_min` <= value <= `orig_max`.
    Parameters
    ----------
    value: float
        The value to be rescaled.
    orig_min: float
        Lower bound of the original range.
    orig_max: float
        Upper bound of the original range.
    new_min: float
        Lower bound of the target range.
    new_max: float
        Upper bound of the target range.
    Returns
    ----------
    new_value: float
        The rescaled value.
    """

    source_width = orig_max - orig_min
    target_width = new_max - new_min

    try:
        fraction = float(value - orig_min) / float(source_width)
    except ZeroDivisionError:
        # Degenerate original range: widen it by a tiny amount so the division succeeds.
        source_width += 1e-6
        fraction = float(value - orig_min) / float(source_width)

    return new_min + (fraction * target_width)


def correct_scores(scores, p1, p2):
    """Lazily yield a corrected value for each score.

    Scores up to `p1` are rescaled from [0, p1] into [0, 0.49], scores strictly
    between `p1` and `p2` become exactly 0.5, and all remaining scores are rescaled
    from [p2, 1] into [0.51, 1].
    """
    for score in scores:
        if score <= p1:
            corrected = rescale(score, 0, p1, 0, 0.49)
        elif p1 < score < p2:
            corrected = 0.5
        else:
            corrected = rescale(score, p2, 1, 0.51, 1)
        yield corrected

### TF-IDF

In [None]:
def eval_baseline(df, rescale_flag=True, vocab_size=3000, ngram_size=4, threshold_mv=True):
    """Evaluate a character n-gram TF-IDF cosine-similarity baseline on text pairs.

    The cosine similarity of two documents will range from 0 to 1, since the term
    frequencies (using tf–idf weights) cannot be negative.

    Parameters
    ----------
    df : pandas.DataFrame
        Pairs with columns text1, text2 and the boolean label `same`.
    rescale_flag : bool
        When True, additionally report the scores for similarities mapped through
        (s + 1) / 2. When False, that report is skipped.
    vocab_size : int
        `max_features` of the TF-IDF vectorizer.
    ngram_size : int
        Length of the character n-grams.
    threshold_mv : bool
        When True, the decision threshold is the one with the best F1 over a grid of
        100 values in [0.05, 0.95], chosen on the same pairs that are then scored.
        When False, the threshold is 0.5.

    Returns
    -------
    tuple
        (accuracy, precision, recall, f1) in percent at the selected threshold.
    """
    print('-> load the model')
    vectorizer = TfidfVectorizer(max_features=vocab_size, analyzer='char', ngram_range=(ngram_size, ngram_size))
    print('-> calculating pairwise similarities')
    similarities, labels = [], []

    for i in df.index:
        # The vectorizer is re-fitted on every pair, so vocabulary and IDF weights
        # come from the two texts of that pair alone.
        x1, x2 = vectorizer.fit_transform(df.loc[i, ['text1', 'text2']]).toarray()
        similarities.append(cosine_sim(x1, x2))
        labels.append(int(df.loc[i, 'same']))

    similarities = np.array(similarities, dtype=np.float64)
    labels = np.array(labels, dtype=np.float64)
    print('cos similarity min and max:', np.min(similarities), np.max(similarities))
    if rescale_flag:
        rescale_similarities = (similarities + 1) / 2
        print('rescaled cos similarity min and max:', np.min(rescale_similarities), np.max(rescale_similarities))
    print('score:', evaluate_all(pred_y=similarities, true_y=labels))
    adjusted = (similarities >= 0.5) * 1
    print('sklearn F1:', metrics.f1_score(labels, adjusted))
    if rescale_flag:
        # Guarded: rescale_similarities only exists when rescaling was requested
        # (this line used to raise NameError for rescale_flag=False).
        print('\nscore after resclae:', evaluate_all(pred_y=rescale_similarities, true_y=labels))

    if threshold_mv:
        print('\n-> determining optimal threshold')
        scores = []
        for th in np.linspace(0.05, 0.95, 100):
            adjusted = (similarities >= th)
            scores.append((th, metrics.f1_score(labels, adjusted),
                           metrics.precision_score(labels, adjusted),
                           metrics.recall_score(labels, adjusted)))
        thresholds, f1s, precisions, recalls = zip(*scores)

        # First threshold reaching the highest F1.
        max_idx = np.array(f1s).argmax()
        max_f1 = f1s[max_idx]
        max_th = thresholds[max_idx]
        print(f'Dev results -> F1={max_f1} at th={max_th}')

        adjusted_new = (similarities >= max_th)
    else:
        adjusted_new = (similarities >= 0.5)
    y_test, y_pred = labels, adjusted_new
    acc = metrics.accuracy_score(y_test, y_pred)*100
    precision = metrics.precision_score(y_test, y_pred)*100
    recall = metrics.recall_score(y_test, y_pred)*100
    f1 = metrics.f1_score(y_test, y_pred)*100
    return acc, precision, recall, f1

In [None]:
# %%time
# TF-IDF baseline on the first DATA_SIZE pairs: first with the fixed 0.5 threshold,
# then with the F1-maximising threshold. Only the second call's return value is the
# cell's last expression.
eval_baseline(df_sub[:DATA_SIZE], rescale_flag=True, threshold_mv=False)
eval_baseline(df_sub[:DATA_SIZE], rescale_flag=True, threshold_mv=True)