In [1]:

import itertools
import re
from optparse import OptionParser

from bigdl.dataset import news20
from bigdl.nn.layer import *
from bigdl.nn.criterion import *
from bigdl.optim.optimizer import *
from bigdl.util.common import *
from bigdl.util.common import Sample


In [2]:

# Run configuration shared by the cells below.
action = "train"  # not read by any of the cells visible here
batch_size = 128  # used for both training (Optimizer) and validation
embedding_dim = 50  # GloVe vector size requested from news20.get_glove_w2v
max_epoch = 15  # training stops after this many epochs (MaxEpoch trigger)
model_type = "cnn"  # build_model accepts "cnn", "lstm" or "gru" (case-insensitive)
p = 0.0  # third argument to LSTM/GRU in build_model; presumably dropout probability -- TODO confirm
sequence_len = 50  # every document is truncated/padded to this many tokens
max_words = 1000  # upper slice bound on the frequency-ranked vocabulary
training_split = 0.8  # weight of the training part in randomSplit; the rest is validation

# BigDL helpers pulled in by the star imports above; presumably they set up
# logging and initialise the BigDL engine -- verify against the BigDL docs.
redire_spark_logs()
show_bigdl_info_logs()
init_engine()


In [3]:

def text_to_words(review_text):
    """Split raw text into lower-cased alphabetic words.

    Every character outside a-z / A-Z is replaced by a space, the result is
    lower-cased and then split on whitespace.

    :param review_text: the text to tokenize.
    :return: list of lower-case words (empty when no letters are present).
    """
    return re.sub("[^a-zA-Z]", " ", review_text).lower().split()


def analyze_texts(data_rdd):
    """Count word frequencies over an RDD of records and rank them.

    Only element 0 of each record is read; it is tokenized with
    text_to_words. Words are counted, sorted by descending count and
    numbered starting at 1.

    :param data_rdd: RDD whose records carry the text at index 0.
    :return: collected list of (word, (rank, count)) tuples.
    """
    def to_ranked_entry(counted_with_position):
        # zipWithIndex yields ((word, count), zero_based_position).
        (word, count), position = counted_with_position
        return (word, (position + 1, count))

    words = data_rdd.flatMap(lambda text_label: text_to_words(text_label[0]))
    counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
    # Negated count as sort key gives most-frequent-first ordering.
    ranked = counts.sortBy(lambda w_c: - w_c[1]).zipWithIndex()
    return ranked.map(to_ranked_entry).collect()


# pad([1, 2, 3], 0, 5) -> [1, 2, 3, 0, 0]
# pad([1, 2, 3, 4, 5], 0, 3) -> [1, 2, 3]
def pad(l, fill_value, width):
    """Return a copy of ``l`` truncated or right-padded to ``width`` items.

    The input sequence is never modified: the padding branch used to
    ``extend`` the caller's list in place, while the truncating branch
    returned a fresh slice. Both branches now return a new object.

    :param l: sequence of items to fit to ``width``.
    :param fill_value: value appended when ``l`` is shorter than ``width``.
    :param width: desired number of items in the result.
    :return: the first ``width`` items of ``l`` when it is long enough,
        otherwise a new list holding the items of ``l`` followed by enough
        copies of ``fill_value`` to reach ``width``.
    """
    if len(l) >= width:
        return l[0: width]
    # Build a new list instead of extending the argument in place.
    return list(l) + [fill_value] * (width - len(l))


def to_vec(token, b_w2v, embedding_dim):
    """Look up the embedding for ``token``, falling back to zeros.

    :param token: key to look up.
    :param b_w2v: mapping from token to its vector.
    :param embedding_dim: length of the zero vector returned for a token
        that is not in ``b_w2v``.
    :return: ``b_w2v[token]`` when present, otherwise a list of
        ``embedding_dim`` zeros built with ``pad``.
    """
    if token not in b_w2v:
        # Unknown token (including the padding marker): all-zero vector.
        return pad([], 0, embedding_dim)
    return b_w2v[token]


def to_sample(vectors, label, embedding_dim):
    """Turn one document's word vectors and label into a BigDL Sample.

    Reads the module-level ``sequence_len`` and ``model_type``.

    :param vectors: nested sequence of word vectors; flattened and reshaped
        to ``[sequence_len, embedding_dim]``.
    :param label: label value, wrapped with ``np.array``.
    :param embedding_dim: length of each word vector.
    :return: ``Sample.from_ndarray(features, label_array)``; for the "cnn"
        model type the feature matrix is transposed to
        ``(embedding_dim, sequence_len)`` first.
    """
    flat_values = list(itertools.chain.from_iterable(vectors))
    matrix = np.array(flat_values, dtype='float')
    matrix = matrix.reshape([sequence_len, embedding_dim])

    wants_channels_first = model_type.lower() == "cnn"
    if wants_channels_first:
        # Swap the two axes so the embedding dimension comes first.
        matrix = matrix.T
    return Sample.from_ndarray(matrix, np.array(label))


def build_model(class_num):
    """Assemble the classifier selected by the module-level ``model_type``.

    Reads the module-level ``model_type``, ``embedding_dim``,
    ``sequence_len`` and ``p``. The feature extractor is a two-stage
    convolution/pooling stack ("cnn") or a recurrent layer whose output is
    reduced with ``Select(2, -1)`` ("lstm" / "gru"). All variants end in
    Linear(128, 100) -> Linear(100, class_num) -> LogSoftMax.

    :param class_num: output size of the final Linear layer.
    :return: the assembled ``Sequential`` model.
    :raises ValueError: if ``model_type`` is not cnn, lstm or gru
        (compared case-insensitively).
    """
    kind = model_type.lower()
    net = Sequential()

    if kind == "cnn":
        feature_layers = [
            Reshape([embedding_dim, 1, sequence_len]),
            SpatialConvolution(embedding_dim, 128, 5, 1),
            ReLU(),
            SpatialMaxPooling(5, 1, 5, 1),
            SpatialConvolution(128, 128, 5, 1),
            ReLU(),
            SpatialMaxPooling(5, 1, 5, 1),
            Reshape([128]),
        ]
        for layer in feature_layers:
            net.add(layer)
    elif kind == "lstm":
        net.add(Recurrent().add(LSTM(embedding_dim, 128, p)))
        net.add(Select(2, -1))
    elif kind == "gru":
        net.add(Recurrent().add(GRU(embedding_dim, 128, p)))
        net.add(Select(2, -1))
    else:
        raise ValueError('model can only be cnn, lstm, or gru')

    # Classification head shared by every model type.
    head_layers = [Linear(128, 100), Linear(100, class_num), LogSoftMax()]
    for layer in head_layers:
        net.add(layer)
    return net




In [4]:
# End-to-end training cell: load news20, build a vocabulary, vectorize each
# document with GloVe embeddings, then train and validate the model.
# NOTE(review): the output recorded for this cell ends with a Py4JError raised
# from optimizer.optimize(); the root cause is not visible in the captured log.
print('Processing text dataset')
texts = news20.get_news20()
# `sc` is not defined in any cell visible here -- presumably the SparkContext
# supplied by the notebook environment; confirm before running standalone.
data_rdd = sc.parallelize(texts, 2)

# List of (word, (rank, count)) as returned by analyze_texts.
word_to_ic = analyze_texts(data_rdd)

# Drop the first 10 entries of the ranked list and keep those up to index
# max_words (exclusive); the slice bound is max_words, not sequence_len.
word_to_ic = dict(word_to_ic[10: max_words])
bword_to_ic = sc.broadcast(word_to_ic)

# Keep only the embeddings of words that survived the vocabulary cut.
w2v = news20.get_glove_w2v(dim=embedding_dim)
filtered_w2v = dict((w, v) for w, v in w2v.items() if w in word_to_ic)
bfiltered_w2v = sc.broadcast(filtered_w2v)

# (text, label) -> (in-vocabulary tokens, label)
tokens_rdd = data_rdd.map(lambda text_label:
                            ([w for w in text_to_words(text_label[0]) if
                            w in bword_to_ic.value], text_label[1]))
# Truncate or pad every token list to sequence_len using the "##" marker.
padded_tokens_rdd = tokens_rdd.map(
    lambda tokens_label: (pad(tokens_label[0], "##", sequence_len), tokens_label[1]))
# Replace each token by its vector; tokens missing from the filtered
# embeddings get the zero vector from to_vec.
vector_rdd = padded_tokens_rdd.map(lambda tokens_label:
                                    ([to_vec(w, bfiltered_w2v.value,
                                            embedding_dim) for w in
                                     tokens_label[0]], tokens_label[1]))
sample_rdd = vector_rdd.map(
    lambda vectors_label: to_sample(vectors_label[0], vectors_label[1], embedding_dim))

# NOTE(review): no seed is passed, so the split presumably differs between
# runs -- confirm against the PySpark randomSplit documentation.
train_rdd, val_rdd = sample_rdd.randomSplit(
    [training_split, 1-training_split])

optimizer = Optimizer(
    model=build_model(news20.CLASS_NUM),
    training_rdd=train_rdd,
    criterion=ClassNLLCriterion(),
    end_trigger=MaxEpoch(max_epoch),
    batch_size=batch_size,
    optim_method=Adagrad(learningrate=0.01, learningrate_decay=0.0002))

# Evaluate top-1 accuracy on the validation split after every epoch.
optimizer.set_validation(
    batch_size=batch_size,
    val_rdd=val_rdd,
    trigger=EveryEpoch(),
    val_method=[Top1Accuracy()]
)
train_model = optimizer.optimize()


Processing text dataset
Found 19997 texts.
('Downloading data from', 'http://nlp.stanford.edu/data/glove.6B.zip')
Extracting /tmp/news20/glove.6B.zip to /tmp/news20/glove.6B
creating: createSequential
creating: createReshape
creating: createSpatialConvolution
creating: createReLU
creating: createSpatialMaxPooling
creating: createSpatialConvolution
creating: createReLU
creating: createSpatialMaxPooling
creating: createReshape
creating: createLinear
creating: createLinear
creating: createLogSoftMax
creating: createClassNLLCriterion
creating: createMaxEpoch
creating: createAdagrad
creating: createOptimizer
creating: createEveryEpoch
creating: createTop1Accuracy


Traceback (most recent call last):
  File "/opt/conda/envs/py27/lib/python2.7/SocketServer.py", line 290, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/conda/envs/py27/lib/python2.7/SocketServer.py", line 318, in process_request
    self.finish_request(request, client_address)
  File "/opt/conda/envs/py27/lib/python2.7/SocketServer.py", line 331, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/conda/envs/py27/lib/python2.7/SocketServer.py", line 652, in __init__
    self.handle()
  File "/usr/local/spark/python/pyspark/accumulators.py", line 235, in handle
    num_updates = read_int(self.rfile)
  File "/usr/local/spark/python/pyspark/serializers.py", line 577, in read_int
    raise EOFError
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 883, in send_command
    response = connection.sen

----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 51338)
----------------------------------------


Py4JError: An error occurred while calling o232.optimize