### Creating spark session

In [1]:
spark

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1588617440254_0009,pyspark3,idle,Link,Link,✔


SparkSession available as 'spark'.
<pyspark.sql.session.SparkSession object at 0x7f24ad644b00>

### Importing library

In [2]:
from pyspark.sql import functions as F

In [3]:
# Load Data Set
df = spark.read\
          .option("header", "true")\
          .option("inferSchema", "true")\
          .option("basePath", "hdfs:///hive/amazon-reviews-pds/parquet/")\
          .parquet("hdfs:///hive/amazon-reviews-pds/parquet/*")

In [4]:
# Checking the schema of dataframe:
df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: date (nullable = true)
 |-- year: integer (nullable = true)
 |-- product_category: string (nullable = true)

In [4]:
#Considering reviews after 2004 only:
df1 = df.filter(F.col("year")>2004)

In [6]:
df1.count()

71519024

In [6]:
df1.show(1)

+-----------+-----------+--------------+----------+--------------+-------------------+-----------+-------------+-----------+----+-----------------+---------------+--------------------+-----------+----+--------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|      product_title|star_rating|helpful_votes|total_votes|vine|verified_purchase|review_headline|         review_body|review_date|year|    product_category|
+-----------+-----------+--------------+----------+--------------+-------------------+-----------+-------------+-----------+----+-----------------+---------------+--------------------+-----------+----+--------------------+
|         FR|   26387317|R1UP1BZDZMYRQ7|B005R4HUZM|     648585293|Le Rouge et le Noir|          5|            0|          1|   N|                Y|        parfait|a relire pour se ...| 2014-04-09|2014|Digital_Ebook_Pur...|
+-----------+-----------+--------------+----------+--------------+-------------------+-----------+----------

#### Excluding multiple reviews by the same users for the same product. In the case the same user has reviewed particular product more than once, excluding all reviews following the ﬁrst review. First review will remain as part of the analysis

In [5]:
from pyspark.sql.window import Window
import pyspark.sql.functions as F
 
df12 = df1.select("*",F.row_number().over(Window.partitionBy("product_category",'customer_id', 'product_id').orderBy(df['review_date'])))

In [8]:
df12.show(5)

+-----------+-----------+--------------+----------+--------------+--------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+----+----------------+-----------------------------------------------------------------------------------------------------------------------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|year|product_category|row_number() OVER (PARTITION BY product_category, customer_id, product_id ORDER BY review_date ASC NULLS FIRST unspecifiedframe$())|
+-----------+-----------+--------------+----------+--------------+--------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+----+----------------+--------------------------------------------------------------

In [6]:
df12=df12.withColumnRenamed("row_number() OVER (PARTITION BY product_category, customer_id, product_id ORDER BY review_date ASC NULLS FIRST unspecifiedframe$())", "row_num")

In [7]:
# The count of rows which we will be removing 
df12.where(F.col("row_num")>1).count()

5607955

In [7]:
# Unique rows
df2=df12.where(F.col("row_num")==1)

In [10]:
df2.show(1)

+-----------+-----------+--------------+----------+--------------+--------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+----+----------------+-------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|year|product_category|row_num|
+-----------+-----------+--------------+----------+--------------+--------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+----+----------------+-------+
|         US|   10000770|R1MKP6SU9MAHWT|1589976533|     398545867|Wait No More: One...|          5|            0|          0|   N|                N|Heartwarming and ...|Kelly and John's ...| 2011-10-30|2011|           Books|      1|
+-----------+-----------+--------------+----------+--------------+--

1. Explore the dataset and provide analysis by product-category and year: 
1. Number of reviews 
2. Number of users 
3. Average and Median review stars 
4. Percentiles of length of the review. Use the following percentiles: [0.1, 0.25, 0.5, 0.75, 0.9, 0.95] 
5. Percentiles for number of reviews per product. For example, 10% of books got 5 or less reviews. Use the following percentiles: [0.1, 0.25, 0.5, 0.75, 0.9, 0.95] 6. Identify week number (each year has 52 weeks) for each year and product category with most positive reviews (4 and 5 star).

## Exploratory Data Analysis

In [9]:
#Number of reviews
df2.count()

65911069

In [10]:
#1.Number of reviews product_category and year wise
df2.groupby("product_category","year")\
    .agg(F.count("review_id"))\
    .sort("year")\
    .show(80,truncate = False)

+----------------------+----+----------------+
|product_category      |year|count(review_id)|
+----------------------+----+----------------+
|PC                    |2005|18166           |
|Digital_Ebook_Purchase|2005|19              |
|Books                 |2005|521047          |
|Digital_Video_Download|2005|12              |
|Video_DVD             |2005|189265          |
|Wireless              |2005|11835           |
|Video_DVD             |2006|197551          |
|PC                    |2006|26291           |
|Wireless              |2006|19857           |
|Digital_Ebook_Purchase|2006|36              |
|Books                 |2006|568401          |
|Digital_Video_Download|2006|185             |
|PC                    |2007|59890           |
|Digital_Ebook_Purchase|2007|508             |
|Digital_Video_Download|2007|2597            |
|Books                 |2007|761037          |
|Video_DVD             |2007|271324          |
|Wireless              |2007|47738           |
|Wireless    

In [11]:
#2.Number of users product_category and year wise
df2.groupby("product_category","year")\
    .agg(F.countDistinct("customer_id").alias("Count of Users"))\
    .sort("year")\
    .show(80,truncate = False)

+----------------------+----+--------------+
|product_category      |year|Count of Users|
+----------------------+----+--------------+
|Wireless              |2005|10585         |
|PC                    |2005|15781         |
|Digital_Ebook_Purchase|2005|17            |
|Books                 |2005|290588        |
|Digital_Video_Download|2005|6             |
|Video_DVD             |2005|95199         |
|PC                    |2006|23177         |
|Wireless              |2006|17984         |
|Video_DVD             |2006|105659        |
|Digital_Ebook_Purchase|2006|33            |
|Digital_Video_Download|2006|154           |
|Books                 |2006|317358        |
|Digital_Video_Download|2007|2027          |
|Digital_Ebook_Purchase|2007|407           |
|PC                    |2007|51028         |
|Books                 |2007|420721        |
|Video_DVD             |2007|147934        |
|Wireless              |2007|42100         |
|Books                 |2008|459247        |
|Video_DVD

In [16]:
#3.Average and Median review stars product_category wise
df2.groupBy('product_category',"year")\
    .agg(F.round(F.avg("star_rating"),2).alias("avg_rating"),F.expr('percentile_approx(star_rating, 0.5)').alias('med_rating'))\
    .sort("year")\
    .show(80,truncate = False)

+----------------------+----+----------+----------+
|product_category      |year|avg_rating|med_rating|
+----------------------+----+----------+----------+
|PC                    |2005|3.62      |4         |
|Books                 |2005|4.15      |5         |
|Digital_Video_Download|2005|3.75      |4         |
|Digital_Ebook_Purchase|2005|3.58      |4         |
|Wireless              |2005|3.41      |4         |
|Video_DVD             |2005|4.0       |5         |
|PC                    |2006|3.72      |4         |
|Digital_Ebook_Purchase|2006|4.03      |5         |
|Video_DVD             |2006|4.08      |5         |
|Wireless              |2006|3.51      |4         |
|Digital_Video_Download|2006|3.63      |4         |
|Books                 |2006|4.2       |5         |
|Wireless              |2007|3.76      |4         |
|Video_DVD             |2007|4.16      |5         |
|PC                    |2007|3.94      |5         |
|Books                 |2007|4.26      |5         |
|Digital_Ebo

#### 4. Percentiles of length of the review. Use the following percentiles: [0.1, 0.25, 0.5, 0.75, 0.9, 0.95] 


In [9]:
from pyspark.sql.functions import length
df3=df2.select("*",length("review_body"))
df4=df3.withColumnRenamed("length(review_body)", "review_len")

[64.0, 130.0, 220.0, 812.0, 41200.0, 51019.0]

In [10]:
df4.groupby("product_category","year").agg(F.round(F.expr('percentile_approx(Review_Len, 0.1)')).alias('%1'),\
                                           F.round(F.expr('percentile_approx(Review_Len, 0.25)')).alias('%25'),\
                                           F.round(F.expr('percentile_approx(Review_Len, 0.50)')).alias('%50'),\
                                           F.round(F.expr('percentile_approx(Review_Len, 0.75)')).alias('%75'),\
                                           F.round(F.expr('percentile_approx(Review_Len, 0.75)')).alias('%90'),\
                                           F.round(F.expr('percentile_approx(Review_Len, 0.75)')).alias('%95'))\
.sort("product_category","year").show(80,truncate=False)

+----------------------+----+---+---+----+----+----+----+
|product_category      |year|%1 |%25|%50 |%75 |%90 |%95 |
+----------------------+----+---+---+----+----+----+----+
|Books                 |2005|208|367|674 |1286|1286|1286|
|Books                 |2006|189|339|628 |1222|1222|1222|
|Books                 |2007|144|258|499 |1005|1005|1005|
|Books                 |2008|150|267|521 |1054|1054|1054|
|Books                 |2009|138|249|505 |1058|1058|1058|
|Books                 |2010|140|251|519 |1115|1115|1115|
|Books                 |2011|157|259|523 |1133|1133|1133|
|Books                 |2012|137|208|433 |985 |985 |985 |
|Books                 |2013|117|145|244 |558 |558 |558 |
|Books                 |2014|21 |71 |161 |386 |386 |386 |
|Books                 |2015|12 |35 |100 |307 |307 |307 |
|Digital_Ebook_Purchase|2005|139|305|2468|3489|3489|3489|
|Digital_Ebook_Purchase|2006|179|249|900 |2330|2330|2330|
|Digital_Ebook_Purchase|2007|127|244|502 |1181|1181|1181|
|Digital_Ebook

#### Q5 Percentiles for number of reviews per product. For example, 10% of books got 5 or less reviews. Use the following percentiles: [0.1, 0.25, 0.5, 0.75, 0.9, 0.95]

In [11]:
df4.groupBy('product_category')\
    .agg(F.expr('percentile_approx(Review_Len, array(0.1, 0.25, 0.5, 0.75, 0.9, 0.95))').alias('quantile'))\
    .show(80,truncate = False)

+----------------------+-------------------------------+
|product_category      |quantile                       |
+----------------------+-------------------------------+
|PC                    |[28, 95, 171, 372, 764, 1172]  |
|Wireless              |[21, 70, 141, 281, 558, 851]   |
|Digital_Video_Download|[13, 38, 113, 190, 374, 586]   |
|Digital_Ebook_Purchase|[59, 119, 188, 397, 868, 1410] |
|Books                 |[52, 133, 297, 728, 1584, 2492]|
|Mobile_Apps           |[23, 82, 121, 171, 278, 392]   |
|Video_DVD             |[28, 107, 202, 531, 1290, 2112]|
+----------------------+-------------------------------+

In [9]:
import pyspark.sql.functions as F
df_Q5 = df2.groupBy("product_category","year","product_title").agg(F.count("review_id").alias('review_count'))
df_Q5.groupby("product_category","year").agg(F.round(F.expr('percentile_approx(review_count, 0.1)')).alias('%1'),\
                                           F.round(F.expr('percentile_approx(review_count, 0.25)')).alias('%25'),\
                                           F.round(F.expr('percentile_approx(review_count, 0.50)')).alias('%50'),\
                                           F.round(F.expr('percentile_approx(review_count, 0.75)')).alias('%75'),\
                                           F.round(F.expr('percentile_approx(review_count, 0.75)')).alias('%90'),\
                                           F.round(F.expr('percentile_approx(review_count, 0.75)')).alias('%95'))\
.sort("product_category","year").show(80,truncate=False)

+----------------------+----+---+---+---+---+---+---+
|product_category      |year|%1 |%25|%50|%75|%90|%95|
+----------------------+----+---+---+---+---+---+---+
|Books                 |2005|1  |1  |1  |2  |2  |2  |
|Books                 |2006|1  |1  |1  |2  |2  |2  |
|Books                 |2007|1  |1  |1  |2  |2  |2  |
|Books                 |2008|1  |1  |1  |2  |2  |2  |
|Books                 |2009|1  |1  |1  |2  |2  |2  |
|Books                 |2010|1  |1  |1  |2  |2  |2  |
|Books                 |2011|1  |1  |1  |2  |2  |2  |
|Books                 |2012|1  |1  |1  |2  |2  |2  |
|Books                 |2013|1  |1  |1  |3  |3  |3  |
|Books                 |2014|1  |1  |1  |3  |3  |3  |
|Books                 |2015|1  |1  |1  |2  |2  |2  |
|Digital_Ebook_Purchase|2005|1  |1  |1  |1  |1  |1  |
|Digital_Ebook_Purchase|2006|1  |1  |1  |1  |1  |1  |
|Digital_Ebook_Purchase|2007|1  |1  |1  |1  |1  |1  |
|Digital_Ebook_Purchase|2008|1  |1  |1  |1  |1  |1  |
|Digital_Ebook_Purchase|2009

##### 6.Identify week number (each year has 52 weeks) for each year and product category with most positive reviews (4 and 5 star). 

In [8]:
#df4 is a dataframe with coloumns "product_category","year","review_date",'star_rating' and 'week'
df4= df2.select("product_category","year","review_date",'star_rating').withColumn('week',F.weekofyear(df2.review_date))\
        .where(F.column("star_rating").isin([4,5]))

In [None]:
df4.count()

In [10]:
df4.show(5,truncate=False)

+----------------+----+-----------+-----------+----+
|product_category|year|review_date|star_rating|week|
+----------------+----+-----------+-----------+----+
|Books           |2011|2011-10-30 |5          |43  |
|Books           |2014|2014-05-02 |5          |18  |
|Books           |2008|2008-03-24 |5          |13  |
|Books           |2010|2010-05-08 |5          |18  |
|Books           |2013|2013-08-12 |5          |33  |
+----------------+----+-----------+-----------+----+
only showing top 5 rows

In [9]:
df6 = df4.groupby("product_category","year","week")\
   .agg(F.count("star_rating").alias("rating_count"))\
   .sort("product_category","year")

In [16]:
df6.show(5,truncate=False)

+----------------+----+----+------------+
|product_category|year|week|rating_count|
+----------------+----+----+------------+
|Books           |2005|17  |6663        |
|Books           |2005|5   |6632        |
|Books           |2005|21  |6249        |
|Books           |2005|34  |9774        |
|Books           |2005|12  |6298        |
+----------------+----+----+------------+
only showing top 5 rows

In [17]:
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from pyspark.sql.functions import rank
 
df7 = df6.select("*",rank().over(Window.partitionBy("product_category",'year').orderBy(F.col('rating_count').desc())))

In [19]:
df8=df7.withColumnRenamed("RANK() OVER (PARTITION BY product_category, year ORDER BY rating_count DESC NULLS LAST unspecifiedframe$())",'rank')
df8.where(F.col("rank")==1).show(20,truncate = False)

+----------------------+----+----+------------+----+
|product_category      |year|week|rating_count|rank|
+----------------------+----+----+------------+----+
|Books                 |2005|31  |11399       |1   |
|Books                 |2006|45  |15769       |1   |
|Books                 |2007|2   |29867       |1   |
|Books                 |2008|1   |16441       |1   |
|Books                 |2009|37  |21677       |1   |
|Books                 |2010|5   |24043       |1   |
|Books                 |2011|52  |28267       |1   |
|Books                 |2012|52  |62085       |1   |
|Books                 |2013|1   |73910       |1   |
|Books                 |2014|1   |86638       |1   |
|Books                 |2015|2   |95507       |1   |
|Digital_Ebook_Purchase|2005|43  |3           |1   |
|Digital_Ebook_Purchase|2006|31  |3           |1   |
|Digital_Ebook_Purchase|2006|8   |3           |1   |
|Digital_Ebook_Purchase|2006|34  |3           |1   |
|Digital_Ebook_Purchase|2007|48  |78          

##### 2. Provide detailed analysis of "Digital eBook Purchase" versus Books. 
 1. Using Spark Pivot functionality, produce DataFrame with following columns: 
    1. Year 
    2. Month 
    3. Total number of reviews for "Digital eBook Purchase" category 
    4. Total number of reviews for "Books" category 
    5. Average stars for reviews for "Digital eBook Purchase" category 
    6. Average stars for reviews for "Books" category 

In [22]:
df10 = df2.withColumn('month',F.month(df2.review_date))\
          .select("product_category","review_id","star_rating","year","month")
df10.show(10, False)

+----------------+--------------+-----------+----+-----+
|product_category|review_id     |star_rating|year|month|
+----------------+--------------+-----------+----+-----+
|Books           |R1MKP6SU9MAHWT|5          |2011|10   |
|Books           |R1YB0J8DC0TKS2|5          |2014|5    |
|Books           |R2IEM1VVAHVQNG|5          |2008|3    |
|Books           |R20L65O2S58MAO|5          |2010|5    |
|Books           |R38P8K3BI379KU|5          |2013|8    |
|Books           |RS7TO93OWV5VA |5          |2013|12   |
|Books           |RJE8D9YSE0TEX |5          |2008|6    |
|Books           |R17HQIAO8MSGNZ|5          |2012|11   |
|Books           |R2M6061U2N92VD|5          |2014|5    |
|Books           |R3RWT30V27AWRZ|2          |2013|7    |
+----------------+--------------+-----------+----+-----+
only showing top 10 rows

In [23]:
category_to_filter=['Digital_Ebook_Purchase','Books']
df10.groupBy("Year","Month","product_category")\
    .agg(F.count("review_id").alias("Num_of_reviews"),F.round(F.mean("star_rating"),3).alias("Avg_rating"))\
    .filter(F.col("product_category").isin(category_to_filter))\
    .sort("Year","Month", ascending=False).show(10,False)

+----+-----+----------------------+--------------+----------+
|Year|Month|product_category      |Num_of_reviews|Avg_rating|
+----+-----+----------------------+--------------+----------+
|2015|8    |Books                 |347646        |4.48      |
|2015|8    |Digital_Ebook_Purchase|578604        |4.338     |
|2015|7    |Digital_Ebook_Purchase|564277        |4.341     |
|2015|7    |Books                 |339311        |4.477     |
|2015|6    |Digital_Ebook_Purchase|534045        |4.35      |
|2015|6    |Books                 |326946        |4.486     |
|2015|5    |Books                 |327538        |4.49      |
|2015|5    |Digital_Ebook_Purchase|581539        |4.343     |
|2015|4    |Books                 |336509        |4.498     |
|2015|4    |Digital_Ebook_Purchase|597905        |4.354     |
+----+-----+----------------------+--------------+----------+
only showing top 10 rows

In [24]:
category_to_filter=["Digital_Ebook_Purchase","Books"]
Q21=df10.groupBy("Year","Month").pivot("product_category",category_to_filter)\
    .agg(F.count("review_id").alias("review_count"),F.round(F.mean("star_rating"),3).alias("Avg_rating"))\
    .sort("Year","Month", ascending=False)
Q21.show(132,truncate = False)
Q21=Q21.na.fill(0)
Q21.show(132,truncate = False)

+----+-----+-----------------------------------+---------------------------------+------------------+----------------+
|Year|Month|Digital_Ebook_Purchase_review_count|Digital_Ebook_Purchase_Avg_rating|Books_review_count|Books_Avg_rating|
+----+-----+-----------------------------------+---------------------------------+------------------+----------------+
|2015|8    |578604                             |4.338                            |347646            |4.48            |
|2015|7    |564277                             |4.341                            |339311            |4.477           |
|2015|6    |534045                             |4.35                             |326946            |4.486           |
|2015|5    |581539                             |4.343                            |327538            |4.49            |
|2015|4    |597905                             |4.354                            |336509            |4.498           |
|2015|3    |695166                             |

##### 2. Provide detailed analysis of "Digital eBook Purchase" versus Books.
2. Produce two graphs to demonstrate aggregations from #1: 
   1. Number of reviews
   2. Average stars

###### 2. Provide detailed analysis of "Digital eBook Purchase" versus Books
3. Identify similar products (books) in both categories. Use "product_title" to match products. To account for potential diﬀerences in naming of products, compare titles after stripping spaces and converting to lower case. 

    1.Is there a diﬀerence in average rating for the similar books in digital and printed form? 
    
    2.To answer #1, you may calculate number of items with high stars in digital form versus printed form, and vise versa. Alternatively, you can make the conclusion by using appropriate pairwise statistic. 

In [8]:
#Q2.3
#df12 will have df2 dataset with condition product_category is "Digital_Ebook_Purchase" or "Books" and star_rating is 4 or 5

category_to_filter=["Digital_Ebook_Purchase","Books"]
Rating=[4,5]

df12=df2.select("product_category","product_title","star_rating")\
    .filter((F.col("product_category").isin(category_to_filter)) & (F.col("star_rating").isin(Rating)))

#Q231a is a dataframe with product_category Digital_Ebook_Purchase
Q231a= df12.groupby("product_category",  F.lower(F.trim(F.col("product_title"))).alias("product_title"))\
          .agg(F.round(F.avg("star_rating"),2).alias("Ebook_Avg_rating"))\
          .filter(F.col("product_category")=="Digital_Ebook_Purchase")

#Q231b is a dataframe with product_category Books
Q231b= df12.groupby("product_category",  F.lower(F.trim(F.col("product_title"))).alias("product_title"))\
          .agg(F.round(F.avg("star_rating"),2).alias("book_Avg_rating"))\
          .filter(F.col("product_category")=="Books")
#Q231c is a dataframe created by join Q231a and Q231b where product title is same
Q231c=Q231a.join(Q231b, (Q231a["product_title"] == Q231b['product_title'])).drop(Q231a["product_title"])

Q231c.show(10)

+--------------------+----------------+----------------+--------------------+---------------+
|    product_category|Ebook_Avg_rating|product_category|       product_title|book_Avg_rating|
+--------------------+----------------+----------------+--------------------+---------------+
|Digital_Ebook_Pur...|             5.0|           Books|"rays of light": ...|            5.0|
|Digital_Ebook_Pur...|            4.99|           Books|"the siege of khe...|           4.94|
|Digital_Ebook_Pur...|             5.0|           Books|          'dem bon'z|            5.0|
|Digital_Ebook_Pur...|            4.67|           Books|   0400 roswell time|            5.0|
|Digital_Ebook_Pur...|            4.83|           Books|10 smart things g...|            5.0|
|Digital_Ebook_Pur...|             5.0|           Books|100 prayers for y...|            5.0|
|Digital_Ebook_Pur...|             5.0|           Books|13 cent killers: ...|           4.81|
|Digital_Ebook_Pur...|             5.0|           Books|25 e

##### Count of product titles which are common in "Digital_Ebook_Purchase" and "Books" product titles.

In [30]:
Q231c.count()

307550

In [31]:
Q231c.printSchema()

root
 |-- product_category: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- Ebook_Avg_rating: double (nullable = true)
 |-- book_Avg_rating: double (nullable = true)

##### Checking correlation between "Ebook_Avg_rating" and "book_Avg_rating"

In [9]:
Q231c.stat.corr("Ebook_Avg_rating", "book_Avg_rating")

0.16606580243319788