In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.datastax.spark:spark-cassandra-connector_2.12:3.0.0-beta --conf spark.cassandra.connection.host=cassandra pyspark-shell'

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, explode, lower, col
from pyspark.ml.feature import StopWordsRemover
from collections import Counter

In [3]:
# Spark session & context
spark = SparkSession.builder.master("local") \
    .master("local[*]") \
    .config("spark.driver.memory", "2g") \
    .config("spark.executor.instances", "4") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.dynamicAllocation.minExecutors", "1") \
    .config("spark.dynamicAllocation.maxExecutors", "6") \
    .config("spark.cassandra.connection.host", "cassandra-1,cassandra-2,cassandra-3") \
    .config("spark.cassandra.connection.port", "9042") \
    .config("spark.cassandra.connection.timeout_ms", "30000") \
    .config("spark.cassandra.query.retry.count", "10") \
    .config("spark.cassandra.connection.reconnection_delay_ms.min", "1000") \
    .config("spark.cassandra.connection.reconnection_delay_ms.max", "60000") \
    .getOrCreate()
    
sc = spark.sparkContext

In [4]:
%ls -la ./sparkdata

total 10666752
drwxrwxrwx 1 root   root          512 Apr 16 10:36 [0m[34;42m.[0m/
drwxrwxrwx 1 root   root          512 Apr 21 09:52 [34;42m..[0m/
-rwxrwxrwx 1 root   root  10915138215 Apr 14 06:31 [01;32men.openfoodfacts.org.products.csv[0m*
-rwxrwxrwx 1 root   root      7614057 Apr 21 07:43 [01;32mgoogleplaystore_user_reviews.csv[0m*
drwxr-xr-x 1 jovyan users         512 Apr 16 10:36 [01;34mkmeans_model[0m/


In [7]:
csv_path = "./sparkdata/googleplaystore_user_reviews.csv"

In [5]:
df = spark.read.format("org.apache.spark.sql.cassandra") \
    .options(table="googleplaystore_user_reviews", keyspace="mykeyspace") \
    .load()
    
df.cache().show(5, False)

+-----------+--------------------------------------------+---------+-------------------+----------------------+------------------------------------------------+
|indexcolumn|app                                         |sentiment|sentiment_polarity |sentiment_subjectivity|translated_review                               |
+-----------+--------------------------------------------+---------+-------------------+----------------------+------------------------------------------------+
|5715       |All Maths Formulas                          |Positive |1.0                |0.3                   |Best hardworking chaps                          |
|5874       |All Video Downloader 2018                   |Neutral  |0.0                |0.0                   |Just I want                                     |
|679        |2ndLine - Second Phone Number               |Positive |0.875              |0.6000000000000001    |Good business line! Now make wifi compatible    |
|9264       |Ascape VR: 360° Virtu

In [6]:
print("Number of rows:", df.count())

Number of rows: 9610


In [7]:
# Tokenize the reviews
df_words = df.withColumn("word", explode(split(lower(col("Translated_Review")), "\\W+")))

# Remove stop words
stop_words = set(StopWordsRemover.loadDefaultStopWords("english"))  # Default English stop words
df_filtered = df_words.filter(~df_words.word.isin(stop_words))

# Count word frequency
word_frequency = df_filtered.groupBy("word").count().orderBy(col("count").desc())

# Display the top keywords
word_frequency.show()

# If you want to get the results in Python
keywords_list = word_frequency.rdd.map(lambda row: (row.word, row.count)).collect()


+------+-----+
|  word|count|
+------+-----+
|      | 3831|
|  game| 1461|
|   app|  916|
|  good|  794|
|  like|  785|
| great|  778|
|  love|  745|
|   get|  717|
|  time|  711|
|update|  500|
|really|  497|
|   ads|  486|
|  even|  461|
|  play|  415|
|     m|  386|
|  work|  379|
| phone|  372|
|  easy|  362|
|  also|  360|
|  much|  354|
+------+-----+
only showing top 20 rows

