# Downloading Amazon reviews data and installing necessary libraries

In [1]:
!ls

drive  sample_data


In [2]:
# !wget https://jmcauley.ucsd.edu/data/amazon_v2/categoryFilesSmall/Toys_and_Games_5.json.gz --no-check-certificate

In [3]:
# !gzip -d Toys_and_Games_5.json.gz

In [4]:
# !mkdir /content/drive/MyDrive/colab/data/raw
# !mv Toys_and_Games_5.json /content/drive/MyDrive/colab/data/raw/Toys_and_Games_5.json

In [5]:
!ls

drive  sample_data


In [6]:
!pip3 install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488493 sha256=afaa800bc29035f48eb93a0b8a9b3f99273e19a836c11340175254a3a1add30f
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [7]:
!pip3 install python-dotenv

Collecting python-dotenv
  Downloading python_dotenv-1.0.1-py3-none-any.whl (19 kB)
Installing collected packages: python-dotenv
Successfully installed python-dotenv-1.0.1


In [8]:
# Creating PySpark session

from pyspark.sql import SparkSession
from dotenv import load_dotenv
def create_spark_session():
    """Return the notebook's Spark session, creating it on first use.

    Variables from a ``.env`` file are loaded into the environment first.
    The session is named "SparkApp" and uses the ``local[5]`` master;
    ``getOrCreate`` hands back an already running session instead of
    starting a second one.
    """
    load_dotenv()
    builder = SparkSession.builder.appName("SparkApp")
    builder = builder.master("local[5]")
    return builder.getOrCreate()


spark = create_spark_session()
for status_line in ('Session Started', 'Code Executed Successfully'):
    print(status_line)

Session Started
Code Executed Successfully


# Data Input/Output

In [9]:
# Reading data using pandas

import pandas as pd

# Location of the raw review dump on the mounted Drive.
PATH_BIGDATA = '/content/drive/MyDrive/colab/data/raw/Toys_and_Games_5.json'

# lines=True: the file is parsed as one JSON record per line.
json_lines_options = {'orient': 'records', 'lines': True}
raw_pdf = pd.read_json(PATH_BIGDATA, **json_lines_options)

raw_pdf.head()

Unnamed: 0,overall,vote,verified,reviewTime,reviewerID,asin,style,reviewerName,reviewText,summary,unixReviewTime,image
0,5,3.0,True,"10 6, 2013",A2LSCFZM2FBZK7,486427706,{'Format:': ' Paperback'},Ginger,The stained glass pages are pretty cool. And i...,Nice book,1381017600,
1,5,9.0,True,"08 9, 2013",A3IXP5VS847GE5,486427706,{'Format:': ' Paperback'},Dragonflies &amp; Autumn Leaves,My 11 y.o. loved this...and so do I (you know ...,Great pictures,1376006400,
2,5,,True,"04 5, 2016",A1274GG1EB2JLJ,486427706,{'Format:': ' Paperback'},barbara ann,"The pictures are great , I've done one and gav...","The pictures are great, I've done one and gave...",1459814400,
3,5,3.0,True,"02 13, 2016",A30X5EGBYAZQQK,486427706,{'Format:': ' Paperback'},Samantha,I absolutely love this book! Its translucent p...,So beautiful!,1455321600,
4,5,,True,"12 10, 2015",A3U6UNXLAUY6ZV,486427706,{'Format:': ' Paperback'},CP in Texas,I love it!,Five Stars,1449705600,


In [10]:
# Reading the data using PySpark

PATH_BIGDATA = '/content/drive/MyDrive/colab/data/raw/Toys_and_Games_5.json'

# Fetch the session and make name resolution case sensitive before Spark
# infers the JSON schema.
# NOTE(review): presumably needed because some keys in the data differ only by case - confirm.
spark = create_spark_session()
spark.conf.set('spark.sql.caseSensitive', 'true')

json_reader = spark.read.format('json')
raw_sdf = json_reader.load(PATH_BIGDATA)
raw_sdf.show()

+----------+-----+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------------+--------------+--------+----+
|      asin|image|overall|          reviewText| reviewTime|    reviewerID|        reviewerName|               style|             summary|unixReviewTime|verified|vote|
+----------+-----+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------------+--------------+--------+----+
|0486427706| NULL|    5.0|The stained glass...| 10 6, 2013|A2LSCFZM2FBZK7|              Ginger|{NULL, NULL, NULL...|           Nice book|    1381017600|    true|   3|
|0486427706| NULL|    5.0|My 11 y.o. loved ...| 08 9, 2013|A3IXP5VS847GE5|Dragonflies &amp;...|{NULL, NULL, NULL...|      Great pictures|    1376006400|    true|   9|
|0486427706| NULL|    5.0|The pictures are ...| 04 5, 2016|A1274GG1EB2JLJ|         barbara ann|{NULL, NULL, NULL...|The pictures are ...|    1459814400|    true|NULL

In [11]:
# renaming the columns in pandas dataframe

# Raw JSON field name -> snake_case column name.
COL_NAME_MAP = {
    "overall": "overall",
    "verified": "verified",
    "reviewTime": "review_time",
    "reviewerID": "reviewer_id",
    "asin": "asin",
    "reviewerName": "reviewer_name",
    "reviewText": "review_text",
    "summary": "summary",
    "unixReviewTime": "unix_review_time",
    "style": "style",
    "vote": "vote",
    "image": "image"
}

divider = '___________________________'

print('Initial Columns names:')
for position, column in enumerate(raw_pdf.columns, start=1):
    print(f'{position}: {column}')

## renaming column names
raw_pdf = raw_pdf.rename(columns=COL_NAME_MAP)

print(divider)
print('Columns names after rename:')
for position, column in enumerate(raw_pdf.columns, start=1):
    print(f'{position}: {column}')

print(divider)
print('Code Executed Successfully')



Initial Columns names:
1: overall
2: vote
3: verified
4: reviewTime
5: reviewerID
6: asin
7: style
8: reviewerName
9: reviewText
10: summary
11: unixReviewTime
12: image
___________________________
Columns names after rename:
1: overall
2: vote
3: verified
4: review_time
5: reviewer_id
6: asin
7: style
8: reviewer_name
9: review_text
10: summary
11: unix_review_time
12: image
___________________________
Code Executed Successfully


In [12]:
# Renaming the columns in PySpark dataframe
print('Initial Column names:')
# Number the columns from 1, in schema order.
for position, column in enumerate(raw_sdf.schema.names, start=1):
    print(f'{position}: {column}')

# We are basically changing the names in PySpark schema
def rename_columns(df, column_map):
    """Return ``df`` with every column listed in ``column_map`` renamed.

    :param df: Spark DataFrame (any object exposing ``withColumnRenamed``)
    :param column_map: mapping of existing column name -> new column name
    :return: the DataFrame after all renames have been applied
    """
    renamed = df
    for current_name, target_name in column_map.items():
        renamed = renamed.withColumnRenamed(current_name, target_name)
    return renamed


raw_sdf = rename_columns(raw_sdf, COL_NAME_MAP)

divider = '___________________________'
print(divider)
print('Columns names after rename:')
for position, column in enumerate(raw_sdf.schema.names, start=1):
    print(f'{position}: {column}')

print(divider)

print('Code Executed Successfully')



Initial Column names:
1: asin
2: image
3: overall
4: reviewText
5: reviewTime
6: reviewerID
7: reviewerName
8: style
9: summary
10: unixReviewTime
11: verified
12: vote
___________________________
Columns names after rename:
1: asin
2: image
3: overall
4: review_text
5: review_time
6: reviewer_id
7: reviewer_name
8: style
9: summary
10: unix_review_time
11: verified
12: vote
___________________________
Code Executed Successfully


In [13]:
# selecting columns/attributes in pandas dataframe

# Columns kept for the analysis, in the order they should appear.
SELECTED_COLUMNS = [
    # identifiers
    "reviewer_id", "asin",
    # review content
    "review_text", "summary",
    # review attributes
    "verified", "overall", "vote",
    # timestamps
    "unix_review_time", "review_time",
]

column_subset = raw_pdf[SELECTED_COLUMNS]
raw_pdf = column_subset
print(raw_pdf.head())
print('Code Executed Successfully')


      reviewer_id        asin  \
0  A2LSCFZM2FBZK7  0486427706   
1  A3IXP5VS847GE5  0486427706   
2  A1274GG1EB2JLJ  0486427706   
3  A30X5EGBYAZQQK  0486427706   
4  A3U6UNXLAUY6ZV  0486427706   

                                         review_text  \
0  The stained glass pages are pretty cool. And i...   
1  My 11 y.o. loved this...and so do I (you know ...   
2  The pictures are great , I've done one and gav...   
3  I absolutely love this book! Its translucent p...   
4                                         I love it!   

                                             summary  verified  overall vote  \
0                                          Nice book      True        5    3   
1                                     Great pictures      True        5    9   
2  The pictures are great, I've done one and gave...      True        5  NaN   
3                                      So beautiful!      True        5    3   
4                                         Five Stars      True  

In [14]:
# selecting attributes/columns in PySpark dataframe

# select() also accepts the column names as a single list.
raw_sdf = raw_sdf.select(SELECTED_COLUMNS)
raw_sdf.show()

+--------------+----------+--------------------+--------------------+--------+-------+----+----------------+-----------+
|   reviewer_id|      asin|         review_text|             summary|verified|overall|vote|unix_review_time|review_time|
+--------------+----------+--------------------+--------------------+--------+-------+----+----------------+-----------+
|A2LSCFZM2FBZK7|0486427706|The stained glass...|           Nice book|    true|    5.0|   3|      1381017600| 10 6, 2013|
|A3IXP5VS847GE5|0486427706|My 11 y.o. loved ...|      Great pictures|    true|    5.0|   9|      1376006400| 08 9, 2013|
|A1274GG1EB2JLJ|0486427706|The pictures are ...|The pictures are ...|    true|    5.0|NULL|      1459814400| 04 5, 2016|
|A30X5EGBYAZQQK|0486427706|I absolutely love...|       So beautiful!|    true|    5.0|   3|      1455321600|02 13, 2016|
|A3U6UNXLAUY6ZV|0486427706|          I love it!|          Five Stars|    true|    5.0|NULL|      1449705600|12 10, 2015|
|A1SAJF5SNM6WJS|0486427706|MY HU

In [15]:
!mkdir -p /content/drive/MyDrive/colab/data/snapshot/pandas

!mkdir -p /content/drive/MyDrive/colab/data/snapshot/pyspark



In [16]:
!ls

drive  sample_data


In [17]:
# creating a snapshot of pandas dataframe
import time

def create_path_snapshot():
    """Build a time-stamped file path for a pandas JSON snapshot.

    :return: ``.../snapshot/pandas/data_<unix seconds>.json`` as a string
    """
    stamp = int(time.time())
    return f'/content/drive/MyDrive/colab/data/snapshot/pandas/data_{stamp}.json'

PATH_SNAPSHOT = create_path_snapshot()
# Keep only the first 10,000 reviews. The explicit copy makes raw_pdf an
# independent frame rather than a slice of the full data, so the columns that
# later cells add to it do not trigger pandas' chained-assignment warning.
raw_pdf = raw_pdf.iloc[:10000, :].copy()
# Write the snapshot as one JSON record per line.
raw_pdf.to_json(PATH_SNAPSHOT, orient='records', lines=True)
print('Snapshot Saved')
print('Code Executed Successfully')

Snapshot Saved
Code Executed Successfully


In [18]:
"""
creating a snapshot of pyspark dataframe.

"""
def create_path_snapshot():
    """Build a time-stamped directory path for a PySpark snapshot.

    Note: this replaces the pandas helper of the same name defined in the
    previous cell.

    :return: ``.../snapshot/pyspark/snapshot_<unix seconds>`` as a string
    """
    stamp = int(time.time())
    return f'/content/drive/MyDrive/colab/data/snapshot/pyspark/snapshot_{stamp}'

PATH_SNAPSHOT = create_path_snapshot()

# Take 10,000 rows, bring all rows of a product into the same partition and
# order each partition by review time.
raw_sdf = (
    raw_sdf
    .limit(10000)
    .repartition("asin")
    .sortWithinPartitions("unix_review_time")
)

# Write parquet with one directory per asin, replacing anything at the path.
snapshot_writer = raw_sdf.write.partitionBy("asin").mode("overwrite")
snapshot_writer.parquet(PATH_SNAPSHOT)

print('Code Executed Successfully')

Code Executed Successfully


In [19]:
# reading the data in cache

snapshot_reader = spark.read
raw_sdf = snapshot_reader.parquet(PATH_SNAPSHOT).cache()
# count() is an action, so it triggers the read that fills the cache.
raw_sdf.count()

10000

# Data Transformation

In [20]:
# pandas dataframe utility functions

import pandas as pd
from tqdm import tqdm
import json
import typing as t


def read_json_to_pdf(path: str, rename_columns: bool, schema: t.Dict) -> pd.DataFrame:
    """Read a JSON-lines file into a pandas DataFrame.

    :param path: location of the file (one JSON record per line)
    :param rename_columns: when true, rename columns according to ``schema``
    :param schema: mapping of original column name -> new column name
    :return: the loaded (and optionally renamed) DataFrame
    """
    loaded = pd.read_json(path, orient='records', lines=True)
    return loaded.rename(columns=schema) if rename_columns else loaded


def select_columns(df: pd.DataFrame, col_list: list) -> pd.DataFrame:
    """Return only the columns named in ``col_list``, in that order.

    :param df: source DataFrame
    :param col_list: column names to keep
    :return: DataFrame restricted to the requested columns
    """
    subset = df[col_list]
    return subset

# table schema

# utils/tableschema.py
# Raw JSON field name -> snake_case column name.
# (Same mapping as the one defined in the earlier pandas renaming cell.)
COL_NAME_MAP = {
    "overall": "overall",
    "verified": "verified",
    "reviewTime": "review_time",
    "reviewerID": "reviewer_id",
    "asin": "asin",
    "reviewerName": "reviewer_name",
    "reviewText": "review_text",
    "summary": "summary",
    "unixReviewTime": "unix_review_time",
    "style": "style",
    "vote": "vote",
    "image": "image"
}

# Columns kept for the analysis.
# (Same list as the one defined in the earlier column-selection cell.)
SELECTED_COLUMNS = [
    "reviewer_id",
    "asin",
    "review_text",
    "summary",
    "verified",
    "overall",
    "vote",
    "unix_review_time",
    "review_time",
]

In [21]:
# PySpark utility  functions

from pathlib import Path
import time
import yaml

# Folders used by the notebook, all derived from one root on the mounted Drive.
ROOT = Path('/content/drive/MyDrive/colab')
DATA_LAKE = ROOT / "data"
SNAPSHOT = DATA_LAKE / "snapshot"

# File name of the Toys & Games review dump (note the leading slash).
PATH_TOY_GAME_DATA = "/Toys_and_Games_5.json"

def create_path_snapshot() -> Path:
    """
    Create the path to a snapshot named with the current time stamp.

    Note: this redefines the ``create_path_snapshot`` helpers declared in
    earlier cells; unlike them it returns a ``Path`` directly under SNAPSHOT.

    :return: Path like object
    """
    snapshot_name = f'snapshot_{int(time.time())}'
    return SNAPSHOT / snapshot_name


# TODO: Store The Snapshot
def create_path_snapshot_spark():
    """Build a time-stamped, relative path for a PySpark snapshot.

    :return: ``data/snapshot/pyspark/snapshot_<unix seconds>`` as a string
    """
    stamp = int(time.time())
    return f'data/snapshot/pyspark/snapshot_{stamp}'


def load_latest_parquet_path() -> Path:
    """
    Look up the newest snapshot in ``pyspark/versions.yaml`` under SNAPSHOT.

    The parsed file content is printed as a side effect.

    :return: path to latest parquet file storage
    """
    versions_file = SNAPSHOT.joinpath('pyspark', 'versions.yaml')
    with open(versions_file) as handle:
        content = yaml.safe_load(handle)
    print(content)
    # 'latest' names the entry whose 'path' is joined onto SNAPSHOT.
    latest_entry = content[content['latest']]
    return SNAPSHOT.joinpath(latest_entry['path'])


In [22]:
# Loading the saved snapshot for PySpark dataframe

from pyspark.sql import DataFrame as SparkDf

def read_latest_snapshot(ctx: SparkSession) -> SparkDf:
    """
    Load the parquet snapshot that the versions metadata marks as latest
    :param ctx: A Spark session context
    :return: Spark Dataframe
    """
    latest_path = str(load_latest_parquet_path())
    return ctx.read.parquet(latest_path)


main_df = read_latest_snapshot(spark)
main_df.show(n=10)
print('Code Executed Successfully')

{'snapshot_1709829573': {'path': 'pyspark/snapshot_1709829573', 'created_at': 1709829573, 'description': 'Lots of Files on Game'}, 'latest': 'snapshot_1709829573'}
+--------------+--------------------+--------------------+--------+-------+-----+----------------+-----------+----------+
|   reviewer_id|         review_text|             summary|verified|overall| vote|unix_review_time|review_time|      asin|
+--------------+--------------------+--------------------+--------+-------+-----+----------------+-----------+----------+
|A3QSXILNE7LMD6|When I first hear...|One of the best g...|   false|    5.0|  543|      1113264000|04 12, 2005|0975277324|
|A1ZO9D554VQO9F|This is an excell...|A step above the ...|   false|    5.0|1,221|      1110412800|03 10, 2005|0975277324|
|A3NZBDQ7COJZ10|I adore this game...|Its super simple ...|    true|    5.0| NULL|      1524960000|04 29, 2018|0975277324|
|A3SNK5TW9FWZOT|Granddaughter los...|              So so.|    true|    3.0| NULL|      1524441600|04 23,

In [23]:
# Date-time in pandas

from datetime import datetime

# Convert the epoch-seconds column to timestamps in one vectorised call.
# pd.to_datetime(unit='s') interprets the values as UTC, so the result does not
# depend on the machine's local timezone (datetime.fromtimestamp does), and it
# replaces DataFrame.applymap, which is deprecated in recent pandas releases.
raw_pdf['reviewed_at'] = pd.to_datetime(raw_pdf['unix_review_time'], unit='s')
print(raw_pdf.head())
print('Code Executed Successfully')

      reviewer_id        asin  \
0  A2LSCFZM2FBZK7  0486427706   
1  A3IXP5VS847GE5  0486427706   
2  A1274GG1EB2JLJ  0486427706   
3  A30X5EGBYAZQQK  0486427706   
4  A3U6UNXLAUY6ZV  0486427706   

                                         review_text  \
0  The stained glass pages are pretty cool. And i...   
1  My 11 y.o. loved this...and so do I (you know ...   
2  The pictures are great , I've done one and gav...   
3  I absolutely love this book! Its translucent p...   
4                                         I love it!   

                                             summary  verified  overall vote  \
0                                          Nice book      True        5    3   
1                                     Great pictures      True        5    9   
2  The pictures are great, I've done one and gave...      True        5  NaN   
3                                      So beautiful!      True        5    3   
4                                         Five Stars      True  

In [24]:
# Date-time in PySpark
from pyspark.sql import functions as fn
# from_unixtime renders the epoch seconds as a 'yyyy-MM-dd HH:mm:ss' string.
reviewed_at_column = fn.from_unixtime(fn.col('unix_review_time'))
main_df = main_df.withColumn('reviewed_at', reviewed_at_column)

In [25]:
main_df.show(10)

+--------------+--------------------+--------------------+--------+-------+-----+----------------+-----------+----------+-------------------+
|   reviewer_id|         review_text|             summary|verified|overall| vote|unix_review_time|review_time|      asin|        reviewed_at|
+--------------+--------------------+--------------------+--------+-------+-----+----------------+-----------+----------+-------------------+
|A3QSXILNE7LMD6|When I first hear...|One of the best g...|   false|    5.0|  543|      1113264000|04 12, 2005|0975277324|2005-04-12 00:00:00|
|A1ZO9D554VQO9F|This is an excell...|A step above the ...|   false|    5.0|1,221|      1110412800|03 10, 2005|0975277324|2005-03-10 00:00:00|
|A3NZBDQ7COJZ10|I adore this game...|Its super simple ...|    true|    5.0| NULL|      1524960000|04 29, 2018|0975277324|2018-04-29 00:00:00|
|A3SNK5TW9FWZOT|Granddaughter los...|              So so.|    true|    3.0| NULL|      1524441600|04 23, 2018|0975277324|2018-04-23 00:00:00|
|A2A6O

In [26]:
# Impute missing data in Pandas
# Missing vote values are replaced with 0.
missing_vote_default = 0
raw_pdf['vote'] = raw_pdf['vote'].fillna(missing_vote_default)

In [27]:
# Impute missing data in PySpark
# DataFrame.na.fill is the same operation as DataFrame.fillna.
main_df = main_df.na.fill({'vote': 0})

In [28]:
# Average review per product in Pandas

def average_review_pd(df: pd.DataFrame) -> float:
    """Return the average number of reviews per product.

    :param df: frame of reviews with an ``asin`` column
    :return: number of rows divided by the number of distinct ``asin`` values
        (a missing ``asin`` counts as one distinct value); ``0.0`` for a frame
        without rows instead of raising ``ZeroDivisionError``
    """
    total_reviews = len(df)
    if total_reviews == 0:
        return 0.0
    # dropna=False keeps the count identical to len(df['asin'].unique()).
    unique_asin = df['asin'].nunique(dropna=False)
    return total_reviews / unique_asin

# Average number of reviews per product in the pandas frame.
mean_over_all_review = average_review_pd(df=raw_pdf)
print(mean_over_all_review, 'Code Executed Successfully', sep='\n')

32.154340836012864
Code Executed Successfully


In [29]:
# Average review per product in PySpark

def average_review(df: "SparkDf") -> float:
    """Return the average number of reviews per product for a Spark DataFrame.

    :param df: Spark DataFrame of reviews with an ``asin`` column
    :return: row count divided by the number of distinct ``asin`` values;
        ``0.0`` for a frame without rows instead of raising
        ``ZeroDivisionError``
    """
    total_reviews = df.count()
    if total_reviews == 0:
        return 0.0
    unique_asin = df.select('asin').distinct().count()
    return total_reviews / unique_asin

# Average number of reviews per product in the Spark frame.
mean_over_all_review = average_review(df=main_df)
print(mean_over_all_review, 'Code Executed Successfully', sep='\n')

32.154340836012864
Code Executed Successfully


In [30]:
# Total number of reviews for each product in pandas
# One group per product; count the rows in each.
reviews_grouped_by_product = raw_pdf.groupby('asin')
review_by_product_pd = reviews_grouped_by_product['asin'].count()
print(review_by_product_pd)

asin
0152014764     1
0486427706     9
0486448789    42
0545346193     4
0545561647    56
              ..
B00000IU3D     1
B00000IU3E    11
B00000IU6X     2
B00000IU6Z    25
B00000IUF7     4
Name: asin, Length: 311, dtype: int64


In [31]:
# Total number of reviews for each product in PySpark
# One group per product; count() adds a 'count' column with the rows per group.
reviews_grouped_by_product = main_df.groupBy('asin')
review_by_product = reviews_grouped_by_product.count()
review_by_product.show()

+----------+-----+
|      asin|count|
+----------+-----+
|B00000IS02|  268|
|157982319X|  233|
|0786955708|   53|
|0984155856|   12|
|1633441903|   31|
|B00000GBX8|  164|
|1933054395|  549|
|0980223644|   63|
|1936112191|   97|
|B00000DMF6|  154|
|0976990709|  613|
|B00000GBQJ|  159|
|1589947207|   59|
|1932188126|  317|
|B00000DMBD|  140|
|0786950072|   31|
|B00000DMFM|  181|
|B00000DMFD|  227|
|1932855785|  218|
|B00000DMD2|  463|
+----------+-----+
only showing top 20 rows



In [32]:
# Adding a new column for the length of review text using pandas
# Series.str.len() leaves a missing review as NaN instead of measuring the
# text "nan"/"None" that str() would produce for it. This matches Spark's
# length(), which returns NULL for a NULL input.
raw_pdf['review_text_len'] = raw_pdf['review_text'].str.len()

In [33]:
# Adding a new column for the length of review text using PySpark
# Character length of each review text.
review_text_length = fn.length(fn.col('review_text'))
main_df = main_df.withColumn('review_text_len', review_text_length)

In [34]:
# Distribution of review text length using pandas
def show_review_text_stat(df: pd.DataFrame) -> None:
    """
    Print summary statistics of the review text length
    :param df: A dataframe with a 'review_text_len' column
    :return: Nothing
    """
    length_column = df['review_text_len']
    stat = length_column.describe().to_dict()
    # Count the rows whose length is at most one character.
    weird_reviews = int((length_column <= 1).sum())
    print("Review Length Stat")
    print(stat)
    print(f"Reviews with length one or less: {weird_reviews}")

# Report the length statistics for the pandas frame.
show_review_text_stat(df=raw_pdf)
print('Code Executed Successfully')

Review Length Stat
{'count': 10000.0, 'mean': 338.6489, 'std': 642.4983366827139, 'min': 1.0, '25%': 61.0, '50%': 159.0, '75%': 359.0, 'max': 13083.0}
Reviews with length one or less: 1
Code Executed Successfully


In [35]:
# Distribution of review text length using PySpark
def show_review_text_stat(df: SparkDf) -> None:
    """
    Print summary statistics of the review text length.

    Note: this replaces the pandas function of the same name defined in the
    previous cell.

    :param df: Dataframe with a 'review_text_len' column
    :return: Nothing
    """
    length_summary = df.select('review_text_len').summary("count", "min", "25%", "75%", "max")
    # Bring the small summary table to the driver as a list of plain dicts.
    summary = [row.asDict(recursive=True) for row in length_summary.collect()]
    print("Review Length Stat")
    print(summary)
    weird_reviews = df.where(df['review_text_len'] <= 1).count()
    print(f"Reviews with length one or less: {weird_reviews}")


show_review_text_stat(main_df)
print('Code Executed Successfully')

Review Length Stat
[{'summary': 'count', 'review_text_len': '9998'}, {'summary': 'min', 'review_text_len': '1'}, {'summary': '25%', 'review_text_len': '61'}, {'summary': '75%', 'review_text_len': '359'}, {'summary': 'max', 'review_text_len': '13083'}]
Reviews with length one or less: 1
Code Executed Successfully


In [36]:
# Split the review timestamp into calendar year and month columns.
reviewed_at = raw_pdf['reviewed_at']
raw_pdf['review_year'] = reviewed_at.dt.year
raw_pdf['review_month'] = reviewed_at.dt.month

# Calculating median monthly reviews per year using pandas
# Step 1: number of reviews in every (month, year) pair.
reviews_per_month = (
    raw_pdf
    .groupby(['review_month', 'review_year'])
    .agg(median_review=("asin", "count"))
    .sort_values(['review_year'])
)
# Step 2: median of those monthly counts within each year.
median_review_by_year_df = (
    reviews_per_month
    .groupby('review_year')
    .agg({'median_review': 'median'})
    .sort_values(['review_year'])
    .reset_index()
)
median_review_by_year_df.head(20)

Unnamed: 0,review_year,median_review
0,1999,1.0
1,2000,2.0
2,2001,1.0
3,2002,2.5
4,2003,3.0
5,2004,2.0
6,2005,4.0
7,2006,3.0
8,2007,5.0
9,2008,5.5


In [37]:
# Calculating median product reviews per year using pandas
def _median_yearly_reviews_per_product(reviews):
    """Median, per review year, of the number of reviews each product got that year."""
    # Reviews per (product, year) pair.
    per_product_year = (
        reviews
        .groupby(['asin', 'review_year'])
        .agg(median_review=("asin", "count"))
        .reset_index()
    )
    # Median of those per-product counts within each year.
    return (
        per_product_year
        .groupby('review_year')
        .agg({'median_review': 'median'})
        .sort_values(['review_year'])
        .reset_index()
    )


median_review_by_year_df = _median_yearly_reviews_per_product(raw_pdf)

# Find median yearly review of the top products
top_rated_reviews = raw_pdf[raw_pdf['overall'] > 4]
review_top_item_by_year = _median_yearly_reviews_per_product(top_rated_reviews)

print('Median Review by Year ','\n',median_review_by_year_df)
print('Review of top item by Year ','\n',review_top_item_by_year)
print('Code Executed Successfully')

Median Review by Year  
     review_year  median_review
0          1999            1.0
1          2000            1.0
2          2001            1.0
3          2002            1.0
4          2003            1.0
5          2004            1.0
6          2005            2.0
7          2006            1.0
8          2007            1.0
9          2008            1.0
10         2009            1.0
11         2010            2.0
12         2011            2.0
13         2012            2.0
14         2013            2.0
15         2014            3.0
16         2015            5.0
17         2016            5.0
18         2017            5.0
19         2018            6.0
Review of top item by Year  
     review_year  median_review
0          1999            1.0
1          2000            1.0
2          2001            1.0
3          2002            1.0
4          2003            1.0
5          2004            1.0
6          2005            2.0
7          2006            1.0
8          2007

In [38]:
# Derive calendar year and month from the review timestamp.
main_df = (
    main_df
    .withColumn('reviewed_year', fn.year('reviewed_at'))
    .withColumn('reviewed_month', fn.month('reviewed_at'))
)


def _median_yearly_reviews_per_product_spark(reviews):
    """Approximate median, per year, of the number of reviews each product got that year."""
    # Reviews per (product, year) pair.
    per_product_year = (
        reviews
        .groupBy('asin', 'reviewed_year')
        .agg(fn.count('asin').alias('count'))
    )
    # percentile_approx returns an approximate percentile, so the value can
    # differ from an exact median.
    return (
        per_product_year
        .groupBy('reviewed_year')
        .agg(fn.percentile_approx('count', 0.5).alias('median'))
        .orderBy('reviewed_year', ascending=True)
    )


# Calculating median product reviews per year using PySpark
median_review_by_year_df = _median_yearly_reviews_per_product_spark(main_df)
# Find median yearly review of the top products
review_top_item_by_year = _median_yearly_reviews_per_product_spark(
    main_df.filter(fn.col('overall') > 4)
)

print('Median Review by Year ')
median_review_by_year_df.show()
print('Review of top item by Year ')
review_top_item_by_year.show()
print('Code Executed Successfully')

Median Review by Year 
+-------------+------+
|reviewed_year|median|
+-------------+------+
|         1999|     1|
|         2000|     1|
|         2001|     1|
|         2002|     1|
|         2003|     1|
|         2004|     1|
|         2005|     2|
|         2006|     1|
|         2007|     1|
|         2008|     1|
|         2009|     1|
|         2010|     2|
|         2011|     2|
|         2012|     2|
|         2013|     2|
|         2014|     3|
|         2015|     5|
|         2016|     5|
|         2017|     5|
|         2018|     6|
+-------------+------+

Review of top item by Year 
+-------------+------+
|reviewed_year|median|
+-------------+------+
|         1999|     1|
|         2000|     1|
|         2001|     1|
|         2002|     1|
|         2003|     1|
|         2004|     1|
|         2005|     2|
|         2006|     1|
|         2007|     1|
|         2008|     1|
|         2009|     1|
|         2010|     1|
|         2011|     1|
|         2012|     1|
|    

In [39]:
# Get Top reviews for the year 2017 using pandas

# Some votes look like this: "1,221". Strip the thousands separator before
# converting, so the cast to int does not fail.
vote_as_text = raw_pdf['vote'].astype(str).str.replace(',', '', regex=False)
# to_numeric also accepts float-formatted text such as "3.0", which int()
# rejects; a value that is still missing afterwards is treated as 0 votes.
raw_pdf['vote'] = pd.to_numeric(vote_as_text).fillna(0).astype(int)

# Reviews written in 2017 whose vote count is at or above the 99th percentile
# of that year, most-voted first.
reviews_2017 = raw_pdf[raw_pdf['review_year'] == 2017]
quantile_99_2017_reviews = reviews_2017['vote'].quantile(0.99)
top_reviews_2017 = (
    reviews_2017[reviews_2017['vote'] >= quantile_99_2017_reviews]
    .sort_values('vote', ascending=False)
)
top_reviews_2017.head()

Unnamed: 0,reviewer_id,asin,review_text,summary,verified,overall,vote,unix_review_time,review_time,reviewed_at,review_text_len,review_year,review_month
874,A1H9KZETOK0WR8,975277324,This is a counterfeit game. When you try to re...,DO NOT BUY THIS FROM AMAZON!!!,True,1,29,1506211200,"09 24, 2017",2017-09-24,388,2017,9
119,A3CBCCYRK31GVT,615638996,The game is basically Uno with discussion cues...,5 year old finally opened up!,True,5,15,1490227200,"03 23, 2017",2017-03-23,864,2017,3
347,A29XP8IL9RY9HT,786964502,"If you love Lords of Waterdeep, you will proba...",Everything an expansion should be!,True,5,10,1487203200,"02 16, 2017",2017-02-16,2040,2017,2
6651,A2OY5YNT2040OT,9269809021,1 month of use\nim torn between 4 and 5 stars....,love it!,True,5,10,1486339200,"02 6, 2017",2017-02-06,2378,2017,2
3089,A3I9L8VZPB1ZQ1,1579824005,Perfect reading companion. Bean bag material f...,Encourages Reading Pilkey,True,5,6,1512172800,"12 2, 2017",2017-12-02,92,2017,12


In [40]:
# Casting is a Column method (Column.cast), not a pyspark.sql.functions call.
# Remove the thousands separator ("1,221" -> "1221"), then cast the text to int.
main_df = main_df.withColumn(
    'vote', fn.regexp_replace(main_df.vote, ',', '').cast('int')
)
main_df.show()

+--------------+--------------------+--------------------+--------+-------+----+----------------+-----------+----------+-------------------+---------------+-------------+--------------+
|   reviewer_id|         review_text|             summary|verified|overall|vote|unix_review_time|review_time|      asin|        reviewed_at|review_text_len|reviewed_year|reviewed_month|
+--------------+--------------------+--------------------+--------+-------+----+----------------+-----------+----------+-------------------+---------------+-------------+--------------+
|A3QSXILNE7LMD6|When I first hear...|One of the best g...|   false|    5.0| 543|      1113264000|04 12, 2005|0975277324|2005-04-12 00:00:00|          10463|         2005|             4|
|A1ZO9D554VQO9F|This is an excell...|A step above the ...|   false|    5.0|1221|      1110412800|03 10, 2005|0975277324|2005-03-10 00:00:00|           1209|         2005|             3|
|A3NZBDQ7COJZ10|I adore this game...|Its super simple ...|    true|   

In [41]:
# Approximate 99th percentile of the vote count among 2017 reviews.
votes_2017 = main_df.filter(fn.col('reviewed_year') == 2017)
percentile_row = votes_2017.agg(fn.percentile_approx('vote', 0.99)).collect()[0]
top_percentile_reviews_2017 = percentile_row[0]

In [42]:
# 2017 reviews at or above the percentile threshold, most-voted first.
is_2017 = main_df.reviewed_year == 2017
has_top_votes = main_df.vote >= top_percentile_reviews_2017
top_reviews_2017 = (
    main_df
    .filter(is_2017 & has_top_votes)
    .orderBy(main_df.vote.desc())
)
top_reviews_2017.show()

+--------------+--------------------+--------------------+--------+-------+----+----------------+-----------+----------+-------------------+---------------+-------------+--------------+
|   reviewer_id|         review_text|             summary|verified|overall|vote|unix_review_time|review_time|      asin|        reviewed_at|review_text_len|reviewed_year|reviewed_month|
+--------------+--------------------+--------------------+--------+-------+----+----------------+-----------+----------+-------------------+---------------+-------------+--------------+
|A1L7ONTSNR8D6D|I bought this gam...|Teaches you to th...|    true|    5.0|   6|      1513123200|12 13, 2017|B00000DMER|2017-12-13 00:00:00|           1177|         2017|            12|
|A3I9L8VZPB1ZQ1|Perfect reading c...|Encourages Readin...|    true|    5.0|   6|      1512172800| 12 2, 2017|1579824005|2017-12-02 00:00:00|             92|         2017|            12|
|A2QP0VYB6DEGTB|Ordered for my so...|Damaged board out...|    true|   

In [50]:
# Comparing total reviews of 2016 and 2017 using pandas

# Create subset for 2 different years
# Number of reviews for every (year, month) pair.
reviews_per_year_month = raw_pdf.groupby(['review_year', 'review_month'])
total_review_by_mth_df = (
    reviews_per_year_month
    .agg(total_review=("asin", "count"))
    .sort_values(['review_year', 'review_month'])
    .reset_index()
)

# One subset per year being compared.
year_column = total_review_by_mth_df["review_year"]
total_review_2016 = total_review_by_mth_df[year_column == 2016]
total_review_2017 = total_review_by_mth_df[year_column == 2017]

# Join the subsets for comparison: one row per month, one set of columns per year.
merged_20_16_17 = pd.merge(
    total_review_2016,
    total_review_2017,
    on=["review_month"],
    suffixes=('_2016', '_2017'),
).sort_values("review_month")
print(merged_20_16_17.head(12))
print('Code Executed Successfully')

    review_year_2016  review_month  total_review_2016  review_year_2017  \
0               2016             1                317              2017   
1               2016             2                219              2017   
2               2016             3                172              2017   
3               2016             4                138              2017   
4               2016             5                122              2017   
5               2016             6                125              2017   
6               2016             7                128              2017   
7               2016             8                139              2017   
8               2016             9                111              2017   
9               2016            10                108              2017   
10              2016            11                114              2017   
11              2016            12                218              2017   

    total_review_2017  


In [52]:
"""
Most data visualization tools, such as Tableau and PowerBI,
prefer the long format over the wide format when creating charts
that compare certain metrics between two or more groups.
"""

# Converting wide format body to long format body in pandas

# Stack the two yearly total columns into (review_month, variable, value) rows.
merged_20_1617_long = pd.melt(
    merged_20_16_17,
    id_vars=["review_month"],
    value_vars=["total_review_2016", "total_review_2017"],
)
print(merged_20_1617_long.head())
print('Code Executed Successfully')

   review_month           variable  value
0             1  total_review_2016    317
1             2  total_review_2016    219
2             3  total_review_2016    172
3             4  total_review_2016    138
4             5  total_review_2016    122
Code Executed Successfully


In [53]:
# Comparing total reviews of 2016 and 2017 using PySpark

# Number of reviews for every (year, month) pair.
total_review_by_mth_df = (
    main_df
    .groupBy('reviewed_year', 'reviewed_month')
    .agg(fn.count('asin').alias("total_review"))
    .orderBy('reviewed_year', 'reviewed_month')
)

# One subset per year being compared.
total_review_2016 = total_review_by_mth_df.filter(fn.col('reviewed_year') == 2016)
total_review_2017 = total_review_by_mth_df.filter(fn.col('reviewed_year') == 2017)

# Keep month + total per year, then put both years side by side by month.
monthly_2016 = total_review_2016.select(
    "reviewed_month", fn.col("total_review").alias("total_review_2016")
)
monthly_2017 = total_review_2017.select(
    "reviewed_month", fn.col("total_review").alias("total_review_2017")
)
merged_20_16_17 = (
    monthly_2016
    .join(monthly_2017, on="reviewed_month")
    .orderBy('reviewed_month')
)
merged_20_16_17.show()
print('Code Executed Successfully')

+--------------+-----------------+-----------------+
|reviewed_month|total_review_2016|total_review_2017|
+--------------+-----------------+-----------------+
|             1|              317|              215|
|             2|              219|              113|
|             3|              172|               98|
|             4|              138|               89|
|             5|              122|               59|
|             6|              125|               48|
|             7|              128|               64|
|             8|              139|               61|
|             9|              111|               59|
|            10|              108|               56|
|            11|              114|               72|
|            12|              218|              105|
+--------------+-----------------+-----------------+

Code Executed Successfully


In [57]:
# Stack the 2016 and 2017 monthly totals, then pivot so each year becomes a column.
merged_20_16_17_united = total_review_2016.union(total_review_2017)

merged_20_16_17_pivoted = (
    merged_20_16_17_united
    .groupBy("reviewed_month")
    .pivot("reviewed_year")
    .sum("total_review")
    .orderBy("reviewed_month")
)
# DataFrame.show() prints the table itself and returns None, so wrapping it in
# print() only added a stray "None" line to the output.
merged_20_16_17_pivoted.show()
print('Code Executed Successfully')

+--------------+----+----+
|reviewed_month|2016|2017|
+--------------+----+----+
|             1| 317| 215|
|             2| 219| 113|
|             3| 172|  98|
|             4| 138|  89|
|             5| 122|  59|
|             6| 125|  48|
|             7| 128|  64|
|             8| 139|  61|
|             9| 111|  59|
|            10| 108|  56|
|            11| 114|  72|
|            12| 218| 105|
+--------------+----+----+

None
Code Executed Successfully
