In [None]:
import os
# Find the latest version of spark 2.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.0'
spark_version = 'spark-3.1.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

In [None]:
# Create the SparkSession used by the rest of the notebook
# (getOrCreate returns the existing session if one is already running)
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .appName("Vine_Review_Analysis")
    .getOrCreate()
)

In [None]:
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Furniture_v1_00.tsv.gz"
# Download the remote file through the SparkContext.
spark.sparkContext.addFile(url)
# SparkFiles.get() resolves a file by name; an empty name would resolve to the
# SparkFiles root directory instead of the file that was just added.
file_name = url.rsplit("/", 1)[-1]
# Tab-separated file with a header row; column types are inferred from the data.
df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get(file_name), sep="\t", header=True, inferSchema=True)
df.show()

In [None]:
# Build vine_df: the subset of review columns used by the Vine analysis
vine_df = df.select(
    "review_id",
    "star_rating",
    "helpful_votes",
    "total_votes",
    "vine",
    "verified_purchase",
)
vine_df.show()

In [None]:
# Keep only the reviews that received more than 20 total votes
# NOTE(review): the strict `>` excludes reviews with exactly 20 votes -- confirm that is intended.
more_votes = vine_df.where(vine_df.total_votes > 20)
more_votes.show()

In [None]:
# Keep reviews where at least half of the votes cast were "helpful" votes
most_helpful = more_votes.where(
    more_votes.helpful_votes / more_votes.total_votes >= 0.5
)
most_helpful.show()

In [None]:
# Paid reviews: rows written as part of the Vine program (vine == 'Y')
paid_reviews = most_helpful.where(most_helpful.vine == "Y")
paid_reviews.show()

In [None]:
# Unpaid reviews: rows written outside the Vine program (vine == 'N')
unpaid_reviews = most_helpful.where(most_helpful.vine == "N")
unpaid_reviews.show()

In [None]:
# Determine total number of reviews, number of 5-star reviews, and percentage of 5-star reviews paid vs unpaid

In [None]:
# Total number of reviews = paid (Vine) rows plus unpaid (non-Vine) rows
total_reviews = sum(subset.count() for subset in (paid_reviews, unpaid_reviews))
print(f'There are {total_reviews} total reviews')

In [None]:
# Row counts for each group: Vine (paid) first, then non-Vine (unpaid)
paid_reviews_count, unpaid_reviews_count = paid_reviews.count(), unpaid_reviews.count()
print(f'There are {paid_reviews_count} from vine users and {unpaid_reviews_count} from regular users.')

In [None]:
# Number of reviews with a star_rating of 5 in each group
paid_five_star = paid_reviews.where(paid_reviews["star_rating"] == 5).count()
unpaid_five_star = unpaid_reviews.where(unpaid_reviews["star_rating"] == 5).count()
print(f"There are {paid_five_star} five star reviews from vine users and {unpaid_five_star} from non-vine users.")

In [None]:
# Share of 5-star reviews within each group, expressed as a percentage.
# A group with no reviews reports 0.0 instead of raising ZeroDivisionError.
paid_reviews_percent = (paid_five_star / paid_reviews_count) * 100 if paid_reviews_count else 0.0
unpaid_reviews_percent = (unpaid_five_star / unpaid_reviews_count) * 100 if unpaid_reviews_count else 0.0
# Both values use the same ".2f" format; the earlier ": .2f" spec printed a stray leading space.
print(f"{paid_reviews_percent:.2f}% of reviews from vine users are 5-star and {unpaid_reviews_percent:.2f}% of reviews from non-vine users are 5-star.")