# Text classification model based on BERT and LSTM

In [None]:
# Install BERT for tf2 module
!pip install bert-for-tf2
# Install sentencepiece library for text cleaning
!pip install sentencepiece

In [None]:
# Import all necessary libraries
try:
    %tensorflow_version 2.x
except Exception:
    pass

import tensorflow as tf
import tensorflow_hub as hub
from tensorflow.keras import layers
from tensorflow.keras import callbacks
from tensorflow.keras import optimizers
from tensorflow.keras import utils
from sklearn.preprocessing import LabelEncoder
from sklearn.utils import shuffle
import bert
import os
import numpy as np
import re
import pandas as pd
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt
import plotly.express as px
import plotly.graph_objects as go

  import pandas.util.testing as tm


In [None]:
# Import the training and test .csv files
colnames=['image_path', 'text', 'food']
train = pd.read_csv('train_titles.csv', names=colnames, header=None, sep = ',', index_col=['image_path'])
test = pd.read_csv('test_titles.csv', names=colnames, header=None, sep = ',', index_col=['image_path'])

In [None]:
# Sort values by 'image_path'
test = test.sort_values('image_path')
train = train.sort_values('image_path')

In [None]:
train.head()

Unnamed: 0,image_path,text,food
64859,tacos_232.jpg,TILAPIA FISH TACOS (TACOS DE TILAPIA) | My Col...,tacos
8408,cannoli_442.jpg,Cannoli Cream Cake,cannoli
8403,cannoli_348.jpg,The Well-Fed Newlyweds: Strawberries with Cann...,cannoli
48432,pancakes_472.jpg,Cheesy mushroom,pancakes
49235,panna_cotta_49.jpg,White Chocolate Panna Cotta Recipe | MyRecipes...,panna_cotta


In [None]:
# Check the shapes
print("train samples:",train.shape[0])
print("test samples:",test.shape[0])

train samples: 67972
test samples: 22716


In [None]:
# Cleaning text function

def preprocess_text(sen):
    # Removing html tags
    sentence = remove_tags(sen)

    # Remove punctuations and numbers
    sentence = re.sub('[^a-zA-Z]', ' ', sentence)

    # Single character removal
    sentence = re.sub(r"\s+[a-zA-Z]\s+", ' ', sentence)

    # Removing multiple spaces
    sentence = re.sub(r'\s+', ' ', sentence)

    sentence = sentence.lower()

    return sentence

def remove_tags(text):
    return TAG_RE.sub('', text)

TAG_RE = re.compile(r'<[^>]+>')
vec_preprocess_text = np.vectorize(preprocess_text)

In [None]:
# Check number of classes
nClasses = train.food.nunique()

In [None]:
encoder = LabelEncoder()
processed_train = vec_preprocess_text(train.text.values)
processed_test = vec_preprocess_text(test.text.values)


encoded_labels_train = encoder.fit_transform(train.food.values)
labels_train = utils.to_categorical(encoded_labels_train, nClasses)

encoded_labels_test = encoder.fit_transform(test.food.values)
labels_test = utils.to_categorical(encoded_labels_test, nClasses)

print("Processed text sample:", processed_train[0])
print("Shape of train labels:", labels_train.shape)

Processed text sample: tilapia fish tacos tacos de tilapia my colombian recipes
Shape of train labels: (67972, 101)


In [None]:
# Import the BERT BASE model from Tensorflow HUB (layer, vocab_file and tokenizer)
BertTokenizer = bert.bert_tokenization.FullTokenizer
bert_layer = hub.KerasLayer("https://tfhub.dev/tensorflow/bert_en_uncased_L-12_H-768_A-12/1",
                            trainable=False)
vocabulary_file = bert_layer.resolved_object.vocab_file.asset_path.numpy()
to_lower_case = bert_layer.resolved_object.do_lower_case.numpy()
tokenizer = BertTokenizer(vocabulary_file, to_lower_case)

In [None]:
# Preprocessing of texts according to BERT

def get_masks(text, max_length):
    """Mask for padding"""
    tokens = tokenizer.tokenize(text)
    tokens = ["[CLS]"] + tokens + ["[SEP]"]
    length = len(tokens)
    if length > max_length:
        tokens = tokens[:max_length]

    return np.asarray([1]*len(tokens) + [0] * (max_length - len(tokens)))
vec_get_masks = np.vectorize(get_masks, signature = '(),()->(n)')

def get_segments(text, max_length):
    """Segments: 0 for the first sequence, 1 for the second"""
    tokens = tokenizer.tokenize(text)
    tokens = ["[CLS]"] + tokens + ["[SEP]"]
    length = len(tokens)
    if length > max_length:
        tokens = tokens[:max_length]
    
    segments = []
    current_segment_id = 0
    with_tags = ["[CLS]"] + tokens + ["[SEP]"]
    token_ids = tokenizer.convert_tokens_to_ids(tokens)
    
    for token in tokens:
        segments.append(current_segment_id)
        if token == "[SEP]":
            current_segment_id = 1
    return np.asarray(segments + [0] * (max_length - len(tokens)))
vec_get_segments = np.vectorize(get_segments, signature = '(),()->(n)')

def get_ids(text, tokenizer, max_length):
    """Token ids from Tokenizer vocab"""
    tokens = tokenizer.tokenize(text)
    tokens = ["[CLS]"] + tokens + ["[SEP]"]
    length = len(tokens)
    if length > max_length:
        tokens = tokens[:max_length]

    token_ids = tokenizer.convert_tokens_to_ids(tokens)
    input_ids = np.asarray(token_ids + [0] * (max_length-length))
    return input_ids
vec_get_ids = np.vectorize(get_ids, signature = '(),(),()->(n)')


def prepare(text_array, tokenizer, max_length = 128):
    
    ids = vec_get_ids(text_array, 
                      tokenizer, 
                      max_length).squeeze()
    masks = vec_get_masks(text_array,
                      max_length).squeeze()
    segments = vec_get_segments(text_array,
                      max_length).squeeze()

    return ids, segments, masks

In [None]:
max_length = 40 # that must be set according to your dataset
ids_train, segments_train, masks_train = prepare(processed_train,
                                                 tokenizer,
                                                 max_length)
ids_test, segments_test, masks_test = prepare(processed_test, 
                                               tokenizer,
                                               max_length)

In [None]:
input_word_ids = layers.Input(shape=(max_length,), dtype=tf.int32,
                                       name="input_word_ids")
input_mask = layers.Input(shape=(max_length,), dtype=tf.int32,
                                   name="input_masks")
segment_ids = layers.Input(shape=(max_length,), dtype=tf.int32,
                                    name="segment_ids")
den_out, seq_out = bert_layer([input_word_ids, input_mask, segment_ids])

In [None]:
# Classification Model
input_word_ids = layers.Input(shape=(max_length,), dtype=tf.int32,
                                       name="input_word_ids")
input_mask = layers.Input(shape=(max_length,), dtype=tf.int32,
                                   name="input_mask")
segment_ids = layers.Input(shape=(max_length,), dtype=tf.int32,
                                    name="segment_ids")
den_out, seq_out = bert_layer([input_word_ids, input_mask, segment_ids])

X = layers.LSTM(128)(seq_out)
X = layers.Dropout(0.5)(X)
X = layers.Dense(256, activation="relu")(X)
X = layers.Dropout(0.5)(X)
output = layers.Dense(nClasses, activation = 'softmax')(X)

model = tf.keras.models.Model(inputs=[input_word_ids, input_mask, segment_ids], outputs=[output])

In [None]:
# Adam optimizer
opt = optimizers.Adam(lr=.001)

# Compile model
model.compile(loss = 'categorical_crossentropy',
              optimizer = opt,
              metrics = ['accuracy'])

In [None]:
model.summary()

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_word_ids (InputLayer)     [(None, 40)]         0                                            
__________________________________________________________________________________________________
input_mask (InputLayer)         [(None, 40)]         0                                            
__________________________________________________________________________________________________
segment_ids (InputLayer)        [(None, 40)]         0                                            
__________________________________________________________________________________________________
keras_layer (KerasLayer)        [(None, 768), (None, 109482241   input_word_ids[0][0]             
                                                                 input_mask[0][0]             

In [None]:
es = callbacks.EarlyStopping(monitor='val_accuracy', patience=2, restore_best_weights=True)

In [None]:
# Setup callbacks, logs and early stopping condition
checkpoint_path = "BERT_LSTM/weights-improvement-{epoch:02d}-{val_accuracy:.2f}.hdf5"
cp = callbacks.ModelCheckpoint(checkpoint_path, monitor='val_accuracy',save_best_only=True,verbose=1, mode='max')
csv_logger = callbacks.CSVLogger('BERT_LSTM/BERT_LSTM.log')
es = callbacks.EarlyStopping(patience = 3, restore_best_weights=True)

In [None]:
# Reduce learning rate if no improvement is observed
reduce_lr = callbacks.ReduceLROnPlateau(
    monitor='val_accuracy', factor=0.1, patience=1, min_lr=0.00001)

In [None]:
history = model.fit([ids_train, masks_train, segments_train], 
          labels_train,
          epochs = 16,
          batch_size = 512,
          validation_split = 0.3,
          callbacks = [csv_logger, reduce_lr])

Epoch 1/16
Epoch 2/16
Epoch 3/16

KeyboardInterrupt: ignored

In [None]:
# Load the log file
df = pd.read_csv('BERT_LSTM/BERT_LSTM.log')

In [None]:
# Training and Test accuracy
fig = go.Figure()
fig.add_trace(go.Scatter(x=df['epoch'], y=df['accuracy'],
                    mode='lines',
                    name='training'))

fig.add_trace(go.Scatter(x=df['epoch'], y=df['val_accuracy'],
                    mode='lines',
                    name='test'))

fig.update_layout(
    font_size = 20,
    paper_bgcolor='rgba(0,0,0,0)',
    plot_bgcolor='rgba(0,0,0,0)',
)

fig.update_xaxes(showgrid=True, gridwidth=0.5, gridcolor='Gray')
fig.update_yaxes(showgrid=True, gridwidth=0.5, gridcolor='Gray')

In [None]:
# Training and Test loss
fig = go.Figure()
fig.add_trace(go.Scatter(x=df['epoch'], y=df['loss'],
                    mode='lines',
                    name='training'))

fig.add_trace(go.Scatter(x=df['epoch'], y=df['val_loss'],
                    mode='lines',
                    name='test'))

fig.update_layout(
    font_size = 20,
    paper_bgcolor='rgba(0,0,0,0)',
    plot_bgcolor='rgba(0,0,0,0)',
)

fig.update_xaxes(showgrid=True, gridwidth=0.5, gridcolor='Gray')
fig.update_yaxes(showgrid=True, gridwidth=0.5, gridcolor='Gray')

In [None]:
model.evaluate([ids_test, masks_test, segments_test],
               labels_test, 
               batch_size = 512)



[0.9757596850395203, 0.8465398550033569]