In [20]:
from pyspark.sql import SparkSession
from operator import add
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import corr, length, col, when
import time
import matplotlib.pyplot as plt
from pyspark.sql.functions import avg

#change spark.cores.max to change the total number of cores used. Keep executor cores set to 1 since the small vms have only one core having more here causes a ressource issue

spark_session = SparkSession\
        .builder\
        .master("spark://de-i-19:7077")\
        .appName("Project_Group19")\
        .config("spark.dynamicAllocation.enabled", False)\
        .config("spark.dynamicAllocation.shuffleTracking.enabled",True)\
        .config("spark.shuffle.service.enabled", False)\
        .config("spark.dynamicAllocation.executorIdleTimeout","300s")\
        .config("spark.driver.port",9998)\
        .config("spark.blockManager.port",10005)\
        .config("spark.executor.cores", 1)\
        .config("spark.cores.max", 4)\
        .config("spark.executor.memory","1g")\
        .config("spark.shuffle.service.enabled", "false")\
        .config("spark.dynamicAllocation.enabled", "false")\
        .getOrCreate()




# RDD API
spark_context = spark_session.sparkContext

# spark_context.setLogLevel("INFO")
spark_context.setLogLevel("WARN")

In [21]:
from pyspark.sql import functions as F  

# Load Data

In [22]:
data = spark_session.read.csv("hdfs://192.168.2.246:9000/user/ubuntu/amazon_reviews_us_Baby_v1_00.tsv", sep="\t", header=True, inferSchema=True)
data.printSchema()



root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: timestamp (nullable = true)



                                                                                

In [23]:
data.show(20)

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|    9970739| R8EWA1OFT84NX|B00GSP5D94|     329991347|Summer Infant Swa...|            Baby|          5|            0|          0|   N|                Y|Great swaddled bl...|Loved these swadd...|2015-08-31 00:00:00|
|         US|   23538442|R2JWY4YRQD4FOP|B00YYDDZGU|     646108902|Pacifier Clip Gir...| 

In [24]:
data.withColumn('word', F.explode(F.split(F.col('review_body'), ' ')))\
    .groupBy('word')\
    .count()\
    .sort('count', ascending=False)\
    .show()

[Stage 9:>                                                          (0 + 1) / 1]

+----+-------+
|word|  count|
+----+-------+
| the|4995891|
| and|3342042|
|  to|3098469|
|   I|2968866|
|   a|2556351|
|    |2486371|
|  it|2243224|
|  is|1887722|
| for|1608897|
|  of|1352145|
|  in|1261456|
|this|1230274|
|  my|1096258|
|that|1015808|
|with| 903145|
|  on| 876415|
| was| 850969|
|have| 833948|
| but| 783985|
|  so| 670283|
+----+-------+
only showing top 20 rows



                                                                                

In [25]:
from pyspark.sql.functions import col, explode, split, lower

# Define a list of common words to exclude
common_words = ['the', 'and', 'to', 'i', 'a', 'it', 'is', 'for', 'of', 'in', 'this', 'my', 'that', 'with', 'on', 'was', 'have', 'but', 'so',
                'we', 'not', 'are', 'you', 'as', 'very', 'they', 'when', 'be', 'one', 'just', 'would', 'at', 'great', "it's", 'can', 'like', 'our',
                '', 'had', 'these', 'if', 'up', 'she', 'he', 'or', 'out', 'use', 'has', 'all', 'her', 'get', 'from', 'it', '/><br', 'it.', 'them',
                'because', 'the', 'a', 'an', 'and', 'but', 'or', 'if', 'while', 'as', 'of',
                'at', 'by', 'for', 'with', 'about', 'against', 'between', 'into',
                'through', 'during', 'before', 'after', 'above', 'below', 'to', 
                'from', 'up', 'down', 'in', 'out', 'on', 'off', 'over', 'under',
                'again', 'further', 'then', 'once', 'here', 'there', 'when', 
                'where', 'why', 'how', 'all', 'any', 'both', 'each', 'few', 
                'more', 'most', 'other', 'some', 'such', 'no', 'nor', 'not', 
                'only', 'own', 'same', 'so', 'than', 'too', 'very', 's', 't', 
                'can', 'will', 'just', 'don', 'should', 'now', 'your', 'really', 'much', 'do', 'also', 'his'
               
               ]

# Apply lower case to the review_body column to make the comparison case-insensitive
filtered_data = data.withColumn('word', explode(split(lower(col('review_body')), ' '))) \
                    .filter(~col('word').isin(common_words)) \
                    .groupBy('word') \
                    .count() \
                    .sort('count', ascending=False)

# Show the filtered DataFrame
filtered_data.show()


[Stage 12:>                                                         (0 + 1) / 1]

+--------+------+
|    word| count|
+--------+------+
|    baby|473765|
|    love|370658|
|  little|309892|
|    easy|306180|
|    seat|246934|
|  bought|227859|
|    good|217413|
|     old|217401|
|   don't|202966|
|     put|201772|
|     son|192947|
|    well|184945|
|   loves|181315|
|    even|179779|
|   still|172961|
|    were|170022|
|daughter|169954|
|   which|169636|
|    used|169121|
| product|167074|
+--------+------+
only showing top 20 rows



                                                                                

# Sentiment analysis
https://www.johnsnowlabs.com/sentiment-analysis-with-spark-nlp-without-machine-learning/

In [26]:
#spark_session.stop()

In [27]:
import sparknlp
spark = sparknlp.start() # start spark session


In [28]:
import nltk
nltk.download('opinion_lexicon')
nltk.download('punkt')

[nltk_data] Downloading package opinion_lexicon to
[nltk_data]     /home/ubuntu/nltk_data...
[nltk_data]   Package opinion_lexicon is already up-to-date!
[nltk_data] Downloading package punkt to /home/ubuntu/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

In [29]:
from nltk.corpus import opinion_lexicon

pos_list=set(opinion_lexicon.positive())
neg_list=set(opinion_lexicon.negative())

In [30]:
# Function to classify words
def classify_word(word):
    if word in pos_list:
        return 'positive'
    elif word in neg_list:
        return 'negative'
    else:
        return 'neutral'

# Example review_body
review_body = "The product is good and affordable, but the delivery was late and the packaging was damaged."

# Tokenize the review_body into words
words = nltk.word_tokenize(review_body)

# Classify each word in the review_body
word_sentiments = {word: classify_word(word) for word in words}

# Print the classification results
for word, sentiment in word_sentiments.items():
    print(f"{word}: {sentiment}")

The: neutral
product: neutral
is: neutral
good: positive
and: neutral
affordable: positive
,: neutral
but: neutral
the: neutral
delivery: neutral
was: neutral
late: neutral
packaging: neutral
damaged: negative
.: neutral


In [31]:
from nltk.corpus import opinion_lexicon
import nltk
from collections import Counter

# Download the opinion_lexicon if not already downloaded
nltk.download('opinion_lexicon')

# Get the positive and negative word lists from opinion_lexicon
pos_list = set(opinion_lexicon.positive())
neg_list = set(opinion_lexicon.negative())

# Function to classify the sentiment of a word
def classify_word(word):
    if word in pos_list:
        return 'positive'
    elif word in neg_list:
        return 'negative'
    else:
        return 'neutral'

# Function to classify the sentiment of a sentence
def classify_sentence(sentence):
    # Tokenize the sentence into words
    words = nltk.word_tokenize(sentence.lower())
    # Classify the sentiment of each word
    sentiments = [classify_word(word) for word in words]
    # Count the occurrences of each sentiment
    counts = Counter(sentiments)
    # Get the sentiment with the highest count
    majority_sentiment = max(counts, key=counts.get)
    return majority_sentiment

# Assuming 'df' is your DataFrame containing the 'review_body' column
sentiment_udf = F.udf(classify_sentence)

# Apply the sentiment classification UDF to the 'review_body' column
df_with_sentiment = data.withColumn('sentence_sentiment', sentiment_udf(data['review_body']))

# Show the DataFrame with the added column of sentence sentiment
df_with_sentiment.show(truncate=False)


[nltk_data] Downloading package opinion_lexicon to
[nltk_data]     /home/ubuntu/nltk_data...
[nltk_data]   Package opinion_lexicon is already up-to-date!


Py4JJavaError: An error occurred while calling o385.showString.
: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:423)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
py4j.ClientServerConnection.run(ClientServerConnection.java:106)
java.lang.Thread.run(Thread.java:750)

The currently active SparkContext was created at:

org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:423)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
py4j.ClientServerConnection.run(ClientServerConnection.java:106)
java.lang.Thread.run(Thread.java:750)
         
	at org.apache.spark.SparkContext.assertNotStopped(SparkContext.scala:120)
	at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1536)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:102)
	at org.apache.spark.sql.execution.datasources.FileFormat.buildReaderWithPartitionValues(FileFormat.scala:137)
	at org.apache.spark.sql.execution.datasources.FileFormat.buildReaderWithPartitionValues$(FileFormat.scala:128)
	at org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:248)
	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:465)
	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:454)
	at org.apache.spark.sql.execution.FileSourceScanExec.doExecute(DataSourceScanExec.scala:543)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:194)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:190)
	at org.apache.spark.sql.execution.python.EvalPythonExec.doExecute(EvalPythonExec.scala:88)
	at org.apache.spark.sql.execution.python.EvalPythonExec.doExecute$(EvalPythonExec.scala:87)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.doExecute(BatchEvalPythonExec.scala:34)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:194)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:190)
	at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:527)
	at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:455)
	at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:454)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:498)
	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:51)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:751)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:194)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:190)
	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:340)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:473)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:459)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:48)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3868)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2863)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3858)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3856)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3856)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2863)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3084)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:288)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:327)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
