In [1]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.1.2'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
0% [Waiting for headers] [Connecting to security.ubuntu.com (91.189.91.38)] [Co                                                                               Hit:2 http://archive.ubuntu.com/ubuntu bionic InRelease
                                                                               Get:3 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
0% [3 InRelease 15.6 kB/88.7 kB 18%] [Connecting to security.ubuntu.com (91.1890% [1 InRelease gpgv 15.9 kB] [3 InRelease 15.6 kB/88.7 kB 18%] [Connecting to                                                                                Hit:4 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
0% [1 InRelease gpgv 15.9 kB] [3 InRelease 30.1 kB/88.7 kB 34%] [Connecting to                                                                                Get:5 https://cloud.r-project.org/bin/linux/ubuntu bion

In [2]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2021-10-03 00:19:39--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2021-10-03 00:19:41 (1.64 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CloudETL").config("spark.driver.extraClassPath","/content/postgresql-42.2.9.jar").getOrCreate()

In [4]:
from pyspark import SparkFiles
url="https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Furniture_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
furniture_data_df = spark.read.csv(SparkFiles.get("amazon_reviews_us_Furniture_v1_00.tsv.gz"), sep="\t", header=True, inferSchema=True)

# Show DataFrame
furniture_data_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   24509695|R3VR960AHLFKDV|B004HB5E0E|     488241329|Shoal Creek Compu...|       Furniture|          4|            0|          0|   N|                Y|... desk is very ...|This desk is very...| 2015-08-31|
|         US|   34731776|R16LGVMFKIUT0G|B0042TNMMS|     205864445|Dorel Home Produc...|       Furniture|          5|    

In [5]:
furniture_data_df.dtypes

[('marketplace', 'string'),
 ('customer_id', 'int'),
 ('review_id', 'string'),
 ('product_id', 'string'),
 ('product_parent', 'int'),
 ('product_title', 'string'),
 ('product_category', 'string'),
 ('star_rating', 'int'),
 ('helpful_votes', 'int'),
 ('total_votes', 'int'),
 ('vine', 'string'),
 ('verified_purchase', 'string'),
 ('review_headline', 'string'),
 ('review_body', 'string'),
 ('review_date', 'string')]

In [6]:
furniture_data_df.count()

792113

In [7]:
furniture_data_df.select('review_id').distinct().count()

792113

In [8]:
mode = 'append'
jdbc_url="jdbc:postgresql://databasel1.cqunbbkxrhjk.us-east-1.rds.amazonaws.com:5432/databaseL1"
config = {"user":"postgres", 
          "password": "postgres", 
          "driver":"org.postgresql.Driver"}

In [9]:
from pyspark.sql.functions import *
furniture_review_id_table = furniture_data_df.select('review_id','customer_id','product_id','product_parent',to_date('review_date','yyyy-mm-dd').alias('review_date'))
furniture_review_id_table.show()

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R3VR960AHLFKDV|   24509695|B004HB5E0E|     488241329| 2015-01-31|
|R16LGVMFKIUT0G|   34731776|B0042TNMMS|     205864445| 2015-01-31|
|R1AIMEEPYHMOE4|    1272331|B0030MPBZ4|     124663823| 2015-01-31|
|R1892CCSZWZ9SR|   45284262|B005G02ESA|     382367578| 2015-01-31|
|R285P679YWVKD1|   30003523|B005JS8AUA|     309497463| 2015-01-31|
| RLB33HJBXHZHU|   18311821|B00AVUQQGQ|     574537906| 2015-01-31|
|R1VGTZ94DBAD6A|   42943632|B00CFY20GQ|     407473883| 2015-01-31|
|R168KF82ICSOHD|   43157304|B00FKC48QA|     435120460| 2015-01-31|
|R20DIYIJ0OCMOG|   51918480|B00N9IAL9K|     356495985| 2015-01-31|
| RD46RNVOHNZSC|   14522766|B001T4XU1C|     243050228| 2015-01-31|
|R2JDOCETTM3AXS|   43054112|B002HRFLBC|      93574483| 2015-01-31|
|R33YMW36IDZ6LE|   26622950|B006MISZOC|     941823468| 2015-01

In [10]:
furniture_review_id_table.dtypes

[('review_id', 'string'),
 ('customer_id', 'int'),
 ('product_id', 'string'),
 ('product_parent', 'int'),
 ('review_date', 'date')]

In [11]:
#uploading dataframe to amazon RDS
furniture_review_id_table.write.jdbc(url=jdbc_url, table='furniture_review_id_table', mode=mode, properties=config)

In [12]:
furniture_products = furniture_data_df.dropDuplicates((['product_id'])).select('product_id','product_title')

furniture_products.count()

141169

In [13]:
furniture_products.show()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B00005OU2I|Dolce Dark Walnut...|
|B00006IBAU|Sauder(R) 29 1/2i...|
|B00017I8U6|Venture Horizon R...|
|B00021RFBY|Michael Scott Ber...|
|B00021RFK0|Michael Scott Ava...|
|B00026F3C2|Ancient Egypt Kin...|
|B00027H2PW|Cherry Finish Que...|
|B0002IG5IQ|Traditional Area ...|
|B0002OZMPW|Clark Gable Joan ...|
|B0002WS7K6|Tolomeo micro in ...|
|B0006B4XVA|Votivo - Honeysuc...|
|B00081FO8E|Batman Flip Open ...|
|B00097BS00|Drawer Slide, Par...|
|B0009HL1NE|Bel Air Modern Li...|
|B000A2U5LW|Mission style 45"...|
|B000APUJF6|Carolina Cottage ...|
|B000BGDTSI|Queen Anne Style ...|
|B000BGLYXA|Safco Products 52...|
|B000CC3F3K|Signature Satin 8...|
|B000CSWBO8|Global Distinctio...|
+----------+--------------------+
only showing top 20 rows



In [14]:
# upload furniture_products to database
furniture_products.write.jdbc(url=jdbc_url, table='furniture_products', mode=mode, properties=config)

In [15]:
furniture_data_df.select('customer_id').distinct().count()

656007

In [16]:
furniture_data_df.groupby('customer_id').count().where(col('count')>1).show()

+-----------+-----+
|customer_id|count|
+-----------+-----+
|   17067926|    2|
|   52541790|    2|
|    2119235|    2|
|   45736741|    2|
|    1396980|    3|
|   11996505|    2|
|   23322936|    2|
|   12072867|    3|
|    7605703|    2|
|   15740523|    2|
|   10884269|    2|
|   38822017|    2|
|   12318815|    6|
|    4470605|    2|
|   32120453|    2|
|   43752501|    2|
|   45771996|    2|
|   13188682|    4|
|   38467478|    2|
|   18088354|    2|
+-----------+-----+
only showing top 20 rows



In [17]:
furniture_customers = furniture_data_df.groupby('customer_id').agg(count('customer_id').alias('customer_count'))
furniture_customers.show()

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   17067926|             2|
|   10714827|             1|
|   42560427|             1|
|   30717305|             1|
|    1178966|             1|
|   10429047|             1|
|   41351814|             1|
|   52541790|             2|
|   52512151|             1|
|   37534120|             1|
|   22555935|             1|
|   18681995|             1|
|    2119235|             2|
|   21846356|             1|
|   42251639|             1|
|    7730812|             1|
|   37666248|             1|
|   43676452|             1|
|   41466760|             1|
|   30403003|             1|
+-----------+--------------+
only showing top 20 rows



In [18]:
# upload furniture_customers to amazon RDS
furniture_customers.write.jdbc(url=jdbc_url, table='furniture_customers', mode=mode, properties=config)

In [19]:
furniture_vine_table = furniture_data_df.select('review_id','star_rating','helpful_votes','total_votes','vine')
furniture_vine_table.show()

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R3VR960AHLFKDV|          4|            0|          0|   N|
|R16LGVMFKIUT0G|          5|            0|          0|   N|
|R1AIMEEPYHMOE4|          5|            1|          1|   N|
|R1892CCSZWZ9SR|          3|            0|          0|   N|
|R285P679YWVKD1|          3|            0|          0|   N|
| RLB33HJBXHZHU|          5|            0|          0|   N|
|R1VGTZ94DBAD6A|          5|            2|          2|   N|
|R168KF82ICSOHD|          5|            0|          0|   N|
|R20DIYIJ0OCMOG|          5|            0|          0|   N|
| RD46RNVOHNZSC|          5|            0|          0|   N|
|R2JDOCETTM3AXS|          5|            0|          0|   N|
|R33YMW36IDZ6LE|          5|            0|          0|   N|
|R30ZGGUHZ04C1S|          5|            1|          1|   N|
| RS2EZU76IK2BT|          5|            

In [20]:
# upload furniture_vine_table to amazon RDS
furniture_vine_table.write.jdbc(url=jdbc_url, table='furniture_vine_table', mode=mode, properties=config)