In [0]:
# Aggregate Philippine loans (silver.kiva_loans joined to silver.loan_theme_ids on loan_id)
# per (transaction_date, country, region, loan_theme_type):
#   - count_of_loan_ids: number of distinct loans in the group
#   - funded_amount / loan_amount: summed over the group's joined rows
#   - processed_timestamp: the newest processed_timestamp among the group's rows
# The ORDER BY only affects the order shown by display().
aggregate_amounts_sql = """
SELECT  skl.transaction_date, 
        skl.country, 
        skl.region, 
        slti.loan_theme_type, 
        COUNT(DISTINCT skl.loan_id) count_of_loan_ids,
        SUM(skl.funded_amount) funded_amount,
        SUM(skl.loan_amount) loan_amount,
        MAX(skl.processed_timestamp) processed_timestamp
FROM silver.kiva_loans skl
INNER JOIN silver.loan_theme_ids slti
    ON skl.loan_id = slti.loan_id
WHERE skl.country = 'Philippines'
GROUP BY skl.transaction_date, skl.country, skl.region, slti.loan_theme_type
ORDER BY skl.transaction_date, skl.country, skl.region, count_of_loan_ids DESC
"""

df_new_aggregate_amounts = spark.sql(aggregate_amounts_sql)
display(df_new_aggregate_amounts)

In [0]:
table_name = "gold.fact_amounts_by_date_by_country"

# First run only: create the gold table as an empty Delta table whose schema is
# borrowed from the aggregate DataFrame (limit(0) keeps the columns, drops the rows).
# When the table already exists nothing is written here; rows arrive via MERGE.
if spark.catalog.tableExists(table_name):
    print(f"Table '{table_name}' already exists. Perform MERGE instead.")
else:
    print(f"Table '{table_name}' does not exist. Creating the table now.")
    df_empty = df_new_aggregate_amounts.limit(0)
    (
        df_empty.write
        .format("delta")
        .mode("overwrite")
        .saveAsTable(table_name)
    )

In [0]:
%sql
-- Peek at up to 10 rows of the gold fact table.
SELECT *
FROM gold.fact_amounts_by_date_by_country
LIMIT 10

In [0]:
from datetime import datetime

from pyspark.sql import functions as F
from pyspark.sql.utils import AnalysisException

# High-water mark for incremental loading: the newest processed_timestamp already
# stored in the gold table. An aggregate without GROUP BY always returns one row,
# whose value is None when the table is empty; in that case fall back to
# 1900-01-01 (assumed to predate all real data) so every aggregate row counts as new.
# F.max is used instead of `from pyspark.sql.functions import max`, which would
# shadow Python's builtin max() for the rest of the notebook session.
df_gold_fact_amounts_by_date_by_country = spark.table(table_name)
max_ts = (
    df_gold_fact_amounts_by_date_by_country
    .select(F.max("processed_timestamp"))
    .first()[0]
) or datetime(1900, 1, 1)
print(f"Max processed timestamp: {max_ts}")

In [0]:
# Register the MERGE source view. It holds the freshly computed aggregate groups
# whose processed_timestamp is newer than the gold table's high-water mark (max_ts).
# The filter must run over the NEW aggregates. Filtering the gold table itself
# (whose rows can never exceed its own maximum timestamp) always yields an empty
# view, which makes the MERGE a no-op.
# The comparison uses the DataFrame API rather than formatting max_ts into SQL text.
df_incremental_amounts = df_new_aggregate_amounts.where(
    df_new_aggregate_amounts["processed_timestamp"] > max_ts
)
df_incremental_amounts.createOrReplaceTempView("temp_gold_fact_amounts_by_date_by_country")

In [0]:
%sql
-- How many rows the temp view (the MERGE source) currently holds.
SELECT COUNT(1)
FROM temp_gold_fact_amounts_by_date_by_country

In [0]:
%sql
-- Upsert the staged aggregates into the gold fact table, keyed on
-- (transaction_date, country, region, loan_theme_type).
-- Keys and change checks use the null-safe operator <=>. With plain `=`, a NULL key
-- (e.g. a NULL region) never matches, so that row would be re-inserted as a duplicate
-- on every run. With plain `!=`, a NULL on either side yields NULL (treated as false),
-- so the change would be silently skipped.
-- An existing row is rewritten only when one of its measures actually differs.
MERGE INTO gold.fact_amounts_by_date_by_country AS target
USING temp_gold_fact_amounts_by_date_by_country AS source
  ON target.transaction_date <=> source.transaction_date
  AND target.country <=> source.country
  AND target.region <=> source.region
  AND target.loan_theme_type <=> source.loan_theme_type
WHEN MATCHED AND (
  NOT (target.count_of_loan_ids <=> source.count_of_loan_ids)
  OR NOT (target.funded_amount <=> source.funded_amount)
  OR NOT (target.loan_amount <=> source.loan_amount)
)
THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *


In [0]:
%sql
-- Total number of rows now in the gold fact table.
SELECT COUNT(1)
FROM gold.fact_amounts_by_date_by_country