In [1]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.0.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Get:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Get:3 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Hit:4 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:5 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:6 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Get:7 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ Packages [76.8 kB]
Get:8 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Hit:9 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Ign:10 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:11 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease [21.3 kB]
Get:12 http://security.ubuntu.com/ubuntu bionic-security/main amd64 Packages [2,596 kB]
Get:13 http://security.ubunt

### Load Amazon Data into Spark DataFrame

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("M16-Amazon-Challenge").config("spark.driver.extraClassPath","/content/postgresql-42.2.16.jar").getOrCreate()

In [3]:
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Lawn_and_Garden_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get(""), sep="\t", header=True, inferSchema=True)
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   32787517| RED72VWWCOS7S|B008HDQYLQ|     348668413|Garden Weasel Gar...| Lawn and Garden|          1|            2|          8|   N|                Y|            One Star|I don't hate the ...| 2015-08-31|
|         US|   16374060| RZHWQ208LTEPV|B005OBZBD6|     264704759|10 Foot Mc4 Solar...| Lawn and Garden|          5|    

### Create VINE DataFrame for Deliverable 2 Analysis

In [4]:
from pyspark.sql.functions import to_date
# Read in the Review dataset as a DataFrame


In [5]:
# Create the vine_table. DataFrame

vine_df = df.select(["review_id", "star_rating", "helpful_votes", "total_votes", "vine", "verified_purchase"])
vine_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
| RED72VWWCOS7S|          1|            2|          8|   N|                Y|
| RZHWQ208LTEPV|          5|            0|          0|   N|                Y|
|R37LBC3XAVLYOO|          5|            4|          5|   N|                Y|
|R3L7XJMA0MVJWC|          5|            0|          0|   N|                Y|
|R2I2GHSI7T1UBN|          1|            5|          6|   N|                Y|
|R2GFFKHK4I6VMX|          5|            0|          0|   N|                Y|
|R1R0UDX2XAN1S4|          4|            0|          0|   N|                Y|
|R22C8FMBSTFRY8|          5|            2|          2|   N|                Y|
|R118NNIQ75XPGO|          3|            0|          0|   N|                Y|
|R30HYXHZQ49621|          2|            0|          0|   N|     

### Deliverable 2: Determine Bias of Vine Reviews

In [6]:
# 1. Filter the data and create a new DataFrame or table to retrieve all the rows
#    where the total_votes count is equal to or greater than 20 to pick reviews 
#    that are more likely to be helpful and to avoid having division by zero errors later on

vine_df_filtered = vine_df.filter(vine_df.total_votes >= "20")
vine_df_filtered.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
| RQQ3KVTU5TJ4I|          5|           24|         29|   N|                Y|
|R3FELXWV9T5CWE|          2|           22|         24|   N|                Y|
| ROBYK6EZYK398|          5|           29|         30|   N|                Y|
|R2RKCSAG6GBA4A|          1|            8|         28|   N|                Y|
|R2YVBBR6NXIA4V|          5|           25|         28|   N|                N|
|R2AVTBDIVG2AW4|          5|           26|         26|   N|                N|
|R1Z2LNN3FANMTO|          1|           20|         24|   N|                N|
|  RLNULBKRWNNR|          5|           42|         43|   N|                Y|
| R9QNQUL94RX1F|          3|           27|         33|   N|                Y|
| RTULFZTUS1VBP|          5|           51|         52|   N|     

In [7]:
# 2. Filter the new DataFrame or table created in Step 1 and create a new DataFrame
#    or table to retrieve all the rows where the number of helpful_votes divided by
#    total_votes is equal to or greater than 50%.

help_total_50_plus = vine_df_filtered.filter(vine_df_filtered.helpful_votes/vine_df_filtered.total_votes >= .5)
help_total_50_plus.show()


+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
| RQQ3KVTU5TJ4I|          5|           24|         29|   N|                Y|
|R3FELXWV9T5CWE|          2|           22|         24|   N|                Y|
| ROBYK6EZYK398|          5|           29|         30|   N|                Y|
|R2YVBBR6NXIA4V|          5|           25|         28|   N|                N|
|R2AVTBDIVG2AW4|          5|           26|         26|   N|                N|
|R1Z2LNN3FANMTO|          1|           20|         24|   N|                N|
|  RLNULBKRWNNR|          5|           42|         43|   N|                Y|
| R9QNQUL94RX1F|          3|           27|         33|   N|                Y|
| RTULFZTUS1VBP|          5|           51|         52|   N|                Y|
|R1BM9RBQWI62O2|          5|           43|         60|   N|     

In [8]:
# 3. Filter the DataFrame or table created in Step 2, and create a new DataFrame or
#    table that retrieves all the rows where a review was written as part of the 
#    Vine program (paid), vine == 'Y'.

helpful_paid = help_total_50_plus.filter(help_total_50_plus.vine == 'Y')
helpful_paid.show()


+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R28DXTC3JQ9IY1|          4|           24|         26|   Y|                N|
|R3AFZKLQXATHBU|          5|           44|         49|   Y|                N|
|R2RUUF2JPJPC0E|          4|           20|         22|   Y|                N|
| RFZ2WUH4248AB|          2|           26|         27|   Y|                N|
|R1Q4LVHIFOWYFR|          5|           23|         28|   Y|                N|
| R4YEPTQED3X1Q|          5|           19|         20|   Y|                N|
|R2Z7C8YCRSC9DP|          5|           22|         22|   Y|                N|
|R3J8OI5CB74P5K|          1|           22|         25|   Y|                N|
| RH39LMKN6AZDC|          5|           33|         40|   Y|                N|
| R8RD8K0ESJSRD|          5|           21|         21|   Y|     

In [9]:
# 4. Repeat Step 3, but this time retrieve all the rows where the review was not part
#    of the Vine program (unpaid), vine == 'N'.

helpful_unpaid = help_total_50_plus.filter(help_total_50_plus.vine == 'N')
helpful_unpaid.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
| RQQ3KVTU5TJ4I|          5|           24|         29|   N|                Y|
|R3FELXWV9T5CWE|          2|           22|         24|   N|                Y|
| ROBYK6EZYK398|          5|           29|         30|   N|                Y|
|R2YVBBR6NXIA4V|          5|           25|         28|   N|                N|
|R2AVTBDIVG2AW4|          5|           26|         26|   N|                N|
|R1Z2LNN3FANMTO|          1|           20|         24|   N|                N|
|  RLNULBKRWNNR|          5|           42|         43|   N|                Y|
| R9QNQUL94RX1F|          3|           27|         33|   N|                Y|
| RTULFZTUS1VBP|          5|           51|         52|   N|                Y|
|R1BM9RBQWI62O2|          5|           43|         60|   N|     

In [10]:
# 5. Determine the total number of reviews, the number of 5-star reviews, and the
#    percentage of 5-star reviews for the two types of review (paid vs unpaid).

total_reviews = help_total_50_plus.count()
print("Total Reviews = ", total_reviews)

total_paid_5 = helpful_paid.filter(helpful_paid.star_rating == '5').count()
percent_paid_5 = total_paid_5/total_reviews*100
print("Total Paid 5 Star Reviews = ", total_paid_5, "(",percent_paid_5,"%)")

total_unpaid_5 = helpful_unpaid.filter(helpful_unpaid.star_rating == '5').count()
percent_unpaid_5 = total_unpaid_5/total_reviews*100
print("Total Unpaid 5 Star Reviews = ", total_unpaid_5, "(",percent_unpaid_5,"%)")

Total Reviews =  49103
Total Paid 5 Star Reviews =  176 ( 0.35843023847830074 %)
Total Unpaid 5 Star Reviews =  24026 ( 48.92980062317985 %)


In [11]:
# 5. Determine the total number of reviews, the number of 5-star reviews, and the
#    percentage of 5-star reviews for the two types of review (paid vs unpaid).

total_reviews = help_total_50_plus.count()
print("Total Reviews = ", total_reviews)
total_paid_reviews = helpful_paid.count()
print("Total Paid Reviews = ", total_paid_reviews)

total_paid_5 = helpful_paid.filter(helpful_paid.star_rating == '5').count()
percent_paid_5 = total_paid_5/total_paid_reviews*100
print("Total Paid 5 Star Reviews = ", total_paid_5, "(",percent_paid_5,"%)")

total_unpaid_reviews = helpful_unpaid.count()
print("Total Unpaid Reviews = ", total_unpaid_reviews)

total_unpaid_5 = helpful_unpaid.filter(helpful_unpaid.star_rating == '5').count()
percent_unpaid_5 = total_unpaid_5/total_unpaid_reviews*100
print("Total Unpaid 5 Star Reviews = ", total_unpaid_5, "(",percent_unpaid_5,"%)")

Total Reviews =  49103
Total Paid Reviews =  386
Total Paid 5 Star Reviews =  176 ( 45.59585492227979 %)
Total Unpaid Reviews =  48717
Total Unpaid 5 Star Reviews =  24026 ( 49.317486708951705 %)


In [71]:
# Distribution of Paid Rating 
star_groups_paid = helpful_paid.groupBy(helpful_paid.star_rating).count()
star_groups_paid = star_groups_paid.sort(star_groups_paid.star_rating)
star_groups_paid.show()

+-----------+-----+
|star_rating|count|
+-----------+-----+
|          1|    9|
|          2|   11|
|          3|   61|
|          4|  129|
|          5|  176|
+-----------+-----+



In [132]:
import pandas as pd
paid_pandasDF = star_groups_paid.toPandas()
paid_pandasDF


Unnamed: 0,star_rating,count
0,1,9
1,2,11
2,3,61
3,4,129
4,5,176


In [136]:
import numpy as np

weighted_average_paid = np.average(a =paid_pandasDF['star_rating'] , weights = paid_pandasDF['count'])
weighted_average_paid

4.170984455958549

In [115]:
# Distribution of Unpaid Rating 
star_groups_unpaid = helpful_unpaid.groupBy(helpful_unpaid.star_rating).count()
star_groups_unpaid = star_groups_unpaid.sort(star_groups_unpaid.star_rating)
star_groups_unpaid.show()

+-----------+-----+
|star_rating|count|
+-----------+-----+
|          1|10222|
|          2| 2993|
|          3| 4134|
|          4| 7342|
|          5|24026|
+-----------+-----+



In [134]:
import pandas as pd
unpaid_pandasDF = star_groups_unpaid.toPandas()
unpaid_pandasDF

Unnamed: 0,star_rating,count
0,1,10222
1,2,2993
2,3,4134
3,4,7342
4,5,24026


In [137]:
import numpy as np

weighted_average_unpaid = np.average(a =unpaid_pandasDF['star_rating'] , weights = unpaid_pandasDF['count'])
weighted_average_unpaid

3.6559722478806167

In [139]:
# Calculate the Paid Ratings Bias
paid_bias = (weighted_average_paid/weighted_average_unpaid - 1)*100
paid_bias

14.086874110614133