## IMDB dataset review classification using a Keras model over Spark

## IMDB movie reviews sentiment classification

Source: https://keras.io/datasets/#imdb-movie-reviews-sentiment-classification

Dataset of 25,000 movie reviews from IMDB, labeled by sentiment (positive/negative). Reviews have been preprocessed, and each review is encoded as a sequence of word indexes (integers). For convenience, words are indexed by overall frequency in the dataset, so that, for instance, the integer "3" encodes the 3rd most frequent word in the data. This allows for quick filtering operations such as: "only consider the top 10,000 most common words, but eliminate the top 20 most common words".

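As a convention, "0" does not stand for a specific word. With the `load_data` arguments used in this notebook (`start_char=1`, `oov_char=2`, `index_from=3`), index 0 is reserved for padding, 1 marks the start of a review, and 2 encodes any unknown (out-of-vocabulary) word.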
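The filtering described above ("top 10,000 words, minus the top 20") can be sketched in plain Python. This is a minimal illustration, not the Keras implementation: `filter_by_rank` is a hypothetical helper, and replacing filtered indices with `2` is an assumption that mirrors the `oov_char=2` argument passed to `load_data` in this notebook.

```python
def filter_by_rank(review, num_words=10000, skip_top=20, oov_char=2):
    """Keep indices in [skip_top, num_words); replace all others with oov_char."""
    return [w if skip_top <= w < num_words else oov_char for w in review]

# Indices taken from the first training review shown later in this notebook.
review = [1, 14, 22, 16, 43, 530, 973, 1622, 1385, 65, 22665]
filtered = filter_by_rank(review)
print(filtered)
# [2, 2, 22, 2, 43, 530, 973, 1622, 1385, 65, 2]
```

Very frequent indices (below 20) and very rare ones (10,000 and above) both collapse to the out-of-vocabulary marker, while the sequence length stays the same.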

In [None]:
!pip install keras tensorflow elephas

In [1]:
from keras.datasets import imdb

(x_train, y_train), (x_test, y_test) = imdb.load_data(path="imdb.npz",
                                                      num_words=None,
                                                      skip_top=0,
                                                      maxlen=None,
                                                      seed=113,
                                                      start_char=1,
                                                      oov_char=2,
                                                      index_from=3)

Using TensorFlow backend.


## Train this kind of TensorFlow model
### https://github.com/tensorflow/models/tree/master/research/sentiment_analysis

## Explore imdb data

In [2]:
print(x_train.shape, y_train.shape, x_test.shape, y_test.shape)

(25000,) (25000,) (25000,) (25000,)


In [3]:
len(x_train[0]), len(x_train[1])

(218, 189)

In [4]:
x_train[0][0:10] # first ten frequency-integer-encoded terms

[1, 14, 22, 16, 43, 530, 973, 1622, 1385, 65]

In [5]:
y_train[0:3]

array([1, 0, 0])

## Utilities for decoding pre-encoded reviews

In [6]:
# A dictionary mapping words to an integer index
word_index = imdb.get_word_index()

# The first indices are reserved
word_index = {k:(v+3) for k,v in word_index.items()} 
word_index["<PAD>"] = 0
word_index["<START>"] = 1
word_index["<UNK>"] = 2  # unknown
word_index["<UNUSED>"] = 3

reverse_word_index = dict([(value, key) for (key, value) in word_index.items()])

def decode_review(text):
    return ' '.join([reverse_word_index.get(i, '?') for i in text])
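
To see what the index shift does, here is a small self-contained sketch using a made-up three-word vocabulary (`toy_word_index` is hypothetical; the real one comes from `imdb.get_word_index()`). It follows the same steps as the cell above: shift every rank by 3, register the reserved tokens, invert the mapping, and decode.

```python
# Hypothetical vocabulary in the same format as imdb.get_word_index():
# word -> frequency rank, starting at 1.
toy_word_index = {"the": 1, "film": 2, "brilliant": 3}

# Shift every rank by 3 to make room for the reserved tokens.
shifted = {word: rank + 3 for word, rank in toy_word_index.items()}
shifted["<PAD>"] = 0
shifted["<START>"] = 1
shifted["<UNK>"] = 2
shifted["<UNUSED>"] = 3

# Invert the mapping: index -> word.
toy_reverse = {index: word for word, index in shifted.items()}

def decode(encoded):
    """Map each index back to its word; indices not in the mapping become '?'."""
    return " ".join(toy_reverse.get(i, "?") for i in encoded)

decoded = decode([1, 4, 5, 6, 2, 0, 99])
print(decoded)
# <START> the film brilliant <UNK> <PAD> ?
```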

In [7]:
decode_review(x_train[0]), y_train[0]

("<START> this film was just brilliant casting location scenery story direction everyone's really suited the part they played and you could just imagine being there robert redford's is an amazing actor and now the same being director norman's father came from the same scottish island as myself so i loved the fact there was a real connection with this film the witty remarks throughout the film were great it was just brilliant so much that i bought the film as soon as it was released for retail and would recommend it to everyone to watch and the fly fishing was amazing really cried at the end it was so sad and you know what they say if you cry at a film it must have been good and this definitely was also congratulations to the two little boy's that played the part's of norman and paul they were just brilliant children are often left out of the praising list i think because the stars that play them all grown up are such a big profile for the whole film but these children are amazing and s

## Prepare the data

### https://www.tensorflow.org/tutorials/keras/basic_text_classification

We can pad the arrays so they all have the same length, then create an integer tensor of shape (num_reviews, max_length). An embedding layer capable of handling this shape can then serve as the first layer in our network.
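
As a rough illustration of what padding does, the sketch below re-implements the behaviour in plain Python. `pad_post` is a hypothetical helper, not a Keras function. It assumes `padding='post'` (as used in this notebook) and `truncating='pre'`, which is the `pad_sequences` default and means that reviews longer than `maxlen` keep their last `maxlen` tokens.

```python
def pad_post(sequences, maxlen, value=0):
    """Pad at the end with `value`; truncate from the start if too long."""
    padded = []
    for seq in sequences:
        seq = list(seq)[-maxlen:]                            # keep the last maxlen tokens
        padded.append(seq + [value] * (maxlen - len(seq)))   # fill the tail with padding
    return padded

batch = pad_post([[1, 14, 22], [1, 2, 3, 4, 5, 6]], maxlen=4)
print(batch)
# [[1, 14, 22, 0], [3, 4, 5, 6]]
print((len(batch), len(batch[0])))
# (2, 4), i.e. (num_reviews, max_length)
```

Note that the second sequence loses its leading tokens, including the start marker, so long reviews are represented by their endings.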

In [8]:
import tensorflow as tf
import keras

In [9]:
seq_len = 256

# Since the movie reviews must be the same length, we will use the pad_sequences function to standardize the lengths:
x_train = keras.preprocessing.sequence.pad_sequences(x_train,
                                                        value=word_index["<PAD>"],
                                                        padding='post',
                                                        maxlen=seq_len)

x_test = keras.preprocessing.sequence.pad_sequences(x_test,
                                                        value=word_index["<PAD>"],
                                                        padding='post',
                                                        maxlen=seq_len)

In [10]:
len(x_train[0]), len(x_train[1])

(256, 256)

In [11]:
print(x_train[0]) # indices can exceed 10,000 because num_words=None keeps the full vocabulary

[    1    14    22    16    43   530   973  1622  1385    65   458  4468
    66  3941     4   173    36   256     5    25   100    43   838   112
    50   670 22665     9    35   480   284     5   150     4   172   112
   167 21631   336   385    39     4   172  4536  1111    17   546    38
    13   447     4   192    50    16     6   147  2025    19    14    22
     4  1920  4613   469     4    22    71    87    12    16    43   530
    38    76    15    13  1247     4    22    17   515    17    12    16
   626    18 19193     5    62   386    12     8   316     8   106     5
     4  2223  5244    16   480    66  3785    33     4   130    12    16
    38   619     5    25   124    51    36   135    48    25  1415    33
     6    22    12   215    28    77    52     5    14   407    16    82
 10311     8     4   107   117  5952    15   256     4 31050     7  3766
     5   723    36    71    43   530   476    26   400   317    46     7
     4 12118  1029    13   104    88     4   381   

## Build a keras sequential model

In [12]:
max(list(reverse_word_index.keys()))

88587

In [13]:
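# The Embedding layer's input_dim is the vocabulary size. load_data was called with
# num_words=None, so the full vocabulary is used (largest index 88587), not 10,000 words.
# NOTE: Keras documents input_dim as "maximum integer index + 1".
vocab_size = max(list(reverse_word_index.keys()))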

model = keras.Sequential()
model.add(keras.layers.Embedding(vocab_size, 16))
model.add(keras.layers.GlobalAveragePooling1D())
model.add(keras.layers.Dense(16, activation=tf.nn.relu))
model.add(keras.layers.Dense(1, activation=tf.nn.sigmoid))

model.summary()

_________________________________________________________________
Layer (type)                 Output Shape              Param #   
embedding_1 (Embedding)      (None, None, 16)          1417392   
_________________________________________________________________
global_average_pooling1d_1 ( (None, 16)                0         
_________________________________________________________________
dense_1 (Dense)              (None, 16)                272       
_________________________________________________________________
dense_2 (Dense)              (None, 1)                 17        
Total params: 1,417,681
Trainable params: 1,417,681
Non-trainable params: 0
_________________________________________________________________
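
The parameter counts in the summary can be reproduced by hand. The sketch below uses two hypothetical helpers (`embedding_params`, `dense_params`) and the vocabulary size of 88587 printed earlier: an embedding has one vector per index, and a dense layer has one weight per input-unit pair plus one bias per unit.

```python
def embedding_params(input_dim, output_dim):
    """One output_dim-sized vector per vocabulary index."""
    return input_dim * output_dim

def dense_params(inputs, units):
    """A weight matrix of shape (inputs, units) plus one bias per unit."""
    return inputs * units + units

vocab_size = 88587
counts = {
    "embedding_1": embedding_params(vocab_size, 16),
    "global_average_pooling1d_1": 0,  # pooling has no trainable weights
    "dense_1": dense_params(16, 16),
    "dense_2": dense_params(16, 1),
}
total = sum(counts.values())
print(counts)
print(total)
# 1417681
```

Almost all of the parameters sit in the embedding table, so the vocabulary size dominates the model size.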


The layers are stacked sequentially to build the classifier:

- The first layer is an Embedding layer. This layer takes the integer-encoded vocabulary and looks up the embedding vector for each word-index. These vectors are learned as the model trains. The vectors add a dimension to the output array. The resulting dimensions are: (batch, sequence, embedding).

- Next, a GlobalAveragePooling1D layer returns a fixed-length output vector for each example by averaging over the sequence dimension. This allows the model to handle input of variable length, in the simplest way possible.

- This fixed-length output vector is piped through a fully-connected (Dense) layer with 16 hidden units.

- The last layer is densely connected with a single output node. Using the sigmoid activation function, this value is a float between 0 and 1, representing a probability, or confidence level.
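
The layer stack described above can be traced by hand for a single review. The following is a toy forward pass in plain Python with random, untrained weights and made-up sizes (`VOCAB`, `EMBED_DIM`, `HIDDEN` are illustrative; the notebook uses 88587, 16 and 16). It is a sketch of the computation, not the Keras implementation, and it averages over every position, padding included, which is an assumption that matches an embedding layer without masking.

```python
import math
import random

random.seed(0)
VOCAB, EMBED_DIM, HIDDEN = 10, 4, 3  # toy sizes for illustration only

embedding = [[random.uniform(-1, 1) for _ in range(EMBED_DIM)] for _ in range(VOCAB)]
w1 = [[random.uniform(-1, 1) for _ in range(HIDDEN)] for _ in range(EMBED_DIM)]
b1 = [0.0] * HIDDEN
w2 = [random.uniform(-1, 1) for _ in range(HIDDEN)]
b2 = 0.0

def forward(token_ids):
    # Embedding: look up one vector per token -> (sequence, embedding)
    vectors = [embedding[t] for t in token_ids]
    # Global average pooling: mean over the sequence axis -> (embedding,)
    pooled = [sum(column) / len(vectors) for column in zip(*vectors)]
    # Dense + ReLU -> (hidden,)
    hidden = [
        max(0.0, sum(p * w1[i][j] for i, p in enumerate(pooled)) + b1[j])
        for j in range(HIDDEN)
    ]
    # Dense + sigmoid -> a single value strictly between 0 and 1
    logit = sum(h * w for h, w in zip(hidden, w2)) + b2
    return 1.0 / (1.0 + math.exp(-logit))

prob = forward([1, 4, 5, 0, 0])  # a padded toy "review"
print(prob)
```

Because pooling collapses the sequence axis, the same function accepts reviews of any length.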

## Loss function and Optimizer

A model needs a loss function and an optimizer for training. Since this is a binary classification problem and the model outputs a probability (a single-unit layer with a sigmoid activation), we'll use the binary_crossentropy loss function.

This isn't the only choice for a loss function; you could, for instance, choose mean_squared_error. Generally, though, binary_crossentropy is better for dealing with probabilities: it measures the "distance" between probability distributions or, in our case, between the ground-truth distribution and the predictions.
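
To make the comparison concrete, the sketch below evaluates both losses for a single positive example (`y_true = 1`) at a few predicted probabilities. The function names are illustrative, and the `eps` clipping is an assumption added only to avoid `log(0)`.

```python
import math

def binary_crossentropy(y_true, p, eps=1e-7):
    """-(y*log(p) + (1-y)*log(1-p)) for one example."""
    p = min(max(p, eps), 1 - eps)  # clip to keep the logarithm finite
    return -(y_true * math.log(p) + (1 - y_true) * math.log(1 - p))

def squared_error(y_true, p):
    """(y - p)^2 for one example."""
    return (y_true - p) ** 2

for p in (0.9, 0.5, 0.01):
    print(p, round(binary_crossentropy(1, p), 4), round(squared_error(1, p), 4))
```

Squared error never exceeds 1 for a probability, whereas cross-entropy grows without bound as a confident prediction becomes wrong, so confident mistakes are penalized much more heavily.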

In [14]:
model.compile(optimizer='adam',
              loss='binary_crossentropy',
              metrics=['acc'])

## Distribute the keras model over Spark
### https://towardsdatascience.com/how-to-train-your-neural-networks-in-parallel-with-keras-and-apache-spark-ea8a3f48cae6

In [15]:
import pyspark

spark = pyspark.sql.SparkSession.builder\
    .appName('imdb-sentiment')\
    .getOrCreate()

In [16]:
from elephas.utils.rdd_utils import to_simple_rdd

# Use the public sparkContext attribute rather than the private _sc
rdd = to_simple_rdd(spark.sparkContext, x_train, y_train)



### Distributed Spark/Keras fit
No cross-validation in this example.

In [28]:
%%time
from elephas.spark_model import SparkModel

spark_model = SparkModel(model, frequency='epoch', mode='asynchronous')
spark_model.fit(rdd, epochs=100, batch_size=512, verbose=1, validation_split=0.1) # set verbose=0 to silence training output

>>> Fit model
 * Serving Flask app "elephas.parameter.server" (lazy loading)
 * Environment: production
   Use a production WSGI server instead.
 * Debug mode: off


 * Running on http://172.19.0.2:4000/ (Press CTRL+C to quit)


>>> Initialize workers
>>> Distribute load


172.19.0.2 - - [07/Jun/2019 11:43:26] "GET /parameters HTTP/1.1" 200 -
172.19.0.2 - - [07/Jun/2019 11:43:30] "POST /update HTTP/1.1" 200 -
...

>>> Async training complete.
CPU times: user 1.74 s, sys: 562 ms, total: 2.31 s
Wall time: 3min 54s
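
The `GET /parameters` and `POST /update` requests in the log suggest a parameter-server pattern: each worker fetches the current weights, trains on its own partition, and sends back a weight delta. The toy below illustrates that general pattern with a single scalar weight. It is not the elephas implementation; `ToyParameterServer`, `compute_delta` and the quadratic "training" step are all assumptions made for illustration. Both workers read the weights before either one writes, which is one interleaving that asynchronous training allows.

```python
class ToyParameterServer:
    """Holds the shared weights; stands in for the HTTP server in the log."""

    def __init__(self, weights):
        self.weights = list(weights)

    def get_parameters(self):          # analogous to GET /parameters
        return list(self.weights)

    def update(self, delta):           # analogous to POST /update
        self.weights = [w - d for w, d in zip(self.weights, delta)]

def compute_delta(before, partition, lr=0.25):
    """One gradient step on (w - mean(partition))^2; returns before - after."""
    target = sum(partition) / len(partition)
    after = [w - lr * 2 * (w - target) for w in before]
    return [b - a for b, a in zip(before, after)]

server = ToyParameterServer([0.0])
partitions = [[1.0, 2.0, 3.0], [5.0, 6.0, 7.0]]  # two hypothetical data partitions

for _ in range(5):
    # Every worker reads the (possibly stale) weights first ...
    snapshots = [server.get_parameters() for _ in partitions]
    # ... then each one pushes its delta independently.
    for snapshot, partition in zip(snapshots, partitions):
        server.update(compute_delta(snapshot, partition))

print(server.weights)
# [4.0]
```

In this toy, the shared weight settles at the mean of the two partition targets (2 and 6), even though neither worker ever sees the other's data.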


In [29]:
# Your script can now be run using spark-submit:
#   spark-submit --driver-memory 1G ./your_script.py

### Distributed Spark/Keras predict

In [30]:
%%time
y_pred = spark_model.predict(x_test)
# you can now save or use y_pred.

CPU times: user 1.01 s, sys: 123 ms, total: 1.13 s
Wall time: 788 ms
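
One way to use `y_pred` is to turn the predicted probabilities into class labels and compare them with the true labels. The sketch below does this in plain Python on made-up values. It assumes one sigmoid probability per review and a decision threshold of 0.5; `to_labels` and `accuracy` are hypothetical helpers, and `toy_pred` and `toy_true` are not results from this notebook.

```python
def to_labels(probabilities, threshold=0.5):
    """Map each probability to 1 (positive) or 0 (negative)."""
    return [1 if p >= threshold else 0 for p in probabilities]

def accuracy(y_true, y_hat):
    """Fraction of predictions that match the true labels."""
    correct = sum(1 for t, h in zip(y_true, y_hat) if t == h)
    return correct / len(y_true)

toy_pred = [0.91, 0.08, 0.47, 0.66]   # made-up probabilities
toy_true = [1, 0, 1, 1]               # made-up ground truth

toy_labels = to_labels(toy_pred)
print(toy_labels)
# [1, 0, 0, 1]
print(accuracy(toy_true, toy_labels))
# 0.75
```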


In [32]:
results = model.evaluate(x_test, y_test)
print('Loss, Accuracy')
print(results)

Loss, Accuracy
[0.3535123248338699, 0.88124]


In [33]:
results = model.evaluate(x_train, y_train)
print('Loss, Accuracy')
print(results)

Loss, Accuracy
[0.08502692074835301, 0.97156]
