Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions src/bedrock_agentcore/memory/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -1006,6 +1006,51 @@ def list_actor_sessions(self, actor_id: str) -> List[SessionSummary]:
logger.error(" ❌ Error listing sessions: %s", e)
raise

def delete_all_long_term_memories_in_namespace(self, namespace: str) -> Dict[str, Any]:
"""Delete all long-term memory records within a specific namespace.

This method retrieves all memory records in the specified namespace and performs
batch deletion operations using the AWS Bedrock AgentCore API, processing in chunks of 100.

Args:
namespace: The namespace prefix to delete memories from

Returns:
Dictionary containing batch deletion results with successfulRecords and failedRecords
"""
logger.info("🗑️ Deleting all long-term memories in namespace '%s'...", namespace)

# Retrieve all memory records in the specified namespace
memory_records = self.list_long_term_memory_records(namespace_prefix=namespace)
logger.info(" -> Found %d memory records to delete", len(memory_records))

if not memory_records:
logger.info(" ✅ No records found to delete")
return {"successfulRecords": [], "failedRecords": []}

# Format record IDs for batch deletion API
memory_record_ids = [{"memoryRecordId": record["memoryRecordId"]} for record in memory_records]

all_successful = []
all_failed = []

# Process in chunks of 100
for i in range(0, len(memory_record_ids), 100):
chunk = memory_record_ids[i : i + 100]
try:
result = self._data_plane_client.batch_delete_memory_records(memoryId=self._memory_id, records=chunk)
all_successful.extend(result.get("successfulRecords", []))
all_failed.extend(result.get("failedRecords", []))
except ClientError as e:
logger.error(" ❌ Error deleting chunk: %s", e)
raise

logger.info(" ✅ Successfully deleted %d records", len(all_successful))
if all_failed:
logger.warning(" ⚠️ Failed to delete %d records", len(all_failed))

return {"successfulRecords": all_successful, "failedRecords": all_failed}

def create_memory_session(self, actor_id: str, session_id: str = None) -> "MemorySession":
"""Creates a new MemorySession instance."""
session_id = session_id or str(uuid.uuid4())
Expand Down
153 changes: 153 additions & 0 deletions tests/bedrock_agentcore/memory/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -1452,6 +1452,159 @@ def test_delete_memory_record_client_error(self):
with pytest.raises(ClientError):
manager.delete_memory_record(record_id="invalid-record")

def test_delete_all_long_term_memories_in_namespace_success(self):
"""Test delete_all_long_term_memories_in_namespace successful execution."""
with patch("boto3.Session") as mock_session_class:
mock_session = MagicMock()
mock_session.region_name = "us-west-2"
mock_client_instance = MagicMock()
mock_session.client.return_value = mock_client_instance
mock_session_class.return_value = mock_session

manager = MemorySessionManager(memory_id="testMemory-1234567890", region_name="us-west-2")

# Mock list_long_term_memory_records
mock_records = [
{"memoryRecordId": "rec-1", "content": {"text": "Memory 1"}},
{"memoryRecordId": "rec-2", "content": {"text": "Memory 2"}},
]
with patch.object(manager, "list_long_term_memory_records", return_value=mock_records):
# Mock batch_delete_memory_records response
mock_response = {
"successfulRecords": [
{"memoryRecordId": "rec-1", "status": "SUCCEEDED"},
{"memoryRecordId": "rec-2", "status": "SUCCEEDED"},
],
"failedRecords": [],
}
mock_client_instance.batch_delete_memory_records.return_value = mock_response

result = manager.delete_all_long_term_memories_in_namespace("test/namespace")

assert len(result["successfulRecords"]) == 2
assert len(result["failedRecords"]) == 0

# Verify API call
mock_client_instance.batch_delete_memory_records.assert_called_once_with(
memoryId="testMemory-1234567890",
records=[{"memoryRecordId": "rec-1"}, {"memoryRecordId": "rec-2"}],
)

def test_delete_all_long_term_memories_in_namespace_empty(self):
"""Test delete_all_long_term_memories_in_namespace with no records."""
with patch("boto3.Session") as mock_session_class:
mock_session = MagicMock()
mock_session.region_name = "us-west-2"
mock_client_instance = MagicMock()
mock_session.client.return_value = mock_client_instance
mock_session_class.return_value = mock_session

manager = MemorySessionManager(memory_id="testMemory-1234567890", region_name="us-west-2")

# Mock empty list_long_term_memory_records
with patch.object(manager, "list_long_term_memory_records", return_value=[]):
result = manager.delete_all_long_term_memories_in_namespace("empty/namespace")

assert result == {"successfulRecords": [], "failedRecords": []}
# Should not call batch_delete_memory_records
mock_client_instance.batch_delete_memory_records.assert_not_called()

def test_delete_all_long_term_memories_in_namespace_client_error(self):
"""Test delete_all_long_term_memories_in_namespace with ClientError."""
with patch("boto3.Session") as mock_session_class:
mock_session = MagicMock()
mock_session.region_name = "us-west-2"
mock_client_instance = MagicMock()
mock_session.client.return_value = mock_client_instance
mock_session_class.return_value = mock_session

manager = MemorySessionManager(memory_id="testMemory-1234567890", region_name="us-west-2")

# Mock list_long_term_memory_records
mock_records = [{"memoryRecordId": "rec-1", "content": {"text": "Memory 1"}}]
with patch.object(manager, "list_long_term_memory_records", return_value=mock_records):
# Mock ClientError
error_response = {"Error": {"Code": "ValidationException", "Message": "Invalid request"}}
mock_client_instance.batch_delete_memory_records.side_effect = ClientError(
error_response, "BatchDeleteMemoryRecords"
)

with pytest.raises(ClientError):
manager.delete_all_long_term_memories_in_namespace("test/namespace")

def test_delete_all_long_term_memories_in_namespace_over_100_records(self):
"""Test deleting more than 100 records in namespace."""
with patch("boto3.Session") as mock_session_class:
mock_session = MagicMock()
mock_session.region_name = "us-west-2"
mock_client_instance = MagicMock()
mock_session.client.return_value = mock_client_instance
mock_session_class.return_value = mock_session

manager = MemorySessionManager(memory_id="testMemory-1234567890", region_name="us-west-2")

# Mock 150 memory records
mock_records = [{"memoryRecordId": f"rec-{i}", "content": {"text": f"Memory {i}"}} for i in range(150)]
with patch.object(manager, "list_long_term_memory_records", return_value=mock_records):
# Mock batch_delete_memory_records responses for each chunk
mock_client_instance.batch_delete_memory_records.side_effect = [
{"successfulRecords": [{"memoryRecordId": f"rec-{i}"} for i in range(100)], "failedRecords": []},
{
"successfulRecords": [{"memoryRecordId": f"rec-{i}"} for i in range(100, 150)],
"failedRecords": [],
},
]

result = manager.delete_all_long_term_memories_in_namespace("test/namespace")

# Verify two batch calls were made
assert mock_client_instance.batch_delete_memory_records.call_count == 2
assert len(result["successfulRecords"]) == 150
assert len(result["failedRecords"]) == 0

# Verify first batch had 100 records
first_batch = mock_client_instance.batch_delete_memory_records.call_args_list[0][1]["records"]
assert len(first_batch) == 100

# Verify second batch had 50 records
second_batch = mock_client_instance.batch_delete_memory_records.call_args_list[1][1]["records"]
assert len(second_batch) == 50

def test_delete_all_long_term_memories_in_namespace_partial_failure(self):
"""Test delete_all_long_term_memories_in_namespace with some failed records."""
with patch("boto3.Session") as mock_session_class:
mock_session = MagicMock()
mock_session.region_name = "us-west-2"
mock_client_instance = MagicMock()
mock_session.client.return_value = mock_client_instance
mock_session_class.return_value = mock_session

manager = MemorySessionManager(memory_id="testMemory-1234567890", region_name="us-west-2")

# Mock list_long_term_memory_records
mock_records = [
{"memoryRecordId": "rec-1", "content": {"text": "Memory 1"}},
{"memoryRecordId": "rec-2", "content": {"text": "Memory 2"}},
{"memoryRecordId": "rec-3", "content": {"text": "Memory 3"}},
]
with patch.object(manager, "list_long_term_memory_records", return_value=mock_records):
# Mock batch_delete_memory_records response with partial failure
mock_response = {
"successfulRecords": [
{"memoryRecordId": "rec-1", "status": "SUCCEEDED"},
{"memoryRecordId": "rec-3", "status": "SUCCEEDED"},
],
"failedRecords": [{"memoryRecordId": "rec-2", "status": "FAILED", "errorMessage": "Access denied"}],
}
mock_client_instance.batch_delete_memory_records.return_value = mock_response

result = manager.delete_all_long_term_memories_in_namespace("test/namespace")

assert len(result["successfulRecords"]) == 2
assert len(result["failedRecords"]) == 1
assert result["failedRecords"][0]["memoryRecordId"] == "rec-2"
assert result["failedRecords"][0]["errorMessage"] == "Access denied"

def test_list_actor_sessions_success(self):
"""Test list_actor_sessions successful execution."""
with patch("boto3.Session") as mock_session_class:
Expand Down
Loading