In [None]:
import os
spark_version = 'spark-3.3.0'
os.environ['SPARK_VERSION'] = spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Install Postgres Driver
!wget https://jdbc.postgresql.org/download/postgresql-42.5.0.jar

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

In [None]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("BigData-Challenge").config("spark.driver.extraClassPath","/content/postgresql-42.2.16.jar")

In [None]:
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Shoes_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get("amazon_reviews_us_Shoes_v1_00.tsv.gz"), sep="\t", header=True, inferSchema=True)
df.show()

In [None]:
from pyspark.sql.functions import to_date
# Read in the Review dataset as a DataFrame
clean_df = df.dropna()

In [None]:
# Recreate vine_table
vine_df = clean_df.select(["review_id","star_rating","helpful_votes","total_votes","vine","verified_purchase"])
vine_df.show()

In [None]:
#1. Filter total votes >= 20
vote_count_greater20 = vine_df.filter("total_votes >= 20")
vote_count_greater20.show()

In [None]:
#2. Filter to retrieve helpful_votes/total_votes >= 50
vote_count_greater50 = vote_count_greater20.filter("(helpful_votes/total_votes)>=0.5")
vote_count_greater50.show()

In [None]:
#3. Vine(paid) Reviews
paid_reviews = vote_count_greater50.filter("vine == 'Y'")
paid_reviews.show()

In [None]:
# Vine(unpaid) Reviews
unpaid_reviews = vote_count_greater50.filter("vine == 'N'")
unpaid_reviews.show()

In [None]:
# Total Reviews
total_reviews = vote_count_greater50.count()
total_reviews

In [None]:
# Total Vine(paid)
total_paid_reviews = vote_count_greater50.filter("vine == 'Y'")
total_paid_reviews_count = total_paid_reviews.count()
total_paid_reviews_count

In [None]:
# Total Vine(unpaid)
total_unpaid_reviews = vote_count_greater50.filter ("vine == 'N'")
total_unpaid_reviews_count = total_unpaid_reviews.count()
total_unpaid_reviews_count

In [None]:
# Total 5-star reviews
total_5star_reviews = vote_count_greater50.filter("star_rating == 5")
total_5star_reviews_count = total_5star_reviews.count()
total_5star_reviews_count

In [None]:
# Total 5-star Vine(paid)
paid_5star_reviews = total_paid_reviews.filter("star_rating == 5")
paid_5star_reviews_count = paid_5star_reviews.count()
paid_5star_reviews_count

In [None]:
# Total 5-star vine(unpaid)
unpaid_5star_reviews = total_unpaid_reviews.filter("star_rating < 5")
unpaid_5star_reviews_count = unpaid_5star_reviews.count()
unpaid_5star_reviews_count

In [None]:
# 5-Star paid reviews % of total vine(paid)
paid_5star_per_total_paid = (paid_5star_reviews_count/total_paid_reviews_count) * 100 
round(paid_5star_per_total_paid, 2)

In [None]:
# Paid reviews as % total 5-star reviews
paid_5star_per_total_5star = (paid_5star_reviews_count/total_5star_reviews_count) * 100
round(paid_5star_per_total_5star, 2)

In [None]:
# Unpaid 5-star reviews as % total unpaid reviews
unpaid_5star_per_total_unpaid = (unpaid_5star_reviews_count/total_unpaid_reviews_count) * 100
round(unpaid_5star_per_total_unpaid, 3)

In [None]:
# Unpaid 5-star reviews as % total 5-star reviews
unpaid_5star_per_total_5star = (unpaid_5star_reviews_count/total_5star_reviews_count) * 100
round(unpaid_5star_per_total_5star, 3)