In [0]:
from pyspark.sql.functions import when, max, col, year, month, date_diff, lit

In [0]:
# %run ./02_Silver_Layer_Data_Processing

## Business Rule Implementations

In [0]:
# Incrementally read the processed complaints produced by the silver processing step.
# This is a plain Delta streaming source, so Auto Loader options such as
# `cloudFiles.schemaLocation` do not apply here (and must not point at the writer's
# checkpoint directory); source progress is recorded in the writeStream checkpointLocation.
br_streaming_df = (
    spark.readStream.format("delta")
    .load("abfss://silver@rcmadls10dev.dfs.core.windows.net/processed_complaints")
)

### Adding a Resolved column
Add a boolean `Resolved` column that flags a complaint as resolved when its `company_response_to_consumer` value contains "Closed".

In [0]:
# A complaint counts as resolved when the company's response contains "Closed".
# Null-propagating: a null response yields a null flag, not False.
resolved_flag = col("company_response_to_consumer").contains("Closed")
br_streaming_df = br_streaming_df.withColumn("Resolved", resolved_flag)

### Adding year_received and month_received columns

In [0]:
# Derive calendar year and month from date_received; these two columns are used
# as the partition columns of the br_complaints table written further down.
br_streaming_df = (
    br_streaming_df
    .withColumn("year_received", year(col("date_received")))
    .withColumn("month_received", month(col("date_received")))
)

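### Adding complaint_age column
Calculate the number of days between when the complaint was received and when it was sent to the company (`date_sent_to_company - date_received`). Note that this measures time-to-forward, not time-to-resolution.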

In [0]:
# complaint_age = number of days from date_received to date_sent_to_company
# (date_diff(end, start) returns end minus start in days).
br_streaming_df = br_streaming_df.withColumn(
    "complaint_age",
    date_diff(col("date_sent_to_company"), col("date_received")),
)

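### Adding is_latest column
Every incoming row starts flagged as the latest version; the upsert below flips previously stored rows with the same `complaint_id` to `false`.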

In [0]:
# Every incoming row is tagged as the latest version of its complaint; the
# foreachBatch upsert flips previously stored versions to False.
latest_marker = lit(True)
br_streaming_df = br_streaming_df.withColumn("is_latest", latest_marker)

In [0]:
from delta.tables import DeltaTable

# Target Delta table for the business-rule complaints.
BR_COMPLAINTS_PATH = "abfss://silver@rcmadls10dev.dfs.core.windows.net/br_complaints"


def _write_br_complaints(df, mode):
    """Write ``df`` to BR_COMPLAINTS_PATH as Delta, partitioned by year/month received."""
    (
        df.write.format("delta")
        .mode(mode)
        .partitionBy("year_received", "month_received")
        .save(BR_COMPLAINTS_PATH)
    )


def upsert_is_latest(batch_df, batch_id):
    """foreachBatch sink that appends a micro-batch to the br_complaints Delta table.

    If the table already exists, every stored row whose ``complaint_id`` appears in
    the batch and is currently flagged ``is_latest = true`` is flipped to
    ``is_latest = false`` via MERGE, and then the batch is appended. If no Delta
    table exists at the path yet, the batch is written with mode "overwrite",
    creating the table partitioned by year_received / month_received.

    Args:
        batch_df: Micro-batch DataFrame supplied by foreachBatch.
        batch_id: Micro-batch id supplied by foreachBatch (not used).

    NOTE(review):
      * The MERGE and the append are two separate Delta commits, so the pair is not
        atomic: if the append fails after the MERGE has committed, the affected
        complaint_ids have no ``is_latest = true`` row until the batch is retried.
      * Rows that share a complaint_id *within* one batch are all appended with the
        ``is_latest`` value they arrive with (set to True upstream), so in-batch
        duplicates each end up flagged latest — confirm whether upstream can emit them.
    """
    if not DeltaTable.isDeltaTable(spark, BR_COMPLAINTS_PATH):
        # First batch: no table yet, so create it from this batch alone.
        _write_br_complaints(batch_df, "overwrite")
        return

    # batch_df feeds two actions (MERGE source and append); cache it so the
    # streaming source is not recomputed for each one.
    batch_df.persist()
    try:
        delta_table = DeltaTable.forPath(spark, BR_COMPLAINTS_PATH)
        updated_ids = batch_df.select("complaint_id").distinct()

        # Expire the current "latest" row of every complaint present in this batch.
        (
            delta_table.alias("target")
            .merge(
                updated_ids.alias("source"),
                "target.complaint_id = source.complaint_id AND target.is_latest = true",
            )
            .whenMatchedUpdate(set={"is_latest": lit(False)})
            .execute()
        )

        _write_br_complaints(batch_df, "append")
    finally:
        batch_df.unpersist()

## Load processed data into silver container

In [0]:

# Process all data currently available in the source through upsert_is_latest, then stop.
# availableNow replaces the deprecated trigger(once=True); awaitTermination() blocks the
# cell until the run finishes (and re-raises any query failure) so the notebook/job
# does not exit while the stream is still writing.
br_query = (
    br_streaming_df.writeStream
    .foreachBatch(upsert_is_latest)
    .option("checkpointLocation", "abfss://silver@rcmadls10dev.dfs.core.windows.net/checkpoint_br_complaints")
    .trigger(availableNow=True)
    .start()
)
br_query.awaitTermination()