# Daily Incremental Load

## Problem Statement

Implement logic to load incremental exchange rate data from the RBA website into the warehouse on a daily basis.

## Pseudocode

1. Extract the data from the RBA website.
2. Cleanse the data and add extra ETL-specific fields if needed.
3. Establish a Spark session.
4. Load the resulting dataframe into the table.
5. A similar approach can be used to load any table: establish a database connection and write the dataframe.
   The df.write.options API in PySpark can be used to insert data into Hive/HDFS in various formats.
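
The steps above can be sketched as a small driver that receives each stage as a function. This is only an illustration of the flow: run_daily_load and the three callables passed to it are hypothetical names, not part of this notebook, and in-memory stand-ins replace the RBA site and the Spark table.

```python
from datetime import date


def run_daily_load(extract, cleanse, load, load_date=None):
    """Run extract -> cleanse -> load and return the number of records loaded.

    extract, cleanse and load are hypothetical callables standing in for the
    RBA download, the cleansing step and the table write.
    """
    load_date = load_date or date.today().isoformat()
    raw_records = extract()
    # Cleanse each record, then stamp the ETL-specific fields on it
    cleansed = [dict(cleanse(record), load_date=load_date, base_currency="AUD")
                for record in raw_records]
    load(cleansed)
    return len(cleansed)


# Usage with in-memory stand-ins instead of the RBA site and a Spark table
table = []
loaded = run_daily_load(
    extract=lambda: [{"currency": " United States dollar ", "latest_fx_rate": "0.7093"}],
    cleanse=lambda rec: {key: value.strip() for key, value in rec.items()},
    load=table.extend,
    load_date="2019-02-23",
)
print(loaded, table[0]["currency"], table[0]["load_date"])
```

Passing the stages in as functions keeps the order of the steps in one place and lets each stage be swapped or tested on its own.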


## Importing Necessary Modules and Assignments


In [21]:
import pandas as pd
import pyspark
from datetime import datetime
from pyspark import SparkContext as sc
from pyspark.sql import SparkSession, Row
from pyspark.sql import SQLContext
from pyspark.sql.types import *

spark = SparkSession.builder.appName("RBA Incremental Exchange Rates").getOrCreate()
log4jLogger = spark._jvm.org.apache.log4j 
log = log4jLogger.LogManager.getLogger(__name__) 

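def get_data(load_type):
    """Return the raw exchange rates as a pandas dataframe for the given load type."""
    if load_type == 'historical':
        # One-off full load from the CSV file stored in the local path
        data=pd.read_csv("/Users/karthikdivya/Desktop/Personal/PySpark/historical_rates.csv", header=10)
        return data
    elif load_type == 'delta':
        # The data URL returns the latest records at the time of the run, which are then loaded into the table.
        data_url = 'https://www.rba.gov.au/statistics/frequency/exchange-rates.html#latest-exchange-rates'
        log.info("Accessing the site {site} and extracting exchange rates data".format(site=data_url))
        tables = pd.read_html(data_url)
        latest_exchange_rates = tables[0]
        # Log before returning; a statement placed after the return would never run
        log.info("Downloaded data. Data Cleansing in progress...")
        return latest_exchange_rates
    else:
        raise ValueError("Unknown load_type: {load_type}".format(load_type=load_type))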

def get_cleansed_data(latest_exchange_rates):
    
    latest_fx_rate_date = datetime.strptime(latest_exchange_rates.columns[-1], '%d %b %Y').strftime('%Y-%m-%d')
    prev_fx_rate_date = datetime.strptime(latest_exchange_rates.columns[-2], '%d %b %Y').strftime('%Y-%m-%d')
    oldest_fx_rate_date = datetime.strptime(latest_exchange_rates.columns[-3], '%d %b %Y').strftime('%Y-%m-%d')
    latest_exchange_rates.columns = ['currency', 'oldest_fx_rate_date', 'prev_fx_rate_date', 'latest_fx_rate_date']

    # Additional Columns - Currency+exch_rate_date combination would be unique
    load_date = datetime.now().strftime('%Y-%m-%d')
    base_currency = 'AUD'
    latest_exchange_rates['exch_rate_date'] = latest_fx_rate_date # The date for which the exchange rates apply
    latest_exchange_rates['load_date'] = load_date # The date on which the load happened
    latest_exchange_rates['base_currency'] = str(base_currency) # Currency against which these rates are reported

    # Drop the previous exchange rates column
    latest_exchange_rates.drop(['oldest_fx_rate_date', 'prev_fx_rate_date'], axis=1, inplace=True)
    
    return latest_exchange_rates
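
The cleansing function above assumes that the last three column headers of the downloaded table are dates in the form "22 Feb 2019". A minimal sketch of checking that assumption before relabelling the columns is shown here; parse_rate_dates is a hypothetical helper, and the header values in the usage line are illustrative.

```python
from datetime import datetime

# Same format string that get_cleansed_data uses for the column headers
HEADER_DATE_FORMAT = "%d %b %Y"


def parse_rate_dates(columns, expected_date_columns=3):
    """Return the trailing date headers as ISO strings, in their original order.

    Raises ValueError if a trailing header is not a date, which would suggest
    that the table layout no longer matches what the cleansing step assumes.
    """
    date_headers = list(columns)[-expected_date_columns:]
    return [datetime.strptime(str(header), HEADER_DATE_FORMAT).strftime("%Y-%m-%d")
            for header in date_headers]


# Illustrative headers: one label column followed by three date columns
print(parse_rate_dates(["Currency", "20 Feb 2019", "21 Feb 2019", "22 Feb 2019"]))
```

Failing early with a ValueError is one option; logging and skipping the load would be another, depending on how the job is scheduled.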

## Registering an empty table for the initial run

1. This step is included to demonstrate the flow by displaying the Spark temporary table.
2. Ideally, the table would be created in the database beforehand, and the regular run would only append data.
3. In that case, this step is not needed and can be skipped.
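
If the table is created beforehand, as point 2 suggests, the statement could be generated from the same column list that the schema in the next cell uses. The sketch below only builds the SQL text; build_create_table_sql is a hypothetical helper, and the parquet storage format is an assumption, not something this notebook specifies.

```python
# Column names and types mirror the schema used in this notebook (all strings)
COLUMNS = [
    ("currency", "STRING"),
    ("latest_fx_rate", "STRING"),
    ("exch_rate_date", "STRING"),
    ("load_date", "STRING"),
    ("base_currency", "STRING"),
]


def build_create_table_sql(table_name, columns, storage_format="parquet"):
    """Build a CREATE TABLE IF NOT EXISTS statement from (name, type) pairs."""
    column_sql = ", ".join("{} {}".format(name, col_type) for name, col_type in columns)
    return "CREATE TABLE IF NOT EXISTS {} ({}) USING {}".format(
        table_name, column_sql, storage_format)


ddl = build_create_table_sql("daily_exchange_rates", COLUMNS)
print(ddl)
# The statement could then be run once, ahead of the daily loads: spark.sql(ddl)
```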

In [22]:
# Creating the fields and defining datatypes for the table
field = [StructField("currency",StringType(), True),StructField("latest_fx_rate", StringType(), True),\
StructField("exch_rate_date", StringType(), True),StructField("load_date", StringType(), True),\
StructField("base_currency", StringType(), True)]
schema = StructType(field)

# Creating empty dataframe based on the above schema
empty_df = spark.createDataFrame([], schema)

# Getting the writer object for the above dataframe
df_writer=empty_df.write

# mode="append" adds rows to the existing table. mode="overwrite" can be used instead to do a full load every time.
df_writer.saveAsTable("daily_exchange_rates", mode="append")

# Query the table to confirm that it is empty
rows=spark.sql("select * from daily_exchange_rates")
rows.show(100)
rows.count()

+--------+--------------+--------------+---------+-------------+
|currency|latest_fx_rate|exch_rate_date|load_date|base_currency|
+--------+--------------+--------------+---------+-------------+
+--------+--------------+--------------+---------+-------------+



0

## Data loading using Spark

Create a dataframe and save it as a table for querying. For the purpose of this exercise, the table is stored as a local Spark table from this notebook, under the locally created spark-warehouse directory (/Users/karthikdivya/Desktop/Personal/PySpark/spark-warehouse). An entry named after the table is created there. If the table needs to be dropped, this entry can be deleted and the script rerun to load the data.

Ideally, in a production scenario, the data would be written directly into Hive or into HDFS in a file format such as Parquet or ORC. PySpark also has connectors for other databases.
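
A minimal sketch of such a write, assuming the target is a path and the format is chosen by the caller. append_rates and the path in the usage comment are hypothetical; the function only relies on the standard DataFrameWriter calls format, mode and save.

```python
def append_rates(rates_df, target, file_format="parquet"):
    """Append a Spark dataframe to target in the given file format.

    rates_df is expected to be a pyspark.sql.DataFrame; target is a
    hypothetical HDFS or local path chosen by the caller.
    """
    (rates_df.write
        .format(file_format)   # for example "parquet" or "orc"
        .mode("append")        # keep the existing rows and add the new ones
        .save(target))


# Possible call from the notebook (the path is a placeholder):
# append_rates(cleansed_data_df, "hdfs:///path/to/daily_exchange_rates", "orc")
```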

In [27]:
# Execute the below steps to clean up the table before re-running
spark.sql("truncate table daily_exchange_rates")
rerun_rows=spark.sql("select * from daily_exchange_rates")
rerun_rows.show()

+--------+--------------+--------------+---------+-------------+
|currency|latest_fx_rate|exch_rate_date|load_date|base_currency|
+--------+--------------+--------------+---------+-------------+
+--------+--------------+--------------+---------+-------------+



In [31]:
# Main program starts here
# load_type can be 'historical' or 'delta' if it needs to be parameterized.
load_type = 'delta'

# Call the function to get the data based on the load type. For delta, the data comes directly from the RBA URL
# and is then cleansed. For historical, it comes from the downloaded CSV file stored in the local path. The assumption is
# that the historical load is an initial one-off and the delta load continues from there on a daily basis.
data = get_data(load_type)
cleansed_data = get_cleansed_data(data)
cleansed_data.rename(columns={'latest_fx_rate_date':'latest_fx_rate'}, inplace=True)

# Check the cleansed data. This is for debugging only and is not needed in production code.
print(cleansed_data.head(5))

log.info("Data Cleansing Finished. Building spark session and performing validation before insertion..")

# Data validation. We do not want the rates for the same date to be loaded twice into our table.
# The rule below triggers the load only when the latest exch_rate_date in the table is not
# equal to the latest date in the data downloaded from RBA.
max_exch_rate_date_df=spark.sql("select max(exch_rate_date) as max_exch_date from daily_exchange_rates")
max_exch_rate_date = max_exch_rate_date_df.collect()[0]['max_exch_date']
if max_exch_rate_date != cleansed_data['exch_rate_date'].unique()[0]:
    log.info("The latest date available in table is not equal to the latest from RBA. So, inserting new RBA rates !!!")
    cleansed_data_df=spark.createDataFrame(cleansed_data)
    
    # Get the dataframe writer for the cleansed exchange rates dataframe
    df_writer=cleansed_data_df.write

    # Saving the data as a table. The code to write to a database can be included here.
    # mode="append" adds rows to the existing table. mode="overwrite" can be used instead to do a full load every time.
    df_writer.saveAsTable("daily_exchange_rates", mode="append")
    load_count = cleansed_data_df.count()
    print('Loading is completed. Loaded a total of {load_count} records'.format(load_count=load_count))
    log.info('Loading is completed. Loaded a total of {load_count} records'.format(load_count=load_count))



               currency  latest_fx_rate exch_rate_date   load_date  \
0  United States dollar          0.7093     2019-02-22  2019-02-23   
1      Chinese renminbi          4.7736     2019-02-22  2019-02-23   
2          Japanese yen         78.5800     2019-02-22  2019-02-23   
3         European euro          0.6257     2019-02-22  2019-02-23   
4      South Korean won        798.4900     2019-02-22  2019-02-23   

  base_currency  
0           AUD  
1           AUD  
2           AUD  
3           AUD  
4           AUD  


In [30]:
rows=spark.sql("select * from daily_exchange_rates")
rows.show(100)

+--------------------+--------------+--------------+----------+-------------+
|            currency|latest_fx_rate|exch_rate_date| load_date|base_currency|
+--------------------+--------------+--------------+----------+-------------+
|Papua New Guinea ...|        2.3882|    2019-02-22|2019-02-23|          AUD|
|         Swiss franc|        0.7102|    2019-02-22|2019-02-23|          AUD|
|United Arab Emira...|        2.6049|    2019-02-22|2019-02-23|          AUD|
|     Canadian dollar|         0.939|    2019-02-22|2019-02-23|          AUD|
|Trade-weighted In...|          60.5|    2019-02-22|2019-02-23|          AUD|
|Special Drawing R...|        0.5093|    2019-02-22|2019-02-23|          AUD|
|United States dollar|        0.7093|    2019-02-22|2019-02-23|          AUD|
|    Chinese renminbi|        4.7736|    2019-02-22|2019-02-23|          AUD|
|        Japanese yen|         78.58|    2019-02-22|2019-02-23|          AUD|
|       European euro|        0.6257|    2019-02-22|2019-02-23| 

## Rerun the above script and check the count

As long as the maximum exch_rate_date in the table equals the latest exchange rate date in the downloaded daily exchange rates, no duplicate data is loaded, no matter how many times the script is rerun.

In [32]:
rerun_rows=spark.sql("select * from daily_exchange_rates")
rerun_rows.count()

21
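
The rerun rule can be isolated as a small predicate, which makes the three cases easy to see. This is a sketch that mirrors the comparison in the main program; should_load is a hypothetical name, and the third date is illustrative.

```python
def should_load(max_loaded_date, incoming_date):
    """Return True when the incoming rates are not yet in the table.

    max_loaded_date is None for an empty table (max() over no rows),
    so the first run always loads.
    """
    return max_loaded_date is None or max_loaded_date != incoming_date


print(should_load(None, "2019-02-22"))          # True: empty table, first run
print(should_load("2019-02-22", "2019-02-22"))  # False: rerun for the same date
print(should_load("2019-02-22", "2019-02-25"))  # True: a newer date is available
```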

## Historical data load

## Steps to Cleanse the table before historical loading

"daily_exchange_rates" is the table created for storing the data. It is a good idea to truncate it before the historical load, so that the table does not hold any data.

In [33]:
spark.sql("truncate table daily_exchange_rates")
rerun_rows=spark.sql("select * from daily_exchange_rates")
rerun_rows.show()

+--------+--------------+--------------+---------+-------------+
|currency|latest_fx_rate|exch_rate_date|load_date|base_currency|
+--------+--------------+--------------+---------+-------------+
+--------+--------------+--------------+---------+-------------+



In [34]:
# Main program starts here
# load_type can be 'historical' or 'delta' if it needs to be parameterized.
load_type = 'historical' 
currency_dict={'FXRUSD': 'United States dollar',
               'FXRCR': 'Chinese renminbi',
               'FXRJY': 'Japanese yen',
               'FXREUR': 'European euro',
               'FXRSKW': 'South Korean won',
               'FXRUKPS': 'UK pound sterling',
               'FXRSD': 'Singapore dollar',
               'FXRIRE': 'Indian rupee',
               'FXRTB': 'Thai baht',
               'FXRNZD': 'New Zealand dollar',
               'FXRNTD': 'New Taiwan dollar',
               'FXRMR': 'Malaysian ringgit',
               'FXRIR': 'Indonesian rupiah',
               'FXRVD': 'Vietnamese dong',
               'FXRUAED': 'United Arab Emirates dirham',
               'FXRPNGK': 'Papua New Guinea kina',
               'FXRHKD': 'Hong Kong dollar',
               'FXRCD': 'Canadian dollar',
               'FXRSF': 'Swiss franc',
               'FXRSDR': 'Special Drawing Right',
               'FXRTWI': 'Trade-weighted Index (4pm)'}

data=get_data(load_type)

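# Dropping unwanted columns and extracting only relevant rows
data.drop('FXRPHP', axis=1, inplace=True)
data.drop('FXRSARD', axis=1, inplace=True)
#print(data.columns)
# .copy() makes cleansed_data independent of data, so the in-place rename below does not act on a slice
cleansed_data=data.iloc[:288, 0:22].copy()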

# Renaming the column names as per the mapping dictionary currency_dict
cleansed_data.rename(columns=currency_dict, inplace=True)

# Extracting the currency column names for looping
currency_columns = cleansed_data.columns[1:]

# Additional columns
load_date = datetime.now().strftime('%Y-%m-%d')
base_currency = 'AUD'

for currency in currency_columns:
    hist_currency_data=cleansed_data.loc[:, ['Series ID', currency]]
    hist_currency_data['currency']=currency
    hist_currency_data['load_date'] = load_date
    hist_currency_data['base_currency'] = str(base_currency)
    columns_mapping={'Series ID': 'exch_rate_date',
                     currency: 'latest_fx_rate' ,
                     'currency': 'currency',
                     'load_date': 'load_date',
                     'base_currency': 'base_currency'}
    hist_currency_data.rename(columns=columns_mapping, inplace=True)
    hist_currency_data_df=spark.createDataFrame(hist_currency_data)
    
    # Get the dataframe writer for the cleansed historical exchange rates dataframe
    df_hist_writer=hist_currency_data_df.write

    # Saving the data as a table. The code to write to a database can be included here.
    df_hist_writer.saveAsTable("daily_exchange_rates", mode="append")
    load_count = hist_currency_data_df.count()
    print('Loading is completed. Loaded a total of {load_count} records for {currency}'.format(load_count=load_count, currency=currency))
    log.info('Loading is completed. Loaded a total of {load_count} records'.format(load_count=load_count))

Loading is completed. Loaded a total of 288 records for United States dollar
Loading is completed. Loaded a total of 288 records for Trade-weighted Index (4pm)
Loading is completed. Loaded a total of 288 records for Chinese renminbi
Loading is completed. Loaded a total of 288 records for Japanese yen
Loading is completed. Loaded a total of 288 records for European euro
Loading is completed. Loaded a total of 288 records for South Korean won
Loading is completed. Loaded a total of 288 records for UK pound sterling
Loading is completed. Loaded a total of 288 records for Singapore dollar
Loading is completed. Loaded a total of 288 records for Indian rupee
Loading is completed. Loaded a total of 288 records for Thai baht
Loading is completed. Loaded a total of 288 records for New Zealand dollar
Loading is completed. Loaded a total of 288 records for New Taiwan dollar
Loading is completed. Loaded a total of 288 records for Malaysian ringgit
Loading is completed. Loaded a total of 288 record
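
The loop above turns the wide historical file (one column per currency) into long records (one row per date and currency). A dependency-free sketch of that reshape is shown below, using plain dictionaries and made-up placeholder values; wide_to_long is a hypothetical helper. On a pandas dataframe, DataFrame.melt with id_vars, var_name and value_name performs the same reshape in one call.

```python
def wide_to_long(rows, date_key, load_date, base_currency="AUD"):
    """Turn one wide row per date into one long record per (date, currency)."""
    long_records = []
    for row in rows:
        for column, rate in row.items():
            if column == date_key:
                continue  # the date column is the identifier, not a currency
            long_records.append({
                "currency": column,
                "latest_fx_rate": rate,
                "exch_rate_date": row[date_key],
                "load_date": load_date,
                "base_currency": base_currency,
            })
    return long_records


# Placeholder input: one date row with two made-up currency columns
wide_rows = [{"Series ID": "D1", "CUR_A": 1.0, "CUR_B": 2.0}]
long_rows = wide_to_long(wide_rows, date_key="Series ID", load_date="2019-02-23")
print(len(long_rows), [record["currency"] for record in long_rows])
```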

## Data Validation for Historical Load

In [35]:
hist_load_rows=spark.sql("select * from daily_exchange_rates where currency='United States dollar'\
                         order by exch_rate_date asc")
hist_load_rows.count()


288

In [36]:
hist_load_rows.show()

+--------------------+--------------+--------------+----------+-------------+
|            currency|latest_fx_rate|exch_rate_date| load_date|base_currency|
+--------------------+--------------+--------------+----------+-------------+
|United States dollar|        0.7407|   01-Aug-2018|2019-02-23|          AUD|
|United States dollar|        0.8044|   01-Feb-2018|2019-02-23|          AUD|
|United States dollar|        0.7241|   01-Feb-2019|2019-02-23|          AUD|
|United States dollar|        0.7541|   01-Jun-2018|2019-02-23|          AUD|
|United States dollar|        0.7731|   01-Mar-2018|2019-02-23|          AUD|
|United States dollar|         0.754|   01-May-2018|2019-02-23|          AUD|
|United States dollar|         0.713|   01-Nov-2018|2019-02-23|          AUD|
|United States dollar|        0.7385|   02-Aug-2018|2019-02-23|          AUD|
|United States dollar|        0.7997|   02-Feb-2018|2019-02-23|          AUD|
|United States dollar|        0.7837|   02-Jan-2018|2019-02-23| 

In [37]:
hist_load_rows=spark.sql("select * from daily_exchange_rates")
hist_load_rows.count() # 21 currencies * 288 rows per currency

6048
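
In the output above, exch_rate_date is ordered as text (01-Aug-2018, 01-Feb-2018, 01-Feb-2019, ...), because the historical file carries dates in dd-Mon-yyyy form, while the delta load stores yyyy-mm-dd. One possible remedy, sketched here under the assumption that the dates can be normalised before loading, is to convert them to ISO form so that ordering and max() follow the calendar. to_iso_date is a hypothetical helper; the sample dates are taken from the output above.

```python
from datetime import datetime


def to_iso_date(value, source_format="%d-%b-%Y"):
    """Convert a date string such as '01-Aug-2018' to '2018-08-01'."""
    return datetime.strptime(value, source_format).strftime("%Y-%m-%d")


raw_dates = ["01-Feb-2019", "02-Jan-2018", "01-Nov-2018"]
iso_dates = [to_iso_date(value) for value in raw_dates]

print(max(raw_dates))  # 02-Jan-2018: the text maximum, not the latest date
print(max(iso_dates))  # 2019-02-01: the latest date
```

With both load types storing ISO dates, the max(exch_rate_date) check used by the delta load would compare like with like.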