In [1]:
#!import code/Setup.cs
// Currently ACS is working to enable streaming with OpenAI Realtime, for now this is a placeholder

In [2]:
#pragma warning disable OPENAI002

using Azure.AI.OpenAI;
using Azure.Identity;
using OpenAI;
using OpenAI.RealtimeConversation;
using System.ClientModel;
using NAudio.Wave;
using System.Threading;
using Microsoft.DotNet.Interactive;
using System.Threading.Tasks;

AzureOpenAIClient topLevelClient = new(
    new Uri(configuration.AzureOpenAIEndpoint),
    new ApiKeyCredential(configuration.AzureOpenAIKey));
var realtimeConversationClient = topLevelClient.GetRealtimeConversationClient(configuration.AzureOpenAIRealtimeDeployName);


In [3]:
#pragma warning disable OPENAI002
public class MicrophoneAudioStream : Stream, IDisposable
{
    private const int SAMPLES_PER_SECOND = 24000;
    private const int BYTES_PER_SAMPLE = 2;
    private const int CHANNELS = 1;

    // For simplicity, this is configured to use a static 10-second ring buffer.
    private readonly byte[] _buffer = new byte[BYTES_PER_SAMPLE * SAMPLES_PER_SECOND * CHANNELS * 10];
    private readonly object _bufferLock = new();
    private int _bufferReadPos = 0;
    private int _bufferWritePos = 0;

    private readonly WaveInEvent _waveInEvent;

    private MicrophoneAudioStream()
    {
        _waveInEvent = new()
        {
            WaveFormat = new WaveFormat(SAMPLES_PER_SECOND, BYTES_PER_SAMPLE * 8, CHANNELS),
        };
        _waveInEvent.DataAvailable += (_, e) =>
        {
            lock (_bufferLock)
            {
                int bytesToCopy = e.BytesRecorded;
                if (_bufferWritePos + bytesToCopy >= _buffer.Length)
                {
                    int bytesToCopyBeforeWrap = _buffer.Length - _bufferWritePos;
                    Array.Copy(e.Buffer, 0, _buffer, _bufferWritePos, bytesToCopyBeforeWrap);
                    bytesToCopy -= bytesToCopyBeforeWrap;
                    _bufferWritePos = 0;
                }
                Array.Copy(e.Buffer, e.BytesRecorded - bytesToCopy, _buffer, _bufferWritePos, bytesToCopy);
                _bufferWritePos += bytesToCopy;
            }
        };
        _waveInEvent.StartRecording();
    }

    public static MicrophoneAudioStream Start() => new();

    public override bool CanRead => true;

    public override bool CanSeek => false;

    public override bool CanWrite => false;

    public override long Length => throw new NotImplementedException();

    public override long Position { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }

    public override void Flush()
    {
        throw new NotImplementedException();
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        int totalCount = count;

        int GetBytesAvailable() => _bufferWritePos < _bufferReadPos
            ? _bufferWritePos + (_buffer.Length - _bufferReadPos)
            : _bufferWritePos - _bufferReadPos;

        // For simplicity, we'll block until all requested data is available and not perform partial reads.
        while (GetBytesAvailable() < count)
        {
            Thread.Sleep(100);
        }

        lock (_bufferLock)
        {
            if (_bufferReadPos + count >= _buffer.Length)
            {
                int bytesBeforeWrap = _buffer.Length - _bufferReadPos;
                Array.Copy(
                    sourceArray: _buffer,
                    sourceIndex: _bufferReadPos,
                    destinationArray: buffer,
                    destinationIndex: offset,
                    length: bytesBeforeWrap);
                _bufferReadPos = 0;
                count -= bytesBeforeWrap;
                offset += bytesBeforeWrap;
            }

            Array.Copy(_buffer, _bufferReadPos, buffer, offset, count);
            _bufferReadPos += count;
        }

        return totalCount;
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotImplementedException();
    }

    public override void SetLength(long value)
    {
        throw new NotImplementedException();
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        throw new NotImplementedException();
    }

    protected override void Dispose(bool disposing)
    {
        _waveInEvent?.Dispose();
        base.Dispose(disposing);
    }
}
public class SpeakerOutput : IDisposable
{
    BufferedWaveProvider _waveProvider;
    WaveOutEvent _waveOutEvent;

    public SpeakerOutput()
    {
        WaveFormat outputAudioFormat = new(
            rate: 24000,
            bits: 16,
            channels: 1);
        _waveProvider = new(outputAudioFormat)
        {
            BufferDuration = TimeSpan.FromMinutes(2),
        };
        _waveOutEvent = new();
        _waveOutEvent.Init(_waveProvider);
        _waveOutEvent.Play();

    }

    public void EnqueueForPlayback(BinaryData audioData)
    {
        byte[] buffer = audioData.ToArray();
        _waveProvider.AddSamples(buffer, 0, buffer.Length);
    }

    public void ClearPlayback()
    {
        _waveProvider.ClearBuffer();
    }

    public void Dispose()
    {
        _waveOutEvent?.Stop();
        _waveOutEvent?.Dispose();
        _waveProvider?.ClearBuffer();  
    }
}
ConversationFunctionTool finishConversationTool = new()
{
    Name = "user_wants_to_finish_conversation",
    Description = "Invoked when the user says goodbye, expresses being finished, or otherwise seems to want to stop the interaction.",
    Parameters = BinaryData.FromString("{}")
};

In [7]:
#pragma warning disable OPENAI002
try
{

    using SpeakerOutput speakerOutput = new();

    using var session = await realtimeConversationClient.StartConversationSessionAsync();
    await session.ConfigureSessionAsync(new ConversationSessionOptions()
    {
        Tools = { finishConversationTool },
        InputTranscriptionOptions = new()
        {
            Model = "whisper-1",
        },
    });
    while(!KernelInvocationContext.Current.CancellationToken.IsCancellationRequested)
    {
        // With the session configured, we start processing commands received from the service.
        await foreach (ConversationUpdate update in session.ReceiveUpdatesAsync(KernelInvocationContext.Current.CancellationToken))
        {
            // session.created is the very first command on a session and lets us know that connection was successful.
            if (update is ConversationSessionStartedUpdate)
            {
                Console.WriteLine($" <<< Connected: session started");
                // This is a good time to start capturing microphone input and sending audio to the service. The
                // input stream will be chunked and sent asynchronously, so we don't need to await anything in the
                // processing loop.
                _ = Task.Run(async () =>
                {
                    using MicrophoneAudioStream microphoneInput = MicrophoneAudioStream.Start();
                    Console.WriteLine($" >>> Listening to microphone input");
                    Console.WriteLine($" >>> (Just tell the app you're done to finish)");
                    Console.WriteLine();
                    await session.SendAudioAsync(microphoneInput);
                });
            }

            // input_audio_buffer.speech_started tells us that the beginning of speech was detected in the input audio
            // we're sending from the microphone.
            if (update is ConversationInputSpeechStartedUpdate)
            {
                Console.WriteLine($" <<< Start of speech detected");
                // Like any good listener, we can use the cue that the user started speaking as a hint that the app
                // should stop talking. Note that we could also track the playback position and truncate the response
                // item so that the model doesn't "remember things it didn't say" -- that's not demonstrated here.
                speakerOutput.ClearPlayback();
            }

            // input_audio_buffer.speech_stopped tells us that the end of speech was detected in the input audio sent
            // from the microphone. It'll automatically tell the model to start generating a response to reply back.
            if (update is ConversationInputSpeechFinishedUpdate)
            {
                Console.WriteLine($" <<< End of speech detected");
            }

            // conversation.item.input_audio_transcription.completed will only arrive if input transcription was
            // configured for the session. It provides a written representation of what the user said, which can
            // provide good feedback about what the model will use to respond.
            if (update is ConversationInputTranscriptionFinishedUpdate transcriptionFinishedUpdate)
            {
                Console.WriteLine($" >>> USER: {transcriptionFinishedUpdate.Transcript}");
            }

            // response.audio.delta provides incremental output audio generated by the model talking. Here, we
            // immediately enqueue it for playback on the active speaker output.
            if (update is ConversationAudioDeltaUpdate audioDeltaUpdate)
            {
                speakerOutput.EnqueueForPlayback(audioDeltaUpdate.Delta);
            }

            // response.audio_transcript.delta provides the incremental transcription of the emitted audio. The model
            // typically produces output much faster than it should be played back, so the transcript may move very
            // quickly relative to what's heard.
            if (update is ConversationOutputTranscriptionDeltaUpdate outputTranscriptionDeltaUpdate)
            {
                Console.Write(outputTranscriptionDeltaUpdate.Delta);
            }

            // response.output_item.done tells us that a model-generated item with streaming content is completed.
            // That's a good signal to provide a visual break and perform final evaluation of tool calls.
            if (update is ConversationItemFinishedUpdate itemFinishedUpdate)
            {
                Console.WriteLine();
                if (itemFinishedUpdate.FunctionName == finishConversationTool.Name)
                {

                    Console.WriteLine($" <<< Finish tool invoked -- ending conversation!");
                    session.Dispose();
                    speakerOutput.Dispose();
                    break;
                }
            }

            // error commands, as the name implies, are raised when something goes wrong.
            if (update is ConversationErrorUpdate errorUpdate)
            {
                Console.WriteLine();
                Console.WriteLine();
                Console.WriteLine($" <<< ERROR: {errorUpdate.ErrorMessage}");
                Console.WriteLine(errorUpdate.GetRawContent().ToString());
                break;
            }
        }
    }
 

}
catch (OperationCanceledException)
{
    Console.WriteLine("Caught OperationCanceledException.");
}

 <<< Connected: session started
 >>> Listening to microphone input
 >>> (Just tell the app you're done to finish)

 <<< Start of speech detected
 <<< End of speech detected
Hello! I hear >>> USER: Hello testing this out

 you loud and clear. Is there anything specific you'd like to test or chat about?
 <<< Start of speech detected
 <<< End of speech detected
I can help >>> USER: What can you do?

 answer questions, chat about a variety of topics, tell jokes, provide information, explain concepts, offer advice, and more! What would you like to know or talk about today?
 <<< Start of speech detected
 <<< End of speech detected
 >>> USER: End conversation.


 <<< Finish tool invoked -- ending conversation!
Caught OperationCanceledException.
