In [None]:
import os
# Find the latest version of spark 2.0 from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-2.4.7'
spark_version = 'spark-2.4.7'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables ",
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

In [None]:
!pip install pyspark

In [None]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("BigData-ChallengeD2").config("spark.driver.extraClassPath","/content/postgresql-42.2.16.jar").getOrCreate()

In [None]:

from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Furniture_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get("amazon_reviews_us_Furniture_v1_00.tsv.gz"), sep="\t", header=True, inferSchema=True)
df.show()

In [None]:
# Confirm total_vote, helpful_votes, and star_rating are all integers
df

In [None]:
# Filter furniture data frame on total_vote greater than or equal to 20
totalvotes_df = df.filter("total_votes>=20")
totalvotes_df.show()

In [None]:
helpful_50_df = totalvotes_df.filter("helpful_votes/total_votes>=.50")
helpful_50_df.show()

In [None]:
print(helpful_50_df.count())

In [None]:
vine_review_df = helpful_50_df.filter(helpful_50_df["vine"] == "Y")
vine_review_df.show()

In [None]:
novine_review_df = helpful_50_df.filter(helpful_50_df["vine"] == "N")
novine_review_df.show()

In [None]:
from pyspark.sql.functions import count
# Total number of reviews for each dataframe from Step 3 and Step 4:
vine_review_count = vine_review_df.count()
print("Total Number of Reviews PAID and 'helpful': %f" % vine_review_count)

novine_review_count = novine_review_df.count()
print("Total Number of Reviews NON-PAID and 'helpful': %f" % novine_review_count)

In [None]:
# Number of 5-star Reviews for above Paid and Non-Paid dataframes
star5_vine_df = vine_review_df.filter(novine_review_df["star_rating"]=="5")
print("Total Number of 5-star Reviews PAID and 'helpful': %f" % star5_vine_df.count())


star5_novine_df = novine_review_df.filter(novine_review_df["star_rating"]=="5")
print("Total Number of 5-star Reviews NON-PAID and 'helpful': %f" % star5_novine_df.count())

In [None]:
# Percentage of 5-star Reviews for above Paid and Non-Paid dataframes
star5_vine_pct = (star5_vine_df.count()/vine_review_count)
print("Percent of 5-Star Furniture Reviews from PAID, 'helpful' dataset: %f" % star5_vine_pct)

star5_novine_pct = (star5_novine_df.count()/novine_review_count)
print("Percent of 5-Star Furniture Reviews from NON-PAID, 'helpful' dataset: %f" % star5_novine_pct)

In [None]:
# Total number of ALL reviews
total_reviews_count = df.count()
print("Total Number of Furniture Reviews: %f" % total_reviews_count)

help_reviews_count = helpful_50_df.count()
print("Total Number of 'Helpful' Furniture Reviews: %f" % help_reviews_count)

In [None]:
#Total number of ALL 5-star reviews
star5_df = df.filter(df["star_rating"] == '5')
star5_df.show()

star5_count = star5_df.count()
print("Total Number of 5-Star Furniture Reviews: %f" % star5_count)

In [None]:
#Total number of 5-star HELPFUL reviews
star5_help_df = helpful_50_df.filter(helpful_50_df["star_rating"] == '5')
star5_help_df.show()

star5_help_count = star5_help_df.count()
print("Total Number of 5-Star 'Helpful' Furniture Reviews: %f" % star5_help_count)

In [None]:
# Percentage of ALL 5-star reviews paid vs non-paid 
# 5-star Paid reviews 
vine_star5_all_df = star5_df.filter(star5_df["vine"] == "Y")
vine_star5_all_count = vine_star5_all_df.count()

vine_star5_all_pct = (vine_star5_all_count/star5_count)
print("Number of 5-Star Furniture Reviews PAID: %f" % vine_star5_all_count)
print("Percent of 5-Star Furniture Reviews PAID: %f" % vine_star5_all_pct)
 
#5-star Non-Paid reviews
novine_star5_all_df = star5_df.filter(star5_df["vine"] == "N")
novine_star5_all_count = novine_star5_all_df.count()

novine_star5_all_pct = novine_star5_all_count/star5_count
print("Number of 5-Star Furniture Reviews NON-PAID: %f" % novine_star5_all_count)
print("Percent of 5-Star Furniture Reviews NON-PAID: %f" % novine_star5_all_pct)

In [None]:
# Percentage of HELPFUL 5-star reviews paid vs non-paid (% of 5 star reviews in Step 3 dataframe) 
# 5-star Paid "helpful" reviews 
vine_star5_help_df = star5_help_df.filter(star5_help_df["vine"] == "Y")
vine_star5_help_count = vine_star5_help_df.count()

vine_star5_help_pct = (vine_star5_help_count/star5_help_count)
print("Number of 5-Star, 'Helpful' Furniture Reviews PAID: %f" % vine_star5_help_count)
print("Percent of 5-Star,'Helpful'Furniture Reviews PAID: %f" % vine_star5_help_pct)
 
#5-star Non-Paid "helpful" reviews
novine_star5_help_df = star5_help_df.filter(star5_help_df["vine"] == "N")
novine_star5_help_count = novine_star5_help_df.count()

novine_star5_help_pct = novine_star5_help_count/star5_help_count
print("Number of 5-Star, 'Helpful' Furniture Reviews NON-PAID: %f" % novine_star5_help_count)
print("Percent of 5-Star,'Helpful'Furniture Reviews NON-PAID: %f" % novine_star5_help_pct)

In [None]:
#Percent of Vine vs non-Vine all reviews
vine_all_df = df.filter(df["vine"] == "Y")
novine_all_df = df.filter(df["vine"] == "N")
vine_pct_all = (vine_all_df.count()/df.count())
novine_pct_all = (novine_all_df.count()/df.count())


print(df.count())
print(vine_all_df.count())
print(novine_all_df.count())
print(vine_pct_all)
print(novine_pct_all)

In [None]:
# Percent of All Vine that is 5-Star
vine_all_star5_df = vine_all_df.filter(vine_all_df["star_rating"] == '5')
vine_all_star_df = vine_all_df.filter(vine_all_df["star_rating"] != '5')

novine_all_star5_df = novine_all_df.filter(novine_all_df["star_rating"] == '5')
novine_all_star_df = novine_all_df.filter(novine_all_df["star_rating"] != '5')

vine_pct_star5_all = (vine_all_star5_df.count()/vine_all_df.count())
novine_pct_star5_all = (novine_all_star5_df.count()/novine_all_df.count())

print(vine_all_star5_df.count())
print(vine_all_star_df.count())

print(novine_all_star5_df.count())
print(novine_all_star_df.count())

print(vine_pct_star5_all)
print(novine_pct_star5_all)

In [None]:
# Pull out all Vine reviewed products
vine_prod_df = vine_all_df.select(vine_all_df["product_id"]).dropDuplicates()
vine_prod_df.show()
vine_prod_df.count()

In [None]:
# create an inner join between all the Vine Reviews df and the full df
inner_join = vine_prod_df.join(df, vine_all_df.product_id == df.product_id, how="inner").dropDuplicates()
inner_join.show()
inner_join.count()

In [None]:
# Rating breakdown 
new_df = inner_join.groupby("star_rating").count()
new_df.show()

In [None]:
# Total Vine Reviews for Vine Products
vine_new_df = inner_join.filter(inner_join["vine"] == "Y")
vine_new_df.count()

In [None]:
# Total Vine Reviews for Vine Products
novine_new_df = inner_join.filter(inner_join["vine"] == "N")
novine_new_df.count()

In [None]:
# % of 5-star reviews are Vine or non-Vine Reviews
vine_star5_new = vine_new_df.groupby("star_rating").count()
vine_star5_new.show()

In [None]:
# % of 5-star reviews are Vine or non-Vine Reviews
novine_star5_new = novine_new_df.groupby("star_rating").count()
novine_star5_new.show()