In [392]:
%pip install requests jsonschema tenacity langgraph langgraph.checkpoint.sqlite termcolor

Note: you may need to restart the kernel to use updated packages.


In [393]:
from services.model_service import ModelService

# Single model client for the notebook, configured for the "llama3.1:latest" model.
ollama_service = ModelService(
    model="llama3.1:latest",
)

In [394]:
from langgraph.graph.message import add_messages
from typing import Annotated, TypedDict, Any

# Annotation shared by the two list-valued fields: a list with `add_messages`
# attached as metadata.
_MessageChannel = Annotated[list, add_messages]


class MultiAgentGraphState(TypedDict):
    """State schema for the conversion graph.

    Keys:
        job: Source job definition, as text.
        playbook: Playbook entries; annotated with `add_messages`.
        feedback: Review entries; annotated with `add_messages`.
    """

    job: str
    playbook: _MessageChannel
    feedback: _MessageChannel

In [395]:
# Sample Control-M job definition used as the notebook's input. It is a
# JSON-style fragment (a single "JobName" entry without enclosing braces),
# kept as raw text rather than parsed.
controlm_job = """
"JobName": {
    "Type" : "Job:Command",
    "Command" : "echo hello",
    "PreCommand": "echo before running main command",
    "PostCommand": "echo after running main command",
    "Host" : "myhost.mycomp.com",
    "RunAs" : "user1"  
}
"""

In [396]:
from agent.role_agent import RoleAgent
from prompt.convert_prompt import DEFAULT_SYS_DEV_PROMPT
import yaml
import json

def convert_agent(user_req: str, feedback: str = ""):
    """Ask the CONVERT_AGENT role to produce a playbook and return it as YAML.

    Args:
        user_req: The conversion request sent to the agent as the user message.
        feedback: Reviewer feedback to embed in the system prompt. ``None`` is
            treated like the empty string so the prompt never contains the
            literal text "None" when there is no feedback yet.

    Returns:
        ``{"playbook": <YAML text>}`` where the YAML is the agent's JSON reply
        re-serialised in block style.

    Raises:
        json.JSONDecodeError: If the agent's reply is not valid JSON.
    """
    # Use a distinct local name: the original rebound `convert_agent` inside
    # its own body, shadowing the function.
    agent = RoleAgent(
        role="CONVERT_AGENT",
        ollama_service=ollama_service,
        sys_prompt=DEFAULT_SYS_DEV_PROMPT.format(
            feedback=feedback or "",
            datetime=get_current_utc_datetime(),
        ),
    )

    response = agent.work(user_request=user_req)

    # The reply object is expected to expose the raw model text on `.content`.
    playbook_json = json.loads(response["CONVERT_AGENT_response"].content)

    playbook_yaml = yaml.dump(playbook_json, default_flow_style=False)
    return {"playbook": playbook_yaml}

In [397]:
from prompt.review_prompt import DEFAULT_SYS_REVIEW_PROMPT
from utils.general.helpers import get_current_utc_datetime

def reviewer_agent(controlm_job: str, playbook_yaml: str):
    """Ask the REVIEWER_AGENT role to validate a playbook against its source job.

    Args:
        controlm_job: Job definition substituted into the review system prompt.
        playbook_yaml: Playbook text placed in the validation request.

    Returns:
        ``{"feedback": <JSON text>}`` — the agent's JSON reply, re-serialised
        with a four-space indent.
    """
    system_prompt = DEFAULT_SYS_REVIEW_PROMPT.format(
        controlm_job=controlm_job,
        datetime=get_current_utc_datetime(),
    )
    reviewer = RoleAgent(
        role="REVIEWER_AGENT",
        ollama_service=ollama_service,
        sys_prompt=system_prompt,
    )

    reply = reviewer.work(
        user_request=f"""Validate this playbook: {playbook_yaml}"""
    )

    # Parse and re-dump so the stored feedback is consistently formatted JSON.
    verdict = json.loads(reply["REVIEWER_AGENT_response"].content)
    return {"feedback": json.dumps(verdict, indent=4)}

In [398]:
from langgraph.graph import END

def should_continue(state: MultiAgentGraphState):
    """Choose the next step from the latest review.

    Returns ``END`` when the most recent feedback entry has status "valid";
    otherwise returns "convert_job".
    """
    latest_review = state["feedback"][-1]
    verdict = json.loads(latest_review.content)["status"]
    return END if verdict == "valid" else "convert_job"

In [399]:
from langgraph.graph import StateGraph, START, END


def create_graph() -> StateGraph:
    """Build the (uncompiled) convert -> review graph.

    Flow: START -> "convert_job" -> "review_playbook", then `should_continue`
    routes either back to "convert_job" or to END.

    Returns:
        The configured ``StateGraph``; the caller compiles it.
    """

    def _convert_node(state):
        """Run the converter with the job text and the latest feedback, if any."""
        latest_feedback = state["feedback"][-1].content if state["feedback"] else ""
        return convert_agent(
            user_req=f"""Convert this Control-M Job to an Ansible playbook. {state['job']}""",
            # Empty string (not None) when no review exists yet, matching
            # convert_agent's declared default.
            feedback=latest_feedback,
        )

    def _review_node(state):
        """Run the reviewer on the most recent playbook."""
        # Pass plain strings. The original wrapped both values in `{...}`,
        # which builds one-element sets, so the reviewer prompt contained
        # the set repr instead of the raw text.
        return reviewer_agent(
            controlm_job=state["job"],
            playbook_yaml=state["playbook"][-1].content,
        )

    graph = StateGraph(MultiAgentGraphState)

    graph.add_node("convert_job", _convert_node)
    graph.add_node("review_playbook", _review_node)

    # Define the flow of the graph
    graph.add_edge(START, "convert_job")
    graph.add_edge("convert_job", "review_playbook")
    graph.add_conditional_edges(
        "review_playbook", should_continue, ["convert_job", END]
    )

    return graph

In [400]:
from memory.sqlite_saver import initialize_memory

# Run inputs: the job text to convert, and the thread id for this run.
dict_inputs = {"job": controlm_job}
config = {"configurable": {"thread_id": "1"}}

# Checkpointer handed to the compiled workflow.
# NOTE(review): ":memory:" presumably selects an in-memory SQLite database — confirm in initialize_memory.
memory = initialize_memory(":memory:")

# Build the graph definition, then compile it into a runnable workflow.
graph = create_graph()
workflow = graph.compile(checkpointer=memory)
print("Graph and workflow created.")

Graph and workflow created.


In [401]:
from termcolor import colored

from langgraph.errors import GraphRecursionError

# Define workflow parameters
iterations = 10
verbose = True

limit = {"recursion_limit": iterations}
# Merge the step limit into the run config. The original built `limit` but
# never passed it to `stream`, so the cap had no effect.
run_config = {**config, **limit}

print(colored(controlm_job, "blue"))

# Execute the workflow and print state changes
try:
    for event in workflow.stream(dict_inputs, run_config):
        if verbose:
            if "convert_job" in event:
                print(colored(event["convert_job"]["playbook"], "yellow"))
            elif "review_playbook" in event:
                feedback_text = event["review_playbook"]["feedback"]
                # Named `review`, not `dict`, so the builtin is not rebound
                # for the rest of the notebook.
                review = json.loads(feedback_text)
                colour = "green" if review["status"] == "valid" else "red"
                print(colored(feedback_text, colour))
        else:
            print("\n")
except GraphRecursionError:
    # Raised by LangGraph when the run exceeds `recursion_limit` steps.
    print(
        colored(
            f"Stopped: no valid playbook within {iterations} graph steps.",
            "red",
        )
    )

[34m
"JobName": {
    "Type" : "Job:Command",
    "Command" : "echo hello",
    "PreCommand": "echo before running main command",
    "PostCommand": "echo after running main command",
    "Host" : "myhost.mycomp.com",
    "RunAs" : "user1"  
}
[0m


[33mcommand: echo hello
environment_variables: {}
job_name: JobName
retry: {}
schedules: {}
[0m
[31m{
    "status": "not valid",
    "comments": "The Control-M Job JSON was successfully converted to Ansible Playbook, but with some missing fields.",
    "improvements": [
        "The Ansible playbook seems mostly complete. However, the `schedules` field is empty. The Control-M job likely has a schedule (e.g., time, frequency), which should be represented in the Ansible playbook using the `cron` module or another scheduling mechanism.",
        "Ensure that environment variables from the Control-M job are correctly included in the playbook. The `environment_variables` dictionary in the Control-M JSON is currently empty."
    ]
}[0m
[33mjob_name:
  command: echo hello
  host: myhost.mycomp.com
  post_command: echo after running main command
  pre_command: echo before running main command
  run_as: user1
  type: Job:Command
[0m
[32m{
    "status": "valid",
    "comments": "The Contr