Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

import json
import logging
from datetime import datetime, timezone
import threading
from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING, Any, Optional

import boto3
Expand Down Expand Up @@ -46,6 +47,38 @@ class AgentCoreMemorySessionManager(RepositorySessionManager, SessionRepository)
- Consistent with existing Strands Session managers (such as: FileSessionManager, S3SessionManager)
"""

# Class-level timestamp tracking for monotonic ordering
_timestamp_lock = threading.Lock()
_last_timestamp: Optional[datetime] = None

@classmethod
def _get_monotonic_timestamp(cls, desired_timestamp: Optional[datetime] = None) -> datetime:
"""Get a monotonically increasing timestamp.

Args:
desired_timestamp (Optional[datetime]): The desired timestamp. If None, uses current time.

Returns:
datetime: A timestamp guaranteed to be greater than any previously returned timestamp.
"""
if desired_timestamp is None:
desired_timestamp = datetime.now(timezone.utc)

with cls._timestamp_lock:
if cls._last_timestamp is None:
cls._last_timestamp = desired_timestamp
return desired_timestamp

# Why the 1 second check? Because Boto3 does NOT support sub 1 second resolution.
if desired_timestamp <= cls._last_timestamp + timedelta(seconds=1):
# Increment by 1 second to ensure ordering
new_timestamp = cls._last_timestamp + timedelta(seconds=1)
else:
new_timestamp = desired_timestamp

cls._last_timestamp = new_timestamp
return new_timestamp

def __init__(
self,
agentcore_memory_config: AgentCoreMemoryConfig,
Expand Down Expand Up @@ -149,7 +182,7 @@ def create_session(self, session: Session, **kwargs: Any) -> Session:
payload=[
{"blob": json.dumps(session.to_dict())},
],
eventTimestamp=datetime.now(timezone.utc),
eventTimestamp=self._get_monotonic_timestamp(),
)
logger.info("Created session: %s with event: %s", session.session_id, event.get("event", {}).get("eventId"))
return session
Expand Down Expand Up @@ -220,7 +253,7 @@ def create_agent(self, session_id: str, session_agent: SessionAgent, **kwargs: A
payload=[
{"blob": json.dumps(session_agent.to_dict())},
],
eventTimestamp=datetime.now(timezone.utc),
eventTimestamp=self._get_monotonic_timestamp(),
)
logger.info(
"Created agent: %s in session: %s with event %s",
Expand Down Expand Up @@ -319,13 +352,18 @@ def create_message(
messages = AgentCoreMemoryConverter.message_to_payload(session_message)
if not messages:
return

# Parse the original timestamp and use it as desired timestamp
original_timestamp = datetime.fromisoformat(session_message.created_at.replace("Z", "+00:00"))
monotonic_timestamp = self._get_monotonic_timestamp(original_timestamp)

if not AgentCoreMemoryConverter.exceeds_conversational_limit(messages[0]):
event = self.memory_client.create_event(
memory_id=self.config.memory_id,
actor_id=self.config.actor_id,
session_id=session_id,
messages=messages,
event_timestamp=datetime.fromisoformat(session_message.created_at.replace("Z", "+00:00")),
event_timestamp=monotonic_timestamp,
)
else:
event = self.memory_client.gmdp_client.create_event(
Expand All @@ -335,7 +373,7 @@ def create_message(
payload=[
{"blob": json.dumps(messages[0])},
],
eventTimestamp=datetime.fromisoformat(session_message.created_at.replace("Z", "+00:00")),
eventTimestamp=monotonic_timestamp,
)
logger.debug("Created event: %s for message: %s", event.get("eventId"), session_message.message_id)
return event
Expand Down
Loading