In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col, avg
from pyspark.sql.functions import to_date, date_format
import os

In [11]:
print(os.path.abspath("date/ReviewsCleaned.csv"))

d:\Master\Sem II\BigData\ProiectCod\date\ReviewsCleaned.csv


In [12]:
spark = SparkSession.builder \
    .appName("Review analysis") \
    .getOrCreate()

schema = StructType([
    StructField("Id", IntegerType(), True),
    StructField("ProductId", StringType(), True),
    StructField("UserId", StringType(), True),
    StructField("ProfileName", StringType(), True),
    StructField("HelpfulnessNumerator", IntegerType(), True),
    StructField("HelpfulnessDenominator", IntegerType(), True),
    StructField("Score", IntegerType(), True),  # this is key
    StructField("Summary", StringType(), True),
    StructField("Text", StringType(), True),
    StructField("ReadableTime", StringType(), True),
])

df = spark.read.csv("date/ReviewsCleaned.csv", header=True, schema=schema)


df.printSchema()
df.show(5)

root
 |-- Id: integer (nullable = true)
 |-- ProductId: string (nullable = true)
 |-- UserId: string (nullable = true)
 |-- ProfileName: string (nullable = true)
 |-- HelpfulnessNumerator: integer (nullable = true)
 |-- HelpfulnessDenominator: integer (nullable = true)
 |-- Score: integer (nullable = true)
 |-- Summary: string (nullable = true)
 |-- Text: string (nullable = true)
 |-- ReadableTime: string (nullable = true)

+---+----------+--------------+--------------------+--------------------+----------------------+-----+--------------------+--------------------+------------+
| Id| ProductId|        UserId|         ProfileName|HelpfulnessNumerator|HelpfulnessDenominator|Score|             Summary|                Text|ReadableTime|
+---+----------+--------------+--------------------+--------------------+----------------------+-----+--------------------+--------------------+------------+
|  1|B001E4KFG0|A3SGXH7AUHU8GW|          delmartian|                   1|                     1|  

In [13]:
df.groupBy("UserId").count().orderBy("count", ascending=False).show(10)


+--------------+-----+
|        UserId|count|
+--------------+-----+
|A3OXHLG6DIBRW8|  448|
|A1YUL9PCJR3JTY|  421|
| AY12DBB0U420B|  389|
|A281NPSIMI1C2R|  365|
|A1Z54EM24Y40LL|  256|
|A1TMAVN4CEM8U8|  204|
|A2MUGFV2TDQ47K|  201|
|A3TVZM3ZIXG8YW|  199|
|A3PJZ8TU8FDQ1K|  178|
| AQQLWCMRNDFGI|  176|
+--------------+-----+
only showing top 10 rows



In [14]:
df.createOrReplaceTempView("reviews")

In [15]:

spark.sql("""
    SELECT ProfileName, COUNT(*) as num_reviews
    FROM reviews
    GROUP BY ProfileName
    ORDER BY num_reviews DESC
    LIMIT 5
""").show()

+--------------------+-----------+
|         ProfileName|num_reviews|
+--------------------+-----------+
|"C. F. Hill ""CFH"""|        451|
|"O. Brown ""Ms. O...|        421|
|       Gary Peterson|        389|
|"Rebecca of Amazo...|        365|
|               Chris|        363|
+--------------------+-----------+



In [16]:
df = df.withColumn("ReadableDate", to_date(col("ReadableTime")))

# Filtrare după 01-01-2011
df_filtered = df.filter(col("ReadableDate") > "2011-01-01")

# Afișare rezultate
df_filtered.show(truncate=False)

+---+----------+--------------+---------------------------------+--------------------+----------------------+-----+--------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+------------+
|Id |ProductId |UserId        |ProfileName                      |HelpfulnessNumerator|HelpfulnessDenominator|Score|Summary                   |Text                                                                                                                                                        

In [None]:
df.createOrReplaceTempView("reviews")

# Rulare interogarea
spark.sql("""
    SELECT *
    FROM reviews
    WHERE to_date(ReadableTime) > '2011-01-01'
""").show(truncate=False)

+---+----------+--------------+---------------------------------+--------------------+----------------------+-----+--------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+------------+
|Id |ProductId |UserId        |ProfileName                      |HelpfulnessNumerator|HelpfulnessDenominator|Score|Summary                   |Text                                                                                                                                                        