In [None]:
from databricks.connect import DatabricksSession
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, current_date
from ecb_enrich import enrich_with_usd
from datetime import datetime 

In [None]:
# Declare the notebook's input widgets. The date widget defaults to the day
# the notebook is run, formatted as YYYY-MM-DD.
dbutils.widgets.text(
    name="catalog_name",
    defaultValue="anup_kalburgi",
    label="Target Catalog",
)
dbutils.widgets.text(
    name="schema_name",
    defaultValue="dab_dev",
    label="Target Schema",
)
dbutils.widgets.text(
    name="date",
    defaultValue=datetime.now().strftime("%Y-%m-%d"),
    label="Date (YYYY-MM-DD)",
)

In [None]:
# Read the current widget values, in the same order as the names listed here.
catalog, schema, date_param = (
    dbutils.widgets.get(widget_name)
    for widget_name in ("catalog_name", "schema_name", "date")
)

In [None]:
# Fully qualified name of the table that is read.
src_table_name = f"{catalog}.{schema}.fx_rates_ecb_eur_base"
# The destination table is the source table name plus a "_usd_enriched" suffix.
dest_table_name = f"{src_table_name}_usd_enriched"

In [None]:
# Echo the resolved run parameters, one per line.
print(
    f"Using catalog: {catalog}",
    f"Using schema: {schema}",
    f"Using date: {date_param}",
    sep="\n",
)

In [None]:
def get_spark() -> SparkSession:
    """Return a Spark session for the current execution environment.

    Resolution order:

    1. The ``spark`` global, when one is already defined (e.g. injected by
       the runtime) and is not ``None``.
    2. A Databricks Connect serverless session.
    3. A plain ``SparkSession``, if building the Databricks Connect session
       raises ``ImportError``.

    Returns:
        The session to use for reading and writing tables.
    """
    # Referencing an undefined global raises NameError instead of evaluating
    # to None, so a bare `spark is not None` test would crash in exactly the
    # environments where the fallbacks below are needed.
    try:
        existing = spark
    except NameError:
        existing = None
    if existing is not None:
        return existing

    try:
        return DatabricksSession.builder.serverless().getOrCreate()
    except ImportError:
        return SparkSession.builder.getOrCreate()

In [None]:
spark = get_spark()
df = spark.read.table(src_table_name)

# Filter by date if provided, otherwise use all data.
# The emptiness check and the comparison must use the same normalised value:
# testing the stripped string but filtering on the raw one would let a
# whitespace-padded date pass the check and then match no rows.
date_filter = date_param.strip() if date_param else ""
if date_filter:
    df = df.filter(col("TIME_PERIOD") == date_filter)
    print(f"Filtered data for date: {date_filter}")
else:
    print("Using all available data (no date filter)")

In [None]:
# Enrich the (possibly date-filtered) rows via the project helper.
df = enrich_with_usd(df, date_param)

# Append the result to the destination table; the mergeSchema option is
# enabled on the writer.
(
    df.write
    .mode("append")
    .option("mergeSchema", "true")
    .saveAsTable(dest_table_name)
)
print(f"Successfully enriched and saved data to: {dest_table_name}")