In [3]:

from langchain_community.vectorstores import Weaviate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from langchain_community.chat_models import ChatOllama

import weaviate
from langchain.globals import set_llm_cache
from langchain.cache import RedisCache
import redis

REDIS_URL = "redis://localhost:6379/0"

redis_client = redis.Redis.from_url(REDIS_URL)
set_llm_cache(RedisCache(redis_client))


client = weaviate.Client(
url="http://localhost:8080",
)

vectorstore = Weaviate(client, 
                    "GRP", 
                    "content")

retriever = vectorstore.as_retriever()



# RAG prompt
template = """You're an Friendly AI assistant, your name is Claro, you can make normal conversations in a friendly manner, and also provide Answer the question based on the following context make sure it sounds like human and official assistant:
{context}
Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)




# RAG
model = ChatOllama(model="openhermes:7b-mistral-v2-q8_0")
# model = ChatOllama(model="falcon:40b-instruct-q4_1")
chain = (
    RunnableParallel({"context": retriever, "question": RunnablePassthrough()})
    | prompt
    | model
    # | StrOutputParser()
)



# Add typing for input
class Question(BaseModel):
    __root__: str


chain = chain.with_types(input_type=Question)



            Please consider upgrading to the latest version. See https://weaviate.io/developers/weaviate/client-libraries/python for details.


In [4]:
result = chain.invoke('what is mediwave')

OllamaEndpointNotFoundError: Ollama call failed with status code 404. Maybe your model is not found and you should pull the model with `ollama pull openhermes:7b-mistral-v2-q8_0`.

In [None]:
result

In [None]:
def agent_node(state, agent, name):
    result = agent.invoke(state)
    return {"messages": [HumanMessage(content=result["output"], name=name)]}

In [None]:
from grp_food_crew_ai.grp_food_crewai import food_crew


result = food_crew(input='how to make vanilla sponge cake give me the receipe only')


In [None]:
print(result)

In [None]:
from grp_food_crew_ai.grp_food_crewai import food_crew


result = food_crew(input='how to make vanilla sponge cake, give me the receipe')

print(food_crew.usage_metrics())

In [None]:
print(result)

In [None]:
a = 5 
a

# TRavel

In [None]:
from langchain.globals import set_debug, set_verbose
from dotenv import load_dotenv
load_dotenv()
# set_verbose(True)
# set_debug(True)

import os

from langchain.output_parsers.openai_functions import JsonOutputFunctionsParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

from langchain_experimental.llms.ollama_functions import OllamaFunctions

# import warnings
# warnings.filterwarnings("ignore", category=DeprecationWarning) 

import warnings
warnings.filterwarnings("ignore")

# members = ["Food_crew", "General_conversation", "General_other", "Mediwave_rag", "Travel_crew"]


members = ["Food_crew", "General_conv", "General_other", "Mediwave_rag", "Travel_crew"]

system_prompt = (
    """You are a supervisor tasked with managing a conversation between the
     following workers:  {members}. Given the following user request,"
     respond with the worker to act next. 
     
     if the user asks anything related to food, receipies, and it's related stuffs use 'Food_crew',
    if the user asks anything related to mediwave and it's related stuffs use 'Mediwave_rag',    
    if the user makes conversation, jokes and funny conversations then use 'General_conv',
    if the user asks anything related to weather, time, wikipedia and it's related stuffs use 'General_other',
    if the user asks anything related to travel, exploration, city tour and it's related stuffs use 'Travel_crew'
        
    Each worker will perform a
     task and respond with their results and status. When finished,
    respond with FINISH."""
)

# Our team supervisor is an LLM node. It just picks the next agent to process
# and decides when the work is completed
options = ["FINISH"] + members
# Using openai function calling can make output parsing easier for us
function_def = {
    "name": "route",
    "description": "Select the next role to act",
    
    "parameters": {
        "type": "object",        
        "properties": {
            "next": {
                "type": "string",
                "enum": f"{options}",
            }
        },
        "required": ["next"],
    },
}


DEFAULT_SYSTEM_TEMPLATE = """You have access to the following tools:

{tools}

You must always select one of the above tools and respond with only a JSON object matching the following schema:

{{
  "tool": "route",
  "tool_input": <parameters for the selected tool, matching the tool's JSON schema>
}}
""" 


prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system_prompt),
        MessagesPlaceholder(variable_name="messages"),
        (
            "system",
            "Given the conversation above, who should act next?"
            " Or should we FINISH? Select one of: {options}",
        ),
    ]
).partial(options=str(options), members=", ".join(members))

llm = OllamaFunctions(
    model=os.environ['LLM'],
    tool_system_prompt_template=DEFAULT_SYSTEM_TEMPLATE
    )


def supervisor_node(state):

    print(state)
    
    supervisor_chain = (
        prompt
        | llm.bind(functions=[function_def], function_call={"name": "route"})
        | JsonOutputFunctionsParser()
    )
        
    result = supervisor_chain.invoke(state)
    
    return result





import operator
from typing import Annotated, Any, Dict, List, Optional, Sequence, TypedDict
import functools

from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langgraph.graph import StateGraph, END
from langchain_core.messages import BaseMessage, HumanMessage


# The agent state is the input to each node in the graph
class AgentState(TypedDict):
    # The annotation tells the graph that new messages will always
    # be added to the current states
    messages: Annotated[Sequence[BaseMessage], operator.add]
    # The 'next' field indicates where to route to next
    next: str



from grp_travel_crew_ai.grp_travel_crewai import travel_crew

from grp_RAG1.grp_rag1_rag import mediwave_rag

from grp_others.grp_others_graph import grp_other_def as gen_others

from grp_food_crew_ai.grp_food_crewai import food_crew

from grp_Gen_Conv.grp_gen_conv_chain import general_conversation



workflow = StateGraph(AgentState)

workflow.add_node("Food_crew", food_crew)
workflow.add_node("General_conv", general_conversation)
workflow.add_node("General_other", gen_others)
workflow.add_node("Mediwave_rag", mediwave_rag)
workflow.add_node("Travel_crew", travel_crew)

workflow.add_node("supervisor", supervisor_node)




for member in members:
    
    if member == 'Mediwave_rag':
        continue
    if member == 'Travel_crew':
        continue
    
    
    # We want our workers to ALWAYS "report back" to the supervisor when done
    workflow.add_edge(member, "supervisor")
    

# The supervisor populates the "next" field in the graph state
# which routes to a node or finishes


conditional_map = {k: k for k in members}



conditional_map["FINISH"] = END
# conditional_map['supervisor'] ='supervisor'

workflow.add_conditional_edges("supervisor", lambda x: x["next"], conditional_map)

# Finally, add entrypoint
workflow.set_entry_point("supervisor")
workflow.set_finish_point('Mediwave_rag')
workflow.set_finish_point('General_conv')
workflow.set_finish_point('Travel_crew')


graph = workflow.compile()


In [None]:
result = graph.invoke({
        "messages": [
            HumanMessage(
                content="what is the current weather in pondicherry"
            )
        ],
    })
result

In [None]:
from langchain import hub 

hub.pull("hwchase17/react")

In [None]:
result

In [None]:
result = graph.invoke({
        "messages": [
            HumanMessage(
                content="suggest some good spots to visit during summer in kodaikanal and ooty give me a complete 7 day itenary with travel route and food, budget accomodation and other nearby scenic spots and tourist atractions, start from pondicherry and return pondicherry after last day."
            )
        ],
    })

In [None]:
result = graph.invoke({
        "messages": [
            HumanMessage(
                content="suggest some good spots to visit during summer in ooty and give me a 3 day itenary, start from pondicherry and return pondicherry after third day"
            )
        ],
    })

In [None]:
"The user's travel needs in Ooty during summer can be met by visiting several top attractions over a 3-day itinerary, starting and ending in Pondicherry. Here is the detailed plan:\n\nDay 1:\n- Visit Dodabetta Peak, the highest point in Ooty, offering breathtaking views of the surrounding mountains and valleys.\n- Explore Mudumalai Wildlife Sanctuary, a large protected area known for its diverse wildlife population, including elephants, tigers, and various bird species.\n\nDay 2:\n- Spend the morning at the Botanical Gardens, home to a vast collection of exotic plants, flowers, and trees.\n- In the afternoon, visit Emerald Lake, a serene and picturesque lake nestled in the heart of Ooty.\n- End the day by relaxing at Ooty Lake, enjoying a peaceful boat ride or taking a leisurely walk around the lake.\n\nDay 3:\n- Begin the day with a visit to Pykara Falls, located approximately 20 km from Ooty, known for its stunning beauty and the surrounding lush greenery.\n- In the afternoon, head towards Coonoor, a nearby hill station famous for its tea gardens. Take a tour of the gardens and learn about the tea-making process.\n\nBy following this itinerary, the user will get to experience the best of Ooty's natural beauty, cultural attractions, and local experiences during their summer vacation."

In [None]:
result = graph.invoke({
        "messages": [
            HumanMessage(
                content="suggest some good spots to visit during summer in mysore and give me a 3 day itenary, start from pondicherry and return pondicherry after third day"
            )
        ],
    })

In [None]:
"Here's a detailed response summarizing key findings about the given context and information that could be relevant to it: The user is looking for travel suggestions in Mysore during summer and requires a three-day itinerary starting from Pondicherry. Based on your request, I have delegated this task to our Travel agency manager. Here's the suggested itinerary:\n\nDay 1:\n- Depart from Pondicherry early morning by road, which is approximately a 4-hour drive.\n- Visit the Chamundeshwari Temple located on Chamundi Hill, known for its religious significance and beautiful views of Mysore city.\n- Head to Brindavan Gardens, located in the Krishnarajasagar Dam area, famous for its terrace gardens, fountains, and musical fountain show.\n- Overnight stay at a hotel in Mysore.\n\nDay 2:\n- Visit the Mysore Palace, a grand architectural marvel open to public viewing during summer months. Don't forget to check out the famous Dussehra durbar hall.\n- Explore the Sri Chamarajendra Zoo and Museum located in the heart of Mysore city. The zoo is home to a wide variety of animals, and the museum exhibits artifacts related to the history and culture of Mysore.\n- Visit the St. Philomena's Church, an impressive Roman Catholic basilica known for its neo-Gothic architecture.\n- Overnight stay at a hotel in Mysore.\n\nDay 3:\n- Depart from Mysore early morning and head back to Pondicherry by road.\n- En route, stop at the Ranganathittu Bird Sanctuary located near Srirangapatna, famous for its diverse bird population and scenic beauty.\n- Arrive in Pondicherry late afternoon/evening and complete your journey.\n\nPlease note that travel times may vary depending on traffic conditions, so it's always a good idea to leave early. Additionally, make sure to check the opening hours of each attraction before planning your visit. Let me know if you need any further assistance with this itinerary or if there are any modifications you would like me to make."

In [None]:
for s in graph.stream(input=
    {
        "messages": [
            HumanMessage(
                content="suggest some good spots to visit during summer in mysore and give me a 3 day itenary, start from pondicherry and return pondicherry after third day"
            )
        ],
    },
    config= {"recursion_limit": 100},
    
):
    if "__end__" not in s:
        print(s)
        print("---")


In [None]:
for s in graph.stream(input=
    {
        "messages": [
            HumanMessage(
                content="suggest some good spots to visit during summer in pondicherry and give me a 2 day itenary"
            )
        ],
    },
    config= {"recursion_limit": 100},
    
):
    if "__end__" not in s:
        print(s)
        print("---")


In [None]:
for s in graph.stream(input=
    {
        "messages": [
            HumanMessage(
                content="suggest some good spots to visit during summer in pondicherry and give me a 2 day itenary"
            )
        ],
    },
    config= {"recursion_limit": 100},
    
):
    if "__end__" not in s:
        print(s)
        print("---")


In [None]:
for s in graph.stream(input=
    {
        "messages": [
            HumanMessage(
                content="suggest some good spots to visit during summer in pondicherry and give me a 2 day itenary"
            )
        ],
    },
    config= {"recursion_limit": 100},
    
):
    if "__end__" not in s:
        print(s)
        print("---")


In [None]:
result = graph.invoke({
        "messages": [
            HumanMessage(
                content="suggest some good spots to visit during summer in pondicherry and give me a 2 day itenary"
            )
        ],
    })

In [None]:
result

In [None]:
graph.invoke({
        "messages": [
            HumanMessage(
                content="what is the time now"
            )
        ],
    })

# Supervisor - update

In [None]:
from langchain.globals import set_debug, set_verbose
from dotenv import load_dotenv
load_dotenv()
# set_verbose(True)
# set_debug(True)

import os

from langchain.output_parsers.openai_functions import JsonOutputFunctionsParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

from langchain_experimental.llms.ollama_functions import OllamaFunctions

# import warnings
# warnings.filterwarnings("ignore", category=DeprecationWarning) 

import warnings
warnings.filterwarnings("ignore")

# members = ["Food_crew", "General_conversation", "General_other", "Mediwave_rag", "Travel_crew"]


members = ["Food_crew", "General_conv", "General_other", "Mediwave_rag", "Travel_crew"]

system_prompt = (
    """You are a supervisor tasked with managing a conversation between the
     following workers:  {members}. Given the following user request,"
     respond with the worker to act next. 
     
     if the user asks anything related to food, receipies, and it's related stuffs use 'Food_crew',
    if the user asks anything related to mediwave and it's related stuffs use 'Mediwave_rag',    
    if the user makes conversation, jokes and funny conversations then use 'General_conv',
    if the user asks anything related to weather, time, wikipedia and it's related stuffs use 'General_other',
    if the user asks anything related to travel, exploration, city tour and it's related stuffs use 'Travel_crew'
        
    Each worker will perform a
     task and respond with their results and status. When finished,
    respond with FINISH."""
)

# Our team supervisor is an LLM node. It just picks the next agent to process
# and decides when the work is completed
options = ["FINISH"] + members
# Using openai function calling can make output parsing easier for us
function_def = {
    "name": "route",
    "description": "Select the next role to act",
    
    "parameters": {
        "type": "object",        
        "properties": {
            "next": {
                "type": "string",
                "enum": f"{options}",
            }
        },
        "required": ["next"],
    },
}


DEFAULT_SYSTEM_TEMPLATE = """You have access to the following tools:

{tools}

You must always select one of the above tools and respond with only a JSON object matching the following schema:

{{
  "tool": "route",
  "tool_input": <parameters for the selected tool, matching the tool's JSON schema>
}}
""" 


prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system_prompt),
        MessagesPlaceholder(variable_name="messages"),
        (
            "system",
            "Given the conversation above, who should act next?"
            " Or should we FINISH? Select one of: {options}",
        ),
    ]
).partial(options=str(options), members=", ".join(members))

llm = OllamaFunctions(
    model=os.environ['LLM'],
    tool_system_prompt_template=DEFAULT_SYSTEM_TEMPLATE
    )


def supervisor_node(state):

    print(state)
    
    supervisor_chain = (
        prompt
        | llm.bind(functions=[function_def], function_call={"name": "route"})
        | JsonOutputFunctionsParser()
    )
        
    result = supervisor_chain.invoke(state)
    
    return result





import operator
from typing import Annotated, Any, Dict, List, Optional, Sequence, TypedDict
import functools

from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langgraph.graph import StateGraph, END
from langchain_core.messages import BaseMessage, HumanMessage


# The agent state is the input to each node in the graph
class AgentState(TypedDict):
    # The annotation tells the graph that new messages will always
    # be added to the current states
    messages: Annotated[Sequence[BaseMessage], operator.add]
    # The 'next' field indicates where to route to next
    next: str



from grp_travel_crew_ai.grp_travel_crewai import travel_crew

from grp_RAG1.grp_rag1_rag import mediwave_rag

from grp_others.grp_others_graph import grp_other_def as gen_others

from grp_food_crew_ai.grp_food_crewai import food_crew

from grp_Gen_Conv.grp_gen_conv_chain import general_conversation



workflow = StateGraph(AgentState)

workflow.add_node("Food_crew", food_crew)
workflow.add_node("General_conv", general_conversation)
workflow.add_node("General_other", gen_others)
workflow.add_node("Mediwave_rag", mediwave_rag)
workflow.add_node("Travel_crew", travel_crew)

workflow.add_node("supervisor", supervisor_node)




for member in members:
    
    if member == 'Mediwave_rag':
        continue
    if member == 'Travel_crew':
        continue
    
    
    # We want our workers to ALWAYS "report back" to the supervisor when done
    workflow.add_edge(member, "supervisor")
    

# The supervisor populates the "next" field in the graph state
# which routes to a node or finishes


conditional_map = {k: k for k in members}



conditional_map["FINISH"] = END
# conditional_map['supervisor'] ='supervisor'

workflow.add_conditional_edges("supervisor", lambda x: x["next"], conditional_map)

# Finally, add entrypoint
workflow.set_entry_point("supervisor")
workflow.set_finish_point('Mediwave_rag')
workflow.set_finish_point('General_conv')
workflow.set_finish_point('Travel_crew')


graph = workflow.compile()


In [None]:
from plan_and_execute.graph import graph 

graph.get_input_schema



In [None]:
from plan_and_execute.graph import graph 



graph.get_input_schema

In [6]:
from langchain_core.runnables import chain
from langchain_core.messages import AIMessage

from langchain.globals import set_debug, set_verbose
from plan_and_execute.graph import graph 


set_verbose=True 
set_debug=True



@chain
async def custom_chain(input):
    
    result = await graph.ainvoke({"input": input})
    
    # print(result)
    
    return AIMessage(content=result['response'])

res = await custom_chain.ainvoke(input='suggest some good spots in pondicherry')



{'plan': [{'key': 'General_other', 'value': 'Get the current time and date in Pondicherry.'}, {'key': 'Travel_crew', 'value': 'Use the time and date to determine the weather in Pondicherry.'}]}
Get the current time and date in Pondicherry.
{'messages': [HumanMessage(content='Get the current time and date in Pondicherry.')], 'agent_outcome': None, 'next': 'General_other'}
Get the current time and date in Pondicherry.
Called `execute_tools`
Calling tool: get_now(format: str = '%Y-%m-%d %H:%M:%S')
Called `execute_tools`
Calling tool: get\_now
Called `execute_tools`
Calling tool: Weather(location)
Called `execute_tools`
Calling tool: Weather(location)
Called `execute_tools`
Calling tool: wikipedia_search
{'next': 'General_other', 'messages': [HumanMessage(content='Get the current time and date in Pondicherry.'), HumanMessage(content='Get the current time and date in Pondicherry.')], 'agent_outcome': AgentFinish(return_values={'output': 'The current time and date in Pondicherry, India.'}, l

Overriding of current TracerProvider is not allowed


get the current weather in Pondicherry
{'messages': [HumanMessage(content='get the current weather in Pondicherry')], 'agent_outcome': None, 'next': 'Travel_crew'}


[1m> Entering new CrewAgentExecutor chain...[0m
[32;1m[1;3mAction: Delegate work to co-worker
Action Input: {coworker: 'Expert Travel Agent', task: 'get current weather in Pondicherry', context: 'The user needs information about the current weather in Pondicherry.'}[0m[32;1m[1;3mThought: I need to check my input and make sure it's in the correct format.
Action: Ask question to co-worker
Action Input: {coworker: 'Expert Travel Agent', question: 'What is the correct format for input to the delegate work tool?', context: 'The user needs information about the current weather in Pondicherry.'}[0m[32;1m[1;3mThought: I need to check my input and make sure it's in the correct format.
Action: Ask question to co-worker
Action Input: {coworker: 'Expert Travel Agent', question: 'What is the correct format for input to the de

Overriding of current TracerProvider is not allowed


get the current weather in Pondicherry
{'messages': [HumanMessage(content='get the current weather in Pondicherry')], 'agent_outcome': None, 'next': 'Travel_crew'}


[1m> Entering new CrewAgentExecutor chain...[0m
[32;1m[1;3mAction: Ask question to co-worker
Action Input: {'coworker': 'Travel agency manager', 'question': 'What is the current weather in Pondicherry?', 'context': ''}[0m

[1m> Entering new CrewAgentExecutor chain...[0m
[32;1m[1;3m
Final Answer: The current weather in Pondicherry is partly cloudy with a temperature of 27°C (80.6°F) and humidity of 81%.[0m

[1m> Finished chain.[0m
[93m 

The current weather in Pondicherry is partly cloudy with a temperature of 27°C (80.6°F) and humidity of 81%.
[00m
[32;1m[1;3mThought: I should find out the appropriate travel solution to meet the user's need for the current weather in Pondicherry.
Action: Delegate work to co-worker
Action Input: {'coworker': 'Expert Travel Agent', 'task': 'Find a suitable travel solution for

In [2]:
{
  "tool": "output_formatter",
  "tool_input": {
    "plan": [
      {
        "key": "Mediwave_rag",
        "value": "Describe Mediwave"
      },
      {
        "key": "General_other",
        "value": "Get current time"
      }
    ]
  }
}

AIMessage(content='Mediwave Digital is a healthcare technology and clinical research company that combines user-centered design with agile development. They have worked on several projects, one of which is the Sefton & Liverpool CAMHS platform built using Node.js, MongoDB, Apostrophe CMS + Node.js, PostgreSQL, and Sequelize. The current time is...')

In [1]:
from langchain_core.messages import HumanMessage
from plan_and_execute.graph import graph 
import warnings
warnings.filterwarnings('ignore')


config = {"recursion_limit": 50}
inputs = {"input": "whats the time now"}
async for event in graph.astream(inputs, config=config):
    for k, v in event.items():
        if k != "__end__":
            print(v, end="\n\n")

Attempting to load an OpenAPI 3.0.0 spec.  This may result in degraded performance. Convert your OpenAPI spec to 3.1.* spec for better support.
Unsupported APIPropertyLocation "ParameterLocation.HEADER" for parameter Content-Type. Valid values are ['path', 'query'] Ignoring optional parameter
Unsupported APIPropertyLocation "ParameterLocation.HEADER" for parameter Accept. Valid values are ['path', 'query'] Ignoring optional parameter
Unsupported APIPropertyLocation "ParameterLocation.HEADER" for parameter Content-Type. Valid values are ['path', 'query'] Ignoring optional parameter
Unsupported APIPropertyLocation "ParameterLocation.HEADER" for parameter Accept. Valid values are ['path', 'query'] Ignoring optional parameter
Unsupported APIPropertyLocation "ParameterLocation.HEADER" for parameter Content-Type. Valid values are ['path', 'query'] Ignoring optional parameter
Unsupported APIPropertyLocation "ParameterLocation.HEADER" for parameter Accept. Valid values are ['path', 'query'] Ig

ValueError: Failed to parse a function call from mistral:7b-instruct-v0.2-q8_0 output: {
  "tool": "General_other",
  "tool_input": {
    "output": {
      "steps": [
        {
          "key": "General_other",
          "value": "Get current time"
        },
        {
          "key": "_OutputFormatter",
          "value": {
            "steps": [
              {
                "key": "General_other",
                "value": "Get current time"
              },
              {
                "key": "Food_crew",
                "value": ""
              },
              {
                "key": "_OutputFormatter",
                "value": {
                  "steps": [
                    {
                      "key": "General_other",
                      "value": "Get current time"
                    },
                    {
                      "key": "Food_crew",
                      "value": ""
                    },
                    {
                      "key": "_OutputFormatter.output.steps[0].key",
                      "value": "The current time is: {current_time}"
                    }
                  ],
                  "current_time": ""
                }
              }
            ]
          }
        }
      ]
    },
    "current_time": ""
  }
}

In [2]:
from langchain_core.utils.function_calling import (
    convert_to_openai_function,
    convert_to_openai_tool,
)
from langchain_core.pydantic_v1 import BaseModel

class Response(BaseModel):
    """Response to user."""

    response: str

convert_to_openai_function(Response)

{'name': 'Response',
 'description': 'Response to user.',
 'parameters': {'type': 'object',
  'properties': {'response': {'type': 'string'}},
  'required': ['response']}}

# open ai function runnable

In [1]:
import json
from typing import Any, Callable, Dict, Literal, Optional, Sequence, Type, Union

from langchain_core.output_parsers import (
    BaseGenerationOutputParser,
    BaseOutputParser,
    JsonOutputParser,
)
from langchain_core.output_parsers.openai_functions import (
    JsonOutputFunctionsParser,
    PydanticAttrOutputFunctionsParser,
    PydanticOutputFunctionsParser,
)
from langchain_core.prompts import BasePromptTemplate
from langchain_core.pydantic_v1 import BaseModel
from langchain_core.runnables import Runnable
from langchain_core.utils.function_calling import (
    convert_to_openai_function,
    convert_to_openai_tool,
)

from langchain.output_parsers import (
    JsonOutputKeyToolsParser,
    PydanticOutputParser,
    PydanticToolsParser,
)


def create_openai_fn_runnable(
    functions: Sequence[Union[Dict[str, Any], Type[BaseModel], Callable]],
    llm: Runnable,
    prompt: Optional[BasePromptTemplate] = None,
    *,
    enforce_single_function_usage: bool = True,
    output_parser: Optional[Union[BaseOutputParser, BaseGenerationOutputParser]] = None,
    **llm_kwargs: Any,
) -> Runnable:
   
    # noqa: E501
    if not functions:
        raise ValueError("Need to pass in at least one function. Received zero.")
    openai_functions = [convert_to_openai_function(f) for f in functions]
    llm_kwargs_: Dict[str, Any] = {"functions": openai_functions}
    if len(openai_functions) == 1 and enforce_single_function_usage:
        llm_kwargs_["function_call"] = {"name": openai_functions[0]["name"]}
    output_parser = output_parser or get_openai_output_parser(functions)
    if prompt:
        return prompt | llm.bind(functions=openai_functions) | output_parser
    else:
        return llm.bind(**llm_kwargs_) | output_parser


def get_openai_output_parser(
    functions: Sequence[Union[Dict[str, Any], Type[BaseModel], Callable]],
) -> Union[BaseOutputParser, BaseGenerationOutputParser]:
    """Get the appropriate function output parser given the user functions.

    Args:
        functions: Sequence where element is a dictionary, a pydantic.BaseModel class,
            or a Python function. If a dictionary is passed in, it is assumed to
            already be a valid OpenAI function.

    Returns:
        A PydanticOutputFunctionsParser if functions are Pydantic classes, otherwise
            a JsonOutputFunctionsParser. If there's only one function and it is
            not a Pydantic class, then the output parser will automatically extract
            only the function arguments and not the function name.
    """
    if isinstance(functions[0], type) and issubclass(functions[0], BaseModel):
        if len(functions) > 1:
            pydantic_schema: Union[Dict, Type[BaseModel]] = {
                convert_to_openai_function(fn)["name"]: fn for fn in functions
            }
        else:
            pydantic_schema = functions[0]
        output_parser: Union[
            BaseOutputParser, BaseGenerationOutputParser
        ] = PydanticOutputFunctionsParser(pydantic_schema=pydantic_schema)
    else:
        output_parser = JsonOutputFunctionsParser(args_only=len(functions) <= 1)
    return output_parser



In [2]:
state = {'input': 'tell me about mediwave and also give the current time', 'plan': [{'key': 'Mediwave_rag', 'value': 'Explain about Mediwave'}, {'key': 'General_other', 'value': 'Get current time'}], 'past_steps': ('Explain about Mediwave', " Claro: I'd be happy to help explain a bit more about Mediwave Digital! They are a company dedicated to addressing the world's needs by empowering over 50 brands to thrive and make a meaningful impact. As a design-thinking company, they focus on creating not only digital products and services but also transformative experiences.\n\nTheir team consists of talented UI/UX designers, full-stack developers, QA engineers, and DevOps programmers who collaborate from conceptualization through development, testing, and deployment. They emphasize a pursuit of excellence and innovation in all their projects.\n\nMediwave Digital has a strong focus on healthcare technology and clinical research, combining user-centered design with agile development. One of their notable achievements is their collaboration with Sefton & Liverpool CAMHS services to promote mental health and well-being for children, young people, families, and carers through a dedicated platform. Their tech stack includes Node.js, MangoDB, Apostrophe CMS + Node.js, PostgreSQL, and Sequelize.\n\nThey pride themselves on their collaborative efforts and the impact they make with their work. Matt Rigby, Head of Digital at Brook, expressed his gratitude for Mediwave's role in their team and project DFD. Overall, Mediwave is a culture-driven design company that thrives on tackling big challenges and making a tangible difference with their work. If you are interested in joining them on their mission to assist companies in delivering delightful customer experiences, they offer opportunities for Full Stack Developers."), 'response': None}

In [3]:

state_ = state.copy()

plan_steps = []

# m = [val for key, val in item.items() for item in g]

for item in state['plan']:
    # print(item['value'])
    plan_steps.append(item['value'])
    # for val in item.values():
    #     # k.append(val)
    #     print(val)
        
plan_steps

state_ |= {'plan': plan_steps}
state_

{'input': 'tell me about mediwave and also give the current time',
 'plan': ['Explain about Mediwave', 'Get current time'],
 'past_steps': ('Explain about Mediwave',
  " Claro: I'd be happy to help explain a bit more about Mediwave Digital! They are a company dedicated to addressing the world's needs by empowering over 50 brands to thrive and make a meaningful impact. As a design-thinking company, they focus on creating not only digital products and services but also transformative experiences.\n\nTheir team consists of talented UI/UX designers, full-stack developers, QA engineers, and DevOps programmers who collaborate from conceptualization through development, testing, and deployment. They emphasize a pursuit of excellence and innovation in all their projects.\n\nMediwave Digital has a strong focus on healthcare technology and clinical research, combining user-centered design with agile development. One of their notable achievements is their collaboration with Sefton & Liverpool CAM

In [1]:
# Begin

In [1]:
import json
from typing import Any, Dict, List, Optional

from langchain_community.chat_models.ollama import ChatOllama
from langchain_core.callbacks import CallbackManagerForLLMRun
from langchain_core.language_models import BaseChatModel
from langchain_core.messages import AIMessage, BaseMessage
from langchain_core.outputs import ChatGeneration, ChatResult
from langchain_core.prompts import SystemMessagePromptTemplate

from langchain_experimental.pydantic_v1 import root_validator

DEFAULT_SYSTEM_TEMPLATE = """You have access to the following tools:

{tools}

You must always select one of the above tools and respond with only a JSON object matching the following schema:

{{
  "tool": <name of the selected tool>,
  "tool_input": <parameters for the selected tool, matching the tool's JSON schema>
}}
"""  # noqa: E501


DEFAULT_RESPONSE_FUNCTION = {
    "name": "__conversational_response",
    "description": (
        "Respond conversationally if no other tools should be called for a given query."
    ),
    "parameters": {
        "type": "object",
        "properties": {
            "response": {
                "type": "string",
                "description": "Conversational response to the user.",
            },
        },
        "required": ["response"],
    },
}


class OllamaFunctions(BaseChatModel):
    """Function chat model that uses Ollama API."""

    llm: ChatOllama

    tool_system_prompt_template: str

    @root_validator(pre=True)
    def validate_environment(cls, values: Dict) -> Dict:
        values["llm"] = values.get("llm") or ChatOllama(**values, format="json")
        values["tool_system_prompt_template"] = (
            values.get("tool_system_prompt_template") or DEFAULT_SYSTEM_TEMPLATE
        )
        return values

    @property
    def model(self) -> BaseChatModel:
        """For backwards compatibility."""
        return self.llm

    def _generate(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> ChatResult:
        functions = kwargs.get("functions", [])
        if "function_call" in kwargs:
            functions = [
                fn for fn in functions if fn["name"] == kwargs["function_call"]["name"]
            ]
            if not functions:
                raise ValueError(
                    'If "function_call" is specified, you must also pass a matching \
function in "functions".'
                )
            del kwargs["function_call"]
        elif not functions:
            functions.append(DEFAULT_RESPONSE_FUNCTION)
        system_message_prompt_template = SystemMessagePromptTemplate.from_template(
            self.tool_system_prompt_template
        )
        system_message = system_message_prompt_template.format(
            tools=json.dumps(functions, indent=2)
        )
        if "functions" in kwargs:
            del kwargs["functions"]
        response_message = self.llm.predict_messages(
            [system_message] + messages, stop=stop, callbacks=run_manager, **kwargs
        )
        chat_generation_content = response_message.content
        if not isinstance(chat_generation_content, str):
            raise ValueError("OllamaFunctions does not support non-string output.")
        try:
            parsed_chat_result = json.loads(chat_generation_content)
        except json.JSONDecodeError:
            raise ValueError(
                f'"{self.llm.model}" did not respond with valid JSON. Please try again.'
            )
        called_tool_name = parsed_chat_result["tool"]
        called_tool_arguments = parsed_chat_result["tool_input"]
        called_tool = next(
            (fn for fn in functions if fn["name"] == called_tool_name), None
        )
        if called_tool is None:
            raise ValueError(
                f"Failed to parse a function call from {self.llm.model} \
output: {chat_generation_content}"
            )
        if called_tool["name"] == DEFAULT_RESPONSE_FUNCTION["name"]:
            return ChatResult(
                generations=[
                    ChatGeneration(
                        message=AIMessage(
                            content=called_tool_arguments["response"],
                        )
                    )
                ]
            )

        response_message_with_functions = AIMessage(
            content="",
            additional_kwargs={
                "function_call": {
                    "name": called_tool_name,
                    "arguments": json.dumps(called_tool_arguments)
                    if called_tool_arguments
                    else "",
                },
            },
        )

        return ChatResult(
            generations=[ChatGeneration(message=response_message_with_functions)]
        )

    @property
    def _llm_type(self) -> str:
        return "ollama_functions"


In [2]:
import json
from typing import Any, Callable, Dict, Literal, Optional, Sequence, Type, Union

from langchain_core.output_parsers import (BaseGenerationOutputParser, BaseOutputParser,)
from langchain_core.output_parsers.openai_functions import (JsonOutputFunctionsParser, PydanticOutputFunctionsParser,)
from langchain_core.prompts import BasePromptTemplate
from langchain_core.pydantic_v1 import BaseModel
from langchain_core.runnables import Runnable
from langchain_core.utils.function_calling import (convert_to_openai_function)

from langchain.output_parsers import (JsonOutputKeyToolsParser, PydanticOutputParser, PydanticToolsParser)


def create_openai_fn_runnable(
    functions: Sequence[Union[Dict[str, Any], Type[BaseModel], Callable]],
    llm: Runnable,
    prompt: Optional[BasePromptTemplate] = None,
    *,
    enforce_single_function_usage: bool = True,
    output_parser: Optional[Union[BaseOutputParser, BaseGenerationOutputParser]] = None,
    **llm_kwargs: Any,
) -> Runnable:
   
    openai_functions = [convert_to_openai_function(f) for f in functions]
    llm_kwargs_: Dict[str, Any] = {"functions": openai_functions, **llm_kwargs}
    
    if len(openai_functions) == 1 and enforce_single_function_usage:
        llm_kwargs_["function_call"] = {"name": openai_functions[0]["name"]}
    output_parser = output_parser or get_openai_output_parser(functions)
    if prompt:
        return prompt | llm.bind(**llm_kwargs_) | output_parser
    else:
        return llm.bind(**llm_kwargs_) | output_parser


def get_openai_output_parser(
    functions: Sequence[Union[Dict[str, Any], Type[BaseModel], Callable]],
) -> Union[BaseOutputParser, BaseGenerationOutputParser]:
    """Get the appropriate function output parser given the user functions.

    Args:
        functions: Sequence where element is a dictionary, a pydantic.BaseModel class,
            or a Python function. If a dictionary is passed in, it is assumed to
            already be a valid OpenAI function.

    Returns:
        A PydanticOutputFunctionsParser if functions are Pydantic classes, otherwise
            a JsonOutputFunctionsParser. If there's only one function and it is
            not a Pydantic class, then the output parser will automatically extract
            only the function arguments and not the function name.
    """
    if isinstance(functions[0], type) and issubclass(functions[0], BaseModel):
        if len(functions) > 1:
            pydantic_schema: Union[Dict, Type[BaseModel]] = {
                convert_to_openai_function(fn)["name"]: fn for fn in functions
            }
        else:
            pydantic_schema = functions[0]
        output_parser: Union[
            BaseOutputParser, BaseGenerationOutputParser
        ] = PydanticOutputFunctionsParser(pydantic_schema=pydantic_schema)
    else:
        output_parser = JsonOutputFunctionsParser(args_only=len(functions) <= 1)
    return output_parser



In [3]:
state_ = {'input': 'tell me about mediwave',
 'plan': ['Explain about Mediwave'],
 'past_steps': ('Explain about Mediwave',
  " Claro: I'd be happy to help explain a bit more about Mediwave Digital! They are a company dedicated to addressing the world's needs by empowering over 50 brands to thrive and make a meaningful impact. As a design-thinking company, they focus on creating not only digital products and services but also transformative experiences.\n\nTheir team consists of talented UI/UX designers, full-stack developers, QA engineers, and DevOps programmers who collaborate from conceptualization through development, testing, and deployment. They emphasize a pursuit of excellence and innovation in all their projects.\n\nMediwave Digital has a strong focus on healthcare technology and clinical research, combining user-centered design with agile development. One of their notable achievements is their collaboration with Sefton & Liverpool CAMHS services to promote mental health and well-being for children, young people, families, and carers through a dedicated platform. Their tech stack includes Node.js, MangoDB, Apostrophe CMS + Node.js, PostgreSQL, and Sequelize.\n\nThey pride themselves on their collaborative efforts and the impact they make with their work. Matt Rigby, Head of Digital at Brook, expressed his gratitude for Mediwave's role in their team and project DFD. Overall, Mediwave is a culture-driven design company that thrives on tackling big challenges and making a tangible difference with their work. If you are interested in joining them on their mission to assist companies in delivering delightful customer experiences, they offer opportunities for Full Stack Developers."),
 'response': None}

from plan_and_execute.planner import crews

from langchain_core.pydantic_v1 import BaseModel, Field
from typing_extensions import List
from typing import Literal

class Step(BaseModel):
    key: Literal["Food_crew", "General_conv", "General_other", "Mediwave_rag", "Travel_crew"] = Field(description='the worker gonna handle this task/step')
    value: str = Field(description='task/ step the worker need to do')


class Plan(BaseModel):
    """Plan to follow in future"""

    steps: List[Step] 
    





# from langchain.chains.openai_functions import create_openai_fn_runnable
from langchain_core.pydantic_v1 import BaseModel
from langchain_core.prompts import ChatPromptTemplate
from plan_and_execute.planner import crews
# from langchain_experimental.llms.ollama_functions import OllamaFunctions
import os


from langchain.globals import set_debug, set_verbose


set_verbose=True 
set_debug=True

class Response(BaseModel):
    """Response to user."""

    response: str

response = {'name': 'Response',
 'description': 'Response to user.',
 'parameters': {'type': 'object',
  'properties': {'response': {'type': 'string'}},
  'required': ['response']}}


function1 = {'name': 'plan',
 'description': 'replanner',
 
 'parameters': {
   'type': 'array',
   'properties': {
     'key': {
       "enum": f"{crews}",
       'description': 'the worker gonna handle this task/step'
       
,
        
       'type': 'string'},
     'value': {
       'description': 'task/ step the worker need to do',
      'type': 'string'}
     },
                    
    'required': ['plan'],
  }
}



replanner_prompt = ChatPromptTemplate.from_template(
    """For the given user input, come up with a simple step by step plan. \
This plan should involve individual tasks, that if executed correctly will yield the correct answer. Do not add any superfluous steps. \
    
      if the user asks anything related to food, receipies, and it's related stuffs use 'Food_crew' key,
        if the user asks anything related to mediwave and it's related stuffs use 'Mediwave_rag' key,    
        if the user makes conversation, jokes and funny conversations then use 'General_conv' key,
        if the user asks anything related to weather, time, wikipedia and it's related stuffs use 'General_other' key,
        if the user asks anything related to travel, exploration, city tour and it's related stuffs use 'Travel_crew' key.
    
    
The result of the final step should be the final answer. Make sure that each step has all the information needed - do not skip steps.

user input was this:
{input}

Your original plan was this:
{plan}

You have currently done the follow steps:
{past_steps}

Update your plan accordingly(remove the completed step). If no more steps are needed and you can return to the user, then respond with that. Otherwise, fill out the plan. Only add steps to the plan that still NEED to be done. Do not return previously done steps as part of the plan."""
)


replanner = create_openai_fn_runnable(
    
    [Plan, Response],
    OllamaFunctions(model=os.environ['LLM']),
    replanner_prompt,
)

# [function1, response],


output = replanner.invoke(state_)


if isinstance(output, Response):
    print({"response": output})
else:
    print({"plan": output})


Attempting to load an OpenAPI 3.0.0 spec.  This may result in degraded performance. Convert your OpenAPI spec to 3.1.* spec for better support.
Unsupported APIPropertyLocation "ParameterLocation.HEADER" for parameter Content-Type. Valid values are ['path', 'query'] Ignoring optional parameter
Unsupported APIPropertyLocation "ParameterLocation.HEADER" for parameter Accept. Valid values are ['path', 'query'] Ignoring optional parameter
Unsupported APIPropertyLocation "ParameterLocation.HEADER" for parameter Content-Type. Valid values are ['path', 'query'] Ignoring optional parameter
Unsupported APIPropertyLocation "ParameterLocation.HEADER" for parameter Accept. Valid values are ['path', 'query'] Ignoring optional parameter
Unsupported APIPropertyLocation "ParameterLocation.HEADER" for parameter Content-Type. Valid values are ['path', 'query'] Ignoring optional parameter
Unsupported APIPropertyLocation "ParameterLocation.HEADER" for parameter Accept. Valid values are ['path', 'query'] Ig

{'plan': Plan(steps=[Step(key='Mediwave_rag', value='Provide detailed information about Mediwave Digital, their focus areas, team composition, and notable achievements.')])}


In [16]:
g = Plan(steps=[Step(key='Mediwave_rag', value='Give detailed information about Mediwave Digital and their focus on healthcare technology and clinical research'), Step(key='General_other', value='Determine and provide the current time')])

g.steps[0]

Step(key='Mediwave_rag', value='Give detailed information about Mediwave Digital and their focus on healthcare technology and clinical research')

In [19]:
g.dict()['steps']

[{'key': 'Mediwave_rag',
  'value': 'Give detailed information about Mediwave Digital and their focus on healthcare technology and clinical research'},
 {'key': 'General_other', 'value': 'Determine and provide the current time'}]

In [None]:
PromptTemplate(input_variables=['input', 'past_steps', 'plan'], template="For the given user input, come up with a simple step by step plan. This plan should involve individual tasks, that if executed correctly will yield the correct answer. Do not add any superfluous steps.     \n      if the user asks anything related to food, receipies, and it's related stuffs use 'Food_crew' key,\n        if the user asks anything related to mediwave and it's related stuffs use 'Mediwave_rag' key,    \n        if the user makes conversation, jokes and funny conversations then use 'General_conv' key,\n        if the user asks anything related to weather, time, wikipedia and it's related stuffs use 'General_other' key,\n        if the user asks anything related to travel, exploration, city tour and it's related stuffs use 'Travel_crew' key.\n    \n    \nThe result of the final step should be the final answer. Make sure that each step has all the information needed - do not skip steps.\n\nuser input was this:\n{input}\n\nYour original plan was this:\n{plan}\n\nYou have currently done the follow steps:\n{past_steps}\n\nUpdate your plan accordingly(remove the completed step). If no more steps are needed and you can return to the user, then respond with that. Otherwise, fill out the plan. Only add steps to the plan that still NEED to be done. Do not return previously done steps as part of the plan.")

In [10]:
state_

{'input': 'tell me about mediwave and also give the current time',
 'plan': ['Explain about Mediwave', 'Get current time'],
 'past_steps': ('Explain about Mediwave',
  " Claro: I'd be happy to help explain a bit more about Mediwave Digital! They are a company dedicated to addressing the world's needs by empowering over 50 brands to thrive and make a meaningful impact. As a design-thinking company, they focus on creating not only digital products and services but also transformative experiences.\n\nTheir team consists of talented UI/UX designers, full-stack developers, QA engineers, and DevOps programmers who collaborate from conceptualization through development, testing, and deployment. They emphasize a pursuit of excellence and innovation in all their projects.\n\nMediwave Digital has a strong focus on healthcare technology and clinical research, combining user-centered design with agile development. One of their notable achievements is their collaboration with Sefton & Liverpool CAM

In [None]:
{'plan': [{'key': 'Mediwave_rag', 'value': 'Get information about Mediwave'}]}

In [None]:
{'name': 'Response', 'arguments': {'response': 'Mediwave Digital is a healthcare technology and clinical research company that combines user-centered design with agile development. They have been instrumental in helping organizations achieve customer satisfaction throughout the customer journey. Some of their featured projects include Medichec, which helps identify medications that could potentially impact cognitive function or cause other adverse effects in older individuals, and Oxcare, a digital portal designed for supported self-management of physical health and mental wellbeing. Their tech stack includes various technologies such as Angular, Node.js, Mango DB, Cordova, Express.js, Apostrophe CMS, and PostgreSQL, among others. They take pride in collaborating with exceptional individuals and making a tangible impact with their work. For more information about their clients and testimonials, you can check out their website.'}}

In [None]:
from langchain_core.pydantic_v1 import BaseModel, Field

from pydantic import BaseModel 

class Steps(BaseModel):
    key: str = Field(description='the worker gonna handle this task/step')
    value: str = Field(description='task/ step the worker need to do')


class Plan(BaseModel):
    """Plan to follow in future"""

    steps: List[Steps] 
    

In [None]:
P

In [None]:
# {'Plan': [{'key': 'Mediwave_rag',
#    'value': 'Retrieve information about Mediwave'},
#   {'key': 'General_other',
#    'value': 'Format the retrieved information for user consumption'}]}

In [None]:
from langchain_core.messages import HumanMessage

config = {"recursion_limit": 50}
inputs = {"input": "what is the hometown of the 2024 Australia open winner?"}
async for event in graph.astream(inputs, config=config):
    for k, v in event.items():
        if k != "__end__":
            print(v)

# Scratch - supervisor and agent planner

In [None]:
from typing import List, Dict, Any
from pydantic import Field, BaseModel

class PlanItem(BaseModel):
    key: str = Field(description='the worker gonna handle this task/step')
    value: str = Field(description='task/ step the worker need to do')

class Plan(BaseModel):
    plan: List[PlanItem] = Field(description="different steps to follow, should be in sorted order, always make minimal steps")
    
    

In [None]:
from langchain_core.utils.function_calling import convert_pydantic_to_openai_function


openai_function = convert_pydantic_to_openai_function(Plan)

In [None]:
openai_function

In [None]:
crews = ["Food_crew", "General_conv", "General_other", "Mediwave_rag", "Travel_crew"]

In [None]:
function = {'name': 'Plan',
 'description': '',
 
 'parameters': {
   'type': 'array',
   'properties': {
     'key': {
       "enum": f"{crews}",
       'description': 'the worker gonna handle this task/step',
       'type': 'string'},
     'value': {
       'description': 'task/ step the worker need to do',
      'type': 'string'}
     },
                    
    'required': ['plan'],
  }
}


In [None]:
# from typing import List, Dict
# from pydantic import BaseModel

# class Parameter(BaseModel):
#     key: str
#     value: str

# class FunctionParameter(BaseModel):
#     type: str
#     properties: Dict[str, Parameter]
#     required: List[str]

# class FunctionSchema(BaseModel):
#     name: str
#     description: str
#     parameters: FunctionParameter


In [None]:
# from typing import List, Dict, Optional
# from pydantic import BaseModel, Field

# class Plan(BaseModel):
#     name: str
#     description: Optional[str] = Field(default='', description='Description of the plan')
#     parameters: List[Dict[str, str]]

#     class Config:
#         json_schema_extra = {
#             "example": {
#                 "name": "Plan",
#                 "description": "Description of the plan",
#                 "parameters": [
#                     {
#                         "key": "worker1",
#                         "value": "Do task A"
#                     },
#                     {
#                         "key": "worker2",
#                         "value": "Do task B"
#                     }
#                 ]
#             }
#         }


In [None]:
from langchain_core.pydantic_v1 import BaseModel
import os
from langchain_core.pydantic_v1 import BaseModel, Field
from typing import List
from langchain_experimental.llms.ollama_functions import OllamaFunctions

class Plan(BaseModel):
    """Plan to follow in future"""

    steps: List[str] = Field(
        description="different steps to follow, should be in sorted order"
    )
    


Plan.schema_json()

In [None]:
from langchain_core.pydantic_v1 import BaseModel
import os
from langchain_core.pydantic_v1 import BaseModel, Field
from typing import List, Dict
from langchain_experimental.llms.ollama_functions import OllamaFunctions

class Plan(BaseModel):
    """Plan to follow in future"""

    steps: List[ Dict[
                key: str = Field(description='the worker gonna handle this task/step')
                value: str = Field(description='task/ step the worker need to do')
    ]
                ] = Field(
        description="different steps to follow, should be in sorted order"
    )
    


Plan.schema_json()

In [None]:
Plan.model_json_schema()

In [None]:
from langchain_core.pydantic_v1 import BaseModel
import os
from langchain_core.pydantic_v1 import BaseModel, Field
from typing import List
from langchain_experimental.llms.ollama_functions import OllamaFunctions
    
from langchain.chains.openai_functions import create_structured_output_runnable
from langchain_core.prompts import ChatPromptTemplate

planner_prompt = ChatPromptTemplate.from_template(
    """For the given user input, come up with a simple step by step plan but don't provide answer coz you have tools to figure out things. \
        
This plan should involve individual tasks, that if executed correctly will yield the correct answer. Do not add any superfluous steps. \
    
    if the user asks anything related to food, receipies, and it's related stuffs use 'Food_crew' worker,
        if the user asks anything related to mediwave and it's related stuffs use 'Mediwave_rag' worker,    
        if the user makes conversation, jokes and funny conversations then use 'General_conv' worker,
        if the user asks anything related to weather, time, wikipedia and it's related stuffs use 'General_other' worker,
        if the user asks anything related to travel, exploration, city tour and it's related stuffs use 'Travel_crew' worker.
    
    
The result of the final step should be the final answer. Make sure that each step has all the information needed - do not skip steps.

if the given objective related to mediwave then give the objective as plan

user input : {objective}"""
)

planner = create_structured_output_runnable(
    function, 
    OllamaFunctions(model=os.environ['LLM']),
    planner_prompt
)


In [None]:
def plan_step(input):
    plan = planner.invoke({"objective": input})
    # return {"plan": plan.steps}
    h = {"plan": plan}
    print(h)

    return {"plan": plan['plan']}


In [None]:
plan_step(input="Tell me about mediwave")

In [None]:
planner.invoke("tell me about mediwave")