From 6dc24facdacc4b8993d2dfffa39d0683969619f8 Mon Sep 17 00:00:00 2001 From: OdyAsh Date: Sun, 28 Sep 2025 11:36:51 +0300 Subject: [PATCH 1/5] Implem. Phase 1 of WA migration Check https://github.com/OdyAsh/ansari-whatsapp/blob/main/docs/whatsapp_migration_plan/migration_plan.md for details --- .gitignore | 1 + src/ansari/app/main_api.py | 6 +- src/ansari/app/whatsapp_api_router.py | 306 ++++++++++++++++++++++++++ 3 files changed, 310 insertions(+), 3 deletions(-) create mode 100644 src/ansari/app/whatsapp_api_router.py diff --git a/.gitignore b/.gitignore index 9e05498..18a5afd 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ # Folders .conda/ .claude/ +.specstory/ .venv/ .vscode/ abandoned/ diff --git a/src/ansari/app/main_api.py b/src/ansari/app/main_api.py index b039de2..8eeb627 100644 --- a/src/ansari/app/main_api.py +++ b/src/ansari/app/main_api.py @@ -38,7 +38,7 @@ from ansari.agents.ansari_workflow import AnsariWorkflow from ansari.ansari_db import AnsariDB, MessageLogger, SourceType from ansari.ansari_logger import get_logger -from ansari.app.main_whatsapp import router as whatsapp_router +from ansari.app.whatsapp_api_router import router as whatsapp_api_router from ansari.config import Settings, get_settings from ansari.presenters.api_presenter import ApiPresenter from ansari.util.general_helpers import CORSMiddlewareWithLogging, get_extended_origins, register_to_mailing_list @@ -88,8 +88,8 @@ async def lifespan(app: FastAPI): app = FastAPI(lifespan=lifespan) -# Include the WhatsApp router -app.include_router(whatsapp_router) +# Include the WhatsApp API router +app.include_router(whatsapp_api_router) # Custom exception handler, which aims to log FastAPI-related exceptions before raising them diff --git a/src/ansari/app/whatsapp_api_router.py b/src/ansari/app/whatsapp_api_router.py new file mode 100644 index 0000000..2fc3d91 --- /dev/null +++ b/src/ansari/app/whatsapp_api_router.py @@ -0,0 +1,306 @@ +# WhatsApp API Router for ansari-backend +"""FastAPI router containing WhatsApp-specific API endpoints for the ansari-whatsapp microservice.""" + +from fastapi import APIRouter, HTTPException +from fastapi.responses import StreamingResponse +from pydantic import BaseModel + +from ansari.ansari_db import SourceType, MessageLogger +from ansari.ansari_logger import get_logger + +logger = get_logger(__name__) + +# Initialize the router +router = APIRouter() + +# Get database connection +from ansari.app.main_api import db, presenter + + +# Pydantic models for WhatsApp API requests +class WhatsAppUserRegisterRequest(BaseModel): + phone_num: str + preferred_language: str + + +class WhatsAppLocationRequest(BaseModel): + phone_num: str + latitude: float + longitude: float + + +class WhatsAppThreadRequest(BaseModel): + phone_num: str + title: str + + +class WhatsAppMessageRequest(BaseModel): + phone_num: str + thread_id: str + message: str + + +@router.post("/api/v2/whatsapp/users/register") +async def register_whatsapp_user(req: WhatsAppUserRegisterRequest): + """Register a new WhatsApp user with the Ansari backend. + + Args: + req: WhatsApp user registration request containing phone_num and preferred_language + + Returns: + dict: Registration result with user details + """ + try: + logger.info(f"Registering WhatsApp user with phone: {req.phone_num}") + + result = db.register_user( + email=None, + password=None, + first_name=None, + last_name=None, + phone_num=req.phone_num, + preferred_language=req.preferred_language, + source=SourceType.WHATSAPP + ) + + logger.info(f"Successfully registered WhatsApp user: {req.phone_num}") + return {"status": "success", "user_id": result} + + except Exception as e: + logger.error(f"Error registering WhatsApp user {req.phone_num}: {str(e)}") + raise HTTPException(status_code=400, detail=f"Registration failed: {str(e)}") + + +@router.get("/api/v2/whatsapp/users/exists") +async def check_whatsapp_user_exists(phone_num: str): + """Check if a WhatsApp user exists in the Ansari backend. + + Args: + phone_num: User's WhatsApp phone number + + Returns: + dict: Contains 'exists' boolean indicating if user exists + """ + try: + logger.info(f"Checking existence for WhatsApp user: {phone_num}") + + exists = db.account_exists(phone_num=phone_num) + + logger.info(f"WhatsApp user {phone_num} exists: {exists}") + return {"exists": exists} + + except Exception as e: + logger.error(f"Error checking WhatsApp user existence {phone_num}: {str(e)}") + raise HTTPException(status_code=400, detail=f"User existence check failed: {str(e)}") + + +@router.put("/api/v2/whatsapp/users/location") +async def update_whatsapp_user_location(req: WhatsAppLocationRequest): + """Update a WhatsApp user's location in the Ansari backend. + + Args: + req: Location update request containing phone_num, latitude, and longitude + + Returns: + dict: Update result status + """ + try: + logger.info(f"Updating location for WhatsApp user: {req.phone_num}") + + result = db.update_user_by_phone_num( + phone_num=req.phone_num, + db_cols_to_vals={ + "latitude": req.latitude, + "longitude": req.longitude + } + ) + + logger.info(f"Successfully updated location for WhatsApp user: {req.phone_num}") + return result + + except Exception as e: + logger.error(f"Error updating location for WhatsApp user {req.phone_num}: {str(e)}") + raise HTTPException(status_code=400, detail=f"Location update failed: {str(e)}") + + +@router.post("/api/v2/whatsapp/threads") +async def create_whatsapp_thread(req: WhatsAppThreadRequest): + """Create a new thread for a WhatsApp user in the Ansari backend. + + Args: + req: Thread creation request containing phone_num and title + + Returns: + dict: Creation result with thread_id + """ + try: + logger.info(f"Creating thread for WhatsApp user: {req.phone_num}") + + # Get the user ID for the WhatsApp user + user_id = db.retrieve_user_info( + source=SourceType.WHATSAPP, + phone_num=req.phone_num, + db_cols=["id"] if hasattr(db, '_execute_query') else None # SQL vs MongoDB compatibility + ) + + if not user_id: + raise HTTPException(status_code=404, detail="WhatsApp user not found") + + # Extract user_id from result (handle both SQL tuple and MongoDB string returns) + if isinstance(user_id, tuple): + user_id = user_id[0] + + thread_id = db.create_thread(user_id=user_id, title=req.title) + + logger.info(f"Successfully created thread {thread_id} for WhatsApp user: {req.phone_num}") + return {"thread_id": str(thread_id)} + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error creating thread for WhatsApp user {req.phone_num}: {str(e)}") + raise HTTPException(status_code=400, detail=f"Thread creation failed: {str(e)}") + + +@router.get("/api/v2/whatsapp/threads/last") +async def get_last_whatsapp_thread(phone_num: str): + """Get information about the last active thread for a WhatsApp user. + + Args: + phone_num: User's WhatsApp phone number + + Returns: + dict: Thread info with thread_id and last_message_time + """ + try: + logger.info(f"Getting last thread info for WhatsApp user: {phone_num}") + + # Get the user ID for the WhatsApp user + user_id = db.retrieve_user_info( + source=SourceType.WHATSAPP, + phone_num=phone_num, + db_cols=["id"] if hasattr(db, '_execute_query') else None + ) + + if not user_id: + raise HTTPException(status_code=404, detail="WhatsApp user not found") + + # Extract user_id from result + if isinstance(user_id, tuple): + user_id = user_id[0] + + thread_id, last_message_time = db.get_last_message_time_whatsapp(user_id) + + result = { + "thread_id": str(thread_id) if thread_id else None, + "last_message_time": last_message_time.isoformat() if last_message_time else None + } + + logger.info(f"Last thread info for WhatsApp user {phone_num}: {result}") + return result + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting last thread info for WhatsApp user {phone_num}: {str(e)}") + raise HTTPException(status_code=400, detail=f"Failed to get last thread info: {str(e)}") + + +@router.get("/api/v2/whatsapp/threads/{thread_id}/history") +async def get_whatsapp_thread_history(thread_id: str, phone_num: str): + """Get the message history for a WhatsApp user's thread from the Ansari backend. + + Args: + thread_id: ID of the thread + phone_num: User's WhatsApp phone number + + Returns: + dict: Thread history with messages + """ + try: + logger.info(f"Getting thread history for WhatsApp user {phone_num}, thread {thread_id}") + + # Verify the user exists and has access to this thread + user_id = db.retrieve_user_info( + source=SourceType.WHATSAPP, + phone_num=phone_num, + db_cols=["id"] if hasattr(db, '_execute_query') else None + ) + + if not user_id: + raise HTTPException(status_code=404, detail="WhatsApp user not found") + + # Extract user_id from result + if isinstance(user_id, tuple): + user_id = user_id[0] + + # Get the thread and verify ownership + thread_data = db.get_thread(thread_id=thread_id, user_id=user_id) + + if not thread_data: + raise HTTPException(status_code=404, detail="Thread not found or access denied") + + logger.info(f"Successfully retrieved thread history for WhatsApp user {phone_num}") + return thread_data + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting thread history for WhatsApp user {phone_num}: {str(e)}") + raise HTTPException(status_code=400, detail=f"Failed to get thread history: {str(e)}") + + +@router.post("/api/v2/whatsapp/messages/process") +def process_whatsapp_message(req: WhatsAppMessageRequest) -> StreamingResponse: + """Process a message from a WhatsApp user with streaming response. + + Args: + req: Message processing request containing phone_num, thread_id, and message + + Returns: + StreamingResponse: Streamed AI response + """ + try: + logger.info(f"Processing message for WhatsApp user {req.phone_num}, thread {req.thread_id}") + + # Verify the user exists and get user_id + user_id = db.retrieve_user_info( + source=SourceType.WHATSAPP, + phone_num=req.phone_num, + db_cols=["id"] if hasattr(db, '_execute_query') else None + ) + + if not user_id: + raise HTTPException(status_code=404, detail="WhatsApp user not found") + + # Extract user_id from result + if isinstance(user_id, tuple): + user_id = user_id[0] + + # Get the thread history + history = db.get_thread(req.thread_id, user_id) + + if not history: + raise HTTPException(status_code=404, detail="Thread not found") + + # Append the user's message to the history retrieved from the DB + user_msg = {"role": "user", "content": [{"type": "text", "text": req.message}]} + history["messages"].append(user_msg) + + # Use the presenter to process the message with streaming response + logger.info(f"Starting streaming response for WhatsApp user {req.phone_num}") + return presenter.complete( + history, + message_logger=MessageLogger( + db, + SourceType.WHATSAPP, + user_id, + req.thread_id, + ), + ) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error processing message for WhatsApp user {req.phone_num}: {str(e)}") + raise HTTPException(status_code=500, detail=f"Message processing failed: {str(e)}") \ No newline at end of file From aff16aa2570bc5c4a696a1c2d767a315a16ee510 Mon Sep 17 00:00:00 2001 From: OdyAsh Date: Sun, 28 Sep 2025 14:01:40 +0300 Subject: [PATCH 2/5] Fix db calls in WA router, add root endpoint --- CLAUDE.md | 22 ++++++++++++++++++++-- src/ansari/app/main_api.py | 5 +++++ src/ansari/app/whatsapp_api_router.py | 11 ++++++----- 3 files changed, 31 insertions(+), 7 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 76b19c0..ab9dc54 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -31,8 +31,18 @@ - Delete branches after they're merged to keep the repository clean ## Build/Test/Lint Commands -- Install dependencies: `pip install -r requirements.txt` -- Run backend service: `uvicorn main_api:app --reload` +- Install dependencies: `uv sync` - Installs all dependencies from pyproject.toml and uv.lock +- Run backend service: + 1. Use venv python directly: `.venv/Scripts/python.exe src/ansari/app/main_api.py` + - Alternative (if uvicorn is available): `uvicorn main_api:app --reload` + + **Note**: Direct venv python path is used because `source .venv/Scripts/activate` may not properly activate the virtual environment in bash. + + **Testing changes**: Auto-reload can be unreliable. For reliable testing after code changes: + 1. Kill the running server (`KillShell` tool) + 2. Start new server: `.venv/Scripts/python.exe src/ansari/app/main_api.py` + 3. Wait 10 seconds for startup to complete + 4. Then test with curl - Run CLI version (interactive): - Claude: `python src/ansari/app/main_stdio.py -a AnsariClaude` - OpenAI: `python src/ansari/app/main_stdio.py -a Ansari` @@ -48,6 +58,14 @@ - Build package: `python -m build` - Upload to PyPI: `twine upload dist/*` (requires PyPI credentials) +## Package Management +- **Install dependencies**: `uv sync` - Installs all dependencies from pyproject.toml and uv.lock +- **Add new package**: `uv add ` - Adds package to dependencies and updates lock file +- **Add development dependency**: `uv add --dev ` - Adds package to dev dependencies +- **Remove package**: `uv remove ` - Removes package from dependencies +- **Create virtual environment**: `uv venv` - Creates .venv directory (if not exists) +- **Update dependencies**: `uv lock` - Updates uv.lock file with latest compatible versions + ## Code Style Guidelines - **Imports**: Use absolute imports within the `ansari` package - **Formatting**: Double quotes for strings, 4-space indentation diff --git a/src/ansari/app/main_api.py b/src/ansari/app/main_api.py index 8eeb627..a50b216 100644 --- a/src/ansari/app/main_api.py +++ b/src/ansari/app/main_api.py @@ -151,6 +151,7 @@ def add_app_middleware(): cache = FanoutCache(get_settings().diskcache_dir, shards=4, timeout=1) + if __name__ == "__main__" and get_settings().DEV_MODE: # Programatically start a Uvicorn server while debugging (development) for easier control/accessibility # I.e., just run: @@ -178,6 +179,10 @@ def add_app_middleware(): log_level="debug", ) +@app.get("/") +async def root(): + """Root endpoint for health checks.""" + return {"status": "ok", "message": "Ansari Backend service is running"} class RegisterRequest(BaseModel): email: EmailStr diff --git a/src/ansari/app/whatsapp_api_router.py b/src/ansari/app/whatsapp_api_router.py index 2fc3d91..e30927d 100644 --- a/src/ansari/app/whatsapp_api_router.py +++ b/src/ansari/app/whatsapp_api_router.py @@ -53,14 +53,14 @@ async def register_whatsapp_user(req: WhatsAppUserRegisterRequest): try: logger.info(f"Registering WhatsApp user with phone: {req.phone_num}") - result = db.register_user( + result = db.register( + source=SourceType.WHATSAPP, email=None, - password=None, + password_hash=None, first_name=None, last_name=None, phone_num=req.phone_num, - preferred_language=req.preferred_language, - source=SourceType.WHATSAPP + preferred_language=req.preferred_language ) logger.info(f"Successfully registered WhatsApp user: {req.phone_num}") @@ -150,7 +150,8 @@ async def create_whatsapp_thread(req: WhatsAppThreadRequest): if isinstance(user_id, tuple): user_id = user_id[0] - thread_id = db.create_thread(user_id=user_id, title=req.title) + result = db.create_thread(source=SourceType.WHATSAPP, user_id=user_id, thread_name=req.title) + thread_id = result.get("thread_id") or result.get("_id") logger.info(f"Successfully created thread {thread_id} for WhatsApp user: {req.phone_num}") return {"thread_id": str(thread_id)} From 6bffe3ca324e3a7bae4182a255e6748b4ca1e754 Mon Sep 17 00:00:00 2001 From: OdyAsh Date: Fri, 17 Oct 2025 08:22:30 +0300 Subject: [PATCH 3/5] Refactor WhatsApp integration: Remove main_whatsapp.py and whatsapp_presenter.py - Deleted main_whatsapp.py which contained FastAPI endpoints for WhatsApp webhook handling. - Removed whatsapp_presenter.py that provided functions for WhatsApp interactions. - Updated whatsapp_api_router.py to remove location update endpoint and related model. - Cleaned up config.py by removing WhatsApp-related configuration fields. --- src/ansari/app/main_whatsapp.py | 205 ----- src/ansari/app/whatsapp_api_router.py | 35 - src/ansari/config.py | 6 - src/ansari/presenters/whatsapp_presenter.py | 862 -------------------- 4 files changed, 1108 deletions(-) delete mode 100644 src/ansari/app/main_whatsapp.py delete mode 100644 src/ansari/presenters/whatsapp_presenter.py diff --git a/src/ansari/app/main_whatsapp.py b/src/ansari/app/main_whatsapp.py deleted file mode 100644 index 49c9b3e..0000000 --- a/src/ansari/app/main_whatsapp.py +++ /dev/null @@ -1,205 +0,0 @@ -# This file aims to extend `main_api.py` with FastAPI endpoints which handle incoming WhatsApp webhook messages. -# NOTE: the `BackgroundTasks` logic is inspired by this issue and chat (respectively): -# https://stackoverflow.com/questions/72894209/whatsapp-cloud-api-sending-old-message-inbound-notification-multiple-time-on-my -# https://www.perplexity.ai/search/explain-fastapi-s-backgroundta-rnpU7D19QpSxp2ZOBzNUyg -# Steps: -# 1. Import necessary modules and configure logging. -# 2. Create a FastAPI router to extend the main FastAPI app found in `main_api.py`. -# (Therefore, this file can only be tested by running `main_api.py`.) -# 3. Initialize the Ansari agent with settings. -# 4. Initialize the WhatsAppPresenter with the agent and credentials. -# Tricky NOTE: Unlike other files, the presenter's role here is just to provide functions for handling WhatsApp messages, -# so the actual "presenting" here is technically the values returned by FastAPI's endpoints. -# 5. Define a GET endpoint to handle WhatsApp webhook verification. -# 6. Define a POST endpoint to handle incoming WhatsApp messages. - -from fastapi import APIRouter, HTTPException, Request, BackgroundTasks -from fastapi.responses import HTMLResponse, Response - -from ansari.agents import Ansari, AnsariClaude -from ansari.ansari_logger import get_logger -from ansari.config import get_settings -from ansari.presenters.whatsapp_presenter import WhatsAppPresenter - -logger = get_logger(__name__) - -# Create a router in order to make the FastAPI functions here an extension of the main FastAPI app -router = APIRouter() - -# Initialize the Ansari agent -agent_type = get_settings().AGENT -whatsapp_enabled = get_settings().WHATSAPP_ENABLED - -if agent_type == "Ansari": - ansari = Ansari(get_settings()) -elif agent_type == "AnsariClaude": - ansari = AnsariClaude(get_settings()) -else: - raise ValueError(f"Unknown agent type: {agent_type}. Must be one of: Ansari, AnsariClaude") - -chosen_whatsapp_biz_num = get_settings().WHATSAPP_BUSINESS_PHONE_NUMBER_ID.get_secret_value() - -# Initialize the presenter with the agent and credentials -presenter = WhatsAppPresenter( - agent=ansari, - access_token=get_settings().WHATSAPP_ACCESS_TOKEN_FROM_SYS_USER.get_secret_value(), - business_phone_number_id=chosen_whatsapp_biz_num, - api_version=get_settings().WHATSAPP_API_VERSION, -) -presenter.present() - - -@router.get("/whatsapp/v1") -async def verification_webhook(request: Request) -> str | None: - """Handles the WhatsApp webhook verification request. - - Args: - request (Request): The incoming HTTP request. - - Returns: - Optional[str]: The challenge string if verification is successful, otherwise raises an HTTPException. - - """ - mode = request.query_params.get("hub.mode") - verify_token = request.query_params.get("hub.verify_token") - challenge = request.query_params.get("hub.challenge") - - if mode and verify_token: - if mode == "subscribe" and verify_token == get_settings().WHATSAPP_VERIFY_TOKEN_FOR_WEBHOOK.get_secret_value(): - logger.info("WHATSAPP WEBHOOK VERIFIED SUCCESFULLY!") - # Tricky note: apparently, you have to wrap the challenge in an HTMLResponse - # in order for meta to accept and verify the callback - # source: https://stackoverflow.com/a/74394602/13626137 - return HTMLResponse(challenge) - raise HTTPException(status_code=403, detail="Forbidden") - raise HTTPException(status_code=400, detail="Bad Request") - - -@router.post("/whatsapp/v1") -async def main_webhook(request: Request, background_tasks: BackgroundTasks) -> Response: - """Handles the incoming WhatsApp webhook message. - - Args: - request (Request): The incoming HTTP request. - background_tasks (BackgroundTasks): The background tasks to be executed. - - Returns: - Response: HTTP response with status code 200. - - """ - - # Logging the origin (host) of the incoming webhook message - # logger.debug(f"ORIGIN of the incoming webhook message: {json.dumps(request, indent=4)}") - - # Wait for the incoming webhook message to be received as JSON - data = await request.json() - - # Extract all relevant data in one go using the general presenter - try: - ( - is_status, - from_whatsapp_number, - incoming_msg_type, - incoming_msg_body, - message_id, - message_unix_time, - ) = await presenter.extract_relevant_whatsapp_message_details(data) - except Exception as e: - logger.error(f"Error extracting message details: {e}") - return Response(status_code=200) - - # Terminate if the incoming message is a status message (e.g., "delivered") - # or if the incoming message is in the form of a list, not dict - # (shouldn't happen unless user sends a non-text message, which is not supported yet) - if is_status or isinstance(incoming_msg_body, list): - return Response(status_code=200) - else: - logger.debug(f"Incoming whatsapp webhook message from {from_whatsapp_number}") - - # Terminate if whatsapp is not enabled (i.e., via .env configurations, etc) - if not whatsapp_enabled: - # Create a temporary user-specific presenter just to send the message - temp_presenter = WhatsAppPresenter.create_user_specific_presenter( - presenter, from_whatsapp_number, None, None, None, None - ) - background_tasks.add_task( - temp_presenter.send_whatsapp_message, - "Ansari for WhatsApp is down for maintenance, please try again later or visit our website at https://ansari.chat.", - ) - return Response(status_code=200) - - # Temporary corner case while locally developing: - # Since the staging server is always running, - # and since we currently have the same testing number for both staging and local testing, - # therefore we need an indicator that a message is meant for a dev who's testing locally now - # and not for the staging server. - # This is done by prefixing the message with "!d " (e.g., "!d what is ansari?") - # NOTE: Obviously, this temp. solution will be removed when we get a dedicated testing number for staging testing. - if get_settings().DEPLOYMENT_TYPE == "staging" and incoming_msg_body.get("body", "").startswith("!d "): - logger.debug("Incoming message is meant for a dev who's testing locally now, so will not process it in staging...") - return Response(status_code=200) - - # Create a user-specific presenter for this message - user_presenter = WhatsAppPresenter.create_user_specific_presenter( - presenter, - from_whatsapp_number, - incoming_msg_type, - incoming_msg_body, - message_id, - message_unix_time, - ) - - # Start the typing indicator loop that will continue until message is processed - background_tasks.add_task( - user_presenter.send_typing_indicator_then_start_loop, - ) - - # Check if the user's phone number is stored in users_whatsapp table and register if not - # Returns false if user's not found and their registration fails - user_found: bool = await user_presenter.check_and_register_user() - if not user_found: - background_tasks.add_task( - user_presenter.send_whatsapp_message, - "Sorry, we couldn't register you to our Database. Please try again later.", - ) - return Response(status_code=200) - - # Check if there are more than 24 hours have passed from the user's message to the current time - # If so, send a message to the user and return - if user_presenter.is_message_too_old(): - response_msg = "Sorry, your message " - user_msg_start = " ".join(incoming_msg_body.get("body", "").split(" ")[:5]) - if user_msg_start: - response_msg_cont = ' "' + user_msg_start + '" ' - else: - response_msg_cont = " " - response_msg = f"Sorry, your message{response_msg_cont}is too old. Please send a new message." - background_tasks.add_task( - user_presenter.send_whatsapp_message, - response_msg, - ) - return Response(status_code=200) - - # Check if the incoming message is a location - if incoming_msg_type == "location": - # NOTE: Currently, will not handle location messages - background_tasks.add_task( - user_presenter.handle_unsupported_message, - ) - return Response(status_code=200) - - # Check if the incoming message is a media type other than text - if incoming_msg_type != "text": - background_tasks.add_task( - user_presenter.handle_unsupported_message, - ) - return Response(status_code=200) - - # Rest of the code below is for processing text messages sent by the whatsapp user - - # Actual code to process the incoming message using Ansari agent then reply to the sender - background_tasks.add_task( - user_presenter.handle_text_message, - ) - - return Response(status_code=200) diff --git a/src/ansari/app/whatsapp_api_router.py b/src/ansari/app/whatsapp_api_router.py index e30927d..4884921 100644 --- a/src/ansari/app/whatsapp_api_router.py +++ b/src/ansari/app/whatsapp_api_router.py @@ -23,12 +23,6 @@ class WhatsAppUserRegisterRequest(BaseModel): preferred_language: str -class WhatsAppLocationRequest(BaseModel): - phone_num: str - latitude: float - longitude: float - - class WhatsAppThreadRequest(BaseModel): phone_num: str title: str @@ -94,35 +88,6 @@ async def check_whatsapp_user_exists(phone_num: str): raise HTTPException(status_code=400, detail=f"User existence check failed: {str(e)}") -@router.put("/api/v2/whatsapp/users/location") -async def update_whatsapp_user_location(req: WhatsAppLocationRequest): - """Update a WhatsApp user's location in the Ansari backend. - - Args: - req: Location update request containing phone_num, latitude, and longitude - - Returns: - dict: Update result status - """ - try: - logger.info(f"Updating location for WhatsApp user: {req.phone_num}") - - result = db.update_user_by_phone_num( - phone_num=req.phone_num, - db_cols_to_vals={ - "latitude": req.latitude, - "longitude": req.longitude - } - ) - - logger.info(f"Successfully updated location for WhatsApp user: {req.phone_num}") - return result - - except Exception as e: - logger.error(f"Error updating location for WhatsApp user {req.phone_num}: {str(e)}") - raise HTTPException(status_code=400, detail=f"Location update failed: {str(e)}") - - @router.post("/api/v2/whatsapp/threads") async def create_whatsapp_thread(req: WhatsAppThreadRequest): """Create a new thread for a WhatsApp user in the Ansari backend. diff --git a/src/ansari/config.py b/src/ansari/config.py index 496cce4..034aadd 100644 --- a/src/ansari/config.py +++ b/src/ansari/config.py @@ -179,12 +179,6 @@ def get_resource_path(filename): MAILCHIMP_SERVER_PREFIX: str | None = Field(default=None) MAILCHIMP_LIST_ID: str | None = Field(default=None) QURAN_DOT_COM_API_KEY: SecretStr = Field(alias="QURAN_DOT_COM_API_KEY") - WHATSAPP_ENABLED: bool = Field(default=True) - WHATSAPP_API_VERSION: str | None = Field(default="v22.0") - WHATSAPP_BUSINESS_PHONE_NUMBER_ID: SecretStr | None = Field(default=None) - WHATSAPP_ACCESS_TOKEN_FROM_SYS_USER: SecretStr | None = Field(default=None) - WHATSAPP_VERIFY_TOKEN_FOR_WEBHOOK: SecretStr | None = Field(default=None) - WHATSAPP_CHAT_RETENTION_HOURS: float = Field(default=3) ZROK_SHARE_TOKEN: SecretStr = Field(default="") template_dir: DirectoryPath = Field(default=get_resource_path("templates")) diskcache_dir: str = Field(default="diskcache_dir") diff --git a/src/ansari/presenters/whatsapp_presenter.py b/src/ansari/presenters/whatsapp_presenter.py deleted file mode 100644 index 932f22e..0000000 --- a/src/ansari/presenters/whatsapp_presenter.py +++ /dev/null @@ -1,862 +0,0 @@ -# Unlike other files, the presenter's role here is just to provide functions for handling WhatsApp interactions - -import re -import asyncio -import time -from datetime import datetime, timezone -from typing import Any, Literal, Optional - -import httpx - -from ansari.agents.ansari import Ansari -from ansari.agents.ansari_claude import AnsariClaude -from ansari.ansari_db import AnsariDB, MessageLogger, SourceType -from ansari.ansari_logger import get_logger -from ansari.config import get_settings -from ansari.util.general_helpers import get_language_direction_from_text, get_language_from_text - -logger = get_logger(__name__) - -# Initialize the DB and agent -db = AnsariDB(get_settings()) - - -class WhatsAppPresenter: - def __init__( - self, - agent: Ansari | None = None, - access_token: str | None = None, - business_phone_number_id: str | None = None, - api_version: str = "v22.0", - user_whatsapp_number: str | None = None, - incoming_msg_type: str | None = None, - incoming_msg_body: dict | None = None, - message_id: str | None = None, - message_unix_time: int | None = None, - ): - if agent: - self.settings = agent.settings - else: - self.settings = get_settings() - - self.access_token = access_token - self.business_phone_number_id = business_phone_number_id - self.api_version = api_version - self.meta_api_url = f"https://graph.facebook.com/{api_version}/{business_phone_number_id}/messages" - - # User-specific fields - self.user_whatsapp_number = user_whatsapp_number - self.incoming_msg_type = incoming_msg_type - self.incoming_msg_body = incoming_msg_body - self.message_id = message_id - self.message_unix_time = message_unix_time - self.typing_indicator_task = None - self.first_indicator_time = None - - @classmethod - def create_user_specific_presenter( - cls, - general_presenter, - user_whatsapp_number: str, - incoming_msg_type: str, - incoming_msg_body: dict, - message_id: str, - message_unix_time: int | None = None, - ): - """Creates a user-specific presenter instance from a general presenter.""" - return cls( - access_token=general_presenter.access_token, - business_phone_number_id=general_presenter.business_phone_number_id, - api_version=general_presenter.api_version, - user_whatsapp_number=user_whatsapp_number, - incoming_msg_type=incoming_msg_type, - incoming_msg_body=incoming_msg_body, - message_id=message_id, - message_unix_time=message_unix_time, - ) - - async def extract_relevant_whatsapp_message_details( - self, - body: dict[str, Any], - ) -> tuple[bool, str | None, str | None, dict | list | None, str | None, int | None]: - """Extracts relevant whatsapp message details from the incoming webhook payload. - - Args: - body (Dict[str, Any]): The JSON body of the incoming request. - - Returns: - tuple[bool, Optional[str], Optional[str], Optional[dict], Optional[str], Optional[int]]: - A tuple of: - (is_status, user_whatsapp_number, incoming_msg_type, incoming_msg_body, message_id, message_unix_time) - - Raises: - Exception: If the payload structure is invalid or unsupported. - - Notes: - Apparently, sometimes the recieved `incoming_msg_body` is a list not a dict, example: - [ - { - 'name': {'first_name': 'Ansari', 'last_name': 'Chat', 'formatted_name': 'Ansari. Chat'}, - 'phones': [{'phone': '+1 (234) 567-8999', 'wa_id': '12345678999', 'type': 'HOME'}] - } - ] - We don't support this yet, so we a later check should be performed to verify that it's a dict. - """ - # logger.debug(f"Received payload from WhatsApp user:\n{body}") - - if not ( - body.get("object") - and (entry := body.get("entry", [])) - and (changes := entry[0].get("changes", [])) - and (value := changes[0].get("value", {})) - ): - error_msg = f"Invalid received payload from WhatsApp user and/or problem with Meta's API :\n{body}" - logger.error( - error_msg, - ) - raise Exception(error_msg) - - if "statuses" in value: - # status = value["statuses"]["status"] - # timestamp = value["statuses"]["timestamp"] - # # This log isn't important if we don't want to track when an Ansari's replied message is - # # delivered to or read by the recipient - # logger.debug( - # f"WhatsApp status update received:\n({status} at {timestamp}.)", - # ) - return True, None, None, None, None, None - else: - is_status = False - - # should never be entered - if "messages" not in value: - error_msg = f"Unsupported message type received from WhatsApp user:\n{body}" - logger.error( - error_msg, - ) - raise Exception(error_msg) - - incoming_msg = value["messages"][0] - - # Extract and store the message ID for use in send_whatsapp_typing_indicator - message_id = incoming_msg.get("id") - # Extract the phone number of the WhatsApp sender - user_whatsapp_number = incoming_msg["from"] - # Extract timestamp from message (in Unix time format) and convert to int if present - message_unix_time_str = incoming_msg.get("timestamp") - message_unix_time = int(message_unix_time_str) if message_unix_time_str is not None else None - # Meta API note: Meta sends "errors" key when receiving unsupported message types - # (e.g., video notes, gifs sent from giphy, or polls) - # NOTE: This `incoming_msg["type"] in incoming_msg.keys()` is logical, as proven by examining examples of - # messages received from Meta cloud API here: `docs/.../meta_whatsapp_api_structure_of_a_user_incoming_msg.json` - incoming_msg_type = incoming_msg["type"] if incoming_msg["type"] in incoming_msg.keys() else "errors" - # Extract the message of the WhatsApp sender (could be text, image, etc.) - incoming_msg_body = incoming_msg[incoming_msg_type] - - logger.info(f"Received whatsapp message from {user_whatsapp_number}: {incoming_msg_body}") - - return (is_status, user_whatsapp_number, incoming_msg_type, incoming_msg_body, message_id, message_unix_time) - - async def check_and_register_user(self) -> bool: - """ - Checks if the user's phone number is stored in the users table. - If not, registers the user with the preferred language. - - Returns: - bool: True if user exists or was successfully registered, False otherwise. - """ - if not self.user_whatsapp_number: - logger.error("User WhatsApp number not set in presenter instance") - return False - - # Check if the user's phone number exists in users table - if db.account_exists(phone_num=self.user_whatsapp_number): - return True - - # Else, register the user with the detected language - if self.incoming_msg_type == "text": - incoming_msg_text = self.incoming_msg_body["body"] - user_lang = get_language_from_text(incoming_msg_text) - else: - # TODO(odyash, good_first_issue): use lightweight library/solution that gives us language from country code - # instead of hardcoding "en" in below code - user_lang = "en" - - status: Literal["success", "failure"] = db.register( - source=SourceType.WHATSAPP, - phone_num=self.user_whatsapp_number, - preferred_language=user_lang, - )["status"] - - if status == "success": - logger.info(f"Registered new whatsapp user (lang: {user_lang})!: {self.user_whatsapp_number}") - return True - else: - logger.error(f"Failed to register new whatsapp user: {self.user_whatsapp_number}") - return False - - async def send_typing_indicator_then_start_loop(self) -> None: - """Sends a typing indicator and starts a loop to periodically send more while processing the message.""" - if not self.user_whatsapp_number or not self.message_id: - logger.error("Cannot start typing indicator loop: missing user_whatsapp_number or message_id") - return - - self.first_indicator_time = time.time() - - # Send the initial typing indicator - await self._send_whatsapp_typing_indicator() - - # Start an async task that will keep sending typing indicators - self.typing_indicator_task = asyncio.create_task(self._typing_indicator_loop()) - - async def _typing_indicator_loop(self) -> None: - """Loop that periodically sends typing indicators while processing a message.""" - MAX_DURATION_SECONDS = 300 # 5 minutes maximum - INDICATOR_INTERVAL_SECONDS = 26 # Send indicator every 26 seconds - - try: - while True: - logger.debug("Currently in typing indicator loop (i.e., Ansari is taking longer than usual to respond)") - # Sleep for the interval - await asyncio.sleep(INDICATOR_INTERVAL_SECONDS) - - # Check if we've exceeded the maximum duration - elapsed_time = time.time() - self.first_indicator_time - if elapsed_time > MAX_DURATION_SECONDS: - logger.warning(f"Typing indicator loop exceeded maximum duration of {MAX_DURATION_SECONDS}s. Stopping.") - break - - # If we're still processing the message, send another typing indicator - logger.debug(f"Sending follow-up typing indicator after {elapsed_time:.1f}s") - await self._send_whatsapp_typing_indicator() - - except asyncio.CancelledError: - logger.debug("cancelling asyncio task...") - except Exception as e: - logger.error(f"Error in typing indicator loop: {e}") - logger.exception(e) - - async def _send_whatsapp_typing_indicator(self) -> None: - """Sends a typing indicator to the WhatsApp sender.""" - if not self.user_whatsapp_number or not self.message_id: - logger.error("Cannot send typing indicator: missing user_whatsapp_number or message_id") - return - - url = self.meta_api_url - headers = { - "Authorization": f"Bearer {self.access_token}", - "Content-Type": "application/json", - } - - try: - async with httpx.AsyncClient() as client: - logger.debug(f"SENDING TYPING INDICATOR REQUEST TO: {url}") - - json_data = { - "messaging_product": "whatsapp", - "status": "read", - "message_id": self.message_id, - "typing_indicator": {"type": "text"}, - } - - response = await client.post(url, headers=headers, json=json_data) - response.raise_for_status() # Raise an exception for HTTP errors - - logger.debug(f"Sent typing indicator to WhatsApp user {self.user_whatsapp_number}") - - except Exception as e: - logger.error(f"Error sending typing indicator: {e}. Details are in next log.") - logger.exception(e) - - async def send_whatsapp_message(self, msg_body: str) -> None: - """Sends a message to the WhatsApp sender. - - Args: - msg_body (str): The message body to be sent. - """ - if not self.user_whatsapp_number: - logger.error("Cannot send message: missing user_whatsapp_number") - return - - url = self.meta_api_url - headers = { - "Authorization": f"Bearer {self.access_token}", - "Content-Type": "application/json", - } - - # Split the message if it exceeds WhatsApp's character limit - message_parts = self._split_long_messages(msg_body) - - # Stop the typing indicator before sending the actual message - if self.typing_indicator_task and not self.typing_indicator_task.done(): - logger.debug("Typing indicator loop was cancelled (as Ansari will respond now)") - self.typing_indicator_task.cancel() - - # Send the message(s) to the user - try: - async with httpx.AsyncClient() as client: - logger.info( - f"Ansari responded to WhatsApp user {self.user_whatsapp_number} with the following message part(s):\n\n" - ) - - # If we have multiple parts, send them sequentially - for part in message_parts: - json_data = { - "messaging_product": "whatsapp", - "to": self.user_whatsapp_number, - "text": {"body": part}, - } - - response = await client.post(url, headers=headers, json=json_data) - response.raise_for_status() # Raise an exception for HTTP errors - - if msg_body != "...": - logger.info("\n".join(f"[Part {i + 1}]: \n{part}" for i, part in enumerate(message_parts))) - except Exception as e: - logger.error(f"Error sending message: {e}. Details are in next log.") - logger.exception(e) - - def _calculate_time_passed(self, last_message_time: Optional[datetime]) -> tuple[float, str]: - if last_message_time is None: - passed_time = float("inf") - else: - passed_time = (datetime.now(timezone.utc) - last_message_time).total_seconds() - - # Log the time passed since the last message - if passed_time < 60: - passed_time_logging = f"{passed_time:.1f}sec" - elif passed_time < 3600: - passed_time_logging = f"{passed_time / 60:.1f}mins" - elif passed_time < 86400: - passed_time_logging = f"{passed_time / 3600:.1f}hours" - else: - passed_time_logging = f"{passed_time / 86400:.1f}days" - - return passed_time, passed_time_logging - - def _get_retention_time_in_seconds(self) -> int: - reten_hours = get_settings().WHATSAPP_CHAT_RETENTION_HOURS - allowed_time = reten_hours * 60 * 60 - return allowed_time - - def _get_whatsapp_markdown(self, msg: str) -> str: - """Convert conventional markdown syntax to WhatsApp's markdown syntax""" - msg_direction = get_language_direction_from_text(msg) - - # Process standard markdown syntax - msg = self._convert_italic_syntax(msg) - msg = self._convert_bold_syntax(msg) - msg = self._convert_headers(msg) - - # Process lists based on text direction - if msg_direction in ["ltr", "rtl"]: - msg = self._format_nested_lists(msg) - - return msg - - def _convert_italic_syntax(self, text: str) -> str: - """Convert markdown italic syntax (*text*) to WhatsApp italic syntax (_text_)""" - # Regex details: - # (? str: - """Convert markdown bold syntax (**text**) to WhatsApp bold syntax (*text*)""" - return text.replace("**", "*") - - def _convert_headers(self, text: str) -> str: - """Convert markdown headers to WhatsApp's bold+italic format""" - # Process headers with content directly after them - # (?! ) # Ensures there's no space before the hash (avoiding matching in middle of text) - # #+ \**_* # Matches one or more hash symbols and ignores any bold/italic markers already present - # (.*?) # Captures the header text (non-greedy) - # \**_*\n # Matches any trailing formatting markers and the newline - # (?!\n) # Ensures the newline isn't followed by another newline (i.e., not an isolated header) - pattern = re.compile(r"(?! )#+ \**_*(.*?)\**_*\n(?!\n)") - text = pattern.sub(r"*_\1_*\n\n", text) - - # Process headers with empty line after them - pattern = re.compile(r"(?! )#+ \**_*(.*?)\**_*\n\n") - return pattern.sub(r"*_\1_*\n\n", text) - - def _format_nested_lists(self, text: str) -> str: - """ - Format only nested lists/bullet points with WhatsApp's special formatting. - - This handles: - 1. Nested bullet points within numbered lists - 2. Nested numbered lists within bullet points - 3. Purely nested bullet points - 4. Purely nested numbered lists - - Simple (non-nested) lists retain their original formatting. - """ - lines = text.split("\n") - processed_lines = [] - in_nested_section = False - nested_section_indent = 0 - - for i, line in enumerate(lines): - # Check for indentation to detect nesting - indent_match = re.match(r"^(\s+)", line) if line.strip() else None - current_indent = len(indent_match.group(1)) if indent_match else 0 - - # Check if this is a list item (numbered or bullet) - is_numbered_item = re.match(r"^\s*\d+\.\s", line) - is_bullet_item = re.match(r"^\s*[\*-]\s", line) - - # Determine if we're entering, in, or exiting a nested section - if (is_numbered_item or is_bullet_item) and current_indent > 0: - # This is a nested item - if not in_nested_section: - in_nested_section = True - nested_section_indent = current_indent - - # Format nested items - if is_numbered_item: - # Convert nested numbered list format: " 1. Item" -> " 1 - Item" - line = re.sub(r"(\s*)(\d+)(\.) ", r"\1\2 - ", line) - elif is_bullet_item: - # Convert nested bullet format: " - Item" or " * Item" -> " -- Item" - line = re.sub(r"(\s*)[\*-] ", r"\1-- ", line) - - elif in_nested_section and current_indent < nested_section_indent: - # We're exiting the nested section - in_nested_section = False - - # For non-nested items, leave them as they are - processed_lines.append(line) - - return "\n".join(processed_lines) - - def _split_long_messages(self, msg_body: str) -> list[str]: - """Split long messages into smaller chunks based on formatted headers or other patterns. - - This method implements a multi-level splitting strategy for messages that exceed - WhatsApp's character limit (4000): - 1. First tries to split by header pattern (*_HEADER_*) - 2. If that's not possible, tries to split by bold text (*BOLD*) - 3. Finally falls back to paragraph-based splitting - - Args: - msg_body (str): The message body to split if necessary - - Returns: - list[str]: A list of message chunks that can be sent separately - """ - # WhatsApp character limit - MAX_LENGTH = 4000 - - # If message is already under the limit, return it as is - if len(msg_body) <= MAX_LENGTH: - return [msg_body] - - # Strategy 1: Try to split by formatted headers (*_HEADER_*) - header_chunks = self._split_by_headers(msg_body, MAX_LENGTH) - if len(header_chunks) > 1: - return header_chunks - - # Strategy 2: Try to split by bold formatting (*BOLD*) - bold_chunks = self._split_by_bold_text(msg_body, MAX_LENGTH) - if len(bold_chunks) > 1: - return bold_chunks - - # Strategy 3: Fall back to paragraph-based splitting - return self._split_by_paragraphs(msg_body, MAX_LENGTH) - - def _split_by_headers(self, text: str, max_length: int) -> list[str]: - """Split text by formatted header pattern (*_HEADER_*). - - Args: - text (str): Text to split - max_length (int): Maximum allowed length of each chunk - - Returns: - list[str]: List of text chunks split by headers - - Example: - >>> text = "Text before header\n*_First Header_*\nText\n\n*_Second Header_*\nMore text" - >>> _split_by_headers(text, 1000) - ['Text before header', '*_First Header_*\nText', '*_Second Header_*\nMore text'] - """ - # Look for *_HEADER_* pattern - header_pattern = re.compile(r"\*_[^*_]+_\*") - headers = list(header_pattern.finditer(text)) - - # If we don't have multiple headers, we can't split effectively - if not headers or len(headers) <= 1: - return [text] - - chunks = [] - - # Process each header as a potential chunk boundary - for i, match in enumerate(headers): - # For the first header, handle any text that comes before it - if i == 0 and match.start() > 0: - prefix = text[: match.start()] - - # Always include the text before the first header in its own message(s) - # If it's too long, recursively split it - if len(prefix) <= max_length: - chunks.append(prefix) - else: - # If prefix is too long, split it using paragraph-based splitting - prefix_chunks = self._split_by_paragraphs(prefix, max_length) - chunks.extend(prefix_chunks) - - # Determine the end position for the chunk containing this header - end_pos = headers[i + 1].start() if i < len(headers) - 1 else len(text) - chunk = text[match.start() : end_pos] - - # If chunk fits within limit, add it directly - if len(chunk) <= max_length: - chunks.append(chunk) - else: - # Otherwise, try more aggressive splitting for this chunk - # First try bold formatting, then paragraphs - sub_chunks = self._split_by_bold_text(chunk, max_length) - chunks.extend(sub_chunks) - - return chunks - - def _split_by_bold_text(self, text: str, max_length: int) -> list[str]: - """Split text by looking for bold formatting (*TEXT*) patterns. - - This function splits text at bold formatting markers (*TEXT*) when the text - exceeds the maximum length. It treats each bold pattern as a potential - break point, always keeping the bold text with the content that follows it. - - Args: - text (str): Text to split - max_length (int): Maximum allowed length of each chunk - - Returns: - list[str]: List of text chunks split by bold formatting - - Example: - >>> text = "Some intro text\n*First bold section*\nMiddle content\n*Second bold*\nMore text" - >>> _split_by_bold_text(text, 30) - ['Some intro text', '*First bold section*\nMiddle content', '*Second bold*\nMore text'] - """ - if len(text) <= max_length: - return [text] - - # Find *TEXT* patterns - bold_pattern = re.compile(r"\*[^*]+\*") - bold_matches = list(bold_pattern.finditer(text)) - - # If we don't have enough bold patterns for effective splitting - if not bold_matches or len(bold_matches) <= 1: - return self._split_by_paragraphs(text, max_length) - - chunks = [] - - # Process each bold pattern as a potential chunk boundary - for i, match in enumerate(bold_matches): - # For the first bold pattern, handle any text that comes before it - if i == 0 and match.start() > 0: - prefix = text[: match.start()] - - # Always include the text before the first bold pattern in its own message(s) - # If it's too long, recursively split it - if len(prefix) <= max_length: - chunks.append(prefix) - else: - # If prefix is too long, split it using paragraph-based splitting - prefix_chunks = self._split_by_paragraphs(prefix, max_length) - chunks.extend(prefix_chunks) - - # Determine the end position for the chunk containing this bold pattern - end_pos = bold_matches[i + 1].start() if i < len(bold_matches) - 1 else len(text) - chunk = text[match.start() : end_pos] - - # If chunk fits within limit, add it directly - if len(chunk) <= max_length: - chunks.append(chunk) - else: - # Otherwise, fall back to paragraph splitting for this chunk - sub_chunks = self._split_by_paragraphs(chunk, max_length) - chunks.extend(sub_chunks) - - return chunks - - def _split_by_paragraphs(self, text: str, max_length: int) -> list[str]: - """Split text by paragraphs or fall back to fixed-size chunks if needed. - - This method attempts to split text at natural paragraph breaks (double newlines). If paragraphs themselves are - too long, it uses fixed-size chunk splitting as a fallback. - - Args: - text (str): Text to split - max_length (int): Maximum allowed length of each chunk - - Returns: - list[str]: List of text chunks split by paragraphs or fixed chunks - - Example: - >>> text = "This is paragraph 1.\\n\\nThis is paragraph 2.\\n\\nThis is a very long paragraph 3 that exceeds" - >>> text += " the maximum length and will need to be split." - >>> _split_by_paragraphs(text, 50) - ['This is paragraph 1.', 'This is paragraph 2.', 'This is a very long paragraph 3 that exceeds the', - ' maximum length and will need to be split.'] - """ - if len(text) <= max_length: - return [text] - - chunks = [] - - # Try splitting by paragraphs first (double newlines) - paragraphs = re.split(r"\n\n+", text) - - if len(paragraphs) > 1: - current = "" - - for para in paragraphs: - # If adding this paragraph would exceed the limit - if current and len(current) + len(para) + 2 > max_length: - chunks.append(current) - current = "" - - # If paragraph itself is too long, split it using fixed chunks - if len(para) > max_length: - # Add any accumulated text first - if current: - chunks.append(current) - current = "" - - # Use fixed-size chunk splitting for long paragraphs - para_chunks = self._split_by_fixed_chunks(para, max_length) - chunks.extend(para_chunks) - else: - # Add paragraph to current chunk with proper separator - if current: - current += "\n\n" + para - else: - current = para - - # Don't forget the last chunk - if current: - chunks.append(current) - - return chunks - else: - # If text doesn't have paragraphs, use fixed-size chunk splitting - return self._split_by_fixed_chunks(text, max_length) - - def _split_by_fixed_chunks(self, text: str, max_length: int) -> list[str]: - """Split text into fixed-size chunks of maximum length. - - This is the simplest fallback approach, which just takes chunks of - max_length characters until the entire text is processed. - - Args: - text (str): Text to split - max_length (int): Maximum allowed length of each chunk - - Returns: - list[str]: List of text chunks of maximum length - - Example: - >>> text = "This is a very long text that exceeds the maximum allowed length" - >>> _split_by_fixed_chunks(text, 20) - ['This is a very long ', 'text that exceeds the', ' maximum allowed len', 'gth'] - """ - # If text is already under the limit, return it as is - if len(text) <= max_length: - return [text] - - chunks = [] - - # Simply take max_length characters at a time - for i in range(0, len(text), max_length): - chunks.append(text[i : i + max_length]) - - return chunks - - async def handle_text_message(self) -> None: - """Processes the incoming text message and sends a response to the WhatsApp sender.""" - incoming_txt_msg = self.incoming_msg_body["body"] - - try: - logger.debug(f"Whatsapp user said: {incoming_txt_msg}") - - # Get user's ID from users_whatsapp table - # NOTE: we're not checking for user's existence here, as we've already done that in `main_webhook()` - user_id_whatsapp = db.retrieve_user_info(source=SourceType.WHATSAPP, phone_num=self.user_whatsapp_number) - - # Get details of the thread that the user last interacted with (i.e., max(updated_at)) - thread_id, last_msg_time = db.get_last_message_time_whatsapp(user_id_whatsapp) - - # Calculate the time passed since the last message - passed_time, passed_time_logging = self._calculate_time_passed(last_msg_time) - logger.debug(f"Time passed since user ({user_id_whatsapp})'s last whatsapp message: {passed_time_logging}") - - # Get the allowed retention time - allowed_time = self._get_retention_time_in_seconds() - - # Create a new thread if - # no threads have been previously created, - # or the last message has passed the allowed retention time - # NOTE: Technically, the `thread_id` condition is redundant, - # as `passed_time` will be `inf` when `last_message_time` is None, which happens when `thread_id` is None - # ... but we're keeping the condition for clarity and future-proofing :] - if thread_id is None or passed_time > allowed_time: - first_few_words = " ".join(incoming_txt_msg.split()[:6]) - - result: dict = db.create_thread(SourceType.WHATSAPP, user_id_whatsapp, first_few_words) - - if "error" in result: - logger.error(f"Error creating a new thread for whatsapp user ({user_id_whatsapp}): {result['error']}") - await self.send_whatsapp_message( - "An unexpected error occurred while creating a new chat session. Please try again later.", - ) - return - - thread_id = result["thread_id"] - - logger.info( - f"Created a new thread for the whatsapp user ({user_id_whatsapp}), " - + "as the allowed retention time has passed." - ) - - # Get `message_history` from current thread (excluding incoming user's message, as it will be logged later) - thread_name_and_history = db.get_thread_llm(thread_id, user_id_whatsapp) - if "messages" not in thread_name_and_history: - logger.error(f"Error retrieving message history for thread ({thread_id}) of user ({user_id_whatsapp})") - await self.send_whatsapp_message( - "An unexpected error occurred while getting your last chat session. Please try again later.", - ) - return - - msg_history: list[dict] = thread_name_and_history["messages"] - - msg_history_for_debugging = [msg for msg in msg_history if msg["role"] in {"user", "assistant"}] - logger.debug( - f"#msgs (user/assistant only) retrieved for user ({user_id_whatsapp})'s current whatsapp thread: " - + str(len(msg_history_for_debugging)) - ) - - user_msg = {"role": "user", "content": [{"type": "text", "text": incoming_txt_msg}]} - msg_history.append(user_msg) - - message_logger = MessageLogger(db, SourceType.WHATSAPP, user_id_whatsapp, thread_id) - if self.settings.AGENT == "Ansari": - agent = Ansari(settings=self.settings, message_logger=message_logger) - elif self.settings.AGENT == "AnsariClaude": - agent = AnsariClaude(settings=self.settings, message_logger=message_logger) - - # Send the thread's history to the Ansari agent which will - # log (i.e., append) the message history's last user message to DB, - # process the history, - # log (i.e., append) Ansari's output to DB - response = "" - for token in agent.replace_message_history(msg_history): - # NOTE: Check the `async_await_backgroundtasks_visualized.md` file - # for details on why we added this `await` line - await asyncio.sleep(0) - response += token - - # Convert conventional markdown syntax to WhatsApp's markdown syntax - logger.debug(f"Response before markdown conversion: \n\n{response}") - response = self._get_whatsapp_markdown(response) - - # Return the response back to the WhatsApp user if it's not empty - # Else, send an error message to the user - if response: - await self.send_whatsapp_message(response) - else: - logger.warning("Response was empty. Sending error message.") - await self.send_whatsapp_message( - "Ansari returned an empty response. Please rephrase your question, then try again.", - ) - except Exception as e: - logger.error(f"Error processing message: {e}. Details are in next log.") - logger.exception(e) - await self.send_whatsapp_message( - "An unexpected error occurred while processing your message. Please try again later.", - ) - - # NOTE: This function assumes `loc_lat` and `loc_long` columns are in `users` DB table - # If alternative columns are used (e.g., city), the function should be updated accordingly - async def handle_location_message(self) -> None: - """ - Handles an incoming location message by updating the user's location in the database - and sending a confirmation message. - """ - - loc = self.incoming_msg_body - db.update_user_by_phone_num(self.user_whatsapp_number, {"loc_lat": loc["latitude"], "loc_long": loc["longitude"]}) - # TODO(odyash, good_first_issue): update msg below to also say something like: - # 'Type "pt"/"prayer times" to get prayer times', then implement that feature - await self.send_whatsapp_message( - "Stored your location successfully!", # This will help us give you accurate prayer times ISA 🙌. - ) - - async def handle_unsupported_message( - self, - ) -> None: - """ - Handles an incoming unsupported message by sending an appropriate response. - """ - - msg_type = self.incoming_msg_type + "s" if not self.incoming_msg_type.endswith("s") else self.incoming_msg_type - msg_type = msg_type.replace("unsupporteds", "this media type") - await self.send_whatsapp_message( - f"Sorry, I can't process {msg_type} yet. Please send me a text message.", - ) - - def is_message_too_old(self) -> bool: - """ - Checks if the incoming message is older than the allowed threshold (24 hours). - - Uses the message_unix_time attribute (timestamp in Unix time format - seconds since epoch) - extracted during message processing to determine if the message is too old. - - Returns: - bool: True if the message is older than 24 hours, False otherwise - """ - # Define the too old threshold (24 hours in seconds) - TOO_OLD_THRESHOLD = 24 * 60 * 60 # 24 hours in seconds - - # If there's no timestamp, message can't be verified as too old - if not self.message_unix_time: - logger.debug("No timestamp available, cannot determine message age") - return False - - # Convert the Unix timestamp to a datetime object - try: - msg_time = datetime.fromtimestamp(self.message_unix_time, tz=timezone.utc) - # Get the current time in UTC - current_time = datetime.now(timezone.utc) - # Calculate time difference in seconds - time_diff = (current_time - msg_time).total_seconds() - - # Log the message age for debugging - if time_diff < 60: - age_logging = f"{time_diff:.1f} seconds" - elif time_diff < 3600: - age_logging = f"{time_diff / 60:.1f} minutes" - elif time_diff < 86400: - age_logging = f"{time_diff / 3600:.1f} hours" - else: - age_logging = f"{time_diff / 86400:.1f} days" - - logger.debug(f"Message age: {age_logging}") - - # Return True if the message is older than the threshold - return time_diff > TOO_OLD_THRESHOLD - - except (ValueError, TypeError) as e: - logger.error(f"Error parsing message timestamp: {e}") - return False - - def present(self): - pass From 42ea14114c70623dbdc7383aa1e1d8309fc80957 Mon Sep 17 00:00:00 2001 From: OdyAsh Date: Sat, 18 Oct 2025 13:46:56 +0300 Subject: [PATCH 4/5] refactor: update WhatsApp API endpoints to v2 and reorganize router structure --- .env.example | 4 ++-- ...ync_await_backgroundtasks_logs_for_tracing.log | 2 +- ...pi_structure_of_a_request_sent_using_zrok.json | 6 +++--- src/ansari/app/main_api.py | 4 ++-- .../whatsapp_router.py} | 15 +++++++-------- 5 files changed, 15 insertions(+), 16 deletions(-) rename src/ansari/{app/whatsapp_api_router.py => routers/whatsapp_router.py} (96%) diff --git a/.env.example b/.env.example index 461fce4..d07824b 100644 --- a/.env.example +++ b/.env.example @@ -46,8 +46,8 @@ template_dir="." # Directory path for templates # (@21:33 and 25:30, however they use glitch instead of zrok, so the video here is just to give you an idea how to setup a webhook) # Source 4 (where you can change callback url, given that your facebook account gets access by the app's admins): # https://developers.facebook.com/apps/871020755148175/whatsapp-business/wa-settings/ -# NOTE 1: When you see the `Callback URL`, it will be something like "https://ZROK_SHARE_TOKEN.share.zrok.io/whatsapp/v1" -# (The `/whatsapp/v1` endpoint can be found in `main_whatsapp.py`'s endpoints, that's why it's in the url above) +# NOTE 1: When you see the `Callback URL`, it will be something like "https://ZROK_SHARE_TOKEN.share.zrok.io/whatsapp/v2" +# (The `/whatsapp/v2` endpoint can be found in `main_whatsapp.py`'s endpoints, that's why it's in the url above) # NOTE 2: If an unexpected 3rd party discovers the ZROK_SHARE_TOKEN, # a new one will have to be generated, then added to Meta's callback URL of the *testing* app # (Noting that the *production* app's callback URL will be different anyway, so the 3rd party won't be able to access that app) diff --git a/docs/fastapi/async_await_backgroundtasks_logs_for_tracing.log b/docs/fastapi/async_await_backgroundtasks_logs_for_tracing.log index 93746a9..fda5960 100644 --- a/docs/fastapi/async_await_backgroundtasks_logs_for_tracing.log +++ b/docs/fastapi/async_await_backgroundtasks_logs_for_tracing.log @@ -1,4 +1,4 @@ -(manually inserted log) ------------------------------------------------------------------------------ FastAPI receives a message from a whatsapp user on /whatsapp/v1 endpoint, so the event loop picks the `main_webhook()` to be executed now ------------------------------------------------------------------------------ +(manually inserted log) ------------------------------------------------------------------------------ FastAPI receives a message from a whatsapp user on /whatsapp/v2 endpoint, so the event loop picks the `main_webhook()` to be executed now ------------------------------------------------------------------------------ (manually inserted log) ------------------------------------------------------------------------------ Execution in `main_webhook()` starts ------------------------------------------------------------------------------ 2025-04-20 07:56:22 | DEBUG | ansari.app.main_whatsapp:main_webhook:112 | ! Before `background_tasks -> send_typing_indicator_then_start_loop` 2025-04-20 07:56:22 | DEBUG | ansari.app.main_whatsapp:main_webhook:112 | ! After `background_tasks -> send_typing_indicator_then_start_loop` diff --git a/docs/structure_of_api_responses/meta_whatsapp_api_structure_of_a_request_sent_using_zrok.json b/docs/structure_of_api_responses/meta_whatsapp_api_structure_of_a_request_sent_using_zrok.json index 5a7d1af..69e178f 100644 --- a/docs/structure_of_api_responses/meta_whatsapp_api_structure_of_a_request_sent_using_zrok.json +++ b/docs/structure_of_api_responses/meta_whatsapp_api_structure_of_a_request_sent_using_zrok.json @@ -11,8 +11,8 @@ "scheme": "https", "method": "POST", "root_path": "", - "path": "/whatsapp/v1", - "raw_path": "/whatsapp/v1", + "path": "/whatsapp/v2", + "raw_path": "/whatsapp/v2", "query_string": "", "headers": [ ["host", "YOUR_ZROK_SHARE_TOKEN.share.zrok.io"], @@ -41,7 +41,7 @@ "endpoint": "", "path_params": {}, "route": { - "path": "/whatsapp/v1", + "path": "/whatsapp/v2", "name": "main_webhook", "methods": ["POST"] } diff --git a/src/ansari/app/main_api.py b/src/ansari/app/main_api.py index a50b216..c9629b3 100644 --- a/src/ansari/app/main_api.py +++ b/src/ansari/app/main_api.py @@ -38,7 +38,7 @@ from ansari.agents.ansari_workflow import AnsariWorkflow from ansari.ansari_db import AnsariDB, MessageLogger, SourceType from ansari.ansari_logger import get_logger -from ansari.app.whatsapp_api_router import router as whatsapp_api_router +from ansari.routers.whatsapp_router import router as whatsapp_router from ansari.config import Settings, get_settings from ansari.presenters.api_presenter import ApiPresenter from ansari.util.general_helpers import CORSMiddlewareWithLogging, get_extended_origins, register_to_mailing_list @@ -89,7 +89,7 @@ async def lifespan(app: FastAPI): app = FastAPI(lifespan=lifespan) # Include the WhatsApp API router -app.include_router(whatsapp_api_router) +app.include_router(whatsapp_router) # Custom exception handler, which aims to log FastAPI-related exceptions before raising them diff --git a/src/ansari/app/whatsapp_api_router.py b/src/ansari/routers/whatsapp_router.py similarity index 96% rename from src/ansari/app/whatsapp_api_router.py rename to src/ansari/routers/whatsapp_router.py index 4884921..a83e795 100644 --- a/src/ansari/app/whatsapp_api_router.py +++ b/src/ansari/routers/whatsapp_router.py @@ -1,7 +1,7 @@ # WhatsApp API Router for ansari-backend """FastAPI router containing WhatsApp-specific API endpoints for the ansari-whatsapp microservice.""" -from fastapi import APIRouter, HTTPException +from fastapi import APIRouter, HTTPException, Request from fastapi.responses import StreamingResponse from pydantic import BaseModel @@ -33,8 +33,7 @@ class WhatsAppMessageRequest(BaseModel): thread_id: str message: str - -@router.post("/api/v2/whatsapp/users/register") +@router.post("/whatsapp/v2/users/register") async def register_whatsapp_user(req: WhatsAppUserRegisterRequest): """Register a new WhatsApp user with the Ansari backend. @@ -65,7 +64,7 @@ async def register_whatsapp_user(req: WhatsAppUserRegisterRequest): raise HTTPException(status_code=400, detail=f"Registration failed: {str(e)}") -@router.get("/api/v2/whatsapp/users/exists") +@router.get("/whatsapp/v2/users/exists") async def check_whatsapp_user_exists(phone_num: str): """Check if a WhatsApp user exists in the Ansari backend. @@ -88,7 +87,7 @@ async def check_whatsapp_user_exists(phone_num: str): raise HTTPException(status_code=400, detail=f"User existence check failed: {str(e)}") -@router.post("/api/v2/whatsapp/threads") +@router.post("/whatsapp/v2/threads") async def create_whatsapp_thread(req: WhatsAppThreadRequest): """Create a new thread for a WhatsApp user in the Ansari backend. @@ -128,7 +127,7 @@ async def create_whatsapp_thread(req: WhatsAppThreadRequest): raise HTTPException(status_code=400, detail=f"Thread creation failed: {str(e)}") -@router.get("/api/v2/whatsapp/threads/last") +@router.get("/whatsapp/v2/threads/last") async def get_last_whatsapp_thread(phone_num: str): """Get information about the last active thread for a WhatsApp user. @@ -172,7 +171,7 @@ async def get_last_whatsapp_thread(phone_num: str): raise HTTPException(status_code=400, detail=f"Failed to get last thread info: {str(e)}") -@router.get("/api/v2/whatsapp/threads/{thread_id}/history") +@router.get("/whatsapp/v2/threads/{thread_id}/history") async def get_whatsapp_thread_history(thread_id: str, phone_num: str): """Get the message history for a WhatsApp user's thread from the Ansari backend. @@ -216,7 +215,7 @@ async def get_whatsapp_thread_history(thread_id: str, phone_num: str): raise HTTPException(status_code=400, detail=f"Failed to get thread history: {str(e)}") -@router.post("/api/v2/whatsapp/messages/process") +@router.post("/whatsapp/v2/messages/process") def process_whatsapp_message(req: WhatsAppMessageRequest) -> StreamingResponse: """Process a message from a WhatsApp user with streaming response. From d81930cd0acbba22129bf37c4484cba811f798eb Mon Sep 17 00:00:00 2001 From: OdyAsh Date: Sun, 2 Nov 2025 07:59:20 +0200 Subject: [PATCH 5/5] feat: add WhatsApp service API key configuration and verification for secure requests --- .env.example | 123 +++++++++--------------- .github/workflows/deploy-production.yml | 2 + .github/workflows/deploy-staging.yml | 2 + src/ansari/config.py | 3 + src/ansari/routers/whatsapp_router.py | 67 +++++++++++-- 5 files changed, 115 insertions(+), 82 deletions(-) diff --git a/.env.example b/.env.example index d07824b..9f46bd7 100644 --- a/.env.example +++ b/.env.example @@ -1,6 +1,45 @@ +# Where ansari-backend is currently deployed DEPLOYMENT_TYPE="development" # Deployment type (development, staging, production) + +###################################### Related to ansari-frontend ###################################### + FRONTEND_URL="http://localhost:8081" +###################################### Related to ansari-whatsapp ###################################### + +# Shared API key for authenticating requests from ansari-whatsapp microservice +# References: +# - https://fastapi.tiangolo.com/tutorial/security/ +# - https://fastapi.tiangolo.com/advanced/security/http-basic-auth/ +# - https://www.python-httpx.org/advanced/#client-instances +# Security: Must match the WHATSAPP_SERVICE_API_KEY in ansari-whatsapp's .env file +# Used to verify X-Whatsapp-Api-Key header on /whatsapp/v2/* endpoints +WHATSAPP_SERVICE_API_KEY="your_generated_secret_key_here" + +###################################### Related to Ansari for Android/IOS ###################################### + +# iOS app build versions +IOS_MINIMUM_BUILD_VERSION="1" # Minimum build version required for iOS app +IOS_LATEST_BUILD_VERSION="1" # Latest available build version for iOS app + +# Android app build versions +ANDROID_MINIMUM_BUILD_VERSION="1" # Minimum build version required for Android app +ANDROID_LATEST_BUILD_VERSION="1" # Latest available build version for Android app + + +###################################### Related to CORS ###################################### + +# Origins to be allowed by the backend +ORIGINS="https://ansari.chat,https://www.ansari.chat,https://pre.ansari.chat" + +###################################### Related to the DB ###################################### + +# Database connection string +MONGO_URL="mongodb://localhost:27017" +MONGO_DB_NAME="ansari_db" + +###################################### Related to 3rd Party Services ###################################### + KALEMAT_API_KEY="" # Token for Qur'an and Hadith search ANTHROPIC_API_KEY="" # API key for Claude AI model OPENAI_API_KEY="" # Token for GPT-4 (Optional) @@ -12,76 +51,20 @@ MAILCHIMP_API_KEY="" # API key MAILCHIMP_SERVER_PREFIX="" # Server prefix (data center) MAILCHIMP_LIST_ID="" # List ID -# Database connection string -MONGO_URL="mongodb://localhost:27017" -MONGO_DB_NAME="ansari_db" - -SECRET_KEY="secret" # Secret key for signing tokens - -# Origins to be allowed by the backend -ORIGINS="https://ansari.chat,https://www.ansari.chat,https://pre.ansari.chat" - # Vectara search engine configuration VECTARA_API_KEY="" # Authentication token for Vectara API QURAN_DOT_COM_API_KEY="" # This is the API key we give to quran.com to access us, not for us to access them -# Directory for storing templates -template_dir="." # Directory path for templates - -# Related to WhatsApp Business and Meta (leave empty if you're not planning to use WhatsApp) -# Source 1: https://www.youtube.com/watch?v=KP6_BUw3i0U -# Watch Until 32:25, while quickly skimming through the non-python code parts -# Source 2 (mentioned in video above): https://glitch.com/edit/#!/insidious-tartan-alvarezsaurus -# (the `verification_webhook` endpoint in `main_whatsapp` is inspired by the above URL) -# Source 3 (optional): https://developers.facebook.com/blog/post/2022/10/24/sending-messages-with-whatsapp-in-your-python-applications/#u_0_39_8q - -# Moreover, if want to test whatsapp's webhook locally, you can use zrok on a reserved URL with a zrok "share token" -# obtained by contacting its current holder: https://github.com/OdyAsh (source 1, 2 below) -# Alternatively, you can change the webhook url all together (source 3, 4 below) -# Check these sources for more details: -# Source 1: https://dev.to/odyash/quickly-share-your-app-with-zrok-4ihp -# Source 2: https://openziti.discourse.group/t/how-do-i-use-a-reserved-share-on-different-devices/2379/2 -# Source 3: https://youtu.be/KP6_BUw3i0U?t=1294 -# (@21:33 and 25:30, however they use glitch instead of zrok, so the video here is just to give you an idea how to setup a webhook) -# Source 4 (where you can change callback url, given that your facebook account gets access by the app's admins): -# https://developers.facebook.com/apps/871020755148175/whatsapp-business/wa-settings/ -# NOTE 1: When you see the `Callback URL`, it will be something like "https://ZROK_SHARE_TOKEN.share.zrok.io/whatsapp/v2" -# (The `/whatsapp/v2` endpoint can be found in `main_whatsapp.py`'s endpoints, that's why it's in the url above) -# NOTE 2: If an unexpected 3rd party discovers the ZROK_SHARE_TOKEN, -# a new one will have to be generated, then added to Meta's callback URL of the *testing* app -# (Noting that the *production* app's callback URL will be different anyway, so the 3rd party won't be able to access that app) -# (but we still don't want random calls to be made to our testing app, so that's why we'll still have to change an exposed token :]) -# NOTE 3: Obviously, that `871...175` in the above URL is the testing app's public id, so if this link still doesn't work even after you gain access, -# then the admins most probably created a new test app instance - -WHATSAPP_API_VERSION="<>" - -# NOTE 1: Contact the team to see whatsapp's 2 phone nums -> one for prod. env. and the other for local/stage testing -# NOTE 2: If you encounter this error: -# "Client error '400 Bad Request' for url 'https://graph.facebook.com/v22.0//messages'" -# then either: -# (1) the got deleted by Meta due to inactive use for a long time. -# If this happens, then open the page in source 3 (i.e., youtube video URL above) @6:11. -# Then, under "step 1" in the middle of the page, you'll see a selection box with the live WhatsApp number; -# click on it, then click "Get new test number". Then, wait for a while, then you'll see the new number; -# Now copy its number id and paste it in -# (2) Meta updated its version and so now is outdated. -# You can verify this by opening source 3 @26:13 and checking the version associated with `messages` -# (3) you sent a message with an incorrect format. Check source 3; 26:10->26:28, click on `messages` to see how it should be sent -WHATSAPP_BUSINESS_PHONE_NUMBER_ID="<>" - -# NOTE 1: check video in source 3 above from 30:45 to 32:15 to see where we get the access token -# NOTE 2: Contact the team to see their 2 access tokens -> one for prod. env. and the other for local/stage testing -# NOTE 3: If you want to debug or check the validity of an access token, check this URL: -# https://developers.facebook.com/tools/debug/accesstoken/?access_token= -WHATSAPP_ACCESS_TOKEN_FROM_SYS_USER="<" - -WHATSAPP_VERIFY_TOKEN_FOR_WEBHOOK="<>" -WHATSAPP_CHAT_RETENTION_HOURS=3 -ZROK_SHARE_TOKEN="<>" - -# Related to internal code logic +SENTRY_DSN="" # Sentry DSN for error tracking + +###################################### Related to Internal Code Logic ###################################### + +SECRET_KEY="secret" # Secret key for signing tokens + +# Directory path for storing templates +template_dir="." + # Leave the values below when locally debugging the application # In production, don't add them to environment variables, or add them as "INFO"/"False" respectively LOGGING_LEVEL="DEBUG" @@ -90,16 +73,6 @@ DEV_MODE="True" # Application version control settings MAINTENANCE_MODE="False" # Whether the application is in maintenance mode -# iOS app build versions -IOS_MINIMUM_BUILD_VERSION="1" # Minimum build version required for iOS app -IOS_LATEST_BUILD_VERSION="1" # Latest available build version for iOS app - -# Android app build versions -ANDROID_MINIMUM_BUILD_VERSION="1" # Minimum build version required for Android app -ANDROID_LATEST_BUILD_VERSION="1" # Latest available build version for Android app - -SENTRY_DSN="" # Sentry DSN for error tracking - # To get rid of .py[cod] files (This should key should NOT be set in production!) # This is only to de-clutter your local development environment # Details: https://docs.python-guide.org/writing/gotchas/#disabling-bytecode-pyc-files diff --git a/.github/workflows/deploy-production.yml b/.github/workflows/deploy-production.yml index 524f1f5..7e0a33f 100644 --- a/.github/workflows/deploy-production.yml +++ b/.github/workflows/deploy-production.yml @@ -65,6 +65,7 @@ jobs: USUL_API_TOKEN: ${{ format('{0}{1}', secrets.SSM_ROOT, 'usul-api-token') }} SENTRY_DSN: ${{ format('{0}{1}', secrets.SSM_ROOT, 'sentry-dsn') }} + WHATSAPP_SERVICE_API_KEY: ${{ format('{0}{1}', secrets.SSM_ROOT, 'whatsapp-service-api-key') }} WHATSAPP_ACCESS_TOKEN_FROM_SYS_USER: ${{ format('{0}{1}', secrets.SSM_ROOT, 'whatsapp-access-token-from-sys-user') }} WHATSAPP_BUSINESS_PHONE_NUMBER_ID: ${{ format('{0}{1}', secrets.SSM_ROOT, 'whatsapp-business-phone-number-id') }} WHATSAPP_VERIFY_TOKEN_FOR_WEBHOOK: ${{ format('{0}{1}', secrets.SSM_ROOT, 'whatsapp-verify-token-for-webhook') }} @@ -109,6 +110,7 @@ jobs: USUL_API_TOKEN SENTRY_DSN + WHATSAPP_SERVICE_API_KEY WHATSAPP_ACCESS_TOKEN_FROM_SYS_USER WHATSAPP_BUSINESS_PHONE_NUMBER_ID WHATSAPP_VERIFY_TOKEN_FOR_WEBHOOK diff --git a/.github/workflows/deploy-staging.yml b/.github/workflows/deploy-staging.yml index e8471b9..d028081 100644 --- a/.github/workflows/deploy-staging.yml +++ b/.github/workflows/deploy-staging.yml @@ -65,6 +65,7 @@ jobs: USUL_API_TOKEN: ${{ format('{0}{1}', secrets.SSM_ROOT, 'usul-api-token') }} SENTRY_DSN: ${{ format('{0}{1}', secrets.SSM_ROOT, 'sentry-dsn') }} + WHATSAPP_SERVICE_API_KEY: ${{ format('{0}{1}', secrets.SSM_ROOT, 'whatsapp-service-api-key') }} WHATSAPP_ACCESS_TOKEN_FROM_SYS_USER: ${{ format('{0}{1}', secrets.SSM_ROOT, 'whatsapp-access-token-from-sys-user') }} WHATSAPP_BUSINESS_PHONE_NUMBER_ID: ${{ format('{0}{1}', secrets.SSM_ROOT, 'whatsapp-business-phone-number-id') }} WHATSAPP_VERIFY_TOKEN_FOR_WEBHOOK: ${{ format('{0}{1}', secrets.SSM_ROOT, 'whatsapp-verify-token-for-webhook') }} @@ -109,6 +110,7 @@ jobs: USUL_API_TOKEN SENTRY_DSN + WHATSAPP_SERVICE_API_KEY WHATSAPP_ACCESS_TOKEN_FROM_SYS_USER WHATSAPP_BUSINESS_PHONE_NUMBER_ID WHATSAPP_VERIFY_TOKEN_FOR_WEBHOOK diff --git a/src/ansari/config.py b/src/ansari/config.py index 034aadd..d91ce58 100644 --- a/src/ansari/config.py +++ b/src/ansari/config.py @@ -44,6 +44,9 @@ def get_resource_path(filename): FRONTEND_URL: str = Field(default="https://ansari.chat") SENTRY_DSN: HttpUrl | None = None + # Service-to-service authentication + WHATSAPP_SERVICE_API_KEY: SecretStr # Shared secret for authenticating ansari-whatsapp requests + DATABASE_URL: PostgresDsn = Field( default="postgresql://postgres:password@localhost:5432/ansari", ) diff --git a/src/ansari/routers/whatsapp_router.py b/src/ansari/routers/whatsapp_router.py index a83e795..7582131 100644 --- a/src/ansari/routers/whatsapp_router.py +++ b/src/ansari/routers/whatsapp_router.py @@ -1,12 +1,13 @@ # WhatsApp API Router for ansari-backend """FastAPI router containing WhatsApp-specific API endpoints for the ansari-whatsapp microservice.""" -from fastapi import APIRouter, HTTPException, Request +from fastapi import APIRouter, HTTPException, Request, Depends, Header from fastapi.responses import StreamingResponse from pydantic import BaseModel from ansari.ansari_db import SourceType, MessageLogger from ansari.ansari_logger import get_logger +from ansari.config import get_settings logger = get_logger(__name__) @@ -17,6 +18,39 @@ from ansari.app.main_api import db, presenter +# Dependency for verifying WhatsApp service API key +# References: +# - https://fastapi.tiangolo.com/tutorial/security/ +# - https://fastapi.tiangolo.com/advanced/security/http-basic-auth/ +async def verify_whatsapp_api_key(x_whatsapp_api_key: str = Header(...)) -> None: + """ + Verify that the request comes from the authorized ansari-whatsapp microservice. + + This dependency checks the X-Whatsapp-Api-Key header against the configured + shared secret to ensure requests are coming from our trusted WhatsApp service. + + Args: + x_whatsapp_api_key: The API key from the X-Whatsapp-Api-Key header + + Raises: + HTTPException: 401 Unauthorized if the API key is missing or invalid + + References: + - https://fastapi.tiangolo.com/tutorial/security/ + - https://fastapi.tiangolo.com/advanced/security/http-basic-auth/ + """ + settings = get_settings() + expected_key = settings.WHATSAPP_SERVICE_API_KEY.get_secret_value() + + if not x_whatsapp_api_key or x_whatsapp_api_key != expected_key: + logger.error("Invalid or missing X-Whatsapp-Api-Key header") + raise HTTPException( + status_code=401, + detail="Unauthorized: Invalid or missing API key", + headers={"WWW-Authenticate": "ApiKey"}, + ) + + # Pydantic models for WhatsApp API requests class WhatsAppUserRegisterRequest(BaseModel): phone_num: str @@ -34,7 +68,10 @@ class WhatsAppMessageRequest(BaseModel): message: str @router.post("/whatsapp/v2/users/register") -async def register_whatsapp_user(req: WhatsAppUserRegisterRequest): +async def register_whatsapp_user( + req: WhatsAppUserRegisterRequest, + _: None = Depends(verify_whatsapp_api_key) +): """Register a new WhatsApp user with the Ansari backend. Args: @@ -65,7 +102,10 @@ async def register_whatsapp_user(req: WhatsAppUserRegisterRequest): @router.get("/whatsapp/v2/users/exists") -async def check_whatsapp_user_exists(phone_num: str): +async def check_whatsapp_user_exists( + phone_num: str, + _: None = Depends(verify_whatsapp_api_key) +): """Check if a WhatsApp user exists in the Ansari backend. Args: @@ -88,7 +128,10 @@ async def check_whatsapp_user_exists(phone_num: str): @router.post("/whatsapp/v2/threads") -async def create_whatsapp_thread(req: WhatsAppThreadRequest): +async def create_whatsapp_thread( + req: WhatsAppThreadRequest, + _: None = Depends(verify_whatsapp_api_key) +): """Create a new thread for a WhatsApp user in the Ansari backend. Args: @@ -128,7 +171,10 @@ async def create_whatsapp_thread(req: WhatsAppThreadRequest): @router.get("/whatsapp/v2/threads/last") -async def get_last_whatsapp_thread(phone_num: str): +async def get_last_whatsapp_thread( + phone_num: str, + _: None = Depends(verify_whatsapp_api_key) +): """Get information about the last active thread for a WhatsApp user. Args: @@ -172,7 +218,11 @@ async def get_last_whatsapp_thread(phone_num: str): @router.get("/whatsapp/v2/threads/{thread_id}/history") -async def get_whatsapp_thread_history(thread_id: str, phone_num: str): +async def get_whatsapp_thread_history( + thread_id: str, + phone_num: str, + _: None = Depends(verify_whatsapp_api_key) +): """Get the message history for a WhatsApp user's thread from the Ansari backend. Args: @@ -216,7 +266,10 @@ async def get_whatsapp_thread_history(thread_id: str, phone_num: str): @router.post("/whatsapp/v2/messages/process") -def process_whatsapp_message(req: WhatsAppMessageRequest) -> StreamingResponse: +def process_whatsapp_message( + req: WhatsAppMessageRequest, + _: None = Depends(verify_whatsapp_api_key) +) -> StreamingResponse: """Process a message from a WhatsApp user with streaming response. Args: