diff --git a/README.md b/README.md index ba1aeaf..9e6c48a 100644 --- a/README.md +++ b/README.md @@ -131,19 +131,20 @@ Defaults to `false`, which enforces hostname match with the peer certificate. The freeswitch module exposes the following API commands: ``` -uuid_openai_audio_stream <uuid> start <wss-url> <mix-type> <sampling-rate> +uuid_openai_audio_stream <uuid> start <wss-url> <mix-type> [<sampling-rate>] [mute_user] ``` -Attaches a media bug and starts streaming audio (in L16 format) to the websocket server. FS default is 8k. If sampling-rate is other than 8k it will be resampled. +Attaches a media bug and starts streaming audio (in L16 format) to the websocket server. FS default is 8k. If sampling-rate is other than 8k it will be resampled. Passing `mute_user` delays forwarding caller audio to the Realtime API until you explicitly unmute. - `uuid` - Freeswitch channel unique id - `wss-url` - websocket url `ws://` or `wss://` - `mix-type` - choice of - "mono" - single channel containing caller's audio - "mixed" - single channel containing both caller and callee audio - "stereo" - two channels with caller audio in one and callee audio in the other. -- `sampling-rate` - choice of +- `sampling-rate` - optional, choice of - "8k" = 8000 Hz sample rate will be generated - "16k" = 16000 Hz sample rate will be generated - "24k" = 24000 Hz sample rate will be generated +- `mute_user` - optional flag. When present, the module initialises muted and ignores caller audio until an explicit `unmute`. - **IMPORTANT NOTE**: The OpenAI Realtime API, when using PCM audio format, expects the audio to be in 24 kHz sample rate. Use the sampling-rate parameter as `24k` (or `24000`) and mono to ensure that the audio is sent in the correct format. 
From the OpenAI Realtime API documentation: *input audio must be 16-bit PCM at a 24kHz sample rate, single channel (mono), and little-endian byte order.* Support for exchanging audio with OpenAI in other formats may be developed in the future, which would make the `<mix-type>` and `<sampling-rate>` parameters more relevant. ``` uuid_openai_audio_stream <uuid> stop ``` Stops the audio stream and closes the websocket connection. ``` uuid_openai_audio_stream <uuid> pause ``` -Pauses audio stream +Pauses audio streaming in both directions. Caller audio stops flowing to OpenAI and any OpenAI playback currently buffering into the channel is halted until `resume`. ``` uuid_openai_audio_stream <uuid> resume ``` -Resumes audio stream +Resumes audio streaming in both directions after a `pause`. + +``` +uuid_openai_audio_stream <uuid> mute [user | openai | all] +``` +Keeps the media bug alive while silencing the selected leg. Defaults to `user` when omitted. +- `user`: block caller audio being sent to OpenAI. +- `openai`: block OpenAI playback from reaching the channel. +- `all`: apply both mute operations at once. + +``` +uuid_openai_audio_stream <uuid> unmute [user | openai | all] +``` +Re-enables the selected audio leg after a corresponding `mute`. Defaults to `user` when omitted. 
## Events Module will generate the following event types: diff --git a/mod_openai_audio_stream.c b/mod_openai_audio_stream.c index da6dd02..945c3ab 100644 --- a/mod_openai_audio_stream.c +++ b/mod_openai_audio_stream.c @@ -63,7 +63,8 @@ static switch_bool_t capture_callback(switch_media_bug_t *bug, void *user_data, static switch_status_t start_capture(switch_core_session_t *session, switch_media_bug_flag_t flags, char* wsUri, - int sampling) + int sampling, + switch_bool_t start_muted) { switch_channel_t *channel = switch_core_session_get_channel(session); switch_media_bug_t *bug; @@ -87,7 +88,7 @@ static switch_status_t start_capture(switch_core_session_t *session, switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "calling stream_session_init.\n"); if (SWITCH_STATUS_FALSE == stream_session_init(session, responseHandler, read_codec->implementation->actual_samples_per_second, - wsUri, sampling, channels, &pUserData)) { + wsUri, sampling, channels, start_muted, &pUserData)) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Error initializing mod_openai_audio_stream session.\n"); return SWITCH_STATUS_FALSE; } @@ -127,6 +128,31 @@ static switch_status_t do_pauseresume(switch_core_session_t *session, int pause) return status; } +static switch_status_t do_audio_mute(switch_core_session_t *session, const char *target, int mute) +{ + switch_status_t status = SWITCH_STATUS_FALSE; + const char *which = target && *target ? target : "user"; + + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, + "mod_openai_audio_stream: %s %s audio\n", mute ? 
"mute" : "unmute", which); + + if (!strcasecmp(which, "user")) { + status = stream_session_set_user_mute(session, mute); + } else if (!strcasecmp(which, "openai")) { + status = stream_session_set_openai_mute(session, mute); + } else if (!strcasecmp(which, "all") || !strcasecmp(which, "both")) { + switch_status_t user_status = stream_session_set_user_mute(session, mute); + switch_status_t openai_status = stream_session_set_openai_mute(session, mute); + status = (user_status == SWITCH_STATUS_SUCCESS && openai_status == SWITCH_STATUS_SUCCESS) ? SWITCH_STATUS_SUCCESS : SWITCH_STATUS_FALSE; + } else { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, + "mod_openai_audio_stream: invalid mute target '%s', expected user|openai|all\n", which); + status = SWITCH_STATUS_FALSE; + } + + return status; +} + static switch_status_t send_json(switch_core_session_t *session, char* json) { switch_status_t status = SWITCH_STATUS_FALSE; switch_channel_t *channel = switch_core_session_get_channel(session); @@ -141,10 +167,60 @@ static switch_status_t send_json(switch_core_session_t *session, char* json) { return status; } -#define STREAM_API_SYNTAX " [start | stop | send_json | pause | resume | graceful-shutdown ] [wss-url | path] [mono | mixed | stereo] [8000 | 16000 | 24000]" +#define STREAM_API_SYNTAX \ +"USAGE:\n" \ +"--------------------------------------------------------------------------------\n" \ +"uuid_openai_audio_stream [start | stop | send_json | pause | resume |\n" \ +" mute | unmute]\n" \ +" [wss-url | path | user | openai | all | base64json]\n" \ +" [mono | mixed | stereo]\n" \ +" [8000 | 16000 | 24000]\n" \ +" [mute_user]\n" \ +"--------------------------------------------------------------------------------\n" + +typedef enum { + STREAM_CMD_UNKNOWN, + STREAM_CMD_START, + STREAM_CMD_STOP, + STREAM_CMD_SEND_JSON, + STREAM_CMD_PAUSE, + STREAM_CMD_RESUME, + STREAM_CMD_MUTE, + STREAM_CMD_UNMUTE +} stream_command_t; + +static stream_command_t 
stream_command_from_string(const char *name) +{ + if (zstr(name)) { + return STREAM_CMD_UNKNOWN; + } + if (!strcasecmp(name, "start")) { + return STREAM_CMD_START; + } + if (!strcasecmp(name, "stop")) { + return STREAM_CMD_STOP; + } + if (!strcasecmp(name, "send_json")) { + return STREAM_CMD_SEND_JSON; + } + if (!strcasecmp(name, "pause")) { + return STREAM_CMD_PAUSE; + } + if (!strcasecmp(name, "resume")) { + return STREAM_CMD_RESUME; + } + if (!strcasecmp(name, "mute")) { + return STREAM_CMD_MUTE; + } + if (!strcasecmp(name, "unmute")) { + return STREAM_CMD_UNMUTE; + } + return STREAM_CMD_UNKNOWN; +} + SWITCH_STANDARD_API(stream_function) { - char *mycmd = NULL, *argv[6] = { 0 }; + char *mycmd = NULL, *argv[8] = { 0 }; int argc = 0; switch_status_t status = SWITCH_STATUS_FALSE; @@ -154,40 +230,55 @@ SWITCH_STANDARD_API(stream_function) } assert(cmd); - if (zstr(cmd) || argc < 2 || (0 == strcmp(argv[1], "start") && argc < 4)) { + if (zstr(cmd) || argc < 2) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Error with command %s.\n", cmd); - stream->write_function(stream, "-USAGE: %s\n", STREAM_API_SYNTAX); + stream->write_function(stream, "%s\n", STREAM_API_SYNTAX); goto done; - } else { - if (strcasecmp(argv[1], "send_json")) { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "mod_openai_audio_stream cmd: %s\n", cmd ? cmd : ""); - } - switch_core_session_t *lsession = NULL; - if ((lsession = switch_core_session_locate(argv[0]))) { - if (!strcasecmp(argv[1], "stop")) { - if(argc > 2 && (is_valid_utf8(argv[2]) != SWITCH_STATUS_SUCCESS)) { + } + + stream_command_t command = stream_command_from_string(argv[1]); + + if (command != STREAM_CMD_SEND_JSON) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "mod_openai_audio_stream cmd: %s\n", cmd ? 
cmd : ""); + } + + switch_core_session_t *lsession = NULL; + if ((lsession = switch_core_session_locate(argv[0]))) { + switch (command) { + case STREAM_CMD_STOP: + if (argc > 2 && (is_valid_utf8(argv[2]) != SWITCH_STATUS_SUCCESS)) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "%s contains invalid utf8 characters\n", argv[2]); - switch_core_session_rwunlock(lsession); - goto done; + goto release_session; } status = do_stop(lsession, argc > 2 ? argv[2] : NULL); - } else if (!strcasecmp(argv[1], "pause")) { + break; + case STREAM_CMD_PAUSE: status = do_pauseresume(lsession, 1); - } else if (!strcasecmp(argv[1], "resume")) { + break; + case STREAM_CMD_RESUME: status = do_pauseresume(lsession, 0); - } else if (!strcasecmp(argv[1], "send_json")) { + break; + case STREAM_CMD_SEND_JSON: if (argc < 3) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "send_json requires an argument specifying json to send\n"); - switch_core_session_rwunlock(lsession); - goto done; + goto release_session; } status = send_json(lsession, argv[2]); - } else if (!strcasecmp(argv[1], "start")) { - //switch_channel_t *channel = switch_core_session_get_channel(lsession); + break; + case STREAM_CMD_START: + { + if (argc < 4) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, + "Error with command %s.\n", cmd); + stream->write_function(stream, "%s\n", STREAM_API_SYNTAX); + goto release_session; + } char wsUri[MAX_WS_URI]; int sampling = 8000; + const char *sampling_str = NULL; + switch_bool_t start_muted = SWITCH_FALSE; switch_media_bug_flag_t flags = SMBF_READ_STREAM; flags |= SMBF_WRITE_REPLACE; if (0 == strcmp(argv[3], "mixed")) { @@ -198,18 +289,27 @@ SWITCH_STANDARD_API(stream_function) } else if (0 != strcmp(argv[3], "mono")) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "invalid mix type: %s, must be mono, mixed, or stereo\n", argv[3]); - switch_core_session_rwunlock(lsession); - goto 
done; + goto release_session; } if (argc > 4) { - if (0 == strcmp(argv[4], "16k")) { - sampling = 16000; - } else if (0 == strcmp(argv[4], "8k")) { - sampling = 8000; - } else if (0 == strcmp(argv[4], "24k")) { - sampling = 24000; + int next_index = 4; + if (!strcasecmp(argv[next_index], "mute_user")) { + start_muted = SWITCH_TRUE; } else { - sampling = atoi(argv[4]); + sampling_str = argv[next_index]; + if (0 == strcmp(sampling_str, "16k")) { + sampling = 16000; + } else if (0 == strcmp(sampling_str, "8k")) { + sampling = 8000; + } else if (0 == strcmp(sampling_str, "24k")) { + sampling = 24000; + } else { + sampling = atoi(sampling_str); + } + next_index++; + if (argc > next_index && !strcasecmp(argv[next_index], "mute_user")) { + start_muted = SWITCH_TRUE; + } } } @@ -217,22 +317,37 @@ SWITCH_STANDARD_API(stream_function) switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "invalid websocket uri: %s\n", argv[2]); } else if (sampling % 8000 != 0) { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, - "invalid sample rate: %s\n", argv[4]); + if (sampling_str) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, + "invalid sample rate: %s\n", sampling_str); + } else { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, + "invalid sample rate: %d\n", sampling); + } } else { - status = start_capture(lsession, flags, wsUri, sampling); + status = start_capture(lsession, flags, wsUri, sampling, start_muted); } - } else { + break; + } + case STREAM_CMD_MUTE: + case STREAM_CMD_UNMUTE: + { + const char *target = (argc > 2) ? argv[2] : "user"; + status = do_audio_mute(lsession, target, command == STREAM_CMD_MUTE ? 
1 : 0); + break; + } + case STREAM_CMD_UNKNOWN: + default: switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "unsupported mod_openai_audio_stream cmd: %s\n", argv[1]); - } - - - switch_core_session_rwunlock(lsession); - } else { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Error locating session %s\n", - argv[0]); + break; } + +release_session: + switch_core_session_rwunlock(lsession); + } else { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Error locating session %s\n", + argv[0]); } if (status == SWITCH_STATUS_SUCCESS) { @@ -270,6 +385,8 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_openai_audio_stream_load) switch_console_set_complete("add uuid_openai_audio_stream ::console::list_uuid stop"); switch_console_set_complete("add uuid_openai_audio_stream ::console::list_uuid pause"); switch_console_set_complete("add uuid_openai_audio_stream ::console::list_uuid resume"); + switch_console_set_complete("add uuid_openai_audio_stream ::console::list_uuid mute"); + switch_console_set_complete("add uuid_openai_audio_stream ::console::list_uuid unmute"); switch_console_set_complete("add uuid_openai_audio_stream ::console::list_uuid send_json"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_openai_audio_stream API successfully loaded\n"); diff --git a/mod_openai_audio_stream.h b/mod_openai_audio_stream.h index 333e5af..811d472 100644 --- a/mod_openai_audio_stream.h +++ b/mod_openai_audio_stream.h @@ -30,13 +30,14 @@ struct private_data { int sampling; int channels; int audio_paused:1; + int user_audio_muted:1; + int openai_audio_muted:1; int close_requested:1; RingBuffer *buffer; switch_buffer_t *sbuffer; uint8_t *data; int rtp_packets; switch_buffer_t *playback_buffer; - switch_mutex_t *playback_mutex; }; typedef struct private_data private_t; diff --git a/openai_audio_streamer_glue.cpp b/openai_audio_streamer_glue.cpp index e5487c9..61ccf6e 100644 --- a/openai_audio_streamer_glue.cpp 
+++ b/openai_audio_streamer_glue.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -148,14 +149,14 @@ class AudioStreamer { if(!channel) { return nullptr; } - auto *bug = (switch_media_bug_t *) switch_channel_get_private(channel, MY_BUG_NAME); + auto *bug = static_cast(switch_channel_get_private(channel, MY_BUG_NAME)); return bug; } inline void media_bug_close(switch_core_session_t *session) { auto *bug = get_media_bug(session); if(bug) { - auto* tech_pvt = (private_t*) switch_core_media_bug_get_user_data(bug); + auto* tech_pvt = static_cast(switch_core_media_bug_get_user_data(bug)); tech_pvt->close_requested = 1; switch_core_media_bug_close(&bug, SWITCH_FALSE); } @@ -286,14 +287,14 @@ class AudioStreamer { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "(%s) processMessage - error: %s\n", m_sessionId.c_str(), message.c_str()); } else if(jsType && strcmp(jsType, "input_audio_buffer.speech_started") == 0) { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "(%s) processMessage - speech started\n", m_sessionId.c_str()); + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "(%s) processMessage - user speech started, stopping openai audio playback\n", m_sessionId.c_str()); clear_audio_queue(); // also clear the private_t playback buffer used in write frame playback_clear_requested = true; - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "(%s) processMessage - speech detected stopping audio playback\n", m_sessionId.c_str()); - - status = SWITCH_TRUE; + } else if (jsType && strcmp(jsType, "input_audio_buffer.speech_stopped") == 0) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "(%s) processMessage - user speech stopped\n", m_sessionId.c_str()); + // Do not clear playback_clear_requested here; it should remain true until new audio is received. 
} else if(jsType && strcmp(jsType, "response.output_audio.delta") == 0) { const char* jsonAudio = cJSON_GetObjectCstr(json, "delta"); @@ -347,26 +348,19 @@ class AudioStreamer { // managing queue, check if empty before popping or peeking - bool is_audio_queue_empty() { - std::lock_guard lock(m_audio_queue_mutex); - return m_audio_queue.empty(); - } - void push_audio_queue(const std::vector& audio_data) { std::lock_guard lock(m_audio_queue_mutex); m_audio_queue.push(audio_data); } - std::vector pop_audio_queue() { + bool pop_audio_queue(std::vector &out_audio) { std::lock_guard lock(m_audio_queue_mutex); - auto audio_data = m_audio_queue.front(); + if (m_audio_queue.empty()) { + return false; + } + out_audio = m_audio_queue.front(); m_audio_queue.pop(); - return audio_data; - } - - std::vector peek_audio_queue() { - std::lock_guard lock(m_audio_queue_mutex); - return m_audio_queue.front(); + return true; } void clear_audio_queue() { @@ -413,7 +407,7 @@ class AudioStreamer { void writeBinary(uint8_t* buffer, size_t len) { if(!this->isConnected()) return; - webSocket.sendBinary(ix::IXWebSocketSendData((char *)buffer, len)); + webSocket.sendBinary(ix::IXWebSocketSendData(reinterpret_cast(buffer), len)); } void writeText(const char* text) { // Openai only accepts json not utf8 plain text @@ -444,20 +438,29 @@ class AudioStreamer { void openai_speech_started() { m_openai_speaking = true; switch_core_session_t* psession = switch_core_session_locate(m_sessionId.c_str()); - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "(%s) Openai started speaking\n", m_sessionId.c_str()); - const char *payload = "{\"status\":\"started\"}"; - m_notify(psession, EVENT_OPENAI_SPEECH_STARTED, payload); - switch_core_session_rwunlock(psession); + + if (psession) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "(%s) Openai started speaking\n", m_sessionId.c_str()); + const char *payload = "{\"status\":\"started\"}"; + m_notify(psession, EVENT_OPENAI_SPEECH_STARTED, 
payload); + switch_core_session_rwunlock(psession); + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "(%s) Openai speech started - could not locate session\n", m_sessionId.c_str()); + } } void openai_speech_stopped() { m_openai_speaking = false; switch_core_session_t* psession = switch_core_session_locate(m_sessionId.c_str()); - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "(%s) Openai stopped speaking\n", m_sessionId.c_str()); - const char *payload = "{\"status\":\"stopped\"}"; - m_notify(psession, EVENT_OPENAI_SPEECH_STOPPED, payload); - switch_core_session_rwunlock(psession); + if (psession) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "(%s) Openai stopped speaking\n", m_sessionId.c_str()); + const char *payload = "{\"status\":\"stopped\"}"; + m_notify(psession, EVENT_OPENAI_SPEECH_STOPPED, payload); + switch_core_session_rwunlock(psession); + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "(%s) Openai speech stopped - could not locate session\n", m_sessionId.c_str()); + } } @@ -488,7 +491,8 @@ namespace { uint32_t sampling, int desiredSampling, int channels, responseHandler_t responseHandler, int deflate, int heart_beat, bool suppressLog, int rtp_packets, const char* extra_headers, bool no_reconnect, const char *tls_cafile, const char *tls_keyfile, - const char *tls_certfile, bool tls_disable_hostname_validation, bool disable_audiofiles) + const char *tls_certfile, bool tls_disable_hostname_validation, bool disable_audiofiles, + switch_bool_t start_muted) { int err; //speex @@ -505,15 +509,11 @@ namespace { tech_pvt->rtp_packets = rtp_packets; tech_pvt->channels = channels; tech_pvt->audio_paused = 0; + tech_pvt->user_audio_muted = start_muted ? 
1 : 0; + tech_pvt->openai_audio_muted = 0; const size_t buflen = (FRAME_SIZE_8000 * desiredSampling / 8000 * channels * rtp_packets); - const size_t playback_buflen = 128000; // 128Kb may need to be decreased - - if (switch_mutex_init(&tech_pvt->playback_mutex, SWITCH_MUTEX_NESTED, pool) != SWITCH_STATUS_SUCCESS) { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, - "%s: Error creating playback mutex.\n", tech_pvt->sessionId); - return SWITCH_STATUS_FALSE; - } + const size_t playback_buflen = 128000; // 128KB may need to be decreased if (switch_buffer_create(pool, &tech_pvt->playback_buffer, playback_buflen) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, @@ -542,14 +542,14 @@ namespace { } switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "%s: initializing buffer(%zu) to adjusted %zu bytes\n", tech_pvt->sessionId, buflen, adjSize); - tech_pvt->data = (uint8_t *) switch_core_alloc(pool, adjSize); + tech_pvt->data = static_cast(switch_core_alloc(pool, adjSize)); if (!tech_pvt->data) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "%s: Error allocating memory for data buffer.\n", tech_pvt->sessionId); return SWITCH_STATUS_FALSE; } memset(tech_pvt->data, 0, adjSize); - tech_pvt->buffer = (RingBuffer *) switch_core_alloc(pool, sizeof(RingBuffer)); + tech_pvt->buffer = static_cast(switch_core_alloc(pool, sizeof(RingBuffer))); if (!tech_pvt->buffer) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "%s: Error allocating memory for ring buffer.\n", tech_pvt->sessionId); @@ -587,7 +587,7 @@ namespace { tech_pvt->mutex = nullptr; } if (tech_pvt->pAudioStreamer) { - auto* as = (AudioStreamer *) tech_pvt->pAudioStreamer; + auto* as = static_cast(tech_pvt->pAudioStreamer); delete as; tech_pvt->pAudioStreamer = nullptr; } @@ -595,7 +595,7 @@ namespace { void finish(private_t* tech_pvt) { std::shared_ptr aStreamer; - 
aStreamer.reset((AudioStreamer *)tech_pvt->pAudioStreamer); + aStreamer.reset(static_cast(tech_pvt->pAudioStreamer)); tech_pvt->pAudioStreamer = nullptr; std::thread t([aStreamer]{ @@ -686,7 +686,7 @@ extern "C" { switch_status_t stream_session_send_json(switch_core_session_t *session, const char* base64_input) { switch_channel_t *channel = switch_core_session_get_channel(session); - switch_media_bug_t *bug = (switch_media_bug_t*) switch_channel_get_private(channel, MY_BUG_NAME); + auto *bug = static_cast(switch_channel_get_private(channel, MY_BUG_NAME)); cJSON *json_obj = nullptr; char *json_unformatted = nullptr; switch_status_t status = SWITCH_STATUS_FALSE; @@ -695,7 +695,7 @@ extern "C" { return SWITCH_STATUS_FALSE; } - auto *tech_pvt = (private_t*) switch_core_media_bug_get_user_data(bug); + auto *tech_pvt = static_cast(switch_core_media_bug_get_user_data(bug)); if (!tech_pvt) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "stream_session_send_json failed to retrieve session data.\n"); return SWITCH_STATUS_FALSE; @@ -752,12 +752,12 @@ extern "C" { switch_status_t stream_session_pauseresume(switch_core_session_t *session, int pause) { switch_channel_t *channel = switch_core_session_get_channel(session); - auto *bug = (switch_media_bug_t*) switch_channel_get_private(channel, MY_BUG_NAME); + auto *bug = static_cast(switch_channel_get_private(channel, MY_BUG_NAME)); if (!bug) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "stream_session_pauseresume failed because no bug\n"); return SWITCH_STATUS_FALSE; } - auto *tech_pvt = (private_t*) switch_core_media_bug_get_user_data(bug); + auto *tech_pvt = static_cast(switch_core_media_bug_get_user_data(bug)); if (!tech_pvt) return SWITCH_STATUS_FALSE; @@ -766,12 +766,99 @@ extern "C" { return SWITCH_STATUS_SUCCESS; } + switch_status_t stream_session_set_user_mute(switch_core_session_t *session, int mute) { + switch_channel_t *channel = 
switch_core_session_get_channel(session); + auto *bug = static_cast(switch_channel_get_private(channel, MY_BUG_NAME)); + switch_status_t status = SWITCH_STATUS_FALSE; + if (!bug) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "stream_session_set_user_mute failed because no bug\n"); + return status; + } + auto *tech_pvt = static_cast(switch_core_media_bug_get_user_data(bug)); + if (!tech_pvt) { + return status; + } + + status = SWITCH_STATUS_SUCCESS; + switch_core_media_bug_flush(bug); + const int last_state = tech_pvt->user_audio_muted; + tech_pvt->user_audio_muted = mute ? 1 : 0; + if (last_state == tech_pvt->user_audio_muted) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, + "User audio is already %s\n", tech_pvt->user_audio_muted ? "muted" : "unmuted"); + return status; + } + + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, + "User audio %s\n", tech_pvt->user_audio_muted ? "muted" : "unmuted"); + + if (tech_pvt->user_audio_muted) { + if (tech_pvt->mutex) { + switch_mutex_lock(tech_pvt->mutex); + } + + if (tech_pvt->buffer) { + ringBufferClear(tech_pvt->buffer); + } + if (tech_pvt->sbuffer) { + switch_buffer_zero(tech_pvt->sbuffer); + } + + AudioStreamer *streamer = static_cast(tech_pvt->pAudioStreamer); + if (streamer && streamer->isConnected()) { + size_t channels = tech_pvt->channels > 0 ? static_cast(tech_pvt->channels) : 1; + size_t sample_rate = tech_pvt->sampling > 0 ? 
static_cast(tech_pvt->sampling) : 24000; // 24 KHz is currently the only supported rate by openai + size_t bytes = channels * sample_rate * sizeof(int16_t); + std::vector silence(bytes, 0); + streamer->writeAudioDelta(silence.data(), silence.size()); + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, + "Sent %zu bytes of silence after muting user audio\n", silence.size()); + } else { + status = SWITCH_STATUS_FALSE; + } + + if (tech_pvt->mutex) { + switch_mutex_unlock(tech_pvt->mutex); + } + } + + return status; + } + + switch_status_t stream_session_set_openai_mute(switch_core_session_t *session, int mute) { + switch_channel_t *channel = switch_core_session_get_channel(session); + auto *bug = static_cast(switch_channel_get_private(channel, MY_BUG_NAME)); + if (!bug) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "stream_session_set_openai_mute failed because no bug\n"); + return SWITCH_STATUS_FALSE; + } + + auto *tech_pvt = static_cast(switch_core_media_bug_get_user_data(bug)); + if (!tech_pvt) { + return SWITCH_STATUS_FALSE; + } + + switch_core_media_bug_flush(bug); + auto last_state = tech_pvt->openai_audio_muted; + tech_pvt->openai_audio_muted = mute ? 1 : 0; + if (last_state == tech_pvt->openai_audio_muted) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, + "OpenAI audio is already %s\n", tech_pvt->openai_audio_muted ? "muted" : "unmuted"); + } else { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, + "OpenAI audio %s\n", tech_pvt->openai_audio_muted ? 
"muted" : "unmuted"); + } + + return SWITCH_STATUS_SUCCESS; + } + switch_status_t stream_session_init(switch_core_session_t *session, responseHandler_t responseHandler, uint32_t samples_per_second, char *wsUri, int sampling, int channels, + switch_bool_t start_muted, void **ppUserData) { int deflate = 0, heart_beat = 0; @@ -784,7 +871,6 @@ extern "C" { const char* tls_keyfile = NULL;; const char* tls_certfile = NULL;; const char* openai_api_key = NULL;; - const char* openai_realtime_version = NULL; bool tls_disable_hostname_validation = false; bool disable_audiofiles = false; @@ -840,19 +926,19 @@ extern "C" { "{\"Authorization\": \"Bearer %s\"}", openai_api_key); extra_headers = headers_buf; } else { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "OPENAI_API_KEY is not set. Assuming you set STREAM_EXTRA_HEADERS variable.\n"); + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "OPENAI_API_KEY is not set. Assuming you set STREAM_EXTRA_HEADERS variable.\n"); extra_headers = switch_channel_get_variable(channel, "STREAM_EXTRA_HEADERS"); } // allocate per-session tech_pvt - auto* tech_pvt = (private_t *) switch_core_session_alloc(session, sizeof(private_t)); + auto* tech_pvt = static_cast(switch_core_session_alloc(session, sizeof(private_t))); if (!tech_pvt) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "error allocating memory!\n"); return SWITCH_STATUS_FALSE; } if (SWITCH_STATUS_SUCCESS != stream_data_init(tech_pvt, session, wsUri, samples_per_second, sampling, channels, responseHandler, deflate, heart_beat, - suppressLog, rtp_packets, extra_headers, no_reconnect, tls_cafile, tls_keyfile, tls_certfile, tls_disable_hostname_validation, disable_audiofiles)) { + suppressLog, rtp_packets, extra_headers, no_reconnect, tls_cafile, tls_keyfile, tls_certfile, tls_disable_hostname_validation, disable_audiofiles, start_muted)) { destroy_tech_pvt(tech_pvt); return SWITCH_STATUS_FALSE; } @@ -864,8 
+950,8 @@ extern "C" { switch_bool_t stream_frame(switch_media_bug_t *bug) { - auto* tech_pvt = (private_t*) switch_core_media_bug_get_user_data(bug); - if (!tech_pvt || tech_pvt->audio_paused) return SWITCH_TRUE; + auto* tech_pvt = static_cast(switch_core_media_bug_get_user_data(bug)); + if (!tech_pvt || tech_pvt->audio_paused || tech_pvt->user_audio_muted) return SWITCH_TRUE; if (switch_mutex_trylock(tech_pvt->mutex) == SWITCH_STATUS_SUCCESS) { @@ -890,7 +976,7 @@ extern "C" { while (switch_core_media_bug_read(bug, &frame, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS) { if(frame.datalen) { if (1 == tech_pvt->rtp_packets) { - pAudioStreamer->writeAudioDelta((uint8_t *) frame.data, frame.datalen); + pAudioStreamer->writeAudioDelta(static_cast(frame.data), frame.datalen); continue; } @@ -937,13 +1023,13 @@ extern "C" { if(tech_pvt->channels == 1) { speex_resampler_process_int(tech_pvt->resampler, 0, - (const spx_int16_t *)frame.data, + static_cast(frame.data), &in_len, &out[0], &out_len); } else { speex_resampler_process_interleaved_int(tech_pvt->resampler, - (const spx_int16_t *)frame.data, + static_cast(frame.data), &in_len, &out[0], &out_len); @@ -952,11 +1038,11 @@ extern "C" { if(out_len > 0) { const size_t bytes_written = out_len * tech_pvt->channels * sizeof(spx_int16_t); if (tech_pvt->rtp_packets == 1) { //20ms packet - pAudioStreamer->writeAudioDelta((uint8_t *) out, bytes_written); + pAudioStreamer->writeAudioDelta(reinterpret_cast(out), bytes_written); continue; } if (bytes_written <= inuse) { - switch_buffer_write(tech_pvt->sbuffer, (const uint8_t *)out, bytes_written); + switch_buffer_write(tech_pvt->sbuffer, out, bytes_written); } } @@ -976,7 +1062,7 @@ extern "C" { } switch_bool_t write_frame(switch_core_session_t *session, switch_media_bug_t *bug) { - private_t *tech_pvt = (private_t *)switch_core_media_bug_get_user_data(bug); + private_t *tech_pvt = static_cast(switch_core_media_bug_get_user_data(bug)); if (!tech_pvt || tech_pvt->audio_paused) { return 
SWITCH_TRUE; } @@ -1003,15 +1089,19 @@ extern "C" { uint32_t inuse = switch_buffer_inuse(tech_pvt->playback_buffer); // push a chunk in the audio buffer used treated as cache - if (as->clear_requested()) { switch_buffer_zero(tech_pvt->playback_buffer); inuse = 0; } - if (inuse < bytes_needed * 2 && !as->is_audio_queue_empty()) { - auto chunk = as->pop_audio_queue(); - switch_buffer_write(tech_pvt->playback_buffer, chunk.data(), chunk.size() * sizeof(int16_t)); - } else if (inuse == 0) { + bool chunk_enqueued = false; + if (inuse < bytes_needed * 2) { + std::vector chunk; + if (as->pop_audio_queue(chunk)) { + switch_buffer_write(tech_pvt->playback_buffer, chunk.data(), chunk.size() * sizeof(int16_t)); + chunk_enqueued = true; + } + } + if (!chunk_enqueued && inuse == 0) { // Openai just finished speaking for interruption or end of response if(as->is_openai_speaking() && as->is_response_audio_done()) { as->openai_speech_stopped(); @@ -1019,32 +1109,36 @@ extern "C" { return SWITCH_TRUE; } - switch_byte_t *data = (switch_byte_t *) frame->data; - if (inuse > bytes_needed) { - switch_buffer_read(tech_pvt->playback_buffer, data, bytes_needed); - } else { - switch_buffer_read(tech_pvt->playback_buffer, data, inuse); + inuse = bytes_needed; } - if (!as->is_openai_speaking()) { - as->openai_speech_started(); - } + if (tech_pvt->openai_audio_muted) { + switch_buffer_toss(tech_pvt->playback_buffer, inuse); + } else { + switch_byte_t *data = static_cast(frame->data); - frame->datalen = inuse > bytes_needed ? 
bytes_needed : inuse; - frame->samples = frame->datalen / bytes_per_sample; + switch_buffer_read(tech_pvt->playback_buffer, data, inuse); - switch_core_media_bug_set_write_replace_frame(bug, frame); + if (!as->is_openai_speaking()) { + as->openai_speech_started(); + } + + frame->datalen = inuse; + frame->samples = frame->datalen / bytes_per_sample; + + switch_core_media_bug_set_write_replace_frame(bug, frame); + } return SWITCH_TRUE; } switch_status_t stream_session_cleanup(switch_core_session_t *session, char* text, int channelIsClosing) { switch_channel_t *channel = switch_core_session_get_channel(session); - auto *bug = (switch_media_bug_t*) switch_channel_get_private(channel, MY_BUG_NAME); + auto *bug = static_cast(switch_channel_get_private(channel, MY_BUG_NAME)); if(bug) { - auto* tech_pvt = (private_t*) switch_core_media_bug_get_user_data(bug); + auto* tech_pvt = static_cast(switch_core_media_bug_get_user_data(bug)); char sessionId[MAX_SESSION_ID]; strncpy(sessionId, tech_pvt->sessionId, MAX_SESSION_ID - 1); @@ -1058,7 +1152,7 @@ extern "C" { switch_core_media_bug_remove(session, &bug); } - auto* audioStreamer = (AudioStreamer *) tech_pvt->pAudioStreamer; + auto* audioStreamer = static_cast(tech_pvt->pAudioStreamer); if(audioStreamer) { audioStreamer->deleteFiles(); stream_session_send_json(session, text); diff --git a/openai_audio_streamer_glue.h b/openai_audio_streamer_glue.h index 04da75b..f83a051 100644 --- a/openai_audio_streamer_glue.h +++ b/openai_audio_streamer_glue.h @@ -8,8 +8,10 @@ int validate_ws_uri(const char* url, char *wsUri); switch_status_t is_valid_utf8(const char *str); switch_status_t stream_session_send_json(switch_core_session_t *session, char* json); switch_status_t stream_session_pauseresume(switch_core_session_t *session, int pause); +switch_status_t stream_session_set_user_mute(switch_core_session_t *session, int mute); +switch_status_t stream_session_set_openai_mute(switch_core_session_t *session, int mute); switch_status_t 
stream_session_init(switch_core_session_t *session, responseHandler_t responseHandler, - uint32_t samples_per_second, char *wsUri, int sampling, int channels, void **ppUserData); + uint32_t samples_per_second, char *wsUri, int sampling, int channels, switch_bool_t start_muted, void **ppUserData); switch_bool_t stream_frame(switch_media_bug_t *bug); switch_bool_t write_frame(switch_core_session_t *session, switch_media_bug_t *bug); switch_status_t stream_session_cleanup(switch_core_session_t *session, char* text, int channelIsClosing);