In [None]:
import os
# Find the latest version of spark 3.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.2'
spark_version = 'spark-3.1.2'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

In [None]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("BigData-Challenge").config("spark.driver.extraClassPath","/content/postgresql-42.2.16.jar").getOrCreate()


In [None]:
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Electronics_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get("amazon_reviews_us_Electronics_v1_00.tsv.gz"), sep="\t", header=True, inferSchema=True)
df.show()

In [None]:
# Filter DataFrame for total_votes above or equal to 20
df1 = df.filter(df.total_votes >= 20)
df1.show(5)

In [None]:
# Filter DataFrame for helpful_votes ratio above or equal to 50%
df2 = df1.filter((df1.helpful_votes / df1.total_votes) >= 0.5)
df2.show(5)

In [None]:
# Create paid vine DataFrame
paid_df = df2.filter(df2.vine == 'Y')
paid_df.show(5)

In [None]:
# Create unpaid vine DataFrame
unpaid_df = df2.filter(df2.vine == 'N')
unpaid_df.show(5)

In [None]:
# Paid total number of reviews
total_paid_reviews = paid_df.count()
total_paid_reviews

In [None]:
# Paid 5-star reviews
paid_five_star_reviews = paid_df.filter(paid_df.star_rating == 5).count()
paid_five_star_reviews

In [None]:
# paid 5-star reviews percentage
paid_five_star_percent = (paid_five_star_reviews / total_paid_reviews) * 100
paid_five_star_percent

In [None]:
# unpaid total number of reviews
total_unpaid_reviews = unpaid_df.count()
total_unpaid_reviews

In [None]:
# unpaid 5-star reviews
unpaid_five_star_reviews = unpaid_df.filter(unpaid_df.star_rating == 5).count()
unpaid_five_star_reviews

In [None]:
# unpaid 5-star reviews percentage
unpaid_five_star_percent = (unpaid_five_star_reviews / total_unpaid_reviews) * 100
unpaid_five_star_percent