From 64911c92d8f490c07c50e2c911bb01accdb87df8 Mon Sep 17 00:00:00 2001 From: Mohammed Mohsin <59914433+mdmohsin7@users.noreply.github.com> Date: Thu, 27 Feb 2025 23:33:12 +0530 Subject: [PATCH 1/5] catch exception in sync_update_persona_prompt --- backend/utils/apps.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/backend/utils/apps.py b/backend/utils/apps.py index 05f7192f288..db344825fab 100644 --- a/backend/utils/apps.py +++ b/backend/utils/apps.py @@ -77,7 +77,6 @@ def get_available_apps(uid: str, include_reviews: bool = False) -> List[App]: if cachedApps := get_generic_cache('get_public_approved_apps_data'): print('get_public_approved_plugins_data from cache----------------------------') public_approved_data = cachedApps - public_approved_data = get_public_approved_apps_db() public_unapproved_data = get_public_unapproved_apps(uid) private_data = get_private_apps(uid) pass @@ -392,7 +391,7 @@ async def generate_persona_prompt(uid: str, persona: dict): tweets = None if "twitter" in persona['connected_accounts']: - print("twitter in connected accounts---------------------------") + print("twitter is in connected accounts") # Get latest tweets tweets = await get_twitter_timeline(persona['twitter']['username']) tweets = [{'tweet': tweet['text'], 'posted_at': tweet['created_at']} for tweet in tweets['timeline']] @@ -472,13 +471,15 @@ def sync_update_persona_prompt(persona: dict): asyncio.set_event_loop(loop) try: return loop.run_until_complete(update_persona_prompt(persona)) + except Exception as e: + print(f"Error in update_persona_prompt for persona {persona.get('id', 'unknown')}: {str(e)}") + return None finally: loop.close() async def update_persona_prompt(persona: dict): """Update a persona's chat prompt with latest facts and memories.""" - # Get latest facts and user info facts = get_facts(persona['uid'], limit=250) user_name = get_user_name(persona['uid']) @@ -531,7 +532,7 @@ async def update_persona_prompt(persona: dict): # Add a guideline about tweets if they exist if condensed_tweets: - persona_prompt += "7. Utilize condensed tweets to enhance authenticity, incorporating common expressions, opinions, and phrasing from {user_name}’s social media presence.\n" + persona_prompt += "7. Utilize condensed tweets to enhance authenticity, incorporating common expressions, opinions, and phrasing from {user_name}'s social media presence.\n" persona_prompt += f""" **Rules:** @@ -556,6 +557,7 @@ async def update_persona_prompt(persona: dict): persona['persona_prompt'] = persona_prompt persona['updated_at'] = datetime.now(timezone.utc) + update_persona_in_db(persona) delete_app_cache_by_id(persona['id']) From 5b7f31651cb4641dc6c3dfd06c7a4af8199143e8 Mon Sep 17 00:00:00 2001 From: Mohammed Mohsin <59914433+mdmohsin7@users.noreply.github.com> Date: Thu, 27 Feb 2025 23:33:24 +0530 Subject: [PATCH 2/5] uncomment --- backend/utils/memories/process_memory.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/backend/utils/memories/process_memory.py b/backend/utils/memories/process_memory.py index d891115a360..d97e45fd7eb 100644 --- a/backend/utils/memories/process_memory.py +++ b/backend/utils/memories/process_memory.py @@ -20,7 +20,7 @@ from models.task import Task, TaskStatus, TaskAction, TaskActionProvider from models.trend import Trend from models.notification_message import NotificationMessage -from utils.apps import get_available_apps, update_persona_prompt, sync_update_persona_prompt +from utils.apps import get_available_apps, sync_update_persona_prompt from utils.llm import obtain_emotional_message, retrieve_metadata_fields_from_transcript from utils.llm import summarize_open_glass, get_transcript_structure, generate_embedding, \ get_plugin_result, should_discard_memory, summarize_experience_text, new_facts_extractor, \ @@ -192,15 +192,15 @@ def process_memory( if not is_reprocess: threading.Thread(target=memory_created_webhook, args=(uid, memory,)).start() # Update persona prompts with new memory - # personas = get_omi_personas_by_uid_db(uid) - # if personas: - # threads = [] - # print('updating personas after memory creation') - # for persona in personas: - # threads.append(threading.Thread(target=sync_update_persona_prompt, args=(persona,))) - # - # [t.start() for t in threads] - # [t.join() for t in threads] + personas = get_omi_personas_by_uid_db(uid) + if personas: + threads = [] + print('updating personas after memory creation') + for persona in personas: + threads.append(threading.Thread(target=sync_update_persona_prompt, args=(persona,))) + + [t.start() for t in threads] + [t.join() for t in threads] # TODO: trigger external integrations here too From 256857602f4221ac64b318abc3b9652cf78c1be4 Mon Sep 17 00:00:00 2001 From: Mohammed Mohsin <59914433+mdmohsin7@users.noreply.github.com> Date: Thu, 27 Feb 2025 23:33:43 +0530 Subject: [PATCH 3/5] add timeout for twitter --- backend/routers/apps.py | 2 +- backend/utils/social.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/backend/routers/apps.py b/backend/routers/apps.py index df12e858403..9552510dc00 100644 --- a/backend/routers/apps.py +++ b/backend/routers/apps.py @@ -27,7 +27,7 @@ from utils.other import endpoints as auth from models.app import App from utils.other.storage import upload_plugin_logo, delete_plugin_logo, upload_app_thumbnail, get_app_thumbnail_url -from utils.social import get_twitter_profile, get_twitter_timeline, verify_latest_tweet, \ +from utils.social import get_twitter_profile, verify_latest_tweet, \ upsert_persona_from_twitter_profile, add_twitter_to_persona router = APIRouter() diff --git a/backend/utils/social.py b/backend/utils/social.py index c3d06cbe1cf..52280bf4d10 100644 --- a/backend/utils/social.py +++ b/backend/utils/social.py @@ -20,7 +20,7 @@ async def get_twitter_profile(handle: str) -> Dict[str, Any]: "X-RapidAPI-Host": rapid_api_host } - async with httpx.AsyncClient() as client: + async with httpx.AsyncClient(timeout=30.0) as client: response = await client.get(url, headers=headers) response.raise_for_status() data = response.json() @@ -36,7 +36,7 @@ async def get_twitter_timeline(handle: str) -> Dict[str, Any]: "X-RapidAPI-Host": rapid_api_host } - async with httpx.AsyncClient() as client: + async with httpx.AsyncClient(timeout=30.0) as client: response = await client.get(url, headers=headers) response.raise_for_status() data = response.json() @@ -52,7 +52,7 @@ async def verify_latest_tweet(username: str, handle: str) -> Dict[str, Any]: "X-RapidAPI-Host": rapid_api_host } - async with httpx.AsyncClient() as client: + async with httpx.AsyncClient(timeout=30.0) as client: response = await client.get(url, headers=headers) response.raise_for_status() data = response.json() From 4038e5ffea1749ff7d521cf399cb61ed7d1ce363 Mon Sep 17 00:00:00 2001 From: Mohammed Mohsin <59914433+mdmohsin7@users.noreply.github.com> Date: Fri, 7 Mar 2025 16:44:22 +0530 Subject: [PATCH 4/5] update omi personas in a seaprate thread asynchrounously --- backend/utils/memories/process_memory.py | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/backend/utils/memories/process_memory.py b/backend/utils/memories/process_memory.py index d97e45fd7eb..105b771b590 100644 --- a/backend/utils/memories/process_memory.py +++ b/backend/utils/memories/process_memory.py @@ -174,6 +174,18 @@ def save_structured_vector(uid: str, memory: Memory, update_only: bool = False): update_vector_metadata(uid, memory.id, metadata) +def _update_personas_async(uid: str): + print(f"[PERSONAS] Starting persona updates in background thread for uid={uid}") + personas = get_omi_personas_by_uid_db(uid) + if personas: + threads = [] + for persona in personas: + threads.append(threading.Thread(target=sync_update_persona_prompt, args=(persona,))) + + [t.start() for t in threads] + [t.join() for t in threads] + + def process_memory( uid: str, language_code: str, memory: Union[Memory, CreateMemory, WorkflowCreateMemory], force_process: bool = False, is_reprocess: bool = False @@ -192,16 +204,7 @@ def process_memory( if not is_reprocess: threading.Thread(target=memory_created_webhook, args=(uid, memory,)).start() # Update persona prompts with new memory - personas = get_omi_personas_by_uid_db(uid) - if personas: - threads = [] - print('updating personas after memory creation') - for persona in personas: - threads.append(threading.Thread(target=sync_update_persona_prompt, args=(persona,))) - - [t.start() for t in threads] - [t.join() for t in threads] - + threading.Thread(target=_update_personas_async, args=(uid,)).start() # TODO: trigger external integrations here too From c667da46fcd55cced8d4f962b824540d6e6b338d Mon Sep 17 00:00:00 2001 From: Mohammed Mohsin <59914433+mdmohsin7@users.noreply.github.com> Date: Mon, 17 Mar 2025 01:05:54 +0530 Subject: [PATCH 5/5] add comments for testing --- backend/utils/memories/process_memory.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/backend/utils/memories/process_memory.py b/backend/utils/memories/process_memory.py index 7421439c5ba..9c75debde8b 100644 --- a/backend/utils/memories/process_memory.py +++ b/backend/utils/memories/process_memory.py @@ -21,10 +21,9 @@ from models.task import Task, TaskStatus, TaskAction, TaskActionProvider from models.trend import Trend from models.notification_message import NotificationMessage -from utils.apps import get_available_apps +from utils.apps import get_available_apps, sync_update_persona_prompt from utils.llm import obtain_emotional_message, retrieve_metadata_fields_from_transcript, \ summarize_open_glass, get_transcript_structure, generate_embedding, \ - get_plugin_result, should_discard_memory, summarize_experience_text, new_facts_extractor, \ trends_extractor, get_email_structure, get_post_structure, get_message_structure, \ retrieve_metadata_from_email, retrieve_metadata_from_post, retrieve_metadata_from_message, retrieve_metadata_from_text, \ @@ -219,6 +218,7 @@ def _update_personas_async(uid: str): [t.start() for t in threads] [t.join() for t in threads] + print(f"[PERSONAS] Finished persona updates in background thread for uid={uid}") def process_memory( @@ -240,7 +240,9 @@ def process_memory( threading.Thread(target=memory_created_webhook, args=(uid, memory,)).start() # TODO: Bad code, cause the websocket was drop, need to check it carefully before enabling. # Update persona prompts with new memory + print("before creating the thread for _update_personas_async") threading.Thread(target=_update_personas_async, args=(uid,)).start() + print("after calling start for _update_personas_async") # TODO: trigger external integrations here too