In [2]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.1.2'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 http://archive.ubuntu.com/ubuntu bionic InRelease
0% [Connecting to security.ubuntu.com (91.189.91.38)] [Connected to cloud.r-pro                                                                               Get:2 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
                                                                               Get:3 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
0% [2 InRelease 47.5 kB/88.7 kB 54%] [Connecting to security.ubuntu.com (91.1890% [1 InRelease gpgv 242 kB] [2 InRelease 47.5 kB/88.7 kB 54%] [Connecting to s0% [1 InRelease gpgv 242 kB] [Waiting for headers] [Connecting to security.ubun                                                                               Get:4 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
0% [1 InRelease gpgv 242 kB] [4 InRelease 8,396 B/74.6 kB 11%] [Connecting to s0% [1 InRelease gpgv 242 kB] [Connect

In [3]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("VineReviewFunctions").getOrCreate()

In [4]:
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Baby_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get("amazon_reviews_us_Baby_v1_00.tsv.gz"), sep="\t", header=True, inferSchema=True)
df

DataFrame[marketplace: string, customer_id: int, review_id: string, product_id: string, product_parent: int, product_title: string, product_category: string, star_rating: int, helpful_votes: int, total_votes: int, vine: string, verified_purchase: string, review_headline: string, review_body: string, review_date: string]

In [5]:
vine_df = df.select(["review_id","star_rating","helpful_votes","total_votes","vine","verified_purchase"])
vine_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
| R8EWA1OFT84NX|          5|            0|          0|   N|                Y|
|R2JWY4YRQD4FOP|          5|            0|          0|   N|                N|
| RL5ESX231LZ0B|          5|            0|          0|   N|                Y|
| RRMS9ZWJ2KD08|          5|            0|          0|   N|                Y|
|R14I3ZG5E6S7YM|          5|            0|          0|   N|                Y|
|R13EPSFP5DODN5|          4|            0|          0|   N|                Y|
| R6RBP4HTE67SY|          5|            0|          0|   N|                Y|
|R15B3EU40RSU2W|          5|            0|          0|   N|                Y|
| RP4DD53A4ZJA2|          5|            0|          0|   N|                Y|
|R2C99DJEO4RZ4K|          5|            3|          4|   N|     

In [6]:
vine_df.dropna()

DataFrame[review_id: string, star_rating: int, helpful_votes: int, total_votes: int, vine: string, verified_purchase: string]

In [7]:
print(vine_df.count())

1752932


In [8]:
# create a new DataFrame or table to retrieve all the rows where the total_votes count is equal to or greater than 20 
totalvotes_df = vine_df.filter("total_votes>=20")
totalvotes_df.show()
print(totalvotes_df.count())

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
| ROI00QN8IS49I|          5|           83|         91|   N|                N|
|R13C5INE1RTZP6|          2|           26|         26|   N|                Y|
| RXVMMXCL67MZN|          4|          378|        383|   N|                Y|
|R33JJQWAUYBKD3|          5|          270|        280|   N|                Y|
|R3N0XV9267NOXV|          5|           41|         47|   Y|                N|
|R31HQD6YXSQV1W|          3|           40|         48|   N|                Y|
|R33LQSF958O6K8|          5|           39|         41|   N|                Y|
|R3OIDSQJ84W7J1|          5|           21|         24|   N|                N|
|R3UUR313K5VVTL|          1|            1|         24|   N|                Y|
| RODE8K12S7148|          5|           41|         45|   N|     

In [9]:
# create a new DataFrame or table to retrieve all the rows where the number of helpful_votes divided by total_votes is equal to or greater than 50%
helpful_df = totalvotes_df.filter("helpful_votes/total_votes>=.50")
helpful_df.show()
print(helpful_df.count())

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
| ROI00QN8IS49I|          5|           83|         91|   N|                N|
|R13C5INE1RTZP6|          2|           26|         26|   N|                Y|
| RXVMMXCL67MZN|          4|          378|        383|   N|                Y|
|R33JJQWAUYBKD3|          5|          270|        280|   N|                Y|
|R3N0XV9267NOXV|          5|           41|         47|   Y|                N|
|R31HQD6YXSQV1W|          3|           40|         48|   N|                Y|
|R33LQSF958O6K8|          5|           39|         41|   N|                Y|
|R3OIDSQJ84W7J1|          5|           21|         24|   N|                N|
| RODE8K12S7148|          5|           41|         45|   N|                Y|
|R2QZFYFUKP22SP|          5|           33|         37|   N|     

In [10]:
# create a new DataFrame or table that retrieves all the rows where a review was written as part of the Vine program (paid)
vine_paid_review = helpful_df.filter(helpful_df["vine"] == "Y")
vine_paid_review.show()
print(vine_paid_review.count())

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R3N0XV9267NOXV|          5|           41|         47|   Y|                N|
| RSA6JQ346JZHZ|          5|           55|         64|   Y|                Y|
|R1FXF4HRMCLG4C|          5|           69|         84|   Y|                N|
| RCTBWC3II42MG|          4|          113|        117|   Y|                N|
| RTMQM2CQ1XIZ0|          5|           21|         25|   Y|                N|
|R2CBJLCKQ612KU|          4|           27|         35|   Y|                N|
|R322QDGO4AV2B5|          5|           55|         65|   Y|                N|
| R2S7CXRDGQ6EE|          4|           36|         39|   Y|                N|
|R3ECMXK0SGR1VV|          5|           59|         64|   Y|                N|
|R3D55CPJ6J6Z78|          4|           15|         20|   Y|     

In [11]:
# create a new DataFrame or table that retrieves all the rows where a review was not part of the Vine program (unpaid)
vine_unpaid_review=helpful_df.filter(helpful_df["vine"] == "N")
vine_unpaid_review.show()
print(vine_unpaid_review.count())

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
| ROI00QN8IS49I|          5|           83|         91|   N|                N|
|R13C5INE1RTZP6|          2|           26|         26|   N|                Y|
| RXVMMXCL67MZN|          4|          378|        383|   N|                Y|
|R33JJQWAUYBKD3|          5|          270|        280|   N|                Y|
|R31HQD6YXSQV1W|          3|           40|         48|   N|                Y|
|R33LQSF958O6K8|          5|           39|         41|   N|                Y|
|R3OIDSQJ84W7J1|          5|           21|         24|   N|                N|
| RODE8K12S7148|          5|           41|         45|   N|                Y|
|R2QZFYFUKP22SP|          5|           33|         37|   N|                Y|
| RFON10GAZKDL3|          3|           16|         20|   N|     

In [12]:
# Determine the total number of reviews
vine_review_count=helpful_df.count()
print("Total number of helpful reviews : % .0f" %  vine_review_count)

Total number of helpful reviews :  25557


In [22]:
#  Determine total number of paid vine reviews
paid_review_count = helpful_df.filter(helpful_df["vine"] == "Y")
print("Total number of  vine reviews :  % .0f" % paid_review_count.count())

Total number of  vine reviews :   463


In [23]:
# Determine total number of  unpaid reviews
unpaid_review_count = helpful_df.filter(helpful_df["vine"] == "N")
print("Total number of  unpaid  reviews :  % .0f" % unpaid_review_count.count())

Total number of  unpaid  reviews :   25094


In [17]:
# Determine total number of 5 star review
star_review = helpful_df.filter(helpful_df["star_rating"]=="5")
print("Total number of 5-star :  % .0f" % star_review.count())

Total number of 5-star :   12235


In [18]:
# Determine total number of 5 star paid reviews
paid_review = star_review.filter(star_review["vine"] == "Y")
print("Total number of 5-star paid vine reviews :  % .0f" % paid_review.count())

Total number of 5-star paid vine reviews :   202


In [19]:
# Determine total number of 5 star unpaid reviews
unpaid_review= star_review.filter(star_review["vine"]=="N")
print("Total number of 5-star unpaid reviews :  % .0f" % unpaid_review.count())


Total number of 5-star unpaid reviews :   12033


In [20]:
# Determine percentage of paid reviews
percent_paid_review= paid_review.count()/star_review.count()
print("Percentage of 5-star paid vine reviews :  %f" % percent_paid_review)

Percentage of 5-star paid vine reviews :  0.016510


In [21]:
# Determine percentage of unpaid reviews
percent_unpaid_review= unpaid_review.count()/star_review.count()
print("Percentage of 5-star unpaid reviews :  %f" % percent_unpaid_review)

Percentage of 5-star unpaid reviews :  0.983490
