In [1]:
import os
spark_version = "spark-3.2.2"
os.environ["SPARK_VERSION"] = spark_version

!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

import findspark
findspark.init()

Hit:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Hit:2 http://security.ubuntu.com/ubuntu bionic-security InRelease
Hit:3 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:6 http://archive.ubuntu.com/ubuntu bionic InRelease
Hit:7 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:8 http://archive.ubuntu.com/ubuntu bionic-updates InRelease
Hit:9 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:10 http://archive.ubuntu.com/ubuntu bionic-backports InRelease
Hit:11 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Hit:12 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease
Reading package lists... Done


In [2]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2022-11-20 23:44:18--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2022-11-20 23:44:19 (5.67 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [3]:
from pyspark.sql import SparkSession

# Create (or reuse) the session, with the PostgreSQL JDBC jar on the driver classpath
# so DataFrame.write.jdbc can load org.postgresql.Driver.
spark = (
    SparkSession.builder
    .appName("Digital_Software")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")
    .getOrCreate()
)

In [4]:
from pyspark import SparkFiles

# Public S3 location of the Amazon US Digital_Software reviews (gzipped TSV).
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Digital_Software_v1_00.tsv.gz"

# Register the file with Spark so SparkFiles.get() can later resolve a local copy.
spark.sparkContext.addFile(path=url)

In [5]:
# Resolve the local copy of the registered file and parse it as tab-separated text
# with a header row, letting Spark infer the column types.
reviews_path = SparkFiles.get("amazon_reviews_us_Digital_Software_v1_00.tsv.gz")
df = spark.read.csv(reviews_path, header=True, sep="\t", inferSchema=True)

df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   17747349|R2EI7QLPK4LF7U|B00U7LCE6A|     106182406|CCleaner Free [Do...|Digital_Software|          4|            0|          0|   N|                Y|          Four Stars|      So far so good| 2015-08-31|
|         US|   10956619|R1W5OMFK1Q3I3O|B00HRJMOM4|     162269768|ResumeMaker Profe...|Digital_Software|          3|    

In [6]:
# Inspect the types chosen by inferSchema; review_date is still a string at this point.
df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)



In [7]:
# review_date was inferred as a string; convert it to a DateType column.
# (With ANSI mode off, strings that cannot be parsed as dates become null.)
review_date_as_date = df["review_date"].cast("date")
df = df.withColumn("review_date", review_date_as_date)

In [8]:
# Confirm review_date is now a date column after the cast.
df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: date (nullable = true)



In [9]:
# Preview the leading rows after the date conversion.
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   17747349|R2EI7QLPK4LF7U|B00U7LCE6A|     106182406|CCleaner Free [Do...|Digital_Software|          4|            0|          0|   N|                Y|          Four Stars|      So far so good| 2015-08-31|
|         US|   10956619|R1W5OMFK1Q3I3O|B00HRJMOM4|     162269768|ResumeMaker Profe...|Digital_Software|          3|    

In [10]:
# Discard any row that has a null in at least one column, then report what is left.
df = df.dropna(how="any")
num_rows = df.count()
print("Number of rows = {:,}".format(num_rows))

Number of rows = 102,077


In [11]:
# Duplicate check. review_id is the PRIMARY KEY of review_id_table and vine_table,
# so whole-row distinctness is not enough: two rows that share a review_id but differ
# in any other column would survive distinct() and still break the key on write.
unique_review_ids = df.select("review_id").distinct().count()
assert unique_review_ids == num_rows, (
    f"{num_rows - unique_review_ids:,} rows have a review_id that is not unique"
)

df.distinct().count()

102077

In [12]:
from pyspark.sql.types import StructField, StructType, DateType, IntegerType, StringType

In [13]:
# Make tables: review_id_table; products; customers; vine_table

# CREATE TABLE review_id_table (
#   review_id TEXT PRIMARY KEY NOT NULL,
#   customer_id INTEGER,
#   product_id TEXT,
#   product_parent INTEGER,
#   review_date DATE -- this should be in the format yyyy-mm-dd
# );

# Project the columns of review_id_table, in the table's column order.
review_id_columns = ["review_id", "customer_id", "product_id", "product_parent", "review_date"]
review_id_df = df.select(*review_id_columns)

review_id_df.show()
review_id_df.printSchema()

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R2EI7QLPK4LF7U|   17747349|B00U7LCE6A|     106182406| 2015-08-31|
|R1W5OMFK1Q3I3O|   10956619|B00HRJMOM4|     162269768| 2015-08-31|
| RPZWSYWRP92GI|   13132245|B00P31G9PQ|     831433899| 2015-08-31|
|R2WQWM04XHD9US|   35717248|B00FGDEPDY|     991059534| 2015-08-31|
|R1WSPK2RA2PDEF|   17710652|B00FZ0FK0U|     574904556| 2015-08-31|
|R11JVGRZRHTDAS|   42392705|B004KPKSRQ|     306022575| 2015-08-31|
|R2B8468OKXXYE2|   52845868|B00B1TFNTW|      54873662| 2015-08-31|
|R2HGGCCZSSNUCB|   15696503|B00M9GTJLY|     103182180| 2015-08-31|
| REEE4LHSVPRV9|    9723928|B00H9A60O4|     608720080| 2015-08-31|
|R25OMUUILFFHI9|   23522877|B008XAXAC4|      87969525| 2015-08-31|
|R2966PB8UBD5BM|   17022093|B00MHZ6Z64|     249773946| 2015-08-31|
|R1OU91L2G5H6H1|   11635690|B00OPCQ70Q|     956532818| 2015-08

In [14]:
# Change nullable value
# Spark cannot flip a column's nullability in place, so rebuild review_id_df with an
# explicit schema marking review_id NOT NULL, matching review_id_table's primary key.
new_schema = StructType([
   StructField("review_id", StringType(), False),
   StructField("customer_id", IntegerType(), True),
   StructField("product_id", StringType(), True),
   StructField("product_parent", IntegerType(), True),
   StructField("review_date", DateType(), True)
   ])

# Use the underlying RDD instead of .collect(): collect() would pull every row onto
# the driver and re-upload it, while the RDD keeps the data distributed.
# Selecting by new_schema's field names keeps the row layout aligned with the schema.
review_id_df = spark.createDataFrame(
    review_id_df.select(new_schema.fieldNames()).rdd, new_schema
)

review_id_df.printSchema()

review_id_df.show()

root
 |-- review_id: string (nullable = false)
 |-- customer_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- review_date: date (nullable = true)

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R2EI7QLPK4LF7U|   17747349|B00U7LCE6A|     106182406| 2015-08-31|
|R1W5OMFK1Q3I3O|   10956619|B00HRJMOM4|     162269768| 2015-08-31|
| RPZWSYWRP92GI|   13132245|B00P31G9PQ|     831433899| 2015-08-31|
|R2WQWM04XHD9US|   35717248|B00FGDEPDY|     991059534| 2015-08-31|
|R1WSPK2RA2PDEF|   17710652|B00FZ0FK0U|     574904556| 2015-08-31|
|R11JVGRZRHTDAS|   42392705|B004KPKSRQ|     306022575| 2015-08-31|
|R2B8468OKXXYE2|   52845868|B00B1TFNTW|      54873662| 2015-08-31|
|R2HGGCCZSSNUCB|   15696503|B00M9GTJLY|     103182180| 2015-08-31|
| REEE4LHSVPRV9|    9723928|B00H9A60O4|  

In [15]:
# -- This table will contain only unique values

# CREATE TABLE products (
#   product_id TEXT PRIMARY KEY NOT NULL UNIQUE,
#   product_title TEXT
# );

# Keep only the product columns; duplicates are removed in the following cell.
products_df = df.select("product_id", "product_title")

products_df.count()

102077

In [16]:
# products.product_id is declared PRIMARY KEY UNIQUE, so keep exactly one row per
# product_id. A plain distinct() on (product_id, product_title) would still emit a
# product_id twice if it appears with two different titles. If the table was created
# with that constraint, the JDBC write would then fail.
# When an id has several titles, which title is kept is arbitrary.
products_df = products_df.dropDuplicates(["product_id"])

products_df.show()

products_df.printSchema()

products_df.count()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B00BSDZRTC|Office Suite [Dow...|
|B00E7XA542|QuickBooks 2014 f...|
|B00SG4YZC6|Power2Go 10 Delux...|
|B008DXM7RK|YourSafetynet hom...|
|B009H6ERAC|        Rostta Stone|
|B00M9H6RJ0|MAGIX Audio & Mus...|
|B00PRJ4IBG|HyperSnap screen ...|
|B007SOLPF0|Math Coloring Boo...|
|B00JAQQANG|openCanvas 5.5 [D...|
|B00CQ8SXOO|Volume Repair [Do...|
|B0092T30G6|Innovative Knowle...|
|B007JYTWC2|Kitt GPS Voice fo...|
|B005UHJYYQ|Internet Security...|
|B003ZK51XS|Quicken Rental Pr...|
|B005S4Z2PQ|Home Designer Int...|
|B004GJWZNG|Charts and Graphs...|
|B00W63HQLA|Doxillion Documen...|
|B005S4Y13K|TurboTax Deluxe F...|
|B00IUSWBBU|Anime Studio Debu...|
|B009H6FIOQ|        Rostta Stone|
+----------+--------------------+
only showing top 20 rows

root
 |-- product_id: string (nullable = true)
 |-- product_title: string (nullable = true)



2998

In [17]:
# Change nullable value
# Rebuild products_df with product_id marked NOT NULL to match the products table key.
new_schema = StructType([
   StructField("product_id", StringType(), False),
   StructField("product_title", StringType(), True)])

# Use the underlying RDD instead of .collect() so rows are not funnelled through the driver.
products_df = spark.createDataFrame(
    products_df.select(new_schema.fieldNames()).rdd, new_schema
)

products_df.printSchema()

products_df.show()

root
 |-- product_id: string (nullable = false)
 |-- product_title: string (nullable = true)

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B00BSDZRTC|Office Suite [Dow...|
|B00E7XA542|QuickBooks 2014 f...|
|B00SG4YZC6|Power2Go 10 Delux...|
|B008DXM7RK|YourSafetynet hom...|
|B009H6ERAC|        Rostta Stone|
|B00M9H6RJ0|MAGIX Audio & Mus...|
|B00PRJ4IBG|HyperSnap screen ...|
|B007SOLPF0|Math Coloring Boo...|
|B00JAQQANG|openCanvas 5.5 [D...|
|B00CQ8SXOO|Volume Repair [Do...|
|B0092T30G6|Innovative Knowle...|
|B007JYTWC2|Kitt GPS Voice fo...|
|B005UHJYYQ|Internet Security...|
|B003ZK51XS|Quicken Rental Pr...|
|B005S4Z2PQ|Home Designer Int...|
|B004GJWZNG|Charts and Graphs...|
|B00W63HQLA|Doxillion Documen...|
|B005S4Y13K|TurboTax Deluxe F...|
|B00IUSWBBU|Anime Studio Debu...|
|B009H6FIOQ|        Rostta Stone|
+----------+--------------------+
only showing top 20 rows



In [18]:
# -- Customer table for first data set

# CREATE TABLE customers (
#   customer_id INT PRIMARY KEY NOT NULL UNIQUE,
#   customer_count INT
# );

# One row per customer_id with the number of reviews it has in df,
# cast to int to match the customer_count INT column.
customers_df = (
    df.groupBy("customer_id")
    .count()
    .withColumnRenamed("count", "customer_count")
)
customers_df = customers_df.withColumn(
    "customer_count", customers_df.customer_count.cast("int")
)

customers_df.show()

customers_df.printSchema()

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   46639223|             1|
|   50407297|             1|
|   52707015|             1|
|   21817779|             1|
|     178254|             1|
|    1442229|             1|
|   12811624|             1|
|   40840841|             1|
|   48319420|             1|
|   16205135|             1|
|   12115907|             1|
|   30190304|             1|
|   43013424|             1|
|   51495952|             1|
|   16558777|             1|
|   53013109|             2|
|   50065664|             1|
|    3534576|             1|
|   36456998|             1|
|   31609314|             1|
+-----------+--------------+
only showing top 20 rows

root
 |-- customer_id: integer (nullable = true)
 |-- customer_count: integer (nullable = false)



In [19]:
# Change nullable value
# Rebuild customers_df with customer_id marked NOT NULL to match the customers table key.
new_schema = StructType([
   StructField("customer_id", IntegerType(), False),
   StructField("customer_count", IntegerType(), True)])

# Use the underlying RDD instead of .collect() so rows are not funnelled through the driver.
customers_df = spark.createDataFrame(
    customers_df.select(new_schema.fieldNames()).rdd, new_schema
)

customers_df.printSchema()

customers_df.show()

root
 |-- customer_id: integer (nullable = false)
 |-- customer_count: integer (nullable = true)

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   46639223|             1|
|   50407297|             1|
|   52707015|             1|
|   21817779|             1|
|     178254|             1|
|    1442229|             1|
|   12811624|             1|
|   40840841|             1|
|   48319420|             1|
|   16205135|             1|
|   12115907|             1|
|   30190304|             1|
|   43013424|             1|
|   51495952|             1|
|   16558777|             1|
|   53013109|             2|
|   50065664|             1|
|    3534576|             1|
|   36456998|             1|
|   31609314|             1|
+-----------+--------------+
only showing top 20 rows



In [21]:
# -- vine table

# CREATE TABLE vine_table (
#   review_id TEXT PRIMARY KEY,
#   star_rating INTEGER,
#   helpful_votes INTEGER,
#   total_votes INTEGER,
#   vine TEXT
# );

# Project the columns of vine_table, in the table's column order.
vine_columns = ["review_id", "star_rating", "helpful_votes", "total_votes", "vine"]
vine_table_df = df.select(*vine_columns)

vine_table_df.show()
vine_table_df.printSchema()

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R2EI7QLPK4LF7U|          4|            0|          0|   N|
|R1W5OMFK1Q3I3O|          3|            0|          0|   N|
| RPZWSYWRP92GI|          1|            1|          2|   N|
|R2WQWM04XHD9US|          5|            0|          0|   N|
|R1WSPK2RA2PDEF|          4|            1|          2|   N|
|R11JVGRZRHTDAS|          5|            4|          4|   N|
|R2B8468OKXXYE2|          1|            1|          1|   N|
|R2HGGCCZSSNUCB|          1|            0|          0|   N|
| REEE4LHSVPRV9|          1|            0|          0|   N|
|R25OMUUILFFHI9|          5|            0|          0|   N|
|R2966PB8UBD5BM|          5|            0|          0|   N|
|R1OU91L2G5H6H1|          1|            0|          2|   N|
|R3M6YQVMXWGTR6|          5|            0|          0|   N|
|R2M8VZGO4BFN9J|          1|            

In [23]:
# AWS RDS connection settings.
# Read them from environment variables so credentials never have to be typed into the
# notebook source, where they could be saved and shared. Set them before running,
# e.g. via Colab secrets or os.environ in an unsaved scratch cell. The values below
# are only defaults.
import os

my_aws_endpoint = os.environ.get(
    "RDS_ENDPOINT", "mypostgresdb.ct46lxgt7vhw.us-east-2.rds.amazonaws.com"
)
my_aws_port_number = os.environ.get("RDS_PORT", "5432")
my_aws_database_name = os.environ.get("RDS_DATABASE", "my_data_class_db")
my_aws_username = os.environ.get("RDS_USERNAME", "")
my_aws_password = os.environ.get("RDS_PASSWORD", "")

In [24]:
# JDBC connection string for the RDS PostgreSQL database.
jdbc_url = "jdbc:postgresql://{}:{}/{}".format(
    my_aws_endpoint, my_aws_port_number, my_aws_database_name
)

# Connection properties passed to DataFrameWriter.jdbc.
config = dict(
    user=str(my_aws_username),
    password=str(my_aws_password),
    driver="org.postgresql.Driver",
)

# Add rows to the target tables rather than replacing them. Re-running the write cells
# will therefore try to insert the same rows again.
mode = "append"

In [25]:
# Make tables: review_id_table; products; customers; vine_table
# Write the dataframe to the appropriate table in your PostgreSQL RDS

review_id_df.write.mode(mode).jdbc(jdbc_url, "review_id_table", properties=config)

In [26]:
products_df.write.mode(mode).jdbc(jdbc_url, "products", properties=config)

In [27]:
customers_df.write.mode(mode).jdbc(jdbc_url, "customers", properties=config)

In [28]:
vine_table_df.write.mode(mode).jdbc(jdbc_url, "vine_table", properties=config)