In [1]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
spark_version = 'spark-3.2.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!rm -rf $SPARK_VERSION-bin-hadoop2.7.tgz
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 http://archive.ubuntu.com/ubuntu bionic InRelease
0% [Waiting for headers] [Connected to cloud.r-project.org (108.157.162.110)] [                                                                               Get:2 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
0% [Waiting for headers] [2 InRelease 14.2 kB/88.7 kB 16%] [Connected to cloud.0% [1 InRelease gpgv 242 kB] [Waiting for headers] [2 InRelease 14.2 kB/88.7 kB                                                                               Get:3 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
0% [1 InRelease gpgv 242 kB] [3 InRelease 14.2 kB/88.7 kB 16%] [2 InRelease 14.                                                                               Get:4 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:6 http://ppa.launchpad

In [2]:

# Use MySQL instead of postgre
!rm -f mysql-connector-java-8.0.20.tar.gz
!wget https://downloads.mysql.com/archives/get/p/3/file/mysql-connector-java-8.0.20.tar.gz
!tar -zxf mysql-connector-java-8.0.20.tar.gz mysql-connector-java-8.0.20/mysql-connector-java-8.0.20.jar
!mv mysql-connector-java-8.0.20/mysql-connector-java-8.0.20.jar .
!rmdir mysql-connector-java-8.0.20

--2022-04-20 00:41:24--  https://downloads.mysql.com/archives/get/p/3/file/mysql-connector-java-8.0.20.tar.gz
Resolving downloads.mysql.com (downloads.mysql.com)... 137.254.60.14
Connecting to downloads.mysql.com (downloads.mysql.com)|137.254.60.14|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://cdn.mysql.com/archives/mysql-connector-java-8.0/mysql-connector-java-8.0.20.tar.gz [following]
--2022-04-20 00:41:24--  https://cdn.mysql.com/archives/mysql-connector-java-8.0/mysql-connector-java-8.0.20.tar.gz
Resolving cdn.mysql.com (cdn.mysql.com)... 23.49.97.31
Connecting to cdn.mysql.com (cdn.mysql.com)|23.49.97.31|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 3930793 (3.7M) [application/x-tar-gz]
Saving to: ‘mysql-connector-java-8.0.20.tar.gz’


2022-04-20 00:41:24 (42.7 MB/s) - ‘mysql-connector-java-8.0.20.tar.gz’ saved [3930793/3930793]



In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("CloudETL").config("spark.driver.extraClassPath","mysql-connector-java-8.0.20.jar").getOrCreate()

In [4]:
from pyspark import SparkFiles

# Load in sample_us.tsv from S3 into a DataFrame
# url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/sample_us.tsv"
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Luggage_v1_00.tsv.gz"
file_name = os.path.basename(url)
spark.sparkContext.addFile(url)

df = spark.read.option('header', 'true').csv(SparkFiles.get(file_name), inferSchema=True, sep='\t', timestampFormat="yyyy-mm-dd")
# Drop duplicated rows if there is any
df = df.drop_duplicates()

# Drop rows with null values if there is any
df = df.dropna()

df.show(10)

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|   42610491| R5MPHGZNMQRNJ|B00EAKKOKW|     393337306|Samsonite Fiero H...|         Luggage|          5|            0|          0|   N|                Y|          Five Stars|Nothing to compla...|2015-01-31 00:08:00|
|         US|   22778458|R2SCIZ2KIIW7G2|B00VWKS05Y|     197290186|World Traveler 21...| 

In [5]:
df.count()

348613

In [6]:
# Create dataframes for tables: review_id_table, products, vine_table
# review_id_table
review_id_df = df[['review_id', 'customer_id', 'product_id', 'product_parent', 'review_date']]
review_id_df = review_id_df.drop_duplicates()
review_id_df = review_id_df.dropna()

# products table
products_df = df[['product_id', 'product_title']]
products_df = products_df.drop_duplicates()
products_df = products_df.dropna()

# customers table
customers_df = df.groupBy('customer_id').count()
customers_df = customers_df.withColumnRenamed("count", "customer_count")
customers_df = customers_df.drop_duplicates()
customers_df = customers_df.dropna()


# vine_table
vine_df = df[['review_id', 'star_rating', 'helpful_votes', 'total_votes', 'vine']]
vine_df = vine_df.drop_duplicates()
vine_df = vine_df.dropna()

In [7]:
# Configuration for RDS instance
mode="append"
jdbc_url = "jdbc:mysql://rds-big-data.cszz1vg6fdhl.us-west-1.rds.amazonaws.com:3306/big_data?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
config = {"user":"admin",
          "password": "password",
          "driver":"com.mysql.jdbc.Driver"}

In [8]:
# Write vine DataFrame to vine table
vine_df.write.jdbc(url=jdbc_url, table='vine_table', mode=mode, properties=config)

In [9]:
# Write vine DataFrame to vine table
review_id_df.write.jdbc(url=jdbc_url, table='review_id_table', mode=mode, properties=config)

In [10]:
# Write products DataFrame to products table
products_df.write.jdbc(url=jdbc_url, table='products', mode=mode, properties=config)

In [11]:
# Write customers DataFrame to customers table
customers_df.write.jdbc(url=jdbc_url, table='customers', mode=mode, properties=config)