In [3]:
import os
# Find the latest version of spark 2.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.0'
spark_version = 'spark-3.1.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Get:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Get:3 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Hit:4 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:5 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Ign:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Ign:7 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Get:8 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release [697 B]
Hit:9 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:10 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Get:11 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release.gpg [836 B]
Get:12 http://archive.ubunt

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Vine-analysis").getOrCreate()

In [5]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Grocery_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get("amazon_reviews_us_Grocery_v1_00.tsv.gz"), sep="\t", header=True, inferSchema=True , dateFormat="yyyy-MM-dd")
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   42521656|R26MV8D0KG6QI6|B000SAQCWC|     159713740|The Cravings Plac...|         Grocery|          5|            0|          0|   N|                Y|Using these for y...|As a family aller...| 2015-08-31|
|         US|   12049833|R1OF8GP57AQ1A0|B00509LVIQ|     138680402|Mauna Loa Macadam...|         Grocery|          5|    

In [94]:
# Create the vine_table. DataFrame
#excluding product id B004BUU488 where the data is not loaded proerply based on previous analysis.
vine_df = df.select(["review_id","star_rating","helpful_votes","total_votes","vine","verified_purchase"]).where("product_id != 'B004BUU488'")
vine_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R26MV8D0KG6QI6|          5|            0|          0|   N|                Y|
|R1OF8GP57AQ1A0|          5|            0|          0|   N|                Y|
|R3VDC1QB6MC4ZZ|          5|            0|          0|   N|                N|
|R12FA3DCF8F9ER|          5|            0|          0|   N|                Y|
| RTWHVNV6X4CNJ|          5|            0|          0|   N|                Y|
| RIG9AWFOGRDVO|          2|            1|          1|   N|                Y|
|R1S1XSG4ZCHDGS|          5|            1|          1|   N|                Y|
| RB15NBVY5ELVW|          5|            2|          2|   N|                Y|
| R56358YM1ZJ7I|          5|            0|          0|   N|                N|
|R1ODXB3C9UP3NL|          1|            1|          3|   N|     

Filter the data and create a new DataFrame or table to retrieve all the rows where the total_votes count is equal to or greater than 20 to pick reviews that are more likely to be helpful and to avoid having division by zero errors later on.

In [96]:
#checking data for total votes >20
vine_total_count_df=vine_df.groupby("total_votes").count()
vine_total_count_df.orderBy("count",ascending=False).where("count >=20").show(25)

+-----------+-------+
|total_votes|  count|
+-----------+-------+
|          0|1367577|
|          1| 440359|
|          2| 200836|
|          3| 109182|
|          4|  68340|
|          5|  44819|
|          6|  31141|
|          7|  23032|
|          8|  17536|
|          9|  13604|
|         10|  10962|
|         11|   8656|
|         12|   7214|
|         13|   6043|
|         14|   5025|
|         15|   4385|
|         16|   3667|
|         17|   3202|
|         18|   2813|
|         19|   2520|
|         20|   2203|
|         21|   1982|
|         22|   1795|
|         23|   1623|
|         24|   1445|
+-----------+-------+
only showing top 25 rows



In [7]:
#only focus on reviews where total_votes >=20
vine_review_df = vine_df.where("total_votes >=20")
vine_review_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R1OAZUG90XPU2U|          5|          250|        274|   N|                Y|
|R23VPTUJ8I8NR5|          1|           28|         30|   N|                Y|
|R33VIWRD2X5IA9|          1|            0|         20|   N|                Y|
| R6SRJFJ5YH4UM|          5|           23|         26|   N|                Y|
|R3DOS07BIMP4DJ|          2|           32|         42|   N|                N|
|R26UFXLLQ2N0CR|          1|            0|         20|   N|                Y|
|R30PGWX4HN1CFG|          1|           85|         95|   N|                Y|
|R2XASGYIENUHQO|          5|           28|         32|   N|                Y|
|R34UQPQCTSKQK9|          5|           23|         26|   N|                Y|
|R2M6WNM2OZZHKS|          1|           20|         22|   N|     

In [8]:
# from the above dataframe filter on the reviews where atleat reivew is votied helpful 50% or more time compare to total votes
helpful_reivews_df = vine_review_df.withColumn("per_helpful",(vine_review_df["helpful_votes"]/vine_review_df["total_votes"])*100.00).where("per_helpful>=50")
helpful_reivews_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|      per_helpful|
+--------------+-----------+-------------+-----------+----+-----------------+-----------------+
|R1OAZUG90XPU2U|          5|          250|        274|   N|                Y|91.24087591240875|
|R23VPTUJ8I8NR5|          1|           28|         30|   N|                Y|93.33333333333333|
| R6SRJFJ5YH4UM|          5|           23|         26|   N|                Y|88.46153846153845|
|R3DOS07BIMP4DJ|          2|           32|         42|   N|                N|76.19047619047619|
|R30PGWX4HN1CFG|          1|           85|         95|   N|                Y|89.47368421052632|
|R2XASGYIENUHQO|          5|           28|         32|   N|                Y|             87.5|
|R34UQPQCTSKQK9|          5|           23|         26|   N|                Y|88.46153846153845|
|R2M6WNM2OZZHKS|          1|           2

In [51]:
#check values in vine column to confirm if vine has only two possible values Y or N
vine_groupby_df = helpful_reivews_df.groupby("vine","star_rating").count()
vine_groupby_df.orderBy("vine","star_rating").show()

+----+-----------+-----+
|vine|star_rating|count|
+----+-----------+-----+
|   N|          1| 6412|
|   N|          2| 1655|
|   N|          3| 1931|
|   N|          4| 2600|
|   N|          5|15689|
|   Y|          1|    7|
|   Y|          2|    6|
|   Y|          3|   11|
|   Y|          4|   17|
|   Y|          5|   20|
+----+-----------+-----+



In [89]:
#How many vine reviews vs non vine reviews
helpful_reivews_df.groupby("vine").count().show()

+----+-----+
|vine|count|
+----+-----+
|   Y|   61|
|   N|28287|
+----+-----+



In [10]:
#filter helpful views where vine = Y  as paid reviews
vine_paid_reviews_df = helpful_reivews_df.where("vine == 'Y'")
vine_paid_reviews_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|      per_helpful|
+--------------+-----------+-------------+-----------+----+-----------------+-----------------+
|R3MKO875WGIEBJ|          5|           34|         37|   Y|                N| 91.8918918918919|
|R3OPNTK61FQ7MP|          5|           32|         37|   Y|                N|86.48648648648648|
| RMDRU8I773X5U|          4|           22|         25|   Y|                N|             88.0|
| RR83RGWFEFZCL|          3|           61|         64|   Y|                N|          95.3125|
|R2TMZ7GV8SPIRV|          4|           23|         23|   Y|                N|            100.0|
| RX88JLUIFA4Q2|          4|           84|         87|   Y|                Y|96.55172413793103|
| RV3AHTDRHDH7R|          5|          253|        258|   Y|                N|98.06201550387597|
|R13EGJI8C8YJ4Y|          5|           1

In [11]:
#filter helpful views where vine = N  as i,e unpaid reviews
vine_unpaid_reviews_df = helpful_reivews_df.where("vine == 'N'")
vine_unpaid_reviews_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|      per_helpful|
+--------------+-----------+-------------+-----------+----+-----------------+-----------------+
|R1OAZUG90XPU2U|          5|          250|        274|   N|                Y|91.24087591240875|
|R23VPTUJ8I8NR5|          1|           28|         30|   N|                Y|93.33333333333333|
| R6SRJFJ5YH4UM|          5|           23|         26|   N|                Y|88.46153846153845|
|R3DOS07BIMP4DJ|          2|           32|         42|   N|                N|76.19047619047619|
|R30PGWX4HN1CFG|          1|           85|         95|   N|                Y|89.47368421052632|
|R2XASGYIENUHQO|          5|           28|         32|   N|                Y|             87.5|
|R34UQPQCTSKQK9|          5|           23|         26|   N|                Y|88.46153846153845|
|R2M6WNM2OZZHKS|          1|           2

In [55]:
import pyspark.sql.functions as F

In [58]:
#Paid vine 5 stars reviews summary

Total_paid_reviews, Paid_five_star_ratings = vine_paid_reviews_df.count(),vine_paid_reviews_df.where("star_rating >=5").count()

Paid_five_star_per = (Paid_five_star_ratings/Total_paid_reviews)*100

In [40]:
#UnPaid vine 5 stars reviews summary

Total_unpaid_reviews, UnPaid_five_star_ratings = vine_unpaid_reviews_df.count(),vine_unpaid_reviews_df.where("star_rating >=5").count()

UnPaid_five_star_per = (UnPaid_five_star_ratings/Total_unpaid_reviews)*100

In [92]:

print("**************** 5 Star Reviews Summary For Amazon Grocery Products Reviews ****************")
print ("\t\t\tPaid Reviews\t\tUnPaid Reviews" )
print(f"Total Reviews:\t\t\t{Total_paid_reviews}\t\t\t{Total_unpaid_reviews}" )
print(f"Total 5 Star Reviews:\t\t{Paid_five_star_ratings:}\t\t\t{UnPaid_five_star_ratings}")
print(f"5 Star Reviews %:\t\t{Paid_five_star_per:.2f}%\t\t\t{UnPaid_five_star_per:.2f}%")



**************** 5 Star Reviews Summary For Amazon Grocery Products Reviews ****************
			Paid Reviews		UnPaid Reviews
Total Reviews:			61			28287
Total 5 Star Reviews:		20			15689
5 Star Reviews %:		32.79%			55.46%
