In [3]:
import os
spark_version = 'spark-3.0.3'
os.environ['SPARK_VERSION']=spark_version

!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

import findspark
findspark.init()

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:3 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Get:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release [696 B]
Hit:6 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:7 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release.gpg [836 B]
Hit:8 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:9 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Get:10 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:11 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Get:12 http://security.ubun

In [4]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2021-11-28 04:27:02--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2021-11-28 04:27:02 (10.6 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [5]:
# Start the Spark session with the PostgreSQL JDBC driver on the driver classpath;
# without it, DataFrame.write.jdbc(...) cannot load org.postgresql.Driver.
# NOTE: classpath settings only take effect when the session's JVM is first created.
# If a session is already running, restart the runtime before re-running this cell.
from pyspark.sql import SparkSession

# Jar downloaded into the working directory by the wget cell (relies on `os` from the setup cell).
POSTGRES_JAR = os.path.abspath("postgresql-42.2.9.jar")
if not os.path.exists(POSTGRES_JAR):
    raise FileNotFoundError(
        f"PostgreSQL JDBC driver not found at {POSTGRES_JAR}; run the wget cell first."
    )

spark = (
    SparkSession.builder
    .appName("Hashing")
    .config("spark.driver.extraClassPath", POSTGRES_JAR)
    .getOrCreate()
)

In [6]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

In [7]:
# Public Amazon customer-reviews dump for the Toys category (gzipped TSV).
aws_file = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Toys_v1_00.tsv.gz"

from pyspark import SparkFiles

# Fetch the remote file through Spark, then read the local copy as tab-separated
# text with a header row, letting Spark infer the column types.
spark.sparkContext.addFile(aws_file)
df = (
    spark.read
    .option("header", "true")
    .csv(
        SparkFiles.get("amazon_reviews_us_Toys_v1_00.tsv.gz"),
        inferSchema=True,
        sep="\t",
    )
)

# Preview the raw rows (long values truncated).
df.show(truncate=True)


+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   18778586| RDIJS7QYB6XNR|B00EDBY7X8|     122952789|Monopoly Junior B...|            Toys|          5|            0|          0|   N|                Y|          Five Stars|        Excellent!!!| 2015-08-31|
|         US|   24769659|R36ED1U38IELG8|B00D7JFOPC|     952062646|56 Pieces of Wood...|            Toys|          5|    

In [8]:
# Total number of review rows in the raw dataset (displayed as the cell output).
review_count = df.count()
review_count

4864249

In [9]:
# Print the column types Spark inferred (inferSchema=True) for the raw TSV.
df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)



**First table to recreate**

CREATE TABLE review_id_table (
  review_id TEXT PRIMARY KEY NOT NULL,
  customer_id INTEGER,
  product_id TEXT,
  product_parent INTEGER,
  review_date DATE -- this should be in the format yyyy-mm-dd
);

In [10]:
# Columns for review_id_table, in the same order as its CREATE TABLE statement.
review_id_table = df.select(
    "review_id",
    "customer_id",
    "product_id",
    "product_parent",
    "review_date",
)
review_id_table.show()

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
| RDIJS7QYB6XNR|   18778586|B00EDBY7X8|     122952789| 2015-08-31|
|R36ED1U38IELG8|   24769659|B00D7JFOPC|     952062646| 2015-08-31|
| R1UE3RPRGCOLD|   44331596|B002LHA74O|     818126353| 2015-08-31|
|R298788GS6I901|   23310293|B00ARPLCGY|     261944918| 2015-08-31|
|  RNX4EXOBBPN5|   38745832|B00UZOPOFW|     717410439| 2015-08-31|
|R3BPETL222LMIM|   13394189|B009B7F6CA|     873028700| 2015-08-31|
|R3SORMPJZO3F2J|    2749569|B0101EHRSM|     723424342| 2015-08-31|
|R2RDOJQ0WBZCF6|   41137196|B00407S11Y|     383363775| 2015-08-31|
|R2B8VBEPB4YEZ7|     433677|B00FGPU7U2|     780517568| 2015-08-31|
|R1CB783I7B0U52|    1297934|B0013OY0S0|     269360126| 2015-08-31|
| R2D90RQQ3V8LH|   52006292|B00519PJTW|     493486387| 2015-08-31|
|R1Y4ZOUGFMJ327|   32071052|B001TCY2DO|     459122467| 2015-08

In [11]:
# Inspect the column types: review_date was read as a string and still has to be
# cast to DATE to match the review_id_table schema.
review_id_table.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- review_date: string (nullable = true)



In [12]:
#googled why my casts weren't working, found this
from pyspark.sql.types import *

In [13]:
# review_date is still a string; cast it to DATE so it matches the
# `review_date DATE` column of the SQL table. (DateType comes from the
# pyspark.sql.types import above.)
review_id_table = review_id_table.withColumn(
    "review_date",
    review_id_table.review_date.cast(DateType()),
)

review_id_table.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- review_date: date (nullable = true)



In [14]:
# Spot-check a few rows to confirm the DATE cast kept the values intact
# (presumably an unparseable string would show up as null here).
review_id_table.show(n=5)

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
| RDIJS7QYB6XNR|   18778586|B00EDBY7X8|     122952789| 2015-08-31|
|R36ED1U38IELG8|   24769659|B00D7JFOPC|     952062646| 2015-08-31|
| R1UE3RPRGCOLD|   44331596|B002LHA74O|     818126353| 2015-08-31|
|R298788GS6I901|   23310293|B00ARPLCGY|     261944918| 2015-08-31|
|  RNX4EXOBBPN5|   38745832|B00UZOPOFW|     717410439| 2015-08-31|
+--------------+-----------+----------+--------------+-----------+
only showing top 5 rows



**Second table to create**

CREATE TABLE products (
  product_id TEXT PRIMARY KEY NOT NULL UNIQUE,
  product_title TEXT
);


In [15]:
# Columns for the products table; at this point there is still one row per review.
products = df.select("product_id", "product_title")

In [16]:
# Preview the first five product rows.
products.show(n=5)

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B00EDBY7X8|Monopoly Junior B...|
|B00D7JFOPC|56 Pieces of Wood...|
|B002LHA74O|Super Jumbo Playi...|
|B00ARPLCGY|Barbie Doll and F...|
|B00UZOPOFW|Emazing Lights eL...|
+----------+--------------------+
only showing top 5 rows



In [17]:
# Both columns are strings, matching the TEXT columns of the products table.
products.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_title: string (nullable = true)



In [18]:
# Count rows per product_id. Many ids repeat (one row per review), so the frame
# must be de-duplicated before it can satisfy the PRIMARY KEY on product_id.
products.groupBy("product_id").count().show()

+----------+-----+
|product_id|count|
+----------+-----+
|B00VTE0VJQ|    1|
|B00B28IQ50|  150|
|B002IMMDN0|  411|
|B00VJKTLMS|   13|
|B00YH60L7U|   19|
|B00WJ1O76G|   21|
|B00TJ2RBGQ|    2|
|B008MTYTI0|   37|
|B013TODK3W|    1|
|B000FNTGZC|  140|
|B01197XC02|    1|
|B00MWJIP7M|    3|
|B00WHXSDVQ|    2|
|B008GPMF9U|  321|
|B00146LV7A|   20|
|B000UUA988|   25|
|B000PRRRR2|    1|
|B00PMGLB7I|   51|
|B003AKOYPE|    1|
|B0066LCY0G|    5|
+----------+-----+
only showing top 20 rows



In [19]:
# product_id is the PRIMARY KEY of the products table, so keep exactly one row per
# product_id and store the result (the previous version only displayed it, and the
# write cell then sent every duplicate row). Deduplicating on product_id alone,
# rather than on both columns, also covers any id that appears with more than one
# title. Which title survives for such an id is not deterministic.
products = products.dropDuplicates(["product_id"])
products.show()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B00IANTUPU|Lego, Star Wars, ...|
|B002TMK962|Rock - Concert Ti...|
|B00AOTTSQY|Adult & Children ...|
|B00CH3O5S6|48 Wholesale Wood...|
|B00MF3SAF2|Teenitor 3.7V 680...|
|B004D0C5WY|ThinkMax Silver B...|
|B00NHQHJZS|LEGO Technic Arct...|
|B00NQJ2DBG|Super Bright Long...|
|B0082IGA06|Yu-Gi-Oh! - Dark ...|
|1609581520|American Girl Car...|
|B004RGYVLC|DC Comics POP Her...|
|B0053EX3D4|Inuyasha Kirara T...|
|B00OTLHNI8|Bloco Toys T-Rex ...|
|B00BXEYI8W|Kid Galaxy Robot ...|
|B00JUG2NSM|Weiss Schwarz Kil...|
|B00LVPY252|Maxx Action 30" T...|
|B00IGRHWSC|Mattel Hot Wheels...|
|B00U6FU3YM|Educational Insig...|
|B00Z7FS3Q6|Beach Ball Set, 3...|
|B000WEAGTY|Inflatable Tradit...|
+----------+--------------------+
only showing top 20 rows



**Third table to create**

CREATE TABLE customers (
  customer_id INT PRIMARY KEY NOT NULL UNIQUE,
  customer_count INT
);


In [20]:
# Only customer_id is needed here; customer_count is derived by the aggregation below.
customer_ids = df.select("customer_id")

In [21]:
# One row per customer with the number of review rows they have (column "count").
customers = customer_ids.groupBy("customer_id").count()

In [22]:
# Preview the per-customer counts (default 20 rows).
customers.show(n=20)

+-----------+-----+
|customer_id|count|
+-----------+-----+
|   16989307|    1|
|   45632184|    2|
|   14703850|   13|
|   49645387|    2|
|   16343477|    1|
|   15554899|    1|
|   17067926|    1|
|   50843047|    2|
|    4051424|    1|
|   11487525|    1|
|   19371753|    1|
|   18634862|    1|
|   14552054|    1|
|   52695798|    1|
|   49438424|    3|
|   10854449|    9|
|   48521319|    1|
|   11839424|    2|
|   27887950|    1|
|   45392827|    3|
+-----------+-----+
only showing top 20 rows



In [23]:
# The aggregate produced a non-nullable long column named "count"; it still has to
# be cast to INT and renamed to customer_count to match the customers table.
customers.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- count: long (nullable = false)



In [24]:
# Narrow the aggregate's long "count" to INTEGER and expose it under the column
# name the customers table expects (IntegerType comes from the types import above).
customers = customers.select(
    "customer_id",
    customers["count"].cast(IntegerType()).alias("customer_count"),
)


In [25]:
# Preview the renamed/cast customers frame (default 20 rows).
customers.show(n=20)

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   16989307|             1|
|   45632184|             2|
|   14703850|            13|
|   49645387|             2|
|   16343477|             1|
|   15554899|             1|
|   17067926|             1|
|   50843047|             2|
|    4051424|             1|
|   11487525|             1|
|   19371753|             1|
|   18634862|             1|
|   14552054|             1|
|   52695798|             1|
|   49438424|             3|
|   10854449|             9|
|   48521319|             1|
|   11839424|             2|
|   27887950|             1|
|   45392827|             3|
+-----------+--------------+
only showing top 20 rows



In [26]:
# Confirm the cast and rename: customer_count is now an integer column.
customers.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- customer_count: integer (nullable = false)



**Final table**

CREATE TABLE vine_table (
  review_id TEXT PRIMARY KEY,
  star_rating INTEGER,
  helpful_votes INTEGER,
  total_votes INTEGER,
  vine TEXT
);


In [27]:
# Columns for vine_table, in the same order as its CREATE TABLE statement.
vine_table = df.select("review_id", "star_rating", "helpful_votes", "total_votes", "vine")

In [28]:
# The inferred types (string / integer) already line up with the vine_table
# schema, so no casts are needed here.
vine_table.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)



In [29]:
# Preview the vine_table rows (default 20 rows).
vine_table.show(n=20)

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
| RDIJS7QYB6XNR|          5|            0|          0|   N|
|R36ED1U38IELG8|          5|            0|          0|   N|
| R1UE3RPRGCOLD|          2|            1|          1|   N|
|R298788GS6I901|          5|            0|          0|   N|
|  RNX4EXOBBPN5|          1|            1|          1|   N|
|R3BPETL222LMIM|          5|            0|          0|   N|
|R3SORMPJZO3F2J|          3|            2|          2|   N|
|R2RDOJQ0WBZCF6|          5|            0|          0|   N|
|R2B8VBEPB4YEZ7|          5|            0|          0|   N|
|R1CB783I7B0U52|          1|            0|          1|   N|
| R2D90RQQ3V8LH|          5|            0|          0|   N|
|R1Y4ZOUGFMJ327|          5|            0|          0|   N|
|R2BUV9QJI2A00X|          5|            0|          1|   N|
| RSUHRJFJIRB3Z|          4|            

**Example code from Big Data - Day 3 - 03-Ins_ETL_S3_RDS**

# Configure settings for RDS


```
mode = "append"

jdbc_url="jdbc:postgresql://<connection string>:5432/<database-name>"

config = {"user":"postgres",

          "password": "<password>", 

          "driver":"org.postgresql.Driver"}
```



In [37]:
# JDBC settings for the target PostgreSQL (RDS) database. The URL and credentials
# are read from environment variables when set, so they never have to be typed into
# (and saved with) the notebook. The angle-bracket placeholders are only fallbacks
# and must be replaced before the write cell can succeed.
# Relies on `os` imported in the setup cell.
mode = "append"  # add rows rather than overwrite existing tables
jdbc_url = os.environ.get("RDS_JDBC_URL", "jdbc:postgresql://<connection>/<dbname>")
config = {
    "user": os.environ.get("RDS_USER", "<user>"),
    "password": os.environ.get("RDS_PASSWORD", "<password>"),
    # Must be the exact driver class name shipped in postgresql-42.2.9.jar;
    # a misspelled class name cannot be loaded and the JDBC write fails.
    "driver": "org.postgresql.Driver",
}

**Example code taken from the same class example as above**

`clean_user_df.write.jdbc(url=jdbc_url, table='active_user', mode=mode, properties=config)`

In [38]:
# Load each DataFrame into the PostgreSQL table of the same name, in order:
# review_id_table, products, customers, vine_table.
tables_to_load = {
    "review_id_table": review_id_table,
    "products": products,
    "customers": customers,
    "vine_table": vine_table,
}
for table_name, frame in tables_to_load.items():
    frame.write.jdbc(url=jdbc_url, table=table_name, mode=mode, properties=config)


Py4JJavaError: ignored