In [None]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.0.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

In [None]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("M16-Amazon-Challenge").config("spark.driver.extraClassPath","/content/postgresql-42.2.16.jar").getOrCreate()

### Load Amazon Data into Spark DataFrame

In [4]:
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Gift_Card_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get(""), sep="\t", header=True, inferSchema=True)
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   24371595|R27ZP1F1CD0C3Y|B004LLIL5A|     346014806|Amazon eGift Card...|       Gift Card|          5|            0|          0|   N|                Y|          Five Stars|Great birthday gi...| 2015-08-31|
|         US|   42489718| RJ7RSBCHUDNNE|B004LLIKVU|     473048287|Amazon.com eGift ...|       Gift Card|          5|    

### Recreate vine_table DataFrames

In [6]:
# Create the vine_table. DataFrame
vine_df = df.select(["review_id", "star_rating", "helpful_votes", "total_votes", "vine", "verified_purchase"])
vine_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R27ZP1F1CD0C3Y|          5|            0|          0|   N|                Y|
| RJ7RSBCHUDNNE|          5|            0|          0|   N|                Y|
|R1HVYBSKLQJI5S|          5|            0|          0|   N|                Y|
|R2HAXF0IIYQBIR|          1|            0|          0|   N|                Y|
| RNYLPX611NB7Q|          5|            0|          0|   N|                Y|
|R3ALA9XXMBEDZR|          5|            0|          0|   N|                Y|
|R3R8PHAVJFTPDF|          5|            0|          0|   N|                Y|
|R18WWEK8OIXE30|          5|            0|          0|   N|                Y|
|R1EGUNQON2J277|          1|            0|          0|   N|                Y|
|R21Z4M4L98CPU2|          5|            0|          0|   N|     

In [11]:
vine_df.count()

149086

In [9]:
# vine_df is filtered to create a DataFrame or table where there are 20 or more total votes
vine_df_20 = vine_df.filter("total_votes>=20")
vine_df_20.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R36PQ9D8L2AAOH|          5|           30|         32|   N|                Y|
|R23GHBE86II0SK|          5|           18|         21|   N|                Y|
|R18KVPY0TO33BM|          5|          241|        248|   N|                Y|
|R2ZB78BDM8BRCF|          1|           99|        113|   N|                Y|
|R2JMJOWGM7V7NX|          4|           20|         20|   N|                N|
|R10ORTN0I3G5B9|          1|           19|         26|   N|                Y|
|R1D1VPJBTP1WG8|          1|           38|         51|   N|                Y|
|R3Q3ULUTULAQYH|          1|           22|         29|   N|                Y|
|R1SHEELXA1IXQP|          5|           45|         57|   N|                Y|
|R1IQJ99JOPJOE4|          5|           73|         94|   N|     

In [12]:
vine_df_20.count()

392

In [18]:
# The data is filtered to create a DataFrame where the percentage of helpful_votes is equal to or greater than 50%
# vine_df_helpful_50 = vine_df_20.filter("helpful_votes>50")
vine_df_helpful_50 = vine_df_20.withColumn('percent_helpful_votes', vine_df_20['helpful_votes']/vine_df_20['total_votes']).filter("percent_helpful_votes>0.50")
vine_df_helpful_50.show()

+--------------+-----------+-------------+-----------+----+-----------------+---------------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|percent_helpful_votes|
+--------------+-----------+-------------+-----------+----+-----------------+---------------------+
|R36PQ9D8L2AAOH|          5|           30|         32|   N|                Y|               0.9375|
|R23GHBE86II0SK|          5|           18|         21|   N|                Y|   0.8571428571428571|
|R18KVPY0TO33BM|          5|          241|        248|   N|                Y|   0.9717741935483871|
|R2ZB78BDM8BRCF|          1|           99|        113|   N|                Y|   0.8761061946902655|
|R2JMJOWGM7V7NX|          4|           20|         20|   N|                N|                  1.0|
|R10ORTN0I3G5B9|          1|           19|         26|   N|                Y|   0.7307692307692307|
|R1D1VPJBTP1WG8|          1|           38|         51|   N|                Y|   0.7450980392156863|


In [19]:
vine_df_helpful_50.count()

355

In [23]:
# The data is filtered to create a DataFrame or table where there is a Vine review
from pyspark.sql.functions import col
vine_yes = vine_df_helpful_50.filter(col("vine")== "Y")
vine_yes.show()

+---------+-----------+-------------+-----------+----+-----------------+---------------------+
|review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|percent_helpful_votes|
+---------+-----------+-------------+-----------+----+-----------------+---------------------+
+---------+-----------+-------------+-----------+----+-----------------+---------------------+



In [25]:
vine_yes.count()

0

In [29]:
# The data is filtered to create a DataFrame or table where there IS NO Vine review
vine_no = vine_df_helpful_50.filter(col("vine")== "N")
vine_no.show()

+--------------+-----------+-------------+-----------+----+-----------------+---------------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|percent_helpful_votes|
+--------------+-----------+-------------+-----------+----+-----------------+---------------------+
|R36PQ9D8L2AAOH|          5|           30|         32|   N|                Y|               0.9375|
|R23GHBE86II0SK|          5|           18|         21|   N|                Y|   0.8571428571428571|
|R18KVPY0TO33BM|          5|          241|        248|   N|                Y|   0.9717741935483871|
|R2ZB78BDM8BRCF|          1|           99|        113|   N|                Y|   0.8761061946902655|
|R2JMJOWGM7V7NX|          4|           20|         20|   N|                N|                  1.0|
|R10ORTN0I3G5B9|          1|           19|         26|   N|                Y|   0.7307692307692307|
|R1D1VPJBTP1WG8|          1|           38|         51|   N|                Y|   0.7450980392156863|


In [31]:
# Determine the total number of reviews, the number of 5-star reviews, and the percentage of 5-star reviews for the two types of review (paid vs unpaid).
total_number_of_reviews = vine_df_helpful_50.count()
number_of_5_star_reviews = vine_df_helpful_50.filter("star_rating==5").count()


90

In [37]:
unpaid_total_number_of_reviews = vine_no.count()
unpaid_5_star_reviews = vine_no.filter("star_rating==5").count()
percentage_unpaid_5_star_reviews = unpaid_5_star_reviews/unpaid_total_number_of_reviews
percentage_unpaid_5_star_reviews

0.2535211267605634

In [51]:
paid_total_number_of_reviews = vine_yes.count()
paid_5_star_reviews = vine_yes.filter("star_rating==5").count()
percentage_paid_5_star_reviews = paid_5_star_reviews/paid_total_number_of_reviews
percentage_paid_5_star_reviews

ZeroDivisionError: ignored

In [52]:
print("Total number of paid reviews %f" % total_number_of_reviews)
print("Total number of the number of 5-star reviews %f" % number_of_5_star_reviews)
print("Percentage of unpaid 5 star reviews %f" % (percentage_unpaid_5_star_reviews*100), "%")
print("Percentage of paid 5 star reviews %f" % (percentage_paid_5_star_reviews*100), "%")

Total number of paid reviews 355.000000
Total number of the number of 5-star reviews 90.000000
Percentage of unpaid 5 star reviews 25.352113 %


NameError: ignored