In [0]:
# Explicit schema for the raw transaction extract; every column except
# `amount` is read as a string (including transaction_date).
transaction_schema = (
    'transaction_id string, '
    'card_id string, '
    'transaction_date string, '
    'amount double,'
    'merchant string,'
    'transaction_type string,'
    'status string'
)

# Read the headered CSV extract from ADLS using the schema above.
transaction_df = spark.read.csv(
    'abfss://data@creditcardstorage11.dfs.core.windows.net/transactions/transaction_data',
    schema=transaction_schema,
    header=True,
)

In [0]:
%sql
-- Create the catalog on first run only, with its managed location on the ADLS container below.
CREATE CATALOG IF NOT EXISTS catalog3
  MANAGED LOCATION 'abfss://data@creditcardstorage11.dfs.core.windows.net/transactions/catlog1';

In [0]:
%sql
-- Make catalog3.schema1 the current namespace and ensure the run-log table exists.
USE CATALOG catalog3;
CREATE SCHEMA IF NOT EXISTS schema1;
USE schema1;

-- One row per source_name; the load cells read and update the 'transactions' row.
CREATE TABLE IF NOT EXISTS log_table (
  source_name       STRING,
  last_success_time TIMESTAMP,
  last_run_status   STRING,
  last_run_time     TIMESTAMP
);

In [0]:
from pyspark.sql.functions import *
# Look up the high-water mark recorded for the 'transactions' source.
# first() returns None when log_table has no matching row (e.g. the very first
# run), whereas collect()[0] raised IndexError on the empty result and made the
# None fallback below unreachable.
rows = (
    spark.read
    .table("log_table")
    .filter("source_name = 'transactions'")
    .select('last_success_time')
    .first()
)
last_success_time = rows['last_success_time'] if rows is not None else None

if last_success_time is None:
    # No watermark yet: comparing a column with None evaluates to NULL for
    # every row, which would silently filter out the whole extract. Treat the
    # run as an initial full load instead.
    transaction_incremental_df = transaction_df
else:
    # transaction_date is read as a string; cast it explicitly so the
    # comparison against the timestamp watermark is timestamp-to-timestamp
    # rather than relying on implicit coercion.
    transaction_incremental_df = transaction_df.filter(
        col('transaction_date').cast('timestamp') > last_success_time
    )

display(transaction_incremental_df)

datetime.datetime(2024, 1, 19, 0, 0)

In [0]:
from pyspark.sql.functions import max

def _record_transactions_run(status, success_ts=None):
    """Upsert the 'transactions' row of log_table.

    Args:
        status: Value stored in last_run_status ('SUCCESS' or 'FAILED').
        success_ts: New watermark as a 'yyyy-MM-dd HH:mm:ss.SSSSSS' string, or
            None to leave any existing last_success_time untouched.

    A MERGE is used instead of separate INSERT/UPDATE statements so the row is
    created when missing (including on a first-run failure) and never
    duplicated when it already exists.
    """
    watermark_sql = "NULL" if success_ts is None else f"'{success_ts}'"
    spark.sql(f"""
        MERGE INTO log_table AS t
        USING (
            SELECT 'transactions' AS source_name,
                   CAST({watermark_sql} AS TIMESTAMP) AS new_success_time
        ) AS s
        ON t.source_name = s.source_name
        WHEN MATCHED THEN UPDATE SET
            t.last_success_time = COALESCE(s.new_success_time, t.last_success_time),
            t.last_run_status = '{status}',
            t.last_run_time = current_timestamp()
        WHEN NOT MATCHED THEN INSERT
            (source_name, last_success_time, last_run_status, last_run_time)
        VALUES
            (s.source_name, s.new_success_time, '{status}', current_timestamp())
    """)


try:
    # 1. Load data
    # Compute the new watermark inside Spark as a canonical timestamp string.
    # The result is None for an empty batch, and because it is produced by
    # date_format() no raw CSV value is ever interpolated into the SQL below.
    max_ts = transaction_incremental_df.selectExpr(
        "date_format(max(cast(transaction_date AS timestamp)), "
        "'yyyy-MM-dd HH:mm:ss.SSSSSS') AS max_ts"
    ).first()["max_ts"]

    transaction_incremental_df.write \
        .format("delta") \
        .mode("append") \
        .option("mergeSchema", True) \
        .save("abfss://data@creditcardstorage11.dfs.core.windows.net/transactions/transaction_target/")

    # 2. Update log table on SUCCESS
    # With an empty batch max_ts is None and the previous watermark is kept;
    # previously the literal string 'None' was written to last_success_time.
    # NOTE(review): the append and this log update are not atomic - if the log
    # update fails after the write succeeded, the next run re-appends the rows.
    _record_transactions_run("SUCCESS", max_ts)

except Exception:
    # 3. Update log table on FAILURE
    try:
        _record_transactions_run("FAILED")
    except Exception as log_error:
        # Do not let a logging problem hide the original failure.
        print(f"Could not record FAILED status in log_table: {log_error}")
    # Bare raise re-raises the original exception with its traceback intact.
    raise


In [0]:
# Read back the Delta target and the run log to inspect the result of the load.
transaction_target_df = spark.read.load(
    'abfss://data@creditcardstorage11.dfs.core.windows.net/transactions/transaction_target/',
    format='delta',
)
# table_df is loaded here but not displayed in this cell.
table_df = spark.table('log_table')

display(transaction_target_df)

transaction_id,card_id,transaction_date,amount,merchant,transaction_type,status
T5001,C9001,2024-01-05,1200.0,Amazon,ONLINE,SUCCESS
T5002,C9001,2024-01-07,5000.0,Flipkart,ONLINE,SUCCESS
T5003,C9002,2024-01-10,3000.0,BigBazaar,POS,FAILED
T5004,C9003,2024-01-12,15000.0,Apple Store,POS,SUCCESS
T5005,C9004,2024-01-15,800.0,Swiggy,ONLINE,SUCCESS
T5007,C9001,2024-01-16,1200.0,Amazon,ONLINE,SUCCESS
T5008,C9001,2024-01-17,5000.0,Flipkart,ONLINE,SUCCESS
T5003,C9002,2024-01-10,3000.0,BigBazaar,POS,FAILED
T50010,C9001,2024-01-19,1200.0,Amazon,ONLINE,SUCCESS
T5009,C9001,2024-01-18,1200.0,Amazon,ONLINE,SUCCESS
