In [1]:
# 1. Setup & Configuration
import os
import sys
import glob
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, when, count, desc, lit, unix_timestamp, coalesce, abs as abs_val
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Initialize Spark Session
# Reuse a session that the runtime already provides; only build a local one
# when no usable `spark` variable exists in the notebook namespace.
if 'spark' not in locals() or spark is None:
    print("‚öôÔ∏è Initializing Spark Session...")
    spark = SparkSession.builder \
        .appName("FabricFailureAnalysis") \
        .master("local[*]") \
        .getOrCreate()
    print(f"‚úÖ Spark Session Created: {spark.version}")

# Pin the SQL session time zone to UTC. The job exports carry *Utc timestamp
# strings and the audit CreationTime is truncated to its first 19 characters
# before parsing, so zone-less strings would otherwise be interpreted in the
# machine's local zone. In a DST-observing zone that shifts non-existent local
# times and skews unix_timestamp() differences (durations, audit matching).
spark.conf.set("spark.sql.session.timeZone", "UTC")

# --- CONFIGURATION ---
IS_LOCAL_TESTING = True

if IS_LOCAL_TESTING:
    # Resolve both export directories relative to the current working directory.
    # Point to the new detailed exports directory
    BASE_PATH = os.path.abspath(os.path.join(os.getcwd(), "../exports/fabric_item_details/"))
    # Point to the Audit Logs directory
    AUDIT_LOG_PATH = os.path.abspath(os.path.join(os.getcwd(), "../exports/monitor_hub_analysis/raw_data/daily/"))
    print("üîß Running in LOCAL mode.")
else:
    BASE_PATH = "Files/exports/fabric_item_details/"
    AUDIT_LOG_PATH = "Files/exports/monitor_hub_analysis/raw_data/daily/"
    print("‚òÅÔ∏è Running in FABRIC mode.")

# Echo the resolved locations once, whichever mode was selected.
print(f"  - Item Details: {BASE_PATH}")
print(f"  - Audit Logs:   {AUDIT_LOG_PATH}")

‚öôÔ∏è Initializing Spark Session...


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/03 01:51:33 WARN Utils: Your hostname, sanmi-System-Product-Name, resolves to a loopback address: 127.0.1.1; using 192.168.0.14 instead (on interface eno1)
25/12/03 01:51:33 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/03 01:51:33 WARN Utils: Your hostname, sanmi-System-Product-Name, resolves to a loopback address: 127.0.1.1; using 192.168.0.14 instead (on interface eno1)
25/12/03 01:51:33 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust

‚úÖ Spark Session Created: 4.0.1
üîß Running in LOCAL mode.
  - Item Details: /home/sanmi/Documents/J'TOYE_DIGITAL/LEIT_TEKSYSTEMS/1_Project_Rhico/usf_fabric_monitoring/exports/fabric_item_details
  - Audit Logs:   /home/sanmi/Documents/J'TOYE_DIGITAL/LEIT_TEKSYSTEMS/1_Project_Rhico/usf_fabric_monitoring/exports/monitor_hub_analysis/raw_data/daily


In [2]:
# 2. Define Schemas

# Explicit schema applied when reading the detailed item export JSON files.
# failureReason is a nested object in the JSON source, so it gets its own struct.
_failure_reason_type = StructType([
    StructField(field_name, StringType(), True)
    for field_name in ("errorCode", "message", "requestId")
])

json_schema = StructType(
    [StructField(field_name, StringType(), True)
     for field_name in ("id", "itemId", "jobType", "invokeType", "status")]
    + [StructField("failureReason", _failure_reason_type, True),
       StructField("rootActivityId", StringType(), True)]
    # Timestamps are read as plain strings first so the ISO text can be
    # converted explicitly later on.
    + [StructField(field_name, StringType(), True)
       for field_name in ("startTimeUtc", "endTimeUtc")]
    # Underscore-prefixed fields describe the owning workspace and item.
    + [StructField(field_name, StringType(), True)
       for field_name in ("_workspace_name", "_item_name", "_item_type")]
    + [StructField("duration", DoubleType(), True)]
)

# Note on Principal/User Information:
# The underlying source API (jobs/instances) does not currently return the 'principal' or 'userId' 
# responsible for the execution. This information is not available in the current dataset.

In [3]:
# 3. Load and Process Data
from pyspark.sql.functions import split, initcap, regexp_replace, element_at, substring

def load_audit_logs():
    """
    Loads the Fabric Audit Logs (Activity Events) to get User information.

    Reads every ``fabric_activities_*.csv`` file under ``AUDIT_LOG_PATH``, keeps
    only run/execute activities whose ``CreationTime`` starts with a date, and
    parses that time into an ``AuditTime`` timestamp column.

    Returns:
        A DataFrame with the columns ``ItemId``, ``SubmittedBy``, ``AuditTime``
        and ``UserId`` (in that order), or ``None`` when the logs cannot be
        loaded. ``SubmittedBy`` / ``UserId`` are filled with string-typed nulls
        when the CSV header does not contain them, so a missing optional column
        no longer discards the whole audit data set.
    """
    print("Loading Audit Logs...")
    try:
        # Load all CSVs with header; columns are addressed by name.
        # PERMISSIVE mode keeps malformed rows from aborting the read.
        audit_df = spark.read.option("header", "true") \
            .option("mode", "PERMISSIVE") \
            .csv(os.path.join(AUDIT_LOG_PATH, "fabric_activities_*.csv"))

        # Keep only the relevant activities, and drop rows whose CreationTime is
        # not a date-like string (e.g. "Succeeded") - this guards against rows
        # where CSV parsing shifted the columns.
        audit_df = audit_df.filter(col("Activity").isin("RunArtifact", "ExecuteNotebook", "ExecutePipeline")) \
            .filter(col("CreationTime").rlike(r"^\d{4}-\d{2}-\d{2}"))

        # Take the first 19 characters (date + time to the second), which also
        # drops a trailing " UTC" suffix if present, then cast to timestamp.
        audit_df = audit_df.withColumn("AuditTime", to_timestamp(substring(col("CreationTime"), 1, 19)))

        # Compare header names case-insensitively; a case-sensitive check could
        # wrongly treat a differently-cased column as missing.
        available = {name.lower() for name in audit_df.columns}

        def _optional(name):
            # Use the real column when present, otherwise a typed null stand-in.
            if name.lower() in available:
                return col(name)
            return lit(None).cast("string").alias(name)

        selected = [col("ItemId"), _optional("SubmittedBy"), col("AuditTime"), _optional("UserId")]
        return audit_df.select(*selected)
    except Exception as e:
        # Best-effort: the caller treats None as "no audit enrichment available".
        print(f"‚ö†Ô∏è Could not load Audit Logs: {str(e)}")
        return None

def load_and_process_failures(file_pattern="jobs_*.json"):
    """
    Loads JSONs matching the pattern, filters for failures.

    Args:
        file_pattern: Glob pattern, relative to ``BASE_PATH``, of the job export
            files to read. When nothing matches, every ``*.json`` file in
            ``BASE_PATH`` is tried instead.

    Returns:
        A DataFrame of the rows whose ``status`` is ``"Failed"``, with these
        added columns: ``error_code``, ``error_message``, ``source_type``,
        ``start_time``, ``end_time`` and ``calculated_duration`` (end minus
        start, in seconds). Returns ``None`` when no files are found or the
        read fails.
    """
    # NOTE(review): glob works on the local filesystem; confirm the relative
    # "Files/..." path used outside local mode is visible to glob there.
    full_path_pattern = os.path.join(BASE_PATH, file_pattern)
    # Sort so the list handed to Spark is the same on every run.
    matched_files = sorted(glob.glob(full_path_pattern))

    if not matched_files:
        # Fallback to old pattern if new generic files aren't found yet
        print(f"‚ö†Ô∏è No files found at {full_path_pattern}. Trying legacy patterns...")
        matched_files = sorted(glob.glob(os.path.join(BASE_PATH, "*.json")))
        if not matched_files:
            print(f"‚ö†Ô∏è No JSON files found at {BASE_PATH}")
            return None

    try:
        print(f"üìÇ Loading {len(matched_files)} files from {BASE_PATH}...")
        df = spark.read.option("multiLine", "true").schema(json_schema).json(matched_files)
        failed_df = df.filter(col("status") == "Failed")

        # Flatten the nested failure details and parse the ISO time strings.
        # itemId and invokeType already exist on the frame under those names,
        # so they need no re-assignment here.
        parsed_df = failed_df \
            .withColumn("error_code", col("failureReason.errorCode")) \
            .withColumn("error_message", col("failureReason.message")) \
            .withColumn("source_type", col("_item_type")) \
            .withColumn("start_time", to_timestamp(col("startTimeUtc"))) \
            .withColumn("end_time", to_timestamp(col("endTimeUtc")))

        # Wall-clock duration in whole seconds, derived from the parsed times.
        parsed_df = parsed_df.withColumn("calculated_duration",
            (unix_timestamp(col("end_time")) - unix_timestamp(col("start_time"))).cast("double")
        )

        return parsed_df
    except Exception as e:
        # Best-effort: the caller treats None as "no failure data".
        print(f"‚ö†Ô∏è Could not load data: {str(e)}")
        return None

# Names used only by the audit-matching step below; imported here so this
# section runs on its own.
from pyspark.sql import Window
from pyspark.sql.functions import row_number

# Load all failure data from JSON exports
print("Loading all failure data...")
# Updated default pattern to match the new generic output
failures_df = load_and_process_failures("jobs_*.json")

# Join with Audit Logs
final_df = None
# Compare against None explicitly instead of relying on DataFrame truthiness.
if failures_df is not None:
    audit_df = load_audit_logs()
    if audit_df is not None:
        print("üîó Joining with Audit Logs to enrich User info...")

        # Rename the audit key so both sides of the join stay unambiguous.
        audit_renamed = audit_df.withColumnRenamed("ItemId", "AuditItemId")

        # Left join: same item, audit event within 120 seconds of the job start.
        joined_df = failures_df.join(audit_renamed,
            (failures_df.itemId == audit_renamed.AuditItemId) &
            (abs_val(unix_timestamp(failures_df.start_time) - unix_timestamp(audit_renamed.AuditTime)) < 120),
            "left"
        )

        # Several audit events can fall inside the window. Keep exactly one row
        # per job id, chosen deterministically: the closest event in time, with
        # the remaining audit columns as tie-breakers. (dropDuplicates would
        # keep an arbitrary row, so the reported user could change between runs.)
        closest_first = Window.partitionBy("id").orderBy(
            col("_audit_gap_s").asc_nulls_last(),
            col("AuditTime").asc_nulls_last(),
            col("UserId").asc_nulls_last(),
            col("SubmittedBy").asc_nulls_last()
        )
        final_df = joined_df \
            .withColumn("_audit_gap_s", abs_val(unix_timestamp(col("start_time")) - unix_timestamp(col("AuditTime")))) \
            .withColumn("_audit_rank", row_number().over(closest_first)) \
            .filter(col("_audit_rank") == 1) \
            .drop("_audit_gap_s", "_audit_rank")
    else:
        print("‚ö†Ô∏è Audit logs not available. User info will be missing.")
        # Typed nulls: the string functions applied below expect string columns.
        final_df = failures_df \
            .withColumn("UserId", lit(None).cast("string")) \
            .withColumn("SubmittedBy", lit(None).cast("string"))

    # Derive "Firstname Lastname" from "firstname.lastname@domain.com".
    # Only applied when the id contains "@"; any other id (or a null) yields
    # null here so the SubmittedBy / raw UserId fallbacks can take over.
    email_display_name = when(
        col("UserId").contains("@"),
        initcap(regexp_replace(element_at(split(col("UserId"), "@"), 1), "\\.", " "))
    )

    # Final Selection with Enhanced Columns
    final_df = final_df.select(
        col("_workspace_name").alias("Workspace"),
        col("_item_name").alias("Item Name"),
        col("_item_type").alias("Item Type"),
        col("invokeType").alias("Invoke Type"),
        col("start_time").alias("Start Time"),
        col("end_time").alias("End Time"),
        # NOTE(review): dividing by 1000 assumes the exported duration is in
        # milliseconds - confirm against the export. Falls back to end - start.
        coalesce(col("duration")/1000, col("calculated_duration")).alias("Duration (s)"),
        col("UserId").alias("User ID"), # Original User ID
        coalesce(
            email_display_name,
            col("SubmittedBy"),
            col("UserId")
        ).alias("User Name"),
        col("error_code").alias("Error Code"),
        col("error_message").alias("Error Message")
    )

    print(f"‚úÖ Successfully loaded {final_df.count()} failure records.")
else:
    print("‚ùå No failure data found.")

Loading all failure data...
üìÇ Loading 2 files from /home/sanmi/Documents/J'TOYE_DIGITAL/LEIT_TEKSYSTEMS/1_Project_Rhico/usf_fabric_monitoring/exports/fabric_item_details...
Loading Audit Logs...
Loading Audit Logs...


25/12/03 01:51:37 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: /home/sanmi/Documents/J'TOYE_DIGITAL/LEIT_TEKSYSTEMS/1_Project_Rhico/usf_fabric_monitoring/exports/monitor_hub_analysis/raw_data/daily/fabric_activities_*.csv.
java.io.FileNotFoundException: File /home/sanmi/Documents/J'TOYE_DIGITAL/LEIT_TEKSYSTEMS/1_Project_Rhico/usf_fabric_monitoring/exports/monitor_hub_analysis/raw_data/daily/fabric_activities_*.csv does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:917)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1238)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:907)
	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:462)
	at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:56)
	at org.apache.spark.sql.execution.datas

üîó Joining with Audit Logs to enrich User info...
‚úÖ Successfully loaded 1203 failure records.
‚úÖ Successfully loaded 1203 failure records.


In [4]:
# 4. Analysis & Display
from pyspark.sql.functions import avg, max, min, substring, count, desc, col

# Prints coverage diagnostics, the most recent failures and several grouped
# summaries for final_df. Within this cell `min` / `max` are the Spark
# aggregate functions imported above, not the Python builtins.
if final_df is not None:
    print("=== Data Coverage Diagnostics ===")
    # Check date ranges to explain missing User IDs
    stats = final_df.select(
        min("Start Time").alias("min_time"),
        max("Start Time").alias("max_time"),
        count("*").alias("total_count"),
        count("User ID").alias("user_count")  # count(column) skips nulls
    ).collect()[0]

    print(f"Data Range: {stats['min_time']} to {stats['max_time']}")
    print(f"Records with User Info: {stats['user_count']} / {stats['total_count']}")

    if stats['user_count'] < stats['total_count']:
        # Check the latest date that HAS user info
        latest_user_date = final_df.filter(col("User ID").isNotNull()).agg(max("Start Time")).collect()[0][0]
        print(f"Latest Record with User Info: {latest_user_date}")
        if latest_user_date and stats['max_time'] > latest_user_date:
            print(f"‚ö†Ô∏è WARNING: No User Info available for failures after {latest_user_date}. Audit Logs may be missing for recent dates.")

    print("\n=== Recent Failures (Detailed) ===")
    # show() prints only 20 rows unless told otherwise, so the row count is
    # passed explicitly to actually display all 60 selected rows.
    final_df.orderBy(col("Start Time").desc()).limit(60) \
        .select(
            "Workspace", "Item Name", "Item Type", "Invoke Type",
            "Start Time", "End Time", "Duration (s)", "User ID", "User Name", "Error Code",
            substring("Error Message", 1, 50).alias("Error Msg (Trunc)")
        ) \
        .show(60, truncate=False)

    # Show verified matches if we have partial data
    if 0 < stats['user_count'] < stats['total_count']:
        print("\n=== Recent Failures (Verified with User Info) ===")
        final_df.filter(col("User ID").isNotNull()) \
            .orderBy(col("Start Time").desc()).limit(10) \
            .select(
                "Workspace", "Item Name", "Start Time", "User Name", "Error Code"
            ) \
            .show(truncate=False)

    print("\n=== Top Error Codes ===")
    error_counts = final_df.groupBy("Error Code").agg(count("*").alias("Count")).orderBy(desc("Count"))
    error_counts.show(truncate=False)

    print("\n=== Failures by Workspace ===")
    workspace_counts = final_df.groupBy("Workspace").agg(count("*").alias("Count")).orderBy(desc("Count"))
    workspace_counts.show(truncate=False)

    print("\n=== Failures by User ===")
    user_counts = final_df.groupBy("User Name").agg(count("*").alias("Count")).orderBy(desc("Count"))
    user_counts.show(truncate=False)

    print("\n=== Duration Statistics (Failed Runs) ===")
    final_df.select(
        avg("Duration (s)").alias("Avg Duration"),
        max("Duration (s)").alias("Max Duration"),
        min("Duration (s)").alias("Min Duration")
    ).show()
else:
    print("No data available for analysis.")

=== Data Coverage Diagnostics ===


                                                                                

Data Range: 2025-11-02 01:00:02.893333 to 2025-12-02 23:00:03.223119
Records with User Info: 932 / 1203


                                                                                

Latest Record with User Info: 2025-11-30 23:50:00.643333

=== Recent Failures (Detailed) ===


                                                                                

+----------------------------+-------------------------+------------+-----------+--------------------------+--------------------------+------------+-------+---------+-------------------+--------------------------------------------------+
|Workspace                   |Item Name                |Item Type   |Invoke Type|Start Time                |End Time                  |Duration (s)|User ID|User Name|Error Code         |Error Msg (Trunc)                                 |
+----------------------------+-------------------------+------------+-----------+--------------------------+--------------------------+------------+-------+---------+-------------------+--------------------------------------------------+
|RE Service - Data Operations|MIf_Analytics_Contracts  |DataPipeline|Scheduled  |2025-12-02 23:00:03.223119|2025-12-02 23:16:11.739386|968.0       |NULL   |NULL     |Failed             |Operation on target install_base failed: Failed to|
|rescm_dev_test              |JDE_BI_OUT        

In [7]:
# 5. Show up to 100 of the most recent failures (all columns)
# Rows are ordered by start time, newest first, matching the printed heading.
if final_df is not None:
    print("=== Recent Failures (Detailed) ===")
    # A DataFrame expression inside an `if` body is not rendered by the
    # notebook, so show() is called explicitly - with the row count, because
    # show() prints only 20 rows by default.
    final_df.orderBy(col("Start Time").desc()).limit(100).show(100, truncate=False)


=== Recent Failures (Detailed) ===


In [8]:
# 6. Summary statistics (count, mean, stddev, min, max) for the failures table
# Guarded like the cells above: final_df is None when no failure data was loaded.
if final_df is not None:
    final_df.describe().show()
else:
    print("No data available for analysis.")

25/12/03 01:54:28 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------+--------------------+--------------------+------------+-----------+------------------+--------------------+---------------+-----------------+--------------------+
|summary|           Workspace|           Item Name|   Item Type|Invoke Type|      Duration (s)|             User ID|      User Name|       Error Code|       Error Message|
+-------+--------------------+--------------------+------------+-----------+------------------+--------------------+---------------+-----------------+--------------------+
|  count|                1203|                1203|        1203|       1203|              1203|                 932|            932|             1203|                1175|
|   mean|                NULL|                NULL|        NULL|       NULL|420.68661679135494|                NULL|           NULL|             NULL|                NULL|
| stddev|                NULL|                NULL|        NULL|       NULL| 3976.839741573941|                NULL|           NULL|        