<a href="https://colab.research.google.com/github/George-E-B/ShakespeareClassifier/blob/main/BERTShakespeare.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Install tensorflow-text pinned to the 2.8 series (-q: quiet, -U: upgrade an existing install).
!pip install -q -U "tensorflow-text==2.8.*"

[K     |████████████████████████████████| 4.9 MB 11.9 MB/s 
[K     |████████████████████████████████| 498.0 MB 11 kB/s 
[K     |████████████████████████████████| 5.8 MB 66.3 MB/s 
[K     |████████████████████████████████| 462 kB 65.7 MB/s 
[K     |████████████████████████████████| 1.4 MB 66.1 MB/s 
[?25h

In [2]:
# Install tf-models-official pinned to 2.7.0.
# NOTE(review): presumably this is what provides the `official.nlp.optimization` import used below - confirm.
!python -m pip install -q tf-models-official==2.7.0

[K     |████████████████████████████████| 1.8 MB 14.6 MB/s 
[K     |████████████████████████████████| 238 kB 58.7 MB/s 
[K     |████████████████████████████████| 43 kB 2.1 MB/s 
[K     |████████████████████████████████| 1.3 MB 74.7 MB/s 
[K     |████████████████████████████████| 352 kB 62.9 MB/s 
[K     |████████████████████████████████| 118 kB 82.1 MB/s 
[K     |████████████████████████████████| 1.1 MB 54.5 MB/s 
[?25h  Building wheel for seqeval (setup.py) ... [?25l[?25hdone


In [3]:
import os
import shutil

import tensorflow as tf
import tensorflow_hub as hub
import tensorflow_text as text
from official.nlp import optimization

import matplotlib.pyplot as plt

tf.get_logger().setLevel('ERROR')

In [4]:
# Root folder of the corpus, relative to the working directory.
# os.path.dirname('.') is the empty string, so the join below yields './data'.
notebook_root = os.path.dirname('.')
dataset_dir = os.path.join(notebook_root, './data')
print(dataset_dir)

# The train and test splits live in sibling sub-folders of the dataset root.
train_dir, test_dir = (os.path.join(dataset_dir, split) for split in ('train', 'test'))
print("Train directory:", train_dir,"\nTest directory:", test_dir)

./data
Train directory: ./data/train 
Test directory: ./data/test


In [None]:
# Upload the corpus files from the local machine into the Colab working directory.
# The directory-setup cell below expects formatParagraphs.txt and
# adversFormatParagraphs.txt to be present after this step.
from google.colab import files
uploaded = files.upload()

In [None]:
!pwd
!mkdir data
!ls
!mkdir data/train
!mkdir data/test
!mkdir data/train/pos
!mkdir data/train/neg
!mkdir data/test/pos
!mkdir data/test/neg
!mv formatParagraphs.txt data
!mv adversFormatParagraphs.txt data
!ls

In [None]:
# Sanity check: list the dataset root, the test split, and the test/neg class folder.
!ls data
!ls data/test
!ls data/test/neg

In [None]:
# Read the adversarial and regular corpora and write them, one sentence per line, into numbered chunk files under ./data/<split>/<class>/
import ast
# Number of sentences written to each "neg" chunk file ("pos" files use a multiple of this).
SENTENCES_PER_FILE = 20

def formatData(file, typ, trest):
    """Split one half of a corpus into numbered chunk files.

    The file content must be a Python list literal of strings. The first half
    of the list (indices ``0 .. len//2 - 1``) is the "train" split and the
    remaining items are the "test" split. The selected half is written to
    ``./data/{trest}/{typ}/1.txt``, ``2.txt``, ... with one sentence per line.
    Existing chunk files with the same names are overwritten.

    Args:
        file: Readable text file object holding the list literal.
        typ: Class sub-folder name; ``"pos"`` uses larger chunks than any other value.
        trest: Which half to write, ``"train"`` or ``"test"``.

    Returns:
        The number of chunk files written (0 if the selected half is empty).

    Raises:
        ValueError: If ``trest`` is neither ``"train"`` nor ``"test"``.
    """
    if trest not in ("train", "test"):
        raise ValueError(f"trest must be 'train' or 'test', got {trest!r}")

    lst = ast.literal_eval(file.read())
    half = len(lst) // 2
    # The test split starts exactly where the train split ends, so no item is dropped.
    if trest == "train":
        sNum, lNum = 0, half
    else:
        sNum, lNum = half, len(lst)

    # "pos" chunks hold 18x as many sentences as the others.
    # NOTE(review): presumably this balances the number of files per class - confirm.
    mult = 18 if typ == "pos" else 1
    perFile = int(SENTENCES_PER_FILE * mult)

    fNum = 0
    # Chunk relative to the start of the split so numbering always begins at 1.txt
    # and every file is opened in "w" mode, which clears data from a previous run.
    for start in range(sNum, lNum, perFile):
        fNum += 1
        with open(f"./data/{trest}/{typ}/{fNum}.txt", "w") as f:
            for sentence in lst[start:min(start + perFile, lNum)]:
                f.write(sentence + "\n")
    return fNum

# (source file, class label, split) for each chunking pass, in the order the
# file counts are reported.
chunkJobs = [
    ("./data/adversFormatParagraphs.txt", "neg", "train"),
    ("./data/adversFormatParagraphs.txt", "neg", "test"),
    ("./data/formatParagraphs.txt", "pos", "train"),
    ("./data/formatParagraphs.txt", "pos", "test"),
]

for srcPath, typ, trest in chunkJobs:
    # Re-open the source for every pass because formatData consumes the whole file.
    with open(srcPath, "r") as f:
        print(f"{typ}, {trest}:", formatData(f, typ, trest))

In [None]:
AUTOTUNE = tf.data.AUTOTUNE
batch_size = 32
seed = 42

!pwd
raw_train_ds = tf.keras.utils.text_dataset_from_directory(
    train_dir,
    batch_size=batch_size,
    validation_split=0.5, ##50:50 split on training:validation data
    subset='training',
    seed=seed)

class_names = raw_train_ds.class_names
train_ds = raw_train_ds.cache().prefetch(buffer_size=AUTOTUNE)

val_ds = tf.keras.utils.text_dataset_from_directory(
    train_dir,
    batch_size=batch_size,
    validation_split=0.5,
    subset='validation',
    seed=seed)

val_ds = val_ds.cache().prefetch(buffer_size=AUTOTUNE)

test_ds = tf.keras.utils.text_dataset_from_directory(
    test_dir,
    batch_size=batch_size)

test_ds = test_ds.cache().prefetch(buffer_size=AUTOTUNE)

In [None]:
# Show the first three (text, label) pairs of the first training batch.
for text_batch, label_batch in train_ds.take(1):
  # Convert each tensor once instead of once per printed line.
  texts = text_batch.numpy()
  labels = label_batch.numpy()
  for i in range(3):
    label = labels[i]
    print(f'Text  : {texts[i]}')
    print(f'Label : {label} ({class_names[label]})')

# Choosing BERT model:

In [2]:
# Short identifier of the chosen encoder; the encoder handle is derived from it.
bertModelName = 'small_bert/bert_en_uncased_L-4_H-512_A-8'

# TF-Hub handles of the encoder and of the preprocessing model used with it.
tfhubHandleEncoder = f"https://tfhub.dev/tensorflow/{bertModelName}/1"
tfhubHandlePreprocess = "https://tfhub.dev/tensorflow/bert_en_uncased_preprocess/3"

print(f'BERT model       : {tfhubHandleEncoder}')
print(f'Preprocess model : {tfhubHandlePreprocess}')

BERT model       : https://tfhub.dev/tensorflow/small_bert/bert_en_uncased_L-4_H-512_A-8/1
Preprocess model : https://tfhub.dev/tensorflow/bert_en_uncased_preprocess/3


# Defining the fine-tuning model

In [None]:
def build_classifier_model():
  """Assemble the fine-tuning model.

  Pipeline: string input -> TF-Hub preprocessing layer -> trainable TF-Hub
  encoder -> dropout on the encoder's 'pooled_output' -> one-unit dense head.

  Returns:
    A `tf.keras.Model` that maps a batch of raw strings to one raw logit per
    string (the dense head has no activation).
  """
  raw_text = tf.keras.layers.Input(shape=(), dtype=tf.string, name='text')
  encoder_inputs = hub.KerasLayer(tfhubHandlePreprocess, name='preprocessing')(raw_text)
  bert_outputs = hub.KerasLayer(tfhubHandleEncoder, trainable=True, name='BERT_encoder')(encoder_inputs)
  pooled = bert_outputs['pooled_output']

  # NOTE(review): a dropout rate of 0.9 drops 90% of the pooled features during
  # training - confirm this is intentional.
  pooled = tf.keras.layers.Dropout(0.9)(pooled)

  # The head carries both an L1 penalty on its weights and an L2 penalty on its output.
  classifier_head = tf.keras.layers.Dense(
      1,
      activation=None,
      name='classifier',
      kernel_regularizer=tf.keras.regularizers.L1(0.01),
      activity_regularizer=tf.keras.regularizers.L2(0.01))
  return tf.keras.Model(raw_text, classifier_head(pooled))

In [None]:
# Sample input for the smoke test. It is defined here so the cell does not
# depend on a variable left over from another cell or kernel session.
text_test = ["Shall I compare thee to a summer's day?"]

classifierModel = build_classifier_model()
# The model outputs raw logits; the sigmoid maps them into (0, 1) for display.
bert_raw_result = classifierModel(tf.constant(text_test))
print(tf.sigmoid(bert_raw_result))

The model runs, but it has not been trained yet, so this output is meaningless.

In [None]:
# Draw the layer graph of the classifier.
tf.keras.utils.plot_model(classifierModel)

In [None]:
# Print the layer-by-layer summary of the classifier.
classifierModel.summary()

# Model Training

In [None]:
# The classifier head has no activation (it outputs raw logits), so the loss is
# configured with from_logits=True.
loss = tf.keras.losses.BinaryCrossentropy(from_logits=True)
# Metric reported during training and evaluation.
metrics = tf.metrics.BinaryAccuracy()

In [None]:
# Training schedule hyper-parameters.
epochs = 10
initLr = 3e-6

# The total number of optimizer steps is (batches per epoch) x epochs;
# 10% of that total is passed as the warm-up step count.
stepsPerEpoch = tf.data.experimental.cardinality(train_ds).numpy()
numTrainSteps = epochs * stepsPerEpoch
numWarmupSteps = int(numTrainSteps * 0.1)

optimizer = optimization.create_optimizer(
    init_lr=initLr,
    num_train_steps=numTrainSteps,
    num_warmup_steps=numWarmupSteps,
    optimizer_type='adamw',
)

In [None]:
# gpu_device_name() returns an empty string when no GPU is available;
# fall back to the CPU device in that case.
deviceName = tf.test.gpu_device_name()
if not deviceName:
    deviceName = "/device:CPU:0"
    print("No GPU, using {}.".format(deviceName))
else:
    print("Found GPU at: {}".format(deviceName))

# Loading the BERT Model and Training

In [None]:
# Attach the optimizer, loss and metric to the model inside the selected device
# scope. This configures the model for training; it does not load any weights.
with (tf.device(deviceName)):
  classifierModel.compile(optimizer=optimizer,
                          loss=loss,
                          metrics=metrics)

In [None]:
# Fine-tune on the training set and validate after each epoch.
# `history` keeps the per-epoch metrics that the plotting cell reads.
print(f'Training model with {tfhubHandleEncoder}')
history = classifierModel.fit(x=train_ds,
                              validation_data=val_ds,
                              epochs=epochs)

# Evaluate the Model

In [None]:
# Score the trained model on the held-out test set.
# The result is stored as `test_loss` rather than `loss`, so `loss` still refers
# to the loss function and the compile cell can be re-run.
test_loss, accuracy = classifierModel.evaluate(test_ds)

print(f'Loss: {test_loss}')
print(f'Accuracy: {accuracy}')

# Plot Accuracy and Loss Over Time

In [None]:
# Per-epoch metrics recorded by fit().
historyDict = history.history
print(historyDict.keys())

acc = historyDict['binary_accuracy']
val_acc = historyDict['val_binary_accuracy']
# Dedicated names keep `loss` (the loss function) and `epochs` (the epoch count)
# intact, so the compile and fit cells can be re-run after this one.
train_loss = historyDict['loss']
val_loss = historyDict['val_loss']

epoch_range = range(1, len(acc) + 1)
fig, (loss_ax, acc_ax) = plt.subplots(2, 1, figsize=(10, 6))

# Top panel: loss (red = training, blue = validation).
loss_ax.plot(epoch_range, train_loss, 'r', label='Training loss')
loss_ax.plot(epoch_range, val_loss, 'b', label='Validation loss')
loss_ax.set_title('Training and validation loss')
loss_ax.set_ylabel('Loss')
loss_ax.legend()

# Bottom panel: accuracy (red = training, blue = validation).
acc_ax.plot(epoch_range, acc, 'r', label='Training acc')
acc_ax.plot(epoch_range, val_acc, 'b', label='Validation acc')
acc_ax.set_title('Training and validation accuracy')
acc_ax.set_xlabel('Epochs')
acc_ax.set_ylabel('Accuracy')
acc_ax.legend(loc='lower right')

# The layout is computed after both panels exist; calling it on the empty
# figure had no effect.
fig.tight_layout()

# Export and Test on Arbitrary Sentence

In [None]:
dsName = 'shakespeare'
# Any '/' in the dataset name is replaced by '_' to form the export path.
savedModelPath = f"./{dsName.replace('/', '_')}_bert"

# Export the model without the optimizer state.
classifierModel.save(savedModelPath, include_optimizer=False)

In [None]:
# Reload the exported model from disk.
# NOTE(review): reloadedModel is not used in the visible cells; the example
# scoring below runs on the in-memory classifierModel.
reloadedModel = tf.saved_model.load(savedModelPath)

In [None]:
def printExamples(inputs, results):
  """Print one 'input ... : score ...' line per example, then a blank line.

  Args:
    inputs: Sequence of input strings; each is left-aligned in a 65-character column.
    results: Indexable of rows, parallel to `inputs`; element 0 of each row is
      printed as the score with six decimal places.
  """
  lines = []
  for idx, sentence in enumerate(inputs):
    score = results[idx][0]
    lines.append(f'input: {sentence:<65} : score: {score:.6f}')
  print(*lines, sep='\n')
  print()


# Hand-written probe sentences: modern English, Shakespeare quotes, and noise.
examples = [
    'where is this sentence going idk',
    'Oh Romeo, Romeo, wherefore art thou Romeo?',
    'asdsa  ahsdbb13614b',
    'Is that the meaning of \'accost\'?',
    'this is a completely normal sentence which is not shakespeare'
]

# The model returns raw logits; the sigmoid maps them to scores in (0, 1).
# NOTE(review): presumably a score near 1 means the 'pos' class - confirm against class_names.
original_results = tf.sigmoid(classifierModel(tf.constant(examples)))

print('Results from the model in memory:')
printExamples(examples, original_results)