# Azure AI Safety Evaluations of Audio Models
This demo notebook shows how to run safety evaluations for audio scenarios.

Azure AI Evaluation provides a Python SDK and a studio UI experience for evaluating your generative AI applications. The notebook is broken up into the following sections:

1. Setup and Configuration
2. Helper Functions for [Speech SDK](https://learn.microsoft.com/en-us/azure/ai-services/openai/concepts/models?tabs=global-standard%2Cstandard-chat-completions#text-to-speech-models-preview) and [Real-time Audio Models](https://learn.microsoft.com/en-us/azure/ai-services/openai/realtime-audio-quickstart?tabs=keyless%2Cwindows&pivots=ai-foundry-portal)
3. Simulating Adversarial Conversations with Audio 
4. Using Content Safety Evaluator to Evaluate Conversations 

## 1. Setup and Configuration
First, install the necessary requirements. In addition to what is listed in `requirements.txt`, you will need to download [ffmpeg](https://ffmpeg.org/download.html) to handle audio files.

In [None]:
%pip install -r requirements.txt

The following multi-modal evaluators in this sample require an Azure AI Studio project configuration and an Azure credential to use. 

- ContentSafetyEvaluator (a composite of the following evaluators)
    - ViolenceEvaluator
    - SexualEvaluator
    - SelfHarmEvaluator
    - HateUnfairnessEvaluator

Please fill in the assignments below with the required values to run the rest of this sample. 
Ensure that you have downloaded and installed the Azure CLI and logged in with your Azure credentials using `az login` in your CLI prior to these steps. 

*Important*: We recommend using East US 2 or Sweden Central as your AI Hub/AI project region to support all built-in safety evaluators. A subset of service-based safety evaluators is available in other regions; see the supported regions in our [documentation](https://aka.ms/azureaistudiosafetyevalhowto). Configure your project in a supported region to access the safety evaluation service via our evaluation SDK. Your project scope is also used to log your evaluation results in your project after the evaluation run finishes.

Set the following environment variables for use in this notebook:

In [None]:
import os

# Azure AI project variables
os.environ["AZURE_SUBSCRIPTION_ID"] = ""
os.environ["AZURE_RESOURCE_GROUP"] = ""
os.environ["AZURE_PROJECT_NAME"] = ""

# Azure OpenAI Realtime Audio deployment variables
os.environ["AZURE_OPENAI_AUDIO_DEPLOYMENT"] = ""
os.environ["AZURE_OPENAI_AUDIO_API_KEY"] = ""
os.environ["AZURE_OPENAI_AUDIO_ENDPOINT"] = ""

# Azure Speech Service variables
os.environ["AZURE_SPEECH_KEY"] = ""
os.environ["AZURE_SPEECH_REGION"] = ""
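
A blank value in any of these variables tends to surface later as a less obvious authentication or connection error. The sketch below checks them up front. `missing_env_vars` is a hypothetical helper (not part of any Azure SDK), and the list of names simply mirrors the cell above.

```python
import os

# Names mirror the variables assigned in the cell above.
REQUIRED_ENV_VARS = [
    "AZURE_SUBSCRIPTION_ID",
    "AZURE_RESOURCE_GROUP",
    "AZURE_PROJECT_NAME",
    "AZURE_OPENAI_AUDIO_DEPLOYMENT",
    "AZURE_OPENAI_AUDIO_API_KEY",
    "AZURE_OPENAI_AUDIO_ENDPOINT",
    "AZURE_SPEECH_KEY",
    "AZURE_SPEECH_REGION",
]


def missing_env_vars(names=REQUIRED_ENV_VARS, env=None):
    """Return the names that are unset or blank (hypothetical helper)."""
    env = os.environ if env is None else env
    return [name for name in names if not (env.get(name) or "").strip()]


missing = missing_env_vars()
if missing:
    print("Set these variables before continuing:", ", ".join(missing))
```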

In [None]:
from azure.identity import DefaultAzureCredential
from azure.ai.evaluation import evaluate
from azure.ai.evaluation.simulator import AdversarialSimulator, AdversarialScenario


azure_ai_project = {
    "subscription_id": os.environ.get("AZURE_SUBSCRIPTION_ID"),
    "resource_group_name": os.environ.get("AZURE_RESOURCE_GROUP"),
    "project_name": os.environ.get("AZURE_PROJECT_NAME"),
}
credential = DefaultAzureCredential()

## 2. Helper Functions for Speech SDK and Real-time Audio Models 

### Helper Functions for Speech SDK 

In [None]:
import azure.cognitiveservices.speech as speechsdk


def text_to_speech(text: str, output_file: str) -> None:
    # Set up the subscription info for the Speech Service:
    speech_key = os.environ.get("AZURE_SPEECH_KEY")
    service_region = os.environ.get("AZURE_SPEECH_REGION")

    # Create an instance of a speech config with specified subscription key and service region.
    speech_config = speechsdk.SpeechConfig(subscription=speech_key, region=service_region)

    # Create an audio configuration that points to an audio file.
    audio_config = speechsdk.audio.AudioOutputConfig(filename=output_file)

    # Create a synthesizer with the given settings
    synthesizer = speechsdk.SpeechSynthesizer(speech_config=speech_config, audio_config=audio_config)

    # Synthesize the text to speech
    result = synthesizer.speak_text_async(text).get()

    # Check result
    # if result.reason == speechsdk.ResultReason.SynthesizingAudioCompleted:
    #     print(f"Speech synthesized for text [{text}] and saved to [{output_file}]")
    if result.reason == speechsdk.ResultReason.Canceled:
        cancellation_details = result.cancellation_details
        print(f"Speech synthesis canceled: {cancellation_details.reason}")
        if cancellation_details.reason == speechsdk.CancellationReason.Error:
            print(f"Error details: {cancellation_details.error_details}")

In [None]:
from pydub import AudioSegment


def add_silence(input_file: str, output_file: str, silence_duration_ms: int = 500) -> None:
    # Load the audio file
    audio = AudioSegment.from_file(input_file)

    # Create silence audio segments
    silence = AudioSegment.silent(duration=silence_duration_ms)

    # Add silence at the beginning and end
    audio_with_silence = silence + audio + silence

    # Export the modified audio
    audio_with_silence.export(output_file, format="wav")
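
To make the padding step concrete, here is a rough standard-library equivalent that only handles uncompressed PCM WAV files. It is a sketch, not a replacement for the pydub version above. `silence_frames` and `pad_wav_with_silence` are hypothetical names, and the zero-byte padding assumes signed 16-bit samples.

```python
import wave


def silence_frames(duration_ms: int, sample_rate: int, sample_width: int, channels: int) -> bytes:
    """Return zero-valued PCM bytes covering duration_ms.

    Zero bytes are silence for signed 16-bit PCM; 8-bit WAV is unsigned,
    so this sketch is not suitable for 8-bit files.
    """
    n_frames = sample_rate * duration_ms // 1000
    return b"\x00" * (n_frames * sample_width * channels)


def pad_wav_with_silence(input_file: str, output_file: str, silence_duration_ms: int = 500) -> None:
    # Read the format parameters and raw frames from the source file.
    with wave.open(input_file, "rb") as src:
        params = src.getparams()
        frames = src.readframes(src.getnframes())

    pad = silence_frames(silence_duration_ms, params.framerate, params.sampwidth, params.nchannels)

    # Write silence + audio + silence using the same format parameters.
    with wave.open(output_file, "wb") as dst:
        dst.setparams(params)
        dst.writeframes(pad + frames + pad)
```

With the default 500 ms, a 16 kHz mono file gains 8,000 frames on each side.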

### Helper Functions for Real-time Audio Models 

In [None]:
import time
from typing import Any


def log(start_time: float, *args: Any) -> None:  # noqa: ANN401
    elapsed_time_ms = int((time.time() - start_time) * 1000)
    print(f"{elapsed_time_ms} [ms]: ", *args)

In [None]:
from rtclient import RTClient


async def receive_control(start_time: float, client: RTClient) -> None:
    async for control in client.control_messages():
        if control is not None:
            log(start_time, f"Received a control message: {control.type}")
        else:
            break

In [None]:
from scipy.signal import resample
import numpy as np


def resample_audio(audio_data, original_sample_rate, target_sample_rate):  # noqa: ANN201, ANN001
    number_of_samples = round(len(audio_data) * float(target_sample_rate) / original_sample_rate)
    resampled_audio = resample(audio_data, number_of_samples)
    return resampled_audio.astype(np.int16)
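
The only arithmetic in `resample_audio` is the target sample count, which scales the input length by the ratio of the two sample rates. The dependency-free sketch below isolates that step so the numbers are easy to check. `resampled_length` is a hypothetical name used only for illustration.

```python
def resampled_length(num_samples: int, original_sample_rate: int, target_sample_rate: int) -> int:
    """Number of samples after resampling, using the same formula as resample_audio."""
    return round(num_samples * float(target_sample_rate) / original_sample_rate)


# One second of 16 kHz audio becomes one second of 24 kHz audio.
print(resampled_length(16000, 16000, 24000))  # 24000
# 100 ms of 44.1 kHz audio becomes 100 ms of 24 kHz audio.
print(resampled_length(4410, 44100, 24000))  # 2400
```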

In [None]:
import soundfile as sf
from pathlib import Path


async def send_audio(client: RTClient, audio_file_path: Path) -> None:
    sample_rate = 24000
    duration_ms = 100
    samples_per_chunk = sample_rate * (duration_ms / 1000)
    bytes_per_sample = 2
    bytes_per_chunk = int(samples_per_chunk * bytes_per_sample)

    extra_params = (
        {
            "samplerate": sample_rate,
            "channels": 1,
            "subtype": "PCM_16",
        }
        if audio_file_path.suffix == ".raw"
        else {}
    )

    audio_data, original_sample_rate = sf.read(audio_file_path, dtype="int16", **extra_params)
    if original_sample_rate != sample_rate:
        audio_data = resample_audio(audio_data, original_sample_rate, sample_rate)

    audio_bytes = audio_data.tobytes()
    for i in range(0, len(audio_bytes), bytes_per_chunk):
        chunk = audio_bytes[i : i + bytes_per_chunk]
        await client.send_audio(chunk)
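
The chunk size in `send_audio` follows from three constants: 24,000 samples per second, 100 ms per chunk, and 2 bytes per 16-bit sample. The sketch below reproduces that arithmetic and the slicing loop without any audio libraries. `bytes_per_chunk` and `iter_chunks` are hypothetical helper names.

```python
from typing import Iterator


def bytes_per_chunk(sample_rate: int = 24000, duration_ms: int = 100, bytes_per_sample: int = 2) -> int:
    """Size in bytes of one chunk of mono PCM audio."""
    return int(sample_rate * (duration_ms / 1000) * bytes_per_sample)


def iter_chunks(audio_bytes: bytes, chunk_size: int) -> Iterator[bytes]:
    """Yield consecutive slices of audio_bytes; the last one may be shorter."""
    for i in range(0, len(audio_bytes), chunk_size):
        yield audio_bytes[i : i + chunk_size]


one_second = bytes(24000 * 2)  # 1 s of 16-bit mono silence at 24 kHz
chunks = list(iter_chunks(one_second, bytes_per_chunk()))
print(bytes_per_chunk(), len(chunks))  # 4800 10
```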

In [None]:
from rtclient import RTOutputItem
import base64


async def receive_item(start_time: float, item: RTOutputItem, out_dir: str, item_ids: set) -> None:
    prefix = f"[response={item.response_id}][item={item.id}]"

    audio_data = None
    audio_transcript = None
    text_data = None
    arguments = None
    async for chunk in item:
        if chunk.type == "audio_transcript":
            audio_transcript = (audio_transcript or "") + chunk.data
        elif chunk.type == "audio":
            if audio_data is None:
                audio_data = bytearray()
            audio_bytes = base64.b64decode(chunk.data)
            audio_data.extend(audio_bytes)
        elif chunk.type == "tool_call_arguments":
            arguments = (arguments or "") + chunk.data
        elif chunk.type == "text":
            text_data = (text_data or "") + chunk.data
    item_ids.add(item.id)
    item_ids.add(item.previous_id)
    if text_data is not None:
        log(start_time, prefix, f"Text: {text_data}")
        out_path = Path(out_dir) / f"{item.id}.text.txt"
        with out_path.open("w", encoding="utf-8") as out:
            out.write(text_data)
    if audio_data is not None:
        log(start_time, prefix, f"Audio received with length: {len(audio_data)}")
        out_path = Path(out_dir) / f"{item.id}.wav"
        with out_path.open("wb") as out:
            audio_array = np.frombuffer(audio_data, dtype=np.int16)
            sf.write(out, audio_array, samplerate=24000)
    if audio_transcript is not None:
        log(start_time, prefix, f"Audio Transcript: {audio_transcript}")
        out_path = Path(out_dir) / f"{item.id}.audio_transcript.txt"
        with out_path.open("w", encoding="utf-8") as out:
            out.write(audio_transcript)
    if arguments is not None:
        log(start_time, prefix, f"Tool Call Arguments: {arguments}")
        out_path = Path(out_dir) / f"{item.id}.tool.streamed.json"
        with out_path.open("w", encoding="utf-8") as out:
            out.write(arguments)
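
The core of `receive_item` is an accumulator: text-like chunks are concatenated, and audio chunks are base64-decoded and appended to a byte buffer. The sketch below shows that pattern on in-memory data. `FakeChunk` is a hypothetical stand-in with only the `type` and `data` fields used above, not an rtclient class.

```python
import base64
from dataclasses import dataclass
from typing import Iterable, Optional, Tuple


@dataclass
class FakeChunk:
    """Hypothetical stand-in for a streamed chunk with `type` and `data` fields."""

    type: str
    data: str


def accumulate(chunks: Iterable[FakeChunk]) -> Tuple[Optional[str], Optional[bytearray]]:
    """Collect the transcript text and decoded audio bytes from a chunk stream."""
    transcript = None
    audio = None
    for chunk in chunks:
        if chunk.type == "audio_transcript":
            transcript = (transcript or "") + chunk.data
        elif chunk.type == "audio":
            if audio is None:
                audio = bytearray()
            audio.extend(base64.b64decode(chunk.data))
    return transcript, audio


stream = [
    FakeChunk("audio_transcript", "Hello, "),
    FakeChunk("audio", base64.b64encode(b"\x00\x01").decode("ascii")),
    FakeChunk("audio_transcript", "world."),
    FakeChunk("audio", base64.b64encode(b"\x02\x03").decode("ascii")),
]
transcript, audio = accumulate(stream)
print(transcript, bytes(audio))
```

Starting from `None` rather than an empty value lets the caller distinguish "no chunks of this type arrived" from "chunks arrived but were empty", which is how `receive_item` decides which output files to write.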

In [None]:
from rtclient import RTResponse
import asyncio


async def receive_response(start_time: float, response: RTResponse, out_dir: str) -> list:
    prefix = f"[response={response.id}]"
    item_ids = set()
    tasks = []
    async for item in response:
        log(start_time, prefix, f"Received item {item.id}")
        # Keep a reference to each task so it can be awaited below.
        tasks.append(asyncio.create_task(receive_item(start_time, item, out_dir, item_ids)))
    # Wait for every item to finish so item_ids and the output files are complete.
    await asyncio.gather(*tasks)
    log(start_time, prefix, "Response completed")
    return list(item_ids)
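
Because `receive_response` starts one background task per item, whatever those tasks produce (item IDs, transcript files) is only guaranteed to be complete once the tasks themselves have been awaited. The sketch below shows that pattern using only `asyncio`. `worker` and `collect` are hypothetical names, and the short sleep stands in for streaming an item's chunks.

```python
import asyncio


async def worker(item_id: str, seen: set) -> None:
    await asyncio.sleep(0.01)  # stands in for streaming an item's chunks
    seen.add(item_id)


async def collect(item_ids: list) -> list:
    seen = set()
    # Keep references to the tasks so they can be awaited together.
    tasks = [asyncio.create_task(worker(item_id, seen)) for item_id in item_ids]
    # Without this await, `seen` could still be empty when the function returns.
    await asyncio.gather(*tasks)
    return sorted(seen)


# In a notebook cell: result = await collect(["item_b", "item_a"])
```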

In [None]:
from rtclient import RTInputAudioItem


async def receive_input_item(start_time: float, item: RTInputAudioItem) -> None:
    prefix = f"[input_item={item.id}]"
    await item
    log(start_time, prefix, f"Previous Id: {item.previous_id}")
    log(start_time, prefix, f"Transcript: {item.transcript}")
    log(start_time, prefix, f"Audio Start [ms]: {item.audio_start_ms}")
    log(start_time, prefix, f"Audio End [ms]: {item.audio_end_ms}")

In [None]:
async def receive_items(start_time: float, client: RTClient, out_dir: str) -> list:
    item_ids = []
    async for item in client.items():
        if isinstance(item, RTResponse):
            new_item_ids = await receive_response(start_time, item, out_dir)
            item_ids.extend(new_item_ids)
            break
        asyncio.create_task(receive_input_item(start_time, item))  # noqa: RUF006
    return item_ids

In [None]:
async def receive_messages(start_time: float, client: RTClient, out_dir: str) -> list:
    return await receive_items(start_time, client, out_dir)

## 3. Simulating Adversarial Conversations with Audio 

### Audio-based Callback Function

The Azure AI Evaluation SDK's Adversarial Simulator generates text designed to prompt your model into producing harmful content. In this callback function, we use your Speech service connection to convert that text to audio, and then send the audio to your audio model. The model's audio responses are saved along with their text transcripts, and the transcripts form the conversation dataset that the Content Safety evaluator scores.
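
Before adding audio, it can help to see the callback contract on its own: the simulator passes a dictionary whose `"messages"` list holds the conversation so far, and the callback appends an assistant message and returns the same four fields. The text-only sketch below illustrates this. `echo_callback` and its placeholder reply are hypothetical, and reading the most recent message with `[-1]` is an assumption about how the simulator orders turns.

```python
from typing import Dict, Optional


async def echo_callback(
    messages: Dict, stream: bool = False, session_state: Optional[str] = None, context: Optional[Dict] = None
) -> dict:
    # Assumption: the latest simulated user turn is the last entry in the list.
    latest_user_text = messages["messages"][-1]["content"]

    # A real target would call a model here; this sketch returns a placeholder.
    reply = {
        "content": f"(placeholder reply to: {latest_user_text})",
        "role": "assistant",
        "context": {},
    }
    messages["messages"].append(reply)
    return {
        "messages": messages["messages"],
        "stream": stream,
        "session_state": session_state,
        "context": context,
    }
```

Swapping a stub like this in as the `target` is one way to check the rest of the pipeline before the Speech and real-time audio services are configured.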

In [None]:
from typing import List, Dict, Optional
from azure.core.credentials import AzureKeyCredential
from rtclient import NoTurnDetection


async def audio_callback(
    messages: Dict, stream: bool = False, session_state: Optional[str] = None, context: Optional[Dict] = None
) -> dict:
    endpoint = os.environ.get("AZURE_OPENAI_AUDIO_ENDPOINT")
    audio_key = os.environ.get("AZURE_OPENAI_AUDIO_API_KEY")
    audio_deployment = os.environ.get("AZURE_OPENAI_AUDIO_DEPLOYMENT")

    start_time = time.time()
    async with RTClient(
        url=endpoint, key_credential=AzureKeyCredential(audio_key), azure_deployment=audio_deployment
    ) as rt_client:
        log(start_time, "Connected to RTClient")
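        # Make sure the output folder exists before the Speech SDK writes to it.
        Path("./generated-audio").mkdir(parents=True, exist_ok=True)
        # Use the most recent message so each turn speaks the latest simulated prompt.
        text_to_speech(messages["messages"][-1]["content"], f"./generated-audio/conv_{0}_{1}_tmp.wav")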

        add_silence(f"./generated-audio/conv_{0}_{1}_tmp.wav", f"./generated-audio/conv_{0}_{1}.wav")

        asyncio.create_task(receive_control(start_time, rt_client))  # noqa: RUF006
        with Path("./instruction.txt").open(encoding="utf-8") as instructions_file:
            instructions = instructions_file.read()

        log(start_time, "Configuring Session...")
        await rt_client.configure(instructions=instructions, turn_detection=NoTurnDetection())

        audio_file_path = f"./generated-audio/conv_{0}_{1}.wav"
        out_dir = Path(f"./generated-audio/conv_{0}_{1}_out")
        out_dir.mkdir(parents=True, exist_ok=True)
        log(start_time, f"Sending Audio: {audio_file_path}")
        await send_audio(rt_client, Path(audio_file_path).resolve())
        await rt_client.commit_audio()
        await rt_client.generate_response()
        last_transcript = ""
        item_ids = await receive_messages(start_time, rt_client, out_dir)
        log(start_time, item_ids)
        formatted_response = {}
        for item_id in item_ids:
            file_path = Path(out_dir) / f"{item_id}.audio_transcript.txt"
            if item_id is not None and Path(file_path).resolve().exists():
                with Path(file_path).resolve().open("r", encoding="utf-8") as out:
                    last_transcript = out.read()
                    last_transcript = last_transcript.replace("\n", " ").strip()
                formatted_response = {
                    "content": last_transcript,
                    "role": "assistant",
                    "context": {"key": {}},
                }
        messages["messages"].append(formatted_response)
        return {
            "messages": messages["messages"],
            "stream": stream,
            "session_state": session_state,
            "context": context,
        }

In [None]:
adv_simulator = AdversarialSimulator(azure_ai_project=azure_ai_project, credential=credential)
adv_scenario = AdversarialScenario.ADVERSARIAL_CONVERSATION

In [None]:
outputs = await adv_simulator(
    scenario=adv_scenario,
    max_conversation_turns=3,  # define the number of conversation turns
    max_simulation_results=1,  # define the number of simulation results
    target=audio_callback,  # define the target model callback
    concurrent_async_task=1,
)

In [None]:
import json

# Write the output to the file
with Path("audio-harm.jsonl").open("w") as file:
    file.writelines(
        [json.dumps({"conversation": {"messages": conversation["messages"]}}) + "\n" for conversation in outputs]
    )
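
Before running the evaluator, it can be worth reading the file back to confirm that each line parses and contains both user and assistant turns. The sketch below assumes only the `{"conversation": {"messages": [...]}}` shape written above. `load_conversations` and `count_roles` are hypothetical helper names.

```python
import json
from pathlib import Path


def load_conversations(path: str) -> list:
    """Return one list of messages per non-empty line of the JSONL file."""
    conversations = []
    with Path(path).open("r", encoding="utf-8") as file:
        for line in file:
            if line.strip():
                conversations.append(json.loads(line)["conversation"]["messages"])
    return conversations


def count_roles(messages: list) -> dict:
    """Count how many messages each role contributed."""
    counts = {}
    for message in messages:
        role = message.get("role")
        counts[role] = counts.get(role, 0) + 1
    return counts


if Path("audio-harm.jsonl").exists():
    for index, conversation_messages in enumerate(load_conversations("audio-harm.jsonl")):
        print(index, count_roles(conversation_messages))
```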

## 4. Using Content Safety Evaluator to Evaluate Conversations 

In [None]:
from azure.ai.evaluation import ContentSafetyEvaluator

cs_eval = ContentSafetyEvaluator(azure_ai_project=azure_ai_project, credential=credential)

result = evaluate(
    name="content-safety-audio-conversations",
    data="audio-harm.jsonl",
    evaluators={"content_safety": cs_eval},
    # Optionally provide your AI Studio project information to track your evaluation results in your Azure AI Studio project
    azure_ai_project=azure_ai_project,
    # Optionally provide an output path to dump a json of metric summary, row level data and metric and studio URL
    output_path="./content-safety-audio-conversations-results.json",
)
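
Per the comment on `output_path`, the saved file holds a metric summary, row-level data, and the studio URL. This notebook does not show the exact key names, so the sketch below only loads the file and lists its top-level keys rather than assuming a structure. `load_results` is a hypothetical helper name.

```python
import json
from pathlib import Path


def load_results(path: str) -> dict:
    """Load the JSON file written by evaluate(..., output_path=...)."""
    with Path(path).open("r", encoding="utf-8") as file:
        return json.load(file)


results_path = "./content-safety-audio-conversations-results.json"
if Path(results_path).exists():
    saved = load_results(results_path)
    print("Top-level keys:", sorted(saved))
```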