In [1]:
pip install pypandoc

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pypandoc
  Downloading pypandoc-1.10-py3-none-any.whl (20 kB)
Installing collected packages: pypandoc
Successfully installed pypandoc-1.10


In [2]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 47 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 43.9 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=fc2f520cf2447b83265a28c887b38b7de4088bc811e967626a04d9fee4f8136a
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1


In [3]:
# Activate Spark in our Colab notebook.
import os
# Find the latest version of spark from http://www.apache.org/dist/spark/ 
spark_version = 'spark-3.3.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 http://archive.ubuntu.com/ubuntu bionic InRelease
0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.36)] [                                                                               Get:2 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
0% [2 InRelease 14.2 kB/88.7 kB 16%] [Waiting for headers] [Waiting for headers0% [1 InRelease gpgv 242 kB] [2 InRelease 14.2 kB/88.7 kB 16%] [Waiting for hea                                                                               Get:3 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
0% [1 InRelease gpgv 242 kB] [2 InRelease 15.6 kB/88.7 kB 18%] [Waiting for hea0% [1 InRelease gpgv 242 kB] [2 InRelease 15.6 kB/88.7 kB 18%] [Waiting for hea                                                                               Get:4 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
0% [1 InRelease gpgv 242 kB] [2 InRel

In [4]:
# Get postgresql package
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2023-01-02 05:30:10--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2023-01-02 05:30:10 (5.73 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [5]:
# Import Spark and create a SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("BigData-reviews_us_Software").config("spark.driver.extraClassPath","/content/postgresql-42.2.9.jar").getOrCreate()

# Extract the Amazon Data into Spark DataFrame

In [6]:
# Read in the data from an S3 Bucket
from pyspark import SparkFiles
url= "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Software_v1_00.tsv.gz" 
spark.sparkContext.addFile(url)
software_data_df = spark.read.csv(SparkFiles.get("amazon_reviews_us_Software_v1_00.tsv.gz"), sep="\t", header=True, inferSchema=True)

# Show DataFrame
software_data_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|   42605767|R3EFW2STIYIY0I|B00MUTIDKI|     248732228|McAfee 2015 Inter...|        Software|          1|            2|          2|   N|                Y|I was very disapp...|I was very disapp...|2015-08-31 00:00:00|
|         US|   51771800|R12NR0R5A9F7FT|B00EPACNUG|     531462352|Hallmark Card Stu...| 

In [7]:
# Get the number of rows in the DataFrame.
software_data_df.count()

341931

# Transform the Data

## Create the "review_id_table".

In [12]:
from pyspark.sql.functions import to_date
software_review_id_df = software_data_df.select(["review_id", "customer_id", "product_id", "product_parent", "review_date"])
software_review_id_df.show()

+--------------+-----------+----------+--------------+-------------------+
|     review_id|customer_id|product_id|product_parent|        review_date|
+--------------+-----------+----------+--------------+-------------------+
|R3EFW2STIYIY0I|   42605767|B00MUTIDKI|     248732228|2015-08-31 00:00:00|
|R12NR0R5A9F7FT|   51771800|B00EPACNUG|     531462352|2015-08-31 00:00:00|
|R1LSH74R9XAP59|   16053526|B00164AZA4|     473982505|2015-08-31 00:00:00|
|R1QXUNTF76K7L6|   15319481|B00E6LIEFM|     189774198|2015-08-31 00:00:00|
|R2F7DR75PS8NKT|    1441820|B00VWEBG06|     852470365|2015-08-31 00:00:00|
|R2C1DJSCC8UFS6|   37107850|B00EP7AP7C|     279360628|2015-08-31 00:00:00|
|R1AXGS1W4YFXMX|     302120|B00OW2PET4|     729971168|2015-08-31 00:00:00|
|R1XU1B93402SYJ|   20193077|B00N4OLCRO|     776572654|2015-08-31 00:00:00|
|R2U432NB3OPVR0|   13106017|B005CELN8W|     222071424|2015-08-31 00:00:00|
|R3R6FIMIOQ5SP9|   32587108|B005CELL1G|     168801430|2015-08-31 00:00:00|
|R12TX6V09C9QNQ|   320200

## Create the "products" Table

In [20]:
software_products_df = software_data_df.dropDuplicates(subset= ["product_id","product_title"] )

In [21]:
software_products_df.count()

28736

In [22]:
software_products_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|   33712978|R38LNO9CFVPKQS|0030331927|     524450836|INTERMEDIATE FIN ...|        Software|          5|            3|         15|   N|                N|one of the best s...|one of the best s...|2004-03-01 00:00:00|
|         US|   17241396|R2C1QO9AQGA4HC|007293056X|     149413153|Gregg College Key...| 

## Create the "customers" Table

In [25]:
# Create the "customers_df" DataFrame that groups the data on the "customer_id" by the number of times a customer reviewed a product. 
software_customer_df = software_data_df.groupBy('customer_id').agg({"customer_id": "count"})
software_customer_df = software_customer_df.withColumnRenamed("count(customer_id)", "customer_review_count")
software_customer_df.show()

+-----------+---------------------+
|customer_id|customer_review_count|
+-----------+---------------------+
|   15634680|                    1|
|   44696507|                    1|
|   19901367|                    1|
|   11337682|                    1|
|   52021773|                    1|
|   29916198|                    1|
|   42030944|                    1|
|   52765209|                    1|
|   13519509|                    2|
|   39320350|                    5|
|   23249582|                    1|
|   46351928|                    3|
|   44179620|                    1|
|     637252|                    1|
|   34038793|                    1|
|   51136490|                    1|
|   49354815|                    1|
|     149761|                    2|
|   43333798|                    1|
|    4657027|                    1|
+-----------+---------------------+
only showing top 20 rows



In [26]:
software_customer_df.count()

275379

## Create the "vine_table".

In [27]:
# Create the "vine_df" DataFrame that has the "review_id", "star_rating", "helpful_votes", "total_votes", and "vine" columns. 
from pyspark.sql.functions import to_date
software_vine_df = software_data_df.select(["review_id", "star_rating", "helpful_votes", "total_votes", "vine"])
software_vine_df.show()

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R3EFW2STIYIY0I|          1|            2|          2|   N|
|R12NR0R5A9F7FT|          5|            0|          0|   N|
|R1LSH74R9XAP59|          2|            0|          1|   N|
|R1QXUNTF76K7L6|          2|            0|          0|   N|
|R2F7DR75PS8NKT|          5|            0|          0|   N|
|R2C1DJSCC8UFS6|          3|            0|          0|   N|
|R1AXGS1W4YFXMX|          1|            0|          2|   N|
|R1XU1B93402SYJ|          1|            1|          1|   N|
|R2U432NB3OPVR0|          5|            0|          0|   N|
|R3R6FIMIOQ5SP9|          5|            0|          0|   N|
|R12TX6V09C9QNQ|          5|            0|          0|   N|
|R33UCII6YKUMKV|          3|            2|          2|   N|
| RZKDAB9TGO053|          1|            0|          0|   N|
|R2EMN2EEDN73ZA|          4|            

# Load

In [30]:
mode = "append"
jdbc_url="jdbc:postgresql://database-2.c2psmop0qavn.ap-southeast-2.rds.amazonaws.com:5432/my_data_class_db"
config = {"user":"", "password": "", "driver":"org.postgresql.Driver"}

In [31]:
# Write review_id_df to table in RDS
software_review_id_df.write.jdbc(url=jdbc_url, table='software_review_id', mode=mode, properties=config)

In [33]:
# Write products_df to table in RDS
software_products_df.write.jdbc(url=jdbc_url, table='software_product', mode=mode, properties=config)

In [34]:
# Write customers_df to table in RDS
software_customer_df.write.jdbc(url=jdbc_url, table='software_customers', mode=mode, properties=config)

In [35]:
# Write vine_df to table in RDS
software_vine_df.write.jdbc(url=jdbc_url, table='software_vine', mode=mode, properties=config)