In [406]:
from dotenv import load_dotenv
import os

load_dotenv(dotenv_path="../.env")

True

In [407]:
from itertools import count
from snowflake.snowpark import Session

connection_parameters = {
    "account": "YUJMLNP-YOB51920",
    "user": os.environ["SNOWFLAKE_USER"],
    "password": os.environ["SNOWFLAKE_PASSWORD"],
    "role": os.environ.get("SNOWFLAKE_ROLE"),
    "warehouse": "COMPUTE_WH",
    "database": "NBU_EXCHANGE",
    "schema": "SILVER"
}

session = Session.builder.configs(connection_parameters).create()

from snowflake.snowpark import Window
from snowflake.snowpark.functions import \
    col, \
    when, \
    trim, \
    to_date, \
    lit, \
    sql_expr, \
    count, \
    upper, \
    coalesce, \
    first_value, \
    row_number, \
    lead

exchange_rate_extracted_df = session.table("nbu_exchange.silver.exchange_rate_extracted")


UPPER AND TRIM STRING COLUMNS

In [408]:
# Upper and trim string columns
def clean_string_columns(df):
    # Identify string columns
    string_cols = [field.name for field in df.schema.fields if "StringType" in str(field.datatype)]
    
    # Build a list of columns: apply upper and trim to string columns, leave others as is
    new_cols = [
        upper(trim(col(c))).alias(c) if c in string_cols else col(c)
        for c in df.columns
    ]
    
    # Return the transformed DataFrame
    return df.select(new_cols)

exchange_rate_extracted_df_upper_trim = clean_string_columns(exchange_rate_extracted_df)

CAST TO DATE TYPE  

In [409]:
# Cast date columns to date type and handle null or empty values for calculation_date
exchange_rate_extracted_df_date_cast = exchange_rate_extracted_df_upper_trim \
    .with_column(
        "calculation_date",
        when(
            (trim(col("calculation_date")) == lit('')) | col("calculation_date").is_null(),
            to_date(lit("01.01.1900"), "DD.MM.YYYY")
        ).otherwise(
            to_date(trim(col("calculation_date")), "DD.MM.YYYY")
        )
    ) \
    .with_column(
        "exchange_date",
        to_date(col("exchange_date"), "DD.MM.YYYY")
    )


CURRENCY_NAME COLUMN CLEANING (FILL IN NULL VALUES, NAMES UNIFICATION)


In [410]:
# Fill in nulls in currency_name with value from currency_name that has the same code
windows = Window.partition_by(trim(upper(col("currency_code")))).order_by(col("currency_name").desc())

exchange_rate_extracted_df_currency_name_nulls = exchange_rate_extracted_df_date_cast \
    .with_column(
        "currency_name",
        coalesce(
            col("currency_name"),
            first_value(col("currency_name")).over(windows)
        )
    )

In [411]:
iso_4217_currency_codes_df = session.table("nbu_exchange.silver.iso_4217_currencies")

In [430]:
iso_4217_currency_codes_df_renamed = iso_4217_currency_codes_df \
    .select(
        col("currency_name").alias("iso_currency_name"),
        col("currency_code").alias("iso_currency_code"),
        col("withdrawal_date")
    )

In [434]:
# Join with iso_4217_currency_codes_df_renamed to get iso_currency_name
# Fill in nulls in currency_name with iso_currency_name,
# Drop iso_currency_name, withdrawal_date 
# P.S. only code PEN has 2 options of naming, I choose the one with withdrawal_date null
exchange_rate_extracted_df_currency_name_nulls = exchange_rate_extracted_df_currency_name_nulls \
    .join(
        iso_4217_currency_codes_df_renamed,
        exchange_rate_extracted_df_currency_name_nulls.currency_code == iso_4217_currency_codes_df_renamed.iso_currency_code,
        "left"
    ) \
    .select(
        exchange_rate_extracted_df_currency_name_nulls["*"],
        iso_4217_currency_codes_df_renamed["iso_currency_name"],
    ) \
    .withColumn("currency_name", coalesce(col("currency_name"), col("iso_currency_name"))) \
    .drop("iso_currency_code","iso_currency_name", "withdrawal_date")

CURRENCY_CODE DEDUPLICATION

1ST LEVEL - CURRENCY_CODE & CURRENCY_NAME

CURRENCY_CODE AND CURRENCY_NAME UNIFICATION

In [443]:
# Update currency_code and currency name for USM Узбецький сум
exchange_rate_extracted_df_uzs = exchange_rate_extracted_df_currency_name_nulls \
    .withColumn("currency_code", when(col("currency_code") == "USM", "UZS").otherwise(col("currency_code"))) \
    .withColumn("currency_name", 
                when((col("currency_code") == "UZS") 
                     & ((col("currency_name") != "UZBEKISTAN SUM") | (col("currency_name").isNull())), 
                     "UZBEKISTAN SUM") \
                     .otherwise(col("currency_name"))) 

In [444]:
# Update currency_code and currency name for WDR Вiрменський драм

exchange_rate_extracted_df_amd = exchange_rate_extracted_df_uzs \
    .withColumn("currency_code", when(col("currency_code") == "WDR", "AMD").otherwise(col("currency_code"))) \
    .withColumn("currency_name", when((col("currency_code") == "AMD") & col("currency_name").isNull(), "ARMENIAN DRAM") \
                .otherwise(col("currency_name")))

In [445]:
# Update currency_code and currency name for SDR Спецiальнi права запозичення
# Where currency_code = 'SDR' or currency_code = 'XDR' or r030_code = 960

exchange_rate_extracted_df_xdr = exchange_rate_extracted_df_amd \
    .withColumn("currency_code", 
                when(col("currency_code") == "SDR", "XDR").otherwise(col("currency_code"))) \
    .withColumn("currency_name", 
                when((col("currency_code") == "XDR") & col("currency_name").isNull(), "SDR (SPECIAL DRAWING RIGHT)") \
                .otherwise(col("currency_name"))) \
    .withColumn("currency_name_ua", 
                when(col("currency_code") == "XDR", "СПЗ (СПЕЦІАЛЬНІ ПРАВА ЗАПОЗИЧЕННЯ)") \
                .otherwise(col("currency_name_ua")))


In [448]:
exchange_rate_extracted_df_usd = exchange_rate_extracted_df_xdr \
    .withColumn("currency_name", when((col("currency_code") == "USD"), "US DOLLAR") \
                .otherwise(col("currency_name"))) 

2ND LEVEL - CURRENCY_CODE & CURRENCY_NAME & CURRENCY_NAME_UA

CURRENCY_NAME_UA UNIFICATION

In [455]:
exchange_rate_extracted_df_usd \
    .select("currency_code", "currency_name", "currency_name_ua") \
    .distinct() \
    .groupBy("currency_code") \
    .count() \
    .filter(col("count") > 1) \
    .orderBy(col("count").desc()) \
    .show(50)

exchange_rate_extracted_df_usd \
    .select("currency_code", "currency_name", "currency_name_ua") \
    .distinct() \
    .where(col("currency_code") == "SIT")\
    .show(10)  

-----------------------------
|"CURRENCY_CODE"  |"COUNT"  |
-----------------------------
|SAR              |3        |
|SIT              |3        |
|TMT              |3        |
|GEL              |3        |
|ROL              |3        |
|TJS              |3        |
|GRD              |3        |
|FIM              |2        |
|CZK              |2        |
|KZT              |2        |
|KWD              |2        |
|TMM              |2        |
|MTL              |2        |
|ATS              |2        |
|HUF              |2        |
|MXN              |2        |
|LTL              |2        |
|BGL              |2        |
|DEM              |2        |
|TRL              |2        |
|HRK              |2        |
|GBP              |2        |
|KRW              |2        |
|HKD              |2        |
|AMD              |2        |
|BYN              |2        |
|ITL              |2        |
|NLG              |2        |
|XOF              |2        |
|ISK              |2        |
|EEK      

stopped above

In [417]:
# r030 codes mappping from nbu site
r030_code_mapping = session.table("nbu_exchange.bronze.r030_csv_raw")

r030_code_mapping.show(5)


--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"PR"  |"R030"  |"K040"  |"A3"  |"R031"  |"R032"  |"R033"  |"R034"  |"R035"  |"GR"  |"KOD_LIT"  |"LOD_NUM"  |"CURRENCY_NAME_UA"   |"NOMIN"  |"NAIM"               |"D_OPEN"    |"D_CLOSE"   |"D_MODI"  |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|F     |004     |004     |AFA   |3       |9       |3       |2       |9       |NULL  |AFA        |252        |Афгані (Афганістан)  |0        |афгані (Афганістан)  |1/1/1998    |1/2/2013    |NULL      |
|F     |008     |008     |ALL   |3       |9       |3       |2       |9       |NULL  |ALL        |583        |Лек                  |0        |лек                  |1/1/1998    |NULL        |5/4/201

In [418]:
# Join with r030_code_mapping to get r030, 
# Drop r030_code, A3, 
# Rename r030 to r030_code
exchange_rate_extracted_df_r030 = exchange_rate_extracted_df_xdr \
    .join(
        r030_code_mapping.select(col("r030"), col("A3")),
        exchange_rate_extracted_df_xdr.currency_code == r030_code_mapping.A3,
        "left"
    ) \
    .drop("r030_code","A3" ) \
    .withColumnRenamed("r030", "r030_code") 

In [419]:
exchange_rate_extracted_df_r030.show(10)

---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"GROUP_NUMBER"  |"RATE"   |"RATE_PER_UNIT"  |"SPECIAL_CONDITIONS"  |"UNITS"  |"CALCULATION_DATE"  |"EXCHANGE_DATE"  |"CURRENCY_CODE"  |"CURRENCY_NAME"  |"CURRENCY_NAME_UA"  |"R030_CODE"  |
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|1               |43.0904  |43.0904          |N                     |1        |2026-02-10          |2026-02-11       |USD              |US DOLLAR        |ДОЛАР США           |840          |
|1               |43.0904  |43.0904          |N                     |1        |2026-02-10          |2026-02-11       |USD              |US DOLLAR        |ДОЛАР США           |840          |
|1               |43.0904  |43.0904          |N   

In [420]:
duplicates = exchange_rate_extracted_df_r030 \
    .select('currency_code', 'currency_name', 'currency_name_ua', 'r030_code') \
    .distinct() \
    .orderBy(col("currency_code")) 

duplicates.show(10)

count_duplicates = duplicates \
    .group_by("currency_code") \
    .count() \
    .filter(col("count") > 1) \
    .orderBy(col("count").desc()) 

# count_duplicates.count() 46

count_duplicates.show(50)



------------------------------------------------------------------------------
|"CURRENCY_CODE"  |"CURRENCY_NAME"    |"CURRENCY_NAME_UA"      |"R030_CODE"  |
------------------------------------------------------------------------------
|AED              |UAE DIRHAM         |ДИРХАМ ОАЕ              |784          |
|AMD              |Armenian Dram      |ВIРМЕНСЬКИЙ ДРАМ        |051          |
|AMD              |ARMENIAN DRAM      |ВІРМЕНСЬКИЙ ДРАМ        |051          |
|AMD              |ARMENIAN DRAM      |ВIРМЕНСЬКИЙ ДРАМ        |051          |
|ATS              |NULL               |ШИЛІНГ (АВСТРІЯ)        |040          |
|ATS              |NULL               |АВСТРІЙСЬКИЙ ШИЛІНГ     |040          |
|AUD              |AUSTRALIAN DOLLAR  |АВСТРАЛІЙСЬКИЙ ДОЛАР    |036          |
|AZM              |NULL               |АЗЕРБАЙДЖАНСЬКИЙ МАНАТ  |031          |
|AZN              |AZERBAIJAN MANAT   |АЗЕРБАЙДЖАНСЬКИЙ МАНАТ  |944          |
|BDT              |TAKA               |ТАКА         

In [421]:
exchange_rate_extracted_df_r030.create_or_replace_temp_view("exchange_rate_extracted_df_r030")
count_duplicates.create_or_replace_temp_view("count_duplicates")

query = """
SELECT *
FROM exchange_rate_extracted_df_r030
WHERE currency_code IN (
    SELECT currency_code FROM count_duplicates
)
"""
result_df = session.sql(query)

result_df \
    .select('currency_code', 'currency_name', 'currency_name_ua', 'r030_code') \
    .distinct() \
    .orderBy(col("currency_code")) \
    .show(150)

----------------------------------------------------------------------------------------------------
|"CURRENCY_CODE"  |"CURRENCY_NAME"              |"CURRENCY_NAME_UA"                  |"R030_CODE"  |
----------------------------------------------------------------------------------------------------
|AMD              |ARMENIAN DRAM                |ВIРМЕНСЬКИЙ ДРАМ                    |051          |
|AMD              |ARMENIAN DRAM                |ВІРМЕНСЬКИЙ ДРАМ                    |051          |
|AMD              |Armenian Dram                |ВIРМЕНСЬКИЙ ДРАМ                    |051          |
|ATS              |NULL                         |ШИЛІНГ (АВСТРІЯ)                    |040          |
|ATS              |NULL                         |АВСТРІЙСЬКИЙ ШИЛІНГ                 |040          |
|BGL              |NULL                         |БОЛГАРСЬКІ ЛЕВИ                     |100          |
|BGL              |NULL                         |ЛЕВ (БОЛГАРІЯ)                      |100  

In [422]:
for field in exchange_rate_extracted_df_r030.schema.fields:
    print(f"Column: {field.name}, Type: {field.datatype}")

    

Column: GROUP_NUMBER, Type: LongType()
Column: RATE, Type: DoubleType()
Column: RATE_PER_UNIT, Type: DoubleType()
Column: SPECIAL_CONDITIONS, Type: StringType(50331648)
Column: UNITS, Type: LongType()
Column: CALCULATION_DATE, Type: DateType()
Column: EXCHANGE_DATE, Type: DateType()
Column: CURRENCY_CODE, Type: StringType(50331648)
Column: CURRENCY_NAME, Type: StringType(50331648)
Column: CURRENCY_NAME_UA, Type: StringType(50331648)
Column: R030_CODE, Type: StringType(16777216)


In [423]:
from snowflake.snowpark.functions import col, upper, trim

def clean_string_columns(df):
    # Identify string columns
    string_cols = [field.name for field in df.schema.fields if "StringType" in str(field.datatype)]
    
    # Build a list of columns: apply upper and trim to string columns, leave others as is
    new_cols = [
        upper(trim(col(c))).alias(c) if c in string_cols else col(c)
        for c in df.columns
    ]
    
    # Return the transformed DataFrame
    return df.select(new_cols)

exchange_rate_extracted_df_upper_trim = clean_string_columns(exchange_rate_extracted_df_r030)

In [424]:
duplicates_2 = exchange_rate_extracted_df_upper_trim \
    .select('currency_code', 'currency_name', 'currency_name_ua', 'r030_code') \
    .distinct() \
    .orderBy(col("currency_code")) 

duplicates_2.show(100)

count_duplicates_2 = duplicates_2 \
    .group_by("currency_code") \
    .count() \
    .filter(col("count") > 1) \
    .orderBy(col("count").desc()) 


# count_duplicates.count() 46

count_duplicates_2.show(50)



------------------------------------------------------------------------------------
|"CURRENCY_CODE"  |"CURRENCY_NAME"     |"CURRENCY_NAME_UA"           |"R030_CODE"  |
------------------------------------------------------------------------------------
|AED              |UAE DIRHAM          |ДИРХАМ ОАЕ                   |784          |
|AMD              |ARMENIAN DRAM       |ВІРМЕНСЬКИЙ ДРАМ             |051          |
|AMD              |ARMENIAN DRAM       |ВIРМЕНСЬКИЙ ДРАМ             |051          |
|ATS              |NULL                |ШИЛІНГ (АВСТРІЯ)             |040          |
|ATS              |NULL                |АВСТРІЙСЬКИЙ ШИЛІНГ          |040          |
|AUD              |AUSTRALIAN DOLLAR   |АВСТРАЛІЙСЬКИЙ ДОЛАР         |036          |
|AZM              |NULL                |АЗЕРБАЙДЖАНСЬКИЙ МАНАТ       |031          |
|AZN              |AZERBAIJAN MANAT    |АЗЕРБАЙДЖАНСЬКИЙ МАНАТ       |944          |
|BDT              |TAKA                |ТАКА                     

In [425]:
exchange_rate_extracted_df_upper_trim.create_or_replace_temp_view("exchange_rate_extracted_df_upper_trim")
count_duplicates.create_or_replace_temp_view("count_duplicates_2")

query = """
SELECT *
FROM exchange_rate_extracted_df_upper_trim
WHERE currency_code IN (
    SELECT currency_code FROM count_duplicates_2
)
"""
result_df_2 = session.sql(query)

result_df_2 \
    .select('currency_code', 'currency_name', 'currency_name_ua', 'r030_code') \
    .distinct() \
    .orderBy(col("currency_code")) \
    .show(150)

----------------------------------------------------------------------------------------------------
|"CURRENCY_CODE"  |"CURRENCY_NAME"              |"CURRENCY_NAME_UA"                  |"R030_CODE"  |
----------------------------------------------------------------------------------------------------
|AMD              |ARMENIAN DRAM                |ВІРМЕНСЬКИЙ ДРАМ                    |051          |
|AMD              |ARMENIAN DRAM                |ВIРМЕНСЬКИЙ ДРАМ                    |051          |
|ATS              |NULL                         |АВСТРІЙСЬКИЙ ШИЛІНГ                 |040          |
|ATS              |NULL                         |ШИЛІНГ (АВСТРІЯ)                    |040          |
|BGL              |NULL                         |БОЛГАРСЬКІ ЛЕВИ                     |100          |
|BGL              |NULL                         |ЛЕВ (БОЛГАРІЯ)                      |100          |
|BYN              |BELARUSIAN RUBLE             |БІЛОРУСЬКИЙ РУБЛЬ                   |933  

In [426]:
iso_4217_currency_codes_df.where(col("currency_code") == "PEN").show(10)
r030_code_mapping.where(col("A3") == "PEN").show(10)

--------------------------------------------------------------------------------------------------------------
|"COUNTRY_NAME"  |"CURRENCY_NAME"  |"CURRENCY_CODE"  |"CURRENCY_NUMBER"  |"MINOR_UNITS"  |"WITHDRAWAL_DATE"  |
--------------------------------------------------------------------------------------------------------------
|PERU            |Sol              |PEN              |604                |2              |NULL               |
|PERU            |Nuevo Sol        |PEN              |604                |NULL           |2015-12            |
--------------------------------------------------------------------------------------------------------------

----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"PR"  |"R030"  |"K040"  |"A3"  |"R031"  |"R032"  |"R033"  |"R034"  |"R035"  |"GR"  |"KOD_LIT"  |"LOD_NUM"  |"CURRENCY_NAME_UA"  |"NOMIN"  |"NAIM"  

In [427]:
# session.close()
