In [1]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.1.2'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 http://security.ubuntu.com/ubuntu bionic-security InRelease
0% [Connecting to archive.ubuntu.com (91.189.88.142)] [Connected to cloud.r-pro                                                                               Hit:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Ign:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:4 http://archive.ubuntu.com/ubuntu bionic InRelease
Ign:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:7 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:8 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Hit:9 http://archive.ubuntu.com/ubuntu bionic-updates InRelease
Hit:10 http://archive.ubuntu.com/ubuntu bionic-backports InRelease
Hit:11 http://ppa.l

In [2]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2021-09-22 22:05:56--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar.2’


2021-09-22 22:05:57 (5.64 MB/s) - ‘postgresql-42.2.16.jar.2’ saved [1002883/1002883]



In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("M16-Amazon-Challenge").config("spark.driver.extraClassPath","/content/postgresql-42.2.16.jar").getOrCreate()

In [4]:
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_multilingual_DE_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get(""), sep="\t", header=True, inferSchema=True)

In [5]:
# Create the vine_table DataFrame
vine_df = df.select(["review_id", "star_rating", "helpful_votes", "total_votes", "vine", "verified_purchase"])
vine_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
| RVOG49N0H1FB6|          5|            0|          0|   N|                Y|
| RNCMD6OLTP4HM|          5|            1|          1|   N|                Y|
| R4AUOBI8YC0R8|          5|            0|          0|   N|                Y|
|R1VSHIJ1RHIBTE|          5|            0|          0|   N|                Y|
|R3JBLVALWSLCZD|          5|            9|         14|   N|                Y|
| RJ6GK77Y2NKCK|          4|            6|          7|   N|                Y|
| R13W0E5EDX50J|          5|            0|          1|   N|                Y|
|R2L8UN6YSPY1CK|          3|            1|          3|   N|                Y|
| RA2DFU68J832H|          4|            1|          1|   N|                Y|
|R191XKANDDV34N|          4|            0|          1|   N|     

In [6]:
# Create a DataFrame to retrieve all the rows where the total_votes count is equal to or greater than 20:
vine_dfa = vine_df.filter(vine_df.total_votes >= 20)
vine_dfa.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R3JSOBULWNE10K|          5|           14|         23|   N|                N|
| R973TDYT0B600|          4|          289|        350|   N|                Y|
|R2A78SJJS8NPUK|          5|           37|         56|   N|                Y|
| RF3JH2YYCJKRM|          4|            2|         40|   N|                Y|
|R31KTNNEZK2IDF|          2|            2|         32|   N|                Y|
|R1QTHPDQZZ4HCX|          1|            5|         86|   N|                Y|
|R2UT7AZ2X3AY1A|          4|           46|         49|   N|                Y|
| RH6ZMZVUIKMAG|          1|            5|         26|   N|                Y|
|R3TH9UIAKX1FF1|          1|            5|         22|   N|                Y|
|R3FV6QDK4DS9CI|          1|           23|         47|   N|     

In [7]:
# Create a new DataFrame to retrieve all the rows where the number of helpful_votes divided by total_votes is equal to or greater than 50%:
vine_dfb = vine_dfa.filter(vine_dfa['helpful_votes']/vine_dfa['total_votes'] >= 0.5)
vine_dfb.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R3JSOBULWNE10K|          5|           14|         23|   N|                N|
| R973TDYT0B600|          4|          289|        350|   N|                Y|
|R2A78SJJS8NPUK|          5|           37|         56|   N|                Y|
|R2UT7AZ2X3AY1A|          4|           46|         49|   N|                Y|
|R1HAGIRT9K8YYH|          5|           19|         35|   N|                N|
|R35XPA8PFX88SH|          4|           44|         53|   N|                N|
| ROL6706VGJFC6|          5|           44|         59|   N|                N|
|R3CYZYCBJ4KXMF|          1|           15|         20|   N|                N|
|R2P7AWG36QK0LN|          5|           68|         76|   N|                Y|
|R17Q8279ZKZ630|          1|           12|         20|   N|     

In [8]:
# Create a new DataFrame that retrieves all the rows where a review was written as part of the Vine program:
vine_dfc = vine_dfb.filter(vine_dfb['vine']=="Y")
vine_dfc.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R11E7I5VQ03MK9|          3|          179|        222|   Y|                N|
| RIPUS8IPBEQ3I|          4|           37|         40|   Y|                N|
|R2QREWQEZOB2CK|          5|           21|         24|   Y|                N|
| R9RWVZPQ6MZES|          5|           23|         23|   Y|                N|
|R3TWKO71IXOP02|          3|           49|         51|   Y|                N|
|R3T1ONLG4I4GVH|          5|           25|         29|   Y|                N|
|R2E1FQHAOPHG51|          5|           59|         73|   Y|                N|
| RMPKTFKVEF5VL|          4|           13|         22|   Y|                N|
|R1G2VBVXL4U7R6|          4|          121|        137|   Y|                N|
|R2D7X1L8I2JSZE|          3|           19|         21|   Y|     

In [9]:
# Create a new DataFrame that retrieves all the rows where a review was not written as part of the Vine program:
vine_dfd = vine_dfb.filter(vine_dfb['vine'] == "N")
vine_dfd.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R3JSOBULWNE10K|          5|           14|         23|   N|                N|
| R973TDYT0B600|          4|          289|        350|   N|                Y|
|R2A78SJJS8NPUK|          5|           37|         56|   N|                Y|
|R2UT7AZ2X3AY1A|          4|           46|         49|   N|                Y|
|R1HAGIRT9K8YYH|          5|           19|         35|   N|                N|
|R35XPA8PFX88SH|          4|           44|         53|   N|                N|
| ROL6706VGJFC6|          5|           44|         59|   N|                N|
|R3CYZYCBJ4KXMF|          1|           15|         20|   N|                N|
|R2P7AWG36QK0LN|          5|           68|         76|   N|                Y|
|R17Q8279ZKZ630|          1|           12|         20|   N|     

In [10]:
# Determine the total number of reviews:
vine_reviews_count = vine_dfa.count()
vine_reviews_count

52100

In [11]:
# Determine the total number of paid reviews:
paid_reviews_count = vine_dfc.count()
paid_reviews_count

26

In [12]:
# Determine the total number of unpaid reviews:
unpaid_reviews_count = vine_dfd.count()
unpaid_reviews_count

29264

In [13]:
# Determine the total number of 5-star reviews:
vine_5_star = vine_df.filter(vine_df.star_rating == 5).count()
vine_5_star

438977

In [14]:
# Determine the total number of paid 5-star reviews:
paid_star_count = vine_dfc.filter(vine_dfc.star_rating == 5).count()
paid_star_count

9

In [15]:
# Determine the total number of unpaid 5-star reviews:
unpaid_star_count = vine_dfd.filter(vine_dfd.star_rating == 5).count()
unpaid_star_count

15784

In [16]:
# Determine the total percentage of paid 5-star reviews:
table_1 = (paid_star_count/paid_reviews_count)*100
table_1

34.61538461538461

In [17]:
# Determine the total percentage of paid 5-star reviews:
table_2 = (unpaid_star_count/unpaid_reviews_count)*100
table_2

53.93657736468015