In [2]:
import os
# Find the latest version of spark 3.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.1'
spark_version = 'spark-3.2.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 http://security.ubuntu.com/ubuntu bionic-security InRelease
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:3 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:6 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:7 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Hit:8 http://archive.ubuntu.com/ubuntu bionic InRelease
Hit:9 http://archive.ubuntu.com/ubuntu bionic-updates InRelease
Hit:10 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:11 http://archive.ubuntu.com/ubuntu bionic-backports InRelease
Hit:12 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Hit:13 http://ppa.launchpad.net/graphics

In [23]:
# Installing PgAdmin4
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2022-01-27 23:17:46--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2022-01-27 23:17:47 (6.06 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [24]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PCReviews").config("spark.driver.extraClassPath","/content/postgresql-42.2.9.jar").getOrCreate()

In [25]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url ="https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_PC_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
df = spark.read.option('header', 'true').csv(SparkFiles.get("amazon_reviews_us_PC_v1_00.tsv.gz"), inferSchema=True, sep='\t', timestampFormat="yyyy-mm-dd")

# Show DataFrame
df.show()


+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|   22873041|R3ARRMDEGED8RD|B00KJWQIIC|     335625766|Plemo 14-Inch Lap...|              PC|          5|            0|          0|   N|                Y|Pleasantly surprised|I was very surpri...|2015-01-31 00:08:00|
|         US|   30088427| RQ28TSA020Y6J|B013ALA9LA|     671157305|TP-Link OnHub AC1...| 

In [26]:
# Count number of records in the dataset
print(f"Total number of reviews: {df.count()}")

Total number of reviews: 6908554


# Transform Data to drop NaN values and duplicates and to macth SQL table

In [27]:
# Print how many nan values and duplicates where drop
before = df.count()
print(f"Total number of reviews before droping NaN values: {df.count()}")
df = df.dropna()

after_nan = df.count()
print(f"Total number of reviews after droping NaN values: {after_nan}")
print(f'Total number of reviews drop with NaN values: {before-after_nan}')

df = df.dropDuplicates()
after_d = df.count()
print(f"Total number of reviews before droping Duplicates values: {after_nan}")
print(f'Total number of reviews drop after Duplicates values: {after_d}')
print(f'Total number of reviews drop with Duplicates: {after_nan - after_d}')

Total number of reviews before droping NaN values: 6908554
Total number of reviews after droping NaN values: 6908145
Total number of reviews drop with NaN values: 409
Total number of reviews before droping Duplicates values: 6908145
Total number of reviews drop after Duplicates values: 6908145
Total number of reviews drop with Duplicates: 0


# Examine the Schema

In [28]:
df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: timestamp (nullable = true)



In [29]:
# Creating review_id_df
review_id_table_df = df.select('review_id', "customer_id", "product_id", "product_parent","review_date")
print(f'Total number of reviews: {review_id_table_df.count()}')

review_id_table_df.show(5)

Total number of reviews: 6908145
+--------------+-----------+----------+--------------+-------------------+
|     review_id|customer_id|product_id|product_parent|        review_date|
+--------------+-----------+----------+--------------+-------------------+
|R1000TO625AJYV|   18841598|B00RHD2GYG|     649036595|2015-01-10 00:07:00|
|R100493IGD5DK4|   34912272|B00A8LPVM8|     135571372|2014-01-24 00:10:00|
|R100AEPKQYUADQ|   43741451|B00B00UBCQ|     214473248|2014-01-19 00:12:00|
|R100GJ66WBS2EK|   30493506|B0034JWXBI|     100661062|2010-01-26 00:04:00|
|R100O3N3LCEF68|   27219536|B0088CJT4U|     572607859|2015-01-25 00:04:00|
+--------------+-----------+----------+--------------+-------------------+
only showing top 5 rows



In [34]:
# Creating products_df
products_df = df.select('product_id', "product_title")
print(f'Total number of products: {products_df.count()}')

#products_df.show(5)

Total number of products: 6908145


In [31]:
# Creating customer_df
customer_df = df.select('customer_id')
customer_df = customer_df.groupBy('customer_id').count()
customer_df = customer_df.withColumnRenamed('count', 'customer_count')
print(f'Number of customerss: {customer_df.count()}')

customer_df.show(5)

Number of customerss: 4056090
+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|    8772082|             5|
|   16604210|             2|
|   52767478|             7|
|   32221376|             1|
|   51437479|             5|
+-----------+--------------+
only showing top 5 rows



In [32]:
# Creating vine_df
vine_df = df.select('review_id','star_rating','helpful_votes','total_votes','vine')
print(f'Number of vine: {vine_df.count()}')
vine_df.show(5)

Number of vine: 6908145
+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R1000TO625AJYV|          1|            2|          2|   N|
|R100493IGD5DK4|          5|            0|          0|   N|
|R100AEPKQYUADQ|          5|            0|          0|   N|
|R100GJ66WBS2EK|          5|            0|          0|   N|
|R100O3N3LCEF68|          5|            0|          0|   N|
+--------------+-----------+-------------+-----------+----+
only showing top 5 rows



# Load DataFrames to the Database in AWS

In [33]:
# Configure settings for RDS
mode = "append"
jdbc_url="jdbc:postgresql://database-bigdatachallange-rodrigo.cyfk2avge4dd.us-west-1.rds.amazonaws.com:5432/assigment"
config = {"user":"postgres", 
          "password": "postgres2425", 
          "driver":"org.postgresql.Driver"}

# Write DataFrame to table in RDS

In [35]:
review_id_table_df.write.jdbc(url=jdbc_url, table="review_id_table", mode=mode, properties=config)

Py4JJavaError: ignored

In [None]:
products_df.write.jdbc(url=jdbc_url, table="products", mode=mode, properties=config)

In [None]:
customer_df.write.jdbc(url=jdbc_url, table="customers", mode=mode, properties=config)

In [None]:
vine_df.write.jdbc(url=jdbc_url, table="vine_table", mode=mode, properties=config)