# Setup

In [2]:
%load_ext autoreload
%autoreload 2

In [3]:
# Standard library imports
from pydantic import BaseModel
from typing import Literal
import threading
import warnings
import random
import queue
import json
import csv
import os

warnings.filterwarnings("ignore")
JWT_SECRET_API = !echo $(aws --profile "chat-prod_ro" secretsmanager get-secret-value --secret-id "arn:aws:secretsmanager:us-west-2:242659714806:secret:shared/cresta-server-jwt_secret-VDn5My" --query SecretString --output text) # type: ignore
os.environ["JWT_SECRET_API"] = json.loads(JWT_SECRET_API[0])["jwt-secret"]
os.environ["CONFIG_SERVICE_ADDR"] = "auth.chat-prod.internal.cresta.ai:443"
os.environ["CONFIG_USE_SECURE_CHANNEL"] = "true"

# Third party imports

from sentence_transformers import SentenceTransformer, util
from retry import retry

# Greyparrot imports
from greyparrot.llm.prompting import prompts as prompts_utils
from greyparrot.conversations.db import ConversationsDBConn
from greyparrot.multi_tenancy.v3_config import V3Config
from greyparrot.chats_common import PartialChat
from greyparrot.dataset_common import Dataset
from greyparrot.common import get_logger
from greyparrot.conversations.utils import get_chats


logger = get_logger(__name__)

# Local imports
from llm_proxy_client import LLMProxyDevClient

In [4]:
embedder = SentenceTransformer("all-mpnet-base-v2")

In [5]:
customer_id = "brinks"
profile_id = "care-voice"
usecase_id = "care-voice"
language_code = "en-US"

In [6]:
def get_chats_with_ids(chat_ids: list[str]) -> list[PartialChat]:
    customer_name: str = V3Config.short_name_from_ids(customer_id, profile_id)
    conv_db_conn: ConversationsDBConn = ConversationsDBConn.from_customer_name(customer_name)
    chats: list[PartialChat] = conv_db_conn.get_detailed_chats(
        customer_id=customer_id,
        profile_id=profile_id,
        usecase_id=usecase_id,
        language_code=language_code,
        conversation_ids=chat_ids,
        is_dev_user=False,
    )
    return chats

In [7]:
LLM_ENGINE: str = "gpt-4o-mini"
CONCURRENCY: int = 10

In [8]:
@retry(tries=1, delay=60, backoff=2, logger=logger)
def chat_completion(**kwargs):
    return LLMProxyDevClient("openai").beta.chat.completions.parse(**kwargs)

# Discovery prompts

In [9]:
# TODO remove this after fixing speaker_role flips
flips: dict[str, str] = {"agent": "visitor", "visitor": "agent"}

In [10]:
def chat_to_prompt_text(chat: PartialChat) -> str:
    return "\n".join([
        f"{prompts_utils.speaker_role_str_for_prompts(flips[m.speaker_role.value]).capitalize()}: {m.text}"
        for m in chat.messages
    ])

In [25]:
SYSTEM_PROMPT_AGENT_WORKFLOW_DISCOVERY = """### Context and data description
You are a conversation analyst working for Brinks Home Security Call Center.

You will be given 1 conversation at a time. Each conversation is between a Brinks Call Center Agent and a Customer. Your primary goal is to extract workflows of steps which the Agent takes in **the given conversation** to help resolve the Customer's needs related to their home security system and services.

The primary use case of these workflows is to create a troubleshooting template to address similar customer needs in the future.

Each workflow should be a list of steps which the Agent needs to take.

For each workflow, return:
- Product: Specific Brinks product/service involved (e.g., Security Panel, Door/Window Sensor, Motion Detector, Security Camera, Doorbell Camera, Smart Lock, Mobile App, Alarm.com Account)
- Issue: Specific customer problem (e.g., False Alarms, Device Offline, Camera Not Recording, App Login Issues, Billing Questions, Account Changes)
- Steps: Detailed troubleshooting or resolution steps the Agent follows

Make sure the product is specific to Brinks' security equipment and services, not general categories.
Make sure the issue clearly describes the exact problem the customer is experiencing.
Make sure the steps are detailed and follow Brinks' standard operating procedures.

Common scenarios include:
- Security system troubleshooting (panel issues, sensor malfunctions, connectivity problems)
- Camera and video recording issues
- Account management (billing, autopay, contact updates)
- Mobile app and Alarm.com portal assistance
- Service changes (moving, upgrading, cancellation)
- Installation and maintenance appointments

**Important**: There could be more than 1 workflow in a single conversation. There could also be no workflows in a single conversation. The workflows will be used to create troubleshooting guides to address similar customer needs in the future."""

In [12]:
class Flow(BaseModel):
    product: str
    issue: str
    steps: list[str]

class Flows(BaseModel):
    flows: list[Flow]

In [13]:
def discover_flow_in_chat(chat: PartialChat,
                          llm_engine: str = LLM_ENGINE) -> list[Flow]:
    logger.info(f"Discovering Agent flow in chat {chat.chat_name}")
    messages: list[dict[str, str]] = [
        {
            "role": "system",
            "content": SYSTEM_PROMPT_AGENT_WORKFLOW_DISCOVERY
        },
        {
            "role": "user",
            "content": chat_to_prompt_text(chat)
        },
    ]

    # Note: temperature=0.1 to allow for some exploration
    return chat_completion(model=llm_engine,
                            messages=messages,
                            temperature=0.1,
                            response_format=Flows).choices[0].message.parsed.flows

In [14]:
def extract_flows_from_chats(chats: list[PartialChat],
                                concurrency: int=10) -> dict[str, list[Flow]]:
    lock: threading.Lock = threading.Lock()
    indexes: queue.Queue = queue.Queue()

    workflows: dict[str, list[Flow]] = {}
    for idx in range(len(chats)):
        indexes.put(idx)

    def workflow_labeler_worker():
        while True:
            try:
                idx = indexes.get(block=False)
            except queue.Empty:
                return
            chat = chats[idx]
            try:
                extracted_workflows: list[Flow] = discover_flow_in_chat(chat).flows
                with lock:
                    workflows[str(chat)] = extracted_workflows
                    if len(workflows) % 10 == 0:
                        print(f"Workflows from {len(workflows)} chats extracted!")
            except Exception as e:
                logger.warning(e, str(chat))
            indexes.task_done()

    logger.info(
        f"Starting processing {len(chats)} chats with {concurrency} workers")
    workers = [
        threading.Thread(target=workflow_labeler_worker)
        for _ in range(concurrency)
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    logger.info(f"Finished processing all {len(chats)} chats")

    return workflows

In [15]:
test_chat: PartialChat = get_chats_with_ids(["0843c54c-6487-45ce-946a-cc6257484f54"])[0]
workflows: list[Flow] = discover_flow_in_chat(test_chat)



cmd: cresta-cli connstring -i voice-prod voice-prod brinks-care-voice-FKrGHU -r
cmd: cresta-cli connstring -i voice-prod voice-prod brinks-care-voice -r


{"message": "Discovering Agent flow in chat 0843c54c-6487-45ce-946a-cc6257484f54", "name": "__main__", "asctime": "2025-02-13 21:17:55", "levelname": "INFO", "filename": "<ipython-input-13-6a60bf4efb16>", "funcName": "discover_flow_in_chat", "threadName": "MainThread", "status": "INFO"}


In [16]:
print(chat_to_prompt_text(test_chat))

Customer: Technical support
Agent: Thank you for being the best part of Brinks Home. My name is Juan. Who do I have the pleasure of speaking with?
Customer: Debbie. Deborah Terryya.
Agent: Good morning, missus Deborah. How can I assist you?
Customer: Okay. On December 16th, I had Brinks installed. I had one. Camera put on my front door, and I had one put on the tireport, and then I went out of town for a couple weeks. And only my carport has been working since December 6th eight. And I clicked on the troubleshoot for my front door, and it just keeps coming up that there's a problem. I mean, I did I did everything it told me to do. I unplugged the power cord, and told me to wait 5 minutes. And it was trying to find what the problem was. And it is still not responding my front door.
Agent: The front door. The other
Customer: Which would be by my court
Agent: Okay. Gotcha. Let me get into the account real quick. And let me just double check what is happening to that sensor.
Customer: Yeah

In [17]:
print(f"Found {len(workflows)} workflows\n")
for i, flow in enumerate(workflows):
    print(f"[Agent Workflow #{i + 1}]")
    print(f"Product: {flow.product}")
    print(f"Issue: {flow.issue}")
    print("Steps:", end="\n* ")
    print("\n* ".join(flow.steps))
    if i < len(workflows) - 1:
        print("-" * 100)

Found 1 workflows

[Agent Workflow #1]
Product: Brinks Home Front Door Camera
Issue: Front door camera not responding
Steps:
* Verify customer account details including phone number and address.
* Ask the customer to check the LED light on the front door camera.
* Instruct the customer to unplug the power supply of the front door camera and wait for 30 seconds to 1 minute.
* Confirm with the customer that they unplugged the correct power supply for the front door camera.
* Guide the customer to reconnect the power supply to the front door camera.
* Ask the customer again to check the LED light after reconnecting the power supply.
* If no light is visible, instruct the customer to test the power outlet by plugging in a phone charger to see if it is supplying power.
* If the outlet is working, inform the customer that the power supply for the camera is likely faulty and needs replacement.
* Provide the customer with the contact number for the installation company to schedule a technician

# Relevant chats (from KA-QE trainset)

In [18]:
data_set = get_chats(
    'brinks-care-voice', 
    '2024-06-01',
    '2024-12-01',
    200,)



cmd: cresta-cli connstring -i voice-prod voice-prod brinks-care-voice-FKrGHU -r
cmd: cresta-cli connstring -i voice-prod voice-prod brinks-care-voice -r


200it [00:01, 122.45it/s]
{"message": "Caching to cache_chats-6cdad05445414399627e4b4e59113f84.pkl", "name": "cache-utils", "asctime": "2025-02-13 21:18:13", "levelname": "INFO", "filename": "cache.py", "funcName": "wrapper", "threadName": "MainThread", "status": "INFO"}
{"message": "Saving dataset to cache_chats-6cdad05445414399627e4b4e59113f84.pkl", "name": "dataset-common", "asctime": "2025-02-13 21:18:13", "levelname": "INFO", "filename": "common.py", "funcName": "save", "threadName": "MainThread", "status": "INFO"}


In [None]:
sampled_chat_ids = list(set([
    chat.chat_name
    for chat in (item if not isinstance(item, tuple) else item[0] for item in data_set)
    if hasattr(chat, 'chat_name')
]))
print(len(sampled_chat_ids))

200


# Agent Workflow Discovery

In [None]:
sampled_chats = get_chats_with_ids(sampled_chat_ids)

In [None]:
workflows = extract_flows_from_chats(sampled_chats)

In [None]:
len(workflows)

In [None]:
for pc, ws in list(workflows.items())[:5]:
    print(f"\n\n<{pc}>")
    for idx, w in enumerate(ws):
        print("-" * 50)
        print(f"[Agent Workflow#{idx + 1}]")
        for k, v in w.items():
            if isinstance(v, list):
                val = "\n* ".join(v)
                print(f"{k.capitalize()}:\n* {val}")
            else:
                print(f"{k.capitalize()}: {v}")

In [62]:
def group_semantically_similar_workflows(workflows: list[Flow], embedder: 
                                         SentenceTransformer, group_on: Literal["product", "issue"], semantic_threshold: int = 0.9):
    if group_on == "product":
        embeddings = embedder.encode([w.product for w in workflows], convert_to_tensor=True)
    else: 
        embeddings = embedder.encode([w.issue for w in workflows], convert_to_tensor=True)
        
    workflow_groups, solo_workflows = [], []
    for idx, workflow in enumerate(workflows):
        existing_groups = [group for group in workflow_groups if idx in group]
        if existing_groups:
            assert len(
                existing_groups) == 1, "A workflow should only be in 1 group"
            continue

        print(f"Finding similar Workflows for Workflow#{idx}")

        scores = util.cos_sim(embeddings[idx:idx + 1],
                              embeddings).cpu().tolist()[0]
        matches = [
            i for i, score in enumerate(scores) if score > semantic_threshold
        ]
        filtered_matches = [i for i in matches if i != idx]
        if filtered_matches:
            outstanding_groups = []
            new_group = filtered_matches + [idx]
            for group in workflow_groups:
                if set(filtered_matches).intersection(set(group)):
                    new_group.extend(group)
                else:
                    outstanding_groups.append(group)
            workflow_groups = outstanding_groups + [list(set(new_group))]
        else:
            solo_workflows.append(idx)

    print(f"Found {len(solo_workflows)} # of Solo Workflows..")
    print(f"Found {len(workflow_groups)} # Groups of Workflows..")

    return [workflows[idx] for idx in solo_workflows
           ], [[workflows[idx] for idx in group] for group in workflow_groups]

In [None]:
solo_workflows, grouped_workflows = group_semantically_similar_workflows(
    workflows, embedder, "product")

In [None]:
for each in grouped_workflows:
    print("-" * 50)
    print(len(each))
    for e in each:
        print(e)

In [None]:
unique_workflow_titles = solo_workflows + [random.choice(w) for w in grouped_workflows]
unique_workflows = [title_to_workflow[w] for w in unique_workflow_titles]
len(unique_workflows)

In [None]:
filtered_unique_workflows = [w for w in unique_workflows if len(w["steps"]) > 3] # skip too small workflows
len(filtered_unique_workflows)

In [None]:
filtered_unique_workflows[0]

In [None]:
for w in filtered_unique_workflows[:10]:
    print(w["title"])

In [None]:
for w in filtered_unique_workflows:
    print("-" * 100)
    print("\n\n")
    for k, v in w.items():
        if isinstance(v, list):
            val = "\n* ".join(v)
            print(f"{k.capitalize()}:\n* {val}")
        else:
            print(f"{k.capitalize()}: {v}")    

In [None]:
[w["title"] for w in filtered_unique_workflows]

In [None]:
len(filtered_unique_workflows)