<a href="https://colab.research.google.com/github/chanhodchang/Amazon_Reviews_ETL/blob/master/AWS_PC_Reviews.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
# Install Java, Spark, and Findspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

In [2]:
# Download the PostgreSQL JDBC driver jar (version 42.2.9).
# No database connection is made here; the SparkSession cell later references
# this jar as /content/postgresql-42.2.9.jar on the driver classpath.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2020-03-17 20:49:41--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2020-03-17 20:49:41 (4.90 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [0]:
# Let findspark set up the environment so that pyspark can be imported
import findspark
findspark.init()
import pyspark

# Build (or reuse) the session, putting the PostgreSQL JDBC jar on the driver classpath
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .appName("AWS_PC_Reviews")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")
    .getOrCreate()
)

In [6]:
!wget -q https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_PC_v1_00.tsv.gz
!gunzip amazon_reviews_us_PC_v1_00.tsv.gz

gzip: amazon_reviews_us_PC_v1_00.tsv already exists; do you wish to overwrite (y or n)? y


In [7]:
# `col` builds column expressions; it is used by later filter and alias cells
from pyspark.sql.functions import col

from pyspark import SparkFiles

# Register the remote gzipped TSV with Spark, then read the fetched copy
# as a tab-separated file whose first line is the header.
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_PC_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
pc_df = spark.read.csv(
    SparkFiles.get("amazon_reviews_us_PC_v1_00.tsv.gz"),
    header=True,
    sep="\t",
)

# Preview the untyped DataFrame
pc_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   22873041|R3ARRMDEGED8RD|B00KJWQIIC|     335625766|Plemo 14-Inch Lap...|              PC|          5|            0|          0|   N|                Y|Pleasantly surprised|I was very surpri...| 2015-08-31|
|         US|   30088427| RQ28TSA020Y6J|B013ALA9LA|     671157305|TP-Link OnHub AC1...|              PC|          5|    

In [0]:
# Import struct fields that we can use
from pyspark.sql.types import StructField, StringType, IntegerType, StructType, DateType

In [9]:
# Explicit column types for the reviews file, in file column order.
# Every field is declared nullable.
schema = [
    StructField(name, dtype, True)
    for name, dtype in [
        ('marketplace', StringType()),
        ('customer_id', IntegerType()),
        ('review_id', StringType()),
        ('product_id', StringType()),
        ('product_parent', IntegerType()),
        ('product_title', StringType()),
        ('product_category', StringType()),
        ('star_rating', IntegerType()),
        ('helpful_votes', IntegerType()),
        ('total_votes', IntegerType()),
        ('vine', StringType()),
        ('verified_purchase', StringType()),
        ('review_headline', StringType()),
        ('review_body', StringType()),
        ('review_date', DateType()),
    ]
]
schema

[StructField(marketplace,StringType,true),
 StructField(customer_id,IntegerType,true),
 StructField(review_id,StringType,true),
 StructField(product_id,StringType,true),
 StructField(product_parent,IntegerType,true),
 StructField(product_title,StringType,true),
 StructField(product_category,StringType,true),
 StructField(star_rating,IntegerType,true),
 StructField(helpful_votes,IntegerType,true),
 StructField(total_votes,IntegerType,true),
 StructField(vine,StringType,true),
 StructField(verified_purchase,StringType,true),
 StructField(review_headline,StringType,true),
 StructField(review_body,StringType,true),
 StructField(review_date,DateType,true)]

In [10]:
# Wrap the field list in a StructType and re-read the file with typed columns
final = StructType(fields=schema)
me_df = spark.read.csv(
    SparkFiles.get("amazon_reviews_us_PC_v1_00.tsv.gz"),
    schema=final,
    header=True,
    sep="\t",
)
me_df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: date (nullable = true)



In [0]:
# Keep only rows that have no null in any column (dropna with its defaults).
# NOTE(review): with the explicit schema, values that fail to parse as
# integer/date presumably load as null and are dropped here too — confirm.
clean_pc = me_df.dropna()

In [12]:
# Per-review identifiers plus the review date
review_id_table = clean_pc.select(
    'review_id',
    'customer_id',
    'product_id',
    'product_parent',
    'review_date',
)
review_id_table.show()

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R3ARRMDEGED8RD|   22873041|B00KJWQIIC|     335625766| 2015-08-31|
| RQ28TSA020Y6J|   30088427|B013ALA9LA|     671157305| 2015-08-31|
| RUXJRZCT6953M|   20329786|B00PML2GQ8|     982036237| 2015-08-31|
| R7EO0UO6BPB71|   14215710|B001NS0OZ4|     576587596| 2015-08-31|
|R39NJY2YJ1JFSV|   38264512|B00AQMTND2|     964759214| 2015-08-31|
|R31SR7REWNX7CF|   30548466|B00KX4TORI|     170101802| 2015-08-31|
| RVBP8I1R0CTZ8|     589298|B00P17WEMY|     206124740| 2015-08-31|
|R1QF6RS1PDLU18|   49329488|B00TR05L9Y|     778403103| 2015-08-31|
|R23AICGEDAJQL1|   50728290|B0098Y77OG|     177098042| 2015-08-31|
|R2EY3N4K9W19UP|   37802374|B00IFYEYXC|     602496520| 2015-08-31|
| RC9AW4HKJ016M|   52027882|B0091ITP0S|     977217357| 2015-08-31|
|R2ALWJE9N6ZBXD|   41770239|B008I21EA2|     295632907| 2015-08

In [13]:
# One row per product_id with a product title
products = (
    clean_pc
    .select('product_id', 'product_title')
    .dropDuplicates(['product_id'])
)
products.show()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|9875987018|Professional Ultr...|
|9966285946|Professional King...|
|9966694242|Professional King...|
|9967222247|Professional King...|
|9985538803|Samsung Galaxy St...|
|9985725344|Professional King...|
|9989476071|Professional King...|
|9990950369|Samsung SGH-i780 ...|
|B00000J3SV|Intel ICS2USB Cre...|
|B00000JBK6|    ALTEC ACS SERIES|
|B00002S73F|Guillemot Maxi So...|
|B00004VV4B|Viking MN1664P 12...|
|B00004YNSK|Sony CPD-E400 19"...|
|B00004Z7BU|Netgear FE104 100...|
|B00005045V|Ricoh 12x10x32 CD...|
|B000051156|AT&T Anti-Static ...|
|B0000511IB|Thinkpad TP56850T...|
|B0000511K8|C2G/Cables to Go ...|
|B0000512S1|Allied Telesyn Ce...|
|B00005853W|Microsoft Wheel M...|
+----------+--------------------+
only showing top 20 rows



In [14]:
# Number of reviews per customer, with the count column named customer_count
customers = (
    clean_pc
    .groupby('customer_id')
    .count()
    .withColumnRenamed('count', 'customer_count')
)
customers.show()

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   42560427|            11|
|   14991350|             6|
|    9002099|             1|
|   35535911|             3|
|    1117644|             1|
|   46909180|             2|
|   48670265|             2|
|   43626894|             2|
|   17083191|             1|
|   20808998|             1|
|   36728141|             8|
|   16306618|             3|
|   13910751|             1|
|   27377384|             1|
|   19005540|             1|
|   51804200|             1|
|   47108763|             6|
|   51451778|             2|
|   44998024|             2|
|   32753385|             1|
+-----------+--------------+
only showing top 20 rows



In [15]:
# Rating and vote columns per review, together with the vine flag
vine_table = clean_pc.select(
    'review_id',
    'star_rating',
    'helpful_votes',
    'total_votes',
    'vine',
)
vine_table.show()

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R3ARRMDEGED8RD|          5|            0|          0|   N|
| RQ28TSA020Y6J|          5|           24|         31|   N|
| RUXJRZCT6953M|          1|            2|          2|   N|
| R7EO0UO6BPB71|          1|            0|          0|   N|
|R39NJY2YJ1JFSV|          5|            0|          0|   N|
|R31SR7REWNX7CF|          5|            0|          0|   N|
| RVBP8I1R0CTZ8|          3|            1|          2|   N|
|R1QF6RS1PDLU18|          4|            1|          1|   N|
|R23AICGEDAJQL1|          1|            0|          0|   N|
|R2EY3N4K9W19UP|          5|            3|          4|   N|
| RC9AW4HKJ016M|          1|            0|          0|   N|
|R2ALWJE9N6ZBXD|          1|            0|          0|   N|
|R2G5FPA4OX37GV|          5|            1|          1|   N|
|R1IKTSEVXSIMOD|          5|            

In [17]:
# Rows whose vine flag is 'N', shown with the most helpful votes first
non_vine_df = vine_table.where(col('vine') == 'N')
(
    non_vine_df
    .orderBy('helpful_votes', ascending=False)
    .select('star_rating', 'helpful_votes', 'total_votes')
    .show()
)

+-----------+-------------+-----------+
|star_rating|helpful_votes|total_votes|
+-----------+-------------+-----------+
|          5|        47524|      48362|
|          5|        31924|      32373|
|          4|        31417|      32166|
|          4|        28611|      29433|
|          1|        24714|      26143|
|          5|        16982|      17441|
|          5|        16563|      16865|
|          4|        15457|      15985|
|          5|        13828|      14145|
|          5|        13557|      13870|
|          3|        12465|      12970|
|          3|        11995|      12278|
|          5|        11526|      11832|
|          5|        11444|      11819|
|          4|        11426|      11744|
|          3|        11173|      11684|
|          4|        10933|      11205|
|          5|        10801|      11125|
|          5|        10787|      11005|
|          5|        10683|      10900|
+-----------+-------------+-----------+
only showing top 20 rows



In [18]:
# Rows whose vine flag is 'Y', shown with the most helpful votes first
paid_vine_df = vine_table.where(col('vine') == 'Y')
(
    paid_vine_df
    .orderBy('helpful_votes', ascending=False)
    .select('star_rating', 'helpful_votes', 'total_votes')
    .show()
)

+-----------+-------------+-----------+
|star_rating|helpful_votes|total_votes|
+-----------+-------------+-----------+
|          5|         3028|       3159|
|          5|         2482|       2598|
|          5|         1842|       1907|
|          5|         1661|       1714|
|          4|         1635|       1670|
|          5|         1298|       1437|
|          5|         1288|       1328|
|          5|         1162|       1259|
|          3|         1033|       1145|
|          4|          984|       1013|
|          5|          922|        967|
|          5|          903|        935|
|          4|          859|        887|
|          4|          855|        919|
|          3|          852|        956|
|          4|          724|        748|
|          3|          712|        739|
|          3|          680|        720|
|          3|          660|        695|
|          4|          644|        678|
+-----------+-------------+-----------+
only showing top 20 rows



In [19]:
# Number of 5-star rows among the vine == 'N' reviews
(
    non_vine_df
    .where(col('star_rating') == 5)
    .select('helpful_votes')
    .count()
)

4090931

In [20]:
# Number of 5-star rows among the vine == 'Y' reviews
(
    paid_vine_df
    .where(col('star_rating') == 5)
    .select('helpful_votes')
    .count()
)

15706

In [23]:
# Mean of the numeric columns per star rating for vine == 'N' reviews,
# listed from 5 stars down to 1
non_vine_avg = non_vine_df.groupBy('star_rating').avg()
(
    non_vine_avg
    .orderBy('star_rating', ascending=False)
    .select('star_rating', 'avg(helpful_votes)', 'avg(total_votes)')
    .show()
)

+-----------+------------------+------------------+
|star_rating|avg(helpful_votes)|  avg(total_votes)|
+-----------+------------------+------------------+
|          5|1.2084664835461658|1.4218291630927042|
|          4|1.5202392350296317|1.8044057126928916|
|          3|1.8001080826144202|2.5380843830447857|
|          2|1.6481129197748259|2.6600232938631763|
|          1| 2.438265712223101| 4.182918122175476|
+-----------+------------------+------------------+



In [24]:
# Mean of the numeric columns per star rating for vine == 'Y' reviews,
# listed from 5 stars down to 1.
paid_vine_avg = paid_vine_df.groupBy('star_rating').avg()
# Sort by this DataFrame's own star_rating column. Sorting by a column taken
# from non_vine_avg only works when that other cell has already been run and
# happens to share lineage with this frame.
paid_vine_avg.orderBy(paid_vine_avg['star_rating'].desc()).select('star_rating','avg(helpful_votes)','avg(total_votes)').show()

+-----------+------------------+------------------+
|star_rating|avg(helpful_votes)|  avg(total_votes)|
+-----------+------------------+------------------+
|          5|  5.90201196994779| 6.886667515599134|
|          4| 5.082655826558265| 5.957844022884673|
|          3| 5.661013485901103| 7.069064160196159|
|          2| 4.017083587553386|5.5070164734594265|
|          1|6.7227722772277225| 9.168316831683168|
+-----------+------------------+------------------+



In [25]:
# Total number of rows in non_vine_df (reviews with vine == 'N')
non_vine_df.select('review_id').count()

6871915

In [26]:
# Total number of rows in paid_vine_df (reviews with vine == 'Y')
paid_vine_df.select('review_id').count()

36230

In [27]:
# Row count of the helpful_votes column for vine == 'N' reviews.
# count() counts rows, so this is the number of reviews, not a sum of votes.
non_vine_df.select('helpful_votes').count()

6871915

In [28]:
# Row count of the helpful_votes column for vine == 'Y' reviews.
# count() counts rows, so this is the number of reviews, not a sum of votes.
paid_vine_df.select('helpful_votes').count()

36230

In [0]:
# Configure settings for the RDS PostgreSQL target.
# The password is not stored in the notebook: it is read from the RDS_PASSWORD
# environment variable when set, otherwise prompted for without echoing.
import os
from getpass import getpass

# 'append' adds rows to an existing table, so re-running a write cell
# inserts the same data again.
mode = 'append'
# NOTE(review): port 5431 differs from PostgreSQL's default 5432 — confirm it is intended.
jdbc_url="jdbc:postgresql://dataviz.cbp360boojkd.us-east-2.rds.amazonaws.com:5431/AWS_PC_Reviews"
config = {"user":"postgres",
          "password": os.environ.get("RDS_PASSWORD") or getpass("RDS password: "),
          "driver":"org.postgresql.Driver"}

In [0]:
# Send review_id_table to the RDS table of the same name over JDBC
review_id_table.write.jdbc(
    url=jdbc_url,
    table='review_id_table',
    mode=mode,
    properties=config,
)

In [0]:
# Send products to the RDS table of the same name over JDBC
products.write.jdbc(
    url=jdbc_url,
    table='products',
    mode=mode,
    properties=config,
)

In [0]:
# Send customers to the RDS table of the same name over JDBC
customers.write.jdbc(
    url=jdbc_url,
    table='customers',
    mode=mode,
    properties=config,
)

In [0]:
# Send vine_table to the RDS table of the same name over JDBC
vine_table.write.jdbc(
    url=jdbc_url,
    table='vine_table',
    mode=mode,
    properties=config,
)