In [1]:
%load_ext autoreload
%autoreload 2
from aiflows.utils.general_helpers import read_yaml_file, quick_load_api_keys
from aiflows.backends.api_info import ApiInfo
from aiflows.utils import serving
from aiflows.utils import colink_utils
from aiflows.workers import run_dispatch_worker_thread
from aiflows.base_flows import AtomicFlow
from aiflows.messages import FlowMessage
from aiflows import flow_verse
import sys
import os
sys.path.append("..")
from utils import compile_and_writefile, dict_to_yaml
import json
import copy
FLOW_MODULES_PATH = "./"


  from .autonotebook import tqdm as notebook_tqdm


In [279]:
%%compile_and_writefile ./ShortTermMemoryFlowModule/ShortTermMemoryFlow.py

from copy import deepcopy
from typing import Dict, Any, Optional
import hydra
from aiflows.base_flows import AtomicFlow
from aiflows.messages import FlowMessage
from aiflows.prompt_template import JinjaPrompt
from aiflows.backends.llm_lite import LiteLLMBackend

class ShortTermMemoryFlow(AtomicFlow):
    """
    A flow that maintains short-term conversational memory.
    
    Operations:
        - update: Integrates new content into the running memory context.
        - fetch: Retrieves relevant content from memory based on a query.
    
    Configuration Parameters:
        - `backend` (dict): LLM backend configuration.
        - `update_prompt_template` (JinjaPrompt): Prompt template for memory updates.
        - `fetch_prompt_template` (JinjaPrompt): Prompt template for fetching memory.
    
    Flow State:
        - `running_context` (List[str]): The accumulated memory context for the conversation.

    """

    REQUIRED_KEYS_CONFIG = ["backend"]

    SUPPORTS_CACHING: bool = True

    def __init__(
        self,
        backend: LiteLLMBackend,
        update_prompt_template: JinjaPrompt,
        fetch_prompt_template: JinjaPrompt,
        **kwargs
    ):
        super().__init__(**kwargs)
        self.backend = backend
        self.update_prompt_template = update_prompt_template
        self.fetch_prompt_template = fetch_prompt_template
        self.set_up_flow_state()

    def set_up_flow_state(self):
        super().set_up_flow_state()
        self.flow_state["running_context"] = []  

    @classmethod
    def instantiate_from_config(cls, config: Dict) -> 'ShortTermMemoryFlow':
        flow_config = deepcopy(config)

        kwargs = {"flow_config": flow_config}
        kwargs["backend"] = hydra.utils.instantiate(flow_config["backend"], _convert_="partial")
        kwargs["update_prompt_template"] = hydra.utils.instantiate(
            flow_config["update_prompt_template"], _convert_="partial"
        )
        kwargs["fetch_prompt_template"] = hydra.utils.instantiate(
            flow_config["fetch_prompt_template"], _convert_="partial"
        )
        
        return cls(**kwargs)

    def run(self, input_message: FlowMessage):
        """Run the flow based on the specified operation in `input_message`."""
        input_data = input_message.data
        operation = input_data.get("operation")
        input_data =input_data.get("content")

        if operation == "update":
            content = input_data.get("content")
            result = self.handle_update(content)
        elif operation == "fetch":
            query = input_data.get("query")
            participant = input_data.get("participant")
            result = self.handle_fetch(query, participant)
        else:
            result = {"error": f"Unsupported operation '{operation}' in ShortTermMemoryFlow."}
        
        reply_message = self.package_output_message(
            input_message=input_message,
            response=result
        )
        self.send_message(reply_message)



    def handle_update(self, content: str) -> Dict[str, Any]:
        """Handles memory update by combining new content into the running context."""
        update_input = {
            "running_context": self.flow_state["running_context"][-1] if self.flow_state["running_context"] else "",
            "new_message": content
        }
        prompt_text = self.update_prompt_template.format(**update_input)

        response = self.backend(messages=[{"role": "system", "content": prompt_text}])
        updated_memory = response[0]["content"] if response else ""

        print(updated_memory)

        if "Memory Entries:" in updated_memory:
            extracted_memory = updated_memory.split("Memory Entries:")[-1].strip()
        else:
            extracted_memory = updated_memory.strip()

        self.flow_state["running_context"] = [extracted_memory]
        return {"result": "Memory updated successfully."}


    def handle_fetch(self, query: str, participant: str) -> Dict[str, Any]:
        """Handles memory retrieval by fetching relevant content from the running context."""
        fetch_input = {
            "running_context": self.flow_state["running_context"][-1] if self.flow_state["running_context"] else "",
            "query": query,
            "participant": participant
        }

        print('\n look here')

        print(fetch_input)
        prompt_text = self.fetch_prompt_template.format(**fetch_input)

        response = self.backend(messages=[{"role": "system", "content": prompt_text}])
        retrieved_memory = response[0]["content"] if response else ""

        if "Relevant Memories:" in retrieved_memory:
            cleaned_memory = retrieved_memory.split("Relevant Memories:")[-1].strip()
        else:
            cleaned_memory = retrieved_memory.strip()
            
        return {"result": cleaned_memory}




In [294]:
default_config_ShortTermMemoryFlow= {
    "_target_": "ShortTermMemoryFlowModule.ShortTermMemoryFlow.ShortTermMemoryFlow.instantiate_from_default_config",
    "name": "ShortTermMemoryFlow",
    "description": "Flow that maintains a running context for short-term memory.",
    "backend": {
        "_target_": "aiflows.backends.llm_lite.LiteLLMBackend",
        "api_infos": '???',
    
        "model_name": {
            "groq": "groq/llama3-70b-8192"
        },
        "n": 1,
        "max_tokens": 1500,
        "temperature": 0.7,
        "top_p": 0.9,
        "stream": False
    },
   "update_prompt_template": {
    "_target_": "aiflows.prompt_template.JinjaPrompt",
    "template": """
You are an expert memory assistant, tasked with creating accurate, participant-specific memory entries. Each entry should:
- Clearly tag key topics such as **Themes, Setup, Preparation, Activities**, etc.
- Be succinct but capture essential details for future reference.
- Ensure that entries on distinct topics are separated clearly for effective categorization.

#### Examples

**Example 1:**
Conversation:
- User: I’ve been wanting to explore new hiking trails this year.
- Friend: I have a few in mind! Let’s start planning.

Memory Entries:
- **Activity**: User wants to explore new hiking trails this year.
- **Activity**: Friend has suggested planning hikes together.

**Example 2:**
Conversation:
- User: I think having a cozy cabin stay would add to the experience.
- Friend: That’s perfect! I know a few near the trails we like.

Memory Entries:
- **Accommodation**: User prefers a cozy cabin stay to enhance the experience.
- **Accommodation**: Friend agrees with the cabin idea and has some in mind.

Now, categorize each new piece of content into concise memory entries, tagged with relevant topics.

Current Running Context:
{{running_context}}

New Message:
{{new_message}}

Memory Entries:
""",
    "input_variables": ["running_context", "new_message"]
}
,
"fetch_prompt_template": {
    "_target_": "aiflows.prompt_template.JinjaPrompt",
    "template": """
You are a memory retrieval assistant tasked with finding memories relevant to both direct and related themes in multi-participant conversations. For each query:
- Retrieve memories tagged with topics that are either directly or conceptually related to the query.
- Prioritize entries with high relevance to the query subject (e.g., “themes” should capture topics related to art, creativity, and project ideas).

#### Examples

**Example 1:**
Query: What themes have we discussed?
Running Context:
- **Activity**: User has started painting landscapes as a form of relaxation.
- **Project**: Friend suggested working on a collaborative art project.
- **Supplies**: User has acrylic paint sets and large canvases available.
- **Setup**: They discussed working outside to leverage natural light.

Relevant Memories:
- **Activity**: User has started painting landscapes as a form of relaxation.
- **Project**: Friend suggested working on a collaborative art project.

**Example 2:**
Query: What setup did we discuss for the project?
Running Context:
- **Activity**: User has started painting landscapes.
- **Project**: They are planning a weekend project for art.
- **Setup**: They discussed using User’s studio or setting up outside for natural light.

Relevant Memories:
- **Setup**: They discussed using User’s studio or setting up outside for natural light.

Now, based on the query below, retrieve memory entries that match the subject, including conceptually related themes.
Give only the memories directly, donot give reasoning, although you should reason well, but donot include it in output

Query:
{{query}}

Running Context:
{{running_context}}

Relevant Memories:
""",
    "input_variables": ["running_context", "query", "participant"]
}


}

dict_to_yaml(default_config_ShortTermMemoryFlow, "./ShortTermMemoryFlowModule/ShortTermMemoryFlow.yaml")


In [295]:
from aiflows.backends.api_info import ApiInfo

api_info = [ApiInfo(backend_used="groq", api_key=os.getenv('GROQ_API_KEY'))]
quick_load_api_keys(default_config_ShortTermMemoryFlow, api_info)

In [296]:
cl = colink_utils.start_colink_server()
run_dispatch_worker_thread(cl)

[[36m2024-11-08 23:47:38,460[0m][[34maiflows.workers.dispatch_worker:236[0m][[32mINFO[0m] - Dispatch worker started in attached thread.[0m
[[36m2024-11-08 23:47:38,461[0m][[34maiflows.workers.dispatch_worker:237[0m][[32mINFO[0m] - dispatch_point: coflows_dispatch[0m


In [297]:
run_dispatch_worker_thread(cl)

[[36m2024-11-08 23:47:38,541[0m][[34maiflows.workers.dispatch_worker:236[0m][[32mINFO[0m] - Dispatch worker started in attached thread.[0m
[[36m2024-11-08 23:47:38,541[0m][[34maiflows.workers.dispatch_worker:237[0m][[32mINFO[0m] - dispatch_point: coflows_dispatch[0m


In [298]:
serving.serve_flow(
    cl=cl,
    flow_class_name="ShortTermMemoryFlowModule.ShortTermMemoryFlow.ShortTermMemoryFlow",
    flow_endpoint="ShortTermMemoryFlow",
)

[[36m2024-11-08 23:47:38,589[0m][[34maiflows.utils.serving:116[0m][[32mINFO[0m] - Started serving ShortTermMemoryFlowModule.ShortTermMemoryFlow.ShortTermMemoryFlow at flows:ShortTermMemoryFlow.[0m
[[36m2024-11-08 23:47:38,590[0m][[34maiflows.utils.serving:117[0m][[32mINFO[0m] - dispatch_point: coflows_dispatch[0m
[[36m2024-11-08 23:47:38,590[0m][[34maiflows.utils.serving:118[0m][[32mINFO[0m] - parallel_dispatch: False[0m
[[36m2024-11-08 23:47:38,590[0m][[34maiflows.utils.serving:119[0m][[32mINFO[0m] - singleton: False
[0m


True

In [299]:
proxy_stm_flow = serving.get_flow_instance(
    cl=cl,
    flow_endpoint="ShortTermMemoryFlow",
    user_id="local",
    config_overrides=default_config_ShortTermMemoryFlow,
)

[[36m2024-11-08 23:47:38,648[0m][[34maiflows.utils.serving:336[0m][[32mINFO[0m] - Mounted 34748925-b632-49bb-b13e-ec81c2bb4f50 at flows:ShortTermMemoryFlow:mounts:local:34748925-b632-49bb-b13e-ec81c2bb4f50[0m


In [300]:
datas = [
    {
        "operation": "update",
        "content": {
            "content": "User: I recently started capturing landscapes with my camera. It's been refreshing to explore natural settings through photography."
        }
    },
    {
        "operation": "update",
        "content": {
            "content": "Friend: That sounds amazing! I’ve been experimenting with portrait photography myself. Maybe we could do a shoot together with a mix of nature and portrait themes."
        }
    },
    {
        "operation": "update",
        "content": {
            "content": "User: I’d love that! Combining both themes would create some unique shots. We could go somewhere scenic for a photo session."
        }
    },
    {
        "operation": "update",
        "content": {
            "content": "Friend: Agreed! I know a beautiful spot near a lake with a perfect blend of natural light and shade, ideal for both landscape and portrait photos."
        }
    },
    {
        "operation": "update",
        "content": {
            "content": "User: That sounds perfect. I’ll bring my wide-angle lens and a few filters to capture different moods."
        }
    },
    {
        "operation": "fetch",
        "content": {
            "query": "What ideas did we have for photo themes?",
            "participant": "User"
        }
    },
    {
        "operation": "fetch",
        "content": {
            "query": "Where did we plan to do our photo session?",
            "participant": "User"
        }
    }
]

for data in datas:
    input_message = proxy_stm_flow.package_input_message(data)
    future = proxy_stm_flow.get_reply_future(input_message)
    reply_data = future.get_data()

    print("Data sent:\n", data, "\n")
    print("REPLY:\n", reply_data, "\n")


Data sent:
 {'operation': 'update', 'content': {'content': "User: I recently started capturing landscapes with my camera. It's been refreshing to explore natural settings through photography."}} 

REPLY:
 {'result': 'Memory updated successfully.'} 

Data sent:
 {'operation': 'update', 'content': {'content': 'Friend: That sounds amazing! I’ve been experimenting with portrait photography myself. Maybe we could do a shoot together with a mix of nature and portrait themes.'}} 

REPLY:
 {'result': 'Memory updated successfully.'} 

Data sent:
 {'operation': 'update', 'content': {'content': 'User: I’d love that! Combining both themes would create some unique shots. We could go somewhere scenic for a photo session.'}} 

REPLY:
 {'result': 'Memory updated successfully.'} 

Data sent:
 {'operation': 'update', 'content': {'content': 'Friend: Agreed! I know a beautiful spot near a lake with a perfect blend of natural light and shade, ideal for both landscape and portrait photos.'}} 

REPLY:
 {'res

[[36m2024-11-08 23:47:39,291[0m][[34maiflows.workers.dispatch_worker:119[0m][[32mINFO[0m] - 
~~~ Dispatch task ~~~[0m
[[36m2024-11-08 23:47:39,295[0m][[34maiflows.workers.dispatch_worker:161[0m][[32mINFO[0m] - flow_endpoint: ShortTermMemoryFlow[0m
[[36m2024-11-08 23:47:39,295[0m][[34maiflows.workers.dispatch_worker:162[0m][[32mINFO[0m] - flow_id: 34748925-b632-49bb-b13e-ec81c2bb4f50[0m
[[36m2024-11-08 23:47:39,295[0m][[34maiflows.workers.dispatch_worker:163[0m][[32mINFO[0m] - owner_id: local[0m
[[36m2024-11-08 23:47:39,295[0m][[34maiflows.workers.dispatch_worker:164[0m][[32mINFO[0m] - message_paths: ['push_tasks:76530617-a0d7-482c-b69a-9f92ea85bb4d:msg'][0m
[[36m2024-11-08 23:47:39,296[0m][[34maiflows.workers.dispatch_worker:165[0m][[32mINFO[0m] - parallel_dispatch: False
[0m
[[36m2024-11-08 23:47:39,315[0m][[34maiflows.workers.dispatch_worker:188[0m][[32mINFO[0m] - Input message source: Proxy_ShortTermMemoryFlow[0m
Here are the categori