In [None]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.3.1'
os.environ['SPARK_VERSION']=spark_version# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/{spark_version}/{spark_version}-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install findspark # Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"# Start a SparkSession

# Start a SparkSession
import findspark
findspark.init()

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("M16-Amazon-Challenge").config("spark.driver.extraClassPath","/content/postgresql-42.2.16.jar").getOrCreate()

In [None]:
from pyspark import SparkFiles

# NOTE(review): the public amazon-reviews-pds bucket may no longer be accessible — confirm this URL still resolves.
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Automotive_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

# Read in the Review dataset as a DataFrame.
# SparkFiles.get("") resolves to the SparkFiles root directory, which would also pick up any
# other file added to this SparkContext; asking for the downloaded file's own name pins the
# read to this dataset. addFile stores the file under the last path segment of the URL.
file_name = url.rsplit("/", 1)[-1]
df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get(file_name), sep="\t", header=True, inferSchema=True)
df.show()

In [None]:
# Build the vine_table DataFrame from just the columns the Vine analysis uses.
vine_columns = ['review_id', 'star_rating', 'helpful_votes', 'total_votes', 'vine', 'verified_purchase']
vine_df = df.select(vine_columns)
vine_df.show()

In [None]:
# Keep only reviews with at least 20 total votes (>= 20, despite the "greaterthan" in the name).
total_votes_greaterthan20 = vine_df.where("total_votes>=20")
total_votes_greaterthan20.show()

In [None]:
# Keep reviews where helpful_votes / total_votes is at least 50%.
# The previous cell guarantees total_votes >= 20, so the denominator is never zero.
votes_20_plus = total_votes_greaterthan20
helpful_ratio = votes_20_plus["helpful_votes"] / votes_20_plus["total_votes"]
helpful_divided_by_total = votes_20_plus.where(helpful_ratio >= 0.5)
helpful_divided_by_total.show()

In [None]:
# Reviews written as part of the Vine program (paid): vine == "Y".
is_vine_review = helpful_divided_by_total["vine"] == "Y"
vine_paid = helpful_divided_by_total.where(is_vine_review)
vine_paid.show()

In [None]:
# Reviews written outside the Vine program (unpaid): vine == "N".
is_non_vine_review = helpful_divided_by_total["vine"] == "N"
vine_unpaid = helpful_divided_by_total.where(is_non_vine_review)
vine_unpaid.show()

In [None]:
# Count every review that passed the vote-count and helpfulness filters,
# Vine and non-Vine alike.
helpful_reviews = helpful_divided_by_total
total_number_reviews = helpful_reviews.count()
total_number_reviews

In [None]:
# Five-star reviews among all filtered reviews (Vine and non-Vine).
is_five_star = helpful_divided_by_total["star_rating"] == 5
fivestar_reviews = helpful_divided_by_total.where(is_five_star)
number_fivestar_reviews = fivestar_reviews.count()
number_fivestar_reviews

In [None]:
# How many filtered reviews came from the Vine (paid) program.
paid_df = vine_paid
paid_reviews = paid_df.count()
paid_reviews

In [None]:
# How many filtered reviews were written outside the Vine (unpaid) program.
unpaid_df = vine_unpaid
unpaid_reviews = unpaid_df.count()
unpaid_reviews

In [None]:
# Five-star reviews within the Vine (paid) subset.
paid_is_five_star = vine_paid["star_rating"] == 5
fivestar_reviews_paid = vine_paid.where(paid_is_five_star)
number_fivestar_reviews_paid = fivestar_reviews_paid.count()
number_fivestar_reviews_paid

In [None]:
# Five-star reviews within the non-Vine (unpaid) subset.
unpaid_is_five_star = vine_unpaid["star_rating"] == 5
fivestar_reviews_unpaid = vine_unpaid.where(unpaid_is_five_star)
number_fivestar_reviews_unpaid = fivestar_reviews_unpaid.count()
number_fivestar_reviews_unpaid

In [None]:
# Percentage of paid (Vine) reviews that are five-star.
# If the paid subset is empty the ratio is undefined: report NaN instead of raising
# ZeroDivisionError, which would stop a Restart-and-Run-All at this cell.
fivestars_as_percent_paid = (
    (number_fivestar_reviews_paid / paid_reviews) * 100 if paid_reviews else float("nan")
)
fivestars_as_percent_paid

In [None]:
# Percentage of unpaid (non-Vine) reviews that are five-star.
# If the unpaid subset is empty the ratio is undefined: report NaN instead of raising
# ZeroDivisionError, which would stop a Restart-and-Run-All at this cell.
fivestars_as_percent_unpaid = (
    (number_fivestar_reviews_unpaid / unpaid_reviews) * 100 if unpaid_reviews else float("nan")
)
fivestars_as_percent_unpaid