In [None]:
import os
# Find the latest version of spark 2.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.0'
spark_version = 'spark-3.0.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

In [None]:
# Build (or reuse) the Spark session used by the rest of the notebook.
from pyspark.sql import SparkSession

# The PostgreSQL JDBC jar is placed on the driver classpath.
# NOTE(review): nothing visible in this notebook downloads the jar -- confirm
# it exists at this path if a JDBC write is expected to work.
spark = (
    SparkSession.builder
    .appName("vineanalysis")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.16.jar")
    .getOrCreate()
)

In [None]:
# Fetch the Video Games review file from the S3 bucket and read it into a DataFrame.
from pyspark import SparkFiles

url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Video_Games_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

# Resolve the local path of the file registered with addFile above.
reviews_path = SparkFiles.get("amazon_reviews_us_Video_Games_v1_00.tsv.gz")

# Tab-separated input with a header row; column types are inferred by Spark.
df = (
    spark.read
    .option("encoding", "UTF-8")
    .csv(reviews_path, sep="\t", header=True, inferSchema=True)
)
df.show()

In [None]:
# Build the vine DataFrame: keep only the columns used in the Vine analysis.
vine_columns = [
    "review_id",
    "star_rating",
    "helpful_votes",
    "total_votes",
    "vine",
    "verified_purchase",
]
vine_df = df.select(vine_columns)
vine_df.show()

In [None]:
# Keep only the rows (reviews) that received 20 or more total votes.
total_votes = vine_df.where("total_votes>=20")
total_votes.show(truncate=False)

In [None]:
# Keep only the rows where at least half of the votes were helpful.
helpful_votes_df = total_votes.where("helpful_votes/total_votes>=0.5")
helpful_votes_df.show()

In [None]:
# `col` builds column expressions; this cell and later ones use it.
from pyspark.sql.functions import col

# Narrow the helpful reviews to the rows with vine == "Y" (paid).
is_paid = col("vine") == "Y"
paid_helpful_votes = helpful_votes_df.where(is_paid)
paid_helpful_votes.show()

In [None]:
# Narrow the helpful reviews to the rows with vine == "N" (unpaid).
is_unpaid = col("vine") == "N"
unpaid_helpful_votes = helpful_votes_df.where(is_unpaid)
unpaid_helpful_votes.show()

In [None]:
# A notebook cell only echoes its final bare expression, so each count is
# printed explicitly; otherwise every count but the last would be computed
# and then silently discarded.
is_five_star = col("star_rating") == 5

# Total number of reviews
print("Total reviews:", helpful_votes_df.count())

# Total number of paid reviews
print("Paid reviews:", paid_helpful_votes.count())

# Total number of unpaid reviews
print("Unpaid reviews:", unpaid_helpful_votes.count())

# Number of 5-star reviews
print("5-star reviews:", helpful_votes_df.filter(is_five_star).count())

# Number of 5-star paid reviews
print("5-star paid reviews:", paid_helpful_votes.filter(is_five_star).count())

In [None]:
# Count the unpaid reviews that were given 5 stars (shown as the cell output).
unpaid_helpful_votes.where(col("star_rating") == 5).count()

In [None]:
# Percentage of paid reviews that are 5-star.
count = paid_helpful_votes.count()
filtered = paid_helpful_votes.filter(col("star_rating") == 5).count()
if count:
    percent_paid = filtered / count * 100
    print(str(round(percent_paid, 2)) + "% of paid reviews are 5-stars")
else:
    # No paid reviews survived the earlier filters: the ratio is undefined,
    # so report that instead of raising ZeroDivisionError.
    percent_paid = float("nan")
    print("No paid reviews found; the 5-star percentage is undefined")

In [None]:
# Percentage of unpaid reviews that are 5-star.
count_unpaid = unpaid_helpful_votes.count()
filtered_unpaid = unpaid_helpful_votes.filter(col("star_rating") == 5).count()
if count_unpaid:
    percent_unpaid = filtered_unpaid / count_unpaid * 100
    print(str(round(percent_unpaid, 2)) + "% of unpaid reviews are 5-stars")
else:
    # No unpaid reviews survived the earlier filters: the ratio is undefined,
    # so report that instead of raising ZeroDivisionError.
    percent_unpaid = float("nan")
    print("No unpaid reviews found; the 5-star percentage is undefined")