# Knowledge Graph Rule Discovery

This notebook discovers logical rules in a knowledge graph and validates them with:
- **Confidence**: Fraction of times the rule holds
- **Support**: Number of instances where the rule applies
- **Concrete examples**: Actual instances from the dataset

In [2]:
import re
from collections import defaultdict
from itertools import combinations
import pandas as pd
from typing import List, Tuple, Dict, Set

print("Libraries imported successfully!")

Libraries imported successfully!


## 1. Load and Parse Knowledge Graph

In [8]:
class KnowledgeGraph:
    def __init__(self, filename):
        self.triples = []  # List of (subject, relation, object)
        self.relations = defaultdict(set)  # relation -> set of (subject, object)
        self.entities = set()
        self.load_graph(filename)
    
    def load_graph(self, filename):
        """Load knowledge graph from text file"""
        with open(filename, 'r') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                
                # Parse: subject relation object
                parts = line.split()
                if len(parts) >= 3:
                    subject = parts[0]
                    relation = parts[1]
                    obj = parts[2]
                    
                    self.triples.append((subject, relation, obj))
                    self.relations[relation].add((subject, obj))
                    self.entities.add(subject)
                    self.entities.add(obj)
        
        print(f"Loaded {len(self.triples)} triples")
        print(f"Found {len(self.relations)} unique relations")
        print(f"Found {len(self.entities)} unique entities")
        print(f"\nRelations: {list(self.relations.keys())}")
    
    def get_relation_pairs(self, relation):
        """Get all (subject, object) pairs for a relation"""
        return self.relations[relation]
    
    def get_outgoing(self, entity, relation):
        """Get all entities connected from entity via relation"""
        return {obj for subj, obj in self.relations[relation] if subj == entity}
    
    def get_incoming(self, entity, relation):
        """Get all entities connected to entity via relation"""
        return {subj for subj, obj in self.relations[relation] if obj == entity}

# Load your knowledge graph
# Replace 'metafam.txt' with your actual file path
kg = KnowledgeGraph('train.txt')

Loaded 13821 triples
Found 28 unique relations
Found 1316 unique entities

Relations: ['sisterOf', 'secondAuntOf', 'girlCousinOf', 'daughterOf', 'granddaughterOf', 'nieceOf', 'motherOf', 'greatAuntOf', 'grandmotherOf', 'auntOf', 'uncleOf', 'greatUncleOf', 'brotherOf', 'sonOf', 'fatherOf', 'grandfatherOf', 'secondUncleOf', 'boyCousinOf', 'grandsonOf', 'nephewOf', 'girlSecondCousinOf', 'girlFirstCousinOnceRemovedOf', 'greatGranddaughterOf', 'greatGrandmotherOf', 'greatGrandfatherOf', 'boySecondCousinOf', 'boyFirstCousinOnceRemovedOf', 'greatGrandsonOf']


## 2. Rule Discovery Functions

In [9]:
class Rule:
    def __init__(self, name, description, body_relations, head_relation):
        self.name = name
        self.description = description
        self.body_relations = body_relations  # List of relations in the body
        self.head_relation = head_relation  # Conclusion relation
        self.support = 0
        self.confidence = 0.0
        self.examples = []
        self.counter_examples = []

def discover_inverse_rules(kg: KnowledgeGraph) -> List[Rule]:
    """Discover inverse rules: relation1(X,Y) → relation2(Y,X)"""
    rules = []
    
    for rel1, rel2 in combinations(kg.relations.keys(), 2):
        # Check if rel1(X,Y) implies rel2(Y,X)
        matches = 0
        total = 0
        examples = []
        counter_examples = []
        
        for x, y in kg.relations[rel1]:
            total += 1
            if (y, x) in kg.relations[rel2]:
                matches += 1
                if len(examples) < 5:
                    examples.append((x, y))
            else:
                if len(counter_examples) < 3:
                    counter_examples.append((x, y))
        
        if total > 0:
            confidence = matches / total
            if confidence >= 0.7 and matches >= 3:  # Threshold for interesting rules
                rule = Rule(
                    f"Inverse: {rel1} ↔ {rel2}",
                    f"{rel1}(X,Y) → {rel2}(Y,X)",
                    [rel1],
                    rel2
                )
                rule.support = matches
                rule.confidence = confidence
                rule.examples = examples
                rule.counter_examples = counter_examples
                rules.append(rule)
    
    return rules

def discover_horn_clauses_2(kg: KnowledgeGraph) -> List[Rule]:
    """Discover 2-relation Horn clauses: rel1(X,Y) ∧ rel2(Y,Z) → rel3(X,Z)"""
    rules = []
    
    # Try all combinations of 3 relations
    for rel1, rel2, rel3 in combinations(kg.relations.keys(), 3):
        # Try different orderings as body and head
        for body_rels, head_rel in [([rel1, rel2], rel3), ([rel1, rel3], rel2), ([rel2, rel3], rel1)]:
            matches = 0
            total = 0
            examples = []
            counter_examples = []
            
            # Check all instances of body_rels[0]
            for x, y in kg.relations[body_rels[0]]:
                # Find all z where body_rels[1](y, z) holds
                z_candidates = kg.get_outgoing(y, body_rels[1])
                
                for z in z_candidates:
                    total += 1
                    # Check if head_rel(x, z) holds
                    if (x, z) in kg.relations[head_rel]:
                        matches += 1
                        if len(examples) < 5:
                            examples.append((x, y, z))
                    else:
                        if len(counter_examples) < 3:
                            counter_examples.append((x, y, z))
            
            if total > 0:
                confidence = matches / total
                if confidence >= 0.7 and matches >= 3:
                    rule = Rule(
                        f"Horn-2: {body_rels[0]} ∧ {body_rels[1]} → {head_rel}",
                        f"{body_rels[0]}(X,Y) ∧ {body_rels[1]}(Y,Z) → {head_rel}(X,Z)",
                        body_rels,
                        head_rel
                    )
                    rule.support = matches
                    rule.confidence = confidence
                    rule.examples = examples
                    rule.counter_examples = counter_examples
                    rules.append(rule)
    
    return rules

def discover_horn_clauses_3(kg: KnowledgeGraph) -> List[Rule]:
    """Discover 3-relation Horn clauses: rel1(X,Y) ∧ rel2(Y,Z) ∧ rel3(Z,W) → rel4(X,W)"""
    rules = []
    
    # Try combinations of 4 relations (computationally expensive, so we limit)
    relation_list = list(kg.relations.keys())
    
    # Sample if too many relations
    if len(relation_list) > 10:
        import random
        relation_combos = [tuple(random.sample(relation_list, 4)) for _ in range(50)]
    else:
        relation_combos = list(combinations(relation_list, 4))
    
    for rels in relation_combos[:30]:  # Limit to avoid excessive computation
        rel1, rel2, rel3, rel4 = rels
        
        matches = 0
        total = 0
        examples = []
        counter_examples = []
        
        # Check: rel1(X,Y) ∧ rel2(Y,Z) ∧ rel3(Z,W) → rel4(X,W)
        for x, y in kg.relations[rel1]:
            z_candidates = kg.get_outgoing(y, rel2)
            for z in z_candidates:
                w_candidates = kg.get_outgoing(z, rel3)
                for w in w_candidates:
                    total += 1
                    if (x, w) in kg.relations[rel4]:
                        matches += 1
                        if len(examples) < 5:
                            examples.append((x, y, z, w))
                    else:
                        if len(counter_examples) < 3:
                            counter_examples.append((x, y, z, w))
        
        if total > 0:
            confidence = matches / total
            if confidence >= 0.7 and matches >= 2:
                rule = Rule(
                    f"Horn-3: {rel1} ∧ {rel2} ∧ {rel3} → {rel4}",
                    f"{rel1}(X,Y) ∧ {rel2}(Y,Z) ∧ {rel3}(Z,W) → {rel4}(X,W)",
                    [rel1, rel2, rel3],
                    rel4
                )
                rule.support = matches
                rule.confidence = confidence
                rule.examples = examples
                rule.counter_examples = counter_examples
                rules.append(rule)
    
    return rules

def discover_symmetric_rules(kg: KnowledgeGraph) -> List[Rule]:
    """Discover symmetric rules: relation(X,Y) → relation(Y,X)"""
    rules = []
    
    for rel in kg.relations.keys():
        matches = 0
        total = 0
        examples = []
        counter_examples = []
        
        for x, y in kg.relations[rel]:
            total += 1
            if (y, x) in kg.relations[rel]:
                matches += 1
                if len(examples) < 5:
                    examples.append((x, y))
            else:
                if len(counter_examples) < 3:
                    counter_examples.append((x, y))
        
        if total > 0:
            confidence = matches / total
            if confidence >= 0.7 and matches >= 3:
                rule = Rule(
                    f"Symmetric: {rel}",
                    f"{rel}(X,Y) → {rel}(Y,X)",
                    [rel],
                    rel
                )
                rule.support = matches
                rule.confidence = confidence
                rule.examples = examples
                rule.counter_examples = counter_examples
                rules.append(rule)
    
    return rules

print("Rule discovery functions defined!")

Rule discovery functions defined!


## 3. Discover All Rules

In [10]:
print("Discovering inverse rules...")
inverse_rules = discover_inverse_rules(kg)
print(f"Found {len(inverse_rules)} inverse rules\n")

print("Discovering symmetric rules...")
symmetric_rules = discover_symmetric_rules(kg)
print(f"Found {len(symmetric_rules)} symmetric rules\n")

print("Discovering 2-relation Horn clauses...")
horn2_rules = discover_horn_clauses_2(kg)
print(f"Found {len(horn2_rules)} 2-relation Horn clause rules\n")

print("Discovering 3-relation Horn clauses (this may take a while)...")
horn3_rules = discover_horn_clauses_3(kg)
print(f"Found {len(horn3_rules)} 3-relation Horn clause rules\n")

# Combine all rules
all_rules = inverse_rules + symmetric_rules + horn2_rules + horn3_rules
print(f"\n{'='*80}")
print(f"TOTAL: Discovered {len(all_rules)} rules")
print(f"{'='*80}")

Discovering inverse rules...
Found 0 inverse rules

Discovering symmetric rules...
Found 0 symmetric rules

Discovering 2-relation Horn clauses...
Found 95 2-relation Horn clause rules

Discovering 3-relation Horn clauses (this may take a while)...
Found 0 3-relation Horn clause rules


TOTAL: Discovered 95 rules


## 4. Sort and Display Top Rules

In [11]:
# Sort by confidence, then by support
all_rules.sort(key=lambda r: (r.confidence, r.support), reverse=True)

# Display top rules
print("\n" + "="*80)
print("TOP DISCOVERED RULES (sorted by confidence and support)")
print("="*80 + "\n")

for i, rule in enumerate(all_rules[:15], 1):  # Show top 15
    print(f"\n{'─'*80}")
    print(f"Rule #{i}: {rule.name}")
    print(f"{'─'*80}")
    print(f"Description: {rule.description}")
    print(f"Confidence:  {rule.confidence:.2%} ({rule.support} matches)")
    print(f"Support:     {rule.support} instances\n")
    
    print("Examples:")
    for j, example in enumerate(rule.examples, 1):
        if len(example) == 2:
            print(f"  {j}. {example[0]} → {example[1]}")
        elif len(example) == 3:
            print(f"  {j}. {example[0]} → {example[1]} → {example[2]}")
        elif len(example) == 4:
            print(f"  {j}. {example[0]} → {example[1]} → {example[2]} → {example[3]}")
    
    if rule.counter_examples and rule.confidence < 1.0:
        print(f"\nCounter-examples (cases where rule doesn't hold):")
        for j, ce in enumerate(rule.counter_examples[:2], 1):
            if len(ce) == 2:
                print(f"  {j}. {ce[0]} → {ce[1]}")
            elif len(ce) == 3:
                print(f"  {j}. {ce[0]} → {ce[1]} → {ce[2]}")
            elif len(ce) == 4:
                print(f"  {j}. {ce[0]} → {ce[1]} → {ce[2]} → {ce[3]}")


TOP DISCOVERED RULES (sorted by confidence and support)


────────────────────────────────────────────────────────────────────────────────
Rule #1: Horn-2: sisterOf ∧ grandsonOf → granddaughterOf
────────────────────────────────────────────────────────────────────────────────
Description: sisterOf(X,Y) ∧ grandsonOf(Y,Z) → granddaughterOf(X,Z)
Confidence:  100.00% (722 matches)
Support:     722 instances

Examples:
  1. isabella807 → oskar788 → magdalena801
  2. isabella807 → oskar788 → elias798
  3. isabella807 → oskar788 → sebastian802
  4. isabella807 → oskar788 → clara797
  5. laura399 → matthias398 → lena395

────────────────────────────────────────────────────────────────────────────────
Rule #2: Horn-2: grandmotherOf ∧ auntOf → greatGrandmotherOf
────────────────────────────────────────────────────────────────────────────────
Description: grandmotherOf(X,Y) ∧ auntOf(Y,Z) → greatGrandmotherOf(X,Z)
Confidence:  100.00% (562 matches)
Support:     562 instances

Examples:
  1. alina

## 5. Create Summary DataFrame

In [None]:
# Create summary dataframe
summary_data = []
for rule in all_rules:
    summary_data.append({
        'Rule': rule.name,
        'Description': rule.description,
        'Confidence': f"{rule.confidence:.2%}",
        'Support': rule.support,
        'Example': str(rule.examples[0]) if rule.examples else "N/A"
    })

df_summary = pd.DataFrame(summary_data)
print("\n" + "="*80)
print("SUMMARY TABLE")
print("="*80)
display(df_summary.head(20))

## 6. Export Results

In [None]:
# Save to CSV
df_summary.to_csv('discovered_rules.csv', index=False)
print("Rules saved to 'discovered_rules.csv'")

# Save detailed report to text file
with open('rules_detailed_report.txt', 'w') as f:
    f.write("KNOWLEDGE GRAPH RULE DISCOVERY - DETAILED REPORT\n")
    f.write("="*80 + "\n\n")
    
    for i, rule in enumerate(all_rules, 1):
        f.write(f"\nRule #{i}: {rule.name}\n")
        f.write("─"*80 + "\n")
        f.write(f"Description: {rule.description}\n")
        f.write(f"Confidence:  {rule.confidence:.2%}\n")
        f.write(f"Support:     {rule.support} instances\n\n")
        
        f.write("Examples:\n")
        for j, example in enumerate(rule.examples, 1):
            f.write(f"  {j}. {example}\n")
        
        if rule.counter_examples:
            f.write(f"\nCounter-examples:\n")
            for j, ce in enumerate(rule.counter_examples, 1):
                f.write(f"  {j}. {ce}\n")
        
        f.write("\n")

print("Detailed report saved to 'rules_detailed_report.txt'")

## 7. Visualize Rule Statistics

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Create visualizations
fig, axes = plt.subplots(2, 2, figsize=(15, 10))

# Plot 1: Confidence distribution
confidences = [r.confidence for r in all_rules]
axes[0, 0].hist(confidences, bins=20, edgecolor='black', alpha=0.7)
axes[0, 0].set_xlabel('Confidence')
axes[0, 0].set_ylabel('Number of Rules')
axes[0, 0].set_title('Distribution of Rule Confidence')
axes[0, 0].grid(True, alpha=0.3)

# Plot 2: Support distribution
supports = [r.support for r in all_rules]
axes[0, 1].hist(supports, bins=20, edgecolor='black', alpha=0.7, color='green')
axes[0, 1].set_xlabel('Support (# instances)')
axes[0, 1].set_ylabel('Number of Rules')
axes[0, 1].set_title('Distribution of Rule Support')
axes[0, 1].grid(True, alpha=0.3)

# Plot 3: Confidence vs Support scatter
axes[1, 0].scatter(supports, confidences, alpha=0.6, s=50)
axes[1, 0].set_xlabel('Support (# instances)')
axes[1, 0].set_ylabel('Confidence')
axes[1, 0].set_title('Confidence vs Support')
axes[1, 0].grid(True, alpha=0.3)

# Plot 4: Rule type distribution
rule_types = {'Inverse': 0, 'Symmetric': 0, 'Horn-2': 0, 'Horn-3': 0}
for rule in all_rules:
    if 'Inverse' in rule.name:
        rule_types['Inverse'] += 1
    elif 'Symmetric' in rule.name:
        rule_types['Symmetric'] += 1
    elif 'Horn-3' in rule.name:
        rule_types['Horn-3'] += 1
    elif 'Horn-2' in rule.name:
        rule_types['Horn-2'] += 1

axes[1, 1].bar(rule_types.keys(), rule_types.values(), edgecolor='black', alpha=0.7)
axes[1, 1].set_xlabel('Rule Type')
axes[1, 1].set_ylabel('Number of Rules')
axes[1, 1].set_title('Distribution of Rule Types')
axes[1, 1].grid(True, alpha=0.3, axis='y')

plt.tight_layout()
plt.savefig('rule_statistics.png', dpi=300, bbox_inches='tight')
plt.show()

print("\nVisualization saved to 'rule_statistics.png'")

## 8. Rule Quality Metrics

In [None]:
print("\n" + "="*80)
print("RULE QUALITY METRICS")
print("="*80 + "\n")

if all_rules:
    confidences = [r.confidence for r in all_rules]
    supports = [r.support for r in all_rules]
    
    print(f"Total rules discovered: {len(all_rules)}")
    print(f"\nConfidence Statistics:")
    print(f"  Mean:   {np.mean(confidences):.2%}")
    print(f"  Median: {np.median(confidences):.2%}")
    print(f"  Min:    {np.min(confidences):.2%}")
    print(f"  Max:    {np.max(confidences):.2%}")
    
    print(f"\nSupport Statistics:")
    print(f"  Mean:   {np.mean(supports):.1f}")
    print(f"  Median: {np.median(supports):.1f}")
    print(f"  Min:    {np.min(supports)}")
    print(f"  Max:    {np.max(supports)}")
    
    print(f"\nRule Type Breakdown:")
    for rule_type, count in rule_types.items():
        print(f"  {rule_type}: {count}")
    
    # High-quality rules (confidence >= 90% and support >= 5)
    high_quality = [r for r in all_rules if r.confidence >= 0.9 and r.support >= 5]
    print(f"\nHigh-quality rules (confidence ≥ 90%, support ≥ 5): {len(high_quality)}")
else:
    print("No rules discovered. Try adjusting the confidence/support thresholds.")