In [1]:
from fcgb.chatbots.chatbot import BaseChatBot
from fcgb.types.tools import ToolOutput
from langgraph.graph import MessagesState
from langchain_core.runnables.config import RunnableConfig
from langgraph.graph import StateGraph, START, END
from typing import List, Dict, Annotated
from pydantic import BaseModel

from operator import add
import uuid
from langchain_core.messages import ToolMessage
from langgraph.constants import Send
from fcgb.types.utils import append_or_clear, MessagesType
from fcgb.cfg.utils import model2string, dict2string

In [30]:

class PlannedIterativeTaskSolver(BaseChatBot):
    """
    Task solver that repeats a plan -> distribute -> execute -> extend loop and
    finishes with a report.

    Graph built by `_compile_graph`:

        START -> planning -> (report | distribution)
        distribution -> executor (one per delegated job) | report (no jobs)
        executor -> base_extension -> iteration_end -> planning
        report -> END

    The loop stops when the planning answer contains the marker '__end__' or
    when `turns` exceeds `global_inputs['max_turns']`.

    Args:
        llm: Chat model used by every node; must provide `invoke` and
            `with_structured_output`.
        job_handler: Object whose `run(job_description=...)` executes a single
            delegated job.
        initial_messages_spec: Forwarded unchanged to `BaseChatBot`.
        internal_messages_spec: Forwarded unchanged to `BaseChatBot`. The nodes
            request the messages named 'system', 'query', 'planning',
            'distribution', 'base_extension' and 'report'.
        memory: Checkpointer passed to `workflow.compile`.
        init_values: Forwarded to `BaseChatBot`; an empty dict is used when omitted.
        prompt_manager_spec: Forwarded to `BaseChatBot`; an empty dict is used
            when omitted.
        global_inputs: Forwarded to `BaseChatBot`; an empty dict is used when
            omitted. `self.global_inputs['max_turns']` is read while routing.
    """

    def __init__(self,
                 llm,
                 job_handler,
                 initial_messages_spec,
                 internal_messages_spec,
                 memory=None,
                 init_values=None,
                 prompt_manager_spec=None,
                 global_inputs=None):
        # `None` defaults replace the former `{}` defaults: a dict literal in the
        # signature is created once and shared by every instance that relies on it.
        super().__init__(
            llm=llm,
            initial_messages_spec=initial_messages_spec,
            internal_messages_spec=internal_messages_spec,
            memory=memory,
            init_values={} if init_values is None else init_values,
            prompt_manager_spec={} if prompt_manager_spec is None else prompt_manager_spec,
            global_inputs={} if global_inputs is None else global_inputs,
            compile=False
        )

        # The handler has to be attached before the graph is compiled, which is
        # why the parent constructor is called with compile=False.
        self.job_handler = job_handler
        # `compile_graph` is not defined in this class (presumably inherited from
        # BaseChatBot and expected to call `_set_state_class` / `_compile_graph`).
        self.compile_graph()

    def _set_state_class(self):
        """Define the pydantic state schema of the graph and store it in `self.state_class`."""

        class State(BaseModel):
            # Task-level values substituted into the prompt templates (set by `run`).
            template_inputs: Dict[str, str]
            # Messages archived across iterations (written by iteration_end and report).
            all_messages: MessagesType
            # Messages of the iteration in progress; reset by writing '__clear__'.
            current_messages: MessagesType
            # Every job delegated so far.
            all_jobs: Annotated[List, append_or_clear]
            # Jobs produced by the latest distribution step.
            current_jobs: List = []
            # Outputs written by the executor nodes of the current iteration.
            # NOTE(review): annotated as List[str], but `concatenate_outputs` reads
            # each item as a mapping with an 'output' key - confirm the real item type.
            current_jobs_outputs: Annotated[List[str], append_or_clear]
            # Knowledge accumulated so far; replaced by every base_extension step.
            knowledge_base: str
            # Number of planning steps executed so far.
            turns: int
            # Both fields below are filled in by the report node.
            thread_id: str = None
            output: str = None # type: ignore

        self.state_class = State

    @staticmethod
    def _messages_to_archive(current_messages, turns):
        """
        Select the messages of the current iteration that go into `all_messages`.

        Every planning step starts `current_messages` with the system and query
        messages. They are archived during the first turn only; from the second
        turn on the two leading messages are skipped so they are not duplicated.
        """
        return current_messages[2:] if turns > 1 else current_messages

    def _set_planning_func(self):
        """Build the 'planning' node."""

        def planning(state: self.state_class): # type: ignore
            # The counter is increased first so the prompts see the number of the
            # turn that is starting.
            turns = state.turns + 1

            template_inputs = state.template_inputs | {'turns': turns, 'knowledge_base': state.knowledge_base}

            system_msg, _ = self._get_internal_message('system', template_inputs)
            query_msg, _ = self._get_internal_message('query', template_inputs)
            planning_msg, _ = self._get_internal_message('planning', template_inputs)

            plan_msg = self.llm.invoke([system_msg, query_msg, planning_msg])

            return {
                'current_messages': [system_msg, query_msg, planning_msg, plan_msg],
                'turns': turns
                }

        return planning

    def _set_process_end_routing_func(self):
        """Build the router that follows 'planning' and picks 'report' or 'distribution'."""

        def process_end_routing(state: self.state_class): # type: ignore
            last_message = state.current_messages[-1].content
            # Logical `or` instead of the former bitwise `|`: same result for two
            # booleans, but idiomatic and short-circuiting.
            if ('__end__' in last_message) or (state.turns > self.global_inputs['max_turns']):
                return 'report'
            return 'distribution'

        return process_end_routing

    def _set_distribution_func(self):
        """Build the 'distribution' node, which turns the current plan into a list of jobs."""

        def distribution(state: self.state_class): # type: ignore

            template_inputs = state.template_inputs | {'turns': state.turns, 'knowledge_base': state.knowledge_base}

            # The second element returned for 'distribution' is the schema used
            # for the structured answer.
            distribution_msg, answer_model = self._get_internal_message('distribution', template_inputs)

            jobs = self.llm.with_structured_output(answer_model).invoke(state.current_messages + [distribution_msg])

            return {
                'current_jobs': jobs.jobs
            }

        return distribution

    def _set_job_routing_func(self):
        """Build the router that follows 'distribution' and fans jobs out to 'executor'."""

        def job_routing(state: self.state_class): # type: ignore
            # An empty list of Send objects would leave the graph with no next
            # node, ending the run without ever producing a report. With nothing
            # to execute, go straight to the report instead.
            if not state.current_jobs:
                return 'report'

            return [Send('executor', {'job': job}) for job in state.current_jobs]

        return job_routing

    def _set_executor_func(self):
        """Build the 'executor' node, which runs one delegated job through `self.job_handler`."""

        # NOTE(review): `job_routing` sends the payload `{'job': <job>}`, so the
        # argument received here looks like that wrapper dict rather than a bare
        # model - confirm that `dict2string` renders it as intended.
        def executor(job: BaseModel):

            job_description = dict2string(job)
            job_output = self.job_handler.run(job_description=job_description)

            return {'current_jobs_outputs': job_output}

        return executor

    @staticmethod
    def concatenate_outputs(outputs: List[Dict]) -> str:
        """
        Join the 'output' values of the given job results.

        Args:
            outputs: Job results; items without an 'output' key are skipped.

        Returns:
            The 'output' values separated by a '-----' line, or an empty string
            when no item has an 'output' key.
        """
        return '\n-----\n'.join(output['output'] for output in outputs if 'output' in output)

    def _set_base_extension_func(self):
        """Build the 'base_extension' node, which rewrites the knowledge base from the job outputs."""

        def base_extension(state: self.state_class): # type: ignore

            retrieved_data = self.concatenate_outputs(state.current_jobs_outputs)

            template_inputs = state.template_inputs | {'turns': state.turns, 'knowledge_base': state.knowledge_base, 'retrieved_data': retrieved_data}
            extension_msg, _ = self._get_internal_message('base_extension', template_inputs)

            extended_base = self.llm.invoke(state.current_messages + [extension_msg])
            return {
                # The model answer replaces the previous knowledge base entirely.
                'knowledge_base': extended_base.content,
                'current_messages': [extension_msg, extended_base],
            }

        return base_extension

    def _set_iteration_end_func(self):
        """Build the 'iteration_end' node, which archives the finished iteration and resets per-iteration state."""

        def iteration_end(state: self.state_class): # type: ignore
            messages_to_pass = self._messages_to_archive(state.current_messages, state.turns)
            return {
                'all_messages': messages_to_pass,
                'current_messages': '__clear__',
                'all_jobs': state.current_jobs,
                'current_jobs': [],
                'current_jobs_outputs': '__clear__'
            }

        return iteration_end

    def _set_report_func(self):
        """Build the 'report' node, which writes the final answer from the knowledge base."""

        def report(state: self.state_class, config: RunnableConfig): # type: ignore

            template_inputs = state.template_inputs | {'turns': state.turns, 'knowledge_base': state.knowledge_base}

            report_msg, _ = self._get_internal_message('report', template_inputs)

            # Only the report prompt is sent to the model here.
            output = self.llm.invoke([report_msg])
            thread_id = config['configurable']['thread_id']

            return {
                'output': output.content,
                'thread_id': thread_id,
                # Same rule as in iteration_end: after the first turn the leading
                # system/query messages are already archived, so they are skipped
                # instead of being stored a second time.
                'all_messages': self._messages_to_archive(state.current_messages, state.turns),
                'current_messages': '__clear__',
            }

        return report

    def _compile_graph(self):
        """Create the node functions, wire them into a StateGraph and compile it into `self.graph`."""

        planning_func = self._set_planning_func()
        process_end_routing_func = self._set_process_end_routing_func()
        distribution_func = self._set_distribution_func()
        job_routing_func = self._set_job_routing_func()
        executor_func = self._set_executor_func()
        base_extension_func = self._set_base_extension_func()
        iteration_end_func = self._set_iteration_end_func()
        report_func = self._set_report_func()

        workflow = StateGraph(self.state_class)
        workflow.add_node('planning', planning_func)
        workflow.add_node('distribution', distribution_func)
        workflow.add_node('executor', executor_func)
        workflow.add_node('base_extension', base_extension_func)
        workflow.add_node('iteration_end', iteration_end_func)
        workflow.add_node('report', report_func)

        workflow.add_edge(START, 'planning')
        workflow.add_conditional_edges('planning', process_end_routing_func, ['report', 'distribution'])
        # 'report' is a valid target here because job_routing falls back to it
        # when no jobs were delegated.
        workflow.add_conditional_edges('distribution', job_routing_func, ['executor', 'report'])
        workflow.add_edge('executor', 'base_extension')
        workflow.add_edge('base_extension', 'iteration_end')
        workflow.add_edge('iteration_end', 'planning')
        workflow.add_edge('report', END)

        self.graph = workflow.compile(
            checkpointer=self.memory
        )

    def run(self, job: str, motivation: str, restrictions: str, output_format: str) -> ToolOutput:
        """
        Solve one task from scratch under a freshly generated thread id.

        Args:
            job: Description of the task to perform.
            motivation: Why the task is needed.
            restrictions: Constraints the solution has to respect.
            output_format: Expected format of the final report.

        Returns:
            The graph result restricted to the keys 'thread_id' and 'output'.
        """

        thread_id = uuid.uuid4().hex
        return self.graph.invoke(
            input={
                'template_inputs': {
                    'job': job,
                    'motivation': motivation,
                    'restrictions': restrictions,
                    'output_format': output_format
                    },
                'turns': 0,
                'knowledge_base': 'Empty'
                },
            config = {'configurable': {'thread_id': thread_id}},
            output_keys=['thread_id', 'output']
        )

In [None]:
# Task specification; the keys match the keyword parameters of `run`
# (job, motivation, restrictions, output_format).
inputs = dict(
    job='List all Chain-of-Thought prompting variants',
    motivation='For chapter planning in a book about prompt engineering I need a full list of all CoT prompting techniques.',
    restrictions="""
    1. List only real prompting techniques
    2. Do not include techniques that are just applications of other techniques
    3. Do not include techniques that are not related to CoT prompting""",
    output_format='List of techniques with very short description of each technique',
)

In [4]:
from fcgb.cfg.precompiled import get_llm, get_checkpointer
from fcgb.tools.spectools import PhantomResearcherSpecTool, JobHandlerSpecTool, JobHandler
from fcgb.cfg.chat_inputs_spec import PlannedIterativeTaskSolverConfig

## Fake LLM

In [5]:
# Model and checkpointer shared by the tool, the job handler and the solver
fake_llm = get_llm(llm_model='fake')
memory = get_checkpointer(checkpointer_mode='local', mode='test')

# Tools made available to the job handler
tool_containers = [PhantomResearcherSpecTool(llm=fake_llm, memory=memory)]

# Handler that executes the jobs delegated by the task solver
job_handler = JobHandlerSpecTool(llm=fake_llm, tool_containers=tool_containers, memory=memory)

In [12]:
# The five config attributes are forwarded as constructor keywords of the same name
task_solver = PlannedIterativeTaskSolver(
    llm=fake_llm,
    job_handler=job_handler,
    memory=memory,
    **{
        spec_name: getattr(PlannedIterativeTaskSolverConfig, spec_name)
        for spec_name in (
            'initial_messages_spec',
            'internal_messages_spec',
            'init_values',
            'prompt_manager_spec',
            'global_inputs',
        )
    },
)

In [13]:
# Run the solver on the task defined above
output = task_solver.run(**inputs)

# Final Report, written line by line
print(*output['output'].split('\n'), sep='\n')

Fake LLM response 124


In [14]:
# Load whole task solver state
# The thread id returned by `run` identifies the finished run.
# `get_state` is not defined in this notebook (presumably inherited from BaseChatBot).
research_state = task_solver.get_state(output['thread_id'])
# A bare last expression makes the notebook display the value.
research_state

{'template_inputs': {'job': 'List all Chain-of-Thought prompting variants',
  'motivation': 'For chapter planning in a book about prompt engineering I need a full list of all CoT prompting techniques.',
  'restrictions': '1. List only real prompting techniques\n    2. Do not include techniques that are just applications of other techniques\n    3. Do not include techniques that are not related to CoT prompting',
  'output_format': 'List of techniques with very short description of each technique'},
 'turns': 5,
 'knowledge_base': 'Fake LLM response 122',
 'current_messages': [SystemMessage(content="You are designated to perform a complex task that requires you to solve a problem by breaking it down into smaller, manageable steps. \nYour goal is to manage whole realization of the task including detailed planning, specific jobs delegation and writing the final report.\n\nPay special attention to the first received message, as it contains the task description and all necessary details.\nE

In [16]:
# Replay every archived message of the run in order
for message in research_state['all_messages']:
    message.pretty_print()

Name: system

You are designated to perform a complex task that requires you to solve a problem by breaking it down into smaller, manageable steps. 
Your goal is to manage whole realization of the task including detailed planning, specific jobs delegation and writing the final report.

Pay special attention to the first received message, as it contains the task description and all necessary details.
Especially when writing the final report, ensure that it fits into the provided output format.

You will work in an iterative way with following schema:
1. **Task Specification**: The user will provide a task description, including all necessary details.
2. **Planning**: With input from previous steps you will describe your reasoning and plan next steps or call the end of the process.
3. **Job Distribution**: Using the plan, you will delegate specific jobs that will be performed by specialized agents. Be carefull to don't duplicate already done jobs. 
4. **Report extension**: With collected

In [20]:
# One block per delegated job: numbered header, the four job fields, separator line
for number, job in enumerate(research_state['all_jobs'], start=1):
    print(f'Job {number}:')
    for field_name in ('job', 'motivation', 'restrictions', 'output_format'):
        print(getattr(job, field_name))
    print('-' * 50)


Job 1:
Fake string qzxjf
Fake string ccmzy
Fake string avafq
Fake string azgqe
--------------------------------------------------
Job 2:
Fake string shpwg
Fake string tcohs
Fake string kylev
Fake string uhczw
--------------------------------------------------
Job 3:
Fake string qbvvt
Fake string easkt
Fake string wdzdm
Fake string klgqh
--------------------------------------------------
Job 4:
Fake string rxxek
Fake string lpsyy
Fake string hrhiv
Fake string riloa
--------------------------------------------------
Job 5:
Fake string jowwg
Fake string ybkil
Fake string vzdpy
Fake string iqvba
--------------------------------------------------
Job 6:
Fake string gfzqh
Fake string vxole
Fake string eawss
Fake string ofnyr
--------------------------------------------------
Job 7:
Fake string fhzkb
Fake string lxtfo
Fake string zdnup
Fake string zyqvh
--------------------------------------------------
Job 8:
Fake string kidtz
Fake string gtnuz
Fake string xxyop
Fake string nzahc
-----------

## Real LLM

In [21]:
# Same setup as for the fake model, this time with the 'google' model
llm = get_llm(llm_model='google')
memory = get_checkpointer(checkpointer_mode='local', mode='test')

# Tools made available to the job handler
tool_containers = [PhantomResearcherSpecTool(llm=llm, memory=memory)]

# Handler that executes the jobs delegated by the task solver
job_handler = JobHandlerSpecTool(llm=llm, tool_containers=tool_containers, memory=memory)

In [31]:
# The five config attributes are forwarded as constructor keywords of the same name
task_solver = PlannedIterativeTaskSolver(
    llm=llm,
    job_handler=job_handler,
    memory=memory,
    **{
        spec_name: getattr(PlannedIterativeTaskSolverConfig, spec_name)
        for spec_name in (
            'initial_messages_spec',
            'internal_messages_spec',
            'init_values',
            'prompt_manager_spec',
            'global_inputs',
        )
    },
)

In [32]:
# Run the solver on the task defined above
output = task_solver.run(**inputs)

# Final Report, written line by line
print(*output['output'].split('\n'), sep='\n')

List of techniques with very short description of each technique:

*   **Standard Chain-of-Thought Prompting:** Prompting the model to generate intermediate reasoning steps before the final answer, demonstrated through example question-answer pairs.
*   **Zero-Shot Chain-of-Thought Prompting:** Prompting the model to think step-by-step by adding "Let's think step by step" to the question, without providing examples.
*   **Least-to-Most Prompting:** Breaking down a complex problem into simpler subproblems solved sequentially, ordered by difficulty/dependency.
*   **Tree of Thoughts (ToT):** Exploring multiple reasoning paths at each step, maintaining a tree structure and using search algorithms to navigate.
*   **Graph of Thoughts (GoT):** Structuring the reasoning process as a graph, allowing for complex, non-linear reasoning paths.
*   **Reflexion:** Enabling models to reflect on their reasoning and improve over time through self-reflection.
*   **Program-Aided Language Models (PAL):*

In [33]:
# Load whole task solver state
# The thread id returned by `run` identifies the finished run.
# `get_state` is not defined in this notebook (presumably inherited from BaseChatBot).
research_state = task_solver.get_state(output['thread_id'])
# The full state is not displayed here; the cells below print selected parts of it.
#research_state

In [34]:
# Replay every archived message of the run in order
for message in research_state['all_messages']:
    message.pretty_print()

Name: system

You are designated to perform a complex task that requires you to solve a problem by breaking it down into smaller, manageable steps. 
Your goal is to manage whole realization of the task including detailed planning, specific jobs delegation and writing the final report.

Pay special attention to the first received message, as it contains the task description and all necessary details.
Especially when writing the final report, ensure that it fits into the provided output format.

You will work in an iterative way with following schema:
1. **Task Specification**: The user will provide a task description, including all necessary details.
2. **Planning**: With input from previous steps you will describe your reasoning and plan next steps or call the end of the process.
3. **Job Distribution**: Using the plan, you will delegate specific jobs that will be performed by specialized agents. Be carefull to don't duplicate already done jobs. 
4. **Report extension**: With collected

In [37]:
# Knowledge Base as left by the last extension step, written line by line
print(*research_state['knowledge_base'].split('\n'), sep='\n')

```
## Chain-of-Thought Prompting Techniques

### Basic CoT Techniques
1.  **Standard Chain-of-Thought Prompting:**
    *   **Description:** Prompting the model to generate intermediate reasoning steps before the final answer, demonstrated through example question-answer pairs.
2.  **Zero-Shot Chain-of-Thought Prompting:**
    *   **Description:** Prompting the model to think step-by-step by adding "Let's think step by step" to the question, without providing examples.

### Advanced CoT Techniques
1.  **Least-to-Most Prompting:**
    *   **Description:** Breaking down a complex problem into simpler subproblems solved sequentially, ordered by difficulty/dependency.
2.  **Tree of Thoughts (ToT):**
    *   **Description:** Exploring multiple reasoning paths at each step, maintaining a tree structure. Uses search algorithms to navigate the tree.
3.  **Graph of Thoughts (GoT):**
    *   **Description:** Structuring the reasoning process as a graph, where nodes represent thoughts and edges r

In [38]:
# One block per delegated job: numbered header, labelled job fields, separator line
for number, job in enumerate(research_state['all_jobs'], start=1):
    print(f'Job {number}:')
    for label, field_name in (
        ('Job: ', 'job'),
        ('Motivation: ', 'motivation'),
        ('Restrictions: ', 'restrictions'),
        ('Output Format: ', 'output_format'),
    ):
        print(label, getattr(job, field_name))
    print('-' * 50)


Job 1:
Job:  Research and list the fundamental Chain-of-Thought prompting technique, including its basic implementation and any immediate, obvious variations (e.g., different question formats, variations in the "Let's think step by step" phrase). Provide a short description of each technique.
Motivation:  To establish a baseline understanding of CoT prompting and its core components.
Restrictions:  List only real prompting techniques. Do not include techniques that are just applications of other techniques. Do not include techniques that are not related to CoT prompting
Output Format:  List of CoT prompting techniques with a short description of each.
--------------------------------------------------
Job 2:
Job:  Research and list more advanced CoT techniques, such as those that involve reasoning about the reasoning process itself, or those that use external knowledge. Provide a short description of each technique.
Motivation:  To explore more sophisticated CoT approaches that go beyo

In [36]:
# Messages still held in the per-iteration buffer of the loaded state
for message in research_state['current_messages']:
    message.pretty_print()