In [1]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.2.0'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 http://security.ubuntu.com/ubuntu bionic-security InRelease
0% [Connecting to archive.ubuntu.com (91.189.88.152)] [Connected to cloud.r-pro0% [1 InRelease gpgv 88.7 kB] [Connecting to archive.ubuntu.com (91.189.88.152)                                                                               Hit:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Ign:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:4 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Hit:5 http://archive.ubuntu.com/ubuntu bionic InRelease
Ign:6 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:7 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:8 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:9 http://archive.ubuntu.com/ubuntu bionic-updates InRelease
Hit:10

In [2]:
# 
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2022-01-17 21:15:17--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar.1’


2022-01-17 21:15:17 (5.94 MB/s) - ‘postgresql-42.2.9.jar.1’ saved [914037/914037]



In [3]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("bigdata").config("spark.driver.extraClassPath","/content/postgresql-42.2.9.jar").getOrCreate()

In [4]:
from pyspark import SparkFiles
# Load in user_data.csv from S3 into a DataFrame
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Gift_Card_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

gc_df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get(""), sep="\t", header=True, inferSchema=True)
gc_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   24371595|R27ZP1F1CD0C3Y|B004LLIL5A|     346014806|Amazon eGift Card...|       Gift Card|          5|            0|          0|   N|                Y|          Five Stars|Great birthday gi...| 2015-08-31|
|         US|   42489718| RJ7RSBCHUDNNE|B004LLIKVU|     473048287|Amazon.com eGift ...|       Gift Card|          5|    

# Total Number of Records #

In [5]:
total_rows = gc_df.count()
print(f"The total number of records in the dataset is {total_rows}")

The total number of records in the dataset is 149086


In [6]:
gc_df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)



In [7]:
# create the review id table
gc_review_id_temp_df = gc_df.select(['review_id', 'customer_id', 'product_id', 'product_parent', 'review_date'])
gc_review_id_temp_df.show()

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R27ZP1F1CD0C3Y|   24371595|B004LLIL5A|     346014806| 2015-08-31|
| RJ7RSBCHUDNNE|   42489718|B004LLIKVU|     473048287| 2015-08-31|
|R1HVYBSKLQJI5S|     861463|B00IX1I3G6|     926539283| 2015-08-31|
|R2HAXF0IIYQBIR|   25283295|B00IX1I3G6|     926539283| 2015-08-31|
| RNYLPX611NB7Q|     397970|B005ESMGV4|     379368939| 2015-08-31|
|R3ALA9XXMBEDZR|   18513645|B004KNWWU4|     326384774| 2015-08-31|
|R3R8PHAVJFTPDF|   22484620|B004LLIKVU|     473048287| 2015-08-31|
|R18WWEK8OIXE30|   14765851|BT00CTP2EE|     775486538| 2015-08-31|
|R1EGUNQON2J277|   18751931|B004LLIKVU|     473048287| 2015-08-31|
|R21Z4M4L98CPU2|   15100528|B004W8D102|     595099956| 2015-08-31|
| R6JH7A117FHFA|    3559726|B004LLIKVU|     473048287| 2015-08-31|
|R1XZHS8M1GCGI7|   23413911|B004KNWWU4|     326384774| 2015-08

In [8]:
# check schema
gc_review_id_temp_df.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- review_date: string (nullable = true)



In [9]:
# convert review_date to DATE from string
from pyspark.sql.functions import to_date
gc_review_id_df = gc_review_id_temp_df.withColumn('REVIEW_DATED', to_date(gc_review_id_temp_df.review_date, 'yyyy-MM-dd'))
gc_review_id_df.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- review_date: string (nullable = true)
 |-- REVIEW_DATED: date (nullable = true)



In [10]:
# drop "review_date" column
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
gc_review_id_df = gc_review_id_df.drop("review_date")
gc_review_id_df.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- REVIEW_DATED: date (nullable = true)



In [11]:
# rename "REVIEW_DATE" column
gc_review_id_df = gc_review_id_df.withColumnRenamed('REVIEW_DATED', 'review_dated')
gc_review_id_df.show()

+--------------+-----------+----------+--------------+------------+
|     review_id|customer_id|product_id|product_parent|review_dated|
+--------------+-----------+----------+--------------+------------+
|R27ZP1F1CD0C3Y|   24371595|B004LLIL5A|     346014806|  2015-08-31|
| RJ7RSBCHUDNNE|   42489718|B004LLIKVU|     473048287|  2015-08-31|
|R1HVYBSKLQJI5S|     861463|B00IX1I3G6|     926539283|  2015-08-31|
|R2HAXF0IIYQBIR|   25283295|B00IX1I3G6|     926539283|  2015-08-31|
| RNYLPX611NB7Q|     397970|B005ESMGV4|     379368939|  2015-08-31|
|R3ALA9XXMBEDZR|   18513645|B004KNWWU4|     326384774|  2015-08-31|
|R3R8PHAVJFTPDF|   22484620|B004LLIKVU|     473048287|  2015-08-31|
|R18WWEK8OIXE30|   14765851|BT00CTP2EE|     775486538|  2015-08-31|
|R1EGUNQON2J277|   18751931|B004LLIKVU|     473048287|  2015-08-31|
|R21Z4M4L98CPU2|   15100528|B004W8D102|     595099956|  2015-08-31|
| R6JH7A117FHFA|    3559726|B004LLIKVU|     473048287|  2015-08-31|
|R1XZHS8M1GCGI7|   23413911|B004KNWWU4|     3263

In [12]:
# create the products table df
gc_products_df = gc_df.select(['product_id', "product_title"])
gc_products_df.show()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B004LLIL5A|Amazon eGift Card...|
|B004LLIKVU|Amazon.com eGift ...|
|B00IX1I3G6|Amazon.com Gift C...|
|B00IX1I3G6|Amazon.com Gift C...|
|B005ESMGV4|Amazon.com Gift C...|
|B004KNWWU4|Amazon Gift Card ...|
|B004LLIKVU|Amazon.com eGift ...|
|BT00CTP2EE|Amazon.com Gift C...|
|B004LLIKVU|Amazon.com eGift ...|
|B004W8D102|Amazon Gift Card ...|
|B004LLIKVU|Amazon.com eGift ...|
|B004KNWWU4|Amazon Gift Card ...|
|B004LLIKVU|Amazon.com eGift ...|
|B004LLIKVU|Amazon.com eGift ...|
|B00H5BNLUS|Amazon eGift Card...|
|B004KNWX6C|Amazon Gift Card ...|
|BT00CTOYC0|Amazon.com $15 Gi...|
|B004LLIKVU|Amazon.com eGift ...|
|B00H5BMH44|Amazon eGift Card...|
|B005ESMMKE|Amazon.com Gift C...|
+----------+--------------------+
only showing top 20 rows



In [13]:
# create the customers df
from pyspark.sql.functions import desc
gc_customers_df = gc_df.select(['customer_id'])\
  .groupby("customer_id")\
  .agg({"customer_id": "count"})\
  .orderBy(desc("count(customer_id)"))
gc_customers_df.show()

+-----------+------------------+
|customer_id|count(customer_id)|
+-----------+------------------+
|    9374792|                 7|
|   30058414|                 6|
|   52166758|                 6|
|   37097444|                 5|
|   50442527|                 5|
|   41017998|                 5|
|    3184017|                 5|
|   42184434|                 5|
|   51046621|                 5|
|   41920477|                 5|
|   15374693|                 5|
|   29054990|                 5|
|   50822336|                 5|
|   36702734|                 5|
|   37636715|                 5|
|    5572588|                 5|
|   28838326|                 5|
|   45359231|                 5|
|   45298444|                 5|
|   12891421|                 4|
+-----------+------------------+
only showing top 20 rows



In [14]:
# create the vine df
gc_vine_df = gc_df.select(['review_id', 'star_rating', 'helpful_votes', 'total_votes', 'vine'])
gc_vine_df.show()

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R27ZP1F1CD0C3Y|          5|            0|          0|   N|
| RJ7RSBCHUDNNE|          5|            0|          0|   N|
|R1HVYBSKLQJI5S|          5|            0|          0|   N|
|R2HAXF0IIYQBIR|          1|            0|          0|   N|
| RNYLPX611NB7Q|          5|            0|          0|   N|
|R3ALA9XXMBEDZR|          5|            0|          0|   N|
|R3R8PHAVJFTPDF|          5|            0|          0|   N|
|R18WWEK8OIXE30|          5|            0|          0|   N|
|R1EGUNQON2J277|          1|            0|          0|   N|
|R21Z4M4L98CPU2|          5|            0|          0|   N|
| R6JH7A117FHFA|          5|            0|          0|   N|
|R1XZHS8M1GCGI7|          5|            1|          1|   N|
|R1DAI0N03SKRJN|          5|            1|          1|   N|
|R2F6SKZOEYQRU3|          5|            

In [15]:
# configure settings for RDS
mode = "append"
jdbc_url="jdbc:postgresql://amazonpostgresdb.cec7i89ma4zy.us-east-2.rds.amazonaws.com:5432/amazon_db"
config = {"user":"xxxxxxxx", 
          "password": "xxxxxxxx", 
          "driver":"org.postgresql.Driver"}

In [16]:
# write the watch_review_id_df to the watch_review_id table in postress

gc_review_id_df.write.jdbc(url=jdbc_url, table='gc_review_id', mode=mode, properties=config)

Py4JJavaError: ignored