In [9]:
import pandas as pd
import matplotlib.pyplot as plt

# Read the spreadsheet and discard rows with no value in 'remove_pii_5',
# renumbering the surviving rows from 0.
data = (
    pd.read_excel("responses.xlsx")
    .dropna(subset=['remove_pii_5'])
    .reset_index(drop=True)
)
print(f"Number of rows: {len(data)}")

# Rename the spreadsheet headers to the node names used by the graph.
data = data.rename(
    columns={
        'remove_pii_1': 'pii_node_1',
        'remove_pii_2': 'pii_node_2',
        'remove_pii_3': 'pii_node_3',
        'remove_pii_4': 'pii_node_4',
        'remove_pii_5': 'pii_node_5',
        "Symptoms, Reason to call, Rec": "get_symptoms",
        "Summarized": "summary",
    }
)

# Per-node result columns; a later cell casts them to booleans.
error_columns = [
    'pii_node_1',
    'pii_node_2',
    'pii_node_3',
    'pii_node_4',
    'pii_node_5',
    'get_symptoms',
    'summary',
]

Number of rows: 70


In [3]:
# Cast the per-node result columns to booleans (the analysis below treats
# False as "this node failed").
# NOTE(review): astype(int) raises on missing values — assumes every column in
# error_columns is fully populated after the earlier filter; confirm.
data[error_columns] = data[error_columns].astype(int).astype(bool)

# True only for rows in which every node column is True.
data['no_errors'] = data[error_columns].all(axis=1)

# Preview goes last: Jupyter only renders the final expression of a cell, so
# the original mid-cell .head() was silently discarded.
data[error_columns + ['no_errors']].head()

In [4]:
class Node:
    """A named graph vertex that keeps a list of its parent nodes."""

    def __init__(self, name):
        """Create a node called ``name`` with no parents yet."""
        # Each instance gets its own fresh list of parents.
        self.name, self.parents = name, []

    def add_parent(self, parent):
        """Append ``parent`` to this node's list of parents."""
        self.parents += [parent]
    
def create_graph():
    """Build the linear pipeline graph and return it as ``{name: Node}``.

    The nodes form a single chain
    ``pii_node_1 -> pii_node_2 -> ... -> pii_node_5 -> get_symptoms -> summary``;
    every node except the first has exactly one parent: the node before it.
    """
    # Listed from most upstream to most downstream.
    chain = [
        "pii_node_1",
        "pii_node_2",
        "pii_node_3",
        "pii_node_4",
        "pii_node_5",
        "get_symptoms",
        "summary",
    ]
    nodes = {label: Node(label) for label in chain}

    # Link each node to its immediate predecessor in the chain.
    for upstream, downstream in zip(chain, chain[1:]):
        nodes[downstream].add_parent(nodes[upstream])

    return nodes

In [5]:
# Build the node graph once; the analysis cells below pass it as `graph=graph`.
graph = create_graph()

## Algorithm

In [23]:
# Rewritten code

from typing import Dict, List, Tuple

def calculate_probabilities(node: str, data: pd.DataFrame, dependencies: List["Node"]) -> Tuple[float, float, Dict[str, float]]:
    """Calculate failure probabilities for the node and its upstream dependencies.

    A row counts as a failure of a column when its value equals ``False``.

    Args:
        node: Name of the column in ``data`` to analyse.
        data: One row per observation, one column per node.
        dependencies: Upstream nodes of ``node``; only their ``.name`` is read.
            (``Node`` is a forward reference so this definition does not depend
            on the class already being in scope.)

    Returns:
        A 3-tuple ``(p_node_fails, p_independent_fail, p_node_fails_given_dep_fails)``:

        * ``p_node_fails`` - fraction of rows in which ``node`` fails.
        * ``p_independent_fail`` - fraction of rows in which ``node`` fails among
          the rows where every dependency passes; equals ``p_node_fails`` when
          there are no dependencies.
        * ``p_node_fails_given_dep_fails`` - for each dependency name, the
          fraction of rows in which ``node`` fails among the rows where that
          dependency fails.

        Any ratio whose denominator is zero (empty frame, dependencies never all
        pass, dependency never fails) is reported as ``0.0`` instead of NaN so
        that callers can compare and rank the values safely.
    """
    node_fails = data[node].eq(False)
    p_node_fails = float(node_fails.mean()) if len(node_fails) else 0.0

    # Independent failure: the node fails although all of its dependencies passed.
    if not dependencies:
        p_independent_fail = p_node_fails
    else:
        deps_pass = data[[dep.name for dep in dependencies]].all(axis=1)
        n_deps_pass = int(deps_pass.sum())
        p_independent_fail = (
            float((node_fails & deps_pass).sum()) / n_deps_pass if n_deps_pass else 0.0
        )

    # Conditional failure probability of the node given each dependency failed.
    p_node_fails_given_dep_fails: Dict[str, float] = {}
    for dep in dependencies:
        dep_fails = data[dep.name].eq(False)
        n_dep_fails = int(dep_fails.sum())
        p_node_fails_given_dep_fails[dep.name] = (
            float((node_fails & dep_fails).sum()) / n_dep_fails if n_dep_fails else 0.0
        )

    return p_node_fails, p_independent_fail, p_node_fails_given_dep_fails

def find_root_cause(node: str, data: pd.DataFrame, graph: Dict[str, Node]) -> Tuple[List[str], float, Dict[str, float]]:
    """Recursively find the root cause of failures, tracing from downstream to upstream.

    At each node the independent failure probability is compared with the
    largest conditional probability ``P(node fails | dependency fails)``. The
    walk stops at the current node when it has no dependencies or when its
    independent failure probability is strictly larger; otherwise it continues
    into the dependency with the largest conditional probability.

    Args:
        node: Name of the node to start from (a key of ``graph``).
        data: Frame passed through to ``calculate_probabilities``.
        graph: Mapping from node name to ``Node``; ``.parents`` lists the
            upstream nodes.

    Returns:
        ``(path, p_independent_fail, conditional_probs)`` where ``path`` lists
        the visited node names from ``node`` to the node the walk stopped at,
        and the other two values are the ones computed for that last node.
    """
    dependencies = graph[node].parents  # These are upstream nodes

    p_node_fails, p_independent_fail, p_node_fails_given_dep_fails = calculate_probabilities(node, data, dependencies)

    # default=0 keeps a node without parents from raising ValueError on max()
    # of an empty sequence.
    strongest_dep_prob = max(p_node_fails_given_dep_fails.values(), default=0)

    print(f"Analyzing node: {node}")
    print(f"Overall failure probability for this node: {p_node_fails:.4f}")
    print(f"Independent failure probability: {p_independent_fail:.4f}")
    print(f"Node failure because dep fails: {strongest_dep_prob}")
    print("Conditional failure probabilities given upstream dependency failures:")
    for dep, prob in p_node_fails_given_dep_fails.items():
        print(f"  P({node} fails | {dep} fails): {prob:.4f}")
    print()

    # Stop here when there is nothing further upstream, or when an independent
    # failure is more likely than any upstream dependency failure.
    if not dependencies or p_independent_fail > strongest_dep_prob:
        return [node], p_independent_fail, p_node_fails_given_dep_fails

    max_dep = max(p_node_fails_given_dep_fails, key=p_node_fails_given_dep_fails.get)
    upstream_path, upstream_independent_prob, upstream_final_probs = find_root_cause(max_dep, data, graph)

    return [node] + upstream_path, upstream_independent_prob, upstream_final_probs

def improve_system(downstream_node: str, data: pd.DataFrame, graph: Dict[str, Node]) -> Tuple[List[str], float, Dict[str, float]]:
    """Entry point for the root cause analysis, starting from the most downstream node.

    Runs ``find_root_cause`` from ``downstream_node``, prints a summary of the
    result and returns it unchanged.

    Args:
        downstream_node: Name of the node to start the trace from.
        data: Frame passed through to ``find_root_cause``.
        graph: Mapping from node name to ``Node``.

    Returns:
        ``(path, independent_prob, final_probs)`` exactly as returned by
        ``find_root_cause``; ``path[-1]`` is the node reported as root cause.
    """
    path, independent_prob, final_probs = find_root_cause(downstream_node, data, graph)
    root_cause = path[-1]

    print("\nRoot cause analysis complete.")
    print(f"Debug path (from downstream to upstream): {' -> '.join(path)}")
    print(f"Most likely root cause (most upstream issue): {root_cause}")
    print(f"Independent failure probability of root cause: {independent_prob:.4f}")
    print("Conditional failure probabilities given root cause's dependency failures:")
    for dep, prob in final_probs.items():
        print(f"  P({root_cause} fails | {dep} fails): {prob:.4f}")

    # A root cause without dependencies can only fail on its own; checking for
    # the empty dict first avoids ValueError from max() on an empty mapping
    # when its independent probability is 0.
    if not final_probs or independent_prob > max(final_probs.values()):
        print(f"The most likely cause is an independent failure in node {root_cause}")
    else:
        most_likely_dep = max(final_probs, key=final_probs.get)
        print(f"The most likely cause is a failure in dependency: {most_likely_dep}")

    return path, independent_prob, final_probs
print()
# improve_system prints its own report and returns (path, probability, probs);
# the node to focus on is the last entry of the path, so print that instead of
# the whole returned tuple.
root_cause_path = improve_system('summary', data=data, graph=graph)[0]
print(f"Focus on node: {root_cause_path[-1]}")


Analyzing node: summary
Overall failure probability for this node: 0.0714
Independent failure probability: 0.0000
Node failure because dep fails: 0.625
Conditional failure probabilities given upstream dependency failures:
  P(summary fails | get_symptoms fails): 0.6250

Analyzing node: get_symptoms
Overall failure probability for this node: 0.1143
Independent failure probability: 0.1014
Node failure because dep fails: 1.0
Conditional failure probabilities given upstream dependency failures:
  P(get_symptoms fails | pii_node_5 fails): 1.0000

Analyzing node: pii_node_5
Overall failure probability for this node: 0.0143
Independent failure probability: 0.0000
Node failure because dep fails: 1.0
Conditional failure probabilities given upstream dependency failures:
  P(pii_node_5 fails | pii_node_4 fails): 1.0000

Analyzing node: pii_node_4
Overall failure probability for this node: 0.0143
Independent failure probability: 0.0147
Node failure because dep fails: 0.0
Conditional failure proba

In [24]:
# Using Bayes' theorem: for a node that failed, estimate P(dependency failed | node failed), i.e. how likely each upstream dependency is to be the cause of the failure.

from typing import Dict, List, Tuple


def calculate_probabilities(node: str, data: pd.DataFrame, dependencies: List["Node"]) -> Tuple[float, float, Dict[str, float]]:
    """Calculate failure probabilities for the node and its dependencies.

    A row counts as a failure of a column when its value equals ``False``.

    Args:
        node: Name of the column in ``data`` to analyse.
        data: One row per observation, one column per node.
        dependencies: Upstream nodes of ``node``; only their ``.name`` is read.
            (``Node`` is a forward reference so this definition does not depend
            on the class already being in scope.)

    Returns:
        A 3-tuple ``(p_node_fails, p_independent_fail, p_dep_fails_given_node_fails)``:

        * ``p_node_fails`` - fraction of rows in which ``node`` fails.
        * ``p_independent_fail`` - fraction of rows in which ``node`` fails among
          the rows where every dependency passes; equals ``p_node_fails`` when
          there are no dependencies.
        * ``p_dep_fails_given_node_fails`` - for each dependency name, the
          fraction of rows in which that dependency fails among the rows where
          ``node`` fails.

        Any ratio whose denominator is zero is reported as ``0.0`` instead of NaN.
    """
    node_fails = data[node].eq(False)
    n_node_fails = int(node_fails.sum())
    p_node_fails = float(node_fails.mean()) if len(node_fails) else 0.0

    # Independent failure: the node fails although all of its dependencies passed.
    if not dependencies:
        p_independent_fail = p_node_fails
    else:
        deps_pass = data[[dep.name for dep in dependencies]].all(axis=1)
        n_deps_pass = int(deps_pass.sum())
        p_independent_fail = (
            float((node_fails & deps_pass).sum()) / n_deps_pass if n_deps_pass else 0.0
        )

    # Bayes' theorem: P(dep | node) = P(node | dep) * P(dep) / P(node), which
    # reduces to count(node & dep) / count(node). Working from the counts gives
    # the same value but does not go through P(node | dep), which is 0/0 when
    # the dependency never fails.
    p_dep_fails_given_node_fails: Dict[str, float] = {}
    for dep in dependencies:
        dep_fails = data[dep.name].eq(False)
        n_both_fail = int((node_fails & dep_fails).sum())
        p_dep_fails_given_node_fails[dep.name] = (
            n_both_fail / n_node_fails if n_node_fails else 0.0
        )

    return p_node_fails, p_independent_fail, p_dep_fails_given_node_fails

def find_root_cause(node: str, data: pd.DataFrame, graph: Dict[str, Node]) -> Tuple[List[str], float, Dict[str, float]]:
    """Walk from ``node`` towards upstream nodes until a root cause is found.

    At every visited node the probabilities from ``calculate_probabilities``
    are printed. The walk ends at the current node when its independent
    failure probability exceeds every dependency probability, or when it has
    no dependencies; otherwise it moves on to the dependency with the highest
    probability.

    Returns:
        ``(path, p_independent_fail, dependency_probs)``: the visited node
        names in order, plus the independent failure probability and the
        dependency probabilities computed for the last node on the path.
    """
    visited: List[str] = []
    current = node

    while True:
        parents = graph[current].parents  # upstream nodes of the current node
        p_fail, p_indep, dep_probs = calculate_probabilities(current, data, parents)

        print(f"Analyzing node: {current}")
        print(f"Overall failure probability for this node: {p_fail:.4f}")
        print(f"Independent failure probability: {p_indep:.4f}")
        print("Probabilities of upstream dependencies being the cause of failure:")
        for dep_name in dep_probs:
            print(f"  {dep_name}: {dep_probs[dep_name]:.4f}")
        print()

        visited.append(current)

        # The walk ends here if the node itself is the likelier culprit, or if
        # there is nothing further upstream to inspect.
        if p_indep > max(dep_probs.values(), default=0) or not parents:
            return visited, p_indep, dep_probs

        # Otherwise continue with the most probable upstream dependency.
        current = max(dep_probs, key=dep_probs.get)

def improve_system(downstream_node: str, data: pd.DataFrame, graph: Dict[str, Node]) -> Tuple[List[str], float, Dict[str, float]]:
    """Entry point for the Bayesian root cause analysis, starting from the most downstream node.

    Runs ``find_root_cause`` from ``downstream_node``, prints a summary of the
    result and returns it unchanged.

    Args:
        downstream_node: Name of the node to start the trace from.
        data: Frame passed through to ``find_root_cause``.
        graph: Mapping from node name to ``Node``.

    Returns:
        ``(path, independent_prob, final_probs)`` exactly as returned by
        ``find_root_cause``; ``path[-1]`` is the node reported as root cause.
    """
    path, independent_prob, final_probs = find_root_cause(downstream_node, data, graph)
    root_cause = path[-1]

    print("\nRoot cause analysis complete.")
    print(f"Debug path (from downstream to upstream): {' -> '.join(path)}")
    print(f"Most likely root cause/most upstream issue: {root_cause}")
    print(f"Independent failure probability of root cause: {independent_prob:.4f}")
    print("Probabilities of root cause's dependencies being the cause of failure:")
    for dep, prob in final_probs.items():
        print(f"  {dep}: {prob:.4f}")

    # A root cause without dependencies can only fail on its own; checking for
    # the empty dict first avoids ValueError from max() on an empty mapping
    # when its independent probability is 0.
    if not final_probs or independent_prob > max(final_probs.values()):
        print(f"The most likely cause is an independent failure in node {root_cause}")
    else:
        most_likely_dep = max(final_probs, key=final_probs.get)
        print(f"The most likely cause is a failure in dependency: {most_likely_dep}")

    return path, independent_prob, final_probs

# improve_system prints its own report and returns (path, probability, probs);
# the node to focus on is the last entry of the path, so print that instead of
# the whole returned tuple.
root_cause_path = improve_system('summary', data=data, graph=graph)[0]
print(f"Focus on node: {root_cause_path[-1]}")

Analyzing node: summary
Overall failure probability for this node: 0.0714
Independent failure probability: 0.0000
Probabilities of upstream dependencies being the cause of failure:
  get_symptoms: 1.0000

Analyzing node: get_symptoms
Overall failure probability for this node: 0.1143
Independent failure probability: 0.1014
Probabilities of upstream dependencies being the cause of failure:
  pii_node_5: 0.1250

Analyzing node: pii_node_5
Overall failure probability for this node: 0.0143
Independent failure probability: 0.0000
Probabilities of upstream dependencies being the cause of failure:
  pii_node_4: 1.0000

Analyzing node: pii_node_4
Overall failure probability for this node: 0.0143
Independent failure probability: 0.0147
Probabilities of upstream dependencies being the cause of failure:
  pii_node_3: 0.0000


Root cause analysis complete.
Debug path (from downstream to upstream): summary -> get_symptoms -> pii_node_5 -> pii_node_4
Most likely root cause/most upstream issue: pii_no