From f9ede0731c480fcea62f83a6464d3a6c3cf0fdbe Mon Sep 17 00:00:00 2001 From: commit111 Date: Fri, 13 Jun 2025 18:15:39 -0700 Subject: [PATCH 01/21] add intercom integration --- app/app.py | 228 +++++++++++++++++++++++++++++++++++++++++++ app/rag_system.py | 6 +- app/requirements.txt | 1 + compose.dev.yaml | 7 +- compose.yaml | 10 ++ 5 files changed, 248 insertions(+), 4 deletions(-) diff --git a/app/app.py b/app/app.py index 09b0117..fe14f2b 100644 --- a/app/app.py +++ b/app/app.py @@ -9,6 +9,20 @@ import sys import traceback +import requests +from html.parser import HTMLParser +from werkzeug.test import EnvironBuilder +from werkzeug.wrappers import Request +import json +import logging + +import redis + + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + analytics.write_key = os.getenv('SEGMENT_WRITE_KEY') app = Flask(__name__, static_folder='templates/static') @@ -18,6 +32,9 @@ csrf = CSRFProtect(app) +# Initialize Redis connection +r = redis.from_url(os.getenv('REDIS_URL'), decode_responses=True) + def validate_pow(nonce, data, difficulty): # Calculate the sha256 of the concatenated string of 32-bit X-Nonce header and raw body. # This calculation has to match the code on the client side, in index.html. @@ -144,5 +161,216 @@ def debug_context(): context = rag_system.get_context(query) return jsonify({"context": context}) + +# Endpoint to get the whole conversation thread in Intercom and send an LLM answer to user +@app.route('/intercom/conversations/', methods=['GET']) +@csrf.exempt +def get_intercom_conversation(conversation_id): + logger.info(f"Received request to get conversation {conversation_id}") + # id = "38091" + id = conversation_id + url = "https://api.intercom.io/conversations/" + id + token = os.getenv('INTERCOM_TOKEN') + if not token: + return jsonify({"error": "Intercom token not set"}), 500 + + headers = { + "Content-Type": "application/json", + "Intercom-Version": "2.13", + "Authorization": "Bearer " + token + } + + response = requests.get(url, headers=headers) + + # extract conversation parts into a simplified json format + def extract_conversation_parts(response): + data = response.json() + parts = data.get('conversation_parts', {}).get('conversation_parts', []) + extracted_parts = [] + for part in parts: + body = part.get('body', '') + if not body: + continue + author = part.get('author', {}) + created_at = part.get('created_at') + extracted_parts.append({'body': body, 'author': author, 'created_at': created_at}) + return extracted_parts + + result = extract_conversation_parts(response) + + logger.info(f"Extracted {len(result)} parts from conversation {conversation_id}") + + def extract_latest_user_messages(conversation_parts): + # Get the latest user entries in the conversation + # Find the index of the last non-user entry + last_non_user_idx = None + for idx in range(len(conversation_parts) - 1, -1, -1): + if conversation_parts[idx].get('author', {}).get('type') != 'user': + last_non_user_idx = idx + break + + # Collect user entries after the last non-user entry + if last_non_user_idx is not None: + last_user_entries = [ + part for part in conversation_parts[last_non_user_idx + 1 :] + if part.get('author', {}).get('type') == 'user' + ] + else: + # If there is no non-user entry, include all user entries + last_user_entries = [ + part for part in conversation_parts if part.get('author', {}).get('type') == 'user' + ] + + # If no user entries found, return None + if not last_user_entries: + return None + + # Only keep the 'body' field from each user entry + bodies = [part['body'] for part in last_user_entries if 'body' in part] + + # Parse and concatenate all user message bodies as plain text + parsed_bodies = [] + for html_body in bodies: + parsed_bodies.append(parse_html_to_text(html_body)) + + # Join all parsed user messages into a single string + joined_text = " ".join(parsed_bodies) + return joined_text + + joined_text = extract_latest_user_messages(result) + + # If no user entries found, return a 204 error + if not joined_text: + return jsonify({"info": "No entries made by user found."}), 204 + + logger.info(f"Joined user messages: {joined_text}") + + # Use the extracted user message as a query to the RAG system and stream the answer + def generate(): + full_response = "" + try: + for token in rag_system.answer_query_stream(joined_text): + yield token + full_response += token + except Exception as e: + print(f"Error in /ask endpoint: {e}", file=sys.stderr) + traceback.print_exc() + yield "Internal Server Error" + + if not full_response: + full_response = "No response generated" + + if analytics.write_key: + # Track the query and response + # Use a deterministic, non-reversible hash for anonymous_id for Intercom conversations + anon_hash = hashlib.sha256(f"intercom-{conversation_id}".encode()).hexdigest() + analytics.track( + anonymous_id=anon_hash, + event='Chatbot Question submitted', + properties={'query': joined_text, 'response': full_response, 'source': 'Intercom Conversation'} + ) + + llm_response = "".join([token for token in generate()]) + llm_response = llm_response + " 🤖" # Add a marker to indicate the end of the response + + logger.info(f"LLM response: {llm_response}") + + # Send reply through Intercom API + def post_intercom_reply(conversation_id, response_text): + url = f"https://api.intercom.io/conversations/{conversation_id}/reply" + token = os.getenv('INTERCOM_TOKEN') + if not token: + return jsonify({"error": "Intercom token not set"}), 500 + + headers = { + "Content-Type": "application/json", + "Authorization": "Bearer " + token + } + + payload = { + "message_type": "comment", + "type": "admin", + "admin_id": int(os.getenv('INTERCOM_ADMIN_ID')), + "body": response_text + } + + response = requests.post(url, json=payload, headers=headers) + logger.info(f"Posted reply to Intercom; response status code: {response.status_code}") + + return response.json(), response.status_code + + return post_intercom_reply(conversation_id, llm_response) + + +class BodyHTMLParser(HTMLParser): + def __init__(self): + super().__init__() + self.text = [] + + def handle_data(self, data): + self.text.append(data) + + def get_text(self): + return ''.join(self.text) + + +# Helper function to parse HTML into plain text +def parse_html_to_text(html_content): + parser = BodyHTMLParser() + parser.feed(html_content) + return parser.get_text() + + +# Store conversation ID in Redis +def set_conversation_human_replied(conversation_id): + try: + # Use a Redis set to avoid duplicates + r.sadd('admin_replied_conversations', conversation_id) + logger.info(f"Added conversation_id {conversation_id} to Redis set admin_replied_conversations") + except Exception as e: + logger.error(f"Error adding conversation_id to Redis: {e}") + + + +@app.route('/intercom-webhook', methods=['POST']) +@csrf.exempt +def handle_webhook(): + data = request.json + + logger.info(f"Received Intercom webhook: {data}") + conversation_id = data.get('data', {}).get('item', {}).get('id') + + # Check if conversation is already marked as human admin-replied + try: + if r.sismember('admin_replied_conversations', conversation_id): + logger.info(f"Conversation {conversation_id} already marked as human admin-replied. Skipping further processing.") + return 'OK' + except Exception as e: + logger.error(f"Error checking conversation_id in Redis: {e}") + + # Check for admin replied webhook + topic = data.get('topic') + logger.info(f"Webhook topic: {topic}") + if topic == 'conversation.admin.replied': + + # Check if the admin is a bot or human based on presence of a message marker (e.g., "🤖") + last_message = data.get('data', {}).get('item', {}).get('conversation_parts', {}).get('conversation_parts', [])[-1].get('body', '') + last_message_text = parse_html_to_text(last_message) + + logger.info(f"Parsed last message text: {last_message_text}") + if last_message_text and last_message_text.endswith("🤖"): + logger.info(f"Last message in conversation {conversation_id} ends with the marker 🤖") + logger.info(f"Detected bot admin reply in conversation {conversation_id}; skipping further processing.") + else: + logger.info(f"Detected human admin reply in conversation {conversation_id}; marking as human admin-replied...") + set_conversation_human_replied(conversation_id) + logger.info(f"Successfully marked conversation {conversation_id} as human admin-replied.") + return 'OK' + else: + logger.info(f"Conversation {conversation_id} is a user reply; fetching an answer from LLM...") + get_intercom_conversation(conversation_id) + return 'OK' + + if __name__ == '__main__': app.run(host='0.0.0.0', port=5050) diff --git a/app/rag_system.py b/app/rag_system.py index 16ea0a1..904f666 100644 --- a/app/rag_system.py +++ b/app/rag_system.py @@ -30,14 +30,14 @@ def embed_knowledge_base(self): def normalize_query(self, query): return query.lower().strip() - def get_query_embedding(self, query, use_cpu=False): + def get_query_embedding(self, query, use_cpu=True): normalized_query = self.normalize_query(query) query_embedding = self.model.encode([normalized_query], convert_to_tensor=True) if use_cpu: query_embedding = query_embedding.cpu() return query_embedding - def get_doc_embeddings(self, use_cpu=False): + def get_doc_embeddings(self, use_cpu=True): if use_cpu: return self.doc_embeddings.cpu() return self.doc_embeddings @@ -66,7 +66,7 @@ def compute_document_scores(self, query_embedding, doc_embeddings, high_match_th return result - def retrieve(self, query, similarity_threshold=0.4, high_match_threshold=0.8, max_docs=5, use_cpu=False): + def retrieve(self, query, similarity_threshold=0.4, high_match_threshold=0.8, max_docs=5, use_cpu=True): # Note: Set use_cpu=True to run on CPU, which is useful for testing or environments without a GPU. # Set use_cpu=False to leverage GPU for better performance in production. diff --git a/app/requirements.txt b/app/requirements.txt index c7c717d..3c53119 100644 --- a/app/requirements.txt +++ b/app/requirements.txt @@ -10,3 +10,4 @@ huggingface_hub==0.15.1 openai==0.28.0 PyYAML==6.0.2 GitPython==3.1.44 +redis==6.2.0 diff --git a/compose.dev.yaml b/compose.dev.yaml index 976169e..edd6a2b 100644 --- a/compose.dev.yaml +++ b/compose.dev.yaml @@ -20,6 +20,11 @@ services: target: /app command: flask run --host=0.0.0.0 --port=5050 + redis: + extends: + file: compose.yaml + service: redis + llm: extends: file: compose.yaml @@ -33,7 +38,7 @@ services: mode: ingress environment: - AWS_REGION=us-west-2 - - AWS_PROFILE=defang-lab + - AWS_PROFILE=defang-sandbox - PORT=5051 volumes: - type: bind diff --git a/compose.yaml b/compose.yaml index 380765a..7422c9c 100644 --- a/compose.yaml +++ b/compose.yaml @@ -22,6 +22,8 @@ services: OPENAI_API_KEY: ${OPENAI_API_KEY} # Set your OpenAI API key here or in the .env file OPENAI_BASE_URL: "http://llm/api/v1" MODEL: "anthropic.claude-3-haiku-20240307-v1:0" + INTERCOM_TOKEN: + REDIS_URL: redis://redis:6379/0 command: uwsgi --http 0.0.0.0:5050 --wsgi-file app.py --callable app --processes 4 --threads 2 deploy: resources: @@ -34,6 +36,14 @@ services: retries: 5 #start_period: 40s + redis: + image: redis:alpine + ports: + - target: 6379 + published: 6379 + protocol: tcp + mode: host + llm: image: defangio/openai-access-gateway x-defang-llm: true From 978ba0d21e5865acc123cafec7741852f8e83ccc Mon Sep 17 00:00:00 2001 From: commit111 Date: Fri, 13 Jun 2025 18:22:27 -0700 Subject: [PATCH 02/21] revise comment --- app/app.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/app.py b/app/app.py index fe14f2b..7bb5d0e 100644 --- a/app/app.py +++ b/app/app.py @@ -167,7 +167,7 @@ def debug_context(): @csrf.exempt def get_intercom_conversation(conversation_id): logger.info(f"Received request to get conversation {conversation_id}") - # id = "38091" + id = conversation_id url = "https://api.intercom.io/conversations/" + id token = os.getenv('INTERCOM_TOKEN') From b670adbc18eaba3bcdcab6acb03511f91b3fcdd1 Mon Sep 17 00:00:00 2001 From: commit111 Date: Fri, 13 Jun 2025 18:39:41 -0700 Subject: [PATCH 03/21] add improved comments --- app/app.py | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/app/app.py b/app/app.py index 7bb5d0e..1c671f7 100644 --- a/app/app.py +++ b/app/app.py @@ -321,7 +321,7 @@ def parse_html_to_text(html_content): return parser.get_text() -# Store conversation ID in Redis +# Store conversation ID in persistent storage def set_conversation_human_replied(conversation_id): try: # Use a Redis set to avoid duplicates @@ -330,6 +330,13 @@ def set_conversation_human_replied(conversation_id): except Exception as e: logger.error(f"Error adding conversation_id to Redis: {e}") +# Check if a conversation is already marked as replied by a human admin +def is_conversation_admin_replied(conversation_id): + try: + return r.sismember('admin_replied_conversations', conversation_id) + except Exception as e: + logger.error(f"Error checking conversation_id in Redis: {e}") + return False @app.route('/intercom-webhook', methods=['POST']) @@ -340,34 +347,36 @@ def handle_webhook(): logger.info(f"Received Intercom webhook: {data}") conversation_id = data.get('data', {}).get('item', {}).get('id') - # Check if conversation is already marked as human admin-replied - try: - if r.sismember('admin_replied_conversations', conversation_id): - logger.info(f"Conversation {conversation_id} already marked as human admin-replied. Skipping further processing.") - return 'OK' - except Exception as e: - logger.error(f"Error checking conversation_id in Redis: {e}") + # Check if conversation is already marked as replied by a human admin and if so, skip LLM response + if is_conversation_admin_replied(conversation_id): + logger.info(f"Conversation {conversation_id} already marked as human admin-replied. Skipping further processing.") + return 'OK' - # Check for admin replied webhook + # Check for the type of the webhook event topic = data.get('topic') logger.info(f"Webhook topic: {topic}") if topic == 'conversation.admin.replied': - # Check if the admin is a bot or human based on presence of a message marker (e.g., "🤖") + # Check if the admin is a bot or human based on presence of a message marker (e.g., "🤖") in the last message last_message = data.get('data', {}).get('item', {}).get('conversation_parts', {}).get('conversation_parts', [])[-1].get('body', '') last_message_text = parse_html_to_text(last_message) logger.info(f"Parsed last message text: {last_message_text}") if last_message_text and last_message_text.endswith("🤖"): + # If the last message ends with the marker, it indicates a bot reply logger.info(f"Last message in conversation {conversation_id} ends with the marker 🤖") logger.info(f"Detected bot admin reply in conversation {conversation_id}; skipping further processing.") else: + # If the last message does not end with the marker, it indicates a human reply logger.info(f"Detected human admin reply in conversation {conversation_id}; marking as human admin-replied...") + # Mark the conversation as replied by a human admin to skip LLM responses in the future set_conversation_human_replied(conversation_id) logger.info(f"Successfully marked conversation {conversation_id} as human admin-replied.") return 'OK' else: + # In this case, the webhook event is a user reply, not an admin reply logger.info(f"Conversation {conversation_id} is a user reply; fetching an answer from LLM...") + # Fetch the conversation and generate an LLM answer for the user get_intercom_conversation(conversation_id) return 'OK' From 7bf2cd60799d317ccb114c2b6822619db0ee0809 Mon Sep 17 00:00:00 2001 From: commit111 Date: Fri, 13 Jun 2025 19:13:28 -0700 Subject: [PATCH 04/21] refactor functions out of get_intercom_conversations() --- app/app.py | 235 ++++++++++++++++++++++++++--------------------------- 1 file changed, 116 insertions(+), 119 deletions(-) diff --git a/app/app.py b/app/app.py index 1c671f7..bb0eeec 100644 --- a/app/app.py +++ b/app/app.py @@ -15,9 +15,19 @@ from werkzeug.wrappers import Request import json import logging - import redis +class BodyHTMLParser(HTMLParser): + def __init__(self): + super().__init__() + self.text = [] + + def handle_data(self, data): + self.text.append(data) + + def get_text(self): + return ''.join(self.text) + # Configure logging logging.basicConfig(level=logging.INFO) @@ -162,6 +172,106 @@ def debug_context(): return jsonify({"context": context}) +# Extract conversation parts into a simplified JSON format +def extract_conversation_parts(response): + data = response.json() + parts = data.get('conversation_parts', {}).get('conversation_parts', []) + extracted_parts = [] + for part in parts: + body = part.get('body', '') + if not body: + continue + author = part.get('author', {}) + created_at = part.get('created_at') + extracted_parts.append({'body': body, 'author': author, 'created_at': created_at}) + return extracted_parts + +# Get the latest user entries in the conversation starting from the last non-user (i.e. admin) entry +def extract_latest_user_messages(conversation_parts): + # Find the index of the last non-user entry + last_non_user_idx = None + for idx in range(len(conversation_parts) - 1, -1, -1): + if conversation_parts[idx].get('author', {}).get('type') != 'user': + last_non_user_idx = idx + break + + # Collect user entries after the last non-user entry + if last_non_user_idx is not None: + last_user_entries = [ + part for part in conversation_parts[last_non_user_idx + 1 :] + if part.get('author', {}).get('type') == 'user' + ] + else: + # If there is no non-user entry, include all user entries + last_user_entries = [ + part for part in conversation_parts if part.get('author', {}).get('type') == 'user' + ] + + # If no user entries found, return None + if not last_user_entries: + return None + + # Only keep the 'body' field from each user entry + bodies = [part['body'] for part in last_user_entries if 'body' in part] + + # Parse and concatenate all user message bodies as plain text + parsed_bodies = [] + for html_body in bodies: + parsed_bodies.append(parse_html_to_text(html_body)) + + # Join all parsed user messages into a single string + joined_text = " ".join(parsed_bodies) + return joined_text + +# Helper function to parse HTML into plain text +def parse_html_to_text(html_content): + parser = BodyHTMLParser() + parser.feed(html_content) + return parser.get_text() + +# Store conversation ID in persistent storage +def set_conversation_human_replied(conversation_id): + try: + # Use a Redis set to avoid duplicates + r.sadd('admin_replied_conversations', conversation_id) + logger.info(f"Added conversation_id {conversation_id} to Redis set admin_replied_conversations") + except Exception as e: + logger.error(f"Error adding conversation_id to Redis: {e}") + +# Check if a conversation is already marked as replied by a human admin +def is_conversation_admin_replied(conversation_id): + try: + return r.sismember('admin_replied_conversations', conversation_id) + except Exception as e: + logger.error(f"Error checking conversation_id in Redis: {e}") + return False + + +# Post a reply to a conversation through Intercom API +def post_intercom_reply(conversation_id, response_text): + url = f"https://api.intercom.io/conversations/{conversation_id}/reply" + token = os.getenv('INTERCOM_TOKEN') + if not token: + return jsonify({"error": "Intercom token not set"}), 500 + + headers = { + "Content-Type": "application/json", + "Authorization": "Bearer " + token + } + + payload = { + "message_type": "comment", + "type": "admin", + "admin_id": int(os.getenv('INTERCOM_ADMIN_ID')), + "body": response_text + } + + response = requests.post(url, json=payload, headers=headers) + logger.info(f"Posted reply to Intercom; response status code: {response.status_code}") + + return response.json(), response.status_code + + # Endpoint to get the whole conversation thread in Intercom and send an LLM answer to user @app.route('/intercom/conversations/', methods=['GET']) @csrf.exempt @@ -175,71 +285,19 @@ def get_intercom_conversation(conversation_id): return jsonify({"error": "Intercom token not set"}), 500 headers = { - "Content-Type": "application/json", - "Intercom-Version": "2.13", - "Authorization": "Bearer " + token + "Content-Type": "application/json", + "Intercom-Version": "2.13", + "Authorization": "Bearer " + token } response = requests.get(url, headers=headers) - # extract conversation parts into a simplified json format - def extract_conversation_parts(response): - data = response.json() - parts = data.get('conversation_parts', {}).get('conversation_parts', []) - extracted_parts = [] - for part in parts: - body = part.get('body', '') - if not body: - continue - author = part.get('author', {}) - created_at = part.get('created_at') - extracted_parts.append({'body': body, 'author': author, 'created_at': created_at}) - return extracted_parts - + # Extract conversation parts from an Intercom request response result = extract_conversation_parts(response) - logger.info(f"Extracted {len(result)} parts from conversation {conversation_id}") - def extract_latest_user_messages(conversation_parts): - # Get the latest user entries in the conversation - # Find the index of the last non-user entry - last_non_user_idx = None - for idx in range(len(conversation_parts) - 1, -1, -1): - if conversation_parts[idx].get('author', {}).get('type') != 'user': - last_non_user_idx = idx - break - - # Collect user entries after the last non-user entry - if last_non_user_idx is not None: - last_user_entries = [ - part for part in conversation_parts[last_non_user_idx + 1 :] - if part.get('author', {}).get('type') == 'user' - ] - else: - # If there is no non-user entry, include all user entries - last_user_entries = [ - part for part in conversation_parts if part.get('author', {}).get('type') == 'user' - ] - - # If no user entries found, return None - if not last_user_entries: - return None - - # Only keep the 'body' field from each user entry - bodies = [part['body'] for part in last_user_entries if 'body' in part] - - # Parse and concatenate all user message bodies as plain text - parsed_bodies = [] - for html_body in bodies: - parsed_bodies.append(parse_html_to_text(html_body)) - - # Join all parsed user messages into a single string - joined_text = " ".join(parsed_bodies) - return joined_text - + # Get and join the latest user messages from the conversation parts joined_text = extract_latest_user_messages(result) - - # If no user entries found, return a 204 error if not joined_text: return jsonify({"info": "No entries made by user found."}), 204 @@ -275,70 +333,9 @@ def generate(): logger.info(f"LLM response: {llm_response}") - # Send reply through Intercom API - def post_intercom_reply(conversation_id, response_text): - url = f"https://api.intercom.io/conversations/{conversation_id}/reply" - token = os.getenv('INTERCOM_TOKEN') - if not token: - return jsonify({"error": "Intercom token not set"}), 500 - - headers = { - "Content-Type": "application/json", - "Authorization": "Bearer " + token - } - - payload = { - "message_type": "comment", - "type": "admin", - "admin_id": int(os.getenv('INTERCOM_ADMIN_ID')), - "body": response_text - } - - response = requests.post(url, json=payload, headers=headers) - logger.info(f"Posted reply to Intercom; response status code: {response.status_code}") - - return response.json(), response.status_code - return post_intercom_reply(conversation_id, llm_response) -class BodyHTMLParser(HTMLParser): - def __init__(self): - super().__init__() - self.text = [] - - def handle_data(self, data): - self.text.append(data) - - def get_text(self): - return ''.join(self.text) - - -# Helper function to parse HTML into plain text -def parse_html_to_text(html_content): - parser = BodyHTMLParser() - parser.feed(html_content) - return parser.get_text() - - -# Store conversation ID in persistent storage -def set_conversation_human_replied(conversation_id): - try: - # Use a Redis set to avoid duplicates - r.sadd('admin_replied_conversations', conversation_id) - logger.info(f"Added conversation_id {conversation_id} to Redis set admin_replied_conversations") - except Exception as e: - logger.error(f"Error adding conversation_id to Redis: {e}") - -# Check if a conversation is already marked as replied by a human admin -def is_conversation_admin_replied(conversation_id): - try: - return r.sismember('admin_replied_conversations', conversation_id) - except Exception as e: - logger.error(f"Error checking conversation_id in Redis: {e}") - return False - - @app.route('/intercom-webhook', methods=['POST']) @csrf.exempt def handle_webhook(): From 96cb8f1a80aca2a1ddd3ba64a61702bf63ffd841 Mon Sep 17 00:00:00 2001 From: commit111 Date: Fri, 13 Jun 2025 19:16:44 -0700 Subject: [PATCH 05/21] add more detail to comments --- app/app.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/app.py b/app/app.py index bb0eeec..c658be9 100644 --- a/app/app.py +++ b/app/app.py @@ -272,7 +272,7 @@ def post_intercom_reply(conversation_id, response_text): return response.json(), response.status_code -# Endpoint to get the whole conversation thread in Intercom and send an LLM answer to user +# Endpoint to get a whole conversation thread in Intercom and send an LLM answer to user @app.route('/intercom/conversations/', methods=['GET']) @csrf.exempt def get_intercom_conversation(conversation_id): @@ -335,7 +335,7 @@ def generate(): return post_intercom_reply(conversation_id, llm_response) - +# Endpoint to handle incoming webhooks from Intercom @app.route('/intercom-webhook', methods=['POST']) @csrf.exempt def handle_webhook(): From a037baee6b20103246e67f4cc4a598fc6d9833c2 Mon Sep 17 00:00:00 2001 From: commit111 Date: Tue, 17 Jun 2025 15:21:25 -0700 Subject: [PATCH 06/21] add a ttl for redis entries --- app/app.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/app/app.py b/app/app.py index c658be9..7ea7b86 100644 --- a/app/app.py +++ b/app/app.py @@ -233,15 +233,15 @@ def parse_html_to_text(html_content): def set_conversation_human_replied(conversation_id): try: # Use a Redis set to avoid duplicates - r.sadd('admin_replied_conversations', conversation_id) + r.set(conversation_id, '1', ex=60*60*24) # Set TTL expiration to 1 day logger.info(f"Added conversation_id {conversation_id} to Redis set admin_replied_conversations") except Exception as e: logger.error(f"Error adding conversation_id to Redis: {e}") # Check if a conversation is already marked as replied by a human admin -def is_conversation_admin_replied(conversation_id): +def is_conversation_human_replied(conversation_id): try: - return r.sismember('admin_replied_conversations', conversation_id) + return r.exists(conversation_id) except Exception as e: logger.error(f"Error checking conversation_id in Redis: {e}") return False @@ -344,11 +344,6 @@ def handle_webhook(): logger.info(f"Received Intercom webhook: {data}") conversation_id = data.get('data', {}).get('item', {}).get('id') - # Check if conversation is already marked as replied by a human admin and if so, skip LLM response - if is_conversation_admin_replied(conversation_id): - logger.info(f"Conversation {conversation_id} already marked as human admin-replied. Skipping further processing.") - return 'OK' - # Check for the type of the webhook event topic = data.get('topic') logger.info(f"Webhook topic: {topic}") @@ -362,7 +357,7 @@ def handle_webhook(): if last_message_text and last_message_text.endswith("🤖"): # If the last message ends with the marker, it indicates a bot reply logger.info(f"Last message in conversation {conversation_id} ends with the marker 🤖") - logger.info(f"Detected bot admin reply in conversation {conversation_id}; skipping further processing.") + logger.info(f"Detected bot admin reply in conversation {conversation_id}; no action taken.") else: # If the last message does not end with the marker, it indicates a human reply logger.info(f"Detected human admin reply in conversation {conversation_id}; marking as human admin-replied...") @@ -370,11 +365,17 @@ def handle_webhook(): set_conversation_human_replied(conversation_id) logger.info(f"Successfully marked conversation {conversation_id} as human admin-replied.") return 'OK' - else: + elif topic == 'conversation.user.replied': # In this case, the webhook event is a user reply, not an admin reply - logger.info(f"Conversation {conversation_id} is a user reply; fetching an answer from LLM...") + # Check if the conversation was replied previously by a human admin + if is_conversation_human_replied(conversation_id): + logger.info(f"Conversation {conversation_id} already marked as human admin-replied; no action taken.") + return 'OK' # Fetch the conversation and generate an LLM answer for the user + logger.info(f"Detected a user reply in conversation {conversation_id}; fetching an answer from LLM...") get_intercom_conversation(conversation_id) + else: + logger.info(f"Received webhook for unsupported topic: {topic}; no action taken.") return 'OK' From 64a675acafc2249421ced4f81ed2494804a19bfc Mon Sep 17 00:00:00 2001 From: commit111 Date: Thu, 19 Jun 2025 15:30:53 -0700 Subject: [PATCH 07/21] add tests --- app/app.py | 3 +- app/requirements.txt | 2 + app/test_app.py | 150 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 154 insertions(+), 1 deletion(-) create mode 100644 app/test_app.py diff --git a/app/app.py b/app/app.py index 7ea7b86..8b49a16 100644 --- a/app/app.py +++ b/app/app.py @@ -16,7 +16,8 @@ import json import logging import redis - +from dotenv import load_dotenv +load_dotenv() class BodyHTMLParser(HTMLParser): def __init__(self): super().__init__() diff --git a/app/requirements.txt b/app/requirements.txt index 3c53119..1838b83 100644 --- a/app/requirements.txt +++ b/app/requirements.txt @@ -11,3 +11,5 @@ openai==0.28.0 PyYAML==6.0.2 GitPython==3.1.44 redis==6.2.0 +python-dotenv===1.1.0 +fakeredis==2.30.0 diff --git a/app/test_app.py b/app/test_app.py new file mode 100644 index 0000000..3731a49 --- /dev/null +++ b/app/test_app.py @@ -0,0 +1,150 @@ +import time +import unittest +from unittest.mock import patch, Mock +import fakeredis + +# Apply patch to use a fake redis for testing before importing app +redis_mock = fakeredis.FakeStrictRedis(decode_responses=True) +patch('redis.from_url', return_value=redis_mock).start() + +# Now import app, after the patch is applied +import app + +class TestApp(unittest.TestCase): + + @classmethod + def setUpClass(cls): + # Initialize the app or any required resources + cls.app = app + # Replace the Redis client in app with our mock + app.r = redis_mock + print("Successfully set up App class for testing!") + + def test_parse_html_to_text(self): + self.assertEqual(self.app.parse_html_to_text("

Hello, world!

"), "Hello, world!") + print("test_parse_html_to_text passed successfully.") + + def test_parse_html_to_text_multiple_tags(self): + html = "
Hello world!
How are you?
" + result = self.app.parse_html_to_text(html) + self.assertEqual(result, "Hello world!How are you?") + print("test_parse_html_to_text_multiple_tags passed successfully.") + + def test_extract_conversation_parts(self): + # Simulate a response object with .json() method + mock_response = Mock() + mock_response.json.return_value = { + 'conversation_parts': { + 'conversation_parts': [ + { + 'body': '

User message

', + 'author': {'type': 'user'}, + 'created_at': 123, + 'extra_field': 'foo' + }, + { + 'body': '

Admin reply

', + 'author': {'type': 'admin'}, + 'created_at': 124, + 'extra_field': 'bar' + }, + { + 'body': '', + 'author': {'type': 'user'}, + 'created_at': 125, + 'extra_field': 'baz' + } + ] + } + } + result = self.app.extract_conversation_parts(mock_response) + expected = [ + {'body': '

User message

', 'author': {'type': 'user'}, 'created_at': 123}, + {'body': '

Admin reply

', 'author': {'type': 'admin'}, 'created_at': 124} + ] + self.assertEqual(result, expected) + print("test_extract_conversation_parts passed successfully.") + + def test_extract_latest_user_messages_empty(self): + conversation_parts = [] + result = self.app.extract_latest_user_messages(conversation_parts) + self.assertIsNone(result) + print("test_extract_latest_user_messages_empty passed successfully.") + + def test_extract_latest_user_messages(self): + # Simulate conversation parts with user and admin messages + conversation_parts = [ + {'body': '

Admin message

', 'author': {'type': 'admin'}, 'created_at': 1}, + {'body': '

User message 1

', 'author': {'type': 'user'}, 'created_at': 2}, + {'body': '

User message 2

', 'author': {'type': 'user'}, 'created_at': 3}, + ] + result = self.app.extract_latest_user_messages(conversation_parts) + self.assertEqual(result, "User message 1 User message 2") + print("test_extract_latest_user_messages passed successfully.") + + def test_extract_latest_user_messages_no_user(self): + conversation_parts = [ + {'body': '

Admin message

', 'author': {'type': 'admin'}, 'created_at': 1} + ] + result = self.app.extract_latest_user_messages(conversation_parts) + self.assertIsNone(result) + print("test_extract_latest_user_messages_no_user passed successfully.") + + def test_extract_latest_user_messages_all_users(self): + conversation_parts = [ + {'body': '

User message 1

', 'author': {'type': 'user'}, 'created_at': 1}, + {'body': '

User message 2

', 'author': {'type': 'user'}, 'created_at': 2}, + ] + result = self.app.extract_latest_user_messages(conversation_parts) + self.assertEqual(result, "User message 1 User message 2") + print("test_extract_latest_user_messages_all_users passed successfully.") + + def test_extract_latest_user_messages_after_admin(self): + conversation_parts = [ + {'body': '

Admin message

', 'author': {'type': 'admin'}, 'created_at': 1}, + {'body': '

User message 1

', 'author': {'type': 'user'}, 'created_at': 2}, + {'body': '

User message 2

', 'author': {'type': 'user'}, 'created_at': 3}, + {'body': '

Admin message 2

', 'author': {'type': 'admin'}, 'created_at': 4}, + {'body': '

User message 3

', 'author': {'type': 'user'}, 'created_at': 5}, + ] + result = self.app.extract_latest_user_messages(conversation_parts) + self.assertEqual(result, "User message 3") + print("test_extract_latest_user_messages_after_admin passed successfully.") + + def test_is_conversation_human_replied_check_false(self): + conversation_id = "test_convo_id_1234" + self.app.r.delete(conversation_id) + result = self.app.is_conversation_human_replied(conversation_id) + self.assertFalse(result) + print("test_is_conversation_human_replied_check_false passed successfully.") + + def test_set_conversation_human_replied_and_check_true(self): + conversation_id = "test_convo_id_1235" + self.app.r.delete(conversation_id) + self.app.set_conversation_human_replied(conversation_id) + result = self.app.is_conversation_human_replied(conversation_id) + self.assertTrue(result) + self.app.r.delete(conversation_id) + print("test_set_conversation_human_replied_and_check_true passed successfully.") + + def test_set_conversation_human_replied_ttl_exists(self): + conversation_id = "test_convo_id_ttl" + self.app.r.delete(conversation_id) + self.app.set_conversation_human_replied(conversation_id) + ttl = self.app.r.ttl(conversation_id) + self.assertGreater(ttl, 0) + self.app.r.delete(conversation_id) + print("test_set_conversation_human_replied_ttl_exists passed successfully.") + + def test_set_redis_ttl_expiry(self): + conversation_id = "test_convo_id_expiry" + self.app.r.delete(conversation_id) + # Set with a short TTL for test + self.app.r.set(conversation_id, '1', ex=1) + time.sleep(2) + exists = self.app.r.exists(conversation_id) + self.assertFalse(exists) + print("test_set_redis_ttl_expiry passed successfully.") + +if __name__ == '__main__': + unittest.main() From e797d59b287508d4714a8cc21cf6bb12af479a0d Mon Sep 17 00:00:00 2001 From: commit111 Date: Thu, 19 Jun 2025 15:47:55 -0700 Subject: [PATCH 08/21] refactor generate --- app/app.py | 88 ++++++++++++++++++++++-------------------------------- 1 file changed, 36 insertions(+), 52 deletions(-) diff --git a/app/app.py b/app/app.py index 8b49a16..a2858ba 100644 --- a/app/app.py +++ b/app/app.py @@ -54,6 +54,31 @@ def validate_pow(nonce, data, difficulty): first_uint32 = int.from_bytes(calculated_hash[:4], byteorder='big') return first_uint32 <= difficulty +# Shared function to generate response stream from RAG system +def generate(query, source, anonymous_id): + full_response = "" + try: + for token in rag_system.answer_query_stream(query): + yield token + full_response += token + except Exception as e: + print(f"Error in RAG system: {e}", file=sys.stderr) + traceback.print_exc() + yield "Internal Server Error" + + if not full_response: + full_response = "No response generated" + + if analytics.write_key: + # Track the query and response + analytics.track( + anonymous_id=anonymous_id, + event='Chatbot Question submitted', + properties={'query': query, 'response': full_response, 'source': source} + ) + + return full_response + def handle_ask_request(request, session): data = request.get_json() query = data.get('query') @@ -65,33 +90,13 @@ def handle_ask_request(request, session): if 'anonymous_id' not in session: session['anonymous_id'] = str(uuid.uuid4()) anonymous_id = session['anonymous_id'] + + # Determine the source based on the user agent + user_agent = request.headers.get('User-Agent', '') + source = 'Ask Defang Discord Bot' if 'Discord Bot' in user_agent else 'Ask Defang Website' - def generate(): - full_response = "" - try: - for token in rag_system.answer_query_stream(query): - yield token - full_response += token - except Exception as e: - print(f"Error in /ask endpoint: {e}", file=sys.stderr) - traceback.print_exc() - yield "Internal Server Error" - - if not full_response: - full_response = "No response generated" - - if analytics.write_key: - # Determine the source based on the user agent - user_agent = request.headers.get('User-Agent', '') - source = 'Ask Defang Discord Bot' if 'Discord Bot' in user_agent else 'Ask Defang Website' - # Track the query and response - analytics.track( - anonymous_id=anonymous_id, - event='Chatbot Question submitted', - properties={'query': query, 'response': full_response, 'source': source} - ) - - return Response(stream_with_context(generate()), content_type='text/markdown') + # Use the shared generate function directly + return Response(stream_with_context(generate(query, source, anonymous_id)), content_type='text/markdown') @app.route('/', methods=['GET', 'POST']) def index(): @@ -304,32 +309,11 @@ def get_intercom_conversation(conversation_id): logger.info(f"Joined user messages: {joined_text}") - # Use the extracted user message as a query to the RAG system and stream the answer - def generate(): - full_response = "" - try: - for token in rag_system.answer_query_stream(joined_text): - yield token - full_response += token - except Exception as e: - print(f"Error in /ask endpoint: {e}", file=sys.stderr) - traceback.print_exc() - yield "Internal Server Error" - - if not full_response: - full_response = "No response generated" - - if analytics.write_key: - # Track the query and response - # Use a deterministic, non-reversible hash for anonymous_id for Intercom conversations - anon_hash = hashlib.sha256(f"intercom-{conversation_id}".encode()).hexdigest() - analytics.track( - anonymous_id=anon_hash, - event='Chatbot Question submitted', - properties={'query': joined_text, 'response': full_response, 'source': 'Intercom Conversation'} - ) - - llm_response = "".join([token for token in generate()]) + # Use a deterministic, non-reversible hash for anonymous_id for Intercom conversations + anon_hash = hashlib.sha256(f"intercom-{conversation_id}".encode()).hexdigest() + + # Use the shared generate function + llm_response = "".join(generate(joined_text, 'Intercom Conversation', anon_hash)) llm_response = llm_response + " 🤖" # Add a marker to indicate the end of the response logger.info(f"LLM response: {llm_response}") From ca36765d09930b684da1caacd72a66a02d2e61e0 Mon Sep 17 00:00:00 2001 From: commit111 Date: Fri, 20 Jun 2025 16:39:06 -0700 Subject: [PATCH 09/21] add intercom token to github workflow file --- .github/workflows/deploy.yaml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/deploy.yaml b/.github/workflows/deploy.yaml index c661304..da3d348 100644 --- a/.github/workflows/deploy.yaml +++ b/.github/workflows/deploy.yaml @@ -52,7 +52,7 @@ jobs: - name: Deploy uses: DefangLabs/defang-github-action@v1.1.3 with: - config-env-vars: ASK_TOKEN OPENAI_API_KEY REBUILD_TOKEN SECRET_KEY SEGMENT_WRITE_KEY DISCORD_APP_ID DISCORD_TOKEN DISCORD_PUBLIC_KEY + config-env-vars: ASK_TOKEN OPENAI_API_KEY REBUILD_TOKEN SECRET_KEY SEGMENT_WRITE_KEY DISCORD_APP_ID DISCORD_TOKEN DISCORD_PUBLIC_KEY INTERCOM_TOKEN mode: production provider: aws @@ -65,3 +65,4 @@ jobs: DISCORD_APP_ID: ${{ secrets.DISCORD_APP_ID }} DISCORD_TOKEN: ${{ secrets.DISCORD_TOKEN }} DISCORD_PUBLIC_KEY: ${{ secrets.DISCORD_PUBLIC_KEY }} + INTERCOM_TOKEN: ${{ secrets.INTERCOM_TOKEN }} From 2375b1822cb3e541552c920aec0d2da4655a6435 Mon Sep 17 00:00:00 2001 From: commit111 Date: Fri, 20 Jun 2025 17:04:16 -0700 Subject: [PATCH 10/21] add intercom admin id to secret env vars --- .github/workflows/deploy.yaml | 3 ++- compose.yaml | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/deploy.yaml b/.github/workflows/deploy.yaml index da3d348..ef9e421 100644 --- a/.github/workflows/deploy.yaml +++ b/.github/workflows/deploy.yaml @@ -52,7 +52,7 @@ jobs: - name: Deploy uses: DefangLabs/defang-github-action@v1.1.3 with: - config-env-vars: ASK_TOKEN OPENAI_API_KEY REBUILD_TOKEN SECRET_KEY SEGMENT_WRITE_KEY DISCORD_APP_ID DISCORD_TOKEN DISCORD_PUBLIC_KEY INTERCOM_TOKEN + config-env-vars: ASK_TOKEN OPENAI_API_KEY REBUILD_TOKEN SECRET_KEY SEGMENT_WRITE_KEY DISCORD_APP_ID DISCORD_TOKEN DISCORD_PUBLIC_KEY INTERCOM_TOKEN INTERCOM_ADMIN_ID mode: production provider: aws @@ -66,3 +66,4 @@ jobs: DISCORD_TOKEN: ${{ secrets.DISCORD_TOKEN }} DISCORD_PUBLIC_KEY: ${{ secrets.DISCORD_PUBLIC_KEY }} INTERCOM_TOKEN: ${{ secrets.INTERCOM_TOKEN }} + INTERCOM_ADMIN_ID: ${{ secrets.INTERCOM_ADMIN_ID }} diff --git a/compose.yaml b/compose.yaml index 7422c9c..d552cda 100644 --- a/compose.yaml +++ b/compose.yaml @@ -23,6 +23,7 @@ services: OPENAI_BASE_URL: "http://llm/api/v1" MODEL: "anthropic.claude-3-haiku-20240307-v1:0" INTERCOM_TOKEN: + INTERCOM_ADMIN_ID: REDIS_URL: redis://redis:6379/0 command: uwsgi --http 0.0.0.0:5050 --wsgi-file app.py --callable app --processes 4 --threads 2 deploy: From 40cdb96b8e86ce363d038904f13094c14f7468d1 Mon Sep 17 00:00:00 2001 From: commit111 Date: Fri, 20 Jun 2025 17:10:12 -0700 Subject: [PATCH 11/21] remove extraneous endpoint --- app/app.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/app/app.py b/app/app.py index a2858ba..6fea545 100644 --- a/app/app.py +++ b/app/app.py @@ -278,9 +278,7 @@ def post_intercom_reply(conversation_id, response_text): return response.json(), response.status_code -# Endpoint to get a whole conversation thread in Intercom and send an LLM answer to user -@app.route('/intercom/conversations/', methods=['GET']) -@csrf.exempt +# Retrieves a whole conversation thread in Intercom and returns an LLM answer to the user def get_intercom_conversation(conversation_id): logger.info(f"Received request to get conversation {conversation_id}") From f47cc13f3df1d04581fd07be9566526141a9ab22 Mon Sep 17 00:00:00 2001 From: commit111 Date: Mon, 23 Jun 2025 16:08:59 -0700 Subject: [PATCH 12/21] run test file in dockerfile --- app/Dockerfile | 3 +++ 1 file changed, 3 insertions(+) diff --git a/app/Dockerfile b/app/Dockerfile index 01ba27a..5077b73 100644 --- a/app/Dockerfile +++ b/app/Dockerfile @@ -42,6 +42,9 @@ RUN chmod +x .tmp/prebuild.sh # Expose port 5050 for the Flask application EXPOSE 5050 +# Run test file +RUN python test_app.py + # Define environment variable for Flask ENV FLASK_APP=app.py From 2125d06062f6368cf1c02ae681613d42eb6c77a5 Mon Sep 17 00:00:00 2001 From: commit111 Date: Mon, 23 Jun 2025 16:18:19 -0700 Subject: [PATCH 13/21] remove loadenv and edit requirements --- app/app.py | 3 +-- app/requirements.txt | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/app/app.py b/app/app.py index 6fea545..2b7bf47 100644 --- a/app/app.py +++ b/app/app.py @@ -16,8 +16,7 @@ import json import logging import redis -from dotenv import load_dotenv -load_dotenv() + class BodyHTMLParser(HTMLParser): def __init__(self): super().__init__() diff --git a/app/requirements.txt b/app/requirements.txt index 1838b83..69d8aea 100644 --- a/app/requirements.txt +++ b/app/requirements.txt @@ -11,5 +11,4 @@ openai==0.28.0 PyYAML==6.0.2 GitPython==3.1.44 redis==6.2.0 -python-dotenv===1.1.0 -fakeredis==2.30.0 +fakeredis==2.30.1 From c0767bcb33fc175f655b1946f5317afcd64629c0 Mon Sep 17 00:00:00 2001 From: commit111 Date: Mon, 23 Jun 2025 17:23:39 -0700 Subject: [PATCH 14/21] refactor get_intercom_conversations --- app/app.py | 73 ++++++++++++++++++++++++++++++++---------------------- 1 file changed, 44 insertions(+), 29 deletions(-) diff --git a/app/app.py b/app/app.py index 2b7bf47..f60634f 100644 --- a/app/app.py +++ b/app/app.py @@ -177,6 +177,39 @@ def debug_context(): return jsonify({"context": context}) +# Retrieve a conversation from Intercom API by its ID +def fetch_intercom_conversation(conversation_id): + id = conversation_id + url = "https://api.intercom.io/conversations/" + id + token = os.getenv('INTERCOM_TOKEN') + if not token: + return jsonify({"error": "Intercom token not set"}), 500 + + headers = { + "Content-Type": "application/json", + "Intercom-Version": "2.13", + "Authorization": "Bearer " + token + } + + response = requests.get(url, headers=headers) + if response.status_code != 200: + logger.error(f"Failed to fetch conversation {id} from Intercom; status code: {response.status_code}, response: {response.text}") + return jsonify({"error": "Failed to fetch conversation from Intercom"}), response.status_code + + return response + +# Determines the user query from the Intercom conversation response +def get_user_query(response, conversation_id): + # Extract conversation parts from an Intercom request response + result = extract_conversation_parts(response) + logger.info(f"Extracted {len(result)} parts from conversation {conversation_id}") + + # Get and join the latest user messages from the conversation parts + joined_text = extract_latest_user_messages(result) + if not joined_text: + return jsonify({"info": "No entries made by user found."}), 204 + return joined_text + # Extract conversation parts into a simplified JSON format def extract_conversation_parts(response): data = response.json() @@ -191,7 +224,7 @@ def extract_conversation_parts(response): extracted_parts.append({'body': body, 'author': author, 'created_at': created_at}) return extracted_parts -# Get the latest user entries in the conversation starting from the last non-user (i.e. admin) entry +# Joins the latest user entries in the conversation starting from the last non-user (i.e. admin) entry def extract_latest_user_messages(conversation_parts): # Find the index of the last non-user entry last_non_user_idx = None @@ -277,40 +310,22 @@ def post_intercom_reply(conversation_id, response_text): return response.json(), response.status_code -# Retrieves a whole conversation thread in Intercom and returns an LLM answer to the user -def get_intercom_conversation(conversation_id): +# Returns a generated LLM answer to the Intercom conversation based on previous user message history +def answer_intercom_conversation(conversation_id): logger.info(f"Received request to get conversation {conversation_id}") - id = conversation_id - url = "https://api.intercom.io/conversations/" + id - token = os.getenv('INTERCOM_TOKEN') - if not token: - return jsonify({"error": "Intercom token not set"}), 500 - - headers = { - "Content-Type": "application/json", - "Intercom-Version": "2.13", - "Authorization": "Bearer " + token - } - - response = requests.get(url, headers=headers) - - # Extract conversation parts from an Intercom request response - result = extract_conversation_parts(response) - logger.info(f"Extracted {len(result)} parts from conversation {conversation_id}") - - # Get and join the latest user messages from the conversation parts - joined_text = extract_latest_user_messages(result) - if not joined_text: - return jsonify({"info": "No entries made by user found."}), 204 + # Retrieves the history of the conversation thread in Intercom + conversation= fetch_intercom_conversation(conversation_id) - logger.info(f"Joined user messages: {joined_text}") + # Extracts the user query (which are latest user messages joined into a single string) from conversation history + user_query = get_user_query(conversation, conversation_id) + logger.info(f"Joined user messages: {user_query}") # Use a deterministic, non-reversible hash for anonymous_id for Intercom conversations anon_hash = hashlib.sha256(f"intercom-{conversation_id}".encode()).hexdigest() - # Use the shared generate function - llm_response = "".join(generate(joined_text, 'Intercom Conversation', anon_hash)) + # Generate the exact response using the RAG system + llm_response = "".join(generate(user_query, 'Intercom Conversation', anon_hash)) llm_response = llm_response + " 🤖" # Add a marker to indicate the end of the response logger.info(f"LLM response: {llm_response}") @@ -355,7 +370,7 @@ def handle_webhook(): return 'OK' # Fetch the conversation and generate an LLM answer for the user logger.info(f"Detected a user reply in conversation {conversation_id}; fetching an answer from LLM...") - get_intercom_conversation(conversation_id) + answer_intercom_conversation(conversation_id) else: logger.info(f"Received webhook for unsupported topic: {topic}; no action taken.") return 'OK' From b67c0ce20b5f602b74bfa8e9df3ceda39ec73b9b Mon Sep 17 00:00:00 2001 From: commit111 Date: Tue, 24 Jun 2025 16:49:33 -0700 Subject: [PATCH 15/21] move intercom code into its own file --- app/Dockerfile | 2 +- app/app.py | 208 +------------------------- app/intercom.py | 182 ++++++++++++++++++++++ app/{test_app.py => test_intercom.py} | 16 +- app/utils.py | 30 ++++ 5 files changed, 229 insertions(+), 209 deletions(-) create mode 100644 app/intercom.py rename app/{test_app.py => test_intercom.py} (95%) create mode 100644 app/utils.py diff --git a/app/Dockerfile b/app/Dockerfile index 5077b73..fb3c487 100644 --- a/app/Dockerfile +++ b/app/Dockerfile @@ -43,7 +43,7 @@ RUN chmod +x .tmp/prebuild.sh EXPOSE 5050 # Run test file -RUN python test_app.py +RUN python test_intercom.py # Define environment variable for Flask ENV FLASK_APP=app.py diff --git a/app/app.py b/app/app.py index f60634f..3b5529c 100644 --- a/app/app.py +++ b/app/app.py @@ -6,28 +6,14 @@ import os import segment.analytics as analytics import uuid -import sys -import traceback -import requests -from html.parser import HTMLParser from werkzeug.test import EnvironBuilder from werkzeug.wrappers import Request -import json + import logging import redis - -class BodyHTMLParser(HTMLParser): - def __init__(self): - super().__init__() - self.text = [] - - def handle_data(self, data): - self.text.append(data) - - def get_text(self): - return ''.join(self.text) - +from intercom import parse_html_to_text, set_conversation_human_replied, is_conversation_human_replied, answer_intercom_conversation +from utils import generate # Configure logging logging.basicConfig(level=logging.INFO) @@ -53,30 +39,7 @@ def validate_pow(nonce, data, difficulty): first_uint32 = int.from_bytes(calculated_hash[:4], byteorder='big') return first_uint32 <= difficulty -# Shared function to generate response stream from RAG system -def generate(query, source, anonymous_id): - full_response = "" - try: - for token in rag_system.answer_query_stream(query): - yield token - full_response += token - except Exception as e: - print(f"Error in RAG system: {e}", file=sys.stderr) - traceback.print_exc() - yield "Internal Server Error" - - if not full_response: - full_response = "No response generated" - - if analytics.write_key: - # Track the query and response - analytics.track( - anonymous_id=anonymous_id, - event='Chatbot Question submitted', - properties={'query': query, 'response': full_response, 'source': source} - ) - - return full_response + def handle_ask_request(request, session): data = request.get_json() @@ -177,165 +140,11 @@ def debug_context(): return jsonify({"context": context}) -# Retrieve a conversation from Intercom API by its ID -def fetch_intercom_conversation(conversation_id): - id = conversation_id - url = "https://api.intercom.io/conversations/" + id - token = os.getenv('INTERCOM_TOKEN') - if not token: - return jsonify({"error": "Intercom token not set"}), 500 - - headers = { - "Content-Type": "application/json", - "Intercom-Version": "2.13", - "Authorization": "Bearer " + token - } - - response = requests.get(url, headers=headers) - if response.status_code != 200: - logger.error(f"Failed to fetch conversation {id} from Intercom; status code: {response.status_code}, response: {response.text}") - return jsonify({"error": "Failed to fetch conversation from Intercom"}), response.status_code - - return response - -# Determines the user query from the Intercom conversation response -def get_user_query(response, conversation_id): - # Extract conversation parts from an Intercom request response - result = extract_conversation_parts(response) - logger.info(f"Extracted {len(result)} parts from conversation {conversation_id}") - - # Get and join the latest user messages from the conversation parts - joined_text = extract_latest_user_messages(result) - if not joined_text: - return jsonify({"info": "No entries made by user found."}), 204 - return joined_text - -# Extract conversation parts into a simplified JSON format -def extract_conversation_parts(response): - data = response.json() - parts = data.get('conversation_parts', {}).get('conversation_parts', []) - extracted_parts = [] - for part in parts: - body = part.get('body', '') - if not body: - continue - author = part.get('author', {}) - created_at = part.get('created_at') - extracted_parts.append({'body': body, 'author': author, 'created_at': created_at}) - return extracted_parts - -# Joins the latest user entries in the conversation starting from the last non-user (i.e. admin) entry -def extract_latest_user_messages(conversation_parts): - # Find the index of the last non-user entry - last_non_user_idx = None - for idx in range(len(conversation_parts) - 1, -1, -1): - if conversation_parts[idx].get('author', {}).get('type') != 'user': - last_non_user_idx = idx - break - - # Collect user entries after the last non-user entry - if last_non_user_idx is not None: - last_user_entries = [ - part for part in conversation_parts[last_non_user_idx + 1 :] - if part.get('author', {}).get('type') == 'user' - ] - else: - # If there is no non-user entry, include all user entries - last_user_entries = [ - part for part in conversation_parts if part.get('author', {}).get('type') == 'user' - ] - - # If no user entries found, return None - if not last_user_entries: - return None - - # Only keep the 'body' field from each user entry - bodies = [part['body'] for part in last_user_entries if 'body' in part] - - # Parse and concatenate all user message bodies as plain text - parsed_bodies = [] - for html_body in bodies: - parsed_bodies.append(parse_html_to_text(html_body)) - - # Join all parsed user messages into a single string - joined_text = " ".join(parsed_bodies) - return joined_text - -# Helper function to parse HTML into plain text -def parse_html_to_text(html_content): - parser = BodyHTMLParser() - parser.feed(html_content) - return parser.get_text() - -# Store conversation ID in persistent storage -def set_conversation_human_replied(conversation_id): - try: - # Use a Redis set to avoid duplicates - r.set(conversation_id, '1', ex=60*60*24) # Set TTL expiration to 1 day - logger.info(f"Added conversation_id {conversation_id} to Redis set admin_replied_conversations") - except Exception as e: - logger.error(f"Error adding conversation_id to Redis: {e}") - -# Check if a conversation is already marked as replied by a human admin -def is_conversation_human_replied(conversation_id): - try: - return r.exists(conversation_id) - except Exception as e: - logger.error(f"Error checking conversation_id in Redis: {e}") - return False - - -# Post a reply to a conversation through Intercom API -def post_intercom_reply(conversation_id, response_text): - url = f"https://api.intercom.io/conversations/{conversation_id}/reply" - token = os.getenv('INTERCOM_TOKEN') - if not token: - return jsonify({"error": "Intercom token not set"}), 500 - - headers = { - "Content-Type": "application/json", - "Authorization": "Bearer " + token - } - - payload = { - "message_type": "comment", - "type": "admin", - "admin_id": int(os.getenv('INTERCOM_ADMIN_ID')), - "body": response_text - } - - response = requests.post(url, json=payload, headers=headers) - logger.info(f"Posted reply to Intercom; response status code: {response.status_code}") - - return response.json(), response.status_code - - -# Returns a generated LLM answer to the Intercom conversation based on previous user message history -def answer_intercom_conversation(conversation_id): - logger.info(f"Received request to get conversation {conversation_id}") - - # Retrieves the history of the conversation thread in Intercom - conversation= fetch_intercom_conversation(conversation_id) - - # Extracts the user query (which are latest user messages joined into a single string) from conversation history - user_query = get_user_query(conversation, conversation_id) - logger.info(f"Joined user messages: {user_query}") - - # Use a deterministic, non-reversible hash for anonymous_id for Intercom conversations - anon_hash = hashlib.sha256(f"intercom-{conversation_id}".encode()).hexdigest() - - # Generate the exact response using the RAG system - llm_response = "".join(generate(user_query, 'Intercom Conversation', anon_hash)) - llm_response = llm_response + " 🤖" # Add a marker to indicate the end of the response - - logger.info(f"LLM response: {llm_response}") - - return post_intercom_reply(conversation_id, llm_response) - -# Endpoint to handle incoming webhooks from Intercom +# Handle incoming webhooks from Intercom @app.route('/intercom-webhook', methods=['POST']) @csrf.exempt def handle_webhook(): + data = request.json logger.info(f"Received Intercom webhook: {data}") @@ -359,13 +168,12 @@ def handle_webhook(): # If the last message does not end with the marker, it indicates a human reply logger.info(f"Detected human admin reply in conversation {conversation_id}; marking as human admin-replied...") # Mark the conversation as replied by a human admin to skip LLM responses in the future - set_conversation_human_replied(conversation_id) + set_conversation_human_replied(conversation_id, r) logger.info(f"Successfully marked conversation {conversation_id} as human admin-replied.") - return 'OK' elif topic == 'conversation.user.replied': # In this case, the webhook event is a user reply, not an admin reply # Check if the conversation was replied previously by a human admin - if is_conversation_human_replied(conversation_id): + if is_conversation_human_replied(conversation_id, r): logger.info(f"Conversation {conversation_id} already marked as human admin-replied; no action taken.") return 'OK' # Fetch the conversation and generate an LLM answer for the user diff --git a/app/intercom.py b/app/intercom.py new file mode 100644 index 0000000..d23ffa7 --- /dev/null +++ b/app/intercom.py @@ -0,0 +1,182 @@ +# Intercom API helper functions for handling conversations and replies +import os +import requests +import hashlib +from flask import jsonify +from html.parser import HTMLParser + +import logging +logger = logging.getLogger(__name__) + +from utils import generate + +class BodyHTMLParser(HTMLParser): + def __init__(self): + super().__init__() + self.text = [] + + def handle_data(self, data): + self.text.append(data) + + def get_text(self): + return ''.join(self.text) + +# Retrieve a conversation from Intercom API by its ID +def fetch_intercom_conversation(conversation_id): + id = conversation_id + url = "https://api.intercom.io/conversations/" + id + token = os.getenv('INTERCOM_TOKEN') + if not token: + return jsonify({"error": "Intercom token not set"}), 500 + + headers = { + "Content-Type": "application/json", + "Intercom-Version": "2.13", + "Authorization": "Bearer " + token + } + + response = requests.get(url, headers=headers) + if response.status_code != 200: + logger.error(f"Failed to fetch conversation {id} from Intercom; status code: {response.status_code}, response: {response.text}") + return jsonify({"error": "Failed to fetch conversation from Intercom"}), response.status_code + + return response + +# Determines the user query from the Intercom conversation response +def get_user_query(response, conversation_id): + # Extract conversation parts from an Intercom request response + result = extract_conversation_parts(response) + logger.info(f"Extracted {len(result)} parts from conversation {conversation_id}") + + # Get and join the latest user messages from the conversation parts + joined_text = extract_latest_user_messages(result) + if not joined_text: + return jsonify({"info": "No entries made by user found."}), 204 + return joined_text + +# Extract conversation parts into a simplified JSON format +def extract_conversation_parts(response): + data = response.json() + parts = data.get('conversation_parts', {}).get('conversation_parts', []) + extracted_parts = [] + for part in parts: + body = part.get('body', '') + if not body: + continue + author = part.get('author', {}) + created_at = part.get('created_at') + extracted_parts.append({'body': body, 'author': author, 'created_at': created_at}) + return extracted_parts + +# Joins the latest user entries in the conversation starting from the last non-user (i.e. admin) entry +def extract_latest_user_messages(conversation_parts): + # Find the index of the last non-user entry + last_non_user_idx = None + for idx in range(len(conversation_parts) - 1, -1, -1): + if conversation_parts[idx].get('author', {}).get('type') != 'user': + last_non_user_idx = idx + break + + # Collect user entries after the last non-user entry + if last_non_user_idx is not None: + last_user_entries = [ + part for part in conversation_parts[last_non_user_idx + 1 :] + if part.get('author', {}).get('type') == 'user' + ] + else: + # If there is no non-user entry, include all user entries + last_user_entries = [ + part for part in conversation_parts if part.get('author', {}).get('type') == 'user' + ] + + # If no user entries found, return None + if not last_user_entries: + return None + + # Only keep the 'body' field from each user entry + bodies = [part['body'] for part in last_user_entries if 'body' in part] + + # Parse and concatenate all user message bodies as plain text + parsed_bodies = [] + for html_body in bodies: + parsed_bodies.append(parse_html_to_text(html_body)) + + # Join all parsed user messages into a single string + joined_text = " ".join(parsed_bodies) + return joined_text + +# Helper function to parse HTML into plain text +def parse_html_to_text(html_content): + parser = BodyHTMLParser() + parser.feed(html_content) + return parser.get_text() + +# Store conversation ID in persistent storage +def set_conversation_human_replied(conversation_id, redis_client): + try: + # Use a Redis set to avoid duplicates + redis_client.set(conversation_id, '1', ex=60*60*24) # Set TTL expiration to 1 day + logger.info(f"Added conversation_id {conversation_id} to Redis set admin_replied_conversations") + except Exception as e: + logger.error(f"Error adding conversation_id to Redis: {e}") + +# Check if a conversation is already marked as replied by a human admin +def is_conversation_human_replied(conversation_id, redis_client): + try: + return redis_client.exists(conversation_id) + except Exception as e: + logger.error(f"Error checking conversation_id in Redis: {e}") + return False + +# Post a reply to a conversation through Intercom API +def post_intercom_reply(conversation_id, response_text): + url = f"https://api.intercom.io/conversations/{conversation_id}/reply" + token = os.getenv('INTERCOM_TOKEN') + if not token: + return jsonify({"error": "Intercom token not set"}), 500 + + headers = { + "Content-Type": "application/json", + "Authorization": "Bearer " + token + } + + payload = { + "message_type": "comment", + "type": "admin", + "admin_id": int(os.getenv('INTERCOM_ADMIN_ID')), + "body": response_text + } + + response = requests.post(url, json=payload, headers=headers) + logger.info(f"Posted reply to Intercom; response status code: {response.status_code}") + + return response.json(), response.status_code + + +# Returns a generated LLM answer to the Intercom conversation based on previous user message history +def answer_intercom_conversation(conversation_id): + logger.info(f"Received request to get conversation {conversation_id}") + # Retrieves the history of the conversation thread in Intercom + conversation = fetch_intercom_conversation(conversation_id) + # If a tuple is returned, it is an error response + if isinstance(conversation, tuple): + return conversation + + # Extracts the user query (which are latest user messages joined into a single string) from conversation history + user_query = get_user_query(conversation, conversation_id) + # If a tuple is returned, it is an error response + if isinstance(user_query, tuple): + return user_query + + logger.info(f"Joined user messages: {user_query}") + + # Use a deterministic, non-reversible hash for anonymous_id for Intercom conversations + anon_hash = hashlib.sha256(f"intercom-{conversation_id}".encode()).hexdigest() + + # Generate the exact response using the RAG system + llm_response = "".join(generate(user_query, 'Intercom Conversation', anon_hash)) + llm_response = llm_response + " 🤖" # Add a marker to indicate the end of the response + + logger.info(f"LLM response: {llm_response}") + + return post_intercom_reply(conversation_id, llm_response) diff --git a/app/test_app.py b/app/test_intercom.py similarity index 95% rename from app/test_app.py rename to app/test_intercom.py index 3731a49..2f8080a 100644 --- a/app/test_app.py +++ b/app/test_intercom.py @@ -8,16 +8,16 @@ patch('redis.from_url', return_value=redis_mock).start() # Now import app, after the patch is applied -import app +import intercom -class TestApp(unittest.TestCase): +class TestIntercom(unittest.TestCase): @classmethod def setUpClass(cls): # Initialize the app or any required resources - cls.app = app + cls.app = intercom # Replace the Redis client in app with our mock - app.r = redis_mock + intercom.r = redis_mock print("Successfully set up App class for testing!") def test_parse_html_to_text(self): @@ -114,15 +114,15 @@ def test_extract_latest_user_messages_after_admin(self): def test_is_conversation_human_replied_check_false(self): conversation_id = "test_convo_id_1234" self.app.r.delete(conversation_id) - result = self.app.is_conversation_human_replied(conversation_id) + result = self.app.is_conversation_human_replied(conversation_id, redis_mock) self.assertFalse(result) print("test_is_conversation_human_replied_check_false passed successfully.") def test_set_conversation_human_replied_and_check_true(self): conversation_id = "test_convo_id_1235" self.app.r.delete(conversation_id) - self.app.set_conversation_human_replied(conversation_id) - result = self.app.is_conversation_human_replied(conversation_id) + self.app.set_conversation_human_replied(conversation_id, redis_mock) + result = self.app.is_conversation_human_replied(conversation_id, redis_mock) self.assertTrue(result) self.app.r.delete(conversation_id) print("test_set_conversation_human_replied_and_check_true passed successfully.") @@ -130,7 +130,7 @@ def test_set_conversation_human_replied_and_check_true(self): def test_set_conversation_human_replied_ttl_exists(self): conversation_id = "test_convo_id_ttl" self.app.r.delete(conversation_id) - self.app.set_conversation_human_replied(conversation_id) + self.app.set_conversation_human_replied(conversation_id, redis_mock) ttl = self.app.r.ttl(conversation_id) self.assertGreater(ttl, 0) self.app.r.delete(conversation_id) diff --git a/app/utils.py b/app/utils.py new file mode 100644 index 0000000..7c033ad --- /dev/null +++ b/app/utils.py @@ -0,0 +1,30 @@ +from rag_system import rag_system +import sys +import traceback +import segment.analytics as analytics + +# Shared function to generate response stream from RAG system +def generate(query, source, anonymous_id): + full_response = "" + print(f"Received query: {str(query)}", file=sys.stderr) + try: + for token in rag_system.answer_query_stream(query): + yield token + full_response += token + except Exception as e: + print(f"Error in RAG system: {e}", file=sys.stderr) + traceback.print_exc() + yield "Internal Server Error" + + if not full_response: + full_response = "No response generated" + + if analytics.write_key: + # Track the query and response + analytics.track( + anonymous_id=anonymous_id, + event='Chatbot Question submitted', + properties={'query': query, 'response': full_response, 'source': source} + ) + + return full_response From d716d524bef21e8420558e336a7afd81e2299e2e Mon Sep 17 00:00:00 2001 From: commit111 Date: Wed, 25 Jun 2025 15:06:52 -0700 Subject: [PATCH 16/21] check for intercom ip addresses --- app/app.py | 4 +++- app/intercom.py | 20 ++++++++++++++++++ app/test_intercom.py | 50 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 73 insertions(+), 1 deletion(-) diff --git a/app/app.py b/app/app.py index 3b5529c..1cc475d 100644 --- a/app/app.py +++ b/app/app.py @@ -12,7 +12,7 @@ import logging import redis -from intercom import parse_html_to_text, set_conversation_human_replied, is_conversation_human_replied, answer_intercom_conversation +from intercom import parse_html_to_text, set_conversation_human_replied, is_conversation_human_replied, answer_intercom_conversation, check_intercom_ip from utils import generate # Configure logging @@ -144,6 +144,8 @@ def debug_context(): @app.route('/intercom-webhook', methods=['POST']) @csrf.exempt def handle_webhook(): + if not check_intercom_ip(request): + return jsonify({"error": "Unauthorized IP"}), 403 data = request.json diff --git a/app/intercom.py b/app/intercom.py index d23ffa7..d6b0609 100644 --- a/app/intercom.py +++ b/app/intercom.py @@ -180,3 +180,23 @@ def answer_intercom_conversation(conversation_id): logger.info(f"LLM response: {llm_response}") return post_intercom_reply(conversation_id, llm_response) + +def check_intercom_ip(request): + # Restrict webhook access to a list of allowed IP addresses + INTERCOM_ALLOWED_IPS = [ + "34.231.68.152", + "34.197.76.213", + "35.171.78.91", + "35.169.138.21", + "52.70.27.159", + "52.44.63.161" + ] + remote_ip = request.headers.get('X-Forwarded-For', request.remote_addr) + # X-Forwarded-For may contain a comma-separated list; take the first IP + remote_ip = remote_ip.split(',')[0].strip() if remote_ip else None + + if remote_ip not in INTERCOM_ALLOWED_IPS: + # logger.info(f"Rejected webhook from unauthorized IP: {remote_ip}") + return False + + return True diff --git a/app/test_intercom.py b/app/test_intercom.py index 2f8080a..ad93e09 100644 --- a/app/test_intercom.py +++ b/app/test_intercom.py @@ -146,5 +146,55 @@ def test_set_redis_ttl_expiry(self): self.assertFalse(exists) print("test_set_redis_ttl_expiry passed successfully.") + def test_check_intercom_ip_allowed(self): + # Simulate a request with an allowed IP in X-Forwarded-For + class DummyRequest: + headers = {'X-Forwarded-For': '34.231.68.152'} + remote_addr = '1.2.3.4' + req = DummyRequest() + result = self.app.check_intercom_ip(req) + self.assertTrue(result) + print("test_check_intercom_ip_allowed passed successfully.") + + def test_check_intercom_ip_allowed_remote_addr(self): + # Simulate a request with allowed IP only in remote_addr + class DummyRequest: + headers = {} + remote_addr = '34.197.76.213' + req = DummyRequest() + result = self.app.check_intercom_ip(req) + self.assertTrue(result) + print("test_check_intercom_ip_allowed_remote_addr passed successfully.") + + def test_check_intercom_ip_not_allowed(self): + # Simulate a request with a non-allowed IP + class DummyRequest: + headers = {'X-Forwarded-For': '8.8.8.8'} + remote_addr = '8.8.4.4' + req = DummyRequest() + result = self.app.check_intercom_ip(req) + self.assertFalse(result) + print("test_check_intercom_ip_not_allowed passed successfully.") + + def test_check_intercom_ip_multiple_forwarded(self): + # Simulate a request with multiple IPs in X-Forwarded-For + class DummyRequest: + headers = {'X-Forwarded-For': '8.8.8.8, 34.231.68.152'} + remote_addr = '8.8.4.4' + req = DummyRequest() + result = self.app.check_intercom_ip(req) + self.assertFalse(result) + print("test_check_intercom_ip_multiple_forwarded passed successfully.") + + def test_check_intercom_ip_none(self): + # Simulate a request with no IPs + class DummyRequest: + headers = {} + remote_addr = None + req = DummyRequest() + result = self.app.check_intercom_ip(req) + self.assertFalse(result) + print("test_check_intercom_ip_none passed successfully.") + if __name__ == '__main__': unittest.main() From ca64a5f5f17e029a5f20ace354ff56be0908ef30 Mon Sep 17 00:00:00 2001 From: "Linda L." Date: Wed, 25 Jun 2025 15:14:31 -0700 Subject: [PATCH 17/21] Update app/intercom.py Co-authored-by: Eric Liu --- app/intercom.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/intercom.py b/app/intercom.py index d6b0609..398e903 100644 --- a/app/intercom.py +++ b/app/intercom.py @@ -51,8 +51,8 @@ def get_user_query(response, conversation_id): # Get and join the latest user messages from the conversation parts joined_text = extract_latest_user_messages(result) if not joined_text: - return jsonify({"info": "No entries made by user found."}), 204 - return joined_text + return "No entries made by user found.", 204 + return joined_text, result.status_code # Extract conversation parts into a simplified JSON format def extract_conversation_parts(response): From 512c0b1c6a00822ad72185be06e4b8116e820eb8 Mon Sep 17 00:00:00 2001 From: commit111 Date: Wed, 25 Jun 2025 15:20:52 -0700 Subject: [PATCH 18/21] edit status code --- app/intercom.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/app/intercom.py b/app/intercom.py index 398e903..7e72886 100644 --- a/app/intercom.py +++ b/app/intercom.py @@ -52,7 +52,7 @@ def get_user_query(response, conversation_id): joined_text = extract_latest_user_messages(result) if not joined_text: return "No entries made by user found.", 204 - return joined_text, result.status_code + return joined_text, 200 # Extract conversation parts into a simplified JSON format def extract_conversation_parts(response): @@ -163,10 +163,10 @@ def answer_intercom_conversation(conversation_id): return conversation # Extracts the user query (which are latest user messages joined into a single string) from conversation history - user_query = get_user_query(conversation, conversation_id) + user_query, status_code = get_user_query(conversation, conversation_id) # If a tuple is returned, it is an error response - if isinstance(user_query, tuple): - return user_query + if status_code != 200: + return jsonify(user_query), status_code logger.info(f"Joined user messages: {user_query}") From e2c1fc0816773d26dcf7d3677e885ba1d6b385d2 Mon Sep 17 00:00:00 2001 From: commit111 Date: Fri, 27 Jun 2025 12:26:55 -0700 Subject: [PATCH 19/21] simplify id variable --- app/intercom.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/app/intercom.py b/app/intercom.py index 7e72886..acb1d3f 100644 --- a/app/intercom.py +++ b/app/intercom.py @@ -23,8 +23,7 @@ def get_text(self): # Retrieve a conversation from Intercom API by its ID def fetch_intercom_conversation(conversation_id): - id = conversation_id - url = "https://api.intercom.io/conversations/" + id + url = "https://api.intercom.io/conversations/" + conversation_id token = os.getenv('INTERCOM_TOKEN') if not token: return jsonify({"error": "Intercom token not set"}), 500 @@ -37,7 +36,7 @@ def fetch_intercom_conversation(conversation_id): response = requests.get(url, headers=headers) if response.status_code != 200: - logger.error(f"Failed to fetch conversation {id} from Intercom; status code: {response.status_code}, response: {response.text}") + logger.error(f"Failed to fetch conversation {conversation_id} from Intercom; status code: {response.status_code}, response: {response.text}") return jsonify({"error": "Failed to fetch conversation from Intercom"}), response.status_code return response From 3b21a12a06420e2458d0c68ad95f4c0451670836 Mon Sep 17 00:00:00 2001 From: commit111 Date: Fri, 27 Jun 2025 12:54:40 -0700 Subject: [PATCH 20/21] add status code to response --- app/intercom.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/app/intercom.py b/app/intercom.py index acb1d3f..f05504c 100644 --- a/app/intercom.py +++ b/app/intercom.py @@ -39,7 +39,7 @@ def fetch_intercom_conversation(conversation_id): logger.error(f"Failed to fetch conversation {conversation_id} from Intercom; status code: {response.status_code}, response: {response.text}") return jsonify({"error": "Failed to fetch conversation from Intercom"}), response.status_code - return response + return response, response.status_code # Determines the user query from the Intercom conversation response def get_user_query(response, conversation_id): @@ -156,14 +156,12 @@ def post_intercom_reply(conversation_id, response_text): def answer_intercom_conversation(conversation_id): logger.info(f"Received request to get conversation {conversation_id}") # Retrieves the history of the conversation thread in Intercom - conversation = fetch_intercom_conversation(conversation_id) - # If a tuple is returned, it is an error response - if isinstance(conversation, tuple): - return conversation + conversation, status_code = fetch_intercom_conversation(conversation_id) + if status_code != 200: + return jsonify(conversation), status_code # Extracts the user query (which are latest user messages joined into a single string) from conversation history user_query, status_code = get_user_query(conversation, conversation_id) - # If a tuple is returned, it is an error response if status_code != 200: return jsonify(user_query), status_code From 485924135d609a2d346ee7d45fb00d03e154fb83 Mon Sep 17 00:00:00 2001 From: commit111 Date: Fri, 27 Jun 2025 12:55:17 -0700 Subject: [PATCH 21/21] sanitize input before using it --- app/intercom.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/app/intercom.py b/app/intercom.py index f05504c..0ecfc52 100644 --- a/app/intercom.py +++ b/app/intercom.py @@ -23,6 +23,11 @@ def get_text(self): # Retrieve a conversation from Intercom API by its ID def fetch_intercom_conversation(conversation_id): + # Sanitize conversation_id to allow only digits (Intercom conversation IDs are numeric) + if not conversation_id.isdigit(): + logger.error(f"Invalid conversation_id: {conversation_id}") + return jsonify({"error": f"Invalid conversation_id: {conversation_id}"}), 400 + url = "https://api.intercom.io/conversations/" + conversation_id token = os.getenv('INTERCOM_TOKEN') if not token: