In [0]:
#Install pytest if necessary
%pip install pytest

[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m
[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m


## Import Functions and Register UDFs
We import the transformation helpers from `my_functions` and register them as Spark UDFs so they can be applied to DataFrame columns.

In [0]:
from decimal import Decimal
from pyspark.sql.types import StringType, IntegerType, FloatType, BooleanType, DateType
from datetime import datetime
from pyspark.sql.functions import col, udf, lit, when, round, sum as _sum
import pandas as pd
import my_functions

# Register the my_functions helpers as Spark UDFs so they can be applied column-wise.
# The second argument of udf() is the Spark type the Python return value is converted to.
remove_leading_zeros_udf = udf(my_functions.remove_leading_zeros, StringType())
convert_to_yn_udf = udf(my_functions.convert_to_yn, StringType())
# Monetary result: declared as a 64-bit double ("double" DDL type string). A 32-bit
# FloatType keeps only ~7 significant digits, so amounts above ~131,072 cannot hold
# every cent value exactly.
# NOTE(review): assumes calculate_amount_eur returns a Python float (or None) — confirm.
calculate_amount_eur_udf = udf(my_functions.calculate_amount_eur, "double")
yn_to_bool_udf = udf(my_functions.yn_to_bool, BooleanType())
determine_enterprise_size_udf = udf(my_functions.determine_enterprise_size, StringType())
to_date_udf = udf(my_functions.to_date, DateType())

## Setup and Loading Tables
We set up the environment and load the necessary tables from the catalog into DataFrames.


In [0]:
# Read each input table from the catalog, in the order listed, into its own DataFrame.
source1, source2, client_secured, exchange_rates = (
    spark.table(table_name)
    for table_name in ("source_1", "source_2", "client_secured_ind", "exchange_rates")
)

## Transform and Rename Source DataFrames
We rename columns and apply transformations to the `source1`, `source2` and `client_secured` DataFrames.

In [0]:
# Align source1 with the common schema: rename its columns to the shared names,
# strip leading zeros from ClientNumber and normalise DiscountCheck with the UDFs.
source1_transformed = (
    source1
    .withColumnRenamed('DataSource', 'SourceSystem')
    .withColumnRenamed('ClientAmount', 'OriginalAmount')
    .withColumnRenamed('Currency', 'OriginalCurrency')
    .withColumnRenamed('ClientSince', 'OnboardingDate')
    .withColumnRenamed('Location', 'Country')
    .withColumnRenamed('EligibleForDiscount', 'DiscountCheck')
    .withColumn('ClientNumber', remove_leading_zeros_udf(col('ClientNumber')))
    .withColumn('DiscountCheck', convert_to_yn_udf(col('DiscountCheck')))
)

# Same alignment for source2. Its system column is already named 'SourceSystem',
# so no rename is needed for it (renaming a column to its own name is a no-op).
source2_transformed = (
    source2
    .withColumnRenamed('ClientAmount', 'OriginalAmount')
    .withColumnRenamed('Currency', 'OriginalCurrency')
    .withColumnRenamed('ClientSince', 'OnboardingDate')
    .withColumnRenamed('HasLoan', 'LoanCheck')
    .withColumn('ClientNumber', remove_leading_zeros_udf(col('ClientNumber')))
    .withColumn('LoanCheck', convert_to_yn_udf(col('LoanCheck')))
)

# Keep a single client_secured row per ClientNumber so the later left join on
# ClientNumber matches at most one row per client.
# NOTE(review): drop_duplicates keeps an arbitrary row per key; if duplicates can
# carry different ClientSecuredInd values the outcome is non-deterministic — confirm.
# NOTE(review): source ClientNumbers have leading zeros stripped above; confirm
# client_secured_ind.ClientNumber uses the same format or the join will miss.
client_secured = client_secured.drop_duplicates(subset=['ClientNumber'])

## Handle Missing Columns and Concatenate DataFrames
We add missing columns with default values and concatenate the transformed DataFrames.

In [0]:
# Each source lacks columns the other one has: source1 has no LoanCheck, and
# source2 has no Country or DiscountCheck. Fill them with placeholders first.
source1_transformed = source1_transformed.withColumn('LoanCheck', lit('NA'))
source2_transformed = (
    source2_transformed
    .withColumn('Country', lit(None).cast(StringType()))
    .withColumn('DiscountCheck', lit('NA'))
)

# Shared output schema. Both frames are projected onto it in exactly this order
# because union() pairs columns by position, not by name.
final_columns = [
    'SourceSystem', 'ClientGroup', 'ClientNumber', 'OriginalAmount', 'OriginalCurrency',
    'NumberOfEmployees', 'OnboardingDate', 'LoanCheck', 'Country', 'DiscountCheck', 'SnapshotDate',
]

source1_transformed, source2_transformed = (
    frame.select(final_columns)
    for frame in (source1_transformed, source2_transformed)
)

# Stack the source1 rows and the source2 rows into one DataFrame.
final_df = source1_transformed.union(source2_transformed)

## Data Cleaning and Join Operations
This section performs data cleaning and joins with the `exchange_rates` and `client_secured` DataFrames.

In [0]:
# Patch a known malformed OnboardingDate value ('2010-05-111') in the source data.
# NOTE(review): only this one literal is corrected; other malformed dates pass through.
final_df = final_df.withColumn(
    'OnboardingDate',
    when(col('OnboardingDate') == '2010-05-111', '2010-05-11').otherwise(col('OnboardingDate')),
)

# Monetary amounts are held as 64-bit doubles: a 32-bit 'float' cannot represent
# every cent value once amounts exceed ~131,072.
final_df = final_df.withColumn('OriginalAmount', col('OriginalAmount').cast('double'))

# Rename SnapshotDate on the rates side so it does not clash with final_df.SnapshotDate.
# A new variable is used (rather than reassigning exchange_rates) so this cell can be re-run.
exchange_rates_renamed = exchange_rates.withColumnRenamed("SnapshotDate", "ExchangeRateSnapshotDate")

# Left join: attach the exchange rate for each row's currency on its snapshot date.
# NOTE(review): assumes at most one rate per (Currency, SnapshotDate); duplicates would multiply rows.
final_df = final_df.join(
    exchange_rates_renamed,
    on=[
        final_df.OriginalCurrency == exchange_rates_renamed.Currency,
        final_df.SnapshotDate == exchange_rates_renamed.ExchangeRateSnapshotDate,
    ],
    how='left',
)

# Convert each amount to EUR using the joined rate.
final_df = final_df.withColumn(
    'AmountEUR',
    calculate_amount_eur_udf(col('OriginalAmount'), col('OriginalCurrency'), col('ExchangeRate')),
)

# Keep only the business columns. This also discards the helper columns brought in by
# the exchange-rate join (Currency, ExchangeRate, ExchangeRateSnapshotDate), so no
# separate drop() is needed.
final_df = final_df.select('SourceSystem', 'ClientGroup', 'ClientNumber', 'OriginalAmount', 'OriginalCurrency',
                           'NumberOfEmployees', 'OnboardingDate', 'LoanCheck', 'Country', 'DiscountCheck',
                           'SnapshotDate', 'AmountEUR')

# client_secured is de-duplicated on ClientNumber when the source DataFrames are transformed.
# Rename its key into a new variable (not in place) so a re-run still finds 'ClientNumber'.
client_secured_renamed = client_secured.withColumnRenamed("ClientNumber", "ClientSecuredClientNumber")

# Left join: every transaction row is kept, even when the client has no secured indicator.
final_df = final_df.join(
    client_secured_renamed,
    on=final_df.ClientNumber == client_secured_renamed.ClientSecuredClientNumber,
    how='left',
)

# Convert the Y/N indicator to boolean. With Spark's default case-insensitive column
# resolution this replaces ClientSecuredInd in place under the name ClientSecuredIND.
# NOTE(review): with spark.sql.caseSensitive=true both columns would be kept — confirm setting.
final_df = final_df.withColumn('ClientSecuredIND', yn_to_bool_udf(col('ClientSecuredInd')))

# The join key from client_secured is no longer needed.
final_df = final_df.drop('ClientSecuredClientNumber')

# Cast NumberOfEmployees to int before classifying the enterprise size from it.
final_df = final_df.withColumn('NumberOfEmployees', col('NumberOfEmployees').cast('int'))

# Derive EnterpriseSize from the employee count.
final_df = final_df.withColumn('EnterpriseSize', determine_enterprise_size_udf(col('NumberOfEmployees')))

## Calculate Secured Amount and Convert to Pandas DataFrame
This section converts the Spark DataFrame to a pandas DataFrame, calculates the secured amount with `my_functions.calculate_secured_amount`, and converts the result back to a Spark DataFrame.

In [0]:
# my_functions.calculate_secured_amount operates on a pandas DataFrame, so the joined
# result is pulled onto the driver, processed there, and handed back to Spark.
# NOTE(review): collect() materialises every row in driver memory — acceptable for
# small inputs; confirm expected volumes before running on large tables.
data = final_df.collect()
pdf = my_functions.calculate_secured_amount(
    pd.DataFrame(data=data, columns=final_df.columns)
)

# No schema is passed, so Spark re-infers every column type from the pandas frame;
# the following cell casts the columns back to their intended types.
final_df = spark.createDataFrame(pdf)

In [0]:
# Set the final column types explicitly. The pandas round-trip above lets Spark
# re-infer each type, so every output column is re-typed here. Entries are applied
# in insertion order, and each replaces the column of the same name in place.
# Monetary columns are stored as 64-bit doubles rounded to 2 decimals; a 32-bit
# FloatType cannot represent every cent value above ~131,072.
# NOTE: `round` is pyspark.sql.functions.round (imported in the setup cell), not the builtin.
target_types = {
    'SourceSystem': col('SourceSystem').cast(StringType()),
    'ClientGroup': col('ClientGroup').cast(IntegerType()),
    'ClientNumber': col('ClientNumber').cast(IntegerType()),
    'OriginalCurrency': col('OriginalCurrency').cast(StringType()),
    'NumberOfEmployees': col('NumberOfEmployees').cast(IntegerType()),
    'OnboardingDate': to_date_udf(col('OnboardingDate')),
    'LoanCheck': col('LoanCheck').cast(StringType()),
    'Country': col('Country').cast(StringType()),
    'DiscountCheck': col('DiscountCheck').cast(StringType()),
    'SnapshotDate': to_date_udf(col('SnapshotDate')),
    'OriginalAmount': round(col('OriginalAmount').cast('double'), 2),
    'AmountEUR': round(col('AmountEUR').cast('double'), 2),
    'ClientSecuredIND': col('ClientSecuredIND').cast(BooleanType()),
    'EnterpriseSize': col('EnterpriseSize').cast(StringType()),
    'SecuredAmountEUR': round(col('SecuredAmountEUR').cast('double'), 2),
}

for column_name, typed_column in target_types.items():
    final_df = final_df.withColumn(column_name, typed_column)

In [0]:
# Fully-qualified name of the output Delta table.
TARGET_TABLE = "hive_metastore.default.financialtransactions"

# Expose the final DataFrame to SQL under a session-scoped name.
final_df.createOrReplaceTempView("temp_table")

# CREATE OR REPLACE swaps the table definition and data in one statement. The previous
# DROP-then-CREATE pair left no table at all if the CREATE step failed.
# ORDER BY only influences how rows are laid out in the written files; queries must
# still use their own ORDER BY to get a guaranteed row order.
spark.sql(f"""
    CREATE OR REPLACE TABLE {TARGET_TABLE}
    USING delta
    PARTITIONED BY (SnapshotDate, ClientGroup)
    AS SELECT * FROM temp_table
    ORDER BY ClientGroup, ClientNumber
""")

DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
%sql
-- Preview the written table. The name is fully qualified so the query reads the
-- table created above regardless of the session's current catalog/schema.
SELECT * FROM hive_metastore.default.financialtransactions
ORDER BY ClientGroup, ClientNumber

SourceSystem,ClientGroup,ClientNumber,OriginalAmount,OriginalCurrency,NumberOfEmployees,OnboardingDate,LoanCheck,Country,DiscountCheck,SnapshotDate,AmountEUR,ClientSecuredIND,EnterpriseSize,SecuredAmountEUR
Source2,7,7421,4332.89,SEK,123,2011-06-12,Y,,,2023-02-07,368.3,True,M,368.3
Source2,7,127382,459120.0,EUR,19,1994-07-23,N,,,2023-02-07,459120.0,True,S,459120.0
Source1,10,12,459120.0,EUR,19,1994-07-23,,BE,N,2023-02-06,459120.0,True,S,459120.0
Source1,10,764,134078.0,EUR,3792,2020-04-12,,DE,Y,2023-02-06,134078.0,True,L,40880.0
Source1,11,901,78309.0,EUR,1048,2015-02-16,,NL,N,2023-02-06,78309.0,True,L,78309.0
Source1,11,1298,325280.0,EUR,40,2010-05-11,,NL,Y,2023-02-06,325280.0,True,S,325280.0
Source1,11,6345,15045.0,EUR,551,2010-04-12,,NL,Y,2023-02-06,15045.0,True,M,15045.0
Source1,11,664521,14783.09,USD,2200,2019-03-17,,FR,N,2023-02-06,14595.34,True,L,14595.34
Source2,17,442,259000.0,EUR,1654,2019-04-12,N,,,2023-02-07,259000.0,False,L,0.0
Source2,17,764,134078.0,EUR,3792,2020-04-12,Y,,,2023-02-07,134078.0,True,L,134078.0


## Questions

In [0]:
%sql
--1. Value of SecuredAmountEUR for ClientNumber 6991
SELECT ClientNumber, SecuredAmountEUR FROM hive_metastore.default.financialtransactions
WHERE ClientNumber = 6991;


ClientNumber,SecuredAmountEUR
6991,0.0


In [0]:
%sql
--2. Number of records where EnterpriseSize is M
SELECT EnterpriseSize, COUNT(*) AS Qnt FROM hive_metastore.default.financialtransactions
WHERE EnterpriseSize = 'M'
GROUP BY EnterpriseSize;

EnterpriseSize,Qnt
M,6


In [0]:
%sql
--3. Total AmountEUR (rounded to 2 decimals) for SourceSystem 'Source2'
SELECT SourceSystem, round(SUM(AmountEUR), 2) AS TotalAmountEUR
FROM hive_metastore.default.financialtransactions
WHERE SourceSystem = 'Source2'
GROUP BY SourceSystem;

SourceSystem,"round(sum(AmountEUR), 2)"
Source2,1800631.68


In [0]:
%sql
--4. Total OriginalAmount (rounded to 2 decimals) for records where ClientSecuredIND is FALSE
-- NOTE(review): OriginalAmount is expressed in each row's own OriginalCurrency, so this
-- sums mixed currencies; use AmountEUR if a single-currency total is intended.
-- NOTE(review): rows whose ClientSecuredIND is NULL (presumably clients with no
-- client_secured_ind match) are excluded by this filter — confirm that is intended.
SELECT ClientSecuredIND, round(SUM(OriginalAmount), 2) AS TotalOriginalAmount
FROM hive_metastore.default.financialtransactions
WHERE ClientSecuredIND = FALSE
GROUP BY ClientSecuredIND;