In [None]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example: version = 'spark-3.0.3'
spark_version = 'spark-3.2.2'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

In [None]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

In [None]:
from pyspark.sql import SparkSession

# Build (or reuse) the notebook's SparkSession with the downloaded
# postgresql jar on the driver classpath.
spark = (
    SparkSession.builder
    .appName("M16-Amazon-Challenge")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.16.jar")
    .getOrCreate()
)

In [None]:
from pyspark import SparkFiles

url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Pet_Products_v1_00.tsv.gz"

# Fetch the gzipped TSV into Spark's files directory.
spark.sparkContext.addFile(url)

# addFile() stores the download under the last path component of the URL.
# Resolve that exact file instead of SparkFiles.get(""), which resolves to the
# whole SparkFiles directory and would make the reader ingest every file in it.
review_file = url.rsplit("/", 1)[-1]
df = spark.read.option("encoding", "UTF-8").csv(
    SparkFiles.get(review_file), sep="\t", header=True, inferSchema=True
)
df.show()

In [None]:
# Show the column names and the types Spark inferred (inferSchema=True) when loading df.
df.printSchema()

In [None]:
from pyspark.sql.functions import to_date
# Read in the Review dataset as a DataFrame

In [None]:
# Build vine_df: the subset of review columns that the cells below work with.
vine_df = df.select(
    "review_id",
    "star_rating",
    "helpful_votes",
    "total_votes",
    "vine",
    "verified_purchase",
)
vine_df.show()

In [None]:
# Row count of vine_df, i.e. every review before any vote-based filtering.
# NOTE: the name tot_reviews is assigned again further down (count of pos_votes_df).
tot_reviews = vine_df.count()
tot_reviews

In [None]:
# Step 1. Keep only the reviews with at least 20 total votes. These are the
# reviews more likely to be helpful, and the cut-off also guarantees that
# total_votes is non-zero for the helpful_votes / total_votes ratio used next.
# This reduces the amount of data carried forward.
review_df = vine_df.where("total_votes >= 20")
review_df.show(n=5)

In [None]:
# Step 2. From the Step 1 result, keep the reviews whose helpful_votes make up
# at least 50% of their total_votes.
pos_votes_df = review_df.where("(helpful_votes /total_votes) >= .5")
pos_votes_df.show(n=5)

In [None]:
# Step 3. From the Step 2 result, keep the reviews that were written as part
# of the Vine program (vine == 'Y', i.e. paid reviews).
paid_votes_df = pos_votes_df.where("vine=='Y'")
paid_votes_df.show(n=5)

In [None]:
# Number of paid (vine == 'Y') reviews that passed the Step 1 and Step 2 filters.
paid_votes_count = paid_votes_df.count()
paid_votes_count

In [None]:
# Step 4. From the Step 2 result, keep the reviews that were NOT written as
# part of the Vine program (vine == 'N', i.e. unpaid reviews).
nopaid_votes_df = pos_votes_df.where("vine=='N'")
nopaid_votes_df.show(n=5)

In [None]:
# Number of unpaid (vine == 'N') reviews that passed the Step 1 and Step 2 filters.
nopaid_votes_count = nopaid_votes_df.count()
nopaid_votes_count

In [None]:
# Summary section: the total number of reviews, the number of 5-star reviews,
# and the percentage of 5-star reviews are calculated for all Vine and
# non-Vine reviews (15 pt).
# This cell: total number of reviews left after the Step 1 and Step 2 filters.
# NOTE: this overwrites the earlier tot_reviews (the row count of vine_df).
tot_reviews = pos_votes_df.count()
tot_reviews

In [None]:
# 5-star reviews among all filtered reviews (paid and unpaid together),
# followed by how many of them there are.
fivestar_reviews_df = pos_votes_df.where("star_rating == 5")
fivestar_totreview = fivestar_reviews_df.count()
fivestar_totreview

In [None]:
# Number of 5-star reviews among the paid (Vine) reviews.
fivestar_paid_df = paid_votes_df.where("star_rating == 5")
fivestar_paid_review = fivestar_paid_df.count()
fivestar_paid_review

In [None]:
# Number of 5-star reviews among the unpaid (non-Vine) reviews.
fivestar_nopaid_df = nopaid_votes_df.where("star_rating == 5")
fivestar_nopaid_review = fivestar_nopaid_df.count()
fivestar_nopaid_review

In [None]:
# Share of the paid (Vine) reviews that are 5-star: 5-star paid reviews divided
# by ALL paid reviews. (Dividing by the overall 5-star count instead gives the
# paid share of 5-star reviews, which is a different statistic.)
# Expressed as a fraction (0.0-1.0); yields 0.0 when there are no paid reviews.
paid_per_reviews = fivestar_paid_review / paid_votes_count if paid_votes_count else 0.0
paid_per_reviews

In [None]:
# Share of the unpaid (non-Vine) reviews that are 5-star: 5-star unpaid reviews
# divided by ALL unpaid reviews. (Dividing by the overall 5-star count instead
# gives the unpaid share of 5-star reviews, which is a different statistic.)
# Expressed as a fraction (0.0-1.0); yields 0.0 when there are no unpaid reviews.
nopaid_per_reviews = fivestar_nopaid_review / nopaid_votes_count if nopaid_votes_count else 0.0
nopaid_per_reviews

In [None]:
# Number of paid (Vine) reviews per star rating.
paid_votes_df.groupBy("star_rating").count().show()