## Contact Center Insights generation involves two steps:

1. **Transcription with Speaker Diarization**
   - **NVIDIA Riva Integration:** Transcribes incoming audio calls between two speakers using NVIDIA Riva's Parakeet CTC 1.1b ASR model and creates a structured transcript.

2. **Insight Generation**
   - **Entity Extraction:** Extracts key entities like customer and agent names, topic and subtopic of the conversation.
   - **Agent Performance Evaluation:** Evaluates agent performance based on several key metrics.
   - **Combine Insights:** Combines all extracted insights into a structured JSON document.

## Content Overview
1. [Install dependencies](#1.-Install-dependencies)
2. [Set required environment variables](#2.-Set-required-environment-variables)
3. [Transcribe Audio](#3.-Transcribe-Audio)
4. [Generate Insights](#4.-Generate-Insights)

# 1. Install dependencies

In [None]:
# Install the packages listed in requirements.txt using the IPython %pip magic.
%pip install -r requirements.txt

# 2. Set required environment variables

In [None]:
import getpass
import os
from dotenv import load_dotenv
from io import BytesIO
from pydub import AudioSegment
import riva.client
from pydantic import BaseModel, Field
from typing import List, Optional
from langchain_nvidia_ai_endpoints import ChatNVIDIA


# Load variables from a .env file (if any) into the process environment.
load_dotenv()

# API keys that must be present before the notebook can call the NIM endpoints.
REQUIRED_VARIABLES = [
    "NVIDIA_PARAKEET_NIM_API_KEY",
    "NVIDIA_LLAMA_NIM_API_KEY",
]

for var in REQUIRED_VARIABLES:
    # Treat an empty value like a missing one; getpass prompts without echoing the secret.
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"Please set the {var} environment variable.")

# Optional variables: keep a value that is already set, otherwise use the default.
os.environ.setdefault("RIVA_SPEECH_API_SERVER", "grpc.nvcf.nvidia.com")

# Look for .wav files in the "audio" directory. os.listdir() returns entries in
# arbitrary order, so sort them to make the selected file reproducible.
audio_files = sorted(f for f in os.listdir("audio") if f.endswith(".wav"))
if not audio_files:
    raise FileNotFoundError('No .wav files found in the "audio" directory.')

AUDIO_FILE = audio_files[0]
print(f"Using audio file: {AUDIO_FILE}")

# Validate the audio file: the pipeline transcribes one speaker per channel,
# so the recording must have exactly two channels.
audio = AudioSegment.from_file(f"audio/{AUDIO_FILE}")
if audio.channels != 2:
    raise ValueError("Audio file must have exactly two channels.")

# 3. Transcribe Audio

In [None]:
# A single transcribed utterance attributed to one speaker.
class Utterance(BaseModel):
    # Start of the utterance, in milliseconds (see the field description).
    time: int = Field(..., description="Time in milliseconds when the utterance starts")
    # Speaker identifier; this notebook uses 0 for the left channel and 1 for the right.
    speaker: int
    # Text spoken during this utterance.
    spoken_words: str = Field(..., description="Words spoken by the speaker")

# Transcript of a whole call: its utterances plus the call duration.
class Transcript(BaseModel):
    utterances: List[Utterance]
    call_duration: int = Field(..., description="Duration of the conversation in milliseconds")

    def __str__(self):
        """Render one "<time> - <speaker>: <spoken words>" line per utterance."""
        rendered_lines = (
            f"{utt.time} - {utt.speaker}: {utt.spoken_words}"
            for utt in self.utterances
        )
        return "\n".join(rendered_lines)

In [None]:
def split_audio_channels(filename: str, audio_dir: str = "audio") -> tuple[BytesIO, BytesIO, int]:
    """Split a two-channel WAV file into two in-memory mono WAV streams.

    Args:
        filename: Name of the WAV file inside ``audio_dir``.
        audio_dir: Directory that contains the file. Defaults to ``"audio"``.

    Returns:
        A ``(left, right, duration)`` tuple: the first and second channel, each
        exported as WAV into a ``BytesIO`` buffer, and the duration of the
        original audio in milliseconds.

    Raises:
        ValueError: If the file has fewer than two channels.
    """
    audio = AudioSegment.from_file(f"{audio_dir}/{filename}", format="wav")

    # Split once and reuse the result instead of splitting again per channel.
    mono_channels = audio.split_to_mono()
    if len(mono_channels) < 2:
        raise ValueError(
            f"Audio file must have two channels, found {len(mono_channels)}: {filename}"
        )
    left_channel, right_channel = mono_channels[0], mono_channels[1]

    left_channel_bytes, right_channel_bytes = BytesIO(), BytesIO()
    left_channel.export(left_channel_bytes, format="wav")
    right_channel.export(right_channel_bytes, format="wav")

    # duration of the audio in milliseconds
    duration = len(audio)

    return left_channel_bytes, right_channel_bytes, duration

In [None]:
def transcribe_with_riva(audio_bytes):
    """Run offline recognition on the given audio bytes with the Riva Speech API.

    The server address and API key are read from the ``RIVA_SPEECH_API_SERVER``
    and ``NVIDIA_PARAKEET_NIM_API_KEY`` environment variables.

    Returns:
        The ``results`` attribute of the recognition response.
    """
    server_uri = os.environ["RIVA_SPEECH_API_SERVER"]
    api_key = os.environ["NVIDIA_PARAKEET_NIM_API_KEY"]

    # Request metadata: bearer token plus the id of the function to invoke.
    request_metadata = [
        ['authorization', f'Bearer {api_key}'],
        ['function-id', '1598d209-5e27-4d3c-8079-4751568b1081'],
    ]
    credentials = riva.client.Auth(
        uri=server_uri,
        use_ssl=True,
        metadata_args=request_metadata,
    )

    # Single-channel English recognition with punctuation and per-word
    # timestamps; only the single best alternative is requested.
    recognition_config = riva.client.RecognitionConfig(
        language_code="en-US",
        enable_word_time_offsets=True,
        max_alternatives=1,
        enable_automatic_punctuation=True,
        audio_channel_count=1,
    )

    asr_service = riva.client.ASRService(credentials)
    return asr_service.offline_recognize(audio_bytes, recognition_config).results

In [None]:
def combine_and_format_results(left_results, right_results) -> List[Utterance]:
    """Merge the recognition results of both channels into one ordered utterance list.

    Args:
        left_results: Recognition results for the left channel (labelled speaker 0).
        right_results: Recognition results for the right channel (labelled speaker 1).

    Returns:
        Utterances from both channels, sorted by the start time of their first
        word. Alternatives that carry no word timings are skipped.
    """

    def extract_transcript(results, speaker_label):
        """Collect transcript text, first-word start time and speaker per alternative."""
        transcript_results = []
        for result in results:
            for alternative in result.alternatives:
                # Without word timings the utterance cannot be placed on the
                # timeline; skip it instead of failing on words[0].
                if not alternative.words:
                    continue
                transcript_results.append({
                    'transcript': alternative.transcript,
                    'start_time': alternative.words[0].start_time,  # Start time of the first word
                    'speaker': speaker_label
                })

        return transcript_results

    combined_results = extract_transcript(left_results, 0) + extract_transcript(right_results, 1)

    # Interleave both speakers chronologically. The sort is stable, so on equal
    # start times the left channel stays ahead of the right channel.
    combined_results.sort(key=lambda entry: entry['start_time'])

    # Utterance.time is declared as an int, so convert with int() rather than float().
    return [
        Utterance(
            time=int(entry['start_time']),
            speaker=entry['speaker'],
            spoken_words=entry['transcript'].strip(),
        )
        for entry in combined_results
    ]

#### Split the audio into two channels and transcribe

In [None]:
# Separate the recording into one in-memory WAV stream per channel
left_channel, right_channel, duration = split_audio_channels(AUDIO_FILE)

# Transcribe each channel on its own: left first, then right
left_wav_bytes = left_channel.getvalue()
left_results = transcribe_with_riva(left_wav_bytes)
right_wav_bytes = right_channel.getvalue()
right_results = transcribe_with_riva(right_wav_bytes)

#### Format the combined results from both channels

In [None]:
# Merge the results of both channels into utterances, then wrap them with the call duration
utterances = combine_and_format_results(left_results=left_results, right_results=right_results)
transcript = Transcript(call_duration=duration, utterances=utterances)

# 4. Generate Insights

### 1. Entity Extraction

In [None]:
from langchain_core.prompts import ChatPromptTemplate

# Structured-output schema for the entity extraction step.
# NOTE(review): documented with comments instead of a class docstring because pydantic
# copies a model docstring into its JSON schema description, which would change
# the schema handed to the LLM — confirm before converting to a docstring.
class Entities(BaseModel):
    # Agent name; the prompt asks for "Unknown Agent" when it is not found.
    agent_name: Optional[str]
    # Customer name; the prompt asks for "Unknown Customer" when it is not found.
    customer_name: Optional[str]
    # Speaker id of the agent (the prompt asks for 0 or 1).
    agent_speaker: int 
    # Speaker id of the customer (the prompt asks for 0 or 1).
    customer_speaker: int 
    # Primary reason for the call.
    reason: Optional[str] 
    # Main topic of the conversation.
    topic: str 
    # More specific subtopic under the main topic.
    subtopic: str

# Prompt template for entity extraction; the "{transcript}" placeholder is
# filled with the formatted call transcript when the chain is invoked.
entity_extraction_instructions = """You are an expert analyst specialized in extracting insights from call center transcripts.
You will extract the following information from the call transcript:
- Agent name, if not found, use "Unknown Agent"
- Customer name, if not found, use "Unknown Customer"
- Speaker id for the agent, 0 or 1
- Speaker id for the customer, 0 or 1
- Primary reason for the call
- Main topic of the conversation
- More specific subtopic under the main topic

Call Transcript:
###
{transcript}
###
"""

# Build the extraction chain: prompt -> chat model constrained to the Entities schema.
llm = ChatNVIDIA(model="meta/llama-3.3-70b-instruct", api_key=os.environ["NVIDIA_LLAMA_NIM_API_KEY"])
prompt = ChatPromptTemplate.from_template(entity_extraction_instructions)
# Keep `llm` bound to the plain chat model; the structured-output wrapper only
# lives inside the chain, matching how the evaluation step builds its chain.
chain = prompt | llm.with_structured_output(Entities)
# Transcript.__str__ already renders the "<time> - <speaker>: <words>" lines.
transcript_text = str(transcript)

entities: Entities = chain.invoke({"transcript": transcript_text})
# Fail here with a clear message rather than later when the result is assembled.
if entities is None:
    raise RuntimeError("Entity extraction returned no structured output.")
print(entities)

### 2. Agent Performance Evaluation

In [None]:
# Metric expressed as an integer score, with the reasoning behind it.
class ScoreMetric(BaseModel):
    # The 1-10 range is stated only in the description; no numeric constraint is declared.
    value: int = Field(description="Score metric value from 1 to 10, 1 being the lowest and 10 being the highest")
    # Explanation of why this score was given.
    justification: str = Field(description="Explanation for the score metric value")

# Yes/no metric, with the reasoning behind the choice.
class BoolMetric(BaseModel):
    # Stored as an int rather than a bool: 0 means false, 1 means true (per the description).
    value: int = Field(description="Boolean metric value, 0 for false, 1 for true")
    # Explanation of why this value was chosen.
    justification: str = Field(description="Explanation for the choice of the boolean metric")

# Free-text metric, with the reasoning behind it.
class StringMetric(BaseModel):
    # The metric value as text.
    value: str
    # Explanation of the text value.
    justification: str = Field(description="Explanation of the string metric value")


# Structured-output schema for the agent performance evaluation; the field
# names match the metric names listed in the evaluation prompt.
# NOTE(review): listening, understanding, tone, proactivity and clarity are
# BoolMetric here, while the prompt phrases them as "How well ..." / "How was ..."
# questions — confirm whether a 0/1 value is intended for them.
class Evaluation(BaseModel):
    greeting: BoolMetric
    hold: BoolMetric
    ticket: BoolMetric
    listening: BoolMetric
    understanding: BoolMetric
    tone: BoolMetric
    proactivity: BoolMetric
    clarity: BoolMetric
    resolved: BoolMetric
    # The only scored metric; the prompt asks for a 1 to 10 sentiment score.
    customer_sentiment: ScoreMetric
    escalation: BoolMetric
    agent_feedback: StringMetric
    escalation_reason: StringMetric

In [None]:
# Prompt template for the agent performance evaluation; the "{transcript}"
# placeholder is filled with the formatted call transcript when the chain is invoked.
template = """Evaluate call transcript against metrics shown below and provide value for each metric.
Provide justification for each metric value.

Metrics:
- greeting: Did the agent greet the customer?
- hold: Did the agent put the customer on hold?
- ticket: Did the agent create a ticket?
- listening: How well did the agent listen to the customer?
- understanding: How well did the agent understand the customer?
- tone: How was the agent's tone?
- proactivity: How proactive was the agent?
- clarity: How clear was the agent's communication?
- resolved: Was the issue resolved?
- customer_sentiment: Customer sentiment score from 1 to 10
- escalation: Was the call escalated?
- agent_feedback: Feedback for the agent for handling similar calls in the future
- escalation_reason: If the call was escalated, provide the reason for escalation

Here is the call transcript:
###
{transcript}
###
"""

# Build the evaluation chain: prompt -> chat model constrained to the Evaluation schema.
llm = ChatNVIDIA(model="meta/llama-3.3-70b-instruct", api_key=os.environ["NVIDIA_LLAMA_NIM_API_KEY"])
prompt = ChatPromptTemplate.from_template(template)
chain = prompt | llm.with_structured_output(Evaluation)
# Transcript.__str__ already renders the "<time> - <speaker>: <words>" lines.
transcript_text = str(transcript)

params = {"transcript": transcript_text}
evaluation: Evaluation = chain.invoke(params)
# Fail here with a clear message rather than later when the result is assembled.
if evaluation is None:
    raise RuntimeError("Agent evaluation returned no structured output.")
print(evaluation)


#### Prepare the result and save

In [None]:
# Final output record: the transcript together with both sets of insights.
class Result(BaseModel):
    # Transcript built from the two transcribed channels.
    transcript: Transcript
    # Output of the entity extraction step.
    entities: Entities
    # Output of the agent performance evaluation step.
    evaluation: Evaluation

# Bundle the transcript and both insight objects into the final result.
result = Result(transcript=transcript, entities=entities, evaluation=evaluation)

# Create the output directory if needed; exist_ok avoids a separate
# check-then-create step that can race with another process.
os.makedirs("results", exist_ok=True)

# Write UTF-8 explicitly so non-ASCII characters in the transcript do not
# depend on the platform's default encoding.
with open(f"results/{AUDIO_FILE}.json", "w", encoding="utf-8") as f:
    f.write(result.model_dump_json(indent=2))