<a href="https://colab.research.google.com/github/RichardYDepestre/amazon_vine_analysis/blob/main/Vine_Review_Analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
# Find the latest version of spark 2.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.0'
spark_version = 'spark-3.0.2'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 http://archive.ubuntu.com/ubuntu bionic InRelease
0% [Connecting to security.ubuntu.com (91.189.91.38)] [Connected to cloud.r-pro                                                                               Get:2 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
                                                                               Get:3 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
0% [2 InRelease 47.5 kB/88.7 kB 54%] [Connecting to security.ubuntu.com (91.1890% [1 InRelease gpgv 242 kB] [2 InRelease 47.5 kB/88.7 kB 54%] [Connecting to s                                                                               Hit:4 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
0% [1 InRelease gpgv 242 kB] [2 InRelease 80.8 kB/88.7 kB 91%] [Connecting to s0% [1 InRelease gpgv 242 kB] [Connecting to security.ubuntu.com (91.189.91.38)]                                             

In [None]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2021-06-06 21:34:56--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar’


2021-06-06 21:34:58 (1.68 MB/s) - ‘postgresql-42.2.16.jar’ saved [1002883/1002883]



In [None]:
from pyspark.sql import SparkSession

# Create (or reuse) the SparkSession. The Postgres JDBC jar is placed on the driver
# classpath so Spark can interact with Postgres.
spark = (
    SparkSession.builder
    .appName("BigData-Challenge")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.16.jar")
    .getOrCreate()
)

### Load Amazon Data into Spark DataFrame

In [None]:
from pyspark import SparkFiles

# Register the gzipped TSV with Spark. After addFile, SparkFiles.get(<file name>)
# returns the local path of the downloaded copy, which is what the next cell reads.
# NOTE(review): the public amazon-reviews-pds bucket may no longer be served;
# confirm this URL still resolves before re-running.
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Home_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

### Filter helpful reviews and split them into Vine and non-Vine DataFrames

In [None]:
from pyspark.sql.functions import to_date  # NOTE(review): not used in the visible cells

# Load the tab-separated review file (first row is the header) with Spark inferring
# the column types, then preview the first few rows untruncated.
reviews_path = SparkFiles.get("amazon_reviews_us_Home_v1_00.tsv.gz")
df = (
    spark.read
    .option("encoding", "UTF-8")
    .option("sep", "\t")
    .option("header", True)
    .option("inferSchema", True)
    .csv(reviews_path)
)
df.show(5, truncate=False)

In [None]:
# Number of reviews loaded (count() is an action, so it scans the whole file).
row_total = df.count()
print(f"entries in dataset: {row_total}")

entries in dataset: 6221559


In [None]:
# Step 1: keep only reviews whose total_votes is at least MIN_TOTAL_VOTES. Such
# reviews are more likely to be helpful, and the threshold guarantees
# total_votes > 0, so the helpful-vote ratio computed later never divides by zero.
# (Despite its name, n_df_gt20 holds total_votes >= 20, not > 20.)
# The result is cached because every later cell derives from it and runs several
# actions (show/count). Without the cache, each action re-reads and re-parses the
# full gzipped TSV.
MIN_TOTAL_VOTES = 20
n_df_gt20 = df.filter(df.total_votes >= MIN_TOTAL_VOTES).cache()
n_df_gt20.show(5, truncate=False)

+-----------+-----------+--------------+----------+--------------+----------------------------------------------------------------------------------------------------------+----------------+-----------+-------------+-----------+----+-----------------+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
# Size of the >= 20-vote subset, followed by its column names (last expression is displayed).
filtered_total = n_df_gt20.count()
print(f"all rows total votes count greater or equal to 20: {filtered_total}")
n_df_gt20.columns

all rows total votes count greater or equal to 20: 96370


['marketplace',
 'customer_id',
 'review_id',
 'product_id',
 'product_parent',
 'product_title',
 'product_category',
 'star_rating',
 'helpful_votes',
 'total_votes',
 'vine',
 'verified_purchase',
 'review_headline',
 'review_body',
 'review_date']

In [None]:
# Step 2: from the Step 1 frame, keep the rows where helpful_votes / total_votes
# is at least `pct` (50%). In the DataFrame API, `/` on two integer columns already
# yields a double (true division), so no cast is needed here. If you use SQL against
# a database where int / int truncates (e.g. Postgres), cast first:
# WHERE CAST(helpful_votes AS FLOAT)/CAST(total_votes AS FLOAT) >= 0.5.
# Step 1 guarantees total_votes >= 20, so the ratio never divides by zero.
hlp_votes = n_df_gt20.helpful_votes
ttl_votes = n_df_gt20.total_votes
pct = 0.5
n_df_votes = n_df_gt20.filter((hlp_votes / ttl_votes) >= pct)
# The count is a number of reviews (rows), not a number of votes.
print(f"number of reviews with >= {pct:.0%} helpful votes: {n_df_votes.count()}.")
n_df_votes.show(5, truncate=False)

In [None]:
# Step 3a: from the helpful reviews of Step 2, keep those written as part of the
# Vine program (paid reviews), i.e. vine == 'Y'.
# The name rows_w_reviews is kept because later cells use it. It holds Vine
# reviews, not "rows with reviews" (every row here is a review).
rows_w_reviews = n_df_votes.filter(n_df_votes.vine == "Y")
rows_w_reviews.show(5, truncate=False)
print(f"number of Vine (paid) reviews: {rows_w_reviews.count()}.")

+-----------+-----------+--------------+----------+--------------+---------------------------------------------------------------------------------+----------------+-----------+-------------+-----------+----+-----------------+---------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
# Step 3b: from the helpful reviews of Step 2, keep those NOT written as part of
# the Vine program (unpaid reviews), i.e. vine == 'N'.
# The name rows_wo_reviews is kept because later cells use it. It holds non-Vine
# reviews, not "rows without reviews".
rows_wo_reviews = n_df_votes.filter(n_df_votes.vine == "N")
rows_wo_reviews.show(5, truncate=False)
print(f"number of non-Vine (unpaid) reviews: {rows_wo_reviews.count()}.")

+-----------+-----------+--------------+----------+--------------+----------------------------------------------------------------------------------------------------------+----------------+-----------+-------------+-----------+----+-----------------+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
# Compare how often helpful Vine (paid) and non-Vine (unpaid) reviews give 5 stars.


def _pct(part, whole):
    """Return part / whole as a percentage, or NaN when whole is 0 (empty group)."""
    return (part / whole) * 100 if whole else float("nan")


# Total number of reviews in each group
total_reviews = rows_w_reviews.count()
print(f"total Vine (paid) reviews: {total_reviews}.")
total_no_reviews = rows_wo_reviews.count()
print(f"total non-Vine (unpaid) reviews: {total_no_reviews}.")

# Number of 5-star reviews in each group
five_star_reviews = rows_w_reviews.filter(rows_w_reviews.star_rating == 5).count()
print(f"\nVine 5-star reviews: {five_star_reviews}.")
five_star_no_reviews = rows_wo_reviews.filter(rows_wo_reviews.star_rating == 5).count()
print(f"non-Vine 5-star reviews: {five_star_no_reviews}.")

# Share of 5-star reviews within each group, in percent
five_star_reviews_pct = _pct(five_star_reviews, total_reviews)
print(f"\npercent of Vine reviews that are 5-star: {five_star_reviews_pct:.2f}%.")
five_star_no_reviews_pct = _pct(five_star_no_reviews, total_no_reviews)
print(f"percent of non-Vine reviews that are 5-star: {five_star_no_reviews_pct:.2f}%.")

total purchases with reviews 1448.
total purchases without reviews 90768.

purchases with 5-star review: 647.
purchases without 5-star review: 44104.

percent purchases with 5-star review: 44.68232044198895.
\percent purchases with 5-star no review: 48.589811387273045.
