### Install Spark, import libraries, and start a Spark session

In [1]:
# The starter code used spark-3.0.3 with Hadoop 2.7; this notebook uses spark-3.3.0 with Hadoop 3.

import os
# Find the latest Spark 3.x release at http://www.apache.org/dist/spark/ and enter it as the Spark version,
# for example:
spark_version = 'spark-3.3.0'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Make pyspark importable; the SparkSession itself is created in a later cell
import findspark
findspark.init()

Hit:1 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:2 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:3 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Get:4 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Ign:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:6 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Hit:7 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:8 http://arch

In [2]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2022-10-14 00:39:39--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar.1’


2022-10-14 00:39:40 (1.99 MB/s) - ‘postgresql-42.2.16.jar.1’ saved [1002883/1002883]



In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("M16-Amazon-Challenge").config("spark.driver.extraClassPath","/content/postgresql-42.2.16.jar").getOrCreate()

### Load Amazon Data into Spark DataFrame

In [4]:
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Kitchen_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
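
`SparkFiles.get` looks a file up by the name it was added under, which for a URL is the last path component. A minimal sketch of deriving that name from the URL instead of typing it by hand; `file_name` is a name introduced here for illustration and does not appear in the original notebook.

```python
import posixpath
from urllib.parse import urlparse

url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Kitchen_v1_00.tsv.gz"

# Take the last component of the URL path; this is the name the downloaded file is stored under
file_name = posixpath.basename(urlparse(url).path)
print(file_name)
```

The resulting string could then be passed to `SparkFiles.get(file_name)` when reading the data.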

In [5]:
import pyspark.sql.functions as F
# Read in the Review dataset as a DataFrame
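# Pass the added file's name explicitly; an empty name points at the whole SparkFiles directory
df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get("amazon_reviews_us_Kitchen_v1_00.tsv.gz"), sep="\t", header=True, inferSchema=True)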
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|   37000337|R3DT59XH7HXR9K|B00303FI0G|     529320574|Arthur Court Pape...|         Kitchen|          5|            0|          0|   N|                Y|Beautiful. Looks ...|Beautiful.  Looks...|2015-08-31 00:00:00|
|         US|   15272914|R1LFS11BNASSU8|B00JCZKZN6|     274237558|Olde Thompson Bav...| 

Initial exploration revealed 6 records that were not read in correctly. Their marketplace, customer_id, review_id, product_id, and product_parent fields are fine, but all remaining fields ended up inside product_title, "\t" separators included. Here they are:

In [6]:
from pyspark.sql.functions import col
df.filter(col("product_category").isNull()).show(truncate=False)

+-----------+-----------+--------------+----------+--------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------+-----------+-------------+-----------+----+-----------------+---------------+-----------+-----------+
|marketplace|customer_id|review_id     |product_id|product_parent|product_title                                                                                                                                                                                                                                                                                                             
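
The notebook does not establish why these rows were misread. One plausible cause, stated here as an assumption and not verified against this dataset, is a product title that opens a double quote and never closes it, so the parser treats the following tabs as part of the quoted field. The sketch below reproduces that effect with Python's standard `csv` module (not Spark's parser) on a made-up row.

```python
import csv

# Hypothetical row: the title field opens a double quote that is never closed
line = 'US\t123\tR1\tB00\t999\t"Deluxe pan\tKitchen\t5'

# Default dialect: " starts a quoted field, so the tabs after it become data
default_fields = next(csv.reader([line], delimiter="\t"))

# Quoting disabled: every tab is a field separator again
raw_fields = next(csv.reader([line], delimiter="\t", quoting=csv.QUOTE_NONE))

print(len(default_fields), len(raw_fields))
```

Spark's CSV reader has a `quote` option, and its documentation says an empty string turns quoting off. Whether that setting would repair these six rows is untested here.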

These records do not share a product_id with any other record, so they will not corrupt uniqueness for the product table below.

In [7]:
misread_prod_ids = ['B00QD66F7W',  'B008A7HHVK',  'B001D0MQ1M',  'B000QHMSOI',  'B000UWQP20',  'B0011D1SWE']
df.filter(F.col("product_id").isin(misread_prod_ids)).count()

6

### Create and analyze the Vine DataFrame

In [8]:
# Create the vine_table DataFrame
vine_df = df.select(['review_id',  'star_rating',  'helpful_votes',  'total_votes',  'vine',  'verified_purchase'])
vine_df.show(truncate=False)

+--------------+-----------+-------------+-----------+----+-----------------+
|review_id     |star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R3DT59XH7HXR9K|5          |0            |0          |N   |Y                |
|R1LFS11BNASSU8|5          |0            |1          |N   |Y                |
|R296RT05AG0AF6|5          |0            |0          |N   |Y                |
|R3V37XDZ7ZCI3L|5          |0            |1          |N   |Y                |
|R14GU232NQFYX2|5          |0            |0          |N   |Y                |
|RZQH4V7L2O1PL |1          |1            |1          |N   |Y                |
|R1F8JMOSPJ3KO7|5          |0            |0          |N   |Y                |
|R1ZISGY2BWW4Z5|5          |0            |0          |N   |Y                |
|R17PW4I3AE5WZW|5          |0            |0          |N   |Y                |
|R3D93G1KTP6A8P|3          |0            |0          |N   |Y    

In [10]:
# step 1: throw away the vast majority of records, keeping only those with 20 or more votes
voteful_df = vine_df.filter(vine_df['total_votes'] >= 20)
# step 2: throw away a small fraction of records, keeping those where at least half the votes are "helpful"
helpful_df = voteful_df.where("helpful_votes*2 >= total_votes")
# add a Yes/No column identifying 5-star reviews (reference the column by name, not through the parent df)
star5_df = helpful_df.withColumn('five_star', F.when(F.col('star_rating') == 5, "Y").otherwise("N"))
# pivot and count to get a 2-by-2 box of the four counts requested: vine vs not-vine, and 5-star vs not-5-star
simple_df = star5_df.select('vine', 'five_star').groupBy('vine').pivot('five_star').count()
# add columns for the requested metrics, and
# rename the column counting 5-star reviews, which inherited its name from the value 'Y' of the field 'five_star'
simple2_df = simple_df.withColumn('total reviews', simple_df['Y']+simple_df['N']).withColumnRenamed('Y', '5-star reviews')
simple3_df = simple2_df.withColumn('percent 5-star', simple2_df['5-star reviews'] / simple2_df['total reviews'] * 100)
# remove the unwanted column counting non-5-star reviews
output = simple3_df.select('vine', 'total reviews', '5-star reviews', 'percent 5-star')
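
To make the logic of the cell above easier to follow, here is a plain-Python analogue of the same steps on a handful of made-up rows. It is only a sketch: the rows, `kept`, `counts`, and `summary` are hypothetical names and data, and nothing here touches Spark.

```python
from collections import Counter

# Hypothetical toy rows: (vine, star_rating, helpful_votes, total_votes)
rows = [
    ("Y", 5, 20, 22),
    ("Y", 3, 18, 20),
    ("N", 5, 20, 20),
    ("N", 1, 5, 30),   # dropped: fewer than half of the votes are helpful
    ("N", 5, 3, 4),    # dropped: fewer than 20 total votes
    ("N", 4, 40, 43),
]

# Steps 1 and 2: keep reviews with 20 or more votes, at least half of them helpful
kept = [r for r in rows if r[3] >= 20 and r[2] * 2 >= r[3]]

# Count (vine, five_star) pairs: the same 2-by-2 box that the pivot produces
counts = Counter((vine, "Y" if stars == 5 else "N") for vine, stars, _, _ in kept)

# Per vine value: (total reviews, 5-star reviews, percent 5-star)
summary = {}
for vine in ("Y", "N"):
    five = counts[(vine, "Y")]
    total = five + counts[(vine, "N")]
    summary[vine] = (total, five, five / total * 100)

print(summary)
```

Comparing `helpful_votes * 2` with `total_votes` avoids a division, so it stays in integer arithmetic.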

The following two dataframes are requested by the specs, but not used in my computation.




In [11]:
#the "DataFrame where there is a Vine review"
only_vine = helpful_df.filter(helpful_df['vine'] == 'Y')
only_vine.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R1Z71RW4J9IK93|          5|           20|         22|   Y|                N|
|R3FVB5QI11KI9Q|          4|          192|        200|   Y|                N|
|R2G027YBMVXV6Y|          5|           39|         48|   Y|                N|
|R1QGBAN7BMGWRR|          5|          121|        129|   Y|                N|
|R2NH2AU7XL9ZDZ|          3|           18|         20|   Y|                N|
|R2YVVJ9NOPNX50|          4|           36|         41|   Y|                N|
|R1XH1LK1FWX3OS|          4|          214|        238|   Y|                N|
|R38LSQ71G2IZGS|          5|           26|         29|   Y|                N|
|R2FLITQVKWXDF4|          3|           26|         34|   Y|                N|
|R25LMMZF3DJTWY|          2|           18|         21|   Y|     

In [12]:
# and the "DataFrame where there isn't a Vine review"
only_unvine = helpful_df.filter(helpful_df['vine'] == 'N')
only_unvine.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R28RB82UG4RDD5|          5|           20|         20|   N|                Y|
|R3FB6BERWPEIJP|          4|           40|         43|   N|                Y|
|R1D4Z38STRDQXK|          5|           53|         56|   N|                Y|
|R1XMWJZICINIFX|          3|           20|         21|   N|                Y|
|R20QKY1GABXFLM|          1|          272|        297|   N|                Y|
|R328FA1E6FY3F5|          5|           17|         20|   N|                N|
|R3DH22AA5WGLLS|          5|           30|         30|   N|                N|
|R1E0E5EFZSLJCS|          1|           66|         80|   N|                Y|
|R1TXZQWEHYWEWN|          2|           48|         51|   N|                Y|
| RQQAI8YL3UCY2|          5|           23|         25|   N|     

**Here's the requested output!**

In [13]:
output.show()

+----+-------------+--------------+-----------------+
|vine|total reviews|5-star reviews|   percent 5-star|
+----+-------------+--------------+-----------------+
|   Y|         1207|           509|42.17067108533554|
|   N|        97839|         45858|46.87087971054488|
+----+-------------+--------------+-----------------+
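
As a quick arithmetic check on the table above, the percentages can be recomputed from the two count columns and rounded for easier comparison. This is a sketch in plain Python using the counts copied from the output; `counts`, `percent`, and `gap` are names introduced here.

```python
# Counts copied from the output table: {vine: (total reviews, 5-star reviews)}
counts = {"Y": (1207, 509), "N": (97839, 45858)}

# Percent of 5-star reviews per group, rounded to two decimals
percent = {vine: round(five / total * 100, 2) for vine, (total, five) in counts.items()}

# Difference in percentage points between non-Vine and Vine reviews
gap = round(percent["N"] - percent["Y"], 2)

print(percent, gap)
```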



Let's see how much the two filterings cut out of the dataset.

In [14]:
orig = vine_df.count()
votef = voteful_df.count()
per_vote = votef/orig *100
helpf = helpful_df.count()
per_help = helpf/votef *100
print(f"Original table had {orig} records.")
print(f"We only kept {votef} records ( {per_vote} percent of the total ) that had 20 or more votes.")
print(f"Then we only kept {helpf} records ( {per_help} percent of those) where at least half the votes were helpful.")


Original table had 4880466 records.
We only kept 107421 records ( 2.2010398187386206 percent of the total ) that had 20 or more votes.
Then we only kept 99046 records ( 92.20357285819347 percent of those) where at least half the votes were helpful.
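
The raw floats above print with many digits. A format spec such as `:.2f` would give a tidier message; the sketch below uses the counts copied from the output instead of recomputing them with Spark, and `message` is a name introduced here.

```python
# Counts copied from the printed output above
orig, votef, helpf = 4880466, 107421, 99046

per_vote = votef / orig * 100
per_help = helpf / votef * 100

# Format both percentages to two decimal places
message = (
    f"Kept {votef} of {orig} records ({per_vote:.2f}%) with 20 or more votes, "
    f"then {helpf} ({per_help:.2f}%) where at least half the votes were helpful."
)
print(message)
```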


I'm curious how the results will change if we include unvoteful, unhelpful reviews.

In [15]:
two_column_df = vine_df.withColumn('five_star', F.when(F.col('star_rating') == 5, "Y").otherwise("N")).select('vine', 'five_star')
# this time I need to remove those 6 misread records explicitly; the vote filters dropped them earlier because their vote counts are null
clean_two_col_df = two_column_df.na.drop(how="any")
# pivot and count to get a 2-by-2 box of the four counts requested: vine vs not-vine, and 5-star vs not-5-star
pretty_df = clean_two_col_df.select('vine', 'five_star').groupBy('vine').pivot('five_star').count()

In [16]:
pretty_df.show()

+----+-------+-------+
|vine|      N|      Y|
+----+-------+-------+
|   Y|  12681|  11753|
|   N|1739219|3116807|
+----+-------+-------+



So Vine reviews are still below half 5-star (about 48%), but non-Vine reviews are now well above half 5-star (about 64%).
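
The shares behind that comparison can be worked out directly from the pivot counts. A small sketch in plain Python, with the numbers copied from the output of `pretty_df.show()`; `pivot`, `percent_five_star`, and `total_rows` are names introduced here.

```python
# Counts copied from the pivot output: {vine: (not 5-star, 5-star)}
pivot = {"Y": (12681, 11753), "N": (1739219, 3116807)}

# Share of 5-star reviews per group, in percent
percent_five_star = {
    vine: round(y / (n + y) * 100, 2) for vine, (n, y) in pivot.items()
}

# Sanity check: all rows except the 6 misread records should be counted
total_rows = sum(n + y for n, y in pivot.values())

print(percent_five_star, total_rows)
```

The total comes to 4880460, which is the original 4880466 records minus the 6 misread ones.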