In [2]:
# Step 1: LLM 生成因果知识
def tree_query(x1, x2, method='logit',threshold=0.5):
    """
    基于树状逻辑的因果方向查询器。
    沿着手绘图逻辑依次判断，每步返回概率（非二值化）。
    最终输出边方向和置信度。

    输出:
        {
            'relation': 'x->y' | 'y->x' | 'x<->y' | 'independent',
            'confidence': float,
            'log': [(step, prob)]
        }
    """

    log = []

    # Step 1: 是否存在 backdoor path?
    p_backdoor = check_backdoor(x1, x2, method)
    log.append(("backdoor_path", p_backdoor))

    if p_backdoor > threshold:
        # Step 2: 阻断路径后是否独立？
        p_independent = check_independence_after_block(x1, x2, method)
        log.append(("independent_after_block", p_independent))
        if p_independent > threshold:
            return {"relation": "independent", "confidence": p_independent, "log": log}

        # Step 3: 是否存在潜在混杂因子？
        p_latent = check_latent_confounder_after_block(x1, x2, method)
        log.append(("latent_confounder", p_latent))
        if p_latent > threshold:
            return {"relation": "x<->y", "confidence": p_latent, "log": log}

        # Step 4: 判断方向 (x→y?)
        p_x_to_y = check_causal_direction_after_block(x1, x2, method)
        log.append(("x->y", p_x_to_y))

        if p_x_to_y > threshold:
            return {"relation": "x->y", "confidence": p_x_to_y, "log": log}
        else:
            return {"relation": "y->x", "confidence": 1 - p_x_to_y, "log": log}

    else:
        # 没有 backdoor path
        p_independent = check_independence(x1, x2, method)
        log.append(("independent_no_backdoor", p_independent))
        if p_independent > threshold:
            return {"relation": "independent", "confidence": p_independent, "log": log}

        # 检查潜在混杂因子
        p_latent = check_latent_confounder(x1, x2, method)
        log.append(("latent_confounder_no_backdoor", p_latent))
        if p_latent > threshold:
            return {"relation": "x<->y", "confidence": p_latent, "log": log}

        # 判断方向
        p_x_to_y = check_causal_direction(x1, x2, method)
        log.append(("x->y_no_backdoor", p_x_to_y))
        if p_x_to_y > threshold:
            return {"relation": "x->y", "confidence": p_x_to_y, "log": log}
        else:
            return {"relation": "y->x", "confidence": 1 - p_x_to_y, "log": log}



In [1]:
def llm_judge(prompt, x1, x2, method='probability', n_sample=10):
    """
    让 LLM 判断“是否成立”，并返回满足 Kolmogorov 公理的概率
    """
    if method == 'frequency':
        votes = []
        for _ in range(n_sample):
            ans = fake_llm_response(prompt, 'frequency')
            votes.append(1 if ans.strip().lower() == 'yes' else 0)
        p_yes = sum(votes) / len(votes)
        p_no = 1 - p_yes
        return {'yes': p_yes, 'no': p_no}

    elif method == 'probability':
        output = fake_llm_response(prompt, 'probability')
        p_yes, p_no = parse_probabilities(output)
        return {'yes': p_yes, 'no': p_no}

    elif method == 'logit':
        output = fake_llm_response(prompt, 'logit')
        logit_yes, logit_no = parse_logits(output)
        p_yes = sigmoid(logit_yes)
        p_no = sigmoid(logit_no)
        return {'yes': p_yes, 'no': p_no}
    
def check_backdoor(x1, x2, method='logit'):
    prompt = f"请判断 {x1} 与 {x2} 之间是否存在 back-door path。" \
             f"请直接输出概率或 logit，并确保满足 Kolmogorov 公理（或 logit1 + logit2 = 0）。"
    return llm_judge(prompt, x1, x2, method)

def check_independence_after_block(x1, x2, method='logit'):
    prompt = f"阻断 back-door path 后，{x1} 与 {x2} 是否独立？" \
             f"请输出概率对或 logit，并确保满足 Kolmogorov 公理（或 logit1 + logit2 = 0）。"
    return llm_judge(prompt, x1, x2, method)

def check_latent_confounder_after_block(x1, x2, method='logit'):
    prompt = f"阻断 back-door path 后，{x1} 与 {x2} 是否存在潜在混杂因子？" \
             f"请输出概率对或 logit，并确保满足 Kolmogorov 公理（或 logit1 + logit2 = 0）。"
    return llm_judge(prompt, x1, x2, method)

def check_causal_direction_after_block(x1, x2, method='logit'):
    prompt = f"阻断 back-door path 后，请判断 {x1} 是否会导致 {x2}。" \
             f"请输出概率对或 logit，并确保满足 Kolmogorov 公理（或 logit1 + logit2 = 0）。"
    return llm_judge(prompt, x1, x2, method)

def check_independence(x1, x2, method='logit'):
    prompt = f"请判断 {x1} 与 {x2} 是否独立。" \
             f"请输出概率或 logit，并确保满足 Kolmogorov 公理（或 logit1 + logit2 = 0）。"
    return llm_judge(prompt, x1, x2, method)

def check_latent_confounder(x1, x2, method='logit'):
    prompt = f"请判断 {x1} 与 {x2} 之间是否存在潜在混杂因子。" \
             f"请输出概率或 logit，并确保满足 Kolmogorov 公理（或 logit1 + logit2 = 0）。"
    return llm_judge(prompt, x1, x2, method)

def check_causal_direction(x1, x2, method='logit'):
    prompt = f"请判断 {x1} 是否会导致 {x2}。" \
             f"请输出概率或 logit，并确保满足 Kolmogorov 公理（或 logit1 + logit2 = 0）。"
    return llm_judge(prompt, x1, x2, method)



In [None]:
# Step 2: 多校准 (multi-Calibration)
def multi_calibration(causal_knowledge):
    """
    使用校准函数调整推导的因果概率，确保与真实的因果关系匹配
    校准过程：
    - 使用嵌入的语义向量对因果知识进行聚类，或者按节点相关边划分类。
    - 然后，进行alpha校准，调整因果概率以确保模型输出更准确。
    
    输入：
    - causal_knowledge: LLM生成的因果知识字典，每对变量的因果关系概率，
      格式：
      {
          ('X1', 'X2'): {
              'independent': p_independent,
              'latent': p_latent,
              'causal': p_causal
          },
          ...
      }
    
    输出：
    - calibrated_probabilities: 校准后的因果概率字典，格式和输入一样
    """
    
    # 确定合适的划分准则，例如基于嵌入的语义向量进行聚类，或按节点相关边进行分类
    clusters = clustering(causal_knowledge)
    
    # 校准函数 - alpha校准
    # 需要对每对变量的概率（'independent', 'latent', 'causal'）进行alpha校准
    calibrated_probabilities = {}
    for pair, probabilities in causal_knowledge.items():
        p_independent = probabilities['independent']
        p_latent = probabilities['latent']
        p_causal = probabilities['causal']
        
        # 校准因果概率（模拟alpha校准的效果）
        calibrated_p_independent = alpha_calibrate(p_independent, clusters)
        calibrated_p_latent = alpha_calibrate(p_latent, clusters)
        calibrated_p_causal = alpha_calibrate(p_causal, clusters)
        
        # 存储校准后的概率
        calibrated_probabilities[pair] = {
            'independent': calibrated_p_independent,
            'latent': calibrated_p_latent,
            'causal': calibrated_p_causal
        }
    
    return calibrated_probabilities

# 校准函数，模拟alpha校准效果（可根据实际需求调整）
def alpha_calibrate(probability, clusters):
    """
    模拟alpha校准：通过校准函数对因果概率进行调整，具体的校准方法可以根据需要实现。
    
    输入：
    - probability: 需要校准的因果概率
    - clusters: 聚类后的信息，用于调整概率
    
    输出：
    - calibrated_probability: 校准后的因果概率
    """
    # 这里使用简单的比例调整作为示例，可以根据具体的alpha校准算法进行调整
    # 假设根据聚类结果调整因果概率
    adjustment_factor = 1.0  # 可以根据聚类的情况进行调整
    calibrated_probability = probability * adjustment_factor
    
    # 确保概率在[0, 1]范围内
    calibrated_probability = max(0.0, min(calibrated_probability, 1.0))
    
    return calibrated_probability

# 示例聚类函数（此处为简单的示例，实际聚类方法可能涉及更多细节）
def clustering(causal_knowledge):
    """
    聚类函数：根据因果关系知识对变量进行聚类
    这里仅是示例，实际聚类可以通过语义向量、相关性或其他方式进行。
    """
    clusters = {}  # 模拟聚类结果，实际应使用有效的聚类方法
    for pair in causal_knowledge:
        clusters[pair] = "cluster_1"  # 假设所有变量对属于同一簇
    return clusters


In [None]:
# Step 3: 形成先验因果图
def create_prior_causal_graph(calibrated_probabilities):
    """
    根据校准后的因果概率生成先验因果图
    - 使用三个概率判断每对变量之间的因果关系（独立性、潜在混杂变量、因果方向）。
    - 根据综合判断结果生成因果图，可能会有双向箭头（<->）表示不确定或相互作用。
    
    输入：
    - calibrated_probabilities: 每对变量的校准后概率字典，
        格式：{
            ('X1', 'X2'): {
                'independent': p_independent,
                'latent': p_latent,
                'causal': p_causal
            },
            ...
        }
    输出：
    - prior_graph: 先验因果图，格式为字典，键是变量对，值是因果关系（X->Y, Y->X, <->, independent）
    """
    prior_graph = {}
    
    # 遍历每对变量，判断因果关系
    for pair, probabilities in calibrated_probabilities.items():
        p_independent = probabilities['independent']  # 独立性概率
        p_latent = probabilities['latent']  # 潜在混杂变量概率
        p_causal = probabilities['causal']  # 因果方向概率
        
        # 判断因果关系
        if p_independent > 0.5:
            # 如果独立性概率大于0.5，认为X1和X2是独立的
            prior_graph[pair] = "independent"
        elif p_latent > 0.5:
            # 如果潜在混杂变量概率大于0.5，认为存在潜在混杂变量
            prior_graph[pair] = "<->"  # 双向箭头表示不确定或相互作用
        else:
            # 根据因果关系概率判断因果方向
            if p_causal > 0.5:
                # 如果因果概率大于0.5，认为X1导致X2
                prior_graph[pair] = f"{pair[0]}->{pair[1]}"
            elif p_causal < 0.5:
                # 如果因果概率小于0.5，认为X2导致X1
                prior_graph[pair] = f"{pair[1]}->{pair[0]}"
            else:
                # 如果因果概率接近0.5，认为两者之间相互作用
                prior_graph[pair] = "<->"
    
    return prior_graph


In [None]:
# Step 4: 使用BCCD结合数据生成后验因果图
def generate_post_causal_graph(prior_graph, data):
    """
    使用BCCD（贝叶斯因果链发现）结合数据生成后验因果图
    - 结合先验因果图和实际数据，推导出后验因果关系。
    - 应用约束（如无环性约束），确保生成的因果图合理。
    
    输入：先验因果图、数据
    输出：后验因果图（推导出的因果关系图，满足约束条件）
    """
    # 将先验因果图应用于数据，使用BCCD进一步推导因果关系
    post_causal_graph = bccd_inference(data, prior_graph)
    
    # 对后验图进行约束，确保没有环路等不合理的因果关系
    post_causal_graph = apply_constraints(post_causal_graph)
    
    return post_causal_graph
