In [None]:
import os

# Find the latest version of spark 3.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
spark_version = 'spark-3.0.0'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz
!tar xf spark-3.0.0-bin-hadoop3.2.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

# Start a SparkSession
import findspark
findspark.init()

In [None]:
postgresql_java_version = '42.2.9'
os.environ['POSTGRESQL_JAVA_VERSION'] = postgresql_java_version
!wget https://jdbc.postgresql.org/download/postgresql-$POSTGRESQL_JAVA_VERSION.jar

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Level-One-Starter-Code").config(
    "spark.driver.extraClassPath",
    f"/content/postgresql-{postgresql_java_version}.jar"
  ).getOrCreate()

# Load Amazon Data into Spark DataFrame

In [None]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Digital_Ebook_Purchase_v1_01.tsv.gz"
spark.sparkContext.addFile(url)

In [None]:
# set schema
from pyspark.sql.types import StructField, StringType, IntegerType, StructType, DateType

schema = [StructField("marketplace", StringType(), True),
          StructField("customer_id", IntegerType(), True),
          StructField("review_id", StringType(), True),
          StructField("product_id", StringType(), True),
          StructField("product_parent", IntegerType(), True),
          StructField("product_title", StringType(), True),
          StructField("product_category", StringType(), True),
          StructField("star_rating", IntegerType(), True),
          StructField("helpful_votes", IntegerType(), True),
          StructField("total_votes", IntegerType(), True),
          StructField("vine", StringType(), True),
          StructField("verified_purchase", StringType(), True),
          StructField("review_headline", StringType(), True),
          StructField("review_body", StringType(), True),
          StructField("review_date", DateType(), True),]

final=StructType(fields=schema)

In [None]:
ebook_df = spark.read.csv(SparkFiles.get("amazon_reviews_us_Digital_Ebook_Purchase_v1_01.tsv.gz"), sep="\t", header=True, schema=final)

# Show DataFrame
ebook_df.show(truncate=False)

# Cleaned up DataFrames to match tables

In [None]:
# show number of rows in dataset
ebook_df.select("customer_id").count()

In [None]:
# find unique customers and count number of occurrences
customers_df = ebook_df.select("customer_id").groupBy("customer_id").count().withColumnRenamed("count","customer_count")
customers_df.show()

In [None]:
customers_df.printSchema()

In [None]:
# find unique products and their titles
from pyspark.sql import Row
products_df = ebook_df.dropDuplicates((["product_id"])).select("product_id", "product_title")

products_df.show(truncate=False)

In [None]:
products_df.printSchema()

In [None]:
# find info specified for review table
review_id_table_df = ebook_df.select("review_id", "customer_id", "product_id", "product_parent", "review_date")
review_id_table_df.show(truncate=False)

In [None]:
review_id_table_df.printSchema()

In [None]:
# find info specified for vine table
vine_table_df = ebook_df.select("review_id", "star_rating", "helpful_votes", "total_votes", "vine")
vine_table_df.show(truncate=False)

In [None]:
vine_table_df.printSchema()

# Push to AWS RDS instance

In [None]:
# Configure settings for RDS
mode = "append"
jdbc_url="jdbc:postgresql://mypostgresdb.cqdxhnrowhog.us-east-1.rds.amazonaws.com:5432/homework_level_1_db"
configuration = {"user": "root", 
          "password": "<password>"", 
          "driver": "org.postgresql.Driver"}

In [None]:
# Write DataFrame to review_id_table table in RDS
review_id_table_df.write.jdbc(url=jdbc_url, table='review_id_table', mode=mode, properties=configuration)

In [None]:
# Write products_df to table in RDS
products_df.write.jdbc(url=jdbc_url, table='products', mode=mode, properties=configuration)

In [None]:
# Write customers_df to table in RDS
customers_df.write.jdbc(url=jdbc_url, table='customers', mode=mode, properties=configuration)


In [None]:
# Write vine_df to table in RDS
vine_table_df.write.jdbc(url=jdbc_url, table='vine_table', mode=mode, properties=configuration)
