# PipelineManager

## 1. Decompose a task into a pipeline of subtasks

In [1]:
from task_chain.agents.agent_registry import AgentRegistry
from task_chain.decompose.pipeline import SimplePipelineDecomposer
from task_chain.storage.task.context_store import TaskContextStore
import nest_asyncio

nest_asyncio.apply()

objective = "create a report about the demographics of the population of India"
task_storage = TaskContextStore()
registry = AgentRegistry()

decomposer = SimplePipelineDecomposer(
    objective=objective,
    storage_context=task_storage,
    agents=registry.get_all()
)

task, subtasks = decomposer.create_tasks(verbose=True, run_async=True)



[1m> Entering new LLMChain chain...[0m

[1m> Finished chain.[0m


[1m> Entering new LLMChain chain...[0m

[1m> Finished chain.[0m
tasks=[PredictTask(name='collect demographic data for India', description='Collect demographic data for India including population, age distribution, gender ratio, etc.', inputs=[], outputs=['demographic_data_india']), PredictTask(name='analyze demographic data', description='Analyze the demographic data for India to identify patterns, trends and insights.', inputs=['demographic_data_india'], outputs=['demographic_analysis_india']), PredictTask(name='create report on demographic data for India', description='Create a detailed report on the demographic data for India including charts, graphs and insights.', inputs=['demographic_data_india', 'demographic_analysis_india'], outputs=['demographic_report_india'])]


[1m> Entering new LLMChain chain...[0m

[1m> Finished chain.[0m


In [8]:
task_storage.persist()

Persisted task store to ./storage\taskstore.json


In [2]:
children = task_storage.task_network.get_all_children(task.id)
for child in children:
    print(task_storage.get_task(child).inputs)

[]
['demographic_data']
['demographic_trends']


In [2]:
from task_chain.storage.task.context_store import TaskContextStore
from task_chain.agents.agent_registry import AgentRegistry
from task_chain.decompose.pipeline import SimplePipelineDecomposer
from task_chain.storage.task.context_store import TaskContextStore
import nest_asyncio

nest_asyncio.apply()

objective = "create a report about the demographics of the population of India"
registry = AgentRegistry()


task_storage = TaskContextStore()
task_storage.load_from_file('C:\\Users\\Lukasz\\DataspellProjects\\TaskChain\\docs\\showcase\\storage\\taskstore.json')

task = task_storage.get_task(task_storage.root_id)

In [2]:
print(task_storage.repr_tree())

[32m┐[0m
[32m└──[0mCreate report on demographic data for India
[32m   [0m[33m├──[0mcollect demographic data for India
[32m   [0m[33m├──[0manalyze demographic data
[32m   [0m[33m└──[0mcreate report on demographic data for India


# Setup Pipeline Manager

In [3]:
from task_chain.executor.pipeline import PipelineManager
from task_chain.communication.non_interactive import NonInteractiveCommunicator

communicator = NonInteractiveCommunicator()

pipeline_manager = PipelineManager(
    task=task,
    agent_registry=registry,
    communication=communicator,
    task_storage=task_storage,
    persist=True,
    persist_path="task_chain/storage/task/task_storage.json",
    verbose=True
)

In [4]:
pipeline_manager.execute()



[1m> Entering new PipelineManager chain...[0m

CURRENT CONTEXT:
[32m┐[0m
[32m└──[0m[40mCreate report on demographic data for India[0m [31m<<<<<<<<[39m
[32m   [0m[33m├──[0mcollect demographic data for India
[32m   [0m[33m├──[0manalyze demographic data
[32m   [0m[33m└──[0mcreate report on demographic data for India

starting [Task Manager Role: execution] Task: Create report on demographic data for India



[1m> Entering new SimpleTaskManager chain...[0m

CURRENT CONTEXT:
[32m┐[0m
[32m└──[0mCreate report on demographic data for India
[32m   [0m[33m├──[0m[42mcollect demographic data for India[0m [32m<<<<<<<<[39m
[32m   [0m[33m├──[0manalyze demographic data
[32m   [0m[33m└──[0mcreate report on demographic data for India

starting [Task Manager Role: execution] Task: collect demographic data for India



[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3mI should first check if there are any reliable sources for demographic data on India

Task(id='2e271118dfe54fa79f7f555bb2929b29', name='Create report on demographic data for India', description='Create a detailed report on the demographic data for India including charts, graphs and insights.', status=<TaskStatus.ISSUE: 'issue'>, type=<TaskType.PIPELINE: 'pipeline'>, summary='root task:\n    - collect demographic data for India\n    - analyze demographic data\n    - create report on demographic data for India', details=None, results=None, relations={<TaskRelations.ISSUE: 'issue'>: '57cf9efbf0834daab5bd73f8aa4cea99'}, inputs=['demographic_data_india', 'demographic_analysis_india'], outputs=['demographic_report_india'])

In [5]:
print(task.get_summary())

root task:
    - collect demographic data for India
    - analyze demographic data
    - create report on demographic data for India


In [5]:
pipeline_manager.subordinate_tasks

['5d62bec3618245008862e3d99b545b1a',
 'd0c8f3cba18e45c3938ab432fbf75766',
 'a76356a2b0584565afdaac9358f93f46']

In [7]:
task_storage.get_task("5d62bec3618245008862e3d99b545b1a")

Task(id='5d62bec3618245008862e3d99b545b1a', name='gather data on population demographics in India', description='Collect data on the population demographics in India.', status=<TaskStatus.OPEN: 'open'>, type=<TaskType.TASK: 'task'>, summary=None, details=None, results=None, relations={<TaskRelations.PARENT: 'parent'>: '523dc07daf114c89a25e8cfe92579e03', <TaskRelations.NEXT: 'next'>: 'd0c8f3cba18e45c3938ab432fbf75766', <TaskRelations.PREV: 'previous'>: None, <TaskRelations.ROOT: 'root'>: '523dc07daf114c89a25e8cfe92579e03', <TaskRelations.AGENT: 'agent'>: 'DecisionAgent'}, inputs=[], outputs=['demographic_data'])

Testing printing functions

In [1]:
from task_chain.task import Task
from task_chain.schema import TaskStatus, TaskType, TaskRelations

task = Task(id='2e271118dfe54fa79f7f555bb2929b29', name='Create report on demographic data for India', description='Create a detailed report on the demographic data for India including charts, graphs and insights.', status=TaskStatus.ISSUE, type=TaskType.PIPELINE, summary='root task:\n    - collect demographic data for India\n    - analyze demographic data\n    - create report on demographic data for India', details=None, results=None, relations={TaskRelations.ISSUE: '57cf9efbf0834daab5bd73f8aa4cea99'}, inputs=['demographic_data_india', 'demographic_analysis_india'], outputs=['demographic_report_india'])

In [2]:
print(task.colored_card_str())

 [31m--------------------------------------------------------------------[39m
[31m|[39m TASK TYPE:    [31mPIPELINE[39m                           STATUS: [31mISSUE[39m    [31m |[39m
[31m|[39m                                                                   [31m |[39m
[31m|[39m ID:           [31m2e271118dfe54fa79f7f555bb2929b29[39m                    [31m |[39m
[31m|[39m                                                                   [31m |[39m
[31m|[39m NAME:         [33mCreate report on demographic data for India[39m         [31m |[39m
[31m|[39m DESCRIPTION:  [33mCreate a detailed report on the demographic data -[39m  [31m |[39m
[31m|[39m [33m              for India including charts, graphs and insights.[39m    [31m |[39m
[31m|[39m INPUTS:       [33mdemographic_data_india, demographic_analysis_india[39m  [31m |[39m
[31m|[39m OUTPUTS:      [33mdemographic_report_india[39m                            [31m |[39m
 [31m---------------

In [7]:
len("2e271118dfe54fa79f7f555bb2929b29 | STATUS: issue")

48

In [8]:
"dsfsdfggesgs".split("\n")

['dsfsdfggesgs']

In [67]:
def _prefix(key, width: int=13):
    """Return a prefix string."""
    _width = width - len(key)
    return f"{key.upper()}:{_width*' '}"


def trim_lines(content: str, indent=14, max_width=61):
    lines = []
    for line in content.split("\n"):
        while len(line)+indent > max_width:
            if not lines:
                lines.append(line[:max_width-indent-1] + "-")
                line = line[max_width-indent-1:]
            else:
                lines.append(" "*indent + line[:max_width-indent-1] + "-")
                line = line[max_width-indent-1:]
        if not lines:
            lines.append(line)
        else:
            lines.append(" "*indent + line)
    return lines


def _formatted_task_lines(
        task: Task,
        indent: int = 14,
        max_width: int = 44,
        highlight: str = Fore.BLUE,
        text_color: str = Fore.GREEN) -> list[str]:
    """Print a task as an outline."""
    first_line = f"{_prefix('TASK TYPE')}{highlight}{task.type.upper()}{Fore.RESET}"
    space = max_width - len(f"STATUS: {task.status}") - len(_remove_ansii(first_line)) - 2
    first_line += f"{' '*space}STATUS: {highlight}{task.status.upper()}{Fore.RESET}"

    lines = [
        first_line,
        " ",
        f"{_prefix('ID')}{highlight}{task.id}{Fore.RESET}",
        " "
    ]

    for key in ["name", "description", "inputs", "outputs"]:
        content = getattr(task, key)
        if isinstance(content, list):
            content = ", ".join(content)
            if len(content)+indent > max_width:
                content = "\n".join(content.split(", "))
        _lines = trim_lines(content, indent=indent, max_width=max_width)
        lines.append(f"{_prefix(key)}{text_color}{_lines[0]}{Fore.RESET}")
        for line in _lines[1:]:
            lines.append(f"{text_color}{line}{Fore.RESET}")

    return lines

def colored_task(
        task: Task,
        indent: int = 14,
        max_width: int = 64,
    ) -> str:
    text_color = Fore.GREEN
    if task.type == TaskType.ISSUE:
        highlight = Fore.RED
        text_color = Fore.YELLOW

    elif task.status == TaskStatus.ISSUE:
        highlight = Fore.RED
        text_color = Fore.YELLOW
    elif task.status == TaskStatus.IN_PROGRESS:
        highlight = Fore.YELLOW
    elif task.status == TaskStatus.CLOSED:
        highlight = Fore.GREEN
        text_color = Fore.WHITE
    else:
        highlight = Fore.BLUE

    lines = _formatted_task_lines(
        task, highlight=highlight, text_color=text_color, indent=indent, max_width=max_width)
    return _outline_task(lines, highlight=highlight, max_width=max_width)



def _outline_task(
        lines: list[str],
        max_width: int = 64,
        highlight: str = Fore.BLUE,
    ) -> str:

    new_lines = []
    width = max_width + 4
    # header
    new_lines.append(f" {highlight}{'-'*width}{Fore.RESET}")
    for line in lines:
        new_lines.append(f"{highlight}|{Fore.RESET} {line}{' '*(width-2-len(_remove_ansii(line)))}{highlight} |{Fore.RESET}")

    # footer
    new_lines.append(f" {highlight}{'-'*width}{Fore.RESET}")
    return "\n".join(new_lines)


def _remove_ansii(string: str):
    """Remove ansii characters from a string."""
    import re
    ansi_escape = re.compile(r'\x1B\[[0-?]*[ -/]*[@-~]')
    return ansi_escape.sub('', string)



In [68]:
print(colored_task(task))

 [31m--------------------------------------------------------------------[39m
[31m|[39m TASK TYPE:    [31mPIPELINE[39m                           STATUS: [31mISSUE[39m    [31m |[39m
[31m|[39m                                                                   [31m |[39m
[31m|[39m ID:           [31m2e271118dfe54fa79f7f555bb2929b29[39m                    [31m |[39m
[31m|[39m                                                                   [31m |[39m
[31m|[39m NAME:         [33mCreate report on demographic data for India[39m         [31m |[39m
[31m|[39m DESCRIPTION:  [33mCreate a detailed report on the demographic data -[39m  [31m |[39m
[31m|[39m [33m              for India including charts, graphs and insights.[39m    [31m |[39m
[31m|[39m INPUTS:       [33mdemographic_data_india, demographic_analysis_india[39m  [31m |[39m
[31m|[39m OUTPUTS:      [33mdemographic_report_india[39m                            [31m |[39m
 [31m---------------

In [39]:
string = "| dfdf  |"
string1 = "|-------|"
print(string)
print(string1)

| dfdf  |
|-------|


In [40]:
lines = formatted_task_lines(task)

In [41]:
length = [len(line) for line in lines]
print(length)

[82, 1, 67, 71, 71, 29, 46, 50, 48]


In [42]:
lines

['ID:           \x1b[34m2e271118dfe54fa79f7f555bb2929b29\x1b[39m | STATUS: \x1b[34missue\x1b[39m',
 ' ',
 'NAME:         \x1b[32mCreate report on demographic data for India\x1b[39m',
 'DESCRIPTION:  \x1b[32mCreate a detailed report on the demographic da-\x1b[39m',
 '\x1b[32m              ta for India including charts, graphs and insi-\x1b[39m',
 '\x1b[32m              ghts.\x1b[39m',
 'INPUTS:       \x1b[32mdemographic_data_india\x1b[39m',
 '\x1b[32m              demographic_analysis_india\x1b[39m',
 'OUTPUTS:      \x1b[32mdemographic_report_india\x1b[39m']