In [None]:
import os
# Find the latest version of spark 3.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.2'
spark_version = 'spark-3.2.1'
os.environ['SPARK_VERSION']=spark_version
# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
# !wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.2.tgz
!wget -q https://downloads.apache.org/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.2.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.2.tgz
!pip install -q findspark
# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3.2"
# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
0% [Connecting to archive.ubuntu.com (91.189.88.142)] [1 InRelease 0 B/88.7 kB                                                                                Get:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:5 http://archive.ubuntu.com/ubuntu bionic InRelease
Hit:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:7 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:8 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Get:9 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:10 http://ppa.launchpad.net/cran/libgit2/ubuntu bion

In [None]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2022-02-26 17:59:23--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar.1’


2022-02-26 17:59:23 (6.49 MB/s) - ‘postgresql-42.2.16.jar.1’ saved [1002883/1002883]



In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Vine_Analysis").config("spark.driver.extraClassPath","/content/postgresql-42.2.16.jar").getOrCreate()

### Load Amazon Data into Spark DataFrame

In [None]:
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Home_Improvement_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
home_improve_df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get("amazon_reviews_us_Home_Improvement_v1_00.tsv.gz"), sep="\t", header=True, inferSchema=True)
home_improve_df.count()

2634781

### Filter the data

In [None]:
# Deliverable 2 Step 1 filter df to retrieve all rows where total_votes >=20

filter_home_improve_df = home_improve_df.filter(home_improve_df.total_votes >=20).select(['review_id','star_rating','helpful_votes','total_votes','vine','verified_purchase'])

filter_home_improve_df.show(5)


+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R2VIQ3UX794Q0O|          4|           21|         21|   N|                Y|
|R1OA24IIHWF54G|          5|           43|         45|   N|                Y|
| RJ7N3OOJR9RL0|          4|           63|         71|   N|                Y|
|R1W8778CBXSRU6|          5|           28|         29|   N|                Y|
|R2EFAM03SWLIJX|          1|           32|         35|   N|                Y|
+--------------+-----------+-------------+-----------+----+-----------------+
only showing top 5 rows



In [None]:
# Deliverable 2 Step  2 Filter filter_home_improve_df to show only helpful_votes/total_votes >= 50%
hi_helpful_df = filter_home_improve_df.filter((filter_home_improve_df.helpful_votes / filter_home_improve_df.total_votes) >= 0.5)
hi_helpful_df.show(5)

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R2VIQ3UX794Q0O|          4|           21|         21|   N|                Y|
|R1OA24IIHWF54G|          5|           43|         45|   N|                Y|
| RJ7N3OOJR9RL0|          4|           63|         71|   N|                Y|
|R1W8778CBXSRU6|          5|           28|         29|   N|                Y|
|R2EFAM03SWLIJX|          1|           32|         35|   N|                Y|
+--------------+-----------+-------------+-----------+----+-----------------+
only showing top 5 rows



In [None]:
# Deliverable 2 Step 3 Filter the datafram in previous to create a new df that retrieves all the rows where a review was written as part of the vine program. 

paid_df = filter_home_improve_df.filter(hi_helpful_df.vine == 'Y')
paid_df.show(5)

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R29V4UDSS053D8|          3|           33|         38|   Y|                N|
|R1I2D068WC37PA|          3|           32|         37|   Y|                N|
|R13W2U74F67QED|          5|           62|         76|   Y|                N|
|R2QI37XFOBKUGD|          4|           24|         33|   Y|                N|
|R19F60BB2DNCKN|          5|          170|        174|   Y|                N|
+--------------+-----------+-------------+-----------+----+-----------------+
only showing top 5 rows



In [None]:
# Deliverable 2 Step 4 Filter the datafram in previous to create a new df that retrieves all the rows where a review was written as NOT part of the vine program. 

no_paid_df = filter_home_improve_df.filter(hi_helpful_df.vine == 'N')
no_paid_df.show(5)

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R2VIQ3UX794Q0O|          4|           21|         21|   N|                Y|
|R1OA24IIHWF54G|          5|           43|         45|   N|                Y|
| RJ7N3OOJR9RL0|          4|           63|         71|   N|                Y|
|R1W8778CBXSRU6|          5|           28|         29|   N|                Y|
|R2EFAM03SWLIJX|          1|           32|         35|   N|                Y|
+--------------+-----------+-------------+-----------+----+-----------------+
only showing top 5 rows



In [None]:
# Deliverable 2 Step 5 - total number of reviews

total_reviews = paid_df.count() + no_paid_df.count()
total_reviews

41458

In [None]:
# total 5-star paid

pd_5star = paid_df.filter(paid_df.star_rating ==5).count()
pd_5star

125

In [None]:
# total 5-star no paid

np_5star = no_paid_df.filter(no_paid_df.star_rating ==5).count()
np_5star

18463

In [None]:
# Deliverable 2 Step 5 - total number of 5-star reviews

total_5star_reviews = pd_5star + np_5star
total_5star_reviews

18588

In [None]:
# total paid reviews

total_paid = paid_df.count()

total_paid

267

In [None]:
# total no paid reviews

total_no_paid = no_paid_df.count()

total_no_paid

41191

In [None]:
# Deliverable 2 Step 5 - percent 5 star reviews paid
paid_5star_perc = pd_5star / total_paid * 100
paid_5star_perc

46.81647940074906

In [None]:
# Deliverable 2 Step 5 - percent 5 star reviews no paid
no_paid_5star_perc = np_5star / total_no_paid * 100
no_paid_5star_perc

44.82289820591877