In [16]:
# Imports
import datetime
import os
import random
import zipfile
import pandas as pd
import numpy as np
import tensorflow as tf
import tensorflow_hub as hub
import time
from sklearn.preprocessing import OneHotEncoder, LabelEncoder
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import Pipeline
from sklearn.metrics import accuracy_score, precision_recall_fscore_support # Function to evaluate: accuracy, precision, recall, f1-score
from tensorflow.keras.layers import TextVectorization
from tensorflow.keras import layers, Model
from tensorflow.keras.utils import plot_model
from tensorflow.keras import layers, models, callbacks
from tensorflow.keras.models import clone_model

In [17]:
# Stamp the cell output with the local wall-clock time of this execution,
# so a reader can tell when the notebook was last run.
print(f"Notebook last run: {datetime.datetime.now()}")

Notebook last run: 2024-06-24 13:15:50.495922


In [18]:
def load_and_preprocess_pubmed_data(data_dir):
    """Load the train/dev/test splits stored in ``data_dir``.

    Each of ``train.txt``, ``dev.txt`` and ``test.txt`` is parsed with
    ``preprocess_text_with_line_numbers`` and wrapped in a ``pd.DataFrame``.
    The ``"text"`` column of every frame is also returned as a plain list.

    Args:
        data_dir: directory that contains the three split files.

    Returns:
        Tuple ``(train_df, val_df, test_df, train_sentences, val_sentences,
        test_sentences)``.
    """
    # The validation split lives on disk as "dev.txt".
    split_files = ("train.txt", "dev.txt", "test.txt")

    # Parse all three files first, then build the frames.
    parsed_splits = [
        preprocess_text_with_line_numbers(os.path.join(data_dir, file_name))
        for file_name in split_files
    ]
    train_df, val_df, test_df = [pd.DataFrame(samples) for samples in parsed_splits]

    # Sentence text of each split as a regular Python list.
    train_sentences, val_sentences, test_sentences = [
        frame["text"].tolist() for frame in (train_df, val_df, test_df)
    ]

    return train_df, val_df, test_df, train_sentences, val_sentences, test_sentences

In [19]:
#Function to read the lines of a document
def get_lines(filename):
  """Return every line of the text file ``filename`` as a list of strings.

  The file is opened in text mode with the platform's default encoding and
  each returned line keeps its trailing newline character (if it had one).
  """
  with open(filename, "r") as handle:
    lines = list(handle)  # iterating a text file yields the same lines as readlines()
  return lines

In [20]:
def preprocess_text_with_line_numbers(filename):
    """Parse a labelled-abstract text file into one dict per sentence.

    The file is read with ``get_lines`` and interpreted line by line:

    * a line starting with ``"###"`` begins a new abstract (anything still
      buffered and not yet emitted is discarded);
    * a whitespace-only line ends the current abstract and emits its sentences;
    * any other line is a sentence line: a label, a tab character, then the
      sentence text.

    The last abstract is emitted even when the file does not end with a blank
    line, and repeated blank lines do not emit an abstract more than once.

    Args:
        filename: path of the text file to parse.

    Returns:
        A list of dicts in file order, each with the keys ``"target"`` (first
        tab-separated field), ``"text"`` (second tab-separated field,
        lower-cased), ``"line_number"`` (0-based position of the sentence
        within its abstract) and ``"total_lines"`` (number of sentences in the
        abstract minus one).

    Raises:
        IndexError: if a sentence line contains no tab character.
    """
    abstract_samples = []  # one dict per sentence, across all abstracts
    pending_lines = []  # raw sentence lines of the abstract currently being read

    # A trailing sentinel blank line flushes the final abstract when the file
    # itself does not end with one; it is a no-op otherwise because the buffer
    # is emptied after every flush.
    for line in [*get_lines(filename), "\n"]:
        if line.startswith("###"):  # ID line: start a fresh abstract
            pending_lines = []
        elif line.isspace():  # blank line: the buffered abstract is complete
            abstract_line_split = "".join(pending_lines).splitlines()
            last_line_index = len(abstract_line_split) - 1  # total lines, counted from 0

            for abstract_line_number, abstract_line in enumerate(abstract_line_split):
                target_text_split = abstract_line.split("\t")  # split label from text
                abstract_samples.append({
                    "target": target_text_split[0],
                    "text": target_text_split[1].lower(),
                    "line_number": abstract_line_number,
                    "total_lines": last_line_index,
                })

            # Reset so that a second consecutive blank line cannot re-emit
            # the abstract that was just processed.
            pending_lines = []
        else:  # labelled sentence belonging to the current abstract
            pending_lines.append(line)

    return abstract_samples

In [21]:
# Creates a TensorBoard callback instance to store log files.
def create_tensorboard_callback(dir_name, experiment_name):
  """Build a TensorBoard callback that logs to a timestamped directory.

  The log directory is ``dir_name/experiment_name/YYYYmmdd-HHMMSS`` where the
  timestamp is taken from the local clock at call time. The chosen path is
  printed before the callback is returned.

  Args:
    dir_name: root directory for TensorBoard logs (string).
    experiment_name: sub-directory name for this experiment (string).

  Returns:
    A ``tf.keras.callbacks.TensorBoard`` instance configured with that path.
  """
  run_stamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
  log_dir = "/".join([dir_name, experiment_name, run_stamp])
  callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir)
  print(f"Saving TensorBoard log files to: {log_dir}")
  return callback

In [22]:
def calculate_results(y_true, y_pred):
  """Compute accuracy, precision, recall and F1 for a set of predictions.

  Args:
    y_true: ground-truth labels.
    y_pred: predicted labels, aligned with ``y_true``.

  Returns:
    Dict with the keys ``"accuracy"`` (scaled by 100, i.e. a percentage),
    ``"precision"``, ``"recall"`` and ``"f1"`` (the latter three are the
    "weighted" averages and are NOT scaled by 100).
  """
  accuracy_pct = accuracy_score(y_true, y_pred) * 100
  # With average="weighted" the fourth returned element (support) is unused here.
  weighted_scores = precision_recall_fscore_support(y_true, y_pred, average="weighted")
  return {
      "accuracy": accuracy_pct,
      "precision": weighted_scores[0],
      "recall": weighted_scores[1],
      "f1": weighted_scores[2],
  }

In [23]:
#One hot encode labels
def perform_one_hot_encoding(one_hot_encoder, train_df, val_df, test_df, target_column="target"):
    """One-hot encode the label column of the train, validation and test frames.

    The encoder is fitted on the training labels only and then applied,
    unchanged, to the validation and test labels.

    Args:
        one_hot_encoder: object exposing ``fit_transform`` and ``transform``
            (e.g. scikit-learn's ``OneHotEncoder``).
        train_df: DataFrame holding the training samples.
        val_df: DataFrame holding the validation samples.
        test_df: DataFrame holding the test samples.
        target_column: name of the label column present in all three frames.

    Returns:
        Tuple ``(train_labels_one_hot, val_labels_one_hot, test_labels_one_hot)``
        containing whatever the encoder returns for each split.

    Raises:
        KeyError: if ``target_column`` is missing from one of the frames.
    """
    def labels_as_column(df):
        # Labels are handed to the encoder as a 2-D (n_samples, 1) array.
        return df[target_column].to_numpy().reshape(-1, 1)

    train_labels_one_hot = one_hot_encoder.fit_transform(labels_as_column(train_df))
    val_labels_one_hot = one_hot_encoder.transform(labels_as_column(val_df))
    test_labels_one_hot = one_hot_encoder.transform(labels_as_column(test_df))
    return train_labels_one_hot, val_labels_one_hot, test_labels_one_hot

In [24]:
#Extract labels ("target" columns) and encode them into integers 
def perform_label_encoding(label_encoder, train_df, val_df, test_df, target_column="target"):
    """Encode the label column of the train, validation and test frames.

    The encoder is fitted on the training labels only and then applied,
    unchanged, to the validation and test labels.

    Args:
        label_encoder: object exposing ``fit_transform`` and ``transform``
            (e.g. scikit-learn's ``LabelEncoder``).
        train_df: DataFrame holding the training samples.
        val_df: DataFrame holding the validation samples.
        test_df: DataFrame holding the test samples.
        target_column: name of the label column present in all three frames.

    Returns:
        Tuple ``(train_labels_encoded, val_labels_encoded, test_labels_encoded)``
        containing whatever the encoder returns for each split.

    Raises:
        KeyError: if ``target_column`` is missing from one of the frames.
    """
    def labels_of(df):
        # Labels are handed to the encoder as a flat 1-D array.
        return df[target_column].to_numpy()

    train_labels_encoded = label_encoder.fit_transform(labels_of(train_df))
    val_labels_encoded = label_encoder.transform(labels_of(val_df))
    test_labels_encoded = label_encoder.transform(labels_of(test_df))
    return train_labels_encoded, val_labels_encoded, test_labels_encoded

In [25]:
def split_chars_in_sentences(sentences):
    """Apply ``split_chars`` to every sentence and return the results as a list.

    Args:
        sentences: iterable of sentence strings.

    Returns:
        A new list, in the same order, holding ``split_chars(sentence)`` for
        each input sentence.
    """
    return list(map(split_chars, sentences))

In [26]:
# Make function to split sentences into characters
def split_chars(text):
  """Return ``text`` with a single space inserted between consecutive characters.

  Existing spaces are treated like any other character, so ``"a b"`` becomes
  ``"a   b"``. An empty string is returned unchanged.
  """
  separator = " "
  return separator.join(text)  # joining a string iterates it character by character

In [27]:
def create_tf_datasets(sentences, labels_one_hot, batch_size=32):
    """Build a batched, prefetching ``tf.data.Dataset`` of (sentence, label) pairs.

    Args:
        sentences: sentence inputs, sliced along their first dimension.
        labels_one_hot: labels aligned element-wise with ``sentences``.
        batch_size: number of consecutive elements per batch.

    Returns:
        The dataset after ``batch(batch_size)`` and
        ``prefetch(tf.data.AUTOTUNE)``. No shuffling is applied.
    """
    sentence_label_pairs = tf.data.Dataset.from_tensor_slices((sentences, labels_one_hot))
    batched_pairs = sentence_label_pairs.batch(batch_size)
    return batched_pairs.prefetch(tf.data.AUTOTUNE)

In [28]:
def create_char_token_datasets(sentences, chars, labels_one_hot, batch_size=32, prefetch_buffer=tf.data.AUTOTUNE):
    """Build a batched dataset of ``((sentence, chars), label)`` elements.

    Args:
        sentences: token-level sentence inputs.
        chars: character-level inputs aligned with ``sentences``.
        labels_one_hot: labels aligned with the inputs.
        batch_size: number of consecutive elements per batch.
        prefetch_buffer: buffer size passed to ``Dataset.prefetch``.

    Returns:
        A ``tf.data.Dataset`` whose elements are ``((sentences, chars), labels)``
        batches. No shuffling is applied.
    """
    from_slices = tf.data.Dataset.from_tensor_slices

    features = from_slices((sentences, chars))
    targets = from_slices(labels_one_hot)

    # Pair each feature tuple with its label, then batch and prefetch.
    paired = tf.data.Dataset.zip((features, targets))
    return paired.batch(batch_size).prefetch(prefetch_buffer)

In [29]:
def create_position_char_token_datasets(line_numbers_one_hot, total_lines_one_hot, sentences, chars, labels_one_hot, batch_size=32, prefetch_buffer=tf.data.AUTOTUNE):
    """Build a batched dataset of four-part inputs paired with labels.

    Each element has the structure
    ``((line_numbers_one_hot, total_lines_one_hot, sentences, chars), labels)``.

    Args:
        line_numbers_one_hot: per-sentence line-number features.
        total_lines_one_hot: per-sentence total-line-count features.
        sentences: token-level sentence inputs.
        chars: character-level inputs aligned with ``sentences``.
        labels_one_hot: labels aligned with the inputs.
        batch_size: number of consecutive elements per batch.
        prefetch_buffer: buffer size passed to ``Dataset.prefetch``.

    Returns:
        The zipped ``tf.data.Dataset`` after batching and prefetching.
        No shuffling is applied.
    """
    input_parts = (line_numbers_one_hot, total_lines_one_hot, sentences, chars)
    inputs_ds = tf.data.Dataset.from_tensor_slices(input_parts)
    labels_ds = tf.data.Dataset.from_tensor_slices(labels_one_hot)

    combined = tf.data.Dataset.zip((inputs_ds, labels_ds))
    combined = combined.batch(batch_size)
    return combined.prefetch(prefetch_buffer)

In [30]:
def create_model(line_number_depth=15, total_lines_depth=20, num_classes=5):
    """Build the token + character + positional classification model.

    The model takes four inputs, in this order: line-number features,
    total-lines features, the sentence as a string, and the character-split
    sentence as a string. Token and character branches are concatenated,
    passed through a dense layer with dropout, concatenated with the two
    positional branches, and fed to a softmax output layer.

    NOTE: this function relies on three objects that must already exist in the
    notebook namespace when it is called; they are not defined in this cell:
    ``use_layer`` (applied to the string token input — presumably a sentence
    embedding layer, confirm against the cell that creates it),
    ``char_vectorizer`` and ``char_embed`` (applied in sequence to the
    character input).

    Args:
        line_number_depth: width of the line-number input vector
            (presumably the one-hot depth used for line numbers — confirm).
        total_lines_depth: width of the total-lines input vector
            (presumably the one-hot depth used for total lines — confirm).
        num_classes: number of units in the softmax output layer.

    Returns:
        An uncompiled ``tf.keras.Model`` with inputs
        ``[line_number_input, total_lines_input, token_inputs, char_inputs]``
        and a single ``(num_classes,)`` softmax output named ``output_layer``.
    """
    # 1. Token inputs
    token_inputs = layers.Input(shape=[], dtype="string", name="token_inputs")
    token_embeddings = use_layer(token_inputs)
    token_outputs = layers.Dense(128, activation="relu")(token_embeddings)
    token_model = tf.keras.Model(inputs=token_inputs,
                                 outputs=token_outputs)

    # 2. Char inputs
    char_inputs = layers.Input(shape=(1,), dtype="string", name="char_inputs")
    char_vectors = char_vectorizer(char_inputs)
    char_embeddings = char_embed(char_vectors)
    char_bi_lstm = layers.Bidirectional(layers.LSTM(32))(char_embeddings)
    char_model = tf.keras.Model(inputs=char_inputs,
                                outputs=char_bi_lstm)

    # 3. Line numbers inputs (width configurable instead of a hard-coded 15)
    line_number_inputs = layers.Input(shape=(line_number_depth,), dtype=tf.int32, name="line_number_input")
    x = layers.Dense(32, activation="relu")(line_number_inputs)
    line_number_model = tf.keras.Model(inputs=line_number_inputs,
                                       outputs=x)

    # 4. Total lines inputs (width configurable instead of a hard-coded 20)
    total_lines_inputs = layers.Input(shape=(total_lines_depth,), dtype=tf.int32, name="total_lines_input")
    y = layers.Dense(32, activation="relu")(total_lines_inputs)
    total_line_model = tf.keras.Model(inputs=total_lines_inputs,
                                      outputs=y)

    # 5. Combine token and char embeddings into a hybrid embedding
    combined_embeddings = layers.Concatenate(name="token_char_hybrid_embedding")([token_model.output,
                                                                                  char_model.output])
    z = layers.Dense(256, activation="relu")(combined_embeddings)
    z = layers.Dropout(0.5)(z)

    # 6. Combine positional embeddings with combined token and char embeddings into a tribrid embedding
    z = layers.Concatenate(name="token_char_positional_embedding")([line_number_model.output,
                                                                    total_line_model.output,
                                                                    z])

    # 7. Create output layer (one unit per class)
    output_layer = layers.Dense(num_classes, activation="softmax", name="output_layer")(z)

    # 8. Put together model; the input order here must match the order of the
    #    feature tuple fed to the model at fit/predict time.
    model = tf.keras.Model(inputs=[line_number_model.input,
                                   total_line_model.input,
                                   token_model.input,
                                   char_model.input],
                           outputs=output_layer)

    return model