In [1]:
from pyspark.sql import SparkSession
import boto3
from botocore import UNSIGNED
from botocore.client import Config
import os
from dotenv import load_dotenv
from sqlalchemy.engine.url import URL
load_dotenv()

driver=os.environ.get('POSTGRES_DRIVER') # Postgres JDBC driver path
host=os.environ.get('POSTGRES_HOST') # Postgres host url
port=os.environ.get('POSTGRES_PORT') # Postgres port
database=os.environ.get('POSTGRES_DATABASE') # Postgres database
username=os.environ.get('POSTGRES_USERNAME') # Postgres username
password=os.environ.get('POSTGRES_PASSWORD') # Postgres Password
staging_schema=os.environ.get('POSTGRES_STAGING_SCHEMA') # Staging schema
analytics_schema=os.environ.get('POSTGRES_ANALYTICS_SCHEMA') # Staging schema
sparkClassPath=os.environ.get('PYSPARK_SUBMIT_ARGS') 

# The postgres driver needs some dependencies, so call it
# directly from the package from maven using spark.jars.packages. See link:
# https://www.reddit.com/r/apachespark/comments/qhv03n/error_javalangclassnotfoundexception/

### Configure spark session and specify classpath config for reading and writing to postgres database

##### Use the command `wget https://jdbc.postgresql.org/download/postgresql-42.6.0.jar  `  to download the postgresql jar file and save the jar file to the pysaprk jars directory of your python environment that is :  
`/Users/akawiifeanyicourage/opt/anaconda3/lib/python3.9/site-packages/pyspark/jars/`

In [2]:
spark = (SparkSession
        .builder
        .config("spark.executor.extraClassPath", sparkClassPath)
        .master('local[*]')
        .appName('data2bots')
        .getOrCreate())

23/06/21 03:58:42 WARN Utils: Your hostname, Akawi-Ifeanyi-Courage-Data-Engineer.local resolves to a loopback address: 127.0.0.1; using 172.20.10.2 instead (on interface en0)
23/06/21 03:58:42 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/opt/homebrew/Cellar/apache-spark/3.3.1/libexec/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/akawiifeanyicourage/.ivy2/cache
The jars for the packages stored in: /Users/akawiifeanyicourage/.ivy2/jars
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-8f92874e-6d82-4634-bcdd-c823dd9524d5;1.0
	confs: [default]
	found org.postgresql#postgresql;42.6.0 in central
	found org.checkerframework#checker-qual;3.31.0 in central
:: resolution report :: resolve 68ms :: artifacts dl 2ms
	:: modules in use:
	org.checkerframework#checker-qual;3.31.0 from central in [default]
	org.postgresql#postgresql;42.6.0 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	----------------

23/06/21 03:58:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


### Download the CSV files using boto3 Client

In [3]:
# Download all the files using boto3
s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))
bucket_name = "d2b-internal-assessment-bucket"
response = s3.list_objects(Bucket=bucket_name, Prefix="orders_data")
files_to_download = ['orders.csv','reviews.csv','shipment_deliveries.csv']

for file in files_to_download:
    s3.download_file(bucket_name, f"orders_data/{file}", f"{file}")

### Read the downloaded files into Pyspark Dataframes

In [4]:
# read the csv files into spark dataframes
orders_schema = "order_id INT, customer_id INT, order_date DATE, product_id STRING, \
unit_price INT, quantity INT,amount INT"

reviews_schema = "review INT, product_id INT"

shipment_deliveries_schema = "shipment_id INT, order_id INT, shipment_date DATE, delivery_date DATE"


orders_df = (spark
            .read
            .format('csv')
            .option('header', 'true')
            .option('mode', 'FAILFAST')
            .schema(orders_schema)
            .load('orders.csv')
            )

reviews_df = (spark
            .read
            .format('csv')
            .option('header', 'true')
            .option('mode', 'FAILFAST')
            .schema(reviews_schema)
            .load('reviews.csv')
            )

shipment_deliveries_df = (spark
            .read
            .format('csv')
            .option('header', 'true')
            .option('mode', 'FAILFAST')
            .schema(shipment_deliveries_schema)
            .load('shipment_deliveries.csv')
            )

### Check

In [5]:
orders_df.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- order_date: date (nullable = true)
 |-- product_id: string (nullable = true)
 |-- unit_price: integer (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- amount: integer (nullable = true)



In [6]:
orders_df.show(5)

23/06/21 03:58:50 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: order_id, customer_id, order_date, product_id, unit_price, quantity, total_price
 Schema: order_id, customer_id, order_date, product_id, unit_price, quantity, amount
Expected: amount but found: total_price
CSV file: file:///Users/akawiifeanyicourage/Desktop/data_engineering_projects/data_2_bots_de_assessment/cleanup/cleanup_spark/orders.csv
+--------+-----------+----------+----------+----------+--------+------+
|order_id|customer_id|order_date|product_id|unit_price|quantity|amount|
+--------+-----------+----------+----------+----------+--------+------+
|       1|          5|2022-07-13|        24|       139|      10|  1390|
|       2|         14|2021-04-06|         2|       273|       4|  1092|
|       3|         17|2022-07-29|        20|       253|       9|  2277|
|       4|         14|2022-08-27|         8|       334|       1|   334|
|       5|         25|2021-12-15|         6|       334|     

In [7]:
reviews_df.printSchema()

root
 |-- review: integer (nullable = true)
 |-- product_id: integer (nullable = true)



In [8]:
reviews_df.show(5)

+------+----------+
|review|product_id|
+------+----------+
|     1|        21|
|     3|         1|
|     2|         8|
|     1|         5|
|     5|        22|
+------+----------+
only showing top 5 rows



In [9]:
shipment_deliveries_df.printSchema()

root
 |-- shipment_id: integer (nullable = true)
 |-- order_id: integer (nullable = true)
 |-- shipment_date: date (nullable = true)
 |-- delivery_date: date (nullable = true)



In [10]:
shipment_deliveries_df.show(5)

+-----------+--------+-------------+-------------+
|shipment_id|order_id|shipment_date|delivery_date|
+-----------+--------+-------------+-------------+
|          1|       1|   2022-07-14|         null|
|          2|       2|         null|         null|
|          3|       3|   2022-07-31|   2022-08-03|
|          4|       4|   2022-09-02|   2022-09-05|
|          5|       5|   2021-12-19|   2021-12-20|
+-----------+--------+-------------+-------------+
only showing top 5 rows



### Write the raw data to Postgres database

In [11]:
# write orders data to postgres database
(orders_df.write
 .format('jdbc')
 .option('url', f'{host}:{port}/{database}')
 .option('driver', driver)
 .option('dbtable', f'{staging_schema}.orders')
 .option('user', username)
 .option('password', password)
 .mode('overwrite')
.save())

23/06/21 03:58:52 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: order_id, customer_id, order_date, product_id, unit_price, quantity, total_price
 Schema: order_id, customer_id, order_date, product_id, unit_price, quantity, amount
Expected: amount but found: total_price
CSV file: file:///Users/akawiifeanyicourage/Desktop/data_engineering_projects/data_2_bots_de_assessment/cleanup/cleanup_spark/orders.csv


                                                                                

In [12]:
# write reviews data to postgres database
(reviews_df.write
 .format('jdbc')
 .option('url', f'{host}:{port}/{database}')
 .option('driver', driver)
 .option('dbtable', f'{staging_schema}.reviews')
 .option('user', username)
 .option('password', password)
 .mode('overwrite')
.save())

                                                                                

In [13]:
# write shipment_deliveries data to postgres database
(shipment_deliveries_df.write
 .format('jdbc')
 .option('url', f'{host}:{port}/{database}')
 .option('driver', driver)
 .option('dbtable', f'{staging_schema}.shipment_deliveries')
 .option('user', username)
 .option('password', password)
 .mode('overwrite')
.save())

                                                                                