In [1]:
# Author : Danilo Zocco
# Date : 2022-05-15
#
#
# Versions :
# - Python 3.10.2
# - pyspark 3.2.1
# - findspark 2.0.1
#
#
# Assumptions :
# - Variables can be reused across exercises and there is no need to start from scratch each time
# - There is a 1-to-many relationship between customer and account
#     - Each customer has at least one account
#     - Each account belongs to an existing customer
# - For every currency present in the system there is an exchange rate column in fx_rate
#

In [2]:
from datetime import date
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DateType, DoubleType
from pyspark.sql.functions import col, expr, round, sequence, to_date, explode, when
import findspark

In [3]:
# Reference date used by every exercise; evaluated once so all cells agree on "today".
tday = date.today()

# NOTE(review): findspark is normally initialised *before* pyspark is imported;
# here pyspark was already imported in the previous cell — confirm this ordering is intended.
findspark.init()
spark = (
    SparkSession.builder
    .appName('Luxoft - Home Assignement')
    .getOrCreate()
)
spark

## Get dummy data

This is not part of the solution to the exercises; it is only used during development.

In [4]:
# Explicit schemas for the three CSV inputs; every field is declared nullable.
schema_customer = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ('customer_id', StringType()),
        ('name', StringType()),
        ('surname', StringType()),
        ('country_id', StringType()),
        ('date_of_birth', DateType()),
    ]
])

schema_account = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ('account_id', StringType()),
        ('customer_id', StringType()),
        ('currency', StringType()),
        ('balance', DoubleType()),
    ]
])

# One date column followed by one <CCY>_USD rate column per currency.
schema_fxrate = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ('fx_date', DateType()),
        ('CHF_USD', DoubleType()),
        ('EUR_USD', DoubleType()),
        ('JPY_USD', DoubleType()),
        ('AUD_USD', DoubleType()),
        ('GBP_USD', DoubleType()),
    ]
])

In [5]:
# Load each CSV (header row present) with its explicit schema instead of inferring types.
customer = spark.read.csv('customer.csv', schema=schema_customer, header=True)
account = spark.read.csv('account.csv', schema=schema_account, header=True)
fx_rate = spark.read.csv('fx_rate.csv', schema=schema_fxrate, header=True)

In [6]:
# Print each input's parsed schema, preceded by a label.
for label, frame in [('Customer', customer), ('Account', account), ('FX Rate', fx_rate)]:
    print(label)
    frame.printSchema()

Customer
root
 |-- customer_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- surname: string (nullable = true)
 |-- country_id: string (nullable = true)
 |-- date_of_birth: date (nullable = true)

Account
root
 |-- account_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- balance: double (nullable = true)

FX Rate
root
 |-- fx_date: date (nullable = true)
 |-- CHF_USD: double (nullable = true)
 |-- EUR_USD: double (nullable = true)
 |-- JPY_USD: double (nullable = true)
 |-- AUD_USD: double (nullable = true)
 |-- GBP_USD: double (nullable = true)



In [7]:
# Preview the customer input (at most 20 rows, long values truncated).
print('customer')
customer.show(n=20, truncate=True)

customer
+-----------+------+---------+----------+-------------+
|customer_id|  name|  surname|country_id|date_of_birth|
+-----------+------+---------+----------+-------------+
|          1|  Enzo|  Ferrari|        IT|   1980-01-01|
|          2|  John|      Doe|        US|   1990-01-01|
|          3|George|    Black|        UK|   1970-01-01|
|          4|Alvaro|  Sanchez|        MX|   1964-01-01|
|          5| Louis|   Dupont|        FR|   1980-01-01|
|          6|  John|    White|        US|   1977-01-01|
|          7|Thomas|Schneider|        CH|   1978-01-01|
|          8| Ramon|   Blanco|        ES|   1983-01-01|
|          9|  Emma|  Laurent|        FR|   1985-01-01|
|         10|Oliver|     Lahm|        DE|   1950-01-01|
+-----------+------+---------+----------+-------------+



In [8]:
# Preview the account input (at most 20 rows, long values truncated).
print('account')
account.show(n=20, truncate=True)

account
+----------+-----------+--------+----------+
|account_id|customer_id|currency|   balance|
+----------+-----------+--------+----------+
|         1|         10|     EUR| 108999.21|
|         2|         10|     CHF|8910772.81|
|         3|          2|     USD|  189000.0|
|         4|          3|     JPY| 3587612.0|
|         5|          4|     USD|   87688.0|
|         6|          5|     CHF| 124000.49|
|         7|          6|     AUD|  126987.9|
|         8|          6|     EUR|    1000.0|
|         9|          7|     EUR|  34559.89|
|        10|          8|     EUR| 458089.77|
|        11|          8|     USD|       0.0|
|        12|          1|     CHF|    1000.5|
|        13|          9|     AUD|    710.62|
+----------+-----------+--------+----------+



In [9]:
# Preview the raw fx_rate input (at most 20 rows, long values truncated).
print('fx_rate')
fx_rate.show(n=20, truncate=True)

fx_rate
+----------+-------+-------+-------+-------+-------+
|   fx_date|CHF_USD|EUR_USD|JPY_USD|AUD_USD|GBP_USD|
+----------+-------+-------+-------+-------+-------+
|2022-01-07|1.08667|1.15065|0.00871|   null|   null|
|2022-01-06|   null|   null|   null|   null|   null|
|2022-01-05|   null|   null|   null|   null|   null|
|2022-01-04|1.08453|1.14993|0.00856|   null|   null|
|2022-01-03|   null|   null|   null|   null|   null|
|2022-01-02|1.08246|1.14898|   null|   null|   null|
|2022-01-01|1.08345|1.14508|0.00889|   null|   null|
|2000-01-01|   1.05|    1.2|   0.05|    1.1|    1.3|
+----------+-------+-------+-------+-------+-------+



## Exercise 1

Since I was not provided with the full set of currencies available in the system, the solution does not depend on any specific currency being present.
I first add a row for every calendar day and unpivot the fx_rate dataframe, then fill missing rates by carrying forward each currency's last known rate, and finally pivot the dataframe back to one currency per column.

In [10]:
# Build a calendar with one row per day, from the earliest fx_date up to today.
# The next cell left-joins it to fx_rate so that days without published rates
# still get a row to forward-fill.
# The start date is derived from the data rather than hard-coded, so fx history
# older than any fixed date is never dropped. It falls back to today when fx_rate
# is empty, and it is capped at today because sequence() rejects start > stop.
fx_start = fx_rate.agg({'fx_date': 'min'}).first()[0] or tday
fx_start = min(fx_start, tday)

sql_qry = f"SELECT sequence(to_date('{fx_start}'), to_date('{tday}'), interval 1 day) AS date"
date_full = spark.sql(sql_qry).withColumn('date', explode(col('date')))

print('date_full')
date_full.show()

date_full
+----------+
|      date|
+----------+
|2000-01-01|
|2000-01-02|
|2000-01-03|
|2000-01-04|
|2000-01-05|
|2000-01-06|
|2000-01-07|
|2000-01-08|
|2000-01-09|
|2000-01-10|
|2000-01-11|
|2000-01-12|
|2000-01-13|
|2000-01-14|
|2000-01-15|
|2000-01-16|
|2000-01-17|
|2000-01-18|
|2000-01-19|
|2000-01-20|
+----------+
only showing top 20 rows



In [11]:
# Unpivot fx_rate (one column per currency) into long format:
# one row per (calendar day, cross_currency), with a null rate when none was published.

## Currency columns = every column except the date key (does not rely on column position)
fx_currency = [c for c in fx_rate.columns if c != 'fx_date']
assert fx_currency, 'fx_rate has no currency columns to unpivot'

## Build the stack() arguments as  'NAME', `NAME`  for each currency column.
## Backticks quote the identifier so unusual column names still parse.
str_currency = [f"'{c}', `{c}`" for c in fx_currency]
str_stack = ', '.join(str_currency)
unpivotExpr = f"stack({len(fx_currency)}, {str_stack}) as (cross_currency, rate)"

## Left-join the full calendar so days with no fx row appear (with null rates), then unpivot
fx_rate_unpivot = (
    date_full
    .join(fx_rate, date_full.date == fx_rate.fx_date, 'leftouter')
    .select(col('date').alias('fx_date'), expr(unpivotExpr))
)

print('fx_rate_unpivot')
fx_rate_unpivot.show()

fx_rate_unpivot
+----------+--------------+----+
|   fx_date|cross_currency|rate|
+----------+--------------+----+
|2000-01-01|       CHF_USD|1.05|
|2000-01-01|       EUR_USD| 1.2|
|2000-01-01|       JPY_USD|0.05|
|2000-01-01|       AUD_USD| 1.1|
|2000-01-01|       GBP_USD| 1.3|
|2000-01-02|       CHF_USD|null|
|2000-01-02|       EUR_USD|null|
|2000-01-02|       JPY_USD|null|
|2000-01-02|       AUD_USD|null|
|2000-01-02|       GBP_USD|null|
|2000-01-03|       CHF_USD|null|
|2000-01-03|       EUR_USD|null|
|2000-01-03|       JPY_USD|null|
|2000-01-03|       AUD_USD|null|
|2000-01-03|       GBP_USD|null|
|2000-01-04|       CHF_USD|null|
|2000-01-04|       EUR_USD|null|
|2000-01-04|       JPY_USD|null|
|2000-01-04|       AUD_USD|null|
|2000-01-04|       GBP_USD|null|
+----------+--------------+----+
only showing top 20 rows



In [12]:
# Forward-fill missing rates: within each currency, ordered by date, take the latest
# non-null rate seen so far. last(..., true) skips nulls; the window runs from the
# first row of the partition up to the current row. Days before a currency's first
# published rate therefore stay null.
fx_rate_unpivot.createOrReplaceTempView('fx_rate_unpivot')

sql_qry = """
    SELECT
        fx_date,
        cross_currency,
        last(rate, true) OVER (PARTITION BY cross_currency ORDER BY fx_date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rate
    FROM fx_rate_unpivot;"""
fx_rate_unpivot_clean = spark.sql(sql_qry)

print('fx_rate_unpivot_clean')
fx_rate_unpivot_clean.show()

fx_rate_unpivot_clean
+----------+--------------+----+
|   fx_date|cross_currency|rate|
+----------+--------------+----+
|2000-01-01|       AUD_USD| 1.1|
|2000-01-02|       AUD_USD| 1.1|
|2000-01-03|       AUD_USD| 1.1|
|2000-01-04|       AUD_USD| 1.1|
|2000-01-05|       AUD_USD| 1.1|
|2000-01-06|       AUD_USD| 1.1|
|2000-01-07|       AUD_USD| 1.1|
|2000-01-08|       AUD_USD| 1.1|
|2000-01-09|       AUD_USD| 1.1|
|2000-01-10|       AUD_USD| 1.1|
|2000-01-11|       AUD_USD| 1.1|
|2000-01-12|       AUD_USD| 1.1|
|2000-01-13|       AUD_USD| 1.1|
|2000-01-14|       AUD_USD| 1.1|
|2000-01-15|       AUD_USD| 1.1|
|2000-01-16|       AUD_USD| 1.1|
|2000-01-17|       AUD_USD| 1.1|
|2000-01-18|       AUD_USD| 1.1|
|2000-01-19|       AUD_USD| 1.1|
|2000-01-20|       AUD_USD| 1.1|
+----------+--------------+----+
only showing top 20 rows



In [13]:
# Pivot back to wide format: one row per fx_date, one column per currency.
# Passing fx_currency fixes the output column order (same as fx_rate) and spares Spark
# a pass to discover the distinct pivot values. min() only selects the single rate per
# (fx_date, cross_currency) — assuming fx_rate has at most one row per date.
fx_rate_clean = (
    fx_rate_unpivot_clean
    .groupBy('fx_date')
    .pivot('cross_currency', fx_currency)
    .min('rate')
)

print('fx_rate_clean')
fx_rate_clean.orderBy('fx_date').show()
fx_rate_clean.printSchema()

fx_rate_clean
+----------+-------+-------+-------+-------+-------+
|   fx_date|CHF_USD|EUR_USD|JPY_USD|AUD_USD|GBP_USD|
+----------+-------+-------+-------+-------+-------+
|2000-01-01|   1.05|    1.2|   0.05|    1.1|    1.3|
|2000-01-02|   1.05|    1.2|   0.05|    1.1|    1.3|
|2000-01-03|   1.05|    1.2|   0.05|    1.1|    1.3|
|2000-01-04|   1.05|    1.2|   0.05|    1.1|    1.3|
|2000-01-05|   1.05|    1.2|   0.05|    1.1|    1.3|
|2000-01-06|   1.05|    1.2|   0.05|    1.1|    1.3|
|2000-01-07|   1.05|    1.2|   0.05|    1.1|    1.3|
|2000-01-08|   1.05|    1.2|   0.05|    1.1|    1.3|
|2000-01-09|   1.05|    1.2|   0.05|    1.1|    1.3|
|2000-01-10|   1.05|    1.2|   0.05|    1.1|    1.3|
|2000-01-11|   1.05|    1.2|   0.05|    1.1|    1.3|
|2000-01-12|   1.05|    1.2|   0.05|    1.1|    1.3|
|2000-01-13|   1.05|    1.2|   0.05|    1.1|    1.3|
|2000-01-14|   1.05|    1.2|   0.05|    1.1|    1.3|
|2000-01-15|   1.05|    1.2|   0.05|    1.1|    1.3|
|2000-01-16|   1.05|    1.2|   0

In [14]:
# The raw input in date order, for a side-by-side comparison with fx_rate_clean
print('fx_rate')
fx_rate.orderBy('fx_date').show()
fx_rate.printSchema()

fx_rate
+----------+-------+-------+-------+-------+-------+
|   fx_date|CHF_USD|EUR_USD|JPY_USD|AUD_USD|GBP_USD|
+----------+-------+-------+-------+-------+-------+
|2000-01-01|   1.05|    1.2|   0.05|    1.1|    1.3|
|2022-01-01|1.08345|1.14508|0.00889|   null|   null|
|2022-01-02|1.08246|1.14898|   null|   null|   null|
|2022-01-03|   null|   null|   null|   null|   null|
|2022-01-04|1.08453|1.14993|0.00856|   null|   null|
|2022-01-05|   null|   null|   null|   null|   null|
|2022-01-06|   null|   null|   null|   null|   null|
|2022-01-07|1.08667|1.15065|0.00871|   null|   null|
+----------+-------+-------+-------+-------+-------+

root
 |-- fx_date: date (nullable = true)
 |-- CHF_USD: double (nullable = true)
 |-- EUR_USD: double (nullable = true)
 |-- JPY_USD: double (nullable = true)
 |-- AUD_USD: double (nullable = true)
 |-- GBP_USD: double (nullable = true)



## Exercise 2

In [15]:
# Rates to USD valid today, one row per currency code.
# cross_currency looks like 'CHF_USD'; its first three characters are the currency code.
# Column.substr is 1-based, so (1, 3) is stated explicitly rather than relying on
# Spark treating position 0 as 1.
fx_rate_tday = (
    fx_rate_unpivot_clean
    .filter(col('fx_date') == tday)
    .select(col('cross_currency').substr(1, 3).alias('currency'), 'rate')
)

# USD is the reference currency and has no column in fx_rate: add it with a rate of 1.0.
# The columns are named explicitly and unioned by name, so the row cannot land in the
# wrong column if either side's column order changes.
rowUSD = spark.createDataFrame([('USD', 1.0)], ['currency', 'rate'])
fx_rate_tday = fx_rate_tday.unionByName(rowUSD)

print('fx_rate_tday')
fx_rate_tday.show()

fx_rate_tday
+--------+-------+
|currency|   rate|
+--------+-------+
|     GBP|    1.3|
|     JPY|0.00871|
|     AUD|    1.1|
|     CHF|1.08667|
|     EUR|1.15065|
|     USD|    1.0|
+--------+-------+



In [16]:
# Convert every account balance to USD with today's rate.
# Joining on the column name keeps a single `currency` column instead of two.
account_USD = (
    account
    .join(fx_rate_tday, on='currency', how='leftouter')
    .withColumn('balance_USD', col('balance') * col('rate'))
)

# Enforce the "every currency has a rate" assumption. A missing or null rate makes
# balance_USD null, and sum() silently skips nulls, which would understate totals.
missing_fx = account_USD.filter(col('rate').isNull()).select('currency').distinct().collect()
assert not missing_fx, f"No FX rate for today for currencies: {[r.currency for r in missing_fx]}"

# Total USD balance per customer, rounded to 2 decimals
customer_balance_USD = (
    account_USD
    .groupBy('customer_id')
    .agg(round(expr('sum(balance_USD)'), 2).alias('tot_USD_balance'))
)

print('customer_balance_USD')
customer_balance_USD.show()

customer_balance_USD
+-----------+---------------+
|customer_id|tot_USD_balance|
+-----------+---------------+
|          7|       39766.34|
|          3|        31248.1|
|          8|      527100.99|
|          5|      134747.61|
|          6|      140837.34|
|          9|         781.68|
|          1|        1087.21|
|         10|     9808489.43|
|          4|        87688.0|
|          2|       189000.0|
+-----------+---------------+



## Exercise 3

In [17]:
# Map each customer's country to a local currency, per the mapping in the instructions.
# Countries not listed (and null country_id) fall back to USD.
# NOTE(review): with the sample data this sends ES (Spain) and MX to USD — confirm
# that this matches the instructions' mapping.
df_cust = customer.select(
    'customer_id',
    when(customer.country_id.isin('IT', 'FR', 'DE'), 'EUR')
    .when(customer.country_id == 'CH', 'CHF')
    .when(customer.country_id == 'UK', 'GBP')
    .when(customer.country_id == 'JP', 'JPY')
    .otherwise('USD')
    .alias('local_currency'),
)

print('df_cust')
df_cust.show()

df_cust
+-----------+--------------+
|customer_id|local_currency|
+-----------+--------------+
|          1|           EUR|
|          2|           USD|
|          3|           GBP|
|          4|           USD|
|          5|           EUR|
|          6|           USD|
|          7|           CHF|
|          8|           USD|
|          9|           EUR|
|         10|           EUR|
+-----------+--------------+



In [18]:
# Convert each customer's USD total into their local currency:
#   local amount = USD amount / (local -> USD rate)
# Reuses customer_balance_USD (Exercise 2) and fx_rate_tday.
# The customer join uses the shared `customer_id` column of df_cust itself rather than
# a column from another DataFrame, so it does not depend on lineage.
# NOTE: tot_USD_balance is already rounded to 2 decimals, so the result can differ from
# converting the unrounded total by roughly 0.005 / rate.
customer_balance_local = (
    df_cust
    .join(customer_balance_USD, on='customer_id', how='leftouter')
    .join(fx_rate_tday, df_cust.local_currency == fx_rate_tday.currency, 'leftouter')
    .select(
        'customer_id',
        'local_currency',
        round(col('tot_USD_balance') / col('rate'), 2).alias('tot_balance'),
    )
)

print('customer_balance_local')
customer_balance_local.show()

customer_balance_local
+-----------+--------------+-----------+
|customer_id|local_currency|tot_balance|
+-----------+--------------+-----------+
|          7|           CHF|   36594.68|
|          5|           EUR|  117105.64|
|          9|           EUR|     679.34|
|          1|           EUR|     944.87|
|         10|           EUR| 8524303.16|
|          3|           GBP|    24037.0|
|          8|           USD|  527100.99|
|          6|           USD|  140837.34|
|          4|           USD|    87688.0|
|          2|           USD|   189000.0|
+-----------+--------------+-----------+

