Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/deploy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ jobs:
- name: Deploy
uses: DefangLabs/defang-github-action@v1.1.3
with:
config-env-vars: ASK_TOKEN OPENAI_API_KEY REBUILD_TOKEN SECRET_KEY SEGMENT_WRITE_KEY DISCORD_APP_ID DISCORD_TOKEN DISCORD_PUBLIC_KEY
config-env-vars: ASK_TOKEN OPENAI_API_KEY REBUILD_TOKEN SECRET_KEY SEGMENT_WRITE_KEY DISCORD_APP_ID DISCORD_TOKEN DISCORD_PUBLIC_KEY INTERCOM_TOKEN INTERCOM_ADMIN_ID
mode: staging
provider: aws

Expand All @@ -65,3 +65,5 @@ jobs:
DISCORD_APP_ID: ${{ secrets.DISCORD_APP_ID }}
DISCORD_TOKEN: ${{ secrets.DISCORD_TOKEN }}
DISCORD_PUBLIC_KEY: ${{ secrets.DISCORD_PUBLIC_KEY }}
INTERCOM_TOKEN: ${{ secrets.INTERCOM_TOKEN }}
INTERCOM_ADMIN_ID: ${{ secrets.INTERCOM_ADMIN_ID }}
3 changes: 3 additions & 0 deletions app/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ RUN chmod +x .tmp/prebuild.sh
# Expose port 5050 for the Flask application
EXPOSE 5050

# Run the Intercom helper tests at build time so a failing test aborts the image build
RUN python test_intercom.py

# Define environment variable for Flask
ENV FLASK_APP=app.py

Expand Down
98 changes: 70 additions & 28 deletions app/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,18 @@
import os
import segment.analytics as analytics
import uuid
import sys
import traceback

from werkzeug.test import EnvironBuilder
from werkzeug.wrappers import Request

import logging
import redis
from intercom import parse_html_to_text, set_conversation_human_replied, is_conversation_human_replied, answer_intercom_conversation, check_intercom_ip
from utils import generate

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

analytics.write_key = os.getenv('SEGMENT_WRITE_KEY')

Expand All @@ -18,6 +28,9 @@

csrf = CSRFProtect(app)

# Initialize Redis connection
r = redis.from_url(os.getenv('REDIS_URL'), decode_responses=True)

def validate_pow(nonce, data, difficulty):
# Calculate the sha256 of the concatenated string of 32-bit X-Nonce header and raw body.
# This calculation has to match the code on the client side, in index.html.
Expand All @@ -26,6 +39,8 @@ def validate_pow(nonce, data, difficulty):
first_uint32 = int.from_bytes(calculated_hash[:4], byteorder='big')
return first_uint32 <= difficulty



def handle_ask_request(request, session):
data = request.get_json()
query = data.get('query')
Expand All @@ -37,33 +52,13 @@ def handle_ask_request(request, session):
if 'anonymous_id' not in session:
session['anonymous_id'] = str(uuid.uuid4())
anonymous_id = session['anonymous_id']

# Determine the source based on the user agent
user_agent = request.headers.get('User-Agent', '')
source = 'Ask Defang Discord Bot' if 'Discord Bot' in user_agent else 'Ask Defang Website'

def generate():
full_response = ""
try:
for token in rag_system.answer_query_stream(query):
yield token
full_response += token
except Exception as e:
print(f"Error in /ask endpoint: {e}", file=sys.stderr)
traceback.print_exc()
yield "Internal Server Error"

if not full_response:
full_response = "No response generated"

if analytics.write_key:
# Determine the source based on the user agent
user_agent = request.headers.get('User-Agent', '')
source = 'Ask Defang Discord Bot' if 'Discord Bot' in user_agent else 'Ask Defang Website'
# Track the query and response
analytics.track(
anonymous_id=anonymous_id,
event='Chatbot Question submitted',
properties={'query': query, 'response': full_response, 'source': source}
)

return Response(stream_with_context(generate()), content_type='text/markdown')
# Use the shared generate function directly
return Response(stream_with_context(generate(query, source, anonymous_id)), content_type='text/markdown')

@app.route('/', methods=['GET', 'POST'])
def index():
Expand Down Expand Up @@ -144,5 +139,52 @@ def debug_context():
context = rag_system.get_context(query)
return jsonify({"context": context})


# Handle incoming webhooks from Intercom
@app.route('/intercom-webhook', methods=['POST'])
@csrf.exempt
def handle_webhook():
    """Process Intercom webhook events.

    Handles two topics:
      * ``conversation.admin.replied`` -- decide whether the admin reply came
        from the bot (messages the bot posts end with the marker "🤖") or a
        human; human replies are recorded in Redis so the bot stops answering
        that conversation.
      * ``conversation.user.replied`` -- generate an LLM answer, unless a
        human admin has already taken over the conversation.

    Returns 403 for requests whose source IP is not on Intercom's published
    webhook IP list; otherwise always returns 'OK'.
    """
    if not check_intercom_ip(request):
        return jsonify({"error": "Unauthorized IP"}), 403

    # silent=True: a malformed/non-JSON body becomes an empty payload instead
    # of aborting with an unhandled 415/400 inside the handler.
    data = request.get_json(silent=True) or {}

    logger.info(f"Received Intercom webhook: {data}")
    conversation_id = data.get('data', {}).get('item', {}).get('id')

    topic = data.get('topic')
    logger.info(f"Webhook topic: {topic}")
    if topic == 'conversation.admin.replied':
        # Distinguish bot vs. human admin replies via the "🤖" marker that the
        # bot appends to every generated message.
        parts = data.get('data', {}).get('item', {}).get('conversation_parts', {}).get('conversation_parts', [])
        # Guard: indexing [-1] on an empty parts list would raise IndexError.
        last_message = parts[-1].get('body', '') if parts else ''
        last_message_text = parse_html_to_text(last_message)

        logger.info(f"Parsed last message text: {last_message_text}")
        if last_message_text and last_message_text.endswith("🤖"):
            logger.info(f"Detected bot admin reply in conversation {conversation_id}; no action taken.")
        else:
            # Human reply: remember it so LLM responses are skipped from now on.
            logger.info(f"Detected human admin reply in conversation {conversation_id}; marking as human admin-replied...")
            set_conversation_human_replied(conversation_id, r)
    elif topic == 'conversation.user.replied':
        # Skip conversations a human admin has already taken over.
        if is_conversation_human_replied(conversation_id, r):
            logger.info(f"Conversation {conversation_id} already marked as human admin-replied; no action taken.")
            return 'OK'
        logger.info(f"Detected a user reply in conversation {conversation_id}; fetching an answer from LLM...")
        answer_intercom_conversation(conversation_id)
    else:
        logger.info(f"Received webhook for unsupported topic: {topic}; no action taken.")
    return 'OK'


if __name__ == '__main__':
    # Bind on all interfaces; port 5050 matches the EXPOSE in app/Dockerfile.
    app.run(host='0.0.0.0', port=5050)
204 changes: 204 additions & 0 deletions app/intercom.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
# Intercom API helper functions for handling conversations and replies
import os
import requests
import hashlib
from flask import jsonify
from html.parser import HTMLParser

import logging
logger = logging.getLogger(__name__)

from utils import generate

class BodyHTMLParser(HTMLParser):
    """Strip HTML markup, keeping only the document's text nodes."""

    def __init__(self):
        super().__init__()
        self._chunks = []

    def handle_data(self, data):
        # Invoked by HTMLParser for every run of character data between tags.
        self._chunks.append(data)

    def get_text(self):
        """Return all collected text, concatenated in document order."""
        return ''.join(self._chunks)

# Retrieve a conversation from Intercom API by its ID
def fetch_intercom_conversation(conversation_id):
    """Fetch a single conversation from the Intercom REST API.

    Args:
        conversation_id: Numeric Intercom conversation ID as a string.

    Returns:
        (response, status_code): the raw ``requests`` response and 200 on
        success, or a Flask JSON error response and a 4xx/5xx code otherwise.
        Callers must check the status code before using the payload.
    """
    # Sanitize conversation_id to allow only digits (Intercom conversation IDs are numeric)
    if not conversation_id.isdigit():
        logger.error(f"Invalid conversation_id: {conversation_id}")
        return jsonify({"error": f"Invalid conversation_id: {conversation_id}"}), 400

    token = os.getenv('INTERCOM_TOKEN')
    if not token:
        return jsonify({"error": "Intercom token not set"}), 500

    url = "https://api.intercom.io/conversations/" + conversation_id
    headers = {
        "Content-Type": "application/json",
        "Intercom-Version": "2.13",
        "Authorization": "Bearer " + token
    }

    # Always pass a timeout: requests.get blocks indefinitely by default,
    # which would hang the webhook worker if Intercom is unresponsive.
    response = requests.get(url, headers=headers, timeout=10)
    if response.status_code != 200:
        logger.error(f"Failed to fetch conversation {conversation_id} from Intercom; status code: {response.status_code}, response: {response.text}")
        return jsonify({"error": "Failed to fetch conversation from Intercom"}), response.status_code

    return response, response.status_code

# Determines the user query from the Intercom conversation response
def get_user_query(response, conversation_id):
    """Build the pending user query from an Intercom conversation payload.

    Returns a (text, status) pair: the joined latest user messages with 200,
    or a fixed notice with 204 when the user has written nothing new since
    the last admin entry.
    """
    # Flatten the Intercom payload into simplified conversation parts.
    parts = extract_conversation_parts(response)
    logger.info(f"Extracted {len(parts)} parts from conversation {conversation_id}")

    # Join the trailing run of user messages into a single query string.
    latest = extract_latest_user_messages(parts)
    if latest:
        return latest, 200
    return "No entries made by user found.", 204

# Extract conversation parts into a simplified JSON format
def extract_conversation_parts(response):
    """Flatten an Intercom conversation payload into a list of small dicts.

    Each entry keeps only 'body', 'author', and 'created_at'; parts with an
    empty or missing body (e.g. pure state-change events) are dropped.
    """
    payload = response.json()
    raw_parts = payload.get('conversation_parts', {}).get('conversation_parts', [])
    return [
        {
            'body': part.get('body', ''),
            'author': part.get('author', {}),
            'created_at': part.get('created_at'),
        }
        for part in raw_parts
        if part.get('body', '')
    ]

# Joins the latest user entries in the conversation starting from the last non-user (i.e. admin) entry
def extract_latest_user_messages(conversation_parts):
    """Join the trailing run of user messages into one plain-text string.

    Finds the most recent non-user (i.e. admin) entry and keeps only the
    user entries after it; when the conversation has no non-user entry at
    all, every user entry qualifies. Returns None if no user entries are
    found, otherwise the space-joined plain-text bodies (possibly "").
    """
    # Index just past the most recent non-user entry; 0 when there is none.
    # Everything at or beyond this index is authored by the user, because a
    # later non-user entry would have advanced the index further.
    start = 0
    for idx, part in enumerate(conversation_parts):
        if part.get('author', {}).get('type') != 'user':
            start = idx + 1

    trailing_user_parts = conversation_parts[start:]
    if not trailing_user_parts:
        return None

    # Convert each HTML body to plain text and glue the messages together.
    texts = [
        parse_html_to_text(part['body'])
        for part in trailing_user_parts
        if 'body' in part
    ]
    return " ".join(texts)

# Helper function to parse HTML into plain text
def parse_html_to_text(html_content):
    """Strip markup from *html_content* and return the bare text."""
    stripper = BodyHTMLParser()
    stripper.feed(html_content)
    return stripper.get_text()

# Store conversation ID in persistent storage
def set_conversation_human_replied(conversation_id, redis_client):
    """Record in Redis that a human admin replied to this conversation.

    Stores the conversation ID as a plain string key with a 24h TTL; the
    key's existence alone is the flag, so repeated webhooks are naturally
    idempotent. Failures are logged but never raised, so a Redis outage
    cannot break webhook handling.

    (The previous comment/log claimed a Redis *set* named
    admin_replied_conversations was used; the code actually issues a plain
    SET with expiry.)
    """
    try:
        # SET with EX gives an idempotent write and automatic expiry after one day.
        redis_client.set(conversation_id, '1', ex=60*60*24)
        logger.info(f"Marked conversation {conversation_id} as human admin-replied in Redis (24h TTL)")
    except Exception as e:
        logger.error(f"Error adding conversation_id to Redis: {e}")

# Check if a conversation is already marked as replied by a human admin
def is_conversation_human_replied(conversation_id, redis_client):
    """Return True if a human admin has already replied to this conversation.

    Reads the flag written by set_conversation_human_replied(). Redis EXISTS
    returns an integer count, so the result is normalized to a real bool.
    On any Redis error the check fails open (False) so the bot keeps working.
    """
    try:
        return bool(redis_client.exists(conversation_id))
    except Exception as e:
        logger.error(f"Error checking conversation_id in Redis: {e}")
        return False

# Post a reply to a conversation through Intercom API
def post_intercom_reply(conversation_id, response_text):
    """Post *response_text* as an admin comment on an Intercom conversation.

    Returns:
        (json_body, status_code) from the Intercom API on a completed call,
        or a Flask JSON error response with 500 when required configuration
        (token or admin ID) is missing.
    """
    token = os.getenv('INTERCOM_TOKEN')
    if not token:
        return jsonify({"error": "Intercom token not set"}), 500

    admin_id = os.getenv('INTERCOM_ADMIN_ID')
    if not admin_id:
        # int(None) would raise TypeError; fail with a clear error instead.
        return jsonify({"error": "Intercom admin ID not set"}), 500

    url = f"https://api.intercom.io/conversations/{conversation_id}/reply"
    headers = {
        "Content-Type": "application/json",
        "Authorization": "Bearer " + token
    }
    payload = {
        "message_type": "comment",
        "type": "admin",
        "admin_id": int(admin_id),
        "body": response_text
    }

    # Timeout keeps the webhook worker from hanging on an unresponsive API.
    response = requests.post(url, json=payload, headers=headers, timeout=10)
    logger.info(f"Posted reply to Intercom; response status code: {response.status_code}")

    return response.json(), response.status_code


# Returns a generated LLM answer to the Intercom conversation based on previous user message history
def answer_intercom_conversation(conversation_id):
    """Generate an LLM answer for a conversation and post it back to Intercom.

    Fetches the conversation history, joins the latest user messages into a
    single query, streams an answer from the RAG system, appends the bot
    marker "🤖", and posts the result as an admin reply. Returns the
    (body, status_code) pair of the reply call, or the upstream error pair.
    """
    logger.info(f"Received request to get conversation {conversation_id}")
    # Retrieve the history of the conversation thread in Intercom.
    conversation, status_code = fetch_intercom_conversation(conversation_id)
    if status_code != 200:
        # fetch_intercom_conversation already built a Flask JSON error
        # response on failure; wrapping it in jsonify() again (as before)
        # would raise a serialization TypeError. Pass it through unchanged.
        return conversation, status_code

    # Extract the user query (latest user messages joined into one string).
    user_query, status_code = get_user_query(conversation, conversation_id)
    if status_code != 200:
        # user_query is a plain notice string here (e.g. no new user entries).
        return jsonify(user_query), status_code

    logger.info(f"Joined user messages: {user_query}")

    # Deterministic, non-reversible hash used as the anonymous analytics ID.
    anon_hash = hashlib.sha256(f"intercom-{conversation_id}".encode()).hexdigest()

    # Generate the full response from the RAG system (generate() yields tokens).
    llm_response = "".join(generate(user_query, 'Intercom Conversation', anon_hash))
    llm_response = llm_response + " 🤖"  # Marker lets the webhook tell bot replies from human ones

    logger.info(f"LLM response: {llm_response}")

    return post_intercom_reply(conversation_id, llm_response)

def check_intercom_ip(request):
    """Return True when the request originates from an Intercom webhook IP.

    Restricts webhook access to Intercom's published source IPs.

    NOTE(security): X-Forwarded-For is client-controlled. Any caller able to
    reach this service directly (bypassing a trusted reverse proxy) can spoof
    the header with an allowed IP and pass this check — confirm the
    deployment always sits behind a proxy that overwrites/sanitizes XFF.
    """
    # Intercom's published webhook source IPs.
    INTERCOM_ALLOWED_IPS = [
        "34.231.68.152",
        "34.197.76.213",
        "35.171.78.91",
        "35.169.138.21",
        "52.70.27.159",
        "52.44.63.161"
    ]
    remote_ip = request.headers.get('X-Forwarded-For', request.remote_addr)
    # X-Forwarded-For may contain a comma-separated list; take the first (client) IP
    remote_ip = remote_ip.split(',')[0].strip() if remote_ip else None

    return remote_ip in INTERCOM_ALLOWED_IPS
6 changes: 3 additions & 3 deletions app/rag_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ def embed_knowledge_base(self):
def normalize_query(self, query):
return query.lower().strip()

def get_query_embedding(self, query, use_cpu=False):
def get_query_embedding(self, query, use_cpu=True):
    """Embed a single query string with the model held by this instance.

    Args:
        query: Raw user query; normalized (via self.normalize_query) before encoding.
        use_cpu: When True (the default after this change), move the embedding
            tensor to CPU — needed for tests and GPU-less environments.

    Returns:
        The embedding tensor for the one-element batch [query]
        (on CPU when use_cpu is True).
    """
    normalized_query = self.normalize_query(query)
    query_embedding = self.model.encode([normalized_query], convert_to_tensor=True)
    if use_cpu:
        query_embedding = query_embedding.cpu()
    return query_embedding

def get_doc_embeddings(self, use_cpu=False):
def get_doc_embeddings(self, use_cpu=True):
    # Return the precomputed knowledge-base embeddings, moved to CPU when
    # use_cpu is True (default) so retrieval works without a GPU; otherwise
    # hand back the stored tensor on whatever device it already lives on.
    if use_cpu:
        return self.doc_embeddings.cpu()
    return self.doc_embeddings
Expand Down Expand Up @@ -66,7 +66,7 @@ def compute_document_scores(self, query_embedding, doc_embeddings, high_match_th

return result

def retrieve(self, query, similarity_threshold=0.4, high_match_threshold=0.8, max_docs=5, use_cpu=False):
def retrieve(self, query, similarity_threshold=0.4, high_match_threshold=0.8, max_docs=5, use_cpu=True):
# Note: Set use_cpu=True to run on CPU, which is useful for testing or environments without a GPU.
# Set use_cpu=False to leverage GPU for better performance in production.

Expand Down
2 changes: 2 additions & 0 deletions app/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@ huggingface_hub==0.15.1
openai==0.28.0
PyYAML==6.0.2
GitPython==3.1.44
redis==6.2.0
fakeredis==2.30.1
Loading