# Intro

This notebook demonstrates how to extract, transform, and load the TUABtxt dataset for use with TensorFlow.

First, let's install and import some useful libraries.

In [None]:
# Be sure you're using the stable versions of both tf and tf-text, for binary compatibility.
!pip install -q -U tensorflow==2.7
!pip install -q -U tensorflow-text==2.7
!pip install tensorflow-text
!pip install -U tensorflow-text==2.9.0

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m489.7/489.7 MB[0m [31m2.1 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m463.1/463.1 kB[0m [31m6.7 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.3/1.3 MB[0m [31m35.6 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m42.6/42.6 kB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.9/4.9 MB[0m [31m31.8 MB/s[0m eta [36m0:00:00[0m
[?25hLooking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting tensorflow-text==2.9.0
  Downloading tensorflow_text-2.9.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (4.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m 

In [None]:
import collections
import pathlib
import re
import string
import numpy as np
import matplotlib.pyplot as plt


import tensorflow as tf

from tensorflow.keras import layers
from tensorflow.keras import losses
from tensorflow.keras import preprocessing
from tensorflow.keras import utils
from tensorflow.keras.layers.experimental.preprocessing import TextVectorization

import tensorflow_datasets as tfds
import tensorflow_text as tf_text

ImportError: ignored

# Download and explore the dataset

First we'll use a handy tool called `gdown` to download the dataset (just the text reports) from where your team have stored them on Google Drive.

In [None]:
# Fetch the file with this Google Drive ID using the gdown command-line tool.
# NOTE(review): the extraction cell below opens "TUAB_txt_relabelled.tar" from the
# working directory — confirm that is the name this download is saved under.
!gdown --id 120d8bHo6NxEsUDprDqiuqj1OTfmhgJzR

The dataset is compressed inside the archive file TUAB_txt_relabelled.tar, so let's extract it (like unzipping a zip file).

In [None]:
import tarfile

# Open the archive in a `with` block so the file handle is closed even if
# extraction fails part-way through.
with tarfile.open("TUAB_txt_relabelled.tar") as tar:
    tar.extractall()  # No path given, so members are unpacked relative to the working directory.

Now we've extracted a folder called TUAB_txt_relabelled. Let's use pathlib library to explore this directory.

In [None]:
# Directory inside the extracted archive that is explored and loaded below.
dataset_dir = pathlib.Path('TUAB_txt_relabelled') / 'v2.0.0' / 'edf'
# Display the entries found directly inside that directory.
[entry for entry in dataset_dir.iterdir()]

We see above that the dataset has already been split into 'train' and 'eval' subsets. This is common practice in widely used machine learning research datasets, to ensure that everyone uses the same test (eval) set when comparing performance.

### Load the full dataset

Next, we will load the data off disk and prepare it into a format suitable for training. The [text_dataset_from_directory](https://www.tensorflow.org/api_docs/python/tf/keras/preprocessing/text_dataset_from_directory) utility makes this easy, and creates a `tf.data.Dataset` object with labels ('normal' and 'abnormal') automatically recognised from the folder structure. ([tf.data](https://www.tensorflow.org/guide/data) is a collection of tools for building input pipelines for machine learning).

In [None]:
BATCH_SIZE = 32  # Number of reports per batch, used for both datasets below.

# One dataset per sub-folder of `dataset_dir`: 'train' (split into training and
# validation further down) and 'eval' (held out as the test set).
train_and_val_ds = preprocessing.text_dataset_from_directory(
    dataset_dir / 'train',
    batch_size=BATCH_SIZE,
)
raw_test_ds = preprocessing.text_dataset_from_directory(
    dataset_dir / 'eval',
    batch_size=BATCH_SIZE,
)

When running a machine learning experiment, it is a best practice to divide your dataset into three splits: [train](https://developers.google.com/machine-learning/glossary#training_set), [validation](https://developers.google.com/machine-learning/glossary#validation_set), and [test](https://developers.google.com/machine-learning/glossary#test-set). There are no strict rules, but usually it's best to put most of your data in the training set (so that there's plenty to learn from). Let's split the training-and-validation data into 80% training and 20% validation.

In [None]:
# Set the size of each subset of data. All sizes are counted in batches, not in
# individual reports.
# Count the batches lazily rather than building a list that holds every batch.
n = sum(1 for _ in train_and_val_ds)  # Number of batches in original 'train' dataset
n_train = int(0.8*n)   # Use about 80% as training data ...
# ... and every remaining batch as validation data. Computing this as
# n - n_train keeps n_train + n_val == n; int(0.2*n) can come out one batch
# short because both products are rounded down.
n_val = n - n_train
print(n)
print(n_val)
print(n_train)

Now we're ready to actually make the split.

In [None]:
# Split the 'train' data into training and validation sets.
#
# `train_and_val_ds` is created with shuffling left at its default (enabled), so
# each pass over it yields the reports in a different order. Calling take()/skip()
# directly on it would let reports move between the two subsets from one pass to
# the next, i.e. validation reports could end up being trained on. Caching one
# complete pass freezes the order, so both subsets stay fixed and disjoint.
# The cache is held in memory and contains the raw report text.
frozen_train_and_val_ds = train_and_val_ds.cache()
for _batch in frozen_train_and_val_ds:   # A full pass is needed to finalise the cache.
  pass

raw_train_ds = frozen_train_and_val_ds.take(n_train)   # First n_train batches.
raw_val_ds = frozen_train_and_val_ds.skip(n_train)     # Every batch after those.
print(raw_train_ds)


Let's check the class balance of the training and validation splits and then print out a few examples, to get more of a feel for the data.

In [None]:
# Tally the training split. Each batch is a (texts, labels) pair; the labels are
# summed to count the normals, which presumes 'normal' is the class encoded as 1
# (the class_names cell further down shows the mapping).
total_n_normal = 0
total_n = 0
for texts, labels in raw_train_ds.as_numpy_iterator():
  total_n_normal += sum(labels)   # Normals in this batch.
  total_n += len(labels)          # All samples in this batch.

print(f"We found {total_n_normal} normals out of {total_n} samples.")

In [None]:
# Same tally for the validation split. Each batch is a (texts, labels) pair; the
# labels are summed to count the normals, which presumes 'normal' is the class
# encoded as 1.
total_n_normal2 = 0
total_n2 = 0
for texts, labels in raw_val_ds.as_numpy_iterator():
  total_n_normal2 += sum(labels)   # Normals in this batch.
  total_n2 += len(labels)          # All samples in this batch.

print(f"We found {total_n_normal2} normals out of {total_n2} samples.")

In [None]:
for text_batch, label_batch in raw_train_ds.take(1):   # Take a single batch from the dataset.
  # Convert each tensor to NumPy once, then walk reports and labels in step.
  # zip() stops at the real length of the batch, so a batch holding fewer than
  # 32 reports is handled as well instead of indexing out of range.
  for report, label in zip(text_batch.numpy(), label_batch.numpy()):
    print("Report: ", report)
    print("Label:", label)

The labels are `0` or `1`. To see which of these correspond to which string label, you can check the `class_names` property on the dataset, as below.


In [None]:
# Print each integer label next to the class name stored at that position.
class_names = train_and_val_ds.class_names
for i in range(len(class_names)):
  print("Label", i, "corresponds to", class_names[i])



### Prepare the dataset for training

Next, you will standardize, tokenize, and vectorize the data using the `preprocessing.TextVectorization` layer.
* Standardization refers to preprocessing the text, typically to remove punctuation or HTML elements to simplify the dataset.

* Tokenization refers to splitting strings into tokens (for example, splitting a sentence into individual words by splitting on whitespace).

* Vectorization refers to converting tokens into numbers so they can be fed into a neural network.

All of these tasks can be accomplished with this layer. You can learn more about each of these in the [API doc](https://www.tensorflow.org/api_docs/python/tf/keras/layers/experimental/preprocessing/TextVectorization).

* The default standardization converts text to lowercase and removes punctuation.

* The default tokenizer splits on whitespace.

* The default vectorization mode is `int`. This outputs integer indices (one per token). This mode can be used to build models that take word order into account. You can also use other modes, like `binary`, to build bag-of-word models.


Here we will use the `binary` mode to build a bag-of-words model (essentially one-hot encoding of whether each word in the vocabulary appears in the report). Then we will use the `int` mode (integer encoding of each word in the report, with order preserved) with a 1D ConvNet.

In [None]:
# Upper bound on the vocabulary size; also passed to the 'int' vectorization
# layer defined in a later cell.
VOCAB_SIZE = 10000

# Vectorization layer for the bag-of-words model ('binary' output mode).
# Standardization and tokenization are left at the layer's defaults because no
# other arguments are passed.
binary_vectorize_layer = TextVectorization(
    max_tokens=VOCAB_SIZE,
    output_mode='binary')

For `int` mode, in addition to maximum vocabulary size, you need to set an explicit maximum sequence length, which will cause the layer to pad or truncate sequences to exactly sequence_length values.

In [None]:
# Fixed number of token ids produced per report by the 'int' layer below.
MAX_SEQUENCE_LENGTH = 250

# Vectorization layer for the ConvNet: 'int' output mode, capped at VOCAB_SIZE
# tokens, with the output length fixed to MAX_SEQUENCE_LENGTH.
int_vectorize_layer = TextVectorization(
    max_tokens=VOCAB_SIZE,
    output_mode='int',
    output_sequence_length=MAX_SEQUENCE_LENGTH)

Next, you will call `adapt` to make the VectorizationLayer adjust itself according to the vocabulary in the dataset.

Note: it's important to only use your training data when calling adapt (using the test set would leak information).

In [None]:
# To avoid some errors caused by non-standard characters, we create a function
# that does some additional 'cleaning' of the text.
import re

def clean_text(text, labels):
  """Transcode report text from US-ASCII to UTF-8 and drop the labels.

  Takes two arguments so that it can be passed to `Dataset.map` on a dataset of
  (text, label) pairs.

  Args:
    text: String tensor holding report text.
    labels: Accepted but not used.

  Returns:
    The result of `tf.strings.unicode_transcode` on `text`. No `errors` or
    `replacement_char` argument is given, so the op's default handling of
    invalid input applies.
  """
  return tf.strings.unicode_transcode(text, "US ASCII", "UTF-8")

# Apply clean_text to the training split only; the mapped dataset yields the
# cleaned text without labels.
train_text = raw_train_ds.map(clean_text)

# Let each vectorize layer adjust itself to the vocabulary of the training text.
for vectorize_layer in (binary_vectorize_layer, int_vectorize_layer):
  vectorize_layer.adapt(train_text)

See the result of using these layers to preprocess data:

In [None]:
def binary_vectorize_text(text, label):
  """Run `text` through `binary_vectorize_layer` and pass `label` through.

  A trailing axis is appended to `text` before the layer is applied.

  Returns:
    A `(vectorized_text, label)` tuple.
  """
  text_with_trailing_axis = tf.expand_dims(text, -1)
  vectorized = binary_vectorize_layer(text_with_trailing_axis)
  return vectorized, label

In [None]:
def int_vectorize_text(text, label):
  """Run `text` through `int_vectorize_layer` and pass `label` through.

  A trailing axis is appended to `text` before the layer is applied.

  Returns:
    A `(vectorized_text, label)` tuple.
  """
  text_with_trailing_axis = tf.expand_dims(text, -1)
  vectorized = int_vectorize_layer(text_with_trailing_axis)
  return vectorized, label

In [None]:
# Pull the first batch of reports and labels from the training dataset ...
first_batch = next(iter(raw_train_ds))
text_batch, label_batch = first_batch

# ... and keep its first report/label pair as a running example.
first_report = text_batch[0]
first_label = label_batch[0]
print("Report", first_report)
print("Label", first_label)

In [None]:
import nltk
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
from nltk.corpus import stopwords
import string

def normalize_sentence(sentence):
    """Lower-case, tokenize, filter and lemmatize a sentence.

    Steps, in order: lower-case the input, split it into words with NLTK's
    `word_tokenize`, drop tokens that are English stop words or single
    punctuation characters, lemmatize what is left with `WordNetLemmatizer`,
    and join the result with single spaces.

    Args:
        sentence: The text to normalize (a Python string).

    Returns:
        The normalized text as a single space-separated string.
    """
    tokens = word_tokenize(sentence.lower())

    # Everything that should be filtered out: stop words plus each punctuation character.
    excluded = set(stopwords.words('english'))
    excluded.update(string.punctuation)

    lemmatizer = WordNetLemmatizer()
    return ' '.join(
        lemmatizer.lemmatize(token) for token in tokens if token not in excluded)

In [None]:
# Vectorize the example report with the 'binary' layer; element 0 of the
# returned pair is the vectorized text (element 1 is the unchanged label).
binary_vectorized_report = binary_vectorize_text(first_report, first_label)[0]
print("'binary' vectorized report:", binary_vectorized_report)

In [None]:
# Vectorize the example report with the 'int' layer; element 0 of the returned
# pair is the vectorized text (element 1 is the unchanged label).
int_vectorized_report = int_vectorize_text(first_report, first_label)[0]
print("'int' vectorized report:", int_vectorized_report)

Convolutional neural network


```
# This is formatted as code
```



As you can see above, `binary` mode returns an array denoting which tokens exist at least once in the input, while `int` mode replaces each token by an integer, thus preserving their order. You can lookup the token (string) that each integer corresponds to by calling `.get_vocabulary()` on the layer.

In [None]:
# Fetch the vocabulary once and reuse it for the lookups and the size.
vocabulary = int_vectorize_layer.get_vocabulary()
for token_id in (12, 18):
  print(f"{token_id} ---> ", vocabulary[token_id])
print("Vocabulary size: {}".format(len(vocabulary)))

You are nearly ready to train your model. As a final preprocessing step, you will apply the `TextVectorization` layers you created earlier to the train, validation, and test dataset.

In [None]:
# Apply each vectorization function to the train, validation and test splits.
raw_splits = (raw_train_ds, raw_val_ds, raw_test_ds)

binary_train_ds, binary_val_ds, binary_test_ds = [
    split.map(binary_vectorize_text) for split in raw_splits]

int_train_ds, int_val_ds, int_test_ds = [
    split.map(int_vectorize_text) for split in raw_splits]

CONFIGURE DATASET FOR PERFORMANCE

In [None]:
AUTOTUNE = tf.data.AUTOTUNE

def configure_dataset(dataset):
  """Return `dataset` with caching applied first and prefetching (AUTOTUNE buffer) second."""
  cached = dataset.cache()
  return cached.prefetch(buffer_size=AUTOTUNE)

In [None]:
# Wrap all six vectorized datasets with cache + prefetch.
binary_train_ds, binary_val_ds, binary_test_ds = [
    configure_dataset(ds)
    for ds in (binary_train_ds, binary_val_ds, binary_test_ds)]

int_train_ds, int_val_ds, int_test_ds = [
    configure_dataset(ds)
    for ds in (int_train_ds, int_val_ds, int_test_ds)]

TRAIN MODEL

In [None]:
# One output unit (logit) per class, taken from the class names found on disk.
num_classes = len(train_and_val_ds.class_names)

# Small fully-connected classifier on the bag-of-words vectors, with L2 weight
# regularization on both layers.
binary_model = tf.keras.Sequential([
    layers.Dense(16, activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.01)),
    # The output layer has no activation: the loss below is configured with
    # from_logits=True, and a ReLU here would clamp every logit to be >= 0.
    layers.Dense(num_classes, kernel_regularizer=tf.keras.regularizers.l2(0.01))
])

binary_model.compile(
    loss=losses.SparseCategoricalCrossentropy(from_logits=True),
    optimizer='adam',
    metrics=['accuracy'])

# Keep the training history so the loss curves can be plotted in a later cell.
history = binary_model.fit(
    binary_train_ds, validation_data=binary_val_ds, epochs=30)

In [None]:
# Save the bag-of-words model to disk for later use. It gets its own file name:
# 'modelCNN.h5' is the name the ConvNet is saved under further down, so sharing
# it would leave only one of the two models on disk.
filename = 'model_binary.h5'
binary_model.save(filename)

In [None]:
# Plot training and validation loss per epoch for the most recent `history`.
plt.rcParams["figure.figsize"] = (5,7)
fig, ax = plt.subplots()
ax.plot(history.history['loss'], label='Train_loss')
ax.plot(history.history['val_loss'], label='Val_loss')
ax.set_ylim([0, 3])
ax.set_xlabel('Epoch')
ax.set_ylabel('Loss')
ax.legend()
plt.show()

Next, you will use the `'int'` vectorized layer to build a 1D ConvNet:

In [None]:
def create_model(vocab_size, num_labels):
  """Assemble the (uncompiled) 1D ConvNet text classifier.

  Layers, in order: a 64-dimensional Embedding over `vocab_size` ids with
  `mask_zero=True`; a Conv1D with 64 filters, kernel size 4, stride 2, "valid"
  padding and ReLU; Dropout at rate 0.5; GlobalMaxPooling1D; and a Dense layer
  with `num_labels` units and no activation.

  Args:
    vocab_size: Input dimension of the embedding layer.
    num_labels: Number of units in the output layer.

  Returns:
    The `tf.keras.Sequential` model.
  """
  model = tf.keras.Sequential()
  model.add(layers.Embedding(vocab_size, 64, mask_zero=True))
  model.add(layers.Conv1D(64, 4, padding="valid", activation="relu", strides=2))
  model.add(layers.Dropout(0.5))
  model.add(layers.GlobalMaxPooling1D())
  model.add(layers.Dense(num_labels))
  return model

In [None]:
# `vocab_size` is `VOCAB_SIZE + 1` since `0` is used additionally for padding.
# The output layer gets one logit per class found on disk, rather than a
# hard-coded count that does not match the two labels in this dataset.
int_model = create_model(
    vocab_size=VOCAB_SIZE + 1,
    num_labels=len(train_and_val_ds.class_names))
int_model.compile(
    loss=losses.SparseCategoricalCrossentropy(from_logits=True),
    optimizer='adam',
    metrics=['accuracy'])
# Keep the training history so the loss curves can be plotted in the next plotting cell.
history = int_model.fit(int_train_ds, validation_data=int_val_ds, epochs=20)

In [None]:
# Save the trained ConvNet (`int_model`) to 'modelCNN.h5' for later use.
filename = 'modelCNN.h5'
int_model.save(filename)

In [None]:
# Plot training and validation loss per epoch for the most recent `history`.
plt.rcParams["figure.figsize"] = (5,7)
for history_key, legend_label in (('loss', 'Train_loss'), ('val_loss', 'Val_loss')):
  plt.plot(history.history[history_key], label=legend_label)
plt.ylim([0, 3])
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.show()

Compare the two models:

In [None]:
print("Linear model on binary vectorized data:")
# summary() prints the table itself and returns None, so it is not wrapped in
# print(), which would add a stray "None" line to the output.
binary_model.summary()

In [None]:
print("ConvNet model on int vectorized data:")
# summary() prints the table itself and returns None, so it is not wrapped in
# print(), which would add a stray "None" line to the output.
int_model.summary()

Evaluate both models on the test data:

In [None]:
# Each model was compiled with a single metric, so evaluate() yields a
# (loss, accuracy) pair that is unpacked here.
binary_loss, binary_accuracy = binary_model.evaluate(binary_test_ds)
int_loss, int_accuracy = int_model.evaluate(int_test_ds)

print(f"Binary model accuracy: {binary_accuracy:2.2%}")
print(f"Int model accuracy: {int_accuracy:2.2%}")

### Export the model

In the code above, you applied `tf.keras.layers.TextVectorization` to the dataset before feeding text to the model. If you want to make your model capable of processing raw strings (for example, to simplify deploying it), you can include the `TextVectorization` layer inside your model.

To do so, you can create a new model using the weights you have just trained:

In [None]:
# Wrap the trained classifier so that it accepts raw strings: the vectorization
# layer runs first, then the trained model, then a sigmoid over its outputs.
export_model = tf.keras.Sequential(
    [binary_vectorize_layer, binary_model,
     layers.Activation('sigmoid')])

# from_logits=False because the final Activation layer has already been applied
# to the model's outputs.
export_model.compile(
    loss=losses.SparseCategoricalCrossentropy(from_logits=False),
    optimizer='adam',
    metrics=['accuracy'])

# Test it with `raw_test_ds`, which yields raw strings
loss, accuracy = export_model.evaluate(raw_test_ds)
# Report the accuracy measured on the line above for the exported model, not the
# `binary_accuracy` value left over from the earlier evaluation cell.
print("Accuracy: {:2.2%}".format(accuracy))

# Rule-Based (non-ML) Approach

Looking through the reports, it seems as though it's usually stated quite clearly when the EEG is abnormal. Rather than attempting any machine learning, why don't we just look for that key word (or related words/phrases) in the text? This approach is implemented below.

In [None]:
# Pattern for "this report says abnormal": the word 'abnormal' followed, up to the
# end of the text, only by characters at which the word 'none' does not start.
# The reports are lower-cased before matching, so the excluded word has to be
# written in lower case too; spelled 'None' it could never occur in the text and
# the exclusion would have no effect.
# NOTE(review): `.` does not match newlines and `$` anchors at the end of the
# text, so only an 'abnormal' on the final line can match. Add re.DOTALL if the
# whole report is meant to be searched.
# Compiled once here instead of on every report.
ABNORMAL_PATTERN = re.compile(r'abnormal(?:(?!none).)*$')

# First initialise some counters
n = 0
n_correct = 0
n_failed_decode = 0   # Not updated anywhere in this cell.

# Iterate over all batches, taking the text and labels batch-by-batch.
# N.B. take(-1) has the effect of pulling out all the batches, instead of a specific number, as explained in the docs here: https://www.tensorflow.org/api_docs/python/tf/data/Dataset#take
for text_batch, label_batch in train_and_val_ds.take(-1):

  # Convert the labels to NumPy once per batch rather than once per report.
  labels = label_batch.numpy()

  # Iterate over the report examples in the batch:
  for ind, text in enumerate(text_batch):

    # Get rid of any pesky non-standard characters using the function we created previously.
    # (clean_text ignores its second argument, so a dummy 0 is passed.)
    cleaned_text = clean_text(text, 0)
    # Then convert it from a tensorflow Tensor to a python string so that we can
    # use some standard python text analysis on it.
    cleaned_and_decoded_text = cleaned_text.numpy().decode("UTF-8")

    # Predict label 0 when the pattern matches the lower-cased report, else label 1.
    if ABNORMAL_PATTERN.search(cleaned_and_decoded_text.lower()):
      predicted_label = 0
    else:
      predicted_label = 1

    actual_label = labels[ind]

    # If we predicted correctly, add one to our count of correct predictions.
    if predicted_label == actual_label:
      n_correct = n_correct+1
    else:
      # Show the misclassified report for inspection. Comment these prints out
      # to keep the output short.
      print("\n\n\n\n\n\n--- Wrong example ---")
      # print(text.numpy().decode("UTF-8"))  # Uncomment this line to print the original text.
      print(cleaned_and_decoded_text)
      print()
      print("---------------------")
      print(f"The above example was classified with label {predicted_label} but it's actual label is {actual_label}.")
      print("---------------------")

    # Add one to our count of the total number of examples examined.
    n = n+1

print(f"Accuracy = {100*n_correct/n} percent ({n_correct} correct predictions out of {n}).")

In [None]:
# Baseline rule: a report is predicted abnormal (label 0) when it contains the
# substring 'abnormal ' (with a trailing space), and normal (label 1) otherwise.

# First initialise some counters
n = 0
n_correct = 0
n_failed_decode = 0   # Not updated anywhere in this cell.

# Iterate over all batches, taking the text and labels batch-by-batch.
# N.B. take(-1) has the effect of pulling out all the batches, instead of a specific number, as explained in the docs here: https://www.tensorflow.org/api_docs/python/tf/data/Dataset#take
for text_batch, label_batch in train_and_val_ds.take(-1):

  # Convert the labels to NumPy once per batch rather than once per report.
  labels = label_batch.numpy()

  # Iterate over the report examples in the batch:
  for ind, text in enumerate(text_batch):

    # Get rid of any pesky non-standard characters using the function we created previously.
    # (clean_text ignores its second argument, so a dummy 0 is passed.)
    cleaned_text = clean_text(text, 0)
    # Then convert it from a tensorflow Tensor to a python string so that we can
    # use some standard python text analysis on it.
    cleaned_and_decoded_text = cleaned_text.numpy().decode("UTF-8")

    # Check if the word 'abnormal' is in the report, and label it accordingly.
    # (An unfinished, dangling `if` used to sit inside this branch and made the
    # whole cell a SyntaxError; it has been removed.)
    if 'abnormal ' in cleaned_and_decoded_text.lower():
      predicted_label = 0
    else:
      predicted_label = 1

    actual_label = labels[ind]

    # If we predicted correctly, add one to our count of correct predictions.
    if predicted_label == actual_label:
      n_correct = n_correct+1
    else:
      # Show the misclassified report for inspection. Comment these prints out
      # to keep the output short.
      print("\n\n\n\n\n\n--- Wrong example ---")
      # print(text.numpy().decode("UTF-8"))  # Uncomment this line to print the original text.
      print(cleaned_and_decoded_text)
      print()
      print("---------------------")
      print(f"The above example was classified with label {predicted_label} but it's actual label is {actual_label}.")
      print("---------------------")

    # Add one to our count of the total number of examples examined.
    n = n+1

print(f"Accuracy = {100*n_correct/n} percent ({n_correct} correct predictions out of {n}).")