In [None]:
import os
# Find the latest version of spark 2.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.0'
spark_version = 'spark-3.0.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:3 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:6 http://archive.ubuntu.com/ubuntu bionic InRelease
Hit:7 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:8 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Get:10 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:12 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Get:13 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Get:14 http://security.ubuntu.com/ubuntu bionic-security/main amd64 Packages [1,921 kB]

In [None]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2021-02-04 00:11:48--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar’


2021-02-04 00:11:50 (1.21 MB/s) - ‘postgresql-42.2.16.jar’ saved [1002883/1002883]



In [None]:
from pyspark.sql import SparkSession

# Create (or reuse) the SparkSession. The Postgres JDBC jar downloaded above is put
# on the driver classpath so the later .jdbc() writes can load org.postgresql.Driver.
spark = (
    SparkSession.builder
    .appName("BigData-Challenge")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.16.jar")
    .getOrCreate()
)

### Load Amazon Data into Spark DataFrame

In [None]:
from pyspark import SparkFiles

url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Major_Appliances_v1_00.tsv.gz"
# Distribute the remote file to the Spark driver/executors' SparkFiles directory.
spark.sparkContext.addFile(url)

# Resolve the downloaded file by its name. SparkFiles.get("") resolves to the whole
# SparkFiles root directory, which would make the reader ingest every file passed
# to addFile() in this session, not just this dataset.
tsv_filename = url.rsplit("/", 1)[-1]
# Tab-separated, gzip-compressed (detected from the .gz extension), with a header row.
amazon_review_df = spark.read.option("encoding", "UTF-8").csv(
    SparkFiles.get(tsv_filename), sep="\t", header=True, inferSchema=True
)
amazon_review_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   16199106|R203HPW78Z7N4K|B0067WNSZY|     633038551|FGGF3032MW Galler...|Major Appliances|          5|            0|          0|   N|                Y|If you need a new...|What a great stov...| 2015-08-31|
|         US|   16374060|R2EAIGVLEALSP3|B002QSXK60|     811766671|Best Hand Clothes...|Major Appliances|          5|    

### Create DataFrames to match tables

In [None]:
from pyspark.sql.functions import to_date

# The review dataset is already loaded in amazon_review_df; preview it again before
# splitting it into one DataFrame per database table. to_date is needed for the
# review_id_table date conversion.
amazon_review_df.show(n=20)

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   16199106|R203HPW78Z7N4K|B0067WNSZY|     633038551|FGGF3032MW Galler...|Major Appliances|          5|            0|          0|   N|                Y|If you need a new...|What a great stov...| 2015-08-31|
|         US|   16374060|R2EAIGVLEALSP3|B002QSXK60|     811766671|Best Hand Clothes...|Major Appliances|          5|    

In [None]:
# Create the customers_table DataFrame: one row per customer_id with the number of
# reviews (non-null customer_id values) attributed to that customer.
customers_df = (
    amazon_review_df
    .groupBy("customer_id")
    .agg({"customer_id": "count"})
    .withColumnRenamed("count(customer_id)", "customer_count")
)
customers_df.show()

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   13326061|             1|
|   52512151|             1|
|    8968933|             1|
|   39416583|             1|
|   21737233|             1|
|   44119972|             1|
|   13947800|             1|
|   21482558|             1|
|    2802853|             1|
|   18518845|             1|
|     122484|             1|
|   14035069|             1|
|   20004191|             1|
|   16699322|             1|
|   24768141|             1|
|   24424556|             1|
|   44293588|             1|
|   13188682|             1|
|   23298840|             1|
|   50731398|             1|
+-----------+--------------+
only showing top 20 rows



In [None]:
# Create the products_table DataFrame: the distinct (product_id, product_title) pairs.
# NOTE(review): a product_id that appears with more than one title yields more than
# one row here — confirm the target table does not key on product_id alone.
products_df = amazon_review_df.select("product_id", "product_title").dropDuplicates()
products_df.show()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B00RE645WC|WindMax 23" Stain...|
|B008SCVL6E|G.E. Microwave Gl...|
|B0081LV860|Samsung WF405ATPA...|
|B00KC812WK|2-pack OnePurify ...|
|B00MEPNYRK|GE PSB48YSHSS Pro...|
|B000JONLMQ|KitchenAid 18" TR...|
|B0073M7GNC|Whirlpool WOS92EC...|
|B00BIWR3IQ|GE Profile PWE23K...|
|B00O3XF1RC|MobileWasher Kit ...|
|B00JDB6P3I|Bosch NGM5055UC 5...|
|B0011YJE7Y|GE PHP960DMBB Pro...|
|B00J4EBYBM|Bosch HBL5651UC 5...|
|B0056HJ07Q|Frigidaire FHPC36...|
|B00FGWW82A|Frigidaire FFFS51...|
|B004UM5Y32|None Ers30t10074 ...|
|B00MANTPJM|GE Profile PFE28R...|
|B006L8PW1C|Fisher Paykel DD2...|
|B00T9WOH5E|Lg Lt600p Compati...|
|B001DHRJSU|Whirlpool Part Nu...|
|B0050KKN8E|Whirlpool Part Nu...|
+----------+--------------------+
only showing top 20 rows



In [None]:
# Create the review_id_table DataFrame, parsing review_date ('yyyy-MM-dd')
# into a date column with to_date.
review_id_df = amazon_review_df.select(
    "review_id",
    "customer_id",
    "product_id",
    "product_parent",
    to_date("review_date", 'yyyy-MM-dd').alias("review_date"),
)
review_id_df.show()

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R203HPW78Z7N4K|   16199106|B0067WNSZY|     633038551| 2015-08-31|
|R2EAIGVLEALSP3|   16374060|B002QSXK60|     811766671| 2015-08-31|
|R1K1CD73HHLILA|   15322085|B00EC452R6|     345562728| 2015-08-31|
|R2KZBMOFRMYOPO|   32004835|B00MVVIF2G|     563052763| 2015-08-31|
| R6BIZOZY6UD01|   25414497|B00IY7BNUW|     874236579| 2015-08-31|
|R1MCXZFNF8E7Y0|   36311751|B0033X29CI|     294467812| 2015-08-31|
|R3EMB3E3ODR6BW|   30920961|B005R597HA|     183784715| 2015-08-31|
| RJTONVTTOPJ5S|   52491265|B00MO6V8Y0|     960251524| 2015-08-31|
|R21U5QZ2CQECUM|   48166169|B00HT39QDI|     992475314| 2015-08-31|
| RL2BBC51H89DH|   50394924|B00LESFZ52|       1641606| 2015-08-31|
|R3RNEPHF3WIRSZ|    3915552|B0149IJVPI|     838108342| 2015-08-31|
|R38DNT9KML2PF3|   17068589|B002HT0958|     387104338| 2015-08

In [None]:
# Create the vine_table DataFrame: rating, vote counts and Vine/verified flags per review.
vine_df = amazon_review_df.select(
    "review_id", "star_rating", "helpful_votes", "total_votes", "vine", "verified_purchase"
)
vine_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R203HPW78Z7N4K|          5|            0|          0|   N|                Y|
|R2EAIGVLEALSP3|          5|            1|          1|   N|                Y|
|R1K1CD73HHLILA|          5|            0|          0|   N|                Y|
|R2KZBMOFRMYOPO|          5|            1|          1|   N|                Y|
| R6BIZOZY6UD01|          5|            0|          0|   N|                Y|
|R1MCXZFNF8E7Y0|          1|            0|          0|   N|                Y|
|R3EMB3E3ODR6BW|          5|            2|          2|   N|                Y|
| RJTONVTTOPJ5S|          5|            0|          0|   N|                Y|
|R21U5QZ2CQECUM|          4|            0|          0|   N|                Y|
| RL2BBC51H89DH|          4|            0|          0|   N|     

### Connect to the AWS RDS instance and write each DataFrame to its table. 

In [None]:
# Configure settings for RDS.
# Credentials are supplied at runtime instead of being hard-coded: a password stored
# in a notebook is exposed to anyone who can read the file or its history. Any
# password previously committed here should be rotated.
import os
from getpass import getpass

# "append" adds rows to existing tables, so re-running the write cells inserts the
# same rows again (which may fail if the tables declare primary keys).
mode = "append"
jdbc_url = os.environ.get(
    "RDS_JDBC_URL",
    "jdbc:postgresql://dataviz.cpbe0lwdy7yu.us-east-2.rds.amazonaws.com:5432/App_data_review",
)
config = {
    "user": os.environ.get("RDS_USER", "postgres"),
    # Prefer the RDS_PASSWORD environment variable; otherwise prompt without echo.
    "password": os.environ.get("RDS_PASSWORD") or getpass("RDS password: "),
    "driver": "org.postgresql.Driver",
}

In [None]:
# Write review_id_df to the review_id_table table in RDS.
review_id_df.write.mode(mode).jdbc(jdbc_url, 'review_id_table', properties=config)

In [None]:
# Write products_df to the products_table table in RDS.
# Observed runtime: about 3 min.
products_df.write.mode(mode).jdbc(jdbc_url, 'products_table', properties=config)

In [None]:
# Write customers_df to the customers_table table in RDS.
# Observed runtime: 5 min 14 s.
customers_df.write.mode(mode).jdbc(jdbc_url, 'customers_table', properties=config)

In [None]:
# Write vine_df to the vine_table table in RDS.
# Observed runtime: about 11 min.
vine_df.write.mode(mode).jdbc(jdbc_url, 'vine_table', properties=config)

## Deliverable 2: Determine Bias of Vine Reviews

In [None]:
# Keep only the reviews with 20 or more total votes.
vine_votes_df = vine_df.where(vine_df.total_votes >= 20)
vine_votes_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R1QXNQBTH7LIUB|          3|            9|         22|   N|                Y|
|R3BC75VKJK6LNV|          4|           83|         89|   N|                Y|
| R5XKK92G5N9FU|          1|           18|         22|   N|                N|
| RYMOWYW38WKOB|          5|           35|         35|   N|                N|
| RYHZAVESD0T37|          1|           39|         45|   N|                Y|
|R19Y5VBOF3BQOG|          1|           26|         35|   N|                Y|
| R8V9F3139Z1WZ|          1|           30|         34|   N|                N|
|R3RNY5OLY451GS|          1|           25|         30|   N|                Y|
|R16LPVYSJOAH1T|          1|           20|         20|   N|                N|
|R2NLL7ISMM5QYQ|          4|           89|         94|   N|     

In [None]:
# From the reviews with >= 20 total votes, keep those where helpful_votes divided by
# total_votes is at least 50%. The previous filter guarantees total_votes >= 20,
# so the divisor is never zero.
vine_Hvotes_df = vine_votes_df.where(
    (vine_votes_df.helpful_votes / vine_votes_df.total_votes) >= 0.5
)
vine_Hvotes_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R3BC75VKJK6LNV|          4|           83|         89|   N|                Y|
| R5XKK92G5N9FU|          1|           18|         22|   N|                N|
| RYMOWYW38WKOB|          5|           35|         35|   N|                N|
| RYHZAVESD0T37|          1|           39|         45|   N|                Y|
|R19Y5VBOF3BQOG|          1|           26|         35|   N|                Y|
| R8V9F3139Z1WZ|          1|           30|         34|   N|                N|
|R3RNY5OLY451GS|          1|           25|         30|   N|                Y|
|R16LPVYSJOAH1T|          1|           20|         20|   N|                N|
|R2NLL7ISMM5QYQ|          4|           89|         94|   N|                Y|
|R3FAFI6Q0YL37W|          1|           30|         35|   N|     

In [None]:
from pyspark.sql.functions import col, avg

In [None]:
# Reviews from the helpful-vote subset that were written through the Vine program
# (the "paid" group): vine == 'Y'.
vine_paid_df = vine_Hvotes_df.where(vine_Hvotes_df.vine == 'Y')




In [None]:
# Count, mean, stddev, min and max of every column for the paid (Vine) reviews.
vine_paid_df.summary("count", "mean", "stddev", "min", "max").show()

+-------+--------------+------------------+------------------+------------------+----+-----------------+
|summary|     review_id|       star_rating|     helpful_votes|       total_votes|vine|verified_purchase|
+-------+--------------+------------------+------------------+------------------+----+-----------------+
|  count|            35|                35|                35|                35|  35|               35|
|   mean|          null| 4.371428571428571| 75.28571428571429| 80.57142857142857|null|             null|
| stddev|          null|0.7702449681266148|136.07082313105562|141.57232317467856|null|             null|
|    min|R15NQCQUNOSI4U|                 2|                15|                20|   Y|                N|
|    max| RX7QEPPDUZLO7|                 5|               814|               848|   Y|                Y|
+-------+--------------+------------------+------------------+------------------+----+-----------------+



In [None]:
# Same subset, restricted to reviews written outside the Vine program
# (the "unpaid" group): vine == 'N'.
vine_unpaid_df = vine_Hvotes_df.where(vine_Hvotes_df.vine == 'N')


In [None]:
# Count, mean, stddev, min and max of every column for the unpaid (non-Vine) reviews.
vine_unpaid_df.summary("count", "mean", "stddev", "min", "max").show()

+-------+--------------+------------------+-----------------+-----------------+----+-----------------+
|summary|     review_id|       star_rating|    helpful_votes|      total_votes|vine|verified_purchase|
+-------+--------------+------------------+-----------------+-----------------+----+-----------------+
|  count|          4957|              4957|             4957|             4957|4957|             4957|
|   mean|          null| 3.246520072624571|48.58644341335485|53.22796045995562|null|             null|
| stddev|          null|1.7280125973419265|77.04607780493804|79.32273398498282|null|             null|
|    min|R100212BMGLGI0|                 1|               10|               20|   N|                N|
|    max| RZZQQEWCN58IM|                 5|             1724|             1761|   N|                Y|
+-------+--------------+------------------+-----------------+-----------------+----+-----------------+



In [None]:
# Determine stats for five-star reviews among PAID (Vine) reviews:
# total count, five-star count, and the five-star fraction.

paid_Treviews = vine_paid_df.count()

paid_5star_Treviews = vine_paid_df.filter(vine_paid_df['star_rating'] == 5).count()

# Guard the ratio: with another dataset, no Vine review may survive the vote filters,
# and dividing by a zero count would raise ZeroDivisionError. NaN marks "undefined".
paid_5star_pct = paid_5star_Treviews / paid_Treviews if paid_Treviews else float('nan')

print(paid_Treviews)
print(paid_5star_Treviews)
print(paid_5star_pct)


35
18
0.5142857142857142


In [None]:
# Determine stats for five-star reviews among UNPAID (non-Vine) reviews:
# total count, five-star count, and the five-star fraction.

unpaid_Treviews = vine_unpaid_df.count()

unpaid_5star_Treviews = vine_unpaid_df.filter(vine_unpaid_df['star_rating'] == 5).count()

# Guard the ratio: if no review survives the filters, dividing by a zero count would
# raise ZeroDivisionError. NaN marks the fraction as undefined.
unpaid_5star_pct = unpaid_5star_Treviews / unpaid_Treviews if unpaid_Treviews else float('nan')

print(unpaid_Treviews)
print(unpaid_5star_Treviews)
print(unpaid_5star_pct)

4957
1963
0.3960056485777688
