In [1]:
import sparknlp
import logging
sparknlp.start()
import numpy as np

from sparknlp import *
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.pretrained import PretrainedPipeline

from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col, udf
from pyspark.sql.types import *

from pyspark.ml import Pipeline
from pyspark.ml.feature import *
from pyspark.ml.regression import *
from pyspark.ml.classification import *
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from sklearn.ensemble import VotingClassifier
from pyspark.sql.types import DoubleType



In [2]:
spark = SparkSession.builder.appName('Spark-Sentiment').getOrCreate()
logger = spark.sparkContext._jvm.org.apache.log4j
logger.LogManager.getLogger("org.apache.spark.scheduler").setLevel(logger.Level.ERROR)
logging.getLogger("py4j").setLevel(logging.ERROR)
spark.sparkContext.setLogLevel("ERROR")

In [3]:
spark.conf.set("spark.hadoop.google.cloud.auth.service.account.enable", "true")
spark.conf.set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", "path/to/your/credentials.json")

In [4]:
# Read the CSV file from GCS
df = spark.read.format("csv").option("header", "true").\
                option("delimiter", "\t").load("gs://msca-bdp-student-gcs/Group4_Project_Data/amazon_reviews_us_Music_v1_00.tsv")
df = df.dropna()

##place to use k-means
df = df.withColumn("star_rating",df.star_rating.cast('int'))
df = df.withColumn('sentiment', when(col('star_rating') <= 3, 'negative').otherwise('positive'))

                                                                                

In [6]:
df.select("review_body").show()

[Stage 1:>                                                          (0 + 1) / 1]

+--------------------+
|         review_body|
+--------------------+
|Love this CD alon...|
|This is the album...|
|  Excellent / thanks|
|Nice variety of c...|
|Purchased as a gi...|
|Really enjoyed th...|
|            Great CD|
|       Excellent CD!|
|                nice|
|Outstanding music...|
|      quite relaxing|
|I love this CD.  ...|
|Rhiannon Giddens ...|
|Wrecking Ball is ...|
|I was pleased wit...|
|The music that in...|
|   Excellent CD&#62;|
|Love Or Money by ...|
|      Just splendid!|
|One of my favorit...|
+--------------------+
only showing top 20 rows



                                                                                

In [7]:
#pipeline
tokenizer=Tokenizer(inputCol="review_body", outputCol="review_body_words")
remover = StopWordsRemover(inputCol="review_body_words", outputCol="review_body_words_filtered")
hashingTF = HashingTF(inputCol="review_body_words_filtered", outputCol="hashingTF_features")
idf = IDF(inputCol="hashingTF_features", outputCol="idf_features")
labelIndexer = StringIndexer(inputCol="sentiment", outputCol="sentiment_label")

pipeline = Pipeline(stages=[tokenizer,remover,hashingTF,idf,labelIndexer])

In [6]:
preprocessed_df=pipeline.fit(df).transform(df)

                                                                                

In [7]:
countVectorizer = CountVectorizer(inputCol="review_body_words_filtered", outputCol="raw_features")

In [8]:
cv_model=countVectorizer.fit(preprocessed_df)

                                                                                

In [9]:
data=cv_model.transform(preprocessed_df)

In [10]:
data=data.withColumnRenamed("raw_features","features")

In [None]:
from pyspark.ml.clustering import LDA

num_topics = 10
max_iterations = 20

lda = LDA(k=num_topics, maxIter=max_iterations,featuresCol='features')
lda_model = lda.fit(data)



                                                                                

In [None]:
topics=lda_model.describeTopics(maxTermsPerTopic=10)

In [13]:
from pyspark.ml.util import MLWritable

bucket_name = "msca-bdp-student-gcs"
model_folder_path = "Group4_Project_Data/models/LDA_model"

model_gcs_path = f"gs://{bucket_name}/{model_folder_path}"
lda_model.write().overwrite().save(model_gcs_path)

                                                                                

In [13]:
#topics.show()

In [14]:
for topic in topics.collect():
    print("Topic {}: {}".format(topic[0], ", ".join([cv_model.vocabulary[i] for i in topic[1]])))

Topic 0: /><br, -, album, one, like, song, songs, great, &, track
Topic 1: , one, album, like, music, songs, great, cd, sound, first
Topic 2: , album, great, like, music, one, best, songs, rock, hip
Topic 3: , ok, album, like, songs, music, good, sound, new, one
Topic 4: , album, one, like, music, cd, songs, song, -, love
Topic 5: <br, album, like, , />, song, one, songs, good, really
Topic 6: de, la, , que, y, en, el, es, los, un
Topic 7: , /><br, music, one, -, recording, first, sound, also, like
Topic 8: cd, , great, love, music, songs, one, cd., good, it.
Topic 9: album, great, , like, cd, songs, one, best, /><br, music


In [16]:
# Read the CSV file from GCS
df2 = spark.read.format("csv").option("header", "true").\
                option("delimiter", "\t").load("gs://msca-bdp-student-gcs/Group4_Project_Data/amazon_reviews_us_Digital_Music_Purchase_v1_00.tsv")
df2 = df2.dropna()

##place to use k-means
df2 = df2.withColumn("star_rating",df2.star_rating.cast('int'))
df2 = df2.withColumn('sentiment', when(col('star_rating') <= 3, 'negative').otherwise('positive'))

In [17]:
preprocessed_df_2=pipeline.fit(df2).transform(df2)

                                                                                

In [18]:
cv_model_2=countVectorizer.fit(preprocessed_df_2)

                                                                                

In [19]:
data2=cv_model_2.transform(preprocessed_df_2)
data2=data2.withColumnRenamed("raw_features","features")

In [20]:
predictions=lda_model.transform(data2)

In [42]:
predictions.show(1)

[Stage 89:>                                                         (0 + 1) / 1]

+-----------+-----------+--------------+----------+--------------+--------------------+--------------------+-----------+-------------+-----------+----+-----------------+---------------+--------------------+-----------+---------+--------------------+--------------------------+--------------------+--------------------+---------------+--------------------+--------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|    product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|review_headline|         review_body|review_date|sentiment|   review_body_words|review_body_words_filtered|  hashingTF_features|        idf_features|sentiment_label|            features|   topicDistribution|
+-----------+-----------+--------------+----------+--------------+--------------------+--------------------+-----------+-------------+-----------+----+-----------------+---------------+--------------------+-----------+---------+------------------

                                                                                

In [21]:
# show the predicted topics for the new data
predictions.select("review_body", "topicDistribution").show(1)

[Stage 82:>                                                         (0 + 1) / 1]

+--------------------+--------------------+
|         review_body|   topicDistribution|
+--------------------+--------------------+
|Great  rendition....|[0.01320158602967...|
+--------------------+--------------------+
only showing top 1 row



                                                                                

In [22]:
topics=lda_model.describeTopics(maxTermsPerTopic=3)

In [23]:
# extract the top term indices for each topic
top_terms = topics.select("termIndices").collect()

In [24]:


# show the top terms for each predicted topic
for i, row in enumerate(predictions.select("review_body", "topicDistribution").collect()):
    if i>=10:
        break
    print("Review {}: {}".format(i, row.review_body))
    for j in range(len(top_terms)):
        print("Top terms for Topic {}: {}".format(j, [cv_model_2.vocabulary[int(row.topicDistribution.argmax())]]))



ERROR:root:Exception while sending command.                                     
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1207, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1033, in send_command
    response = connection.send_command(command)
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1211, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while receiving
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1207, in send_command
    raise Py4JNetworkError("An

Review 0: Great  rendition. Great  song


ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:40321)
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 977, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1115, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 111] Connection refused

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3442, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_10627/3028070542.py", line 7, in <module>
    print("Top terms for Topic {}: {}"

Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:40321)