In [1]:
import os 
from qdls.data import load_json, save_json 

# analyse the dependent relations 

FindAll() 后续会是哪个函数名?

In [2]:
# Directory holding the KQA train/val JSON files. Set the KQA_DATA_DIR
# environment variable to run this notebook on another machine; the default
# is the original author's location, so existing runs behave exactly as before.
KQA_DATA_DIR = os.environ.get("KQA_DATA_DIR", "/home/qing/raid/paperwork/kgtool/data/kqa/full")

train = load_json(os.path.join(KQA_DATA_DIR, "train.json"))
val = load_json(os.path.join(KQA_DATA_DIR, "val.json"))

/home/qing/raid/paperwork/kgtool/data/kqa/full/train.json loaded with 94376 samples!
/home/qing/raid/paperwork/kgtool/data/kqa/full/val.json loaded with 11797 samples!


In [3]:
from collections import Counter
from typing import List, Dict, Tuple, Set

# ... existing imports ...

def analyze_function_patterns(
    train_data: List[Dict],
    top_n: int = 10,
) -> Tuple[Dict, Counter, Counter, Counter]:
    """
    Analyze function call patterns in training data.

    Each example must have a "program" entry: a list of steps, where every
    step is a dict with a 'function' key holding the function name.

    Args:
        train_data: List of training examples containing programs
        top_n: How many entries to keep in the 'seq_freq' and 'common_bigrams'
            summaries (default 10, the previously hard-coded value). Pass
            None to keep all of them.

    Returns:
        A 4-tuple ``(analysis, function_freq, seq_counter, bigram_counter)``:
        - analysis: summary dictionary containing
            - function_freq: (function, count) pairs, most frequent first
            - seq_freq: the top_n most frequent full function sequences
            - common_bigrams: the top_n most frequent adjacent function pairs
            - seq_length_dist: (length, count) pairs sorted by length
        - function_freq: raw Counter of individual function names
        - seq_counter: raw Counter keyed by the full sequence (as a tuple)
        - bigram_counter: raw Counter keyed by (function, next_function)

    Example:
        >>> train_data = load_json("train.json")
        >>> analysis, function_freq, seq_counter, bigram_counter = (
        ...     analyze_function_patterns(train_data)
        ... )
    """
    function_freq = Counter()
    seq_counter = Counter()
    seq_lengths = Counter()
    bigram_counter = Counter()

    for s in train_data:
        program = s["program"]
        functions = [f['function'] for f in program]

        # Update frequency counts
        function_freq.update(functions)
        seq_counter[tuple(functions)] += 1  # Store sequence as tuple for hashability
        seq_lengths[len(functions)] += 1

        # Adjacent pairs only; programs with fewer than two steps contribute none
        bigram_counter.update(zip(functions, functions[1:]))

    analysis = {
        'function_freq': function_freq.most_common(),
        'seq_freq': seq_counter.most_common(top_n),
        'common_bigrams': bigram_counter.most_common(top_n),
        'seq_length_dist': sorted(seq_lengths.items())
    }
    return analysis, function_freq, seq_counter, bigram_counter

# Usage example
# Run the analysis on the training split. The raw counters are kept alongside
# the summary dict because later cells read function_freq and bigram_counter.

analysis,function_freq, seq_counter, bigram_counter = analyze_function_patterns(train)

# Generate report
# analysis['function_freq'] holds every function, so slice to the top 10 here.
print("Top 10 Functions:")
for func, count in analysis['function_freq'][:10]:
    print(f"{func}: {count}")

# seq_counter is the untruncated counter: its size is the number of distinct sequences.
print(f"\ntotal distinct function sequences: {len(seq_counter)}")

# 'seq_freq' and 'common_bigrams' were already truncated inside the analysis function.
print("\nMost Common Sequences:")
for seq, count in analysis['seq_freq']:
    print(f"{' → '.join(seq)}: {count}")

print("\nCommon Bigrams (Potential Dependencies):")
for (f1, f2), count in analysis['common_bigrams']:
    print(f"{f1} → {f2}: {count}")

# One line per program length, in ascending order of length.
print("\nSequence Length Distribution:")
for length, count in analysis['seq_length_dist']:
    print(f"{length} steps: {count} examples")



Top 10 Functions:
Find: 128457
FilterConcept: 59963
Relate: 50257
FindAll: 30266
FilterStr: 25305
And: 25302
QueryAttr: 22265
QueryRelation: 13876
SelectBetween: 13247
What: 11174

total distinct function sequences: 830

Most Common Sequences:
Find → Find → QueryRelation: 7152
Find → Find → SelectBetween: 5438
Find → Find → QueryRelationQualifier: 4013
FindAll → FilterStr → FilterConcept → QueryAttr: 3257
Find → Relate → Find → And → Find → QueryRelation: 3002
Find → QueryAttr: 2873
FindAll → FilterStr → FilterConcept → QueryAttrQualifier: 2727
FindAll → FilterStr → FilterConcept → What: 2322
Find → QueryAttrQualifier: 2280
FindAll → FilterNum → FilterConcept → SelectAmong: 2197

Common Bigrams (Potential Dependencies):
Find → Relate: 39214
Relate → FilterConcept: 26564
Find → Find: 21537
FindAll → FilterStr: 20797
Relate → Find: 20560
Find → And: 20560
FilterStr → FilterConcept: 19524
FilterConcept → QueryAttr: 13844
Find → QueryRelation: 11720
And → Find: 8822

Sequence Length Distri

In [4]:
# Number of distinct (function, next function) pairs observed in the training programs.
len(bigram_counter)

105

In [5]:
# Every distinct (function, next function) pair, in the order it was first encountered.
bigram_counter.keys()

dict_keys([('FindAll', 'FilterStr'), ('FilterStr', 'FilterConcept'), ('FilterConcept', 'FindAll'), ('FilterConcept', 'And'), ('And', 'What'), ('FilterConcept', 'QueryAttr'), ('QueryAttr', 'VerifyStr'), ('FindAll', 'FilterNum'), ('FilterNum', 'FilterConcept'), ('FilterConcept', 'SelectAmong'), ('Find', 'Relate'), ('Relate', 'Find'), ('Find', 'And'), ('And', 'Find'), ('And', 'QueryRelation'), ('Relate', 'FilterConcept'), ('FilterConcept', 'QueryAttrQualifier'), ('Find', 'SelectBetween'), ('FilterConcept', 'Relate'), ('FilterConcept', 'Count'), ('Find', 'Find'), ('Find', 'QueryRelationQualifier'), ('And', 'Relate'), ('Find', 'FilterNum'), ('FilterNum', 'Find'), ('QueryAttr', 'VerifyDate'), ('Find', 'QueryRelation'), ('Find', 'QueryAttr'), ('QueryAttr', 'VerifyYear'), ('FilterConcept', 'Or'), ('Or', 'Count'), ('And', 'QueryRelationQualifier'), ('FilterStr', 'QFilterStr'), ('QFilterStr', 'FilterConcept'), ('Find', 'QueryAttrQualifier'), ('FilterConcept', 'What'), ('And', 'SelectBetween'), (

# 将 function name 的转移关系建模成规则


In [10]:
from typing import Dict, List, Tuple
from collections import defaultdict
import json 

# ... existing analysis code ...

class FunctionStateMachine:
    """
    Finite State Machine for function call transitions based on bigram analysis

    Attributes:
        transitions: Dict[current_function, Dict[next_function, count]]
        start_states: Counter mapping each function to the share of bigram
            occurrences in which it is the FIRST element. This is derived from
            bigrams only, so it describes "has a successor", not necessarily
            "opens a program".
        end_states: Counter mapping each function to the share of bigram
            occurrences in which it is the SECOND element.

    Example:
        >>> fsm = FunctionStateMachine.from_bigrams(bigram_counter.items())
        >>> fsm.get_likely_transitions('FindAll', top_n=2)
        [('FilterStr', 0.69...), ('FilterNum', 0.16...)]
    """

    def __init__(
        self,
        transitions: Dict[str, Dict[str, int]],
        start_states: Dict[str, float] = None,
        end_states: Dict[str, float] = None,
    ):
        """
        Args:
            transitions: Nested mapping current_function -> next_function -> count
            start_states: Optional weights for bigram-first functions
            end_states: Optional weights for bigram-second functions
        """
        self.transitions = transitions
        # Always stored as Counters so callers can rely on .most_common();
        # omitted statistics become empty Counters, as before.
        self.start_states = Counter(start_states or {})
        self.end_states = Counter(end_states or {})

    @classmethod
    def from_bigrams(cls, bigrams: List[Tuple[Tuple[str, str], int]]):
        """
        Build state machine from bigram analysis results.

        Args:
            bigrams: Any iterable of ((f1, f2), count) items, e.g.
                ``bigram_counter.items()`` or ``bigram_counter.most_common(n)``.

        Returns:
            A new instance whose start/end statistics live on the instance.
        """
        transitions = defaultdict(lambda: defaultdict(int))
        start_functions = Counter()
        end_functions = Counter()

        for (f1, f2), count in bigrams:
            transitions[f1][f2] += count
            start_functions[f1] += count
            end_functions[f2] += count

        # Both counters receive every bigram count, so they share one total.
        total = sum(start_functions.values())
        if total:
            start_states = {k: v / total for k, v in start_functions.items()}
            end_states = {k: v / total for k, v in end_functions.items()}
        else:
            start_states, end_states = {}, {}

        # Freeze into plain dicts: lookups of unknown functions must not create
        # entries, and the structure then matches what load_machine() produces.
        frozen = {func: dict(nexts) for func, nexts in transitions.items()}
        return cls(frozen, start_states=start_states, end_states=end_states)

    def get_transition_prob(self, current: str, next_: str) -> float:
        """
        Get probability of transitioning to next function.

        Returns 0.0 when ``current`` is unknown or has no outgoing transitions.
        """
        # .get() keeps this read-only and safe for plain dicts and defaultdicts alike.
        outgoing = self.transitions.get(current, {})
        total = sum(outgoing.values())
        return outgoing.get(next_, 0) / total if total else 0.0

    def get_likely_transitions(self, current: str, top_n: int = 3) -> List[Tuple[str, float]]:
        """
        Get most probable next states with probabilities.

        Returns:
            Up to ``top_n`` (function, probability) pairs, most probable first;
            an empty list when ``current`` has no recorded transitions.
        """
        transitions = self.transitions.get(current, {})
        total = sum(transitions.values())
        if total == 0:
            return []

        sorted_trans = sorted(
            transitions.items(),
            key=lambda x: x[1],
            reverse=True
        )[:top_n]

        return [(func, count/total) for func, count in sorted_trans]

    def generate_validation_rules(self, min_support: int = 5) -> Dict[str, List[str]]:
        """
        Generate validation rules: for each function, the functions seen after it.

        ``min_support`` is compared against the SUM of a function's outgoing
        transition counts; a function that passes lists all of its successors,
        including rare ones.
        """
        return {
            func: list(transitions.keys())
            for func, transitions in self.transitions.items()
            if sum(transitions.values()) >= min_support
        }

    def save_machine(self, path: str):
        """Save state machine (transitions plus start/end statistics) to a JSON file"""
        with open(path, 'w', encoding='utf-8') as f:
            json.dump({
                'transitions': self.transitions,
                'start_states': self.start_states,
                'end_states': self.end_states
            }, f)

    @classmethod
    def load_machine(cls, path: str):
        """Load state machine from a JSON file written by save_machine()"""
        with open(path, encoding='utf-8') as f:
            data = json.load(f)
        return cls(
            transitions=data['transitions'],
            # Tolerate files that carry only the transition table.
            start_states=data.get('start_states'),
            end_states=data.get('end_states')
        )

# Usage example
# Build from the full bigram counter (every observed pair), not from the
# truncated analysis['common_bigrams'] list.
fsm = FunctionStateMachine.from_bigrams(bigram_counter.items())

# Generate validation rules
# min_support=1 keeps every function whose outgoing transition counts sum to at least 1.
validation_rules = fsm.generate_validation_rules(min_support=1)
print("Validation Rules:")
for func, allowed in validation_rules.items():
    print(f"After {func}, allowed: {allowed}")

# Query example
# Iterate over every function seen in training (a single name such as
# "FilterStr" can be queried the same way). get_likely_transitions uses its
# default top_n=3; a function with no outgoing transition prints only the header.
for current_function in function_freq.keys():   
    print(f"\nLikely transitions from {current_function}:")
    for func, prob in fsm.get_likely_transitions(current_function):
        print(f"- {func}: {prob:.2%} probability")

# Save for later use (relative path: written to the current working directory)
fsm.save_machine("function_state_machine.json")

Validation Rules:
After FindAll, allowed: ['FilterStr', 'FilterNum', 'FilterYear', 'FilterDate', 'FilterConcept']
After FilterStr, allowed: ['FilterConcept', 'QFilterStr', 'QueryRelationQualifier', 'Relate', 'SelectBetween', 'QFilterNum', 'Find', 'QueryRelation', 'QFilterYear', 'QueryAttrQualifier', 'QueryAttr', 'QFilterDate', 'QueryAttrUnderCondition']
After FilterConcept, allowed: ['FindAll', 'And', 'QueryAttr', 'SelectAmong', 'QueryAttrQualifier', 'Relate', 'Count', 'Or', 'What', 'Find', 'QueryAttrUnderCondition']
After And, allowed: ['What', 'Find', 'QueryRelation', 'Relate', 'QueryRelationQualifier', 'SelectBetween', 'Count', 'QueryAttr', 'QueryAttrQualifier', 'QueryAttrUnderCondition']
After QueryAttr, allowed: ['VerifyStr', 'VerifyDate', 'VerifyYear', 'VerifyNum']
After FilterNum, allowed: ['FilterConcept', 'Find', 'Relate', 'QueryAttr', 'QueryRelation', 'SelectBetween', 'QueryAttrUnderCondition', 'QFilterYear', 'QueryAttrQualifier', 'QFilterStr', 'QueryRelationQualifier', 'QFil

# 转移规则的 prompt 接口

用于 api 方案

In [17]:
from typing import Dict, List, Tuple, Optional

class FunctionPromptGenerator:
    """
    Generate natural language prompts based on function transition rules

    Attributes:
        fsm: FunctionStateMachine instance (anything exposing
            ``get_likely_transitions(current, top_n=...)`` and ``start_states``)
        templates: Dictionary of prompt templates keyed by mode
            ('strict', 'flexible', 'creative')

    Example:
        >>> prompt_gen = FunctionPromptGenerator(fsm)
        >>> print(prompt_gen.generate_prompt("FindAll"))
    """

    # Quoted annotation: the class can be defined before FunctionStateMachine exists.
    def __init__(self, fsm: "FunctionStateMachine"):
        self.fsm = fsm
        self.templates = {
            'strict': (
                "After calling `{current_func}`, you MUST choose from these functions: "
                "{allowed_funcs}. Provide your next step."
            ),
            'flexible': (
                "After `{current_func}`, consider these common next steps (probability shown):\n"
                "{func_list}\n"
                "Choose the most appropriate function or propose a valid alternative."
            ),
            'creative': (
                "Your last step was `{current_func}`. While these are common next steps:\n"
                "{func_list}\n"
                "You may also propose innovative sequences that maintain logical coherence."
            )
        }

    def generate_prompt(
        self,
        current_func: str,
        mode: str = 'flexible',
        max_suggestions: int = 5,
        include_prob: bool = True
    ) -> Optional[str]:
        """
        Generate prompt based on current function state

        Args:
            current_func: Current function name
            mode: Prompt strictness level (strict/flexible/creative)
            max_suggestions: Maximum number of transitions to show
            include_prob: Whether to include probability values

        Returns:
            Formatted prompt string or None if no transitions

        Raises:
            KeyError: If ``mode`` is not a key of ``self.templates`` (only
                reached when ``current_func`` has transitions).
        """
        transitions = self.fsm.get_likely_transitions(current_func, top_n=max_suggestions)
        if not transitions:
            return None

        # One bullet per suggestion; without probabilities the bullet ends right
        # after the function name (no dangling space).
        func_list = [
            f"- `{func}` ({prob:.0%} likely)" if include_prob else f"- `{func}`"
            for func, prob in transitions
        ]

        return self.templates[mode].format(
            current_func=current_func,
            allowed_funcs=', '.join(f"`{func}`" for func, _ in transitions),
            func_list='\n'.join(func_list)
        )

    @staticmethod
    def _top_keys(weights, n: int) -> List[str]:
        """Return the n highest-weighted keys of a mapping (Counter or plain dict)."""
        if not weights:
            return []
        ranked = sorted(weights.items(), key=lambda item: item[1], reverse=True)
        return [key for key, _ in ranked[:n]]

    def format_as_system_prompt(self, history: Optional[List[str]] = None) -> str:
        """
        Generate full system-level instruction prompt

        Args:
            history: Functions called so far. When non-empty, the constraint
                line lists the likely successors of the last one; otherwise the
                prompt names the three highest-weighted entries of
                ``fsm.start_states``.

        Returns:
            The prompt text. Every line starts at column 0: the preamble is
            assembled from explicit lines so source-code indentation does not
            leak into the text.
        """
        base = (
            "You are a workflow automation assistant. Follow these rules:\n"
            "1. Maintain valid function call sequences\n"
            "2. Prefer common patterns unless instructed otherwise\n"
            "3. Explain your reasoning for non-standard choices\n"
            "\n"
            "Current function call constraints:"
        )

        if history:
            current = history[-1]
            transitions = self.fsm.get_likely_transitions(current)
            next_funcs = ', '.join(func for func, _ in transitions)
            rules = f"- After {current}: {next_funcs}"
        else:
            # start_states may be a Counter or a plain dict depending on how the
            # state machine was built, so rank it without relying on .most_common().
            start_funcs = self._top_keys(getattr(self.fsm, 'start_states', None), 3)
            rules = f"Start with: {', '.join(start_funcs)}"

        return f"{base}\n{rules}\n\nResponse format: Markdown code block with function calls."



# Usage example
prompter = FunctionPromptGenerator(fsm)

# Single step prompt (generate_prompt defaults to mode='flexible')
current_function = "FindAll"
print("=== Flexible Prompt ===")
print(prompter.generate_prompt(current_function))

print("\n=== Strict Prompt ===")
print(prompter.generate_prompt(current_function, mode='strict', include_prob=False))

# Usage example: system prompt built from a one-step call history
print("\n=== System Prompt ===")
print(prompter.format_as_system_prompt(["FindAll"]))


=== Flexible Prompt ===
After `FindAll`, consider these common next steps (probability shown):
- `FilterStr` (69% likely)
- `FilterNum` (16% likely)
- `FilterYear` (12% likely)
- `FilterDate` (3% likely)
- `FilterConcept` (0% likely)
Choose the most appropriate function or propose a valid alternative.

=== Strict Prompt ===
After calling `FindAll`, you MUST choose from these functions: `FilterStr`, `FilterNum`, `FilterYear`, `FilterDate`, `FilterConcept`. Provide your next step.

=== System Prompt ===
You are a workflow automation assistant. Follow these rules:
    1. Maintain valid function call sequences
    2. Prefer common patterns unless instructed otherwise
    3. Explain your reasoning for non-standard choices

    Current function call constraints:
- After FindAll: FilterStr, FilterNum, FilterYear

Response format: Markdown code block with function calls.
