In [1]:
import re
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.sequence import pad_sequences
from sklearn.metrics import classification_report

In [5]:
def clean_str(string):
 """
 Tokenization/string cleaning for all datasets except for SST.
 Original taken from https://github.com/yoonkim/CNN_sentence/blob/master/process_data.py
 """
 string = re.sub(r"[^A-Za-z0-9(),!?\'\`]", " ", string)
 string = re.sub(r"\'s", " \'s", string)
 string = re.sub(r"\'ve", " \'ve", string)
 string = re.sub(r"n\'t", " n\'t", string)
 string = re.sub(r"\'re", " \'re", string)
 string = re.sub(r"\'d", " \'d", string)
 string = re.sub(r"\'ll", " \'ll", string)
 string = re.sub(r",", " , ", string)
 string = re.sub(r"!", " ! ", string)
 string = re.sub(r"\(", " \( ", string)
 string = re.sub(r"\)", " \) ", string)
 string = re.sub(r"\?", " \? ", string)
 string = re.sub(r"\s{2,}", " ", string)
 return string.strip().lower()
def load_data_and_labels(positive_data_file, negative_data_file):
 """
 Loads MR polarity data from files, splits the data into words and generates labels.
 Returns split sentences and labels.
 """
 # Load data from files
 positive_examples = list(open(positive_data_file, "r", encoding='latin1').readlines())
 positive_examples = [s.strip() for s in positive_examples]
 negative_examples = list(open(negative_data_file, "r", encoding='latin1').readlines())
 negative_examples = [s.strip() for s in negative_examples]
 # Split by words
 x = positive_examples + negative_examples
 x = [clean_str(sent) for sent in x]
 x = np.array(x)
 # Generate labels
 positive_labels = [1] * len(positive_examples)
 negative_labels = [0] * len(negative_examples)
 y = np.concatenate([positive_labels, negative_labels], 0)


 shuffle_indices = np.random.permutation(np.arange(len(y)))
 shuffled_x = x[shuffle_indices]
 shuffled_y = y[shuffle_indices]

 return shuffled_x, shuffled_y

In [7]:
positive_data_file = 'data/rt-polarity.pos'
negative_data_file = 'data/rt-polarity.neg'
x, y = load_data_and_labels(positive_data_file, negative_data_file)

In [9]:
x[:5]

array(["it would n't be my preferred way of spending 100 minutes or 7 00",
       "but the power of these subjects is obscured by the majority of the film that shows a stationary camera on a subject that could be mistaken for giving a public oration , rather than contributing to a film 's narrative",
       "it moves quickly , adroitly , and without fuss it does n't give you time to reflect on the inanity and the cold war datedness of its premise",
       "the film 's needlessly opaque intro takes its doe eyed crudup out of pre 9 11 new york and onto a cross country road trip of the homeric kind",
       'plods along , minus the twisted humor and eye popping visuals that have made miike a cult hero'],
      dtype='<U266')

In [11]:
y[:5]

array([0, 0, 1, 0, 0])

In [15]:
vocab = set()
for doc in x:
 for word in doc.split(' '):
     if word.strip():
         vocab.add(word.strip().lower())
         
# write to vocab.txt file
with open('data/vocab.txt', 'w') as file:
 for word in vocab:
     file.write(word)
     file.write('\n')
     
test_size = 2000
x_train, y_train = x[:-2000], y[:-2000]
x_test, y_test = x[-2000:], y[-2000:]
label_map = {0: 'negative', 1: 'positive'}


class Config():
 embedding_dim = 100 # word embedding dimention
 max_seq_len = 200 # max sequence length
 vocab_file = 'data/vocab.txt' # vocab_file_length
config = Config()

class Preprocessor():
 def __init__(self, config):
     self.config = config
     # initial the map of word and index
     token2idx = {"[PAD]": 0, "[UNK]": 1} # {wordï¼šid}
     with open(config.vocab_file, 'r') as reader:
         for index, line in enumerate(reader):
             token = line.strip()
             token2idx[token] = index+2

     self.token2idx = token2idx

 def transform(self, text_list):
     # tokenization, and transform word to coresponding index
     idx_list = [[self.token2idx.get(word.strip().lower(), self.token2idx['[UNK]']) for word in text.split(' ')] for text in text_list]
     idx_padding = pad_sequences(idx_list, self.config.max_seq_len, padding='post')

     return idx_padding
preprocessor = Preprocessor(config)
preprocessor.transform(['I love working', 'I love eating'])

array([[ 7059, 12332, 17420,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,
      

In [27]:
class TextCNN(object):
    def __init__(self, config):
        self.config = config
        self.preprocessor = Preprocessor(config)
        self.class_name = {0: 'negative', 1: 'positive'}

    def build_model(self):
        # build model architecture
        idx_input = tf.keras.layers.Input((self.config.max_seq_len,))
        input_embedding = tf.keras.layers.Embedding(
            len(self.preprocessor.token2idx),
            self.config.embedding_dim,
            input_length=self.config.max_seq_len,
            mask_zero=True
        )(idx_input)
        convs = []
        for kernel_size in [2, 3, 4, 5]:
            c = tf.keras.layers.Conv1D(128, kernel_size, activation='relu')(input_embedding)
            c = tf.keras.layers.GlobalMaxPooling1D()(c)
            convs.append(c)
        fea_cnn = tf.keras.layers.Concatenate()(convs)
        fea_cnn = tf.keras.layers.Dropout(rate=0.5)(fea_cnn)
        fea_dense = tf.keras.layers.Dense(128, activation='relu')(fea_cnn)
        fea_dense = tf.keras.layers.Dropout(rate=0.5)(fea_dense)
        fea_dense = tf.keras.layers.Dense(64, activation='relu')(fea_dense)
        fea_dense = tf.keras.layers.Dropout(rate=0.3)(fea_dense)
        output = tf.keras.layers.Dense(2, activation='softmax')(fea_dense)

        model = tf.keras.Model(inputs=idx_input, outputs=output)
        model.compile(
            loss='sparse_categorical_crossentropy',
            optimizer='adam',
            metrics=['accuracy']
        )

        model.summary()

        self.model = model

    def fit(self, x_train, y_train, x_valid=None, y_valid=None, epochs=5, batch_size=128, **kwargs):
        # train
        self.build_model()

        x_train = self.preprocessor.transform(x_train)
        if x_valid is not None and y_valid is not None:
            x_valid = self.preprocessor.transform(x_valid)
        self.model.fit(
            x=x_train,
            y=y_train,
            validation_data=(x_valid, y_valid) if x_valid is not None and y_valid is not None else None,
            batch_size=batch_size,
            epochs=epochs,
            **kwargs
        )

    def evaluate(self, x_test, y_test):
        # evaluate
        x_test = self.preprocessor.transform(x_test)
        y_pred_probs = self.model.predict(x_test)
        y_pred = np.argmax(y_pred_probs, axis=-1)
        result = classification_report(y_test, y_pred, target_names=['negative', 'positive'])
        print(result)

    def single_predict(self, text):
        # predict
        input_idx = self.preprocessor.transform([text])
        predict_prob = self.model.predict(input_idx)[0]
        predict_label_id = np.argmax(predict_prob)
        predict_label_name = self.class_name[predict_label_id]
        predict_label_prob = predict_prob[predict_label_id]

        return predict_label_name, predict_label_prob

In [29]:
textcnn = TextCNN(config)
textcnn.fit(x_train, y_train, x_test, y_test, epochs=10) # train

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 200)]        0                                            
__________________________________________________________________________________________________
embedding (Embedding)           (None, 200, 100)     1876600     input_1[0][0]                    
__________________________________________________________________________________________________
conv1d (Conv1D)                 (None, 199, 128)     25728       embedding[0][0]                  
__________________________________________________________________________________________________
conv1d_1 (Conv1D)               (None, 198, 128)     38528       embedding[0][0]                  
______________________________________________________________________________________________

In [31]:
textcnn.evaluate(x_test, y_test) # Test Set Evaluation

              precision    recall  f1-score   support

    negative       0.77      0.70      0.73      1019
    positive       0.71      0.78      0.75       981

    accuracy                           0.74      2000
   macro avg       0.74      0.74      0.74      2000
weighted avg       0.74      0.74      0.74      2000



In [33]:
textcnn.single_predict("beautiful actors, great movie.") # single sentence predict

('positive', 0.98573536)

In [35]:
textcnn.single_predict("it's really boring") # single sentence predict

('negative', 0.999998)