<a href="https://colab.research.google.com/github/aguilin1/tda_ai_text_generation/blob/main/tda_ai_text_detection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [11]:
# Fetch the project checkout that holds the cached embeddings. The directory
# test makes the cell safe to re-run: a second `git clone` into an existing
# directory would fail.
![ -d tda_ai_text_generation ] || git clone https://github.com/aguilin1/tda_ai_text_generation.git
# %pip (unlike !pip) always installs into the environment of the running kernel.
%pip install nltk
%pip install Ripser

Cloning into 'tda_ai_text_generation'...
remote: Enumerating objects: 187, done.[K
remote: Counting objects: 100% (64/64), done.[K
remote: Compressing objects: 100% (64/64), done.[K
remote: Total 187 (delta 24), reused 0 (delta 0), pack-reused 123 (from 2)[K
Receiving objects: 100% (187/187), 17.43 MiB | 16.11 MiB/s, done.
Resolving deltas: 100% (80/80), done.


In [20]:
# set up Ripser for use (run this just once)

from ripser import ripser, Rips
from persim import plot_diagrams

In [16]:
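# Functions ---
# smoothing / safe_index: weighted moving average over neighbouring sentence embeddings
# angular_distance / angular_distance_matrix: pairwise angular distances derived from cosine similarity
# mixed_distance_matrix: weighted blend of the bag-of-words and BERT angular distances
# ripserFiltration: takes a distance matrix and computes the Vietoris-Rips persistence diagrams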

from sklearn.metrics.pairwise import cosine_similarity
import itertools
import warnings
from scipy.sparse import SparseEfficiencyWarning
warnings.simplefilter('ignore',SparseEfficiencyWarning)

def smoothing(embeddings):
  """Smooth each row of `embeddings` with a 7-wide weighted window.

  Row i is replaced by the sum of its neighbours at offsets -3..+3 weighted
  1/8, 1/4, 1/2, 1, 1/2, 1/4, 1/8. The weights are not normalised (they sum
  to 2.75). Neighbour positions that fall outside the array are remapped by
  safe_index.

  The rows are read from a snapshot taken before any row is overwritten.
  `embeddings` itself is modified in place and also returned.
  """
  # (offset, divisor) in the order the terms are added; None marks the
  # centre row, which is used as-is and is not routed through safe_index.
  window = ((-3, 8.0), (-2, 4.0), (-1, 2.0), (0, None), (1, 2.0), (2, 4.0), (3, 8.0))
  row_count = embeddings.shape[0]
  snapshot = embeddings.copy()
  for row in range(row_count):
    blended = None
    for offset, divisor in window:
      if divisor is None:
        term = snapshot[row]
      else:
        term = snapshot[safe_index(row + offset, row_count)] / divisor
      # Accumulate strictly left to right so rounding matches a single
      # chained sum of the seven terms.
      blended = term if blended is None else blended + term
    embeddings[row] = blended
  return embeddings

def safe_index(i, total_n):
  """Map any integer position onto a valid index in [0, total_n) by mirroring.

  Positions outside the array are reflected about the first / last element
  without repeating that element, e.g. for total_n == 5:
  -1 -> 1, -3 -> 3, 5 -> 3, 6 -> 2. Positions already inside the array,
  including the last one, are returned unchanged. The reflection is periodic,
  so the result is always in range, however short the array is.

  Args:
    i: the (possibly out-of-range) position.
    total_n: number of elements in the array; must be at least 1.

  Returns:
    An integer index in [0, total_n).

  Raises:
    ValueError: if total_n is less than 1.
  """
  if total_n < 1:
    raise ValueError("total_n must be at least 1")
  if total_n == 1:
    # Only one element exists, so every position mirrors onto it.
    return 0
  # One full back-and-forth sweep 0, 1, ..., n-1, n-2, ..., 1 has this length.
  period = 2 * (total_n - 1)
  # Python's % is non-negative for a positive modulus, so this also folds
  # negative positions into [0, period).
  folded = i % period
  if folded < total_n:
    return folded
  return period - folded


def angular_distance(embeddings, index_1, index_2):
  """Return the angular distance between two rows of `embeddings`.

  The distance is 2 * arccos(cosine_similarity) / pi: 0 for parallel
  vectors, 1 for orthogonal vectors and 2 for opposite vectors.

  Args:
    embeddings: indexable collection of row vectors; each selected row is
      reshaped to (1, -1) before being passed to cosine_similarity.
    index_1: index of the first row.
    index_2: index of the second row.

  Returns:
    The angular distance as a float.
  """
  cos_sim = cosine_similarity(embeddings[index_1].reshape(1, -1),
                              embeddings[index_2].reshape(1, -1))[0][0]

  # Floating-point rounding can leave the similarity of (nearly) parallel or
  # opposite vectors a hair outside [-1, 1], where arccos returns NaN.
  cos_sim = np.clip(cos_sim, -1.0, 1.0)

  dist = 2 * np.arccos(cos_sim) / np.pi
  return dist

# input a single array of embeddings
def angular_distance_matrix(embeddings, time_skeleton = False):
  """Build the symmetric matrix of pairwise angular distances.

  Args:
    embeddings: collection of row vectors; embeddings.shape[0] is the number
      of data points.
    time_skeleton: when true, consecutive data points (indices differing by
      one) are given distance 0 instead of their angular distance.

  Returns:
    A square numpy array whose (i, j) and (j, i) entries both hold the
    distance between data points i and j; the diagonal is 0.
  """
  size = embeddings.shape[0]
  matrix = np.zeros((size, size))
  # combinations() visits every unordered pair exactly once.
  for row, col in itertools.combinations(range(size), 2):
    neighbours = abs(row - col) == 1
    if time_skeleton and neighbours:
      value = 0
    else:
      value = angular_distance(embeddings, row, col)
    # Mirror the value so the matrix is symmetric.
    matrix[row][col] = value
    matrix[col][row] = value
  return matrix


def mixed_distance_matrix(bow_embeddings, bert_embeddings, alpha, time_skeleton = False):
  """Build a symmetric distance matrix blending BOW and BERT angular distances.

  Each entry is alpha * (BOW distance) + (1 - alpha) * (BERT distance).

  Args:
    bow_embeddings: bag-of-words row vectors, one per data point.
    bert_embeddings: BERT row vectors; bert_embeddings.shape[0] sets the
      matrix size, so both inputs are expected to describe the same data
      points in the same order.
    alpha: weight given to the BOW distance.
    time_skeleton: when true, consecutive data points get distance 0.

  Returns:
    A square numpy array of blended distances with a zero diagonal.
  """
  size = bert_embeddings.shape[0]
  blended = np.zeros((size, size))

  # Every unordered pair is visited once and written to both triangles.
  for first, second in itertools.combinations(range(size), 2):
    if time_skeleton and abs(first - second) == 1:
      value = 0
    else:
      bow_part = angular_distance(bow_embeddings, first, second)
      bert_part = angular_distance(bert_embeddings, first, second)
      value = alpha * bow_part + (1 - alpha) * bert_part
    blended[first][second] = value
    blended[second][first] = value
  return blended

# input a symmetric distance matrix and this function will return birth and
# death homology data for the V-R complex
def ripserFiltration(distanceMat, maxDim=2):
  """Compute Vietoris-Rips persistence diagrams from a distance matrix.

  Ripser usage examples and instructions:
  https://ripser.scikit-tda.org/en/latest/notebooks/Basic%20Usage.html
  https://docs.scikit-tda.org/en/latest/notebooks/scikit-tda%20Tutorial.html#1.1.3.-Input-option:-Distance-matrix

  Args:
    distanceMat: symmetric matrix of pairwise distances.
    maxDim: highest homology dimension to compute.

  Returns:
    The 'dgms' entry of ripser's result: one collection of
    [birth epoch, death epoch] pairs per homology dimension, in increasing
    order of homology (H0, H1, H2, etc.).
  """
  # distance_matrix=True tells ripser the input is already a distance matrix.
  ripser_result = ripser(distanceMat, distance_matrix=True, maxdim=maxDim)
  return ripser_result['dgms']

In [17]:
import pickle

def load_cached_embeddings(abstract_type, embedding_type, num_abstracts):
  """Load cached sentences and embeddings from the project's pickle files.

  Args:
    abstract_type: inserted into the file name (the notebook uses 'ai' and
      'human').
    embedding_type: 'bert' or 'bow'.
    num_abstracts: for 'bert', files numbered 1, 101, 201, ... are read, one
      per step of 100 below num_abstracts, and concatenated. Ignored for
      'bow', which always reads the single file numbered 0.

  Returns:
    A (sentences, embeddings) tuple taken from the 'sentences' and
    'embeddings' keys of the cached data.

  Raises:
    ValueError: if embedding_type is neither 'bert' nor 'bow'.
  """
  DATA_FILE_BASE = '/content/tda_ai_text_generation/data/{}_encodings_{}-{}.pkl'

  # Reject unknown types up front; otherwise neither branch would bind the
  # result variables and the return would fail with UnboundLocalError.
  if embedding_type not in ('bert', 'bow'):
    raise ValueError("embedding_type must be 'bert' or 'bow', got {!r}".format(embedding_type))

  # NOTE(security): pickle.load can execute arbitrary code. Only load files
  # that come from a trusted source.
  if embedding_type == 'bow':
    bow_data = DATA_FILE_BASE.format('bow', abstract_type, 0)
    with open(bow_data, "rb") as data_file:
      cache_data = pickle.load(data_file)
    return cache_data['sentences'], cache_data['embeddings']

  batch_sentences = []
  batch_embeddings = []
  for offset in range(0, num_abstracts, 100):
    # File names are numbered from 1: 1, 101, 201, ...
    bert_data = DATA_FILE_BASE.format('sentence-bert', abstract_type, offset + 1)
    with open(bert_data, "rb") as data_file:
      cache_data = pickle.load(data_file)
      batch_sentences += cache_data['sentences']
      batch_embeddings += cache_data['embeddings']
  return batch_sentences, batch_embeddings

# print(len(load_cached_embeddings('ai', 'bert',300)[0]))

In [24]:
import pickle
import numpy as np
import copy
import matplotlib.pyplot as plt
from ripser import Rips
from persim import PersImage
from persim import PersistenceImager
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression

def diagram_sizes(dgms):
    """Count the points in each persistence diagram.

    Returns a dict mapping "H_<dimension>" to the number of entries in the
    diagram at that position of `dgms`.
    """
    counts = {}
    for dimension, points in enumerate(dgms):
        counts["H_" + str(dimension)] = len(points)
    return counts

def print_sizes(sizes, num_abstracts):
  """Print the average number of H0, H1 and H2 points per abstract.

  Args:
    sizes: iterable of dicts with 'H_0', 'H_1' and 'H_2' counts (as produced
      by diagram_sizes for a three-dimension diagram).
    num_abstracts: divisor used for the averages.
  """
  keys = ('H_0', 'H_1', 'H_2')
  totals = dict.fromkeys(keys, 0)
  for entry in sizes:
    for key in keys:
      totals[key] += entry[key]

  averages = [totals[key] / num_abstracts for key in keys]
  print("|H0|: {} |H1|: {} |H2|: {}".format(*averages))

# Open question: should `replace_with` be derived from the diagram's maximum value (e.g. a multiple of it)? A fixed 1.1 could land anywhere relative to the finite points.
def replace_infinity(diagrams, replace_with = 1.1):
  """Overwrite every infinite death value with `replace_with`.

  The diagrams are edited in place, so any other reference to the same
  diagram objects sees the replacement too. The same `diagrams` object is
  returned for convenience.

  Args:
    diagrams: collection of diagrams, each a sequence of [birth, death] pairs.
    replace_with: value written over a death equal to +infinity.
  """
  for dgm in diagrams:
    for row in range(len(dgm)):
      # Column 1 of each pair is the death value.
      if dgm[row][1] == np.inf:
        dgm[row][1] = replace_with
  return diagrams

# Eli: it might be better to just remove inf points; I think that Persistence Images were designed that way based on the paper.
def remove_infinity(diagrams):
  """Return copies of the diagrams with infinite-death points dropped.

  Rebinding the loop variable (`point = []`) never changes the diagram, so
  the points have to be filtered out explicitly. The input is left
  untouched; a new list is returned.

  Args:
    diagrams: collection of diagrams, each a sequence of [birth, death] pairs.

  Returns:
    A list with one numpy array of shape (k, 2) per input diagram, holding
    only the pairs whose death value is not +infinity (k may be 0).
  """
  cleaned = []
  for diagram in diagrams:
    # reshape keeps empty diagrams two-dimensional so column 1 can be indexed.
    points = np.asarray(diagram).reshape(-1, 2)
    cleaned.append(points[points[:, 1] != np.inf])
  return cleaned

def run_evaluation(abstract_type, embedding_type, time_skeleton = False, with_smoothing = False, graph = False, num_abstracts = 100):
  """Compute persistence diagrams for one class of abstracts.

  For each of the first `num_abstracts` cached embeddings this optionally
  smooths the embeddings, builds the angular distance matrix, runs
  ripserFiltration on it and records the diagram sizes. Infinite death
  values are replaced via replace_infinity, which edits the diagrams in
  place, so the diagrams in the returned `persistence_diagrams` list are
  affected as well.

  Args:
    abstract_type: passed to load_cached_embeddings; 'ai' yields labels of 0,
      anything else yields labels of 1.
    embedding_type: passed to load_cached_embeddings.
    time_skeleton: passed to angular_distance_matrix.
    with_smoothing: when true, smoothing() is applied to each abstract.
    graph: when true, only 3 abstracts are processed and a 2x3 figure is
      shown (persistence diagrams on the top row, the image of the first
      (H0) diagram on the bottom row) instead of printing the description.
    num_abstracts: number of abstracts to load and process.

  Returns:
    (persistence_diagrams, diagrams_h0, diagrams_h1, diagrams_h2,
     labelsH0, labelsH1, labelsH2). The per-dimension diagram lists omit
    empty diagrams, and each labels array has one entry per kept diagram.
  """
  # Loading uses the num_abstracts passed in, even if graph mode lowers it to 3 below.
  _, batch_embeddings = load_cached_embeddings(abstract_type, embedding_type, num_abstracts)

  # initialize diagram arrays
  diagrams_h0 = []
  diagrams_h1 = []
  diagrams_h2 = []

  rips = Rips()
  persistence_diagrams = []
  sizes = []
  description = "{} text encoded with {}".format(abstract_type, embedding_type.upper())
  if time_skeleton:
    description += " with time skeleton"
  if with_smoothing:
    description += " smoothed"
  if graph:
    # The figure has 3 columns, so graph mode processes exactly 3 abstracts.
    num_abstracts = 3
    fig,axs = plt.subplots(nrows=2,ncols=3,figsize=(12,6))
    title = "Persistence diagram for {}".format(description)
    fig.suptitle(title)
    subplot = 0
  else:
    print(description)
  for i, embeddings in enumerate(batch_embeddings[0:num_abstracts]):
    if with_smoothing:
      embeddings = smoothing(embeddings)
    distances = angular_distance_matrix(embeddings, time_skeleton)
    diagram = ripserFiltration(distances)
    persistence_diagrams.append(diagram)
    sizes.append(diagram_sizes(diagram))

    # instantiate PersistenceImager to use in plotting
    pimgr = PersistenceImager(pixel_size=0.05)
    # pimgr.fit(replace_infinity(diagram))

    if graph:
      # replace_infinity edits `diagram` in place, so the persistence diagram
      # plotted just below already has its infinite deaths replaced.
      imgs = pimgr.transform(replace_infinity(diagram))
      pimgr.plot_image(imgs[0], axs[1][subplot])
      rips.plot(diagram, show=False, ax=axs[0][subplot])
      subplot += 1

    # For each new diagram, append its points to the appropriate array. These must have all infs removed or replaced.
    diagram_infResolved = replace_infinity(diagram)
    # skip any empty ones
    if len(diagram_infResolved[0]) > 0:
      diagrams_h0.append(diagram_infResolved[0])
    if len(diagram_infResolved[1]) > 0:
      diagrams_h1.append(diagram_infResolved[1])
    if len(diagram_infResolved[2]) > 0:
      diagrams_h2.append(diagram_infResolved[2])

  # create labels array for regression analysis later--all zeros for ai, all ones for human
  if abstract_type == 'ai':
    labelsH0 = np.zeros(len(diagrams_h0))
    labelsH1 = np.zeros(len(diagrams_h1))
    labelsH2 = np.zeros(len(diagrams_h2))
  else:
    labelsH0 = np.ones(len(diagrams_h0))
    labelsH1 = np.ones(len(diagrams_h1))
    labelsH2 = np.ones(len(diagrams_h2))

  if graph:
    plt.show()

  # NOTE(review): the averages divide by the requested num_abstracts, not by
  # len(sizes); the two differ if fewer abstracts were available.
  print_sizes(sizes, num_abstracts)

  return persistence_diagrams, diagrams_h0, diagrams_h1, diagrams_h2, labelsH0, labelsH1, labelsH2


def run_mixed_encoding_evaluation(abstract_type, alpha, num_abstracts=6, time_skeleton = False, plot=False):
  """Compute persistence diagrams from blended BOW/BERT distances.

  Args:
    abstract_type: passed to load_cached_embeddings.
    alpha: weight of the BOW distance in the blend; must lie in [0, 1].
    num_abstracts: number of abstracts to process.
    time_skeleton: passed to mixed_distance_matrix.
    plot: when true, a 2x3 figure is shown for the first 3 abstracts
      (persistence diagrams on the top row, the image of the first diagram
      on the bottom row) instead of printing the description. Diagrams that
      are plotted have their infinite deaths replaced in place by
      replace_infinity.

  Returns:
    The list of persistence diagrams, one per processed abstract.

  Raises:
    ValueError: if alpha is outside [0, 1].
  """
  # Validate before any file is read. A value cannot be both below 0 and
  # above 1, so the range has to be tested as a whole (this also rejects NaN).
  if not (0.0 <= alpha <= 1.0):
    raise ValueError("alpha must be between 0 and 1")

  _, batch_bow_embeddings = load_cached_embeddings(abstract_type, 'bow', num_abstracts)
  _, batch_bert_embeddings = load_cached_embeddings(abstract_type, 'bert', num_abstracts)

  # Number of columns in the figure, and therefore the number of abstracts
  # that can be drawn.
  plot_columns = 3

  rips = Rips()
  persistence_diagrams = []
  sizes = []
  description = "{} mixed encoding with alpha={}".format(abstract_type, alpha)
  if time_skeleton:
    description += " with time skeleton"
  if plot:
    fig,axs = plt.subplots(nrows=2,ncols=plot_columns,figsize=(12,6))
    title = "Persistence diagrams for {}".format(description)
    fig.suptitle(title)
    subplot = 0
  else:
    print(description)
  for i in range(num_abstracts):
    distances = mixed_distance_matrix(batch_bow_embeddings[i], batch_bert_embeddings[i], alpha, time_skeleton)
    diagram = ripserFiltration(distances)
    persistence_diagrams.append(diagram)
    sizes.append(diagram_sizes(diagram))

    # Only the first `plot_columns` abstracts have an axis to draw on;
    # indexing past the last column would raise IndexError.
    if plot and subplot < plot_columns:
      pimgr = PersistenceImager(pixel_size=0.05)
      imgs = pimgr.transform(replace_infinity(diagram))
      pimgr.plot_image(imgs[0], axs[1][subplot])
      rips.plot(diagram, show=False, ax=axs[0][subplot])
      subplot += 1

  if plot:
    plt.show()

  print_sizes(sizes, num_abstracts)
  return persistence_diagrams

def _persistence_image_vectors(diagrams, skew):
  """Fit a PersistenceImager to `diagrams` and return one flattened image per diagram."""
  pimgr = PersistenceImager(pixel_size=0.01)
  pimgr.fit(diagrams)
  images = pimgr.transform(diagrams, skew=skew)
  return np.array([image.flatten() for image in images])


def _report_classification(dimension, features, labels):
  """Fit a logistic regression on an 80/20 split and print its test accuracy."""
  X_train, X_test, y_train, y_test = train_test_split(features, labels, train_size=0.80, random_state=42)
  classifier = LogisticRegression()
  classifier.fit(X_train, y_train)
  score = classifier.score(X_test, y_test)
  print('H{}: classification success rate was'.format(dimension), score)


def evaluation(embedding_type, time_skeleton = False, with_smoothing = False, num_abstracts = 100):
  """Classify AI vs. human abstracts from their persistence images.

  Runs run_evaluation for the 'ai' and 'human' abstracts, turns the pooled
  persistence diagrams of each homology dimension (H0, H1, H2) into flattened
  persistence images, and prints the test accuracy of a logistic regression
  per dimension. A dimension with no diagrams is skipped.

  Args:
    embedding_type: passed to run_evaluation.
    time_skeleton: passed to run_evaluation.
    with_smoothing: passed to run_evaluation.
    num_abstracts: passed to run_evaluation for each class.
  """
  # Pooled diagrams and labels for both classes, keyed by homology dimension.
  diagrams = {0: [], 1: [], 2: []}
  labels = {0: [], 1: [], 2: []}

  # AI abstracts first, then human, so the pooled order is ai + human.
  for abstract_type in ('ai', 'human'):
    _, dH0, dH1, dH2, lblH0, lblH1, lblH2 = run_evaluation(abstract_type, embedding_type, time_skeleton=time_skeleton, with_smoothing=with_smoothing, graph=False, num_abstracts=num_abstracts)
    for dim, class_diagrams, class_labels in ((0, dH0, lblH0), (1, dH1, lblH1), (2, dH2, lblH2)):
      diagrams[dim].extend(class_diagrams)
      labels[dim].extend(class_labels)

  # For H0, all the birth values are 0. This is specially handled by
  # duplicating all the pairs and setting the duplicate birth values to 1 so
  # that there is a 2-dimensionality in the resulting diagram (avoids an
  # error when all births are zero).
  # NOTE(review): each shifted copy shares its label with the original and is
  # split independently by train_test_split, so a diagram and its copy can
  # land on opposite sides of the train/test split -- confirm this is intended.
  if len(diagrams[0]) > 0:
    shifted = copy.deepcopy(diagrams[0])
    for diag in shifted:
      diag[:,0]=1
    diagrams[0].extend(shifted)
    labels[0].extend(copy.deepcopy(labels[0]))

  # Transform persistence diagrams into flat persistence-image vectors for
  # every dimension before any model is trained. H0 is transformed with
  # skew=False, H1 and H2 with skew=True.
  skew_by_dim = {0: False, 1: True, 2: True}
  features = {}
  for dim in (0, 1, 2):
    if len(diagrams[dim]) > 0:
      features[dim] = _persistence_image_vectors(diagrams[dim], skew_by_dim[dim])

  # Perform regression analysis on all dimensions that produced features.
  for dim in (0, 1, 2):
    if dim in features:
      _report_classification(dim, features[dim], labels[dim])

# Run the AI-vs-human classification once per embedding type, each on 100
# abstracts per class with neither the time skeleton nor smoothing enabled.
evaluation('bow', time_skeleton=False, with_smoothing=False, num_abstracts=100)
evaluation('bert', time_skeleton=False, with_smoothing=False, num_abstracts=100)

Rips(maxdim=1, thresh=inf, coeff=2, do_cocycles=False, n_perm = None, verbose=True)
ai text encoded with BOW
|H0|: 8.57 |H1|: 0.95 |H2|: 0.01
Rips(maxdim=1, thresh=inf, coeff=2, do_cocycles=False, n_perm = None, verbose=True)
human text encoded with BOW
|H0|: 8.59 |H1|: 1.08 |H2|: 0.08
H0: classification success rate was 0.45
H1: classification success rate was 0.47619047619047616
H2: classification success rate was 1.0
Rips(maxdim=1, thresh=inf, coeff=2, do_cocycles=False, n_perm = None, verbose=True)
ai text encoded with BERT
|H0|: 8.57 |H1|: 0.87 |H2|: 0.08
Rips(maxdim=1, thresh=inf, coeff=2, do_cocycles=False, n_perm = None, verbose=True)
human text encoded with BERT
|H0|: 8.59 |H1|: 1.24 |H2|: 0.03
H0: classification success rate was 0.45
H1: classification success rate was 0.5238095238095238
H2: classification success rate was 0.5


In [None]:
## To wrap the array outputs for readability
from IPython.display import HTML, display

def set_css(*_args, **_kwargs):
  """Display a <style> block that makes <pre> output wrap long lines.

  Any arguments are accepted and ignored: IPython passes an execution-info
  object to 'pre_run_cell' callbacks, and a callback that takes no
  parameters fails with TypeError on versions that do not adapt the call.
  Calling set_css() with no arguments still works.
  """
  display(HTML('''
  <style>
    pre {
        white-space: pre-wrap;
    }
  </style>
  '''))
# Register set_css on IPython's 'pre_run_cell' event so it fires before each cell executes.
get_ipython().events.register('pre_run_cell', set_css)

In [None]:
# Gather 3 sets: human cases, AI cases, and test ("unknown") cases

# Determine a representative persistence diagram for human cases and AI cases? HOW?

# Compare each of the test cases against the representative diagrams, classify,
# and determine whether classification was successful or not

# suggest using
# https://persim.scikit-tda.org/en/latest/notebooks/distances.html

# Example
# if A and B are two Ripser persistence diagrams to be compared
#distance = persim.bottleneck(A, B, matching=False)
# and then just see whether the test case distance from human diagram is smaller
# or larger than distance from AI diagram. Classify based on smaller distance