In [1]:
from bedrock_agentcore_starter_toolkit.operations.memory.manager import Memory, MemoryManager
from bedrock_agentcore_starter_toolkit.operations.memory.models.strategies import SemanticStrategy, SummaryStrategy

In [None]:
print("\n" + "=" * 25 + " CONTROL PLANE DEMO " + "=" * 25)

manager = MemoryManager()

print("🔍 DEBUG: Starting control plane operations...")

# Memory creation using strategies with data classes
memory1: Memory = manager.get_or_create_memory(
    name="DemoLongTermMemory1",
    description="A temporary memory for short-lived conversations.",
    strategies=[
        SemanticStrategy(
            name="SemanticStrategy",
            description="A strategy for semantic understanding.",
        )
    ],
)
print("🔍 DEBUG: long-term memory created successfully")
memory1




✅ MemoryManager initialized for region: None


🔍 DEBUG: Starting control plane operations...


Memory already exists. Using existing memory ID: DemoLongTermMemory1-AJRz1XDfS6
🔎 Retrieving memory resource with ID: DemoLongTermMemory1-AJRz1XDfS6...
  ✅ Found memory: DemoLongTermMemory1-AJRz1XDfS6


🔍 DEBUG: long-term memory created successfully


{'arn': 'arn:aws:bedrock-agentcore:us-east-1:328307993871:memory/DemoLongTermMemory1-AJRz1XDfS6', 'id': 'DemoLongTermMemory1-AJRz1XDfS6', 'name': 'DemoLongTermMemory1', 'description': 'A temporary memory for short-lived conversations.', 'eventExpiryDuration': 90, 'status': 'ACTIVE', 'createdAt': datetime.datetime(2025, 9, 27, 15, 35, 20, 523000, tzinfo=tzlocal()), 'updatedAt': datetime.datetime(2025, 9, 27, 15, 44, 51, 65000, tzinfo=tzlocal()), 'strategies': [{'strategyId': 'SemanticStrategy-AYF3Xl8OoD', 'name': 'SemanticStrategy', 'description': 'A strategy for semantic understanding.', 'type': 'SEMANTIC', 'namespaces': ['support/user/{actorId}/{sessionId}'], 'createdAt': datetime.datetime(2025, 9, 27, 15, 35, 20, 523000, tzinfo=tzlocal()), 'updatedAt': datetime.datetime(2025, 9, 27, 15, 35, 20, 754000, tzinfo=tzlocal()), 'status': 'ACTIVE'}, {'strategyId': 'SummaryStrategy-o93FeU6Zdd', 'name': 'SummaryStrategy', 'description': 'A strategy for summarizing the conversation.', 'type': '

In [None]:
# Memory creation using strategies without data classes
memory2: Memory = manager.get_or_create_memory(
    name="DemoLongTermMemory2",
    description="A temporary memory for long-lived conversations.",
    strategies=[
        {
            "semanticMemoryStrategy": {
                "name": "SemanticStrategy",
                "description": "A strategy for semantic understanding.",
                "namespaces": ["support/user/{actorId}/{sessionId}"],
            }
        }
    ],
)
print("🔍 DEBUG: Long-term memory created successfully")
memory2

Memory already exists. Using existing memory ID: DemoLongTermMemory2-O63lir7uK6
🔎 Retrieving memory resource with ID: DemoLongTermMemory2-O63lir7uK6...
  ✅ Found memory: DemoLongTermMemory2-O63lir7uK6


🔍 DEBUG: Long-term memory created successfully


{'arn': 'arn:aws:bedrock-agentcore:us-east-1:328307993871:memory/DemoLongTermMemory2-O63lir7uK6', 'id': 'DemoLongTermMemory2-O63lir7uK6', 'name': 'DemoLongTermMemory2', 'description': 'A temporary memory for short-lived conversations.', 'eventExpiryDuration': 90, 'status': 'ACTIVE', 'createdAt': datetime.datetime(2025, 9, 27, 15, 38, 14, 362000, tzinfo=tzlocal()), 'updatedAt': datetime.datetime(2025, 9, 27, 15, 41, 9, 247000, tzinfo=tzlocal()), 'strategies': [{'strategyId': 'SummaryStrategy-j4HeHK8XCR', 'name': 'SummaryStrategy', 'description': 'A strategy for summarizing the conversation.', 'type': 'SUMMARIZATION', 'namespaces': ['support/user/{actorId}/{sessionId}'], 'createdAt': datetime.datetime(2025, 9, 27, 15, 41, 8, 278000, tzinfo=tzlocal()), 'updatedAt': datetime.datetime(2025, 9, 27, 15, 41, 8, 278000, tzinfo=tzlocal()), 'status': 'ACTIVE'}]}

In [None]:
# Memory creation using strategies without data classes
from bedrock_agentcore_starter_toolkit.operations.memory.models.strategies.user_preference import UserPreferenceStrategy

memory3: Memory = manager.get_or_create_memory(
    name="DemoLongTermMemory3",
    description="A temporary memory for long-lived conversations.",
    strategies=[
        UserPreferenceStrategy(
            name="UserPreferenceStrategy",
            description="A strategy for user preference.",
        )
    ],
)
print("🔍 DEBUG: Long-term memory created successfully")
memory3

Memory already exists. Using existing memory ID: DemoLongTermMemory3-mP3Knz4YTX
🔎 Retrieving memory resource with ID: DemoLongTermMemory3-mP3Knz4YTX...
  ✅ Found memory: DemoLongTermMemory3-mP3Knz4YTX


🔍 DEBUG: Long-term memory created successfully


{'arn': 'arn:aws:bedrock-agentcore:us-east-1:328307993871:memory/DemoLongTermMemory3-mP3Knz4YTX', 'id': 'DemoLongTermMemory3-mP3Knz4YTX', 'name': 'DemoLongTermMemory3', 'description': 'A temporary memory for short-lived conversations.', 'eventExpiryDuration': 90, 'status': 'ACTIVE', 'createdAt': datetime.datetime(2025, 9, 29, 12, 8, 26, 264000, tzinfo=tzlocal()), 'updatedAt': datetime.datetime(2025, 9, 29, 12, 8, 26, 484000, tzinfo=tzlocal()), 'strategies': [{'strategyId': 'UserPreferenceStrategy-PMYICVDWE5', 'name': 'UserPreferenceStrategy', 'description': 'A strategy for user preference.', 'type': 'USER_PREFERENCE', 'namespaces': ['/strategies/{memoryStrategyId}/actors/{actorId}'], 'createdAt': datetime.datetime(2025, 9, 29, 12, 8, 26, 264000, tzinfo=tzlocal()), 'updatedAt': datetime.datetime(2025, 9, 29, 12, 8, 26, 484000, tzinfo=tzlocal()), 'status': 'ACTIVE'}]}

In [None]:
# Confirm server default namespaces are applied
manager.get_memory_strategies(memory_id=memory3.id)

[{'strategyId': 'UserPreferenceStrategy-PMYICVDWE5', 'name': 'UserPreferenceStrategy', 'description': 'A strategy for user preference.', 'type': 'USER_PREFERENCE', 'namespaces': ['/strategies/{memoryStrategyId}/actors/{actorId}'], 'createdAt': datetime.datetime(2025, 9, 29, 12, 8, 26, 264000, tzinfo=tzlocal()), 'updatedAt': datetime.datetime(2025, 9, 29, 12, 8, 26, 484000, tzinfo=tzlocal()), 'status': 'ACTIVE'}]

In [None]:
# List all memories
for memory_summary in manager.list_memories():
    print(f"🔍 DEBUG: Memory found : {memory_summary}")

🔍 DEBUG: Memory found : {'arn': 'arn:aws:bedrock-agentcore:us-east-1:328307993871:memory/DemoLongTermMemory1-AJRz1XDfS6', 'id': 'DemoLongTermMemory1-AJRz1XDfS6', 'status': 'ACTIVE', 'createdAt': datetime.datetime(2025, 9, 27, 15, 35, 20, 523000, tzinfo=tzlocal()), 'updatedAt': datetime.datetime(2025, 9, 27, 15, 44, 51, 65000, tzinfo=tzlocal()), 'memoryId': 'DemoLongTermMemory1-AJRz1XDfS6'}
🔍 DEBUG: Memory found : {'arn': 'arn:aws:bedrock-agentcore:us-east-1:328307993871:memory/DemoLongTermMemory2-O63lir7uK6', 'id': 'DemoLongTermMemory2-O63lir7uK6', 'status': 'ACTIVE', 'createdAt': datetime.datetime(2025, 9, 27, 15, 38, 14, 362000, tzinfo=tzlocal()), 'updatedAt': datetime.datetime(2025, 9, 27, 15, 41, 9, 247000, tzinfo=tzlocal()), 'memoryId': 'DemoLongTermMemory2-O63lir7uK6'}
🔍 DEBUG: Memory found : {'arn': 'arn:aws:bedrock-agentcore:us-east-1:328307993871:memory/DemoLongTermMemory3-mP3Knz4YTX', 'id': 'DemoLongTermMemory3-mP3Knz4YTX', 'status': 'ACTIVE', 'createdAt': datetime.datetime(2

In [None]:
# Get all the memory strategies available
strategies = manager.get_memory_strategies(memory_id=memory1.id)
strategies

[{'strategyId': 'SemanticStrategy-AYF3Xl8OoD', 'name': 'SemanticStrategy', 'description': 'A strategy for semantic understanding.', 'type': 'SEMANTIC', 'namespaces': ['support/user/{actorId}/{sessionId}'], 'createdAt': datetime.datetime(2025, 9, 27, 15, 35, 20, 523000, tzinfo=tzlocal()), 'updatedAt': datetime.datetime(2025, 9, 27, 15, 35, 20, 754000, tzinfo=tzlocal()), 'status': 'ACTIVE'},
 {'strategyId': 'SummaryStrategy-o93FeU6Zdd', 'name': 'SummaryStrategy', 'description': 'A strategy for summarizing the conversation.', 'type': 'SUMMARIZATION', 'namespaces': ['support/user/{actorId}/{sessionId}'], 'createdAt': datetime.datetime(2025, 9, 27, 15, 44, 51, 175000, tzinfo=tzlocal()), 'updatedAt': datetime.datetime(2025, 9, 27, 15, 44, 51, 175000, tzinfo=tzlocal()), 'status': 'ACTIVE'}]

In [None]:
if "SummaryStrategy" not in [strategy.name for strategy in strategies]:
    manager.add_strategy_and_wait(
        memory_id=memory1.id,
        strategy=SummaryStrategy(
            name="SummaryStrategy",
            description="A strategy for summarizing the conversation.",
            namespaces=["support/user/{actorId}/{sessionId}"],
        ),
    )
    print("🔍 DEBUG: Summary strategy added successfully")
else:
    print("🔍 DEBUG: Summary strategy already exists - skipping memory update")

🔍 DEBUG: Summary strategy already exists - skipping memory update


In [None]:
# Using direct access capability to show memory fields
memory1.description

'A temporary memory for short-lived conversations.'

In [None]:
# Fetch the memory again to see the updated strategies
get_response = manager.get_memory(memory_id=memory1.id)

🔎 Retrieving memory resource with ID: DemoLongTermMemory1-AJRz1XDfS6...
  ✅ Found memory: DemoLongTermMemory1-AJRz1XDfS6


In [None]:
get_response.status

'ACTIVE'

In [None]:
manager.get_memory_strategies(memory_id=memory1.id)

[{'strategyId': 'SemanticStrategy-AYF3Xl8OoD', 'name': 'SemanticStrategy', 'description': 'A strategy for semantic understanding.', 'type': 'SEMANTIC', 'namespaces': ['support/user/{actorId}/{sessionId}'], 'createdAt': datetime.datetime(2025, 9, 27, 15, 35, 20, 523000, tzinfo=tzlocal()), 'updatedAt': datetime.datetime(2025, 9, 27, 15, 35, 20, 754000, tzinfo=tzlocal()), 'status': 'ACTIVE'},
 {'strategyId': 'SummaryStrategy-o93FeU6Zdd', 'name': 'SummaryStrategy', 'description': 'A strategy for summarizing the conversation.', 'type': 'SUMMARIZATION', 'namespaces': ['support/user/{actorId}/{sessionId}'], 'createdAt': datetime.datetime(2025, 9, 27, 15, 44, 51, 175000, tzinfo=tzlocal()), 'updatedAt': datetime.datetime(2025, 9, 27, 15, 44, 51, 175000, tzinfo=tzlocal()), 'status': 'ACTIVE'}]

In [None]:
for memory in manager.list_memories():
    print(memory.status)

ACTIVE
ACTIVE
ACTIVE
ACTIVE


In [None]:
[strategy.strategyId for strategy in manager.get_memory_strategies(memory_id=memory1.id)]

['SemanticStrategy-AYF3Xl8OoD', 'SummaryStrategy-o93FeU6Zdd']

In [None]:
# try:
#     for memory in manager.list_memories():
#         manager.delete_memory(memory_id=memory.id)
# except Exception as e:
#     print(f"🔍 DEBUG: Error deleting memory: {e}")
#     pass

In [None]:
try:
    for memory in manager.list_memories():
        memory = manager.get_memory(memory_id=memory.id)
        print(f"🔍 DEBUG: Memory found with status: {memory.status}")
except Exception as e:
    print(f"🔍 DEBUG: Memory deletion confirmed. Error: {e}")

🔎 Retrieving memory resource with ID: DemoLongTermMemory1-AJRz1XDfS6...
  ✅ Found memory: DemoLongTermMemory1-AJRz1XDfS6
🔎 Retrieving memory resource with ID: DemoLongTermMemory2-O63lir7uK6...
  ✅ Found memory: DemoLongTermMemory2-O63lir7uK6
🔎 Retrieving memory resource with ID: DemoLongTermMemory3-mP3Knz4YTX...


🔍 DEBUG: Memory found with status: ACTIVE
🔍 DEBUG: Memory found with status: ACTIVE


  ✅ Found memory: DemoLongTermMemory3-mP3Knz4YTX
🔎 Retrieving memory resource with ID: TypedMemory-BMWCYo4kT8...
  ✅ Found memory: TypedMemory-BMWCYo4kT8


🔍 DEBUG: Memory found with status: ACTIVE
🔍 DEBUG: Memory found with status: ACTIVE
