In [1]:
import os

# Find the latest versions of
#   Spark & Hadoop:  https://spark.apache.org/downloads.html (https://www.apache.org/dist/spark/)
#   Postgres driver: https://jdbc.postgresql.org/
os.environ['HADOOP_VERSION']   = hadoop_version   = 'hadoop3'
os.environ['SPARK_VERSION']    = spark_version    = 'spark-3.3.1'
os.environ['POSTGRES_VERSION'] = postgres_version = 'postgresql-42.5.1'

# Install Java
! apt install openjdk-11-jdk-headless > /dev/null
os.environ['JAVA_HOME']  = '/usr/lib/jvm/java-11-openjdk-amd64'

# Install Spark
! wget https://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-$HADOOP_VERSION.tgz
! tar xf $SPARK_VERSION-bin-$HADOOP_VERSION.tgz
os.environ['SPARK_HOME'] = f'/content/{spark_version}-bin-{hadoop_version}'
! pip install findspark

# Install Postgres driver
! wget https://jdbc.postgresql.org/download/$POSTGRES_VERSION.jar

# Install AWS's Boto3
! pip install boto3

import boto3
import findspark
findspark.init()
from   getpass     import getpass
from   pyspark.sql import SparkSession

spark = SparkSession.builder \
  .appName('Vine-Review-Challenge') \
  .config('spark.driver.extraClassPath', f'/content/{postgres_version}.jar') \
  .getOrCreate()
spark



--2022-12-18 18:59:19--  https://www.apache.org/dist/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
Resolving www.apache.org (www.apache.org)... 151.101.2.132, 2a04:4e42::644
Connecting to www.apache.org (www.apache.org)|151.101.2.132|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://downloads.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz [following]
--2022-12-18 18:59:20--  https://downloads.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
Resolving downloads.apache.org (downloads.apache.org)... 135.181.214.104, 88.99.95.219, 2a01:4f8:10a:201a::2, ...
Connecting to downloads.apache.org (downloads.apache.org)|135.181.214.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 299350810 (285M) [application/x-gzip]
Saving to: ‘spark-3.3.1-bin-hadoop3.tgz’


2022-12-18 18:59:32 (24.3 MB/s) - ‘spark-3.3.1-bin-hadoop3.tgz’ saved [299350810/299350810]

Looking in indexes: https://pypi.org/simple, https://us-python

In [2]:
from pyspark import SparkFiles

# Register the remote TSV with Spark, then read the fetched copy.
url = "https://temp-haerman-dec-15.s3.ap-northeast-1.amazonaws.com/amazon_reviews_us_Baby_v1_00.tsv"
spark.sparkContext.addFile(url)

# SparkFiles.get resolves the path of a file previously added with addFile.
reviews_path = SparkFiles.get("amazon_reviews_us_Baby_v1_00.tsv")

# Tab-separated, first row is the header, column types inferred from the data.
df = (
    spark.read
    .option("encoding", "UTF-8")
    .csv(reviews_path, sep="\t", header=True, inferSchema=True)
)
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|    9970739| R8EWA1OFT84NX|B00GSP5D94|     329991347|Summer Infant Swa...|            Baby|          5|            0|          0|   N|                Y|Great swaddled bl...|Loved these swadd...|2015-08-31 00:00:00|
|         US|   23538442|R2JWY4YRQD4FOP|B00YYDDZGU|     646108902|Pacifier Clip Gir...| 

In [3]:
# Drop incomplete rows: print the row count, remove every row that has a null
# in any column, then print the row count again to show how many were lost.
rows_before = df.count()
print(rows_before)

# df.na.drop is the documented alias of df.dropna; how="any" is its default.
df = df.na.drop(how="any")

rows_after = df.count()
print(rows_after)

1752932
1752727


In [4]:
# Create the vine_table DataFrame: keep only the review id plus the rating,
# vote-count, vine and verified-purchase columns.
vine_columns = [
    'review_id',
    'star_rating',
    'helpful_votes',
    'total_votes',
    'vine',
    'verified_purchase',
]
vine_df = df.select(*vine_columns)
vine_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
| R8EWA1OFT84NX|          5|            0|          0|   N|                Y|
|R2JWY4YRQD4FOP|          5|            0|          0|   N|                N|
| RL5ESX231LZ0B|          5|            0|          0|   N|                Y|
| RRMS9ZWJ2KD08|          5|            0|          0|   N|                Y|
|R14I3ZG5E6S7YM|          5|            0|          0|   N|                Y|
|R13EPSFP5DODN5|          4|            0|          0|   N|                Y|
| R6RBP4HTE67SY|          5|            0|          0|   N|                Y|
|R15B3EU40RSU2W|          5|            0|          0|   N|                Y|
| RP4DD53A4ZJA2|          5|            0|          0|   N|                Y|
|R2C99DJEO4RZ4K|          5|            3|          4|   N|     

In [6]:
# Step 1: Keep only the reviews whose total_votes count is 20 or more.
from pyspark.sql.functions import col

min_total_votes = 20
has_enough_votes = col("total_votes") >= min_total_votes

# where() is an alias of filter().
total_votes_df = vine_df.where(has_enough_votes)
total_votes_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
| ROI00QN8IS49I|          5|           83|         91|   N|                N|
|R13C5INE1RTZP6|          2|           26|         26|   N|                Y|
| RXVMMXCL67MZN|          4|          378|        383|   N|                Y|
|R33JJQWAUYBKD3|          5|          270|        280|   N|                Y|
|R3N0XV9267NOXV|          5|           41|         47|   Y|                N|
|R31HQD6YXSQV1W|          3|           40|         48|   N|                Y|
|R33LQSF958O6K8|          5|           39|         41|   N|                Y|
|R3OIDSQJ84W7J1|          5|           21|         24|   N|                N|
|R3UUR313K5VVTL|          1|            1|         24|   N|                Y|
| RODE8K12S7148|          5|           41|         45|   N|     

In [7]:
# Step 2: Retrieve all the rows where the number of helpful_votes divided by total_votes is equal to or greater than 50%.
# percent_votes is stored as a fraction (0.5 means 50%), not as a 0-100 percentage.
helpful_ratio = col('helpful_votes') / col('total_votes')

# withColumn already names the new column. The DataFrame-level .alias('percent_votes')
# that used to follow it named the whole DataFrame after one of its own columns, so it
# has been removed.
votes_df = (
    total_votes_df
    .withColumn('percent_votes', helpful_ratio)
    .filter(col('percent_votes') >= 0.5)
)
votes_df.show()
     

+--------------+-----------+-------------+-----------+----+-----------------+------------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|     percent_votes|
+--------------+-----------+-------------+-----------+----+-----------------+------------------+
| ROI00QN8IS49I|          5|           83|         91|   N|                N|0.9120879120879121|
|R13C5INE1RTZP6|          2|           26|         26|   N|                Y|               1.0|
| RXVMMXCL67MZN|          4|          378|        383|   N|                Y|0.9869451697127938|
|R33JJQWAUYBKD3|          5|          270|        280|   N|                Y|0.9642857142857143|
|R3N0XV9267NOXV|          5|           41|         47|   Y|                N|0.8723404255319149|
|R31HQD6YXSQV1W|          3|           40|         48|   N|                Y|0.8333333333333334|
|R33LQSF958O6K8|          5|           39|         41|   N|                Y|0.9512195121951219|
|R3OIDSQJ84W7J1|          5|  

In [8]:
# Step 3: From the Step 2 DataFrame, keep only the reviews written as part of
# the Vine program (paid), i.e. vine == 'Y'.
is_vine_review = col('vine') == 'Y'

# where() is an alias of filter().
paid_review_df = votes_df.where(is_vine_review)
paid_review_df.show()


+--------------+-----------+-------------+-----------+----+-----------------+------------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|     percent_votes|
+--------------+-----------+-------------+-----------+----+-----------------+------------------+
|R3N0XV9267NOXV|          5|           41|         47|   Y|                N|0.8723404255319149|
| RSA6JQ346JZHZ|          5|           55|         64|   Y|                Y|          0.859375|
|R1FXF4HRMCLG4C|          5|           69|         84|   Y|                N|0.8214285714285714|
| RCTBWC3II42MG|          4|          113|        117|   Y|                N|0.9658119658119658|
| RTMQM2CQ1XIZ0|          5|           21|         25|   Y|                N|              0.84|
|R2CBJLCKQ612KU|          4|           27|         35|   Y|                N|0.7714285714285715|
|R322QDGO4AV2B5|          5|           55|         65|   Y|                N|0.8461538461538461|
| R2S7CXRDGQ6EE|          4|  

In [9]:
# Step 4: Same as Step 3, but keep only the reviews that were not part of
# the Vine program (unpaid), i.e. vine == 'N'.
is_non_vine_review = col('vine') == 'N'

# where() is an alias of filter().
unpaid_review_df = votes_df.where(is_non_vine_review)
unpaid_review_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+------------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|     percent_votes|
+--------------+-----------+-------------+-----------+----+-----------------+------------------+
| ROI00QN8IS49I|          5|           83|         91|   N|                N|0.9120879120879121|
|R13C5INE1RTZP6|          2|           26|         26|   N|                Y|               1.0|
| RXVMMXCL67MZN|          4|          378|        383|   N|                Y|0.9869451697127938|
|R33JJQWAUYBKD3|          5|          270|        280|   N|                Y|0.9642857142857143|
|R31HQD6YXSQV1W|          3|           40|         48|   N|                Y|0.8333333333333334|
|R33LQSF958O6K8|          5|           39|         41|   N|                Y|0.9512195121951219|
|R3OIDSQJ84W7J1|          5|           21|         24|   N|                N|             0.875|
| RODE8K12S7148|          5|  

In [10]:
# Step 5 Determine the total number of reviews, the number of 5-star reviews, and the percentage of 5-star reviews for the two types of review (paid vs unpaid).
from pyspark.sql.functions import col, count, when

# when() without otherwise() yields null for non-matching rows, and count()
# skips nulls, so this counts only the rows whose star_rating is 5.
five_star_reviews = count(when(col("star_rating") == 5, True))
total_reviews = count(col("vine"))

# Keep the aggregated DataFrame in reviews_df. show() returns None, so chaining
# it onto the assignment left reviews_df bound to None.
reviews_df = votes_df.groupBy("vine").agg(
    total_reviews.alias("Total_Reviews"),
    five_star_reviews.alias("Total_5-Star_Reviews"),
    (five_star_reviews / total_reviews * 100).alias("Percent_5-Star_Reviews"),
)
reviews_df.show()

+----+-------------+--------------------+----------------------+
|vine|Total_Reviews|Total_5-Star_Reviews|Percent_5-Star_Reviews|
+----+-------------+--------------------+----------------------+
|   Y|          463|                 202|    43.628509719222464|
|   N|        25079|               12028|     47.96044499381953|
+----+-------------+--------------------+----------------------+

