### Read data

In [None]:
# !pip install scikit-learn==0.24.2
# !pip install sklearn_crfsuite
# try:
#     from sklearn_crfsuite.metrics import flat_classification_report
# except ImportError:
#     !pip install git+https://github.com/MeMartijn/updated-sklearn-crfsuite.git#egg=sklearn_crfsuite
#     from sklearn_crfsuite.metrics import flat_classification_report


In [None]:
from tqdm import tqdm
import re

def read_file(f):
    """Read a tab-separated file into row ids and tokenised text.

    The first line of the file is treated as a header and skipped. For every
    remaining non-blank line, the first tab-separated field is the row id and
    the second field is the text, which is split on single spaces.

    Args:
        f: Path of the file to read.

    Returns:
        A ``(row_id, data)`` tuple of parallel lists: the stripped ids and,
        for each row, the list of tokens.

    Raises:
        IndexError: If a non-blank data line has no tab-separated second field.
    """
    row_id = []
    data = []
    # The context manager guarantees the handle is closed, even on a bad line.
    with open(f, 'r') as handle:
        next(handle, None)  # skip the header line (no-op on an empty file)
        for line in handle:
            if not line.strip():
                # Blank lines (e.g. trailing newlines) carry no record.
                continue
            fields = line.split('\t')
            row_id.append(fields[0].strip())
            data.append(fields[1].strip().split(' '))
    return row_id, data

def read_conll_file(_file):
    """Parse a CoNLL-style file into sentence ids, token lists and label lists.

    Line handling:
      * a line starting with "#" is a sentence header; its second
        whitespace-separated field is recorded as the sentence id;
      * a blank line closes the current sentence;
      * any other line contributes its first whitespace-separated column as
        the token and its second column as the label.

    Args:
        _file: Path of the CoNLL file to read.

    Returns:
        A ``(all_sentence_ids, all_sentences, all_labels)`` tuple of lists.

    Raises:
        IndexError: If a header line has no second field or a token line has
            no label column.
    """
    all_sentences = []
    all_sentence_ids = []
    all_labels = []
    sentence = []
    labels = []
    # Open through a context manager so the handle is closed deterministically,
    # including when a malformed line raises part-way through the file.
    with open(_file) as conll_file:
        for line in tqdm(conll_file, desc=f"reading {_file}"):
            if line.startswith("#"):
                all_sentence_ids.append(re.split(r"\s+", line.strip())[1])
                continue
            line = line.strip()
            if not line:
                # Sentence boundary: flush what has been collected so far.
                all_sentences.append(sentence)
                all_labels.append(labels)
                sentence = []
                labels = []
            else:
                # Split once and reuse the columns for both token and label.
                columns = re.split(r"\s+", line)
                sentence.append(columns[0])
                labels.append(columns[1])
    # The file may end without a trailing blank line; keep the last sentence.
    if sentence and labels:
        all_sentences.append(sentence)
        all_labels.append(labels)
    return all_sentence_ids, all_sentences, all_labels

In [None]:
import os
# Fetch and unpack the dataset only when the "review_data" directory is missing.
if not os.path.exists("review_data"):
    # Lines starting with "!" are IPython/Jupyter shell escapes, so this cell
    # only runs inside a notebook environment.
    !wget https://www.dropbox.com/s/yqgff7de73iwosr/review_data.zip?dl=1 -O review_data.zip
    !unzip review_data.zip
    !ls review_data 

# Each call returns three lists: sentence ids, token lists and label lists.
train_sen_ids, train_text, train_labels = read_conll_file("review_data/review_train.conll")
valid_sen_ids, valid_text, valid_labels = read_conll_file("review_data/review_valid.conll")
test_sen_ids, test_text, test_labels = read_conll_file("review_data/review_test.conll")

### Inputs

In [None]:
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split

# Vocabulary over all three splits; index 0 is reserved for padding.
unique_words = {j for i in train_text + valid_text + test_text for j in i}
# Enumerate in sorted order so the word -> index mapping is the same on every
# run (set iteration order of strings changes with hash randomisation).
word2idx = {j: i + 1 for i, j in enumerate(sorted(unique_words))}
word2idx["PAD"] = 0
print(f"{len(word2idx)} tokens in vocab")

# Collect the label inventory of each split from that split's own labels.
unique_labels = {j for i in train_labels for j in i}
unique_labels_valid = {j for i in valid_labels for j in i}
unique_labels_test = {j for i in test_labels for j in i}

# make sure there are no labels in valid/test that are not in train.
assert not unique_labels_valid - unique_labels, unique_labels_valid - unique_labels
assert not unique_labels_test - unique_labels, unique_labels_test - unique_labels

# Label ids start at 1 because 0 is the padding label; sorted for reproducibility.
label2idx = {'PAD': 0}
for i, j in enumerate(sorted(unique_labels)):
    label2idx[j] = i + 1
idx2label = {j: i for i, j in label2idx.items()}
print(idx2label)

# Fixed sequence length every sentence is padded (or truncated) to.
MAXLEN = 50

def get_padded_x_y(text, labels, _maxlen, _word2idx, _label2idx):
    """Encode sentences and their labels as fixed-length model inputs.

    Tokens and labels are mapped to integer ids with the mappings passed in
    (not the module-level ones), post-padded to ``_maxlen`` with the
    respective "PAD" id, and the labels are then one-hot encoded.

    Args:
        text: List of sentences, each a list of tokens present in ``_word2idx``.
        labels: List of label sequences, each label present in ``_label2idx``.
        _maxlen: Length every sequence is padded to.
        _word2idx: Token -> id mapping; must contain the key "PAD".
        _label2idx: Label -> id mapping; must contain the key "PAD".

    Returns:
        ``(X, Y)`` where ``X`` is the padded id array returned by
        ``pad_sequences`` and ``Y`` is a list with one one-hot encoded array
        per sentence.

    Raises:
        KeyError: If a token or label is missing from its mapping.
    """
    X = [[_word2idx[token] for token in sentence] for sentence in text]
    # NOTE(review): `truncating` is left at the Keras default, which (per the
    # pad_sequences docs) trims over-long sequences from the front - confirm
    # this is what downstream code expects.
    X = pad_sequences(maxlen=_maxlen, sequences=X, padding="post", value=_word2idx["PAD"])
    Y = [[_label2idx[label] for label in sentence_labels] for sentence_labels in labels]
    Y = pad_sequences(maxlen=_maxlen, sequences=Y, padding="post", value=_label2idx["PAD"])
    # One class per entry of the label mapping, including the PAD class.
    Y = [to_categorical(row, num_classes=len(_label2idx)) for row in Y]
    assert len(X) == len(Y), "X and Y should be of the same shape"
    return X, Y

# Encode all three splits with the same sequence length, vocabulary and label map.
_encode_kwargs = {"_maxlen": MAXLEN, "_word2idx": word2idx, "_label2idx": label2idx}
X_train, Y_train = get_padded_x_y(train_text, train_labels, **_encode_kwargs)
X_valid, Y_valid = get_padded_x_y(valid_text, valid_labels, **_encode_kwargs)
X_test, Y_test = get_padded_x_y(test_text, test_labels, **_encode_kwargs)

### LSTM model 

In [None]:
# from tensorflow.keras.layers import LSTM, Dense, Embedding, Bidirectional
# from tensorflow.keras.models import Sequential

# model = Sequential()
# EMBED_DIM = 300
# RNN_HIDDEN_DIM = 100
# model.add(Embedding(input_dim=len(word2idx.keys()),output_dim=EMBED_DIM,input_length=MAXLEN))
# model.add(Bidirectional(LSTM(units=RNN_HIDDEN_DIM,return_sequences=True,dropout=0.2), merge_mode = 'concat'))
# model.add(Dense(len(label2idx.keys()), activation="relu"))
# model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['acc'])
# # model.summary()

### Training

In [None]:
import numpy as np
# # history = model.fit(X_train,np.array(Y_train),batch_size=16,epochs=3,validation_data=(X_valid, np.array(Y_valid)))
# history = model.fit(X_train,np.array(Y_train),batch_size=16,epochs=10,validation_split=0.2)

### Prediction

In [None]:
# Y_valid_pred = model.predict(X_valid)
# Y_valid_pred = np.argmax(Y_valid_pred, axis=-1)
# Y_valid_true = np.argmax(Y_valid, -1)
# Y_valid_pred_labels = [[idx2label[i] for i in row] for row in Y_valid_pred]
# Y_valid_true_labels = [[idx2label[i] for i in row] for row in Y_valid_true]

### Result

In [None]:

# report = flat_classification_report(y_pred=Y_valid_pred_labels, y_true=Y_valid_true_labels)
# print(report)

In [None]:
# try:
#     from baseline.utils import to_chunks
# except ImportError:
#     !pip install mead-baseline
#     from baseline.utils import to_chunks

# from tqdm import tqdm
# import numpy as np
# batchsz = 16

# def shorten_sentence_label(sentence_tokens, true_labels, pred_labels, maxlen):
#     if maxlen == -1: # we need to shorten the labels to the sentence length
#         shorten_to = len(sentence_tokens)
#     else: # we have to shorten either the sentence to the max len or the sequence to the sentence length
#         if len(sentence_tokens) > maxlen:
#             shorten_to = maxlen
#         else:
#             shorten_to = len(sentence_tokens)
#     return sentence_tokens[:shorten_to], true_labels[:shorten_to], pred_labels[:shorten_to]



# def generate_conll(sentence_ids,all_sentence_tokens, all_sentence_true_labels, all_sentence_pred_labels, output_base, 
#                    maxlen=-1):
#     assert len(sentence_ids) == len(all_sentence_tokens) == len(all_sentence_true_labels) == len(all_sentence_pred_labels)
#     with open(f"{output_base}.conll", "w") as wf:
#         for sentence_tokens, sentence_true_labels, sentence_pred_labels in zip(all_sentence_tokens, all_sentence_true_labels, all_sentence_pred_labels):
#             sentence_tokens, sentence_true_labels, sentence_pred_labels = shorten_sentence_label(
#                 sentence_tokens, sentence_true_labels, sentence_pred_labels, maxlen)
#             assert len(sentence_tokens) == len(sentence_true_labels) == len(sentence_pred_labels), \
#             f"{len(sentence_tokens)}, {len(sentence_true_labels)}, {len(sentence_pred_labels)}"
#             for token, true_label, pred_label in zip(sentence_tokens, sentence_true_labels, sentence_pred_labels):
#                 wf.write(f"{token} {true_label} {pred_label}\n")
#                 wf.write("\n")
#     print(f"generated conll file {output_base}.conll")

# def generate_labelseq(sentence_ids, all_sentence_tokens, all_sentence_pred_labels, output_base, maxlen=-1):
#     assert len(sentence_ids) == len(all_sentence_tokens) == len(all_sentence_pred_labels)
#     with open(f"{output_base}.labelseq", "w") as wf:
#         wf.write("ID\tSENTENCE\tTAGSEQ\n")
#         for sentence_id, sentence_tokens, sentence_labels in zip(sentence_ids, all_sentence_tokens, all_sentence_pred_labels):
#             sentence_tokens, _, sentence_labels = shorten_sentence_label(sentence_tokens, sentence_labels, sentence_labels, maxlen)
#             assert len(sentence_tokens) == len(sentence_labels)
#             wf.write(f'"{sentence_id}"\t"{" ".join(sentence_tokens)}"\t"{" ".join(sentence_labels)}"\n')
#         print(f"generated labelseq file {output_base}.labelseq")

# def generate_human_readable(sentence_ids, all_sentence_tokens, all_sentence_pred_labels, output_base, maxlen=-1):
#     def create_chunk(tokens, chunk_def):
#             chunk_type, chunk_indices = chunk_def.split("@")[0], [int(x) for x in chunk_def.split("@")[1:]]
#             chunk_indices = chunk_indices + [chunk_indices[-1]+1]
#             return f"{chunk_type}: {' '.join(tokens[chunk_indices[0]: chunk_indices[-1]])}"

#     assert len(sentence_ids) == len(all_sentence_tokens) == len(all_sentence_pred_labels)
#     with open(f"{output_base}.human", "w") as wf:
#         for sentence_id, sentence_tokens, sentence_labels in zip(sentence_ids, all_sentence_tokens, all_sentence_pred_labels):
#             wf.write(f"[id]: {sentence_id}\n")
#             sentence_tokens, _, sentence_labels = shorten_sentence_label(sentence_tokens, sentence_labels, sentence_labels, maxlen)
#             assert len(sentence_tokens) == len(sentence_labels)
#             wf.write(f"[sentence]: {' '.join(sentence_tokens)}\n")
#             chunks = to_chunks(sentence_labels, span_type="iob") 
#             for chunk in chunks:
#                 wf.write(create_chunk(sentence_tokens, chunk)+"\n")
#             wf.write("\n")
#         print(f"generated labelseq file {output_base}.human")


# def predict_tags_for_file(_file, model, _word2idx, _label2idx, output_base, output_formats=["human_readable", "labelseq"]):
#     sentence_ids, sen_texts, sen_labels = read_conll_file(_file)
#     X, Y = get_padded_x_y(sen_texts, sen_labels, _maxlen=MAXLEN, _word2idx=_word2idx, _label2idx=_label2idx)
#     Y_pred = np.argmax(model.predict(X), axis=-1)
#     Y_true = np.argmax(Y, -1)
#     Y_pred_labels = [[idx2label[i] for i in row] for row in Y_pred]
#     Y_true_labels = [[idx2label[i] for i in row] for row in Y_true]
#     if "conll" in output_formats:
#         generate_conll(
#             sentence_ids=sentence_ids,
#             all_sentence_tokens=sen_texts, 
#             all_sentence_true_labels=Y_true_labels, 
#             all_sentence_pred_labels=Y_pred_labels, 
#             output_base=output_base,
#             maxlen=MAXLEN
#         )
#     if "labelseq" in output_formats:
#         generate_labelseq(
#             sentence_ids=sentence_ids,
#             all_sentence_tokens=sen_texts, 
#             all_sentence_pred_labels=Y_pred_labels, 
#             output_base=output_base,
#             maxlen=MAXLEN
#         )
#     if "human_readable" in output_formats:
#          generate_human_readable(
#             sentence_ids=sentence_ids,
#             all_sentence_tokens=sen_texts, 
#             all_sentence_pred_labels=Y_pred_labels, 
#             output_base=output_base,
#             maxlen=MAXLEN
#         )
# # let's first run this method on the validation data because we have the true labels available for it.
# test_file="review_data/review_valid.conll"
# predict_tags_for_file(test_file, model=model, _label2idx=label2idx, _word2idx=word2idx, output_base="valid_output", 
#                       output_formats=["human_readable", "labelseq", "conll"])
# # and evaluate with conll eval
# if not os.path.exists("conlleval.pl"):
#     !wget https://www.clips.uantwerpen.be/conll2000/chunking/conlleval.txt -O conlleval.pl
# !perl ./conlleval.pl < valid_output.conll

# # finally, run this method on the test data and look at the generated labelseq file.
# test_file="review_data/review_test.conll"
# predict_tags_for_file(test_file, model=model, _label2idx=label2idx, _word2idx=word2idx, output_base="test_output", 
#                       output_formats=["human_readable", "labelseq"])


In [None]:
# from tensorflow.keras.layers import LSTM, Dense, Embedding, Bidirectional
# from tensorflow.keras.models import Sequential
# from tensorflow.keras.optimizers import Adam

# # Define hyperparameters
# EMBED_DIM = 300
# RNN_HIDDEN_DIM = 100
# DROPOUT_RATE = 0.2
# LEARNING_RATE = 0.001
# BATCH_SIZE = 32
# EPOCHS = 10

# # Create the LSTM model
# model = Sequential()
# model.add(Embedding(input_dim=len(word2idx.keys()), output_dim=EMBED_DIM, input_length=MAXLEN))
# model.add(Bidirectional(LSTM(units=RNN_HIDDEN_DIM, return_sequences=True, dropout=DROPOUT_RATE), merge_mode='concat'))
# model.add(Dense(len(label2idx.keys()), activation="relu"))

# # Compile the model with chosen optimizer and learning rate
# optimizer = Adam(lr=LEARNING_RATE)
# model.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=['acc'])

In [None]:
# import numpy as np
# # history = model.fit(X_train,np.array(Y_train),batch_size=16,epochs=3,validation_data=(X_valid, np.array(Y_valid)))
# history = model.fit(X_train,np.array(Y_train),batch_size=16,epochs=10,validation_split=0.2)
# Y_valid_pred = model.predict(X_valid)
# Y_valid_pred = np.argmax(Y_valid_pred, axis=-1)
# Y_valid_true = np.argmax(Y_valid, -1)
# Y_valid_pred_labels = [[idx2label[i] for i in row] for row in Y_valid_pred]
# Y_valid_true_labels = [[idx2label[i] for i in row] for row in Y_valid_true]
# try:
#     from sklearn_crfsuite.metrics import flat_classification_report
# except ImportError:
#     !pip install git+https://github.com/MeMartijn/updated-sklearn-crfsuite.git#egg=sklearn_crfsuite
#     from sklearn_crfsuite.metrics import flat_classification_report

# report = flat_classification_report(y_pred=Y_valid_pred_labels, y_true=Y_valid_true_labels)
# print(report)

In [None]:
# from tensorflow.keras.layers import LSTM, Dense, Embedding, Bidirectional, Dropout
# from tensorflow.keras.models import Sequential
# from tensorflow.keras.optimizers import Adam

# # Define hyperparameters
# EMBED_DIM = 300
# RNN_HIDDEN_DIM = 100
# DROPOUT_RATE = 0.5  # increased dropout rate
# LEARNING_RATE = 0.001
# BATCH_SIZE = 32
# EPOCHS = 20  # increased number of epochs

# # Create the LSTM model
# model = Sequential()
# model.add(Embedding(input_dim=len(word2idx.keys()), output_dim=EMBED_DIM, input_length=MAXLEN))
# model.add(Bidirectional(LSTM(units=RNN_HIDDEN_DIM, return_sequences=True, dropout=DROPOUT_RATE), merge_mode='concat'))
# model.add(Dense(len(label2idx.keys()), activation="relu"))
# model.add(Dropout(DROPOUT_RATE))  # added dropout layer after Dense layer

# # Compile the model with chosen optimizer and learning rate
# optimizer = Adam(lr=LEARNING_RATE)
# model.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=['acc'])

# # Train the model with updated hyperparameters
# history = model.fit(X_train, np.array(Y_train), batch_size=BATCH_SIZE, epochs=EPOCHS, validation_split=0.2)

# # Evaluate the model
# Y_valid_pred = model.predict(X_valid)
# Y_valid_pred = np.argmax(Y_valid_pred, axis=-1)
# Y_valid_true = np.argmax(Y_valid, -1)
# Y_valid_pred_labels = [[idx2label[i] for i in row] for row in Y_valid_pred]
# Y_valid_true_labels = [[idx2label[i] for i in row] for row in Y_valid_true]

# # Calculate classification report
# report = flat_classification_report(y_pred=Y_valid_pred_labels, y_true=Y_valid_true_labels)
# print(report)


In [None]:
# from tensorflow.keras.layers import LSTM, Dense, Embedding, Bidirectional
# from tensorflow.keras.models import Sequential
# from tensorflow.keras.optimizers import Adagrad

# # Define hyperparameters
# EMBED_DIM = 300
# RNN_HIDDEN_DIM = 100
# DROPOUT_RATE = 0.2
# LEARNING_RATE = 0.01 # Adagrad specific learning rate
# BATCH_SIZE = 16
# EPOCHS = 10

# # Create the LSTM model
# model = Sequential()
# model.add(Embedding(input_dim=len(word2idx.keys()), output_dim=EMBED_DIM, input_length=MAXLEN))
# model.add(Bidirectional(LSTM(units=RNN_HIDDEN_DIM, return_sequences=True, dropout=DROPOUT_RATE), merge_mode='concat'))
# model.add(Dense(len(label2idx.keys()), activation="relu"))

# # Compile the model with Adagrad optimizer and learning rate
# optimizer = Adagrad(lr=LEARNING_RATE)
# model.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=['acc'])

# # Train the model
# history = model.fit(X_train, np.array(Y_train), batch_size=BATCH_SIZE, epochs=EPOCHS, validation_split=0.2)

# # Make predictions and evaluate the model
# Y_valid_pred = model.predict(X_valid)
# Y_valid_pred = np.argmax(Y_valid_pred, axis=-1)
# Y_valid_true = np.argmax(Y_valid, -1)
# Y_valid_pred_labels = [[idx2label[i] for i in row] for row in Y_valid_pred]
# Y_valid_true_labels = [[idx2label[i] for i in row] for row in Y_valid_true]

# # Print classification report
# try:
#     from sklearn_crfsuite.metrics import flat_classification_report
# except ImportError:
#     !pip install git+https://github.com/MeMartijn/updated-sklearn-crfsuite.git#egg=sklearn_crfsuite
#     from sklearn_crfsuite.metrics import flat_classification_report

# report = flat_classification_report(y_pred=Y_valid_pred_labels, y_true=Y_valid_true_labels)
# print(report)


In [None]:
#FINAL LSTM MODEL
from tensorflow.keras.layers import LSTM, Dense, Embedding, Bidirectional
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import SGD, Adam
import numpy as np


# Define hyperparameters
EMBED_DIM = 30
RNN_HIDDEN_DIM = 50  # reduced
DROPOUT_RATE = 0.2
LEARNING_RATE = 0.01  # only referenced by the commented-out SGD/Adagrad alternatives below
BATCH_SIZE = 32
EPOCHS = 15

# Create the LSTM model: embedding -> bidirectional LSTM -> per-token classifier
model = Sequential()
model.add(Embedding(input_dim=len(word2idx), output_dim=EMBED_DIM, input_length=MAXLEN))
model.add(Bidirectional(LSTM(units=RNN_HIDDEN_DIM, return_sequences=True, dropout=DROPOUT_RATE), merge_mode='concat'))
# The targets are one-hot vectors over the label set, so the output layer must
# produce a probability distribution per token: softmax, not relu.
model.add(Dense(len(label2idx), activation="softmax"))

# Alternative optimizers that were tried:
#optimizer = SGD(lr=LEARNING_RATE)  # Use SGD optimizer with specified learning rate
#adam = k.optimizers.Adam(lr=0.0005, beta_1=0.9, beta_2=0.999)
#optimizer = Adagrad(learning_rate=LEARNING_RATE, epsilon=1e-10)
# configure the optimizer (Adam with its default settings)
optimizer = Adam()
# Multi-class, one-hot targets call for categorical cross-entropy;
# binary cross-entropy would score every class as an independent yes/no
# decision and inflate the reported accuracy.
model.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=['acc'])

# Train the model, holding out the last 10% of the training data for validation
history = model.fit(X_train, np.array(Y_train), batch_size=BATCH_SIZE, epochs=EPOCHS, validation_split=0.1)

# Evaluate the model: turn per-class scores / one-hot rows back into label strings
Y_valid_pred = model.predict(X_valid)
Y_valid_pred = np.argmax(Y_valid_pred, axis=-1)
Y_valid_true = np.argmax(Y_valid, -1)
Y_valid_pred_labels = [[idx2label[i] for i in row] for row in Y_valid_pred]
Y_valid_true_labels = [[idx2label[i] for i in row] for row in Y_valid_true]

# Generate classification report
# NOTE: rows still have MAXLEN entries, so padded positions are included in
# the report under the "PAD" label.
from sklearn_crfsuite.metrics import flat_classification_report
report = flat_classification_report(y_pred=Y_valid_pred_labels, y_true=Y_valid_true_labels)
print(report)

In [None]:
# from sklearn.metrics import confusion_matrix
# print(confusion_matrix(y_true=Y_valid_true_labels, y_pred=Y_valid_pred_labels))

In [None]:
#TEST DATA

# Run the trained model on the padded test inputs; the predictions are kept
# as raw per-class scores because the decoding steps below are disabled.
Y_test_pred = model.predict(X_test)
#Y_test_pred = np.argmax(Y_test_pred, axis=-1)
#Y_test_pred_labels = [[idx2label[i] for i in row] for row in Y_test_pred]

# Convert the one-hot test targets back into label strings.
Y_test_true = np.argmax(Y_test, axis=-1)
Y_test_true_labels = [list(map(idx2label.__getitem__, row)) for row in Y_test_true]


In [None]:
import pandas as pd
#file1 = pd.read_csv("TEST_REVIEW_TEXT.txt", sep='\t')
# Bare expression: as the last statement of a notebook cell this displays the
# sentence ids read from review_data/review_test.conll.
test_sen_ids

In [None]:
# In Y_test_true_labels, all labels on the test data are "O", probably due to overfitting of the model.
# Therefore, the CRF model (89% accuracy) was chosen. The LSTM model has 96% accuracy.