In [3]:
import re
import unicodedata

import matplotlib.pyplot as plt
import nltk
import numpy as np
import pandas as pd
from bs4 import BeautifulSoup
from gensim.models import KeyedVectors, word2vec
from nltk.corpus import stopwords
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.linear_model import SGDClassifier, LogisticRegression
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from sklearn.multiclass import OneVsRestClassifier
from sklearn.naive_bayes import MultinomialNB
from tensorflow.python.keras import models
from tensorflow.python.keras.layers import (
    Bidirectional, Conv1D, Dense, Dropout, Embedding, GlobalAveragePooling1D,
    LSTM, MaxPooling1D, SeparableConv1D,
)
from tensorflow.python.keras.preprocessing.sequence import pad_sequences
from tensorflow.python.keras.preprocessing.text import Tokenizer

In [4]:
# Load the train/test split of the IMDB sentiment dataset via the project-local
# `data_loader` module.
# NOTE(review): this import sits outside the notebook's import cell; consider
# moving it there so all dependencies are visible in one place.
from data_loader import load_imdb_sentiment_analysis_dataset

# NOTE(review): absolute, machine-specific path -- the notebook will not run on
# another machine until this is replaced by a configurable/relative location.
file_path = '/Users/sakshamjain/Projects/AI/'

# The loader returns two (texts, labels) pairs: the first is unpacked as the
# training split, the second as the test split.
(train_texts, train_labels), (test_texts, test_labels) = load_imdb_sentiment_analysis_dataset(file_path)

In [5]:
class Preprocessor():
    """Text-cleaning pipeline for a collection of documents.

    The constructor keeps the individual documents (``self.texts``) and also
    joins them with single spaces into one string (``self.corpus``), which is
    what :meth:`clean` falls back to when it is called without a text.
    """

    def __init__(self, texts):
        """Store the documents, build the joined corpus and load stop words.

        # Arguments
            texts: iterable of str, the documents to preprocess.
        """
        self.texts = texts
        self.corpus = ' '.join(self.texts)
        self.stop_words = set(stopwords.words('english'))

    def transform_to_lowercase(self, text=None):
        """Return `text` lower-cased.

        The `None` default is kept for signature compatibility only; a string
        must be supplied (``None`` has no ``lower`` method).
        """
        return text.lower()

    def strip_html_tags(self, text):
        """Return the text content of `text` with HTML markup removed."""
        return BeautifulSoup(text, "html.parser").get_text()

    def remove_accented_chars(self, text):
        """Return `text` reduced to ASCII.

        NFKD normalisation splits accented letters into a base letter plus
        combining marks; encoding to ASCII with ``'ignore'`` then drops the
        marks (and any other non-ASCII character).
        """
        ascii_bytes = unicodedata.normalize('NFKD', text).encode('ascii', 'ignore')
        return ascii_bytes.decode('utf-8', 'ignore')

    def remove_special_characters(self, text):
        """Return `text` with everything except ASCII letters, digits and
        whitespace removed.
        """
        # The range must be `A-Z`: the previous `A-z` spans code points 65-122
        # and therefore also kept `[`, `\`, `]`, `^`, `_` and the backtick.
        return re.sub(r'[^a-zA-Z0-9\s]', '', text)

    # NOTE: contraction expansion is not implemented. A draft helper used to
    # sit here as commented-out code; it relied on a `contraction_mapping`
    # table that is not defined in the visible part of this notebook.

    def clean(self, text=None, lower=True, strip_html=True, contract=True, remove_accented_chars=True,
              special_char_removal=True, remove_stop_words=True):
        """Run the enabled cleaning steps over `text` and return the result.

        Steps run in this fixed order: lower-casing, HTML stripping, accent
        removal, special-character removal, stop-word removal.

        # Arguments
            text: str or None. When None, the whole joined corpus
                (``self.corpus``) is cleaned. An empty string is cleaned as an
                empty string (it no longer falls back to the corpus).
            lower: bool, lower-case the text.
            strip_html: bool, remove HTML markup.
            contract: bool, accepted for compatibility but currently ignored
                (no contraction expansion is implemented).
            remove_accented_chars: bool, reduce the text to ASCII.
            special_char_removal: bool, drop non-alphanumeric characters.
            remove_stop_words: bool, tokenize with NLTK and drop English stop
                words; the remaining tokens are re-joined with single spaces.

        # Returns
            str, the cleaned text.
        """
        if text is None:
            text = self.corpus
        if lower:
            text = self.transform_to_lowercase(text)
        if strip_html:
            text = self.strip_html_tags(text)
        if remove_accented_chars:
            text = self.remove_accented_chars(text)
        if special_char_removal:
            text = self.remove_special_characters(text)
        if remove_stop_words:
            tokens = nltk.word_tokenize(text)
            text = ' '.join(word for word in tokens if word not in self.stop_words)
        return text

In [6]:
class DataExplorer():
    """Descriptive statistics and plots for a collection of text samples.

    ``self.texts`` holds the individual samples; ``self.corpus`` is the same
    samples joined with single spaces, used for corpus-wide token counts.
    """

    def __init__(self, texts):
        """
        # Arguments
            texts: iterable of str, the text samples to explore.
        """
        self.texts = texts
        self.corpus = ' '.join(self.texts)

    def get_num_words_per_sample(self):
        """Returns the whitespace-separated word count of every sample.

        # Returns
            list of int, one count per sample, in sample order.
        """
        return [len(text.split()) for text in self.texts]

    def get_median_num_words(self):
        """Returns the median number of words per sample.

        Words are counted by splitting on whitespace (see
        `get_num_words_per_sample`).

        # Returns
            numpy float, median number of words per sample.
        """
        return np.median(self.get_num_words_per_sample())

    def plot_sample_length_distribution(self):
        """Plots a 50-bin histogram of sample lengths.

        Length here is ``len(sample)``, i.e. the number of characters, not the
        number of words.
        """
        plt.hist([len(s) for s in self.texts], 50)
        plt.xlabel('Length of a sample')
        plt.ylabel('Number of samples')
        plt.title('Sample length distribution')
        plt.show()

    def plot_frequency_distribution_of_ngram(self):
        """Placeholder: not implemented yet, always returns None."""
        return None

    def _report_token_frequencies(self, corpus):
        """Tokenizes `corpus`, prints the 100 most common tokens and plots the top 40."""
        words = nltk.word_tokenize(corpus)
        fdist = nltk.FreqDist(words)
        print('Number of tokens:', len(words))
        print("List of 100 most frequent words/counts")
        print(fdist.most_common(100))
        fdist.plot(40)

    def plot_most_frequent_words(self):
        """Reports and plots the most frequent tokens of the raw corpus."""
        self._report_token_frequencies(self.corpus)

    def plot_most_frequent_words_preprocessed(self):
        """Reports and plots the most frequent tokens after `Preprocessor.clean()`."""
        prep_corpus = Preprocessor(self.texts).clean()
        self._report_token_frequencies(prep_corpus)

    def get_corpus_statistics(self):
        """Prints and returns basic size statistics of the corpus.

        # Returns
            tuple ``(num_texts, total_words, median_words_per_text)`` where
            `total_words` is the NLTK token count of the joined corpus and the
            last element is the median of the whitespace word counts.
        """
        num_texts = len(self.texts)
        total_words = len(nltk.word_tokenize(self.corpus))
        # This is a median, not a mean; the printed label below says so.
        avg_words_text = self.get_median_num_words()

        print('Number of texts:', num_texts)
        print('The total number of words in all texts', total_words)
        print('The median number of words in each text is', avg_words_text)

        return num_texts, total_words, avg_words_text

In [24]:
class Vectorizer():
    """Turns train/test texts into numeric features.

    Supports bag-of-words counts, TF-IDF, padded integer sequences and a
    word2vec-backed embedding matrix. Count and TF-IDF vectorization operate on
    the raw texts; sequence vectorization operates on the preprocessed texts.
    """

    def __init__(self, train_texts, test_texts, max_features=20000, max_sequence_length=500):
        """
        # Arguments
            train_texts: list of str, training samples.
            test_texts: list of str, test samples.
            max_features: int, vocabulary cap passed to the Keras `Tokenizer`.
            max_sequence_length: int, upper bound for the padded sequence length.
        """
        self.train_texts = train_texts
        self.test_texts = test_texts
        # One cleaned string per sample. Cleaning the joined corpus instead
        # would collapse the whole data set into a single "document" and
        # `sequence_vectorize` would then produce exactly one sequence.
        self.preprocessed_train_corpus = self._clean_documents(self.train_texts)
        self.preprocessed_test_corpus = self._clean_documents(self.test_texts)
        self.max_features = max_features
        self.max_sequence_length = max_sequence_length
        self.embed_dim = 300

    @staticmethod
    def _clean_documents(texts):
        """Returns a list with `Preprocessor.clean` applied to every sample."""
        cleaner = Preprocessor(texts)
        # Empty samples are passed through as '' rather than handed to
        # `clean`, so they can never be mistaken for "no text given".
        return [cleaner.clean(text) if text else '' for text in texts]

    def get_custom_params(self, ngram_range=None, stop_words=None, min_df=None, max_dif=None, tokenizer=None,
                          analyzer=None, preprocessor=None, lowercase=None, max_features=None, dtype=None,
                          strip_accents=None, max_df=None):
        """Builds a keyword dict for `CountVectorizer` / `TfidfVectorizer`.

        Only arguments that were actually supplied (i.e. are not None) are
        included, so explicit falsy values such as ``lowercase=False`` are kept.

        # Arguments
            max_dif: deprecated misspelling of `max_df`, still accepted; it is
                used only when `max_df` itself is not given.
            max_df: value for the vectorizer's `max_df` option.
            (all other arguments map one-to-one onto the option of the same name)

        # Returns
            dict, option name -> value.
        """
        if max_df is None:
            max_df = max_dif

        candidates = (
            ('ngram_range', ngram_range),
            ('stop_words', stop_words),
            ('min_df', min_df),
            ('max_df', max_df),
            ('tokenizer', tokenizer),
            ('analyzer', analyzer),
            ('preprocessor', preprocessor),
            ('lowercase', lowercase),
            ('max_features', max_features),
            ('dtype', dtype),
            ('strip_accents', strip_accents),
        )
        return {name: value for name, value in candidates if value is not None}

    @staticmethod
    def get_vector_info(vector):
        """Returns ``(dense_matrix, shape)`` for an object exposing `toarray()` and `shape`."""
        matrix = vector.toarray()
        shape = vector.shape
        return matrix, shape

    def _fit_and_transform(self, vectorizer):
        """Fits `vectorizer` on the raw training texts and transforms both splits."""
        train_vector = vectorizer.fit_transform(self.train_texts)
        test_vector = vectorizer.transform(self.test_texts)
        # `vocabulary_` maps every kept token to its column index.
        return train_vector, test_vector, vectorizer.vocabulary_

    def count_vectorize(self, kwargs):
        """Bag-of-words counts.

        # Arguments
            kwargs: dict of `CountVectorizer` options.

        # Returns
            tuple ``(train_vector, test_vector, vocabulary)``.
        """
        return self._fit_and_transform(CountVectorizer(**kwargs))

    def tfidf_vectorize(self, kwargs):
        """TF-IDF features.

        # Arguments
            kwargs: dict of `TfidfVectorizer` options.

        # Returns
            tuple ``(train_vector, test_vector, vocabulary)``.
        """
        return self._fit_and_transform(TfidfVectorizer(**kwargs))

    def sequence_vectorize(self):
        """Integer-encodes the preprocessed texts and pads them to a common length.

        # Returns
            tuple ``(train_vector, test_vector, vocabulary)`` where the vectors
            are 2-D padded arrays and `vocabulary` is the tokenizer's
            word -> index mapping.
        """
        # Create vocabulary with training texts only.
        tokenizer = Tokenizer(num_words=self.max_features)
        tokenizer.fit_on_texts(self.preprocessed_train_corpus)

        train_sequences = tokenizer.texts_to_sequences(self.preprocessed_train_corpus)
        test_sequences = tokenizer.texts_to_sequences(self.preprocessed_test_corpus)

        # Longest training sequence, capped at `max_sequence_length`.
        longest = max((len(sequence) for sequence in train_sequences), default=0)
        max_length = min(longest, self.max_sequence_length)

        # `pad_sequences` defaults: shorter sequences are padded at the
        # beginning, longer ones are truncated at the beginning.
        train_vector = pad_sequences(train_sequences, maxlen=max_length)
        test_vector = pad_sequences(test_sequences, maxlen=max_length)

        return train_vector, test_vector, tokenizer.word_index

    def word_embedding_vectorize(self, vocab, embedding_path='Data/GoogleNews-vectors.bin.gz'):
        """Builds an embedding matrix for `vocab` from a binary word2vec file.

        # Arguments
            vocab: dict, word -> integer index. Indices are used directly as
                row numbers, so they must be smaller than ``len(vocab) + 1``.
            embedding_path: str, path of the word2vec file in binary format.

        # Returns
            tuple ``(embedding_weights, word_vector_dict)``:
            `embedding_weights` is a ``(len(vocab) + 1, embed_dim)`` array and
            `word_vector_dict` maps each word to its row of that array.
        """
        # Local name deliberately differs from the imported gensim `word2vec` module.
        pretrained_vectors = KeyedVectors.load_word2vec_format(embedding_path, binary=True)

        # Rows not referenced by any vocabulary index stay all-zero.
        embedding_weights = np.zeros((len(vocab) + 1, self.embed_dim))
        for word, index in vocab.items():
            if word in pretrained_vectors:
                embedding_weights[index, :] = pretrained_vectors[word]
            else:
                # Words unknown to the pretrained model get a random vector.
                embedding_weights[index, :] = np.random.rand(self.embed_dim)

        word_vector_dict = {word: embedding_weights[index] for word, index in vocab.items()}

        return embedding_weights, word_vector_dict

    def tfidf_embedding_vectorize(self):
        """Placeholder: not implemented yet, always returns None."""
        return

In [18]:
class Model():
    """Classical (scikit-learn style) text classifier on TF-IDF n-gram features."""

    def __init__(self, train_texts, train_labels, test_texts, test_labels, word_embedding=False):
        """
        # Arguments
            train_texts, train_labels: training samples and their labels.
            test_texts, test_labels: test samples and their labels.
            word_embedding: bool, request embedding-based features instead of
                TF-IDF. Not implemented yet; see `vectorize`.
        """
        self.train_texts = train_texts
        self.train_labels = train_labels
        self.test_texts = test_texts
        self.test_labels = test_labels
        self.vocab = None
        self.vectorizer = None
        self.train_vector = None
        self.test_vector = None
        self.prediction = None
        self.max_features = 20000
        self.ngram_range = (1,2)
        self.word_embedding = word_embedding

    def vectorize(self):
        """Fills `train_vector`, `test_vector` and `vocab` with TF-IDF features.

        # Raises
            NotImplementedError: if the model was created with
                ``word_embedding=True``. Previously this path returned silently
                and left the vectors as None, so `run` failed later inside the
                classifier with an unrelated-looking error.
        """
        # Checked before building the Vectorizer, whose constructor already
        # preprocesses both corpora.
        if self.word_embedding:
            raise NotImplementedError('word-embedding features are not implemented for Model')

        V = Vectorizer(train_texts=self.train_texts, test_texts=self.test_texts, max_features=self.max_features)
        self.train_vector, self.test_vector, self.vocab = V.tfidf_vectorize(
            {'strip_accents': 'unicode',
             'analyzer': 'word',
             'ngram_range': self.ngram_range,
             'min_df': 2,
             'max_features': self.max_features
            })

    def run(self, classifier):
        """Vectorizes, fits `classifier` and prints evaluation metrics on the test split.

        # Arguments
            classifier: a zero-argument callable (typically an estimator class)
                returning an object with `fit(X, y)` and `predict(X)`.
        """
        self.vectorize()
        model = classifier().fit(self.train_vector, self.train_labels)
        self.prediction = model.predict(self.test_vector)
        print(confusion_matrix(self.test_labels, self.prediction))
        print(classification_report(self.test_labels, self.prediction))
        print(accuracy_score(self.test_labels, self.prediction))
        

class NNModel():
    """Keras-based text classifier.

    The feature type and architecture are picked from the ratio
    ``number of samples / median words per sample`` (`S_by_W`): below 1500 an
    MLP is trained on TF-IDF n-grams, otherwise a separable CNN is trained on
    padded token sequences.
    """

    def __init__(self, num_classes, train_texts, train_labels, test_texts, test_labels):
        """
        # Arguments
            num_classes: int, number of target classes (must be >= 2).
            train_texts, train_labels: training samples and their labels.
            test_texts, test_labels: test samples and their labels; also used
                as validation data during fitting.
        """
        self.num_classes = num_classes
        self.train_texts = train_texts
        self.train_labels = train_labels
        self.train_vector = None
        self.test_texts = test_texts
        self.test_labels = test_labels
        self.test_vector = None
        self.vocab = None
        # Vectorization limits
        self.max_features = 20000
        self.max_sequence_length = 500
        # Corpus statistics drive the n-gram vs. sequence decision.
        DE = DataExplorer(self.train_texts)
        self.num_texts, self.total_words, self.avg_words_text = DE.get_corpus_statistics()
        self.S_by_W = self.num_texts / self.avg_words_text
        # Layer parameters
        self.input_shape = None
        # `filters` and `pool_size` need concrete defaults: they are multiplied
        # / handed to layers, which fails for None.
        self.filters = 64
        # Spelling `kernal_size` is kept because it is the established
        # attribute and `set_params` keyword name.
        self.kernal_size = 3
        self.pool_size = 3
        self.dropout_rate = 0.2
        self.units = 64
        self.last_layer_units = None
        self.last_layer_activation = None
        # Embedding
        self.embed_dim = 300
        self.embedding_matrix = None
        self.embedding_weights = None  # alias of `embedding_matrix`, kept for compatibility
        self.word_vect_dic = None
        # Training
        self.optimizer = 'adam'
        self.metric = 'accuracy'
        self.loss = None
        # NOTE(review): `learning_rate` is stored but not applied -- the
        # optimizer is passed to `compile` by name, so its default rate is used.
        self.learning_rate = 1e-3
        # NOTE(review): 500 epochs with no early stopping; confirm this is intended.
        self.epochs = 500
        self.batch_size = 128
        self.model = None
        self.history = None

    def set_params(self, input_shape=None, filters=None, units=None,
                   kernal_size=None, pool_size=None, dropout_rate=None,
                   learning_rate=None, epochs=None, batch_size=None, embed_dim=None):
        """Overrides hyper-parameters and derives the output-layer settings.

        Every argument left as None keeps the current value; any other value,
        including falsy ones such as ``dropout_rate=0.0``, is applied.

        # Raises
            ValueError: if `num_classes` is smaller than 2.
        """
        overrides = {
            'input_shape': input_shape,
            'filters': filters,
            'units': units,
            'kernal_size': kernal_size,
            'pool_size': pool_size,
            'dropout_rate': dropout_rate,
            'learning_rate': learning_rate,
            'epochs': epochs,
            'batch_size': batch_size,
            'embed_dim': embed_dim,
        }
        for attribute, value in overrides.items():
            if value is not None:
                setattr(self, attribute, value)

        if self.num_classes == 2:
            self.last_layer_activation = 'sigmoid'
            self.last_layer_units = 1
            self.loss = 'binary_crossentropy'
        elif self.num_classes > 2:
            self.last_layer_activation = 'softmax'
            self.last_layer_units = self.num_classes
            self.loss = 'sparse_categorical_crossentropy'
        else:
            raise ValueError('num_classes must be at least 2, got {}'.format(self.num_classes))

    def vectorize(self):
        """Builds the feature vectors matching the `S_by_W` decision.

        Below the threshold: TF-IDF uni-/bi-grams. Otherwise: padded sequences
        plus the word2vec embedding matrix for the resulting vocabulary.
        """
        V = Vectorizer(train_texts=self.train_texts, test_texts=self.test_texts, max_features=self.max_features)
        if self.S_by_W < 1500:
            self.train_vector, self.test_vector, self.vocab = V.tfidf_vectorize(
                {'strip_accents': 'unicode',
                 'analyzer': 'word',
                 'ngram_range': (1, 2),
                 'min_df': 2,
                 'max_features': self.max_features
                })
        else:
            self.train_vector, self.test_vector, self.vocab = V.sequence_vectorize()
            # NOTE(review): the pretrained matrix is always loaded here, but the
            # model builders call `get_Embedding()` with its defaults and
            # therefore do not use it.
            self.embedding_matrix, self.word_vect_dic = V.word_embedding_vectorize(self.vocab)
            self.embedding_weights = self.embedding_matrix

    def _sequence_length(self):
        """Length of the model's input sequences.

        Uses `input_shape` when it is set, otherwise `max_sequence_length`.
        """
        if self.input_shape:
            return self.input_shape[0]
        return self.max_sequence_length

    def get_Embedding(self, use_pretrained_embedding=False, is_embedding_trainable=False):
        """Returns the embedding layer used as first layer of the sequence models.

        # Arguments
            use_pretrained_embedding: bool, initialise the layer from
                `embedding_matrix` (requires `vectorize` to have run on the
                sequence path).
            is_embedding_trainable: bool, whether pretrained weights are
                updated during training. Ignored without pretrained weights.
        """
        if use_pretrained_embedding:
            layer = Embedding(
                input_dim=len(self.vocab) + 1,
                output_dim=self.embed_dim,
                input_length=self._sequence_length(),
                weights=[self.embedding_matrix],
                trainable=is_embedding_trainable
               )
        else:
            # Index 0 is reserved for padding, hence the +1; the tokenizer
            # never emits indices >= max_features.
            num_features = min(len(self.vocab) + 1, self.max_features)
            layer = Embedding(
                input_dim=num_features,
                output_dim=self.embed_dim,
                input_length=self._sequence_length()
            )

        return layer

    def get_SeparableConv1D(self, n):
        """Returns a `SeparableConv1D` layer with ``filters * n`` filters."""
        layer = SeparableConv1D(
            filters=self.filters * n,
            kernel_size=self.kernal_size,
            activation='relu',
            bias_initializer='random_uniform',
            depthwise_initializer='random_uniform',
            padding='same'
        )
        return layer

    def get_Conv1D(self, n):
        """Returns a `Conv1D` layer with ``filters * n`` filters."""
        layer = Conv1D(
            filters=self.filters * n,
            kernel_size=self.kernal_size,
            activation='relu',
        )
        return layer

    def build_cnn_model(self, layers, n):
        """
        Convolutional Neural Network

        # Arguments
            layers: int, ``layers - 1`` convolution + pooling pairs are stacked.
            n: int, filter multiplier passed to `get_Conv1D`.
        """
        model = models.Sequential()
        model.add(self.get_Embedding())

        for _ in range(layers - 1):
            model.add(self.get_Conv1D(n))
            model.add(MaxPooling1D(pool_size=self.pool_size))

        # Collapse the time axis so the dense head emits one prediction per
        # sample instead of one per time step.
        model.add(GlobalAveragePooling1D())
        model.add(Dense(units=self.units, activation='relu'))
        model.add(Dense(units=self.last_layer_units, activation=self.last_layer_activation))

        return model

    def build_bidirectional_lstm(self):
        """
        Bidirectional LSTM: embedding -> Bidirectional(LSTM) -> dropout -> output.
        """
        model = models.Sequential()
        model.add(self.get_Embedding())
        model.add(Bidirectional(LSTM(self.units)))
        model.add(Dropout(rate=self.dropout_rate))
        model.add(Dense(units=self.last_layer_units, activation=self.last_layer_activation))

        return model

    def build_mlp_model(self, layers):
        """
        Multi Layer Perceptrons (MLPs)

        # Arguments
            layers: int, ``layers - 1`` hidden Dense + Dropout pairs are stacked
                before the output layer.
        """
        model = models.Sequential()
        model.add(Dropout(rate=self.dropout_rate, input_shape=self.input_shape))

        for _ in range(layers-1):
            model.add(Dense(units=self.units, activation='relu'))
            model.add(Dropout(rate=self.dropout_rate))

        model.add(Dense(units=self.last_layer_units, activation=self.last_layer_activation))
        return model

    def build_sepcnn_model(self, blocks):
        """
        Separable Convolutional Network

        # Arguments
            blocks: int, ``blocks - 1`` (dropout, 2x separable conv, max-pool)
                blocks are stacked before the final double-width conv pair.
        """
        model = models.Sequential()
        model.add(self.get_Embedding())

        for _ in range(blocks - 1):
            model.add(Dropout(rate=self.dropout_rate))
            model.add(self.get_SeparableConv1D(1))
            model.add(self.get_SeparableConv1D(1))
            model.add(MaxPooling1D(pool_size=self.pool_size))

        model.add(self.get_SeparableConv1D(2))
        model.add(self.get_SeparableConv1D(2))
        model.add(GlobalAveragePooling1D())
        model.add(Dropout(rate=self.dropout_rate))
        model.add(Dense(units=self.last_layer_units, activation=self.last_layer_activation))

        return model

    def run(self, layers=2, blocks=2):
        """Vectorizes the data, builds the matching model, trains and reports.

        # Arguments
            layers: int, depth passed to `build_mlp_model` (n-gram path).
            blocks: int, depth passed to `build_sepcnn_model` (sequence path).

        The fitted model and its training history are kept in `self.model`
        and `self.history`.
        """
        self.set_params()
        self.vectorize()
        # Both architectures need the per-sample feature shape.
        self.input_shape = self.train_vector.shape[1:]

        if self.S_by_W < 1500:
            # N-gram model
            model = self.build_mlp_model(layers=layers)
        else:
            # Sequence model
            model = self.build_sepcnn_model(blocks=blocks)

        model.compile(optimizer=self.optimizer, loss=self.loss, metrics=[self.metric])
        model.summary()

        # NOTE(review): the test split doubles as validation data here.
        history = model.fit(
            self.train_vector,
            self.train_labels,
            epochs=self.epochs,
            validation_data=(self.test_vector, self.test_labels),
            verbose=2,  # Logs once per epoch.
            batch_size=self.batch_size).history

        # The metric key is 'val_acc' or 'val_accuracy' depending on the Keras version.
        val_accuracy = history.get('val_accuracy', history.get('val_acc'))
        print('Validation accuracy: {acc}, loss: {loss}'.format(
            acc=val_accuracy[-1], loss=history['val_loss'][-1]))

        self.model = model
        self.history = history

In [None]:
# EXPLORING DATA
de_TRAIN = DataExplorer(train_texts)

de_TRAIN.get_corpus_statistics()

de_TRAIN.plot_most_frequent_words()
de_TRAIN.plot_most_frequent_words_preprocessed()

In [25]:
# Constructing the Vectorizer already runs the Preprocessor over both the
# training and the test texts, so this line does the bulk of the cleaning work.
V = Vectorizer(train_texts=train_texts, test_texts=test_texts)

# TFIDF
# train_vector_tfidf, test_vector_tfidf, vocab = V.tfidf_vectorize({})
# print(train_vector_tfidf)
# print(train_vector_tfidf.shape)

# SEQUENCE
# Integer-encode and pad the preprocessed texts; `vocab` is the tokenizer's
# word -> index mapping.
train_vector, test_vector, vocab = V.sequence_vectorize()
# NOTE(review): this rebinds the notebook-global name `word2vec`, which the
# import cell binds to the gensim `word2vec` module; after this line the module
# is no longer reachable under that name. The path is relative to the kernel's
# working directory.
word2vec = KeyedVectors.load_word2vec_format('Data/GoogleNews-vectors.bin.gz',binary=True)

