In [0]:
# Install Java, Spark, and Findspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

In [0]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2020-05-16 13:35:11--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2020-05-16 13:35:13 (1.05 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [0]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.4.1 pyspark-shell'

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CloudETL").config("spark.driver.extraClassPath","/content/postgresql-42.2.9.jar").getOrCreate()

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CloudETLProject").getOrCreate()

### Load in data from S3

In [0]:
# Read in data from S3 Buckets
from pyspark import SparkFiles

url="https://challengebigdata.s3-us-west-1.amazonaws.com/amazon_reviews_us_Books_v1_02.tsv.gz"
spark.sparkContext.addFile(url)
# fr_df = spark.read.option("header", "true").csv("amazon_reviews_us_Books_v1_02.tsv.gz", inferSchema=True, sep="\t")
df = spark.read.csv(SparkFiles.get("amazon_reviews_us_Books_v1_02.tsv.gz"), sep="\t", header=True, inferSchema=True)

# # Show DataFrame
df.show(5)

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|   12076615| RQ58W7SMO911M|0385730586|     122662979|Sisterhood of the...|           Books|          4|            2|          3|   N|                N|this book was a g...|this boook was a ...|2005-10-14 00:00:00|
|         US|   12703090|  RF6IUKMGL8SF|0811828964|      56191234|The Bad Girl's Gu...| 

In [0]:
print('number of records:', df.count())
df=df.dropna()
print('new records:', df.count())

number of records: 3105520
new records: 3105360


In [0]:
df=df.drop_duplicates()
print('unique count', df.count())

unique count 3105360


### Examine the schema


In [0]:
df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: timestamp (nullable = true)



In [0]:
from pyspark.sql.functions import col

df= df.withColumn("review_id",col("review_id").cast(TEXT))\
  .withColumn("product_id",col("product_id").cast(Text))\
  .withColumn("product_title",col("product_title").cast(Text))\
  .withColumn("vine",col("vine").cast(Text))\
  .withColumn("review_date",col("review_date").cast(date))
df.printschema()

NameError: ignored



```
# This is formatted as code
```

### Create New Dataframe for Amazon Book review


In [0]:
review_id_table = df.select(["review_id","customer_id", "product_id", "product_parent", "review_date"])
review_id_table .show(5)

+--------------+-----------+----------+--------------+-------------------+
|     review_id|customer_id|product_id|product_parent|        review_date|
+--------------+-----------+----------+--------------+-------------------+
| R1002FAEXYJTB|   53055712|0061002860|     121809515|1998-11-07 00:00:00|
|R100G0ZBG3OND5|   52453598|0380781484|     592019319|1998-11-23 00:00:00|
|R100I4QUZNIIR6|   28617069|0415227186|     667254241|2003-07-21 00:00:00|
|R101J95JQTCU0G|   49377164|1560252278|     202792810|2000-10-12 00:00:00|
|R101JCRAXEZ2XN|   35866380|080505314X|     501797008|2002-11-23 00:00:00|
+--------------+-----------+----------+--------------+-------------------+
only showing top 5 rows



### Write DataFrame to RDS

In [0]:
# Configuration for RDS instance
mode="append"
jdbc_url = "jdbc:postgresql://amazondb.cogiwzsyhvew.us-west-1.rds.amazonaws.com:5432/Amazondb"
config = {"user":"root",
          "password": "Hope1714",
          "driver":"org.postgresql.Driver"}

In [0]:
# Write DataFrame to table

review_id_table.write.jdbc(url=jdbc_url, table='review_id_table', mode=mode, properties=config)

In [0]:
vine_table = df.select(["review_id","star_rating", "helpful_votes", "total_votes", "vine"])
vine_table .show(5)

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
| R1002FAEXYJTB|          4|            0|          0|   N|
|R100G0ZBG3OND5|          5|            4|          6|   N|
|R100I4QUZNIIR6|          5|            9|         15|   N|
|R101J95JQTCU0G|          5|            4|          6|   N|
|R101JCRAXEZ2XN|          2|            0|          7|   N|
+--------------+-----------+-------------+-----------+----+
only showing top 5 rows



In [0]:
vine_table.write.jdbc(url=jdbc_url, table='vine_table', mode=mode, properties=config)

In [0]:
products = df.select(["product_id","product_title"])
products .show(5)

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|0061002860|The Murder of Rog...|
|0380781484|             Captive|
|0415227186|Schizophrenia: A ...|
|1560252278|The Outlaw Bible ...|
|080505314X|What Every Credit...|
+----------+--------------------+
only showing top 5 rows



In [0]:
products.write.jdbc(url=jdbc_url, table='products', mode=mode, properties=config)

Py4JJavaError: ignored

In [0]:
customers = df.select(["customer_id"])
customers .show(5)

+-----------+
|customer_id|
+-----------+
|   53055712|
|   52453598|
|   28617069|
|   49377164|
|   35866380|
+-----------+
only showing top 5 rows



In [0]:
# customers = customers.withColumn("customer_count",lit(1));
df = df.select(col("customer_id"),lit("1").as("customer_count"))
df.show()

SyntaxError: ignored