In [3]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.0.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:3 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Get:4 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ Packages [66.2 kB]
Ign:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Get:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release [697 B]
Hit:7 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:8 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:9 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Get:10 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release.gpg [836 B]
Get:12 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:13 http://ppa.launchpad.net/cran/

In [4]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("M16-Amazon-Challenge2").getOrCreate()

### Load Amazon Data into Spark DataFrame

In [5]:
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Health_Personal_Care_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get("amazon_reviews_us_Health_Personal_Care_v1_00.tsv.gz"), sep="\t", header=True, inferSchema=True)
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+--------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|    product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+--------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|     650634| R3EQSTM9PWRAL|B0091LBZSU|     578484426|Demograss Capsule...|Health & Personal...|          3|            0|          0|   N|                Y|         Three Stars|Only came with 30...| 2015-08-31|
|         US|   19827510| RBWPRK17XKIXD|B00PWW3LQ6|     456433146|Viva Labs #1 Prem...|Health & Personal

### Create DataFrames to match tables


In [6]:
from pyspark.sql.functions import to_date
# Read in the Review dataset as a DataFrame
# df is already conveted to a DataFrame Skipping current step.

In [7]:
# Create the customers_table DataFrame
customers_df = df.groupby("customer_id").agg({"customer_id":"count"}).withColumnRenamed("count(customer_id)", "customer_count")
customers_df.show()

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|    4919528|             5|
|   51451778|             3|
|   12713799|             1|
|    8673341|             1|
|   42146698|             2|
|    1117644|             2|
|   28058398|             1|
|   14375645|             1|
|   24540309|             1|
|   39715602|             1|
|     654272|             1|
|    9015608|             5|
|   43920023|            42|
|   38209321|             1|
|   12328685|             1|
|    2866605|             2|
|   38273165|             5|
|   41066514|             1|
|   28377689|             2|
|   17018784|             2|
+-----------+--------------+
only showing top 20 rows



In [8]:
# Create the vine_table. DataFrame
vine_df = df.select(["review_id","star_rating","helpful_votes","total_votes","vine","verified_purchase"])
vine_df = vine_df.withColumn("star_rating", vine_df.star_rating.cast("int")).dropDuplicates(['review_id']).dropna()
vine_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R1006MZQZESJLU|          5|            0|          0|   N|                Y|
| R100KFB0HIL17|          3|            1|          2|   N|                Y|
|R100VCRMA6O31M|          2|            0|          0|   N|                Y|
|R10123RVTQX469|          1|           10|         12|   N|                Y|
|R101AA8NMPCECG|          5|            3|          5|   N|                Y|
|R101H9BZ4XSO4P|          4|            4|          4|   N|                Y|
|R101OASBXZOKB5|          4|            0|          0|   N|                Y|
|R101RBWHDT9BUD|          5|            0|          0|   N|                Y|
|R101WUH16PCM6G|          5|            0|          0|   N|                Y|
|R1020VRZ9DVRFC|          4|            3|          3|   N|     

In [9]:
# Confirm total_vote, helpful_votes, and star_rating are all integers
vine_df

DataFrame[review_id: string, star_rating: int, helpful_votes: int, total_votes: int, vine: string, verified_purchase: string]

In [10]:
# Filter Health Personal Care data frame to retrieve total_vote greater than or equal to 20
totalvotes_df = vine_df.filter("total_votes>=20")
totalvotes_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R1055X9B9Y3ZVA|          1|            7|         20|   N|                Y|
|R10C05WAX6F6PX|          1|            8|         23|   N|                Y|
|R10T3E8I8O3TW2|          5|           34|         41|   N|                Y|
|R10XH4CYLKGBE0|          5|          137|        159|   N|                Y|
|R110SV87XLLTQG|          5|           44|         44|   N|                Y|
|R11699H31Z87SC|          1|            6|         40|   N|                Y|
|R117872KL6C4WD|          4|           29|         31|   N|                Y|
|R11QNU7RW49QEN|          1|          220|        230|   N|                Y|
|R11QZUPWOHIJWR|          1|           36|         38|   N|                N|
|R11RUFETP99KJ6|          5|           27|         32|   Y|     

In [11]:
new_helpful_votes_df = totalvotes_df.filter("helpful_votes/total_votes>=.50")
new_helpful_votes_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R10T3E8I8O3TW2|          5|           34|         41|   N|                Y|
|R10XH4CYLKGBE0|          5|          137|        159|   N|                Y|
|R110SV87XLLTQG|          5|           44|         44|   N|                Y|
|R117872KL6C4WD|          4|           29|         31|   N|                Y|
|R11QNU7RW49QEN|          1|          220|        230|   N|                Y|
|R11QZUPWOHIJWR|          1|           36|         38|   N|                N|
|R11RUFETP99KJ6|          5|           27|         32|   Y|                N|
|R128D60L8SZ3D7|          4|           25|         25|   N|                Y|
|R129WA7JCC8DAF|          3|           32|         45|   N|                N|
|R12CLTVFNDSTMK|          2|           59|         76|   N|     

In [12]:
print(new_helpful_votes_df.count())

121360


In [13]:
vine_review_df = new_helpful_votes_df.filter(new_helpful_votes_df["vine"] == "Y")
vine_review_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R11RUFETP99KJ6|          5|           27|         32|   Y|                N|
|R1MG20UGZ9K6GX|          5|           25|         28|   Y|                N|
|R108U30Q0YXJYO|          5|           28|         33|   Y|                N|
|R3FIEE8HAF008L|          1|           23|         26|   Y|                N|
|R2GEV5D6OOVVH3|          5|          124|        144|   Y|                N|
|R22ODVH2E96E2G|          5|           40|         44|   Y|                N|
|R3UIU2I3646BC5|          2|           88|        111|   Y|                N|
| RD9PN3ZPNHUCY|          5|           65|         71|   Y|                N|
|R1Z22C322BODG7|          5|           32|         38|   Y|                N|
|R2E8YVS4AFSBJ8|          5|           37|         42|   Y|     

In [14]:
non_vine_review_df = new_helpful_votes_df.filter(new_helpful_votes_df["vine"] == "N")
non_vine_review_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R10T3E8I8O3TW2|          5|           34|         41|   N|                Y|
|R10XH4CYLKGBE0|          5|          137|        159|   N|                Y|
|R110SV87XLLTQG|          5|           44|         44|   N|                Y|
|R117872KL6C4WD|          4|           29|         31|   N|                Y|
|R11QNU7RW49QEN|          1|          220|        230|   N|                Y|
|R11QZUPWOHIJWR|          1|           36|         38|   N|                N|
|R128D60L8SZ3D7|          4|           25|         25|   N|                Y|
|R129WA7JCC8DAF|          3|           32|         45|   N|                N|
|R12CLTVFNDSTMK|          2|           59|         76|   N|                Y|
|R12DFJSUOXE3QE|          1|           36|         41|   N|     

### Determine Bias of Vine Reviews

In [15]:
# Determine the total number of reviews
from pyspark.sql.functions import count

new_helpful_votes_count = new_helpful_votes_df.count()
print ("The total number of helpful reviews : %f " % new_helpful_votes_count)


The total number of helpful reviews : 121360.000000 


In [16]:
# Determine the number of Vine reviews and non-Vine reviews
vine_review_count = vine_review_df.count()
print("The number of Vine reviews : %f " % vine_review_count)

non_vine_review_count = non_vine_review_df.count()
print("The number of non-Vine reviews : %f" % non_vine_review_count)

The number of Vine reviews : 497.000000 
The number of non-Vine reviews : 120863.000000


In [17]:
# Determine how many Vine reviews and non-Vine reviews were 5 stars
vine_5stars_df = vine_review_df.filter(vine_review_df["star_rating"]=="5")
print("The number of 5-star Vine reviews : %f" % vine_5stars_df.count())


non_vine_5stars_df = non_vine_review_df.filter(non_vine_review_df["star_rating"]=="5")
print("The number of 5-star non-Vine reviews: %f" % non_vine_5stars_df.count())


The number of 5-star Vine reviews : 220.000000
The number of 5-star non-Vine reviews: 74470.000000


In [18]:
# Determine what percentage of Vine reviews and non-Vine reviews were 5 stars.
vine_5stars_percent = (vine_5stars_df.count()/vine_review_count)
print("The percentage of Vine reviews that were 5 stars: %f" % vine_5stars_percent)

non_vine_5stars_percent = (non_vine_5stars_df.count()/non_vine_review_count)
print("The percentage of non_Vine reviews that were 5 stars: %f" % non_vine_5stars_percent)

The percentage of Vine reviews that were 5 stars: 0.442656
The percentage of non_Vine reviews that were 5 stars: 0.616152
