In [20]:
!pip install pyspark



In [21]:
from pyspark.sql import SparkSession

# Create Spark Session
spark = SparkSession.builder \
    .appName("AmazonReviewsBigData") \
    .getOrCreate()


In [22]:
df = spark.read.csv("/content/Reviews.csv", header=True, inferSchema=True)
df.printSchema()
print(f"Total Rows: {df.count()}")


root
 |-- Id: integer (nullable = true)
 |-- ProductId: string (nullable = true)
 |-- UserId: string (nullable = true)
 |-- ProfileName: string (nullable = true)
 |-- HelpfulnessNumerator: string (nullable = true)
 |-- HelpfulnessDenominator: string (nullable = true)
 |-- Score: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- Summary: string (nullable = true)
 |-- Text: string (nullable = true)

Total Rows: 568454


In [23]:
# Show a few rows
df.show(5)

# Count distinct users & products
df.select("UserId").distinct().count()
df.select("ProductId").distinct().count()


+---+----------+--------------+--------------------+--------------------+----------------------+-----+----------+--------------------+--------------------+
| Id| ProductId|        UserId|         ProfileName|HelpfulnessNumerator|HelpfulnessDenominator|Score|      Time|             Summary|                Text|
+---+----------+--------------+--------------------+--------------------+----------------------+-----+----------+--------------------+--------------------+
|  1|B001E4KFG0|A3SGXH7AUHU8GW|          delmartian|                   1|                     1|    5|1303862400|Good Quality Dog ...|I have bought sev...|
|  2|B00813GRG4|A1D87F6ZCVE5NK|              dll pa|                   0|                     0|    1|1346976000|   Not as Advertised|"Product arrived ...|
|  3|B000LQOCH0| ABXLMWJIXXAIN|"Natalia Corres "...|                   1|                     1|    4|1219017600|"""Delight"" says...|"This is a confec...|
|  4|B000UA0QIQ|A395BORC6FGVXV|                Karl|            

74258

In [27]:
from pyspark.sql.types import IntegerType, DoubleType

# Convert Score to integer
df = df.withColumn("Score", df["Score"].cast(IntegerType()))

# Average score overall
df.groupBy().avg("Score").show()

# Top 10 products by average rating
top_products = df.groupBy("ProductId") \
                 .avg("Score") \
                 .orderBy("avg(Score)", ascending=False) \
                 .limit(10)
top_products.show()


+-----------------+
|       avg(Score)|
+-----------------+
|4.176305349530591|
+-----------------+

+----------+------------------+
| ProductId|        avg(Score)|
+----------+------------------+
|B000X90P5I|              25.0|
|B004HN3ONQ|18.666666666666668|
|B001EPPGXG|              16.0|
|B003GUTELM|              14.5|
|B0001WYNDM| 8.333333333333334|
|B001E530FM|               8.3|
|B001E530FW|               8.3|
|B001SAQCNA|              8.25|
|B000EF3FR6| 7.428571428571429|
|B001R7VT7S| 7.055555555555555|
+----------+------------------+



In [28]:
df.write.mode("overwrite").parquet("amazon_reviews_parquet")