In [1]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.0.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

Hit:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Ign:2 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:4 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Get:5 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Hit:6 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:7 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:8 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Get:9 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Hit:10 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:12 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Hit:13 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease
Get:14 http://archive.ubuntu.com/ubuntu b

In [2]:
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session for this notebook. The extraClassPath
# setting adds the PostgreSQL JDBC jar path to the driver classpath.
# NOTE(review): the jar is not downloaded in the visible cells - confirm it is
# still needed.
spark = (
    SparkSession.builder
    .appName("M16-Amazon-Challenge")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.16.jar")
    .getOrCreate()
)

In [3]:
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Video_Games_v1_00.tsv.gz"
# Download the dataset and register it with the Spark context.
spark.sparkContext.addFile(url)
# Resolve the local path of exactly the file that was added. Passing "" to
# SparkFiles.get would resolve to the SparkFiles root directory instead, and
# Spark would then try to read every file in it.
file_name = url.rsplit("/", 1)[-1]
# The file is tab-separated with a header row; column types are inferred.
df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get(file_name), sep="\t", header=True, inferSchema=True)
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   12039526| RTIS3L2M1F5SM|B001CXYMFS|     737716809|Thrustmaster T-Fl...|     Video Games|          5|            0|          0|   N|                Y|an amazing joysti...|Used this for Eli...| 2015-08-31|
|         US|    9636577| R1ZV7R40OLHKD|B00M920ND6|     569686175|Tonsee 6 buttons ...|     Video Games|          5|    

In [4]:
# Recreate the vine_table DataFrame: keep only the review id, rating, Vine flag,
# verified-purchase flag and the two vote counts.
vine_columns = [
    "review_id",
    "star_rating",
    "vine",
    "verified_purchase",
    "helpful_votes",
    "total_votes",
]
vine_df = df.select(*vine_columns)
vine_df.show(20)

+--------------+-----------+----+-----------------+-------------+-----------+
|     review_id|star_rating|vine|verified_purchase|helpful_votes|total_votes|
+--------------+-----------+----+-----------------+-------------+-----------+
| RTIS3L2M1F5SM|          5|   N|                Y|            0|          0|
| R1ZV7R40OLHKD|          5|   N|                Y|            0|          0|
|R3BH071QLH8QMC|          1|   N|                Y|            0|          1|
|R127K9NTSXA2YH|          3|   N|                Y|            0|          0|
|R32ZWUXDJPW27Q|          4|   N|                Y|            0|          0|
|R3AQQ4YUKJWBA6|          1|   N|                Y|            0|          0|
|R2F0POU5K6F73F|          5|   N|                Y|            0|          0|
|R3VNR804HYSMR6|          5|   N|                Y|            0|          0|
| R3GZTM72WA2QH|          5|   N|                Y|            0|          0|
| RNQOY62705W1K|          4|   N|                Y|            0

In [5]:
# Step 1: keep only reviews with at least 20 total votes. These are more likely
# to be helpful, and the cut-off guarantees a non-zero denominator when the
# helpful/total ratio is computed later on.
from pyspark.sql.functions import col

min_total_votes = 20
has_enough_votes = col("total_votes") >= min_total_votes
total_votes_df = vine_df.where(has_enough_votes)
total_votes_df.show()

+--------------+-----------+----+-----------------+-------------+-----------+
|     review_id|star_rating|vine|verified_purchase|helpful_votes|total_votes|
+--------------+-----------+----+-----------------+-------------+-----------+
| R4PKAZRQJJX14|          1|   N|                N|           21|         34|
|R2CI0Y288CC7E2|          1|   N|                Y|           21|         35|
|R127WEQY2FM1T3|          1|   N|                Y|          147|        175|
|R3EZ0EPYLDA34S|          1|   N|                Y|           14|         31|
|R2FJ94555FZH32|          2|   N|                N|           55|         60|
|R1U3AR67RE273L|          1|   N|                Y|           51|         65|
|R3PZOXA5X1U8KW|          4|   N|                N|           31|         36|
| R6KTC1OPIOIIG|          2|   N|                Y|           19|         34|
|R36O341WWXXKNP|          5|   N|                N|           28|         31|
|R3GSK9MM8DNOYI|          1|   N|                N|            4

In [10]:
# Step 2: from the Step 1 DataFrame, keep the rows where helpful_votes divided by
# total_votes is equal to or greater than 50%.
# The ratio is stored in a new 'vote_percent' column (a fraction between 0 and 1,
# not a percentage). withColumn already names the column, so no extra alias on
# the DataFrame is needed.
vote_percent_df = (
    total_votes_df
    .withColumn('vote_percent', col('helpful_votes') / col('total_votes'))
    .filter(col("vote_percent") >= 0.5)
)
vote_percent_df.show()

+--------------+-----------+----+-----------------+-------------+-----------+------------------+
|     review_id|star_rating|vine|verified_purchase|helpful_votes|total_votes|      vote_percent|
+--------------+-----------+----+-----------------+-------------+-----------+------------------+
| R4PKAZRQJJX14|          1|   N|                N|           21|         34|0.6176470588235294|
|R2CI0Y288CC7E2|          1|   N|                Y|           21|         35|               0.6|
|R127WEQY2FM1T3|          1|   N|                Y|          147|        175|              0.84|
|R2FJ94555FZH32|          2|   N|                N|           55|         60|0.9166666666666666|
|R1U3AR67RE273L|          1|   N|                Y|           51|         65|0.7846153846153846|
|R3PZOXA5X1U8KW|          4|   N|                N|           31|         36|0.8611111111111112|
| R6KTC1OPIOIIG|          2|   N|                Y|           19|         34|0.5588235294117647|
|R36O341WWXXKNP|          5|  

In [11]:
# Step 3: from the Step 2 DataFrame, keep the rows where the review was written
# as part of the Vine program (paid), vine == 'Y'.
# show() returns None, so the DataFrame is assigned first and displayed
# separately; otherwise paid_review_df would end up holding None.
paid_review_df = vote_percent_df.filter(col("vine") == "Y")
paid_review_df.show()

+--------------+-----------+----+-----------------+-------------+-----------+------------------+
|     review_id|star_rating|vine|verified_purchase|helpful_votes|total_votes|      vote_percent|
+--------------+-----------+----+-----------------+-------------+-----------+------------------+
|R3KKUSGFZWSUIY|          5|   Y|                N|           56|         63|0.8888888888888888|
|R10FO5UKKVZBK2|          3|   Y|                N|           23|         23|               1.0|
| RM4KSGEOR7MU1|          5|   Y|                N|           19|         24|0.7916666666666666|
| RG7VRMYLEXD23|          4|   Y|                N|           22|         26|0.8461538461538461|
|R11O4YSCPSNL6L|          3|   Y|                N|           20|         26|0.7692307692307693|
|R286MFBAJ8NPD6|          5|   Y|                N|           46|         51|0.9019607843137255|
|R1JRR530H4COA2|          5|   Y|                N|           22|         28|0.7857142857142857|
| RQ5WD90PUNBU9|          5|  

In [12]:
# Step 4: same filter as for the paid reviews, but keep the rows where the review
# was not part of the Vine program (unpaid), vine == 'N'.
# show() returns None, so the DataFrame is assigned first and displayed
# separately; otherwise unpaid_review_df would end up holding None.
unpaid_review_df = vote_percent_df.filter(col("vine") == "N")
unpaid_review_df.show()

+--------------+-----------+----+-----------------+-------------+-----------+------------------+
|     review_id|star_rating|vine|verified_purchase|helpful_votes|total_votes|      vote_percent|
+--------------+-----------+----+-----------------+-------------+-----------+------------------+
| R4PKAZRQJJX14|          1|   N|                N|           21|         34|0.6176470588235294|
|R2CI0Y288CC7E2|          1|   N|                Y|           21|         35|               0.6|
|R127WEQY2FM1T3|          1|   N|                Y|          147|        175|              0.84|
|R2FJ94555FZH32|          2|   N|                N|           55|         60|0.9166666666666666|
|R1U3AR67RE273L|          1|   N|                Y|           51|         65|0.7846153846153846|
|R3PZOXA5X1U8KW|          4|   N|                N|           31|         36|0.8611111111111112|
| R6KTC1OPIOIIG|          2|   N|                Y|           19|         34|0.5588235294117647|
|R36O341WWXXKNP|          5|  

In [13]:
# Step 5: determine the total number of reviews, the number of 5-star reviews, and
# the percentage of 5-star reviews for the two types of review (paid vs unpaid).
from pyspark.sql.functions import col,when,count,lit

# Aggregate expressions, defined once and reused below.
# count() skips nulls, and when() without otherwise() yields null for
# non-matching rows, so five_star_total counts only the 5-star reviews.
review_total = count(col("vine"))
five_star_total = count(when(col("star_rating") == 5, True))

# One output row per value of `vine` ('Y' = paid, 'N' = unpaid).
# show() returns None, so the DataFrame is assigned first and displayed
# separately; otherwise reviews_df would end up holding None.
reviews_df = vote_percent_df.groupBy("vine").agg(
    review_total.alias("review_total"),
    five_star_total.alias("5star_reviews"),
    (five_star_total / review_total * 100).alias("5star_reviews_percent"),
)
reviews_df.show()

+----+------------+-------------+---------------------+
|vine|review_total|5star_reviews|5star_reviews_percent|
+----+------------+-------------+---------------------+
|   Y|          94|           48|    51.06382978723404|
|   N|       40471|        15663|   38.701786464381904|
+----+------------+-------------+---------------------+

