In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col, count, min, max, length, from_unixtime, date_format


In [2]:
spark = SparkSession.builder.appName("tensoriot-Assignment1").getOrCreate()


23/10/25 21:46:36 WARN Utils: Your hostname, Rapidos-MacBook-Pro-85.local resolves to a loopback address: 127.0.0.1; using 172.20.10.12 instead (on interface en0)
23/10/25 21:46:36 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/25 21:46:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
#Assignment1

In [4]:
#parse and load only required cols from json data
schema = StructType([
    StructField('asin', StringType(), True),
    StructField('overall', DoubleType(), True), 
    StructField('reviewText', StringType(), True),
    StructField('reviewerID', StringType(), True),
    StructField('unixReviewTime', LongType(), True)
])

In [5]:
#input path can be input params to job 
#reviews.json is just chunk of data 
file_path = "/Users/mac/tensoriot/reviews.json"
reviews_df = spark.read.schema(schema).json(file_path)

#dropping rows which having null values ie data cleansing
reviews_no_null_df = reviews_df.na.drop()


In [6]:
reviews_df.count()

                                                                                

4469063

In [7]:
reviews_df.show()

+----------+-------+--------------------+--------------+--------------+
|      asin|overall|          reviewText|    reviewerID|unixReviewTime|
+----------+-------+--------------------+--------------+--------------+
|B017O9P72A|    2.0|Can only control ...|A3KUPJ396OQF78|    1449273600|
|B017O9P72A|    5.0|         Great skill|A3TXR8GLKS19RE|    1515974400|
|B017O9P72A|    1.0|Not happy. Can no...|A1FOHYK23FJ6CN|    1515024000|
|B017O9P72A|    1.0|Can not connect a...|A1RRDX9AOST1AN|    1514592000|
|B017O9P72A|    1.0|The service works...| AA4DHYT5YSSIT|    1514505600|
|B017O9P72A|    5.0|I haven't had any...|A2LNJJWW2TLL00|    1511481600|
|B017O9P72A|    1.0|This app forces y...|A1M5UIUZ2P5FFO|    1511395200|
|B017O9P72A|    4.0|This skill works ...|A18JLUE8V4G7TX|    1508371200|
|B017O9P72A|    1.0|Who would think y...|A1E35W9YOVAH1G|    1505260800|
|B017O9P72A|    5.0|Once I got used t...| ACZKH8ZBMNTN0|    1490659200|
|B017O9P72A|    3.0|So far I have no ...|A3IZ10SN6PLWA1|    1489

In [8]:
total_reviews_df = reviews_no_null_df.groupBy("asin").agg(count("reviewerID").alias("total_reviews"))
min_reviews = total_reviews_df.select(min("total_reviews")).first()[0]
max_reviews = total_reviews_df.select(max("total_reviews")).first()[0]

                                                                                

In [9]:
items_with_least_reviews_df = total_reviews_df.filter(total_reviews_df["total_reviews"] ==min_reviews )
items_with_most_reviews_df = total_reviews_df.filter(total_reviews_df["total_reviews"] ==max_reviews )
items_with_most_reviews_df.show()



+----------+-------------+
|      asin|total_reviews|
+----------+-------------+
|B00CYQP3AK|        18623|
+----------+-------------+



                                                                                

In [10]:
review_length_df = reviews_no_null_df.withColumn("review_length", length(reviews_no_null_df["reviewText"]))
max_review_length = review_length_df.select(max("review_length")).first()[0]
items_with_max_review_length_df = review_length_df.filter(review_length_df["review_length"] == max_review_length )
items_with_max_review_length_df.show()



+----------+-------+--------------------+-------------+--------------+-------------+
|      asin|overall|          reviewText|   reviewerID|unixReviewTime|review_length|
+----------+-------+--------------------+-------------+--------------+-------------+
|B00BHJS3C0|    5.0|I was an early ad...|AQIJ3ZEEVCIKU|    1383782400|        33608|
+----------+-------+--------------------+-------------+--------------+-------------+



                                                                                

In [11]:
formatted_date_df = reviews_no_null_df.withColumn("formatted_date", date_format(from_unixtime("unixReviewTime"), "MM-dd-yyyy"))
formatted_date_df.show()

+----------+-------+--------------------+--------------+--------------+--------------+
|      asin|overall|          reviewText|    reviewerID|unixReviewTime|formatted_date|
+----------+-------+--------------------+--------------+--------------+--------------+
|B017O9P72A|    2.0|Can only control ...|A3KUPJ396OQF78|    1449273600|    12-05-2015|
|B017O9P72A|    5.0|         Great skill|A3TXR8GLKS19RE|    1515974400|    01-15-2018|
|B017O9P72A|    1.0|Not happy. Can no...|A1FOHYK23FJ6CN|    1515024000|    01-04-2018|
|B017O9P72A|    1.0|Can not connect a...|A1RRDX9AOST1AN|    1514592000|    12-30-2017|
|B017O9P72A|    1.0|The service works...| AA4DHYT5YSSIT|    1514505600|    12-29-2017|
|B017O9P72A|    5.0|I haven't had any...|A2LNJJWW2TLL00|    1511481600|    11-24-2017|
|B017O9P72A|    1.0|This app forces y...|A1M5UIUZ2P5FFO|    1511395200|    11-23-2017|
|B017O9P72A|    4.0|This skill works ...|A18JLUE8V4G7TX|    1508371200|    10-19-2017|
|B017O9P72A|    1.0|Who would think y...|A1

In [12]:
#output path can be input params to job
parquet_output_path = "/Users/mac/tensoriot/output/output.parquet"

# Save the DataFrame in Parquet format
formatted_date_df.write.mode("overwrite").parquet(parquet_output_path)

23/10/25 21:47:24 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
23/10/25 21:47:24 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
23/10/25 21:47:24 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 75.08% for 9 writers
23/10/25 21:47:24 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 67.58% for 10 writers
23/10/25 21:47:32 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 75.08% for 9 writers
23/10/25 21:47:32 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
23/10/25 21:47:32 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,01