# Set Up
Set up the Google Colab environment and import dependent libraries.

In [1]:
# Mount Google Drive when running on Colab and record which environment we are in.
import os

try:
  # This import only succeeds inside a Colab runtime.
  from google.colab import drive
  drive.mount('/content/drive')
  # Work from the project folder on the mounted drive.
  os.chdir("/content/drive/My Drive/PAN14_Code")
  RUNNING_COLAB = True
except ImportError:
  # Not on Colab: the working directory is left unchanged.
  print("I have a sneaking suspicion that I'm not running on Google Colab")
  RUNNING_COLAB = False


I have a sneaking suspicion that I'm not running on Google Colab


In [2]:
# Helper to print module versions
def ver(module):
  """Print ``<module name>==<module version>`` for an imported module.

  Args:
      module: Imported module object; its ``__name__`` and ``__version__``
          attributes are read.
  """
  print(f"{module.__name__}=={module.__version__}")

In [3]:
# Suppress TensorFlow warnings (usually unnecessary,
#	but suppresses some annoying warnings that happen when models are overwritten during testing).
# The variable is set here, before the cell that imports tensorflow runs.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

In [4]:
import json
import math
import csv
import numpy as np
import glob
import pickle
import itertools
from collections import Counter
ver(np)

#pip install nltk
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
#nltk.download(["punkt", "stopwords","wordnet"])
ver(nltk)

#pip install tensorflow
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.optimizers.schedules import PolynomialDecay
from tensorflow.keras.callbacks import EarlyStopping
ver(tf)

#pip install gensim
import gensim
from gensim.models import Word2Vec
from gensim.models.doc2vec import Doc2Vec, TaggedDocument
from gensim.test.utils import common_texts
ver(gensim)

#pip install scikit-learn
from sklearn.metrics import f1_score, accuracy_score, roc_auc_score
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.cluster import KMeans
from scipy.spatial.distance import cosine
from sklearn.svm import SVC
from sklearn.model_selection import KFold
from collections import defaultdict

#pip install networkx
import networkx as nx
import random
from tqdm import tqdm
from urllib.request import urlretrieve
ver(nx)

#pip install matplotlib
import matplotlib.pyplot as plt
import string

numpy==1.24.3
nltk==3.8.1
tensorflow==2.13.0
gensim==4.3.1
networkx==3.1


In [5]:
def nltk_setup(path = None):
  """Download the NLTK resources used by this notebook (punkt, stopwords, wordnet).

  Args:
      path: Directory to download into; it also becomes NLTK's only data
          search path. When None, NLTK's default download location and
          search path are left untouched.
  """
  packages = ["punkt", "stopwords", "wordnet"]
  # Compare against None directly: `type(path) is None` is never true, which
  # made this branch unreachable and set nltk.data.path to [None].
  if path is None:
    # No change to default path (defaults to user home directory)
    nltk.download(packages)
  else:
    # Change default path
    nltk.data.path = [ path ]
    nltk.download(packages, download_dir=path)

# On Colab call nltk_setup() without an explicit path; elsewhere keep the
# NLTK data in an "nltk_data" folder under the current working directory.
if RUNNING_COLAB:
  nltk_setup()
else:
  nltk_setup(f"{os.path.curdir}/nltk_data")

[nltk_data] Downloading package punkt to ./nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to ./nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to ./nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


In [6]:
def print_colab_stats():
  """Print the `nvidia-smi` report (or a no-GPU notice) and the runtime's total RAM."""
  # GPU info
  # `!cmd` is IPython shell-capture syntax, so this function only works in a notebook/IPython session.
  gpu_info = !nvidia-smi
  gpu_info = '\n'.join(gpu_info)
  # The word 'failed' in the command output is treated as "no GPU attached".
  if gpu_info.find('failed') >= 0:
    print('Not connected to a GPU')
  else:
    print(gpu_info)

  # Memory Info
  from psutil import virtual_memory
  # NOTE: this is the *total* memory (in units of 1e9 bytes), although the message below says "available".
  ram_gb = virtual_memory().total / 1e9
  print('Your runtime has {:.1f} gigabytes of available RAM\n'.format(ram_gb))

  # 20 is the threshold this notebook uses to call a runtime "high-RAM".
  if ram_gb < 20:
    print('Not using a high-RAM runtime')
  else:
    print('You are using a high-RAM runtime!')

# Only report hardware stats when running on Colab.
if RUNNING_COLAB: print_colab_stats()

# Data Process
Read the data and store it in a dictionary keyed by problem index.

In [7]:
def get_data_directory_path(subdirectory):
    """Return the path of ``subdirectory`` inside the relative ``data`` folder."""
    data_root = 'data'
    return os.path.join(data_root, subdirectory)


def get_json_file_path(data_directory, file_name):
    """Return the path of ``file_name`` located directly inside ``data_directory``."""
    json_path = os.path.join(data_directory, file_name)
    return json_path


def read_json_file(file_path):
    """Parse the JSON document stored at ``file_path`` and return the resulting object."""
    with open(file_path) as handle:
        return json.load(handle)


def extract_text_from_files(file_paths):
    """Read text files and split them into known- and unknown-author documents.

    Each file becomes a list of its lines, with surrounding whitespace and a
    leading byte-order mark removed. A file counts as "unknown" when its file
    name (not any parent directory) contains ``unknown``.

    Args:
        file_paths: Iterable of paths to text files.

    Returns:
        tuple: ``(known_text, unknown_text)``; each is a list of documents and
        each document is a list of cleaned line strings.
    """
    known_text, unknown_text = [], []

    for file_path in file_paths:
        # Decode explicitly as UTF-8 so a BOM always shows up as "\ufeff",
        # whatever the platform's default encoding is.
        with open(file_path, 'r', encoding='utf-8') as file:
            text_lines = [line.strip().lstrip("\ufeff") for line in file]

        # Inspect the file name only: a parent directory whose name contains
        # "unknown" must not turn every document into an unknown one.
        if 'unknown' in os.path.basename(file_path):
            unknown_text.append(text_lines)
        else:
            known_text.append(text_lines)

    return known_text, unknown_text


def build_corpus(data_directory, content_data, label_data):
    """Assemble the corpus: one entry per problem that has files on disk.

    Args:
        data_directory: Folder containing one sub-folder per problem.
        content_data: Parsed contents file; ``content_data['problems'][i]`` is
            used as the folder name of problem ``i``.
        label_data: Parsed truth file; ``label_data['problems'][i]['answer']``
            equal to ``'Y'`` maps to label 1, anything else to 0.

    Returns:
        dict: ``{problem index: {'known': ..., 'unknown': ..., 'label': 0 or 1}}``.
        Problems whose folder matches no files are left out.
    """
    corpus = {}
    problem_names = content_data['problems']

    for problem_idx in tqdm(range(len(problem_names))):
        pattern = os.path.join(data_directory, problem_names[problem_idx], '*')
        matched_paths = glob.glob(pattern)
        if len(matched_paths) == 0:
            continue

        known_docs, unknown_docs = extract_text_from_files(matched_paths)
        same_author = label_data['problems'][problem_idx]['answer'] == 'Y'

        corpus[problem_idx] = {
            'known': known_docs,
            'unknown': unknown_docs,
            'label': int(same_author),
        }

    return corpus

In [8]:
# Get data path
# Each directory is a sub-folder of the relative 'data' folder, so these
# resolve against the current working directory.
train_data_directory = get_data_directory_path('train_data')
validation_data_directory = get_data_directory_path('val_data')
test_data_directory = get_data_directory_path('test_data')

In [9]:
# Train
# contents.json supplies the problem folder names and truth.json the answers,
# as consumed by build_corpus.
train_content = read_json_file(get_json_file_path(train_data_directory, 'contents.json'))
train_labels = read_json_file(get_json_file_path(train_data_directory, 'truth.json'))
# # Val
# (disabled: the validation set is split off the training corpus in a later cell)
#validation_content = read_json_file(get_json_file_path(validation_data_directory, 'contents.json'))
#validation_labels = read_json_file(get_json_file_path(validation_data_directory, 'truth.json'))
# Test
test_content = read_json_file(get_json_file_path(test_data_directory, 'contents.json'))
test_labels = read_json_file(get_json_file_path(test_data_directory, 'truth.json'))

In [10]:
# Get train corpus
# Each corpus maps a problem index to its known texts, unknown texts and label.
train_corpus = build_corpus(train_data_directory, train_content, train_labels)
# # Get val corpus
# (disabled: the validation set is split off the training corpus in a later cell)
#val_corpus = build_corpus(validation_data_directory, validation_content, validation_labels)
# Get test corpus
test_corpus = build_corpus(test_data_directory, test_content, test_labels)

100%|██████████| 200/200 [00:00<00:00, 3560.76it/s]
100%|██████████| 200/200 [00:00<00:00, 3563.43it/s]


In [11]:
# Split the training data into training and validation
def split_data_into_train_and_val(data_dict, test_size=0.2, random_state=42):
    """Randomly split a corpus dict into a training dict and a validation dict.

    Args:
        data_dict: Mapping of document id to an entry that has a 'label' key.
        test_size: Forwarded to ``train_test_split`` (share of the validation part).
        random_state: Forwarded to ``train_test_split`` for a repeatable split.

    Returns:
        tuple: ``(train_data, validation_data)``, both sub-dicts of ``data_dict``.
    """
    id_label_pairs = [(doc_id, entry['label']) for doc_id, entry in data_dict.items()]
    document_ids, labels = zip(*id_label_pairs)

    # train_test_split returns (ids_train, ids_val, labels_train, labels_val);
    # only the id halves are needed to rebuild the two dicts.
    split_parts = train_test_split(document_ids, labels, test_size=test_size, random_state=random_state)
    train_ids, val_ids = split_parts[0], split_parts[1]

    train_data = {}
    for doc_id in train_ids:
        train_data[doc_id] = data_dict[doc_id]

    validation_data = {}
    for doc_id in val_ids:
        validation_data[doc_id] = data_dict[doc_id]

    return train_data, validation_data

# NOTE: `train_corpus` is rebound to the training split here, so re-running
# this cell alone splits the already-reduced corpus again; re-run the
# corpus-building cell first.
train_corpus, val_corpus = split_data_into_train_and_val(data_dict=train_corpus)

# Train Word2Vec Model

In [None]:
# Lazily-filled cache of the resources preprocess_text needs on every call.
_PREPROCESSING_RESOURCES = {}


def _get_preprocessing_resources():
    """Build the translation table, stop-word set and lemmatizer once, then reuse them."""
    if not _PREPROCESSING_RESOURCES:
        # Build everything first so a failure (e.g. missing NLTK data) never
        # leaves a half-filled cache behind.
        resources = {
            'table': str.maketrans('', '', string.punctuation + string.digits),
            'stop_words': frozenset(stopwords.words('english')),
            'lemmatizer': WordNetLemmatizer(),
        }
        _PREPROCESSING_RESOURCES.update(resources)
    return _PREPROCESSING_RESOURCES


def preprocess_text(text):
    """
    Preprocess a given text by tokenizing, removing punctuation and numbers,
    removing stop words, and lemmatizing.

    Stop words are removed after punctuation and digits have been stripped from
    each token; tokens left empty by that stripping are dropped as well.

    Args:
        text (str): The text to preprocess. Non-string input is converted with str().

    Returns:
        list: The preprocessed text as a list of tokens.
    """
    if not isinstance(text, str):
        text = str(text)

    # This function runs once per corpus line, so the stop-word list,
    # translation table and lemmatizer are cached instead of rebuilt per call.
    resources = _get_preprocessing_resources()
    table = resources['table']
    stop_words = resources['stop_words']
    lemmatizer = resources['lemmatizer']

    # Tokenize the lower-cased text, then strip punctuation and digits from every token
    tokens = [word.translate(table) for word in word_tokenize(text.lower())]

    # Remove stop words and tokens that became empty
    tokens = [word for word in tokens if word not in stop_words and word != '']

    # Lemmatize words
    return [lemmatizer.lemmatize(word) for word in tokens]

def train_word2vec_model(data, vector_size):
    """
    Fit a gensim Word2Vec model on every line of every known and unknown text.

    Args:
        data (dict): Corpus whose values hold 'known' and 'unknown' lists of
            documents; each document is an iterable of line strings.
        vector_size (int): Dimensionality of the learned word vectors.

    Returns:
        gensim.models.Word2Vec: The trained word2vec model.
    """
    sentences = []

    # One training "sentence" per line, after preprocessing
    for problem in tqdm(data.values(), total=len(data)):
        documents = list(problem['known']) + list(problem['unknown'])
        for document in documents:
            sentences.extend(preprocess_text(line.strip()) for line in document)

    # Build the vocabulary first, then train with the model's default epoch count
    model = gensim.models.Word2Vec(vector_size=vector_size, window=5, min_count=1, workers=4)
    model.build_vocab(sentences)
    model.train(sentences, total_examples=model.corpus_count, epochs=model.epochs)

    return model

In [None]:
# Size of word vectors in the word2vec model
# (passed to train_word2vec_model as `vector_size`)
w2v_vector_size = 300

In [None]:
# Train a word2vec model using the training corpus
# (at this point `train_corpus` is the training split only; validation and test texts are not used)
word2vec_model = train_word2vec_model(train_corpus, w2v_vector_size)

# Vectorize Text Data

In [None]:
def convert_text_to_vector(texts, model):
    """
    Convert each text into the mean word2vec vector of its in-vocabulary tokens.

    Args:
        texts: Iterable of strings; each is run through preprocess_text.
        model: Trained gensim Word2Vec model (its ``wv`` keyed vectors are used).

    Returns:
        list: One vector per text. A text without any in-vocabulary token
        yields an all-zero vector with the model's own dimensionality.
    """
    keyed_vectors = model.wv
    vectors = []
    for text in texts:
        # Look every token up once; the same list gives both the sum and the count.
        known_word_vectors = [keyed_vectors[word] for word in preprocess_text(text) if word in keyed_vectors]
        if known_word_vectors:
            vector = np.sum(known_word_vectors, axis=0) / len(known_word_vectors)
        else:
            # Size the fallback from the model itself instead of a notebook-level
            # global, so a model loaded from disk with another size still works.
            vector = np.zeros(keyed_vectors.vector_size)
        vectors.append(vector)
    return vectors

In [None]:
def count_punctuations(texts):
  """
  Count how often each tracked punctuation mark occurs across the texts.

  Args:
      texts: Iterable of strings.

  Returns:
      list: Thirteen counts, always in the order of the `punctuations` tuple below.
  """
  # An ordered tuple rather than a set: iteration order of a set of strings
  # changes between interpreter sessions (hash randomization), which would
  # permute these feature columns between training and later inference.
  punctuations = ('.', ',', ';', ':', '!', '?', '-', '(', ')', '"', "'", '`', '/')

  # Initialize dictionary to count punctuations (insertion order == tuple order)
  punctuations_count = {p: 0 for p in punctuations}

  # Count punctuations character by character
  for text in texts:
    for char in text:
      if char in punctuations_count:
        punctuations_count[char] += 1

  # Return list of punctuation counts
  return list(punctuations_count.values())

In [None]:
def analyze_sentence_lengths(sentences):
  """
  Summarize the lengths (in whitespace-separated words) of the given sentences.

  Args:
      sentences: Iterable of strings.

  Returns:
      list: ``[count_over_avg, count_under_avg, count_avg, average_length]``,
      i.e. how many sentences are longer than, shorter than and exactly as long
      as the mean length, followed by the mean itself. Empty input gives
      ``[0, 0, 0, 0.0]``.
  """
  sentence_lengths = [len(sentence.split()) for sentence in sentences]
  if not sentence_lengths:
    # np.mean([]) would be NaN (plus a RuntimeWarning) and that NaN would end
    # up in the feature vector.
    return [0, 0, 0, 0.0]

  average_length = np.mean(sentence_lengths)
  count_over_avg = np.sum([length > average_length for length in sentence_lengths])
  count_under_avg = np.sum([length < average_length for length in sentence_lengths])
  count_avg = len(sentence_lengths) - count_over_avg - count_under_avg

  return [count_over_avg, count_under_avg, count_avg, average_length]

In [None]:
def analyze_words(texts):
    """
    Compute word-level style statistics over the texts.

    Texts are lower-cased and tokenized, English stop words are dropped and the
    remaining tokens are lemmatized. No other filtering is applied.

    Args:
        texts: Iterable of strings.

    Returns:
        list: ``[rare_count, long_count, count_over_avg, count_under_avg, count_avg, ttr]``
        where rare_count is the number of distinct tokens occurring at most
        twice, long_count the number of tokens longer than six characters, the
        three count_* values compare token lengths with the mean token length,
        and ttr is distinct tokens divided by total tokens. All six values are
        0 when no token is left.
    """
    stop_words = set(stopwords.words('english'))
    lemmatizer = WordNetLemmatizer()

    words = []
    for text in texts:
        words.extend(
            lemmatizer.lemmatize(word)
            for word in word_tokenize(text.lower())
            if word not in stop_words
        )

    if not words:
        # Nothing to measure; skipping np.mean([]) avoids a RuntimeWarning.
        return [0, 0, 0, 0, 0, 0]

    word_freq = nltk.FreqDist(words)
    rare_count = np.sum([freq <= 2 for freq in word_freq.values()])

    word_lengths = [len(word) for word in words]
    long_count = np.sum([length > 6 for length in word_lengths])

    average_length = np.mean(word_lengths)
    count_over_avg = np.sum([length > average_length for length in word_lengths])
    count_under_avg = np.sum([length < average_length for length in word_lengths])
    count_avg = len(word_lengths) - count_over_avg - count_under_avg

    # Type-token ratio: distinct tokens over all tokens
    ttr = len(word_freq) / len(words)

    return [rare_count, long_count, count_over_avg, count_under_avg, count_avg, ttr]

In [None]:
def calculate_style_vector(texts):
  """
  Build the stylistic feature vector of the texts.

  The vector is the punctuation counts, then the sentence-length statistics,
  then the word statistics. Every entry is divided by the total number of
  whitespace-separated words, unless that total is zero.
  """
  feature_groups = (
      count_punctuations(texts),        # Punctuations stylistic features
      analyze_sentence_lengths(texts),  # Sentences stylistic features
      analyze_words(texts),             # Words stylistic features
  )
  vector = np.concatenate(feature_groups)

  total_words = np.sum([len(text.split()) for text in texts])
  if total_words:
    return vector / total_words
  return vector

In [None]:
def get_vectors(texts, w2v_model):
  """
  Build one feature vector per document.

  Args:
      texts: Iterable of documents; each document is handed unchanged to
          convert_text_to_vector and calculate_style_vector.
      w2v_model: Word2Vec model used for the semantic part.

  Returns:
      list: For every document, the mean of its word2vec vectors followed by
      its style vector, flattened into one 1-D array.
  """
  def _document_vector(document):
    semantic_part = np.mean(convert_text_to_vector(document, w2v_model), axis=0)
    style_part = calculate_style_vector(document)
    return np.concatenate((semantic_part, style_part), axis=None)

  return [_document_vector(document) for document in texts]

In [None]:
def vectorize_text_data(data, w2v_model):
  """
  Turn every problem of the corpus into feature vectors.

  Problems without any unknown text are skipped. For the others the 'known'
  and 'unknown' documents are replaced by their vectors and the label is kept.
  """
  vectorized = {}
  for problem_id, problem in tqdm(data.items(), total=len(data)):
    if len(problem['unknown']) > 0:
      known_vectors = get_vectors(problem['known'], w2v_model)
      unknown_vectors = get_vectors(problem['unknown'], w2v_model)
      vectorized[problem_id] = {
          'known': known_vectors,
          'unknown': unknown_vectors,
          'label': problem['label'],
      }

  return vectorized

In [None]:
# Vectorize all three splits with the same word2vec model
# (the model was trained on the training split only).
train_data = vectorize_text_data(train_corpus, word2vec_model)
val_data = vectorize_text_data(val_corpus, word2vec_model)
test_data = vectorize_text_data(test_corpus, word2vec_model)

# Build Triplet Samples

In [None]:
# Random triplet mining
def build_random_triplet_sample(data):
  """
  Create (anchor, positive, negative) triplets with randomly drawn negatives.

  For every problem, each unordered pair of its known vectors provides an
  anchor and a positive; the negative is a random known vector taken from a
  different problem.

  Args:
      data: Mapping of problem key to a dict whose 'known' entry is a list of vectors.

  Returns:
      dict: ``{running index: {'anchor': ..., 'positive': ..., 'negative': ...}}``.
      Problems for which no other problem can supply a negative produce no triplets.
  """
  # Only problems that own at least one known vector can supply a negative.
  negative_source_keys = [k for k, v in data.items() if len(v['known']) > 0]
  triplet_samples = {}

  for key, val in tqdm(data.items(), total=len(data)):
    # Exclude the anchor's own problem. Drawing with random.choices() and
    # comparing the returned *list* with the key never excluded anything.
    other_keys = [k for k in negative_source_keys if k != key]
    if not other_keys:
      continue

    known = val['known']
    n = len(known)
    for i in range(n):
      for j in range(i+1, n):
        negative_key = random.choice(other_keys)
        triplet_samples[len(triplet_samples)] = {
            'anchor': known[i],
            'positive': known[j],
            'negative': random.choice(data[negative_key]['known'])
        }

  return triplet_samples

In [None]:
# Random triplets for the first training stage, built from the training vectors
random_triplet_samples = build_random_triplet_sample(train_data)

In [None]:
# Stack the three triplet roles into arrays for model.fit.
anchor_data = np.array([data['anchor'] for data in random_triplet_samples.values()])
positive_data = np.array([data['positive'] for data in random_triplet_samples.values()])
negative_data = np.array([data['negative'] for data in random_triplet_samples.values()])
# All-zero placeholder targets: customer_loss never reads y_true.
labels_data = np.array([0 for _ in random_triplet_samples.values()])

In [None]:
# Random triplets for validation, built from the validation vectors
val_random_triplet_samples = build_random_triplet_sample(val_data)

In [None]:
# Same array layout as the training triplets, built from the validation triplets.
val_anchor_data = np.array([data['anchor'] for data in val_random_triplet_samples.values()])
val_positive_data = np.array([data['positive'] for data in val_random_triplet_samples.values()])
val_negative_data = np.array([data['negative'] for data in val_random_triplet_samples.values()])
# All-zero placeholder targets: customer_loss never reads y_true.
val_labels_data = np.array([0 for _ in val_random_triplet_samples.values()])

# Build SiameseNet Model

## Model Frame

In [None]:
class SiameseNet(tf.keras.Model):
    """Triplet model: a shared base network embeds anchor, positive and negative,
    and a shared classifier scores the anchor/positive and anchor/negative pairs."""

    def __init__(self, base_network, clf_network):
        super().__init__()
        self.base = base_network  # embeds a single feature vector
        self.clf = clf_network    # scores two concatenated embeddings

    def call(self, inputs):
        """Return ``(anchor-positive score, anchor-negative score)``.

        ``inputs`` is indexed as ``[anchor, positive, negative]``.
        """
        anchor_emb = self.base(inputs[0])
        positive_emb = self.base(inputs[1])
        negative_emb = self.base(inputs[2])

        # The classifier sees each pair as one concatenated vector.
        positive_pair = tf.concat([anchor_emb, positive_emb], axis=-1)
        negative_pair = tf.concat([anchor_emb, negative_emb], axis=-1)

        return (self.clf(positive_pair), self.clf(negative_pair))

In [None]:
def create_dense_block(x, units, dropout_rate, l1_reg, l2_reg):
    """Apply Dense (L1/L2-regularized) -> BatchNormalization -> ReLU -> Dropout to ``x``."""
    regularizer = tf.keras.regularizers.l1_l2(l1=l1_reg, l2=l2_reg)
    block_layers = (
        tf.keras.layers.Dense(units, kernel_regularizer=regularizer),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.Activation('relu'),
        tf.keras.layers.Dropout(dropout_rate),
    )
    for layer in block_layers:
        x = layer(x)
    return x

In [None]:
# Define the base network
def create_base_network(embedding_dim, dropout_rate=0.4, l1_reg=0.001, l2_reg=0.001):
    """Build the embedding network: input BatchNormalization, dense blocks of
    256, 128 and 64 units, then a linear 64-unit output layer.

    Args:
        embedding_dim: Passed as ``shape`` to the Keras Input layer.
        dropout_rate, l1_reg, l2_reg: Forwarded to every dense block.
    """
    inputs = tf.keras.layers.Input(shape=embedding_dim)
    hidden = tf.keras.layers.BatchNormalization()(inputs)

    for units in (256, 128, 64):
        hidden = create_dense_block(hidden, units, dropout_rate, l1_reg, l2_reg)

    outputs = tf.keras.layers.Dense(64, activation='linear')(hidden)

    return tf.keras.Model(inputs=inputs, outputs=outputs)

In [None]:
def create_clf_network(input_shape, dropout_rate=0.5, l1_reg=0.003, l2_reg=0.003):
    """Build the pair classifier: input BatchNormalization, dense blocks of
    128, 64 and 32 units, then a single sigmoid output unit.

    Args:
        input_shape: Width of the input vector (used as ``shape=(input_shape,)``).
        dropout_rate, l1_reg, l2_reg: Forwarded to every dense block.
    """
    inputs = tf.keras.layers.Input(shape=(input_shape,))
    hidden = tf.keras.layers.BatchNormalization()(inputs)

    for units in (128, 64, 32):
        hidden = create_dense_block(hidden, units, dropout_rate, l1_reg, l2_reg)

    outputs = tf.keras.layers.Dense(1, activation='sigmoid')(hidden)

    return tf.keras.Model(inputs=inputs, outputs=outputs)


In [None]:
def customer_loss(y_true, y_pred):
    """Return ``1.0 - y_pred[0] + y_pred[1]``; ``y_true`` is ignored.

    The intent appears to be a margin loss over the anchor-positive score
    (index 0) and the anchor-negative score (index 1).

    NOTE(review): this only holds if Keras passes both model outputs together
    as ``y_pred``. If compile() applies a single loss to each output
    separately, indices 0 and 1 would address the first two samples of a batch
    instead - confirm against the Keras version in use.
    """
    anchor_positive_score, anchor_negative_score = y_pred[0], y_pred[1]
    return 1.0 - anchor_positive_score + anchor_negative_score

## Construct the Model

In [None]:
# Define the embedding dimension
# (the shape tuple of a single anchor feature vector)
embedding_dim = anchor_data[0].shape

# Create base network
base_network = create_base_network(embedding_dim)
# The classifier receives two concatenated base embeddings, hence twice the base output width.
clf_network = create_clf_network(base_network.output_shape[1]*2)

# base_network and clf_network stay accessible here and share weights with siamese_model.
siamese_model = SiameseNet(base_network, clf_network)

In [None]:
# NOTE(review): these three Input tensors are not referenced by any other
# visible cell - possibly a leftover; confirm before removing.
input_anchor = tf.keras.layers.Input(shape=embedding_dim)
input_positive = tf.keras.layers.Input(shape=embedding_dim)
input_negative = tf.keras.layers.Input(shape=embedding_dim)

In [None]:
# Assemble siameseNet model
# Adam optimizer with the custom loss defined above; no extra metrics are tracked.
siamese_model.compile(optimizer='adam',
                      loss=customer_loss)

In [None]:
# Checkpoint location, relative to the working directory
checkpoint_path = "model_weights/cp.ckpt"
checkpoint_dir = os.path.dirname(checkpoint_path)

# Create a callback that saves the model's weights
# (weights only, not the full model; always written to the same checkpoint_path)
cp_save = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_path,
                                             save_weights_only=True,
                                             verbose=1)

## Load SiameseNet Model Weights

In [None]:
# Resume from the most recent checkpoint if there is one. On a fresh run the
# directory holds no checkpoint and latest_checkpoint() returns None, which
# load_weights() cannot handle - so training then starts from new weights.
latest = tf.train.latest_checkpoint(checkpoint_dir)
if latest is not None:
  siamese_model.load_weights(latest)
else:
  print(f"No checkpoint found in '{checkpoint_dir}'; starting from freshly initialised weights")

## Train SiameseNet Model

### Train on Random Triplet Samples

In [None]:
# Train siameseNet model
# Stop once val_loss has not improved for 100 epochs (at most 1000 epochs);
# cp_save writes the weights to checkpoint_path during training.
early_stopping = EarlyStopping(monitor='val_loss', patience=100, verbose=1)
siamese_history = siamese_model.fit([anchor_data, positive_data, negative_data], labels_data,
                  epochs=1000,
                  validation_data=([val_anchor_data, val_positive_data, val_negative_data], val_labels_data),
                  callbacks=[early_stopping, cp_save])

In [None]:
# Per-epoch loss curves recorded by the fit call above
loss = siamese_history.history['loss']
val_loss = siamese_history.history['val_loss']

# Draw training and validation loss on the same axes
plt.plot(loss, label='Training Loss')
plt.plot(val_loss, label='Validation Loss')
plt.title('Loss and Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

### Train on Semi-Hard Triplet Samples

#### Semi-Hard Samples Construct

In [None]:
# Build semi-hard triplet sample candidates
def build_triplet_sample_candidates(data):
  """
  List every (anchor, candidate positives) combination of the data.

  For each problem, every known vector except the last becomes an anchor and
  all known vectors after it become its candidate positives.

  Returns:
      dict: ``{running index: {'key': problem key, 'anchor': vector, 'positives': list of vectors}}``.
  """
  candidates = {}

  for problem_key, problem in tqdm(data.items(), total=len(data)):
    known = problem['known']
    for position in range(len(known) - 1):
      candidates[len(candidates)] = {
          'key': problem_key,
          'anchor': known[position],
          'positives': known[position+1:]
      }

  return candidates

In [None]:
# Anchor / candidate-positive combinations taken from the training vectors
triplet_sample_candidates = build_triplet_sample_candidates(train_data)

In [None]:
def create_negative_vectors_dict(data):
    """Map every problem key to the known vectors of all *other* problems.

    Returns:
        dict: ``{key: [known vectors of every problem whose key differs]}``.
    """
    key_list = list(data.keys())
    negative_vectors_dict = {}

    for key in tqdm(key_list, total=len(key_list)):
        negative_vectors_dict[key] = [
            vec
            for other_key, entry in data.items()
            if other_key != key
            for vec in entry['known']
        ]
    return negative_vectors_dict

In [None]:
# Precompute, for every training problem, the pool of vectors usable as negatives
negative_vectors_dict = create_negative_vectors_dict(train_data)

In [None]:
def select_random_from_list(input_list):
    """
    Return one element of ``input_list``, chosen with NumPy's global random generator.
    """
    position = np.random.randint(0, len(input_list))
    return input_list[position]

# def select_negative_vectors(negatives, key):
#     """
#     Collects all negative vectors except for the one corresponding to the key.
#     """
#     return [vec for k,v in negatives.items() if k != key for vec in v['known']]

def get_random_triplet(sample, negatives):
    """
    Return ``(anchor, positive, negative)`` with the positive and negative drawn at random.

    The positive comes from ``sample['positives']`` and the negative from the
    pool stored in ``negatives`` under ``sample['key']``.
    """
    # Positive first, then negative (this order fixes the random draws)
    random_positive = select_random_from_list(sample['positives'])
    negative_pool = negatives[sample['key']]
    random_negative = select_random_from_list(negative_pool)

    return sample['anchor'], random_positive, random_negative

def get_hard_triplet(sample, negatives, base_model, clf_model):
    """
    Return ``(anchor, hardest positive, hardest negative)`` for a sample.

    Every candidate is embedded with ``base_model``, concatenated with the
    anchor embedding and scored by ``clf_model``. The hardest positive is the
    candidate positive with the lowest score; the hardest negative is the
    vector with the highest score among ``negatives[sample['key']]``.
    """
    anchor_embedding = base_model.predict(np.array([sample['anchor']]), verbose=0)[0]

    def _scores_against_anchor(candidates):
        # Embed the candidates, pair each embedding with the anchor's, then classify the pairs
        candidate_embeddings = base_model.predict(np.array(candidates), verbose=0)
        pairs = [np.concatenate((anchor_embedding, embedding), axis=None)
                 for embedding in candidate_embeddings]
        return clf_model.predict(np.array(pairs), verbose=0)

    # Hardest positive: the one the classifier rates lowest
    positive_pool = sample['positives']
    hardest_positive = positive_pool[np.argmin(_scores_against_anchor(positive_pool))]

    # Hardest negative: the one the classifier rates highest
    negative_pool = negatives[sample['key']]
    hardest_negative = negative_pool[np.argmax(_scores_against_anchor(negative_pool))]

    return sample['anchor'], hardest_positive, hardest_negative

def get_triplet(sample, negatives, base_model, clf_model, hard_triplet_probability):
    """
    Return a hard triplet with probability ``hard_triplet_probability``, otherwise a random one.
    """
    use_hard_triplet = np.random.rand() < hard_triplet_probability
    if not use_hard_triplet:
        # Random positive and negative
        return get_random_triplet(sample, negatives)
    # Hardest positive and negative according to the current models
    return get_hard_triplet(sample, negatives, base_model, clf_model)


#### Training on Semi-Hard Samples

In [None]:
# Number of outer rounds; each round re-mines triplets and then calls fit.
num_epochs = 100
# Used twice: as EarlyStopping patience inside each fit call and as the
# interval (in outer rounds) of the manual loss check in the training loop.
patience = 10
# Loss seen at the previous manual check; starts at infinity so the first check always passes.
previous_loss = float('inf')

# Probability of mining a hard triplet at the start of training, and the
# value it is ramped towards.
hard_triplet_probability_start=0.5
hard_triplet_probability_end=0.8

# Monitors the training loss because the fit calls in the loop get no validation data.
early_stopping_2 = EarlyStopping(monitor='loss', patience=patience, verbose=0)

In [None]:
# Initial probability of selecting a hard triplet
triplet_select_probability = hard_triplet_probability_start

# Iterate over each epoch
# (each outer "epoch" mines a fresh triplet set and then runs a full fit call)
for epoch in tqdm(range(num_epochs)):
  # Initialize empty lists for anchor, positive, negative samples and labels
  anchor_samples = []
  positive_samples = []
  negative_samples = []
  labels = []

  # Iterate over triplet samples
  for _, sample in triplet_sample_candidates.items():
    # Get the anchor, positive, negative samples
    # (hard triplets are mined with the current base_network / clf_network)
    anchor, positive, negative = get_triplet(sample, negative_vectors_dict, base_network, clf_network, triplet_select_probability)
    # Add samples to their respective lists
    anchor_samples.append(anchor)
    positive_samples.append(positive)
    negative_samples.append(negative)
    # Placeholder target: customer_loss never reads y_true
    labels.append(0)

  # Convert lists to numpy arrays
  anchor_samples = np.array(anchor_samples)
  positive_samples = np.array(positive_samples)
  negative_samples = np.array(negative_samples)
  labels = np.array(labels)

  # Train the model on current epoch's data
  # (up to 50 Keras epochs per outer round, no validation data)
  siamese_model.fit([anchor_samples, positive_samples, negative_samples], labels,
                    epochs=50,
                    verbose=1,
                    callbacks=[early_stopping_2, cp_save])

  # Gradually increase the probability of choosing a hard triplet
  # (equal steps, so the end value is reached after num_epochs rounds)
  triplet_select_probability += (hard_triplet_probability_end - hard_triplet_probability_start) / num_epochs

  # Manual early stopping across outer rounds (this section is active as written)
  # Check if current epoch is a 'patience' epoch
  if epoch % patience == 0 and epoch != 0:
    # Last training loss of the fit call that just finished
    current_loss = siamese_model.history.history['loss'][-1]
    # Check if loss is increasing or constant, if yes, then stop training
    if current_loss >= previous_loss:
      print("Early stopping triggered. Stopping training.")
      break
    else:
      # Update previous loss with current loss
      previous_loss = current_loss

# Inference and Validation

In [None]:
def generate_concatenated_vectors(data, base_network):
  """
  Build one classifier input per problem.

  The known vectors and the unknown vectors of a problem are each embedded
  with ``base_network`` and averaged; the two mean embeddings are concatenated
  (known first).

  Returns:
      tuple: ``(array of concatenated vectors, array of labels)``.
  """
  pair_vectors, pair_labels = [], []

  for _, problem in tqdm(data.items(), total=len(data)):
    # Mean embedding of the known texts first, then of the unknown texts
    mean_embeddings = [
        np.mean(base_network.predict(np.array(problem[role]), verbose=0), axis=0)
        for role in ('known', 'unknown')
    ]
    pair_vectors.append(np.concatenate(mean_embeddings, axis=None))
    pair_labels.append(problem['label'])

  return np.array(pair_vectors), np.array(pair_labels)

In [None]:
# All three datasets below are embedded with the same base_network.

# Build train siamese_embedding dataset
train_siamese_vec, train_siamese_labels = generate_concatenated_vectors(train_data, base_network)

# Build val siamese_embedding dataset
val_siamese_vec, val_siamese_labels = generate_concatenated_vectors(val_data, base_network)

# Build test siamese_embedding dataset
test_siamese_vec, test_siamese_labels = generate_concatenated_vectors(test_data, base_network)

In [None]:
# Compile the classifier on its own for supervised training. This is the same
# clf_network object that siamese_model uses, so it keeps its current weights.
clf_network.compile(optimizer='adam',
                    loss='binary_crossentropy',
                    metrics=['accuracy', tf.keras.metrics.AUC()])

In [None]:
# Train the classifier on the concatenated embeddings; stop once val_loss has
# not improved for 100 epochs (at most 1000 epochs).
clf_early_stopping = EarlyStopping(monitor='val_loss', patience=100, verbose=1)
clf_history = clf_network.fit(train_siamese_vec, train_siamese_labels,
                              epochs=1000,
                              verbose=1,
                              validation_data = (val_siamese_vec, val_siamese_labels),
                              callbacks=[clf_early_stopping])

In [None]:
# Evaluate on the test set; returns the loss followed by the compiled metrics (accuracy, AUC)
res = clf_network.evaluate(test_siamese_vec, test_siamese_labels)

In [None]:
# NOTE(review): same statement as the preceding cell; it re-evaluates the same
# test set and rebinds `res` - probably a leftover.
res = clf_network.evaluate(test_siamese_vec, test_siamese_labels)

# Calculate Score

In [None]:
def calculate_score(y_predict, y_true):
    """Compute c@1, AUC and their product for a set of predictions.

    A prediction above 0.5 counts as class 1 and below 0.5 as class 0;
    anything else (exactly 0.5) is treated as unanswered.

    Args:
        y_predict: Indexable collection of predicted probabilities.
        y_true: Indexable collection of true labels (0 or 1).

    Returns:
        tuple: ``(c_1, auc, score)`` with
        ``c_1 = (n_correct + n_unknown * n_correct / n) / n`` and
        ``score = auc * c_1``.
    """
    n = len(y_predict)
    n_correct = n_unknown = 0

    for i in range(n):
        probability = y_predict[i]
        above, below = probability > 0.5, probability < 0.5
        if not (above or below):
            # Neither side of the threshold: counts as unanswered
            n_unknown += 1
            continue

        prediction = 1 if above else 0
        if prediction == y_true[i]:
            n_correct += 1

    # Unanswered problems are credited with the accuracy reached on the answered ones
    c_1 = (n_correct + (n_unknown * n_correct / n)) / n
    auc_value = tf.keras.metrics.AUC()(y_true, y_predict).numpy()

    return c_1, auc_value, auc_value * c_1

In [None]:
# Score the classifier's test-set predictions
nn_pred = clf_network.predict(test_siamese_vec)
c_1, auc, score = calculate_score(nn_pred, test_siamese_labels)

# Report the three values rounded to three decimals (Final Score = AUC * C@1)
print("C@1:", round(c_1, 3))
print("AUC:", round(auc, 3))
print("Final Score:", round(score, 3))

# Cleanup
Optional: only run if you need to clean up the directory to re-run tests

In [None]:
def deep_clean(path, inc_root = False):
  """Delete everything inside a directory and, if ``inc_root`` is set, the directory too.

  Prints a message and does nothing when ``path`` is not a directory.
  """
  if not os.path.isdir(path):
    print(f"Invalid path: {path}")
    return

  if not os.listdir(path):
    print(f"{path} is already empty!")
  else:
    # Bottom-up walk, so every sub-directory is already empty when it is removed
    for current_dir, subdir_names, file_names in os.walk(path, topdown=False):
      for entry_name in file_names:
        os.remove(os.path.join(current_dir, entry_name))
      for entry_name in subdir_names:
        os.rmdir(os.path.join(current_dir, entry_name))

  if not inc_root:
    print(f"Removed contents of {path}")
    return

  os.rmdir(path)
  print(f"Removed {path} and contents")

In [None]:
# Destructive cleanup: only runs when the user types exactly "yes"
# (case-insensitive, surrounding whitespace ignored).
print("WARNING: This will remove all SiameseNet weights, Word2Vec saved models and downloaded nltk_data!")
if input("Are you sure? ").lower().strip() == "yes":
  # nltk downloads, if they're in the working directory
  # (the nltk_data folder itself is removed as well)
  deep_clean("nltk_data", True)

  # checkpoint information
  # (the model_weights folder is emptied but kept)
  deep_clean("model_weights", False)

  # saved Word2Vec model, if it exists
  try:
    os.remove("Word2Vec.model")
    print("Removed Word2Vec.model")
  except FileNotFoundError:
    print("Invalid file: Word2Vec.model")

else:
  print("Abort")

# Production Use

### Saving/Loading Models

In [None]:
# Save Gensim Word2Vec model
# (written to the working directory; requires `word2vec_model` from the training section)
DEFAULT_W2V_SAVEFILE = "Word2Vec.model"
word2vec_model.save(DEFAULT_W2V_SAVEFILE)

In [None]:
# Load Word2Vec Model
W2V_SAVEFILE = "Word2Vec.model"

def loadW2v(path):
  """Load a saved gensim Word2Vec model.

  Args:
      path: Location of a model previously written with ``Word2Vec.save``.

  Returns:
      gensim.models.Word2Vec: The loaded model.
  """
  return gensim.models.Word2Vec.load(path)

In [None]:
# Function to load a new SiameseNet model from existing weights
DEFAULT_CHECKPOINTS_PATH = "model_weights/cp.ckpt"
DEFAULT_EMBEDDING_DIM = (323,)   # Based on value from training data, but not stored with the weights

def loadSiameseNet(checkpoint_path, embedding_dim = DEFAULT_EMBEDDING_DIM):
  """Rebuild the SiameseNet architecture and restore its weights from a checkpoint.

  Args:
      checkpoint_path: Path whose file name must be ``cp.ckpt``; the latest
          checkpoint found in its directory is loaded.
      embedding_dim: Input shape of the base network. It has to match the
          feature vectors the weights were trained on.

  Returns:
      SiameseNet: The compiled model with restored weights. Its ``clf``
      sub-network is additionally compiled for binary classification, and its
      ``base`` sub-network is available as ``.base``.

  Raises:
      ValueError: If ``checkpoint_path`` is not named ``cp.ckpt``.
      FileNotFoundError: If the directory holds no checkpoint.
  """
  # Validate the checkpoint location first, before any model is built
  if os.path.basename(checkpoint_path) != "cp.ckpt":
    raise ValueError(f"Model Checkpoint '{checkpoint_path}' must point to a file named 'cp.ckpt'")
  checkpoint_dir = os.path.dirname(checkpoint_path)

  latest = tf.train.latest_checkpoint(checkpoint_dir)
  if latest is None:
    # Without this check None would be handed straight to load_weights()
    raise FileNotFoundError(f"No model checkpoint found in '{checkpoint_dir}'")

  # Redefine the model
  base_network = create_base_network(embedding_dim)
  clf_network = create_clf_network(base_network.output_shape[1]*2)

  siamese_model = SiameseNet(base_network, clf_network)

  # Assemble the model
  siamese_model.compile(optimizer='adam',
                      loss=customer_loss)

  siamese_model.load_weights(latest)

  # Compile the clf network
  clf_network.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy', tf.keras.metrics.AUC()])

  return siamese_model

In [None]:
def save_profile(name, profile_dir = "models", weights_dir = "model_weights", w2v_save = "Word2Vec.model"):
  """Move the current model weights and Word2Vec save file into a named profile folder.

  The files are MOVED (os.rename), not copied: after this call they no longer
  exist at weights_dir / w2v_save. Only regular files directly inside
  weights_dir are moved; sub-directories are left where they are.

  Args:
    name: profile name, used as the folder name inside profile_dir.
    profile_dir: folder that holds all profiles; created if missing.
    weights_dir: folder containing the weight files to store.
    w2v_save: path of the saved Word2Vec model to store.

  Raises:
    ValueError: if name is empty, '.' or '..'. Those names resolve to an
      existing directory, so the "profile already exists" branch would clean
      out every stored profile (or the parent directory).
    FileNotFoundError: if weights_dir or w2v_save does not exist.
  """
  if str(name).strip() in ("", os.curdir, os.pardir):
    raise ValueError(f"Invalid profile name: {name!r}")

  # Check the sources up-front so an existing profile is never cleared
  # when there is nothing to replace it with.
  if not os.path.isdir(weights_dir):
    raise FileNotFoundError(f"Weights directory does not exist: {weights_dir}")
  if not os.path.exists(w2v_save):
    raise FileNotFoundError(f"Word2Vec save file does not exist: {w2v_save}")

  # Ensure destination exists
  if not os.path.isdir(profile_dir):
    try:
      os.mkdir(profile_dir)
    except FileExistsError:
      # The path is occupied by something that is not a directory: replace it
      os.remove(profile_dir)
      os.mkdir(profile_dir)
    print(f"Created {profile_dir}")

  save_dir = os.path.join(profile_dir, name)
  try:
    os.mkdir(save_dir)
    print(f"Created {save_dir}")
  except FileExistsError:
    if not os.path.isdir(save_dir):
      os.remove(save_dir)
      os.mkdir(save_dir)
    else:
      # Profile already exists: clear out its previous contents
      deep_clean(save_dir, False)

  # Store weights
  weights_save = os.path.join(save_dir, "weights")
  # exist_ok: do not fail if the folder is still there after the clean above
  os.makedirs(weights_save, exist_ok=True)

  # Snapshot the listing first so files are not renamed out of the directory
  # while it is still being iterated.
  with os.scandir(weights_dir) as listing:
    weight_files = [entry for entry in listing if entry.is_file()]

  for entry in weight_files:
    nfile = os.path.join(weights_save, entry.name)
    os.rename(entry.path, nfile)
    print(f"{entry.name}->{nfile}")

  # Store Word2Vec
  w2v_dest = os.path.join(save_dir, os.path.basename(w2v_save))
  os.rename(w2v_save, w2v_dest)
  print(f"{w2v_save}->{w2v_dest}")


In [None]:
# Ask for a profile name (normalised to lower case, surrounding whitespace removed) and save under it
profile_name = input("profile name? ").lower().strip()
save_profile(profile_name)

### Proof-of-concept use functions
Functions that run the model on a single profile at a time

In [None]:
def vectorize_set(known,unknown,w2v_model):
    """Vectorize one known/unknown pair of text sets rather than a whole corpus.

    Both sets are passed through get_vectors with the given Word2Vec model.
    Returns a dict with the keys 'known' and 'unknown'; there is no 'label'
    entry because labels are only relevant during training.
    """
    known_vectors = get_vectors(known, w2v_model)
    unknown_vectors = get_vectors(unknown, w2v_model)
    return {'known': known_vectors, 'unknown': unknown_vectors}

def concatenate_vector_set(vectors, base_network):
    """Build the single flat vector that represents one known/unknown pair.

    Each side of `vectors` ('known' and 'unknown') is passed through
    base_network and averaged along axis 0; the two averages are then
    flattened and joined, known side first.
    """
    def _mean_features(side):
        # One averaged feature vector for the requested side
        return np.mean(base_network(np.array(vectors[side])), axis=0)

    author_representation = _mean_features('known')
    unknown_representation = _mean_features('unknown')
    return np.concatenate((author_representation, unknown_representation), axis=None)

def predict_once(path, w2v_model, base_network, clf_network) -> float:
    """Score a single profile folder and return the classifier output.

    Every entry listed in `path` is handed to extract_text_from_files, the
    resulting known/unknown texts are vectorized and concatenated, and the
    classifier is called on that single sample.
    """
    files = [f"{path}/{entry}" for entry in os.listdir(path)]
    known_data, unknown_data = extract_text_from_files(files)

    concats = concatenate_vector_set(
        vectorize_set(known_data, unknown_data, w2v_model),
        base_network,
    )

    # Wrap in a batch of one, as the classifier is called on an array of samples
    prediction = clf_network(np.array([concats]))

    # Convert the Tensor to a numpy array and flatten, as output shape will be (1,1)
    return prediction.numpy()[0][0]

In [None]:
# Load the Word2Vec model and the SiameseNet from their default save locations,
# then expose the two sub-networks under their own names.
word2vec_model = loadW2v(W2V_SAVEFILE)
siamese_model : SiameseNet = loadSiameseNet(DEFAULT_CHECKPOINTS_PATH)
base_network, clf_network = siamese_model.base, siamese_model.clf

In [None]:
# Single point test of values from freshly generated model vs. loaded one
def load_and_test(path):
  """Print the prediction for one profile folder three times, for comparison.

  1. using the module-level word2vec_model, base_network and clf_network
  2. using the sub-networks read from the module-level siamese_model
  3. using models loaded from disk inside this function

  Args:
    path: profile folder handed to predict_once.
  """
  # Using the base_network and clf_network variables from the generation
  print(predict_once(path, word2vec_model, base_network, clf_network))

  # Pulling those same variables from the siamese_model object (still from the freshly generated model)
  print(predict_once(path, word2vec_model, siamese_model.base, siamese_model.clf))

  # Load both models
  w2v_loaded = loadW2v(W2V_SAVEFILE)
  siamese_loaded = loadSiameseNet(DEFAULT_CHECKPOINTS_PATH)
  # Both sub-networks must come from the loaded model, otherwise the loaded
  # classifier weights are never exercised by this check.
  print(predict_once(path, w2v_loaded, siamese_loaded.base, siamese_loaded.clf))

load_and_test("data/test_data/EE002")
# All values should be identical

### Self-Contained Classes
Classes that handle both loading and running the models, bundling all the functions they require.

In [None]:
### Extra utility functions for use with classes ###
def strip_text(data):
	"""Split text into a list of cleaned lines.

	Args:
		data: either one string (split on line boundaries) or an iterable of
			lines. Items that are not strings are converted with str().

	Returns:
		A list with one entry per line, each with surrounding whitespace and any
		leading byte-order mark (U+FEFF) removed. Empty lines are kept as ''.
	"""
	if isinstance(data, str): data = data.splitlines()

	text = []
	for line in data:
		if not isinstance(line, str): line = str(line)
		# str.strip() does not treat U+FEFF as whitespace, so strip again after
		# removing the BOM to drop whitespace that directly followed it.
		cleaned = line.strip().lstrip("\ufeff").strip()
		text.append(cleaned)

	return text

def setupNltk(path = f"{os.curdir}/nltk_data") -> None:
	"""Point NLTK at `path` and download the required data packages into it.

	The NLTK search path is replaced so that `path` is its only entry, then the
	punkt, stopwords and wordnet packages are requested with `path` as the
	download directory.
	"""
	required_packages = ["punkt", "stopwords","wordnet"]
	nltk.data.path = [ path ]
	nltk.download(required_packages, download_dir=path)

In [None]:
class StyloNet:
	"""Contains the whole stylometry model, functions score, score_multi, predict and predict_multi
		can be called to run the stylometry model on a text.
		
		Input format for predictions is the following:
			texts = {
				'known': list[str] (known text(s))
				'unknown': list[str] (unknown text(s))
			}
		All lists for texts can be multi-dimensional, as long as it only ends in strings
	"""

	def __init__(self, working_dir:str = os.curdir, valid_threshold:float = 0.5,
					model_checkpoints:str = "model_weights", w2v_save:str = "Word2Vec.model", nltk_path:str = "nltk_data"):
		
		# Most args can be left at their default values, as long as working_dir is correct the model should work.
		if not os.path.isdir(working_dir):
			raise OSError(f"Working dir for StyloNet does not exist: {working_dir}")
		
		setupNltk(os.path.join(working_dir, nltk_path))

		# Load Word2Vec model
		self.word2vec = loadW2v(os.path.join(working_dir, w2v_save))
		
		# Redefine and load the SiameseNet model from checkpoints
		self.siamese_model = buildSiameseNet(os.path.join(working_dir, model_checkpoints))
		self.base_network, self.clf_network = self.siamese_model.base, self.siamese_model.clf

		# Save validiy threshold for making predictions
		self.threshold = valid_threshold

	def _vectorize(self,text : dict):
		vectors = {
			'known': get_vectors(text['known'],self.word2vec),
			'unknown': get_vectors(text['unknown'],self.word2vec)
		}
		return vectors

	def _vectorize_multi(self, texts : list[dict]):
		vectors = []
		for text in texts:
			vectors.append(self._vectorize(text))
		return vectors

	def _concatenate(self, vectors : dict):
		known_feature_vectors = self.base_network(np.array(vectors['known']))
		unknown_feature_vectors = self.base_network(np.array(vectors['unknown']))

		author_representation = np.mean(known_feature_vectors, axis=0)
		unknown_representation = np.mean(unknown_feature_vectors, axis=0)

		concat_vec = np.concatenate((author_representation, unknown_representation), axis=None)
		return concat_vec

	def _concatenate_multi(self, vectors : list[dict]):
		concats = []
		for vec in vectors:
			concats.append(self._concatenate(vec))
		return np.array(concats)

	### Interface Functions ###
	def score(self, texts : dict) -> float:
		"""Run the model and return the similarity score as a decimal"""
		if len(texts['unknown']) == 0: return 0  # Incase of empty unknown set

		vectors = self._vectorize(texts['known'], texts['unknown'])
		concats = self._concatenate(vectors)
		
		prediction : tf.Tensor = self.clf_network(np.array([concats]))
		if prediction.shape != (1,1): raise ValueError("Result incorrect shape!")
		
		# Convert to numpy array and flatten, as output shape will be (1,1)
		return prediction.numpy()[0][0]

	def score_batch(self, texts: list|dict) -> list[float]|dict:
		"""Calculate score over a list or dictionary of texts and return a list/dict of the results"""
		texts = texts.values() if type(texts) is dict else texts

		vectors = self._vectorize_multi(texts)
		concats = self._concatenate_multi(vectors)

		predicts = self.clf_network.predict(concats, verbose=0)
		
		if type(texts) is dict:
			results = {}
			for i, key in enumerate(texts.keys()):
				# Unpack np.array and match the predictions up to the keys of the original dictionary
				results[key] = predicts[i][0]
		else:
			results = []
			for val in predicts:
				# Unpack the nested np.array to convert to a basic array of results
				results.append(val[0])
		
		return results

	def predict(self, texts: dict) -> bool:
		"""Calculate score and return a prediction based on the predetermined threshold, returns a boolean result"""
		return self.score(texts) >= self.threshold

	def predict_batch(self, texts: list|dict) -> list[bool]|dict:
		"""Run predict over a list/dict of texts and return the result with a boolean result"""
		scores = self.score_batch(texts)
		pred = lambda s : s >= self.threshold

		if type(texts) is dict:
			results = {}
			for key, val in scores.items():
				results[key] = pred(val)
		else:
			results = []
			for val in scores:
				results.append(pred(val))
		
		return results

In [None]:
class StyloInst:
    """Holds the known and unknown texts of one profile together with the
        StyloNet used to evaluate them. Intended to store profile variables,
        but currently unnecessary."""

    def __init__(self, parent : StyloNet):
        self.parent = parent
        self.known, self.unknown = [], []
        self.lastResult = None

    def addKnown(self, data):
        """Clean `data` with strip_text and store it as one known text."""
        self.known += [strip_text(data)]

    def addUnknown(self, data):
        """Clean `data` with strip_text and store it as one unknown text."""
        self.unknown += [strip_text(data)]

    def predict(self):
        """Run the parent's predict on the stored texts, remember the result and return it."""
        outcome = self.parent.predict({ 'known' : self.known, 'unknown' : self.unknown })
        self.lastResult = outcome
        return outcome

### Testing and Verification
Verifies the output of the single-case functions to ensure they match the output of the original code

In [None]:
### Test the class defined above to ensure it generates the same output on all datasets ###
def verifyResults(loaded, generated):
    """Check that two sets of results agree element by element.

    Args:
        loaded: flat sequence of values.
        generated: sequence of rows; only the first element of each row is compared.

    Returns:
        True when both have the same length and every loaded[i] equals
        generated[i][0], otherwise False.
    """
    # A length mismatch means the two runs did not cover the same samples
    if len(loaded) != len(generated):
        return False
    return all(value == row[0] for value, row in zip(loaded, generated))

# Run predictions over all 3 precompiled data-sets using the generated model
generated_test_results = clf_network.predict(test_siamese_vec)
generated_train_results = clf_network.predict(train_siamese_vec)
generated_val_results = clf_network.predict(val_siamese_vec)

# Run the same datasets through the loaded model.
# score_batch (raw scores) is used rather than predict_batch (thresholded booleans),
# because verifyResults compares against the raw output of clf_network.predict.
loaded_model = StyloNet()
loaded_test_results = loaded_model.score_batch(test_corpus)
loaded_train_results = loaded_model.score_batch(train_corpus)
loaded_val_results = loaded_model.score_batch(val_corpus)

# Verify results
print(f"Test Results: {verifyResults(loaded_test_results, generated_test_results)}")
print(f"Train Results: {verifyResults(loaded_train_results, generated_train_results)}")
print(f"Val Results: {verifyResults(loaded_val_results, generated_val_results)}")

In [21]:
### Load StyloNet from library ###
from importlib import reload

try:
    # Module already imported in this kernel: reload it to pick up changes on disk
    reload(Stylometry)
except NameError:
    # First run in this kernel: the name does not exist yet, so import it
    import Stylometry

loaded_model = Stylometry.StyloNet()

[nltk_data] Downloading package punkt to ./nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to ./nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to ./nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
