In [2]:
import os
# Find the latest version of spark 3.2 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.2.3'
spark_version = 'spark-3.2.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

Get:1 https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/ InRelease [3,622 B]
Ign:2 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu2004/x86_64  InRelease
Hit:3 http://archive.ubuntu.com/ubuntu focal InRelease
Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64  InRelease
Hit:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu2004/x86_64  Release
Get:6 http://archive.ubuntu.com/ubuntu focal-updates InRelease [114 kB]
Get:7 http://security.ubuntu.com/ubuntu focal-security InRelease [114 kB]
Get:8 https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/ Packages [71.6 kB]
Get:9 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu focal InRelease [18.1 kB]
Get:10 http://archive.ubuntu.com/ubuntu focal-backports InRelease [108 kB]
Hit:12 http://ppa.launchpad.net/cran/libgit2/ubuntu focal InRelease
Get:13 http://archive.ubuntu.com/ubuntu focal-updates/universe amd64 Packages [1,297 kB]
Hit:14 http://p

In [3]:
# Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("M17-Amazon-Challenge").config("spark.driver.extraClassPath","/content/postgresql-42.2.16.jar").getOrCreate()

In [4]:
# Load Amazon Data into Spark DataFrame
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Camera_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get("amazon_reviews_us_Camera_v1_00.tsv.gz"), sep="\t", header=True, inferSchema=True)
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|    2975964|R1NBG94582SJE2|B00I01JQJM|     860486164|GoPro Rechargeabl...|          Camera|          5|            0|          0|   N|                Y|          Five Stars|                  ok| 2015-08-31|
|         US|   23526356|R273DCA6Y0H9V7|B00TCO0ZAA|     292641483|Professional 58mm...|          Camera|          5|    

In [5]:
# Determine Bias of Vine Reviews 
# Create the vine_table. DataFrame
vine_df = df.select(["review_id", "star_rating", "helpful_votes", "total_votes", "vine", "verified_purchase"])
vine_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R1NBG94582SJE2|          5|            0|          0|   N|                Y|
|R273DCA6Y0H9V7|          5|            0|          0|   N|                Y|
| RQVOXO7WUOFK6|          2|            1|          1|   N|                Y|
|R1KWKSF21PO6HO|          5|            0|          0|   N|                Y|
|R38H3UO1J190GI|          5|            1|          1|   N|                Y|
|R3NPIFKLR19NQA|          3|            0|          0|   N|                Y|
|R3MBE6UCH3435E|          3|            8|          8|   N|                N|
|R2E7A4FF0PVY5Q|          5|            0|          1|   N|                Y|
| R3R8JDQ2BF4NM|          5|            0|          2|   N|                Y|
|R1YND4BS823GN5|          1|            0|          0|   N|     

In [6]:
# Filter the data and create a new DataFrame or table to retrieve all the rows where the total_votes count is equal to or greater than 20 (Step 1)
totalvotes20_df = vine_df.filter("total_votes>=20").show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R3FJ319XA6ZAUQ|          2|           21|         25|   N|                Y|
| R6HRF25HUMIIE|          5|           24|         24|   N|                Y|
|R2CTAK3APOFKZU|          5|           37|         38|   N|                N|
|R1WQZB0CAEPQBU|          2|           85|        128|   N|                N|
| RHZDSA48HVRVK|          5|           49|         86|   N|                N|
|R190J2PDOZ5GVK|          3|           36|         51|   N|                Y|
|R1WZSWWOTN58OP|          5|           73|         77|   N|                Y|
|R3EM8C9CKA6GPK|          2|           41|        115|   N|                N|
|R29THVJFO35FZA|          5|           48|         50|   N|                Y|
|R3V8FKXIHBLWEL|          5|           91|        108|   N|     

In [7]:
# Filter the new DataFrame or table created in Step 1 and create a new DataFrame or table to retrieve all the rows where the number of helpful_votes divided by total_votes is equal to or greater than 50% (Step 2)
helpfulvotes_df = vine_df.withColumn('helpful_votes_50%',(vine_df['helpful_votes']/vine_df['total_votes']*100))
helpfulvotes50_df = helpfulvotes_df.filter("helpful_votes>=50")
hv50_df = helpfulvotes50_df.select(["review_id", "star_rating", "helpful_votes", "total_votes", "vine", "verified_purchase"])
hv50_df.show()




+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R1WQZB0CAEPQBU|          2|           85|        128|   N|                N|
|R1WZSWWOTN58OP|          5|           73|         77|   N|                Y|
|R3V8FKXIHBLWEL|          5|           91|        108|   N|                Y|
|R2O9VZEU64LFXH|          4|          115|        136|   N|                Y|
| RQN1N103PBM22|          1|           61|         69|   N|                Y|
|R2FDEPNQCD8OZI|          5|          109|        125|   N|                Y|
|R2IP1IS4HLRSB2|          4|           83|         87|   N|                Y|
|R3O1UL27MCXO23|          3|          108|        116|   N|                Y|
|R1966830MKMBD7|          4|          629|        668|   N|                Y|
|R3HG21YGM9D8A9|          4|          249|        298|   N|     

In [8]:
# Filter the DataFrame or table created in Step 2, and create a new DataFrame or table that retrieves all the rows where a review was written as part of the Vine program (paid), vine == 'Y' (Step 3)
vinepaid_df = hv50_df.filter(hv50_df.vine == 'Y').show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R326QV66CKBB43|          4|          127|        139|   Y|                N|
|R2ZS3WCJYUDQH0|          3|          101|        115|   Y|                N|
|R31771VBPRDVJS|          4|           75|         83|   Y|                N|
|R1LSBTTOR7MC2T|          5|          364|        373|   Y|                N|
|R23USDQ7OW3CWH|          5|           58|         65|   Y|                N|
|R21XG60MOE370I|          5|           95|        105|   Y|                N|
|R38IT6Q7UDG0AC|          5|           52|         65|   Y|                N|
|R1ZNRVVBZEZPMT|          2|           64|         74|   Y|                N|
| RYCSFTDYMJCXV|          2|           52|         60|   Y|                N|
|R2HCK1N56I0Q6I|          5|          294|        312|   Y|     

In [9]:
# Repeat Step 3, but this time retrieve all the rows where the review was not part of the Vine program (unpaid), vine == 'N'
vineunpaid_df = hv50_df.filter(hv50_df.vine == 'N').show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R1WQZB0CAEPQBU|          2|           85|        128|   N|                N|
|R1WZSWWOTN58OP|          5|           73|         77|   N|                Y|
|R3V8FKXIHBLWEL|          5|           91|        108|   N|                Y|
|R2O9VZEU64LFXH|          4|          115|        136|   N|                Y|
| RQN1N103PBM22|          1|           61|         69|   N|                Y|
|R2FDEPNQCD8OZI|          5|          109|        125|   N|                Y|
|R2IP1IS4HLRSB2|          4|           83|         87|   N|                Y|
|R3O1UL27MCXO23|          3|          108|        116|   N|                Y|
|R1966830MKMBD7|          4|          629|        668|   N|                Y|
|R3HG21YGM9D8A9|          4|          249|        298|   N|     

In [15]:
# Determine the total number of reviews ( for filtered DataFrame in Step 2)
totalreviews_hv50_df = hv50_df.count()
print(f"Total number of reviews (helpful_votes over 50 %) : {totalreviews_hv50_df}")

Total number of reviews (helpful_votes over 50 %) : 14701


In [46]:
# Number of paid reviews Vine program ( for filtered DataFrame in Step 2)
vinepaid_df = hv50_df.filter(hv50_df.vine == 'Y').count()
vinepaid_df

233

In [47]:
# Number of unpaid reviews Non Vine program ( for filtered DataFrame in Step 2)
unpaid_df = hv50_df.filter(hv50_df.vine == 'N').count()
unpaid_df

14468

In [20]:
# The number of 5-star reviews (for filtered DataFrame in Step 2)
starreviews_5_df = hv50_df.filter(hv50_df["star_rating"] == "5").count()
starreviews_5_df


7971

In [22]:
# 5-star reviews for Vine program (paid), (filtered DataFrame in Step 2)
starreviews_5_Vinepaid_df = hv50_df.filter((hv50_df["star_rating"] == "5") & (vine_df["vine"]=="Y")) .count()
starreviews_5_Vinepaid_df

103

In [23]:
# 5-star reviews for non Vine program (unpaid), (filtered DataFrame in Step 2)
starreviews_5_Vineunpaid_df = hv50_df.filter((hv50_df["star_rating"] == "5") & (vine_df["vine"]=="N")) .count()
starreviews_5_Vineunpaid_df 

7868

In [48]:
# Percentage of 5-star reviews for Vine program (paid), (filtered DataFrame in Step 2)
five_star_vine_program_paid = (starreviews_5_Vinepaid_df/vinepaid_df)*100
round(five_star_vine_program_paid,2)

44.21

In [49]:
# Percentage of 5-star reviews for Non  Vine program (unpaid), (filtered DataFrame in Step 2)
five_star_nonvine_program_unpaid = (starreviews_5_Vineunpaid_df/unpaid_df)*100
round(five_star_nonvine_program_unpaid,2)

54.38

In [None]:
## Amazon_Reviews_US_Camera FULL DATASET

In [10]:
# Determine the total number of reviews
totalreviews = vine_df.count()
print(f"Total number of reviews : {totalreviews}")

Total number of reviews : 1801974


In [11]:
# The number of 5-star reviews
vine_df.filter(vine_df["star_rating"] == "5").count()

1062706

In [51]:
# Number of paid reviews Vine program 
vinepaid_total = vine_df.filter(vine_df.vine == 'Y').count()
vinepaid_total

7883

In [52]:
# Number of Non paid reviews  
unpaid_total = vine_df.filter(vine_df.vine == 'N').count()
unpaid_total 

1794089

In [53]:
# The number of Vine  5-star reviews
fivestar_total_vine = vine_df.filter((vine_df["star_rating"] == "5") & (vine_df["vine"]=="Y")).count()
fivestar_total_vine

3293

In [54]:
# The number of nonVine  5-star reviews
fivestar_total_nonvine = vine_df.filter((vine_df["star_rating"] == "5") & (vine_df["vine"]=="N")).count()
fivestar_total_nonvine

1059413

In [55]:
#  Percentage of 5-star reviews for Vine program (paid)
vine_percentage_five_star = (fivestar_total_vine/vinepaid_total)*100
round(vine_percentage_five_star,2)

41.77

In [56]:
#  Percentage of 5-star reviews for Non Vine program (unpaid)
nonvine_percentage_five_star = (fivestar_total_nonvine/unpaid_total)*100
round(nonvine_percentage_five_star,2)

59.05