<div align='center'>

# OVERVIEW
</div>

This notebook evaluates **whether any of the suggested Ticketing & Communications services are acceptable for our Customer Support Center system**, as defined in the [`customer_support_rep_persona_01.md` persona document](../../customer_rep_persona_store/customer_support_rep_persona_01.md).

The suggested services are listed below:

1. **Chatwoot** (Open-source Customer Engagement Platform)
2. **MailHog** (Email Testing Server)

The criteria for qualifying a suggestion as a tool are defined below:

1. Python-based.
2. Open-source (free; avoid freemium as much as possible), with both customer-facing **and** producer-facing access.
3. Rate limits are permissive enough for appropriate edge-case testing.
4. Supports the core architectural choices of the LangChain ecosystem (e.g., asynchronous backends).
5. I can currently access the tool as a human (i.e., confirmation that it is operable by a human and has not been quietly deprecated or restricted).
6. I can perform its expected basic operations.

## Pre-Notebook Initialization

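1. Install Chatwoot via Docker: `docker run -d -p 3000:3000 --name chatwoot chatwoot/chatwoot:latest`. Chatwoot also depends on PostgreSQL and Redis, so this container needs both available; the project's Docker Compose setup is one way to provide them.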
2. Install MailHog via Docker: `docker run -d -p 1025:1025 -p 8025:8025 --name mailhog mailhog/mailhog`
3. Create a Chatwoot account and generate API access token.
4. Configure SMTP settings to use MailHog for testing.

<u>Notes</u>:
- Chatwoot provides live chat, email, and social media channel management.
- MailHog captures outgoing emails for testing without sending to real recipients.
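
Before running the cells that follow, it can help to confirm that both containers are actually listening. The sketch below is a minimal check using only the standard library. The helper names `is_port_open` and `report_services` are hypothetical, and the ports are assumed from the Docker commands above for a local setup.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unresolvable hosts
        return False


# Ports assumed from the Docker commands above
EXPECTED_SERVICES = {
    "Chatwoot web/API": 3000,
    "MailHog SMTP": 1025,
    "MailHog web/API": 8025,
}


def report_services(host: str = "localhost") -> dict:
    """Map each expected service name to whether its port accepts connections."""
    return {name: is_port_open(host, port) for name, port in EXPECTED_SERVICES.items()}
```

Calling `report_services()` returns a dictionary such as `{"MailHog SMTP": True, ...}`. An open port only shows that something is listening, not that the service behind it is healthy.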

In [None]:
# Required Modules


In [None]:
import os
import smtplib
from datetime import datetime
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

import requests
from dotenv import load_dotenv

# Load CHATWOOT_* and MAILHOG_* settings from a local .env file, if present
load_dotenv()


## Chatwoot


<ol>
<li> Python-based: <b>Available</b> (via REST API; Chatwoot itself is a Ruby on Rails application)
<li> Open-source: <b>Free</b> (MIT License for self-hosted)

<li> Maximum Rate Limit Restriction:
<ul>
<li> Self-hosted: <b>No rate limits</b> (depends on infrastructure).
<li> Cloud version has tier-based limits.
</ul>

<li> <b>Supports Core Architectural Choice</b> (REST API, WebSocket for real-time updates)
<li> Service is confirmed to <b>still be active & accessible</b>
</ol>

### Expected Basic Operations

1. [List Conversations](https://www.chatwoot.com/developers/api#operation/list-all-conversations).
2. [Create Conversation](https://www.chatwoot.com/developers/api#operation/create-a-conversation).
3. [Send Message](https://www.chatwoot.com/developers/api#operation/create-a-new-message-in-a-conversation).
4. [List Contacts](https://www.chatwoot.com/developers/api#operation/contactList).
5. [Create Contact](https://www.chatwoot.com/developers/api#operation/contactCreate).
6. [Assign Conversation](https://www.chatwoot.com/developers/api#operation/assign-a-conversation).

**<u>Notes:</u>**
1. Chatwoot supports multiple channels: website live chat, email, Facebook, Twitter, WhatsApp.
2. Provides agent assignment, labels, and conversation routing.
3. Supports webhooks for event-driven integrations.
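
To illustrate what an event-driven integration might look like, the sketch below routes a decoded webhook body to a handler by event name. It assumes the body is JSON with an `event` key and that names such as `message_created` and `conversation_status_changed` are used; the other payload fields (`content`, `id`, `status`) and all helper names are assumptions to verify against a real delivery.

```python
from typing import Callable, Dict, Optional

Handler = Callable[[dict], str]
_HANDLERS: Dict[str, Handler] = {}


def on_event(event_name: str):
    """Register a handler for one webhook event name."""
    def decorator(func: Handler) -> Handler:
        _HANDLERS[event_name] = func
        return func
    return decorator


@on_event("message_created")
def handle_message_created(payload: dict) -> str:
    # Field names are assumed, not confirmed by this notebook
    return f"new message: {payload.get('content', '')}"


@on_event("conversation_status_changed")
def handle_status_changed(payload: dict) -> str:
    return f"conversation {payload.get('id')} is now {payload.get('status')}"


def dispatch(payload: dict) -> Optional[str]:
    """Route a decoded webhook body to its handler; unknown events return None."""
    handler = _HANDLERS.get(payload.get("event"))
    return handler(payload) if handler else None
```

A web framework endpoint would decode the request body and pass the resulting dictionary to `dispatch`. Returning `None` for unknown events keeps the receiver tolerant of event types it does not handle.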


### Setup

Chatwoot provides a REST API for managing conversations, contacts, and messages.
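
The operations list above includes Create Conversation, which the client class in this section does not wrap. The sketch below shows how such a request could be assembled without sending it. The function name is hypothetical, and the body fields `inbox_id`, `contact_id`, and `source_id` are assumptions to check against the linked Create Conversation reference.

```python
from typing import Optional, Tuple


def build_create_conversation_request(
    base_url: str,
    account_id: int,
    api_access_token: str,
    inbox_id: int,
    contact_id: int,
    source_id: Optional[str] = None,
) -> Tuple[str, dict, dict]:
    """Return (url, headers, payload) for a create-conversation call."""
    url = f"{base_url.rstrip('/')}/api/v1/accounts/{account_id}/conversations"
    headers = {
        "api_access_token": api_access_token,
        "Content-Type": "application/json",
    }
    # Assumed body fields; optional ones are left out rather than sent as null
    payload = {"inbox_id": inbox_id, "contact_id": contact_id}
    if source_id is not None:
        payload["source_id"] = source_id
    return url, headers, payload
```

The returned values can be passed to `requests.post(url, headers=headers, json=payload, timeout=10)`. Keeping construction separate from sending makes the request easy to inspect before it reaches a live instance.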


In [None]:
# Class Definition for Chatwoot API calls

class Simple_Chatwoot_Client():
    
    def __init__(
        self,
        base_url: str = None,
        api_access_token: str = None,
        account_id: int = 1,
        timeout: float = 10.0
    ):
        self.base_url = (base_url or os.getenv("CHATWOOT_URL", "http://localhost:3000")).rstrip("/")
        self.api_access_token = api_access_token or os.getenv("CHATWOOT_API_TOKEN")
        self.account_id = account_id
        self.timeout = timeout  # seconds; keeps a cell from hanging if Chatwoot is unreachable
        self.SUCCESS_MSG = "[INFO]: Request performed successfully!"
        self.ERROR_MSG = "[WARNING]:"
        
    def _headers(self):
        """Get headers with API token"""
        return {
            "api_access_token": self.api_access_token,
            "Content-Type": "application/json"
        }
    
    def _api_url(self, endpoint: str):
        """Build API URL"""
        return f"{self.base_url}/api/v1/accounts/{self.account_id}/{endpoint}"
    
    def _request(self, method: str, endpoint: str, **kwargs):
        """Send a request and return the parsed JSON body, or None on failure"""
        try:
            response = requests.request(
                method,
                self._api_url(endpoint),
                headers=self._headers(),
                timeout=self.timeout,
                **kwargs
            )
            response.raise_for_status()
            print(self.SUCCESS_MSG)
            return response.json()
        except (requests.RequestException, ValueError) as e:
            # RequestException covers connection, timeout, and HTTP errors;
            # ValueError covers a response body that is not valid JSON
            print(self.ERROR_MSG, e)
            return None
    
    def list_conversations(self, status: str = "open", page: int = 1):
        """List all conversations"""
        return self._request("GET", "conversations", params={"status": status, "page": page})
    
    def get_conversation(self, conversation_id: int):
        """Get a specific conversation"""
        return self._request("GET", f"conversations/{conversation_id}")
    
    def send_message(self, conversation_id: int, content: str, message_type: str = "outgoing"):
        """Send a message in a conversation"""
        return self._request(
            "POST",
            f"conversations/{conversation_id}/messages",
            json={
                "content": content,
                "message_type": message_type,
                "private": False
            }
        )
    
    def list_contacts(self, page: int = 1):
        """List all contacts"""
        return self._request("GET", "contacts", params={"page": page})
    
    def create_contact(self, name: str, email: str = None, phone: str = None):
        """Create a new contact"""
        return self._request(
            "POST",
            "contacts",
            json={
                "name": name,
                "email": email,
                "phone_number": phone
            }
        )
    
    def assign_conversation(self, conversation_id: int, assignee_id: int):
        """Assign a conversation to an agent"""
        return self._request(
            "POST",
            f"conversations/{conversation_id}/assignments",
            json={"assignee_id": assignee_id}
        )
    
    def list_agents(self):
        """List all agents in the account"""
        return self._request("GET", "agents")
    
    def toggle_conversation_status(self, conversation_id: int, status: str = "resolved"):
        """Toggle conversation status (open, resolved, pending)"""
        return self._request(
            "POST",
            f"conversations/{conversation_id}/toggle_status",
            json={"status": status}
        )


### 1. List Conversations

Lists all conversations with a specified status.
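
Because the listing is paginated, collecting every conversation means requesting successive pages. The sketch below is one way to do that, assuming each page has the `data.payload` shape read in the cell that follows and that an empty `payload` marks the end. The helper name `iter_conversations` and the `max_pages` guard are hypothetical.

```python
from typing import Callable, Iterator, Optional


def iter_conversations(
    fetch_page: Callable[[int], Optional[dict]],
    max_pages: int = 100,
) -> Iterator[dict]:
    """Yield conversations page by page until a page is empty or a request fails."""
    for page in range(1, max_pages + 1):
        result = fetch_page(page)
        if not result:
            return  # request failed (None) or returned an empty body
        payload = result.get("data", {}).get("payload", [])
        if not payload:
            return  # past the last page
        yield from payload
```

With the client from this notebook, `fetch_page` could be `lambda p: chatwoot_client.list_conversations(status="open", page=p)`. The `max_pages` cap guards against looping indefinitely if the server keeps returning data.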


In [None]:
# Chatwoot Client Init

chatwoot_client = Simple_Chatwoot_Client()

# Listing open conversations
conversations = chatwoot_client.list_conversations(status="open")
if conversations:
    print(f"Found {len(conversations.get('data', {}).get('payload', []))} open conversations")


### 2. List Contacts

Lists all contacts in the system.


In [None]:
# Listing contacts

contacts = chatwoot_client.list_contacts()
if contacts:
    print("Contacts:")
    for contact in contacts.get('payload', [])[:5]:
        print(f"  - {contact.get('name')}: {contact.get('email')}")


### 3. Create Contact

Creates a new contact for a customer.
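
Validating contact data before sending it can make failures easier to diagnose than a rejected request. The sketch below builds the request body, omits fields that were not supplied, and checks the phone number against the E.164 format. It assumes the server expects E.164 numbers; the helper name `build_contact_payload` is hypothetical.

```python
import re
from typing import Optional

# E.164: a leading "+", then 2 to 15 digits with a non-zero first digit
_E164 = re.compile(r"^\+[1-9]\d{1,14}$")


def build_contact_payload(
    name: str,
    email: Optional[str] = None,
    phone: Optional[str] = None,
) -> dict:
    """Build a contact body, leaving out optional fields that were not given."""
    if not name or not name.strip():
        raise ValueError("name must not be empty")
    payload = {"name": name.strip()}
    if email:
        payload["email"] = email.strip()
    if phone:
        if not _E164.match(phone):
            raise ValueError(f"phone number is not in E.164 format: {phone!r}")
        payload["phone_number"] = phone
    return payload
```

The resulting dictionary uses the same keys (`name`, `email`, `phone_number`) that `create_contact` sends.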


In [None]:
# Creating a new contact

new_contact = chatwoot_client.create_contact(
    name="Test Customer",
    email="test.customer@example.com",
    phone="+1234567890"
)
if new_contact:
    print(f"Created contact: {new_contact.get('payload', {}).get('contact', {}).get('name')}")


### 4. List Agents

Lists all agents available for conversation assignment.
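
The agent list is the natural input for the Assign Conversation operation. The sketch below picks an assignee from such a list. It assumes each agent record carries an `availability_status` field with values such as `"online"`; that field name and the helper `pick_assignee` are assumptions, not something this notebook confirms.

```python
from typing import Iterable, Optional


def pick_assignee(agents: Iterable[dict], preferred_status: str = "online") -> Optional[int]:
    """Return the id of the first agent with the preferred status, or None."""
    for agent in agents:
        # "availability_status" is an assumed field name
        if agent.get("availability_status") == preferred_status:
            return agent.get("id")
    return None
```

If an id is returned, it can be passed to `chatwoot_client.assign_conversation(conversation_id, assignee_id)`. Returning `None` when nobody is available leaves the fallback policy to the caller.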


In [None]:
# Listing agents

agents = chatwoot_client.list_agents()
if agents:
    print("Available Agents:")
    for agent in agents:
        print(f"  - ID: {agent.get('id')}, Name: {agent.get('name')}, Email: {agent.get('email')}")


### 5. Send Message

Sends a message in a conversation.
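
The client's `send_message` always sets `private` to `False`, so every message is visible to the customer. The sketch below shows how the same body could be built with the flag exposed, for example to leave an internal note. The helper name and the set of accepted `message_type` values are assumptions.

```python
# Assumed set of accepted message types
ALLOWED_MESSAGE_TYPES = {"outgoing", "incoming"}


def build_message_payload(
    content: str,
    private: bool = False,
    message_type: str = "outgoing",
) -> dict:
    """Build a message body; private=True marks it as an internal note."""
    if not content.strip():
        raise ValueError("content must not be empty")
    if message_type not in ALLOWED_MESSAGE_TYPES:
        raise ValueError(f"unsupported message_type: {message_type!r}")
    return {
        "content": content,
        "message_type": message_type,
        "private": private,
    }
```

The dictionary has the same keys that `send_message` posts, so it could be sent to the same `conversations/{conversation_id}/messages` endpoint.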


In [None]:
# Sending a message (requires an active conversation ID)

conversation_id = 1  # Replace with actual conversation ID
message_content = "Hello! Thank you for contacting support. How can I help you today?"

# message_result = chatwoot_client.send_message(conversation_id, message_content)
# if message_result:
#     print(f"Message sent: {message_result.get('id')}")
print("[INFO]: Message sending example - uncomment to test with valid conversation ID")


### 6. Resolve Conversation

Changes conversation status to resolved.


In [None]:
# Resolving a conversation

# resolved = chatwoot_client.toggle_conversation_status(conversation_id, status="resolved")
# if resolved:
#     print(f"Conversation {conversation_id} resolved")
print("[INFO]: Conversation resolution example - uncomment to test with valid conversation ID")


## MailHog


<ol>
<li> Python-based: <b>Available</b> (via SMTP and REST API; MailHog itself is written in Go)
<li> Open-source: <b>Free</b> (MIT License)

<li> Maximum Rate Limit Restriction:
<ul>
<li> Self-hosted: <b>No rate limits</b> (local testing server).
<li> Designed for development/testing environments.
</ul>

<li> <b>Supports Core Architectural Choice</b> (Standard SMTP, REST API for retrieval)
<li> Service is confirmed to <b>still be active & accessible</b>
</ol>

### Expected Basic Operations

1. [Send Email via SMTP](https://github.com/mailhog/MailHog#smtp-server).
2. [List All Messages](https://github.com/mailhog/MailHog/blob/master/docs/APIv2/swagger-2.0.json).
3. [Get Message by ID](https://github.com/mailhog/MailHog/blob/master/docs/APIv2/swagger-2.0.json).
4. [Search Messages](https://github.com/mailhog/MailHog/blob/master/docs/APIv2/swagger-2.0.json).
5. [Delete Message](https://github.com/mailhog/MailHog/blob/master/docs/APIv2/swagger-2.0.json).
6. [Delete All Messages](https://github.com/mailhog/MailHog/blob/master/docs/APIv2/swagger-2.0.json).

**<u>Notes:</u>**
1. MailHog captures all outgoing emails without delivering to real recipients.
2. Provides a web UI at port 8025 for browsing captured emails.
3. Well suited to testing email functionality in development environments.


### Setup

MailHog provides SMTP for sending and REST API for retrieving captured emails.


In [None]:
# Class Definition for MailHog operations

class Simple_MailHog_Client():
    
    def __init__(
        self,
        smtp_host: str = None,
        smtp_port: int = 1025,
        api_url: str = None,
        timeout: float = 10.0
    ):
        self.smtp_host = smtp_host or os.getenv("MAILHOG_SMTP_HOST", "localhost")
        self.smtp_port = smtp_port
        self.api_url = (api_url or os.getenv("MAILHOG_API_URL", "http://localhost:8025")).rstrip("/")
        self.timeout = timeout  # seconds; keeps a cell from hanging if MailHog is unreachable
        self.SUCCESS_MSG = "[INFO]: Operation performed successfully!"
        self.ERROR_MSG = "[WARNING]:"
        
    def _get_json(self, path: str, params: dict = None):
        """GET a MailHog API path and return the parsed JSON body, or None on failure"""
        try:
            response = requests.get(f"{self.api_url}{path}", params=params, timeout=self.timeout)
            response.raise_for_status()
            print(self.SUCCESS_MSG)
            return response.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a response body that is not valid JSON
            print(self.ERROR_MSG, e)
            return None
    
    def _delete(self, path: str, success_detail: str):
        """DELETE a MailHog API path and return True on success, False on failure"""
        try:
            response = requests.delete(f"{self.api_url}{path}", timeout=self.timeout)
            response.raise_for_status()
            print(self.SUCCESS_MSG, success_detail)
            return True
        except requests.RequestException as e:
            print(self.ERROR_MSG, e)
            return False
    
    def send_email(self, from_addr: str, to_addr: str, subject: str, body: str, html: bool = False):
        """Send an email via SMTP (captured by MailHog)"""
        try:
            msg = MIMEMultipart('alternative')
            msg['Subject'] = subject
            msg['From'] = from_addr
            msg['To'] = to_addr
            msg.attach(MIMEText(body, 'html' if html else 'plain'))
            
            with smtplib.SMTP(self.smtp_host, self.smtp_port, timeout=self.timeout) as server:
                server.sendmail(from_addr, [to_addr], msg.as_string())
            
            print(self.SUCCESS_MSG, f"Email sent to {to_addr}")
            return True
        except (smtplib.SMTPException, OSError) as e:
            # OSError covers refused connections and timeouts
            print(self.ERROR_MSG, e)
            return False
    
    def list_messages(self, start: int = 0, limit: int = 50):
        """List all captured messages"""
        return self._get_json("/api/v2/messages", params={"start": start, "limit": limit})
    
    def get_message(self, message_id: str):
        """Get a specific message by ID"""
        return self._get_json(f"/api/v1/messages/{message_id}")
    
    def search_messages(self, kind: str, query: str):
        """Search messages by kind (from, to, containing)"""
        return self._get_json("/api/v2/search", params={"kind": kind, "query": query})
    
    def delete_message(self, message_id: str):
        """Delete a specific message"""
        return self._delete(f"/api/v1/messages/{message_id}", f"Message {message_id} deleted")
    
    def delete_all_messages(self):
        """Delete all captured messages"""
        return self._delete("/api/v1/messages", "All messages deleted")
    
    def get_message_count(self):
        """Get count of captured messages"""
        # list_messages already handles request errors and returns None on failure
        messages = self.list_messages(limit=1)
        return messages.get('total', 0) if messages else 0


### 1. Send Email via SMTP

Sends a test email that gets captured by MailHog.


In [None]:
# MailHog Client Init

mailhog_client = Simple_MailHog_Client()

# Sending a test email
mailhog_client.send_email(
    from_addr="support@example.com",
    to_addr="customer@example.com",
    subject="Support Ticket #12345 - Update",
    body="Dear Customer,\n\nYour support ticket has been updated. Please log in to view the response.\n\nBest regards,\nSupport Team"
)


### 2. List Captured Messages

Lists all emails captured by MailHog.
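
Subjects containing non-ASCII characters travel over SMTP in RFC 2047 encoded form (for example `=?utf-8?q?...?=`). The sketch below condenses one captured item into a small summary and decodes the subject. It assumes the item layout read in the cell that follows (`ID`, `Content.Headers`) and that header values are returned as sent; the helper name `summarize_message` is hypothetical.

```python
from email.header import decode_header, make_header


def summarize_message(item: dict) -> dict:
    """Reduce one captured message to its id, decoded subject, and recipients."""
    headers = item.get("Content", {}).get("Headers", {})
    # Header values are lists; fall back to an empty subject if none is present
    raw_subject = (headers.get("Subject") or [""])[0]
    subject = str(make_header(decode_header(raw_subject)))
    return {
        "id": item.get("ID"),
        "subject": subject,
        "to": headers.get("To", []),
    }
```

Plain ASCII subjects pass through unchanged, so the helper is safe to apply to every item in `messages.get('items', [])`.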


In [None]:
# Listing captured messages

messages = mailhog_client.list_messages()
if messages:
    print(f"Total captured messages: {messages.get('total', 0)}")
    for msg in messages.get('items', [])[:5]:
        content = msg.get('Content', {})
        headers = content.get('Headers', {})
        print(f"  - Subject: {headers.get('Subject', ['N/A'])[0]}")


### 3. Search Messages

Searches for messages by recipient.
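
In an automated test, a search issued immediately after sending may run before the message has been stored. The sketch below polls a search callable until it returns at least one item or a timeout expires. The helper name `wait_for_message` and its parameters are hypothetical, and it assumes search results carry an `items` list as the list endpoint does.

```python
import time
from typing import Callable, Optional


def wait_for_message(
    search: Callable[[], Optional[dict]],
    timeout: float = 10.0,
    interval: float = 0.5,
) -> Optional[dict]:
    """Poll `search` until it returns at least one item or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        result = search()
        items = (result or {}).get("items") or []
        if items:
            return items[0]
        if time.monotonic() >= deadline:
            return None
        time.sleep(interval)
```

With the client from this notebook, `search` could be `lambda: mailhog_client.search_messages(kind="to", query="customer@example.com")`. Using `time.monotonic()` keeps the deadline unaffected by system clock adjustments.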


In [None]:
# Searching for messages

search_results = mailhog_client.search_messages(kind="to", query="customer@example.com")
if search_results:
    print(f"Found {search_results.get('total', 0)} messages to customer@example.com")


### 4. Send HTML Email

Sends an HTML-formatted email notification.
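
A `multipart/alternative` message is intended to carry several renderings of the same content, so an HTML notification is usually paired with a plain-text fallback. The sketch below builds such a message with the standard library; the helper name `build_alternative_email` is hypothetical, and it is not part of the client class in this notebook.

```python
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText


def build_alternative_email(
    from_addr: str,
    to_addr: str,
    subject: str,
    text_body: str,
    html_body: str,
) -> MIMEMultipart:
    """Build a message with a plain-text part and an HTML part."""
    msg = MIMEMultipart("alternative")
    msg["Subject"] = subject
    msg["From"] = from_addr
    msg["To"] = to_addr
    # Parts go in increasing order of preference, so HTML is attached last
    msg.attach(MIMEText(text_body, "plain"))
    msg.attach(MIMEText(html_body, "html"))
    return msg
```

The result can be sent with `smtplib.SMTP(host, port).sendmail(from_addr, [to_addr], msg.as_string())`, and MailHog's web UI should then show both parts of the captured message.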


In [None]:
# Sending an HTML email

html_body = """
<html>
<body>
    <h2>Support Ticket Update</h2>
    <p>Dear Customer,</p>
    <p>Your support ticket <strong>#12345</strong> has been updated.</p>
    <p>Status: <span style="color: green;">Resolved</span></p>
    <p>Please log in to view the full response.</p>
    <br>
    <p>Best regards,<br>Support Team</p>
</body>
</html>
"""

mailhog_client.send_email(
    from_addr="support@example.com",
    to_addr="customer@example.com",
    subject="[RESOLVED] Support Ticket #12345",
    body=html_body,
    html=True
)


### 5. Get Message Count

Gets the total count of captured messages.


In [None]:
# Getting message count

count = mailhog_client.get_message_count()
print(f"Total captured emails: {count}")


### 6. Clear All Messages

Deletes all captured messages (cleanup).


In [None]:
# Clearing all messages (uncomment to execute)

# mailhog_client.delete_all_messages()
print("[INFO]: Delete all messages example - uncomment to clear MailHog inbox")
