# State Reducers

https://github.com/langchain-ai/langchain-academy/blob/main/module-2/state-reducers.ipynb

## Custom Reducers

    from typing import Annotated, TypedDict

    def reduce_list(left: list | None, right: list | None) -> list:
        ...  # body not recorded in these notes

    class CustomReducerState(TypedDict):
        foo: Annotated[list[int], reduce_list]
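
The body of `reduce_list` is not recorded above. A minimal sketch of what it could look like, assuming the goal is to concatenate both sides while treating a missing (`None`) side as an empty list. `Optional[list]` is used here instead of `list | None` only so that the sketch also runs on Python versions before 3.10:

```python
from typing import Optional


def reduce_list(left: Optional[list], right: Optional[list]) -> list:
    """Concatenate two lists, treating None (or an empty value) as []."""
    if not left:
        left = []
    if not right:
        right = []
    return left + right


print(reduce_list(None, [2]))    # [2]
print(reduce_list([1], None))    # [1]
print(reduce_list([1], [2, 3]))  # [1, 2, 3]
```

The `None` handling matters when a field is missing from the input state: a plain `left + right` would raise a `TypeError` in that case.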

## Re-writing

If we pass a message with the same ID as an existing one in our messages list, the `add_messages` reducer overwrites the existing message instead of appending a new one.

    # New message to add
    new_message = HumanMessage(content="I'm looking for information on whales, specifically", name="Lance", id="2")

## Removal
We can remove messages by using `RemoveMessage`.

    delete_messages = [RemoveMessage(id=m.id) for m in messages[:-2]]
    add_messages(messages, delete_messages)
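
The overwrite-by-ID and removal behaviour can be pictured with a small library-free model. This is not the `add_messages` implementation: `Msg`, `Remove` and `merge_messages` are hypothetical stand-ins, and the message contents are made up for illustration.

```python
from dataclasses import dataclass


@dataclass
class Msg:
    """Hypothetical stand-in for a chat message."""
    id: str
    content: str


@dataclass
class Remove:
    """Hypothetical stand-in for RemoveMessage: it only carries the id to drop."""
    id: str


def merge_messages(left: list, right: list) -> list:
    """Toy reducer: append new ids, overwrite existing ids, drop removed ids."""
    merged = list(left)
    for item in right:
        index = next((i for i, m in enumerate(merged) if m.id == item.id), None)
        if isinstance(item, Remove):
            if index is not None:
                del merged[index]
        elif index is not None:
            merged[index] = item   # same id: overwrite in place
        else:
            merged.append(item)    # new id: append
    return merged


messages = [Msg("1", "Hello"), Msg("2", "Tell me about the ocean"), Msg("3", "Sure!")]

# Overwrite: same id as an existing message
messages = merge_messages(messages, [Msg("2", "I'm looking for information on whales, specifically")])

# Removal: delete all but the 2 most recent messages
messages = merge_messages(messages, [Remove(m.id) for m in messages[:-2]])
print([m.id for m in messages])  # ['2', '3']
```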

# Multiple Schemas

https://github.com/langchain-ai/langchain-academy/blob/main/module-2/multiple-schemas.ipynb

## Private State

    class OverallState(TypedDict):
        foo: int

    class PrivateState(TypedDict):
        baz: int

    def node_1(state: OverallState) -> PrivateState:
        print("---Node 1---")
        return {"baz": state['foo'] + 1}

    def node_2(state: PrivateState) -> OverallState:
        print("---Node 2---")
        return {"foo": state['baz'] + 1}

## Input / Output Schema

    class InputState(TypedDict):
        question: str

    class OutputState(TypedDict):
        answer: str

    class OverallState(TypedDict):
        question: str
        answer: str
        notes: str

    def thinking_node(state: InputState):
        return {"answer": "bye", "notes": "... his name is Lance"}

    def answer_node(state: OverallState) -> OutputState:
        return {"answer": "bye Lance"}

    graph = StateGraph(OverallState, input_schema=InputState, output_schema=OutputState)
    graph.add_node("answer_node", answer_node)

# Trim / Filter Messages

https://github.com/langchain-ai/langchain-academy/blob/main/module-2/trim-filter-messages.ipynb

## Reducer

    from langchain_core.messages import RemoveMessage

    def filter_messages(state: MessagesState):
        # Delete all but the 2 most recent messages
        delete_messages = [RemoveMessage(id=m.id) for m in state["messages"][:-2]]
        return {"messages": delete_messages}

    def chat_model_node(state: MessagesState):
        return {"messages": [llm.invoke(state["messages"])]}

    builder = StateGraph(MessagesState)
    builder.add_node("filter", filter_messages)
    builder.add_node("chat_model", chat_model_node)

## Filtering messages

    def chat_model_node(state: MessagesState):
        # Only the last message is sent to the model; the state keeps the full history
        return {"messages": [llm.invoke(state["messages"][-1:])]}

## Trim messages

    from langchain_core.messages import trim_messages

    def chat_model_node(state: MessagesState):
        messages = trim_messages(
            state["messages"],
            max_tokens=100,
            strategy="last",
            token_counter=ChatOpenAI(model="gpt-4o"),
            allow_partial=False,
        )
        return {"messages": [llm.invoke(messages)]}
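
To picture what `strategy="last"` combined with `allow_partial=False` amounts to, here is a library-free sketch. It is not `trim_messages` itself: `trim_last` and `count_words` are hypothetical helpers, and words are counted instead of model tokens.

```python
def count_words(text: str) -> int:
    """Crude token counter (assumption): one token per whitespace-separated word."""
    return len(text.split())


def trim_last(messages: list, max_tokens: int) -> list:
    """Keep the most recent messages that fit in the budget, never cutting one in half."""
    kept = []
    used = 0
    for message in reversed(messages):
        cost = count_words(message)
        if used + cost > max_tokens:
            break  # no partial messages: the whole message is dropped
        kept.append(message)
        used += cost
    return list(reversed(kept))


history = [
    "hi there",
    "tell me about whales",
    "whales are marine mammals",
    "which one is largest",
]
print(trim_last(history, max_tokens=8))
# ['whales are marine mammals', 'which one is largest']
```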

# Chatbot Summarization

https://github.com/langchain-ai/langchain-academy/blob/main/module-2/chatbot-summarization.ipynb

Building blocks:

- `RemoveMessage` => remove all but the last 2 messages once there is a summary.
- State with a new `summary` field:

    class State(MessagesState):
        summary: str

- Model node:
    - if a summary exists, it adds a `SystemMessage` asking the model to use it.

- `summarize_conversation` node:
    - asks the model to summarize the conversation. If a summary already exists, it appends a `HumanMessage` that embeds the summary so far and asks to take it into account; otherwise it asks for a summary of the conversation up to now.
    - outputs as the new conversation (i.e., the new state) the summary plus the last 2 messages.
        - this is done by writing the `summary` field and using `RemoveMessage` to remove all but the last 2 messages in the `messages` field.


- `should_continue` for `add_conditional_edges`:
    - moves to `summarize_conversation` if the conversation has more than X messages so far, or to `END` if the conversation is still short.

- Memory:
    - the config is configurable with a `thread_id`; think of Slack threads.
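
The routing function from the list above can be sketched without the library. The threshold value and the `END` placeholder string are assumptions made for this sketch; in a real graph `END` would be imported from `langgraph.graph`.

```python
END = "__end__"   # placeholder for the END constant (assumption for this sketch)
MAX_MESSAGES = 6  # the "X" threshold; the value is an assumption


def should_continue(state: dict) -> str:
    """Route to the summarization node once the conversation is long enough."""
    if len(state["messages"]) > MAX_MESSAGES:
        return "summarize_conversation"
    return END


print(should_continue({"messages": ["m"] * 3}))  # __end__
print(should_continue({"messages": ["m"] * 7}))  # summarize_conversation
```

The returned string is the name of the next node, which is what `add_conditional_edges` uses to pick the edge to follow.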


 

# Chatbot with External Memory

https://github.com/langchain-ai/langchain-academy/blob/main/module-2/chatbot-external-memory.ipynb

## SQLite

    import sqlite3
    db_path = "state_db/example.db"
    conn = sqlite3.connect(db_path, check_same_thread=False)

    from langgraph.checkpoint.sqlite import SqliteSaver
    memory = SqliteSaver(conn)

# UX and Human-in-the-Loop

- Use cases:
    - approval
    - correction / editing / human taking action
- State can be updated by human
- Can be used for debugging
- The AI can resume later from where it stopped

# Streaming and Interruption

https://github.com/langchain-ai/langchain-academy/blob/main/module-3/streaming-interruption.ipynb

## Streaming modes

- `values` => full state after each node; `updates` => only the part of the state updated by each node
- tokens
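
The difference between the two state-wise modes can be illustrated with a toy runner. This is a simplified model, not the library's streaming logic: `run_nodes` and the two nodes are hypothetical, and details such as reducers are left out.

```python
def run_nodes(state: dict, nodes: dict, stream_mode: str):
    """Toy runner: apply each node in order and yield what each mode would show."""
    for name, node in nodes.items():
        update = node(state)
        state = {**state, **update}
        if stream_mode == "values":
            yield dict(state)     # full state after the node
        elif stream_mode == "updates":
            yield {name: update}  # only what this node changed, keyed by node name


nodes = {
    "node_1": lambda s: {"count": s["count"] + 1},
    "node_2": lambda s: {"done": True},
}

print(list(run_nodes({"count": 0, "done": False}, nodes, "values")))
# [{'count': 1, 'done': False}, {'count': 1, 'done': True}]
print(list(run_nodes({"count": 0, "done": False}, nodes, "updates")))
# [{'node_1': {'count': 1}}, {'node_2': {'done': True}}]
```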


state-wise:

```python
config = {"configurable": {"thread_id": "1"}}
for chunk in graph.stream({"messages": [HumanMessage(content="hi! I'm Lance")]}, config, stream_mode="updates"):
    print(chunk)
```

token-wise:
- no stream_mode
- async mode:

```python
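async for event in graph.astream_events({"messages": [HumanMessage(content="hi! I'm Lance")]}, config, version="v2"):
    print(event["event"], event["name"], event["metadata"].get("langgraph_node", ""))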
```

- we print: `event["metadata"]` (includes the node in the `"langgraph_node"` field), `event["event"]` (type), `event["name"]`
- we could also print event["data"]
- we can select to print only the llm calls from the node of the graph that we're interested in, in the example "conversation".
   - we also indicate the event type, in our case "on_chat_model_stream" if we want to see llm responses
- This provides chunk outputs, which are the tokens. We can print them nicely as 

```python
if event["event"] == "on_chat_model_stream" and event["metadata"].get("langgraph_node", "") == "conversation":
    data = event["data"]
    print (data["chunk"].content, end="|")
```

### messages mode

- assumes the state has a `messages` field, which is a list of messages.

## LangGraph Studio

- We can take the client handle and retrieve:

    - graphs running: assistants

- Also create threads

- Print data:

This is the payload; `messages` is the state:

```python
{
    'messages': [
        {
            'content': 'Multiply 2 and 3', 
            'additional_kwargs': {}, 
            'response_metadata': {}, 
            'type': 'human', 
            'name': None, 
            'id': '9aaa247f-1e6e-4451-af25-ac678fe46d82'
        }
    ]
}
```

# Breakpoints

https://github.com/langchain-ai/langchain-academy/blob/main/module-3/breakpoints.ipynb

## Breakpoints with LangGraph API

- We add `interrupt_before=["tools"]` to the call `graph.compile(...)`

```python
graph = builder.compile(interrupt_before=["tools"], checkpointer=memory)
```

- `xray=True` in `get_graph` => visualizes the breakpoints

```python
Image(graph.get_graph(xray=True).draw_mermaid_png())
```

- pass None when we invoke the graph => resume from previous checkpoint:

```python
for event in graph.stream(None, thread, stream_mode="values"):
    event["messages"][-1].pretty_print()
```

### LangGraph Studio

Equivalent calls:

- First call for indicating interruption:

```python
thread = await client.threads.create() # Difference 1: instead of creating thread = {"configurable": {"thread_id": "1"}}
async for chunk in client.runs.stream ( # Difference 2: `async for` instead of `for`
    thread["thread_id"],
    assistant_id="agent", # Difference 3: we have several graphs and need to indicate which one
    input=initial_input, # initial input for the graph
    stream_mode="values", # same
    interrupt_before=["tools"], # Difference 4: breakpoint indicated in the stream command
                                # instead of compile (since the graph has already been compiled)
):
    print (chunk.data)
```

- Second call for resuming:

```python
async for chunk in client.runs.stream (
    thread["thread_id"],
    assistant_id="agent",
    input=None, # None goes here
    stream_mode="values",
    interrupt_before=["tools"],
):
    print (chunk.data)
```

# Editing state and human feedback

https://github.com/langchain-ai/langchain-academy/blob/main/module-3/edit-state-human-feedback.ipynb

## Update state / message


- First option: append message to previous list (add_messages reducer)

```python
graph.update_state (
    thread,
    {"messages": [HumanMessage(content="my new message")]}
)
```

- Second option: replace the message. We need to supply the message id

```python
state = graph.get_state(thread)
```

or 

```python
state = await client.threads.get_state(thread["thread_id"])
```

and then:

```python
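# With the SDK client the state is a dict. With `graph.get_state` it is a StateSnapshot:
# use `state.values["messages"][-1]` and set `.content` instead.
new_message = state["values"]["messages"][-1]
new_message["content"] = "my new message" # same id => the existing message is overwritten
new_state = {"messages": new_message}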
```

and:

```python
graph.update_state(thread, new_state)
```

or:

```python
await client.threads.update_state(thread["thread_id"], new_state)
```

## LangGraph Studio

- We can add interruptions through the UI, next to the node icon in the graph
- We can edit the messages in the chat. Afterwards, we need to **fork** the thread by pressing the button "fork" below the message.
- After editing


## Awaiting user input

- add placeholder (no-op) function for collecting human feedback:

```python
def human_feedback (state: MessagesState):
    pass

builder.add_node ("human_feedback", human_feedback)
```

- add interruption before human feedback node.

```python
graph = builder.compile (..., interrupt_before=["human_feedback"])
```

- when updating state, add `as_node="human_feedback"` to imitate the case that the message was updated in that node.

```python
new_message = input ("new message: ")
graph.update_state (thread, {"messages": new_message}, as_node="human_feedback")
```

# Dynamic breakpoints

[notebook](https://github.com/langchain-ai/langchain-academy/blob/main/module-3/dynamic-breakpoints.ipynb)

## Conditional

- Benefits: 
    - triggered only on certain conditions.
    - allows communicating the reason for the interruption.


- In node, if condition occurs, a `NodeInterrupt` error is raised:

```python
from langgraph.errors import NodeInterrupt

def node (state: MyStateClass):
    if condition:
        raise NodeInterrupt (f"State {state} meets condition")
```

We can see breakpoint information as:

```python
state = graph.get_state (thread_config)
print (state.tasks)
```

# Time Travel

[notebook](https://github.com/langchain-ai/langchain-academy/blob/main/module-3/time-travel.ipynb)

## Replay

- Get history with:

```python
state_history = [s for s in graph.get_state_history(thread)]
```

- Replay using:

```python
for event in graph.stream (None, state_history[idx].config, stream_mode="values"):
    ... 
```

![checkpoint_history.jpg](time_travel.png)

## Fork

- Used to replay with different state values.


```python
new_state = ...
fork_config = graph.update_state (state_history[idx].config, new_state)
for event in graph.stream (None, fork_config, stream_mode="values"):
    ...
```

In case we use `MessagesState`, we'll want to overwrite the previous message with a new one, by passing the message ID:

```python
new_state = {"messages": [HumanMessage(content="new message", id=state_history[idx].values["messages"][0].id)]}
```

### Using LangGraph Studio

```python
state_history = await client.threads.get_history (thread["thread_id"])
forked_input = {"messages": [HumanMessage(content="my new message", id=state_history[idx]["values"]["messages"][0]["id"])]}
forked_config = await client.threads.update_state (
    thread["thread_id"],
    forked_input,
    checkpoint_id=state_history[idx]["checkpoint_id"]
)
async for chunk in client.runs.stream (
    thread["thread_id"],
    assistant_id="agent",
    input=None,
    stream_mode="updates",
    checkpoint_id=forked_config["checkpoint_id"], # resume from the checkpoint created by the update
):
    print (chunk.data)
```

### Notes

When we update the state we are actually adding a new checkpoint to the history, so the history keeps growing.

# Building your assistant

# Parallelization

[notebook](https://github.com/langchain-ai/langchain-academy/blob/main/module-4/parallelization.ipynb)

## Control execution order of parallel nodes

Several ways:

1. Reducer:
```python
def sorted_reducer (left, right):
    if not isinstance(left, list):
        left = [left]
    if not isinstance(right, list):
        right = [right]

    return sorted (left+right)
```

2. Sink node:

- Write updates to different fields of the state
- Sink node joins the updates in whichever way it considers appropriate
- Then it deletes the temporary fields from state.
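
Both options can be tried out in isolation. The first part reuses the reducer above; the second is a sketch of a sink node under the assumption that the parallel nodes write to two temporary fields, hypothetically named `from_b` and `from_c` here.

```python
def sorted_reducer(left, right):
    """Merge two updates (scalars or lists) into one sorted list."""
    if not isinstance(left, list):
        left = [left]
    if not isinstance(right, list):
        right = [right]
    return sorted(left + right)


# Option 1: the result does not depend on which parallel node finished first.
print(sorted_reducer(sorted_reducer([], "I'm C"), "I'm B"))  # ["I'm B", "I'm C"]
print(sorted_reducer(sorted_reducer([], "I'm B"), "I'm C"))  # ["I'm B", "I'm C"]


def sink_node(state: dict) -> dict:
    """Option 2: join the temporary fields in a fixed order, then clear them."""
    joined = state["from_b"] + state["from_c"]
    return {"result": joined, "from_b": [], "from_c": []}


print(sink_node({"from_b": ["I'm B"], "from_c": ["I'm C"], "result": []}))
# {'result': ["I'm B", "I'm C"], 'from_b': [], 'from_c': []}
```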

## Realistic example

- Query the web and Wikipedia to get context, and have the LLM use it to answer the query.
- Web search: tavily, with API key. Import from `langchain_tavily`
- Wikipedia search: WikipediaLoader from `langchain_community.document_loaders`



# Sub-graphs

- Sub-graphs can be used for different agents in a multi-agent graph.
- A sub-graph is added as a node by passing a compiled graph where a node function would normally go.
- Each sub-graph can have a different input and output state schema. 


```python
fa_builder = StateGraph (state_schema=FailureAnalysisState, output_schema=FailureAnalysisOutputState)
qs_builder = StateGraph(QuestionSummarizationState,output_schema=QuestionSummarizationOutputState)

entry_builder.add_node ("failure_analysis", fa_builder.compile())
entry_builder.add_node ("qs_analysis", qs_builder.compile())
```

The state passed to the `failure_analysis` and `qs_analysis` sub-graphs can have additional fields that are not present in the schema of those sub-graphs:

```python
class EntryGraphState(TypedDict):
    raw_logs: List[Log]             # not present in sub-graphs schemas: internal field
    cleaned_logs: List[Log]         # overlap: input used by sub-graphs
    fa_summary: str                 # overlap with fa schema: output provided by fa subgraph
    report: str                     # overlap with qs schema: output provided by qs subgraph
    processed_logs:  Annotated[List[int], add] # overlap with both schemas: output provided by both sub-graphs (has reducer)

class FailureAnalysisState(TypedDict):
    cleaned_logs: List[Log]         # overlap
    failures: List[Log]             # new: internal field
    fa_summary: str                 # overlap
    processed_logs: List[str]       # overlap

class QuestionSummarizationState(TypedDict):
    cleaned_logs: List[Log]         # overlap 
    qs_summary: str                 # new: internal field
    report: str                     # overlap
    processed_logs: List[str]       # overlap
```

A node can return a state with only some of the fields of its output state schema.

```python
def send_to_slack(state):
    qs_summary = state["qs_summary"]
    # Add fxn: report = report_generation(qs_summary)
    report = "foo bar baz"
    return {"report": report}
```

### Notes

If the sub-graphs don't use a different output schema, then we need to indicate reducers for fields such as `cleaned_logs` that, although unmodified, are used by both sub-graphs. 

# Map-Reduce

[notebook](https://github.com/langchain-ai/langchain-academy/blob/main/module-4/map-reduce.ipynb)

## Using send and conditional edges

- To go from a single node `before_map` to copies of a node that run in parallel (`map` step), define a function that uses `Send` to the node:

```python
from langgraph.types import Send

def my_map (state):
    return [Send("my_map_node", {"my_node_input": s}) for s in state["my_list_of_inputs"]]
```

Then add a conditional edge from the single node `before_map` to the parallel copies of `my_map_node`, using `my_map` as the routing function:

```python
builder.add_conditional_edges ("before_map", my_map, ["my_map_node"])
```

And add a normal edge that goes from `my_map_node` to `my_reducer_node`:

```python
builder.add_edge ("my_map_node", "my_reducer_node")
```

- `my_map_node` needs to output to a field which has a reducer function in it:

```python
class MyOverallState (TypedDict):
    my_output_field: Annotated[my_output_type, my_reducer_function]

def my_map_node (state: MyOverallState):
    ...
    return {"my_output_field": my_return_value} 
``` 
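
The fan-out and fan-in described in this section can be simulated without the library. In this sketch `Send` is a hypothetical stand-in dataclass, the copies run sequentially instead of in parallel, and list concatenation plays the role of the reducer on `my_output_field`.

```python
import operator
from dataclasses import dataclass
from functools import reduce


@dataclass
class Send:
    """Hypothetical stand-in for Send: target node name plus its private input."""
    node: str
    arg: dict


def my_map(state: dict) -> list:
    """Fan out: one Send per item in the list."""
    return [Send("my_map_node", {"my_node_input": s}) for s in state["my_list_of_inputs"]]


def my_map_node(state: dict) -> dict:
    """Each copy sees only its own input and writes a one-element list."""
    return {"my_output_field": [state["my_node_input"].upper()]}


nodes = {"my_map_node": my_map_node}
sends = my_map({"my_list_of_inputs": ["cats", "dogs", "whales"]})

# Run every copy and fold the partial outputs with the reducer.
partials = [nodes[s.node](s.arg)["my_output_field"] for s in sends]
my_output_field = reduce(operator.add, partials, [])
print(my_output_field)  # ['CATS', 'DOGS', 'WHALES']
```

Because every copy returns a list and the reducer concatenates, the number of copies does not need to be known when the graph is built.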



## Notes about example used in tutorial

- We use structured output for the LLM:

```python
model = ChatOpenAI (...)

def my_node ():
    response = model.with_structured_output(MyStateClass).invoke(my_prompt)
    return {"my_output_field": response.my_field}
```

### Implementation details

We can get access to the model response content as `response.content`

# Long term memory

# Introduction

[slides](https://files.cdn.thinkific.com/file_uploads/967498/attachments/dc4/f52/87a/LangChain_Academy_-_Introduction_to_LangGraph_-_Long-Term_Memory.pdf)
[tutorial](https://docs.langchain.com/oss/python/langgraph/memory)

## Short vs Long term memory

- Short term memory: 
    - in session, single thread
    - with checkpointer
    - past messages can be summarized or filtered (e.g., truncate them)
- Long term memory: 
    - across sessions, across threads
    - with store

## Long term memory:

### Type of memory:

#### 1 Semantic 

- facts, user data
- structure: profile (dict with fields) or list of items (e.g., locations), updated after each session
- Pros:
    - Single document (profile): easily retrieved
    - List: narrow scope, easy to add
- Cons:
    - Single document: difficult to maintain when it grows
    - List: costly to retrieve as it grows.

#### 2 Episodic

- memories: agent actions

#### 3 Procedural

- prompts
- Using AI to generate prompts based on human feedback, tests and evaluation scores (LangSmith)

[video](https://www.youtube.com/watch?v=Vn8A3BxfplE)
[notebook](https://github.com/langchain-ai/langsmith-cookbook/blob/main/optimization/assisted-prompt-bootstrapping/elvis-bot.ipynb)

### Updates

#### 1 Hot path

- Pro: real-time and transparent
- Con: delays / bad UX

[github code](https://github.com/langchain-ai/memory-agent)

#### 2 Background 

- Pro: no delays / good UX
- Con: frequency of writing needs to be tuned

[github code](https://github.com/langchain-ai/memory-template)

# LangGraph Store

[notebook](https://github.com/langchain-ai/langchain-academy/blob/main/module-5/memory_store.ipynb)

## Implementation details

```python
from langgraph.store.memory import InMemoryStore
```

- **Memory saved** using:
    - namespace: tuple, like directory
    ```python
    namespace = (user_id, "memories")
    ```
    - key: string, like filename
    ```python
    key = "user_memory"
    ```
    - value: like content of file: 
    ```python
    value = {"food_preference" : "I like pizza"}
    ```
- Write: **put**
```python
    store.put (namespace, key, value)
```
- Read: **get**
```python
    memory = store.get (namespace, key)
```
- **Retrieve all**:
```python
    memories = store.search (namespace)
```
- **config passed**: 

```python
{"configurable": {"thread_id": thread_id, "my_key_info": my_key_info}}
```

- **Important:** The store needs to be passed to the node so that it can explicitly read values from the history and act
on them, e.g., summarizing the memories or considering them in some specific manner:

```python
def my_node_function (state: MyStateClass, config: RunnableConfig, store: BaseStore):
    user_id = config["configurable"]["user_id"]
```

- compiling with both short-term and long-term memory:

```python
builder.compile (
    checkpointer=my_checkpointer, # short_term_memory
    store=my_store, # long_term_memory
)
```

```python
config = {"configurable": {"thread_id": "1", "user_id": "1"}}
for event in graph.stream (my_messages, config, stream_mode="values"):
    ...

# we have across-session memory, so we can pass another thread_id:
config = {"configurable": {"thread_id": "2", "user_id": "1"}}
for event in graph.stream (my_messages, config, stream_mode="values"):
    ...
```
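
Why a second `thread_id` still sees the same memories can be shown with a dict-backed stand-in. `ToyStore` is hypothetical and simplified: it returns the stored value directly and matches namespaces exactly, which is enough to show that the lookup depends on `user_id` only.

```python
class ToyStore:
    """Hypothetical, dict-backed stand-in for a store with put/get/search."""

    def __init__(self):
        self._items = {}

    def put(self, namespace: tuple, key: str, value: dict) -> None:
        self._items[(namespace, key)] = value  # same namespace + key: overwrite

    def get(self, namespace: tuple, key: str):
        return self._items.get((namespace, key))  # None when missing

    def search(self, namespace: tuple) -> list:
        return [v for (ns, _), v in self._items.items() if ns == namespace]


store = ToyStore()
store.put(("1", "memories"), "user_memory", {"food_preference": "I like pizza"})

# The namespace is built from user_id, not thread_id,
# so every thread of user "1" reads the same data.
for thread_id in ("1", "2"):
    config = {"configurable": {"thread_id": thread_id, "user_id": "1"}}
    namespace = (config["configurable"]["user_id"], "memories")
    print(thread_id, store.get(namespace, "user_memory"))
```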




# Memory schema + profile

[notebook](https://github.com/langchain-ai/langchain-academy/blob/main/module-5/memoryschema_profile.ipynb)

## Model with structured output 

- Use `with_structured_output` method to adhere to specific profile fields

```python
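model_with_structure = model.with_structured_output (MyProfileSchema)
new_memory = model_with_structure.invoke (messages) # `messages`: the conversation so far
new_memory = my_format.format(new_memory)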
```

## TrustCall

- Adhere to more complex schemas (e.g., based on pydantic and multiple classes)
- Update complex schemas without having to regenerate the whole schema and overwrite (i.e., more efficiently)

### Call

We pass:

- A model
- A schema as a tool: `tools = [MySchema]`
- A tool choice name for enforcing output to respect this schema: `tool_choice = "MySchema"`

We retrieve:

- AI messages: 
```python
result["messages"]
```
- Structured output: 
```python
result["responses"] # list of MySchema objects 
result["responses"][0].model_dump()
```
- Metadata: `result["response_metadata"]`

Example:

```python
from typing import List

from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from pydantic import BaseModel, Field, ValidationError
from trustcall import create_extractor

class MySchema (BaseModel):
    name: str = Field (description="user name")
    interests: List[str] = Field (description="list of user interests")

conversation = [
    HumanMessage (content="Hi I'm Jaume"),
    AIMessage (content="Hi Jaume, how can I assist you?"),
    HumanMessage (content="I like biking for cardio and sightseeing")
]

extractor = create_extractor (
    model, # the chat model, passed positionally
    tools=[MySchema],
    tool_choice="MySchema",
)

system_message=SystemMessage(content="Extract user details from this conversation")
result = extractor.invoke ({"messages": [system_message] + conversation})
```

## Update 

- Produce a json patch
- We pass the serialized object using:
```python
{"existing": {"MySchema": result["responses"][0].model_dump()}}
```

Note that we indicate the name of the class to respect, "MySchema"

Full call:
```python
extractor.invoke (
    {"messages": [system_message] + updated_conversation}, 
    {"existing": {"MySchema": result["responses"][0].model_dump()}}
)
```

We can also put "messages" and "existing" into a single dictionary:
```python
{"messages": [...], "existing": existing_memory_as_dict}
```

When writing into our store, we need to serialize the object (here with `model_dump()`):

```python
my_store.put (namespace, key, result["responses"][0].model_dump())
```
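
To make the idea of a JSON patch concrete, the sketch below applies two patch operations to an existing profile instead of regenerating it. It only illustrates the patch format (RFC 6902) on a tiny subset of operations; `apply_patch` is a hypothetical helper and the patch content is made up, so it does not reproduce what TrustCall does internally.

```python
import copy


def apply_patch(document: dict, patch: list) -> dict:
    """Apply a tiny subset of JSON Patch: 'add' and 'replace' only."""
    result = copy.deepcopy(document)
    for op in patch:
        *parents, last = op["path"].lstrip("/").split("/")
        target = result
        for part in parents:
            target = target[int(part)] if isinstance(target, list) else target[part]
        if isinstance(target, list):
            if op["op"] == "add" and last == "-":
                target.append(op["value"])           # "-" means: append to the list
            elif op["op"] == "add":
                target.insert(int(last), op["value"])
            else:
                target[int(last)] = op["value"]      # replace a list element
        else:
            target[last] = op["value"]               # add or replace a dict field
    return result


existing = {"name": "Jaume", "interests": ["biking"]}
patch = [
    {"op": "replace", "path": "/interests/0", "value": "road biking"},
    {"op": "add", "path": "/interests/-", "value": "bakeries"},
]
print(apply_patch(existing, patch))
# {'name': 'Jaume', 'interests': ['road biking', 'bakeries']}
```

The patch only names what changes, which is why updating a large schema this way is cheaper than producing the whole object again.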

# Memory Schema + Collection

[notebook](https://github.com/langchain-ai/langchain-academy/blob/main/module-5/memoryschema_collection.ipynb)

## model with structured output

- Define using BaseModel

```python
class MyMemory (BaseModel):
    memory: str = Field (description="One of the memories")

class MyCollection (BaseModel):
    my_collection: List[MyMemory] = Field (description="")

model_with_structure = model.with_structured_output (MyCollection)
result = model_with_structure.invoke (...)
```

- Save to store using put with one key per item in the collection:

```python
for item in result.my_collection:
    key = str(uuid.uuid4 ())
    my_store.put (namespace, key, item.model_dump())
```

## TrustCall

- use single element class (`MyMemory` in previous example)
- pass `enable_inserts=True` when building the extractor object

```python
extractor = create_extractor (
    model,
    tools=[MyMemory],
    tool_choice="MyMemory",
    enable_inserts=True,
)

system_msg = """Update existing memories and create new ones based on the following conversation:"""

extractor.invoke({"messages": [SystemMessage(content=system_msg)]+updated_conversation, "existing": existing_memories})
```

## Graph + TrustCall

- TrustCall instruction prompt: "... use parallel tool calling to handle updates and insertions simultaneously:"

- when using memories to adapt the assistant response: since it is a list, we format the {memory} section of the prompt using the list of memories so far retrieved with search:

```python
memories = my_store.search(namespace)
formatted_memories = "\n".join([mem.value["content"] for mem in memories])
```

- existing_memories construction:
```python
namespace=("memory", user_id)
existing_items = my_store.search(namespace)
tool_name="Memory"
existing_memories = ([(existing_item.key, tool_name, existing_item.value) for existing_item in existing_items] if existing_items else None)

```
- using `merge_message_runs`
```python
system_msg = SystemMessage(content="my prompt message")
updated_messages = list(merge_message_runs(messages=[system_msg]+state["messages"]))
result=extractor.invoke ({"messages": updated_messages, "existing": existing_memories})
```
- store update
```python
for resp, rmeta in zip(result["responses"], result["response_metadata"]):
    my_store.put (
        namespace, 
        rmeta.get("json_doc_id", str(uuid.uuid4())), # new memory: stored under a new key. Existing memory: overwritten, since we reuse its previous json_doc_id as key
        resp.model_dump(mode="json")
    )
```
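
The update-or-insert logic of this loop can be checked with plain dicts. The store, responses and metadata below are made-up illustration data; only the key selection mirrors the loop above.

```python
import uuid

# Existing memory stored under key "abc" (illustration data)
store = {"abc": {"content": "User likes biking"}}

responses = [
    {"content": "User likes biking around San Francisco"},  # update of "abc"
    {"content": "User went to a bakery"},                   # new memory
]
response_metadata = [
    {"id": "call_1", "json_doc_id": "abc"},
    {"id": "call_2"},  # no json_doc_id: nothing to overwrite
]

for resp, rmeta in zip(responses, response_metadata):
    key = rmeta.get("json_doc_id", str(uuid.uuid4()))  # reuse the key if present, else create one
    store[key] = resp

print(len(store))    # 2
print(store["abc"])  # {'content': 'User likes biking around San Francisco'}
```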

# Memory agent

[notebook](https://github.com/langchain-ai/langchain-academy/blob/main/module-5/memory_agent.ipynb)

## Listener in TrustCall

Log tool calls done by TrustCall

Surface things like:
- actions to solve schema validation errors, and 
- updates done to previous memories

```python
class Spy:
    def __init__ (self):
        self.called_tools = []
    
    def __call__ (self, run):
        # Walk the run tree and collect the tool calls made by every chat-model run
        q = [run]
        while q:
            r = q.pop()
            if r.child_runs:
                q.extend(r.child_runs)
            if r.run_type=="chat_model":
                self.called_tools.append(r.outputs["generations"][0][0]["message"]["kwargs"]["tool_calls"])

spy = Spy()
extractor = create_extractor(...)
extractor_with_listener = extractor.with_listeners(on_end=spy)
```
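
The traversal done by the listener can be exercised on a fake run tree. `ToySpy` follows the same stack-based walk, but the run objects and the flat `outputs["tool_calls"]` shape are hypothetical simplifications of the real run structure.

```python
from types import SimpleNamespace


class ToySpy:
    """Same traversal idea as Spy, on a simplified (hypothetical) run shape."""

    def __init__(self):
        self.called_tools = []

    def __call__(self, run):
        stack = [run]
        while stack:
            r = stack.pop()
            if r.child_runs:
                stack.extend(r.child_runs)
            if r.run_type == "chat_model":
                self.called_tools.append(r.outputs["tool_calls"])


# Fake run tree: a chain containing one chat-model run and a nested chain with another one.
first_call = SimpleNamespace(run_type="chat_model", child_runs=[], outputs={"tool_calls": ["Memory"]})
retry_call = SimpleNamespace(run_type="chat_model", child_runs=[], outputs={"tool_calls": ["PatchDoc"]})
inner = SimpleNamespace(run_type="chain", child_runs=[retry_call], outputs={})
root = SimpleNamespace(run_type="chain", child_runs=[first_call, inner], outputs={})

spy = ToySpy()
spy(root)
print(spy.called_tools)
```

Nested chat-model runs are found at any depth, which is what lets the listener surface retries made to fix validation errors.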



## Implementation details

When calling a tool node it is very important that the node responds back notifying that the call was made:

```python
def tool_node (state: MessagesState):
    # ...
    tool_calls = state["messages"][-1].tool_calls
    return {"messages": [{"role": "tool", "content": "my tool response", "tool_call_id": tool_calls[0]["id"]}]}
```

# Deployment

# Deployment concepts

[slides](https://files.cdn.thinkific.com/file_uploads/967498/attachments/5d8/68e/5fd/LangChain_Academy_-_Introduction_to_LangGraph_-_Deployment.pdf)

# creating

[notebook](https://github.com/langchain-ai/langchain-academy/blob/main/module-6/creating.ipynb)

- langgraph cli, docker and docker compose

# connecting

## client / remote graph

```python

from langgraph_sdk import get_client
from langgraph.pregel.remote import RemoteGraph

# Port mapping in the docker compose file:
# langgraph-api:
#         image: "lg_image"
#         ports:
#             - "8123:8000"

url = "http://localhost:8123"
client = get_client (url=url)

# or ...
remote_graph = RemoteGraph (graph_name, url=url)
# (how to use remote_graph?)

```

## managing runs

- list: `client.runs.list(thread["thread_id"])`
- create thread: `client.threads.create()`
- create run: 
    ```python
    client.runs.create(thread["thread_id"], graph_name, input=input_message, config=config)
    ```
    - Note: `config` here only contains the `user_id`, not the `thread_id`, since that piece is passed in the first argument.
- get run status: `client.runs.get(...)`
- block until complete (join): `client.runs.join(...)`
- stream (e.g., tokens): `client.runs.stream(...)`: same as create run, but adding parameter `stream_mode="messages-tuple"`

## threads

- For working with multi-turn interactions, with multiple graph executions for a given thread.
- The server stores the checkpoints of the thread in Postgres.
- We can:
    - get state checkpoints saved: 

    ```python
    thread_state = await client.threads.get_state(thread["thread_id"])
    for m in convert_to_messages (thread_state["values"]["messages"]):
        m.pretty_print()
    ```

    - copy (fork) thread: `client.threads.copy(thread["thread_id"])`
    - do human-in-the-loop:

    ```python
    states = await client.threads.get_history(thread["thread_id"])
    to_fork = states[-2]
    # to_fork["values"], to_fork["next"]
    message_id = to_fork["values"]["messages"][0]["id"]
    checkpoint_id = to_fork["checkpoint_id"]
    forked_input = {
        "messages": [HumanMessage(content="my new message", id=message_id)] # overwrite previous message by supplying ID
    }
    forked_config = await client.threads.update_state(
        thread["thread_id"],
        forked_input,
        checkpoint_id=checkpoint_id
    )
    ```

    - stream using updated input from new checkpoint
        - same call as last stream but with:
            - input=None, so that it resumes
            - adding `checkpoint_id=checkpoint_id`

## across-thread memory

The memory store is stored in Postgres

We can:

- search items by namespace: `client.store.search(namespace)` where namespace is a tuple
- put new items: `client.store.put_item (namespace, key=key, value=value)`
- delete items: `client.store.delete_item(namespace, key=key)`


# Double-texting

[notebook](https://github.com/langchain-ai/langchain-academy/blob/main/module-6/double-texting.ipynb)

## Reject

- runs.create with input_1
- try:
    - runs.create with input_2
- except httpx.HTTPStatusError as e:
...

## Enqueue

- To new runs we add the argument `multitask_strategy="enqueue"`
```python
second_run = await client.runs.create(..., multitask_strategy="enqueue")
```
- We wait for the last run to finish: `await client.runs.join(thread["thread_id"], second_run["run_id"])`

## Interrupt

- The previous run is interrupted and the new one takes over.
- How: as before, but we pass the argument `multitask_strategy="interrupt"`. Again, we wait for the last run to complete.
- The final status of the interrupted run is "interrupted".

## Rollback

- Instead of keeping the interrupted messages, the old run is deleted and a new run is created that takes only the new message.
- How: as before, but we pass the argument `multitask_strategy="rollback"`.

# Assistants

[notebook](https://github.com/langchain-ai/langchain-academy/blob/main/module-6/assistant.ipynb)

## Creating assistants

```python
personal_assistant = await client.assistants.create(graph_name, config={"configurable": {"todo_category": "personal"}})
```

## Updating assistants

- When updating an assistant, we are creating a new version of it.

```python
configurations = {
    "todo_category": "personal",
    "user_id": "lance",
    "task_maistro_role": new_prompt,
}
personal_assistant = await client.assistants.update(
    personal_assistant["assistant_id"],
    config={"configurable": configurations},
)
```

- Notes:
    - The fields in the configurations dict coincide with the attributes of the `Configuration` class defined in deployment/configuration.py
    - Inside the llm node of the graph, in the `task_maistro.py` file, we have an argument of type `RunnableConfig`, which gives us a dictionary. This dictionary is transformed into a Configurations object using the classmethod `from_runnable_config`:

```python
def task_mAIstro (state: MessagesState, config: RunnableConfig, store: BaseStore):
    configurable = configuration.Configuration.from_runnable_config(config)
    # configurable.todo_category, configurable.user_id, configurable.task_maistro_role
```
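
How such a class can turn the config dictionary into an object is sketched below. This is an assumption about the shape of the class, not a copy of `deployment/configuration.py`: the default values are invented and a plain `dict` stands in for `RunnableConfig`.

```python
from dataclasses import dataclass, fields
from typing import Optional


@dataclass
class Configuration:
    """Sketch of a configuration class; the defaults are assumptions."""
    user_id: str = "default-user"
    todo_category: str = "general"
    task_maistro_role: str = "You are a helpful assistant."

    @classmethod
    def from_runnable_config(cls, config: Optional[dict] = None) -> "Configuration":
        """Pick the known fields out of config["configurable"], keeping defaults for the rest."""
        configurable = (config or {}).get("configurable", {})
        known = {f.name for f in fields(cls)}
        return cls(**{k: v for k, v in configurable.items() if k in known})


config = {"configurable": {"todo_category": "personal", "user_id": "lance", "thread_id": "1"}}
configurable = Configuration.from_runnable_config(config)
print(configurable.todo_category, configurable.user_id)  # personal lance
```

Keys that are not attributes of the class, such as `thread_id`, are ignored, so the same config dict can be shared by the checkpointer and the node.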
        

## Managing assistants

- List assistants:

```python
assistants = await client.assistants.search()

for assistant in assistants:
    print ({
        "assistant_id": assistant["assistant_id"],
        "version": assistant["version"],
        "config": assistant["config"], 
    })

# config includes category, user_id, and role, as supplied in configurations dict above
```

- Delete assistant:

```python
await client.assistants.delete(assistant["assistant_id"])
```

## Using assistants

```python
thread = await client.threads.create()
async for chunk in client.runs.stream(
    thread["thread_id"],
    assistant_id,
    input={"messages": [HumanMessage(content="my message")]},
    stream_mode="values",
):
    if chunk.event == "values":
        state = chunk.data
        convert_to_messages(state["messages"])[-1].pretty_print()
```

- Notes:
    - when using `graph.stream`, we get an `event` dict where `event["messages"]` is a list of Message objects (e.g., HumanMessage, AIMessage, etc.) on which we can call `pretty_print`, i.e., `event["messages"][-1].pretty_print()`
    - when using `client.runs.stream`, we get a `chunk` object, and the data is in `chunk.data`. Furthermore, `chunk.data["messages"]` is a list of dicts on which we cannot call `pretty_print()` directly, so we need to convert them first using `convert_to_messages`.

```python
for event in graph.stream(None, to_replay.config, stream_mode="values"):
    event['messages'][-1].pretty_print()
```