In [0]:
# Install Java, Spark, and Findspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()
import pyspark

In [2]:
!pip install -q findspark
!pip install pyspark
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/9a/5a/271c416c1c2185b6cb0151b29a91fff6fcaed80173c8584ff6d20e46b465/pyspark-2.4.5.tar.gz (217.8MB)
[K     |████████████████████████████████| 217.8MB 61kB/s 
[?25hCollecting py4j==0.10.7
[?25l  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
[K     |████████████████████████████████| 204kB 52.6MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-2.4.5-py2.py3-none-any.whl size=218257927 sha256=a3bb832035dd294d9c5f740be58ba17edd6299b0cd4b160a489a9d15c3a22e8b
  Stored in directory: /root/.cache/pip/wheels/bf/db/04/61d66a5939364e756eb1c1be4ec5bdce6e04047fc7929a3c3c
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.5


--2020-03-15 18:11:51--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2020-03-15 18:11:52 (4.67 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [0]:
from pyspark.sql import SparkSession
from pyspark import SparkFiles

# Start (or reuse) the SparkSession. The downloaded PostgreSQL JDBC jar is put on the
# driver classpath so the JDBC writes later in the notebook can load org.postgresql.Driver.
spark = (
    SparkSession.builder
    .appName("ChallengeExample")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")
    .getOrCreate()
)

In [0]:
bucketUrl = (  # Amazon customer reviews, US furniture category, gzip-compressed TSV
    "https://s3.amazonaws.com/amazon-reviews-pds/tsv/"
    "amazon_reviews_us_Furniture_v1_00.tsv.gz"
)

In [0]:
spark.sparkContext.addFile(path=bucketUrl)  # fetch the dataset so SparkFiles.get() can resolve it locally

In [0]:
# Load the downloaded reviews file into a DataFrame: tab-separated, first line is the
# header, and column types are inferred from the data (this costs an extra pass).
# NOTE(review): SparkFiles.get() returns a driver-local path; fine in local mode,
# but executors on a real cluster would need a shared path — confirm if deployment changes.
sparkDF = spark.read.csv(
    SparkFiles.get("amazon_reviews_us_Furniture_v1_00.tsv.gz"),
    sep="\t",
    header=True,
    inferSchema=True,
)

In [7]:
# Peek at the first rows of the raw data (n=20 and truncate=True are show()'s defaults).
sparkDF.show(n=20, truncate=True)

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|   24509695|R3VR960AHLFKDV|B004HB5E0E|     488241329|Shoal Creek Compu...|       Furniture|          4|            0|          0|   N|                Y|... desk is very ...|This desk is very...|2015-08-31 00:00:00|
|         US|   34731776|R16LGVMFKIUT0G|B0042TNMMS|     205864445|Dorel Home Produc...| 

In [8]:
# Number of rows in the raw dataset, before any cleaning.
total_records = sparkDF.count()
total_records

792113

In [9]:
# Drop rows that are identical across every column; dropDuplicates() with no subset
# is equivalent to distinct(). The count shows how many rows survive.
removeDupliDf = sparkDF.dropDuplicates()
removeDupliDf.count()

792113

In [10]:
# Keep only rows with no null in any column (how="any" is dropna's default).
# NOTE(review): this discards a whole review when *any* field is null, including text
# columns such as review_body that none of the derived tables use — confirm this is intended.
dropna_df = removeDupliDf.na.drop(how="any")
dropna_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|   50599196|R100LZ3TZU4U3L|B009IMTNVS|     393895121|Flash Furniture R...|       Furniture|          4|            0|          0|   N|                Y|        Good quality|However be aware ...|2014-05-26 00:00:00|
|         US|   37762171|R101B5A6FRZPIL|B000S0BUO2|     643258319|    Cane Linen Tower| 

In [11]:
# review_id_table: each review with its customer/product keys and review date.
# NOTE(review): review_date is displayed as a timestamp ("yyyy-MM-dd 00:00:00"); if the
# target RDS column is DATE, consider to_date() — confirm against the table schema.
review_id_table = (
    dropna_df
    .select("review_id", "customer_id", "product_id", "product_parent", "review_date")
    .distinct()
)
review_id_table.show()

+--------------+-----------+----------+--------------+-------------------+
|     review_id|customer_id|product_id|product_parent|        review_date|
+--------------+-----------+----------+--------------+-------------------+
|R1HNNMB9NWPXOM|   30934546|B007B6ZX2Q|     902872420|2014-11-20 00:00:00|
|R1JFUT1F80RYKM|    1379589|B00342VCGM|     148029143|2015-04-06 00:00:00|
|R1M6H3WH1Q90VM|   15419523|B007YLQQGA|     680101871|2015-07-24 00:00:00|
|R25FC2460H0QAC|   18914451|B0002KNM4E|     964386680|2014-07-23 00:00:00|
|R2AU8F8F7XL0HO|   17868270|B005A4OP8Y|     999119538|2013-02-15 00:00:00|
|R2B7N8C09HQ87N|    4454066|B00JQLHURK|     319180460|2014-12-16 00:00:00|
|R2MWPS724A7VG0|   14317650|B001M0NDM4|     330848986|2011-08-10 00:00:00|
|R2OQOJKCP3DWOU|   14658280|B002T2LI8U|     768721775|2014-10-05 00:00:00|
|R30ISW43437Q41|   19011880|B00A811OLU|     124356391|2015-08-18 00:00:00|
|R32CZOACLSM8JZ|   12573770|B003R5JFY2|     664291029|2012-11-24 00:00:00|
|R3JZ3F9UXGPHTZ|   503927

In [12]:
# products_df: one row per product_id with its title.
# distinct() over (product_id, product_title) still emits several rows for a product
# whose title differs between reviews. That duplicates the key, and would violate a
# primary key on product_id in the target `products` table (assumed — confirm schema).
# Deduplicating on the key keeps exactly one row per product. Which of the competing
# titles survives is not deterministic.
products_df = (
    dropna_df
    .select("product_id", "product_title")
    .dropDuplicates(["product_id"])
)
products_df.show()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B005FP3TDA|Flash Furniture L...|
|B001N44DN2|Soft Suede Luxe B...|
|B00OJMMA5S|LavoHome 330lbs U...|
|B0074FGY9A|Safavieh Courtyar...|
|B0047G0BPC|White Elegant Cot...|
|B0058LI35U|Harry Potter and ...|
|B000BP4OKQ|Coaster Home Furn...|
|B00FM0OV1W|Sauder Edge Water...|
|B001U0UOO6|Furniture Repair Set|
|B009QH31XG|Winsome Obsidian ...|
|B007BCI9US|Reflections Three...|
|B001BX1JSC|Flash Furniture V...|
|B00BIP3ZDA|Black Desk & Book...|
|B00CX56PUY|Safavieh American...|
|B000O31MHS|The Furniture Sou...|
|B005UPGTM8|South Shore Agora...|
|B007OWPBGU|Frenchi Home Furn...|
|B009IWJU66|Big Joe Fuf 6' Me...|
|B00M49MHH4|Metro Shop Upton ...|
|B0014COQQM|Grip-It Non-Slip ...|
+----------+--------------------+
only showing top 20 rows



In [13]:
# customers_df: number of reviews written by each customer.
# Aliasing the aggregate directly avoids depending on Spark's auto-generated
# "count(customer_id)" column name.
from pyspark.sql import functions as F

customers_df = (
    dropna_df
    .groupBy("customer_id")
    .agg(F.count("customer_id").alias("customer_count"))
)
customers_df.show()

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   16701883|             1|
|   21552484|             1|
|   39013924|             1|
|   10929119|             2|
|   15411200|             1|
|   19671756|             4|
|   15731761|             1|
|   12152570|             5|
|   32937025|             1|
|   11916151|             1|
|   15887642|             2|
|   17480020|             2|
|   13646959|             1|
|   15611613|             1|
|   19819565|             2|
|   15056685|             1|
|   26656231|             1|
|   21799522|             2|
|   42749153|             2|
|   28107028|             1|
+-----------+--------------+
only showing top 20 rows



In [14]:
# vine_table_df: per-review rating, vote counts and Vine-program flag.
vine_table_df = (
    dropna_df
    .select("review_id", "star_rating", "helpful_votes", "total_votes", "vine")
    .distinct()
)
vine_table_df.show()

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R18BJ01PGD2QZY|          5|            0|          0|   N|
|R1B2Z401UWPZ0Z|          3|           13|         21|   N|
|R1EGCT74J0N7XL|          4|            6|          6|   N|
|R1GPRQIRT5VRY4|          5|           10|         12|   N|
|R1NTVVMDL4N79T|          5|            4|          4|   N|
|R1NXU9U2F5CI4L|          5|            0|          0|   N|
|R1QYL41THXK05D|          3|           11|         11|   N|
|R1U4YEFJ35KNQD|          4|            1|          1|   N|
|R260MEF6OSI5MY|          4|            0|          0|   N|
|R2ESDHZTHNIJK1|          4|            2|          4|   N|
|R2LBNSR5KO5ZV5|          5|            1|          1|   N|
|R2R1DZH9T7RK7Q|          5|            0|          1|   N|
|R2ROVCLSTXOZWI|          1|            1|          3|   N|
|R2YTZ5BAE2CNW9|          5|            

In [0]:
# JDBC settings for the RDS PostgreSQL instance used by the write cells below.
# Credentials come from the environment (RDS_USER / RDS_PASSWORD), or the password is
# prompted for interactively. A hard-coded password leaks with every copy of the notebook.
# RDS_JDBC_URL optionally overrides the endpoint.
import os
import pyspark
from getpass import getpass

mode = "append"  # append rows to existing tables; re-running the writes inserts them again
jdbc_url = os.environ.get(
    "RDS_JDBC_URL",
    "jdbc:postgresql://dataviz.cjfblavlxb2k.us-east-2.rds.amazonaws.com:5432/postgres",
)
config = {
    "user": os.environ.get("RDS_USER", "postgres"),
    "password": os.environ.get("RDS_PASSWORD") or getpass("RDS password: "),
    "driver": "org.postgresql.Driver",
}

In [16]:
# Supplementary analysis (the main analysis is in the SQL file):
# reviews written through the Vine program.
vine_df = vine_table_df.where(vine_table_df["vine"] == "Y")
vine_df.show()

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R2IPEDLBI6W4YV|          5|            8|          8|   Y|
|R22OY342SCQ3L1|          4|            3|          4|   Y|
|R3M8HWERPAC57X|          2|            0|          0|   Y|
|R2710XN7IZAZJH|          4|            0|          0|   Y|
| RA8WXGT4DXU1S|          2|           11|         14|   Y|
|R1O0QTCE1TDU1L|          4|            6|          8|   Y|
|R2HQ6ERTBWD7DQ|          5|            0|          0|   Y|
|R2F67B5ZPRAGEP|          5|            0|          1|   Y|
|R3F51DEVSK4KR1|          4|            0|          0|   Y|
|R2HWQHUR578IVL|          5|            0|          0|   Y|
| ROELCBIGX212Z|          5|            2|          4|   Y|
|R3H93HZMAVXOUI|          3|            0|          0|   Y|
|R3JNZPIO1HGTBF|          5|            1|          2|   Y|
|R1Y1DPF7VUT8IA|          5|            

In [17]:
# Reviews written outside the Vine program.
no_vine_df = vine_table_df.where(vine_table_df["vine"] == "N")
no_vine_df.show()

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R18BJ01PGD2QZY|          5|            0|          0|   N|
|R1B2Z401UWPZ0Z|          3|           13|         21|   N|
|R1EGCT74J0N7XL|          4|            6|          6|   N|
|R1GPRQIRT5VRY4|          5|           10|         12|   N|
|R1NTVVMDL4N79T|          5|            4|          4|   N|
|R1NXU9U2F5CI4L|          5|            0|          0|   N|
|R1QYL41THXK05D|          3|           11|         11|   N|
|R1U4YEFJ35KNQD|          4|            1|          1|   N|
|R260MEF6OSI5MY|          4|            0|          0|   N|
|R2ESDHZTHNIJK1|          4|            2|          4|   N|
|R2LBNSR5KO5ZV5|          5|            1|          1|   N|
|R2R1DZH9T7RK7Q|          5|            0|          1|   N|
|R2ROVCLSTXOZWI|          1|            1|          3|   N|
|R2YTZ5BAE2CNW9|          5|            

In [18]:
# Number of Vine reviews.
vine_review_count = vine_df.count()
vine_review_count

2775

In [19]:
# Number of non-Vine reviews.
# The statement sits at column 0: an indented top-level statement is an IndentationError
# in plain Python and only runs because of IPython's leading-indent cleanup.
no_vine_df.count()

789196

In [20]:
# Number of 5-star Vine reviews (star_rating presumably inferred as an integer column).
vine_df.where(vine_df["star_rating"] == 5).count()

1356

In [21]:
# Number of 5-star non-Vine reviews (this cell filters no_vine_df, not the Vine reviews).
# A numeric literal is compared with the numeric rating instead of relying on an implicit
# string cast of '5'.
no_vine_df.where(no_vine_df["star_rating"] == 5).count()

446274

In [22]:
# Mean star rating of Vine reviews (output column: avg(star_rating)).
vine_df.groupBy().avg("star_rating").show()

+----------------+
|avg(star_rating)|
+----------------+
|4.25045045045045|
+----------------+



In [23]:
# Mean star rating of non-Vine reviews (output column: avg(star_rating)).
no_vine_df.groupBy().avg("star_rating").show()

+----------------+
|avg(star_rating)|
+----------------+
|4.08337725989488|
+----------------+



In [24]:
 # Total helpful votes across all Vine reviews (sum of helpful_votes).
vine_df.agg({'helpful_votes': 'sum'}).show()

+------------------+
|sum(helpful_votes)|
+------------------+
|             15079|
+------------------+



In [25]:
 # Total helpful votes across all non-Vine reviews (sum of helpful_votes).
no_vine_df.agg({'helpful_votes': 'sum'}).show()

+------------------+
|sum(helpful_votes)|
+------------------+
|           1884672|
+------------------+



In [0]:
# Write products_df to the `products` table in RDS using the configured save mode.
products_df.write.mode(mode).jdbc(jdbc_url, "products", properties=config)

In [0]:
# Write review_id_table to the `review_id_table` table in RDS using the configured save mode.
review_id_table.write.mode(mode).jdbc(jdbc_url, "review_id_table", properties=config)

In [0]:
# Write customers_df to the `customers` table in RDS using the configured save mode.
customers_df.write.mode(mode).jdbc(jdbc_url, "customers", properties=config)

In [0]:
# Write vine_table_df to the `vine_table` table in RDS using the configured save mode.
vine_table_df.write.mode(mode).jdbc(jdbc_url, "vine_table", properties=config)