In [None]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.2.1'
spark_version = 'spark-3.2.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

Ign:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Get:3 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:6 http://archive.ubuntu.com/ubuntu bionic InRelease
Hit:7 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:8 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Get:9 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:10 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:12 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Get:13 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Hit:15 http://ppa.launchpad

In [None]:
# Download the Postgres JDBC driver (version 42.2.16) into the current working
# directory. The SparkSession created below expects the jar at
# /content/postgresql-42.2.16.jar, so this assumes the notebook runs from /content.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2022-04-27 01:37:04--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar’


2022-04-27 01:37:06 (1.23 MB/s) - ‘postgresql-42.2.16.jar’ saved [1002883/1002883]



In [None]:
# Create (or reuse) the SparkSession, with the Postgres JDBC jar on the driver classpath.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("M16-Challenge_Deliverable2")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.16.jar")
    .getOrCreate()
)

In [8]:
# Read the Lawn and Garden review dump from the public S3 bucket into a DataFrame.
from pyspark import SparkFiles

url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Lawn_and_Garden_v1_00.tsv.gz"
# Register the remote file with Spark, then resolve its local path by file name.
spark.sparkContext.addFile(url)
local_tsv = SparkFiles.get("amazon_reviews_us_Lawn_and_Garden_v1_00.tsv.gz")

# Tab-separated, first line is the header, column types are inferred from the data.
df = (
    spark.read
    .option("encoding", "UTF-8")
    .csv(local_tsv, sep="\t", header=True, inferSchema=True)
)
df.show(20, truncate=False)

+-----------+-----------+--------------+----------+--------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------+-----------+-------------+-----------+----+-----------------+-----------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [9]:
# Build the vine_table DataFrame: keep only the columns used by the Vine analysis.
vine_columns = [
    'review_id',
    'star_rating',
    'helpful_votes',
    'total_votes',
    'vine',
    'verified_purchase',
]
vine_df = df.select(vine_columns)
vine_df.show(truncate=False)

+--------------+-----------+-------------+-----------+----+-----------------+
|review_id     |star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|RED72VWWCOS7S |1          |2            |8          |N   |Y                |
|RZHWQ208LTEPV |5          |0            |0          |N   |Y                |
|R37LBC3XAVLYOO|5          |4            |5          |N   |Y                |
|R3L7XJMA0MVJWC|5          |0            |0          |N   |Y                |
|R2I2GHSI7T1UBN|1          |5            |6          |N   |Y                |
|R2GFFKHK4I6VMX|5          |0            |0          |N   |Y                |
|R1R0UDX2XAN1S4|4          |0            |0          |N   |Y                |
|R22C8FMBSTFRY8|5          |2            |2          |N   |Y                |
|R118NNIQ75XPGO|3          |0            |0          |N   |Y                |
|R30HYXHZQ49621|2          |0            |0          |N   |Y    

In [10]:
# Keep only the rows (reviews) that received 20 or more total votes.
has_enough_votes = vine_df.total_votes >= 20
clean_vine_df = vine_df.where(has_enough_votes)
clean_vine_df.show(truncate=False)

+--------------+-----------+-------------+-----------+----+-----------------+
|review_id     |star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|RQQ3KVTU5TJ4I |5          |24           |29         |N   |Y                |
|R3FELXWV9T5CWE|2          |22           |24         |N   |Y                |
|ROBYK6EZYK398 |5          |29           |30         |N   |Y                |
|R2RKCSAG6GBA4A|1          |8            |28         |N   |Y                |
|R2YVBBR6NXIA4V|5          |25           |28         |N   |N                |
|R2AVTBDIVG2AW4|5          |26           |26         |N   |N                |
|R1Z2LNN3FANMTO|1          |20           |24         |N   |N                |
|RLNULBKRWNNR  |5          |42           |43         |N   |Y                |
|R9QNQUL94RX1F |3          |27           |33         |N   |Y                |
|RTULFZTUS1VBP |5          |51           |52         |N   |Y    

In [11]:
# Keep only the rows where at least half of the votes were helpful.
mostly_helpful = "helpful_votes/total_votes>=0.5"
helpful_vine_df = clean_vine_df.where(mostly_helpful)
helpful_vine_df.show(truncate=False)

+--------------+-----------+-------------+-----------+----+-----------------+
|review_id     |star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|RQQ3KVTU5TJ4I |5          |24           |29         |N   |Y                |
|R3FELXWV9T5CWE|2          |22           |24         |N   |Y                |
|ROBYK6EZYK398 |5          |29           |30         |N   |Y                |
|R2YVBBR6NXIA4V|5          |25           |28         |N   |N                |
|R2AVTBDIVG2AW4|5          |26           |26         |N   |N                |
|R1Z2LNN3FANMTO|1          |20           |24         |N   |N                |
|RLNULBKRWNNR  |5          |42           |43         |N   |Y                |
|R9QNQUL94RX1F |3          |27           |33         |N   |Y                |
|RTULFZTUS1VBP |5          |51           |52         |N   |Y                |
|R1BM9RBQWI62O2|5          |43           |60         |N   |N    

In [12]:
# `col` builds column expressions by name; later cells reuse it as well.
from pyspark.sql.functions import col

# Paid reviews: rows of helpful_vine_df written through the Vine program (vine == "Y").
is_paid = col("vine") == 'Y'
helpful_vine_paid_df = helpful_vine_df.where(is_paid)
helpful_vine_paid_df.show(truncate=False)

+--------------+-----------+-------------+-----------+----+-----------------+
|review_id     |star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R28DXTC3JQ9IY1|4          |24           |26         |Y   |N                |
|R3AFZKLQXATHBU|5          |44           |49         |Y   |N                |
|R2RUUF2JPJPC0E|4          |20           |22         |Y   |N                |
|RFZ2WUH4248AB |2          |26           |27         |Y   |N                |
|R1Q4LVHIFOWYFR|5          |23           |28         |Y   |N                |
|R4YEPTQED3X1Q |5          |19           |20         |Y   |N                |
|R2Z7C8YCRSC9DP|5          |22           |22         |Y   |N                |
|R3J8OI5CB74P5K|1          |22           |25         |Y   |N                |
|RH39LMKN6AZDC |5          |33           |40         |Y   |N                |
|R8RD8K0ESJSRD |5          |21           |21         |Y   |N    

In [13]:
# Unpaid reviews: rows of helpful_vine_df not written through the Vine program (vine == "N").
is_unpaid = col("vine") == 'N'
helpful_vine_unpaid_df = helpful_vine_df.where(is_unpaid)
helpful_vine_unpaid_df.show(truncate=False)

+--------------+-----------+-------------+-----------+----+-----------------+
|review_id     |star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|RQQ3KVTU5TJ4I |5          |24           |29         |N   |Y                |
|R3FELXWV9T5CWE|2          |22           |24         |N   |Y                |
|ROBYK6EZYK398 |5          |29           |30         |N   |Y                |
|R2YVBBR6NXIA4V|5          |25           |28         |N   |N                |
|R2AVTBDIVG2AW4|5          |26           |26         |N   |N                |
|R1Z2LNN3FANMTO|1          |20           |24         |N   |N                |
|RLNULBKRWNNR  |5          |42           |43         |N   |Y                |
|R9QNQUL94RX1F |3          |27           |33         |N   |Y                |
|RTULFZTUS1VBP |5          |51           |52         |N   |Y                |
|R1BM9RBQWI62O2|5          |43           |60         |N   |N    

In [14]:
# Determine the total number of reviews in the analysis set, i.e. reviews with
# at least 20 total votes of which at least half were helpful.
helpful_vine_df.count()

49103

In [15]:
# Determine the total number of paid (vine == 'Y') reviews in the analysis set.
helpful_vine_paid_df.count()

386

In [16]:
# Determine the total number of unpaid (vine == 'N') reviews in the analysis set.
helpful_vine_unpaid_df.count()

48717

In [17]:
# Count the 5-star reviews across the whole analysis set (paid and unpaid).
helpful_vine_df.where(col("star_rating") == 5).count()

24202

In [18]:
# Count the 5-star reviews among the paid (Vine) reviews.
helpful_vine_paid_df.where(col("star_rating") == 5).count()

176

In [19]:
# Count the 5-star reviews among the unpaid (non-Vine) reviews.
helpful_vine_unpaid_df.where(col("star_rating") == 5).count()

24026

In [20]:
# Determine the percentage of paid (Vine) reviews that are 5-star.
paid_total = helpful_vine_paid_df.count()
paid_five_star = helpful_vine_paid_df.filter(col("star_rating") == 5).count()
# A dataset without any paid reviews would otherwise raise ZeroDivisionError;
# report 0.0 in that case.
percentage_paid = paid_five_star / paid_total * 100 if paid_total else 0.0
print(str(round(percentage_paid,2)) + "% of paid reviews are 5-stars")

45.6% of paid reviews are 5-stars


In [21]:
# Determine the percentage of unpaid (non-Vine) reviews that are 5-star.
unpaid_total = helpful_vine_unpaid_df.count()
unpaid_five_star = helpful_vine_unpaid_df.filter(col("star_rating") == 5).count()
# A dataset without any unpaid reviews would otherwise raise ZeroDivisionError;
# report 0.0 in that case.
percentage_unpaid = unpaid_five_star / unpaid_total * 100 if unpaid_total else 0.0
print(str(round(percentage_unpaid,2)) + "% of unpaid reviews are 5-stars")

49.32% of unpaid reviews are 5-stars
