In [None]:
%md
## Get the data from the AWS S3 endpoint

In [None]:
%pyspark
from pyspark import SparkFiles

# Public S3 URL of the gzipped, tab-separated Musical Instruments review file.
endpoint = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Musical_Instruments_v1_00.tsv.gz"

# Hand the URL to Spark, then look the file up again by its base name.
spark.sparkContext.addFile(endpoint)

# No schema is supplied: the first row provides the column names and the
# fields are split on tabs.
df = spark.read.csv(
    SparkFiles.get("amazon_reviews_us_Musical_Instruments_v1_00.tsv.gz"),
    sep="\t",
    header=True,
)
df.show()

In [None]:
%pyspark
# Show the columns: the bare expression is the cell's last statement, so its
# value (the DataFrame's column names) is what the cell evaluates to.
df.columns

In [None]:
%md
## Transformations

In [None]:
%pyspark
from pyspark.sql.functions import to_date

# Review DataFrame: each review_id next to its review_date, with the date
# parsed using the 'yyyy-MM-dd' pattern and kept under the same column name.
parsed_review_date = to_date("review_date", 'yyyy-MM-dd').alias("review_date")
new_date_df = df.select("review_id", parsed_review_date)
new_date_df.show()

In [None]:
%pyspark
import pandas as pd

# Convert both Spark DataFrames to pandas DataFrames; the later cells build
# the per-table frames from these two.
pandas_date_df = new_date_df.toPandas()
pandas_df = df.toPandas()

In [None]:
%pyspark
import pandas as pd

# REVIEW_ID_TABLE_DF
# Identifier columns for every review, with the parsed review_date attached
# from pandas_date_df.
review_id_table_df1 = pandas_df[['review_id', 'customer_id', 'product_id', 'product_parent']]
# Join on the single key column; a left join keeps every row of
# review_id_table_df1 even if no date row matches.
review_id_table_df = pd.merge(review_id_table_df1, pandas_date_df, how="left", on="review_id")

# PRODUCTS_DF
# One row per distinct (product_id, product_title) pair.
products_df = pandas_df[['product_id', 'product_title']]
# drop=True discards the old row labels; without it reset_index() would add
# them as an extra "index" column that then travels with the frame.
products_df = products_df.drop_duplicates().reset_index(drop=True)

# CUSTOMERS_DF
# Number of reviews per customer_id, in value_counts() order.
customer_count = pandas_df["customer_id"].value_counts()
# Name the index and the counts explicitly so the resulting columns are
# always ["customer_id", "customer_count"], independent of how the installed
# pandas version labels the output of value_counts().
customers_df = customer_count.rename_axis("customer_id").reset_index(name="customer_count")

# VINE_TABLE_DF
# NOTE(review): these columns keep whatever dtype the upstream read produced;
# confirm they match the column types of the target table.
vine_table_df = pandas_df[['review_id', 'star_rating', 'helpful_votes', 'total_votes', 'vine']]

In [None]:
%pyspark
# Turn each pandas table frame back into a Spark DataFrame, one per table.
_to_spark = spark.createDataFrame

review_id_table_spdf = _to_spark(review_id_table_df)
products_spdf = _to_spark(products_df)
customers_spdf = _to_spark(customers_df)
vine_table_spdf = _to_spark(vine_table_df)

In [None]:
%md
## Upload to AWS (PostgreSQL via JDBC)

In [None]:
%pyspark
from config import remote_db_endpoint, remote_db_port, remote_gwsis_dbname, remote_gwsis_dbuser, remote_gwsis_dbpwd

# Rows are appended to each target table.
mode = "append"

# PostgreSQL JDBC URLs have the form jdbc:postgresql://host:port/database;
# the "//" belongs before the host, not after it.
# NOTE(review): remote_db_port and remote_gwsis_dbname are imported but the
# port and database name are still literals here; confirm which is intended.
jdbc_url = f"jdbc:postgresql://{remote_db_endpoint}:5432/my_data_class_db"

# Connection properties passed to the JDBC writer; credentials come from the
# local config module rather than being written in the notebook.
config = {"user": remote_gwsis_dbuser, "password": remote_gwsis_dbpwd, "driver": "org.postgresql.Driver"}

# Write each Spark DataFrame to its table, in the same order as before.
for frame, table_name in (
    (review_id_table_spdf, 'review_id_table'),
    (products_spdf, 'products'),
    (customers_spdf, 'customers'),
    (vine_table_spdf, 'vine_table'),
):
    frame.write.jdbc(url=jdbc_url, table=table_name, mode=mode, properties=config)