In [1]:
import numpy as np
import pandas as pd

# Causal Path Significance Ranking Algorithm

Our goal is to build up an algorithm to rank causal paths by their "significance" using PNS (Probability of Necessity and Sufficiency) scores. PNS (Probability of Necessity and Sufficiency) is a causal importance metric – it “measures both the sufficiency and necessity of X to produce Y” for a given cause-effect pair.

We implement a Python algorithm to analyze the causal DAG and rank the most significant causal paths leading to the target incident. The algorithm takes as input the DAG structure, a dictionary of PNS scores for each node, and the specified target variable (incident). 

We assume we already have a PNS-like score for each variable indicating its importance in causing the incident.

The algorithm performs a Depth-First Search (DFS) backwards from the target node through the DAG (following parent links) to enumerate all causal paths ending in the target. For each path found (from some root cause node to the target), it computes a "path significance" score by summing the PNS scores of all nodes on that path, with an extra weight applied to the root cause’s score (by default, weight = 2, meaning the root cause’s contribution is doubled). The idea is to prioritize paths where the initial cause is especially critical. The paths are then ranked by this significance score in descending order. 

Below is the implementation:

In [2]:
def rank_causal_paths(dag, pns_scores, target, root_weight=2):
    """
    Rank causal paths in a DAG from any root node to the given target node by their significance.
    
    Parameters:
        - dag (dict): Adjacency list of the DAG, where dag[u] = [v1, v2, ...] 
                    means u -> v1, u -> v2 are directed edges.
                    (Every node in the DAG should appear as a key, even if it 
                    has an empty list of children.)
                    
        - pns_scores (dict): Mapping from node name to its PNS score.
        
        - target (hashable): The target node for which we trace back causal paths 
                            (e.g., the incident variable).
                            
        - root_weight (float): A weighting factor for the root cause's PNS score in a path
                            (default 2.0, meaning root cause counts double).
    
    Returns:
        List of tuples [(path, path_score), ...] sorted by path_score in descending order.
        Each path is represented as a list of nodes from the root to the target.
    """
    
    # Build a parent adjacency list from the child adjacency list (dag)
    parents = {node: [] for node in dag}
    for parent, children in dag.items():
        for child in children:
            parents.setdefault(child, []).append(parent)
    # Ensure every node appears in parents (even if no parents)
    for node in dag:
        parents.setdefault(node, [])
    
    # Use DFS to find all paths from any root to the target
    all_paths = []
    def dfs_back(node, current_path):
        current_path.insert(0, node)  # prepend current node to the path
        if len(parents[node]) == 0:  # no parent means this is a root cause
            all_paths.append(current_path.copy())
        else:
            for par in parents[node]:
                dfs_back(par, current_path.copy())
    
    dfs_back(target, [])
    
    # Compute significance score for each path
    path_scores = []
    for path in all_paths:
        # exclude the target node itself when summing scores (target's PNS is not relevant for causing itself)
        if len(path) <= 1:
            continue  # no causative nodes if path is just the target
        root = path[0]
        # Sum PNS scores for all non-target nodes in the path
        score_sum = 0.0
        for node in path[:-1]:  # all nodes except the target (last one)
            score_sum += pns_scores.get(node, 0.0)
        # Apply extra weight to the root cause's score
        if root in pns_scores:
            score_sum += (root_weight - 1) * pns_scores[root]
        path_scores.append((path, score_sum))
        
    # Sort paths by descending score
    path_scores.sort(key=lambda x: x[1], reverse=True)
    
    return path_scores

**Exemple usage** with Model 1, the small-scale outage incident:

In [3]:
example_dag = {
    "Deploy_NewVersion": ["MemoryLeakBug"],
    "MemoryLeakBug": ["MemoryUsageHigh"],
    "MemoryUsageHigh": ["ServiceCrash"],
    "ServiceCrash": ["OutageIncident"],
    "OutageIncident": [], # target has no children
    # HeavyTraffic latent included in DAG for completeness:
    "HeavyTraffic": ["MemoryUsageHigh", "ServiceCrash"]
}

example_pns = {
    "Deploy_NewVersion": 0.8,
    "MemoryLeakBug": 0.9,
    "MemoryUsageHigh": 0.5,
    "ServiceCrash": 0.6,
    "HeavyTraffic": 0.3
    # OutageIncident (target) can be omitted or assigned 0, it's not used in scoring
}

In [9]:
result_paths = rank_causal_paths(example_dag, example_pns, target="OutageIncident", root_weight=1)

In [10]:
for path, score in result_paths:
    print(f"Path: {' -> '.join(path)}")
    print(f"Path Significance Score: {score:.2f}")
    print("-----")

Path: Deploy_NewVersion -> MemoryLeakBug -> MemoryUsageHigh -> ServiceCrash -> OutageIncident
Path Significance Score: 2.80
-----
Path: HeavyTraffic -> MemoryUsageHigh -> ServiceCrash -> OutageIncident
Path Significance Score: 1.40
-----
Path: HeavyTraffic -> ServiceCrash -> OutageIncident
Path Significance Score: 0.90
-----


This indicates the top-ranked path is the one starting from Deploy_NewVersion (the deployment with bug) leading to the outage, with a higher significance score. The other paths involving the latent heavy traffic are ranked lower due to the lower PNS score for HeavyTraffic in this example. In a real analysis, one would interpret this output as identifying the deployment bug as the most significant causal chain for the incident, compared to other possible causes like a pure traffic overload path.