## <font size='5' color='red'>Aspect Based Sentiment Analysis</font>

In [None]:
!pip install git+https://github.com/LIAAD/yake

In [None]:
#### scraper.py


# fetch tweets from twitter
import tweepy
import pandas as pd
import re


# Same substitution pattern the scraper has always used, now a raw string so
# that the regex escapes are not treated as (invalid) string escapes.
_TWEET_NOISE_RE = re.compile(
    r"(@[A-Za-z0-9]+)|([^0-9A-Za-z \t])|(\w+:\/\/\S+)|(RT)")
# A single ASCII letter with whitespace on both sides.
_LONE_LETTER_RE = re.compile(r'\s+[a-zA-Z]\s+')

# Environment variables the Twitter credentials are read from, in the order
# (consumer key, consumer secret, access token, access token secret).
_TWITTER_ENV_VARS = (
    "TWITTER_CONSUMER_KEY",
    "TWITTER_CONSUMER_SECRET",
    "TWITTER_ACCESS_TOKEN",
    "TWITTER_ACCESS_TOKEN_SECRET",
)


def _clean_tweet(text):
    """Return `text` with mentions, URLs, 'RT' and non-alphanumerics removed."""
    scrubbed = _TWEET_NOISE_RE.sub(" ", text)
    collapsed = ' '.join(scrubbed.split())
    # Substitute a space (not '') so the words on either side of a removed
    # single letter stay separate instead of being fused together.
    return _LONE_LETTER_RE.sub(' ', collapsed)


def _load_twitter_credentials():
    """Read the four Twitter API credentials from the environment."""
    import os  # local import: this script does not import os at file level

    missing = [name for name in _TWITTER_ENV_VARS if not os.environ.get(name)]
    if missing:
        raise RuntimeError(
            "Missing Twitter credentials; set the environment variable(s): "
            + ", ".join(missing))
    return tuple(os.environ[name] for name in _TWITTER_ENV_VARS)


def get_twitter_data(search_word=None, count=10):
    """Fetch tweets from twitter for ABSA.

    Credentials are read from the environment variables listed in
    `_TWITTER_ENV_VARS`; API secrets must never be committed to source.

    Args:
        search_word: Topic to search for. When None (the default) the user
            is prompted with "Enter topic name: ".
        count: Value passed as `count` to the search call (default 10).
    Returns:
        A tuple `(data, terms)`: `data` is a single-column DataFrame with one
        cleaned tweet per row, `terms` is the search phrase split on
        whitespace.
    Raises:
        RuntimeError: if any credential environment variable is unset.
    """
    consumer_key, consumer_secret, access_token, access_token_secret = (
        _load_twitter_credentials())

    # authenticate and attach the user access token
    authenticate = tweepy.OAuthHandler(consumer_key, consumer_secret)
    authenticate.set_access_token(access_token, access_token_secret)

    # create the api object
    api = tweepy.API(authenticate, wait_on_rate_limit=True)

    if search_word is None:
        search_word = input("Enter topic name: ")

    # NOTE(review): `API.search` is the name this code was written against;
    # newer tweepy releases may expose it under a different name - confirm
    # against the installed version.
    public_tweets = api.search(search_word, count=count, lang="en")

    data = [_clean_tweet(tweet.text) for tweet in public_tweets]
    print("===============Tweet Scrapping complete=============")
    data = pd.DataFrame(data)
    return data, str(search_word).split()


In [None]:
#### main.py

import os
import re
import numpy as np 
import pandas as pd
import xgboost as xgb
import pickle
import warnings
import nltk

from nltk.corpus import stopwords
from matplotlib import pyplot as plt
from wordcloud import WordCloud
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import f1_score, log_loss, mean_squared_error
from sklearn.feature_extraction.text import CountVectorizer

# Silence every warning category for the rest of the session.
warnings.filterwarnings('ignore')

# Fetch the NLTK stop-word corpus that STOPWORDS below is built from.
nltk.download('stopwords')

# Load the train data
train_data = pd.read_csv('./train.csv')
# Keep only the rows whose aspect_term occurs more than 10 times.
train_data = train_data.groupby('aspect_term').filter(lambda x: len(x) > 10)
print(f'data_shape: {train_data.shape}')
# shuffle the dataframe (fixed seed, so the order is reproducible)
train_data = train_data.sample(frac=1, random_state=77).reset_index(drop=True)

# Text Cleaning
# Raw strings: `\[`, `\]`, `\|` and `\s` are regex escapes, not string
# escapes, and are rejected as invalid escape sequences by newer Pythons
# when written in a plain string literal. The compiled patterns are unchanged.
REPLACE_BY_SPACE_RE = re.compile(r'[/(){}\[\]\|@,;:$!]')
BAD_SYMBOLS_RE = re.compile(r'[^0-9a-z #+_?&]')
STOPWORDS = set(stopwords.words('english'))
single_chars = re.compile(r'\s+[a-zA-Z]\s+')

def clean_text(text: str) -> str:
    """
    Preprocesses text and returns a cleaned
    piece of text with unwanted characters removed.

    Steps, in order: lowercase, replace REPLACE_BY_SPACE_RE symbols with a
    space, delete everything matched by BAD_SYMBOLS_RE, drop English stop
    words, and remove single letters that stand between two other words.

    Args:
       text: a string (any other value is converted with `str()` first)
    Returns:
        Preprocessed text
    """
    text = str(text).lower()  # lowercase text
    text = REPLACE_BY_SPACE_RE.sub(' ', text)  # symbols that separate words -> space
    text = BAD_SYMBOLS_RE.sub('', text)  # delete every other unwanted symbol
    text = ' '.join(word for word in text.split() if word not in STOPWORDS)  # delete stopwords
    # The pattern consumes the whitespace on both sides of the lone letter, so
    # it must be replaced by a space; replacing with '' glued the neighbouring
    # words together ("vitamin c tablets" -> "vitamintablets").
    text = single_chars.sub(' ', text)
    return text


# Compiled once at import time; a raw string keeps `\S` and `\.` as regex
# escapes instead of invalid string escapes.
_URL_RE = re.compile(r'https?://\S+|www\.\S+')


def remove_URL(text: str) -> str:
    """
    Removes URL patterns from text.

    A URL is anything starting with `http://`, `https://` or `www.` up to the
    next whitespace character.

    Args:
        `text`: A string, word/sentence
    Returns:
        Text without url patterns.
    """
    return _URL_RE.sub('', text)


def extract_entity(text):
    """Return the lower-cased entity part of an aspect category.

    Every `#suffix` run and then every `_suffix` run (letters and digits
    only) is stripped from the lower-cased string form of `text`.
    """
    lowered = str(text).lower()
    without_attribute = re.sub('#[a-zA-Z0-9]+', '', lowered)
    return re.sub('_[a-zA-Z0-9]+', '', without_attribute)


def count_vectorize(data) -> tuple:
    """
    Create word vectors/tokens for input data.

    Fits a word-level CountVectorizer (1- to 3-grams, terms must occur in at
    least 3 documents) on `data` and transforms it in the same step.

    Args:
        `data`: iterable of text documents to be vectorized
    Returns:
        A tuple `(vectors, vectorizer)`: the document-term matrix produced by
        `fit_transform`, and the fitted vectorizer so that other data can be
        transformed with the same vocabulary.
    """
    vectorizer = CountVectorizer(min_df=3, analyzer='word',
                                 ngram_range=(1, 3))
    vectors = vectorizer.fit_transform(data)

    return vectors, vectorizer


def rmse(y_true, y_pred):
    """Return the root of the mean squared error between targets and predictions."""
    mse = mean_squared_error(y_true, y_pred)
    return np.sqrt(mse)


def make_dir(dir_name: str):
    """Creates a new directory in the current working directory.

    Missing parent directories are created as well, so nested names such as
    'runs/models' work. If the path already exists a notice is printed and
    nothing is created.

    Args:
        dir_name: directory name, relative to the current working directory.
    Returns:
        The path of the directory (`'./'` joined with `dir_name`).
    """
    save_dir = os.path.join('./', dir_name)
    # Try first and handle the failure: this avoids the gap between an
    # existence check and the actual creation.
    try:
        os.makedirs(save_dir)
    except FileExistsError:
        print(f'{save_dir}: Already exists!')
    return save_dir


# Splitting data into train and test sets
# .copy() gives each split its own frame, so the column assignments below
# modify real data instead of a possible view of `train_data`.
train_df = train_data.iloc[:2100].copy()
test_df = train_data.iloc[len(train_df):].copy()

# URLs have to be removed BEFORE clean_text: clean_text replaces ':' and '/'
# and deletes '.', after which the URL pattern can no longer match.
# astype(str) mirrors the str() conversion clean_text applies itself.
train_df['text'] = train_df['text'].astype(str).apply(remove_URL).apply(clean_text)
test_df['text'] = test_df['text'].astype(str).apply(remove_URL).apply(clean_text)

train_df['aspect_term'] = train_df['aspect_term'].apply(clean_text)
test_df['aspect_term'] = test_df['aspect_term'].apply(clean_text)

print(f"\nsentiment tweet distribution: {train_df['polarity'].value_counts()}\n")

# Categorizing polarity and aspect_term labels (multi-label formulation)
# Train
train_df['polarity'] = train_df['polarity'].astype(str)

# remove nan class (clean_text turns a missing aspect_term into the string 'nan')
train_df = train_df[train_df['aspect_term'] != 'nan']
test_df = test_df[test_df['aspect_term'] != 'nan']

# Test-labels
# Built after the 'nan' rows are dropped so the one-hot rows line up with
# `test_vectors`, which is computed from the filtered `test_df` below.
polarity_labels_test = pd.get_dummies(test_df['polarity'].astype(str), dtype='int32')
aspect_labels_test = pd.get_dummies(test_df['aspect_term'], dtype='int32')

# Class label Encoding
aspect_encoder = LabelEncoder()
train_df['aspect_term'] = aspect_encoder.fit_transform(train_df['aspect_term'])

polarity_encoder = LabelEncoder()
train_df['polarity'] = polarity_encoder.fit_transform(train_df['polarity'])


aspect_labels_train = pd.get_dummies(train_df['aspect_term'], dtype='int32')


print(f"\nNumber of unique aspect_terms: {train_df['aspect_term'].nunique()}\n")

print('\n========================Text Preprocessing=============================\n')

# Create train vectors
train_vectors, count_vectorizer = count_vectorize(train_df['text'])

## Map the tokens in the train vectors to the test set.
# i.e. the train and test vectors use the same set of tokens
test_vectors = count_vectorizer.transform(test_df['text'])

# split data into features and labels for training
y_aspect = train_df['aspect_term']  # aspect_term target

y_polarity = train_df['polarity']  # polarity target

# Building the models

def _softprob_model(classes, n_estimators):
    """Build one XGBoost 'multi:softprob' model sized for `classes`."""
    # NOTE(review): `classes` and `metric` are forwarded exactly as before;
    # they do not look like documented XGBoost parameters - confirm they are
    # needed. A regressor wrapper is kept because the callers index the
    # output of `predict` as per-class columns.
    return xgb.XGBRegressor(max_depth=5,
                            n_estimators=n_estimators,
                            classes=list(classes),
                            colsample_bytree=0.9,
                            # derived from the fitted encoder instead of a
                            # hard-coded count that silently goes stale
                            num_class=len(classes),
                            objective='multi:softprob',
                            metric='auc',
                            nthread=2,
                            learning_rate=0.1,
                            random_state=77
                            )


def build_models():
    """Define ABSA ensemble prediction models.

    Both models share the same hyper-parameters except for the number of
    estimators (150 for aspects, 350 for polarity) and the number of classes,
    which is taken from `aspect_encoder` / `polarity_encoder`.

    Returns:
        Absa_model: Aspect prediction model
        Polarity_model: Sentiment prediction model
    """
    absa_model = _softprob_model(aspect_encoder.classes_, 150)
    polarity_model = _softprob_model(polarity_encoder.classes_, 350)
    return absa_model, polarity_model

# training ABSA models
# Using StratifiedKFold so that every fold keeps the class proportions of the
# full training set. This keeps minority classes present in both the training
# and the validation split of each fold; it does not rebalance the classes.

def train_absa_model(train_data, target, model, task='polarity'):
    """Train absa model with 8-fold stratified cross-validation and save it.

    The same `model` object is re-fitted on every fold, so the returned (and
    pickled) model is the one fitted on the last fold's training split.

    Args:
        train_data: train vectors (indexable by arrays of row positions).
        target: Variable to be predicted (label).
        model: predictive model.
        task: 'absa' saves to absa_model.pkl, anything else to
            polarity_model.pkl.
    Returns:
        The fitted model.
    """
    print('===========================Training ABSA model=======================\n')
    stratified_kf = StratifiedKFold(8, shuffle=True, random_state=77)
    labels = target
    # Passed to log_loss so that a validation fold which happens to miss a
    # class is still scored against every prediction column.
    all_classes = np.unique(labels)
    p_scores = []

    for i, (tr, val) in enumerate(stratified_kf.split(train_data, labels), 1):
        X_train, y_train = train_data[tr], np.take(labels, tr, axis=0)
        X_val, y_val = train_data[val], np.take(labels, val, axis=0)

        model.fit(X_train, y_train)
        val_preds = model.predict(X_val)
        score = log_loss(y_val, val_preds, labels=all_classes)
        p_scores.append(score)
        print(f'Fold-{i} log_loss: {score}')
    print(f'Mean_log_loss: {np.mean(p_scores)}')
    print('\n===============Saving trained model=============================\n')

    save_dir = make_dir('saved_models')
    # Save model; the context manager closes the file even if pickling fails.
    file_name = '/absa_model.pkl' if task == 'absa' else '/polarity_model.pkl'
    with open(save_dir + file_name, 'wb') as model_file:
        pickle.dump(model, model_file)

    return model


# Making predictions
def predict_on_test(test_data, model, task='absa'):
    """Predict on the test vectors and print the mean per-class log-loss.

    Args:
        test_data: test vectors
        model: trained model
        task: 'polarity' scores against the polarity labels; any other value
            scores against the aspect labels.
    """
    predictions = model.predict(test_data)
    print(f'Predictions_shape: {predictions.shape}')

    # Pick the encoder / one-hot test labels that belong to the task.
    if task == 'polarity':
        encoder, one_hot_labels = polarity_encoder, polarity_labels_test
    else:
        encoder, one_hot_labels = aspect_encoder, aspect_labels_test

    # Test-data performance scores: one log-loss per class, where column
    # `idx` of the predictions is scored against the one-hot column of the
    # idx-th encoder class.
    test_scores = [
        log_loss(one_hot_labels[class_name], predictions[:, idx])
        for idx, class_name in enumerate(encoder.classes_)
    ]
    print(f'Mean Test_loss_{task}: {np.mean(test_scores)}')

def inverse_predict(data, tokenizer, model2):
    """Report the predicted sentiment and the top keyword of every review.

    For each row of `data` the function extracts up to five single-word
    keywords with YAKE, saves a bar chart plus word cloud of them to
    `./wcloud_<row>.png`, predicts the polarity with `model2` and prints a
    one-line summary.

    Args:
        data: reviews, one per row (e.g. the single-column DataFrame returned
            by the scraper).
        tokenizer: fitted vectorizer exposing `transform`.
        model2: trained polarity model whose `predict` returns one column per
            polarity class.
    """
    import yake
    extractor = yake.KeywordExtractor(lan='en',
                                      n=1,
                                      top=5
                                      )
    data = np.array(data)
    text = data.flatten().tolist()
    for i, c in enumerate(data):
        word_to_vec = tokenizer.transform(c)
        aspects = list(extractor.extract_keywords(text[i]))
        if not aspects:
            # Nothing to plot or to report an aspect for (very short text).
            print(f"No keywords could be extracted from review {i}; skipping.")
            continue
        keyword_scores = dict(aspects)

        # Separate axes: drawing the word cloud on top of the bar chart
        # produced an unreadable overlay.
        fig, (bar_ax, cloud_ax) = plt.subplots(1, 2, figsize=(12, 5))
        bar_ax.bar(list(keyword_scores.keys()), list(keyword_scores.values()))
        # Create the wordcloud object
        wordcloud = WordCloud(width=480, height=480, margin=0).generate(
            ' '.join(keyword_scores))
        # Display the generated image:
        cloud_ax.imshow(wordcloud, interpolation='bilinear')
        cloud_ax.axis("off")
        cloud_ax.margins(x=0, y=0)

        # Save before show(): once the figure has been shown and closed,
        # saving writes an empty canvas.
        fig.savefig(f'./wcloud_{i}.png')
        plt.show()
        plt.close(fig)

        # YAKE scores are "lower is more relevant", so the top aspect is the
        # keyword with the smallest score, not the last list entry.
        top_aspect_term = min(aspects, key=lambda pair: pair[1])[0]
        polarity_pred = polarity_encoder.inverse_transform(np.argmax(model2.predict(word_to_vec),1))

        print(f"The Review: {str(c[0])}_\n is expressing a {str(polarity_pred[0])} sentiment about {top_aspect_term}\n")
def get_data():
    """Scrape tweets and return them as unseen data for the absa models."""
    # The scraper also returns the search terms; only the tweets are needed.
    tweets, _search_terms = get_twitter_data()
    return tweets


[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
data_shape: (2506, 3)

sentiment tweet distribution: positive    1184
negative     548
neutral      368
Name: polarity, dtype: int64


Number of unique aspect_terms: 79





In [None]:
# Scrape the tweets to analyse (get_twitter_data prompts for the topic).
test_data = get_data()

# Build both models; only the sentiment model is trained in this cell.
aspect_model, sentiment_model = build_models()
#absa_model = train_absa_model(train_vectors, y_aspect, aspect_model)

# Cross-validated training on the count vectors; the fitted model is also
# pickled by train_absa_model.
polarity_model = train_absa_model(train_vectors, y_polarity, sentiment_model, task='polarity')
print('==============Running sentiment predictions=================')
# Print the mean per-class log-loss on the held-out split.
predict_on_test(test_vectors, polarity_model, task='polarity')

# Run model against scraped twitter data
inverse_predict(test_data, count_vectorizer, polarity_model)

In [None]:
### lstm.py
import os
import re
import numpy as np 
import pandas as pd
import tensorflow as tf
import warnings
import nltk

from nltk.corpus import stopwords
from matplotlib import pyplot as plt
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import f1_score, log_loss, mean_squared_error
from tensorflow.keras import layers
from tensorflow.keras import regularizers
from tensorflow.keras import backend as K

# Silence every warning category and make sure the stop-word corpus exists.
warnings.filterwarnings('ignore')
nltk.download('stopwords')

# Load the train data
train_data = pd.read_csv('./train.csv')
# shuffle the dataframe (fixed seed, so the order is reproducible)
train_data = train_data.sample(frac=1, random_state=77).reset_index(drop=True)

# Text Cleaning
# Raw strings: `\[`, `\]`, `\|` and `\s` are regex escapes, not string
# escapes, and are rejected as invalid escape sequences by newer Pythons
# when written in a plain string literal. The compiled patterns are unchanged.
REPLACE_BY_SPACE_RE = re.compile(r'[/(){}\[\]\|@,;:$!]')
BAD_SYMBOLS_RE = re.compile(r'[^0-9a-z #+_?&]')
STOPWORDS = set(stopwords.words('english'))
single_chars = re.compile(r'\s+[a-zA-Z]\s+')

def clean_text(text: str) -> str:
    """
    Preprocesses text and returns a cleaned
    piece of text with unwanted characters removed.

    The text is lower-cased, separator symbols become spaces, remaining
    unwanted symbols are deleted, English stop words are dropped and single
    letters standing between two words are removed.

    Args:
       text: a string (any other value is converted with `str()` first)
    Returns:
        Preprocessed text
    """
    lowered = str(text).lower()
    spaced = REPLACE_BY_SPACE_RE.sub(' ', lowered)
    stripped = BAD_SYMBOLS_RE.sub('', spaced)
    kept_words = [word for word in stripped.split() if word not in STOPWORDS]
    # The pattern swallows the whitespace around the lone letter; substitute
    # a space so the neighbouring words are not merged into one token.
    return single_chars.sub(' ', ' '.join(kept_words))

# Hoisted so the pattern is compiled once; raw string so `\S` and `\.` reach
# the regex engine as written.
_URL_PATTERN = re.compile(r'https?://\S+|www\.\S+')


def remove_URL(text: str) -> str:
    """
    Removes URL patterns from text.

    Matches `http://...`, `https://...` and `www....` up to the next
    whitespace character.

    Args:
        `text`: A string, word/sentence
    Returns:
        Text without url patterns.
    """
    return _URL_PATTERN.sub('', text)


# Splitting data into train and test sets
# .copy() gives each split its own frame, so the column assignments below
# modify real data instead of a possible view of `train_data`.
train_df = train_data.iloc[:2900].copy()
test_df = train_data.iloc[len(train_df):].copy()

# URLs have to be removed BEFORE clean_text: clean_text replaces ':' and '/'
# and deletes '.', after which the URL pattern can no longer match.
# astype(str) mirrors the str() conversion clean_text applies itself.
train_df['text'] = train_df['text'].astype(str).apply(remove_URL).apply(clean_text)
test_df['text'] = test_df['text'].astype(str).apply(remove_URL).apply(clean_text)



# Categorizing polarity and aspect_term labels (multi-label formulation)
# Train
train_df['polarity'] = train_df['polarity'].astype(str)

# remove nan class
# aspect_term is not cleaned in this script, so missing values are real NaN
# objects and never equal the string 'nan'; compare the string form instead.
train_df = train_df[train_df['aspect_term'].astype(str).str.lower() != 'nan']
test_df = test_df[test_df['aspect_term'].astype(str).str.lower() != 'nan']


polarity_encoder = LabelEncoder()
train_df['polarity'] = polarity_encoder.fit_transform(train_df['polarity'])

# Test-labels (one-hot, one column per polarity value found in the test split)
polarity_labels_test = pd.get_dummies(test_df['polarity'], dtype='int32')

print('\n========================Text Preprocessing=============================\n')
# text lengths
token_length = [len(x.split(" ")) for x in train_df['text']]
print(f'Max_token length: {max(token_length)}\n')


# split data into features and labels for training
X = train_df['text']
y_polarity = train_df['polarity']  # polarity target

# Text preprocessing
train_corpus = X.tolist()
test_corpus = test_df['text'].tolist()

# Tokenization (the vocabulary is learned from the training texts only)
tokenizer = tf.keras.preprocessing.text.Tokenizer()
tokenizer.fit_on_texts(train_corpus)
vocab_size = len(tokenizer.word_counts)
print(f'\nTrain Vocabulary size: {vocab_size}\n')

# Sequence lengths (number of tokens in each training text)
# Computing the sequence length per percentile
print('===========Analyzing the vocabulary size per percentile==============')
seq_lengths = np.array([len(s.split()) for s in train_corpus])
print(f'{[(p, np.percentile(seq_lengths, p)) for p in [75, 80, 90, 95, 99, 100]]}')

max_seqlen = 64

# Train encodings (words/sentences >> int) with padding
# Padding ensures that sequences are of the same length
train_encodings = tokenizer.texts_to_sequences(train_corpus)
train_encodings = tf.keras.preprocessing.sequence.pad_sequences(
    train_encodings, maxlen = max_seqlen)
polarity_labels = np.array(y_polarity)

# Creating a train dataset
aspect_dataset = tf.data.Dataset.from_tensor_slices(
    (train_encodings, polarity_labels))

# Test encodings with padding
test_encodings = tokenizer.texts_to_sequences(test_corpus)
test_encodings = tf.keras.preprocessing.sequence.pad_sequences(
    test_encodings, maxlen= max_seqlen)
test_labels = np.zeros_like(polarity_labels_test)  # Predictions placeholder

# Test dataset
test_dataset = tf.data.Dataset.from_tensor_slices(
 (test_encodings, test_labels))

def encode(text):
    """Tokenize and pad `text` and wrap the result in a tf.data.Dataset.

    Uses the module-level `tokenizer` and pads every sequence to
    `max_seqlen`.
    """
    sequences = tokenizer.texts_to_sequences(text)
    padded = tf.keras.preprocessing.sequence.pad_sequences(
        sequences, maxlen=max_seqlen)
    return tf.data.Dataset.from_tensor_slices(padded)

# Creating train and test batches

# Train-validation split and batch creation
# reshuffle_each_iteration=False freezes the shuffled order. With the default
# (reshuffle on every pass) take()/skip() would select different examples on
# each epoch, leaking validation examples into training.
aspect_dataset = aspect_dataset.shuffle(2000, reshuffle_each_iteration=False)

val_size = (len(train_corpus)) // 7
val_dataset_absa = aspect_dataset.take(val_size)

aspect_dataset = aspect_dataset.skip(val_size)

batch_size = 8
# Re-shuffle only the training portion on every pass, then batch and repeat
# so that `steps_per_epoch` can draw batches indefinitely.
aspect_dataset = aspect_dataset.shuffle(2000).batch(batch_size).repeat()

val_dataset_absa = val_dataset_absa.batch(batch_size)
print(f'Validation_size: {val_size}')
# NOTE: this prints the size of the whole corpus, validation part included.
print(f'Train_size: {len(train_corpus)}')

# test batch creation
test_batched = test_dataset.batch(batch_size)


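# Building the model
#
# The model consists of:
#      * An Embedding layer (to generate word embeddings)
#      * A Bidirectional LSTM layer
#      * Global average pooling over the sequence, followed by dropout
#      * 1 hidden Dense layer with the `relu` activation function
#      * An output Dense layer with the `softmax` activation function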


def rmse(y_true, y_pred):
    """Root-mean-squared error computed with Keras backend ops."""
    squared_error = K.square(y_pred - y_true)
    return K.sqrt(K.mean(squared_error))



embedding_dim=64

# Reset Keras' global state BEFORE the model is created; clearing it after
# the model has been built and compiled serves no purpose.
K.clear_session()

# ABSA model
absa_model = tf.keras.Sequential([
    # +1 because the Tokenizer's word indices start at 1 (0 is the padding id)
    layers.Embedding(vocab_size+1, embedding_dim),
    layers.Bidirectional(
        # NOTE(review): max_seqlen is reused as the number of LSTM units
        layers.LSTM(max_seqlen, return_sequences=True)),
    layers.GlobalAveragePooling1D(),
    layers.Dropout(0.5),
    layers.Dense(32, activation='relu'),
    layers.Dropout(0.3),
    layers.Dense(3, activation='softmax')
])

absa_model.build(input_shape=(batch_size, max_seqlen))

# The targets are integer class ids and the network outputs a softmax
# distribution, so the loss has to be sparse categorical cross-entropy; a
# mean absolute error between probabilities and class ids is not a
# meaningful training signal. The rmse metric is kept because the code below
# reads `val_rmse` from the training history.
absa_model.compile(optimizer=tf.keras.optimizers.RMSprop(learning_rate=2e-3),
              loss='sparse_categorical_crossentropy',
              metrics=[rmse])


history = absa_model.fit(aspect_dataset,
                         epochs=8,
                         steps_per_epoch=300,
                         validation_data=val_dataset_absa,
                         verbose=1,
                         )

# One validation RMSE value per epoch; the printed figure is their mean.
mean_score = list(history.history['val_rmse'])
print(mean_score)
loss = np.round(np.mean(mean_score), 2)
print(f'Train_RMSE: {loss}\n')

# Saving the model
def make_dir(dir_name: str):
    """Creates a new directory in the current working directory.

    Intermediate directories are created when needed. When the path is
    already present, a notice is printed instead of raising.

    Args:
        dir_name: directory name, relative to the current working directory.
    Returns:
        The directory path, i.e. `'./'` joined with `dir_name`.
    """
    save_dir = os.path.join('./', dir_name)
    # Attempt the creation and react to the failure rather than checking
    # first, which leaves a window between the check and the creation.
    try:
        os.makedirs(save_dir)
    except FileExistsError:
        print(f'{save_dir}: Already exists!')
    return save_dir

# Make sure the output directory exists, then write the trained model to a
# `.h5` file inside it.
make_dir('./saved_models')
absa_model.save('./saved_models/sentiment_model.h5')

# Making predictions
# One row of class probabilities (softmax output) per test example.
predictions = absa_model.predict(test_batched)

print(f'Absa_predictions_shape: {predictions.shape}')

# Copy of the test texts; not referenced again in the visible code.
test_data_absa = test_df[['text']].copy()

# Test RMSE:
def _rmse(y_true, y_pred):
    """Root-mean-squared error via scikit-learn/NumPy (no keras backend)."""
    mse = mean_squared_error(y_true, y_pred)
    return np.sqrt(mse)

# RMSE between the predicted probabilities and the one-hot test labels.
polarity_test_rmse = _rmse(predictions, polarity_labels_test.values)
print(f'\nTest_RMSE: {polarity_test_rmse}\n')

# One log-loss per polarity class: prediction column `col` is scored against
# the one-hot label column of the col-th encoder class.
test_scores = [
    log_loss(polarity_labels_test[class_name], predictions[:, col])
    for col, class_name in enumerate(polarity_encoder.classes_)
]
print(f'Mean Test_loss_polarity: {np.mean(test_scores)}')
