In [1]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.2.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 http://security.ubuntu.com/ubuntu bionic-security InRelease
0% [Waiting for headers] [Waiting for headers] [Connecting to ppa.launchpad.net                                                                               Hit:2 http://archive.ubuntu.com/ubuntu bionic InRelease
                                                                               Hit:3 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
                                                                               0% [Waiting for headers] [Waiting for headers] [Waiting for headers]0% [1 InRelease gpgv 88.7 kB] [Waiting for headers] [Waiting for headers] [Wait                                                                               Hit:4 http://archive.ubuntu.com/ubuntu bionic-updates InRelease
0% [1 InRelease gpgv 88.7 kB] [Waiting for headers] [Waiting for headers] [Wait                                                                              

In [2]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2022-04-25 00:13:09--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar.4’


2022-04-25 00:13:10 (10.9 MB/s) - ‘postgresql-42.2.16.jar.4’ saved [1002883/1002883]



In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("M16-Amazon-Challenge").config("spark.driver.extraClassPath","/content/postgresql-42.2.16.jar").getOrCreate()

### Load Amazon Data into Spark DataFrame

In [4]:
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Video_Games_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
video_games_df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get("amazon_reviews_us_Video_Games_v1_00.tsv.gz"), sep="\t", header=True, inferSchema=True)
video_games_df.show(10)

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   12039526| RTIS3L2M1F5SM|B001CXYMFS|     737716809|Thrustmaster T-Fl...|     Video Games|          5|            0|          0|   N|                Y|an amazing joysti...|Used this for Eli...| 2015-08-31|
|         US|    9636577| R1ZV7R40OLHKD|B00M920ND6|     569686175|Tonsee 6 buttons ...|     Video Games|          5|    

In [5]:
# Count the number of records in the Video Games DataFrame
video_games_df.count()

1785997

**TRANSFORM:** Now that the raw data stored in S3 is available in a PySpark DataFrame, perform transformations.

In [6]:
#Drop null Values
dropna_df=video_games_df.dropna()
dropna_df.count()

1785886

In [7]:
# Load in a sql function to use columns
from pyspark.sql.functions import col

In [8]:
# Drop duplicates
clean_df = dropna_df.drop_duplicates()
clean_df.show(10)

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   26197704|R10010D6B1QM7U|B004HX5OFW|       8155204|Xbox 360 Slim AC ...|     Video Games|          1|            0|          0|   N|                Y|                 POS|I purchased this ...| 2013-03-28|
|         US|   10409381|R100BI61LIKH2N|B000QW9D14|     583724133|Blue Dragon - Xbo...|     Video Games|          5|    

### Create DataFrames to match tables

In [9]:
from pyspark.sql.functions import to_date
# Read in the Review dataset as a DataFrame
clean_df.show(5)

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   26197704|R10010D6B1QM7U|B004HX5OFW|       8155204|Xbox 360 Slim AC ...|     Video Games|          1|            0|          0|   N|                Y|                 POS|I purchased this ...| 2013-03-28|
|         US|   10409381|R100BI61LIKH2N|B000QW9D14|     583724133|Blue Dragon - Xbo...|     Video Games|          5|    

In [10]:
# Create the customers_table DataFrame
customers_df = clean_df.groupby("customer_id").agg({"customer_id":"count"}).withColumnRenamed("count(customer_id)", "customer_count")
customers_df.show(5)

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   40504830|             8|
|    3675423|             3|
|   11393715|             9|
|   14175902|            22|
|   39664150|             4|
+-----------+--------------+
only showing top 5 rows



In [11]:
# Create the products_table DataFrame and drop duplicates. 
products_df = clean_df.select(["product_id", "product_title"]).drop_duplicates()
products_df.show(10)

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B00CJ7IUI6|The Elder Scrolls...|
|B00DHF39KS|Wolfenstein: The ...|
|B00MUTAVH6|Under Night In-Bi...|
|B001AZSEUW|              Peggle|
|B00KVOVBGM|PlayStation 4 Con...|
|B00O9VGH4Y|USPRO&reg; Headph...|
|B004OQNZY4|Phineas and Ferb:...|
|B00ZLN980O|Donop seablue 2.4...|
|B002L8W5V6|Dotop Nintendo Ga...|
|B007AJZ5PY|Nyko Game Case fo...|
+----------+--------------------+
only showing top 10 rows



In [12]:
# Create the review_id_table DataFrame. 
review_id_df = clean_df.select(["review_id", "customer_id", "product_id", "product_parent", to_date("review_date", 'yyyy-MM-dd').alias("review_date")])
review_id_df.show(10)

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R10010D6B1QM7U|   26197704|B004HX5OFW|       8155204| 2013-03-28|
|R100BI61LIKH2N|   10409381|B000QW9D14|     583724133| 2013-10-28|
|R100TPJCYDYGBU|   39071377|B00D3IKP7Y|     378642975| 2013-12-31|
|R1018OO9U6UUWO|   32826337|B0094X28J0|     272196441| 2014-03-21|
|R102YIWZEIAXT9|   16385120|B000UQAUWW|     694091073| 2007-11-29|
|R1034S7FA21OI1|   14198390|B004S5PBM0|     275810674| 2011-11-06|
|R103LC8CTAHWZ6|   16965219|B002EE7OKE|     852505908| 2013-10-07|
|R103PT7S4HRIP8|   48295356|B003R7H5TC|     318359987| 2010-12-28|
|R104RCQZYPYDXS|   21497984|B000R3BNDI|     318965716| 2008-02-28|
|R105KZIXHZW63W|   22595160|B000NKFEZI|     256274221| 2013-02-04|
+--------------+-----------+----------+--------------+-----------+
only showing top 10 rows



In [13]:
# Create the vine_table. DataFrame
vine_df = clean_df.select(["review_id", "star_rating", "helpful_votes", "total_votes", "vine"])
vine_df.show(10)

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R10010D6B1QM7U|          1|            0|          0|   N|
|R100BI61LIKH2N|          5|            1|          2|   N|
|R100TPJCYDYGBU|          3|            1|          2|   N|
|R1018OO9U6UUWO|          5|            0|          0|   N|
|R102YIWZEIAXT9|          5|            2|          4|   N|
|R1034S7FA21OI1|          5|            2|          2|   N|
|R103LC8CTAHWZ6|          2|            1|          1|   N|
|R103PT7S4HRIP8|          2|            0|          0|   N|
|R104RCQZYPYDXS|          5|            7|          8|   N|
|R105KZIXHZW63W|          5|            1|          1|   N|
+--------------+-----------+-------------+-----------+----+
only showing top 10 rows



### Connect to the AWS RDS instance and write each DataFrame to its table. 

In [16]:
from getpass import getpass
password = getpass('Babak12191205$#')
# Configure settings for RDS
mode = "append"
jdbc_url="jdbc:postgresql://database-1.cmzvaiawlnio.us-east-2.rds.amazonaws.com:5432/Challenge_db"
config = {"user":"postgres", 
          "password": password, 
          "driver":"org.postgresql.Driver"}

Babak12191205$#··········


In [17]:
# Write review_id_df to table in RDS
review_id_df.write.jdbc(url=jdbc_url, table='review_id_table', mode=mode, properties=config)

In [18]:
# Write products_df to table in RDS
# about 3 min
products_df.write.jdbc(url=jdbc_url, table='products_table', mode=mode, properties=config)

In [19]:
# Write customers_df to table in RDS
# 5 min 14 s
customers_df.write.jdbc(url=jdbc_url, table='customers_table', mode=mode, properties=config)

In [20]:
# Write vine_df to table in RDS
# 11 minutes
vine_df.write.jdbc(url=jdbc_url, table='vine_table', mode=mode, properties=config)