In [44]:
from langchain.tools import Tool
from langchain_community.llms import LlamaCpp
from langchain_core.messages import HumanMessage, SystemMessage
from typing import Dict, Any, List, Tuple, TypedDict
from langgraph.graph import Graph, StateGraph
import json


mistral = LlamaCpp(
    model_path="C:/Users/DaysPC/Langgraph/mistral-7b-instruct-v0.2.Q4_K_M.gguf",
    temperature=0.1,
    max_tokens=2000,
    n_ctx=4096,
    top_p=0.95,
    verbose=False
)


def search_web(query: str) -> str:
    """Simulate web search."""
    return f"Found results for: {query}"

def calculate(expression: str) -> str:
    """Evaluate a mathematical expression."""
    try:
        return str(eval(expression))
    except Exception as e:
        return f"Error: {str(e)}"

def write_file(content: str) -> str:
    """Simulate writing content to a file."""
    return f"Successfully wrote: {content}"

llama_context: n_batch is less than GGML_KQ_MASK_PAD - increasing to 64
llama_context: n_ctx_per_seq (4096) < n_ctx_train (32768) -- the full capacity of the model will not be utilized


In [45]:
class PlanAgent:
    def __init__(self, model: LlamaCpp):
        self.model = model
        
    def create_plan(self, query: str) -> List[Dict[str, Any]]:
        """Create a plan by breaking down the query into subtasks."""
        prompt = f"""<s>[INST] Break this task into exactly 2 simple steps:
        Task: {query}
        
        Create 2 separate tasks:
        1. First find/search for the required information
        2. Then perform the calculation or action on that information
        
        Format as a Python list with exactly 2 dictionaries:
        [
            {{"task_id": 1, "description": "Search for specific information", "status": "pending"}},
            {{"task_id": 2, "description": "Perform calculation with the found information", "status": "pending"}}
        ]
        
        Return only the Python list.[/INST]</s>"""
        
        try:
            response = self.model.invoke(prompt)
            
            response_clean = response.strip().split('[')[1].split(']')[0]
            response_clean = f"[{response_clean}]"
            tasks = eval(response_clean)
            
            
            if not tasks or len(tasks) == 0:
                raise ValueError("No tasks created")
                
            
            cleaned_tasks = []
            for i, task in enumerate(tasks, 1):
                cleaned_tasks.append({
                    "task_id": i,
                    "description": task.get("description", f"Step {i} of: {query}"),
                    "status": "pending"
                })
            return cleaned_tasks
            
        except Exception as e:
            
            return [
                {
                    "task_id": 1,
                    "description": f"Search for information: {query}",
                    "status": "pending"
                },
                {
                    "task_id": 2,
                    "description": f"Process the found information",
                    "status": "pending"
                }
            ]

In [46]:
class ToolAgent:
    def __init__(self, tools: List[Tool], model: LlamaCpp):
        self.model = model
        self.calculator = next(t for t in tools if t.name == "calculator")
        
    def get_population_from_model(self, location: str) -> tuple[int, str]:
        """Use Mistral's knowledge to get population data, parsing JSON response."""
        import json
        
        prompt = f"""[INST] What is the current population of {location}? 
        Answer ONLY in JSON format, with these keys:
        {{
          "population_millions": float,
          "year": int
        }}
        Example: {{"population_millions": 8.8, "year": 2023}}
        Use recent and precise data.[/INST]"""
        
        response = self.model.invoke(prompt).strip()
        
        try:
            data = json.loads(response)
            population_millions = data["population_millions"]
            year = data["year"]
            population = int(population_millions * 1_000_000)
            return population, f"Model estimate ({year})"
        
        except Exception as e:
            raise ValueError(f"Failed to parse population data from model response: {str(e)}")
        
    def execute_task(self, task: Dict[str, Any]) -> Dict[str, Any]:
        """Execute a single task using available tools."""
        try:
            if task["task_id"] == 1:  
                
                location_prompt = f"""[INST] Extract just the location name from this text:
                {task['description']}
                Return only the location name, no other text.[/INST]"""
                
                location = self.model.invoke(location_prompt).strip()
                
                
                population, source = self.get_population_from_model(location)
                
                
                if population < 100_000 or population > 350_000_000:  # US population cap
                    raise ValueError("Population number seems incorrect")
                    
                self.current_population = population
                
                return {
                    **task,
                    "status": "completed",
                    "result": f"Population of {location}: {population:,}\nSource: {source}",
                    "tool_used": "model_knowledge"
                }
                
            elif task["task_id"] == 2:  
                if not hasattr(self, 'current_population'):
                    raise ValueError("Population data not available")
                    
                calculation = f"{self.current_population} * 0.15"
                result = self.calculator.run(calculation)
                calculated_value = float(result)
                
                return {
                    **task,
                    "status": "completed",
                    "result": f"15% of {self.current_population:,} = {calculated_value:,.2f}",
                    "tool_used": "calculator"
                }
                
        except Exception as e:
            return {
                **task,
                "status": "failed",
                "error": str(e),
                "result": f"Error in task execution: {str(e)}"
            }
    
    def reflect_on_result(self, task: Dict[str, Any]) -> str:
        """Provide feedback on task execution."""
        if task.get("status") == "completed":
            if task["task_id"] == 1:
                return "Successfully retrieved population data"
            elif task["task_id"] == 2:
                return "Successfully calculated percentage"
        return f"Task {task['task_id']} failed: {task.get('error', 'unknown error')}"


In [47]:
class WorkflowState(TypedDict):
    query: str
    tasks: List[Dict[str, Any]]
    current_task_id: int
    completed: bool
    steps: int
    last_node: str

def create_workflow(tools: List[Tool], model: LlamaCpp) -> Graph:
    """Create a simplified workflow that executes tasks sequentially."""
    
    
    plan_agent = PlanAgent(model)
    tool_agent = ToolAgent(tools, model)
    
    def plan(state: WorkflowState) -> WorkflowState:
        """Create initial plan if no tasks exist."""
        if not state["tasks"]:
            state["tasks"] = plan_agent.create_plan(state["query"])
            state["steps"] = 0
            state["last_node"] = "plan"
            state["current_task_id"] = 1  
        return state
    
    def execute(state: WorkflowState) -> WorkflowState:
        """Execute current task."""
        state["steps"] = state.get("steps", 0) + 1
        state["last_node"] = "execute"
        
        
        current_task = next((t for t in state["tasks"] 
                           if t["task_id"] == state["current_task_id"]), None)
        
        if current_task and current_task["status"] == "pending":
            
            result = tool_agent.execute_task(current_task)
            
            
            state["tasks"] = [
                result if t["task_id"] == current_task["task_id"] else t
                for t in state["tasks"]
            ]
            
            
            if state["current_task_id"] < len(state["tasks"]):
                state["current_task_id"] += 1
            else:
                state["completed"] = True
                
        return state
    
    def should_continue(state: WorkflowState) -> Tuple[bool, str]:
        """Determine next step in workflow."""
        
        if state["completed"]:
            return False, "end"
            
        
        if state["steps"] >= 10:
            state["completed"] = True
            return False, "end"
            
        
        if not state["tasks"]:
            return True, "plan"
        
        
        current_task = next((t for t in state["tasks"] 
                           if t["task_id"] == state["current_task_id"]), None)
        
        if current_task and current_task["status"] == "pending":
            return True, "execute"
        
        
        if state["current_task_id"] < len(state["tasks"]):
            return True, "execute"
        
        
        state["completed"] = True
        return False, "end"
    
    
    workflow = StateGraph(WorkflowState)
    
    
    workflow.add_node("plan", plan)
    workflow.add_node("execute", execute)
    
    
    workflow.set_entry_point("plan")
    
    
    workflow.add_conditional_edges("plan", should_continue)
    workflow.add_conditional_edges("execute", should_continue)
    
    return workflow.compile()

def run_workflow(graph: Graph, query: str) -> Dict[str, Any]:
    """Run workflow with initial query."""
    initial_state: WorkflowState = {
        "query": query,
        "tasks": [],
        "current_task_id": 1,
        "completed": False,
        "steps": 0,
        "last_node": ""
    }
    return graph.invoke(initial_state)

In [48]:
tools = [
    Tool(
        name="web_search",
        func=search_web,
        description="Search the web for information."
    ),
    Tool(
        name="calculator",
        func=calculate,
        description="Calculate mathematical expressions."
    ),
    Tool(
        name="file_writer",
        func=write_file,
        description="Write content to a file."
    )
]

try:
    
    workflow = create_workflow(tools, mistral)
    
    
    query = "Find the population of New Jersey and calculate 15% of it"
    
    
    result = run_workflow(workflow, query)
    
    print("\nWorkflow completed!")
    print("\nTasks and Results:")
    for task in result["tasks"]:
        print(f"\nTask {task['task_id']}:")
        print(f"Description: {task['description']}")
        print(f"Status: {task['status']}")
        if "result" in task:
            print(f"Result: {task['result']}")
            
except Exception as e:
    print(f"Error: {str(e)}")




Workflow completed!

Tasks and Results:

Task 1:
Description: Search for the population of New Jersey
Status: completed
Result: Population of New Jersey: 9,200,000
Source: Model estimate (2023)

Task 2:
Description: Calculate 15% of the New Jersey population
Status: completed
Result: 15% of 9,200,000 = 1,380,000.00
