In [0]:
from pyspark.sql.functions import lit,col
from datetime import datetime

# Configuration
# One timestamp (ISO-8601 string) shared by every log record and ingested row of this run.
current_time = datetime.now().isoformat()
ingest_type = "stream"
path = "/Volumes/capstone/schema_bronze/00_data/"
ingest_path = path + ingest_type
archive_path = path + "archive/"
catalog_schema = "capstone.schema_bronze"

# Entries currently present in the ingest folder.
file_list = list(dbutils.fs.ls(ingest_path))

# `file_list` is always a list (never None), so check for emptiness.
# When there is nothing to ingest, record a SKIP entry so that
# `log_run_history` is defined even though no file gets processed.
if not file_list:
    log_run_history = {
        "file_name": "",
        "timestamp": current_time,
        "status": "SKIP",
        "type": "No file to process",
        "error_message": "",
    }

# Display the discovered entries as the cell output.
file_list

In [0]:
# This is the Auto Loader code for streaming ingest and transform. It could not be enabled on the free version I use, so I had to comment it out and use a mock script instead.

# # Read source table as stream
# source_df = (
#     spark.readStream
#     .format("delta")
#     .table("source_table")
# )

# # Apply transformation logic
# transformed_df = source_df.filter(col("status") == "active")

# # Write into target table
# (
#     transformed_df.writeStream
#     .format("delta")
#     .option("checkpointLocation", "/mnt/checkpoints/myjob")
#     .outputMode("append")  # append only new rows
#     .table("target_table")
# )

In [0]:
# Ingest every entry of `file_list`: confirm the file is still present, load it
# into a table named after the file, move the source file to the archive
# folder, and build one run-history record per file.
log_records = []  # one run-history dict per entry handled by the loop

for file in file_list:

    file_path = file.path
    # Table name is the part of the file name before the first ".".
    table_full = catalog_schema + "." + file.name.split(".")[0]
    # Extension is compared case-insensitively so "CSV"/"JSON" are accepted too.
    file_type = file.name.split(".")[-1].lower()

    # Check that the file exists before processing it.
    try:
        file_exists = dbutils.fs.ls(file_path)
        print(f"{file.name} exists")

    except Exception as file_check_error:
        error_text = str(file_check_error)
        if "java.io.FileNotFoundException" in error_text or "Path does not exist" in error_text:
            # The path disappeared between listing and processing.
            log_run_history = {
                "file_name": file.name,
                "timestamp": current_time,
                "status": "FILE_NOT_FOUND",
                "type": "stream file or directory not found",
                "error_message": error_text,
            }
        else:
            # Any other file system error.
            log_run_history = {
                "file_name": file.name,
                "timestamp": current_time,
                "status": "ERROR",
                "type": "Error occurred during file system check",
                "error_message": error_text,
            }

        file_exists = None
        print(log_run_history["type"])

    # File exists, proceed with processing.
    if file_exists is not None:
        print(f"processing {file.name}........")
        try:
            # Pick the reader from the extension. Anything else is rejected
            # explicitly; otherwise `claims_stream_df` would be unbound, or would
            # still hold the previous file's data and overwrite the wrong table.
            if file_type == "csv":
                raw_df = spark.read.option("header", "true").csv(file_path)
            elif file_type == "json":
                raw_df = spark.read.json(file_path)
            else:
                raise ValueError(f"Unsupported file type: {file_type!r}")

            # Normalise `Amount` to string and add ingestion metadata columns.
            claims_stream_df = (
                raw_df
                    .withColumn("Amount", col("Amount").cast("string"))
                    .withColumn("_source", lit(ingest_type))
                    .withColumn("_ingestion_timestamp", lit(current_time).cast("timestamp"))
            )

            # Write to table (replaces any existing content of the table).
            claims_stream_df.write.mode("overwrite").saveAsTable(table_full)

            # Move and rename the file once it has been processed.
            # NOTE(review): `current_time` is an ISO timestamp containing ":",
            # which some path APIs reject; confirm the archive name is accepted.
            new_file_path = archive_path + f"{current_time}_{file.name}"
            dbutils.fs.mv(file_path, new_file_path)

            # Log success.
            log_run_history = {
                "file_name": file.name,
                "timestamp": current_time,
                "status": "SUCCESS",
                "type": f"{ingest_type} file processed successfully",
                "error_message": "",
            }

        except Exception as processing_error:
            # Log processing error (includes unsupported file types).
            log_run_history = {
                "file_name": file.name,
                "timestamp": current_time,
                "status": "ERROR",
                "type": "Error occurred during file processing",
                "error_message": str(processing_error),
            }
        print(log_run_history["type"])

    # Every path above assigns `log_run_history` for this file; keep it.
    log_records.append(log_run_history)

# The final cell of this notebook appends only `log_run_history` (the record of
# the last file) to the log table. Persist every earlier record here so it is
# not lost; the last one is left for that cell to avoid writing it twice.
if len(log_records) > 1:
    (
        spark.createDataFrame(log_records[:-1])
            .write.mode("append")
            .saveAsTable("capstone.schema_bronze.log_ingest")
    )
    


In [0]:
# Append the current run-history record to the ingest log table.
# "append" mode adds the new row without replacing rows already in the table.
df_log = spark.createDataFrame([log_run_history])
(
    df_log.write
        .mode("append")
        .saveAsTable("capstone.schema_bronze.log_ingest")
)