In [4]:
import os
from langchain_ollama import ChatOllama

# Initialize an Ollama Client for our generative Llm model

text_model = "llama3.2"

def llm_client_loader():
    """This function serves an Ollama-hosted text generator model, to be used by our graphs."""
    try:
        llm = ChatOllama(
            model=text_model,
            temperature=0.2
        )
        return llm
    except Exception as e:
        print(f"Error {e} instantiating the Ollama client, is the Ollama server running?.")

llm = llm_client_loader()

In [6]:
from langchain_community.tools.yahoo_finance_news import YahooFinanceNewsTool

tools = [YahooFinanceNewsTool()]
tools_by_name = {tool.name: tool for tool in tools}
llm_with_tools = llm.bind_tools(tools)

In [None]:
from langgraph.graph import MessagesState
from langchain.messages import SystemMessage, HumanMessage, ToolMessage
import os
from langgraph.constants import START
from langgraph.graph import StateGraph
from langgraph.graph import END
from pydantic import Field, BaseModel
from langgraph.graph import MessagesState
from typing import Annotated, Literal
import operator
from langchain_core.messages import HumanMessage, SystemMessage
from langgraph.types import Send
from typing import List, TypedDict


# Nodes
def llm_call(state: MessagesState):
    """LLM decides whether to call a tool or not"""

    return {
        "messages": [
            llm_with_tools.invoke(
                [
                    SystemMessage(
                        content="You are a helpful assistant tasked with performing arithmetic on a set of inputs."
                    )
                ]
                + state["messages"]
            )
        ]
    }


def tool_node(state: dict):
    """Performs the tool call"""

    result = []
    for tool_call in state["messages"][-1].tool_calls:
        tool = tools_by_name[tool_call["name"]]
        observation = tool.invoke(tool_call["args"])
        result.append(ToolMessage(content=observation, tool_call_id=tool_call["id"]))
    return {"messages": result}

# Nodes
def llm_call(state: MessagesState):
    """LLM decides whether to call a tool or not"""

    return {
        "messages": [
            llm_with_tools.invoke(
                [
                    SystemMessage(
                        content="You are a helpful assistant tasked with performing arithmetic on a set of inputs."
                    )
                ]
                + state["messages"]
            )
        ]
    }


# Conditional edge function to route to the tool node or end based upon whether the LLM made a tool call
def should_continue(state: MessagesState) -> Literal["tool_node", END]:
    """Decide if we should continue the loop or stop based upon whether the LLM made a tool call"""

    messages = state["messages"]
    last_message = messages[-1]
    # If the LLM makes a tool call, then perform an action
    if last_message.tool_calls:
        return "tool_node"
    # Otherwise, we stop (reply to the user)
    return END


# Build workflow
agent_builder = StateGraph(MessagesState)

# Add nodes
agent_builder.add_node("llm_call", llm_call)
agent_builder.add_node("tool_node", tool_node)

# Add edges to connect nodes
agent_builder.add_edge(START, "llm_call")
agent_builder.add_conditional_edges(
    "llm_call",
    should_continue,
    ["tool_node", END]
)
agent_builder.add_edge("tool_node", "llm_call")

# Compile the agent
agent = agent_builder.compile()

# Show the agent
display(Image(agent.get_graph(xray=True).draw_mermaid_png()))

# Invoke
messages = [HumanMessage(content="Add 3 and 4.")]
messages = agent.invoke({"messages": messages})
for m in messages["messages"]:
    m.pretty_print()

In [None]:
import os
from langgraph.constants import START
from langgraph.graph import StateGraph
from langgraph.graph import END
from pydantic import Field, BaseModel
from langgraph.graph import MessagesState
from typing import Annotated, Literal
import operator
from langchain_core.messages import HumanMessage, SystemMessage
from langgraph.types import Send
from typing import List, TypedDict

# AAA
try:
    # Schema for structured output
    #State definitions...
    class News_Analyzer(BaseModel):
        news_article_content: str = Field(description="The news article content to be analyzed.")
        metadata: str = Field(description="The metadata for the specific news article.")

    # Internal states (dynamic)
    # Augment the LLM with schema for structured output
    planner = llm.with_structured_output(FinancialReport)

    # Internal State definition

    class State(MessagesState):
        news_source: str  # news articles all together
        news_articles: list[News_Analyzer]  # list of news articles divided into metadata and content
        completed_analyses: Annotated[
            list, operator.add
        ]  # Shared key for the analysts to write to
        macro_financial_report: str

    class WorkerState(TypedDict):
        news_article: News_Analyzer
        completed_analyses: Annotated[list, operator.add]  # keys must match with other State!

    # Nodes / Tools
    def orchestrator(state: State):
        try:
            """Orchestrator that creates a research plan in order to obtain a comprehensive  ona set of topics it has to conduct research on,
             metadata and content.
            """

            # Generate queries...
            news_articles = planner.invoke(
                [
                    SystemMessage(
                        content="Split the following text into into distinct singular news articles, each with corresponding metadata and content."
                                "Do not mix news articles together, keep each topic in ints own container/bucket."),
                    HumanMessage(content=f"Assign to each split both the content and corresponding metadata: {state["news_source"]}."),
                ]
            )

            return {"news_articles": news_articles.news_analysts}

        except Exception as e:
            print(f"Error {e} during the orchestrator process.")

    class News_Summary(BaseModel):
        summary: str = Field(description="The summary of the news article.")
        label: Literal["inflation", "rates", "fed", "macro"] = Field(description="The label for the news article.")

    summarizer = llm.with_structured_output(News_Summary)

    def llm_call(state: WorkerState):
        """Worker writes a summary about the given news article. It also classifies the news article with a
        macroeconomic label.
        """

        try:
            print(f"LLM call instantiated {state['news_article'].metadata}.")

            analysis = summarizer.invoke(
                [
                    SystemMessage(
                        content="Summarize the following news article in two clear and concise paragraph, capturing the key ideas "
                                "without missing critical points. Ensure the summary is easy to understand and avoids "
                                "excessive detail. Be sure to also label the news article accordingly."
                    ),
                    HumanMessage(
                        content=f"Here is the news article: {state['news_article'].news_article_content} with corresponding metadata: "
                                f"{state['news_article'].metadata}."
                    ),
                ]
            )

            serialized_summary = "".join(f"{analysis.summary} {analysis.label}")

            print(f"Summary is {serialized_summary}.")

            # Write the final amount that was calculated.
            return {"completed_analyses": [serialized_summary]}

        except Exception as e:
            print(f"Error {e} during the llm-call process.")

    def synthesizer(state: State):
        """Synthesize a summary report from the collection of news articles. Don't forget to include the
        macroeconomic label for each news article."""
        try:
            # List of completed sections
            completed_report = state["completed_analyses"]

            # Format completed section to str to use as context for final sections
            serialized_completed_report = "\n\n---\n\n".join(completed_report)

            return {"macro_financial_report": serialized_completed_report}
        except Exception as e:
            print(f"Error {e} during the synthesizer process.")

    # Conditional edge function to create llm_call workers that each write a section of the report
    def assign_workers(state: State):
        try:
            """Assign a worker to each news article in the collection of news articles."""

            # Kick off section writing in parallel via Send() API
            return [Send("llm_call", {"news_article": s}) for s in state["news_articles"]]
        except Exception as e:
            print(f"Error {e} during the worker assignment process.")

    # Build workflow
    orchestrator_worker_builder = StateGraph(State)

    # Add the nodes
    orchestrator_worker_builder.add_node("orchestrator", orchestrator)
    orchestrator_worker_builder.add_node("llm_call", llm_call)
    orchestrator_worker_builder.add_node("synthesizer", synthesizer)

    # Add edges to connect nodes
    orchestrator_worker_builder.add_edge(START, "orchestrator")
    orchestrator_worker_builder.add_conditional_edges(
        "orchestrator", assign_workers, ["llm_call"]
    )
    orchestrator_worker_builder.add_edge("llm_call", "synthesizer")
    orchestrator_worker_builder.add_edge("synthesizer", END)

    # Compile the workflow
    orchestrator_worker = orchestrator_worker_builder.compile()
    return orchestrator_worker
except Exception as e:
    print(f"Error {e} during the graph building process.")

# Invoke
llm_client = llm_client_loader()
graph = graph_builder(llm_client)
render_graph(graph)
mn = MarketNewsProvider(category="forex", api_key=os.getenv("FINNHUB_API_KEY"))
market_news = mn.fetch()
state = graph.invoke({"news_source": f"{market_news}"})
print(state["macro_financial_report"])