# **Persist Data Externally:**

Let's us expand our previous example and save the data to our postgresql database using supabase

In [27]:
%%capture --no-stderr
%pip install -qU langgraph langchain-google-genai langchain_core langchain_community psycopg langgraph-checkpoint-postgres

In [28]:
# For Colab, use this:
from google.colab import userdata
GEMINI_API_KEY = userdata.get("GEMINI_API_KEY")
DB_PASSWORD = userdata.get("DB_PASSWORD")

In [29]:
from langchain_google_genai import ChatGoogleGenerativeAI

gemini_flash: ChatGoogleGenerativeAI = ChatGoogleGenerativeAI(
    model='gemini-1.5-flash',
    temperature=0,
    api_key=GEMINI_API_KEY
)

In [74]:
import psycopg
from langgraph.checkpoint.postgres import PostgresSaver
from psycopg_pool import ConnectionPool
import logging

# Enable logging to monitor database operations
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Set up the PostgreSQL connection URI
DB_URI = f"postgresql://postgres.tsynjerctymobynsautl:{DB_PASSWORD}@aws-0-ap-southeast-1.pooler.supabase.com:6543/postgres"

# Create a connection pool for efficient database connections
pool = ConnectionPool(conninfo=DB_URI, min_size=1, max_size=5, open=True)

# Initialize PostgresSaver with a pooled connection and create necessary tables
with pool.connection() as conn:
    memory = PostgresSaver(conn)
    memory.setup()  # Ensure required tables are set up

# Define a function to log and fetch saved data for troubleshooting
def fetch_and_log_saved_data(thread_id):
    with pool.connection() as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT * FROM checkpoints WHERE thread_id = %s", (thread_id,))
            rows = cur.fetchall()
            logger.info("Retrieved rows from checkpoints table:")
            for row in rows:
                logger.info(row)

* Creates a connection pool using psycopg3 to manage connections to the PostgreSQL database. min_size and max_size define the minimum and maximum number of open connections, which helps optimize database usage in concurrent environments.

* Obtains a connection from the pool and initializes PostgresSaver with it. memory.setup() creates necessary tables for saving conversation states if they haven’t been created already. PostgresSaver allows storing and retrieving state data from the database.

In [66]:
from langgraph.graph import MessagesState
class State(MessagesState):
    summary: str

In [67]:
from langchain_core.messages import SystemMessage, HumanMessage, RemoveMessage

def conversation(state: State):

    # Here, the function attempts to retrieve a summary from the state. If no summary exists, it defaults to an empty string ("").
    summary = state.get("summary", "")

    if summary:
        system_message = f"Summary of conversation earlier: {summary}"
        messages = [SystemMessage(content=system_message)] + state["messages"]
    else:
        messages = state["messages"]

    # If a summary exists, it creates a system_message that says, "Summary of conversation earlier: <summary content>".
    # This system_message is then added to the beginning of the conversation history (state["messages"]). This ensures that the model is provided with prior context.
    # If no summary exists, it simply passes the current messages without adding any additional context.

    response = gemini_flash.invoke(messages)
    return {"messages": response}
    # The function returns the response wrapped in a dictionary with the key "messages". This makes the function output compatible with other parts of the system that expect messages in the response.

In [68]:
def summarize_conversation(state: State):

    # retrieve any existing summary from the state. If a summary doesn’t exist yet, it defaults to an empty string ("").
    summary = state.get("summary", "")

    if summary:
        summary_message = (
            f"This is summary of the conversation to date: {summary}\n\n"
            "Extend the summary by taking into account the new messages above:"
        )
    else:
        summary_message = "Create a summary of the conversation above:"
    # If a summary exists: It creates a prompt for extending the current summary with new information from the latest messages. The message template, This is the summary of the conversation to date: {summary}, provides the existing summary and instructs the model to extend it.
    # If no summary exists: It generates a prompt to create an initial summary with the message "Create a summary of the conversation above:".

    messages = state["messages"] + [HumanMessage(content=summary_message)]
    # This line adds the summary_message as a new HumanMessage to the conversation history (state["messages"]). This ensures the LLM can reference the recent conversation when generating a summary.

    response = gemini_flash.invoke(messages)

    delete_messages = [RemoveMessage(id=m.id) for m in state["messages"][:-2]]
    # This creates a list of RemoveMessage objects for all but the last two messages in state["messages"].

    return {"summary": response.content, "messages": delete_messages}

In [69]:
from langgraph.graph import END

def should_continue(state: State):

    """Return the next node to execute."""

    messages = state["messages"]
    #The state["messages"] list contains all messages exchanged in the conversation up to this point.

    if len(messages) > 6:
        return "summarize_conversation"
    return END
    # If there are more than 6 messages in the conversation, it returns "summarize_conversation", instructing the workflow to proceed to a node that will summarize the conversation.
    # If the length of messages is 6 or fewer, the function returns END.
    # Returning END directs the workflow to terminate the conversation instead of continuing to a summarization step.

In [70]:
from langgraph.graph import StateGraph, START
from IPython.display import Image, display

workflow = StateGraph(State)
workflow.add_node("conversation", conversation)
workflow.add_node("summarize_conversation", summarize_conversation)

workflow.add_edge(START, "conversation")
workflow.add_conditional_edges("conversation", should_continue)
workflow.add_edge("summarize_conversation", END)

# Compile the graph with the Subabase checkpointer
graph = workflow.compile(checkpointer=memory)
display(Image(graph.get_graph().draw_mermaid_png()))

<IPython.core.display.Image object>

In [72]:
# Create a thread
config = {"configurable": {"thread_id": "1"}}

# Start conversation and invoke the model
input_message = HumanMessage(content="hi! I'm Ammar")
output = graph.invoke({"messages": [input_message]}, config)
for m in output['messages'][-1:]:
    m.pretty_print()

# Continue conversation with additional messages
input_message = HumanMessage(content="what's my name?")
output = graph.invoke({"messages": [input_message]}, config)
for m in output['messages'][-1:]:
    m.pretty_print()

input_message = HumanMessage(content="i like the Valorant!")
output = graph.invoke({"messages": [input_message]}, config)
for m in output['messages'][-1:]:
    m.pretty_print()

# Fetch and log saved data to verify database persistence
fetch_and_log_saved_data("1")


Hi Ammar! It's nice to meet you. What can I do for you today?

You told me your name is Ammar! Is there anything else I can help you with?

That's awesome! Valorant is a really popular game. What do you like most about it?  Do you have a favorite agent or weapon? 

I'm not able to play games myself, but I can tell you about different strategies, agents, or even help you find some cool Valorant content online if you'd like. 😊


In [73]:
config = {"configurable": {"thread_id": "1"}}
graph_state = graph.get_state(config)
graph_state

StateSnapshot(values={'messages': [HumanMessage(content='i like the Valorant!', additional_kwargs={}, response_metadata={}, id='7d5f1932-d9f0-40fe-ad04-fc2278892619'), AIMessage(content="That's awesome! Valorant is a really popular game. What do you like most about it?  Do you have a favorite agent or weapon? \n\nI'm not able to play games myself, but I can tell you about different strategies, agents, or even help you find some cool Valorant content online if you'd like. 😊 \n", additional_kwargs={}, response_metadata={'prompt_feedback': {'block_reason': 0, 'safety_ratings': []}, 'finish_reason': 'STOP', 'safety_ratings': [{'category': 'HARM_CATEGORY_SEXUALLY_EXPLICIT', 'probability': 'NEGLIGIBLE', 'blocked': False}, {'category': 'HARM_CATEGORY_HATE_SPEECH', 'probability': 'NEGLIGIBLE', 'blocked': False}, {'category': 'HARM_CATEGORY_HARASSMENT', 'probability': 'NEGLIGIBLE', 'blocked': False}, {'category': 'HARM_CATEGORY_DANGEROUS_CONTENT', 'probability': 'NEGLIGIBLE', 'blocked': False}]