In [None]:
import os
# find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# for example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.0.3'
os.environ['SPARK_VERSION']=spark_version

# install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# start a SparkSession
import findspark
findspark.init()

!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CloudETL").config("spark.driver.extraClassPath","/content/postgresql-42.2.9.jar").getOrCreate()

Ign:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Ign:2 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:5 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Get:6 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Hit:7 http://archive.ubuntu.com/ubuntu bionic InRelease
Hit:8 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Get:9 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:10 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:12 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Get:14 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Hit:15 http://ppa.launchpad.net/graph

In [None]:
# Connection settings for the RDS database.
# The <...> values are placeholders and must be replaced before running.
jdbc_url = "<end-point>"
config = dict(
    user="<username>",
    password="<password>",
    driver="org.postgresql.Driver",
)
# save mode used for the JDBC writes that add rows to existing tables
mode = "append"

In [None]:
# Each dataset is described by a pair: the S3 URL of the gzipped TSV file
# and the file name that forms the last segment of that URL.
url1, csv1 = (
    "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Office_Products_v1_00.tsv.gz",
    "amazon_reviews_us_Office_Products_v1_00.tsv.gz",
)
url2, csv2 = (
    "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Lawn_and_Garden_v1_00.tsv.gz",
    "amazon_reviews_us_Lawn_and_Garden_v1_00.tsv.gz",
)
url3, csv3 = (
    "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Home_Improvement_v1_00.tsv.gz",
    "amazon_reviews_us_Home_Improvement_v1_00.tsv.gz",
)

# parallel lists: csv_list[i] is the file name that belongs to url_list[i]
url_list, csv_list = [url1, url2, url3], [csv1, csv2, csv3]

from pyspark import SparkFiles
from pyspark.sql.functions import col

In [None]:
# imported once, ahead of the loop, instead of on every iteration
from pyspark.sql.types import IntegerType,BooleanType,DateType

# ETL loop: for each dataset, download the file, clean it, derive the four
# tables and load them into the RDS database.
for num, (source_url, file_name) in enumerate(zip(url_list, csv_list)):

  # reading csv into df
  print('Iteration:', num)
  spark.sparkContext.addFile(source_url)
  df = spark.read.csv(SparkFiles.get(file_name), sep="\t", header=True, inferSchema=True)

  # dropping exact duplicate rows and rows in which every column is null
  df = df.dropDuplicates()
  df = df.na.drop("all")

  # creating tables
  # creating review_id_table
  review_id_table = df.select(["review_id", "customer_id", "product_id", "product_parent", "product_category", "review_date"])
  # changing customer_id datatype to int, product_parent datatype to int, and review_date to datetype
  review_id_table = (
    review_id_table
    .withColumn("customer_id", col("customer_id").cast("int"))
    .withColumn("product_parent", col("product_parent").cast("int"))
    .withColumn("review_date", col("review_date").cast(DateType()))
  )
  # keep a single row per review_id
  review_id_table = review_id_table.dropDuplicates(["review_id"])

  # creating product table
  products = df.select(["product_id", "product_title"])
  products = products.dropDuplicates(["product_id"])
  products = products.na.drop("all")
  # reading old table and appending new table and removing duplicates and nulls
  # NOTE(review): union() matches columns by position, so this assumes the
  # stored table has the columns in (product_id, product_title) order - confirm.
  old_products = spark.read.jdbc(url=jdbc_url, table='products', properties=config)
  final_products = products.union(old_products).dropDuplicates().na.drop("all")
  # The JDBC read above is lazy and the write below overwrites the very table it
  # reads from: Spark drops/truncates the target before running the query, so
  # the old rows would be gone by the time they are read. Materialise the
  # result and cut its lineage first.
  final_products = final_products.localCheckpoint()

  # creating customers table: number of rows per customer in this dataset
  customers = df.select(col("customer_id").cast("int").alias("customer_id"))
  customers = customers.groupBy("customer_id").count()
  # rename the count column and cast the count itself to int (the previous
  # version cast customer_id into customer_count, storing ids instead of counts)
  customers = customers.withColumnRenamed("count", "customer_count")
  customers = customers.withColumn("customer_count", col("customer_count").cast("int"))
  # reading old table and appending new table and recounting customer_id
  # NOTE(review): union() is positional; assumes the stored table has the
  # columns in (customer_id, customer_count) order - confirm.
  customersdf = spark.read.jdbc(url=jdbc_url, table='customers', properties=config)
  final_customers = customersdf.union(customers)
  final_customers = final_customers.groupBy("customer_id").sum('customer_count').withColumnRenamed("sum(customer_count)", "customer_count").na.drop("all")
  # same read-then-overwrite hazard as for products: materialise before writing
  final_customers = final_customers.localCheckpoint()

  # creating vine table
  vine_table = df.select(["review_id", "star_rating", "helpful_votes", "total_votes", "vine"])
  # casting star_rating, helpful_votes and total_votes to int
  vine_table = (
    vine_table
    .withColumn("star_rating", col("star_rating").cast("int"))
    .withColumn("helpful_votes", col("helpful_votes").cast("int"))
    .withColumn("total_votes", col("total_votes").cast("int"))
  )
  vine_table = vine_table.dropDuplicates(["review_id"])
  vine_table = vine_table.na.drop("all")

  # loading into the rds database: the per-review tables are appended, the
  # merged products/customers tables replace the stored ones
  review_id_table.write.jdbc(url=jdbc_url, table='review_id_table', mode=mode, properties=config)
  final_products.write.jdbc(url=jdbc_url, table='products', mode="overwrite", properties=config)
  final_customers.write.jdbc(url=jdbc_url, table='customers', mode="overwrite", properties=config)
  vine_table.write.jdbc(url=jdbc_url, table='vine_table', mode=mode, properties=config)

Iteration: 0
Iteration: 1
Iteration: 2
