In [1]:
import os
import sys
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

# Find the latest versions of
#   Spark & Hadoop:  https://spark.apache.org/downloads.html (https://www.apache.org/dist/spark/)
#   Postgres driver: https://jdbc.postgresql.org/
os.environ['HADOOP_VERSION']   = hadoop_version   = 'hadoop3'
os.environ['SPARK_VERSION']    = spark_version    = 'spark-3.3.1'
os.environ['POSTGRES_VERSION'] = postgres_version = 'postgresql-42.5.1'

# Install Java
! apt install openjdk-11-jdk-headless > /dev/null
os.environ['JAVA_HOME']  = '/usr/lib/jvm/java-11-openjdk-amd64'

# Install Spark
! wget https://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-$HADOOP_VERSION.tgz
! tar xf $SPARK_VERSION-bin-$HADOOP_VERSION.tgz
os.environ['SPARK_HOME'] = f'/content/{spark_version}-bin-{hadoop_version}'
! pip install findspark

# Install Postgres driver
! wget https://jdbc.postgresql.org/download/$POSTGRES_VERSION.jar

# Install AWS's Boto3
! pip install boto3

import boto3
import findspark
findspark.init()
from   getpass     import getpass
from   pyspark.sql import SparkSession

spark = SparkSession.builder \
  .appName('M16-Amazon-Challenge') \
  .config('spark.driver.extraClassPath', f'/content/{postgres_version}.jar') \
  .getOrCreate()
spark



--2022-11-30 00:48:02--  https://www.apache.org/dist/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
Resolving www.apache.org (www.apache.org)... 151.101.2.132, 2a04:4e42::644
Connecting to www.apache.org (www.apache.org)|151.101.2.132|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://downloads.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz [following]
--2022-11-30 00:48:02--  https://downloads.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
Resolving downloads.apache.org (downloads.apache.org)... 88.99.95.219, 135.181.214.104, 2a01:4f8:10a:201a::2, ...
Connecting to downloads.apache.org (downloads.apache.org)|88.99.95.219|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 299350810 (285M) [application/x-gzip]
Saving to: ‘spark-3.3.1-bin-hadoop3.tgz’


2022-11-30 00:48:18 (18.0 MB/s) - ‘spark-3.3.1-bin-hadoop3.tgz’ saved [299350810/299350810]

Looking in indexes: https://pypi.org/simple, https://us-python.pk

In [2]:
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Video_DVD_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
video_review_df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get(""), sep="\t", header=True, inferSchema=True)
video_review_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|   27288431| R33UPQQUZQEM8|B005T4ND06|     400024643|Yoga for Movement...|       Video DVD|          5|            3|          3|   N|                Y|This was a gift f...|This was a gift f...|2015-08-31 00:00:00|
|         US|   13722556|R3IKTNQQPD9662|B004EPZ070|     685335564|  Something Borrowed| 

In [3]:
# Create customer_table dataframe
customers_df = video_review_df.groupby("customer_id").agg({"customer_id":"count"}).withColumnRenamed("count(customer_id)", "customer_count")
customers_df.show()

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   24423656|             8|
|   24297214|             1|
|   12879980|             3|
|     515450|             2|
|   13313689|             1|
|   15523729|             4|
|    1673863|             1|
|   14552054|             2|
|   45392827|            14|
|   44178035|             1|
|   10522786|            10|
|   44848424|             3|
|   14230926|             1|
|   49243158|           274|
|   49084939|            10|
|   28777148|            41|
|   41836864|             1|
|    5219946|             1|
|   37795150|             1|
|   52081222|             1|
+-----------+--------------+
only showing top 20 rows



In [4]:
# Create the products_table DataFrame and drop duplicates. 
products_df = video_review_df.select(['product_id','product_title']).drop_duplicates(['product_id'])
products_df.show()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|0000143502|Rise and Swine (G...|
|0000143529|My Fair Pastry (G...|
|000014357X|Everyday Italian ...|
|0000143588|Barefoot Contessa...|
|0000791156|Spirit Led—Moving...|
|000107461X|Live in Houston [...|
|000503860X|Chapter X Live [VHS]|
|0005119340|       Abraham [VHS]|
|0017702313|Planet Earth: The...|
|0060511109|   The Secret Sister|
|0060594551|Memories of John ...|
|0060738170|Misquoting Jesus:...|
|0073259152|Bien vu bien dit DVD|
|0073325570|DVD program to ac...|
|0077270444|     DVD t/a Avanti!|
|0078296757|Dinah Zike's Teac...|
|0078698146|Command Performan...|
|0126604258|The History of Ne...|
|0131904388|CHEMISTRY ALIVE H...|
|0132411822|VIDEO DVD KEYSTONE A|
+----------+--------------------+
only showing top 20 rows



In [5]:
from pyspark.sql.functions import to_date

In [6]:
# Create the review_id_table DataFrame. 
# Convert the 'review_date' column to a date datatype with to_date("review_date", 'yyyy-MM-dd').alias("review_date")
review_id_df = video_review_df.select(['review_id', 'customer_id', 'product_id', 'product_parent', to_date("review_date", 'yyyy-MM-dd').alias("review_date")])
review_id_df.show()

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
| R33UPQQUZQEM8|   27288431|B005T4ND06|     400024643| 2015-08-31|
|R3IKTNQQPD9662|   13722556|B004EPZ070|     685335564| 2015-08-31|
|R3U27V5QMCP27T|   20381037|B005S9EKCW|     922008804| 2015-08-31|
|R2TOH2QKNK4IOC|   24852644|B00FC1ZCB4|     326560548| 2015-08-31|
|R2XQG5NJ59UFMY|   15556113|B002ZG98Z0|     637495038| 2015-08-31|
|R1N1KHBRR4ZTX3|    6132474|B00X8RONBO|     896602391| 2015-08-31|
|R3OM9S0TCBP38K|   48049524|B000CEXFZG|     115883890| 2015-08-31|
|R1W4S949ZRCTBW|    3282516|B00ID8H8EW|     977932459| 2015-08-31|
|R18JL1NNQAZFV2|   51771179|B000TGJ8IU|     840084782| 2015-08-31|
|R1LP6PR06OPYUX|   31816501|B00DPMPTDS|     262144920| 2015-08-31|
| RZKBT035JA0UQ|   16164990|B00X797LUS|     883589001| 2015-08-31|
|R253N5W74SM7N3|   33386989|B00C6MXB42|     734735137| 2015-08

In [7]:
# Create the vine_table. DataFrame
vine_df = video_review_df.select(['review_id', 'star_rating', 'helpful_votes', 'total_votes', 'vine', 'verified_purchase'])
vine_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
| R33UPQQUZQEM8|          5|            3|          3|   N|                Y|
|R3IKTNQQPD9662|          5|            0|          0|   N|                Y|
|R3U27V5QMCP27T|          5|            1|          1|   N|                Y|
|R2TOH2QKNK4IOC|          5|            0|          1|   N|                Y|
|R2XQG5NJ59UFMY|          5|            0|          0|   N|                Y|
|R1N1KHBRR4ZTX3|          5|            0|          0|   N|                Y|
|R3OM9S0TCBP38K|          5|            0|          0|   N|                Y|
|R1W4S949ZRCTBW|          5|            0|          0|   N|                Y|
|R18JL1NNQAZFV2|          5|            0|          0|   N|                Y|
|R1LP6PR06OPYUX|          4|            0|          0|   N|     

### Connect to the AWS RDS instance and write each DataFrame to its table


In [8]:
# store environment variable
from getpass import getpass
password= getpass('Enter Database Password')

# Configure settings for RDS
mode = "append"
jdbc_url="jdbc:postgresql://amazonvinereviews.cbmdw7xiu9wr.us-east-2.rds.amazonaws.com:5432/postgres"
config = {"user":"postgres", 
          "password": password, 
          "driver":"org.postgresql.Driver"}

Enter Database Password··········


In [9]:
# Write review_id_df to table in RDS
review_id_df.write.jdbc(url=jdbc_url, table='review_id_table', mode=mode, properties=config)

In [10]:
# Write products_df to table in RDS
# about 3 min
products_df.write.jdbc(url=jdbc_url, table='products_table', mode=mode, properties=config)

In [11]:
# Write customers_df to table in RDS
# 5 min 14 s
customers_df.write.jdbc(url=jdbc_url, table='customers_table', mode=mode, properties=config)

In [12]:
# Write vine_df to table in RDS
# 11 minutes
vine_df.write.jdbc(url=jdbc_url, table='vine_table', mode=mode, properties=config)