# Phase 4: State-of-the-Art - Institutional Portfolio Management

Welcome to the final phase. Here, we implement an **Institutional Portfolio Management** system. This system scales complexity by integrating the **Black-Litterman Model** and a **Human-in-the-Loop (HITL)** branching workflow.

### System Architecture
The flow includes conditional branching and feedback loops:
```
START ‚Üí Data Agent ‚Üí Market Posterior (BL) ‚Üí Optimizer ‚Üí Human Review
                                 ‚Üë                          |
                                 ‚îî------- [Rejected] -------‚îò
                                 |
                                 ‚îî------- [Approved] -------‚Üí END
```

| Agent | Role | Logic |
|---|---|---|
| **InstitutionalDataAgent** | Fetches market equilibrium (Priors) | Sequential |
| **MarketPosteriorAgent** | Merges market data with subjective Views | Iterative |
| **PortfolioOptimizer** | Generates target weights | Sequential |
| **HumanReviewAgent** | Simulates a Portfolio Manager's approval | Branching |

## 1. Environment Setup

We use `--only-binary=:all:` to avoid C++ build errors on Windows for libraries like `matplotlib`.

In [30]:
%pip install vinagent==0.0.6.post3

from langchain_openai import ChatOpenAI
from dotenv import load_dotenv, find_dotenv
import os

load_dotenv(find_dotenv('.env'))

llm = ChatOpenAI(model="gpt-4o-mini")
print("LLM initialized.")

Note: you may need to restart the kernel to use updated packages.
LLM initialized.


## 2. Define Institutional State

The state tracks market priors, subjective views, and the final approval status.

In [31]:
import operator
from typing import Annotated, List, TypedDict

def append_messages(existing: list, update: dict) -> list:
    return existing + [update]

class InstitutionalState(TypedDict):
    """State for professional institutional rebalancing."""
    messages: Annotated[list[dict], append_messages]
    market_priors: str
    optimal_weights: str
    approval_status: str
    pm_feedback: str

In [32]:
from datetime import datetime
from vinagent.register import primary_function

@primary_function
def get_current_time() -> str:
    """
    Get the current date and time. Use this to know 'today's' date.
    Returns:
        str: Current date and time in YYYY-MM-DD HH:MM:SS format.
    """
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

print("Custom time tool defined.")

Custom time tool defined.


In [33]:
import operator
import re
from typing import TypedDict, Annotated, List, Optional, Callable
from langgraph.graph import StateGraph, START, END
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI
from tavily import TavilyClient
from vinagent.register import primary_function

# Monkeypatch to fix ImportError in vinagent library
import langgraph.utils.runnable as utils_runnable
try:
    from langgraph._internal._runnable import coerce_to_runnable
    utils_runnable.coerce_to_runnable = coerce_to_runnable
except (ImportError, AttributeError):
    pass

class DeepSearchState(TypedDict):
    task: str
    plan: List[str]
    draft: str
    critique: str
    adjustment: List[str]
    sections: Annotated[List[str], operator.add]
    chapters: Annotated[List[str], operator.add]
    revision_number: int
    max_revisions: int
    max_chapters: int
    max_paragraphs_per_chapter: int
    max_critical_queries: int
    number_of_chapters: int
    current_chapter_order: int

class FlexibleDeepSearch:
    PLAN_PROMPT = """Outline for analytical essay on the topic. Not more than {max_chapters} chapters. Format: 1. Chapter 1\n2. Chapter..."""
    WRITER_PROMPT = """Generate ‡¶ó‡¶¨‡ßá‡¶∑‡¶£‡¶æ (research) for {max_paragraphs_per_chapter} paragraphs. Use collected info. Respond with revised version if critique provided. Utilize info below:\n\n------\n- Previous content:\n{content}\n- Critique:\n{critique}\n- Suggested Adjustment:\n{adjustment}"""
    REFLECTION_PROMPT = """Generate critique and recommendations. Less than 200 words."""
    RESEARCH_CRITIQUE_PROMPT = """Generate search queries (max {max_critical_queries}) to gather info for critique."""

    def __init__(self, llm, search_func: Callable[[str], List[str]]):
        self.llm = llm
        self.search_func = search_func
        self.graph = self._build_graph()

    def _build_graph(self):
        builder = StateGraph(DeepSearchState)
        builder.add_node("planner", self.plan_node)
        builder.add_node("generate", self.generation_node)
        builder.add_node("reflect", self.reflection_node)
        builder.add_node("research_critique", self.research_critique_node)
        builder.add_edge(START, "planner")
        builder.add_edge("planner", "generate")
        builder.add_conditional_edges("generate", self.should_continue, {END: END, "reflect": "reflect"})
        builder.add_edge("reflect", "research_critique")
        builder.add_edge("research_critique", "generate")
        return builder.compile()

    def plan_node(self, state: DeepSearchState):
        messages = [SystemMessage(content=self.PLAN_PROMPT.format(max_chapters=state['max_chapters'])), HumanMessage(content=state["task"])]
        response = self.llm.invoke(messages)
        list_tasks = [t for t in response.content.split("\n") if re.match(r"^\d+\. ", t.strip())]
        return {"plan": list_tasks, "current_chapter_order": 0, "number_of_chapters": len(list_tasks)}

    def generation_node(self, state: DeepSearchState):
        idx = state["current_chapter_order"]
        chapter_outline = state["plan"][idx]
        chapter_title = chapter_outline.split("\n")[0].strip()
        search_results = self.search_func(chapter_outline)
        if state.get("adjustment"): search_results.extend(state["adjustment"])
        messages = [SystemMessage(content=self.WRITER_PROMPT.format(max_paragraphs_per_chapter=state["max_paragraphs_per_chapter"], content="\n".join(search_results), critique=state.get("critique", ""), adjustment="\n".join(state.get("adjustment", [])))), HumanMessage(content=f"Outline: {chapter_outline}")]
        response = self.llm.invoke(messages)
        is_last_revision = state.get("revision_number", 1) >= state["max_revisions"]
        new_data = {"draft": response.content, "sections": [chapter_title]}
        if is_last_revision:
            new_data["chapters"] = [f"## {chapter_title}\n{response.content}"]
            new_data["current_chapter_order"] = idx + 1
            new_data["revision_number"] = 1
        else:
            new_data["revision_number"] = state["revision_number"] + 1
        return new_data

    def reflection_node(self, state: DeepSearchState):
        messages = [SystemMessage(content=self.REFLECTION_PROMPT), HumanMessage(content=state["draft"])]
        response = self.llm.invoke(messages)
        return {"critique": response.content}

    def research_critique_node(self, state: DeepSearchState):
        messages = [SystemMessage(content=self.RESEARCH_CRITIQUE_PROMPT.format(max_critical_queries=state["max_critical_queries"])), HumanMessage(content=f"Critique to address: {state['critique']}")]
        response = self.llm.invoke(messages)
        queries = [q for q in response.content.split("\n") if re.match(r"^\d+\. ", q.strip())]
        new_results = []
        for q in queries: new_results.extend(self.search_func(q))
        return {"adjustment": new_results}

    def should_continue(self, state: DeepSearchState):
        if state["current_chapter_order"] >= state["number_of_chapters"]: return END
        return "reflect"

    def run(self, query: str, config: dict = {}):
        initial_state = {"task": query, "max_chapters": config.get("max_chapters", 3), "max_paragraphs_per_chapter": config.get("max_paragraphs_per_chapter", 3), "max_critical_queries": config.get("max_critical_queries", 3), "max_revisions": config.get("max_revisions", 1), "revision_number": 1, "sections": [], "chapters": [], "adjustment": []}
        final_state = self.graph.invoke(initial_state)
        return f"# I. Planning\n" + "\n".join(final_state["sections"]) + "\n\n# II. Results\n" + "\n\n".join(final_state["chapters"])

tavily = TavilyClient(api_key=os.environ.get("TAVILY_API_KEY"))

def tavily_search(query: str) -> List[str]:
    response = tavily.search(query=query, max_results=3)
    return [r["content"] for r in response["results"]]

@primary_function
def custom_deepsearch_tool(query: str, max_chapters: int = 3, max_paragraphs_per_chapter: int = 3, max_critical_queries: int = 3, max_revisions: int = 1) -> str:
    """Invoke custom deepsearch to deeply analyze the query."""
    openai_llm = ChatOpenAI(model="gpt-4o-mini")
    deepsearch = FlexibleDeepSearch(llm=openai_llm, search_func=tavily_search)
    return deepsearch.run(query, config={"max_chapters": max_chapters, "max_paragraphs_per_chapter": max_paragraphs_per_chapter, "max_critical_queries": max_critical_queries, "max_revisions": max_revisions})

print("Custom DeepSearch tool defined.")

Custom DeepSearch tool defined.


## 3. Implement Institutional Agents

The `HumanReviewAgent` is the core branching node.

In [34]:
from vinagent.multi_agent import AgentNode
from vinagent.logger.logger import logging_message

class InstitutionalDataAgent(AgentNode):
    """Fetches CAPM priors and market equilibrium data."""
    @logging_message
    def exec(self, state: InstitutionalState) -> dict:
        print(f"[{self.name}] Processing institutional workflow step...")
        prompt = "Calculate implied market returns (priors) for a 60/40 Equity/Bond portfolio based on current risk-free rates."
        output = self.invoke(prompt)
        return {
            "messages": {"role": "DataAgent", "content": output.content if hasattr(output, "content") else str(output) if hasattr(output, "content") else str(output)},
            "market_priors": output.content if hasattr(output, "content") else str(output) if hasattr(output, "content") else str(output)
        }

class MarketPosteriorAgent(AgentNode):
    """Combines Priors with Analyst Views using Black-Litterman logic."""
    @logging_message
    def exec(self, state: InstitutionalState) -> dict:
        print(f"[{self.name}] Processing institutional workflow step...")
        priors = state.get("market_priors", "")
        views = "Analyst View: Bullish on Tech (NVDA), Neutral on Bonds."
        
        prompt = f"Given Priors: {priors} and Views: {views}, calculate the Black-Litterman posterior returns."
        output = self.invoke(prompt)
        return {
            "messages": {"role": "PosteriorAgent", "content": output.content if hasattr(output, "content") else str(output) if hasattr(output, "content") else str(output)},
            "investor_views": views
        }

class PortfolioOptimizer(AgentNode):
    """Performs mean-variance optimization on the posterior distribution."""
    @logging_message
    def exec(self, state: InstitutionalState) -> dict:
        print(f"[{self.name}] Processing institutional workflow step...")
        prompt = "Based on the posterior returns, generate a target portfolio allocation (weights). Output as a percentage dictionary."
        output = self.invoke(prompt)
        # Simulated weight dict
        return {
            "messages": {"role": "Optimizer", "content": output.content if hasattr(output, "content") else str(output) if hasattr(output, "content") else str(output)},
            "optimized_weights": {"NVDA": 0.45, "SPY": 0.35, "AGG": 0.20}
        }

class HumanReviewAgent(AgentNode):
    """Simulates a PM review. Decides whether to approve or reject for revision."""
    @logging_message
    def exec(self, state: InstitutionalState) -> dict:
        print(f"[{self.name}] Processing institutional workflow step...")
        weights = state.get("optimized_weights", {})
        prompt = f"Review this portfolio rebalance: {weights}. Is it too aggressive? Respond with 'approved' or 'rejected'. IF REJECTING, specify EXACTLY what needs adjustment in the analyst views."
        output = self.invoke(prompt)
        
        res_text = output.content if hasattr(output, "content") else str(output) if hasattr(output, "content") else str(output)
        status = "approved" if "approve" in res_text.lower() else "rejected"
        return {
            "messages": {"role": "PM_Review", "content": output.content if hasattr(output, "content") else str(output) if hasattr(output, "content") else str(output)},
            "approval_status": status
        }

    def branching(self, state: InstitutionalState, config=None) -> str:
        """Determines the next node in the graph."""
        return state.get("approval_status", "rejected")

## 4. Assemble the Institutional Graph

We use conditional mapping to handle the feedback loop from Human Review to Posterior adjustment.

In [35]:
from vinagent.register import primary_function
from typing import Any, Union, Callable, List
import time
import inspect
import operator
from typing import Annotated, TypedDict
from IPython.display import display, Markdown

# Tools imported directly
from vinagent.tools.yfinance_tools import fetch_stock_data, visualize_stock_data, plot_returns
from vinagent.tools.websearch_tools import search_api

@primary_function
def get_current_time() -> str:
    """Get current system time."""
    from datetime import datetime
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

print("Institutional Core Infrastructure Initialized (Direct Registration).")

Resilient tool infrastructure initialized.


In [36]:
from vinagent.multi_agent import CrewAgent
from vinagent.graph.operator import FlowStateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver

instr = "CRITICAL: Format tool arguments as JSON object. NEVER use plain strings."
data_agent = InstitutionalDataAgent(name="data_agent", llm=llm, tools=[], instruction=instr)
bl_agent = MarketPosteriorAgent(name="bl_agent", llm=llm, tools=[], instruction=instr)
optimizer = PortfolioOptimizer(name="optimizer", llm=llm, tools=[], instruction=instr)
human_review = HumanReviewAgent(name="human_review", llm=llm, tools=[], instruction="Analyze PM feedback carefully.")

# Direct Tool Registration for Institutional Workflow
global_tools = [
    primary_function(fetch_stock_data),
    primary_function(visualize_stock_data),
    primary_function(plot_returns),
    primary_function(search_api),
    primary_function(custom_deepsearch_tool),
    get_current_time
]

for agent in [data_agent, bl_agent, optimizer, human_review]:
    for tool in global_tools:
        agent.tools_manager.register_function_tool(tool)

crew = CrewAgent(
    llm=llm,
    checkpoint=MemorySaver(),
    graph=FlowStateGraph(InstitutionalState),
    flow=[
        START >> data_agent,
        data_agent >> bl_agent,
        bl_agent >> optimizer,
        optimizer >> human_review,
        human_review >> {"approved": END, "rejected": bl_agent}
    ]
)
print("Institutional Portfolio Series Assembled with Direct Registration.")

INFO:vinagent.register.tool:Registered tool: fetch_stock_data (runtime)
INFO:vinagent.register.tool:Registered tool: visualize_stock_data (runtime)
INFO:vinagent.register.tool:Registered tool: plot_returns (runtime)
INFO:vinagent.register.tool:Registered tool: search_api (runtime)
INFO:vinagent.register.tool:Registered tool: custom_deepsearch_tool (runtime)
INFO:vinagent.register.tool:Registered tool: get_current_time (runtime)
INFO:vinagent.register.tool:Registered tool: fetch_stock_data (runtime)
INFO:vinagent.register.tool:Registered tool: visualize_stock_data (runtime)
INFO:vinagent.register.tool:Registered tool: plot_returns (runtime)
INFO:vinagent.register.tool:Registered tool: search_api (runtime)
INFO:vinagent.register.tool:Registered tool: custom_deepsearch_tool (runtime)
INFO:vinagent.register.tool:Registered tool: get_current_time (runtime)
INFO:vinagent.register.tool:Registered tool: fetch_stock_data (runtime)
INFO:vinagent.register.tool:Registered tool: visualize_stock_dat

Institutional Portfolio Series Assembled with Global Resilient Tools.


## 5. Execute Institutional Rebalance

The system will iterate until the Human Review node grants approval.

In [37]:
from IPython.display import display, Markdown

query = "Perform an institutional rebalance for the Tech-Heavy portfolio. Ensure it goes through PM review."

result = crew.invoke(query=query, user_id="admin", thread_id=10)

display(Markdown(f"""
## üè¶ Institutional Portfolio Advice
---
**Approval Status:** {result.get('approval_status', 'Unknown')}

**Optimized Allocation Insights:**
{result.get('optimal_weights', 'No data collected.')}
"""))

INFO:vinagent.multi_agent.crew:No authentication card provided, skipping authentication
INFO:vinagent.agent.agent:No authentication card provided, skipping authentication
INFO:vinagent.agent.agent:I'am chatting with unknown_user
INFO:vinagent.agent.agent:Tool calling iteration 1/10


{'input': {'messages': {'role': 'user', 'content': 'Perform an institutional rebalance for the Tech-Heavy portfolio. Ensure it goes through PM review.'}}, 'config': {'configurable': {'user_id': 'admin'}, 'thread_id': 10}}


INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:vinagent.agent.agent:Executing tool call: {'tool_name': 'search_api', 'tool_type': 'function', 'arguments': {'query': 'current risk-free rates'}, 'module_path': '__runtime__'}
INFO:vinagent.register.tool:Completed executing function tool search_api({'query': 'current risk-free rates'})
INFO:vinagent.agent.agent:Tool calling iteration 2/10
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:vinagent.agent.agent:No more tool calls needed. Completed in 2 iterations.
INFO:vinagent.logger.logger:

{'messages': {'role': 'DataAgent', 'content': 'To calculate the implied market returns for a 60/40 Equity/Bond portfolio, based on the current risk-free rates of 4.02% for the 10-year Treasury and 2.25% for the Canadian 2-year bond yield, we can consider the following:\n\n1. **Equity Return**: Typically, the expected return on equity can be calculated by addin

AttributeError: 'str' object has no attribute 'artifact'