In [0]:
# Mount path used as the base directory by this notebook; the streaming writer
# below derives its checkpoint location from it.
base_data_dir = "/mnt/devdbbatch1sa/raw"

# NOTE(review): not referenced by any function visible in this notebook --
# presumably meant to hold streaming checkpoints; confirm before relying on it.
check_point_dir = "/mnt/devdbbatch1sa/checkpoint"

In [0]:
def readBronze():
    """Open a streaming read over the ``invoices_bz`` table.

    Expects a ``spark`` session object to already exist in the notebook's
    global scope.

    Returns:
        The streaming DataFrame produced by ``spark.readStream.table``.
    """
    bronze_reader = spark.readStream
    return bronze_reader.table("invoices_bz")

def getAggregates(invoices_df):
    """Summarise invoice rows per customer card.

    Args:
        invoices_df: DataFrame exposing ``CustomerCardNo`` and
            ``TotalAmount`` columns.

    Returns:
        A DataFrame grouped by ``CustomerCardNo`` with two aggregates:
        ``TotalAmount`` (sum of ``TotalAmount``) and ``TotalPoints``
        (sum of ``TotalAmount*0.02``).
    """
    # Imported locally and namespaced so Spark's ``sum`` never shadows the builtin.
    from pyspark.sql import functions as F

    total_amount = F.sum("TotalAmount").alias("TotalAmount")
    total_points = F.sum(F.expr("TotalAmount*0.02")).alias("TotalPoints")

    per_customer = invoices_df.groupBy("CustomerCardNo")
    return per_customer.agg(total_amount, total_points)

def aggregate_upsert(invoices_df, batch_id):
    """Aggregate one micro-batch and merge it into ``customer_rewards8``.

    Used as the ``foreachBatch`` handler of the streaming writer. The batch
    is reduced to per-customer totals via ``getAggregates``, exposed as the
    temp view ``customer_rewards_df_temp_view``, and merged into the target
    table: matching customers have the batch totals added to their stored
    totals, unseen customers are inserted.

    Args:
        invoices_df: DataFrame holding the rows of the current micro-batch.
        batch_id: Micro-batch identifier supplied by ``foreachBatch``; it is
            not used here but is required by the callback signature.
    """
    rewards_df = getAggregates(invoices_df)
    rewards_df.createOrReplaceTempView("customer_rewards_df_temp_view")

    # The join key must be named on BOTH sides; the source side was previously
    # empty ("s."), which made the MERGE statement invalid.
    merge_statement = """MERGE INTO customer_rewards8 t
                USING customer_rewards_df_temp_view s
                ON s.CustomerCardNo == t.CustomerCardNo
                WHEN MATCHED THEN
                UPDATE SET t.TotalAmount = s.TotalAmount + t.TotalAmount,
                           t.TotalPoints = s.TotalPoints + t.TotalPoints
                WHEN NOT MATCHED THEN
                INSERT *
            """

    # Run the MERGE through the session that owns the micro-batch DataFrame.
    invoices_df._jdf.sparkSession().sql(merge_statement)

def saveResults(invoices_df):
    """Start the streaming query that upserts rewards per micro-batch.

    Configures a writer named ``gold-update`` in ``update`` output mode that
    hands every micro-batch to ``aggregate_upsert`` via ``foreachBatch``,
    then starts it.

    Args:
        invoices_df: Streaming DataFrame to write.

    Returns:
        The query handle returned by ``.start()``.
    """
    print("\nStarting Silver Stream...", end='')

    # NOTE(review): the path segment "chekpoint" looks like a misspelling and
    # the notebook-level ``check_point_dir`` is not used here. The location is
    # deliberately left unchanged: pointing an existing stream at a new
    # checkpoint makes it reprocess its input, and the additive MERGE would
    # then double-count. Confirm before changing.
    streaming_query = (
        invoices_df.writeStream
        .queryName("gold-update")
        .option("checkpointLocation", f"{base_data_dir}/chekpoint/customer_rewards8")
        .outputMode("update")
        .foreachBatch(aggregate_upsert)
        .start()
    )

    # Previously placed after the ``return`` and therefore never executed.
    print("Done")
    return streaming_query

def process():
    """Wire the bronze read into the rewards writer and start the stream.

    Returns:
        Whatever ``saveResults`` returns for the stream built from
        ``readBronze()``.
    """
    return saveResults(readBronze())

In [0]:
# Kick off the pipeline and keep the value returned by process() in `Query`.
Query = process()


Starting Silver Stream...

In [0]:
%sql
-- Show every row of customer_rewards8, the table targeted by the MERGE in aggregate_upsert.
select * from customer_rewards8

CustomerCardNo,TotalAmount,TotalPoints
2262471989,36859.0,737.18
3716602332,61743.0,1234.8599999999997
7829975914,38799.0,775.98
5576072500,23994.0,479.88
2325763742,30549.0,610.98
9316477281,47894.0,957.88
2502121621,30121.0,602.42
7543202868,47796.0,955.92
8189067868,22515.0,450.3
6797767929,30570.0,611.4000000000001
