In [2]:
import os
# Find the latest version of spark 3.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# https://downloads.apache.org/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz
# For example:
# spark_version = 'spark-3.0.2'
spark_version = 'spark-3.1.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
# import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:3 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Get:4 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Hit:5 http://archive.ubuntu.com/ubuntu bionic InRelease
Ign:6 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Get:7 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release [697 B]
Hit:8 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:9 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Get:10 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release.gpg [836 B]
Get:11 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ Packages [55.5 kB]
Hit:12 http://ppa.launchpad.net/cran/

In [3]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2021-05-08 13:04:37--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar’


2021-05-08 13:04:38 (5.69 MB/s) - ‘postgresql-42.2.16.jar’ saved [1002883/1002883]



In [4]:
from pyspark.sql import SparkSession

# Create (or reuse) the session. The PostgreSQL JDBC jar is put on the driver
# classpath so the DataFrames can later be written to RDS via .write.jdbc().
spark = (
    SparkSession.builder
    .appName("BigData-Challenge")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.16.jar")
    .getOrCreate()
)

### Load Amazon Data into Spark DataFrame

In [5]:
from pyspark import SparkFiles

# Amazon customer reviews for the Musical Instruments category (gzipped, tab-separated).
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Musical_Instruments_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

# addFile() stores the download under its own file name in the SparkFiles root
# directory. Read exactly that file: SparkFiles.get("") returns the whole root
# directory, so every other file added with addFile() would be parsed as reviews too.
reviews_path = SparkFiles.get(url.rsplit("/", 1)[-1])

# inferSchema=True makes Spark scan the data an extra time to infer column types.
df = spark.read.option("encoding", "UTF-8").csv(reviews_path, sep="\t", header=True, inferSchema=True)
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+-------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|   product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+-------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   45610553| RMDCHWD0Y5OZ9|B00HH62VB6|     618218723|AGPtek® 10 Isolat...|Musical Instruments|          3|            0|          1|   N|                N|         Three Stars|Works very good, ...| 2015-08-31|
|         US|   14640079| RZSL0BALIYUNU|B003LRN53I|     986692292|Sennheiser HD203 ...|Musical Instruments| 

In [None]:
# Print the column names and the types Spark inferred when loading the TSV
# (inferSchema=True). Check them before building the per-table DataFrames below.
df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)



### Create DataFrames to match tables

In [6]:
# Import struct fields that we can use
from pyspark.sql.types import StructField, StringType, IntegerType, StructType

In [7]:
from pyspark.sql.functions import to_date

In [8]:
# Preview the first 20 rows of the reviews DataFrame loaded above (nothing is re-read here).
df.show(20)

+-----------+-----------+--------------+----------+--------------+--------------------+-------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|   product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+-------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   45610553| RMDCHWD0Y5OZ9|B00HH62VB6|     618218723|AGPtek® 10 Isolat...|Musical Instruments|          3|            0|          1|   N|                N|         Three Stars|Works very good, ...| 2015-08-31|
|         US|   14640079| RZSL0BALIYUNU|B003LRN53I|     986692292|Sennheiser HD203 ...|Musical Instruments| 

In [9]:
# Exploratory: number of reviews per customer_id (first 20 groups).
df.groupby("customer_id").count().show(20)


+-----------+-----+
|customer_id|count|
+-----------+-----+
|   27314089|    1|
|    6711334|    1|
|   44525866|    4|
|   47461997|    1|
|   42560427|    1|
|   35225823|    1|
|   52526865|   20|
|    1954060|    1|
|   34202730|    3|
|   40014361|    1|
|   42719693|    1|
|   27252006|    1|
|   48297144|    1|
|   12204397|    1|
|   17090175|    2|
|   13352125|    9|
|   46958825|   28|
|     134138|    1|
|     283456|    1|
|   42847010|    2|
+-----------+-----+
only showing top 20 rows



In [71]:
from pyspark.sql import functions as F

# customers_table: number of reviews (non-null customer_id rows) per customer_id.
# Name the aggregate directly with .alias() rather than renaming Spark's
# auto-generated "count(customer_id)" column. withColumnRenamed() silently does
# nothing when the source name doesn't match, which would ship a wrongly named
# column to the RDS table.
customers_table = (
    df.groupBy("customer_id")
      .agg(F.count("customer_id").alias("customer_count"))
)

In [18]:
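# Preview of customers_table (the output below was produced by this line).
# Caution: .show() returns None, so assigning its result as written here would set customers_table to None.
# customers_table = df.groupby('customer_id').agg({'customer_id':'count'}).withColumnRenamed('count(customer_id)', 'customer_count').show()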

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   27314089|             1|
|    6711334|             1|
|   44525866|             4|
|   47461997|             1|
|   42560427|             1|
|   35225823|             1|
|   52526865|            20|
|    1954060|             1|
|   34202730|             3|
|   40014361|             1|
|   42719693|             1|
|   27252006|             1|
|   48297144|             1|
|   12204397|             1|
|   17090175|             2|
|   13352125|             9|
|   46958825|            28|
|     134138|             1|
|     283456|             1|
|   42847010|             2|
+-----------+--------------+
only showing top 20 rows



In [72]:
# products_table: one row per product_id with its title.
# Deduplicating on product_id alone keeps a single row per product. If a product
# appears with several titles, which one survives is not guaranteed.
products_table = (
    df.select("product_id", "product_title")
      .dropDuplicates(["product_id"])
)

In [19]:
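# Preview of products_table (the output below was produced by this line).
# Caution: .show() returns None, so assigning its result as written here would set products_table to None.
# products_table = df.select(['product_id','product_title']).drop_duplicates(subset=['product_id']).show()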

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B000078JQ0|M-Audio Portable ...|
|B0000C8CFP|Gemini CDJ 15X TO...|
|B0002CZTIE|Gator 88 Note Key...|
|B0002D02NA|Protec CTG234 Ele...|
|B0002D03EI|Traveler Guitar O...|
|B0002D049M|Wireless Solution...|
|B0002D04B0|Zildjian A Custom...|
|B0002D064U|Seymour Duncan - ...|
|B0002D0DVG|Evans RGS Pad Bas...|
|B0002D0GBS|Zildjian ZXT 10-I...|
|B0002D0LRC|Gibson Gear PPAT-...|
|B0002D0PJ6|Clayton Picks Bla...|
|B0002DUS2A|Fender Stainless ...|
|B0002DVBBW|Odyssey C45200 Ca...|
|B0002DYB9Q|Pearl D050 Tensio...|
|B0002E1HG0|Schecter Stiletto...|
|B0002E1P5I|Savarez 520P1 Tra...|
|B0002E2G92|Sabian 18 Inch HH...|
|B0002E2TH6|Aquarian Drumhead...|
|B0002E3D5S|DiMarzio DP123-CR...|
+----------+--------------------+
only showing top 20 rows



In [73]:
# review_id_table: links each review to its customer and product.
# review_date is loaded as a string, so to_date() converts it to a date column,
# keeping the original column name.
review_id_table = df.select(
    "review_id",
    "customer_id",
    "product_id",
    "product_parent",
    to_date("review_date", "yyyy-MM-dd").alias("review_date"),
)

In [38]:
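# Preview of review_id_table (the output below was produced by the .show() line).
# Caution: .show() returns None, so assigning its result would set review_id_table to None.
# Convert the 'review_date' column to a date datatype with to_date("review_date", 'yyyy-MM-dd').alias("review_date")
# Earlier drafts kept for reference. They are not usable as written: the first is a
# syntax error, and the last would produce two columns named review_date.
# review_id_df = df.select([, to_date("review_date", 'yyyy-MM-dd').alias("review_date")])
# review_id_table = df.select(['review_id','customer_id','product_id','product_parent','review_date']).show()
# review_id_table = df.select(['review_date', to_date("review_date", 'yyyy-MM-dd').alias("review_date")])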

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
| RMDCHWD0Y5OZ9|   45610553|B00HH62VB6|     618218723| 2015-08-31|
| RZSL0BALIYUNU|   14640079|B003LRN53I|     986692292| 2015-08-31|
| RIZR67JKUDBI0|    6111003|B0006VMBHI|     603261968| 2015-08-31|
|R27HL570VNL85F|    1546619|B002B55TRG|     575084461| 2015-08-31|
|R34EBU9QDWJ1GD|   12222213|B00N1YPXW2|     165236328| 2015-08-31|
|R1WCUI4Z1SIQEO|   46018513|B001N4GRGS|     134151483| 2015-08-31|
| RL5LNO26GAVJ1|   10225065|B009PJRMHQ|     694166585| 2015-08-31|
|R3GYQ5W8JHP8SB|    6356995|B00NKBDAZS|     446431775| 2015-08-31|
|R30SHYQXGG5EYC|   35297198|B006MIU7U2|     125871705| 2015-08-31|
|R14YLXA56NP51I|   32139520|B000FIBD0I|     771888534| 2015-08-31|
|R1ZH0HSH38IOTZ|   36060782|B0002E52GG|      68535945| 2015-08-31|
|R3H53KLLC210XI|    5301309|B00RZIH52G|     725541773| 2015-08

In [74]:
# vine_table: per-review rating, vote counts, and Vine / verified-purchase flags.
vine_table = df.select(
    "review_id",
    "star_rating",
    "helpful_votes",
    "total_votes",
    "vine",
    "verified_purchase",
)

In [41]:
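# Preview of vine_table (the output below was produced by the .show() line).
# Caution: .show() returns None, so assigning its result as written here would set vine_table to None.
# vine_df = df.select([])
# vine_table=df.select(['review_id','star_rating','helpful_votes','total_votes','vine','verified_purchase']).show()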

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
| RMDCHWD0Y5OZ9|          3|            0|          1|   N|                N|
| RZSL0BALIYUNU|          5|            0|          0|   N|                Y|
| RIZR67JKUDBI0|          3|            0|          1|   N|                Y|
|R27HL570VNL85F|          5|            0|          0|   N|                Y|
|R34EBU9QDWJ1GD|          5|            0|          0|   N|                Y|
|R1WCUI4Z1SIQEO|          5|            0|          0|   N|                N|
| RL5LNO26GAVJ1|          2|            3|          4|   N|                Y|
|R3GYQ5W8JHP8SB|          5|            0|          0|   N|                Y|
|R30SHYQXGG5EYC|          5|            0|          0|   N|                Y|
|R14YLXA56NP51I|          5|            1|          1|   N|     

### Connect to the AWS RDS instance and write each DataFrame to its table. 

In [84]:
import os
from getpass import getpass

# JDBC settings for the AWS RDS Postgres instance.
# "append" adds rows to the existing tables, so re-running the write cells below
# inserts the same rows again.
mode = "append"

# Format: jdbc:postgresql://<endpoint>:5432/<database name>
# The original "challenge" instance became inaccessible, so the default points at database-1.
# Set RDS_JDBC_URL to target a different endpoint/database without editing the notebook.
jdbc_url = os.environ.get(
    "RDS_JDBC_URL",
    "jdbc:postgresql://database-1.cyzgpxgthpus.us-east-2.rds.amazonaws.com:5432/initialdatabase",
)

# Never hard-code credentials in a notebook: read them from the environment and
# fall back to an interactive prompt for the password.
# NOTE: the password previously committed in this cell should be treated as leaked and rotated.
config = {"user": os.environ.get("RDS_USER", "postgres"),
          "password": os.environ.get("RDS_PASSWORD") or getpass("RDS password: "),
          "driver": "org.postgresql.Driver"}

In [85]:
# Write review_id_table to the "review_id_table" table in RDS using the shared JDBC settings.
review_id_table.write.mode(mode).jdbc(jdbc_url, 'review_id_table', properties=config)

In [86]:
# Write products_table to the "products_table" table in RDS (took about 3 min).
products_table.write.mode(mode).jdbc(jdbc_url, 'products_table', properties=config)

In [87]:
# Write customers_table to the "customers_table" table in RDS (took 5 min 14 s).
customers_table.write.mode(mode).jdbc(jdbc_url, 'customers_table', properties=config)

In [88]:
# Write vine_table to the "vine_table" table in RDS (took about 11 minutes).
vine_table.write.mode(mode).jdbc(jdbc_url, 'vine_table', properties=config)