In [1]:
def read_file(file_path: str, delimiter: str = "\t", header: bool = True):
    """
    Load a delimited text file (TSV/CSV) from a Microsoft Fabric Lakehouse.

    Parameters:
        file_path (str): Full path to the file in the Lakehouse (abfss://...).
        delimiter (str): Field separator used in the file; defaults to a tab.
        header (bool): Whether the first row of the file holds column names.

    Returns:
        Spark DataFrame produced by the CSV reader.
    """
    # `spark` is not defined in this notebook; it is expected to be the
    # ambient Spark session object of the runtime.
    # No schema and no inferSchema option is set on the reader here.
    reader = spark.read
    reader = reader.option("header", header)
    reader = reader.option("delimiter", delimiter)
    return reader.csv(file_path)


StatementMeta(, 208617c2-9b90-4ebe-a8ea-7a99f0648de2, 3, Finished, Available, Finished)

In [2]:
# Raw Product file in the bronze Lakehouse Files area.
product_path = 'abfss://FabricTrainingWorkspace@onelake.dfs.fabric.microsoft.com/neha_karpe_bronze_lakehouse.Lakehouse/Files/raw_data/Product.txt'

# read_file defaults apply: tab delimiter, first row treated as header.
df_product = read_file(product_path)
display(df_product)

StatementMeta(, 208617c2-9b90-4ebe-a8ea-7a99f0648de2, 4, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 02a5498e-17e0-4a26-8c87-8e52887567e3)

In [3]:
# Raw Sales file in the bronze Lakehouse Files area.
sales_path = 'abfss://FabricTrainingWorkspace@onelake.dfs.fabric.microsoft.com/neha_karpe_bronze_lakehouse.Lakehouse/Files/raw_data/Sales.txt'

# read_file defaults apply: tab delimiter, first row treated as header.
df_sales = read_file(sales_path)

StatementMeta(, 208617c2-9b90-4ebe-a8ea-7a99f0648de2, 5, Finished, Available, Finished)

In [4]:
# Preview the sales DataFrame loaded in the previous cell.
display(df_sales)

StatementMeta(, 208617c2-9b90-4ebe-a8ea-7a99f0648de2, 6, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 0ad48b8a-e33a-4c3c-ba71-5a5a14063198)

In [5]:
# Raw Region file in the bronze Lakehouse Files area.
region_path = 'abfss://FabricTrainingWorkspace@onelake.dfs.fabric.microsoft.com/neha_karpe_bronze_lakehouse.Lakehouse/Files/raw_data/Region.txt'

# read_file defaults apply: tab delimiter, first row treated as header.
df_region = read_file(region_path)

StatementMeta(, 208617c2-9b90-4ebe-a8ea-7a99f0648de2, 7, Finished, Available, Finished)

In [6]:
# Preview the region DataFrame loaded in the previous cell.
display(df_region)

StatementMeta(, 208617c2-9b90-4ebe-a8ea-7a99f0648de2, 8, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 21f17a53-e631-48b8-b38b-6a94969257b7)

In [7]:
def clean_column_names(df):
    """
    Return `df` with every column name trimmed and its spaces replaced by "_".

    Leading/trailing whitespace is stripped from each name and each remaining
    space becomes an underscore (e.g. " Standard Cost " -> "Standard_Cost").

    Parameters:
        df: Spark DataFrame whose column names should be normalised.

    Returns:
        A DataFrame with the same columns in the same order under the cleaned
        names; `df` itself when no name needs to change.
    """
    original_names = list(df.columns)
    cleaned_names = [name.strip().replace(" ", "_") for name in original_names]

    # Nothing to rename (also covers a frame with no columns).
    if cleaned_names == original_names:
        return df

    # Rename all columns positionally in one step instead of chaining one
    # withColumnRenamed call per column.
    return df.toDF(*cleaned_names)

StatementMeta(, 208617c2-9b90-4ebe-a8ea-7a99f0648de2, 9, Finished, Available, Finished)

In [8]:
# Normalise the header names of all three frames (trim, spaces -> underscores).
df_product, df_region, df_sales = (
    clean_column_names(frame) for frame in (df_product, df_region, df_sales)
)



StatementMeta(, 208617c2-9b90-4ebe-a8ea-7a99f0648de2, 10, Finished, Available, Finished)

In [9]:
# Inspect the product columns after renaming; read_file sets no schema or
# inferSchema option, and the printed schema shows every column as string.
df_product.printSchema()

StatementMeta(, 208617c2-9b90-4ebe-a8ea-7a99f0648de2, 11, Finished, Available, Finished)

root
 |-- ProductKey: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Standard_Cost: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Subcategory: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Background_Color_Format: string (nullable = true)
 |-- Font_Color_Format: string (nullable = true)



In [10]:
# Inspect the region columns after renaming; read_file sets no schema or
# inferSchema option, and the printed schema shows every column as string.
df_region.printSchema()

StatementMeta(, 208617c2-9b90-4ebe-a8ea-7a99f0648de2, 12, Finished, Available, Finished)

root
 |-- SalesTerritoryKey: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Group: string (nullable = true)



In [11]:
# Inspect the sales columns after renaming; read_file sets no schema or
# inferSchema option, and the printed schema shows every column as string.
df_sales.printSchema()

StatementMeta(, 208617c2-9b90-4ebe-a8ea-7a99f0648de2, 13, Finished, Available, Finished)

root
 |-- SalesOrderNumber: string (nullable = true)
 |-- OrderDate: string (nullable = true)
 |-- ProductKey: string (nullable = true)
 |-- SalesTerritoryKey: string (nullable = true)
 |-- OrderQuantity: string (nullable = true)
 |-- UnitPrice: string (nullable = true)
 |-- SalesAmount: string (nullable = true)
 |-- TotalProductCost: string (nullable = true)



In [12]:
from datetime import datetime
import uuid
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

def write_to_bronze_path_with_logging(df, source_path: str, destination_path: str, log_path: str, mode: str = "overwrite"):
    """
    Write a Spark DataFrame to a Delta location and append one JSON log record.

    Errors are never raised to the caller: a failure while writing (or while
    counting rows afterwards) is printed and recorded in the log record with
    status "Failed" and the exception text. A failure to save the log record
    itself is only printed.

    Parameters:
        df: Spark DataFrame to persist.
        source_path (str): Value recorded in the log as the origin of the data.
        destination_path (str): Location the DataFrame is saved to in Delta format.
        log_path (str): Location the log record is appended to as JSON.
        mode (str): Save mode for the data write (default "overwrite").

    Returns:
        None
    """
    # ingestion_time is captured here, before the write starts (local clock).
    log = {
        "log_id": str(uuid.uuid4()),
        "source_path": source_path,
        "destination_path": destination_path,
        "ingestion_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "row_count": None,
        "status": "Success",
        "error_message": None
    }

    try:
        df.write.mode(mode).format("delta").save(destination_path)
        # count() is a separate action on `df`, run after the write finished.
        # Converted to text explicitly because every field of the log schema
        # below is declared as a string.
        log["row_count"] = str(df.count())
        print(f"Data written to path: {destination_path}")
    except Exception as e:
        # Best effort: record the failure in the log instead of propagating it.
        log["status"] = "Failed"
        log["error_message"] = str(e)
        print(f"Failed to write data: {log['error_message']}")

    # Define schema to avoid type inference errors
    # (row_count and error_message may be None, which cannot be inferred).
    log_schema = StructType([
        StructField("log_id", StringType(), True),
        StructField("source_path", StringType(), True),
        StructField("destination_path", StringType(), True),
        StructField("ingestion_time", StringType(), True),
        StructField("row_count", StringType(), True),
        StructField("status", StringType(), True),
        StructField("error_message", StringType(), True)
    ])

    try:
        # coalesce(1) keeps the single log record in one output partition.
        spark.createDataFrame([log], schema=log_schema).coalesce(1).write.mode("append").json(log_path)
        print(f" Log saved at: {log_path}")
    except Exception as log_err:
        print(f"Failed to save log: {str(log_err)}")


StatementMeta(, 208617c2-9b90-4ebe-a8ea-7a99f0648de2, 14, Finished, Available, Finished)

In [13]:
# Delta destination for the product table and the JSON log location.
d_p_path = 'abfss://FabricTrainingWorkspace@onelake.dfs.fabric.microsoft.com/neha_karpe_bronze_lakehouse.Lakehouse/Tables/dbo.product'
# NOTE(review): the log is written as JSON under Tables/; confirm whether a
# Files/ location (or a Delta log table) is the intended target.
l_p_path = 'abfss://FabricTrainingWorkspace@onelake.dfs.fabric.microsoft.com/neha_karpe_bronze_lakehouse.Lakehouse/Tables/dbo.logs'

# Record the real source file in the log instead of a fixed placeholder string,
# so each log row identifies the file the data was read from.
write_to_bronze_path_with_logging(df_product, product_path, d_p_path, l_p_path)

StatementMeta(, 208617c2-9b90-4ebe-a8ea-7a99f0648de2, 15, Finished, Available, Finished)

Data written to path: abfss://FabricTrainingWorkspace@onelake.dfs.fabric.microsoft.com/neha_karpe_bronze_lakehouse.Lakehouse/Tables/dbo.product
 Log saved at: abfss://FabricTrainingWorkspace@onelake.dfs.fabric.microsoft.com/neha_karpe_bronze_lakehouse.Lakehouse/Tables/dbo.logs


In [27]:

# Delta destination for the sales table and the JSON log location.
d_s_path = 'abfss://FabricTrainingWorkspace@onelake.dfs.fabric.microsoft.com/neha_karpe_bronze_lakehouse.Lakehouse/Tables/dbo.sales'
l_p_path = 'abfss://FabricTrainingWorkspace@onelake.dfs.fabric.microsoft.com/neha_karpe_bronze_lakehouse.Lakehouse/Tables/dbo.logs'

# Record the real source file in the log instead of a fixed placeholder string,
# so each log row identifies the file the data was read from.
write_to_bronze_path_with_logging(df_sales, sales_path, d_s_path, l_p_path)


StatementMeta(, 208617c2-9b90-4ebe-a8ea-7a99f0648de2, 29, Finished, Available, Finished)

Data written to path: abfss://FabricTrainingWorkspace@onelake.dfs.fabric.microsoft.com/neha_karpe_bronze_lakehouse.Lakehouse/Tables/dbo.sales
 Log saved at: abfss://FabricTrainingWorkspace@onelake.dfs.fabric.microsoft.com/neha_karpe_bronze_lakehouse.Lakehouse/Tables/dbo.logs


In [22]:

# Delta destination for the region table and the JSON log location.
d_r_path = 'abfss://FabricTrainingWorkspace@onelake.dfs.fabric.microsoft.com/neha_karpe_bronze_lakehouse.Lakehouse/Tables/dbo.region'
l_p_path = 'abfss://FabricTrainingWorkspace@onelake.dfs.fabric.microsoft.com/neha_karpe_bronze_lakehouse.Lakehouse/Tables/dbo.logs'

# Record the real source file in the log instead of a fixed placeholder string,
# so each log row identifies the file the data was read from.
write_to_bronze_path_with_logging(df_region, region_path, d_r_path, l_p_path)

StatementMeta(, 208617c2-9b90-4ebe-a8ea-7a99f0648de2, 24, Finished, Available, Finished)

Data written to path: abfss://FabricTrainingWorkspace@onelake.dfs.fabric.microsoft.com/neha_karpe_bronze_lakehouse.Lakehouse/Tables/dbo.region
 Log saved at: abfss://FabricTrainingWorkspace@onelake.dfs.fabric.microsoft.com/neha_karpe_bronze_lakehouse.Lakehouse/Tables/dbo.logs


In [28]:
# Location the ingestion log records were appended to as JSON.
pth = 'abfss://FabricTrainingWorkspace@onelake.dfs.fabric.microsoft.com/neha_karpe_bronze_lakehouse.Lakehouse/Tables/dbo.logs'
# Load from the variable instead of repeating the path literal.
df = spark.read.format('json').load(pth)

StatementMeta(, 208617c2-9b90-4ebe-a8ea-7a99f0648de2, 30, Finished, Available, Finished)

In [29]:
# Show the ingestion log records read back from the JSON log location.
display(df)

StatementMeta(, 208617c2-9b90-4ebe-a8ea-7a99f0648de2, 31, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 2970b863-c324-4f2d-a587-3423dd28f956)