In [1]:
import os

spark_version = 'spark-3.3.0'
os.environ['SPARK_VERSION'] = spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Install Postgres Driver
!wget https://jdbc.postgresql.org/download/postgresql-42.5.0.jar

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Hit:2 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Hit:3 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:4 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Ign:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Get:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease [1,581 B]
Get:7 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:8 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:9 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:10 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Get:11 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [83.3 kB]
Get:12 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease [21.3 kB]
Get:13 https://develo

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("BigData-Challenge").config("spark.driver.extraClassPath","/content/postgresql-42.2.16.jar").getOrCreate()

In [3]:
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Watches_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get(""), sep="\t", header=True, inferSchema=True)
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|    3653882|R3O9SGZBVQBV76|B00FALQ1ZC|     937001370|Invicta Women's 1...|         Watches|          5|            0|          0|   N|                Y|          Five Stars|Absolutely love t...|2015-08-31 00:00:00|
|         US|   14661224| RKH8BNC3L5DLF|B00D3RGO20|     484010722|Kenneth Cole New ...| 

In [4]:
# Load in a sql function to use columns
from pyspark.sql.functions import col

# Filter for only columns with total_votes equal or greater than 20
total_votes_20plus_df = df.filter(col("total_votes") >= 20)
total_votes_20plus_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|   17728960|R14W2VCHHK5V7W|B00ZI3R5VG|     213303319|Stuhrling Origina...|         Watches|          5|           19|         20|   N|                Y|this is a good al...|As a former Rolex...|2015-08-31 00:00:00|
|         US|   20240757|R1S3T57O3OYT5S|B003DZ7VOW|     336197576|G-Shock GA100A-7A...| 

In [5]:
# Filter the data for helpful_votes/total_votes is >= 50%

helpful_votes_50plus_df = total_votes_20plus_df.filter(col("helpful_votes")/col("total_votes") >= .50)
helpful_votes_50plus_df.show(5)

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|   17728960|R14W2VCHHK5V7W|B00ZI3R5VG|     213303319|Stuhrling Origina...|         Watches|          5|           19|         20|   N|                Y|this is a good al...|As a former Rolex...|2015-08-31 00:00:00|
|         US|   20240757|R1S3T57O3OYT5S|B003DZ7VOW|     336197576|G-Shock GA100A-7A...| 

In [6]:
# Filter for data where a review was written as part of the Vine program

paid_vine_df = helpful_votes_50plus_df.filter(col("vine") == 'Y')
paid_vine_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|   53096363|R1B7M0OP3UNP6O|B00V0G9OGE|     297682929|Armitron Sport Wo...|         Watches|          5|           49|         52|   Y|                N|Lots of features ...|My wife has been ...|2015-08-27 00:00:00|
|         US|   51394788|R2UUV4UGGYMQG8|B00M1Y5AIU|     368146384|Boulevard ClickTi...| 

In [13]:
paid_vine_df.count()

47

In [14]:
unpaid_vine_df.count()

8362

In [7]:
# Filter for data where a review was not part of the Vine program

unpaid_vine_df = helpful_votes_50plus_df.filter(col("vine") == 'N')
unpaid_vine_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|   17728960|R14W2VCHHK5V7W|B00ZI3R5VG|     213303319|Stuhrling Origina...|         Watches|          5|           19|         20|   N|                Y|this is a good al...|As a former Rolex...|2015-08-31 00:00:00|
|         US|   20240757|R1S3T57O3OYT5S|B003DZ7VOW|     336197576|G-Shock GA100A-7A...| 

In [10]:
# Finding total # of review paid

paid_vine_df.count()

47

In [11]:
# Finding total # of review unpaid

unpaid_vine_df.count()

8362

In [8]:
# Finding total # of 5-star reviews paid

paid_star_rating_df = paid_vine_df.filter(col("star_rating") == 5)
paid_star_rating_df.count()

15

In [9]:
# Finding total # of 5-star reviews unpaid

unpaid_star_rating_df = unpaid_vine_df.filter(col("star_rating") == 5)
unpaid_star_rating_df.count()

4332

In [16]:
# Finding percentage of 5-star reviews paid

percent_paid = (paid_star_rating_df.count()/paid_vine_df.count())*100
print("Percentage of 5-star reviews for paid Vine program was: %f" % percent_paid + "%")

Percentage of 5-star reviews for paid Vine program was: 31.914894%


In [17]:
# Finding percentage of 5-star reviews unpaid

percent_unpaid = (unpaid_star_rating_df.count()/unpaid_vine_df.count())*100
print("Percentage of 5-star reviews for unpaid Vine program was: %f" % percent_unpaid + "%")

Percentage of 5-star reviews for unpaid Vine program was: 51.805788%
