In [None]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.1.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/
SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()
     

In [None]:
# download a Postgres driver that will allow Spark to interact with Postgres
!wget https://jdbc.postgresql.org/download/postgresql-42.2.17.jar

In [None]:
# Create (or reuse) the Spark session for this notebook, with the Postgres
# JDBC jar placed on the driver's classpath.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("VineReview")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.17.jar")
    .getOrCreate()
)

In [None]:
from pyspark import SparkFiles

# Gzipped, tab-separated Amazon review dataset (Shoes category).
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Shoes_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

# Resolve the downloaded file by its base name. SparkFiles.get("") would point
# at the whole SparkFiles directory, so the read would depend on that directory
# containing nothing but this one file.
file_name = url.rsplit("/", 1)[-1]
df = spark.read.option("encoding", "UTF-8").csv(
    SparkFiles.get(file_name), sep="\t", header=True, inferSchema=True
)
df.show()

In [None]:
# Create the vine_table DataFrame: keep only the columns used by the
# Vine analysis in the cells below.
vine_df = df.select(
    "review_id",
    "star_rating",
    "helpful_votes",
    "total_votes",
    "vine",
    "verified_purchase",
)
vine_df.show()

In [None]:
# Keep only the reviews whose total_votes count is 20 or more.
vine_votes_df = vine_df.where("total_votes>=20")
vine_votes_df.show()

In [None]:
# Of those, keep the reviews where helpful_votes make up at least 50% of total_votes.
votes_ratio_df = vine_votes_df.where("helpful_votes/total_votes >= 0.5")
votes_ratio_df.show()

In [None]:
# Paid subset: reviews written as part of the Vine program (vine == "Y").
paid_review_df = votes_ratio_df.where(votes_ratio_df["vine"] == "Y")
paid_review_df.show()

In [None]:
# Paid program, step 1 of 3: total number of reviews in the paid subset.
# (The 5-star count and the 5-star percentage follow in the next cells.)
# NOTE(review): `count` is imported here but not referenced in the visible cells.
from pyspark.sql.functions import count

nb_paid_reviews = paid_review_df.count()
print("The total number of reviews for paid program is:", nb_paid_reviews, sep="\n")


In [None]:
# Number of 5-star reviews for the paid program.
nb_5star_paid_reviews = (
    paid_review_df
    .where(paid_review_df["star_rating"] == 5)
    .count()
)
print("The total number of 5-star reviews for paid program is:", nb_5star_paid_reviews, sep="\n")

In [None]:
# Percentage (0-100) of 5-star reviews for the paid program.
# The ratio is scaled by 100 so the printed value matches its "percentage" label.
# An empty paid subset yields 0.0 instead of raising ZeroDivisionError.
if nb_paid_reviews:
    perc_5star_paid = 100.0 * nb_5star_paid_reviews / nb_paid_reviews
else:
    perc_5star_paid = 0.0
print("The percentage of 5-star reviews for paid program is:")
print(perc_5star_paid)

In [None]:
# Unpaid subset: reviews that were not part of the Vine program (vine == "N").
unpaid_review_df = votes_ratio_df.where(votes_ratio_df["vine"] == "N")
unpaid_review_df.show()

In [None]:
# Unpaid program, step 1 of 3: total number of reviews in the unpaid subset.
# (The 5-star count and the 5-star percentage follow in the next cells.)
nb_unpaid_reviews = unpaid_review_df.count()
print("The total number of reviews for unpaid program is:", nb_unpaid_reviews, sep="\n")

In [None]:
# Number of 5-star reviews for the unpaid program.
nb_5star_unpaid_reviews = (
    unpaid_review_df
    .where(unpaid_review_df["star_rating"] == 5)
    .count()
)
print("The total number of 5-star reviews for unpaid program is:", nb_5star_unpaid_reviews, sep="\n")

In [None]:
# Percentage (0-100) of 5-star reviews for the unpaid program.
# The ratio is scaled by 100 so the printed value matches its "percentage" label.
# An empty unpaid subset yields 0.0 instead of raising ZeroDivisionError.
if nb_unpaid_reviews:
    perc_5star_unpaid = 100.0 * nb_5star_unpaid_reviews / nb_unpaid_reviews
else:
    perc_5star_unpaid = 0.0
print("The percentage of 5-star reviews for unpaid program is:")
print(perc_5star_unpaid)