In [1]:
'''
Sentiment analysis, a common Natural Language Processing method, is applied here to a large dataset of Amazon reviews.
The large-scale dataset is partitioned and distributed using Apache Spark's RDDs.
The pre-processing of the big data is done using PySpark, and the in-depth processing is done in Python.
As in real-world situations, we take the best of both worlds.
'''

In [2]:
# Load the raw train and test text files as RDDs, then combine them into a single RDD
train_path = "/FileStore/tables/train_ft-bfd00.txt"
test_path = "/FileStore/tables/test_ft-41af5.txt"
train = sc.textFile(train_path, minPartitions=2)
test = sc.textFile(test_path, minPartitions=2)
df = train.union(test)

In [3]:
# Peek at the first two raw rows of the training RDD
train.take(num=2)

In [4]:
# Peek at the first two raw rows of the test RDD
test.take(num=2)

In [5]:
'''
remove \
('__label__1 or __label__2  review title : review sentences'  ,  'train1' or 'test1')
1 indicates bad, 2 indicates good
CAPS are used for emphasis
'''

In [6]:
import time
time_start = time.time()


def _label_and_review(line):
    """Take the text after the first '__label__' marker and split it once on the first space."""
    after_marker = line.split('__label__')[1]
    return after_marker.split(" ", 1)


# Clean each raw line down to its label number and review text
df = df.map(_label_and_review)
row_count = df.count()
print("Size of dataset: ", row_count)

# Report how long the split + count took
elapsed = time.time() - time_start
print('Split done! Time elapsed: {} seconds'.format(elapsed))
df.take(2)

In [7]:
# Convert the PipelineRDD of [label, review] pairs into a DataFrame with named columns
df1 = df.toDF(["Label","Review"])

# describe() only returns a new summary DataFrame; without show() the result is
# discarded and nothing is displayed.
df1.describe().show()

# Preview the first two rows
df1.show(2)

In [8]:
# Drop every row that contains a null in any column
df1 = df1.na.drop()

In [9]:
# Show which class df1 is an instance of
df1.__class__

In [10]:
# label 2 = good , label 1 = bad
# A Spark Column is a lazy expression, not a concrete value, so it cannot be used
# as the condition of a Python `if` (Spark raises ValueError), and it has no
# pandas-style `.replace`. The row-wise mapping is expressed with when/otherwise.
from pyspark.sql.functions import when

# df1 itself is left untouched: the mapping is only added as an extra column
# on a preview so that the original "1"/"2" labels remain available.
df1.withColumn(
    "Sentiment",
    when(df1["Label"] == "2", "good").otherwise("bad"),
).show(2)

In [11]:
from pyspark.sql.functions import lit, when

# Relabel row by row: "2" becomes "good", anything else becomes "bad".
# (lit("good") would stamp the constant "good" on every row, bad reviews included.)
new_df = df1.withColumn(
    'Label',
    when(df1['Label'] == "2", "good").otherwise("bad"),
)

In [12]:
from pyspark.sql.functions import when

# df1[0] is the first column (Label) as a lazy Column expression; a Python
# ternary cannot evaluate it (Spark raises ValueError), so the good/bad choice
# is built as a Column expression that Spark evaluates per row.
sentiment = when(df1[0] == "2", "good").otherwise("bad")

In [13]:
# compare the 2 labels
import seaborn as sns

# seaborn cannot plot a Spark Column. Aggregate the counts in Spark first and
# bring only the tiny per-label summary to the driver as a pandas DataFrame.
label_counts = df1.groupBy("Label").count().orderBy("Label").toPandas()

ax = sns.barplot(x="Label", y="count", data=label_counts)
ax.set(
    title="Number of reviews per label",
    xlabel="Label (1 = bad, 2 = good)",
    ylabel="Number of reviews",
);

In [14]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml import Pipeline

# Step-by-step walkthrough of the text featurisation, showing the first two rows
# after every stage. The stage objects (tokenizer, remover, hashingTF, idf) are
# kept as top-level names so they can be reused afterwards.

# Tokenize the review
tokenizer = Tokenizer(inputCol="Review", outputCol="review_words")
wordsDF = tokenizer.transform(df1)
wordsDF.show(2)

# Remove stop words
remover = StopWordsRemover(inputCol="review_words", outputCol="filtered")
wordsDF2 = remover.transform(wordsDF)
wordsDF2.show(2)

# Convert to TF words vector
hashingTF = HashingTF(inputCol="filtered", outputCol="TF")
wordsDF3 = hashingTF.transform(wordsDF2)
wordsDF3.show(2)

# Convert to IDF words vector, ensure to name the features as 'features'
# NOTE: idfModel is fitted on every row of df1 here, for illustration only.
idf = IDF(inputCol="TF", outputCol="features")
idfModel = idf.fit(wordsDF3)
wordsDF4 = idfModel.transform(wordsDF3)
wordsDF4.show(2)

# HashingTF in SparkML can't normalize term frequency with the total number of words in each document
# The column is selected by its exact name "Label" so the lookup does not depend
# on Spark's case-insensitive column resolution being enabled.
for features_label in wordsDF3.select("TF", "Label").take(1):
    print(features_label)

# IDF features
for features_label in wordsDF4.select("features", "Label").take(1):
    print("\n",features_label)

In [15]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Encode the string labels as the numeric 0.0 / 1.0 values the classifier and the
# binary evaluator need: "2" (good) -> 1.0, everything else ("1", bad) -> 0.0.
# A separate column name is used so the original "Label" column stays intact.
labeled = df1.withColumn("label_idx", (df1["Label"] == "2").cast("double"))

# Split data into training and testing set (seeded so a re-run gives the same split)
(training, test) = labeled.randomSplit([0.7, 0.3], seed=42)

# Create a logistic regression instance that reads the numeric label column
lr = LogisticRegression(maxIter=10, labelCol="label_idx")

# Use a pipeline to chain all transformers and estimators.
# The unfitted `idf` estimator is used rather than the already-fitted `idfModel`:
# the IDF weights must be re-learned inside every fold, both because the grid
# below changes hashingTF.numFeatures (and therefore the TF vector size) and
# because a model fitted on all rows would leak held-out data into training.
pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf, lr])

# We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
# This will allow us to jointly choose parameters for all Pipeline stages.
# A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
# We use a ParamGridBuilder to construct a grid of parameters to search over.

paramGrid = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [10, 50]) \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .build()

# One evaluator is shared by model selection and the final score so that both
# are computed against the same label column and the same metric.
evaluator = BinaryClassificationEvaluator(
    labelCol="label_idx", metricName="areaUnderROC")

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3,
                          seed=42)

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(training)

# Make predictions on test documents. cvModel uses the best model found (lrModel).
prediction = cvModel.transform(test)
selected = prediction.select("Review", "Label", "probability", "prediction").take(5)
for row in selected:
    print(row)

# Evaluate result with ROC
ROC = evaluator.evaluate(prediction)
ROC