In [None]:
import os
# Choose the Spark release to install. The name must match a release directory
# under http://www.apache.org/dist/spark/ because it is reused verbatim in the
# download URL and in SPARK_HOME below.
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.2.1'
# Export the version so the shell commands below can expand $SPARK_VERSION.
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
# NOTE(review): wget runs with -q, so a failed download is silent and only
# surfaces when tar cannot find the archive. Older releases may have been
# moved off this URL to the Apache archive -- confirm the URL still resolves.
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
# Assumes the archive above was extracted under /content (presumably the
# notebook's working directory on Colab) -- TODO confirm for other hosts.
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Initialise findspark (it is expected to use SPARK_HOME to make pyspark
# importable). The SparkSession itself is created in a later cell.
import findspark
findspark.init()

Get:10 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:12 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:13 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Get:14 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Hit:15 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease
Fetched 256 kB in 3s (94.3 kB/s)
Reading package lists... Done


In [None]:
## Download the Postgres driver that will allow Spark to interact with Postgres.
#!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

In [None]:
from pyspark.sql import SparkSession

# Create (or reuse) the notebook's Spark session, with the Postgres JDBC
# driver jar listed on the driver's extra classpath.
spark = (
    SparkSession.builder
    .appName("M16-Amazon-Challenge")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.16.jar")
    .getOrCreate()
)

In [None]:
from pyspark import SparkFiles

# Public S3 location of the Amazon US Jewelry reviews (gzip-compressed TSV).
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Jewelry_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

# Resolve the downloaded file by its base name. SparkFiles.get("") points at
# the SparkFiles root directory instead, which makes the reader ingest every
# file that has been added to the SparkContext, not just this dataset.
reviews_file_name = url.rsplit("/", 1)[-1]
df = spark.read.option("encoding", "UTF-8").csv(
    SparkFiles.get(reviews_file_name), sep="\t", header=True, inferSchema=True
)
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   50423057|R135Q3VZ4DQN5N|B00JWXFDMG|     657335467|Everbling Purple ...|         Jewelry|          5|            0|          0|   N|                Y|           Beauties!|so beautiful even...| 2015-08-31|
|         US|   11262325|R2N0QQ6R4T7YRY|B00W5T1H9W|      26030170|925 Sterling Silv...|         Jewelry|          5|    

In [None]:
from pyspark.sql.functions import to_date
# Read in the Review dataset as a DataFrame


In [None]:
# Build the vine_table DataFrame: keep only the columns used by the Vine
# analysis, then cast star_rating to an integer column.
vine_columns = ['review_id', 'star_rating', 'helpful_votes', 'total_votes', 'vine', 'verified_purchase']
vine_selected = df.select(vine_columns)
vine_df = vine_selected.withColumn('star_rating', vine_selected['star_rating'].cast('int'))

vine_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R135Q3VZ4DQN5N|          5|            0|          0|   N|                Y|
|R2N0QQ6R4T7YRY|          5|            0|          0|   N|                N|
|R3N5JE5Y4T6W5M|          5|            0|          0|   N|                Y|
|R2I150CX5IVY9Q|          5|            0|          0|   N|                Y|
|R1RM9ICOOA9MQ3|          5|            0|          0|   N|                Y|
|R2J2KMDL10UMSH|          5|            0|          0|   N|                Y|
|R3R9ZUFA4TB4FQ|          5|            0|          0|   N|                Y|
|R3UQ8VAQN7R6WL|          5|            0|          0|   N|                Y|
|R1FXZ69C01JNQM|          5|            0|          0|   N|                Y|
| RY36LB4OW0FFS|          5|            0|          0|   N|     

Filter the data to retrieve all the rows where the total_votes count is equal to or greater than 20 

In [None]:
# Keep only reviews with at least 20 total votes. The threshold is an integer
# literal (not the string '20') so the comparison is numeric by construction
# instead of relying on Spark's implicit string-to-number coercion.
vine_filtered = vine_df.filter(vine_df['total_votes'] >= 20)


Filter and retrieve all the rows where the number of helpful_votes divided by total_votes is equal to or greater than 50%.

In [None]:
# Keep only reviews where at least half of the votes were "helpful". The
# threshold is a float literal (not the string '0.5') so the comparison does
# not depend on Spark's implicit string-to-number coercion.
helpful_ratio = vine_filtered['helpful_votes'] / vine_filtered['total_votes']
vine_filtered2 = vine_filtered.filter(helpful_ratio >= 0.5)


Retrieve all the rows where the review was written as part of the Vine program (paid).

In [None]:
# Paid reviews: rows whose vine flag is 'Y'.
paid_condition = vine_filtered2['vine'] == 'Y'
vine_filtered_paid = vine_filtered2.where(paid_condition)


Retrieve all the rows where the review was not written as part of the Vine program (unpaid).

In [None]:
# Unpaid reviews: rows whose vine flag is 'N'.
unpaid_condition = vine_filtered2['vine'] == 'N'
vine_filtered_unpaid = vine_filtered2.where(unpaid_condition)


Determine the total number of paid reviews.

In [None]:
# Aggregate the paid reviews into a single-row DataFrame holding the count of
# star_rating values, and give that column a descriptive name.
paid_review_counts = vine_filtered_paid.agg({'star_rating': 'count'})
total_number_of_reviews_paid = paid_review_counts.withColumnRenamed(
    "count(star_rating)", "total_count_paid"
)
total_number_of_reviews_paid.show()

+----------------+
|total_count_paid|
+----------------+
|              21|
+----------------+



Determine the total number of unpaid reviews.

In [None]:
# Aggregate the unpaid reviews into a single-row DataFrame holding the count
# of star_rating values, and give that column a descriptive name.
unpaid_review_counts = vine_filtered_unpaid.agg({'star_rating': 'count'})
total_number_of_reviews_unpaid = unpaid_review_counts.withColumnRenamed(
    "count(star_rating)", "total_count_unpaid"
)
total_number_of_reviews_unpaid.show()

+------------------+
|total_count_unpaid|
+------------------+
|              7689|
+------------------+



Determine the number of paid 5-star reviews.

In [None]:
# Count the paid reviews rated exactly 5 stars. The rating is compared to the
# integer 5 with == (instead of >= against the string '5') so the filter
# states the intent directly and does not rely on implicit string coercion.
five_star_paid = vine_filtered_paid.filter(vine_filtered_paid['star_rating'] == 5)
star_rating_paid = (
    five_star_paid
    .agg({'star_rating': 'count'})
    .withColumnRenamed("count(star_rating)", "total_5star_paid")
)
star_rating_paid.show()

+----------------+
|total_5star_paid|
+----------------+
|              11|
+----------------+



Determine the number of unpaid 5-star reviews.

In [None]:
# Count the unpaid reviews rated exactly 5 stars. The rating is compared to
# the integer 5 with == (instead of >= against the string '5') so the filter
# states the intent directly and does not rely on implicit string coercion.
five_star_unpaid = vine_filtered_unpaid.filter(vine_filtered_unpaid['star_rating'] == 5)
star_rating_unpaid = (
    five_star_unpaid
    .agg({'star_rating': 'count'})
    .withColumnRenamed("count(star_rating)", "total_5star_unpaid")
)
star_rating_unpaid.show()



+------------------+
|total_5star_unpaid|
+------------------+
|              4444|
+------------------+



Join the 2 tables for paid

In [None]:
# Put the two single-row aggregates (5-star count, total count) side by side.
# There is no join key, so this is a cartesian product; crossJoin makes that
# explicit instead of relying on join() with no condition, which depends on
# implicit cross joins being enabled in the Spark configuration.
new_df_paid = star_rating_paid.crossJoin(total_number_of_reviews_paid)
new_df_paid.show()

+----------------+----------------+
|total_5star_paid|total_count_paid|
+----------------+----------------+
|              11|              21|
+----------------+----------------+



Calculate the percentage of paid 5 star reviews 

In [None]:
# Share of paid reviews that are 5-star. The value is stored as a fraction
# (5-star count divided by total count); it is not multiplied by 100.
paid_five_star_share = new_df_paid['total_5star_paid'] / new_df_paid['total_count_paid']
percentage_paid = new_df_paid.withColumn('percentage_5star_paid', paid_five_star_share)
percentage_paid.show()

+----------------+----------------+---------------------+
|total_5star_paid|total_count_paid|percentage_5star_paid|
+----------------+----------------+---------------------+
|              11|              21|   0.5238095238095238|
+----------------+----------------+---------------------+



Join the 2 tables for unpaid

In [None]:
# Put the two single-row aggregates (5-star count, total count) side by side.
# There is no join key, so this is a cartesian product; crossJoin makes that
# explicit instead of relying on join() with no condition, which depends on
# implicit cross joins being enabled in the Spark configuration.
new_df_unpaid = star_rating_unpaid.crossJoin(total_number_of_reviews_unpaid)
new_df_unpaid.show()

+------------------+------------------+
|total_5star_unpaid|total_count_unpaid|
+------------------+------------------+
|              4444|              7689|
+------------------+------------------+



Calculate the percentage of unpaid 5-star reviews.

In [None]:
# Share of unpaid reviews that are 5-star. The value is stored as a fraction
# (5-star count divided by total count); it is not multiplied by 100.
unpaid_five_star_share = new_df_unpaid['total_5star_unpaid'] / new_df_unpaid['total_count_unpaid']
percentage_unpaid = new_df_unpaid.withColumn('percentage_5star_unpaid', unpaid_five_star_share)
percentage_unpaid.show()

+------------------+------------------+-----------------------+
|total_5star_unpaid|total_count_unpaid|percentage_5star_unpaid|
+------------------+------------------+-----------------------+
|              4444|              7689|     0.5779685264663805|
+------------------+------------------+-----------------------+

