From 7a334b74285a43eec3d7e25c8af6313e728f7d6c Mon Sep 17 00:00:00 2001 From: Dario Pellegrino Date: Thu, 30 Oct 2025 16:08:08 +0100 Subject: [PATCH 1/7] add command to mute, unmute and start with muted user audio stream (#22) Signed-off-by: Dario Pellegrino --- README.md | 21 ++++++++--- mod_openai_audio_stream.c | 64 ++++++++++++++++++++++++++-------- mod_openai_audio_stream.h | 1 + openai_audio_streamer_glue.cpp | 39 ++++++++++++++++++--- openai_audio_streamer_glue.h | 3 +- 5 files changed, 102 insertions(+), 26 deletions(-) diff --git a/README.md b/README.md index ba1aeaf..4ee6534 100644 --- a/README.md +++ b/README.md @@ -131,19 +131,20 @@ Defaults to `false`, which enforces hostname match with the peer certificate. The freeswitch module exposes the following API commands: ``` -uuid_openai_audio_stream start +uuid_openai_audio_stream start [] [mute_user] ``` -Attaches a media bug and starts streaming audio (in L16 format) to the websocket server. FS default is 8k. If sampling-rate is other than 8k it will be resampled. +Attaches a media bug and starts streaming audio (in L16 format) to the websocket server. FS default is 8k. If sampling-rate is other than 8k it will be resampled. Passing `mute_user` delays forwarding caller audio to the Realtime API until you explicitly unmute. - `uuid` - Freeswitch channel unique id - `wss-url` - websocket url `ws://` or `wss://` - `mix-type` - choice of - "mono" - single channel containing caller's audio - "mixed" - single channel containing both caller and callee audio - "stereo" - two channels with caller audio in one and callee audio in the other. -- `sampling-rate` - choice of +- `sampling-rate` - optional, choice of - "8k" = 8000 Hz sample rate will be generated - "16k" = 16000 Hz sample rate will be generated - "24k" = 24000 Hz sample rate will be generated +- `mute_user` - optional flag. When present, the module initialises muted and ignores caller audio until an explicit `unmute`. 
- **IMPORTANT NOTE**: The OpenAI Realtime API, when using PCM audio format, expects the audio to be in 24 kHz sample rate. Use the sampling-rate parameter as `24k` (or `24000`) and mono to ensure that the audio is sent in the correct format. From the OpenAI Realtime API documentation: *input audio must be 16-bit PCM at a 24kHz sample rate, single channel (mono), and little-endian byte order.* Support for exchanging audio with OpenAI in other formats may be developed in the future, which would make the `` and ` stop ``` uuid_openai_audio_stream pause ``` -Pauses audio stream +Pauses audio streaming in both directions. Caller audio stops flowing to OpenAI and any OpenAI playback currently buffering into the channel is halted until `resume`. ``` uuid_openai_audio_stream resume ``` -Resumes audio stream +Resumes audio streaming in both directions after a `pause`. + +``` +uuid_openai_audio_stream mute +``` +Keeps the media bug alive but blocks upstream caller audio. Useful while you wait for a session update or play an intro prompt. + +``` +uuid_openai_audio_stream unmute +``` +Re-enables caller audio towards OpenAI after a `start ... mute_user` or `mute`. 
## Events Module will generate the following event types: diff --git a/mod_openai_audio_stream.c b/mod_openai_audio_stream.c index da6dd02..1b2208d 100644 --- a/mod_openai_audio_stream.c +++ b/mod_openai_audio_stream.c @@ -63,7 +63,8 @@ static switch_bool_t capture_callback(switch_media_bug_t *bug, void *user_data, static switch_status_t start_capture(switch_core_session_t *session, switch_media_bug_flag_t flags, char* wsUri, - int sampling) + int sampling, + switch_bool_t start_muted) { switch_channel_t *channel = switch_core_session_get_channel(session); switch_media_bug_t *bug; @@ -87,7 +88,7 @@ static switch_status_t start_capture(switch_core_session_t *session, switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "calling stream_session_init.\n"); if (SWITCH_STATUS_FALSE == stream_session_init(session, responseHandler, read_codec->implementation->actual_samples_per_second, - wsUri, sampling, channels, &pUserData)) { + wsUri, sampling, channels, start_muted, &pUserData)) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Error initializing mod_openai_audio_stream session.\n"); return SWITCH_STATUS_FALSE; } @@ -127,6 +128,16 @@ static switch_status_t do_pauseresume(switch_core_session_t *session, int pause) return status; } +static switch_status_t do_user_mute(switch_core_session_t *session, int mute) +{ + switch_status_t status = SWITCH_STATUS_SUCCESS; + + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "mod_openai_audio_stream: %s\n", mute ? 
"mute" : "unmute"); + status = stream_session_set_user_mute(session, mute); + + return status; +} + static switch_status_t send_json(switch_core_session_t *session, char* json) { switch_status_t status = SWITCH_STATUS_FALSE; switch_channel_t *channel = switch_core_session_get_channel(session); @@ -141,10 +152,10 @@ static switch_status_t send_json(switch_core_session_t *session, char* json) { return status; } -#define STREAM_API_SYNTAX " [start | stop | send_json | pause | resume | graceful-shutdown ] [wss-url | path] [mono | mixed | stereo] [8000 | 16000 | 24000]" +#define STREAM_API_SYNTAX " [start | stop | send_json | pause | resume | mute | unmute | graceful-shutdown ] [wss-url | path] [mono | mixed | stereo] [8000 | 16000 | 24000] [mute_user]" SWITCH_STANDARD_API(stream_function) { - char *mycmd = NULL, *argv[6] = { 0 }; + char *mycmd = NULL, *argv[8] = { 0 }; int argc = 0; switch_status_t status = SWITCH_STATUS_FALSE; @@ -188,6 +199,8 @@ SWITCH_STANDARD_API(stream_function) //switch_channel_t *channel = switch_core_session_get_channel(lsession); char wsUri[MAX_WS_URI]; int sampling = 8000; + const char *sampling_str = NULL; + switch_bool_t start_muted = SWITCH_FALSE; switch_media_bug_flag_t flags = SMBF_READ_STREAM; flags |= SMBF_WRITE_REPLACE; if (0 == strcmp(argv[3], "mixed")) { @@ -202,14 +215,25 @@ SWITCH_STANDARD_API(stream_function) goto done; } if (argc > 4) { - if (0 == strcmp(argv[4], "16k")) { - sampling = 16000; - } else if (0 == strcmp(argv[4], "8k")) { - sampling = 8000; - } else if (0 == strcmp(argv[4], "24k")) { - sampling = 24000; + int next_index = 4; + if (!strcasecmp(argv[next_index], "mute_user")) { + start_muted = SWITCH_TRUE; + next_index++; } else { - sampling = atoi(argv[4]); + sampling_str = argv[next_index]; + if (0 == strcmp(sampling_str, "16k")) { + sampling = 16000; + } else if (0 == strcmp(sampling_str, "8k")) { + sampling = 8000; + } else if (0 == strcmp(sampling_str, "24k")) { + sampling = 24000; + } else { + sampling = 
atoi(sampling_str); + } + next_index++; + if (argc > next_index && !strcasecmp(argv[next_index], "mute_user")) { + start_muted = SWITCH_TRUE; + } } } @@ -217,17 +241,25 @@ SWITCH_STANDARD_API(stream_function) switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "invalid websocket uri: %s\n", argv[2]); } else if (sampling % 8000 != 0) { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, - "invalid sample rate: %s\n", argv[4]); + if (sampling_str) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, + "invalid sample rate: %s\n", sampling_str); + } else { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, + "invalid sample rate: %d\n", sampling); + } } else { - status = start_capture(lsession, flags, wsUri, sampling); + status = start_capture(lsession, flags, wsUri, sampling, start_muted); } + } else if (!strcasecmp(argv[1], "mute")) { + status = do_user_mute(lsession, 1); + } else if (!strcasecmp(argv[1], "unmute")) { + status = do_user_mute(lsession, 0); } else { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "unsupported mod_openai_audio_stream cmd: %s\n", argv[1]); } - switch_core_session_rwunlock(lsession); } else { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Error locating session %s\n", @@ -270,6 +302,8 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_openai_audio_stream_load) switch_console_set_complete("add uuid_openai_audio_stream ::console::list_uuid stop"); switch_console_set_complete("add uuid_openai_audio_stream ::console::list_uuid pause"); switch_console_set_complete("add uuid_openai_audio_stream ::console::list_uuid resume"); + switch_console_set_complete("add uuid_openai_audio_stream ::console::list_uuid mute"); + switch_console_set_complete("add uuid_openai_audio_stream ::console::list_uuid unmute"); switch_console_set_complete("add uuid_openai_audio_stream ::console::list_uuid send_json"); 
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_openai_audio_stream API successfully loaded\n"); diff --git a/mod_openai_audio_stream.h b/mod_openai_audio_stream.h index 333e5af..37feece 100644 --- a/mod_openai_audio_stream.h +++ b/mod_openai_audio_stream.h @@ -30,6 +30,7 @@ struct private_data { int sampling; int channels; int audio_paused:1; + int user_audio_muted:1; int close_requested:1; RingBuffer *buffer; switch_buffer_t *sbuffer; diff --git a/openai_audio_streamer_glue.cpp b/openai_audio_streamer_glue.cpp index e5487c9..fa2461d 100644 --- a/openai_audio_streamer_glue.cpp +++ b/openai_audio_streamer_glue.cpp @@ -286,15 +286,19 @@ class AudioStreamer { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "(%s) processMessage - error: %s\n", m_sessionId.c_str(), message.c_str()); } else if(jsType && strcmp(jsType, "input_audio_buffer.speech_started") == 0) { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "(%s) processMessage - speech started\n", m_sessionId.c_str()); + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "(%s) processMessage - user speech started\n", m_sessionId.c_str()); clear_audio_queue(); // also clear the private_t playback buffer used in write frame playback_clear_requested = true; - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "(%s) processMessage - speech detected stopping audio playback\n", m_sessionId.c_str()); + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "(%s) processMessage - user speech detected stopping audio playback\n", m_sessionId.c_str()); status = SWITCH_TRUE; + } else if (jsType && strcmp(jsType, "input_audio_buffer.speech_stopped") == 0) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "(%s) processMessage - user speech stopped\n", m_sessionId.c_str()); + playback_clear_requested = false; + } else if(jsType && strcmp(jsType, "response.output_audio.delta") == 
0) { const char* jsonAudio = cJSON_GetObjectCstr(json, "delta"); playback_clear_requested = false; @@ -488,7 +492,8 @@ namespace { uint32_t sampling, int desiredSampling, int channels, responseHandler_t responseHandler, int deflate, int heart_beat, bool suppressLog, int rtp_packets, const char* extra_headers, bool no_reconnect, const char *tls_cafile, const char *tls_keyfile, - const char *tls_certfile, bool tls_disable_hostname_validation, bool disable_audiofiles) + const char *tls_certfile, bool tls_disable_hostname_validation, bool disable_audiofiles, + switch_bool_t start_muted) { int err; //speex @@ -505,6 +510,7 @@ namespace { tech_pvt->rtp_packets = rtp_packets; tech_pvt->channels = channels; tech_pvt->audio_paused = 0; + tech_pvt->user_audio_muted = start_muted ? 1 : 0; const size_t buflen = (FRAME_SIZE_8000 * desiredSampling / 8000 * channels * rtp_packets); const size_t playback_buflen = 128000; // 128Kb may need to be decreased @@ -766,12 +772,35 @@ extern "C" { return SWITCH_STATUS_SUCCESS; } + switch_status_t stream_session_set_user_mute(switch_core_session_t *session, int mute) { + switch_channel_t *channel = switch_core_session_get_channel(session); + auto *bug = (switch_media_bug_t*) switch_channel_get_private(channel, MY_BUG_NAME); + if (!bug) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "stream_session_set_user_mute failed because no bug\n"); + return SWITCH_STATUS_FALSE; + } + auto *tech_pvt = (private_t*) switch_core_media_bug_get_user_data(bug); + if (!tech_pvt) { + return SWITCH_STATUS_FALSE; + } + + switch_core_media_bug_flush(bug); + tech_pvt->user_audio_muted = mute ? 
1 : 0; + if (mute) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "User audio muted\n"); + } else { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "User audio unmuted\n"); + } + return SWITCH_STATUS_SUCCESS; + } + switch_status_t stream_session_init(switch_core_session_t *session, responseHandler_t responseHandler, uint32_t samples_per_second, char *wsUri, int sampling, int channels, + switch_bool_t start_muted, void **ppUserData) { int deflate = 0, heart_beat = 0; @@ -852,7 +881,7 @@ extern "C" { return SWITCH_STATUS_FALSE; } if (SWITCH_STATUS_SUCCESS != stream_data_init(tech_pvt, session, wsUri, samples_per_second, sampling, channels, responseHandler, deflate, heart_beat, - suppressLog, rtp_packets, extra_headers, no_reconnect, tls_cafile, tls_keyfile, tls_certfile, tls_disable_hostname_validation, disable_audiofiles)) { + suppressLog, rtp_packets, extra_headers, no_reconnect, tls_cafile, tls_keyfile, tls_certfile, tls_disable_hostname_validation, disable_audiofiles, start_muted)) { destroy_tech_pvt(tech_pvt); return SWITCH_STATUS_FALSE; } @@ -865,7 +894,7 @@ extern "C" { switch_bool_t stream_frame(switch_media_bug_t *bug) { auto* tech_pvt = (private_t*) switch_core_media_bug_get_user_data(bug); - if (!tech_pvt || tech_pvt->audio_paused) return SWITCH_TRUE; + if (!tech_pvt || tech_pvt->audio_paused || tech_pvt->user_audio_muted) return SWITCH_TRUE; if (switch_mutex_trylock(tech_pvt->mutex) == SWITCH_STATUS_SUCCESS) { diff --git a/openai_audio_streamer_glue.h b/openai_audio_streamer_glue.h index 04da75b..1bbd6ed 100644 --- a/openai_audio_streamer_glue.h +++ b/openai_audio_streamer_glue.h @@ -8,8 +8,9 @@ int validate_ws_uri(const char* url, char *wsUri); switch_status_t is_valid_utf8(const char *str); switch_status_t stream_session_send_json(switch_core_session_t *session, char* json); switch_status_t stream_session_pauseresume(switch_core_session_t *session, int pause); +switch_status_t 
stream_session_set_user_mute(switch_core_session_t *session, int mute); switch_status_t stream_session_init(switch_core_session_t *session, responseHandler_t responseHandler, - uint32_t samples_per_second, char *wsUri, int sampling, int channels, void **ppUserData); + uint32_t samples_per_second, char *wsUri, int sampling, int channels, switch_bool_t start_muted, void **ppUserData); switch_bool_t stream_frame(switch_media_bug_t *bug); switch_bool_t write_frame(switch_core_session_t *session, switch_media_bug_t *bug); switch_status_t stream_session_cleanup(switch_core_session_t *session, char* text, int channelIsClosing); From 9d4a88a2d80bf4d0be4554fdd7feb1757bd9f01b Mon Sep 17 00:00:00 2001 From: Dario Pellegrino Date: Thu, 30 Oct 2025 16:25:16 +0100 Subject: [PATCH 2/7] fix user speech events are now fired (#22) Signed-off-by: Dario Pellegrino --- openai_audio_streamer_glue.cpp | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/openai_audio_streamer_glue.cpp b/openai_audio_streamer_glue.cpp index fa2461d..0cd98eb 100644 --- a/openai_audio_streamer_glue.cpp +++ b/openai_audio_streamer_glue.cpp @@ -286,18 +286,14 @@ class AudioStreamer { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "(%s) processMessage - error: %s\n", m_sessionId.c_str(), message.c_str()); } else if(jsType && strcmp(jsType, "input_audio_buffer.speech_started") == 0) { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "(%s) processMessage - user speech started\n", m_sessionId.c_str()); + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "(%s) processMessage - user speech started, stopping openai audio playback\n", m_sessionId.c_str()); clear_audio_queue(); // also clear the private_t playback buffer used in write frame playback_clear_requested = true; - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "(%s) processMessage - user speech detected stopping audio playback\n", 
m_sessionId.c_str()); - - status = SWITCH_TRUE; - } else if (jsType && strcmp(jsType, "input_audio_buffer.speech_stopped") == 0) { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "(%s) processMessage - user speech stopped\n", m_sessionId.c_str()); - playback_clear_requested = false; + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "(%s) processMessage - user speech stopped\n", m_sessionId.c_str()); + // Do not clear playback_clear_requested here; it should remain true until new audio is received. } else if(jsType && strcmp(jsType, "response.output_audio.delta") == 0) { const char* jsonAudio = cJSON_GetObjectCstr(json, "delta"); @@ -813,7 +809,6 @@ extern "C" { const char* tls_keyfile = NULL;; const char* tls_certfile = NULL;; const char* openai_api_key = NULL;; - const char* openai_realtime_version = NULL; bool tls_disable_hostname_validation = false; bool disable_audiofiles = false; From 8e9aaeb871a759f59a0fc9c4f3d6f5edf95d1bef Mon Sep 17 00:00:00 2001 From: Dario Pellegrino Date: Thu, 30 Oct 2025 21:05:40 +0100 Subject: [PATCH 3/7] chore(style): refactor C-style casts with static_cast (#22) Signed-off-by: Dario Pellegrino --- mod_openai_audio_stream.c | 1 - openai_audio_streamer_glue.cpp | 52 +++++++++++++++++----------------- 2 files changed, 26 insertions(+), 27 deletions(-) diff --git a/mod_openai_audio_stream.c b/mod_openai_audio_stream.c index 1b2208d..5aa0c2e 100644 --- a/mod_openai_audio_stream.c +++ b/mod_openai_audio_stream.c @@ -218,7 +218,6 @@ SWITCH_STANDARD_API(stream_function) int next_index = 4; if (!strcasecmp(argv[next_index], "mute_user")) { start_muted = SWITCH_TRUE; - next_index++; } else { sampling_str = argv[next_index]; if (0 == strcmp(sampling_str, "16k")) { diff --git a/openai_audio_streamer_glue.cpp b/openai_audio_streamer_glue.cpp index 0cd98eb..3eb1ef1 100644 --- a/openai_audio_streamer_glue.cpp +++ b/openai_audio_streamer_glue.cpp @@ -148,14 +148,14 @@ class AudioStreamer { 
if(!channel) { return nullptr; } - auto *bug = (switch_media_bug_t *) switch_channel_get_private(channel, MY_BUG_NAME); + auto *bug = static_cast(switch_channel_get_private(channel, MY_BUG_NAME)); return bug; } inline void media_bug_close(switch_core_session_t *session) { auto *bug = get_media_bug(session); if(bug) { - auto* tech_pvt = (private_t*) switch_core_media_bug_get_user_data(bug); + auto* tech_pvt = static_cast(switch_core_media_bug_get_user_data(bug)); tech_pvt->close_requested = 1; switch_core_media_bug_close(&bug, SWITCH_FALSE); } @@ -413,7 +413,7 @@ class AudioStreamer { void writeBinary(uint8_t* buffer, size_t len) { if(!this->isConnected()) return; - webSocket.sendBinary(ix::IXWebSocketSendData((char *)buffer, len)); + webSocket.sendBinary(ix::IXWebSocketSendData(reinterpret_cast(buffer), len)); } void writeText(const char* text) { // Openai only accepts json not utf8 plain text @@ -544,14 +544,14 @@ namespace { } switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "%s: initializing buffer(%zu) to adjusted %zu bytes\n", tech_pvt->sessionId, buflen, adjSize); - tech_pvt->data = (uint8_t *) switch_core_alloc(pool, adjSize); + tech_pvt->data = static_cast(switch_core_alloc(pool, adjSize)); if (!tech_pvt->data) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "%s: Error allocating memory for data buffer.\n", tech_pvt->sessionId); return SWITCH_STATUS_FALSE; } memset(tech_pvt->data, 0, adjSize); - tech_pvt->buffer = (RingBuffer *) switch_core_alloc(pool, sizeof(RingBuffer)); + tech_pvt->buffer = static_cast(switch_core_alloc(pool, sizeof(RingBuffer))); if (!tech_pvt->buffer) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "%s: Error allocating memory for ring buffer.\n", tech_pvt->sessionId); @@ -589,7 +589,7 @@ namespace { tech_pvt->mutex = nullptr; } if (tech_pvt->pAudioStreamer) { - auto* as = (AudioStreamer *) tech_pvt->pAudioStreamer; + auto* as = 
static_cast(tech_pvt->pAudioStreamer); delete as; tech_pvt->pAudioStreamer = nullptr; } @@ -597,7 +597,7 @@ namespace { void finish(private_t* tech_pvt) { std::shared_ptr aStreamer; - aStreamer.reset((AudioStreamer *)tech_pvt->pAudioStreamer); + aStreamer.reset(static_cast(tech_pvt->pAudioStreamer)); tech_pvt->pAudioStreamer = nullptr; std::thread t([aStreamer]{ @@ -688,7 +688,7 @@ extern "C" { switch_status_t stream_session_send_json(switch_core_session_t *session, const char* base64_input) { switch_channel_t *channel = switch_core_session_get_channel(session); - switch_media_bug_t *bug = (switch_media_bug_t*) switch_channel_get_private(channel, MY_BUG_NAME); + auto *bug = static_cast(switch_channel_get_private(channel, MY_BUG_NAME)); cJSON *json_obj = nullptr; char *json_unformatted = nullptr; switch_status_t status = SWITCH_STATUS_FALSE; @@ -697,7 +697,7 @@ extern "C" { return SWITCH_STATUS_FALSE; } - auto *tech_pvt = (private_t*) switch_core_media_bug_get_user_data(bug); + auto *tech_pvt = static_cast(switch_core_media_bug_get_user_data(bug)); if (!tech_pvt) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "stream_session_send_json failed to retrieve session data.\n"); return SWITCH_STATUS_FALSE; @@ -754,12 +754,12 @@ extern "C" { switch_status_t stream_session_pauseresume(switch_core_session_t *session, int pause) { switch_channel_t *channel = switch_core_session_get_channel(session); - auto *bug = (switch_media_bug_t*) switch_channel_get_private(channel, MY_BUG_NAME); + auto *bug = static_cast(switch_channel_get_private(channel, MY_BUG_NAME)); if (!bug) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "stream_session_pauseresume failed because no bug\n"); return SWITCH_STATUS_FALSE; } - auto *tech_pvt = (private_t*) switch_core_media_bug_get_user_data(bug); + auto *tech_pvt = static_cast(switch_core_media_bug_get_user_data(bug)); if (!tech_pvt) return SWITCH_STATUS_FALSE; @@ -770,12 +770,12 @@ extern "C" { 
switch_status_t stream_session_set_user_mute(switch_core_session_t *session, int mute) { switch_channel_t *channel = switch_core_session_get_channel(session); - auto *bug = (switch_media_bug_t*) switch_channel_get_private(channel, MY_BUG_NAME); + auto *bug = static_cast(switch_channel_get_private(channel, MY_BUG_NAME)); if (!bug) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "stream_session_set_user_mute failed because no bug\n"); return SWITCH_STATUS_FALSE; } - auto *tech_pvt = (private_t*) switch_core_media_bug_get_user_data(bug); + auto *tech_pvt = static_cast(switch_core_media_bug_get_user_data(bug)); if (!tech_pvt) { return SWITCH_STATUS_FALSE; } @@ -864,12 +864,12 @@ extern "C" { "{\"Authorization\": \"Bearer %s\"}", openai_api_key); extra_headers = headers_buf; } else { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "OPENAI_API_KEY is not set. Assuming you set STREAM_EXTRA_HEADERS variable.\n"); + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "OPENAI_API_KEY is not set. 
Assuming you set STREAM_EXTRA_HEADERS variable.\n"); extra_headers = switch_channel_get_variable(channel, "STREAM_EXTRA_HEADERS"); } // allocate per-session tech_pvt - auto* tech_pvt = (private_t *) switch_core_session_alloc(session, sizeof(private_t)); + auto* tech_pvt = static_cast(switch_core_session_alloc(session, sizeof(private_t))); if (!tech_pvt) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "error allocating memory!\n"); @@ -888,7 +888,7 @@ extern "C" { switch_bool_t stream_frame(switch_media_bug_t *bug) { - auto* tech_pvt = (private_t*) switch_core_media_bug_get_user_data(bug); + auto* tech_pvt = static_cast(switch_core_media_bug_get_user_data(bug)); if (!tech_pvt || tech_pvt->audio_paused || tech_pvt->user_audio_muted) return SWITCH_TRUE; if (switch_mutex_trylock(tech_pvt->mutex) == SWITCH_STATUS_SUCCESS) { @@ -914,7 +914,7 @@ extern "C" { while (switch_core_media_bug_read(bug, &frame, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS) { if(frame.datalen) { if (1 == tech_pvt->rtp_packets) { - pAudioStreamer->writeAudioDelta((uint8_t *) frame.data, frame.datalen); + pAudioStreamer->writeAudioDelta(static_cast(frame.data), frame.datalen); continue; } @@ -961,13 +961,13 @@ extern "C" { if(tech_pvt->channels == 1) { speex_resampler_process_int(tech_pvt->resampler, 0, - (const spx_int16_t *)frame.data, + static_cast(frame.data), &in_len, &out[0], &out_len); } else { speex_resampler_process_interleaved_int(tech_pvt->resampler, - (const spx_int16_t *)frame.data, + static_cast(frame.data), &in_len, &out[0], &out_len); @@ -976,11 +976,11 @@ extern "C" { if(out_len > 0) { const size_t bytes_written = out_len * tech_pvt->channels * sizeof(spx_int16_t); if (tech_pvt->rtp_packets == 1) { //20ms packet - pAudioStreamer->writeAudioDelta((uint8_t *) out, bytes_written); + pAudioStreamer->writeAudioDelta(reinterpret_cast(out), bytes_written); continue; } if (bytes_written <= inuse) { - switch_buffer_write(tech_pvt->sbuffer, (const uint8_t *)out, 
bytes_written); + switch_buffer_write(tech_pvt->sbuffer, out, bytes_written); } } @@ -1000,7 +1000,7 @@ extern "C" { } switch_bool_t write_frame(switch_core_session_t *session, switch_media_bug_t *bug) { - private_t *tech_pvt = (private_t *)switch_core_media_bug_get_user_data(bug); + private_t *tech_pvt = static_cast(switch_core_media_bug_get_user_data(bug)); if (!tech_pvt || tech_pvt->audio_paused) { return SWITCH_TRUE; } @@ -1043,7 +1043,7 @@ extern "C" { return SWITCH_TRUE; } - switch_byte_t *data = (switch_byte_t *) frame->data; + switch_byte_t *data = static_cast(frame->data); if (inuse > bytes_needed) { switch_buffer_read(tech_pvt->playback_buffer, data, bytes_needed); @@ -1065,10 +1065,10 @@ extern "C" { switch_status_t stream_session_cleanup(switch_core_session_t *session, char* text, int channelIsClosing) { switch_channel_t *channel = switch_core_session_get_channel(session); - auto *bug = (switch_media_bug_t*) switch_channel_get_private(channel, MY_BUG_NAME); + auto *bug = static_cast(switch_channel_get_private(channel, MY_BUG_NAME)); if(bug) { - auto* tech_pvt = (private_t*) switch_core_media_bug_get_user_data(bug); + auto* tech_pvt = static_cast(switch_core_media_bug_get_user_data(bug)); char sessionId[MAX_SESSION_ID]; strncpy(sessionId, tech_pvt->sessionId, MAX_SESSION_ID - 1); @@ -1082,7 +1082,7 @@ extern "C" { switch_core_media_bug_remove(session, &bug); } - auto* audioStreamer = (AudioStreamer *) tech_pvt->pAudioStreamer; + auto* audioStreamer = static_cast(tech_pvt->pAudioStreamer); if(audioStreamer) { audioStreamer->deleteFiles(); stream_session_send_json(session, text); From c1156884011732ff7ec0261e8ecd722ff2e8d44b Mon Sep 17 00:00:00 2001 From: Dario Pellegrino Date: Thu, 30 Oct 2025 23:36:38 +0100 Subject: [PATCH 4/7] add option to mute/unmute user, openai or both (#22) Signed-off-by: Dario Pellegrino --- README.md | 11 +++++--- mod_openai_audio_stream.c | 45 ++++++++++++++++++++++++------ mod_openai_audio_stream.h | 1 + 
openai_audio_streamer_glue.cpp | 50 ++++++++++++++++++++++++++-------- openai_audio_streamer_glue.h | 1 + 5 files changed, 83 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index 4ee6534..9e6c48a 100644 --- a/README.md +++ b/README.md @@ -167,14 +167,17 @@ uuid_openai_audio_stream resume Resumes audio streaming in both directions after a `pause`. ``` -uuid_openai_audio_stream mute +uuid_openai_audio_stream mute [user | openai | all] ``` -Keeps the media bug alive but blocks upstream caller audio. Useful while you wait for a session update or play an intro prompt. +Keeps the media bug alive while silencing the selected leg. Defaults to `user` when omitted. +- `user`: block caller audio being sent to OpenAI. +- `openai`: block OpenAI playback from reaching the channel. +- `all`: apply both mute operations at once. ``` -uuid_openai_audio_stream unmute +uuid_openai_audio_stream unmute [user | openai | all] ``` -Re-enables caller audio towards OpenAI after a `start ... mute_user` or `mute`. +Re-enables the selected audio leg after a corresponding `mute`. Defaults to `user` when omitted. ## Events Module will generate the following event types: diff --git a/mod_openai_audio_stream.c b/mod_openai_audio_stream.c index 5aa0c2e..5182849 100644 --- a/mod_openai_audio_stream.c +++ b/mod_openai_audio_stream.c @@ -128,12 +128,27 @@ static switch_status_t do_pauseresume(switch_core_session_t *session, int pause) return status; } -static switch_status_t do_user_mute(switch_core_session_t *session, int mute) +static switch_status_t do_audio_mute(switch_core_session_t *session, const char *target, int mute) { - switch_status_t status = SWITCH_STATUS_SUCCESS; - - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "mod_openai_audio_stream: %s\n", mute ? "mute" : "unmute"); - status = stream_session_set_user_mute(session, mute); + switch_status_t status = SWITCH_STATUS_FALSE; + const char *which = target && *target ? 
target : "user"; + + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, + "mod_openai_audio_stream: %s %s audio\n", mute ? "mute" : "unmute", which); + + if (!strcasecmp(which, "user")) { + status = stream_session_set_user_mute(session, mute); + } else if (!strcasecmp(which, "openai")) { + status = stream_session_set_openai_mute(session, mute); + } else if (!strcasecmp(which, "all") || !strcasecmp(which, "both")) { + switch_status_t user_status = stream_session_set_user_mute(session, mute); + switch_status_t openai_status = stream_session_set_openai_mute(session, mute); + status = (user_status == SWITCH_STATUS_SUCCESS && openai_status == SWITCH_STATUS_SUCCESS) ? SWITCH_STATUS_SUCCESS : SWITCH_STATUS_FALSE; + } else { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, + "mod_openai_audio_stream: invalid mute target '%s', expected user|openai|all\n", which); + status = SWITCH_STATUS_FALSE; + } return status; } @@ -152,7 +167,17 @@ static switch_status_t send_json(switch_core_session_t *session, char* json) { return status; } -#define STREAM_API_SYNTAX " [start | stop | send_json | pause | resume | mute | unmute | graceful-shutdown ] [wss-url | path] [mono | mixed | stereo] [8000 | 16000 | 24000] [mute_user]" +#define STREAM_API_SYNTAX \ +"USAGE:\n" \ +"--------------------------------------------------------------------------------\n" \ +"uuid_openai_audio_stream [start | stop | send_json | pause | resume |\n" \ +" mute | unmute | graceful-shutdown]\n" \ +" [wss-url | path | user | openai | all | base64json]\n" \ +" [mono | mixed | stereo]\n" \ +" [8000 | 16000 | 24000]\n" \ +" [mute_user]\n" \ +"--------------------------------------------------------------------------------\n" + SWITCH_STANDARD_API(stream_function) { char *mycmd = NULL, *argv[8] = { 0 }; @@ -167,7 +192,7 @@ SWITCH_STANDARD_API(stream_function) if (zstr(cmd) || argc < 2 || (0 == strcmp(argv[1], "start") && argc < 4)) { 
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Error with command %s.\n", cmd); - stream->write_function(stream, "-USAGE: %s\n", STREAM_API_SYNTAX); + stream->write_function(stream, "%s\n", STREAM_API_SYNTAX); goto done; } else { if (strcasecmp(argv[1], "send_json")) { @@ -251,9 +276,11 @@ SWITCH_STANDARD_API(stream_function) status = start_capture(lsession, flags, wsUri, sampling, start_muted); } } else if (!strcasecmp(argv[1], "mute")) { - status = do_user_mute(lsession, 1); + const char *target = (argc > 2) ? argv[2] : "user"; + status = do_audio_mute(lsession, target, 1); } else if (!strcasecmp(argv[1], "unmute")) { - status = do_user_mute(lsession, 0); + const char *target = (argc > 2) ? argv[2] : "user"; + status = do_audio_mute(lsession, target, 0); } else { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "unsupported mod_openai_audio_stream cmd: %s\n", argv[1]); diff --git a/mod_openai_audio_stream.h b/mod_openai_audio_stream.h index 37feece..804ad8b 100644 --- a/mod_openai_audio_stream.h +++ b/mod_openai_audio_stream.h @@ -31,6 +31,7 @@ struct private_data { int channels; int audio_paused:1; int user_audio_muted:1; + int openai_audio_muted:1; int close_requested:1; RingBuffer *buffer; switch_buffer_t *sbuffer; diff --git a/openai_audio_streamer_glue.cpp b/openai_audio_streamer_glue.cpp index 3eb1ef1..6189d9a 100644 --- a/openai_audio_streamer_glue.cpp +++ b/openai_audio_streamer_glue.cpp @@ -507,6 +507,7 @@ namespace { tech_pvt->channels = channels; tech_pvt->audio_paused = 0; tech_pvt->user_audio_muted = start_muted ? 
1 : 0; + tech_pvt->openai_audio_muted = 0; const size_t buflen = (FRAME_SIZE_8000 * desiredSampling / 8000 * channels * rtp_packets); const size_t playback_buflen = 128000; // 128Kb may need to be decreased @@ -790,6 +791,28 @@ extern "C" { return SWITCH_STATUS_SUCCESS; } + switch_status_t stream_session_set_openai_mute(switch_core_session_t *session, int mute) { + switch_channel_t *channel = switch_core_session_get_channel(session); + auto *bug = static_cast(switch_channel_get_private(channel, MY_BUG_NAME)); + if (!bug) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "stream_session_set_openai_mute failed because no bug\n"); + return SWITCH_STATUS_FALSE; + } + + auto *tech_pvt = static_cast(switch_core_media_bug_get_user_data(bug)); + if (!tech_pvt) { + return SWITCH_STATUS_FALSE; + } + + switch_core_media_bug_flush(bug); + tech_pvt->openai_audio_muted = mute ? 1 : 0; + + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, + "OpenAI audio %s\n", mute ? 
"muted" : "unmuted"); + + return SWITCH_STATUS_SUCCESS; + } + switch_status_t stream_session_init(switch_core_session_t *session, responseHandler_t responseHandler, uint32_t samples_per_second, @@ -1027,7 +1050,6 @@ extern "C" { uint32_t inuse = switch_buffer_inuse(tech_pvt->playback_buffer); // push a chunk in the audio buffer used treated as cache - if (as->clear_requested()) { switch_buffer_zero(tech_pvt->playback_buffer); inuse = 0; @@ -1043,22 +1065,26 @@ extern "C" { return SWITCH_TRUE; } - switch_byte_t *data = static_cast(frame->data); - if (inuse > bytes_needed) { - switch_buffer_read(tech_pvt->playback_buffer, data, bytes_needed); - } else { - switch_buffer_read(tech_pvt->playback_buffer, data, inuse); + inuse = bytes_needed; } - if (!as->is_openai_speaking()) { - as->openai_speech_started(); - } + if (tech_pvt->openai_audio_muted) { + switch_buffer_toss(tech_pvt->playback_buffer, inuse); + } else { + switch_byte_t *data = static_cast(frame->data); + + switch_buffer_read(tech_pvt->playback_buffer, data, inuse); - frame->datalen = inuse > bytes_needed ? 
bytes_needed : inuse; - frame->samples = frame->datalen / bytes_per_sample; + if (!as->is_openai_speaking()) { + as->openai_speech_started(); + } + + frame->datalen = inuse; + frame->samples = frame->datalen / bytes_per_sample; - switch_core_media_bug_set_write_replace_frame(bug, frame); + switch_core_media_bug_set_write_replace_frame(bug, frame); + } return SWITCH_TRUE; } diff --git a/openai_audio_streamer_glue.h b/openai_audio_streamer_glue.h index 1bbd6ed..f83a051 100644 --- a/openai_audio_streamer_glue.h +++ b/openai_audio_streamer_glue.h @@ -9,6 +9,7 @@ switch_status_t is_valid_utf8(const char *str); switch_status_t stream_session_send_json(switch_core_session_t *session, char* json); switch_status_t stream_session_pauseresume(switch_core_session_t *session, int pause); switch_status_t stream_session_set_user_mute(switch_core_session_t *session, int mute); +switch_status_t stream_session_set_openai_mute(switch_core_session_t *session, int mute); switch_status_t stream_session_init(switch_core_session_t *session, responseHandler_t responseHandler, uint32_t samples_per_second, char *wsUri, int sampling, int channels, switch_bool_t start_muted, void **ppUserData); switch_bool_t stream_frame(switch_media_bug_t *bug); From 9662aa68def709ae6d647981cd4d31189a80fb64 Mon Sep 17 00:00:00 2001 From: Dario Pellegrino Date: Mon, 3 Nov 2025 14:50:37 +0100 Subject: [PATCH 5/7] refactor FS API command help and code to improve readability (#22) Signed-off-by: Dario Pellegrino --- mod_openai_audio_stream.c | 123 ++++++++++++++++++++++++++++---------- 1 file changed, 90 insertions(+), 33 deletions(-) diff --git a/mod_openai_audio_stream.c b/mod_openai_audio_stream.c index 5182849..945c3ab 100644 --- a/mod_openai_audio_stream.c +++ b/mod_openai_audio_stream.c @@ -171,13 +171,53 @@ static switch_status_t send_json(switch_core_session_t *session, char* json) { "USAGE:\n" \ "--------------------------------------------------------------------------------\n" \ 
"uuid_openai_audio_stream [start | stop | send_json | pause | resume |\n" \ -" mute | unmute | graceful-shutdown]\n" \ +" mute | unmute]\n" \ " [wss-url | path | user | openai | all | base64json]\n" \ " [mono | mixed | stereo]\n" \ " [8000 | 16000 | 24000]\n" \ " [mute_user]\n" \ "--------------------------------------------------------------------------------\n" +typedef enum { + STREAM_CMD_UNKNOWN, + STREAM_CMD_START, + STREAM_CMD_STOP, + STREAM_CMD_SEND_JSON, + STREAM_CMD_PAUSE, + STREAM_CMD_RESUME, + STREAM_CMD_MUTE, + STREAM_CMD_UNMUTE +} stream_command_t; + +static stream_command_t stream_command_from_string(const char *name) +{ + if (zstr(name)) { + return STREAM_CMD_UNKNOWN; + } + if (!strcasecmp(name, "start")) { + return STREAM_CMD_START; + } + if (!strcasecmp(name, "stop")) { + return STREAM_CMD_STOP; + } + if (!strcasecmp(name, "send_json")) { + return STREAM_CMD_SEND_JSON; + } + if (!strcasecmp(name, "pause")) { + return STREAM_CMD_PAUSE; + } + if (!strcasecmp(name, "resume")) { + return STREAM_CMD_RESUME; + } + if (!strcasecmp(name, "mute")) { + return STREAM_CMD_MUTE; + } + if (!strcasecmp(name, "unmute")) { + return STREAM_CMD_UNMUTE; + } + return STREAM_CMD_UNKNOWN; +} + SWITCH_STANDARD_API(stream_function) { char *mycmd = NULL, *argv[8] = { 0 }; @@ -190,38 +230,51 @@ SWITCH_STANDARD_API(stream_function) } assert(cmd); - if (zstr(cmd) || argc < 2 || (0 == strcmp(argv[1], "start") && argc < 4)) { + if (zstr(cmd) || argc < 2) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Error with command %s.\n", cmd); stream->write_function(stream, "%s\n", STREAM_API_SYNTAX); goto done; - } else { - if (strcasecmp(argv[1], "send_json")) { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "mod_openai_audio_stream cmd: %s\n", cmd ? 
cmd : ""); - } - switch_core_session_t *lsession = NULL; - if ((lsession = switch_core_session_locate(argv[0]))) { - if (!strcasecmp(argv[1], "stop")) { - if(argc > 2 && (is_valid_utf8(argv[2]) != SWITCH_STATUS_SUCCESS)) { + } + + stream_command_t command = stream_command_from_string(argv[1]); + + if (command != STREAM_CMD_SEND_JSON) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "mod_openai_audio_stream cmd: %s\n", cmd ? cmd : ""); + } + + switch_core_session_t *lsession = NULL; + if ((lsession = switch_core_session_locate(argv[0]))) { + switch (command) { + case STREAM_CMD_STOP: + if (argc > 2 && (is_valid_utf8(argv[2]) != SWITCH_STATUS_SUCCESS)) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "%s contains invalid utf8 characters\n", argv[2]); - switch_core_session_rwunlock(lsession); - goto done; + goto release_session; } status = do_stop(lsession, argc > 2 ? argv[2] : NULL); - } else if (!strcasecmp(argv[1], "pause")) { + break; + case STREAM_CMD_PAUSE: status = do_pauseresume(lsession, 1); - } else if (!strcasecmp(argv[1], "resume")) { + break; + case STREAM_CMD_RESUME: status = do_pauseresume(lsession, 0); - } else if (!strcasecmp(argv[1], "send_json")) { + break; + case STREAM_CMD_SEND_JSON: if (argc < 3) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "send_json requires an argument specifying json to send\n"); - switch_core_session_rwunlock(lsession); - goto done; + goto release_session; } status = send_json(lsession, argv[2]); - } else if (!strcasecmp(argv[1], "start")) { - //switch_channel_t *channel = switch_core_session_get_channel(lsession); + break; + case STREAM_CMD_START: + { + if (argc < 4) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, + "Error with command %s.\n", cmd); + stream->write_function(stream, "%s\n", STREAM_API_SYNTAX); + goto release_session; + } char wsUri[MAX_WS_URI]; int sampling = 8000; const char *sampling_str = NULL; 
@@ -236,8 +289,7 @@ SWITCH_STANDARD_API(stream_function) } else if (0 != strcmp(argv[3], "mono")) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "invalid mix type: %s, must be mono, mixed, or stereo\n", argv[3]); - switch_core_session_rwunlock(lsession); - goto done; + goto release_session; } if (argc > 4) { int next_index = 4; @@ -275,22 +327,27 @@ SWITCH_STANDARD_API(stream_function) } else { status = start_capture(lsession, flags, wsUri, sampling, start_muted); } - } else if (!strcasecmp(argv[1], "mute")) { - const char *target = (argc > 2) ? argv[2] : "user"; - status = do_audio_mute(lsession, target, 1); - } else if (!strcasecmp(argv[1], "unmute")) { + break; + } + case STREAM_CMD_MUTE: + case STREAM_CMD_UNMUTE: + { const char *target = (argc > 2) ? argv[2] : "user"; - status = do_audio_mute(lsession, target, 0); - } else { + status = do_audio_mute(lsession, target, command == STREAM_CMD_MUTE ? 1 : 0); + break; + } + case STREAM_CMD_UNKNOWN: + default: switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "unsupported mod_openai_audio_stream cmd: %s\n", argv[1]); - } - - switch_core_session_rwunlock(lsession); - } else { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Error locating session %s\n", - argv[0]); + break; } + +release_session: + switch_core_session_rwunlock(lsession); + } else { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Error locating session %s\n", + argv[0]); } if (status == SWITCH_STATUS_SUCCESS) { From b27863c0b20dcb3a6224f4d93af1f5fc7cccb808 Mon Sep 17 00:00:00 2001 From: Dario Pellegrino Date: Mon, 3 Nov 2025 14:52:29 +0100 Subject: [PATCH 6/7] refactor audio queue handling and speech event logs (#22) Signed-off-by: Dario Pellegrino --- mod_openai_audio_stream.h | 1 - openai_audio_streamer_glue.cpp | 85 +++++++++++++++++++--------------- 2 files changed, 47 insertions(+), 39 deletions(-) diff --git a/mod_openai_audio_stream.h 
b/mod_openai_audio_stream.h index 804ad8b..811d472 100644 --- a/mod_openai_audio_stream.h +++ b/mod_openai_audio_stream.h @@ -38,7 +38,6 @@ struct private_data { uint8_t *data; int rtp_packets; switch_buffer_t *playback_buffer; - switch_mutex_t *playback_mutex; }; typedef struct private_data private_t; diff --git a/openai_audio_streamer_glue.cpp b/openai_audio_streamer_glue.cpp index 6189d9a..7e27483 100644 --- a/openai_audio_streamer_glue.cpp +++ b/openai_audio_streamer_glue.cpp @@ -347,26 +347,19 @@ class AudioStreamer { // managing queue, check if empty before popping or peeking - bool is_audio_queue_empty() { - std::lock_guard lock(m_audio_queue_mutex); - return m_audio_queue.empty(); - } - void push_audio_queue(const std::vector& audio_data) { std::lock_guard lock(m_audio_queue_mutex); m_audio_queue.push(audio_data); } - std::vector pop_audio_queue() { + bool pop_audio_queue(std::vector &out_audio) { std::lock_guard lock(m_audio_queue_mutex); - auto audio_data = m_audio_queue.front(); + if (m_audio_queue.empty()) { + return false; + } + out_audio = m_audio_queue.front(); m_audio_queue.pop(); - return audio_data; - } - - std::vector peek_audio_queue() { - std::lock_guard lock(m_audio_queue_mutex); - return m_audio_queue.front(); + return true; } void clear_audio_queue() { @@ -444,20 +437,29 @@ class AudioStreamer { void openai_speech_started() { m_openai_speaking = true; switch_core_session_t* psession = switch_core_session_locate(m_sessionId.c_str()); - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "(%s) Openai started speaking\n", m_sessionId.c_str()); - const char *payload = "{\"status\":\"started\"}"; - m_notify(psession, EVENT_OPENAI_SPEECH_STARTED, payload); - switch_core_session_rwunlock(psession); + + if (psession) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "(%s) Openai started speaking\n", m_sessionId.c_str()); + const char *payload = "{\"status\":\"started\"}"; + m_notify(psession, EVENT_OPENAI_SPEECH_STARTED, payload); + 
switch_core_session_rwunlock(psession); + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "(%s) Openai speech started - could not locate session\n", m_sessionId.c_str()); + } } void openai_speech_stopped() { m_openai_speaking = false; switch_core_session_t* psession = switch_core_session_locate(m_sessionId.c_str()); - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "(%s) Openai stopped speaking\n", m_sessionId.c_str()); - const char *payload = "{\"status\":\"stopped\"}"; - m_notify(psession, EVENT_OPENAI_SPEECH_STOPPED, payload); - switch_core_session_rwunlock(psession); + if (psession) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "(%s) Openai stopped speaking\n", m_sessionId.c_str()); + const char *payload = "{\"status\":\"stopped\"}"; + m_notify(psession, EVENT_OPENAI_SPEECH_STOPPED, payload); + switch_core_session_rwunlock(psession); + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "(%s) Openai speech stopped - could not locate session\n", m_sessionId.c_str()); + } } @@ -510,13 +512,7 @@ namespace { tech_pvt->openai_audio_muted = 0; const size_t buflen = (FRAME_SIZE_8000 * desiredSampling / 8000 * channels * rtp_packets); - const size_t playback_buflen = 128000; // 128Kb may need to be decreased - - if (switch_mutex_init(&tech_pvt->playback_mutex, SWITCH_MUTEX_NESTED, pool) != SWITCH_STATUS_SUCCESS) { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, - "%s: Error creating playback mutex.\n", tech_pvt->sessionId); - return SWITCH_STATUS_FALSE; - } + const size_t playback_buflen = 128000; // 128KB may need to be decreased if (switch_buffer_create(pool, &tech_pvt->playback_buffer, playback_buflen) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, @@ -782,11 +778,14 @@ extern "C" { } switch_core_media_bug_flush(bug); + auto last_state = tech_pvt->user_audio_muted; tech_pvt->user_audio_muted = mute ? 
1 : 0; - if (mute) { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "User audio muted\n"); + if (last_state == tech_pvt->user_audio_muted) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, + "User audio is already %s\n", tech_pvt->user_audio_muted ? "muted" : "unmuted"); } else { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "User audio unmuted\n"); + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, + "User audio %s\n", tech_pvt->user_audio_muted ? "muted" : "unmuted"); } return SWITCH_STATUS_SUCCESS; } @@ -805,10 +804,15 @@ extern "C" { } switch_core_media_bug_flush(bug); + auto last_state = tech_pvt->openai_audio_muted; tech_pvt->openai_audio_muted = mute ? 1 : 0; - - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, - "OpenAI audio %s\n", mute ? "muted" : "unmuted"); + if (last_state == tech_pvt->openai_audio_muted) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, + "OpenAI audio is already %s\n", tech_pvt->openai_audio_muted ? "muted" : "unmuted"); + } else { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, + "OpenAI audio %s\n", tech_pvt->openai_audio_muted ? 
"muted" : "unmuted"); + } return SWITCH_STATUS_SUCCESS; } @@ -1054,10 +1058,15 @@ extern "C" { switch_buffer_zero(tech_pvt->playback_buffer); inuse = 0; } - if (inuse < bytes_needed * 2 && !as->is_audio_queue_empty()) { - auto chunk = as->pop_audio_queue(); - switch_buffer_write(tech_pvt->playback_buffer, chunk.data(), chunk.size() * sizeof(int16_t)); - } else if (inuse == 0) { + bool chunk_enqueued = false; + if (inuse < bytes_needed * 2) { + std::vector chunk; + if (as->pop_audio_queue(chunk)) { + switch_buffer_write(tech_pvt->playback_buffer, chunk.data(), chunk.size() * sizeof(int16_t)); + chunk_enqueued = true; + } + } + if (!chunk_enqueued && inuse == 0) { // Openai just finished speaking for interruption or end of response if(as->is_openai_speaking() && as->is_response_audio_done()) { as->openai_speech_stopped(); From 129d7cd08df1fee5d314b434388035430c029dc0 Mon Sep 17 00:00:00 2001 From: Dario Pellegrino Date: Mon, 3 Nov 2025 17:26:58 +0100 Subject: [PATCH 7/7] fix flush buffers and stream 1s silence to trigger VAD on user mute (#22) Not flushing the audio buffer caused a delayed response upon unmute (because stale buffered user audio was released to the model). Without sending an empty chunk of audio, model inference would not be triggered when user speech is interrupted by a mute. 
Also see PR #23 for more details. Signed-off-by: Dario Pellegrino --- openai_audio_streamer_glue.cpp | 47 +++++++++++++++++++++++++++++----- 1 file changed, 41 insertions(+), 6 deletions(-) diff --git a/openai_audio_streamer_glue.cpp b/openai_audio_streamer_glue.cpp index 7e27483..61ccf6e 100644 --- a/openai_audio_streamer_glue.cpp +++ b/openai_audio_streamer_glue.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -768,26 +769,60 @@ extern "C" { switch_status_t stream_session_set_user_mute(switch_core_session_t *session, int mute) { switch_channel_t *channel = switch_core_session_get_channel(session); auto *bug = static_cast(switch_channel_get_private(channel, MY_BUG_NAME)); + switch_status_t status = SWITCH_STATUS_FALSE; if (!bug) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "stream_session_set_user_mute failed because no bug\n"); - return SWITCH_STATUS_FALSE; + return status; } auto *tech_pvt = static_cast(switch_core_media_bug_get_user_data(bug)); if (!tech_pvt) { - return SWITCH_STATUS_FALSE; + return status; } + status = SWITCH_STATUS_SUCCESS; switch_core_media_bug_flush(bug); - auto last_state = tech_pvt->user_audio_muted; + const int last_state = tech_pvt->user_audio_muted; tech_pvt->user_audio_muted = mute ? 1 : 0; if (last_state == tech_pvt->user_audio_muted) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "User audio is already %s\n", tech_pvt->user_audio_muted ? "muted" : "unmuted"); - } else { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, + return status; + } + + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "User audio %s\n", tech_pvt->user_audio_muted ? 
"muted" : "unmuted"); + + if (tech_pvt->user_audio_muted) { + if (tech_pvt->mutex) { + switch_mutex_lock(tech_pvt->mutex); + } + + if (tech_pvt->buffer) { + ringBufferClear(tech_pvt->buffer); + } + if (tech_pvt->sbuffer) { + switch_buffer_zero(tech_pvt->sbuffer); + } + + AudioStreamer *streamer = static_cast(tech_pvt->pAudioStreamer); + if (streamer && streamer->isConnected()) { + size_t channels = tech_pvt->channels > 0 ? static_cast(tech_pvt->channels) : 1; + size_t sample_rate = tech_pvt->sampling > 0 ? static_cast(tech_pvt->sampling) : 24000; // 24 KHz is currently the only supported rate by openai + size_t bytes = channels * sample_rate * sizeof(int16_t); + std::vector silence(bytes, 0); + streamer->writeAudioDelta(silence.data(), silence.size()); + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, + "Sent %zu bytes of silence after muting user audio\n", silence.size()); + } else { + status = SWITCH_STATUS_FALSE; + } + + if (tech_pvt->mutex) { + switch_mutex_unlock(tech_pvt->mutex); + } } - return SWITCH_STATUS_SUCCESS; + + return status; } switch_status_t stream_session_set_openai_mute(switch_core_session_t *session, int mute) {