In [None]:
import numpy as np
import csv
import random
import re
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, LSTM, Conv1D, MaxPooling1D, GlobalMaxPooling1D, Dropout, Flatten, Dense, Activation
from tensorflow.keras.layers import Embedding
from keras.callbacks import EarlyStopping
from keras.callbacks import ModelCheckpoint
from tensorflow.keras.models import load_model
from matplotlib import pyplot

In [None]:
#loading the dataset
# Column 0 of every CSV row is used as the review text and column 1 as its
# numeric label (parsed through float first, then truncated to int).
X = []
Y = []
# newline='' hands newline handling to the csv module, as the csv documentation
# asks for files given to csv.reader (it matters for quoted multi-line fields).
with open('food_review_dataset.csv', encoding='utf8', errors='ignore', newline='') as csvDataFile:
    csvReader = csv.reader(csvDataFile)
    for row in csvReader:
        if not row:
            # csv.reader yields [] for a blank line; row[0] would raise IndexError
            continue
        # replace every run of non-alphanumeric characters with a single space
        X.append(re.sub('[^0-9a-zA-Z]+', ' ', row[0]))
        Y.append(int(float(row[1])))

X = np.asarray(X)
Y = np.asarray(Y, dtype=int)

In [None]:
#randomly splitting into train, validation and test set
# A fixed seed keeps the 70/20/10 split identical across kernel restarts, so the
# metrics reported further down can be reproduced. Change it to draw a new split.
seed = 42
rng = np.random.RandomState(seed)
# Shuffle one index array and use it for both X and Y: the review/label pairs
# stay aligned, and X / Y themselves are not mutated, so re-running this cell
# yields the same split instead of reshuffling already-shuffled data.
assert len(X) == len(Y), "X and Y must contain the same number of samples"
shuffled_order = np.arange(len(X))
rng.shuffle(shuffled_order)
X_shuffled = X[shuffled_order]
Y_shuffled = Y[shuffled_order]
# split boundaries: first 70% train, next 20% validation, last 10% test
train_end = int(len(X) * 0.7)
validation_end = int(len(X) * 0.9)
X_train = X_shuffled[:train_end]
X_validation = X_shuffled[train_end:validation_end]
X_test = X_shuffled[validation_end:]
Y_train = Y_shuffled[:train_end]
Y_validation = Y_shuffled[train_end:validation_end]
Y_test = Y_shuffled[validation_end:]

In [None]:
#loading GloVe embeddings
# Each line of the file is split on whitespace: the first field is the token,
# the remaining fields are parsed as the float64 components of its vector.
# NOTE(review): the first word kept gets index 0, which is also the value
# sentences_to_indices leaves in unused (padding) positions.
with open('glove.6B.50d.txt', encoding='utf8', errors='ignore') as glove_file:
    words = set()
    word_to_index = {}
    index_to_word = {}
    word_to_vec_map = {}
    for raw_line in glove_file:
        fields = raw_line.strip().split()
        # strip everything except ASCII letters and digits from the token
        token = re.sub('[^0-9a-zA-Z]+', '', fields[0]).strip()
        # tokens that become empty, or that collide with an earlier token
        # after stripping, are dropped (first occurrence wins)
        if token == '' or token in words:
            continue
        # indices are handed out consecutively, starting at 0
        next_index = len(word_to_index)
        words.add(token)
        word_to_index[token] = next_index
        index_to_word[next_index] = token
        word_to_vec_map[token] = np.array(fields[1:], dtype=np.float64)

In [None]:
# Display names for the two review classes.
# presumably indexed by the 0/1 label value — TODO confirm against the dataset
class_names = ["Negative Review", "Positive Review"]
# Number of word indices kept per review: passed as max_len to
# sentences_to_indices and used as the length of the model input.
max_length = 128

In [None]:
def pretrained_embedding_layer(word_to_vec_map, word_to_index):
    """Build a non-trainable Keras Embedding layer from pre-trained vectors.

    Row ``idx`` of the layer's weight matrix is set to the vector of the word
    that ``word_to_index`` maps to ``idx``; rows no word maps to stay zero.

    Args:
        word_to_vec_map: mapping from word to a 1-D NumPy vector; all vectors
            must share the same length.
        word_to_index: mapping from word to its non-negative integer row index.

    Returns:
        An ``Embedding`` layer, already built, with ``trainable=False`` and its
        weights set to the assembled matrix.

    Raises:
        ValueError: if ``word_to_index`` is empty.
    """
    if not word_to_index:
        raise ValueError("word_to_index is empty; cannot build an embedding layer")
    # Size the matrix from the largest index rather than the number of words so
    # index maps that are not contiguous or not zero-based still fit.
    vocab_len = max(word_to_index.values()) + 1
    # Take the dimensionality from any word actually present instead of relying
    # on the vocabulary containing the literal word "the".
    emb_dim = word_to_vec_map[next(iter(word_to_index))].shape[0]
    emb_matrix = np.zeros((vocab_len, emb_dim))
    for word, idx in word_to_index.items():
        emb_matrix[idx, :] = word_to_vec_map[word]

    embedding_layer = Embedding(vocab_len, emb_dim, trainable = False)
    # build() creates the weight variable so set_weights() has a slot to fill
    embedding_layer.build((None,))
    embedding_layer.set_weights([emb_matrix])
    return embedding_layer

In [None]:
# One frozen embedding layer built from the loaded GloVe vectors; the same
# layer object is applied to the model input in a later cell.
embedding_layer = pretrained_embedding_layer(word_to_vec_map, word_to_index)

In [None]:
#tokenizing the input sentences
def sentences_to_indices(X, word_to_index, max_len):
    """Convert sentences into fixed-width rows of vocabulary indices.

    Every sentence is lower-cased and split on whitespace. Words that are not
    keys of ``word_to_index`` are skipped; the indices of the remaining words
    fill the row from the left. A sentence with more than ``max_len`` known
    words is truncated, a shorter one keeps zeros in its unused positions.

    Args:
        X: sequence of sentence strings (NumPy array or plain list).
        word_to_index: mapping from word to its integer index.
        max_len: number of columns of the returned array.

    Returns:
        A NumPy array of shape ``(len(X), max_len)`` with the default
        ``np.zeros`` dtype (float), holding the word indices.
    """
    # len() instead of X.shape[0] so plain Python lists are accepted as well
    X_indices = np.zeros((len(X), max_len))
    if max_len == 0:
        # No column to write to; storing the first known word would raise IndexError
        return X_indices
    for row, sentence in enumerate(X):
        col = 0
        for token in sentence.lower().split():
            index = word_to_index.get(token)
            if index is None:
                # out-of-vocabulary word: it does not consume a position
                continue
            X_indices[row, col] = index
            col += 1
            if col >= max_len:
                break
    return X_indices

In [None]:
# Turn each of the three splits into a (samples, max_length) array of word
# indices, in the order train, validation, test.
X_train_indices, X_validation_indices, X_test_indices = (
    sentences_to_indices(split_sentences, word_to_index, max_length)
    for split_sentences in (X_train, X_validation, X_test)
)

In [None]:
# Keras documents `shape` as a tuple that excludes the batch axis; a bare int
# is only tolerated by some versions, so pass the 1-tuple explicitly.
sentence_indices = Input(shape = (max_length,), dtype = 'int32')
# Shared embedded input: every model below is built on top of this tensor.
embeddings = embedding_layer(sentence_indices)

In [None]:
#building an LSTM model
# Four stacked LSTM layers; only the last one collapses the sequence to a
# single vector (return_sequences = False).
lstm_X = embeddings
for units, keep_sequences in ((64, True), (128, True), (64, True), (64, False)):
    lstm_X = LSTM(units = units, dropout=0.7, return_sequences = keep_sequences)(lstm_X)
# Dense head narrowing 32 -> 16 -> 8 -> 1 (no activation on these layers),
# followed by a sigmoid on the single output unit.
for width in (32, 16, 8, 1):
    lstm_X = Dense(units = width)(lstm_X)
lstm_X = Activation('sigmoid')(lstm_X)
lstm_model = Model(inputs = sentence_indices, outputs = lstm_X)
lstm_model.summary()

In [None]:
# Stop training once validation loss has not improved for 50 epochs.
lstm_es = EarlyStopping(
    monitor='val_loss',
    mode='min',
    patience=50,
    verbose=1,
)
# Write the model to lstm.h5 only when validation loss reaches a new minimum.
lstm_cp = ModelCheckpoint(
    'lstm.h5',
    monitor='val_loss',
    mode='min',
    save_best_only=True,
    verbose=1,
)

In [None]:
# Binary classification setup: cross-entropy loss, Adam optimizer, accuracy metric.
lstm_model.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=['accuracy'],
)
# Train for at most 200 epochs; the callbacks handle early stopping and
# checkpointing based on the validation split.
lstm_history = lstm_model.fit(
    X_train_indices,
    Y_train,
    validation_data=(X_validation_indices, Y_validation),
    epochs = 200,
    batch_size = 200,
    callbacks=[lstm_es, lstm_cp],
)

In [None]:
#plotting the learning curve
# training and validation loss as recorded in the fit() history
pyplot.plot(lstm_history.history['loss'], label='lstm_train')
pyplot.plot(lstm_history.history['val_loss'], label='lstm_validation')
# title and axis labels so the figure can be read on its own
pyplot.title('LSTM learning curve')
pyplot.xlabel('epoch')
pyplot.ylabel('loss')
pyplot.legend()
pyplot.show()

In [None]:
# Reload the model from lstm.h5, the file the lstm_cp checkpoint callback
# writes whenever validation loss improves (save_best_only=True).
best_lstm_model = load_model('lstm.h5')

In [None]:
#scoring the reloaded LSTM checkpoint on the held-out test split
# the result is unpacked as (loss, accuracy)
loss, acc = best_lstm_model.evaluate(x = X_test_indices, y = Y_test)
print("Test accuracy = ", acc)

In [None]:
#building a CNN model
# The layers are listed in the order they are applied to the embedded input:
# three causal convolutions with pooling and dropout between them, then a
# linear dense head and a sigmoid on the single output unit.
cnn_layers = [
    Conv1D(filters=128, kernel_size=7, padding='causal', activation='relu'),
    MaxPooling1D(pool_size=2),
    Dropout(0.4),
    Conv1D(filters=64, kernel_size=7, padding='causal', activation='relu'),
    MaxPooling1D(pool_size=2),
    Dropout(0.4),
    Conv1D(filters=32, kernel_size=7, padding='causal', activation='relu'),
    GlobalMaxPooling1D(),
    Dropout(0.4),
    Dense(units = 32),
    Dense(units = 16),
    Dense(units = 8),
    Dense(units = 1),
    Activation('sigmoid'),
]
cnn_X = embeddings
for cnn_layer in cnn_layers:
    cnn_X = cnn_layer(cnn_X)
cnn_model = Model(inputs = sentence_indices, outputs = cnn_X)
cnn_model.summary()

In [None]:
# Stop training once validation loss has not improved for 50 epochs.
cnn_es = EarlyStopping(
    monitor='val_loss',
    mode='min',
    patience=50,
    verbose=1,
)
# Write the model to cnn.h5 only when validation loss reaches a new minimum.
cnn_cp = ModelCheckpoint(
    'cnn.h5',
    monitor='val_loss',
    mode='min',
    save_best_only=True,
    verbose=1,
)

In [None]:
# Binary classification setup: cross-entropy loss, Adam optimizer, accuracy metric.
cnn_model.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=['accuracy'],
)
# Train for at most 200 epochs; the callbacks handle early stopping and
# checkpointing based on the validation split.
cnn_history = cnn_model.fit(
    X_train_indices,
    Y_train,
    validation_data=(X_validation_indices, Y_validation),
    epochs = 200,
    batch_size = 200,
    callbacks=[cnn_es, cnn_cp],
)

In [None]:
#plotting the learning curve
# training and validation loss as recorded in the fit() history
pyplot.plot(cnn_history.history['loss'], label='cnn_train')
pyplot.plot(cnn_history.history['val_loss'], label='cnn_validation')
# title and axis labels so the figure can be read on its own
pyplot.title('CNN learning curve')
pyplot.xlabel('epoch')
pyplot.ylabel('loss')
pyplot.legend()
pyplot.show()

In [None]:
# Reload the model from cnn.h5, the file the cnn_cp checkpoint callback
# writes whenever validation loss improves (save_best_only=True).
best_cnn_model = load_model('cnn.h5')

In [None]:
#scoring the reloaded CNN checkpoint on the held-out test split
# the result is unpacked as (loss, accuracy)
loss, acc = best_cnn_model.evaluate(x = X_test_indices, y = Y_test)
print("Test accuracy = ", acc)

In [None]:
#building a hybrid(CNN + LSTM) model
# Convolutional front end: one causal convolution, pooling and dropout applied
# to the shared embedded input. The result is kept in hybrid_X so the LSTM
# below consumes it; previously the first LSTM read hybrid_X before it had
# ever been assigned, which raises NameError on a fresh kernel.
hybrid_X = Conv1D(filters=128, kernel_size=7, padding='causal', activation='relu')(embeddings)
hybrid_X = MaxPooling1D(pool_size=2)(hybrid_X)
hybrid_X = Dropout(0.4)(hybrid_X)
# Recurrent part: two LSTM layers, the second returning only its final output.
hybrid_X = LSTM(units = 64, dropout=0.7, return_sequences = True)(hybrid_X)
hybrid_X = LSTM(units = 64, dropout=0.7, return_sequences = False)(hybrid_X)
# Dense head narrowing to one unit, followed by a sigmoid.
hybrid_X = Dense(units = 32)(hybrid_X)
hybrid_X = Dense(units = 16)(hybrid_X)
hybrid_X = Dense(units = 8)(hybrid_X)
hybrid_X = Dense(units = 1)(hybrid_X)
hybrid_X = Activation('sigmoid')(hybrid_X)
hybrid_model = Model(inputs = sentence_indices, outputs = hybrid_X)
hybrid_model.summary()

In [None]:
# Stop training once validation loss has not improved for 50 epochs.
hybrid_es = EarlyStopping(
    monitor='val_loss',
    mode='min',
    patience=50,
    verbose=1,
)
# Write the model to hybrid.h5 only when validation loss reaches a new minimum.
hybrid_cp = ModelCheckpoint(
    'hybrid.h5',
    monitor='val_loss',
    mode='min',
    save_best_only=True,
    verbose=1,
)

In [None]:
# Binary classification setup: cross-entropy loss, Adam optimizer, accuracy metric.
hybrid_model.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=['accuracy'],
)
# Train for at most 200 epochs; the callbacks handle early stopping and
# checkpointing based on the validation split.
hybrid_history = hybrid_model.fit(
    X_train_indices,
    Y_train,
    validation_data=(X_validation_indices, Y_validation),
    epochs = 200,
    batch_size = 200,
    callbacks=[hybrid_es, hybrid_cp],
)

In [None]:
#plotting the learning curve
# training and validation loss as recorded in the fit() history
pyplot.plot(hybrid_history.history['loss'], label='hybrid_train')
pyplot.plot(hybrid_history.history['val_loss'], label='hybrid_validation')
# title and axis labels so the figure can be read on its own
pyplot.title('Hybrid (CNN + LSTM) learning curve')
pyplot.xlabel('epoch')
pyplot.ylabel('loss')
pyplot.legend()
pyplot.show()

In [None]:
# Reload the model from hybrid.h5, the file the hybrid_cp checkpoint callback
# writes whenever validation loss improves (save_best_only=True).
best_hybrid_model = load_model('hybrid.h5')

In [None]:
#scoring the reloaded hybrid checkpoint on the held-out test split
# the result is unpacked as (loss, accuracy)
loss, acc = best_hybrid_model.evaluate(x = X_test_indices, y = Y_test)
print("Test accuracy = ", acc)

# Conclusion

While the CNN is much faster than the LSTM and appears to find the minimum in fewer epochs, the LSTM can reach up to 94% test accuracy, compared to the CNN's 91%.

I found it optimal to use a hybrid which is moderately fast and gets close to 93% test accuracy.

Still, more research must be done in order to differentiate these two approaches, as hardware was a crucial bottleneck while conducting this project!