In [1]:
%pip install -q python-dotenv

Note: you may need to restart the kernel to use updated packages.


In [2]:
import os
import configparser
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql.functions import split, explode, lower, col
from pyspark.ml.feature import StopWordsRemover
from dotenv import load_dotenv

In [4]:
TABLE = "user_reviews"
spark_config_apth = 'conf/spark.ini'

In [5]:
config = configparser.ConfigParser()
config.optionxform=str
config.read(spark_config_apth)
list(config['spark'].items())

[('spark.master', 'local[*]'),
 ('spark.driver.memory', '2g'),
 ('spark.executor.memory', '1g'),
 ('spark.executor.instances', '2'),
 ('spark.executor.cores', '2'),
 ('spark.dynamicAllocation.enabled', 'true'),
 ('spark.dynamicAllocation.minExecutors', '1'),
 ('spark.dynamicAllocation.maxExecutors', '5'),
 ('spark.sql.execution.arrow.pyspark.enabled', 'true')]

In [6]:
conf = SparkConf()
conf.setAll(list(config['spark'].items()))

<pyspark.conf.SparkConf at 0x7f0085078290>

In [7]:
spark = SparkSession.builder.config(conf=conf) \
    .getOrCreate()
    
sc = spark.sparkContext

In [9]:
path = './sparkdata/googleplaystore_user_reviews.csv'

In [10]:
df = spark.read \
    .options(delimiter=",", header=True) \
    .csv(path)
    
df.cache().show(5, False)

+-----------+---------------------+------------------------------------------------------------------------------------------------------------------------------+---------+------------------+----------------------+
|IndexColumn|App                  |Translated_Review                                                                                                             |Sentiment|Sentiment_Polarity|Sentiment_Subjectivity|
+-----------+---------------------+------------------------------------------------------------------------------------------------------------------------------+---------+------------------+----------------------+
|0          |10 Best Foods for You|"I like eat delicious food. That's I'm cooking food myself case ""10 Best Foods"" helps lot also ""Best Before (Shelf Life)"""|Positive |1.0               |0.5333333333333333    |
|1          |10 Best Foods for You|This help eating healthy exercise regular basis                                                          

In [13]:
df.unpersist()

DataFrame[indexcolumn: string, app: string, sentiment: string, sentiment_polarity: string, sentiment_subjectivity: string, translated_review: string]

In [11]:
df.rdd.getNumPartitions()

2

In [12]:
print("Number of rows:", df.count())

Number of rows: 64295


In [13]:
# Tokenize the reviews
df_words = df.withColumn("word", explode(split(lower(col("Translated_Review")), "\\W+")))

# Remove stop words
stop_words = set(StopWordsRemover.loadDefaultStopWords("english"))  # Default English stop words
df_filtered = df_words.filter(~df_words.word.isin(stop_words))

# Count word frequency
word_frequency = df_filtered.groupBy("word").count().orderBy(col("count").desc())

# Display the top keywords
word_frequency.show()

# If you want to get the results in Python
keywords_list = word_frequency.rdd.map(lambda row: (row.word, row.count)).collect()


+------+-----+
|  word|count|
+------+-----+
|      |23013|
|  game| 9384|
|  like| 5498|
|  good| 5271|
|   app| 4998|
| great| 4810|
|   get| 4726|
|  love| 4681|
|  time| 4538|
|really| 3096|
|  even| 2883|
|   ads| 2661|
|     m| 2564|
|update| 2536|
|  play| 2479|
| phone| 2439|
|please| 2397|
|  work| 2349|
|  also| 2339|
|  much| 2305|
+------+-----+
only showing top 20 rows



In [14]:
spark.stop()