In [1]:
import numpy as np
from sklearn.datasets import make_classification


class TopDownRule:
    def __init__(self, feature_idx=None, threshold=None, class_label=None):
        self.feature_idx = feature_idx
        self.threshold = threshold
        self.class_label = class_label
    
    def covers(self, x):
        if self.feature_idx is None:
            return True
        return x[self.feature_idx] >= self.threshold

def learn_one_rule_top_down(X, y, class_label, min_coverage=5):
    best_rule = TopDownRule(class_label=class_label)
    best_accuracy = 0
    
    # 初始规则（空规则）覆盖所有样本
    covered = np.ones(len(y), dtype=bool)
    current_accuracy = np.mean(y[covered] == class_label)
    
    if current_accuracy > best_accuracy and sum(covered) >= min_coverage:
        best_accuracy = current_accuracy
        best_rule = TopDownRule(class_label=class_label)
    
    # 尝试添加条件
    for feature_idx in range(X.shape[1]):
        unique_values = np.unique(X[:, feature_idx])
        for threshold in unique_values:
            new_covered = covered & (X[:, feature_idx] >= threshold)
            if sum(new_covered) < min_coverage:
                continue
            accuracy = np.mean(y[new_covered] == class_label)
            if accuracy > best_accuracy:
                best_accuracy = accuracy
                best_rule = TopDownRule(feature_idx, threshold, class_label)
                covered = new_covered
    
    return best_rule, covered

def top_down_sequential_covering(X, y, min_coverage=5):
    rules = []
    remaining_indices = np.ones(len(y), dtype=bool)
    
    for class_label in np.unique(y):
        while True:
            # 在当前未覆盖的样本上学习规则
            rule, covered = learn_one_rule_top_down(
                X[remaining_indices], y[remaining_indices], 
                class_label, min_coverage=min_coverage)
            
            if rule.feature_idx is None:  # 无法学习到有用的规则
                break
                
            rules.append(rule)
            remaining_indices[remaining_indices] = ~covered
            
            if sum(remaining_indices) < min_coverage:
                break
    
    return rules

# 测试示例
X, y = make_classification(n_samples=100, n_features=5, n_classes=2, random_state=42)
rules = top_down_sequential_covering(X, y)
print(f"Learned {len(rules)} rules:")
for i, rule in enumerate(rules):
    print(f"Rule {i+1}: IF feature_{rule.feature_idx} >= {rule.threshold:.2f} THEN class {rule.class_label}")

Learned 5 rules:
Rule 1: IF feature_1 >= -2.18 THEN class 0
Rule 2: IF feature_1 >= -1.76 THEN class 0
Rule 3: IF feature_1 >= 0.31 THEN class 0
Rule 4: IF feature_1 >= 0.38 THEN class 0
Rule 5: IF feature_1 >= -0.70 THEN class 0


In [1]:
import math
from copy import deepcopy

# 定义规则类
class Rule:
    def __init__(self, head):
        self.head = head  # 规则头部，如'grandfather(X,Y)'
        self.body = []    # 规则体，如['parent(X,Z)', 'parent(Z,Y)', 'male(X)']
    
    def add_literal(self, literal):
        self.body.append(literal)
    
    def __str__(self):
        body_str = ' ∧ '.join(self.body) if self.body else 'True'
        return f"{self.head} ← {body_str}"

# FOIL算法实现
class FOIL:
    def __init__(self, target, predicates, examples):
        self.target = target          # 目标谓词，如'grandfather(X,Y)'
        self.predicates = predicates  # 可用谓词列表
        self.examples = examples      # 训练样例 [(facts, label), ...]
    
    def learn_rules(self, max_rules=5, max_literals=5):
        rules = []
        pos_examples = [ex for ex in self.examples if ex[1]]  # 正例
        
        while len(rules) < max_rules and pos_examples:
            # 学习一条新规则
            new_rule = self.learn_one_rule(pos_examples, max_literals)
            if not new_rule.body:  # 无法学习更多规则
                break
            
            rules.append(new_rule)
            # 移除已覆盖的正例
            pos_examples = [ex for ex in pos_examples 
                          if not self.covers(ex[0], new_rule)]
        
        return rules
    
    def learn_one_rule(self, examples, max_literals):
        target_vars = self.extract_vars(self.target)
        current_rule = Rule(self.target)
        current_covered = [ex for ex in examples 
                          if self.covers(ex[0], current_rule)]
        
        for _ in range(max_literals):
            candidates = self.generate_candidates(current_rule)
            if not candidates:
                break
            
            # 选择最佳候选文字
            best_gain = -float('inf')
            best_literal = None
            best_covered = None
            
            for literal in candidates:
                temp_rule = deepcopy(current_rule)
                temp_rule.add_literal(literal)
                covered = [ex for ex in examples if self.covers(ex[0], temp_rule)]
                gain = self.foil_gain(current_covered, covered)
                
                if gain > best_gain:
                    best_gain = gain
                    best_literal = literal
                    best_covered = covered
            
            if best_literal:
                current_rule.add_literal(best_literal)
                current_covered = best_covered
            else:
                break
        
        return current_rule
    
    def generate_candidates(self, rule):
        """生成候选文字"""
        used_vars = set()
        for literal in [self.target] + rule.body:
            used_vars.update(self.extract_vars(literal))
        
        candidates = []
        for pred in self.predicates:
            if pred == self.target.split('(')[0]:
                continue
            
            # 确定谓词的元数
            arity = 2  # 简化处理，假设都是二元谓词
            # 生成变量组合
            for var_combo in self.generate_var_combos(arity, used_vars):
                candidates.append(f"{pred}({','.join(var_combo)})")
        
        return candidates
    
    def generate_var_combos(self, arity, used_vars):
        """生成变量组合，简化版本"""
        vars_list = sorted(used_vars)
        new_var = f"V{len(used_vars)+1}"  # 生成新变量
        
        # 只考虑两种组合：1) 重用现有变量 2) 引入一个新变量
        combos = []
        
        # 组合1：重用现有变量
        if vars_list:
            for v1 in vars_list:
                for v2 in vars_list:
                    combos.append([v1, v2])
        
        # 组合2：引入一个新变量
        if len(vars_list) >= 1:
            combos.append([vars_list[0], new_var])
        
        return combos
    
    def foil_gain(self, current_covered, new_covered):
        """计算FOIL信息增益"""
        p0 = sum(1 for ex in current_covered if ex[1])
        n0 = sum(1 for ex in current_covered if not ex[1])
        i0 = -math.log2(p0/(p0+n0)) if p0 else 0
        
        p1 = sum(1 for ex in new_covered if ex[1])
        n1 = sum(1 for ex in new_covered if not ex[1])
        if p1 + n1 == 0:
            return -float('inf')
        i1 = -math.log2(p1/(p1+n1)) if p1 else 0
        
        t = p1 - p0  # 新覆盖的正例数
        return t * (i1 - i0)
    
    def covers(self, facts, rule):
        """简化版的覆盖检查，实际应用需要一阶逻辑推理"""
        # 这里我们简化处理，只检查规则体中的所有文字是否都在facts中
        for literal in rule.body:
            pred = literal.split('(')[0]
            args = literal.split('(')[1].rstrip(')').split(',')
            
            # 查找匹配的事实
            found = False
            for fact in facts.get(pred, []):
                if len(fact) == len(args):
                    match = True
                    # 这里应该处理变量绑定，简化版本只检查存在性
                    if isinstance(fact, tuple):
                        fact = list(fact)
                    # 忽略具体变量匹配
                    found = True
                    break
            
            if not found:
                return False
        return True
    
    @staticmethod
    def extract_vars(literal):
        """从文字中提取变量"""
        if '(' not in literal:
            return set()
        var_str = literal.split('(')[1].rstrip(')')
        return set(var_str.split(','))

# 示例使用
if __name__ == "__main__":
    # 定义家族关系数据集
    background_knowledge = {
        'parent': [('alice', 'bob'), ('bob', 'charlie'), ('bob', 'dave'),
                   ('eve', 'frank'), ('frank', 'grace')],
        'male': ['bob', 'charlie', 'dave', 'frank'],
        'female': ['alice', 'eve', 'grace']
    }
    
    # 创建训练样例 (正例和反例)
    examples = []
    # 正例: grandfather(alice, charlie), grandfather(alice, dave)
    examples.append((background_knowledge, True))  # 简化处理
    
    # 反例: 非祖父关系
    negative_example = deepcopy(background_knowledge)
    examples.append((negative_example, False))  # 简化处理
    
    # FOIL参数
    target_predicate = 'grandfather(X,Y)'
    available_predicates = ['parent', 'male', 'female']
    
    # 运行FOIL
    foil = FOIL(target_predicate, available_predicates, examples)
    learned_rules = foil.learn_rules()
    
    # 输出学习到的规则
    print("Learned rules:")
    for i, rule in enumerate(learned_rules, 1):
        print(f"Rule {i}: {rule}")

Learned rules:
Rule 1: grandfather(X,Y) ← parent(X,X) ∧ parent(X,X) ∧ parent(X,X) ∧ parent(X,X) ∧ parent(X,X)
