## Package and environment setup

In [1]:
# Packages loading
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, StringIndexer, Word2Vec, HashingTF, IDF
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import LogisticRegression, NaiveBayes
from pyspark.sql.functions import col, split
from pyspark.sql.functions import udf, explode
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import *
from pyspark.ml import Pipeline
import numpy as np
import time

In [2]:
# Spark env setting
# Build the session FIRST. If a bare SparkContext() already exists,
# getOrCreate() reuses it and the resource options below (driver/executor
# memory, executor cores) are not applied to the running context.
spark = (SparkSession.builder
    .appName("PLP Yelp")
    .enableHiveSupport()
    .config("spark.executor.memory","8G")
    .config("spark.driver.memory","18G")
    .config("spark.executor.cores","6")
    .getOrCreate())

# Keep `sc` available for code that needs the low-level context.
sc = spark.sparkContext

## Data Loading

In [3]:
# Read the complete Yelp review dump into a DataFrame and report how long
# the read call took.
start_time = time.time()
comments = spark.read.json("yelp_academic_dataset_review.json")
elapsed = time.time() - start_time
print("--- %s seconds ---" % elapsed)

--- 22.28649401664734 seconds ---


In [None]:
# number of total comments
comments.count()

## 1st Data Pre-Processing

In [4]:
# reducing the dataset
start_time = time.time()
comments_string = comments.select('stars','text')
print("--- %s seconds ---" % (time.time() - start_time))

--- 0.054257869720458984 seconds ---


In [5]:
# Sub-setting the initial dataset - 2% sampling without replacement
# (fraction=0.02); the fixed seed keeps the subset stable between runs.

start_time = time.time()
comments_string = comments_string.sample(withReplacement=False, fraction=0.02, seed=0)
print("--- %s seconds ---" % (time.time() - start_time))

--- 0.013917922973632812 seconds ---


In [6]:
# Tokenise the review text by splitting on every non-word character (\W),
# then keep only the rating and the token list.

regexTokenizer = RegexTokenizer(
    inputCol="text",
    outputCol="text_cleaned",
    pattern="\\W",
)
start_time = time.time()
comments_words = (
    regexTokenizer
        .transform(comments_string)
        .select('stars', 'text_cleaned')
)
print("--- %s seconds ---" % (time.time() - start_time))

--- 0.13809585571289062 seconds ---


In [None]:
# another type of splitting

#start_time = time.time()
#comments_words = comments_string_clened.withColumn(
#        "text_cleaned",
#        split(col("text_cleaned"), " ").cast("array<string>").alias("ev")
# )
#print("--- %s seconds ---" % (time.time() - start_time))

In [7]:
# Drop stop-words from the token lists; the filtered tokens are written to
# a new "words" column.
start_time = time.time()
remover = StopWordsRemover(
    inputCol="text_cleaned",
    outputCol="words",
)
comments_words = remover.transform(comments_words)
elapsed = time.time() - start_time
print("--- %s seconds ---" % elapsed)

--- 0.2830619812011719 seconds ---


In [8]:
start_time = time.time()
comments_words = comments_words.select("stars","words")
print("--- %s seconds ---" % (time.time() - start_time))

--- 0.010272026062011719 seconds ---


In [9]:
# Stemming : less words, more meaning

# Import stemmer library
from nltk.stem.porter import *

# Instantiate stemmer object
stemmer = PorterStemmer()

# Create stemmer python function
def stem(in_vec):
    """Stem every token of ``in_vec`` and drop the short results.

    Each token is passed through the module-level ``stemmer``; only stems
    longer than two characters are kept, in their original order.

    Returns a new list; ``in_vec`` itself is not modified.
    """
    stemmed_tokens = (stemmer.stem(token) for token in in_vec)
    return [s for s in stemmed_tokens if len(s) > 2]

# Wrap the Python stemming function as a Spark UDF returning Array<String>
stemmer_udf = udf(stem, ArrayType(StringType()))

# Build the final frame: rating plus the stemmed token list
start_time = time.time()
stemmed = comments_words.withColumn("words_stemmed", stemmer_udf("words"))
df_final = stemmed.select("stars","words_stemmed")
print("--- %s seconds ---" % (time.time() - start_time))

--- 0.062242746353149414 seconds ---


In [None]:
# data pre-visualization
df_final.show(5)

## Method 1 : BoW + Bayesian Classifier

In [None]:
# Train and test split. A fixed seed makes the split, and therefore the
# reported accuracy, reproducible across runs.
train0, test0 = df_final.randomSplit([0.8, 0.2], seed=0)

# Apply Bag-of-words transformation
vectorizer = CountVectorizer(inputCol="words_stemmed", outputCol="bag_of_words")

# Index the star ratings into numeric class labels
label_indexer = StringIndexer(inputCol="stars", outputCol="label_index")

# Creating Pipeline
pipeline0 = Pipeline(stages=[vectorizer, label_indexer])

# Fit the pipeline on the training split only, then apply it to both splits
start_time = time.time()
pipelineFit0 = pipeline0.fit(train0)
train_bag_of_words = pipelineFit0.transform(train0)
test_bag_of_words = pipelineFit0.transform(test0)
print("--- %s seconds ---" % (time.time() - start_time))

In [None]:
train_bag_of_words.show(5)

In [None]:
# Learn multiclass classifier on training data
start_time = time.time()
classifier = NaiveBayes(
    labelCol="label_index", featuresCol="bag_of_words", predictionCol="label_index_predicted"
)
classifier_transformer = classifier.fit(train_bag_of_words)
print("--- %s seconds ---" % (time.time() - start_time))

In [None]:
# Predict labels on test data
start_time = time.time()
test_predicted = classifier_transformer.transform(test_bag_of_words)
print("--- %s seconds ---" % (time.time() - start_time))

In [None]:
# Classifier evaluation
start_time = time.time()
evaluator = MulticlassClassificationEvaluator(
    labelCol="label_index", predictionCol="label_index_predicted", metricName="accuracy"
)
accuracy = evaluator.evaluate(test_predicted)
print("--- %s seconds ---" % (time.time() - start_time))
print("Accuracy = {:.2f}".format(accuracy))

## Method 2 : TF-IDF + Logistic Regression

In [None]:
# Train and test split. A fixed seed makes the split, and therefore the
# reported accuracy, reproducible across runs.
train1, test1 = df_final.randomSplit([0.8, 0.2], seed=0)

# Add HashingTF and IDF to transformation
hashingTF = HashingTF(inputCol="words_stemmed", outputCol="rawFeatures", numFeatures=10000)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=3) #minDocFreq: remove sparse terms
label_stringIdx = StringIndexer(inputCol = "stars", outputCol = "label")

# Creating Pipeline
pipeline1 = Pipeline(stages=[hashingTF, idf, label_stringIdx])

# Fit the pipeline on the training split only, then apply it to both splits
start_time = time.time()
pipelineFit1 = pipeline1.fit(train1)
train_idf = pipelineFit1.transform(train1)
test_idf = pipelineFit1.transform(test1)
print("--- %s seconds ---" % (time.time() - start_time))

In [None]:
# Preview the transformed training data (`dataset1` was never defined;
# the pipeline output produced above is `train_idf`).
train_idf.show(5)

In [None]:
# Build the model
lr1 = LogisticRegression(maxIter=100, regParam=0.3, elasticNetParam=0)

# Train model with Training Data
start_time = time.time()
lrModel1 = lr1.fit(train_idf)
print("--- %s seconds ---" % (time.time() - start_time))

In [None]:
# Prediction on test
start_time = time.time()
predictions1 = lrModel1.transform(test_idf)
print("--- %s seconds ---" % (time.time() - start_time))

# Evaluation on test
# metricName is set explicitly: the evaluator's default metric is "f1",
# so without it the number printed as "Accuracy" would be an F1 score.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
start_time = time.time()
accuracy1 = evaluator.evaluate(predictions1)
print("--- %s seconds ---" % (time.time() - start_time))
print("Accuracy = {:.2f}".format(accuracy1))

## Method 3 : Word2Vec + Logistic Regression

In [None]:
# Train and test split. A fixed seed makes the split, and therefore the
# reported accuracy, reproducible across runs.
train2, test2 = df_final.randomSplit([0.8, 0.2], seed=0)

# Word2Vec computation: fit on the training split only, apply to both splits
word2Vec = Word2Vec(vectorSize=300, minCount=5, inputCol="words_stemmed", outputCol="result")
start_time = time.time()
model = word2Vec.fit(train2)
train_w2v = model.transform(train2)
test_w2v = model.transform(test2)
# dataset formatting: expose the columns under the names the classifier
# expects by default ("label" / "features").
# NOTE(review): unlike Method 2, "stars" is used directly as the label here
# without a StringIndexer — confirm this is intended.
trainingData = train_w2v.select('stars','result')
testingData = test_w2v.select('stars','result')
trainingData = trainingData.withColumnRenamed("stars", "label").withColumnRenamed("result", "features")
testingData = testingData.withColumnRenamed("stars", "label").withColumnRenamed("result", "features")
print("--- %s seconds ---" % (time.time() - start_time))

In [None]:
# Build the model
lr = LogisticRegression(maxIter=100, regParam=0.3, elasticNetParam=0)

# Train model with Training Data
start_time = time.time()
lrModel = lr.fit(trainingData)
print("--- %s seconds ---" % (time.time() - start_time))

In [None]:
# Prediction on Test
start_time = time.time()
predictions = lrModel.transform(testingData)
print("--- %s seconds ---" % (time.time() - start_time))

In [None]:
# Evaluation on Test
# metricName is set explicitly: the evaluator's default metric is "f1",
# so without it the number printed as "Accuracy" would be an F1 score.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
start_time = time.time()
accuracy2 = evaluator.evaluate(predictions)
print("--- %s seconds ---" % (time.time() - start_time))
print("Accuracy = {:.2f}".format(accuracy2))

## Going Deep : TF-IDF + LSTM with Elephas

In [None]:
# Train and test split. A fixed seed makes the split reproducible across runs.
train3, test3 = df_final.randomSplit([0.8, 0.2], seed=0)

# Add HashingTF and IDF to transformation
hashingTF = HashingTF(inputCol="words_stemmed", outputCol="rawFeatures", numFeatures=300)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=3) #minDocFreq: remove sparse terms
label_stringIdx = StringIndexer(inputCol = "stars", outputCol = "label")

# Creating Pipeline
pipeline3 = Pipeline(stages=[hashingTF, idf, label_stringIdx])

# Fit the pipeline on the training split only, then apply it to both splits.
# NOTE: this rebinds train_idf / test_idf, replacing the Method 2 frames.
start_time = time.time()
pipelineFit3 = pipeline3.fit(train3)
train_idf = pipelineFit3.transform(train3)
test_idf = pipelineFit3.transform(test3)
print("--- %s seconds ---" % (time.time() - start_time))

In [None]:
train_idf = train_idf.select('features','label')

In [None]:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

slen = udf(lambda s: len(set(s)), IntegerType())

df2 = train_idf.withColumn("features", slen(train_idf.features))
df2.show(5)

In [None]:
from keras.models import Sequential
from keras.layers import Embedding, LSTM, Dense, Activation

embed_dim  = 300  # word embedding dimension
nhid       = 32  # number of hidden units in the LSTM
vocab_size = 1000  # size of the vocabulary
nb_classes = 5

model = Sequential()
model.add(Embedding(vocab_size, embed_dim))
model.add(LSTM(nhid, dropout=0.2, recurrent_dropout=0.2))
# softmax (not sigmoid): categorical_crossentropy expects the outputs to
# form a probability distribution over the mutually exclusive classes.
model.add(Dense(nb_classes, activation='softmax'))
loss_classif     =  'categorical_crossentropy'
optimizer        =  'adam'
metrics_classif  =  ['accuracy']
model.compile(loss=loss_classif,
              optimizer=optimizer,
              metrics=metrics_classif)

In [None]:
# NOTE(review): to_simple_rdd is imported here but not used in the visible cells.
from elephas.utils.rdd_utils import to_simple_rdd

# Map every row of train_idf to a list obtained by splitting a string field on ",".
# NOTE(review): `stringFieldName` looks like a placeholder — train_idf was
# reduced to the columns 'features' and 'label', so this attribute lookup is
# expected to fail. TODO: build the RDD from the real columns.
rdd = train_idf.rdd.map(lambda x:x.stringFieldName.split(","))

In [None]:
from elephas.spark_model import SparkModel

spark_model = SparkModel(model, frequency='epoch', mode='asynchronous')
spark_model.fit(rdd, epochs=20, batch_size=32, verbose=0, validation_split=0.1)

## Going Deep :  without Spark... 

In [10]:
# Seeded split so the pandas train/test sets are the same on every run
train, test = df_final.randomSplit([0.8, 0.2], seed=0)
# Collect both splits to the driver as pandas DataFrames
pandas_df_train = train.toPandas()
pandas_df_test = test.toPandas()

In [11]:
strings = pandas_df_train['words_stemmed'].tolist()

In [39]:
strings_test = pandas_df_test['words_stemmed'].tolist()

In [27]:
unique_data = set(x for l in strings for x in l)

In [40]:
data_train = [' '.join(sentence) for sentence in strings]
data_test = [' '.join(sentence) for sentence in strings_test]

In [45]:
import keras
from keras.preprocessing.text import text_to_word_sequence
from keras.utils.np_utils import to_categorical
from keras.preprocessing.text import Tokenizer
from keras.preprocessing.text import one_hot
from keras.preprocessing import sequence

#config = tf.ConfigProto(device_count={"CPU": 6})
#keras.backend.tensorflow_backend.set_session(tf.Session(config=config))

In [42]:
# Vocabulary size = number of distinct stemmed tokens in the training split
vocab_size_train = len(unique_data)

tokenizer = Tokenizer(num_words=vocab_size_train)
# Fit on the training sentences only (`data` was never defined; the joined
# training sentences are held in `data_train`).
tokenizer.fit_on_texts(data_train)

# Encode both splits with the vocabulary learnt on the training data
train_data = tokenizer.texts_to_sequences(data_train)
dev_data = tokenizer.texts_to_sequences(data_test)

In [61]:
# Longest encoded training review; every sequence is padded/truncated to it
maxlen = max(len(seq) for seq in train_data)

# Shared padding options: left-pad with zeros, truncate from the left
pad_kwargs = dict(maxlen=maxlen, dtype='int32',
                  padding='pre', truncating='pre', value=0.0)

x_train = keras.preprocessing.sequence.pad_sequences(train_data, **pad_kwargs)
x_val = keras.preprocessing.sequence.pad_sequences(dev_data, **pad_kwargs)

# Raw star ratings for each split, as plain Python lists
trainy = pandas_df_train['stars'].tolist()
valy = pandas_df_test['stars'].tolist()

In [49]:
from keras.models import Sequential
from keras.layers import Embedding, LSTM, Dense, Activation

# NOTE(review): the embedding width is tied to the padded sequence length
# (x_train.shape[1]) — confirm this is intended.
embed_dim  = x_train.shape[1]  # word embedding dimension
nhid       = 32  # number of hidden units in the LSTM
vocab_size = vocab_size_train  # size of the vocabulary
n_classes  = 5

model = Sequential()
model.add(Embedding(vocab_size, embed_dim))
model.add(LSTM(nhid, dropout=0.2, recurrent_dropout=0.2))
# softmax (not sigmoid): categorical_crossentropy expects the outputs to
# form a probability distribution over the mutually exclusive classes.
model.add(Dense(n_classes, activation='softmax'))


loss_classif     =  'categorical_crossentropy' # multi-class classification loss
optimizer        =  'adam'
metrics_classif  =  ['accuracy']

# Observe how easy (but blackboxed) this is in Keras
model.compile(loss=loss_classif,
              optimizer=optimizer,
              metrics=metrics_classif)
print(model.summary())

_________________________________________________________________
Layer (type)                 Output Shape              Param #   
embedding_1 (Embedding)      (None, None, 669)         37624560  
_________________________________________________________________
lstm_1 (LSTM)                (None, 32)                89856     
_________________________________________________________________
dense_1 (Dense)              (None, 5)                 165       
Total params: 37,714,581.0
Trainable params: 37,714,581
Non-trainable params: 0.0
_________________________________________________________________
None


In [None]:
bs = 64
n_epochs = 4

# The raw ratings live in `trainy` / `valy` (`y_train` / `y_val` were never
# defined before this cell). Stars run from 1 to 5, so shift them to 0..4
# and force num_classes so the one-hot width matches the Dense(n_classes)
# output layer. Predicted class indices must be shifted back by +1 to get stars.
y_train = to_categorical(np.asarray(trainy, dtype=int) - 1, num_classes=n_classes)
y_val = to_categorical(np.asarray(valy, dtype=int) - 1, num_classes=n_classes)

history = model.fit(x_train, y_train, batch_size=bs, epochs=n_epochs, validation_data=(x_val, y_val))