<h1 align="center">Twitter sentiment analysis with MLDP</h1>



In this tutorial we're going to cover a simple scenario on how to use the **machine learning data pipeline** (MLDP) module for **tweet polarity classification** (a.k.a. sentence-level polarity sentiment analysis). 

<h2> Sentiment analysis introduction </h2>


<img src="img/sentiment.jpg" alt="sentiment analysis" width="50%" style="float:right;margin:10px;margin-top:0px;"/>

Sentiment analysis is a very broad field that, to a great extent, focuses on extracting opinions from text. For example, given product reviews, we might want to know how positive or negative they are. Alternatively, we might be interested in the specifics of what users like or dislike.

Polarity sentiment analysis tackles the classification of textual input (tweets, reviews, documents, etc.) into polarity classes, which in most cases are negative, neutral, and positive. While it can appear to be a relatively simple task, polarity classification models have to cope with the high complexity and variation of human language. For example, sarcasm and word ambiguity are still open research problems.

Twitter, as an online social platform, holds a vast amount of textual information useful for many business cases, e.g. trend detection and client segmentation. On the other hand, short tweets written in informal language introduce extra complexity for machine learning (ML) models. Historically, researchers applied various rule-based approaches to model text in such cases, for example by manually introducing feature functions and then using ML models to learn their significance. Nowadays, a more common approach is to learn both the features and their significance automatically, for example by using a neural network. In this tutorial, we take the latter approach and use a recurrent neural model.


<h2>Tweet polarity classification</h2>

In this tutorial, we will be building an end-to-end system as shown below. 

<img src="img/system.png" alt="complete system" width="70%"/>

Simply put, the system takes a tweet as input and outputs a distribution over positive, negative, and neutral polarities.
Concretely, it will consist of two main components: 

* **Machine learning model** - a model that classifies tweets.
* **Data pipeline** - a pipeline that cleans and prepares our data to be consumed by the model. Specifically, it reads tweets from local storage and processes them in an online fashion to produce batches.

The ML model, an LSTM-based text classifier implemented in Keras, will be provided. The main focus of the tutorial is the second component of the system, the **data pipeline**, which is implemented using the **MLDP** module.

<h2>Tutorial's purpose</h2>

The purpose of this tutorial is to illustrate key features of the MLDP module, and show its real-case application. By the end of the tutorial we will:

* Have an end-to-end system for tweet polarity classification.
* Learn the basic features of the module, such as the preprocessing and processing phases.
* Learn how to create vocabularies, which are very common in NLP, using the module.
* Become familiar with some basic steps available out of the box.
* Learn how to create automatically scraped documentation of our pipelines for reproducibility of experiments.


<h2>Requirements</h2>

The tutorial assumes that you've installed the ML data pipeline module, use Python 2.7, and have the following modules installed:

* Keras==2.2.3
* nltk==3.3
* Theano==1.0.3

It may work with other versions, but without guarantees.

<h2>Data</h2>

We're going to be using data from [SemEval-2017 Task 4](http://alt.qcri.org/semeval2017/task4/), namely the English development Twitter dataset. The dataset is in the CSV format and has 3 implicit columns: ids, polarities, and tweets. An example of two tweets is shown below.

In [7]:
[['619950566786113536','neutral',"Picturehouse's, Pink Floyd's, 'Roger Waters: The Walll - opening 29 Sept is now making waves. Watch the trailer on Rolling Stone - look..."],	
['619969366986235905','neutral','Order Go Set a Watchman in store or through our website before Tuesday and get it half price! #GSAW @GSAWatchmanBook']]		

[['619950566786113536',
  'neutral',
  "Picturehouse's, Pink Floyd's, 'Roger Waters: The Walll - opening 29 Sept is now making waves. Watch the trailer on Rolling Stone - look..."],
 ['619969366986235905',
  'neutral',
  'Order Go Set a Watchman in store or through our website before Tuesday and get it half price! #GSAW @GSAWatchmanBook']]

The dataset contains **20632 tweets**, **38k unique words**, and **3 polarity classes**. The data is located at **data/tweets.csv**.

<h2>Model</h2>

Our model will be a recurrent neural tweet classifier as shown in the figure below. 

<img src="img/model.png" width="100%">

The model takes as input a tweet processed by the pipeline (a batch of tweets, to be more precise), embeds it using word embeddings, and runs the LSTM to encode the tweet into a representation vector that captures its semantics and syntax. Finally, it performs an affine transformation using a dense layer and normalizes the class scores using softmax. The final output of the system is a distribution over the positive, negative, and neutral classes.
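To make the last two operations concrete, the sketch below applies an affine transformation followed by softmax to a toy representation vector in plain Python. The weights, bias, and input vector are made-up values chosen for illustration; in the actual model these operations happen inside Keras layers.

```python
import math


def affine(vector, weights, bias):
    """Computes weights * vector + bias; weights has one row per class."""
    return [sum(w * x for w, x in zip(row, vector)) + b
            for row, b in zip(weights, bias)]


def softmax(scores):
    """Normalizes class scores into a probability distribution."""
    max_score = max(scores)  # subtracted for numerical stability
    exps = [math.exp(s - max_score) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]


# hypothetical 2-dimensional tweet representation and 3-class parameters
representation = [0.5, -1.0]
weights = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]
bias = [0.0, 0.1, -0.1]

scores = affine(representation, weights, bias)
distribution = softmax(scores)
print(distribution)
```

The resulting list has one non-negative entry per class and sums to 1, which is what allows us to read it as a distribution over polarities.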

The model is already implemented using Keras and can be used directly for tweet classification. We will also use a simple wrapper **interface** (*model/i_senti_lstm.py*) over the **model** (*model/senti_lstm.py*) to provide additional logic for training and for computing accuracy. The train and test methods of the interface are shown below:

In [31]:
def train(self, **data_source_kwargs):
    """Trains the model for a single epoch."""
    itr = self.data_pipeline.iter(**data_source_kwargs)
    for counter, (tweets_batch, labels_batch) in enumerate(itr, 1):
        loss = self.model.train(tweets_batch, labels_batch)
        if counter % 100 == 0:
            print ("chunk's # %d loss: %f" % (counter, loss))

In [32]:
def test(self, **data_source_kwargs):
    """Iterates over data batches, computes and prints accuracy [0, 1]."""
    correct = 0
    total = 0
    itr = self.data_pipeline.iter(**data_source_kwargs)
    for tweets_batch, labels_batch in itr:
        predictions = self.model.predict(tweets_batch)
        correct += np.sum(predictions == np.argmax(labels_batch, axis=1))
        total += len(tweets_batch)
    print("accuracy: %f" % (float(correct)/total))

Essentially, we need to provide the interface, which feeds the model, with a **data_pipeline** whose iterator yields processed **tweets_batch** and **labels_batch** pairs:

* **tweets_batch** - an integer numpy array of size *batch_size* x *sequence_length*.
* **labels_batch** - an integer numpy array of size *batch_size* x *3*.

We're going to discuss specifics of raw data processing to obtain the desired batches shortly. 
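As a rough illustration of this contract, the sketch below builds one toy batch with plain Python lists (the real batches are numpy arrays) and counts correct predictions in the same way the test method does. The token ids, the padding id of 0, and the predicted class ids are all made up.

```python
# toy batch: batch_size = 2, sequence_length = 4, 3 classes
tweets_batch = [[5, 8, 2, 0],   # token ids, 0 is assumed to be the padding id
                [7, 3, 0, 0]]
labels_batch = [[0, 1, 0],      # a single 1 marks class id 1
                [0, 0, 1]]      # a single 1 marks class id 2


def argmax(values):
    """Returns the index of the largest value."""
    return max(range(len(values)), key=lambda i: values[i])


def batch_accuracy_counts(predictions, labels_batch):
    """Counts predicted class ids that match the label rows."""
    correct = sum(1 for pred, label_row in zip(predictions, labels_batch)
                  if pred == argmax(label_row))
    return correct, len(labels_batch)


# hypothetical class ids predicted by a model for the two tweets
predictions = [1, 0]
correct, total = batch_accuracy_counts(predictions, labels_batch)
accuracy = float(correct) / total
print("accuracy: %f" % accuracy)
```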

<h2>Data Pipeline Introduction</h2>

A data pipeline can be understood as a sequence of steps that are applied to raw data that resides on a storage (remote or local) in order to produce formatted data batches a model requires for training and evaluation. Steps perform particular operations on data and feed subsequent steps with their output until the end of the pipeline is reached. 
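The idea of chaining steps can be sketched with ordinary Python generators. This is only a conceptual illustration with hypothetical function names and not how MLDP is implemented: a reader yields data-chunks, and each step transforms a chunk before handing it to the next one.

```python
def read_chunks(rows, chunk_size):
    """Reader: yields lists of at most chunk_size raw rows."""
    for start in range(0, len(rows), chunk_size):
        yield rows[start:start + chunk_size]


def lower_case(chunk):
    """Step: lower-cases every row in a chunk."""
    return [row.lower() for row in chunk]


def split_tokens(chunk):
    """Step: splits every row into whitespace-separated tokens."""
    return [row.split() for row in chunk]


def run_pipeline(reader, steps):
    """Feeds each chunk through the steps in order and yields the result."""
    for chunk in reader:
        for step in steps:
            chunk = step(chunk)
        yield chunk


raw_rows = ["Good Morning", "Bad Day", "So So"]
batches = list(run_pipeline(read_chunks(raw_rows, chunk_size=2),
                            steps=[lower_case, split_tokens]))
print(batches)
```

Because everything is lazy, only one chunk needs to be in memory at a time, which is the property that makes this style suitable for data that does not fit in memory.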

The MLDP module lets you implement your own steps, scale data processing efficiently on multi-core architectures, log the pipeline's configuration for reproducibility, and reuse code across different machine learning projects. In our case, a more detailed setup is shown in the figure below.

<img src="img/architecture.png" width="100%">

First, we shall sketch the required operations on the raw data stored on the local storage. Afterwards, we shall construct data pipelines based on those operations for the task at hand.

<h3> Preprocessing </h3>

Preprocessing usually refers to first-phase operations, which are static by nature and need to be performed only once, before processing (the second phase) takes place. Some examples of preprocessing are listed below.

* Get a fresh dump of data from a remote location and save it on a local storage.
* Clean files with expensive regular expressions.
* Perform text enrichment (POS tagging, dependency parsing, etc).

In our case, the raw data file (*data/tweets.csv*) has **two undesired properties**:
1. No column names (i.e. no header).
2. Some tweets are wrapped in double quotes, which ideally should be removed so as not to confuse the model.

Both issues can be solved once per file, and the clean version of the file can later be reused for model training and evaluation. In other words, there is no need to re-execute the preprocessing step for every run of the model.
This is especially useful when many experiments are conducted using the same model.


To accomplish this, we will use a **preprocessing step**, which is implemented in **components/twitter_files_preprocessor.py**. The step has simple caching that allows the output of a previous execution to be reused. We're not going to dive into the details of how it's implemented, but the interested reader is encouraged to check the implementation.
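For readers who want a feel for what such a step does, here is a minimal sketch of cached preprocessing. The function `preprocess_file` and its parameters are hypothetical and are not the actual step's code; the sketch also assumes tab-separated fields that contain no tabs themselves.

```python
import io
import os
import tempfile


def preprocess_file(input_path, output_path, header, sep=u"\t", tweets_indx=2):
    """Adds a header and strips wrapping double quotes from tweets.

    If output_path already exists, the previous result is reused.
    Returns True if the file was created and False on a cache hit.
    """
    if os.path.exists(output_path):
        return False
    with io.open(input_path, encoding="utf-8") as f_in, \
            io.open(output_path, "w", encoding="utf-8") as f_out:
        f_out.write(sep.join(header) + u"\n")
        for line in f_in:
            cols = line.rstrip(u"\n").split(sep)
            tweet = cols[tweets_indx]
            if len(tweet) > 1 and tweet[0] == u'"' and tweet[-1] == u'"':
                cols[tweets_indx] = tweet[1:-1]
            f_out.write(sep.join(cols) + u"\n")
    return True


# toy raw file with one quoted and one plain tweet
folder = tempfile.mkdtemp()
raw_path = os.path.join(folder, "raw.csv")
clean_path = os.path.join(folder, "clean.csv")
with io.open(raw_path, "w", encoding="utf-8") as f:
    f.write(u'1\tneutral\t"a quoted tweet"\n2\tpositive\tplain tweet\n')

header = [u"ids", u"labels", u"tweets"]
created = preprocess_file(raw_path, clean_path, header)  # does the work
cached = preprocess_file(raw_path, clean_path, header)   # reuses the output

with io.open(clean_path, encoding="utf-8") as f:
    clean_lines = f.read().splitlines()
print(clean_lines)
```

The existence check on the output file is the simplest possible cache; it will not notice when the raw file changes, so the clean file has to be deleted manually in that case.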

<h3>Processing</h3> 

Once we have performed the preliminary preprocessing, we can perform processing, which is a sequence of steps that alter the read data-chunks in order to produce the desired output.

As we discussed previously, for the task at hand, the final output should be tuples:

* **tweets_batch** - an integer numpy array of size *batch_size* x *sequence_length*.
* **labels_batch** - an integer numpy array of size *batch_size* x *3*.

Specifically, our tweets should be cleaned and tokenized, the tokens converted to token ids, and the resulting sequences padded to ensure the same *sequence_length* (a requirement for running the LSTM). Labels should also be converted to ids, and then to **one-hot vectors** (a Keras requirement). A one-hot vector is a vector of all zeros except for the component corresponding to the actual class label, which is set to 1.
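The conversions described above can be sketched in a few lines of plain Python. The id assignments, the special ids for padding and unknown tokens, and the choice of right-padding are assumptions made for this example; the module's own steps may behave differently.

```python
PAD_ID = 0  # assumed id of the padding token
UNK_ID = 1  # assumed id of the unknown-token symbol


def tokens_to_ids(tokens, token_to_id):
    """Maps tokens to ids, falling back to UNK_ID for unseen tokens."""
    return [token_to_id.get(token, UNK_ID) for token in tokens]


def pad(sequences, pad_id=PAD_ID):
    """Right-pads id sequences to the length of the longest one."""
    max_len = max(len(seq) for seq in sequences)
    return [seq + [pad_id] * (max_len - len(seq)) for seq in sequences]


def one_hot(class_id, classes_number):
    """Returns a vector of zeros with a single 1 at position class_id."""
    vector = [0] * classes_number
    vector[class_id] = 1
    return vector


# hypothetical mappings
token_to_id = {"good": 2, "day": 3, "bad": 4}
label_to_id = {"negative": 0, "neutral": 1, "positive": 2}

tweets = [["good", "day"], ["bad"], ["what", "a", "day"]]
labels = ["positive", "negative", "neutral"]

tweets_batch = pad([tokens_to_ids(t, token_to_id) for t in tweets])
labels_batch = [one_hot(label_to_id[l], len(label_to_id)) for l in labels]
print(tweets_batch)
print(labels_batch)
```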

A way to obtain the desired batches would be using the following processing pipeline's architecture:

<img src="img/dpp.png" width="100%">

**CSV Files Reader** - reads file(s) based on passed paths and produces raw data-chunks.

**Fields Selector** - selects only *tweet* and *label* fields, as other fields are not necessary for the task.

**Token Processor** - splits tweet strings into tokens, lower-cases tokens, and cleans them using regular expressions.

**Vocabulary Mapper** - maps both tweet tokens and labels to integer ids.

**Sequence Padder** - pads each tweet's token sequence with a special token id to ensure the same sequence length across the batch.

**Formatter** - splits data-chunks into a tuple of tweet and label batches. Also, converts label ids to one-hot vectors. 
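To illustrate the kind of work the Token Processor does, the following sketch splits a tweet, lower-cases the tokens, and cleans them with regular expressions. Whitespace splitting stands in for a proper tweet tokenizer, and the `<URL>` and `<USER>` placeholders are hypothetical; they are not necessarily what `twitter_text_cleaner` produces.

```python
import re

URL_PATTERN = re.compile(r"https?://\S+")
MENTION_PATTERN = re.compile(r"@\w+")


def clean_token(token):
    """Replaces URLs and user mentions with placeholder tokens."""
    if URL_PATTERN.match(token):
        return "<URL>"
    if MENTION_PATTERN.match(token):
        return "<USER>"
    return token


def process_tweet(tweet):
    """Splits on whitespace, lower-cases, and cleans every token."""
    return [clean_token(token.lower()) for token in tweet.split()]


tokens = process_tweet("Half price via http://example.com thanks @GSAWatchmanBook")
print(tokens)
```

Collapsing URLs and mentions into shared placeholders is one common way to keep the vocabulary small, since the individual links and user names rarely carry polarity information.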

<h2>Data Pipelines Implementation</h2>

In [4]:
from mltoolkit.mldp.pipeline import Pipeline
from mltoolkit.mldp.steps.readers import CsvReader
from mltoolkit.mldp.steps.transformers import TokenProcessor,\
VocabMapper, FieldsSelector, Padder
from mltoolkit.mldp.utils.util_funcs.nlp.token_cleaning import twitter_text_cleaner
from mltoolkit.mldp.utils.util_classes import Vocabulary
from mltoolkit.mldp.utils.util_classes.vocabulary import PAD_TOKEN
from steps import TwitterFilesPreprocessor, FeaturesLabelsFormatter

In [5]:
from nltk.tokenize import TweetTokenizer

In [6]:
data_path = "data/tweets.csv"

<h3>Vocabularies Pipeline</h3>

First of all, we need to create vocabularies that will contain our mappings from token strings to ids and vice versa. One could create vocabulary classes with internal logic for reading files, but that becomes tricky to generalize to different file formats. In addition, that logic would have to cover the possibility of processing the read data before the mappings are created (e.g. tokenization or token cleaning).

Instead, the vocabulary class is **decoupled** from data parsing and relies on a data-chunk provider, which in this case will be a data pipeline.
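The benefit of this decoupling can be shown with a toy vocabulary that accepts any iterable of data-chunks and knows nothing about files or tokenization. `ToyVocabulary` is a hypothetical class written for this illustration, and data-chunks are assumed to be dictionaries that map field names to lists of token lists.

```python
from collections import Counter


class ToyVocabulary(object):
    """Builds token-to-id mappings from any iterable of data-chunks."""

    def __init__(self, special_tokens=("<PAD>", "<UNK>")):
        self.special_tokens = list(special_tokens)
        self.counts = Counter()
        self.token_to_id = {}

    def create(self, chunks, field_name):
        """Counts tokens of field_name over all chunks and assigns ids."""
        for chunk in chunks:
            for tokens in chunk[field_name]:
                self.counts.update(tokens)
        # most frequent tokens first, ties broken alphabetically
        by_count = sorted(self.counts.items(), key=lambda kv: (-kv[1], kv[0]))
        ordered = self.special_tokens + [token for token, _ in by_count]
        self.token_to_id = {t: i for i, t in enumerate(ordered)}

    def __len__(self):
        return len(self.token_to_id)


# any chunk provider works: a list, a generator, or a data pipeline
chunks = [{"tweets": [["good", "day"], ["bad", "day"]]},
          {"tweets": [["good", "good"]]}]
vocab = ToyVocabulary()
vocab.create(chunks, field_name="tweets")
print(vocab.token_to_id)
```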

In [7]:
# paths where vocabs will be saved and later loaded from
words_vocab_file_path = "data/vocabs/words.txt"
labels_vocab_file_path = 'data/vocabs/labels.txt'

In [8]:
# creating step objects
twitter_tokenizer = TweetTokenizer()
preprocessor = TwitterFilesPreprocessor(input_cols_number=3, tweets_indx=2,
                                        add_header=['ids', 'labels', 'tweets'])
csv_reader = CsvReader(sep='\t', chunk_size=30)
fields_selector = FieldsSelector(field_names=["tweets", "labels"])
token_processor = TokenProcessor(field_names="tweets",
                                 tokenization_func=twitter_tokenizer.tokenize,
                                 token_cleaning_func=twitter_text_cleaner,
                                 lower_case=True)

In [9]:
# data pipeline for vocabularies creation
vocab_data_pipeline = Pipeline(reader=csv_reader,
                               preprocessor=preprocessor,
                               worker_processes_num=0, name_prefix="vocabs")
vocab_data_pipeline.add_step(fields_selector)
vocab_data_pipeline.add_step(token_processor)

In [14]:
# creating or loading vocabs
words_vocab = Vocabulary(vocab_data_pipeline, name_prefix="words")
words_vocab.load_or_create(words_vocab_file_path,
                           data_source={"data_path": data_path},
                           data_field_names="tweets")

labels_vocab = Vocabulary(vocab_data_pipeline, add_default_special_symbols=False,
                         name_prefix="labels")
labels_vocab.load_or_create(labels_vocab_file_path,
                            data_source={"data_path": data_path},
                            data_field_names="labels")

At this stage, we obtain vocabulary files for words and labels, which are saved to **words_vocab_file_path** and **labels_vocab_file_path** respectively. Each file contains space-separated token-count pairs, and can be loaded the next time the vocabulary is needed without re-computation.
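The load-or-create behaviour can be approximated as follows. This is a sketch under the assumption of one token-count pair per line; the helper names are hypothetical and the module's actual file layout may differ in details.

```python
import io
import os
import tempfile
from collections import Counter


def save_counts(counts, path, sep=u" "):
    """Writes one token-count pair per line."""
    with io.open(path, "w", encoding="utf-8") as f:
        for token, count in sorted(counts.items()):
            f.write(u"%s%s%d\n" % (token, sep, count))


def load_counts(path, sep=u" "):
    """Reads token-count pairs written by save_counts."""
    counts = Counter()
    with io.open(path, encoding="utf-8") as f:
        for line in f:
            token, count = line.rstrip(u"\n").rsplit(sep, 1)
            counts[token] = int(count)
    return counts


def load_or_create(path, create_func):
    """Loads counts from path if it exists, otherwise creates and saves them."""
    if os.path.exists(path):
        return load_counts(path)
    counts = create_func()
    save_counts(counts, path)
    return counts


def count_tokens():
    """Stands in for an expensive pass over the data pipeline."""
    return Counter(["good", "day", "good"])


vocab_path = os.path.join(tempfile.mkdtemp(), "words.txt")
first = load_or_create(vocab_path, count_tokens)   # computed and saved
second = load_or_create(vocab_path, count_tokens)  # loaded from the file
print(first == second)
```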

Also, **words_vocab** and **labels_vocab** are objects containing symbols, each of which holds a str token, its count, and its id. In other words, each unique token has a separate symbol, which can be easily accessed using those objects. We will later use them to map tokens to ids.

We might also be interested in the configurations of our vocabularies in order to log that information for reproducibility of experiments.

In [15]:
print(words_vocab)


#########   Words Vocabulary   #########

  min_count: 1
  max_size: None
  sep: ' '
  encoding: 'utf-8'
  add_default_special_symbols: True
  special_symbols: (keys){<PAD>, <UNK>}
  vocab_size: 38388



In [16]:
print(labels_vocab)


########   Labels Vocabulary   #########

  min_count: 1
  max_size: None
  sep: ' '
  encoding: 'utf-8'
  add_default_special_symbols: False
  special_symbols: {}
  vocab_size: 3



We could also save/log information about the pipeline that was used for our vocabularies.

In [17]:
print(vocab_data_pipeline)


#################################################################
#                    VOCABS PIPELINE'S SETUP                    #
#################################################################

  worker_processes_number: 0

#################   TwitterFilesPreprocessor   ##################

  input_cols_number: 3
  output_folder: 'data/clean_tweets/'
  input_sep: '\t'
  output_sep: '\t'
  add_header: ['ids', 'labels', 'tweets']
  tweets_indx: 2
  encoding: 'utf-8'

######################   FieldsSelector   #######################

  field_names: ['tweets', 'labels']

######################   TokenProcessor   #######################

  field_names: ['tweets']
  tokenization_func: nltk.tokenize.casual.TweetTokenizer.tokenize
  token_cleaning_func: twitter_text_cleaner
  token_matching_func: None
  lower_case: True

#################################################################
#                                                               #
#################################################################

The print-out is to a large extent **scraped automatically** from the objects. However, each step can be enriched to produce a better signature by overriding a specific method (**get_signature_attrs**).
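One way such automatic scraping could work is sketched below. The classes are hypothetical, and the assumption that `get_signature_attrs` returns a dictionary of attribute names and values is ours; please check the module's source for the actual contract.

```python
class ToyStep(object):
    """Base class that prints a signature scraped from its attributes."""

    def get_signature_attrs(self):
        """Returns attributes to report; by default all public ones."""
        return {name: value for name, value in vars(self).items()
                if not name.startswith("_")}

    def __str__(self):
        lines = ["###   %s   ###" % type(self).__name__]
        for name, value in sorted(self.get_signature_attrs().items()):
            lines.append("  %s: %r" % (name, value))
        return "\n".join(lines)


class ToyPadder(ToyStep):
    def __init__(self, field_names, pad_symbol):
        self.field_names = field_names
        self.pad_symbol = pad_symbol
        self._cache = {}  # private, skipped by the default scraping


class ToyMapper(ToyStep):
    def __init__(self, vocab):
        self.vocab = vocab

    def get_signature_attrs(self):
        # override: report only the vocabulary size, not its content
        return {"vocab_size": len(self.vocab)}


padder_signature = str(ToyPadder(field_names="tweets", pad_symbol=0))
mapper_signature = str(ToyMapper(vocab={"good": 0, "bad": 1}))
print(padder_signature)
print(mapper_signature)
```

The override in `ToyMapper` shows why enriching a signature can be worthwhile: dumping a whole vocabulary into a log is far less useful than reporting its size.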

<h3>Training and Evaluation Pipeline</h3>

Once we have the vocabulary objects, we can proceed to create the pipeline necessary for training and evaluating the model. We will also execute data parsing in a separate process by setting **worker_processes_num=1** in the pipeline. This configuration is especially useful if the model is trained on a GPU, as the CPU can be used in parallel for data preparation by filling a buffer.
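The buffering idea can be sketched with a bounded queue that a background worker fills while the consumer (the model) reads from it. For brevity the sketch uses a thread rather than a separate process, and all names in it are hypothetical.

```python
import threading

try:
    import queue  # Python 3
except ImportError:
    import Queue as queue  # Python 2

_END = object()  # sentinel marking the end of the data


def buffered(chunks, buffer_size=5):
    """Yields chunks prepared by a background worker via a bounded buffer."""
    out_buffer = queue.Queue(maxsize=buffer_size)

    def worker():
        for chunk in chunks:
            out_buffer.put(chunk)  # blocks while the buffer is full
        out_buffer.put(_END)

    thread = threading.Thread(target=worker)
    thread.daemon = True
    thread.start()
    while True:
        chunk = out_buffer.get()
        if chunk is _END:
            break
        yield chunk


def slow_reader(n):
    """Stands in for reading and processing n data-chunks."""
    for i in range(n):
        yield [i, i + 1]


consumed = list(buffered(slow_reader(4), buffer_size=2))
print(consumed)
```

The bounded size keeps memory usage in check: when the consumer is slower than the producer, the worker simply waits until there is room in the buffer again.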

In [18]:
# extra steps for training and evaluation
mapper = VocabMapper(field_names_to_vocabs={"tweets": words_vocab,
                                            "labels": labels_vocab})
padder = Padder(field_names="tweets", pad_symbol=words_vocab[PAD_TOKEN].id)
formatter = FeaturesLabelsFormatter(features_field_name="tweets",
                                    labels_field_name="labels",
                                    classes_number=len(labels_vocab))

In [19]:
# building the actual pipeline
dev_data_pipeline = Pipeline(reader=csv_reader, preprocessor=preprocessor,
                             worker_processes_num=1, name_prefix="dev")
dev_data_pipeline.add_step(fields_selector)
dev_data_pipeline.add_step(token_processor)
dev_data_pipeline.add_step(mapper)
dev_data_pipeline.add_step(padder)
dev_data_pipeline.add_step(formatter)

In [20]:
print(dev_data_pipeline)


##############################################################
#                    DEV PIPELINE'S SETUP                    #
##############################################################

  worker_processes_number: 1
  output_buffer_size: 5

################   TwitterFilesPreprocessor   ################

  input_cols_number: 3
  output_folder: 'data/clean_tweets/'
  input_sep: '\t'
  output_sep: '\t'
  add_header: ['ids', 'labels', 'tweets']
  tweets_indx: 2
  encoding: 'utf-8'

#####################   FieldsSelector   #####################

  field_names: ['tweets', 'labels']

#####################   TokenProcessor   #####################

  field_names: ['tweets']
  tokenization_func: nltk.tokenize.casual.TweetTokenizer.tokenize
  token_cleaning_func: twitter_text_cleaner
  token_matching_func: None
  lower_case: True

######################   VocabMapper   #######################

  symbols_attr: 'id'
  field_names_to_vocabs: {tweets: Vocabulary({min_count: 1, max_size: None, sep

<h2> Combining all together </h2>

Now that we have all the components, we're going to combine them into an end-to-end system that is trained and evaluated.

In [21]:
from model import ISentiLSTM

Using Theano backend.


In [14]:
epochs = 2

i_model = ISentiLSTM(dev_data_pipeline)
i_model.init_model(words_vocab_size=len(words_vocab), input_dim=50,
                   lstm_hidden_dim=120,
                   number_of_classes=len(labels_vocab),
                   mask_symbol=words_vocab[PAD_TOKEN].id)
print("testing before training")
i_model.test(data_path=data_path)
print("training the model")
for epoch in range(1, epochs+1):
    print("epoch %d" % epoch)
    i_model.train(data_path=data_path)
    i_model.test(data_path=data_path)

testing before training
accuracy: 0.496897
training the model
epoch 1
chunk's # 100 loss: 0.985782
chunk's # 200 loss: 0.956783
chunk's # 300 loss: 0.878575
chunk's # 400 loss: 1.036046
chunk's # 500 loss: 0.834549
chunk's # 600 loss: 0.778580
accuracy: 0.732858
epoch 2
chunk's # 100 loss: 0.972646
chunk's # 200 loss: 0.655704
chunk's # 300 loss: 0.595417
chunk's # 400 loss: 0.747819
chunk's # 500 loss: 0.640200
chunk's # 600 loss: 0.473408
accuracy: 0.817137


The model should train for two epochs, and we should observe an improvement in terms of accuracy and loss. That indicates that all parts were combined correctly. Congrats :) 

<h2>Conclusion</h2>

In this tutorial we have covered how to create a data pipeline for an LSTM-based sentiment analysis model. We covered some of the module's basic features, as well as processing steps that can be useful for data parsing. We hope you have enjoyed the tutorial and will find the module useful for your own ML projects.