In [None]:
import os
# Pick the Spark 3.2 release to install. Check http://www.apache.org/dist/spark/
# for the latest available 3.2.x version and set it here, for example:
spark_version = 'spark-3.2.2'
# spark_version = 'spark-<enter version>'
# Export the version so the shell commands below can expand $SPARK_VERSION.
os.environ['SPARK_VERSION']=spark_version

# Install Java 8, download and unpack the selected Spark build, and install findspark
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Point JAVA_HOME and SPARK_HOME at the installations made above
# (paths assume the archive was unpacked under /content — TODO confirm outside Colab)
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Initialise findspark so pyspark can be imported from SPARK_HOME;
# the SparkSession itself is created in a later cell.
import findspark
findspark.init()

0% [Working]            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
0% [Waiting for headers] [Waiting for headers] [Waiting for headers] [Connectin                                                                               Ign:2 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
0% [Waiting for headers] [Waiting for headers] [Waiting for headers] [Connectin                                                                               Hit:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [Waiting for headers] [Waiting for headers] [Connecting to ppa.launchpad.net0% [1 InRelease gpgv 3,626 B] [Waiting for headers] [Waiting for headers] [Wait                                                                               Hit:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
0% [1 InRelease gpgv 3,626 B] [Wait

In [None]:
# Download the PostgreSQL JDBC driver jar into the working directory;
# the Spark session is configured to load it from /content/postgresql-42.2.9.jar.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2022-11-20 00:13:37--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2022-11-20 00:13:37 (5.62 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [None]:
# Create (or reuse) the "Books" Spark session with the PostgreSQL JDBC jar
# on the driver classpath so the JDBC writes can find the driver.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Books")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")
    .getOrCreate()
)

In [None]:
# Register the Grocery reviews TSV on S3 with Spark, then read the fetched
# copy as a tab-separated file whose first row holds the column names.
from pyspark import SparkFiles

url = 'https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Grocery_v1_00.tsv.gz'
spark.sparkContext.addFile(url)
df = spark.read.csv(
    SparkFiles.get('amazon_reviews_us_Grocery_v1_00.tsv.gz'),
    sep='\t',
    header=True,
)

In [None]:
# Preview the first 10 rows of the raw reviews
df.show(n=10)

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   42521656|R26MV8D0KG6QI6|B000SAQCWC|     159713740|The Cravings Plac...|         Grocery|          5|            0|          0|   N|                Y|Using these for y...|As a family aller...| 2015-08-31|
|         US|   12049833|R1OF8GP57AQ1A0|B00509LVIQ|     138680402|Mauna Loa Macadam...|         Grocery|          5|    

In [None]:
# Count the rows in the DataFrame as loaded, before any cleaning,
# so the effect of the cleaning step can be measured afterwards.
size = df.count()
print(f"Number of rows in DataFrame : {size}")

Number of rows in DataFrame : 2402458


In [None]:
# Remove exact duplicate rows first, then drop rows with null values.
# NOTE: this reassigns df, so re-running earlier cells is required to get the raw data back.
df = df.dropDuplicates().dropna()

In [None]:
# Re-count the rows after dropping duplicates and nulls
size = df.count()
print(f"Number of rows in DataFrame : {size}")

Number of rows in DataFrame : 2402212


In [None]:
# Examine the schema of the loaded data to see which columns need type conversion
df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: string (nullable = true)
 |-- helpful_votes: string (nullable = true)
 |-- total_votes: string (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)



In [None]:
# Drop unused columns; none of these is selected into the tables built later
df = df.drop(
    'marketplace',
    'product_category',
    'verified_purchase',
    'review_headline',
    'review_body',
)

In [None]:
# Cast the numeric columns, which were loaded as strings, to integers to match the schema
from pyspark.sql.types import IntegerType

for _int_column in ("customer_id", "product_parent", "star_rating", "helpful_votes", "total_votes"):
    df = df.withColumn(_int_column, df[_int_column].cast(IntegerType()))

In [None]:
# Convert date strings to date types to match schema
from pyspark.sql.functions import col, to_date

# The pattern must use "MM" (month of year). Lowercase "mm" means minute of hour,
# so "yyyy-mm-dd" reads the month digits as minutes and every date lands in January.
df = df.withColumn("review_date", to_date(col("review_date"), "yyyy-MM-dd"))

In [None]:
# Re-inspect the schema to confirm the integer and date conversions
df.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- review_date: date (nullable = true)



In [None]:
# Create the DataFrames to load into RDS, starting with one row per review
# and the customer, product and date it refers to.
review_id_df = df.select(
    "review_id",
    "customer_id",
    "product_id",
    "product_parent",
    "review_date",
)
review_id_df.show()

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R100KOICK3R3CF|   13956383|B000N308RE|     902787115| 2015-01-09|
|R100PWDLZSNOJE|   13214098|B0052UCCI6|      50621170| 2012-01-24|
|R10185C2H88C6O|   30221574|B006TU9EQ6|     520170030| 2014-01-08|
|R102A21SZ34VPM|   39771060|B00CWTZCI2|     489782231| 2014-01-28|
|R102A2XJDY3YPA|   13535367|B0005ZZ53C|     908810009| 2005-01-05|
|R102ASTZMWUDN5|   49571296|B00KSF4LZM|     106737264| 2015-01-18|
|R1038PDERZDPPA|   24247146|B009IQLTFW|     842461843| 2015-01-26|
|R104C1VFOOUG35|   21626858|B0000SWZX2|     540913718| 2004-01-15|
|R104HQ5X03UBJV|   12605375|B0012X9CIK|     960234318| 2011-01-28|
|R1050WCB1ADCLZ|    2432357|B000LL0R8I|     572710338| 2014-01-12|
|R1051UF4PE1RAM|    1285570|B00JBBX5UQ|     623654537| 2015-01-19|
|R1056E5M8D9VZD|   25452055|B004N5FR06|     886410786| 2014-01

In [None]:
# One row per product_id, keeping only the id and its title
products_df = (
    df
    .dropDuplicates(["product_id"])
    .select("product_id", "product_title")
)
products_df.show()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|0657745316|100 Percent All N...|
|0681727810|Beemster Gouda - ...|
|0700026444|Pure Darjeeling T...|
|0786960159|Axis and Allies 1...|
|109274665X|Nestle Milkybar D...|
|1403796890|WWE Kids Todler V...|
|1453060464|Splash Energy Liq...|
|1601263724|      Speaking Amish|
|1603112251|Cocktail Kingdom ...|
|1625684479|Peter Rabbit &amp...|
|1837993823|HiPP Organic Comb...|
|1837993955|Milky Way Magic S...|
|2123457892|1 Month Suply (4 ...|
|2533790060|Ferrero Big Roche...|
|2635214875|Iaso Nutra Burst ...|
|3301261876|Haribo Jelly Babi...|
|3334353648|Fresh Ginger Root...|
|3355335541|Lipton Yellow Lab...|
|359595954X|Haldiram's Classi...|
|3621813667|HiPP Organic Firs...|
+----------+--------------------+
only showing top 20 rows



In [None]:
# Number of reviews per customer, stored as customer_count
customers_df = (
    df
    .groupBy("customer_id")
    .count()
    .withColumnRenamed("count", "customer_count")
)
customers_df.show()

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   19836511|             2|
|   37859085|             1|
|   48997737|            24|
|   42319435|            83|
|   19624806|             1|
|   52218500|             3|
|   50041220|             1|
|   44714015|            12|
|   27636670|             1|
|   24133679|             2|
|   35864889|            16|
|   12250773|             1|
|   16224491|             2|
|    2761659|             4|
|   49323498|             1|
|   13788774|             1|
|   25790304|             3|
|   28528294|            38|
|    2769839|             1|
|   14299971|             7|
+-----------+--------------+
only showing top 20 rows



In [None]:
# Rating, vote counts and vine flag for each review
vine_df = df.select(["review_id", "star_rating", "helpful_votes", "total_votes", "vine"])
vine_df.show()

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R100KOICK3R3CF|          5|            0|          0|   N|
|R100PWDLZSNOJE|          5|            0|          0|   N|
|R10185C2H88C6O|          5|            0|          0|   N|
|R102A21SZ34VPM|          5|            0|          0|   N|
|R102A2XJDY3YPA|          1|            1|          1|   N|
|R102ASTZMWUDN5|          5|            2|          2|   N|
|R1038PDERZDPPA|          5|            0|          0|   N|
|R104C1VFOOUG35|          5|           14|         15|   N|
|R104HQ5X03UBJV|          5|            0|          0|   N|
|R1050WCB1ADCLZ|          1|           75|         80|   N|
|R1051UF4PE1RAM|          1|            0|          0|   N|
|R1056E5M8D9VZD|          5|            0|          0|   N|
|R105I0GFDTIXTV|          5|            2|          2|   N|
|R105P0QQJXKAXZ|          5|            

In [None]:
# Configuration for RDS instance
# Credentials are read from the environment (or prompted for) rather than being
# hardcoded, so they are never stored in the notebook or committed with it.
# Set RDS_JDBC_URL / RDS_USER / RDS_PASSWORD before running; the URL and user
# fall back to the previous values when unset.
# NOTE(review): the password previously embedded here should be rotated.
import os
from getpass import getpass

mode = "append"
jdbc_url = os.environ.get(
    "RDS_JDBC_URL",
    "jdbc:postgresql://database-1.cnudimefwqi6.us-west-1.rds.amazonaws.com:5432/my_data_class_db",
)
config = {
    "user": os.environ.get("RDS_USER", "root"),
    # Prompt only when RDS_PASSWORD is unset or empty; getpass does not echo the input.
    "password": os.environ.get("RDS_PASSWORD") or getpass("RDS password: "),
    "driver": "org.postgresql.Driver",
}

In [None]:
# Write DataFrames to RDS: review rows go to review_id_table using the configured save mode
review_id_df.write.jdbc(
    url=jdbc_url,
    table='review_id_table',
    mode=mode,
    properties=config,
)


In [None]:
# Write the per-product rows to the products table
products_df.write.jdbc(
    url=jdbc_url,
    table='products',
    mode=mode,
    properties=config,
)


In [None]:
# Write the per-customer review counts to the customers table
customers_df.write.jdbc(
    url=jdbc_url,
    table='customers',
    mode=mode,
    properties=config,
)


In [None]:
# Write the per-review rating/vote/vine rows to vine_table
vine_df.write.jdbc(
    url=jdbc_url,
    table='vine_table',
    mode=mode,
    properties=config,
)