In [0]:
# Databricks notebook: Incremental CSV to Bronze Transformation
# This notebook reads CSV file from ADLS Gen2 and writes incrementally to Bronze layer

# Import required libraries
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime
import uuid


In [0]:
# Configuration parameters
dbutils.widgets.text("csv_path", "abfss://code@kivastorageacc2.dfs.core.windows.net/kiva_loans2.csv", "CSV File Path (ADLS)")
dbutils.widgets.text("bronze_container", "kivabronze", "Bronze Container Name")
dbutils.widgets.text("storage_account_name", "kivastorageacc2", "Storage Account Name")
dbutils.widgets.text("incremental_key", "id", "Incremental Loading Key Column")
dbutils.widgets.dropdown("overwrite_partition", "false", ["true", "false"], "Overwrite Existing Partition")

storage_account_key = "YOUR_STORAGE_ACCOUNT_KEY"  # Replace with your storage account key



In [0]:
csv_path="abfss://code@kivastorageacc2.dfs.core.windows.net/kiva_loans2.csv"
bronze_container= "kivabronze"
storage_account_name= "kivastorageacc2"
incremental_key="id"
overwrite_partition= "false"



In [0]:
spark.conf.set(f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net", storage_account_key)
print(f"Storage account {storage_account_name} authentication configured")

Storage account kivastorageacc2 authentication configured


In [0]:
# Get widget values from Databricks
csv_path = dbutils.widgets.get("csv_path")
bronze_container = dbutils.widgets.get("bronze_container")
storage_account_name = dbutils.widgets.get("storage_account_name")

incremental_key = dbutils.widgets.get("incremental_key")
overwrite_partition = dbutils.widgets.get("overwrite_partition").lower() == "true"



In [0]:
# Display CSV full path
print(f"CSV file to be read: {csv_path}")

# Create bronze layer path for ADLS Gen2
bronze_base_path = f"abfss://{bronze_container}@{storage_account_name}.dfs.core.windows.net"
print(f"Bronze base path: {bronze_base_path}")

CSV file to be read: abfss://code@kivastorageacc2.dfs.core.windows.net/kiva_loans.csv
Bronze base path: abfss://kivabronze@kivastorageacc2.dfs.core.windows.net


In [0]:
# Read CSV file
print(f"Reading CSV file: {csv_path}")

try:
    df_csv = spark.read.format("csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load(csv_path)
    
    # Check if posted_time column exists
    if "posted_time" not in df_csv.columns:
        raise Exception("Column 'posted_time' not found in CSV file!")
    
    # Convert posted_time to date/timestamp format (if it's string)
    df_csv = df_csv.withColumn("posted_time", 
                               when(col("posted_time").isNotNull(), 
                                   to_timestamp(col("posted_time")))
                               .otherwise(None))
    
    # Extract year and month information from posted_time for partitioning
    df_csv = df_csv.withColumn("posted_year", year(col("posted_time"))) \
                   .withColumn("posted_month", month(col("posted_time")))
    
    # Add processing metadata
    df_new_data = df_csv \
        .withColumn("ingestion_date", current_timestamp()) \
        .withColumn("batch_id", lit(str(uuid.uuid4())))
    
    print(f"CSV file read. New data row count: {df_new_data.count()}")
    print("Schema information:")
    df_new_data.printSchema()
    
    print("Sample data:")
    display(df_new_data.limit(5))
    
    # Show posted_time date range
    date_range = df_new_data.select(min("posted_time").alias("min_date"), 
                                   max("posted_time").alias("max_date")).collect()[0]
    print(f"Posted time range: {date_range['min_date']} - {date_range['max_date']}")
    
except Exception as e:
    print(f"CSV reading error: {str(e)}")
    dbutils.notebook.exit(f"CSV reading error: {str(e)}")

Reading CSV file: abfss://code@kivastorageacc2.dfs.core.windows.net/kiva_loans.csv
CSV file read. New data row count: 672113
Schema information:
root
 |-- id: string (nullable = true)
 |-- funded_amount: string (nullable = true)
 |-- loan_amount: string (nullable = true)
 |-- activity: string (nullable = true)
 |-- sector: string (nullable = true)
 |-- use: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- region: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- partner_id: string (nullable = true)
 |-- posted_time: timestamp (nullable = true)
 |-- disbursed_time: string (nullable = true)
 |-- funded_time: string (nullable = true)
 |-- term_in_months: string (nullable = true)
 |-- lender_count: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- borrower_genders: string (nullable = true)
 |-- repayment_interval: string (nullable = true)
 |-- date: string (nullable = true)
 |-- posted_year:

id,funded_amount,loan_amount,activity,sector,use,country_code,country,region,currency,partner_id,posted_time,disbursed_time,funded_time,term_in_months,lender_count,tags,borrower_genders,repayment_interval,date,posted_year,posted_month,ingestion_date,batch_id
653051,300.0,300.0,Fruits & Vegetables,Food,"To buy seasonal, fresh fruits to sell.",PK,Pakistan,Lahore,PKR,247.0,2014-01-01T06:12:39Z,2013-12-17 08:00:00+00:00,2014-01-02 10:06:32+00:00,12.0,12,,female,irregular,2014-01-01,2014,1,2025-05-22T08:10:04.626Z,adb0de9d-7c7e-4448-94f6-e01cd2c07765
653053,575.0,575.0,Rickshaw,Transportation,to repair and maintain the auto rickshaw used in their business.,PK,Pakistan,Lahore,PKR,247.0,2014-01-01T06:51:08Z,2013-12-17 08:00:00+00:00,2014-01-02 09:17:23+00:00,11.0,14,,"female, female",irregular,2014-01-01,2014,1,2025-05-22T08:10:04.626Z,adb0de9d-7c7e-4448-94f6-e01cd2c07765
653068,150.0,150.0,Transportation,Transportation,To repair their old cycle-van and buy another one to rent out as a source of income,IN,India,Maynaguri,INR,334.0,2014-01-01T09:58:07Z,2013-12-17 08:00:00+00:00,2014-01-01 16:01:36+00:00,43.0,6,"user_favorite, user_favorite",female,bullet,2014-01-01,2014,1,2025-05-22T08:10:04.626Z,adb0de9d-7c7e-4448-94f6-e01cd2c07765
653063,200.0,200.0,Embroidery,Arts,to purchase an embroidery machine and a variety of new embroidery materials.,PK,Pakistan,Lahore,PKR,247.0,2014-01-01T08:03:11Z,2013-12-24 08:00:00+00:00,2014-01-01 13:00:00+00:00,11.0,8,,female,irregular,2014-01-01,2014,1,2025-05-22T08:10:04.626Z,adb0de9d-7c7e-4448-94f6-e01cd2c07765
653084,400.0,400.0,Milk Sales,Food,to purchase one buffalo.,PK,Pakistan,Abdul Hakeem,PKR,245.0,2014-01-01T11:53:19Z,2013-12-17 08:00:00+00:00,2014-01-01 19:18:51+00:00,14.0,16,,female,monthly,2014-01-01,2014,1,2025-05-22T08:10:04.626Z,adb0de9d-7c7e-4448-94f6-e01cd2c07765


Posted time range: 2014-01-01 04:49:26 - 2017-07-26 06:31:46


In [0]:
# Check existence of key column for incremental loading
if incremental_key and incremental_key not in df_new_data.columns:
    error_msg = f"Key column '{incremental_key}' specified for incremental loading not found in CSV file!"
    print(error_msg)
    dbutils.notebook.exit(error_msg)


In [0]:
# Process year/month based - separate partition for each year/month
years_months = df_new_data.select("posted_year", "posted_month").distinct().collect()

for row in years_months:
    year = row["posted_year"]
    month = row["posted_month"]
    
    if year is None or month is None:
        continue
    
    print(f"\n=== Processing partition {year}/{month:02d} ===")
    
    # Bronze layer path for this year/month
    bronze_output_path = f"{bronze_base_path}/{year}/{month:02d}"
    
    # Filter data for this partition
    df_partition_new = df_new_data.filter((col("posted_year") == year) & 
                                         (col("posted_month") == month))
    partition_new_count = df_partition_new.count()
    print(f"New data count for this partition: {partition_new_count}")
    
    # Check and read existing Bronze data
    bronze_exists = False
    try:
        # Check if bronze folder exists
        dbutils.fs.ls(bronze_output_path)
        bronze_exists = True
        print(f"Existing Bronze data found: {bronze_output_path}")
        
        # Read existing Bronze data
        df_existing_bronze = spark.read.format("parquet").load(bronze_output_path)
        existing_count = df_existing_bronze.count()
        print(f"Existing Bronze data row count: {existing_count}")
        
    except Exception as e:
        print(f"Bronze data not found: {bronze_output_path}")
        bronze_exists = False
    
    # Incremental loading strategy
    if bronze_exists and not overwrite_partition:
        print(f"Performing incremental loading, key column: {incremental_key}")
        
        # Find the latest value in existing data
        if incremental_key in df_existing_bronze.columns:
            max_existing_value = df_existing_bronze.agg({incremental_key: "max"}).collect()[0][0]
            print(f"Latest {incremental_key} value in existing data: {max_existing_value}")
            
            # Filter new data - only take records not in existing dataset
            df_to_append = df_partition_new.filter(col(incremental_key) > max_existing_value)
            append_count = df_to_append.count()
            print(f"New records to append: {append_count}")
            
            if append_count > 0:
                # Union operation
                df_final = df_existing_bronze.union(df_to_append)
            else:
                print("No new data to append, keeping existing data.")
                continue
        else:
            print(f"{incremental_key} column not found in existing Bronze data. Adding all new data.")
            df_final = df_existing_bronze.union(df_partition_new)
    else:
        # Use only new data if no existing data or overwrite is selected
        if not bronze_exists:
            print("No existing Bronze data found. New data will be written.")
        elif overwrite_partition:
            print("Overwrite existing partition selected. New data will be written.")
        
        df_final = df_partition_new
    
    # Remove duplicates (based on incremental_key)
    print(f"Removing duplicates based on {incremental_key}...")
    
    # Use window function to select latest version for each record
    window = Window.partitionBy(incremental_key).orderBy(desc("ingestion_date"))
    df_deduplicated = df_final.withColumn("row_num", row_number().over(window)) \
                             .filter(col("row_num") == 1) \
                             .drop("row_num")
    
    final_count = df_deduplicated.count()
    print(f"Data row count after deduplication: {final_count}")
    
    # Write to Bronze layer as Parquet
    print(f"Writing data to Bronze layer: {bronze_output_path}")
    
    try:
        # Write in Parquet format
        df_deduplicated.write \
            .format("parquet") \
            .mode("overwrite") \
            .save(bronze_output_path)
        
        print(f"Data successfully written to Bronze layer: {bronze_output_path}")
        print(f"Total rows written: {final_count}")
        
    except Exception as e:
        print(f"Error writing to Bronze layer: {str(e)}")
        dbutils.notebook.exit(f"Error writing to Bronze layer: {str(e)}")



=== Processing partition 2014/04 ===
New data count for this partition: 13467
Bronze data not found: abfss://kivabronze@kivastorageacc2.dfs.core.windows.net/2014/04
No existing Bronze data found. New data will be written.
Removing duplicates based on id...
Data row count after deduplication: 13467
Writing data to Bronze layer: abfss://kivabronze@kivastorageacc2.dfs.core.windows.net/2014/04
Data successfully written to Bronze layer: abfss://kivabronze@kivastorageacc2.dfs.core.windows.net/2014/04
Total rows written: 13467

=== Processing partition 2014/10 ===
New data count for this partition: 16348
Bronze data not found: abfss://kivabronze@kivastorageacc2.dfs.core.windows.net/2014/10
No existing Bronze data found. New data will be written.
Removing duplicates based on id...
Data row count after deduplication: 16348
Writing data to Bronze layer: abfss://kivabronze@kivastorageacc2.dfs.core.windows.net/2014/10
Data successfully written to Bronze layer: abfss://kivabronze@kivastorageacc2.d

In [0]:
# Report processing results
total_processed = df_new_data.count()
print("\n=== Processing Summary ===")
print("Process completed successfully!")
print(f"Total processed row count: {total_processed}")
print(f"Bronze base path: {bronze_base_path}")
print(f"Processed year/month partitions:")


=== Processing Summary ===
Process completed successfully!
Total processed row count: 672113
Bronze base path: abfss://kivabronze@kivastorageacc2.dfs.core.windows.net
Processed year/month partitions:


In [0]:
for row in years_months:
    year = row["posted_year"] 
    month = row["posted_month"]
    if year is not None and month is not None:
        print(f"  - {year}/{month:02d}")


  - 2014/04
  - 2014/10
  - 2014/12
  - 2014/05
  - 2014/01
  - 2014/08
  - 2014/09
  - 2014/03
  - 2014/02
  - 2014/06
  - 2015/07
  - 2014/11
  - 2015/01
  - 2014/07
  - 2015/02
  - 2015/12
  - 2015/04
  - 2015/08
  - 2015/11
  - 2015/09
  - 2015/10
  - 2015/03
  - 2015/06
  - 2015/05
  - 2016/07
  - 2016/11
  - 2016/05
  - 2016/02
  - 2016/09
  - 2016/10
  - 2016/06
  - 2016/01
  - 2016/04
  - 2016/08
  - 2016/03
  - 2017/03
  - 2017/07
  - 2016/12
  - 2017/04
  - 2017/02
  - 2017/05
  - 2017/06
  - 2017/01


In [0]:
# Return output of this notebook as dict (can be used in ADF)
output = {
    "status": "success",
    "processed_rows": total_processed,
    "bronze_base_path": bronze_base_path,
    "partitions_processed": len([r for r in years_months if r["posted_year"] is not None]),
    "process_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    "incremental_key": incremental_key
}

dbutils.notebook.exit(str(output))