In [0]:
###################
#Sentiment Analysis of Online Amazon Reviews
###################


#Sentiment Analysis of Online Amazon Reviews




In [1]:
#import dependencies
!pip install pyspark
from pyspark import SparkFiles
from pyspark import SparkContext
from pyspark.ml.feature import Tokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from pyspark.sql import SparkSession
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.sql.types import StringType
from pyspark.sql.types import ArrayType
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import pandas as pd





Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/37/98/244399c0daa7894cdf387e7007d5e8b3710a79b67f3fd991c0b0b644822d/pyspark-2.4.3.tar.gz (215.6MB)
[K     |████████████████████████████████| 215.6MB 119kB/s 
[?25hCollecting py4j==0.10.7 (from pyspark)
[?25l  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
[K     |████████████████████████████████| 204kB 35.1MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Stored in directory: /root/.cache/pip/wheels/8d/20/f0/b30e2024226dc112e256930dd2cd4f06d00ab053c86278dcf3
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.3


**PySpark 2.4 runs on Java 8, not Java 11. Below, we search for and install an open-source build of Java 8 (OpenJDK), and then set it as the default Java for this notebook.**





In [5]:
# One-time Java 8 setup commands, kept commented out for reference
# (search for OpenJDK packages, install the headless JDK 8, select it):
# ! sudo apt search openjdk
# ! apt-get install openjdk-8-jdk-headless
# ! echo 2 | sudo update-alternatives --config javac
# ! sudo update-java-alternatives --set /usr/lib/jvm/java-1.8.0-openjdk-amd64

# Only this command is active: it runs update-alternatives for the `javac` link group.
! sudo update-alternatives --config javac 


There is only one alternative in link group javac (providing /usr/bin/javac): /usr/lib/jvm/java-8-openjdk-amd64/bin/javac
Nothing to configure.


In [0]:
# Read in the training data from AWS, using PySpark

# Create (or reuse) the SparkSession explicitly. No earlier cell defines
# `spark`, so without this line the cell fails on a fresh kernel.
spark = SparkSession.builder.appName("SentimentAnalysis").getOrCreate()

url = "https://s3-us-west-1.amazonaws.com/emansbucket/SentimentAnalysis/train.csv"
# Hand the remote file to Spark; SparkFiles.get() below resolves its local path.
spark.sparkContext.addFile(url)

df = spark.read.option('header', 'true').csv(SparkFiles.get("train.csv"), inferSchema=True, sep=',')
df.show(10)

In [0]:
# Tokenizer stage: reads the "comments" column and writes "token_words".
tokenizer = (
    Tokenizer()
    .setInputCol("comments")
    .setOutputCol("token_words")
)

In [0]:
# Apply the tokenizer to the training DataFrame. The added "token_words"
# column holds each comment split into a list of word tokens.
tokenized = tokenizer.transform(df)

# Preview the result; show() truncates long cells by default.
tokenized.show()



In [0]:
# Stop-word filtering

# Remover stage: reads "token_words" and writes the result to "filtered".
remover = StopWordsRemover(
    inputCol="token_words",
    outputCol="filtered",
)

# Transform once, keep the resulting DataFrame, then preview it.
tk_stpwrds_df = remover.transform(tokenized)
tk_stpwrds_df.show()

In [0]:
# Feature hashing of the filtered tokens

# Hash each token list into a vector with 2**8 = 256 features.
hashing = HashingTF(
    inputCol="filtered",
    outputCol="hashedValues",
    numFeatures=2 ** 8,
)

# Add the "hashedValues" column to the training data and preview it.
hashed_df = hashing.transform(tk_stpwrds_df)
hashed_df.show()

In [0]:
# TF-IDF: term frequency - inverse document frequency.
# TF  -- count of each word within a document.
# IDF -- the more prevalent a word is across documents, the lower its score.

idf = IDF(
    inputCol="hashedValues",
    outputCol="features",
)
idfModel = idf.fit(hashed_df)
rescaledData = idfModel.transform(hashed_df)

# Keep the original 'comments' column plus every derived column up to 'features'.
# select() returns a new DataFrame, whereas show() would only print one.
feat_df = rescaledData.select(
    "comments",
    "token_words",
    "filtered",
    "hashedValues",
    "features",
)

feat_df.dtypes

In [0]:
# Function on tokens using square brackets slicing method.
# Separate the comment label ('1' or '2') from the comment string itself

def array_chop(arry):
    """Extract the integer sentiment label from a tokenized comment.

    The first token is expected to be the label digit, optionally wrapped in
    double quotes (for example ``'"2"'``).

    Args:
        arry: Sequence of string tokens for one comment. May be None or empty.

    Returns:
        The label as an int, or None when there are no tokens or the first
        token cannot be parsed as an integer (e.g. unlabeled text).
    """
    # A null or empty token list carries no label.
    if not arry:
        return None

    # chr(34) is the ASCII double-quote character; strip it from the token.
    digit = arry[0].replace(chr(34), '')

    try:
        return int(digit)
    except ValueError:
        # First token is ordinary text, not a label.
        return None

# Wrap array_chop as a Spark UDF whose declared return type is IntegerType.
chop = udf(f=array_chop, returnType=IntegerType())

In [0]:
# Build the training frame for the classifier.
# The extracted label must be stored in a column named 'label' so that the
# Naive Bayes model can use it.

final_df = (
    feat_df
    .select("token_words", "features")
    .withColumn("label", chop(col("token_words")))
)
final_df.show()

# Create a Naive Bayes model and fit it to the training data


In [0]:
# Naive Bayes classifier with its (default) column names spelled out:
# the training DataFrame must provide a 'features' and a 'label' column.
nb = NaiveBayes(featuresCol="features", labelCol="label")

# Train the classifier on the prepared training DataFrame.
predictor = nb.fit(final_df)

# Now that we have trained our model, we can apply it to our testing data


# Read in and pre-process the testing data. Then apply the trained model to it.


In [0]:
# Read in the testing data from AWS, using PySpark
url1 = "https://s3-us-west-1.amazonaws.com/emansbucket/SentimentAnalysis/test.csv"

# Reuse the active SparkSession, or start one if none exists.
# The previous `SparkSession(sc)` depended on a variable `sc` whose only
# definition in this notebook is commented out.
spark = SparkSession.builder.getOrCreate()
spark.sparkContext.addFile(url1)

df1 = spark.read.option("header", "true").csv(SparkFiles.get("test.csv"), inferSchema=True, sep=',')
df1.show(10)

In [0]:
# Tokenizer stage for the test set: "comments" -> "token_words".
tokenizer1 = (
    Tokenizer()
    .setInputCol("comments")
    .setOutputCol("token_words")
)
tokenizer1

In [0]:
# Tokenize the test DataFrame, adding the "token_words" column.
tokenized1 = tokenizer1.transform(df1)

# Preview the result; show() truncates long cells by default.
tokenized1.show()

In [0]:
# Stop-word filtering for the test set

# Remover stage: reads "token_words" and writes the result to "filtered".
remover1 = StopWordsRemover(
    inputCol="token_words",
    outputCol="filtered",
)

# Transform once, keep the resulting DataFrame, then preview it.
tk_stpwrds_df1 = remover1.transform(tokenized1)
tk_stpwrds_df1.show()

In [0]:
# Feature hashing of the filtered test tokens

# Hash each token list into a vector with 2**8 = 256 features.
hashing1 = HashingTF(
    inputCol="filtered",
    outputCol="hashedValues",
    numFeatures=2 ** 8,
)

# Add the "hashedValues" column to the test data and preview it.
hashed_df1 = hashing1.transform(tk_stpwrds_df1)
hashed_df1.show()

In [0]:
# Term frequency - inverse document frequency for the test set.
# Re-use the IDF model fitted on the training data (`idfModel`) instead of
# fitting a new one here. A separately fitted IDF would weight terms by test-set
# document frequencies, so the 'features' vectors would not be on the same
# scale as the ones the classifier was trained on.
idfModel1 = idfModel  # alias so the name `idfModel1` still resolves
rescaledData1 = idfModel1.transform(hashed_df1)

# Keep the original 'comments' column plus every derived column up to 'features'.
feat_df1 = rescaledData1.select(["comments","token_words", "filtered", "hashedValues", "features"])

In [0]:
# Use the chop UDF to pull the comment label ('1' or '2') out of the first
# token and store it in a 'label' column next to the features.
final_df1 = (
    feat_df1
    .select("token_words", "features")
    .withColumn("label", chop(col("token_words")))
)
final_df1.show()

# We have now pre-processed all of our data. 
# Below, we apply the trained model to our testing data.

In [0]:
# Apply the trained model to the pre-processed testing data,
# then preview the first 20 rows of the result.
test_results = predictor.transform(final_df1)
test_results.show(n=20)

In [0]:
# Evaluate the model's accuracy on the test predictions.
# MulticlassClassificationEvaluator uses the F1 score unless told otherwise,
# so the accuracy metric is requested explicitly to match the printed message.
acc_eval = MulticlassClassificationEvaluator(metricName="accuracy")
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting reviews was: %f" % acc)

# Transform user input into the format necessary for the model.


In [0]:
# Placeholder text for the review to be classified.
input_string = "....................................."

# Wrap the text in a one-row pandas DataFrame with a single 'comments' column.
raw_data = dict(comments=[input_string])
input_df = pd.DataFrame(data=raw_data, columns=['comments'])
input_df

In [0]:
# Write the user input to a CSV file so that Spark can load it.
# index=False keeps the pandas row index out of the file; otherwise it is
# written as an extra, unnamed leading column.
input_df.to_csv("user_input1.csv", index=False)

In [0]:
# Load the user input CSV into a Spark DataFrame.
path = "user_input1.csv"

# Reuse the active SparkSession, or start one if none exists.
# The previous `SparkSession(sc)` depended on a variable `sc` that this
# notebook never defines.
spark = SparkSession.builder.getOrCreate()
spark.sparkContext.addFile(path)
spdf = spark.read.option('header', 'true').csv(SparkFiles.get("user_input1.csv"), inferSchema=True, sep=',')
spdf.show(10)

In [6]:
# Tokenizer stage for the user input: "comments" -> "token_words".
tokenizer2 = (
    Tokenizer()
    .setInputCol("comments")
    .setOutputCol("token_words")
)

# Tokenize the user input DataFrame and preview the result.
tokenized2 = tokenizer2.transform(spdf)
tokenized2.show()

AttributeError: ignored

In [0]:
# Stop-word filtering for the user input

# Remover stage: reads "token_words" and writes the result to "filtered".
remover2 = StopWordsRemover(
    inputCol="token_words",
    outputCol="filtered",
)

# Transform once, keep the resulting DataFrame, then preview it.
tk_stpwrds_df2 = remover2.transform(tokenized2)
tk_stpwrds_df2.show()


In [0]:
# Feature hashing of the filtered user-input tokens
hashing2 = HashingTF(
    inputCol="filtered",
    outputCol="hashedValues",
    numFeatures=2 ** 8,
)

# Add the "hashedValues" column to the user input and preview it.
hashed_df2 = hashing2.transform(tk_stpwrds_df2)
hashed_df2.show()

In [0]:
# Term frequency - inverse document frequency for the user input.
# Apply the IDF model fitted on the training data (`idfModel`). Fitting a new
# IDF on this single-row DataFrame would give every term that occurs a weight
# of log((1 + 1) / (1 + 1)) = 0, so the whole feature vector would be zero
# and the prediction would ignore the text.

idfModel2 = idfModel  # alias so the name `idfModel2` still resolves
rescaledData2 = idfModel2.transform(hashed_df2)

feat_df2 = rescaledData2.select(["comments","token_words", "filtered", "hashedValues", "features"])



In [0]:
# Select the columns needed to score the user input.
# User input carries no sentiment label, so no 'label' column is derived here:
# applying `chop` would try to parse the first word of the comment as an
# integer and fail. Prediction only needs the 'features' column.

final_df2 = feat_df2.select("token_words", "features")
final_df2.show()

In [0]:
# Apply the trained model to the user input DataFrame,
# then preview up to 20 rows of the result.
input_results = predictor.transform(final_df2)
input_results.show(n=20)