In [1]:
# Install a pinned boto3 build through the Databricks library utility.
# NOTE(review): newer Databricks runtimes favour `%pip install` over
# dbutils.library.installPyPI — confirm this call is still supported on the
# target cluster.
boto3_version = "1.9.157"
dbutils.library.installPyPI("boto3", version=boto3_version)

# Restart the Python process after the install (presumably so the freshly
# installed package becomes importable in later cells).
dbutils.library.restartPython()

In [2]:
# Load the AWS credential configuration. It presumably defines my_secret_name,
# my_region_name, my_access_key and my_secret_key, which a later cell reads.
# For a local (non-Databricks) run, use `import config` (config.py) instead of %run.
# NOTE(review): Databricks documents that %run must be alone in its cell and
# targets a notebook path — confirm this cell executes as written.
%run "/dbfs/FileStore/tables/config" ##for databricks

In [3]:
from pyspark.sql import SparkSession
from pyspark import SparkFiles
from pyspark.sql import DataFrame
from pyspark.sql.types import DateType, IntegerType
import pyspark.sql.functions as F

In [4]:
# Reuse the active Spark session when one exists; otherwise build a new one
# registered under this application name.
app_name = "spark-postgres"
spark = (
    SparkSession.builder
    .appName(app_name)
    .getOrCreate()
)

In [5]:
# Path of the reviews file to transform and load; it must already have been
# uploaded to DBFS. The .gz suffix indicates a gzip-compressed file.
amazon_reviews_file = "/FileStore/tables/amazon_reviews_us_Software_v1_00_tsv-16dc0.gz"
# Echo the path so the cell output records which file this run used.
amazon_reviews_file

In [6]:
# Configure a CSV reader for tab-delimited input whose first row is a header,
# letting Spark infer each column's type from the data.
reviews_reader = (
    spark.read
    .format("com.databricks.spark.csv")
    .options(header='true', inferSchema="true", delimiter="\t")
)

# Load the reviews file and show the inferred schema for a quick sanity check.
df_reviews = reviews_reader.load(amazon_reviews_file)
df_reviews.printSchema()

In [7]:
# Report how many review rows were loaded (this triggers a full Spark count).
row_count = df_reviews.count()
print(f"This dataset has {row_count} rows.")

In [8]:
## Transform data to schema specifications

# review_id_table: one row per review, with review_date normalised to a DATE
# (formatted as yyyy-MM-dd, then cast).
review_id_df = df_reviews.select("review_id", "customer_id", "product_id", "product_parent", "review_date") \
                        .withColumn('review_date', F.date_format('review_date', "yyyy-MM-dd").cast(DateType()))

# products table
# The target table has a PK constraint (presumably on product_id — confirm
# against the DDL), so emit exactly one row per product_id. Grouping on
# (product_id, product_title) is not enough: a product whose title appears in
# more than one variant would still produce several rows with the same key.
# min() picks one title per product deterministically, so re-runs agree.
products_df = df_reviews.groupBy("product_id").agg(F.min("product_title").alias("product_title"))

# customers table: number of reviews written by each customer, stored as an INT.
customers_df = df_reviews.groupBy("customer_id").agg(F.count("review_id").cast(IntegerType()).alias("customer_count"))

# vine table: per-review rating, vote counts and the vine column.
vine_df = df_reviews.select("review_id", "star_rating", "helpful_votes", "total_votes", "vine")

In [9]:
import boto3

# Secret identifier and AWS credentials; the my_* names are expected to be
# defined already (presumably by the config loaded in an earlier cell).
secret_name = my_secret_name
region_name = my_region_name
access_key = my_access_key
secret_key = my_secret_key

# Open an AWS session with the explicit credentials and fetch the secret
# from Secrets Manager.
session = boto3.session.Session(
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_key,
    region_name=region_name,
)
client = session.client('secretsmanager')
secret_value = client.get_secret_value(SecretId=secret_name)

# Do NOT echo `secret_value` itself: the response carries the plaintext
# SecretString, which would be saved into the notebook output. Listing only
# the response's key names still confirms that the call succeeded.
sorted(secret_value)

In [10]:
import json
def get_connection(secret_value):
  """Decode the JSON document stored under ``secret_value['SecretString']``.

  Args:
    secret_value: Mapping that holds a JSON-encoded string under the
      'SecretString' key.

  Returns:
    The Python object produced by parsing that JSON string.

  Raises:
    KeyError: If 'SecretString' is missing from ``secret_value``.
    json.JSONDecodeError: If the stored string is not valid JSON.
  """
  secret_json = secret_value['SecretString']
  parsed_secret = json.loads(secret_json)
  return parsed_secret
# Sanity-check that the secret parses, but display only its field names:
# echoing the parsed dict would write the database password into the
# notebook output.
sorted(get_connection(secret_value))

In [11]:
# Parse the fetched secret into a dict of connection fields.
connection = get_connection(secret_value)

# Postgres credentials: host/port and login come from the secret, while the
# database name and JDBC dialect are fixed here.
jdbcHostname, jdbcPort = connection['host'], connection['port']
jdbcDatabase = "postgres"
dialect = "postgresql"
jdbcUsername, jdbcPassword = connection['username'], connection['password']

# JDBC URL plus the properties handed to the JDBC writer.
jdbcUrl = f"jdbc:{dialect}://{jdbcHostname}:{jdbcPort}/{jdbcDatabase}"
connectionProperties = dict(
  user=jdbcUsername,
  password=jdbcPassword,
  driver="org.postgresql.Driver",
)

# Echo the URL (it contains no username or password) to confirm the target.
jdbcUrl

In [12]:
# Save mode shared by every write below; options are: error, append, overwrite
mode = "append"

# (destination table, DataFrame) pairs, listed in the order they are written.
jdbc_targets = [
    ("sw_review_id_table", review_id_df),
    ("sw_products", products_df),
    ("sw_customers", customers_df),
    ("sw_vine_table", vine_df),
]

# Push each DataFrame to its Postgres table over JDBC.
for table_name, table_df in jdbc_targets:
    table_df.write.jdbc(jdbcUrl, table_name, mode=mode, properties=connectionProperties)