In [None]:
from dotenv import load_dotenv, find_dotenv

# Load environment variables from the .env file that find_dotenv() locates.
load_dotenv(find_dotenv())

### Test Local LangGraph Agent

In [None]:
from langgraph_sdk import get_client

# NOTE(review): this URL looks like a hosted deployment rather than a local server — confirm it matches the section title.
url = "https://leetmock-ts-fa225f46565756e7b0567441810f232f.default.us.langgraph.app"
client = get_client(url=url)

# Despite its name, this value is passed as `graph_id` when the assistant is created in a later cell.
assistant_id = "code-mock-staged-v1"

# Create a new thread through the client and show what came back.
thread = await client.threads.create()

print(thread)

In [2]:
# Create an assistant for the graph id stored in `assistant_id`.
openai_assistant = await client.assistants.create(graph_id=assistant_id)

In [None]:
# Display the value returned by client.assistants.create().
openai_assistant

In [3]:
from agent_graph.code_mock_staged_v1.graph import create_compiled_graph
from dotenv import load_dotenv, find_dotenv

# Load environment variables from the .env file that find_dotenv() locates.
load_dotenv(find_dotenv())

# graph = create_graph().compile()
# Build the compiled graph from agent_graph.code_mock_staged_v1.
graph = create_compiled_graph()
# Run configuration reused by the streaming and state-inspection cells below.
# NOTE(review): "session_id1" may be a typo for "session_id" — confirm against what the graph reads.
config = { "configurable": {"thread_id": "1", "session_id1": "abc"} }

In [1]:
from langgraph.graph import StateGraph
from pydantic import BaseModel
from typing import List

class State(BaseModel):
    """Minimal graph state holding a list of message strings."""

    messages: List[str] = []


# NOTE(review): this rebinds `graph`, replacing the result of create_compiled_graph() for every
# later cell, and no nodes or edges are added before compile() — confirm this is intended.
graph = StateGraph(State).compile()

In [None]:
async for chunk in graph.astream(
    input={
        "messages": ["Hi"],
        "event": "reminder",
        # "trigger": True,
    },
    config=config,
    stream_mode=["values"],
):
    print(chunk)

async for chunk in graph.astream(
    input={
        "messages": ["Hi"],
        # "event": "user_message"
    },
    config=config,
    stream_mode=["values"],
):
    print(chunk)

In [None]:
from langchain_core.load.load import load, loads
from langchain_core.load.dump import dumpd, dumps
from langgraph.types import StateSnapshot

# Fetch the graph state for `config` and display it.
state = graph.get_state(config=config)
state

In [None]:
# Convert the snapshot with langchain's dumpd() and display the result.
dumped_state = dumpd(state)
dumped_state

In [None]:
# Revive the dumped state, allowing objects from the "agent_graph" namespace,
# then rebuild a StateSnapshot by unpacking the loaded value positionally.
loaded_state = load(dumped_state, valid_namespaces=["agent_graph"])
loaded_state_snapshot = StateSnapshot(*loaded_state)
loaded_state_snapshot

In [None]:
# Round-trip check: compare the snapshots directly and via their dumped forms.
loaded_state_snapshot == state, dumpd(loaded_state_snapshot) == dumpd(state)

In [None]:
# Compare only the `values` of the reloaded and original snapshots.
loaded_state_snapshot.values == state.values

In [10]:
# Materialize the state history for `config` as a list.
history = list(graph.get_state_history(config=config))

In [None]:
# Serialize the whole history with dumps() and display it.
dumped_history = dumps(history)
dumped_history

In [None]:
# Load the serialized history back (allowing the "agent_graph" namespace)
# and rebuild one StateSnapshot per loaded entry.
loaded_history = loads(dumped_history, valid_namespaces=["agent_graph"])
loaded_history_snapshot = [StateSnapshot(*h) for h in loaded_history]
loaded_history_snapshot

In [None]:
# Round-trip check for the history: direct comparison and comparison of dumped forms.
loaded_history_snapshot == history, dumpd(loaded_history_snapshot) == dumpd(history)

In [None]:
# Report the serialized sizes in KB.
# `dumped_state` comes from dumpd(), which returns a JSON-compatible Python object
# rather than a string, so it has no .encode(); serialize with dumps() to get text.
dumped_state_byte = dumps(state).encode()
dumped_history_byte = dumped_history.encode()
print(f"State: {len(dumped_state_byte) / 1024:.2f} KB")
print(f"State History: {len(dumped_history_byte) / 1024:.2f} KB")

In [None]:
!pip install graphviz

In [None]:
# Render the graph structure with draw_mermaid(); the result is written to graph.md in the next cell.
a = graph.get_graph().draw_mermaid()

In [7]:
# Save the rendered diagram text; pin UTF-8 so the file does not depend on the
# platform's default encoding.
with open("graph.md", "w", encoding="utf-8") as f:
    f.write(a)

In [None]:
from pydantic import BaseModel, Field, PrivateAttr


class User(BaseModel):
    """Pydantic model with two public fields and one private attribute set in __init__."""

    name: str
    age: int
    # Private attribute; it is assigned in __init__ after the base initializer has run.
    _a: str = PrivateAttr(init=False)

    def __init__(self, **kwargs):
        """Run BaseModel's initializer, announce the call, then set the private attribute."""
        super().__init__(**kwargs)
        print("base class init")
        self._a = "a"


class User2(User):
    """Subclass adding a required field `c`; it defines no __init__ of its own, so User's is used."""

    c: str = Field(...)


# Constructing the subclass runs User.__init__, which prints "base class init".
user = User2(name="Charlie", age=20, c="c")
print(user)  # prints the model instance (fields name, age, c)

In [None]:
from langchain_anthropic import ChatAnthropic
from langchain_core.prompts import ChatPromptTemplate, SystemMessagePromptTemplate, HumanMessagePromptTemplate
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI
from dotenv import load_dotenv, find_dotenv

load_dotenv(find_dotenv())

# Marker attached to messages/content parts to request "ephemeral" cache control.
CACHE_CONFIG = {"cache_control": {"type": "ephemeral"}}

# Two-message prompt; the system template text is repeated 1000 times to produce a very long prompt.
# NOTE(review): presumably sized to exceed the provider's minimum cacheable prompt length — confirm.
prompt = ChatPromptTemplate.from_messages(
    [
        SystemMessagePromptTemplate.from_template(
            template="You are a {role} assistant." * 1000,
            additional_kwargs=CACHE_CONFIG,
        ),
        HumanMessagePromptTemplate.from_template(
            template="{input}",
            additional_kwargs=CACHE_CONFIG,
        ),
    ]
)

# Anthropic chat model, sent with the prompt-caching beta header.
chat = ChatAnthropic(
    model_name="claude-3-5-haiku-20241022",
    extra_headers={"anthropic-beta": "prompt-caching-2024-07-31"},  # type: ignore
)

# `name` only labels the runnable; the model is selected with `model`
# (as in the ChatOpenAI(model="gpt-4o-mini", ...) call later in this notebook).
openai = ChatOpenAI(model="gpt-4o-mini")

# Compose the prompt with the Anthropic model; the sample invocation below is left disabled.
chain = prompt | chat
# chain.invoke(
#     {
#         "role": "helpful",
#         "input": "What is second page of the shakespeare?",
#     }
# ).dict()


In [None]:
# Send one long system message and two user questions to the OpenAI model, each text part
# carrying the CACHE_CONFIG marker, and display the response converted with .dict().
# NOTE(review): `cache_control` is the marker used with the Anthropic model above — confirm the
# OpenAI endpoint accepts or ignores it.
openai.invoke(
    [
        SystemMessage(
            content=[
                {
                    "text": "You are a helpful assistant." * 1000,
                    "type": "text",
                    **CACHE_CONFIG,
                }
            ],
        ),
        HumanMessage(
            content=[
                {
                    "text": "What is second page of the shakespeare?",
                    "type": "text",
                    **CACHE_CONFIG,
                }
            ],
        ),
        HumanMessage(
            content=[
                {
                    "text": "What is first page of the shakespeare?",
                    "type": "text",
                    **CACHE_CONFIG,
                }
            ],
        ),
    ]
).dict()

In [None]:
import json
import time


# Serialize a small sample payload; "time" is the current timestamp from time.time().
json.dumps(
    {
        "context": "test",
        "name": "test",
        "time": time.time(),
    }
)


In [2]:
import asyncio
from langgraph.func import entrypoint, task
from langchain_openai import ChatOpenAI

# Chat model handle; it is not referenced by the counter tasks defined in this cell.
llm = ChatOpenAI(model="gpt-4o-mini", temperature=1)
# Shared list mutated by _inc(); each call appends the list's current length.
val: list[int] = []
# Number of tasks each workflow fans out, and the length check_final_val() expects.
count: int = 5000

def _inc():
    """Append the current length of the shared ``val`` list to it and return the list."""
    next_value = len(val)
    val.append(next_value)
    return val

@task(name="async_inc")
async def ainc(topic: str):
    """Async task that calls ``_inc()`` inline in the coroutine and returns its result.

    ``topic`` is accepted but not used.
    """
    updated = _inc()
    return updated


@task(name="async_thread_inc")
async def thread_ainc(topic: str):
    """Async task that runs ``_inc()`` via ``asyncio.to_thread`` and returns its result.

    ``topic`` is accepted but not used.
    """
    updated = await asyncio.to_thread(_inc)
    return updated


@task(name="sync_inc")
def inc():
    """Synchronous task that calls ``_inc()`` and returns its result."""
    updated = _inc()
    return updated

# Build workflow
@entrypoint()
async def async_workflow(topic: str, previous: str):
    """Fan out ``count`` ``ainc`` tasks and return their gathered results.

    The gather is awaited so the entrypoint returns the list of task results
    (as ``async_thread_workflow`` does) instead of a still-pending future.

    Args:
        topic: Value forwarded to every ``ainc`` call.
        previous: Accepted by the entrypoint signature; not used here.
    """
    pending = [ainc(topic) for _ in range(count)]

    return await asyncio.gather(*pending)


@entrypoint()
async def async_thread_workflow(topic: str, previous: str):
    """Fan out ``count`` ``thread_ainc`` tasks and return their gathered results.

    ``previous`` is accepted by the entrypoint signature but not used here.
    """
    pending = []
    for _ in range(count):
        pending.append(thread_ainc(topic))

    results = await asyncio.gather(*pending)
    return results


@entrypoint()
def sync_workflow(topic: str, previous: str):
    """Start ``count`` ``inc`` tasks, then collect each one's ``result()`` in start order.

    ``topic`` and ``previous`` are accepted by the signature but not used here.
    """
    futures = []
    for _ in range(count):
        futures.append(inc())

    results = []
    for future in futures:
        results.append(future.result())
    return results


def check_final_val(val: list[int]):
    """Assert that ``val`` has exactly ``count`` entries and that entry ``i`` equals ``i``.

    Raises:
        AssertionError: If the length differs from ``count`` or any entry is out of place.
    """
    assert len(val) == count
    for index in range(len(val)):
        assert val[index] == index

In [None]:
# Reset the shared list before the run.
val = []
# Run the sync workflow, discarding the streamed updates.
async for step in sync_workflow.astream("cat", stream_mode="updates"):
    ...

# Verify the shared list holds 0..count-1 in order.
check_final_val(val)

In [46]:
# Reset the shared list before the run.
val = []
# Run the to_thread-based workflow, discarding the streamed updates.
async for step in async_thread_workflow.astream("cat", stream_mode="updates"):
    ...

# Verify the shared list holds 0..count-1 in order.
check_final_val(val)

In [93]:
# Reset the shared list before the run.
val = []
# Run the async workflow, discarding the streamed updates.
async for step in async_workflow.astream("cat", stream_mode="updates"):
    ...

# Verify the shared list holds 0..count-1 in order.
check_final_val(val)

In [None]:
# Reset the shared list, then run three async_workflow invocations concurrently.
val = []

await asyncio.gather(async_workflow.ainvoke("cat"), async_workflow.ainvoke("cat"), async_workflow.ainvoke("cat"))

In [None]:
# Number of entries appended to the shared list by the preceding run.
len(val)

In [105]:
import random


# Shared flag store: set_silent() writes "silent" and should_silent() reads it.
context = {
    "silent": False,
}

def chunk_tokenize(text: str):
    """Split ``text`` on single space characters and return the pieces as a list.

    Only ``" "`` acts as a separator, so consecutive spaces yield empty strings
    and newlines stay inside tokens.
    """
    tokens = text.split(" ")
    return tokens

@task
async def should_silent(text: str):
    """Wait three seconds, then return the current ``context["silent"]`` flag.

    ``text`` is accepted but not used.
    """
    await asyncio.sleep(3)
    is_silent = context["silent"]
    return is_silent

@task
async def invoke_agent(text: str):
    """Emit ``text`` token by token, holding output back until the silence check resolves.

    ``should_silent(text)`` is started first. While its result is pending, tokens
    are buffered. Once it resolves:

    * silent  -> print "[silent]" and stop without emitting any text;
    * not silent -> print "[done]", flush the buffer once, then print each
      remaining token as soon as it is produced.

    If the check is still pending when the tokens run out, it is awaited and the
    same decision is applied to the whole buffer. The buffered text is printed
    at most once: when the flush already happened inside the loop, the
    post-loop step is skipped (previously "[done]" and the buffer were printed
    a second time).

    Args:
        text: Text to tokenize with ``chunk_tokenize`` and emit.

    Returns:
        None.
    """
    silent_fut = should_silent(text)

    released = False  # True once the check came back "not silent" and the buffer was flushed
    buffered_text = ""
    for token in chunk_tokenize(text):
        if silent_fut.done() and not released:
            if silent_fut.result():
                print("[silent]")
                return
            print("[done]")
            print(buffered_text, end="")
            released = True

        await asyncio.sleep(0.1)  # fixed pause before handling each token
        if released:
            print(token + " ", end="")
        else:
            # print(f"[accumulating] {token}")
            buffered_text += token + " "

    if released:
        # Buffer was flushed and the rest streamed inside the loop; nothing left to print.
        return

    await silent_fut
    if silent_fut.result():
        print("[silent]")
    else:
        print("[done]")
        print(buffered_text, end="")

@task
async def set_silent(silent: bool):
    """Store ``silent`` under the ``"silent"`` key of the shared ``context`` dict."""
    context.update({"silent": silent})

@entrypoint()
async def agent_workflow(obj: dict):
    """Apply ``obj["silent"]`` via ``set_silent``, then run ``invoke_agent`` on ``obj["text"]``."""
    silent_flag = obj["silent"]
    await set_silent(silent_flag)

    agent_text = obj["text"]
    await invoke_agent(agent_text)

In [None]:
# Long multi-paragraph sample text used as input for the agent workflow.
story = """\
Short-term memory¶
State management using the previous parameter and optionally using the entrypoint.final primitive can be used to implement short term memory.

Please see the following how-to guides for more details:

How to add thread-level persistence (functional API): Shows how to add thread-level persistence to a functional API workflow and implements a simple chatbot.
Long-term memory¶
long-term memory allows storing information across different thread ids. This could be useful for learning information about a given user in one conversation and using it in another.

Please see the following how-to guides for more details:

How to add cross-thread persistence (functional API): Shows how to add cross-thread persistence to a functional API workflow and implements a simple chatbot.
Workflows¶
Workflows and agent guide for more examples of how to build workflows using the Functional API.
Agents¶
How to create a React agent from scratch (Functional API): Shows how to create a simple React agent from scratch using the functional API.
How to build a multi-agent network: Shows how to build a multi-agent network using the functional API.
How to add multi-turn conversation in a multi-agent application (functional API): allow an end-user to engage in a multi-turn conversation with one or more agents.
"""

# Run the workflow on the story with the silent flag off.
await agent_workflow.ainvoke({"text": story, "silent": False})

In [None]:
# Rebind `llm` to gpt-4o with max_completion_tokens=2 and send it the story text.
llm = ChatOpenAI(model="gpt-4o", temperature=1, max_completion_tokens=2)
llm.invoke(story)

In [None]:
# Shared dict: long_task() sets ctx["done"], and loop() polls it.
ctx: dict = {}

@task
async def long_task(t: int):
    """Sleep for ``t`` seconds, then set ``ctx["done"]``; prints a line before and after."""
    start_message = f"Start long task {t}"
    print(start_message)

    await asyncio.sleep(t)
    ctx.update({"done": True})

    end_message = f"End long task {t}"
    print(end_message)


@entrypoint()
async def workflow(t: int):
    """Start ``long_task(t)`` without awaiting it and return ``None``.

    Prints a line before and after starting the task.
    """
    print(f"Start workflow {t}")
    # The task's future is deliberately left un-awaited here.
    long_task(t)
    print(f"End workflow {t}")

async def loop():
    """Print ``ctx`` once per second until ``ctx["done"]`` is truthy."""
    while True:
        if ctx.get("done", False):
            break
        print(ctx)
        await asyncio.sleep(1)

# Run the poller alongside two workflow invocations (t=5 and t=10).
# loop() exits once any long_task has set ctx["done"].
await asyncio.gather(loop(), workflow.ainvoke(5), workflow.ainvoke(10))

{}
Start workflow
End workflow
Start long task
{}
{}
{}
{}
{}
{}
{}
{}
{}
End long task


[None, None]