diff --git a/memoryos-mcp/config.json b/memoryos-mcp/config.json
index 16ccf38..fb4869e 100644
--- a/memoryos-mcp/config.json
+++ b/memoryos-mcp/config.json
@@ -4,10 +4,12 @@
   "openai_base_url": "",
   "data_storage_path": "./memoryos_data",
   "assistant_id": "memoryos_assistant",
-  "short_term_capacity": 10,
+  "short_term_capacity": 2,
   "mid_term_capacity": 2000,
+  "embedding_model_name": "BAAI/bge-m3",
   "long_term_knowledge_capacity": 100,
   "retrieval_queue_capacity": 7,
-  "mid_term_heat_threshold": 5.0,
+  "mid_term_heat_threshold": 7.0,
+  "mid_term_similarity_threshold": 0.6,
   "llm_model": "gpt-4o-mini"
 }
\ No newline at end of file
diff --git a/memoryos-mcp/memoryos/__init__.py b/memoryos-mcp/memoryos/__init__.py
index a29eca9..b97e620 100644
--- a/memoryos-mcp/memoryos/__init__.py
+++ b/memoryos-mcp/memoryos/__init__.py
@@ -1,4 +1,3 @@
-# Import the main class for easy access
 from .memoryos import Memoryos
 
 __all__ = ['Memoryos']
\ No newline at end of file
diff --git a/memoryos-mcp/memoryos/long_term.py b/memoryos-mcp/memoryos/long_term.py
index f1ebe69..4e49b36 100644
--- a/memoryos-mcp/memoryos/long_term.py
+++ b/memoryos-mcp/memoryos/long_term.py
@@ -2,10 +2,13 @@
 import numpy as np
 import faiss
 from collections import deque
-from utils import get_timestamp, get_embedding, normalize_vector, ensure_directory_exists
+try:
+    from .utils import get_timestamp, get_embedding, normalize_vector, ensure_directory_exists
+except ImportError:
+    from utils import get_timestamp, get_embedding, normalize_vector, ensure_directory_exists
 
 class LongTermMemory:
-    def __init__(self, file_path, knowledge_capacity=100):
+    def __init__(self, file_path, knowledge_capacity=100, embedding_model_name: str = "all-MiniLM-L6-v2", embedding_model_kwargs: dict = None):
         self.file_path = file_path
         ensure_directory_exists(self.file_path)
         self.knowledge_capacity = knowledge_capacity
@@ -13,6 +16,9 @@ def __init__(self, file_path, knowledge_capacity=100):
         # Use deques for knowledge bases to easily manage capacity
         self.knowledge_base = deque(maxlen=self.knowledge_capacity) # For general/user private knowledge
         self.assistant_knowledge = deque(maxlen=self.knowledge_capacity) # For assistant specific knowledge
+
+        self.embedding_model_name = embedding_model_name
+        self.embedding_model_kwargs = embedding_model_kwargs if embedding_model_kwargs is not None else {}
         self.load()
 
     def update_user_profile(self, user_id, new_data, merge=True):
@@ -45,7 +51,11 @@ def add_knowledge_entry(self, knowledge_text, knowledge_deque: deque, type_name=
             return # If deque is full, the oldest item is automatically removed when appending.
-        vec = get_embedding(knowledge_text)
+        vec = get_embedding(
+            knowledge_text,
+            model_name=self.embedding_model_name,
+            **self.embedding_model_kwargs
+        )
         vec = normalize_vector(vec).tolist()
         entry = {
             "knowledge": knowledge_text,
@@ -72,7 +82,11 @@ def _search_knowledge_deque(self, query, knowledge_deque: deque, threshold=0.1,
         if not knowledge_deque:
             return []
-        query_vec = get_embedding(query)
+        query_vec = get_embedding(
+            query,
+            model_name=self.embedding_model_name,
+            **self.embedding_model_kwargs
+        )
         query_vec = normalize_vector(query_vec)
 
         embeddings = []
diff --git a/memoryos-mcp/memoryos/memoryos.py b/memoryos-mcp/memoryos/memoryos.py
index 00e813c..ae7da09 100644
--- a/memoryos-mcp/memoryos/memoryos.py
+++ b/memoryos-mcp/memoryos/memoryos.py
@@ -1,13 +1,26 @@
 import os
 import json
-from utils import OpenAIClient, get_timestamp, generate_id, gpt_user_profile_analysis, gpt_knowledge_extraction, gpt_update_profile, ensure_directory_exists
-
-import prompts
-from short_term import ShortTermMemory
-from mid_term import MidTermMemory, compute_segment_heat # For H_THRESHOLD logic
-from long_term import LongTermMemory
-from updater import Updater
-from retriever import Retriever
+from concurrent.futures import ThreadPoolExecutor, as_completed
+
+# Support both package and standalone imports
+try:
+    # Try relative imports (when used as a package)
+    from .utils import OpenAIClient, get_timestamp, generate_id, gpt_user_profile_analysis, gpt_knowledge_extraction, ensure_directory_exists
+    from . import prompts
+    from .short_term import ShortTermMemory
+    from .mid_term import MidTermMemory, compute_segment_heat # For H_THRESHOLD logic
+    from .long_term import LongTermMemory
+    from .updater import Updater
+    from .retriever import Retriever
+except ImportError:
+    # Fall back to absolute imports (when used as a standalone module)
+    from utils import OpenAIClient, get_timestamp, generate_id, gpt_user_profile_analysis, gpt_knowledge_extraction, ensure_directory_exists
+    import prompts
+    from short_term import ShortTermMemory
+    from mid_term import MidTermMemory, compute_segment_heat # For H_THRESHOLD logic
+    from long_term import LongTermMemory
+    from updater import Updater
+    from retriever import Retriever
 
 # Heat threshold for triggering profile/knowledge update from mid-term memory
 H_PROFILE_UPDATE_THRESHOLD = 5.0
@@ -24,15 +37,32 @@ def __init__(self, user_id: str,
                  long_term_knowledge_capacity=100,
                  retrieval_queue_capacity=7,
                  mid_term_heat_threshold=H_PROFILE_UPDATE_THRESHOLD,
-                 llm_model="gpt-4o-mini" # Unified model for all LLM operations
+                 mid_term_similarity_threshold=0.6,
+                 llm_model="gpt-4o-mini",
+                 embedding_model_name: str = "all-MiniLM-L6-v2",
+                 embedding_model_kwargs: dict = None
                  ):
         self.user_id = user_id
         self.assistant_id = assistant_id
         self.data_storage_path = os.path.abspath(data_storage_path)
         self.llm_model = llm_model
+        self.mid_term_similarity_threshold = mid_term_similarity_threshold
+        self.embedding_model_name = embedding_model_name
+
+        # Smart defaults for embedding_model_kwargs
+        if embedding_model_kwargs is None:
+            if 'bge-m3' in self.embedding_model_name.lower():
+                print("INFO: Detected bge-m3 model, defaulting embedding_model_kwargs to {'use_fp16': True}")
+                self.embedding_model_kwargs = {'use_fp16': True}
+            else:
+                self.embedding_model_kwargs = {}
+        else:
+            self.embedding_model_kwargs = embedding_model_kwargs
+
         print(f"Initializing Memoryos for user '{self.user_id}' and assistant '{self.assistant_id}'. Data path: {self.data_storage_path}")
Data path: {self.data_storage_path}") print(f"Using unified LLM model: {self.llm_model}") + print(f"Using embedding model: {self.embedding_model_name} with kwargs: {self.embedding_model_kwargs}") # Initialize OpenAI Client self.client = OpenAIClient(api_key=openai_api_key, base_url=openai_base_url) @@ -55,17 +85,34 @@ def __init__(self, user_id: str, # Initialize Memory Modules for User self.short_term_memory = ShortTermMemory(file_path=user_short_term_path, max_capacity=short_term_capacity) - self.mid_term_memory = MidTermMemory(file_path=user_mid_term_path, client=self.client, max_capacity=mid_term_capacity) - self.user_long_term_memory = LongTermMemory(file_path=user_long_term_path, knowledge_capacity=long_term_knowledge_capacity) + self.mid_term_memory = MidTermMemory( + file_path=user_mid_term_path, + client=self.client, + max_capacity=mid_term_capacity, + embedding_model_name=self.embedding_model_name, + embedding_model_kwargs=self.embedding_model_kwargs + ) + self.user_long_term_memory = LongTermMemory( + file_path=user_long_term_path, + knowledge_capacity=long_term_knowledge_capacity, + embedding_model_name=self.embedding_model_name, + embedding_model_kwargs=self.embedding_model_kwargs + ) # Initialize Memory Module for Assistant Knowledge - self.assistant_long_term_memory = LongTermMemory(file_path=assistant_long_term_path, knowledge_capacity=long_term_knowledge_capacity) + self.assistant_long_term_memory = LongTermMemory( + file_path=assistant_long_term_path, + knowledge_capacity=long_term_knowledge_capacity, + embedding_model_name=self.embedding_model_name, + embedding_model_kwargs=self.embedding_model_kwargs + ) # Initialize Orchestration Modules self.updater = Updater(short_term_memory=self.short_term_memory, mid_term_memory=self.mid_term_memory, long_term_memory=self.user_long_term_memory, # Updater primarily updates user's LTM profile/knowledge client=self.client, + topic_similarity_threshold=mid_term_similarity_threshold, # 传递中期记忆相似度阈值 llm_model=self.llm_model) self.retriever = Retriever( mid_term_memory=self.mid_term_memory, @@ -80,6 +127,7 @@ def _trigger_profile_and_knowledge_update_if_needed(self): """ Checks mid-term memory for hot segments and triggers profile/knowledge update if threshold is met. Adapted from main_memoybank.py's update_user_profile_from_top_segment. + Enhanced with parallel LLM processing for better performance. """ if not self.mid_term_memory.heap: return @@ -102,23 +150,42 @@ def _trigger_profile_and_knowledge_update_if_needed(self): if unanalyzed_pages: print(f"Memoryos: Mid-term session {sid} heat ({current_heat:.2f}) exceeded threshold. Analyzing {len(unanalyzed_pages)} pages for profile/knowledge update.") - # Perform user profile analysis and knowledge extraction separately - # First call: User profile analysis - new_user_profile_text = gpt_user_profile_analysis(unanalyzed_pages, self.client, model=self.llm_model) + # 并行执行两个LLM任务:用户画像分析(已包含更新)、知识提取 + def task_user_profile_analysis(): + print("Memoryos: Starting parallel user profile analysis and update...") + # 获取现有用户画像 + existing_profile = self.user_long_term_memory.get_raw_user_profile(self.user_id) + if not existing_profile or existing_profile.lower() == "none": + existing_profile = "No existing profile data." 
+
+                # Return the updated full profile directly
+                return gpt_user_profile_analysis(unanalyzed_pages, self.client, model=self.llm_model, existing_user_profile=existing_profile)
+
+            def task_knowledge_extraction():
+                print("Memoryos: Starting parallel knowledge extraction...")
+                return gpt_knowledge_extraction(unanalyzed_pages, self.client, model=self.llm_model)
+
+            # Run both tasks in parallel
+            with ThreadPoolExecutor(max_workers=2) as executor:
+                # Submit the two main tasks
+                future_profile = executor.submit(task_user_profile_analysis)
+                future_knowledge = executor.submit(task_knowledge_extraction)
+
+                # Wait for the results
+                try:
+                    updated_user_profile = future_profile.result() # Already the updated full profile
+                    knowledge_result = future_knowledge.result()
+                except Exception as e:
+                    print(f"Error in parallel LLM processing: {e}")
+                    return
 
-            # Second call: Knowledge extraction (user private data and assistant knowledge)
-            knowledge_result = gpt_knowledge_extraction(unanalyzed_pages, self.client, model=self.llm_model)
             new_user_private_knowledge = knowledge_result.get("private")
             new_assistant_knowledge = knowledge_result.get("assistant_knowledge")
 
-            # Update User Profile in user's LTM
-            if new_user_profile_text and new_user_profile_text.lower() != "none":
-                old_profile = self.user_long_term_memory.get_raw_user_profile(self.user_id)
-                if old_profile and old_profile.lower() != "none":
-                    updated_profile = gpt_update_profile(old_profile, new_user_profile_text, self.client, model=self.llm_model)
-                else:
-                    updated_profile = new_user_profile_text
-                self.user_long_term_memory.update_user_profile(self.user_id, updated_profile, merge=False) # Don't merge, replace with latest
+            # Use the updated full user profile directly
+            if updated_user_profile and updated_user_profile.lower() != "none":
+                print("Memoryos: Updating user profile with integrated analysis...")
+                self.user_long_term_memory.update_user_profile(self.user_id, updated_user_profile, merge=False) # Replace directly with the new full profile
 
             # Add User Private Knowledge to user's LTM
             if new_user_private_knowledge and new_user_private_knowledge.lower() != "none":
diff --git a/memoryos-mcp/memoryos/mid_term.py b/memoryos-mcp/memoryos/mid_term.py
index 68ecc4d..29811bb 100644
--- a/memoryos-mcp/memoryos/mid_term.py
+++ b/memoryos-mcp/memoryos/mid_term.py
@@ -5,10 +5,16 @@
 import heapq
 from datetime import datetime
-from utils import (
-    get_timestamp, generate_id, get_embedding, normalize_vector,
-    llm_extract_keywords, compute_time_decay, ensure_directory_exists, OpenAIClient
-)
+try:
+    from .utils import (
+        get_timestamp, generate_id, get_embedding, normalize_vector,
+        compute_time_decay, ensure_directory_exists, OpenAIClient
+    )
+except ImportError:
+    from utils import (
+        get_timestamp, generate_id, get_embedding, normalize_vector,
+        compute_time_decay, ensure_directory_exists, OpenAIClient
+    )
 
 # Heat computation constants (can be tuned or made configurable)
 HEAT_ALPHA = 1.0
@@ -29,7 +35,7 @@ def compute_segment_heat(session, alpha=HEAT_ALPHA, beta=HEAT_BETA, gamma=HEAT_G
     return alpha * N_visit + beta * L_interaction + gamma * R_recency
 
 class MidTermMemory:
-    def __init__(self, file_path: str, client: OpenAIClient, max_capacity=2000):
+    def __init__(self, file_path: str, client: OpenAIClient, max_capacity=2000, embedding_model_name: str = "all-MiniLM-L6-v2", embedding_model_kwargs: dict = None):
         self.file_path = file_path
         ensure_directory_exists(self.file_path)
         self.client = client
@@ -37,6 +43,9 @@ def __init__(self, file_path: str, client: OpenAIClient, max_capacity=2000):
         self.sessions = {} # {session_id: session_object}
         self.access_frequency = defaultdict(int) # {session_id: access_count_for_lfu}
         self.heap = [] # Min-heap storing (-H_segment, session_id) for hottest segments
+
+        self.embedding_model_name = embedding_model_name
+        self.embedding_model_kwargs = embedding_model_kwargs if embedding_model_kwargs is not None else {}
         self.load()
 
     def get_page_by_id(self, page_id):
@@ -89,19 +98,46 @@ def evict_lfu(self):
         self.save()
         print(f"MidTermMemory: Evicted session {lfu_sid}.")
 
-    def add_session(self, summary, details):
+    def add_session(self, summary, details, summary_keywords=None):
         session_id = generate_id("session")
-        summary_vec = get_embedding(summary)
+        summary_vec = get_embedding(
+            summary,
+            model_name=self.embedding_model_name,
+            **self.embedding_model_kwargs
+        )
         summary_vec = normalize_vector(summary_vec).tolist()
-        summary_keywords = list(llm_extract_keywords(summary, client=self.client))
+        summary_keywords = summary_keywords if summary_keywords is not None else []
 
         processed_details = []
         for page_data in details:
             page_id = page_data.get("page_id", generate_id("page"))
-            full_text = f"User: {page_data.get('user_input','')} Assistant: {page_data.get('agent_response','')}"
-            inp_vec = get_embedding(full_text)
-            inp_vec = normalize_vector(inp_vec).tolist()
-            page_keywords = list(llm_extract_keywords(full_text, client=self.client))
+
+            # Check for an existing embedding to avoid recomputation
+            if "page_embedding" in page_data and page_data["page_embedding"]:
+                print(f"MidTermMemory: Reusing existing embedding for page {page_id}")
+                inp_vec = page_data["page_embedding"]
+                # Make sure the embedding is normalized
+                if isinstance(inp_vec, list):
+                    inp_vec_np = np.array(inp_vec, dtype=np.float32)
+                    if np.linalg.norm(inp_vec_np) > 1.1 or np.linalg.norm(inp_vec_np) < 0.9: # Check whether re-normalization is needed
+                        inp_vec = normalize_vector(inp_vec_np).tolist()
+            else:
+                print(f"MidTermMemory: Computing new embedding for page {page_id}")
+                full_text = f"User: {page_data.get('user_input','')} Assistant: {page_data.get('agent_response','')}"
+                inp_vec = get_embedding(
+                    full_text,
+                    model_name=self.embedding_model_name,
+                    **self.embedding_model_kwargs
+                )
+                inp_vec = normalize_vector(inp_vec).tolist()
+
+            # Use existing keywords or leave them empty (to be filled by multi-summary)
+            if "page_keywords" in page_data and page_data["page_keywords"]:
+                print(f"MidTermMemory: Using existing keywords for page {page_id}")
+                page_keywords = page_data["page_keywords"]
+            else:
+                print(f"MidTermMemory: Setting empty keywords for page {page_id} (will be filled by multi-summary)")
+                page_keywords = []
 
             processed_page = {
                 **page_data, # Carry over existing fields like user_input, agent_response, timestamp
@@ -153,9 +189,13 @@ def insert_pages_into_session(self, summary_for_new_pages, keywords_for_new_page
                                   similarity_threshold=0.6, keyword_similarity_alpha=1.0):
         if not self.sessions: # If no existing sessions, just add as a new one
             print("MidTermMemory: No existing sessions. Adding new session directly.")
Adding new session directly.") - return self.add_session(summary_for_new_pages, pages_to_insert) + return self.add_session(summary_for_new_pages, pages_to_insert, keywords_for_new_pages) - new_summary_vec = get_embedding(summary_for_new_pages) + new_summary_vec = get_embedding( + summary_for_new_pages, + model_name=self.embedding_model_name, + **self.embedding_model_kwargs + ) new_summary_vec = normalize_vector(new_summary_vec) best_sid = None @@ -188,10 +228,33 @@ def insert_pages_into_session(self, summary_for_new_pages, keywords_for_new_page processed_new_pages = [] for page_data in pages_to_insert: page_id = page_data.get("page_id", generate_id("page")) # Use existing or generate new ID - full_text = f"User: {page_data.get('user_input','')} Assistant: {page_data.get('agent_response','')}" - inp_vec = get_embedding(full_text) - inp_vec = normalize_vector(inp_vec).tolist() - page_keywords_current = list(llm_extract_keywords(full_text, client=self.client)) + + # 检查是否已有embedding,避免重复计算 + if "page_embedding" in page_data and page_data["page_embedding"]: + print(f"MidTermMemory: Reusing existing embedding for page {page_id}") + inp_vec = page_data["page_embedding"] + # 确保embedding是normalized的 + if isinstance(inp_vec, list): + inp_vec_np = np.array(inp_vec, dtype=np.float32) + if np.linalg.norm(inp_vec_np) > 1.1 or np.linalg.norm(inp_vec_np) < 0.9: # 检查是否需要重新normalize + inp_vec = normalize_vector(inp_vec_np).tolist() + else: + print(f"MidTermMemory: Computing new embedding for page {page_id}") + full_text = f"User: {page_data.get('user_input','')} Assistant: {page_data.get('agent_response','')}" + inp_vec = get_embedding( + full_text, + model_name=self.embedding_model_name, + **self.embedding_model_kwargs + ) + inp_vec = normalize_vector(inp_vec).tolist() + + # 使用已有keywords或继承session的keywords + if "page_keywords" in page_data and page_data["page_keywords"]: + print(f"MidTermMemory: Using existing keywords for page {page_id}") + page_keywords_current = page_data["page_keywords"] + else: + print(f"MidTermMemory: Using session keywords for page {page_id}") + page_keywords_current = keywords_for_new_pages processed_page = { **page_data, # Carry over existing fields @@ -211,16 +274,20 @@ def insert_pages_into_session(self, summary_for_new_pages, keywords_for_new_page return best_sid else: print(f"MidTermMemory: No suitable session to merge (best score {best_overall_score:.2f} < threshold {similarity_threshold}). 
Creating new session.") - return self.add_session(summary_for_new_pages, pages_to_insert) + return self.add_session(summary_for_new_pages, pages_to_insert, keywords_for_new_pages) - def search_sessions(self, query_text, segment_similarity_threshold=0.1, page_similarity_threshold=0.1, - top_k_sessions=5, keyword_alpha=1.0, recency_tau_search=3600): + def search_sessions(self, query_text, segment_similarity_threshold=0.0, page_similarity_threshold=0.0, + top_k_sessions=6, keyword_alpha=1.0, recency_tau_search=3600): if not self.sessions: return [] - query_vec = get_embedding(query_text) + query_vec = get_embedding( + query_text, + model_name=self.embedding_model_name, + **self.embedding_model_kwargs + ) query_vec = normalize_vector(query_vec) - query_keywords = set(llm_extract_keywords(query_text, client=self.client)) + query_keywords = set() # Keywords extraction removed, relying on semantic similarity candidate_sessions = [] session_ids = list(self.sessions.keys()) diff --git a/memoryos-mcp/memoryos/prompts.py b/memoryos-mcp/memoryos/prompts.py index e7b2480..46d68fb 100644 --- a/memoryos-mcp/memoryos/prompts.py +++ b/memoryos-mcp/memoryos/prompts.py @@ -88,7 +88,7 @@ Output only the user profile section. """ -PERSONALITY_ANALYSIS_USER_PROMPT = """Please analyze the latest user-AI conversation below based on the 90 personality preference dimensions. +PERSONALITY_ANALYSIS_USER_PROMPT = """Please analyze the latest user-AI conversation below and update the user profile based on the 90 personality preference dimensions. Here are the 90 dimensions and their explanations: @@ -149,20 +149,23 @@ Language Style: Preference for formal vs. casual tone. Practicality: Preference for practical advice vs. theoretical discussion. -For each dimension that can be extracted from the conversation, list it in the following format: -Dimension ( Level(High / Medium / Low) ) -[Reasoning: Brief explanation including time, people, and context] -The reason for generation should be as brief as possible and highlight the key points. -Note: If a dimension cannot be inferred from the conversation, do not list it. +**Task Instructions:** +1. Review the existing user profile below +2. Analyze the new conversation for evidence of the 90 dimensions above +3. Update and integrate the findings into a comprehensive user profile +4. For each dimension that can be identified, use the format: Dimension ( Level(High/Medium/Low) ) +5. Include brief reasoning for each dimension when possible +6. Maintain existing insights from the old profile while incorporating new observations +7. If a dimension cannot be inferred from either the old profile or new conversation, do not include it -Known User Traits (if any): -{known_user_traits} +**Existing User Profile:** +{existing_user_profile} -Latest User-AI Conversation: +**Latest User-AI Conversation:** {conversation} -Please begin your analysis: -""" +**Updated User Profile:** +Please provide the comprehensive updated user profile below, combining insights from both the existing profile and new conversation:""" # Prompt for knowledge extraction (NEW) KNOWLEDGE_EXTRACTION_SYSTEM_PROMPT = """You are a knowledge extraction assistant. Your task is to extract user private data and assistant knowledge from conversations. @@ -181,8 +184,8 @@ 【User Private Data】 Extract personal information about the user. 
-- [Brief fact]
-- [Brief fact]
+- [Brief fact]: [Minimal context(Including entities and time)]
+- [Brief fact]: [Minimal context(Including entities and time)]
 - (If no private data found, write "None")
 
 【Assistant Knowledge】
@@ -190,12 +193,6 @@
 - Assistant [brief action] at [time/context]
 - Assistant [brief capability] during [brief context]
 - (If no assistant knowledge found, write "None")
-
-Examples:
-- Assistant recommended Interstellar on 2023-10-01
-- Assistant provided pasta recipe during cooking talk
-- Assistant helped with Python code
-- Assistant analyzed spreadsheet data
 """
 
 # Prompt for updating user profile (from utils.py, gpt_update_profile)
diff --git a/memoryos-mcp/memoryos/requirements.txt b/memoryos-mcp/memoryos/requirements.txt
new file mode 100644
index 0000000..0c8a8d4
--- /dev/null
+++ b/memoryos-mcp/memoryos/requirements.txt
@@ -0,0 +1,23 @@
+# MemoryOS Core Dependencies
+# Core scientific computing and ML libraries
+numpy==1.24.*
+sentence-transformers>=2.7.0,<3.0.0  # Updated for Qwen model support
+transformers>=4.51.0  # Required for newer sentence-transformer features
+FlagEmbedding>=1.2.9  # For BGE-M3 model support
+
+faiss-gpu>=1.7.0,<2.0.0
+httpx[socks]
+openai
+# Web framework (for demo)
+flask>=2.0.0,<3.0.0
+
+# Optional utilities
+python-dotenv>=0.19.0,<2.0.0
+
+# Development and testing (optional)
+# pytest>=7.0.0,<8.0.0
+# pytest-asyncio>=0.20.0,<1.0.0
+
+# Additional dependencies for compatibility
+typing-extensions>=4.0.0,<5.0.0
+regex>=2022.1.18
diff --git a/memoryos-mcp/memoryos/retriever.py b/memoryos-mcp/memoryos/retriever.py
index 9792cd0..1ea1568 100644
--- a/memoryos-mcp/memoryos/retriever.py
+++ b/memoryos-mcp/memoryos/retriever.py
@@ -1,16 +1,25 @@
 from collections import deque
 import heapq
-from utils import get_timestamp, OpenAIClient # OpenAIClient might not be directly used here but good for consistency
-from short_term import ShortTermMemory
-from mid_term import MidTermMemory
-from long_term import LongTermMemory
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from typing import Optional
+
+try:
+    from .utils import get_timestamp, OpenAIClient, run_parallel_tasks
+    from .short_term import ShortTermMemory
+    from .mid_term import MidTermMemory
+    from .long_term import LongTermMemory
+except ImportError:
+    from utils import get_timestamp, OpenAIClient, run_parallel_tasks
+    from short_term import ShortTermMemory
+    from mid_term import MidTermMemory
+    from long_term import LongTermMemory
 # from .updater import Updater # Updater is not directly used by Retriever
 
 class Retriever:
     def __init__(self,
                  mid_term_memory: MidTermMemory,
                  long_term_memory: LongTermMemory,
-                 assistant_long_term_memory: LongTermMemory = None, # Add assistant LTM
+                 assistant_long_term_memory: Optional[LongTermMemory] = None, # Add assistant LTM
                  # client: OpenAIClient, # Not strictly needed if all LLM calls are within memory modules
                  queue_capacity=7): # Default from main_memoybank was 7 for retrieval_queue
         # Short term memory is usually for direct context, not primary retrieval source here
@@ -22,19 +31,9 @@ def __init__(self,
         self.retrieval_queue_capacity = queue_capacity
         # self.retrieval_queue = deque(maxlen=queue_capacity) # This was instance level, but retrieve returns it, so maybe not needed as instance var
 
-    def retrieve_context(self, user_query: str,
-                         user_id: str, # Needed for profile, can be used for context filtering if desired
-                         segment_similarity_threshold=0.1, # From main_memoybank example
-                         page_similarity_threshold=0.1, # From main_memoybank example
-                         knowledge_threshold=0.01, # From main_memoybank example
-                         top_k_sessions=5, # From MidTermMemory search default
-                         top_k_knowledge=20 # Default for knowledge search
-                         ):
-        print(f"Retriever: Starting retrieval for query: '{user_query[:50]}...'")
-
-        # 1. Retrieve from Mid-Term Memory
-        # MidTermMemory.search_sessions now takes client for its internal keyword extraction
-        # It also returns a more structured result including scores.
+    def _retrieve_mid_term_context(self, user_query, segment_similarity_threshold, page_similarity_threshold, top_k_sessions):
+        """Parallel task: retrieve from mid-term memory."""
+        print("Retriever: Searching mid-term memory...")
         matched_sessions = self.mid_term_memory.search_sessions(
             query_text=user_query,
             segment_similarity_threshold=segment_similarity_threshold,
@@ -64,38 +63,69 @@ def retrieve_context(self, user_query: str,
         # Extract pages from heap, already sorted by heapq property (smallest first)
         # We want highest scores, so either use a max-heap or sort after popping from min-heap.
-        retrieved_mid_term_pages = [item[2] for item in sorted(top_pages_heap, key=lambda x: x[0], reverse=True)]
-        print(f"Retriever: Mid-term memory recalled {len(retrieved_mid_term_pages)} pages.")
+        retrieved_pages = [item[2] for item in sorted(top_pages_heap, key=lambda x: x[0], reverse=True)]
+        print(f"Retriever: Mid-term memory recalled {len(retrieved_pages)} pages.")
+        return retrieved_pages
 
-        # 2. Retrieve from Long-Term User Knowledge (specific to the user)
-        # Assuming LongTermMemory for a user stores their specific knowledge/private data.
-        # The main LongTermMemory class in `long_term.py` has `search_user_knowledge` which doesn't need user_id as it's implicit in the instance
-        # However, if a single LTM instance handles multiple users, it would need user_id.
-        # For the Memoryos class, LTM will be user-specific or assistant-specific.
-        retrieved_user_knowledge = self.long_term_memory.search_user_knowledge(
+    def _retrieve_user_knowledge(self, user_query, knowledge_threshold, top_k_knowledge):
+        """Parallel task: retrieve from user long-term knowledge."""
+        print("Retriever: Searching user long-term knowledge...")
+        retrieved_knowledge = self.long_term_memory.search_user_knowledge(
             user_query, threshold=knowledge_threshold, top_k=top_k_knowledge
         )
-        print(f"Retriever: Long-term user knowledge recalled {len(retrieved_user_knowledge)} items.")
+        print(f"Retriever: Long-term user knowledge recalled {len(retrieved_knowledge)} items.")
+        return retrieved_knowledge
 
-        # 3. Retrieve from Long-Term Assistant Knowledge (general for the assistant)
-        # This requires a separate LTM instance or a method in LTM that queries a different knowledge base.
-        # In our Memoryos structure, there will be a separate LTM for assistant.
-        # For now, assuming self.long_term_memory is the USER's LTM.
-        # The Memoryos class will handle passing the correct LTM instance for assistant knowledge.
-        # This function will just return what it can from the provided LTM.
-        # If assistant_ltm is passed, it can be used: self.assistant_long_term_memory.search_assistant_knowledge(...)
-        retrieved_assistant_knowledge = []
-        if self.assistant_long_term_memory:
-            retrieved_assistant_knowledge = self.assistant_long_term_memory.search_assistant_knowledge(
-                user_query, threshold=knowledge_threshold, top_k=top_k_knowledge
-            )
-            print(f"Retriever: Long-term assistant knowledge recalled {len(retrieved_assistant_knowledge)} items.")
-        else:
+    def _retrieve_assistant_knowledge(self, user_query, knowledge_threshold, top_k_knowledge):
+        """Parallel task: retrieve from assistant long-term knowledge."""
+        if not self.assistant_long_term_memory:
             print("Retriever: No assistant long-term memory provided, skipping assistant knowledge retrieval.")
+            return []
+
+        print("Retriever: Searching assistant long-term knowledge...")
+        retrieved_knowledge = self.assistant_long_term_memory.search_assistant_knowledge(
+            user_query, threshold=knowledge_threshold, top_k=top_k_knowledge
+        )
+        print(f"Retriever: Long-term assistant knowledge recalled {len(retrieved_knowledge)} items.")
+        return retrieved_knowledge
+
+    def retrieve_context(self, user_query: str,
+                         user_id: str, # Needed for profile, can be used for context filtering if desired
+                         segment_similarity_threshold=0.1, # From main_memoybank example
+                         page_similarity_threshold=0.1, # From main_memoybank example
+                         knowledge_threshold=0.01, # From main_memoybank example
+                         top_k_sessions=5, # From MidTermMemory search default
+                         top_k_knowledge=20 # Default for knowledge search
+                         ):
+        print(f"Retriever: Starting PARALLEL retrieval for query: '{user_query[:50]}...'")
+
+        # Run the three retrieval tasks in parallel
+        tasks = [
+            lambda: self._retrieve_mid_term_context(user_query, segment_similarity_threshold, page_similarity_threshold, top_k_sessions),
+            lambda: self._retrieve_user_knowledge(user_query, knowledge_threshold, top_k_knowledge),
+            lambda: self._retrieve_assistant_knowledge(user_query, knowledge_threshold, top_k_knowledge)
+        ]
+
+        # Execute in parallel
+        with ThreadPoolExecutor(max_workers=3) as executor:
+            futures = []
+            for i, task in enumerate(tasks):
+                future = executor.submit(task)
+                futures.append((i, future))
+
+            results = [None] * 3
+            for task_idx, future in futures:
+                try:
+                    results[task_idx] = future.result()
+                except Exception as e:
+                    print(f"Error in retrieval task {task_idx}: {e}")
+                    results[task_idx] = []
+
+        retrieved_mid_term_pages, retrieved_user_knowledge, retrieved_assistant_knowledge = results
 
         return {
-            "retrieved_pages": retrieved_mid_term_pages, # List of page dicts
-            "retrieved_user_knowledge": retrieved_user_knowledge, # List of knowledge entry dicts
-            "retrieved_assistant_knowledge": retrieved_assistant_knowledge, # List of assistant knowledge entry dicts
+            "retrieved_pages": retrieved_mid_term_pages or [], # List of page dicts
+            "retrieved_user_knowledge": retrieved_user_knowledge or [], # List of knowledge entry dicts
+            "retrieved_assistant_knowledge": retrieved_assistant_knowledge or [], # List of assistant knowledge entry dicts
            "retrieved_at": get_timestamp()
         }
\ No newline at end of file
diff --git a/memoryos-mcp/memoryos/short_term.py b/memoryos-mcp/memoryos/short_term.py
index c6ab7d7..37ffddc 100644
--- a/memoryos-mcp/memoryos/short_term.py
+++ b/memoryos-mcp/memoryos/short_term.py
@@ -1,6 +1,9 @@
 import json
 from collections import deque
-from utils import get_timestamp, ensure_directory_exists
+try:
+    from .utils import get_timestamp, ensure_directory_exists
+except ImportError:
+    from utils import get_timestamp, ensure_directory_exists
 
 class ShortTermMemory:
     def __init__(self, file_path, max_capacity=10):
diff --git a/memoryos-mcp/memoryos/test.py b/memoryos-mcp/memoryos/test.py
new file mode 100644
index 0000000..23de687
--- /dev/null
+++ b/memoryos-mcp/memoryos/test.py
@@ -0,0 +1,55 @@
+
+import os
+from memoryos import Memoryos
+
+# --- Basic Configuration ---
+USER_ID = "demo_user"
+ASSISTANT_ID = "demo_assistant"
+API_KEY = ""  # Replace with your key
+BASE_URL = ""  # Optional: if using a custom OpenAI endpoint
+DATA_STORAGE_PATH = ""
+LLM_MODEL = "gpt-4o-mini"
+
+def simple_demo():
+    print("MemoryOS Simple Demo")
+
+    # 1. Initialize MemoryOS
+    print("Initializing MemoryOS...")
+    try:
+        memo = Memoryos(
+            user_id=USER_ID,
+            openai_api_key=API_KEY,
+            openai_base_url=BASE_URL,
+            data_storage_path=DATA_STORAGE_PATH,
+            llm_model=LLM_MODEL,
+            assistant_id=ASSISTANT_ID,
+            short_term_capacity=7,
+            mid_term_heat_threshold=5,
+            retrieval_queue_capacity=10,
+            long_term_knowledge_capacity=100,
+            mid_term_similarity_threshold=0.6
+        )
+        print("MemoryOS initialized successfully!\n")
+    except Exception as e:
+        print(f"Error: {e}")
+        return
+
+    # 2. Add some basic memories
+    print("Adding some memories...")
+
+    memo.add_memory(
+        user_input="Hi! I'm Tom, I work as a data scientist in San Francisco.",
+        agent_response="Hello Tom! Nice to meet you. Data science is such an exciting field. What kind of data do you work with?"
+    )
+
+    test_query = "What do you remember about my job?"
+    print(f"User: {test_query}")
+
+    response = memo.get_response(
+        query=test_query,
+    )
+
+    print(f"Assistant: {response}")
+
+if __name__ == "__main__":
+    simple_demo()
\ No newline at end of file
diff --git a/memoryos-mcp/memoryos/updater.py b/memoryos-mcp/memoryos/updater.py
index b8b6e70..33de39b 100644
--- a/memoryos-mcp/memoryos/updater.py
+++ b/memoryos-mcp/memoryos/updater.py
@@ -1,11 +1,23 @@
-from utils import (
-    generate_id, get_timestamp,
-    gpt_generate_multi_summary, check_conversation_continuity, generate_page_meta_info, OpenAIClient,
-    llm_extract_keywords
-)
-from short_term import ShortTermMemory
-from mid_term import MidTermMemory
-from long_term import LongTermMemory
+try:
+    from .utils import (
+        generate_id, get_timestamp,
+        gpt_generate_multi_summary, check_conversation_continuity, generate_page_meta_info, OpenAIClient,
+        run_parallel_tasks
+    )
+    from .short_term import ShortTermMemory
+    from .mid_term import MidTermMemory
+    from .long_term import LongTermMemory
+except ImportError:
+    from utils import (
+        generate_id, get_timestamp,
+        gpt_generate_multi_summary, check_conversation_continuity, generate_page_meta_info, OpenAIClient,
+        run_parallel_tasks
+    )
+    from short_term import ShortTermMemory
+    from mid_term import MidTermMemory
+    from long_term import LongTermMemory
+
+from concurrent.futures import ThreadPoolExecutor, as_completed
 
 class Updater:
     def __init__(self,
@@ -23,6 +35,38 @@ def __init__(self,
         self.last_evicted_page_for_continuity = None # Tracks the actual last page object for continuity checks
         self.llm_model = llm_model
 
+    def _process_page_embedding_and_keywords(self, page_data):
+        """Generate the embedding for a single page (keywords are provided by multi-summary)."""
+        page_id = page_data.get("page_id", generate_id("page"))
+
+        # Skip if the page already has an embedding
+        if "page_embedding" in page_data and page_data["page_embedding"]:
+            print(f"Updater: Page {page_id} already has embedding, skipping computation")
+            return page_data
+
+        # Only handle the embedding; keywords are supplied uniformly by multi-summary
+        if not ("page_embedding" in page_data and page_data["page_embedding"]):
+            full_text = f"User: {page_data.get('user_input','')} Assistant: {page_data.get('agent_response','')}"
+            try:
+                embedding = self._get_embedding_for_page(full_text)
+                if embedding is not None:
+                    from .utils import normalize_vector
+                    page_data["page_embedding"] = normalize_vector(embedding).tolist()
+                    print(f"Updater: Generated embedding for page {page_id}")
+            except Exception as e:
+                print(f"Error generating embedding for page {page_id}: {e}")
+
+        # Initialize an empty keyword list (to be filled from the multi-summary keywords)
+        if "page_keywords" not in page_data:
+            page_data["page_keywords"] = []
+
+        return page_data
+
+    def _get_embedding_for_page(self, text):
+        """Helper to get the embedding for a page."""
+        from .utils import get_embedding
+        return get_embedding(text)
+
     def _update_linked_pages_meta_info(self, start_page_id, new_meta_info):
         """
         Updates meta_info for a chain of connected pages starting from start_page_id.
@@ -143,10 +187,10 @@ def process_short_term_to_mid_term(self):
             # Fallback: if no summaries, add as one session or handle as a single block
             print("Updater: No specific themes from multi-summary. Adding batch as a general session.")
             fallback_summary = "General conversation segment from short-term memory."
-            fallback_keywords = llm_extract_keywords(input_text_for_summary, self.client, model=self.llm_model) if input_text_for_summary else []
+            fallback_keywords = []  # Use empty keywords since multi-summary failed
             self.mid_term_memory.insert_pages_into_session(
                 summary_for_new_pages=fallback_summary,
-                keywords_for_new_pages=list(fallback_keywords),
+                keywords_for_new_pages=fallback_keywords,
                 pages_to_insert=current_batch_pages,
                 similarity_threshold=self.topic_similarity_threshold
             )
@@ -174,12 +218,8 @@ def update_long_term_from_analysis(self, user_id, profile_analysis_result):
         new_profile_text = profile_analysis_result.get("profile")
         if new_profile_text and new_profile_text.lower() != "none":
             print(f"Updater: Updating user profile for {user_id} in LongTermMemory.")
-            current_profile = self.long_term_memory.get_raw_user_profile(user_id)
-            if current_profile and current_profile.lower() != "none":
-                updated_profile = gpt_update_profile(current_profile, new_profile_text, self.client)
-            else:
-                updated_profile = new_profile_text # First profile
-            self.long_term_memory.update_user_profile(user_id, updated_profile)
+            # Use the new analysis result directly as the full profile, since it is already the integrated result
+            self.long_term_memory.update_user_profile(user_id, new_profile_text, merge=False)
 
         user_private_knowledge = profile_analysis_result.get("private")
         if user_private_knowledge and user_private_knowledge.lower() != "none":
diff --git a/memoryos-mcp/memoryos/utils.py b/memoryos-mcp/memoryos/utils.py
index 2bacbc4..6983a44 100644
--- a/memoryos-mcp/memoryos/utils.py
+++ b/memoryos-mcp/memoryos/utils.py
@@ -5,17 +5,45 @@
 from sentence_transformers import SentenceTransformer
 import json
 import os
-import prompts # Assuming prompts.py is in the same directory
+import inspect
+from functools import wraps
+try:
+    from . import prompts  # Try relative import
+except ImportError:
+    import prompts  # Fall back to absolute import
 from openai import OpenAI
+from concurrent.futures import ThreadPoolExecutor, as_completed
+import threading
+
+def clean_reasoning_model_output(text):
+    """
+    Strip reasoning tags (e.g. <think> blocks) from model output.
+    Handles the output format of reasoning models (such as the o1 series).
+    """
+    if not text:
+        return text
+
+    import re
+    # Remove <think>...</think> tags together with their content
+    cleaned_text = re.sub(r'<think>.*?</think>', '', text, flags=re.DOTALL)
+    # Collapse any extra blank lines that may be left behind
+    cleaned_text = re.sub(r'\n\s*\n\s*\n', '\n\n', cleaned_text)
+    # Strip leading and trailing whitespace
+    cleaned_text = cleaned_text.strip()
+
+    return cleaned_text
+
 # ---- OpenAI Client ----
 class OpenAIClient:
-    def __init__(self, api_key, base_url=None):
+    def __init__(self, api_key, base_url=None, max_workers=5):
         self.api_key = api_key
         self.base_url = base_url if base_url else "https://api.openai.com/v1"
         # The openai library looks for OPENAI_API_KEY and OPENAI_BASE_URL env vars by default
         # or they can be passed directly to the client.
         # For simplicity and explicit control, we'll pass them to the client constructor.
         self.client = OpenAI(api_key=self.api_key, base_url=self.base_url)
+        self.executor = ThreadPoolExecutor(max_workers=max_workers)
+        self._lock = threading.Lock()
 
     def chat_completion(self, model, messages, temperature=0.7, max_tokens=2000):
         print(f"Calling OpenAI API. Model: {model}")
@@ -26,12 +54,66 @@ def chat_completion(self, model, messages, temperature=0.7, max_tokens=2000):
                 temperature=temperature,
                 max_tokens=max_tokens
             )
-            return response.choices[0].message.content.strip()
+            raw_content = response.choices[0].message.content.strip()
+            # Automatically strip reasoning-model <think> tags
+            cleaned_content = clean_reasoning_model_output(raw_content)
+            return cleaned_content
         except Exception as e:
             print(f"Error calling OpenAI API: {e}")
             # Fallback or error handling
             return "Error: Could not get response from LLM."
+    def chat_completion_async(self, model, messages, temperature=0.7, max_tokens=2000):
+        """Async version of chat_completion."""
+        return self.executor.submit(self.chat_completion, model, messages, temperature, max_tokens)
+
+    def batch_chat_completion(self, requests):
+        """
+        Process multiple LLM requests in parallel.
+        requests: List of dict with keys: model, messages, temperature, max_tokens
+        """
+        futures = []
+        for req in requests:
+            future = self.chat_completion_async(
+                model=req.get("model", "gpt-4o-mini"),
+                messages=req["messages"],
+                temperature=req.get("temperature", 0.7),
+                max_tokens=req.get("max_tokens", 2000)
+            )
+            futures.append(future)
+
+        results = []
+        for future in as_completed(futures):
+            try:
+                result = future.result()
+                results.append(result)
+            except Exception as e:
+                print(f"Error in batch completion: {e}")
+                results.append("Error: Could not get response from LLM.")
+
+        return results
+
+    def shutdown(self):
+        """Shut down the thread pool."""
+        self.executor.shutdown(wait=True)
+
+# ---- Parallel Processing Utilities ----
+def run_parallel_tasks(tasks, max_workers=3):
+    """
+    Run a list of tasks in parallel.
+    tasks: List of callable functions
+    """
+    with ThreadPoolExecutor(max_workers=max_workers) as executor:
+        futures = [executor.submit(task) for task in tasks]
+        results = []
+        for future in as_completed(futures):
+            try:
+                result = future.result()
+                results.append(result)
+            except Exception as e:
+                print(f"Error in parallel task: {e}")
+                results.append(None)
+    return results
 
 # ---- Basic Utilities ----
 def get_timestamp():
@@ -45,15 +127,96 @@ def ensure_directory_exists(path):
 
 # ---- Embedding Utilities ----
 _model_cache = {}
+_embedding_cache = {}  # In-memory embedding cache
+
+def _get_valid_kwargs(func, kwargs):
+    """Helper to filter kwargs for a given function's signature."""
+    try:
+        sig = inspect.signature(func)
+        param_keys = set(sig.parameters.keys())
+        return {k: v for k, v in kwargs.items() if k in param_keys}
+    except (ValueError, TypeError):
+        # Fallback for functions/methods where signature inspection is not straightforward
+        return kwargs
+
+def get_embedding(text, model_name="all-MiniLM-L6-v2", use_cache=True, **kwargs):
+    """
+    Get the embedding vector for a piece of text.
+    Supports several mainstream models and adapts automatically to the calling conventions of different libraries.
+    - SentenceTransformer models: e.g., 'all-MiniLM-L6-v2', 'Qwen/Qwen3-Embedding-0.6B'
+    - FlagEmbedding models: e.g., 'BAAI/bge-m3'
 
-def get_embedding(text, model_name="all-MiniLM-L6-v2"):
-    if model_name not in _model_cache:
-        print(f"Loading sentence transformer model: {model_name}")
-        _model_cache[model_name] = SentenceTransformer(model_name)
-    model = _model_cache[model_name]
-    embedding = model.encode([text], convert_to_numpy=True)[0]
+    :param text: Input text.
+    :param model_name: Model name on Hugging Face.
+    :param use_cache: Whether to use the in-memory cache.
+    :param kwargs: Extra arguments passed to the model constructor or the encode method.
+        - for Qwen: `model_kwargs`, `tokenizer_kwargs`, `prompt_name="query"`
+        - for BGE-M3: `use_fp16=True`, `max_length=8192`
+    :return: The embedding vector of the text (numpy array).
+    """
+    model_config_key = json.dumps({"model_name": model_name, **kwargs}, sort_keys=True)
+
+    if use_cache:
+        cache_key = f"{model_config_key}::{hash(text)}"
+        if cache_key in _embedding_cache:
+            return _embedding_cache[cache_key]
+
+    # --- Model Loading ---
+    model_init_key = json.dumps({"model_name": model_name, **{k:v for k,v in kwargs.items() if k not in ['batch_size', 'max_length']}}, sort_keys=True)
+    if model_init_key not in _model_cache:
+        print(f"Loading model: {model_name}...")
+        if 'bge-m3' in model_name.lower():
+            try:
+                from FlagEmbedding import BGEM3FlagModel
+                init_kwargs = _get_valid_kwargs(BGEM3FlagModel.__init__, kwargs)
+                print(f"-> Using BGEM3FlagModel with init kwargs: {init_kwargs}")
+                _model_cache[model_init_key] = BGEM3FlagModel(model_name, **init_kwargs)
+            except ImportError:
+                raise ImportError("Please install FlagEmbedding: 'pip install -U FlagEmbedding' to use bge-m3 model.")
+        else: # Default handler for SentenceTransformer-based models (like Qwen, all-MiniLM, etc.)
+            try:
+                from sentence_transformers import SentenceTransformer
+                init_kwargs = _get_valid_kwargs(SentenceTransformer.__init__, kwargs)
+                print(f"-> Using SentenceTransformer with init kwargs: {init_kwargs}")
+                _model_cache[model_init_key] = SentenceTransformer(model_name, **init_kwargs)
+            except ImportError:
+                raise ImportError("Please install sentence-transformers: 'pip install -U sentence-transformers' to use this model.")
+
+    model = _model_cache[model_init_key]
+
+    # --- Encoding ---
+    embedding = None
+    if 'bge-m3' in model_name.lower():
+        encode_kwargs = _get_valid_kwargs(model.encode, kwargs)
+        print(f"-> Encoding with BGEM3FlagModel using kwargs: {encode_kwargs}")
+        result = model.encode([text], **encode_kwargs)
+        embedding = result['dense_vecs'][0]
+    else: # Default to SentenceTransformer-based models
+        encode_kwargs = _get_valid_kwargs(model.encode, kwargs)
+        print(f"-> Encoding with SentenceTransformer using kwargs: {encode_kwargs}")
+        embedding = model.encode([text], **encode_kwargs)[0]
+
+    if use_cache:
+        cache_key = f"{model_config_key}::{hash(text)}"
+        _embedding_cache[cache_key] = embedding
+        if len(_embedding_cache) > 10000:
+            keys_to_remove = list(_embedding_cache.keys())[:1000]
+            for key in keys_to_remove:
+                try:
+                    del _embedding_cache[key]
+                except KeyError:
+                    pass
+            print("Cleaned embedding cache to prevent memory overflow")
+
     return embedding
 
+def clear_embedding_cache():
+    """Clear the embedding cache."""
+    global _embedding_cache
+    _embedding_cache.clear()
+    print("Embedding cache cleared")
+
 def normalize_vector(vec):
     vec = np.array(vec, dtype=np.float32)
     norm = np.linalg.norm(vec)
@@ -100,17 +263,20 @@ def gpt_generate_multi_summary(text, client: OpenAIClient, model="gpt-4o-mini"):
     return {"input": text, "summaries": summaries}
 
-def gpt_user_profile_analysis(dialogs, client: OpenAIClient, model="gpt-4o-mini", known_user_traits="None"):
-    """Analyze user personality profile from dialogs"""
+def gpt_user_profile_analysis(dialogs, client: OpenAIClient, model="gpt-4o-mini", existing_user_profile="None"):
+    """
+    Analyze and update user personality profile from dialogs.
+    Combines the existing profile with the new dialogs and outputs the updated full profile directly.
+    """
     conversation = "\n".join([f"User: {d.get('user_input','')} (Timestamp: {d.get('timestamp', '')})\nAssistant: {d.get('agent_response','')} (Timestamp: {d.get('timestamp', '')})" for d in dialogs])
     messages = [
         {"role": "system", "content": prompts.PERSONALITY_ANALYSIS_SYSTEM_PROMPT},
         {"role": "user", "content": prompts.PERSONALITY_ANALYSIS_USER_PROMPT.format(
             conversation=conversation,
-            known_user_traits=known_user_traits
+            existing_user_profile=existing_user_profile
         )}
     ]
-    print("Calling LLM for user profile analysis...")
+    print("Calling LLM for user profile analysis and update...")
     result_text = client.chat_completion(model=model, messages=messages)
     return result_text.strip() if result_text else "None"
 
diff --git a/memoryos-mcp/server_new.py b/memoryos-mcp/server_new.py
index 5ebfd60..3a7e242 100644
--- a/memoryos-mcp/server_new.py
+++ b/memoryos-mcp/server_new.py
@@ -49,7 +49,8 @@ def init_memoryos(config_path: str) -> Memoryos:
         long_term_knowledge_capacity=config.get('long_term_knowledge_capacity', 100),
         retrieval_queue_capacity=config.get('retrieval_queue_capacity', 7),
         mid_term_heat_threshold=config.get('mid_term_heat_threshold', 5.0),
-        llm_model=config.get('llm_model', 'gpt-4o-mini')
+        llm_model=config.get('llm_model', 'gpt-4o-mini'),
+        embedding_model_name=config.get('embedding_model_name', 'all-MiniLM-L6-v2')
     )
 
 # 创建FastMCP服务器实例
@@ -87,8 +88,8 @@ def add_memory(user_input: str, agent_response: str, timestamp: Optional[str] =
         memoryos_instance.add_memory(
             user_input=user_input,
             agent_response=agent_response,
-            timestamp=timestamp,
-            meta_data=meta_data
+            timestamp=timestamp or get_timestamp(),
+            meta_data=meta_data or {}
         )
 
         result = {
@@ -163,9 +164,6 @@ def retrieve_memory(query: str, relationship_with_user: str = "friend", style_hi
             "user_profile": user_profile if user_profile and user_profile.lower() != "none" else "No detailed user profile",
             "short_term_memory": short_term_history,
             "short_term_count": len(short_term_history),
-            # "retrieved_pages": retrieval_results["retrieved_pages"][:max_results],
-            # "retrieved_user_knowledge": retrieval_results["retrieved_user_knowledge"][:max_results],
-            # "retrieved_assistant_knowledge": retrieval_results["retrieved_assistant_knowledge"][:max_results],
             "retrieved_pages": [{
                 'user_input': page['user_input'],
                 'agent_response': page['agent_response'],
@@ -182,9 +180,11 @@ def retrieve_memory(query: str, relationship_with_user: str = "friend", style_hi
                 'knowledge': k['knowledge'],
                 'timestamp': k['timestamp']
             } for k in retrieval_results["retrieved_assistant_knowledge"][:max_results]],
-            # "total_pages_found": len(retrieval_results["retrieved_pages"]),
-            # "total_user_knowledge_found": len(retrieval_results["retrieved_user_knowledge"]),
-            # "total_assistant_knowledge_found": len(retrieval_results["retrieved_assistant_knowledge"])
+
+            # Total count fields
+            "total_pages_found": len(retrieval_results["retrieved_pages"]),
+            "total_user_knowledge_found": len(retrieval_results["retrieved_user_knowledge"]),
+            "total_assistant_knowledge_found": len(retrieval_results["retrieved_assistant_knowledge"])
         }
 
         return result
diff --git a/memoryos-mcp/test_comprehensive.py b/memoryos-mcp/test_comprehensive.py
deleted file mode 100644
index a56239f..0000000
--- a/memoryos-mcp/test_comprehensive.py
+++ /dev/null
@@ -1,381 +0,0 @@
-
-"""
-MemoryOS MCP 服务器综合测试客户端
-使用官方MCP Python SDK进行测试
-"""
-
-import asyncio
-import json
-import subprocess
-import sys
-import time
-from typing import Dict, Any, Optional
-from pathlib import Path
-
-# 尝试导入官方MCP客户端
-try:
-    from mcp import ClientSession, StdioServerParameters
-    from mcp.client.stdio import stdio_client
-    from mcp import types
-except ImportError as e:
-    print(f"❌ 无法导入MCP客户端库: {e}")
-    print("请安装官方MCP SDK: pip install mcp")
-    sys.exit(1)
-
-class MemoryOSMCPTester:
-    """MemoryOS MCP服务器测试类"""
-
-    def __init__(self, server_script: str = "server_new.py", config_file: str = "config.json"):
-        self.server_script = Path(server_script)
-        self.config_file = Path(config_file)
-
-        # 验证文件存在
-        if not self.server_script.exists():
-            raise FileNotFoundError(f"服务器脚本不存在: {self.server_script}")
-        if not self.config_file.exists():
-            raise FileNotFoundError(f"配置文件不存在: {self.config_file}")
-
-    async def test_server_initialization(self):
-        """测试服务器初始化"""
-        print("\n🔄 测试1: 服务器初始化")
-
-        server_params = StdioServerParameters(
-            command=sys.executable,
-            args=[str(self.server_script), "--config", str(self.config_file)],
-            env=None
-        )
-
-        try:
-            async with stdio_client(server_params) as (read_stream, write_stream):
-                async with ClientSession(read_stream, write_stream) as session:
-                    # 初始化连接
-                    await session.initialize()
-                    print("✅ 服务器初始化成功")
-                    return True
-        except Exception as e:
-            print(f"❌ 服务器初始化失败: {e}")
-            return False
-
-    async def test_tools_discovery(self):
-        """测试工具发现"""
-        print("\n🔧 测试2: 工具发现")
-
-        server_params = StdioServerParameters(
-            command=sys.executable,
-            args=[str(self.server_script), "--config", str(self.config_file)],
-            env=None
-        )
-
-        try:
-            async with stdio_client(server_params) as (read_stream, write_stream):
-                async with ClientSession(read_stream, write_stream) as session:
-                    await session.initialize()
-
-                    # 获取工具列表
-                    tools_result = await session.list_tools()
-                    tools = tools_result.tools if hasattr(tools_result, 'tools') else []
-
-                    print(f"✅ 发现 {len(tools)} 个工具:")
-                    expected_tools = ["add_memory", "retrieve_memory", "get_user_profile"]
-
-                    for tool in tools:
-                        print(f"  - {tool.name}: {tool.description}")
-                        if tool.name in expected_tools:
-                            expected_tools.remove(tool.name)
-
-                    if expected_tools:
-                        print(f"⚠️ 缺少预期工具: {expected_tools}")
-                    else:
-                        print("✅ 所有预期工具都已找到")
-
-                    return tools
-        except Exception as e:
-            print(f"❌ 工具发现失败: {e}")
-            return []
-
-    async def test_add_memory_tool(self):
-        """测试添加记忆工具 - 20轮测试"""
-        print("\n💾 测试3: 添加记忆工具 (20轮测试)")
-
-        server_params = StdioServerParameters(
-            command=sys.executable,
-            args=[str(self.server_script), "--config", str(self.config_file)],
-            env=None
-        )
-
-        # 准备20轮测试数据
-        test_conversations = [
-            ("Hello, I'm a new user", "Welcome to MemoryOS! I'm your AI assistant."),
-            ("I like programming", "Great! Programming is a very interesting skill. What programming language do you mainly use?"),
-            ("I often use Python", "Python is a great language! Simple yet powerful."),
-            ("I'm learning machine learning", "Machine learning has great prospects! Which field are you focusing on?"),
-            ("I'm interested in natural language processing", "NLP is a fascinating field! It has many practical applications."),
-            ("I want to understand how ChatGPT works", "ChatGPT is based on the Transformer architecture and uses massive pre-training data."),
-            ("What is the attention mechanism?", "The attention mechanism allows models to focus on the most relevant parts of the input sequence."),
-            ("I want to learn deep learning", "For deep learning beginners, I suggest starting with neural network fundamentals."),
-            ("Recommend some learning resources", "I recommend classic resources like 'Deep Learning' book and CS231n course."),
-            ("I have a project idea", "Awesome! Share your project idea and I'll help you analyze it."),
-            ("I want to build an intelligent dialogue system", "Intelligent dialogue systems need to consider intent recognition, context understanding and other technologies."),
-            ("How to handle multi-turn conversations?", "Multi-turn conversations require maintaining dialogue state and context memory."),
-            ("How does MemoryOS work?", "MemoryOS maintains long-term dialogue context through hierarchical memory management."),
-            ("What's the difference between short-term and long-term memory", "Short-term memory stores current conversations, while long-term memory saves important user information."),
-            ("How to optimize memory retrieval?", "You can use vector similarity search and semantic understanding to improve retrieval accuracy."),
-            ("I want to contribute code", "Welcome to contribute! You can start by reading documentation and solving issues."),
-            ("What open source projects do you recommend?", "I recommend following popular AI open source projects like Hugging Face and LangChain."),
-            ("My interest is computer vision", "Computer vision covers areas like image recognition and object detection."),
-            ("Advice on choosing deep learning frameworks", "Both PyTorch and TensorFlow are great. PyTorch is better for research, TensorFlow for production."),
-            ("Thank you for your help!", "You're welcome! I'm glad I could help you, looking forward to our next conversation.")
-        ]
-
-        try:
-            async with stdio_client(server_params) as (read_stream, write_stream):
-                async with ClientSession(read_stream, write_stream) as session:
-                    await session.initialize()
-
-                    success_count = 0
-
-                    # 执行20轮添加记忆测试
-                    for i, (user_input, agent_response) in enumerate(test_conversations, 1):
-                        print(f"  第{i:2d}轮: 添加记忆...")
-
-                        test_data = {
-                            "user_input": user_input,
-                            "agent_response": agent_response
-                            # 不包含 meta_data
-                        }
-
-                        result = await session.call_tool("add_memory", test_data)
-
-                        if hasattr(result, 'content') and result.content:
-                            content = result.content[0]
-                            if hasattr(content, 'text'):
-                                response = json.loads(content.text)
-                                if response.get("status") == "success":
-                                    success_count += 1
-                                    print(f"  第{i:2d}轮: ✅ 成功")
-                                else:
-                                    print(f"  第{i:2d}轮: ❌ 失败 - {response.get('message', '未知错误')}")
-                            else:
-                                print(f"  第{i:2d}轮: ❌ 失败 - 无效响应格式")
-                        else:
-                            print(f"  第{i:2d}轮: ❌ 失败 - 无响应内容")
-
-                        # 短暂延迟,避免过快请求
-                        await asyncio.sleep(0.1)
-
-                    print(f"\n✅ 记忆添加测试完成: {success_count}/{len(test_conversations)} 成功")
-                    return success_count == len(test_conversations)
-
-        except Exception as e:
-            print(f"❌ 记忆添加测试失败: {e}")
-            return False
-
-    async def test_retrieve_memory_tool(self):
-        """测试检索记忆工具"""
-        print("\n🔍 测试4: 检索记忆工具")
-
-        server_params = StdioServerParameters(
-            command=sys.executable,
-            args=[str(self.server_script), "--config", str(self.config_file)],
-            env=None
-        )
-
-        # 准备多个检索查询
-        test_queries = [
-            ("user's programming skills", "Find user's programming related information"),
-            ("machine learning related content", "Retrieve machine learning and AI related conversations"),
-            ("learning resource recommendations", "Find recommended learning resources"),
-            ("project related discussions", "Retrieve conversations about projects"),
-            ("user's interests and hobbies", "Understand user's interests and preferences")
-        ]
-
-        try:
-            async with stdio_client(server_params) as (read_stream, write_stream):
-                async with ClientSession(read_stream, write_stream) as session:
-                    await session.initialize()
-
-                    success_count = 0
-
-                    # 执行多个检索查询测试
-                    for i, (query, description) in enumerate(test_queries, 1):
-                        print(f"  第{i}个查询: {description}")
-
-                        test_query = {
-                            "query": query,
-                            "relationship_with_user": "friend",
-                            "style_hint": "helpful and informative",
-                            "max_results": 10
-                        }
-
-                        result = await session.call_tool("retrieve_memory", test_query)
-
-                        if hasattr(result, 'content') and result.content:
-                            content = result.content[0]
-                            if hasattr(content, 'text'):
-                                response = json.loads(content.text)
-                                if response.get("status") == "success":
-                                    success_count += 1
-                                    print(f"  第{i}个查询: ✅ 成功")
-                                    print(f"    - 检索到页面数: {response.get('total_pages_found', 0)}")
-                                    print(f"    - 用户知识数: {response.get('total_user_knowledge_found', 0)}")
-                                    print(f"    - 助手知识数: {response.get('total_assistant_knowledge_found', 0)}")
-                                else:
-                                    print(f"  第{i}个查询: ❌ 失败 - {response.get('message', '未知错误')}")
-                            else:
-                                print(f"  第{i}个查询: ❌ 失败 - 无效响应格式")
-                        else:
-                            print(f"  第{i}个查询: ❌ 失败 - 无响应内容")
-
- # 短暂延迟 - await asyncio.sleep(0.1) - - print(f"\n✅ 记忆检索测试完成: {success_count}/{len(test_queries)} 成功") - return success_count >= len(test_queries) // 2 # 至少一半成功即可 - - except Exception as e: - print(f"❌ 记忆检索测试失败: {e}") - return False - - async def test_get_user_profile_tool(self): - """测试获取用户画像工具""" - print("\n👤 测试5: 获取用户画像工具") - - server_params = StdioServerParameters( - command=sys.executable, - args=[str(self.server_script), "--config", str(self.config_file)], - env=None - ) - - # 准备不同的参数组合测试 - test_configs = [ - ({"include_knowledge": True, "include_assistant_knowledge": False}, "包含用户知识"), - ({"include_knowledge": False, "include_assistant_knowledge": True}, "包含助手知识"), - ({"include_knowledge": True, "include_assistant_knowledge": True}, "包含所有知识"), - ({"include_knowledge": False, "include_assistant_knowledge": False}, "仅基本画像") - ] - - try: - async with stdio_client(server_params) as (read_stream, write_stream): - async with ClientSession(read_stream, write_stream) as session: - await session.initialize() - - success_count = 0 - - # 执行不同配置的用户画像测试 - for i, (test_params, description) in enumerate(test_configs, 1): - print(f" 第{i}种配置: {description}") - - result = await session.call_tool("get_user_profile", test_params) - - if hasattr(result, 'content') and result.content: - content = result.content[0] - if hasattr(content, 'text'): - response = json.loads(content.text) - if response.get("status") == "success": - success_count += 1 - print(f" 第{i}种配置: ✅ 成功") - print(f" - 用户ID: {response.get('user_id', 'N/A')}") - print(f" - 助手ID: {response.get('assistant_id', 'N/A')}") - - # 显示用户画像信息 - user_profile = response.get('user_profile', '暂无') - if len(user_profile) > 100: - user_profile = user_profile[:100] + "..." - print(f" - 用户画像: {user_profile}") - - # 显示知识条目数量 - if 'user_knowledge_count' in response: - print(f" - 用户知识条目数: {response.get('user_knowledge_count', 0)}") - if 'assistant_knowledge_count' in response: - print(f" - 助手知识条目数: {response.get('assistant_knowledge_count', 0)}") - else: - print(f" 第{i}种配置: ❌ 失败 - {response.get('message', '未知错误')}") - else: - print(f" 第{i}种配置: ❌ 失败 - 无效响应格式") - else: - print(f" 第{i}种配置: ❌ 失败 - 无响应内容") - - # 短暂延迟 - await asyncio.sleep(0.1) - - print(f"\n✅ 用户画像测试完成: {success_count}/{len(test_configs)} 成功") - return success_count >= 3 # 至少3种配置成功 - - except Exception as e: - print(f"❌ 用户画像测试失败: {e}") - return False - - async def run_all_tests(self): - """运行所有测试""" - print("🚀 开始MemoryOS MCP服务器综合测试") - print(f"服务器脚本: {self.server_script}") - print(f"配置文件: {self.config_file}") - print("=" * 60) - - test_results = [] - - # 运行所有测试 - tests = [ - ("服务器初始化", self.test_server_initialization), - ("工具发现", self.test_tools_discovery), - ("添加记忆 (20轮)", self.test_add_memory_tool), - ("检索记忆", self.test_retrieve_memory_tool), - ("获取用户画像", self.test_get_user_profile_tool), - ] - - for test_name, test_func in tests: - try: - result = await test_func() - test_results.append({"name": test_name, "result": result, "error": None}) - except Exception as e: - test_results.append({"name": test_name, "result": False, "error": str(e)}) - - # 输出测试结果汇总 - print("\n" + "=" * 60) - print("📊 测试结果汇总:") - - passed_count = 0 - total_count = len(test_results) - - for test in test_results: - status = "✅ 通过" if test["result"] else "❌ 失败" - print(f" {status} - {test['name']}") - if test["error"]: - print(f" 错误: {test['error']}") - if test["result"]: - passed_count += 1 - - print(f"\n总计: {passed_count}/{total_count} 测试通过") - - if passed_count == total_count: - print("🎉 所有测试通过!MemoryOS MCP服务器工作正常") - else: - 
print("⚠️ 部分测试失败,请检查服务器配置和实现") - - return passed_count == total_count - -def main(): - """主函数""" - import argparse - - parser = argparse.ArgumentParser(description="MemoryOS MCP服务器综合测试") - parser.add_argument("--server", default="server_new.py", help="服务器脚本路径") - parser.add_argument("--config", default="config.json", help="配置文件路径") - - args = parser.parse_args() - - try: - tester = MemoryOSMCPTester(args.server, args.config) - success = asyncio.run(tester.run_all_tests()) - sys.exit(0 if success else 1) - except KeyboardInterrupt: - print("\n⚠️ 测试被用户中断") - sys.exit(1) - except Exception as e: - print(f"\n❌ 测试过程中发生严重错误: {e}") - import traceback - traceback.print_exc() - sys.exit(1) - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/memoryos-mcp/test_simple.py b/memoryos-mcp/test_simple.py new file mode 100644 index 0000000..585423a --- /dev/null +++ b/memoryos-mcp/test_simple.py @@ -0,0 +1,268 @@ +#!/usr/bin/env python3 +""" +Simple MemoryOS MCP Server Test +- Insert 15 conversations +- Set short-term memory capacity to 2 +- Test 2 specific queries to verify memory retrieval works correctly. +""" + +import asyncio +import json +import sys +from pathlib import Path + +# Import MCP client +try: + from mcp import ClientSession, StdioServerParameters + from mcp.client.stdio import stdio_client + from mcp import types +except ImportError as e: + print(f"❌ Failed to import MCP client library: {e}") + print("Please install official MCP SDK: pip install mcp") + sys.exit(1) + +class SimpleMemoryOSTest: + """Simple MemoryOS MCP Server Test""" + + def __init__(self, server_script: str = "server_new.py", config_file: str = "config.json"): + self.server_script = Path(server_script) + self.config_file = Path(config_file) + + # Validate file existence + if not self.server_script.exists(): + raise FileNotFoundError(f"Server script not found: {self.server_script}") + if not self.config_file.exists(): + raise FileNotFoundError(f"Config file not found: {self.config_file}") + + def get_server_params(self): + """Get server parameters""" + return StdioServerParameters( + command=sys.executable, + args=[str(self.server_script), "--config", str(self.config_file)], + env=None + ) + + async def test_insert_conversations(self): + """Insert 15 conversations into MemoryOS""" + print("\n💾 Step 1: Insert 15 Conversations") + + # 15 test conversations + conversations = [ + {"user_input": "Hello, I'm Tom from San Francisco", "agent_response": "Hello Tom! Nice to meet you. San Francisco is a great city!"}, + {"user_input": "I work as a software engineer", "agent_response": "That's awesome! Software engineering is a fascinating field. What technologies do you work with?"}, + {"user_input": "I mainly use Python and JavaScript", "agent_response": "Great choice! Python and JavaScript are very popular and powerful languages."}, + {"user_input": "I'm interested in machine learning", "agent_response": "Machine learning is an exciting field! Are you focusing on any particular area?"}, + {"user_input": "I want to learn about neural networks", "agent_response": "Neural networks are the foundation of deep learning. Would you like to start with the basics?"}, + {"user_input": "Yes, please explain backpropagation", "agent_response": "Backpropagation is the key algorithm for training neural networks. It calculates gradients to update weights."}, + {"user_input": "I have a project idea about chatbots", "agent_response": "Chatbots are a great application of ML! 
What kind of chatbot are you thinking about?"}, + {"user_input": "A customer service chatbot", "agent_response": "Customer service chatbots can be very helpful. You'll need to consider intent recognition and response generation."}, + {"user_input": "What frameworks should I use?", "agent_response": "For chatbots, you could use frameworks like Rasa, Dialogflow, or build with PyTorch/TensorFlow."}, + {"user_input": "I prefer open source solutions", "agent_response": "Great! Rasa is an excellent open-source framework for building conversational AI."}, + {"user_input": "How do I handle multiple languages?", "agent_response": "For multilingual support, you can use translation APIs or train separate models for each language."}, + {"user_input": "I also like hiking in my free time", "agent_response": "Hiking is a wonderful hobby! San Francisco has some great trails nearby like Lands End and Mount Sutro."}, + {"user_input": "Do you know any good hiking spots?", "agent_response": "Yes! You might enjoy Muir Woods, Mount Tamalpais, or the coastal trails in Pacifica."}, + {"user_input": "I'm planning a weekend trip", "agent_response": "That sounds fun! Are you thinking of staying local or going somewhere further?"}, + {"user_input": "Maybe somewhere within 2 hours drive", "agent_response": "Perfect! You could visit Napa Valley, Santa Cruz, or even go to Lake Tahoe if you don't mind a slightly longer drive."} + ] + + server_params = self.get_server_params() + + try: + async with stdio_client(server_params) as (read_stream, write_stream): + async with ClientSession(read_stream, write_stream) as session: + await session.initialize() + + success_count = 0 + + for i, conversation in enumerate(conversations, 1): + print(f" Adding conversation {i:2d}/15...") + + result = await session.call_tool("add_memory", conversation) + + if hasattr(result, 'content') and result.content: + content = result.content[0] + if isinstance(content, types.TextContent): + response = json.loads(content.text) + if response.get("status") == "success": + success_count += 1 + print(f" ✅ Conversation {i:2d} added successfully") + else: + print(f" ❌ Conversation {i:2d} failed: {response.get('message', 'Unknown error')}") + else: + print(f" ❌ Conversation {i:2d} failed: Invalid response format") + else: + print(f" ❌ Conversation {i:2d} failed: No response content") + + # Brief delay + await asyncio.sleep(0.1) + + print(f"\n✅ Inserted {success_count}/15 conversations successfully") + return success_count == 15 + + except Exception as e: + print(f"❌ Failed to insert conversations: {e}") + return False + + async def test_memory_retrieval(self): + """Test memory retrieval with 2 specific queries""" + print("\n🔍 Step 2: Test Memory Retrieval") + + # Test queries + test_queries = [ + { + "query": "Tell me about Tom from San Francisco", + "description": "Query about the first conversation - should retrieve Tom's introduction", + "expected_content": ["Tom", "San Francisco", "software engineer"] + }, + { + "query": "What does the user want to learn about machine learning?", + "description": "Query about ML interests - should retrieve neural networks and chatbot discussions", + "expected_content": ["neural networks", "chatbot", "machine learning"] + } + ] + + server_params = self.get_server_params() + + try: + async with stdio_client(server_params) as (read_stream, write_stream): + async with ClientSession(read_stream, write_stream) as session: + await session.initialize() + + for i, test_query in enumerate(test_queries, 1): + print(f"\n--- Query 
{i}: {test_query['description']} ---") + print(f"Question: {test_query['query']}") + + query_params = { + "query": test_query["query"], + "relationship_with_user": "friend", + "style_hint": "helpful", + "max_results": 10 + } + + result = await session.call_tool("retrieve_memory", query_params) + + if hasattr(result, 'content') and result.content: + content = result.content[0] + if isinstance(content, types.TextContent): + response = json.loads(content.text) + if response.get("status") == "success": + print(f"✅ Query {i} successful!") + + # Display results + pages_found = response.get('total_pages_found', 0) + user_knowledge_found = response.get('total_user_knowledge_found', 0) + assistant_knowledge_found = response.get('total_assistant_knowledge_found', 0) + short_term_count = response.get('short_term_count', 0) + + print(f"📊 Results Summary:") + print(f" - Short-term memory: {short_term_count} items") + print(f" - Mid-term pages: {pages_found} items") + print(f" - User knowledge: {user_knowledge_found} items") + print(f" - Assistant knowledge: {assistant_knowledge_found} items") + + # Show some retrieved content + pages = response.get('retrieved_pages', []) + if pages: + print(f"📄 Retrieved Pages ({len(pages)} items):") + for j, page in enumerate(pages[:3], 1): # Show first 3 + user_input = page.get('user_input', '')[:50] + agent_response = page.get('agent_response', '')[:50] + print(f" {j}. User: {user_input}...") + print(f" Agent: {agent_response}...") + + # Check if expected content is found + full_text = json.dumps(response, ensure_ascii=False).lower() + found_expected = [] + for expected in test_query['expected_content']: + if expected.lower() in full_text: + found_expected.append(expected) + + if found_expected: + print(f"✅ Found expected content: {found_expected}") + else: + print(f"⚠️ Expected content not found: {test_query['expected_content']}") + + # Check if first conversation is retrievable + if i == 1: # First query about Tom + if pages_found > 0 or "tom" in full_text: + print("✅ First conversation successfully moved to mid-term memory and is retrievable!") + else: + print("⚠️ First conversation might not be in mid-term memory yet") + + else: + print(f"❌ Query {i} failed: {response.get('message', 'Unknown error')}") + else: + print(f"❌ Query {i} failed: Invalid response format") + else: + print(f"❌ Query {i} failed: No response content") + + await asyncio.sleep(0.5) # Longer delay between queries + + return True + + except Exception as e: + print(f"❌ Memory retrieval test failed: {e}") + return False + + async def run_test(self): + """Run the complete test""" + print("🚀 Starting Simple MemoryOS MCP Server Test") + print(f"Server script: {self.server_script}") + print(f"Config file: {self.config_file}") + print("=" * 60) + + # Step 1: Insert conversations + insert_success = await self.test_insert_conversations() + if not insert_success: + print("❌ Failed to insert conversations. Stopping test.") + return False + + # Wait a bit for processing + print("\n⏳ Waiting 3 seconds for memory processing...") + await asyncio.sleep(3) + + # Step 2: Test retrieval + retrieval_success = await self.test_memory_retrieval() + + # Summary + print("\n" + "=" * 60) + print("📊 Test Summary:") + print(f"✅ Conversation insertion: {'Passed' if insert_success else 'Failed'}") + print(f"✅ Memory retrieval: {'Passed' if retrieval_success else 'Failed'}") + + if insert_success and retrieval_success: + print("🎉 All tests passed! 
MemoryOS is working correctly.") + print("🔍 Key findings:") + print(" - Short-term memory capacity limit working (should be 2)") + print(" - Mid-term memory storage and retrieval working") + print(" - First conversation successfully retrievable from mid-term memory") + return True + else: + print("⚠️ Some tests failed. Please check the system.") + return False + +def main(): + """Main function""" + import argparse + + parser = argparse.ArgumentParser(description="Simple MemoryOS MCP Server Test") + parser.add_argument("--server", default="server_new.py", help="Server script path") + parser.add_argument("--config", default="config.json", help="Config file path") + + args = parser.parse_args() + + try: + tester = SimpleMemoryOSTest(args.server, args.config) + success = asyncio.run(tester.run_test()) + sys.exit(0 if success else 1) + except KeyboardInterrupt: + print("\n⚠️ Test interrupted by user") + sys.exit(1) + except Exception as e: + print(f"\n❌ Test failed with error: {e}") + import traceback + traceback.print_exc() + sys.exit(1) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/memoryos-pypi/long_term.py b/memoryos-pypi/long_term.py index 1001f07..4e49b36 100644 --- a/memoryos-pypi/long_term.py +++ b/memoryos-pypi/long_term.py @@ -8,7 +8,7 @@ from utils import get_timestamp, get_embedding, normalize_vector, ensure_directory_exists class LongTermMemory: - def __init__(self, file_path, knowledge_capacity=100): + def __init__(self, file_path, knowledge_capacity=100, embedding_model_name: str = "all-MiniLM-L6-v2", embedding_model_kwargs: dict = None): self.file_path = file_path ensure_directory_exists(self.file_path) self.knowledge_capacity = knowledge_capacity @@ -16,6 +16,9 @@ def __init__(self, file_path, knowledge_capacity=100): # Use deques for knowledge bases to easily manage capacity self.knowledge_base = deque(maxlen=self.knowledge_capacity) # For general/user private knowledge self.assistant_knowledge = deque(maxlen=self.knowledge_capacity) # For assistant specific knowledge + + self.embedding_model_name = embedding_model_name + self.embedding_model_kwargs = embedding_model_kwargs if embedding_model_kwargs is not None else {} self.load() def update_user_profile(self, user_id, new_data, merge=True): @@ -48,7 +51,11 @@ def add_knowledge_entry(self, knowledge_text, knowledge_deque: deque, type_name= return # If deque is full, the oldest item is automatically removed when appending. 
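For reference, the capacity handling above relies on standard collections.deque semantics: a deque constructed with maxlen silently drops its oldest element when a new one is appended, so no explicit eviction step is needed. A minimal illustration (not part of this change):

from collections import deque
kb = deque(maxlen=2)
kb.append("a"); kb.append("b"); kb.append("c")
print(list(kb))  # ['b', 'c'] -- the oldest entry was discarded automatically on append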
- vec = get_embedding(knowledge_text) + vec = get_embedding( + knowledge_text, + model_name=self.embedding_model_name, + **self.embedding_model_kwargs + ) vec = normalize_vector(vec).tolist() entry = { "knowledge": knowledge_text, @@ -75,7 +82,11 @@ def _search_knowledge_deque(self, query, knowledge_deque: deque, threshold=0.1, if not knowledge_deque: return [] - query_vec = get_embedding(query) + query_vec = get_embedding( + query, + model_name=self.embedding_model_name, + **self.embedding_model_kwargs + ) query_vec = normalize_vector(query_vec) embeddings = [] diff --git a/memoryos-pypi/memoryos.py b/memoryos-pypi/memoryos.py index 58f266c..ae7da09 100644 --- a/memoryos-pypi/memoryos.py +++ b/memoryos-pypi/memoryos.py @@ -37,18 +37,32 @@ def __init__(self, user_id: str, long_term_knowledge_capacity=100, retrieval_queue_capacity=7, mid_term_heat_threshold=H_PROFILE_UPDATE_THRESHOLD, - mid_term_similarity_threshold=0.6, # 新增:中期记忆插入相似度阈值 - llm_model="gpt-4o-mini" # Unified model for all LLM operations + mid_term_similarity_threshold=0.6, + llm_model="gpt-4o-mini", + embedding_model_name: str = "all-MiniLM-L6-v2", + embedding_model_kwargs: dict = None ): self.user_id = user_id self.assistant_id = assistant_id self.data_storage_path = os.path.abspath(data_storage_path) self.llm_model = llm_model - os.environ["llm_model"]= llm_model self.mid_term_similarity_threshold = mid_term_similarity_threshold + self.embedding_model_name = embedding_model_name + + # Smart defaults for embedding_model_kwargs + if embedding_model_kwargs is None: + if 'bge-m3' in self.embedding_model_name.lower(): + print("INFO: Detected bge-m3 model, defaulting embedding_model_kwargs to {'use_fp16': True}") + self.embedding_model_kwargs = {'use_fp16': True} + else: + self.embedding_model_kwargs = {} + else: + self.embedding_model_kwargs = embedding_model_kwargs + print(f"Initializing Memoryos for user '{self.user_id}' and assistant '{self.assistant_id}'. 
Data path: {self.data_storage_path}") print(f"Using unified LLM model: {self.llm_model}") + print(f"Using embedding model: {self.embedding_model_name} with kwargs: {self.embedding_model_kwargs}") # Initialize OpenAI Client self.client = OpenAIClient(api_key=openai_api_key, base_url=openai_base_url) @@ -71,11 +85,27 @@ def __init__(self, user_id: str, # Initialize Memory Modules for User self.short_term_memory = ShortTermMemory(file_path=user_short_term_path, max_capacity=short_term_capacity) - self.mid_term_memory = MidTermMemory(file_path=user_mid_term_path, client=self.client, max_capacity=mid_term_capacity) - self.user_long_term_memory = LongTermMemory(file_path=user_long_term_path, knowledge_capacity=long_term_knowledge_capacity) + self.mid_term_memory = MidTermMemory( + file_path=user_mid_term_path, + client=self.client, + max_capacity=mid_term_capacity, + embedding_model_name=self.embedding_model_name, + embedding_model_kwargs=self.embedding_model_kwargs + ) + self.user_long_term_memory = LongTermMemory( + file_path=user_long_term_path, + knowledge_capacity=long_term_knowledge_capacity, + embedding_model_name=self.embedding_model_name, + embedding_model_kwargs=self.embedding_model_kwargs + ) # Initialize Memory Module for Assistant Knowledge - self.assistant_long_term_memory = LongTermMemory(file_path=assistant_long_term_path, knowledge_capacity=long_term_knowledge_capacity) + self.assistant_long_term_memory = LongTermMemory( + file_path=assistant_long_term_path, + knowledge_capacity=long_term_knowledge_capacity, + embedding_model_name=self.embedding_model_name, + embedding_model_kwargs=self.embedding_model_kwargs + ) # Initialize Orchestration Modules self.updater = Updater(short_term_memory=self.short_term_memory, @@ -329,4 +359,4 @@ def force_mid_term_analysis(self): self.mid_term_heat_threshold = original_threshold # Restore original threshold def __repr__(self): - return f"" + return f"" \ No newline at end of file diff --git a/memoryos-pypi/mid_term.py b/memoryos-pypi/mid_term.py index 4147112..7c42fa9 100644 --- a/memoryos-pypi/mid_term.py +++ b/memoryos-pypi/mid_term.py @@ -8,12 +8,12 @@ try: from .utils import ( get_timestamp, generate_id, get_embedding, normalize_vector, - llm_extract_keywords, compute_time_decay, ensure_directory_exists, OpenAIClient + compute_time_decay, ensure_directory_exists, OpenAIClient ) except ImportError: from utils import ( get_timestamp, generate_id, get_embedding, normalize_vector, - llm_extract_keywords, compute_time_decay, ensure_directory_exists, OpenAIClient + compute_time_decay, ensure_directory_exists, OpenAIClient ) # Heat computation constants (can be tuned or made configurable) @@ -35,7 +35,7 @@ def compute_segment_heat(session, alpha=HEAT_ALPHA, beta=HEAT_BETA, gamma=HEAT_G return alpha * N_visit + beta * L_interaction + gamma * R_recency class MidTermMemory: - def __init__(self, file_path: str, client: OpenAIClient, max_capacity=2000): + def __init__(self, file_path: str, client: OpenAIClient, max_capacity=2000, embedding_model_name: str = "all-MiniLM-L6-v2", embedding_model_kwargs: dict = None): self.file_path = file_path ensure_directory_exists(self.file_path) self.client = client @@ -43,6 +43,9 @@ def __init__(self, file_path: str, client: OpenAIClient, max_capacity=2000): self.sessions = {} # {session_id: session_object} self.access_frequency = defaultdict(int) # {session_id: access_count_for_lfu} self.heap = [] # Min-heap storing (-H_segment, session_id) for hottest segments + + self.embedding_model_name = embedding_model_name + 
self.embedding_model_kwargs = embedding_model_kwargs if embedding_model_kwargs is not None else {} self.load() def get_page_by_id(self, page_id): @@ -95,11 +98,15 @@ def evict_lfu(self): self.save() print(f"MidTermMemory: Evicted session {lfu_sid}.") - def add_session(self, summary, details): + def add_session(self, summary, details, summary_keywords=None): session_id = generate_id("session") - summary_vec = get_embedding(summary) + summary_vec = get_embedding( + summary, + model_name=self.embedding_model_name, + **self.embedding_model_kwargs + ) summary_vec = normalize_vector(summary_vec).tolist() - summary_keywords = list(llm_extract_keywords(summary, client=self.client)) + summary_keywords = summary_keywords if summary_keywords is not None else [] processed_details = [] for page_data in details: @@ -117,17 +124,20 @@ def add_session(self, summary, details): else: print(f"MidTermMemory: Computing new embedding for page {page_id}") full_text = f"User: {page_data.get('user_input','')} Assistant: {page_data.get('agent_response','')}" - inp_vec = get_embedding(full_text) + inp_vec = get_embedding( + full_text, + model_name=self.embedding_model_name, + **self.embedding_model_kwargs + ) inp_vec = normalize_vector(inp_vec).tolist() - # Check if keywords already exist, to avoid recomputation + # Use existing keywords or leave them empty (provided by multi-summary) if "page_keywords" in page_data and page_data["page_keywords"]: - print(f"MidTermMemory: Reusing existing keywords for page {page_id}") + print(f"MidTermMemory: Using existing keywords for page {page_id}") page_keywords = page_data["page_keywords"] else: - print(f"MidTermMemory: Computing new keywords for page {page_id}") - full_text = f"User: {page_data.get('user_input','')} Assistant: {page_data.get('agent_response','')}" - page_keywords = list(llm_extract_keywords(full_text, client=self.client)) + print(f"MidTermMemory: Setting empty keywords for page {page_id} (will be filled by multi-summary)") + page_keywords = [] processed_page = { **page_data, # Carry over existing fields like user_input, agent_response, timestamp @@ -179,9 +189,13 @@ def insert_pages_into_session(self, summary_for_new_pages, keywords_for_new_page similarity_threshold=0.6, keyword_similarity_alpha=1.0): if not self.sessions: # If no existing sessions, just add as a new one print("MidTermMemory: No existing sessions. Adding new session directly.") - return self.add_session(summary_for_new_pages, pages_to_insert) + return self.add_session(summary_for_new_pages, pages_to_insert, keywords_for_new_pages)
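The code that follows decides whether the new pages can be merged into an existing session or must start a new one, by comparing the new segment's summary embedding (and keywords) against each stored session. A rough sketch of that decision, assuming a simple cosine-plus-keyword-overlap score (the helper name and the Jaccard term below are illustrative assumptions, not the exact scoring used):

import numpy as np

def sketch_session_merge_score(new_vec, session_vec, new_keywords, session_keywords,
                               keyword_similarity_alpha=1.0):
    # Both summary vectors are already normalized, so a dot product gives cosine similarity.
    semantic_sim = float(np.dot(new_vec, session_vec))
    # Hypothetical keyword-overlap term (Jaccard); the real weighting may differ.
    a, b = set(new_keywords), set(session_keywords)
    keyword_sim = len(a & b) / len(a | b) if (a or b) else 0.0
    return semantic_sim + keyword_similarity_alpha * keyword_sim

# If the best score across all sessions stays below similarity_threshold (default 0.6),
# the pages are added as a brand-new session instead of being merged.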
- new_summary_vec = get_embedding(summary_for_new_pages) + new_summary_vec = get_embedding( + summary_for_new_pages, + model_name=self.embedding_model_name, + **self.embedding_model_kwargs + ) new_summary_vec = normalize_vector(new_summary_vec) best_sid = None @@ -227,17 +241,20 @@ def insert_pages_into_session(self, summary_for_new_pages, keywords_for_new_page else: print(f"MidTermMemory: Computing new embedding for page {page_id}") full_text = f"User: {page_data.get('user_input','')} Assistant: {page_data.get('agent_response','')}" - inp_vec = get_embedding(full_text) + inp_vec = get_embedding( + full_text, + model_name=self.embedding_model_name, + **self.embedding_model_kwargs + ) inp_vec = normalize_vector(inp_vec).tolist() - # Check if keywords already exist, to avoid recomputation + # Use existing keywords or inherit the session's keywords if "page_keywords" in page_data and page_data["page_keywords"]: - print(f"MidTermMemory: Reusing existing keywords for page {page_id}") + print(f"MidTermMemory: Using existing keywords for page {page_id}") page_keywords_current = page_data["page_keywords"] else: - print(f"MidTermMemory: Computing new keywords for page {page_id}") - full_text = f"User: {page_data.get('user_input','')} Assistant: {page_data.get('agent_response','')}" - page_keywords_current = list(llm_extract_keywords(full_text, client=self.client)) + print(f"MidTermMemory: Using session keywords for page {page_id}") + page_keywords_current = keywords_for_new_pages processed_page = { **page_data, # Carry over existing fields @@ -257,16 +274,20 @@ def insert_pages_into_session(self, summary_for_new_pages, keywords_for_new_page return best_sid else: print(f"MidTermMemory: No suitable session to merge (best score {best_overall_score:.2f} < threshold {similarity_threshold}). Creating new session.") - return self.add_session(summary_for_new_pages, pages_to_insert) + return self.add_session(summary_for_new_pages, pages_to_insert, keywords_for_new_pages) def search_sessions(self, query_text, segment_similarity_threshold=0.1, page_similarity_threshold=0.1, top_k_sessions=5, keyword_alpha=1.0, recency_tau_search=3600): if not self.sessions: return [] - query_vec = get_embedding(query_text) + query_vec = get_embedding( + query_text, + model_name=self.embedding_model_name, + **self.embedding_model_kwargs + ) query_vec = normalize_vector(query_vec) - query_keywords = set(llm_extract_keywords(query_text, client=self.client)) + query_keywords = set() # Keywords extraction removed, relying on semantic similarity candidate_sessions = [] session_ids = list(self.sessions.keys()) diff --git a/memoryos-pypi/prompts.py b/memoryos-pypi/prompts.py index 46d68fb..48fbbd6 100644 --- a/memoryos-pypi/prompts.py +++ b/memoryos-pypi/prompts.py @@ -203,9 +203,7 @@ EXTRACT_THEME_SYSTEM_PROMPT = "You are an expert in extracting the main theme from a text. Provide a concise theme." EXTRACT_THEME_USER_PROMPT = "Please extract the main theme from the following text:\n{answer_text}\n\nTheme:" -# Prompt for extracting keywords (from utils.py, llm_extract_keywords) -EXTRACT_KEYWORDS_SYSTEM_PROMPT = "You are an expert in keyword extraction. Extract only the most essential keywords from the text. Return 3-5 keywords maximum as a comma-separated list. Be extremely selective." -EXTRACT_KEYWORDS_USER_PROMPT = "Please extract the 3-5 most important keywords from the following text. 
Be very selective and concise:\n{text}\n\nKeywords:" + # Prompt for conversation continuity check (from dynamic_update.py, _is_conversation_continuing) CONTINUITY_CHECK_SYSTEM_PROMPT = "You are a conversation continuity detector. Return ONLY 'true' or 'false'." diff --git a/memoryos-pypi/requirements.txt b/memoryos-pypi/requirements.txt index f95b261..0c8a8d4 100644 --- a/memoryos-pypi/requirements.txt +++ b/memoryos-pypi/requirements.txt @@ -1,10 +1,12 @@ # MemoryOS Core Dependencies # Core scientific computing and ML libraries numpy==1.24.* -sentence-transformers>=2.2.0,<3.0.0 +sentence-transformers>=2.7.0,<3.0.0 # Updated for Qwen model support +transformers>=4.51.0 # Required for newer sentence-transformer features +FlagEmbedding>=1.2.9 # For BGE-M3 model support faiss-gpu>=1.7.0,<2.0.0 - +httpx[socks] openai # Web framework (for demo) flask>=2.0.0,<3.0.0 diff --git a/memoryos-pypi/test.py b/memoryos-pypi/test.py index 23de687..7eb7d4f 100644 --- a/memoryos-pypi/test.py +++ b/memoryos-pypi/test.py @@ -24,10 +24,11 @@ def simple_demo(): llm_model=LLM_MODEL, assistant_id=ASSISTANT_ID, short_term_capacity=7, - mid_term_heat_threshold=5, + mid_term_heat_threshold=1000, retrieval_queue_capacity=10, long_term_knowledge_capacity=100, - mid_term_similarity_threshold=0.6 + mid_term_similarity_threshold=0.6, + embedding_model_name="" ) print("MemoryOS initialized successfully!\n") except Exception as e: @@ -41,6 +42,43 @@ def simple_demo(): user_input="Hi! I'm Tom, I work as a data scientist in San Francisco.", agent_response="Hello Tom! Nice to meet you. Data science is such an exciting field. What kind of data do you work with?" ) + memo.add_memory( + user_input="I love hiking on weekends, especially in the mountains.", + agent_response="That sounds wonderful! Do you have a favorite trail or mountain you like to visit?" + ) + memo.add_memory( + user_input="Recently, I've been reading a lot about artificial intelligence.", + agent_response="AI is a fascinating topic! Are you interested in any specific area of AI?" + ) + memo.add_memory( + user_input="My favorite food is sushi, especially salmon nigiri.", + agent_response="Sushi is delicious! Have you ever tried making it at home?" + ) + memo.add_memory( + user_input="I have a golden retriever named Max.", + agent_response="Max must be adorable! How old is he?" + ) + memo.add_memory( + user_input="I traveled to Japan last year and visited Tokyo and Kyoto.", + agent_response="That must have been an amazing experience! What did you enjoy most about Japan?" + ) + memo.add_memory( + user_input="I'm currently learning how to play the guitar.", + agent_response="That's awesome! What songs are you practicing right now?" + ) + memo.add_memory( + user_input="I usually start my day with a cup of black coffee.", + agent_response="Coffee is a great way to kickstart the day! Do you prefer it hot or iced?" + ) + memo.add_memory( + user_input="My favorite movie genre is science fiction.", + agent_response="Sci-fi movies can be so imaginative! Do you have a favorite film?" + ) + memo.add_memory( + user_input="I enjoy painting landscapes in my free time.", + agent_response="Painting is such a creative hobby! Do you use oils, acrylics, or watercolors?" + ) + test_query = "What do you remember about my job?" 
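The demo above keeps the default embedding backend; to exercise the new multi-backend path, the constructor parameters introduced in this change can point at BGE-M3 instead. A sketch with illustrative placeholder values (the bge-m3 name also triggers the automatic use_fp16 default shown in memoryos.py; any SentenceTransformer-compatible model name works as well):

memo_bge = Memoryos(
    user_id="demo_user",                        # placeholder values for illustration only
    openai_api_key="YOUR_API_KEY",
    data_storage_path="./memoryos_bge_data",
    llm_model="gpt-4o-mini",
    assistant_id="demo_assistant",
    embedding_model_name="BAAI/bge-m3",         # routed to FlagEmbedding's BGEM3FlagModel
    embedding_model_kwargs={"use_fp16": True},  # matches the automatic bge-m3 default
    mid_term_similarity_threshold=0.6,
)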
print(f"User: {test_query}") diff --git a/memoryos-pypi/updater.py b/memoryos-pypi/updater.py index 4c2da85..831fe8a 100644 --- a/memoryos-pypi/updater.py +++ b/memoryos-pypi/updater.py @@ -2,7 +2,7 @@ from .utils import ( generate_id, get_timestamp, gpt_generate_multi_summary, check_conversation_continuity, generate_page_meta_info, OpenAIClient, - llm_extract_keywords, run_parallel_tasks + run_parallel_tasks ) from .short_term import ShortTermMemory from .mid_term import MidTermMemory @@ -11,7 +11,7 @@ from utils import ( generate_id, get_timestamp, gpt_generate_multi_summary, check_conversation_continuity, generate_page_meta_info, OpenAIClient, - llm_extract_keywords, run_parallel_tasks + run_parallel_tasks ) from short_term import ShortTermMemory from mid_term import MidTermMemory @@ -36,45 +36,29 @@ def __init__(self, self.llm_model = llm_model def _process_page_embedding_and_keywords(self, page_data): - """并行处理单个页面的embedding和keywords生成""" + """处理单个页面的embedding生成(关键词由multi-summary提供)""" page_id = page_data.get("page_id", generate_id("page")) - # 检查是否已有embedding和keywords - if "page_embedding" in page_data and page_data["page_embedding"] and \ - "page_keywords" in page_data and page_data["page_keywords"]: - print(f"Updater: Page {page_id} already has embedding and keywords, skipping computation") + # 检查是否已有embedding + if "page_embedding" in page_data and page_data["page_embedding"]: + print(f"Updater: Page {page_id} already has embedding, skipping computation") return page_data - full_text = f"User: {page_data.get('user_input','')} Assistant: {page_data.get('agent_response','')}" - - # 并行计算embedding和keywords(如果需要) - tasks = [] + # 只处理embedding,关键词由multi-summary统一提供 if not ("page_embedding" in page_data and page_data["page_embedding"]): - tasks.append(('embedding', lambda: self._get_embedding_for_page(full_text))) - - if not ("page_keywords" in page_data and page_data["page_keywords"]): - tasks.append(('keywords', lambda: llm_extract_keywords(full_text, client=self.client))) + full_text = f"User: {page_data.get('user_input','')} Assistant: {page_data.get('agent_response','')}" + try: + embedding = self._get_embedding_for_page(full_text) + if embedding is not None: + from .utils import normalize_vector + page_data["page_embedding"] = normalize_vector(embedding).tolist() + print(f"Updater: Generated embedding for page {page_id}") + except Exception as e: + print(f"Error generating embedding for page {page_id}: {e}") - if tasks: - with ThreadPoolExecutor(max_workers=2) as executor: - futures = {executor.submit(task[1]): task[0] for task in tasks} - results = {} - - for future in as_completed(futures): - task_type = futures[future] - try: - results[task_type] = future.result() - except Exception as e: - print(f"Error in {task_type} computation for page {page_id}: {e}") - results[task_type] = None - - # 更新页面数据 - if 'embedding' in results and results['embedding'] is not None: - from .utils import normalize_vector - page_data["page_embedding"] = normalize_vector(results['embedding']).tolist() - - if 'keywords' in results and results['keywords'] is not None: - page_data["page_keywords"] = list(results['keywords']) + # 设置空的关键词列表(将由multi-summary的关键词填充) + if "page_keywords" not in page_data: + page_data["page_keywords"] = [] return page_data @@ -203,7 +187,7 @@ def process_short_term_to_mid_term(self): # Fallback: if no summaries, add as one session or handle as a single block print("Updater: No specific themes from multi-summary. 
fallback_summary = "General conversation segment from short-term memory." - fallback_keywords = llm_extract_keywords(input_text_for_summary, self.client, model=self.llm_model) if input_text_for_summary else [] + fallback_keywords = [] # Use empty keywords since multi-summary failed self.mid_term_memory.insert_pages_into_session( summary_for_new_pages=fallback_summary, keywords_for_new_pages=list(fallback_keywords), diff --git a/memoryos-pypi/utils.py b/memoryos-pypi/utils.py index 097a294..40d95ff 100644 --- a/memoryos-pypi/utils.py +++ b/memoryos-pypi/utils.py @@ -5,6 +5,8 @@ from sentence_transformers import SentenceTransformer import json import os +import inspect +from functools import wraps try: from . import prompts # Try relative import first except ImportError: @@ -73,9 +75,7 @@ def batch_chat_completion(self, requests): futures = [] for req in requests: future = self.chat_completion_async( - model=req.get("model", - model=os.environ.get("llm_model") - ), + model=req.get("model", "gpt-4o-mini"), messages=req["messages"], temperature=req.get("temperature", 0.7), max_tokens=req.get("max_tokens", 2000) @@ -129,33 +129,88 @@ def ensure_directory_exists(path): _model_cache = {} _embedding_cache = {} # Embedding cache -def get_embedding(text, model_name="all-MiniLM-L6-v2", use_cache=True): - # Create the cache key +def _get_valid_kwargs(func, kwargs): + """Helper to filter kwargs for a given function's signature.""" + try: + sig = inspect.signature(func) + param_keys = set(sig.parameters.keys()) + return {k: v for k, v in kwargs.items() if k in param_keys} + except (ValueError, TypeError): + # Fallback for functions/methods where signature inspection is not straightforward + return kwargs + +def get_embedding(text, model_name="all-MiniLM-L6-v2", use_cache=True, **kwargs): + """ + Get the embedding vector for a text. + Supports several mainstream models and automatically adapts to each library's calling convention. + - SentenceTransformer models: e.g., 'all-MiniLM-L6-v2', 'Qwen/Qwen3-Embedding-0.6B' + - FlagEmbedding models: e.g., 'BAAI/bge-m3' + + :param text: Input text. + :param model_name: Model name on Hugging Face. + :param use_cache: Whether to use the in-memory cache. + :param kwargs: Extra arguments passed to the model constructor or the encode method. + - for Qwen: `model_kwargs`, `tokenizer_kwargs`, `prompt_name="query"` + - for BGE-M3: `use_fp16=True`, `max_length=8192` + :return: The embedding vector of the text (numpy array). + """ + model_config_key = json.dumps({"model_name": model_name, **kwargs}, sort_keys=True) + if use_cache: - cache_key = f"{model_name}::{hash(text)}" + cache_key = f"{model_config_key}::{hash(text)}" if cache_key in _embedding_cache: - print(f"Using cached embedding for text: {text[:30]}...") return _embedding_cache[cache_key] - if model_name not in _model_cache: - print(f"Loading sentence transformer model: {model_name}") - _model_cache[model_name] = SentenceTransformer(model_name) - model = _model_cache[model_name] - embedding = model.encode([text], convert_to_numpy=True)[0] + # --- Model Loading --- + model_init_key = json.dumps({"model_name": model_name, **{k:v for k,v in kwargs.items() if k not in ['batch_size', 'max_length']}}, sort_keys=True) + if model_init_key not in _model_cache: + print(f"Loading model: {model_name}...") + if 'bge-m3' in model_name.lower(): + try: + from FlagEmbedding import BGEM3FlagModel + init_kwargs = _get_valid_kwargs(BGEM3FlagModel.__init__, kwargs) + print(f"-> Using BGEM3FlagModel with init kwargs: {init_kwargs}") + _model_cache[model_init_key] = BGEM3FlagModel(model_name, **init_kwargs) + except ImportError: + raise ImportError("Please install FlagEmbedding: 'pip install -U FlagEmbedding' to use bge-m3 model.") + 
else: # Default handler for SentenceTransformer-based models (like Qwen, all-MiniLM, etc.) + try: + from sentence_transformers import SentenceTransformer + init_kwargs = _get_valid_kwargs(SentenceTransformer.__init__, kwargs) + print(f"-> Using SentenceTransformer with init kwargs: {init_kwargs}") + _model_cache[model_init_key] = SentenceTransformer(model_name, **init_kwargs) + except ImportError: + raise ImportError("Please install sentence-transformers: 'pip install -U sentence-transformers' to use this model.") + + model = _model_cache[model_init_key] - # 缓存结果 + # --- Encoding --- + embedding = None + if 'bge-m3' in model_name.lower(): + encode_kwargs = _get_valid_kwargs(model.encode, kwargs) + print(f"-> Encoding with BGEM3FlagModel using kwargs: {encode_kwargs}") + result = model.encode([text], **encode_kwargs) + embedding = result['dense_vecs'][0] + else: # Default to SentenceTransformer-based models + encode_kwargs = _get_valid_kwargs(model.encode, kwargs) + print(f"-> Encoding with SentenceTransformer using kwargs: {encode_kwargs}") + embedding = model.encode([text], **encode_kwargs)[0] + if use_cache: + cache_key = f"{model_config_key}::{hash(text)}" _embedding_cache[cache_key] = embedding - # 限制缓存大小,避免内存泄漏 - if len(_embedding_cache) > 10000: # 最多缓存10000个embedding - # 删除一些旧的缓存项 + if len(_embedding_cache) > 10000: keys_to_remove = list(_embedding_cache.keys())[:1000] for key in keys_to_remove: - del _embedding_cache[key] + try: + del _embedding_cache[key] + except KeyError: + pass print("Cleaned embedding cache to prevent memory overflow") return embedding + def clear_embedding_cache(): """清空embedding缓存""" global _embedding_cache @@ -185,7 +240,6 @@ def compute_time_decay(event_timestamp_str, current_timestamp_str, tau_hours=24) # ---- LLM-based Utility Functions ---- def gpt_summarize_dialogs(dialogs, client: OpenAIClient, model="gpt-4o-mini"): - model=os.environ.get("llm_model") or model dialog_text = "\n".join([f"User: {d.get('user_input','')} Assistant: {d.get('agent_response','')}" for d in dialogs]) messages = [ {"role": "system", "content": prompts.SUMMARIZE_DIALOGS_SYSTEM_PROMPT}, @@ -195,7 +249,6 @@ def gpt_summarize_dialogs(dialogs, client: OpenAIClient, model="gpt-4o-mini"): return client.chat_completion(model=model, messages=messages) def gpt_generate_multi_summary(text, client: OpenAIClient, model="gpt-4o-mini"): - model=os.environ.get("llm_model") or model messages = [ {"role": "system", "content": prompts.MULTI_SUMMARY_SYSTEM_PROMPT}, {"role": "user", "content": prompts.MULTI_SUMMARY_USER_PROMPT.format(text=text)} @@ -215,7 +268,6 @@ def gpt_user_profile_analysis(dialogs, client: OpenAIClient, model="gpt-4o-mini" Analyze and update user personality profile from dialogs 结合现有画像和新对话,直接输出更新后的完整画像 """ - model=os.environ.get("llm_model") or model conversation = "\n".join([f"User: {d.get('user_input','')} (Timestamp: {d.get('timestamp', '')})\nAssistant: {d.get('agent_response','')} (Timestamp: {d.get('timestamp', '')})" for d in dialogs]) messages = [ {"role": "system", "content": prompts.PERSONALITY_ANALYSIS_SYSTEM_PROMPT}, @@ -231,7 +283,6 @@ def gpt_user_profile_analysis(dialogs, client: OpenAIClient, model="gpt-4o-mini" def gpt_knowledge_extraction(dialogs, client: OpenAIClient, model="gpt-4o-mini"): """Extract user private data and assistant knowledge from dialogs""" - model=os.environ.get("llm_model") or model conversation = "\n".join([f"User: {d.get('user_input','')} (Timestamp: {d.get('timestamp', '')})\nAssistant: {d.get('agent_response','')} (Timestamp: 
{d.get('timestamp', '')})" for d in dialogs]) messages = [ {"role": "system", "content": prompts.KNOWLEDGE_EXTRACTION_SYSTEM_PROMPT}, @@ -276,7 +327,6 @@ def gpt_personality_analysis(dialogs, client: OpenAIClient, model="gpt-4o-mini", This function is kept for backward compatibility only. """ # Call the new functions - model=os.environ.get("llm_model") or model profile = gpt_user_profile_analysis(dialogs, client, model, known_user_traits) knowledge_data = gpt_knowledge_extraction(dialogs, client, model) @@ -288,7 +338,6 @@ def gpt_personality_analysis(dialogs, client: OpenAIClient, model="gpt-4o-mini", def gpt_update_profile(old_profile, new_analysis, client: OpenAIClient, model="gpt-4o-mini"): - model=os.environ.get("llm_model") or model messages = [ {"role": "system", "content": prompts.UPDATE_PROFILE_SYSTEM_PROMPT}, {"role": "user", "content": prompts.UPDATE_PROFILE_USER_PROMPT.format(old_profile=old_profile, new_analysis=new_analysis)} @@ -297,8 +346,6 @@ def gpt_update_profile(old_profile, new_analysis, client: OpenAIClient, model="g return client.chat_completion(model=model, messages=messages) def gpt_extract_theme(answer_text, client: OpenAIClient, model="gpt-4o-mini"): - model=os.environ.get("llm_model") or model - messages = [ {"role": "system", "content": prompts.EXTRACT_THEME_SYSTEM_PROMPT}, {"role": "user", "content": prompts.EXTRACT_THEME_USER_PROMPT.format(answer_text=answer_text)} @@ -306,24 +353,13 @@ def gpt_extract_theme(answer_text, client: OpenAIClient, model="gpt-4o-mini"): print("Calling LLM to extract theme...") return client.chat_completion(model=model, messages=messages) -def llm_extract_keywords(text, client: OpenAIClient, model="gpt-4o-mini"): - - model=os.environ.get("llm_model") or model - messages = [ - {"role": "system", "content": prompts.EXTRACT_KEYWORDS_SYSTEM_PROMPT}, - {"role": "user", "content": prompts.EXTRACT_KEYWORDS_USER_PROMPT.format(text=text)} - ] - print("Calling LLM to extract keywords...") - response = client.chat_completion(model=model, messages=messages) - return [kw.strip() for kw in response.split(',') if kw.strip()] # ---- Functions from dynamic_update.py (to be used by Updater class) ---- def check_conversation_continuity(previous_page, current_page, client: OpenAIClient, model="gpt-4o-mini"): prev_user = previous_page.get("user_input", "") if previous_page else "" prev_agent = previous_page.get("agent_response", "") if previous_page else "" - model=os.environ.get("llm_model") or model - + user_prompt = prompts.CONTINUITY_CHECK_USER_PROMPT.format( prev_user=prev_user, prev_agent=prev_agent, @@ -338,7 +374,6 @@ def check_conversation_continuity(previous_page, current_page, client: OpenAICli return response.strip().lower() == "true" def generate_page_meta_info(last_page_meta, current_page, client: OpenAIClient, model="gpt-4o-mini"): - model=os.environ.get("llm_model") or model current_conversation = f"User: {current_page.get('user_input', '')}\nAssistant: {current_page.get('agent_response', '')}" user_prompt = prompts.META_INFO_USER_PROMPT.format( last_meta=last_page_meta if last_page_meta else "None",