In [66]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
spark_version = 'spark-3.3.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.36)] [                                                                               Hit:2 http://archive.ubuntu.com/ubuntu bionic InRelease
0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.36)] [                                                                               Get:3 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
0% [3 InRelease 14.2 kB/88.7 kB 16%] [Connecting to security.ubuntu.com (185.120% [1 InRelease gpgv 3,626 B] [3 InRelease 14.2 kB/88.7 kB 16%] [Connecting to 0% [1 InRelease gpgv 3,626 B] [Waiting for headers] [Waiting for headers] [Wait                                                                               Get:4 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [83.3 kB]
0% [1 InRelease gpgv 3,626 B] [4 InRelease 8,39

In [67]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2022-12-12 04:26:43--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar.1’


2022-12-12 04:26:44 (5.34 MB/s) - ‘postgresql-42.2.16.jar.1’ saved [1002883/1002883]



In [70]:
from pyspark.sql import SparkSession

# Start (or reuse) the session, putting the downloaded Postgres JDBC jar on the
# driver's classpath so Spark can talk to Postgres.
spark = (
    SparkSession.builder
    .appName("M17-Amazon-Challenge")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.16.jar")
    .getOrCreate()
)

### Load Amazon Data into Spark DataFrame

In [71]:
from pyspark import SparkFiles

# Amazon US "Wireless" review export (tab-separated, gzip-compressed).
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Wireless_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

# Read exactly the file that was just added. SparkFiles.get("") resolves to the
# SparkFiles root *directory*, which would make Spark parse every file ever
# registered with addFile as part of this DataFrame.
file_name = url.rsplit("/", 1)[-1]
df = spark.read.option("encoding", "UTF-8").csv(
    SparkFiles.get(file_name), sep="\t", header=True, inferSchema=True
)
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|   16414143|R3W4P9UBGNGH1U|B00YL0EKWE|     852431543|LG G4 Case Hard T...|        Wireless|          2|            1|          3|   N|                Y|Looks good, funct...|2 issues  -  Once...|2015-08-31 00:00:00|
|         US|   50800750|R15V54KBMTQWAY|B00XK95RPQ|     516894650|Selfie Stick Fibl...| 

### Create DataFrames to match tables

In [54]:
# Spark SQL helpers and schema types.
# NOTE(review): to_date, avg, Window and the StructField/StringType/IntegerType/StructType
# imports are not used by any cell visible in this notebook.
from pyspark.sql.functions import to_date
import pyspark.sql.functions as F
from pyspark.sql.functions import avg
from pyspark.sql import Window
# NOTE(review): this rebinds the name `sum` to pyspark's column aggregate, shadowing
# Python's built-in sum() for every cell run afterwards. Prefer F.sum(...).
from pyspark.sql.functions import sum
# NOTE(review): SparkSession is already imported in the session-creation cell.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StringType, IntegerType, StructType

# NOTE(review): no code follows; the review dataset is read into `df` in the "Load Amazon Data" cell.


In [22]:
# List the column names of the raw review DataFrame.
df.schema.names

['marketplace',
 'customer_id',
 'review_id',
 'product_id',
 'product_parent',
 'product_title',
 'product_category',
 'star_rating',
 'helpful_votes',
 'total_votes',
 'vine',
 'verified_purchase',
 'review_headline',
 'review_body',
 'review_date']

In [76]:
# Keep only reviews with at least MIN_TOTAL_VOTES votes. This also guarantees a
# non-zero denominator for the helpful-vote ratio computed in the next cell.
MIN_TOTAL_VOTES = 20
votes_df = df.filter(f"total_votes >= {MIN_TOTAL_VOTES}")
votes_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|   44689470|R2WOW0TURNXB26|B00YY3UBV2|     310491927|            Garmin 1|        Wireless|          3|           54|         59|   N|                Y|Pretty Disappoint...|Bought this unit ...|2015-08-31 00:00:00|
|         US|     112342|R13VL62Y2HBQ0B|B010VFZJD6|     129632031|iTaste MVP3 PRO -...| 

In [96]:
# Keep reviews where at least half of the votes were "helpful", projected down to
# the columns the Vine comparison needs.
# Cached because every later cell runs its own count() action on this frame (or a
# filter of it); without the cache each action re-reads and re-parses the
# gzipped TSV from scratch.
new_total_df = (
    votes_df
    .filter("(helpful_votes/total_votes)>=0.5")
    .select("product_id", "review_id", "star_rating", "helpful_votes", "vine", "total_votes")
    .cache()
)
new_total_df.show()

+----------+--------------+-----------+-------------+----+-----------+
|product_id|     review_id|star_rating|helpful_votes|vine|total_votes|
+----------+--------------+-----------+-------------+----+-----------+
|B00YY3UBV2|R2WOW0TURNXB26|          3|           54|   N|         59|
|B010VFZJD6|R13VL62Y2HBQ0B|          5|           15|   N|         21|
|B00C8S8S4W|R22G55KAPZKJQV|          4|           20|   N|         21|
|B011I4XMXS|R1610PGTJS7G3N|          2|           28|   N|         44|
|B00OSTKZWM| RLQL04BL0QXOJ|          4|           45|   N|         47|
|B013D32WVA|R2AYJHH8WJNGAU|          4|           98|   N|        104|
|B013BHLU66|R111DJA10Y6CMU|          5|           26|   N|         39|
|B00Y1Z87UU|R2EE2TR4MRDV0U|          5|           53|   N|         59|
|B00UY29N8Y| RD4A80I5JDHED|          5|           69|   N|         77|
|B00NPZG6DW|R1GU6IYZQWQE8X|          2|           24|   N|         25|
|B00X0X3EQ6| RZOPM62JMW97V|          2|          103|   N|        108|
|B013F

In [97]:
# Vine reviews (vine == 'Y'), labelled "paid" in this analysis.
vine_paid_df = new_total_df.where("vine == 'Y'")
vine_paid_df.show()

+----------+--------------+-----------+-------------+----+-----------+
|product_id|     review_id|star_rating|helpful_votes|vine|total_votes|
+----------+--------------+-----------+-------------+----+-----------+
|B013X0V11K|R1MAOLI5FJHAFM|          4|          249|   Y|        261|
|B013X0V4VM| R9PYAUDIBJVEC|          4|           12|   Y|         22|
|B013X0V11K| R6V9SHMMG5M8F|          5|          101|   Y|        110|
|B011HT9AL2|R37PVLT6ELL5J4|          4|          181|   Y|        209|
|B0129T0XXS| R2FSFGWZF24V9|          4|           50|   Y|         51|
|B0129TQLPW|R3SRW1E8J56IGV|          5|          262|   Y|        281|
|B0129T0XXS| R86Z11D4CWOFM|          4|           32|   Y|         36|
|B00W7S34HY| RNP01HW9YISJO|          4|           20|   Y|         23|
|B00W75BKQ4|R3KLACA6LCDZ0S|          3|           21|   Y|         22|
|B0129T0XXS| RZEQYOT2RE0N7|          4|           75|   Y|         80|
|B0129TQLPW|R2WBPX441TH495|          5|          202|   Y|        221|
|B0129

In [98]:
# Non-Vine reviews (vine == 'N'), labelled "unpaid" in this analysis.
vine_unpaid_df = new_total_df.where("vine == 'N'")
vine_unpaid_df.show()

+----------+--------------+-----------+-------------+----+-----------+
|product_id|     review_id|star_rating|helpful_votes|vine|total_votes|
+----------+--------------+-----------+-------------+----+-----------+
|B00YY3UBV2|R2WOW0TURNXB26|          3|           54|   N|         59|
|B010VFZJD6|R13VL62Y2HBQ0B|          5|           15|   N|         21|
|B00C8S8S4W|R22G55KAPZKJQV|          4|           20|   N|         21|
|B011I4XMXS|R1610PGTJS7G3N|          2|           28|   N|         44|
|B00OSTKZWM| RLQL04BL0QXOJ|          4|           45|   N|         47|
|B013D32WVA|R2AYJHH8WJNGAU|          4|           98|   N|        104|
|B013BHLU66|R111DJA10Y6CMU|          5|           26|   N|         39|
|B00Y1Z87UU|R2EE2TR4MRDV0U|          5|           53|   N|         59|
|B00UY29N8Y| RD4A80I5JDHED|          5|           69|   N|         77|
|B00NPZG6DW|R1GU6IYZQWQE8X|          2|           24|   N|         25|
|B00X0X3EQ6| RZOPM62JMW97V|          2|          103|   N|        108|
|B013F

In [112]:
# Total number of reviews that passed both vote filters.
total_review_count = new_total_df.count()
total_review_count


65581

In [113]:
# Number of 5-star reviews among the filtered reviews (paid and unpaid together).
five_star_count = new_total_df.where("star_rating == 5").count()
five_star_count

30765

In [125]:
# Number of Vine ("paid") reviews.
paid_review_count = vine_paid_df.count()
paid_review_count

613

In [126]:
# Number of non-Vine ("unpaid") reviews.
unpaid_review_count = vine_unpaid_df.count()
unpaid_review_count

64968

In [130]:
# Number of 5-star Vine ("paid") reviews.
vine_paid_df.where("star_rating == 5").count()

222

In [131]:
# Number of 5-star non-Vine ("unpaid") reviews.
vine_unpaid_df.where("star_rating == 5").count()

30543

In [133]:
# NOTE(review): commented-out draft of the paid 5-star percentage; it appears to be
# an abandoned attempt. It divides the count of ALL paid reviews by the number of
# 5-star reviews across both groups, which is not the share of paid reviews that
# are 5-star (the In [128] cell computes that). Consider deleting this cell.
#(vine_paid_df.count()/new_total_df.filter("star_rating == 5").count())*100

In [134]:
# NOTE(review): commented-out draft of the unpaid 5-star percentage; it appears to be
# an abandoned attempt. It divides the count of ALL unpaid reviews by the number of
# 5-star reviews across both groups, which is not the share of unpaid reviews that
# are 5-star (the In [135] cell computes that). Consider deleting this cell.
#(vine_unpaid_df.count()/new_total_df.filter("star_rating == 5").count())*100

In [128]:
# Percentage of Vine ("paid") reviews that are 5-star.
# An empty group yields NaN instead of raising ZeroDivisionError.
paid_total = vine_paid_df.count()
paid_five_star = vine_paid_df.filter("star_rating == 5").count()
(paid_five_star / paid_total * 100) if paid_total else float("nan")

36.215334420880914

In [135]:
# Percentage of non-Vine ("unpaid") reviews that are 5-star.
# An empty group yields NaN instead of raising ZeroDivisionError.
unpaid_total = vine_unpaid_df.count()
unpaid_five_star = vine_unpaid_df.filter("star_rating == 5").count()
(unpaid_five_star / unpaid_total * 100) if unpaid_total else float("nan")

47.01237532323606