In [None]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.3.0'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

In [None]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

In [None]:
from pyspark.sql import SparkSession

# Create (or reuse) the SparkSession with the Postgres JDBC jar on the driver
# classpath, so the RDS writes at the end can load org.postgresql.Driver.
# wget saved the jar into the current working directory (/content on Colab);
# resolve it from there rather than hard-coding /content.
postgres_jar = os.path.abspath("postgresql-42.2.9.jar")
spark = (
    SparkSession.builder
    .appName("M16-Amazon-Challenge")
    .config("spark.driver.extraClassPath", postgres_jar)
    .getOrCreate()
)

### Load Amazon Data into Spark DataFrame

In [None]:
from pyspark import SparkFiles

# NOTE(review): the public amazon-reviews-pds S3 bucket may no longer be
# served - confirm this URL still resolves before relying on the notebook.
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Wireless_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

# addFile stores the download under its file name. SparkFiles.get("") would
# return the root directory holding *every* added file, so reading it could pick
# up unrelated files; resolve this specific file instead.
file_name = url.rsplit("/", 1)[-1]
# inferSchema=True costs one extra pass over the (gzipped) file.
df = spark.read.option("encoding", "UTF-8").csv(
    SparkFiles.get(file_name), sep="\t", header=True, inferSchema=True
)
df.show()

In [None]:
# Number of reviews to load into RDS while developing; set to None for the full dataset.
SAMPLE_ROWS = 50

if SAMPLE_ROWS is None:
    df_subset = df
else:
    # limit() without an ordering is not guaranteed to return the same rows on
    # every action, and each table DataFrame below is derived lazily from
    # df_subset. Caching pins one materialized sample so all four tables
    # describe the same reviews, and the source file is not re-scanned per write.
    df_subset = df.limit(SAMPLE_ROWS).cache()
df_subset

### Create DataFrames to match tables

In [None]:
from pyspark.sql.functions import col
from pyspark.sql.functions import to_date
import pandas as pd

# The review dataset was already read into `df` above; the cells below reshape `df_subset` into one DataFrame per RDS table.


In [None]:
# Build the customers_table DataFrame: one row per customer_id, with the
# number of reviews that customer has in df_subset as customer_count.
customers_df = (
    df_subset
    .groupby("customer_id")
    .count()
    .withColumnRenamed("count", "customer_count")
)
customers_df.show()

In [None]:
# Echo customers_df as the cell output to inspect its columns
# (a Spark DataFrame's repr lists column names and types, not rows).
customers_df

In [None]:
# NOTE(review): sort_values is a pandas method; customers_df is a Spark
# DataFrame, so this line would raise AttributeError if uncommented.
# The Spark equivalent would be: customers_df.orderBy("customer_id").show()
#customers_df.sort_values(by="customer_id")


In [None]:
# Build the products_table DataFrame from the distinct
# (product_id, product_title) pairs in df_subset.
# NOTE(review): a product_id that appears with several titles still yields
# several rows here - confirm that matches the products_table key.
products_df = df_subset.select("product_id", "product_title").distinct()
products_df.show()


In [None]:
# Build the review_id_table DataFrame. review_date is parsed into a date
# column using the 'yyyy-MM-dd' pattern and keeps its original name.
review_id_df = df_subset.select(
    "review_id",
    "customer_id",
    "product_id",
    "product_parent",
    to_date(col("review_date"), "yyyy-MM-dd").alias("review_date"),
)
review_id_df.show()

In [None]:
# Build the vine_table DataFrame: per-review rating, vote counts and the
# vine / verified_purchase flags, all taken from df_subset.
vine_columns = ["review_id", "star_rating", "helpful_votes", "total_votes", "vine", "verified_purchase"]
vine_df = df_subset.select(*[col(name) for name in vine_columns])
vine_df.show()


### Connect to the AWS RDS instance and write each DataFrame to its table. 

In [None]:
# Configure settings for RDS.
# The database password is taken from the RDS_PASSWORD environment variable,
# or prompted for, so it is never hard-coded into (and shared with) the notebook.
import getpass

# With "append", re-running the write cells adds the same rows to the tables again.
mode = "append"
jdbc_url = "jdbc:postgresql://mypostgresdb1.cx17j5ikouuv.us-east-1.rds.amazonaws.com:5432/postgres2"
config = {
    "user": "postgres",
    "password": os.environ.get("RDS_PASSWORD") or getpass.getpass("RDS password: "),
    "driver": "org.postgresql.Driver",
}


In [None]:
# Persist review_id_df into the RDS table review_id_table, using the save mode,
# JDBC URL and connection properties from the settings cell.
review_id_df.write.mode(mode).jdbc(jdbc_url, "review_id_table", properties=config)

In [None]:
# Persist products_df into the RDS table products_table
# (observed runtime when this was last run: about 3 min).
products_df.write.mode(mode).jdbc(jdbc_url, "products_table", properties=config)

In [None]:
# Persist customers_df into the RDS table customers_table
# (observed runtime when this was last run: 5 min 14 s).
customers_df.write.mode(mode).jdbc(jdbc_url, "customers_table", properties=config)

In [None]:
# Persist vine_df into the RDS table vine_table
# (observed runtime when this was last run: 11 minutes).
vine_df.write.mode(mode).jdbc(jdbc_url, "vine_table", properties=config)