In [2]:
# Activate Spark in our Colab notebook.
import os
# Find the latest version of spark 3.2  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.2.2'
spark_version = 'spark-3.2.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
0% [Connecting to archive.ubuntu.com (185.125.190.36)] [Connecting to security.0% [Connecting to archive.ubuntu.com (185.125.190.36)] [Connecting to security.                                                                               Ign:2 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
                                                                               Hit:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [Connecting to archive.ubuntu.com (185.125.190.36)] [Connecting to security.0% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (185.125.190.36                                                                               Hit:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
0% [1 InRelease gpgv 3,62

In [3]:
# Get postgresql package
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2022-12-23 06:48:28--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2022-12-23 06:48:29 (5.50 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [4]:
# Import Spark and create a SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("BigData-HW-1").config("spark.driver.extraClassPath","/content/postgresql-42.2.9.jar").getOrCreate()

# Extract the Amazon Data into Spark DataFrame

In [5]:
# Read in the data from an S3 Bucket
from pyspark import SparkFiles
url ="https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Beauty_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
df = spark.read.option("header", "true").csv(SparkFiles.get("amazon_reviews_us_Beauty_v1_00.tsv.gz"), inferSchema=True, sep='\t', timestampFormat="mm/dd/yy")
df.show(20)



+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+---------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|          review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+---------------------+-----------+
|         US|    1797882|R3I2DHQBR577SS|B001ANOOOE|       2102612|The Naked Bee Vit...|          Beauty|          5|            0|          0|   N|                Y|          Five Stars| Love this, excell...| 2015-08-31|
|         US|   18381298|R1QNE9NQFJC2Y4|B0016J22EQ|     106393691|Alba Botanica Sun...|          Beauty|          5|

In [6]:
# Get the number of rows in the DataFrame.
print(df.count())


5115666


# Transform the Data

In [7]:
dropped_df = df.dropna()
print(dropped_df.count())

clean_df = dropped_df.dropDuplicates()
print(clean_df.count())

5114733
5114733


In [8]:
# Check the schema
clean_df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: string (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)



## Create the "review_id_table".

In [9]:
from pyspark.sql.functions import to_date
# Create the "review_id_df" DataFrame with the appropriate columns and data types.
review_id_df = clean_df.select(["review_id","customer_id","product_id", "product_parent","review_date"])
review_id_df.show(20)


+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R1006OY7VTYDQU|   25376189|B00C224BHC|     925968264| 2014-07-16|
|R1009Z54C76YE4|   22936491|B00GW4FCHE|     232795368| 2014-04-15|
|R100GGYBCRAPXT|   43772801|B001C366FC|     935717996| 2010-04-02|
|R100Q30O1KC9YS|    3672023|B007ZN5ATQ|     240454053| 2014-01-19|
|R100Q4ABWCVLLM|    8208438|B0080CO2F4|     795960598| 2013-07-15|
|R100VYQTE2RKV9|   12724016|B002TECJN6|     616804441| 2012-11-29|
|R100YPJUVN5NUR|   13544167|B003983HZK|     520693335| 2013-06-06|
|R10117UK99MG2H|   22345652|B001LNOEL6|     725062138| 2013-06-17|
|R101D95IKEM6B6|   44225259|B000I9TUTK|     182783072| 2014-11-02|
|R101IYTRCDDSXE|   20839887|B0081NLRE6|     715202277| 2012-11-24|
|R101OQ7AX286DI|     949736|B001GFCV32|     474553079| 2015-06-04|
|R101TZ5H9YV0X9|   52133207|B00325D0WK|      82125437| 2013-08

## Create the "products" Table

In [10]:
# Create the "products_df" DataFrame that drops the duplicates in the "product_id" and "product_title columns. 
products_df= clean_df.select(["product_id", "product_title"])
products_df.show(20)


+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B00C224BHC|Leegoal(TM) Lovel...|
|B00GW4FCHE|Showgirl Black an...|
|B001C366FC|Tarte Lash Hugger...|
|B007ZN5ATQ|Philips Sonicare ...|
|B0080CO2F4|Fashion Accessori...|
|B002TECJN6|Meltonian Shoe St...|
|B003983HZK|Gillette Fusion P...|
|B001LNOEL6|Pre De Provence H...|
|B000I9TUTK|Diane 1.75" Hair ...|
|B0081NLRE6|Olay Smooth Finis...|
|B001GFCV32|Alter Ego Impact ...|
|B00325D0WK|Grandelash LASH-M...|
|B002BW0348|Yes to Cucumbers ...|
|B00KOC8LWS|COVERGIRL Outlast...|
|B009XJ4HK8|RMS Beauty Lip2Cheek|
|B000TNOUA4|Maybelline Dream ...|
|B00RUJNUNI|Instantly Ageless...|
|B0058T8QVI|Swirl Roll On Ref...|
|B00IAKEO22|NYX Soft Matte Li...|
|B001KYQESC|c. Booth Body Sou...|
+----------+--------------------+
only showing top 20 rows



## Create the "customers" Table

In [11]:
# Create the "customers_df" DataFrame that groups the data on the "customer_id" by the number of times a customer reviewed a product. 
customer_count=clean_df.select(["customer_id"]).count()
customer_count




5114733

In [12]:
customers=clean_df.groupBy("customer_id").count()
customers_df=customers.withColumnRenamed("count","customers_count")
customers_df.show(20)

+-----------+---------------+
|customer_id|customers_count|
+-----------+---------------+
|    1327720|              3|
|   39234490|              1|
|   21655184|              1|
|   28862013|              2|
|   12791221|              1|
|   12542214|              6|
|   13867540|             10|
|    8848836|              4|
|    4170434|              1|
|   43876299|              1|
|   11415725|              1|
|   21802892|              1|
|    5904234|              4|
|   10606675|             22|
|   26793187|              1|
|   10477284|              1|
|   31164671|              2|
|   14876455|             18|
|    5450490|              1|
|   23732246|             12|
+-----------+---------------+
only showing top 20 rows



## Create the "vine_table".

In [13]:
# Create the "vine_df" DataFrame that has the "review_id", "star_rating", "helpful_votes", "total_votes", and "vine" columns. 
vine_df = clean_df.select(["review_id", "star_rating", "helpful_votes", "total_votes", "vine"])
vine_df.show(20)


+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R1006OY7VTYDQU|          1|            1|          1|   N|
|R1009Z54C76YE4|          5|            1|          2|   N|
|R100GGYBCRAPXT|          2|            0|          0|   N|
|R100Q30O1KC9YS|          5|            0|          0|   N|
|R100Q4ABWCVLLM|          4|            0|          0|   N|
|R100VYQTE2RKV9|          5|            0|          0|   N|
|R100YPJUVN5NUR|          5|            0|          0|   N|
|R10117UK99MG2H|          5|            3|          3|   N|
|R101D95IKEM6B6|          5|            0|          0|   N|
|R101IYTRCDDSXE|          2|            0|          1|   N|
|R101OQ7AX286DI|          5|            0|          0|   N|
|R101TZ5H9YV0X9|          2|            3|          4|   N|
|R101Z29BQTZR2Q|          2|            1|          1|   N|
|R1021X6EZO8EEE|          5|            

# Load

In [None]:
mode = "overwrite"
jdbc_url="jdbc:postgresql://database-big-data.cteomnge5i1n.us-east-1.rds.amazonaws.com:5432/postgres"
config = {"user":"", "password": "", "driver":"org.postgresql.Driver"}

In [None]:
# Write review_id_df to table in RDS
review_id_df.write.jdbc(url=jdbc_url, table="review_id_table1", mode=mode, properties=config)

In [None]:
# Write products_df to table in RDS
products_df.write.jdbc(url=jdbc_url, table='products', mode=mode, properties=config)

In [None]:
# Write customers_df to table in RDS
customers_df.write.jdbc(url=jdbc_url, table='customers', mode=mode, properties=config)

In [None]:
# Write vine_df to table in RDS
vine_df.write.jdbc(url=jdbc_url, table='vine_table', mode=mode, properties=config)