In [1]:
# Install Java, Spark, and Findspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/spark-2.4.6/spark-2.4.6-bin-hadoop2.7.tgz
!tar xf spark-2.4.6-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.6-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession

# The JDBC writes later in this notebook name "org.postgresql.Driver" as their
# driver class, so the PostgreSQL JDBC jar must be on the classpath.
# spark.jars.packages resolves it from Maven when the session is first created;
# it has no effect on a session that already exists, so restart the kernel
# before re-running this cell if a session was created without it.
spark = (
    SparkSession.builder
    .appName("CloudETLProjectAnalysis")
    .config("spark.jars.packages", "org.postgresql:postgresql:42.2.16")
    .getOrCreate()
)

In [19]:
# Read in data from S3 Buckets
from pyspark import SparkFiles

url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Major_Appliances_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

# Derive the local file name from the URL instead of repeating it by hand, so
# the download and the read cannot drift apart when the URL is changed.
file_name = url.rsplit("/", 1)[-1]

# The file is tab-separated with a header row; the header flag is given once.
df = spark.read.csv(SparkFiles.get(file_name), sep="\t", header=True)
df.show(truncate=False)

+-----------+-----------+--------------+----------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------+----------------+-----------+-------------+-----------+----+-----------------+-------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+
|marketplace|customer_id|review_id     |product_id|product_parent|product_title                                                                                                                                  |product_category|star_rating|helpful_votes

In [20]:
# Count the rows of the loaded DataFrame, then report the total.
row_total = df.count()
print(f"Total Number rows in the dataframe are {row_total}")

Total Number rows in the dataframe are 96901


In [21]:
# Project the review-level identifier columns and the review date into their
# own DataFrame, then preview it.
review_columns = ("review_id", "customer_id", "product_id", "product_parent", "review_date")
review_id_table = df.select(*review_columns)
review_id_table.show()

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R203HPW78Z7N4K|   16199106|B0067WNSZY|     633038551| 2015-08-31|
|R2EAIGVLEALSP3|   16374060|B002QSXK60|     811766671| 2015-08-31|
|R1K1CD73HHLILA|   15322085|B00EC452R6|     345562728| 2015-08-31|
|R2KZBMOFRMYOPO|   32004835|B00MVVIF2G|     563052763| 2015-08-31|
| R6BIZOZY6UD01|   25414497|B00IY7BNUW|     874236579| 2015-08-31|
|R1MCXZFNF8E7Y0|   36311751|B0033X29CI|     294467812| 2015-08-31|
|R3EMB3E3ODR6BW|   30920961|B005R597HA|     183784715| 2015-08-31|
| RJTONVTTOPJ5S|   52491265|B00MO6V8Y0|     960251524| 2015-08-31|
|R21U5QZ2CQECUM|   48166169|B00HT39QDI|     992475314| 2015-08-31|
| RL2BBC51H89DH|   50394924|B00LESFZ52|       1641606| 2015-08-31|
|R3RNEPHF3WIRSZ|    3915552|B0149IJVPI|     838108342| 2015-08-31|
|R38DNT9KML2PF3|   17068589|B002HT0958|     387104338| 2015-08

In [22]:
# Inspect the column types of review_id_table; the recorded output shows that
# every selected column is still typed as a string at this point.
review_id_table.dtypes

[('review_id', 'string'),
 ('customer_id', 'string'),
 ('product_id', 'string'),
 ('product_parent', 'string'),
 ('review_date', 'string')]

In [23]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, DateType

# Give the numeric and date columns proper types.
# product_id is deliberately left as a string: its values are alphanumeric
# (e.g. "B0067WNSZY"), and casting such a string to an integer yields null for
# every row. The numeric identifier is product_parent, so that is the column
# cast here.
# NOTE(review): the product_parent values shown in the preview have at most
# nine digits and fit a 32-bit integer - confirm this holds for the full data.
review_id_table = (
    review_id_table
    .withColumn("customer_id", col("customer_id").cast(IntegerType()))
    .withColumn("product_parent", col("product_parent").cast(IntegerType()))
    .withColumn("review_date", col("review_date").cast(DateType()))
)


In [24]:
# Build the products table: one row per distinct (product_id, product_title)
# pair, then preview it.
# NOTE(review): duplicates are dropped on the pair, not on product_id alone, so
# a product_id could still appear more than once if its title varies - confirm
# this matches the target table's key.
product_columns = ("product_id", "product_title")
products = df.select(*product_columns).dropDuplicates()
products.show()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B00RE645WC|WindMax 23" Stain...|
|B008SCVL6E|G.E. Microwave Gl...|
|B0081LV860|Samsung WF405ATPA...|
|B00KC812WK|2-pack OnePurify ...|
|B00MEPNYRK|GE PSB48YSHSS Pro...|
|B000JONLMQ|KitchenAid 18" TR...|
|B0073M7GNC|Whirlpool WOS92EC...|
|B00BIWR3IQ|GE Profile PWE23K...|
|B00O3XF1RC|MobileWasher Kit ...|
|B00JDB6P3I|Bosch NGM5055UC 5...|
|B0011YJE7Y|GE PHP960DMBB Pro...|
|B00J4EBYBM|Bosch HBL5651UC 5...|
|B0056HJ07Q|Frigidaire FHPC36...|
|B00FGWW82A|Frigidaire FFFS51...|
|B004UM5Y32|None Ers30t10074 ...|
|B00MANTPJM|GE Profile PFE28R...|
|B006L8PW1C|Fisher Paykel DD2...|
|B00T9WOH5E|Lg Lt600p Compati...|
|B001DHRJSU|Whirlpool Part Nu...|
|B0050KKN8E|Whirlpool Part Nu...|
+----------+--------------------+
only showing top 20 rows



In [25]:
# Inspect the column types of products; the recorded output shows both columns
# are strings.
products.dtypes

[('product_id', 'string'), ('product_title', 'string')]

In [26]:
from pyspark.sql import functions as F

# Build the customers table: one row per customer with the number of reviews
# that customer wrote.
# customer_id is cast to an integer before grouping so that this table agrees
# with review_id_table, where the same column is cast to IntegerType.
customers = (
    df.select(F.col("customer_id").cast("int").alias("customer_id"))
    .groupby("customer_id")
    # alias names the aggregate directly instead of renaming "count(customer_id)" afterwards
    .agg(F.count("customer_id").alias("customer_count"))
)
customers.show()

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   41307088|             1|
|   36030878|             1|
|   52001665|             1|
|    2131939|             1|
|    5901451|             1|
|    4468816|             1|
|   30054413|             1|
|     498070|             1|
|   14632268|             1|
|   41893483|             1|
|   20723844|             1|
|   11955952|             1|
|   19309400|             1|
|   30677319|             1|
|   12651580|             1|
|   13920155|             1|
|     138964|             1|
|   29947344|             1|
|   48607266|             1|
|   15908254|             1|
+-----------+--------------+
only showing top 20 rows



In [27]:
# Project the rating and vote columns, keyed by review_id, into the vine
# table and inspect the resulting column types.
vine_columns = ("review_id", "star_rating", "helpful_votes", "total_votes", "vine")
vine_table = df.select(*vine_columns)
vine_table.dtypes

[('review_id', 'string'),
 ('star_rating', 'string'),
 ('helpful_votes', 'string'),
 ('total_votes', 'string'),
 ('vine', 'string')]

In [28]:
# Convert the rating and vote counts from strings to integers, one column at
# a time, then preview the result.
for numeric_column in ("star_rating", "helpful_votes", "total_votes"):
    vine_table = vine_table.withColumn(
        numeric_column,
        vine_table[numeric_column].cast(IntegerType()),
    )
vine_table.show()


+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R203HPW78Z7N4K|          5|            0|          0|   N|
|R2EAIGVLEALSP3|          5|            1|          1|   N|
|R1K1CD73HHLILA|          5|            0|          0|   N|
|R2KZBMOFRMYOPO|          5|            1|          1|   N|
| R6BIZOZY6UD01|          5|            0|          0|   N|
|R1MCXZFNF8E7Y0|          1|            0|          0|   N|
|R3EMB3E3ODR6BW|          5|            2|          2|   N|
| RJTONVTTOPJ5S|          5|            0|          0|   N|
|R21U5QZ2CQECUM|          4|            0|          0|   N|
| RL2BBC51H89DH|          4|            0|          0|   N|
|R3RNEPHF3WIRSZ|          2|            0|          0|   N|
|R38DNT9KML2PF3|          5|            0|          0|   N|
|R2ECMBJM8KNNC8|          4|            0|          0|   N|
|R2F3F92PRN9T7S|          5|            

In [29]:
# Configure settings for RDS
import os

# Connection details can be supplied through the environment variables
# RDS_ENDPOINT, RDS_USER and RDS_PASSWORD, so real credentials never have to be
# typed into (and saved with) the notebook. When a variable is not set, the
# original placeholder/default value is used unchanged.
mode = "append"
jdbc_url = "jdbc:postgresql://{}:5432/amazon-sentiments".format(
    os.environ.get("RDS_ENDPOINT", "<RDS-Endpoint>")
)
config = {"user": os.environ.get("RDS_USER", "root"),
          "password": os.environ.get("RDS_PASSWORD", "<RDS password>"),
          "driver": "org.postgresql.Driver"}


In [14]:
# Write review_id_table to the 'review_id_table' database table over JDBC,
# using the notebook-level jdbc_url, save mode and connection properties.
review_id_table.write.mode(mode).jdbc(
    url=jdbc_url,
    table='review_id_table',
    properties=config,
)

In [15]:
# Write products to the 'products' database table over JDBC, using the
# notebook-level jdbc_url, save mode and connection properties.
products.write.mode(mode).jdbc(
    url=jdbc_url,
    table='products',
    properties=config,
)

In [16]:
# Write customers to the 'customers' database table over JDBC, using the
# notebook-level jdbc_url, save mode and connection properties.
customers.write.mode(mode).jdbc(
    url=jdbc_url,
    table='customers',
    properties=config,
)

In [17]:
# Write vine_table to the 'vine_table' database table over JDBC, using the
# notebook-level jdbc_url, save mode and connection properties.
vine_table.write.mode(mode).jdbc(
    url=jdbc_url,
    table='vine_table',
    properties=config,
)