In [1]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
spark_version = 'spark-3.2.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Get:2 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Ign:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:4 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Ign:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:6 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:7 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release [696 B]
Hit:8 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:9 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release.gpg [836 B]
Get:10 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:11 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Get:12 http://archive.ubuntu.com/ubuntu bionic-backpor

In [2]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2022-02-08 23:13:46--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2022-02-08 23:13:47 (10.7 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CloudETL").config("spark.driver.extraClassPath","/content/postgresql-42.2.9.jar").getOrCreate()


# EXTRACT

In [4]:
# Read in the data from S3 Buckets from Amazon
from pyspark import SparkFiles
# Load in Outdoor data from S3 into a DataFrame
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Outdoors_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

df = spark.read.option('header', 'true').csv(SparkFiles.get("amazon_reviews_us_Outdoors_v1_00.tsv.gz"), inferSchema=True, sep='\t', timestampFormat="yyyy-mm-dd")
df.show(10)


+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|   18446823|R35T75OLUGHL5C|B000NV6H94|     110804376|Stearns Youth Boa...|        Outdoors|          4|            0|          0|   N|                Y|          Four Stars|          GOOD VALUE|2015-01-31 00:08:00|
|         US|   13724367|R2BV735O46BN33|B000IN0W3Y|     624096774|Primal Wear Men's...| 

# TRANSFORM

## Counting Records: Drop duplicates and incomplete rows

In [None]:
print(df.count())
df = df.dropna()
print(df.count())
df = df.dropDuplicates()
print(df.count())

2302401
2302174
2302174


## Examine the schema

In [None]:
df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: timestamp (nullable = true)



### Created Tables in Level 1

# Level 2  Bias & Analysis review


In [None]:
### Created Tables in Level 1
# LEVEL 2. (all info in s3 so create dataframe of table needed)
# SPLIT REVIEWS : Paid vs Unpaid
# Filtering for reviews that meet a certain number of 'helpful votes', 'total votes'

# First select columns of interest for vine
df_select = df.select(["star_rating", "helpful_votes", "total_votes", "vine", "verified_purchase"])
df_select.show()

+-----------+-------------+-----------+----+-----------------+
|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+-----------+-------------+-----------+----+-----------------+
|          2|            0|          0|   N|                Y|
|          4|            0|          0|   N|                N|
|          5|            0|          0|   N|                Y|
|          5|            0|          0|   N|                Y|
|          5|            0|          0|   N|                Y|
|          5|            0|          0|   N|                Y|
|          1|           13|         66|   N|                Y|
|          4|            1|          1|   N|                Y|
|          5|            1|          1|   N|                Y|
|          5|            0|          0|   N|                Y|
|          5|            0|          0|   N|                Y|
|          4|            1|          1|   N|                N|
|          1|            0|          1|   N|           

In [None]:
## Votes or Points of Interest:: review those who reviews were >=10. (good number of reviews)  filter helpful_votes/total_votes >=0.5 (votes were helpful)
df_vine_POI = df_select.filter("total_votes>=10").filter(df_select["helpful_votes"]/df_select["total_votes"] >= 0.5)
df_vine_POI.show()

+-----------+-------------+-----------+----+-----------------+
|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+-----------+-------------+-----------+----+-----------------+
|          1|            9|         10|   N|                Y|
|          1|           22|         28|   N|                Y|
|          5|           15|         17|   N|                N|
|          5|           11|         12|   N|                Y|
|          5|           20|         25|   N|                N|
|          5|           12|         14|   N|                Y|
|          5|           31|         34|   N|                Y|
|          1|            7|         13|   N|                N|
|          5|           32|         36|   N|                Y|
|          5|           25|         26|   N|                N|
|          5|           11|         11|   N|                N|
|          5|           17|         18|   N|                Y|
|          5|           14|         14|   N|           

## Analysis (compare the paid vs unpaid)


In [None]:
# Create a DataFrame for paid and unpaid by filtering on vine
from pyspark.sql.functions import col, avg
df_paid = df_vine_POI.filter("vine='Y'")
df_unpaid = df_vine_POI.filter("vine='N'")

In [None]:
# Paid stats
df_paid.describe().show()

+-------+------------------+-----------------+------------------+----+-----------------+
|summary|       star_rating|    helpful_votes|       total_votes|vine|verified_purchase|
+-------+------------------+-----------------+------------------+----+-----------------+
|  count|               202|              202|               202| 202|              202|
|   mean| 4.252475247524752|33.00990099009901|36.618811881188115|null|             null|
| stddev|0.9253848530476053|38.25522955165541|40.554591210282126|null|             null|
|    min|                 1|                6|                10|   Y|                N|
|    max|                 5|              326|               340|   Y|                Y|
+-------+------------------+-----------------+------------------+----+-----------------+



In [None]:
# unpaid stats
df_unpaid.describe().show()


+-------+------------------+------------------+------------------+-----+-----------------+
|summary|       star_rating|     helpful_votes|       total_votes| vine|verified_purchase|
+-------+------------------+------------------+------------------+-----+-----------------+
|  count|             93116|             93116|             93116|93116|            93116|
|   mean|3.8687336225782896|28.620731131062332| 31.45209201426178| null|             null|
| stddev|1.4562311829199786|56.808061182273136|59.442526744269344| null|             null|
|    min|                 1|                 5|                10|    N|                N|
|    max|                 5|              2703|              2751|    N|                Y|
+-------+------------------+------------------+------------------+-----+-----------------+



# Paid Reviews for the 5-star ratings

In [None]:
# count the number five star reviews
five_star = df_paid[df_paid['star_rating'] == 5].count()
print(f'The number of PAID five star reviews: {five_star}')

The number of PAID five star reviews: 102


In [None]:
# total number of PAID reviews:
paid_ttl = df_paid.count()
print(f'The total number of PAID reviews: {paid_ttl}')

The total number of PAID reviews: 202


In [None]:
# Pecentage of paid five-star reviews in Vine
print(f'Percentage of paid five star reviews: {five_star/paid_ttl}')

Percentage of paid five star reviews: 0.504950495049505


# Unpaid Reviews for the 5-star ratings

In [None]:
# unpaid five star reviews
five_star_unpaid = df_unpaid[df_paid['star_rating'] == 5].count()
print(f'The number of UNPAID five star reviews: {five_star_unpaid}')


The number of UNPAID five star reviews: 48008


In [None]:
# total number of UNPAID reviews:
unpaid_ttl = df_unpaid.count()
print(f'The total number of UNPAID reviews: {unpaid_ttl}')


The total number of UNPAID reviews: 93116


In [None]:
# Pecentage of unpaid five-star reviews in non-Vine
print(f'Percentage of unpaid five star reviews: {five_star_unpaid/unpaid_ttl}')


Percentage of unpaid five star reviews: 0.515571974741183


# Analysis

### From the reviews of interest:

* We can see that the percentage of 5-star reviews in Vine is very close to non-Vine reviews (50.5% to 51.6%).

* The total number of Vine reviews is pretty low (202), with an average Vine star rating of 4.25, with a standard deviation of 0.92

* Compared to non-Vine reviews, total number is very high here (93116), with an average non-Vine star rating of 3.87, with a standard deviation of 1.46

* It looks like the Vine ratings on average are giving a higher score and they are not deviating from this score as much as the non-vine ratings. Although, there are far fewer Vine ratings. 

* Is there a bias? It is fairly close to tell here. Although, there are far fewer Vine customers giving ratings, it looks like those that are rating give a rate closer to other vine members. 

* I would better trust the non-Vine ratings because there are far more and they additionally give a greater variety in the ratings. 








