In [None]:
from g_config import create_gemini_config, create_openai_config

provider, model = create_openai_config()

Using OpenAI model: *** gpt-4o-mini ***


In [3]:
from agents import Agent, Runner, SQLiteSession, RunConfig

In [4]:
agent = Agent(
    name="StreamAgent",
    instructions="Reply very concisely."
)

run_config = RunConfig(
    model=model,
    model_provider=provider,
    tracing_disabled=True
)

In [5]:


# Create a session instance with a session ID
session = SQLiteSession("conversation_123")

# First turn
result = await Runner.run(
    agent,
    "What s the capital of California?",
    session=session,
    run_config=run_config
)
print(result.final_output)  # "San Francisco"

# Second turn - agent automatically remembers previous context
result = await Runner.run(
    agent,
    "What state is it in?",
    session=session,
    run_config=run_config
)
print(result.final_output)  # "California"

# Also works with synchronous runner
result = await Runner.run(
    agent,
    "What's the population?",
    session=session,
    run_config=run_config
)
print(result.final_output)  # "Approximately 39 million"



Sacramento.
California.
As of 2023, approximately 513,000.


In [6]:
# Also works with synchronous runner
result = await Runner.run(
    agent,
    "What was my last message?",
    session=session,
    run_config=run_config
)
print(result.final_output)  # "Approximately 39 million"


"What's the population?"


In [8]:
# Get all items in a session
items = await session.get_items()
items

[{'content': 'What s the capital of California?', 'role': 'user'},
 {'id': '__fake_id__',
  'content': [{'annotations': [],
    'text': 'Sacramento.',
    'type': 'output_text'}],
  'role': 'assistant',
  'status': 'completed',
  'type': 'message'},
 {'content': 'What state is it in?', 'role': 'user'},
 {'id': '__fake_id__',
  'content': [{'annotations': [],
    'text': 'California.',
    'type': 'output_text'}],
  'role': 'assistant',
  'status': 'completed',
  'type': 'message'},
 {'content': "What's the population?", 'role': 'user'},
 {'id': '__fake_id__',
  'content': [{'annotations': [],
    'text': 'As of 2023, approximately 513,000.',
    'type': 'output_text'}],
  'role': 'assistant',
  'status': 'completed',
  'type': 'message'},
 {'content': 'What was my last message?', 'role': 'user'},
 {'id': '__fake_id__',
  'content': [{'annotations': [],
    'text': '"What\'s the population?"',
    'type': 'output_text'}],
  'role': 'assistant',
  'status': 'completed',
  'type': 'messag

In [9]:
# Add new items to a session
new_items = [
    {"role": "user", "content": "Hello"},
    {"role": "assistant", "content": "Hi there!"}
]
await session.add_items(new_items)


In [10]:
# Remove and return the most recent item
last_item = await session.pop_item()
print(last_item)  # {"role": "assistant", "content": "Hi there!"}

{'role': 'assistant', 'content': 'Hi there!'}


In [11]:
# Clear all items from a session
await session.clear_session()

In [12]:
# Get all items in a session
items = await session.get_items()
items

[]

### # Multiple sessions

In [14]:
# Multiple sessions

from agents import Agent, Runner, SQLiteSession


# Different sessions maintain separate conversation histories
session_1 = SQLiteSession("user_123", "conversations.db")
session_2 = SQLiteSession("user_456", "conversations.db")

result1 = await Runner.run(
    agent,
    "Hello",
    session=session_1,
    run_config=run_config
)
result2 = await Runner.run(
    agent,
    "Hello",
    session=session_2,
    run_config=run_config
)

In [16]:
result1.final_output

'Hello! How can I help you today?'

In [27]:
session_1.session_id

session_1.messages_table



'agent_messages'

**Custom memory implementations**

In [28]:
from agents.memory import Session
from typing import List

class MyCustomSession:
    """Custom session implementation following the Session protocol."""
    def __init__(self,session_id: str):
        self.session_id = session_id
        self.items = []
        
    async def get_items(self) -> List[dict]:
        """Get all items in the session."""
        return self.items
    
    async def add_items(self, items: List[dict]) -> None:
        """Add new items to the session."""
        self.items.extend(items)
        
    async def pop_item(self) -> dict:
        """Remove and return the most recent item."""
        if self.items:
            return self.items.pop()
        return None
    async def clear_session(self) -> None:
        """Clear all items from the session."""
        self.items.clear()



In [29]:
# customSession results
custom_session = MyCustomSession("custom_session_123")

results_main = await Runner.run(
    agent,
    "Hi there!",
    session=custom_session,
    run_config=run_config
)

print(results_main.final_output)  # "Hello!"

Hello! How can I assist you today?


In [35]:
# Get all items in a session
items = await custom_session.get_items()
items

[{'content': 'Hi there!', 'role': 'user'},
 {'id': '__fake_id__',
  'content': [{'annotations': [],
    'text': 'Hello! How can I assist you today?',
    'type': 'output_text'}],
  'role': 'assistant',
  'status': 'completed',
  'type': 'message'}]

In [37]:
# Remove and return the most recent item
last_item = await custom_session.pop_item()
print(last_item)  # {"role": "assistant", "content": "Hi there!"}

{'id': '__fake_id__', 'content': [{'annotations': [], 'text': 'Hello! How can I assist you today?', 'type': 'output_text'}], 'role': 'assistant', 'status': 'completed', 'type': 'message'}


In [39]:
# Clear all items from a session
await session.clear_session()

In [41]:
# Get all items in a session
await custom_session.get_items()


[{'content': 'Hi there!', 'role': 'user'}]