In [55]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.1.2'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()


0% [Working]            Ign:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [Connecting to archive.ubuntu.com (91.189.88.142)] [Waiting for headers] [Wa                                                                               Hit:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
0% [Waiting for headers] [Waiting for headers] [Waiting for headers] [Waiting f0% [2 InRelease gpgv 3,626 B] [Waiting for headers] [Waiting for headers] [Wait                                                                               Get:3 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Hit:4 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Ign:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:7 https://developer.download.nvidia.com/compute/machi

In [56]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2021-10-03 00:18:40--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar.1’


2021-10-03 00:18:40 (10.3 MB/s) - ‘postgresql-42.2.9.jar.1’ saved [914037/914037]



In [57]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CloudETL").config("spark.driver.extraClassPath","/content/postgresql-42.2.9.jar").getOrCreate()

In [4]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url="https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Luggage_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
luggage_reviews_df = spark.read.csv(SparkFiles.get("amazon_reviews_us_Luggage_v1_00.tsv.gz"), sep="\t", header=True, inferSchema=True)

# Show DataFrame
luggage_reviews_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   40884699| R9CO86UUJCAW5|B00VGTN02Y|     786681372|Teenage Mutant Ni...|         Luggage|          3|            0|          0|   N|                Y|my review of this...|my review of this...| 2015-08-31|
|         US|   23208852|R3PR8X6QGVJ8B1|B005KIWL0E|     618251799|Kenneth Cole Reac...|         Luggage|          5|    

In [5]:
luggage_reviews_df.dtypes

[('marketplace', 'string'),
 ('customer_id', 'int'),
 ('review_id', 'string'),
 ('product_id', 'string'),
 ('product_parent', 'int'),
 ('product_title', 'string'),
 ('product_category', 'string'),
 ('star_rating', 'int'),
 ('helpful_votes', 'int'),
 ('total_votes', 'int'),
 ('vine', 'string'),
 ('verified_purchase', 'string'),
 ('review_headline', 'string'),
 ('review_body', 'string'),
 ('review_date', 'string')]

## Total Rows in the dataframe

In [6]:
total_rows = luggage_reviews_df.count()
print(f"Total rows: {total_rows}")

Total rows: 348657


In [7]:
# checking of review_id can be used as primary key; number of rows = number of unique review_ids
luggage_reviews_df.select('review_id').distinct().count()

348657

In [8]:
mode = 'append'
jdbc_url="jdbc:postgresql://databasel1.cqunbbkxrhjk.us-east-1.rds.amazonaws.com:5432/databaseL1"
config = {"user":"postgres", 
          "password": "postgres", 
          "driver":"org.postgresql.Driver"}

In [12]:
from pyspark.sql.functions import *
luggage_review_id_table = luggage_reviews_df.select('review_id','customer_id','product_id','product_parent',to_date('review_date','yyyy-mm-dd').alias('review_date'))
luggage_review_id_table.show()

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
| R9CO86UUJCAW5|   40884699|B00VGTN02Y|     786681372| 2015-01-31|
|R3PR8X6QGVJ8B1|   23208852|B005KIWL0E|     618251799| 2015-01-31|
|R39BO2819ABUPF|   17100246|B007UNSHJ6|     810480328| 2015-01-31|
|R3ADL7V6EGGEEP|   13533670|B00WHFNXB4|      20765193| 2015-01-31|
|R1OXYPBPLVRMI5|   38541771|B000M5NBYU|     812890709| 2015-01-31|
|R1WYM8Z5ATQ98O|   12686499|B00ACBJ89G|     624906499| 2015-01-31|
|R3LCIANTN1H9EC|   27767206|B001H4BQ1A|     385044506| 2015-01-31|
|R3U2M23N1P0KQ6|   46630606|B00ESG6NDK|      51297652| 2015-01-31|
|R29MB6N7HB6NZI|   41622754|B00M14SAXC|     832113872| 2015-01-31|
| RGEQ6DGRG7DQG|   15296380|B012PC5QAY|     490793867| 2015-01-31|
|R38HNH0BSS2KFE|   21206415|B00VWKWWMG|     356411837| 2015-01-31|
|R2CATRM9CH59LY|   34554097|B00N2U14UK|     326835930| 2015-01

In [13]:
luggage_review_id_table.dtypes

[('review_id', 'string'),
 ('customer_id', 'int'),
 ('product_id', 'string'),
 ('product_parent', 'int'),
 ('review_date', 'date')]

In [14]:
#uploading dataframe to amazon RDS
luggage_review_id_table.write.jdbc(url=jdbc_url, table='luggage_review_id_table', mode=mode, properties=config)

In [32]:
luggage_products = luggage_reviews_df.dropDuplicates((['product_id'])).select('product_id','product_title')

luggage_products.count()

64589

In [34]:
luggage_products.show()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B0000732QL|   Eastpak Unplugged|
|B00015GEIQ|AmeriLeather Leat...|
|B0001N6F00|Western Pack Off ...|
|B0001ZTM3U|Royce Leather Aut...|
|B000AM1BQA|LaSelva Designs M...|
|B000AYJN2M|U.S. Luggage Busi...|
|B000BV069I|Victorinox CS2-Cr...|
|B000CS7PEY|Ladies Red Hot Ba...|
|B000E99YS6|Personalized Leat...|
|B000HUAXRI|Clava Leather Upr...|
|B000LZ121A|Kenneth Cole Reac...|
|B000MNB93M|Travel Accessorie...|
|B000OTKR7S|Case Logic DVD Al...|
|B000PWIZJQ|Rick Steves Veloc...|
|B000SKLK6K|Alice Pack Frame ...|
|B000UO7ZFO|Travelers Club Lu...|
|B000VIMO7I|AmeriLeather Leat...|
|B000XT5WY2|WATERSHED Yukon D...|
|B000Y4WO38|   Jansport Shoe Bag|
|B000YV66N0|Military Style 2 ...|
+----------+--------------------+
only showing top 20 rows



In [33]:
# upload luggage_products to database
luggage_products.write.jdbc(url=jdbc_url, table='luggage_products', mode=mode, properties=config)

In [35]:
luggage_reviews_df.select('customer_id').distinct().count()

308188

In [44]:
# luggage_reviews_df.groupby('customer_id').agg(count('customer_id').alias('customer_count')).show()
luggage_reviews_df.groupby('customer_id').count().where(col('count')>1).show()

+-----------+-----+
|customer_id|count|
+-----------+-----+
|   43742600|    2|
|     134205|    2|
|   52748596|    2|
|   23984016|    2|
|   39349195|    3|
|   40617235|    2|
|   46281517|    2|
|    2056398|    2|
|   39346466|    2|
|   41587809|    2|
|   36728141|    2|
|    1290207|    2|
|    2294898|    2|
|   14750529|    2|
|   50996390|    2|
|   37096773|    3|
|   46208657|    2|
|   52032969|    2|
|   12924204|    4|
|   47535771|    3|
+-----------+-----+
only showing top 20 rows



In [47]:
luggage_reviews_df.groupby('customer_id').agg(count('customer_id').alias('customer_count')).where(col('customer_count')>1).show()

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   43742600|             2|
|     134205|             2|
|   52748596|             2|
|   23984016|             2|
|   39349195|             3|
|   40617235|             2|
|   46281517|             2|
|    2056398|             2|
|   39346466|             2|
|   41587809|             2|
|   36728141|             2|
|    1290207|             2|
|    2294898|             2|
|   14750529|             2|
|   50996390|             2|
|   37096773|             3|
|   46208657|             2|
|   52032969|             2|
|   12924204|             4|
|   47535771|             3|
+-----------+--------------+
only showing top 20 rows



In [49]:
luggage_customers = luggage_reviews_df.groupby('customer_id').agg(count('customer_id').alias('customer_count'))
luggage_customers.show()

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   42596797|             1|
|   12296525|             1|
|   49101505|             1|
|   28377689|             1|
|   14468845|             1|
|   26079415|             1|
|   12945150|             1|
|   42049189|             1|
|     740134|             1|
|   15123832|             1|
|   43742600|             2|
|     134205|             2|
|     255890|             1|
|   52748596|             2|
|   30070208|             1|
|   20994440|             1|
|   45330946|             1|
|   18061621|             1|
|    1051742|             1|
|   38273165|             1|
+-----------+--------------+
only showing top 20 rows



In [50]:
# upload luggage_customers to amazon RDS
luggage_customers.write.jdbc(url=jdbc_url, table='luggage_customers', mode=mode, properties=config)

In [53]:
luggage_vine_table = luggage_reviews_df.select('review_id','star_rating','helpful_votes','total_votes','vine')
luggage_vine_table.show()

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
| R9CO86UUJCAW5|          3|            0|          0|   N|
|R3PR8X6QGVJ8B1|          5|            0|          0|   N|
|R39BO2819ABUPF|          4|            0|          0|   N|
|R3ADL7V6EGGEEP|          4|            0|          0|   N|
|R1OXYPBPLVRMI5|          5|            0|          0|   N|
|R1WYM8Z5ATQ98O|          3|            0|          0|   N|
|R3LCIANTN1H9EC|          4|            1|          1|   N|
|R3U2M23N1P0KQ6|          5|            0|          0|   N|
|R29MB6N7HB6NZI|          1|            2|          2|   N|
| RGEQ6DGRG7DQG|          5|            0|          0|   N|
|R38HNH0BSS2KFE|          1|            1|          1|   N|
|R2CATRM9CH59LY|          5|            4|          4|   N|
|R18DMKNDPJ1BSN|          5|            0|          1|   N|
| RZRSHBWXO3XUQ|          5|            

In [54]:
# upload luggage_vine_table to amazon RDS
luggage_vine_table.write.jdbc(url=jdbc_url, table='luggage_vine_table', mode=mode, properties=config)