In [32]:
# installing Pyspark and its libraries
!apt-get update -y
!apt-get install openjdk-11 -y
!pip install pyspark==3.3.2 findspark


0% [Working]            Hit:1 https://cli.github.com/packages stable InRelease
Get:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
Hit:3 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:5 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:7 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Fetched 3,632 B in 1s (3,087 B/s)
Reading package lists... Done
W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)
Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
E: Unable to locate pac

In [33]:
# importing sparksession to establish spark environment
import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Google Play Store Reviews_Task1") \
    .config("spark.driver.memory", "4g") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

print("Spark version:", spark.version)





Spark version: 3.3.2


In [34]:
# Load dataset into PySpark DataFrame

csv_path = "/content/googleplaystore_user_reviews.csv"

df = spark.read.csv(
    csv_path,
    header=True,
    inferSchema=True
)

df.printSchema()
df.show(5, truncate=100)


root
 |-- App: string (nullable = true)
 |-- Translated_Review: string (nullable = true)
 |-- Sentiment: string (nullable = true)
 |-- Sentiment_Polarity: string (nullable = true)
 |-- Sentiment_Subjectivity: string (nullable = true)

+---------------------+--------------------------------------------------------------------------------------------+-----------------------------------+------------------+----------------------+
|                  App|                                                                           Translated_Review|                          Sentiment|Sentiment_Polarity|Sentiment_Subjectivity|
+---------------------+--------------------------------------------------------------------------------------------+-----------------------------------+------------------+----------------------+
|10 Best Foods for You|"I like eat delicious food. That's I'm cooking food myself, case ""10 Best Foods"" helps lot| also ""Best Before (Shelf Life)"""|          Positive|         

In [35]:
# changing data types to the data columns
from pyspark.sql.functions import col

df = df.withColumn(
    "Sentiment_Polarity",
    col("Sentiment_Polarity").cast("double")
).withColumn(
    "Sentiment_Subjectivity",
    col("Sentiment_Subjectivity").cast("double")
)

df.printSchema()


root
 |-- App: string (nullable = true)
 |-- Translated_Review: string (nullable = true)
 |-- Sentiment: string (nullable = true)
 |-- Sentiment_Polarity: double (nullable = true)
 |-- Sentiment_Subjectivity: double (nullable = true)



In [36]:
# Check missing (NULL) values in each column
from pyspark.sql.functions import count, when

df.select([
    count(when(col(c).isNull(), c)).alias(c)
    for c in df.columns
]).show()


+---+-----------------+---------+------------------+----------------------+
|App|Translated_Review|Sentiment|Sentiment_Polarity|Sentiment_Subjectivity|
+---+-----------------+---------+------------------+----------------------+
|  0|                5|        0|               648|                   326|
+---+-----------------+---------+------------------+----------------------+



In [37]:
# Remove rows with missing review or sentiment
df = df.filter(
    col("Translated_Review").isNotNull() &
    col("Sentiment").isNotNull()
)


In [38]:
# Total number of reviews (basic big-data operation)
df.count()


64290

In [39]:
# Sentiment distribution (groupBy + count)
df.groupBy("Sentiment").count().show()


+--------------------+-----+
|           Sentiment|count|
+--------------------+-----+
| especially consi...|    3|
| I pleased I year...|    1|
| WHY DOES IT KEEP...|    1|
| despiste game go...|    2|
| instead I must s...|    1|
| would believable...|    1|
|       teach I learn|    2|
| fast forward (so...|    2|
| & cellphone prov...|    2|
| I lost PAID FOR ...|    1|
| WAY TOO MANY ads...|    3|
|                guns|    1|
| said happen time...|    1|
| I figure worth l...|    2|
| different passwo...|    1|
| there's function...|    1|
|              log in|    1|
| feed Facebook se...|    1|
|           direction|    1|
| I reset password...|    1|
+--------------------+-----+
only showing top 20 rows



In [40]:
#App-wise review count
df.groupBy("App") \
  .count() \
  .orderBy(col("count").desc()) \
  .show(30)


+--------------------+-----+
|                 App|count|
+--------------------+-----+
| Angry Birds Classic|  320|
|CBS Sports App - ...|  320|
|          Bowmasters|  320|
|          Helix Jump|  300|
|         8 Ball Pool|  300|
|      Bubble Shooter|  260|
|Calorie Counter -...|  259|
|DEAD TARGET: FPS ...|  240|
|    Candy Crush Saga|  240|
|Duolingo: Learn L...|  240|
|    Garena Free Fire|  240|
|                ESPN|  240|
|              Granny|  220|
|   Hill Climb Racing|  220|
|            Hangouts|  220|
|        Block Puzzle|  220|
|       Google Photos|  220|
|    Farm Heroes Saga|  220|
|           Flow Free|  220|
|    Alto's Adventure|  200|
|Calorie Counter b...|  200|
|Calorie Counter -...|  200|
|10 Best Foods for...|  200|
|Calorie Counter -...|  200|
|Bleacher Report: ...|  200|
|             Agar.io|  200|
|        Clash Royale|  180|
|     Amazon Shopping|  180|
| DRAGON BALL LEGENDS|  180|
|Episode - Choose ...|  180|
+--------------------+-----+
only showing t

In [41]:
# checking count of sentiment polarity values
from pyspark.sql.functions import isnan, col, count, when

df.select(
    count(when(col("Sentiment_Polarity").isNull(), True)).alias("Null_Count"),
    count(when(isnan(col("Sentiment_Polarity")), True)).alias("NaN_Count")
).show()


+----------+---------+
|Null_Count|NaN_Count|
+----------+---------+
|       648|    26863|
+----------+---------+



In [42]:
# Filter valid sentiment polarity values (fix NaN issue)
df_valid = df.filter(
    col("Sentiment_Polarity").isNotNull() &
    (~isnan(col("Sentiment_Polarity")))
)
#App-wise average sentiment polarity
df_valid.groupBy("App") \
    .avg("Sentiment_Polarity") \
    .orderBy(col("avg(Sentiment_Polarity)").desc()) \
    .show(10)


+--------------------+-----------------------+
|                 App|avg(Sentiment_Polarity)|
+--------------------+-----------------------+
|            HomeWork|                    1.0|
|       Google Slides|     0.9333333333333332|
|Daily Workouts - ...|                    0.8|
|Bed Time Fan - Wh...|                0.78125|
|Cameringo Lite. F...|     0.7702690972222221|
|       Google Primer|                   0.75|
|        GPS Map Free|                    0.7|
|GPS Speedometer a...|                 0.6875|
|Best Ovulation Tr...|              0.5953125|
|3D Live Neon Weed...|     0.5681818181818181|
+--------------------+-----------------------+
only showing top 10 rows



In [43]:
# Repartition dataset to increase parallelism
df = df.repartition(8)

# Print number of partitions
print("Number of partitions:", df.rdd.getNumPartitions())


Number of partitions: 8
