In [1]:
import os

spark_version = 'spark-3.3.0'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.36)] [0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.36)] [                                                                               Hit:2 http://archive.ubuntu.com/ubuntu bionic InRelease
0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.36)] [0% [1 InRelease gpgv 3,626 B] [Waiting for headers] [Connecting to security.ubu                                                                               Get:3 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
0% [1 InRelease gpgv 3,626 B] [3 InRelease 14.2 kB/88.7 kB 16%] [Connecting to 0% [1 InRelease gpgv 3,626 B] [Waiting for headers] [Waiting for headers] [Wait                                                                               Get:4 http://archive.ubuntu.com/ub

In [2]:
from pyspark.sql import SparkSession
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2022-10-18 17:35:46--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2022-10-18 17:35:47 (5.04 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [3]:
# Create (or reuse) the "CloudETL" session, with the downloaded PostgreSQL JDBC
# jar added to the driver's classpath.
spark = (
    SparkSession.builder
    .appName("CloudETL")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")
    .getOrCreate()
)

In [4]:
# Read in data from S3 Buckets.
# addFile registers the remote TSV with Spark; SparkFiles.get then returns the
# absolute path of the downloaded copy.
from pyspark import SparkFiles

url = "https://hales-coursework.s3.us-west-2.amazonaws.com/amazon_reviews_us_Outdoors_v1_00.tsv"
spark.sparkContext.addFile(url)

# Tab-separated, first line is the header. No schema is given here, so every
# column is loaded as a string.
local_tsv_path = SparkFiles.get("amazon_reviews_us_Outdoors_v1_00.tsv")
amazon_reviews_df = spark.read.csv(local_tsv_path, header=True, sep="\t")

In [18]:
# Transform customer id, product parent, star rating, helpful votes and total
# votes to integer values.
# spark.read.csv was called without a schema, so every column arrives as a
# string. Defining `schema` alone changes nothing; it is applied explicitly below.
from pyspark.sql.types import StructField, StringType, IntegerType, StructType
schema = [StructField("marketplace", StringType(), True),
          StructField("customer_id", IntegerType(), True),
          StructField("review_id", StringType(), True), 
          StructField("product_id", StringType(), True),
          StructField("product_parent", IntegerType(), True),
          StructField("product_title", StringType(), True),
          StructField("product_category", StringType(), True),
          StructField("star_rating", IntegerType(), True),
          StructField("helpful_votes", IntegerType(), True),
          StructField("total_votes", IntegerType(), True),
          StructField("vine", StringType(), True),
          StructField("verified_purchase", StringType(), True),
          StructField("review_headline", StringType(), True),
          StructField("review_body", StringType(), True),
          StructField("review_date", StringType(), True)]

# Cast each column to the type declared for it in `schema`, matching by name.
# Columns not listed in `schema` pass through unchanged. Casting an already-cast
# column is a no-op, so re-running this cell is safe.
# NOTE(review): with Spark's default (non-ANSI) casting, values that are not
# valid integers become null rather than raising — confirm that is acceptable.
schema_types = {field.name: field.dataType for field in schema}
amazon_reviews_df = amazon_reviews_df.select(
    [
        amazon_reviews_df[name].cast(schema_types[name])
        if name in schema_types
        else amazon_reviews_df[name]
        for name in amazon_reviews_df.columns
    ]
)

In [19]:
# Create vine_table_df: review id, rating, vote counts and the vine /
# verified-purchase flags used by the filters below.
vine_table_df = amazon_reviews_df.select(
    "review_id",
    "star_rating",
    "helpful_votes",
    "total_votes",
    "vine",
    "verified_purchase",
)
vine_table_df.show(5)

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R35T75OLUGHL5C|          4|            0|          0|   N|                Y|
|R2BV735O46BN33|          5|            0|          0|   N|                Y|
|R2NBEUGPQQGXP1|          4|            0|          0|   N|                Y|
|R17LLAOJ8ITK0S|          3|            1|          1|   N|                Y|
|R39PEQBT5ISEF4|          1|            0|          0|   N|                Y|
+--------------+-----------+-------------+-----------+----+-----------------+
only showing top 5 rows



In [20]:
# Keep only reviews that received 20 or more total votes.
total_votes_df = vine_table_df.where(vine_table_df["total_votes"] >= 20)
total_votes_df.show(5)

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R2FP3U4NHNFNL2|          5|           25|         29|   N|                Y|
|R1UUK1977O38MU|          5|           31|         31|   N|                Y|
| RXO216LWUDV6O|          3|           29|         31|   N|                Y|
|R3NMJF7EBMM19V|          3|           26|         27|   N|                Y|
|R2ZY0ZBDUO0XUY|          3|           20|         21|   N|                Y|
+--------------+-----------+-------------+-----------+----+-----------------+
only showing top 5 rows



In [55]:
# Keep rows where helpful_votes / total_votes is 50% or more.
# These rows already passed the >= 20 total-votes filter, so the divisor is non-zero.
helpfulness_ratio = total_votes_df["helpful_votes"] / total_votes_df["total_votes"]
helpful_votes_df = total_votes_df.where(helpfulness_ratio >= 0.5)
helpful_votes_df.show(5)

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R2FP3U4NHNFNL2|          5|           25|         29|   N|                Y|
|R1UUK1977O38MU|          5|           31|         31|   N|                Y|
| RXO216LWUDV6O|          3|           29|         31|   N|                Y|
|R3NMJF7EBMM19V|          3|           26|         27|   N|                Y|
|R2ZY0ZBDUO0XUY|          3|           20|         21|   N|                Y|
+--------------+-----------+-------------+-----------+----+-----------------+
only showing top 5 rows



In [63]:
from pyspark.sql.functions import col

In [68]:
# Reviews written as part of the Vine program (vine == 'Y'), referred to as "paid" here.
paid_df = helpful_votes_df.where(helpful_votes_df["vine"] == 'Y')
paid_df.count()

107

In [70]:
# Subset of the paid (Vine) reviews with a 5-star rating.
paid_5star_df = paid_df.where(paid_df["star_rating"] == 5)
paid_5star_df.count()

56

In [74]:
# Percent of paid (Vine) reviews that are 5-star.
# NOTE: despite the `_df` suffix, paid_percent_df is a plain float, not a
# DataFrame; the name is kept so any later reference keeps working.
paid_count = paid_df.count()
paid_5star_count = paid_5star_df.count()
if paid_count:
    paid_percent_df = (paid_5star_count / paid_count) * 100
    print("The percent of paid five-star vine reviews is: %f" % paid_percent_df + "%")
else:
    # No review survived the filters: the percentage is undefined, so avoid
    # ZeroDivisionError and record NaN instead.
    paid_percent_df = float("nan")
    print("No paid vine reviews matched the filters; the percentage is undefined.")

The percent of paid five-star vine reviews is: 52.336449%


In [69]:
# Reviews that were not part of the Vine program (vine == 'N').
not_paid_df = helpful_votes_df.where(helpful_votes_df["vine"] == 'N')
not_paid_df.count()

39869

In [76]:
# Subset of the unpaid (non-Vine) reviews with a 5-star rating.
not_paid_5star_df = not_paid_df.where(not_paid_df["star_rating"] == 5)
not_paid_5star_df.count()

21005

In [77]:
# Percent of unpaid (non-Vine) reviews that are 5-star.
not_paid_count = not_paid_df.count()
not_paid_5star_count = not_paid_5star_df.count()
if not_paid_count:
    unpaid_percent = (not_paid_5star_count / not_paid_count) * 100
    print("The percent of unpaid five-star reviews is: %f" % unpaid_percent + "%")
else:
    # No review survived the filters: the percentage is undefined, so avoid
    # ZeroDivisionError and record NaN instead.
    unpaid_percent = float("nan")
    print("No unpaid reviews matched the filters; the percentage is undefined.")

The percent of unpaid five-star reviews is: 52.685044%
