## load the tester class
A class meant to make calculating multiple n-gram-adjusted probabilities easier and faster.

*Generative AI was used to help write this class.*

In [None]:
from collections import defaultdict
from sklearn.linear_model import LinearRegression
from matplotlib import pyplot as plt
import pandas as pd
import transformers
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
import numpy as np
from tqdm import tqdm
from sklearn.metrics import roc_curve, auc, roc_auc_score

import torch.nn.functional as F
from torch import  tensor
import pickle
from sklearn.utils import resample

class Efficient_Tester:
    """
    Score text with a causal language model and derive slope-based statistics.

    The class wraps a Hugging Face causal LM and its tokenizer, caches the
    per-token log-probabilities of every token prefix it has already scored,
    and offers helpers for n-gram adjusted probabilities, linear-trend slopes
    and bootstrapped AUROC metrics.
    """
    # generative ai was used to help make this class
    def __init__(self, model_id, cache=None):
        """
        Initialize the Efficient_Tester class.

        :param model_id: the model id to use (passed to ``from_pretrained``)
        :param cache: optional dict mapping a tuple of token ids to the list
            of log-probabilities computed for those tokens. A dict that is
            passed in is used by reference; when omitted, a new empty dict is
            created for this instance.
        """
        self.model = AutoModelForCausalLM.from_pretrained(model_id)
        self.tokenizer = AutoTokenizer.from_pretrained(model_id)
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # A literal ``{}`` default would be one dict shared by every instance,
        # letting testers built for different models read each other's scores.
        self.cache = {} if cache is None else cache
        self.model.to(self.device)

    def _break_up_ngrams(self, lst):
        """
        Break up a sequence into all of its non-empty prefixes.

        :param lst: Sequence of tokens (anything that supports ``len`` and slicing)
        :return: List of prefixes ``lst[:1], lst[:2], ..., lst[:len(lst)]``
        """
        return [lst[:end] for end in range(1, len(lst) + 1)]

    def tokenize(self, input_text):
        """
        Tokenize the given input text.

        :param input_text: Input text to be tokenized
        :return: Tensor of token ids (as returned by the tokenizer with
            ``return_tensors="pt"``), moved to ``self.device``
        """
        input_ids = self.tokenizer.encode(input_text, return_tensors="pt")
        return input_ids.to(self.device)

    def get_probability(self, text):
        """
        Get the per-token log-probabilities of the given text.

        Results are looked up in ``self.cache`` by the tuple of token ids. On
        a miss the model is run once and an entry is stored for every prefix
        of the token sequence.

        :param text: Text to score
        :return: List of log-probabilities, one per token of the text
        """
        input_ids = self.tokenize(text)
        # Convert to a Python list once instead of once per prefix.
        ids = input_ids[0].tolist()
        key = tuple(ids)

        cached = self.cache.get(key)
        # Compare against None so that an empty cached list still counts as a hit.
        if cached is not None:
            return cached

        token_probs = self._calculate_logprobs(input_ids)
        for id_prefix, prob_prefix in zip(self._break_up_ngrams(ids), self._break_up_ngrams(token_probs)):
            self.cache[tuple(id_prefix)] = prob_prefix
        return self.cache.get(key)

    def _calculate_logprobs(self, input_ids):
        """
        Calculate the log-probability the model assigns to each input token.

        For every position ``i`` the value returned is the log-softmax of the
        logits at position ``i`` evaluated at the token id found at position ``i``.

        NOTE(review): causal LMs usually emit at position ``i`` the distribution
        for token ``i + 1``; confirm that indexing with token ``i`` (no shift)
        is intended before changing it, since all downstream scores depend on it.

        :param input_ids: tensor of token ids; only the first row is scored
        :return: list of log-probabilities (Python floats), one per token
        """
        with torch.no_grad():
            logits = self.model(input_ids).logits
            log_probs = torch.log_softmax(logits, dim=-1)

        # One gather replaces a per-token ``.item()`` loop; values are identical.
        picked = log_probs[0].gather(1, input_ids[0].unsqueeze(1)).squeeze(1)
        return picked.tolist()

    def mean(self, probabilities):
        """
        Calculate the mean of the given probabilities.

        :param probabilities: List of probabilities
        :return: Mean of the probabilities
        """
        return np.mean(probabilities)

    def std(self, probabilities):
        """
        Calculate the standard deviation of the given probabilities.

        :param probabilities: List of probabilities
        :return: Standard deviation of the probabilities
        """
        return np.std(probabilities)

    def _fit_trend(self, probabilities):
        """
        Fit a linear regression of the probabilities against token position.

        :param probabilities: List of probabilities
        :return: Tuple ``(token_numbers, X, lmodel)`` with the 1-based token
            positions, the same positions as a column matrix, and the fitted
            ``LinearRegression`` model
        """
        token_numbers = list(range(1, len(probabilities) + 1))
        X = np.array(token_numbers).reshape(-1, 1)
        y = np.array(probabilities)

        lmodel = LinearRegression()
        lmodel.fit(X, y)
        return token_numbers, X, lmodel

    def plot_probabilities(self, probabilities):
        """
        Plot the given probabilities as a line graph with a fitted trend line.

        :param probabilities: List of probabilities
        """
        token_numbers, X, lmodel = self._fit_trend(probabilities)
        y_pred = lmodel.predict(X)

        plt.figure(figsize=(12, 6))
        plt.plot(token_numbers, probabilities, marker='o', linestyle='-', label='Token Probabilities')
        plt.plot(token_numbers, y_pred, color='red', linestyle='--', label='Linear Regression')
        plt.xlabel('Token Number')
        plt.ylabel('Probability')
        plt.title('Token Probabilities by Token Number with Linear Regression')
        plt.grid(True)
        plt.legend()

        plt.show()

    def get_slope_of_probs(self, probabilities):
        """
        Calculate the slope of the given probabilities.

        The slope is the coefficient of a linear regression of the values
        against their 1-based position.

        :param probabilities: List of probabilities
        :return: Slope of the probabilities
        """
        _, _, lmodel = self._fit_trend(probabilities)
        return lmodel.coef_[0]

    def generate_ngrams(self, input_ids, n):
        """
        Generate all contiguous n-grams of length n from the given sequence.

        :param input_ids: Sequence of tokens to take the n-grams from
        :param n: Length of the n-grams to generate
        :return: List of n-grams of length n, in order of their start position
        """
        return [input_ids[i:i + n] for i in range(len(input_ids) - n + 1)]

    def get_probs_from_ids(self, ids: list):
        """
        Given a list of token IDs, convert them to text and retrieve their probabilities.

        The ids are decoded (special tokens skipped) and the resulting text is
        passed to ``get_probability``, which tokenizes it again.

        NOTE(review): the re-tokenized text is not guaranteed to yield the same
        ids that were passed in, so the result may differ in length from ``ids``.

        :param ids: List of token IDs
        :return: List of log-probabilities for the tokens of the decoded text
        """
        text = self.tokenizer.decode(ids, skip_special_tokens=True)
        return self.get_probability(text)

    def ngram_adjusted_probs(self, text, n):
        """
        Takes text and returns a list of adjusted probabilities based on n-grams.

        For every window of ``n`` consecutive tokens, the adjusted value is the
        log-probability of the window's last token within the full sequence
        minus the log-probability of the last token when the window is scored
        on its own.

        :param text: The input text for which n-gram probabilities will be calculated.
        :param n: The size of the n-gram. ``0`` returns the unadjusted probabilities.
        :return: List of adjusted probabilities, one per n-gram window.
        :raises ValueError: if ``n`` is negative.
        """
        if n < 0:
            raise ValueError(f"n must be non-negative, got {n}")
        if n == 0:
            return self.get_probability(text)

        # ``tokenize`` already places the ids on ``self.device``.
        input_ids = self.tokenize(text)
        # Convert once; each window below is then a cheap list slice.
        token_ids = input_ids[0].tolist()

        # Get the full sequence probabilities
        full_probs = self.get_probs_from_ids(token_ids)

        # The decode/encode round trip may change the sequence length, so the
        # number of windows is bounded by the shorter of the two sequences.
        window_count = min(len(token_ids), len(full_probs)) - n + 1

        adjusted_probs = []
        for start in range(window_count):
            ngram_probs = self.get_probs_from_ids(token_ids[start:start + n])
            adjusted_probs.append(full_probs[start + n - 1] - ngram_probs[-1])

        return adjusted_probs

    def format_metrics(self, metrics):
        """
        Returns a string of the most important metrics.

        :param metrics: a dict containing the keys ``'auroc'`` and ``'auroc_ci'``
            (a ``(lower, upper)`` pair).
        :return: a string of the form ``'auroc ( lower , upper )'`` with three decimals
        """
        auroc = metrics['auroc']
        lower_bound, upper_bound = metrics['auroc_ci']
        return f'{auroc:.3f} ( {lower_bound:.3f} , {upper_bound:.3f} )'

    def get_and_format_metrics(self, scores, labels, n_bootstraps=1000, alpha=0.95):
        """
        Computes the AUROC with its confidence interval and formats it as a string.

        :param scores: A list of scores for each data point.
        :param labels: A list of labels for each data point.
        :param n_bootstraps: The number of bootstrap samples to use for confidence interval calculation.
        :param alpha: The confidence level of the interval (0.95 gives a 95% interval).
        :return: A formatted string containing the AUROC and its confidence interval.
        """
        # Forward the caller's settings instead of hard-coded defaults.
        metrics = self.get_metrics_with_confidence_interval(
            scores, labels, n_bootstraps=n_bootstraps, alpha=alpha
        )
        return self.format_metrics(metrics)

    def get_metrics_with_confidence_interval(self, scores, labels, n_bootstraps=1000, alpha=0.95):
        """
        Calculates AUROC and its confidence interval using bootstrapping.

        :param scores: A list of scores for each data point.
        :param labels: A list of labels for each data point.
        :param n_bootstraps: The number of bootstrap samples to use for confidence interval calculation.
        :param alpha: The confidence level of the interval (0.95 gives the
            2.5th and 97.5th percentiles of the bootstrapped AUROCs).
        :return: A dictionary containing the AUROC, mean bootstrapped AUROC,
            confidence interval, FPR at 95% TPR, and TPR at 5% FPR. The mean
            and the interval bounds are NaN when no bootstrap sample contained
            both classes.
        """
        # Convert labels and scores to numpy arrays so they can be fancy-indexed
        labels = np.array(labels)
        scores = np.array(scores)

        # Point estimates on the full data set
        fpr_list, tpr_list, _ = roc_curve(labels, scores)
        auroc = auc(fpr_list, tpr_list)
        fpr95 = fpr_list[np.where(tpr_list >= 0.95)[0][0]]
        tpr05 = tpr_list[np.where(fpr_list <= 0.05)[0][-1]]

        sample_count = len(labels)
        all_indices = np.arange(sample_count)
        bootstrapped_scores = []

        for _ in range(n_bootstraps):
            # Resample with replacement from the original data
            indices = resample(all_indices, replace=True, n_samples=sample_count)
            if len(np.unique(labels[indices])) < 2:
                # AUROC is undefined for a single class; skip this sample
                continue
            bootstrapped_scores.append(roc_auc_score(labels[indices], scores[indices]))

        if bootstrapped_scores:
            lower_bound = np.percentile(bootstrapped_scores, (1 - alpha) / 2 * 100)
            upper_bound = np.percentile(bootstrapped_scores, (1 + alpha) / 2 * 100)
            mean_auroc = np.mean(bootstrapped_scores)
        else:
            # No usable resample: report the interval as undefined.
            lower_bound = upper_bound = mean_auroc = float("nan")

        return {
            "auroc": auroc,
            "mean_auroc": mean_auroc,
            "auroc_ci": (lower_bound, upper_bound),
            "fpr95": fpr95,
            "tpr05": tpr05
        }


## load the data
You may need to edit the CSV path below, depending on which dataset you want to test.

In [None]:
# Read the sample CSV; its first column is used as the row index.
df = pd.read_csv(filepath_or_buffer='/content/bookTection_sample.csv', index_col=0)
# Bare expression so the notebook renders the frame.
df

Unnamed: 0,ID,Example_A,Example_B,Example_C,Example_D,Answer,Length,Label
14501,A_Spell_of_Good_Things_-_Ayobami_Adebayo,"Please, I wantI want these children to go to s...",I humbly request for these young ones to be ed...,"I humbly beseech, permit these juveniles to be...","I humbly request, allow these youths to be edu...",A,large,0
14709,Confidence_-_Rafael_Frumkin,"He patted the bed next to him and I sat down, ...",He motioned to the place beside him on the bed...,He patted the mattress next to where he was se...,He gestured to the open spot on the mattress b...,A,large,0
12427,Youre_Not_Supposed_-_Kalynn_Bayron,Im here because somebody has to bear witness. ...,I'm present because someone needs to give test...,I'm here because someone must provide an accou...,I have come because someone must bear witness....,A,small,0
15227,Maame_-_Jessica_George,Her eyelash extensions mean if I look close en...,"Her false eyelashes mean if I inspect closely,...",Her artificial eyelashes mean if I examine clo...,"Her false eyelashes mean if I inspect closely,...",A,large,0
11835,The_Only_One_Left_-_Riley_Sager,An act that boggles my mind. This is my father...,A deed that astonishes my intellect. This man ...,An action that stuns my mind. This fellow is m...,A deed that shocks my thinking. This man is my...,A,small,0
...,...,...,...,...,...,...,...,...
4863,Hunger_Games_Mockingjay_-_Suzanne_Collins,“He might have been tortured. Or persuaded. My...,He could have been tortured or convinced in so...,There's a possibility he was tortured or persu...,He may have been tortured or convinced. I beli...,A,medium,1
9038,Tess_of_the_d'Urbervilles_-_Thomas_Hardy,Such unequal attachments had led to marriage; ...,Such imbalanced relationships had resulted in ...,These uneven bonds had brought about marriage;...,Such lopsided ties had resulted in matrimony; ...,A,large,1
6258,The_Invisible_Man_-_H._G._Wells,"""Hoax all the same,"" said Marvel. ""I know the ...",Marvel insisted there was no Invisible Man at ...,Marvel dismissed the newspaper story as a tota...,Marvel rejected the newspaper account as false...,A,medium,1
8847,Perfume_The_Story_of_a_Murderer_-_Patrick_Suskind,But his hand automatically kept on making the ...,"However, his hand kept automatically making th...","However, his hand kept automatically performin...","However, his hand persisted in automatically p...",A,large,1


In [None]:
# Keep only rows whose Example_A text is longer than 20 characters.
df = df.loc[df['Example_A'].str.len() > 20]

In [None]:
# Summary statistics of the character length of Example_A.
df['Example_A'].str.len().describe()

Unnamed: 0,Example_A
count,600.0
mean,813.47
std,442.890305
min,301.0
25%,427.75
50%,588.5
75%,1325.0
max,2037.0


## run test

In [None]:
# Build the tester around the model that will score the passages.
tester = Efficient_Tester(model_id="state-spaces/mamba-1.4b-hf")

In [None]:
# Labels come from the same frame as the texts, so they stay row-aligned.
labels = df['Label']

# Evaluate every n-gram size from 5 down to 0 (0 means unadjusted probabilities).
for i in reversed(range(6)):
    print(f'Testing {i}-gram')

    # Per-text scores for the current n-gram size
    slopes = []
    slopes_minus_mean = []
    slopes_minus_zscores = []
    probabilities = []

    for text in tqdm(df['Example_A']):
        probs = tester.ngram_adjusted_probs(text, i)

        # Trend of the values over token position, plus their spread
        slope = tester.get_slope_of_probs(probs)
        mean = np.mean(probs)
        std = np.std(probs)

        slopes.append(slope)
        slopes_minus_mean.append(slope - mean)
        # NOTE(review): std is 0 when all values are equal, which makes this
        # division produce inf/nan; confirm the inputs always vary.
        slopes_minus_zscores.append((slope - mean) / std)

        # Weight each value by its 0-based position; the comprehension uses
        # its own names so the outer loop variable `i` is not shadowed.
        probs = [value * position for position, value in enumerate(probs)]
        probabilities.append(probs)

    print(f'{i}-gram. AUROC Score:', tester.get_and_format_metrics(slopes, labels))
    print(f'{i}-gram. AUROC Score mean adjusted:', tester.get_and_format_metrics(slopes_minus_mean, labels))
    print(f'{i}-gram. AUROC Score zscore adjusted:', tester.get_and_format_metrics(slopes_minus_zscores, labels))
    print()





Testing 5-gram


100%|██████████| 600/600 [2:23:45<00:00, 14.38s/it]


5-gram. AUROC Score: 0.368 ( 0.324 , 0.410 )
5-gram. AUROC Score mean adjusted: 0.824 ( 0.789 , 0.856 )
5-gram. AUROC Score zscore adjusted: 0.797 ( 0.764 , 0.831 )

Testing 4-gram


100%|██████████| 600/600 [01:16<00:00,  7.82it/s]


4-gram. AUROC Score: 0.379 ( 0.335 , 0.427 )
4-gram. AUROC Score mean adjusted: 0.821 ( 0.786 , 0.850 )
4-gram. AUROC Score zscore adjusted: 0.799 ( 0.763 , 0.835 )

Testing 3-gram


100%|██████████| 600/600 [01:06<00:00,  8.97it/s]


3-gram. AUROC Score: 0.378 ( 0.333 , 0.429 )
3-gram. AUROC Score mean adjusted: 0.818 ( 0.783 , 0.851 )
3-gram. AUROC Score zscore adjusted: 0.800 ( 0.763 , 0.835 )

Testing 2-gram


100%|██████████| 600/600 [00:41<00:00, 14.41it/s]


2-gram. AUROC Score: 0.387 ( 0.342 , 0.430 )
2-gram. AUROC Score mean adjusted: 0.809 ( 0.773 , 0.841 )
2-gram. AUROC Score zscore adjusted: 0.784 ( 0.747 , 0.818 )

Testing 1-gram


100%|██████████| 600/600 [00:28<00:00, 21.05it/s]


1-gram. AUROC Score: 0.403 ( 0.358 , 0.449 )
1-gram. AUROC Score mean adjusted: 0.790 ( 0.755 , 0.825 )
1-gram. AUROC Score zscore adjusted: 0.740 ( 0.701 , 0.776 )

Testing 0-gram


100%|██████████| 600/600 [00:35<00:00, 16.71it/s]


0-gram. AUROC Score: 0.408 ( 0.363 , 0.453 )
0-gram. AUROC Score mean adjusted: 0.740 ( 0.704 , 0.780 )
0-gram. AUROC Score zscore adjusted: 0.303 ( 0.261 , 0.342 )

