From 409e80424041cd41704a86fac1353c9b2fcc14bf Mon Sep 17 00:00:00 2001 From: Suhas Nandakumar Date: Sun, 9 Oct 2022 10:55:56 -0700 Subject: [PATCH 1/3] make lib/metrics build --- cmd/sendVideoFrame.cc | 9 +- cmd/sound.cc | 23 +++-- include/qmedia/media_client.hh | 6 +- lib/metrics/CMakeLists.txt | 10 +-- lib/metrics/include/metrics.hh | 30 ------- .../include/{ => metrics}/measurements.hh | 26 +++--- lib/metrics/include/metrics/metrics.hh | 48 ++++++++++ lib/metrics/src/measurements.cc | 4 +- lib/metrics/src/metrics.cc | 68 +++++--------- lib/metrics/test/CMakeLists.txt | 4 +- src/audio_encoder.hh | 3 +- src/extern/neo_media_client.cc | 6 +- src/extern/neo_media_client.hh | 2 +- src/full_Fill.cc | 36 +++++--- src/full_Fill.hh | 3 +- src/h264_encoder.cc | 2 +- src/jitter.cc | 89 ++++++++++++------- src/jitter.hh | 9 +- src/jitter_queues.cc | 1 - src/logger.cc | 9 +- src/media_client.cc | 23 ++--- src/media_stream.cc | 63 ++++++++----- src/media_stream.hh | 16 ++-- src/media_transport.cc | 59 ++++++------ src/media_transport.hh | 43 +++++---- src/packet.cc | 2 +- src/playout_leakybucket.cc | 33 ++++--- src/playout_leakybucket.hh | 4 +- src/playout_sync.cc | 3 +- src/playout_tools.cc | 3 +- src/video_stream.cc | 37 +++++--- 31 files changed, 387 insertions(+), 287 deletions(-) delete mode 100644 lib/metrics/include/metrics.hh rename lib/metrics/include/{ => metrics}/measurements.hh (78%) create mode 100644 lib/metrics/include/metrics/metrics.hh diff --git a/cmd/sendVideoFrame.cc b/cmd/sendVideoFrame.cc index aa00330..10eb8d4 100644 --- a/cmd/sendVideoFrame.cc +++ b/cmd/sendVideoFrame.cc @@ -155,7 +155,14 @@ int main(int argc, char **argv) std::uint32_t height = 0; std::uint32_t format = dec_format; // I420 auto received_image_size = MediaClient_getVideoFrame( - client, stream_id, &ts, &width, &height, &format, &buffer, nullptr); + client, + stream_id, + &ts, + &width, + &height, + &format, + &buffer, + nullptr); std::cerr << " r " << ts << " " << received_image_size << std::endl; } diff --git a/cmd/sound.cc b/cmd/sound.cc index 6139ff7..231a644 100644 --- a/cmd/sound.cc +++ b/cmd/sound.cc @@ -100,7 +100,8 @@ void recordThreadFunc(MediaClient *client, MediaStreamId stream_id) .count(); client->send_audio(stream_id, - reinterpret_cast(const_cast(audioBuff)), + reinterpret_cast( + const_cast(audioBuff)), buff_size, timestamp); logger->debug << "-" << std::flush; @@ -113,7 +114,7 @@ void recordThreadFunc(MediaClient *client, MediaStreamId stream_id) free(zerobuff); } -void playThreadFunc(MediaClient* client, MediaStreamId stream_id) +void playThreadFunc(MediaClient *client, MediaStreamId stream_id) { std::chrono::steady_clock::time_point loop_time = std::chrono::steady_clock::now(); @@ -166,7 +167,8 @@ void playThreadFunc(MediaClient* client, MediaStreamId stream_id) std::chrono::steady_clock::time_point get_audio = std::chrono::steady_clock::now(); - int recv_actual = client->get_audio(stream_id, timestamp, &raw_data, buff_size, nullptr); + int recv_actual = client->get_audio( + stream_id, timestamp, &raw_data, buff_size, nullptr); auto audio_delta = std::chrono::duration_cast( std::chrono::steady_clock::now() - get_audio); logger->info << "{A:" << audio_delta.count() << "}" << std::flush; @@ -271,7 +273,8 @@ int main(int argc, char *argv[]) if (argc < 4) { std::cerr << "Must provide mode of operation" << std::endl; - std::cerr << "Usage: sound " + std::cerr << "Usage: sound " + " " << std::endl; std::cerr << "Mode: pub/sub/pubsub" << std::endl; std::cerr << "" << std::endl; @@ -313,9 +316,9 @@ int main(int argc, char *argv[]) // configure new stream callback auto wrapped_stream_callback = [](uint64_t client_id, - uint64_t source_id, - uint64_t source_ts, - MediaType media_type) + uint64_t source_id, + uint64_t source_ts, + MediaType media_type) { logger->info << "[Sound]: New Sorcce" << source_id << std::flush; recvMedia = true; @@ -385,7 +388,7 @@ int main(int argc, char *argv[]) } std::vector threads; - uint64_t stream_id {0}; + uint64_t stream_id{0}; if (mode == "send") { MediaConfig config{}; @@ -408,7 +411,9 @@ int main(int argc, char *argv[]) stream_id = client.add_audio_stream(0x1000, 0x2000, 0x3000, config); threads.emplace_back(playThreadFunc, &client, stream_id); threads.at(0).detach(); - } else if (mode == "sendrecv") { + } + else if (mode == "sendrecv") + { MediaConfig config{}; config.media_direction = MediaConfig::MediaDirection::sendrecv; config.media_codec = MediaConfig::CodecType::opus; diff --git a/include/qmedia/media_client.hh b/include/qmedia/media_client.hh index 4f3bf33..71449ea 100644 --- a/include/qmedia/media_client.hh +++ b/include/qmedia/media_client.hh @@ -21,8 +21,8 @@ enum struct TransportType enum MediaType { invalid = 0, - audio = 1, - video = 2 + audio = 1, + video = 2 }; struct AudioConfig @@ -147,7 +147,7 @@ public: unsigned char **buffer, void **to_free); - void release_media_buffer(void* buffer); + void release_media_buffer(void *buffer); private: // Should this be exposed in public header ?? diff --git a/lib/metrics/CMakeLists.txt b/lib/metrics/CMakeLists.txt index a98072b..a275aa4 100644 --- a/lib/metrics/CMakeLists.txt +++ b/lib/metrics/CMakeLists.txt @@ -13,14 +13,8 @@ find_package(CURL REQUIRED) add_library(metrics src/measurements.cc src/metrics.cc) -target_link_libraries(metrics - PUBLIC - ${CURL_LIBRARIES} - OpenSSL::SSL - OpenSSL::Crypto) -target_include_directories(metrics - PUBLIC - ${CMAKE_SOURCE_DIR}/include) +target_link_libraries(metrics PRIVATE ${CURL_LIBRARIES} OpenSSL::SSL OpenSSL::Crypto) +target_include_directories(metrics PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include) target_compile_options(metrics PRIVATE $<$,$,$>: -Wpedantic -Wextra -Wall> diff --git a/lib/metrics/include/metrics.hh b/lib/metrics/include/metrics.hh deleted file mode 100644 index 11d7a6d..0000000 --- a/lib/metrics/include/metrics.hh +++ /dev/null @@ -1,30 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include -#include -#include - -#include "measurements.hh" - -namespace metrics -{ - -class Metrics -{ -public: - typedef std::shared_ptr MetricsPtr; - - void pusher(); - void push(); // push(std::string & name) - specific push - - std::mutex metrics_mutex; - std::condition_variable cv; - std::mutex push_mutex; - std::map measurements; -}; - -} diff --git a/lib/metrics/include/measurements.hh b/lib/metrics/include/metrics/measurements.hh similarity index 78% rename from lib/metrics/include/measurements.hh rename to lib/metrics/include/metrics/measurements.hh index 12245a2..08b928e 100644 --- a/lib/metrics/include/measurements.hh +++ b/lib/metrics/include/metrics/measurements.hh @@ -9,21 +9,17 @@ namespace metrics { enum struct MeasurementType { - PacketRate_Tx, - PacketRate_Rx, - FrameRate_Tx, - FrameRate_Rx, - QDepth_Tx, - QDepth_Rx, + FrameRate_Tx, // frame count before transmit + FrameRate_Rx, // frame count from transport after re-assembly + EncodeTime, // time elapsed for encoding a raw sample (ms) + EndToEndLatency, // tx to rx latency (ms) }; const auto measurement_names = std::map{ - {MeasurementType::PacketRate_Tx, "TxPacketCount"}, - {MeasurementType::PacketRate_Rx, "RxPacketCount"}, {MeasurementType::FrameRate_Tx, "TxFrameCount"}, {MeasurementType::FrameRate_Rx, "RxFrameCount"}, - {MeasurementType::QDepth_Tx, "TxQueueDepth"}, - {MeasurementType::QDepth_Rx, "RxQueueDepth"}, + {MeasurementType::EncodeTime, "EncodeTimeInMs"}, + {MeasurementType::EndToEndLatency, "EndToEndLatencyInMs"}, }; /// @@ -32,12 +28,10 @@ const auto measurement_names = std::map{ /// struct Measurement { - virtual std::string toString() = 0; virtual ~Measurement() = default; + virtual std::string toString() = 0; }; -/// Influx Measurement and Helpers -/// // handy defines using Field = std::pair; @@ -46,11 +40,10 @@ using Tag = std::pair; using Tags = std::list; using TimePoint = std::pair; -class InfluxMeasurement : public Measurement +class InfluxMeasurement : public Measurement { public: - - static std::unique_ptr createMeasurement(std::string name, Tags tags); + static std::unique_ptr create(std::string name, Tags tags); InfluxMeasurement(std::string &name, Tags &tags); ~InfluxMeasurement() = default; @@ -63,6 +56,7 @@ public: Tags tags; Fields fields; }; + using TimeSeriesEntry = std::pair; // Setters for the measurement diff --git a/lib/metrics/include/metrics/metrics.hh b/lib/metrics/include/metrics/metrics.hh new file mode 100644 index 0000000..4a3ab08 --- /dev/null +++ b/lib/metrics/include/metrics/metrics.hh @@ -0,0 +1,48 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "measurements.hh" + +namespace metrics +{ + +struct InfluxConfig +{ + static const std::string url; + static const std::string org; + static const std::string bucket; + static const std::string auth_token; +}; + +class Metrics +{ +public: + // influxdb factory + static std::shared_ptr create(const InfluxConfig& config); + + void pusher(); + void push(); // push(std::string & name) - specific push + + // created via factory + Metrics(CURL* handle); + ~Metrics() = default; + + bool shutdown = false; + bool push_signals = false; + std::mutex metrics_mutex; + std::condition_variable cv; + std::mutex push_mutex; + std::thread metrics_thread; + std::map> measurements; + // make it RAII + CURL *handle; +}; + +} diff --git a/lib/metrics/src/measurements.cc b/lib/metrics/src/measurements.cc index 458ac8a..5890731 100644 --- a/lib/metrics/src/measurements.cc +++ b/lib/metrics/src/measurements.cc @@ -1,10 +1,10 @@ #include -#include "measurements.hh" +#include namespace metrics { -std::unique_ptr InfluxMeasurement::createMeasurement(std::string name, Tags tags) +std::unique_ptr InfluxMeasurement::create(std::string name, Tags tags) { return std::make_unique(name, tags); } diff --git a/lib/metrics/src/metrics.cc b/lib/metrics/src/metrics.cc index 43cb48b..e7cbb4b 100644 --- a/lib/metrics/src/metrics.cc +++ b/lib/metrics/src/metrics.cc @@ -5,75 +5,55 @@ #include #include -#include "metrics.hh" +#include +#include -using namespace qmedia; - -const std::string MetricsConfig::URL = ""; -const std::string MetricsConfig::ORG = ""; -const std::string MetricsConfig::BUCKET = ""; -const std::string MetricsConfig::AUTH_TOKEN = ""; +namespace metrics +{ -Metrics::Metrics(const std::string &influx_url, - const std::string &org, - const std::string &bucket, - const std::string &auth_token) +// factory methods +std::shared_ptr Metrics::create(const InfluxConfig& config) { - // don't run push thread or run curl if url not provided - if (influx_url.empty()) return; + if (config.url.empty() || config.auth_token.empty()) + { + return nullptr; + } // manipulate url properties for use with CURL - std::string adjusted_url = influx_url + "/api/v2/write?org=" + org + - "&bucket=" + bucket + "&precision=ns"; + std::string adjusted_url = config.url + "/api/v2/write?org=" + config.org + + "&bucket=" + config.bucket + "&precision=ns"; std::clog << "influx url:" << adjusted_url << std::endl; // initial curl curl_global_init(CURL_GLOBAL_ALL); - - // get a curl handle - handle = curl_easy_init(); - - // set the action to POST + auto handle = curl_easy_init(); curl_easy_setopt(handle, CURLOPT_POST, 1L); - - // set the url curl_easy_setopt(handle, CURLOPT_URL, adjusted_url.c_str()); - - assert(!auth_token.empty()); - curl_easy_setopt(handle, CURLOPT_HTTPAUTH, CURLAUTH_ANY); - - struct curl_slist *headerlist = NULL; - std::string token_str = "Authorization: Token " + auth_token; + struct curl_slist *headerlist = nullptr; + std::string token_str = "Authorization: Token " + config.auth_token; headerlist = curl_slist_append(headerlist, token_str.c_str()); - curl_easy_setopt(handle, CURLOPT_HTTPHEADER, headerlist); - - // verify certificate curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, 0L); - - // verify host curl_easy_setopt(handle, CURLOPT_SSL_VERIFYHOST, 0L); - // do not allow unlimited redirects curl_easy_setopt(handle, CURLOPT_MAXREDIRS, 50L); - // enable TCP keep-alive probing curl_easy_setopt(handle, CURLOPT_TCP_KEEPALIVE, 1L); - // Start the service thread - metrics_thread = std::thread(&Metrics::pusher, this); - metrics_thread.detach(); + return std::make_shared(handle); + } -Metrics::MeasurementPtr Metrics::createMeasurement(std::string name, - Measurement::Tags tags) + +Metrics::Metrics(CURL* handle_in) { - std::lock_guard lock(metrics_mutex); - auto frame = std::make_shared(name, tags); - measurements.insert(std::pair(name, frame)); - return frame; + handle = handle_in; + metrics_thread = std::thread(&Metrics::pusher, this); + metrics_thread.detach(); } +} + diff --git a/lib/metrics/test/CMakeLists.txt b/lib/metrics/test/CMakeLists.txt index f9832aa..152c050 100644 --- a/lib/metrics/test/CMakeLists.txt +++ b/lib/metrics/test/CMakeLists.txt @@ -7,8 +7,8 @@ find_package(doctest REQUIRED) file(GLOB TEST_SOURCES CONFIGURE_DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/*.cc) add_executable(${TEST_APP_NAME} ${TEST_SOURCES}) -add_dependencies(${TEST_APP_NAME} ${CURRENT_LIB_NAME}) -target_link_libraries(${TEST_APP_NAME} ${CURRENT_LIB_NAME} doctest::doctest) +add_dependencies(${TEST_APP_NAME} ${CURRENT_LIB_NAME} metrics) +target_link_libraries(${TEST_APP_NAME} ${CURRENT_LIB_NAME} metrics doctest::doctest) # Enable CTest include(doctest) diff --git a/src/audio_encoder.hh b/src/audio_encoder.hh index a53a291..dbc8af2 100644 --- a/src/audio_encoder.hh +++ b/src/audio_encoder.hh @@ -23,7 +23,8 @@ namespace qmedia struct AudioEncoder { public: - typedef std::function &&, uint64_t)> frameReadyCallback; + typedef std::function &&, uint64_t)> + frameReadyCallback; AudioEncoder(unsigned int audio_sample_rate, int audio_channels, diff --git a/src/extern/neo_media_client.cc b/src/extern/neo_media_client.cc index 2621015..1881f8b 100644 --- a/src/extern/neo_media_client.cc +++ b/src/extern/neo_media_client.cc @@ -158,7 +158,8 @@ extern "C" } auto media_client = static_cast(instance); - return media_client->get_audio(streamId, *timestamp, buffer, max_len, to_free); + return media_client->get_audio( + streamId, *timestamp, buffer, max_len, to_free); } void CALL MediaClient_sendVideoFrame(void *instance, @@ -213,7 +214,8 @@ extern "C" streamId, *timestamp, *width, *height, *format, buffer, to_free); } - void CALL release_media_buffer(void *instance, void* buffer) { + void CALL release_media_buffer(void *instance, void *buffer) + { if (!instance || !buffer) { return; diff --git a/src/extern/neo_media_client.hh b/src/extern/neo_media_client.hh index ed760c6..a7d6550 100644 --- a/src/extern/neo_media_client.hh +++ b/src/extern/neo_media_client.hh @@ -92,5 +92,5 @@ extern "C" unsigned char **buffer, void **to_free); - EXPORT void CALL release_media_buffer(void *instance, void* buffer); + EXPORT void CALL release_media_buffer(void *instance, void *buffer); } diff --git a/src/full_Fill.cc b/src/full_Fill.cc index e7bc38c..9ac8058 100644 --- a/src/full_Fill.cc +++ b/src/full_Fill.cc @@ -27,16 +27,19 @@ unsigned int fullFill::getTotalInBuffers(LoggerPointer logger) if (logger) { logger->debug << "getTotalInBuffers: totalLength: " << totalLength - << ", read_front " << read_front << std::flush; + << ", read_front " << read_front << std::flush; } return totalLength - read_front; } -uint64_t fullFill::calculate_timestamp(LoggerPointer logger, unsigned int front, +uint64_t fullFill::calculate_timestamp(LoggerPointer logger, + unsigned int front, uint64_t timestamp) const { - logger->debug << "calculate_timestamp: front " << front << ", timestamp " << timestamp << std::endl; - if (front == 0 || timestamp == 0) { + logger->debug << "calculate_timestamp: front " << front << ", timestamp " + << timestamp << std::endl; + if (front == 0 || timestamp == 0) + { return timestamp; } @@ -47,7 +50,8 @@ uint64_t fullFill::calculate_timestamp(LoggerPointer logger, unsigned int front, return timestamp + microseconds_passed; } -bool fullFill::fill(LoggerPointer logger, std::vector &fill_buffer, +bool fullFill::fill(LoggerPointer logger, + std::vector &fill_buffer, unsigned int fill_length, uint64_t ×tamp) { @@ -56,9 +60,9 @@ bool fullFill::fill(LoggerPointer logger, std::vector &fill_buffer, timestamp = UINT64_MAX; // set timestamp from first filled sample - // using MAX to set only once - logger->debug << "Fill: total_length: " << total_length - << ", fill_length " << fill_length - << ", num elems " << buffers.size() << std::flush; + logger->debug << "Fill: total_length: " << total_length << ", fill_length " + << fill_length << ", num elems " << buffers.size() + << std::flush; if (total_length >= fill_length) { while (fill_buffer.size() < fill_length) @@ -72,8 +76,8 @@ bool fullFill::fill(LoggerPointer logger, std::vector &fill_buffer, if (timestamp == UINT64_MAX) { timestamp = calculate_timestamp(logger, read_front, dat.second); - logger->debug << "Fill: calculate_timestamp: " << timestamp << std::flush; - + logger->debug << "Fill: calculate_timestamp: " << timestamp + << std::flush; } if (available_length == to_fill) @@ -84,14 +88,16 @@ bool fullFill::fill(LoggerPointer logger, std::vector &fill_buffer, read_front = 0; buffers.pop_front(); break; - } else if (available_length < to_fill) + } + else if (available_length < to_fill) { fill_buffer.insert(fill_buffer.end(), dat.first.begin() + read_front, dat.first.end()); read_front = 0; buffers.pop_front(); - } else + } + else { // current buffer has more data than to_fill, copy until to_fill fill_buffer.insert(fill_buffer.end(), @@ -103,12 +109,14 @@ bool fullFill::fill(LoggerPointer logger, std::vector &fill_buffer, } } - if (timestamp == UINT64_MAX) { + if (timestamp == UINT64_MAX) + { logger->debug << "Fill: timestamp == UINT64_MAX" << std::flush; timestamp = 0; } - logger->debug << "Fill: fill_buffer size " << fill_buffer.size() << std::flush; + logger->debug << "Fill: fill_buffer size " << fill_buffer.size() + << std::flush; return (fill_buffer.size() == fill_length); } diff --git a/src/full_Fill.hh b/src/full_Fill.hh index c98d642..6de7d78 100644 --- a/src/full_Fill.hh +++ b/src/full_Fill.hh @@ -19,7 +19,8 @@ public: void addBuffer(const uint8_t *buffer, unsigned int length, std::uint64_t timestamp); - bool fill(LoggerPointer logger, std::vector &fill_buffer, + bool fill(LoggerPointer logger, + std::vector &fill_buffer, unsigned int fill_length, std::uint64_t ×tamp); diff --git a/src/h264_encoder.cc b/src/h264_encoder.cc index 8f07091..93bad90 100644 --- a/src/h264_encoder.cc +++ b/src/h264_encoder.cc @@ -210,7 +210,7 @@ H264Encoder::encode(const char *input_buffer, if (genKeyFrame || idr_frame) { logger->debug << "h264Encoder:: Force IDR, total_frames_encoded: " - << total_frames_encoded << std::flush; + << total_frames_encoded << std::flush; auto ret = encoder->ForceIntraFrame(true); logger->debug << "h264Encoder:: IDR Frame Generation Result " << ret << std::flush; diff --git a/src/jitter.cc b/src/jitter.cc index bc4bad4..53c7eab 100644 --- a/src/jitter.cc +++ b/src/jitter.cc @@ -60,7 +60,7 @@ void Jitter::set_video_params(uint32_t video_max_width, video.decoder = std::make_unique(video_decode_pixel_format); logger->info << "[set_video_params]" << video.last_decoded_width << "," - << video.last_decoded_height << "," + << video.last_decoded_height << "," << video.last_decoded_format << std::flush; assert(video.decoder); } @@ -134,7 +134,7 @@ bool Jitter::push(PacketPointer packet, auto seq = packet->encodedSequenceNum; video.push(std::move(packet), sync.video_seq_popped, now); logger->debug << "[jitter-v: seq_no:" << seq - << ", q-size:" << video.mq.size() << std::flush; + << ", q-size:" << video.mq.size() << std::flush; break; } @@ -201,14 +201,15 @@ PacketPointer Jitter::popAudio(uint64_t sourceID, } logger->debug << "[J-PopAudio: Q-depth:" << audio.mq.size() << "*" - << audio.ms_per_audio_packet << "=" << audio.getMsInQueue() - << "]" << std::flush; + << audio.ms_per_audio_packet << "=" << audio.getMsInQueue() + << "]" << std::flush; logger->debug << "[J-PopAudio: Jitter-ms:" << audio_jitter.getJitterMs() - << "]" << std::flush; + << "]" << std::flush; logger->debug << "[J-PopAudio: Asking Length:" << length << "]" - << std::flush; + << std::flush; logger->debug << "[J-PopAudio: Playing total in buffers" - << audio.playout.getTotalInBuffers(nullptr) << "]" << std::flush; + << audio.playout.getTotalInBuffers(nullptr) << "]" + << std::flush; QueueMonitor(now); int num_depth_adjustments = 1; @@ -216,8 +217,8 @@ PacketPointer Jitter::popAudio(uint64_t sourceID, // loop to ensure clients asking for variable length data while (audio.playout.getTotalInBuffers(logger) < length) { - if (bucket.initialFill(audio.getMsInQueue(), - audio_jitter.getJitterMs(), logger)) + if (bucket.initialFill( + audio.getMsInQueue(), audio_jitter.getJitterMs(), logger)) { // we don't have anything in our buffers, create PLC packet = audio.createPLC(audio.getFrameSize()); @@ -368,13 +369,17 @@ void Jitter::decodeVideoPacket(PacketPointer packet, video.lastDecodedFrame); if (error) { - logger->error << "[Jitter::decodeVideoPacket]" << error << "]" << std::endl; + logger->error << "[Jitter::decodeVideoPacket]" << error << "]" + << std::endl; // reuse the last decoded frame / parameters - } else { + } + else + { sync.video_popped( packet->sourceRecordTime, packet->encodedSequenceNum, now); // update the last decoded frame / parameters - video.last_decoded_format = (uint8_t) VideoConfig::PixelFormat::I420; + video.last_decoded_format = (uint8_t) + VideoConfig::PixelFormat::I420; video.last_decoded_height = height; video.last_decoded_width = width; video.last_decoded_timestamp = packet->sourceRecordTime; @@ -419,9 +424,8 @@ int Jitter::popVideo(uint64_t sourceID, return len; } - logger->debug << "[Jitter:popVideo]:"<< sourceID - << ", queue has: " << video.mq.size() << " ]" - << std::flush; + logger->debug << "[Jitter:popVideo]:" << sourceID + << ", queue has: " << video.mq.size() << " ]" << std::flush; if (idle_client) { @@ -485,7 +489,7 @@ int Jitter::popVideo(uint64_t sourceID, len = setDecodedFrame( sourceID, width, height, format, timestamp, buffer); logger->debug << "jitter: popDiscard: keyFrame requested" - << std::flush; + << std::flush; break; case Sync::sync_action::pop_video_only: for (unsigned int pops = 0; pops < (video.mq.size() - 2); @@ -535,25 +539,34 @@ void Jitter::QueueMonitor(std::chrono::steady_clock::time_point now) { std::lock_guard lock(audio.mq.qMutex); unsigned int plcs = 0; - logger->debug << "[JQM: last_seq_popped:]" << sync.audio_seq_popped << std::flush; + logger->debug << "[JQM: last_seq_popped:]" << sync.audio_seq_popped + << std::flush; unsigned int lost_in_queue = audio.mq.lostInQueue(plcs, sync.audio_seq_popped); logger->debug << "[JQM: lostInQueue:" << lost_in_queue << ",plcs:" << plcs << "]" << std::flush; // total number of audio frames in queue unsigned int queue_size = audio.getMsInQueue(); - logger->debug << "[JQM: getMsPerAudioPacket" << audio.getMsPerAudioPacket() << "]" << std::flush; - logger->debug << "[JQM: Q-Size == getMsPerAudioPacket:" << queue_size << "]" << std::flush; + logger->debug << "[JQM: getMsPerAudioPacket" << audio.getMsPerAudioPacket() + << "]" << std::flush; + logger->debug << "[JQM: Q-Size == getMsPerAudioPacket:" << queue_size << "]" + << std::flush; // unsigned int queue_size = audio.mq.size(); // average jitter since the last pop unsigned int jitter_ms = audio_jitter.getJitterMs(); - logger->debug << "[JQM: averagee audio-jitter-ms:" << jitter_ms << "]" << std::flush; + logger->debug << "[JQM: averagee audio-jitter-ms:" << jitter_ms << "]" + << std::flush; unsigned int ms_per_audio = audio.getMsPerAudioPacket(); logger->debug << "[JQM: ms_per_audio:" << ms_per_audio << "]" << std::flush; unsigned int client_fps = audio.fps.getFps(); logger->debug << "[JQM: client_fps:" << client_fps << "]" << std::flush; - bucket.tick( - now, queue_size, lost_in_queue, jitter_ms, ms_per_audio, client_fps, logger); + bucket.tick(now, + queue_size, + lost_in_queue, + jitter_ms, + ms_per_audio, + client_fps, + logger); } /// @@ -671,8 +684,10 @@ PacketPointer Jitter::Audio::pop(std::chrono::steady_clock::time_point now) unsigned int Jitter::Audio::getMsPerPacketInQueue(LoggerPointer logger) { std::lock_guard lock(mq.qMutex); - if(logger) { - logger->debug << "Audio::getMsPerPacketInQueue:" << mq.Q.size() << std::flush; + if (logger) + { + logger->debug << "Audio::getMsPerPacketInQueue:" << mq.Q.size() + << std::flush; } for (auto &mf : mq.Q) { @@ -691,8 +706,10 @@ unsigned int Jitter::Audio::getMsPerPacketInQueue(LoggerPointer logger) break; } size_t num_bytes = mf->packet->data.size(); - if(logger) { - logger->debug << "Audio::getMsPerPacketInQueue: num_bytes" << num_bytes << std::flush; + if (logger) + { + logger->debug << "Audio::getMsPerPacketInQueue: num_bytes" + << num_bytes << std::flush; } if (num_bytes > 0) @@ -700,15 +717,22 @@ unsigned int Jitter::Audio::getMsPerPacketInQueue(LoggerPointer logger) // unsigned int samples_per_channel = num_bytes / audio_channels / media_type_size; - if(logger) { - logger->debug << "Audio::getMsPerPacketInQueue: samples_per_channel" << samples_per_channel << std::flush; + if (logger) + { + logger->debug << "Audio::getMsPerPacketInQueue: " + "samples_per_channel" + << samples_per_channel << std::flush; } unsigned int msPerPacket = 1000 / (audio_sample_rate / samples_per_channel); - if(logger) { - logger->debug << "Audio::getMsPerPacketInQueue: audio_sample_rate" << audio_sample_rate << std::flush; - logger->debug << "Audio::getMsPerPacketInQueue: msPerPacket" << msPerPacket << std::flush; + if (logger) + { + logger->debug << "Audio::getMsPerPacketInQueue: " + "audio_sample_rate" + << audio_sample_rate << std::flush; + logger->debug << "Audio::getMsPerPacketInQueue: msPerPacket" + << msPerPacket << std::flush; } return msPerPacket; } @@ -805,7 +829,8 @@ void Jitter::Video::push(PacketPointer raw_packet, std::map> JitterFactory::jitters = {}; -std::shared_ptr JitterFactory::GetJitter(LoggerPointer logger, uint64_t client_id) +std::shared_ptr JitterFactory::GetJitter(LoggerPointer logger, + uint64_t client_id) { if (auto it{jitters.find(client_id)}; it != std::end(jitters)) { diff --git a/src/jitter.hh b/src/jitter.hh index c079741..b9cadc7 100644 --- a/src/jitter.hh +++ b/src/jitter.hh @@ -68,7 +68,7 @@ public: PacketPointer createPLC(unsigned int size); PacketPointer createZeroPayload(unsigned int size); - unsigned int getMsPerAudioPacket(LoggerPointer logger=nullptr); + unsigned int getMsPerAudioPacket(LoggerPointer logger = nullptr); unsigned int getMsInQueue(); void pruneAudioQueue(std::chrono::steady_clock::time_point now, unsigned int prune_target); @@ -146,9 +146,10 @@ private: void idleClientPruneAudio(std::chrono::steady_clock::time_point now); }; -struct JitterFactory { - - static std::shared_ptr GetJitter(LoggerPointer logger, uint64_t client_id); +struct JitterFactory +{ + static std::shared_ptr GetJitter(LoggerPointer logger, + uint64_t client_id); static const unsigned int maxJitters = 2; static std::map> jitters; }; diff --git a/src/jitter_queues.cc b/src/jitter_queues.cc index cfee6ce..35eb040 100644 --- a/src/jitter_queues.cc +++ b/src/jitter_queues.cc @@ -184,7 +184,6 @@ void MetaQueue::queueAudioFrame(PacketPointer raw_packet, } } } - } PacketPointer MetaQueue::pop(std::chrono::steady_clock::time_point now) diff --git a/src/logger.cc b/src/logger.cc index f9a2d04..a5fde20 100644 --- a/src/logger.cc +++ b/src/logger.cc @@ -308,15 +308,16 @@ std::string Logger::GetTimestamp() const auto now = std::chrono::system_clock::now(); auto now_ms = std::chrono::time_point_cast(now); std::time_t t = std::chrono::system_clock::to_time_t(now); - struct tm tm_result{}; + struct tm tm_result + { + }; #ifdef _WIN32 localtime_s(&tm_result, &t); #else localtime_r(&t, &tm_result); #endif - oss << std::put_time(&tm_result, "%FT%T") << "." - << std::setfill('0') << std::setw(3) - << (now_ms.time_since_epoch().count()) % 1000; + oss << std::put_time(&tm_result, "%FT%T") << "." << std::setfill('0') + << std::setw(3) << (now_ms.time_since_epoch().count()) % 1000; return oss.str(); } diff --git a/src/media_client.cc b/src/media_client.cc index 6099725..5a4947b 100644 --- a/src/media_client.cc +++ b/src/media_client.cc @@ -135,7 +135,8 @@ void MediaClient::do_work() auto message = media_transport->recv(); if (message.data.empty()) { - log->info << "[MediaClient::do_work]: Message data is empty" << std::flush; + log->info << "[MediaClient::do_work]: Message data is empty" + << std::flush; continue; } @@ -144,7 +145,7 @@ void MediaClient::do_work() iss >> media_stream_id; // Note: since a subscribe should preceed before - // data arrive, we should find an entry when + // data arrival, we should find an entry when // there is data for a given stream if (!active_streams.count(media_stream_id)) { @@ -156,8 +157,8 @@ void MediaClient::do_work() if (message.data.size() > 100) { log->debug << "[MediaClient::do_work]: got message for " - << media_stream_id << " data:" << message.data.size() - << std::flush; + << media_stream_id << " data:" << message.data.size() + << std::flush; } // hand the data to appropriate media stream @@ -165,7 +166,6 @@ void MediaClient::do_work() message.group_id, message.object_id, std::move(message.data)); - } } @@ -173,7 +173,7 @@ int MediaClient::get_audio(MediaStreamId streamId, uint64_t ×tamp, unsigned char **buffer, unsigned int max_len, - void **to_free) + void **to_free) { uint32_t recv_length = 0; // happens on client thread @@ -203,7 +203,8 @@ std::uint32_t MediaClient::get_video(MediaStreamId streamId, // happens on client thread if (!active_streams.count(streamId)) { - log->warning << "[MediaClient::get_video]: media stream inactive" << std::flush; + log->warning << "[MediaClient::get_video]: media stream inactive" + << std::flush; return 0; } @@ -211,7 +212,8 @@ std::uint32_t MediaClient::get_video(MediaStreamId streamId, active_streams[streamId]); MediaConfig config{}; - recv_length = video_stream->get_media(timestamp, config, buffer, 0, to_free); + recv_length = video_stream->get_media( + timestamp, config, buffer, 0, to_free); width = config.video_max_width; height = config.video_max_height; format = (uint32_t) config.video_decode_pixel_format; @@ -224,7 +226,8 @@ void MediaClient::remove_media_stream(MediaStreamId media_stream_id) // happens on client thread if (!active_streams.count(media_stream_id)) { - log->warning << "[MediaClient::get_video]: media stream inactive" << std::flush; + log->warning << "[MediaClient::get_video]: media stream inactive" + << std::flush; return; } @@ -233,7 +236,7 @@ void MediaClient::remove_media_stream(MediaStreamId media_stream_id) void MediaClient::release_media_buffer(void *buffer) { - delete((Packet*) buffer); + delete ((Packet *) buffer); } } // namespace qmedia diff --git a/src/media_stream.cc b/src/media_stream.cc index 6bf0b0d..6f2f99a 100644 --- a/src/media_stream.cc +++ b/src/media_stream.cc @@ -8,14 +8,15 @@ namespace qmedia /// MediaStream /// -void MediaStream::handle_media(MediaClient::NewSourceCallback stream_callback, +void MediaStream::handle_media(MediaClient::NewSourceCallback stream_callback, uint64_t /*group_id*/, uint64_t /*object_id*/, std::vector &&data) { if (data.empty()) { - logger->info << "[MediaStream::handle_media]: empty data " << std::flush; + logger->info << "[MediaStream::handle_media]: empty data " + << std::flush; return; } @@ -24,15 +25,17 @@ void MediaStream::handle_media(MediaClient::NewSourceCallback stream_callback, auto ret = Packet::decode(data, packet.get()); if (!ret) { - logger->info << "[MediaStream::handle_media]: packet decoder error " << std::flush; + logger->info << "[MediaStream::handle_media]: packet decoder error " + << std::flush; return; } uint64_t client_id = packet->clientID; - uint64_t source_id= packet->sourceID; + uint64_t source_id = packet->sourceID; uint64_t source_ts = packet->sourceRecordTime; MediaType media_type = MediaType::invalid; - switch(packet->mediaType) { + switch (packet->mediaType) + { case Packet::MediaType::H264: media_type = MediaType::video; break; @@ -45,11 +48,19 @@ void MediaStream::handle_media(MediaClient::NewSourceCallback stream_callback, media_type = MediaType::invalid; } + auto now_ms = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + std::chrono::milliseconds sent_time(source_ts); + logger->info << (int) packet->mediaType << "," + << (now_ms - sent_time.count()) / 1000 << std::flush; + bool new_stream = false; auto jitter_instance = JitterFactory::GetJitter(logger, client_id); if (jitter_instance == nullptr) { - logger->warning << "[MediaStream::handle_media]: jitter is null" << std::flush; + logger->warning << "[MediaStream::handle_media]: jitter is null" + << std::flush; return; } @@ -60,12 +71,12 @@ void MediaStream::handle_media(MediaClient::NewSourceCallback stream_callback, { stream_callback(client_id, source_id, source_ts, media_type); } - } void MediaStream::remove_stream() { - if(media_transport) { + if (media_transport) + { media_transport->unregister_stream(id(), media_direction); } } @@ -106,7 +117,8 @@ void AudioStream::configure() auto jitter = JitterFactory::GetJitter(logger, client_id); if (jitter == nullptr) { - logger->error << "[VideoStream::configure]: jitter is null" << std::flush; + logger->error << "[VideoStream::configure]: jitter is null" + << std::flush; } Packet::MediaType packet_media_type = Packet::MediaType::Bad; @@ -122,7 +134,8 @@ void AudioStream::configure() assert(0); } - jitter->set_audio_params(config.sample_rate, config.channels, packet_media_type); + jitter->set_audio_params( + config.sample_rate, config.channels, packet_media_type); } MediaStreamId AudioStream::id() @@ -173,15 +186,17 @@ void AudioStream::handle_media(MediaConfig::CodecType codec_type, } size_t AudioStream::get_media(uint64_t ×tamp, - MediaConfig &/*config*/, + MediaConfig & /*config*/, unsigned char **buffer, unsigned int max_len, - void** to_free) + void **to_free) { int recv_length = 0; auto jitter = JitterFactory::GetJitter(logger, client_id); - if (jitter == nullptr) { - logger->error << "[AudioStream::get_media] Jitter not found" << std::flush; + if (jitter == nullptr) + { + logger->error << "[AudioStream::get_media] Jitter not found" + << std::flush; return 0; } @@ -192,7 +207,8 @@ size_t AudioStream::get_media(uint64_t ×tamp, timestamp = packet->sourceRecordTime; *buffer = &packet->data[0]; recv_length = packet->data.size(); - logger->debug << "[AudioStream::get_media] recv_length:" << recv_length << std::flush; + logger->debug << "[AudioStream::get_media] recv_length:" << recv_length + << std::flush; *to_free = packet.release(); } @@ -203,7 +219,8 @@ size_t AudioStream::get_media(uint64_t ×tamp, /// Private /// -void AudioStream::audio_encoder_callback(std::vector &&bytes, uint64_t timestamp) +void AudioStream::audio_encoder_callback(std::vector &&bytes, + uint64_t timestamp) { static uint64_t group_id = 0; static uint64_t object_id = 0; @@ -230,11 +247,11 @@ void AudioStream::audio_encoder_callback(std::vector &&bytes, uint64_t } logger->debug << "MediaStream: " << id() - << " sending audio packet: " << packet->encodedSequenceNum - << ", timestamp " << packet->sourceRecordTime - << std::flush; + << " sending audio packet: " << packet->encodedSequenceNum + << ", timestamp " << packet->sourceRecordTime << std::flush; - media_transport->send_data(id(), std::move(packet->encoded_data), group_id, object_id); + media_transport->send_data( + id(), std::move(packet->encoded_data), group_id, object_id); group_id += 1; } @@ -242,8 +259,10 @@ std::shared_ptr AudioStream::setupAudioEncoder() { if (encoder == nullptr) { - auto callback = std::bind( - &AudioStream::audio_encoder_callback, this, std::placeholders::_1, std::placeholders::_2); + auto callback = std::bind(&AudioStream::audio_encoder_callback, + this, + std::placeholders::_1, + std::placeholders::_2); encoder = std::make_shared(config.sample_rate, config.channels, config.sample_type, diff --git a/src/media_stream.hh b/src/media_stream.hh index 7549b71..7357346 100644 --- a/src/media_stream.hh +++ b/src/media_stream.hh @@ -30,7 +30,8 @@ public: client_id(client_id_in), config(config_in), logger(logger_in) - {} + { + } virtual ~MediaStream() = default; @@ -44,7 +45,7 @@ public: MediaConfig &config, unsigned char **buffer, unsigned int max_len, - void** to_free) = 0; + void **to_free) = 0; void set_transport(std::shared_ptr transport) { @@ -91,11 +92,12 @@ struct AudioStream : public MediaStream MediaConfig &config, unsigned char **buffer, unsigned int max_len, - void** to_free) override; + void **to_free) override; private: std::shared_ptr setupAudioEncoder(); - void audio_encoder_callback(std::vector &&bytes, uint64_t timestamp); + void audio_encoder_callback(std::vector &&bytes, + uint64_t timestamp); std::shared_ptr encoder = nullptr; uint64_t encode_sequence_num = 0; }; @@ -123,7 +125,7 @@ struct VideoStream : public MediaStream MediaConfig &config, unsigned char **buffer, unsigned int max_len, - void** to_free) override; + void **to_free) override; private: PacketPointer encode_h264(uint8_t *buffer, @@ -134,8 +136,8 @@ private: std::unique_ptr encoder = nullptr; uint64_t encode_sequence_num = 0; std::atomic is_decoder_initialized = false; - std::uint64_t group_id {0}; - std::uint64_t object_id {0}; + std::uint64_t group_id{0}; + std::uint64_t object_id{0}; bool got_first_idr = false; }; diff --git a/src/media_transport.cc b/src/media_transport.cc index 21bf666..c275f9d 100644 --- a/src/media_transport.cc +++ b/src/media_transport.cc @@ -7,8 +7,9 @@ namespace qmedia /// Delegate /// -Delegate::Delegate(TransportMessageHandler* handler) -: message_handler(handler){} +Delegate::Delegate(TransportMessageHandler *handler) : message_handler(handler) +{ +} void Delegate::set_logger(LoggerPointer logger_in) { @@ -20,12 +21,14 @@ void Delegate::on_data_arrived(const std::string &name, std::uint64_t group_id, std::uint64_t object_id) { - if(message_handler) { - if(data.size() > 100) + if (message_handler) + { + if (data.size() > 100) { log(quicr::LogLevel::debug, "on_data_arrived: name " + name); } - message_handler->handle(TransportMessageInfo{name, group_id, object_id, data}); + message_handler->handle( + TransportMessageInfo{name, group_id, object_id, data}); } } @@ -60,10 +63,9 @@ void Delegate::log(quicr::LogLevel /*level*/, const std::string &message) /// QuicRMediaTransport::QuicRMediaTransport(const std::string &server_ip, - const uint16_t port, - LoggerPointer logger_in) : - delegate(this), - qr_client(delegate, server_ip, port), logger(logger_in) + const uint16_t port, + LoggerPointer logger_in) : + delegate(this), qr_client(delegate, server_ip, port), logger(logger_in) { while (!qr_client.is_transport_ready()) { @@ -74,7 +76,7 @@ QuicRMediaTransport::QuicRMediaTransport(const std::string &server_ip, } void QuicRMediaTransport::register_stream(uint64_t id, - MediaConfig::MediaDirection direction) + MediaConfig::MediaDirection direction) { logger->info << "[MediaTransport]: register_stream " << id << std::flush; auto qname = quicr::QuicrName{std::to_string(id), 0}; @@ -84,18 +86,22 @@ void QuicRMediaTransport::register_stream(uint64_t id, } else if (direction == MediaConfig::MediaDirection::recvonly) { - auto intent = quicr::SubscribeIntent{quicr::SubscribeIntent::Mode::immediate, 0, 0}; + auto intent = quicr::SubscribeIntent{ + quicr::SubscribeIntent::Mode::immediate, 0, 0}; qr_client.subscribe({qname}, intent, false, true); } else { qr_client.register_names({qname}, false); - auto intent = quicr::SubscribeIntent{quicr::SubscribeIntent::Mode::immediate, 0, 0}; + auto intent = quicr::SubscribeIntent{ + quicr::SubscribeIntent::Mode::immediate, 0, 0}; qr_client.subscribe({qname}, intent, false, true); } } -void QuicRMediaTransport::unregister_stream(uint64_t id, MediaConfig::MediaDirection direction) +void QuicRMediaTransport::unregister_stream( + uint64_t id, + MediaConfig::MediaDirection direction) { logger->info << "[MediaTransport]: unregister_stream " << id << std::flush; auto qname = quicr::QuicrName{std::to_string(id), 0}; @@ -114,27 +120,28 @@ void QuicRMediaTransport::unregister_stream(uint64_t id, MediaConfig::MediaDirec } } -void QuicRMediaTransport::send_data(uint64_t id, quicr::bytes &&data, - uint64_t group_id, uint64_t object_id) +void QuicRMediaTransport::send_data(uint64_t id, + quicr::bytes &&data, + uint64_t group_id, + uint64_t object_id) { auto qname = quicr::QuicrName{std::to_string(id), 0}; - int* a = nullptr; + int *a = nullptr; *a++; - qr_client.publish_named_data(qname.name, std::move(data), group_id, object_id, 0, 0); + qr_client.publish_named_data( + qname.name, std::move(data), group_id, object_id, 0, 0); } void QuicRMediaTransport::wait_for_messages() { std::unique_lock ulock(recv_queue_mutex); - recv_cv.wait(ulock, [&]() -> bool { - return (shutdown || !receive_queue.empty()); - }); + recv_cv.wait( + ulock, [&]() -> bool { return (shutdown || !receive_queue.empty()); }); ulock.unlock(); } TransportMessageInfo QuicRMediaTransport::recv() { - TransportMessageInfo info; { std::lock_guard lock(recv_queue_mutex); @@ -145,10 +152,12 @@ TransportMessageInfo QuicRMediaTransport::recv() } } - if(info.data.size() > 200) { - logger->debug << "[QuicRMediaTransport:recv]: Got a message off the queue " - << info.name << ", size:" << info.data.size() << ", q-size: " - << receive_queue.size() << std::flush; + if (info.data.size() > 200) + { + logger->debug << "[QuicRMediaTransport:recv]: Got a message off the " + "queue " + << info.name << ", size:" << info.data.size() + << ", q-size: " << receive_queue.size() << std::flush; } return info; } diff --git a/src/media_transport.hh b/src/media_transport.hh index 98dda56..f965187 100644 --- a/src/media_transport.hh +++ b/src/media_transport.hh @@ -16,15 +16,15 @@ struct TransportMessageInfo quicr::bytes data; }; -struct TransportMessageHandler { +struct TransportMessageHandler +{ virtual ~TransportMessageHandler() = default; - virtual void handle(TransportMessageInfo&& info) = 0; + virtual void handle(TransportMessageInfo &&info) = 0; }; - struct Delegate : public quicr::QuicRClient::Delegate { - Delegate(TransportMessageHandler* handler); + Delegate(TransportMessageHandler *handler); virtual void on_data_arrived(const std::string &name, quicr::bytes &&data, @@ -37,19 +37,24 @@ struct Delegate : public quicr::QuicRClient::Delegate virtual void log(quicr::LogLevel level, const std::string &message) override; - void set_logger(LoggerPointer logger_in); private: LoggerPointer logger; - TransportMessageHandler* message_handler; + TransportMessageHandler *message_handler; }; -struct MediaTransport : TransportMessageHandler { +struct MediaTransport : TransportMessageHandler +{ virtual ~MediaTransport() = default; - virtual void register_stream(uint64_t id, MediaConfig::MediaDirection direction) = 0; - virtual void unregister_stream(uint64_t id, MediaConfig::MediaDirection direction) = 0; - virtual void send_data(uint64_t id, quicr::bytes &&data, uint64_t group_id, uint64_t object_id) = 0; + virtual void register_stream(uint64_t id, + MediaConfig::MediaDirection direction) = 0; + virtual void unregister_stream(uint64_t id, + MediaConfig::MediaDirection direction) = 0; + virtual void send_data(uint64_t id, + quicr::bytes &&data, + uint64_t group_id, + uint64_t object_id) = 0; virtual void wait_for_messages() = 0; virtual TransportMessageInfo recv() = 0; }; @@ -58,21 +63,27 @@ struct MediaTransport : TransportMessageHandler { struct QuicRMediaTransport : public MediaTransport { explicit QuicRMediaTransport(const std::string &server_ip, - const uint16_t port, - LoggerPointer logger_in); + const uint16_t port, + LoggerPointer logger_in); ~QuicRMediaTransport() = default; - virtual void register_stream(uint64_t id, MediaConfig::MediaDirection direction) override; + virtual void register_stream(uint64_t id, + MediaConfig::MediaDirection direction) override; - virtual void unregister_stream(uint64_t id, MediaConfig::MediaDirection direction) override; + virtual void + unregister_stream(uint64_t id, + MediaConfig::MediaDirection direction) override; - virtual void send_data(uint64_t id, quicr::bytes &&data, uint64_t group_id, uint64_t object_id) override; + virtual void send_data(uint64_t id, + quicr::bytes &&data, + uint64_t group_id, + uint64_t object_id) override; virtual void wait_for_messages() override; virtual TransportMessageInfo recv() override; - virtual void handle(TransportMessageInfo&& info) override; + virtual void handle(TransportMessageInfo &&info) override; private: Delegate delegate; diff --git a/src/packet.cc b/src/packet.cc index 8e10f30..1ca097d 100644 --- a/src/packet.cc +++ b/src/packet.cc @@ -7,7 +7,7 @@ #include "media.pb.h" #include -//#include +// #include namespace qmedia { diff --git a/src/playout_leakybucket.cc b/src/playout_leakybucket.cc index 326a2ae..93a5560 100644 --- a/src/playout_leakybucket.cc +++ b/src/playout_leakybucket.cc @@ -13,7 +13,8 @@ unsigned int LeakyBucket::getRecommendedFillLevel(unsigned int audio_jitter_ms) unsigned int new_target = target_fill_level; unsigned int min_jitter_ms = audio_jitter_ms; - if (min_jitter_ms > new_target) { + if (min_jitter_ms > new_target) + { new_target = min_jitter_ms; } @@ -83,7 +84,8 @@ void LeakyBucket::tick(std::chrono::steady_clock::time_point now, unsigned int fps, LoggerPointer logger) { - if (initialFill(queue_depth, audio_jitter_ms)) { + if (initialFill(queue_depth, audio_jitter_ms)) + { logger->debug << "tick initial fill true" << std::flush; return; } @@ -103,7 +105,7 @@ void LeakyBucket::tick(std::chrono::steady_clock::time_point now, new_target_fill_ms = max_bucket_size; int current_fill_level_ms = queue_depth * ms_per_audio; - //int current_fill_level_ms = queue_depth; + // int current_fill_level_ms = queue_depth; fill_change = current_fill_level_ms - new_target_fill_ms; @@ -114,25 +116,28 @@ void LeakyBucket::tick(std::chrono::steady_clock::time_point now, else current_drain = normal; - logger->debug <<"tick: fill levels: minimum_fill_level_ms=" << minimum_fill_level_ms - << ",target_fill_level=" << target_fill_level - << ", max_bucket_size=" << max_bucket_size - << ", current_fill_level=" << current_fill_level_ms - << ", fill_change=" << fill_change - << ", current_drain=" << current_drain - << std::flush; - + logger->debug + << "tick: fill levels: minimum_fill_level_ms=" << minimum_fill_level_ms + << ",target_fill_level=" << target_fill_level + << ", max_bucket_size=" << max_bucket_size + << ", current_fill_level=" << current_fill_level_ms + << ", fill_change=" << fill_change + << ", current_drain=" << current_drain << std::flush; } -bool LeakyBucket::initialFill(unsigned int ms_in_queue, unsigned int jitter_ms, LoggerPointer logger) +bool LeakyBucket::initialFill(unsigned int ms_in_queue, + unsigned int jitter_ms, + LoggerPointer logger) { if (initial_fill) { // jitter vs target vs max unsigned int start_ms = getRecommendedFillLevel(jitter_ms); - if(logger) { - logger->debug << "[initialFill] ms_in_queue " << ms_in_queue << ",start_ms " << start_ms << std::flush; + if (logger) + { + logger->debug << "[initialFill] ms_in_queue " << ms_in_queue + << ",start_ms " << start_ms << std::flush; } if (ms_in_queue >= start_ms) { diff --git a/src/playout_leakybucket.hh b/src/playout_leakybucket.hh index 6cd0b01..afd4c93 100644 --- a/src/playout_leakybucket.hh +++ b/src/playout_leakybucket.hh @@ -50,7 +50,9 @@ public: unsigned int fps, LoggerPointer logger = nullptr); void adjustQueueDepthTrackerDiscardedPackets(int num); - bool initialFill(unsigned int ms_in_queue, unsigned int jitter_ms, LoggerPointer logger=nullptr); + bool initialFill(unsigned int ms_in_queue, + unsigned int jitter_ms, + LoggerPointer logger = nullptr); double getSrcRatio(); private: diff --git a/src/playout_sync.cc b/src/playout_sync.cc index 05121dd..06cb374 100644 --- a/src/playout_sync.cc +++ b/src/playout_sync.cc @@ -89,7 +89,8 @@ Sync::sync_action Sync::getVideoAction(unsigned int /*audio_pop_delay*/, else { // if we find out-of-order deeper in the queue - wait - if (action == pop) { + if (action == pop) + { break; } diff --git a/src/playout_tools.cc b/src/playout_tools.cc index 8126940..b4fa9ac 100644 --- a/src/playout_tools.cc +++ b/src/playout_tools.cc @@ -21,7 +21,8 @@ void PopFrequencyCounter::updatePopFrequency( if ((interval_sum + delta) > measure_interval_ms) { - if(pop_time.empty()) { + if (pop_time.empty()) + { return; } interval_sum -= pop_time.front(); diff --git a/src/video_stream.cc b/src/video_stream.cc index d82efbf..57a858f 100644 --- a/src/video_stream.cc +++ b/src/video_stream.cc @@ -46,15 +46,16 @@ void VideoStream::configure() assert("Invalid media direction"); } - auto jitter = JitterFactory::GetJitter(logger, client_id); + auto jitter = JitterFactory::GetJitter(logger, client_id); if (jitter == nullptr) { - logger->error << "[VideoStream::configure]: jitter is null" << std::flush; + logger->error << "[VideoStream::configure]: jitter is null" + << std::flush; } jitter->set_video_params(config.video_max_width, config.video_max_height, - (uint32_t ) config.video_decode_pixel_format); + (uint32_t) config.video_decode_pixel_format); } MediaStreamId VideoStream::id() @@ -106,15 +107,21 @@ void VideoStream::handle_media(MediaConfig::CodecType codec_type, return; } - if (encoded->is_intra_frame) { - if (got_first_idr) { + if (encoded->is_intra_frame) + { + if (got_first_idr) + { group_id += 1; object_id = 0; - logger->info << "[VideoStream:handle_media]: New group started:" + logger->info << "[VideoStream:handle_media]: New group " + "started:" << group_id << std::flush; - } else { + } + else + { got_first_idr = true; - logger->info << "[VideoStream:handle_media]: First group started:" + logger->info << "[VideoStream:handle_media]: First " + "group started:" << group_id << std::flush; } } @@ -128,7 +135,8 @@ void VideoStream::handle_media(MediaConfig::CodecType codec_type, } break; case MediaConfig::CodecType::raw: - {} + { + } break; default: assert("Incorrect codec type"); @@ -139,7 +147,7 @@ size_t VideoStream::get_media(uint64_t ×tamp, MediaConfig &config_in, unsigned char **buffer, unsigned int /*max_len*/, - void** /*to_free*/) + void ** /*to_free*/) { size_t recv_length = 0; @@ -159,8 +167,10 @@ size_t VideoStream::get_media(uint64_t ×tamp, timestamp, buffer); - config_in.video_decode_pixel_format = (VideoConfig::PixelFormat) pixel_format; - if(buffer == nullptr) { + config_in.video_decode_pixel_format = (VideoConfig::PixelFormat) + pixel_format; + if (buffer == nullptr) + { assert(0); } @@ -202,7 +212,8 @@ PacketPointer VideoStream::encode_h264(uint8_t *buffer, if (encoded_frame_type == VideoEncoder::EncodedFrameType::Skip || encoded_frame_type == VideoEncoder::EncodedFrameType::Invalid) { - logger->info << "[VideoStream::encode_h264] Encoded Frame Type ignored due " + logger->info << "[VideoStream::encode_h264] Encoded Frame Type ignored " + "due " << (int) encoded_frame_type << std::flush; return nullptr; } From 07c47ae27e8882725b4938dc7bbe8cbd3e7b8db1 Mon Sep 17 00:00:00 2001 From: Suhas Nandakumar Date: Sun, 9 Oct 2022 17:03:42 -0700 Subject: [PATCH 2/3] wire in measurements and metrics loop --- CMakeLists.txt | 2 +- lib/metrics/include/metrics/measurements.hh | 10 +- lib/metrics/include/metrics/metrics.hh | 12 +- lib/metrics/src/measurements.cc | 128 +++++++------------- lib/metrics/src/metrics.cc | 120 +++++++++++++++++- 5 files changed, 174 insertions(+), 98 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 77f8b02..7a46a56 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -111,7 +111,7 @@ endif() ### ### Library Source ### -# add_subdirectory(lib) +add_subdirectory(lib) add_subdirectory(src) diff --git a/lib/metrics/include/metrics/measurements.hh b/lib/metrics/include/metrics/measurements.hh index 08b928e..3e95ba0 100644 --- a/lib/metrics/include/metrics/measurements.hh +++ b/lib/metrics/include/metrics/measurements.hh @@ -12,14 +12,14 @@ enum struct MeasurementType FrameRate_Tx, // frame count before transmit FrameRate_Rx, // frame count from transport after re-assembly EncodeTime, // time elapsed for encoding a raw sample (ms) - EndToEndLatency, // tx to rx latency (ms) + EndToEndFrameDelay, // tx to rx latency (ms) }; const auto measurement_names = std::map{ {MeasurementType::FrameRate_Tx, "TxFrameCount"}, {MeasurementType::FrameRate_Rx, "RxFrameCount"}, {MeasurementType::EncodeTime, "EncodeTimeInMs"}, - {MeasurementType::EndToEndLatency, "EndToEndLatencyInMs"}, + {MeasurementType::EndToEndFrameDelay, "EndToEndFrameDelayInMs"}, }; /// @@ -29,7 +29,7 @@ const auto measurement_names = std::map{ struct Measurement { virtual ~Measurement() = default; - virtual std::string toString() = 0; + virtual std::string serialize() = 0; }; @@ -45,11 +45,11 @@ class InfluxMeasurement : public Measurement public: static std::unique_ptr create(std::string name, Tags tags); - InfluxMeasurement(std::string &name, Tags &tags); + InfluxMeasurement(std::string &name_in, Tags &tags_in); ~InfluxMeasurement() = default; // Measurement - to line protocol - std::string toString() override; + std::string serialize() override; struct TimeEntry { diff --git a/lib/metrics/include/metrics/metrics.hh b/lib/metrics/include/metrics/metrics.hh index 4a3ab08..eeb2437 100644 --- a/lib/metrics/include/metrics/metrics.hh +++ b/lib/metrics/include/metrics/metrics.hh @@ -27,15 +27,17 @@ public: // influxdb factory static std::shared_ptr create(const InfluxConfig& config); - void pusher(); - void push(); // push(std::string & name) - specific push + void add_measurement(const std::string& name, std::shared_ptr measurement); - // created via factory Metrics(CURL* handle); - ~Metrics() = default; + ~Metrics(); +private: + + void emitMetrics(); + void sendMetrics(const std::vector& collected_metrics); + void push_loop(); bool shutdown = false; - bool push_signals = false; std::mutex metrics_mutex; std::condition_variable cv; std::mutex push_mutex; diff --git a/lib/metrics/src/measurements.cc b/lib/metrics/src/measurements.cc index 5890731..c52ec89 100644 --- a/lib/metrics/src/measurements.cc +++ b/lib/metrics/src/measurements.cc @@ -14,22 +14,12 @@ InfluxMeasurement::InfluxMeasurement(std::string &name_in, Tags &tags_in) : name(name_in), tags(tags_in) {} -void InfluxMeasurement::set(std::chrono::system_clock::time_point now, Field field) -{ - Fields fields; - fields.emplace_back(field); - set(now, fields); -} void InfluxMeasurement::set_time_entry(std::chrono::system_clock::time_point now, TimeEntry &&entry) { - long long time = std::chrono::duration_cast( - now.time_since_epoch()) - .count(); - TimeSeriesEntry tse; - tse.first = time; - tse.second = entry; + long long time = std::chrono::duration_cast(now.time_since_epoch()).count(); + TimeSeriesEntry tse {time, entry}; { std::lock_guard lock(series_lock); series.emplace_back(tse); @@ -40,82 +30,14 @@ void InfluxMeasurement::set_time_entry(std::chrono::system_clock::time_point now void InfluxMeasurement::set(std::chrono::system_clock::time_point now, Fields fields) { - long long time = std::chrono::duration_cast( - now.time_since_epoch()) - .count(); - TimePoint entry { time, fields}; + long long time = std::chrono::duration_cast(now.time_since_epoch()).count(); + TimePoint entry {time, fields}; { std::lock_guard lock(series_lock); time_points.emplace_back(entry); } } -std::list InfluxMeasurement::lineProtocol() -{ - - std::list lines; - std::string name_tags = lineProtocol_nameAndTags(tags); - - { - std::lock_guard lock(series_lock); - - if (name_tags.empty() && series.empty()) { - return lines; - } - - for (auto entry : time_points) - { - std::string line = name_tags; - line += lineProtocol_fields(entry.second); - line += " "; - line += std::to_string(entry.first); // time - line += "\n"; - lines.emplace_back(line); - } - - time_points.clear(); - - // add series generated via TimeSeriesEntries - std::for_each( - series.begin(), - series.end(), - [&lines, this](auto &entry) - { - // gen tags - std::string line = lineProtocol_nameAndTags(entry.second.tags); - if (line.empty()) { - return; - } - - line += lineProtocol_fields(entry.second.fields); - line += " "; - line += std::to_string(entry.first); // time - line += "\n"; - lines.emplace_back(line); - }); - - series.clear(); - - } - return lines; -} - -std::string InfluxMeasurement::toString() { - - auto lines = lineProtocol(); - - // join all the lines collected so far - std::string points; - for (const auto &point : lines) { - points += point; - } - - return points; -} - -/// -/// Private -/// std::string InfluxMeasurement::lineProtocol_nameAndTags() { @@ -124,11 +46,10 @@ std::string InfluxMeasurement::lineProtocol_nameAndTags() std::string InfluxMeasurement::lineProtocol_nameAndTags(Tags &tag_set) { - if (name.empty()) { - return ""; - } + if (name.empty()) return ""; std::string m = name; + if (!tag_set.empty()) { for (const auto &tag : tag_set) @@ -139,7 +60,6 @@ std::string InfluxMeasurement::lineProtocol_nameAndTags(Tags &tag_set) m += std::to_string(tag.second); } } - return m; } @@ -162,4 +82,40 @@ std::string InfluxMeasurement::lineProtocol_fields(Fields &fields) return line; } +std::list InfluxMeasurement::lineProtocol() +{ + std::list lines; + std::string name_tags = lineProtocol_nameAndTags(tags); + + { + std::lock_guard lock(series_lock); + if (name_tags.empty() && series.empty()) return lines; + + // add series generated via TimeSeriesEntries + std::for_each(series.begin(), series.end(), [&lines, this](auto &entry) + { + // gen tags + std::string line = lineProtocol_nameAndTags(entry.second.tags); + if (line.empty()) return; + line += lineProtocol_fields(entry.second.fields); + line += " "; + line += std::to_string(entry.first); // time + line += "\n"; + lines.emplace_back(line); + }); + series.clear(); + } // lock guard + + return lines; +} + +std::string InfluxMeasurement::serialize() { + auto entries = lineProtocol(); + std::string serialized = ""; + for(const auto& entry: entries) { + serialized += entry; + } + return serialized; +} + } diff --git a/lib/metrics/src/metrics.cc b/lib/metrics/src/metrics.cc index e7cbb4b..d635b0a 100644 --- a/lib/metrics/src/metrics.cc +++ b/lib/metrics/src/metrics.cc @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -50,10 +51,127 @@ std::shared_ptr Metrics::create(const InfluxConfig& config) Metrics::Metrics(CURL* handle_in) { handle = handle_in; - metrics_thread = std::thread(&Metrics::pusher, this); + metrics_thread = std::thread(&Metrics::push_loop, this); metrics_thread.detach(); } +Metrics::~Metrics() +{ + shutdown = true; + if (metrics_thread.joinable()) { + metrics_thread.join(); + } + + curl_global_cleanup(); +} + + +void Metrics::add_measurement(const std::string& name, std::shared_ptr measurement) { + measurements.insert(std::pair>(name, measurement)); +} + +void Metrics::sendMetrics(const std::vector& collected_metrics) +{ + CURLcode res; // curl response + std::string payload; // accumulated statements + long response_code; // http response code + + //std::cerr << "SendMetrics: count:" << collected_metrics.size() << std::endl; + + // Iterate over the vector of strings + for (auto &statement : collected_metrics) + { + // Concatenate this string to the payload + payload += statement; + } + + // std::clog << "Points\n" << influx_payload << std::endl; + // set the payload, which is a collection of influx statements + curl_easy_setopt( + handle, CURLOPT_POSTFIELDSIZE_LARGE, payload.size()); + curl_easy_setopt(handle, CURLOPT_POSTFIELDS, payload.c_str()); + + // perform the request + res = curl_easy_perform(handle); + + if (res != CURLE_OK) { + std::clog << "Unable to post metrics:" + << curl_easy_strerror(res) << std::endl; + return; + } + + curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE, &response_code); + + std::cerr << "Metrics write result: " << response_code << std::endl; + + if (response_code < 200 || response_code >= 300) + { + std::cerr << "Http error posting to influx: " << response_code + << std::endl; + return; + } +} + + +void Metrics::emitMetrics() +{ + auto collected_metrics = std::vector{}; + + // Lock the mutex while collecting metrics data + std::unique_lock lock(metrics_mutex); + + for (const auto &[type, measurement] : measurements) + { + auto points = measurement->serialize(); + + if (points.empty()) + { + continue; + } + + collected_metrics.emplace_back(points); + } + + // release the lock + lock.unlock(); + + sendMetrics(collected_metrics); +} + +void Metrics::push_loop() +{ + constexpr auto period = 1000; + std::chrono::time_point next_time; + + // Lock the mutex before starting work + std::unique_lock lock(metrics_mutex); + + // Initialize then metrics emitter time + next_time = std::chrono::steady_clock::now() + std::chrono::milliseconds(period); + + while (!shutdown) + { + // Wait until alerted + cv.wait_until(lock, next_time, [&]() { return shutdown; }); + + // Were we told to terminate? + if (shutdown) { + std::clog << "Metrics is shutdown " << std::flush; + break; + } + + // unlock the mutex + lock.unlock(); + + emitMetrics(); + + // Re-lock the mutex + lock.lock(); + + next_time = std::chrono::steady_clock::now() + + std::chrono::milliseconds(period); + } +} } From cd9f5da01a61f310c7ebb522d6cd25b40590b3f6 Mon Sep 17 00:00:00 2001 From: Suhas Nandakumar Date: Thu, 27 Oct 2022 21:14:34 -0700 Subject: [PATCH 3/3] fix compiler issues --- cmd/CMakeLists.txt | 2 +- lib/metrics/CMakeLists.txt | 17 ++-- lib/metrics/include/metrics/measurements.hh | 2 +- lib/metrics/include/metrics/metrics.hh | 20 ++++- lib/metrics/src/measurements.cc | 4 +- lib/metrics/src/metrics.cc | 88 ++++++++++++--------- src/CMakeLists.txt | 2 + src/h264_encoder.cc | 26 +++++- src/h264_encoder.hh | 6 +- src/media_stream.hh | 8 +- src/metrics_helper.cc | 3 + src/metrics_helper.hh | 13 +++ src/video_stream.cc | 4 +- 13 files changed, 135 insertions(+), 60 deletions(-) create mode 100644 src/metrics_helper.cc create mode 100644 src/metrics_helper.hh diff --git a/cmd/CMakeLists.txt b/cmd/CMakeLists.txt index b9178b9..764cea5 100644 --- a/cmd/CMakeLists.txt +++ b/cmd/CMakeLists.txt @@ -5,7 +5,7 @@ find_package(portaudio QUIET) if(portaudio_FOUND) add_executable(sound sound.cc) - target_link_libraries(sound PUBLIC qmedia) + target_link_libraries(sound PUBLIC metrics qmedia) if (WIN32) target_link_libraries(sound PUBLIC portaudio) diff --git a/lib/metrics/CMakeLists.txt b/lib/metrics/CMakeLists.txt index a275aa4..b2f117a 100644 --- a/lib/metrics/CMakeLists.txt +++ b/lib/metrics/CMakeLists.txt @@ -1,3 +1,5 @@ +set(CURRENT_LIB_NAME metrics) + ### ### Dependencies ### @@ -10,16 +12,17 @@ find_package(OpenSSL 1.1 REQUIRED) # curl for influx find_package(CURL REQUIRED) -add_library(metrics - src/measurements.cc - src/metrics.cc) -target_link_libraries(metrics PRIVATE ${CURL_LIBRARIES} OpenSSL::SSL OpenSSL::Crypto) -target_include_directories(metrics PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include) -target_compile_options(metrics +file(GLOB_RECURSE LIB_HEADERS CONFIGURE_DEPENDS "${CMAKE_CURRENT_SOURCE_DIR}/include/*.hh") +file(GLOB_RECURSE LIB_SOURCES CONFIGURE_DEPENDS "${CMAKE_CURRENT_SOURCE_DIR}/src/*.cc") + +add_library(${CURRENT_LIB_NAME} ${LIB_HEADERS} ${LIB_SOURCES}) +target_link_libraries(${CURRENT_LIB_NAME} PUBLIC ${CURL_LIBRARIES} OpenSSL::SSL OpenSSL::Crypto) +target_include_directories(${CURRENT_LIB_NAME} PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include) +target_compile_options(${CURRENT_LIB_NAME} PRIVATE $<$,$,$>: -Wpedantic -Wextra -Wall> $<$: >) -set_target_properties(metrics +set_target_properties(${CURRENT_LIB_NAME} PROPERTIES CXX_STANDARD 17 CXX_STANDARD_REQUIRED YES diff --git a/lib/metrics/include/metrics/measurements.hh b/lib/metrics/include/metrics/measurements.hh index 3e95ba0..c1484b0 100644 --- a/lib/metrics/include/metrics/measurements.hh +++ b/lib/metrics/include/metrics/measurements.hh @@ -43,7 +43,7 @@ using TimePoint = std::pair; class InfluxMeasurement : public Measurement { public: - static std::unique_ptr create(std::string name, Tags tags); + static std::shared_ptr create(std::string name, Tags tags); InfluxMeasurement(std::string &name_in, Tags &tags_in); ~InfluxMeasurement() = default; diff --git a/lib/metrics/include/metrics/metrics.hh b/lib/metrics/include/metrics/metrics.hh index eeb2437..e2f7d16 100644 --- a/lib/metrics/include/metrics/metrics.hh +++ b/lib/metrics/include/metrics/metrics.hh @@ -24,11 +24,7 @@ struct InfluxConfig class Metrics { public: - // influxdb factory - static std::shared_ptr create(const InfluxConfig& config); - void add_measurement(const std::string& name, std::shared_ptr measurement); - Metrics(CURL* handle); ~Metrics(); private: @@ -47,4 +43,20 @@ private: CURL *handle; }; +/// +/// Factory +/// + + +enum class MetricProvider { + influx = 0 +}; + +struct MetricsFactory +{ + static std::shared_ptr GetInfluxProvider(); + static std::map> metric_providers; +}; + } + diff --git a/lib/metrics/src/measurements.cc b/lib/metrics/src/measurements.cc index c52ec89..2f8d6db 100644 --- a/lib/metrics/src/measurements.cc +++ b/lib/metrics/src/measurements.cc @@ -4,9 +4,9 @@ namespace metrics { -std::unique_ptr InfluxMeasurement::create(std::string name, Tags tags) +std::shared_ptr InfluxMeasurement::create(std::string name, Tags tags) { - return std::make_unique(name, tags); + return std::make_shared(name, tags); } InfluxMeasurement::InfluxMeasurement(std::string &name_in, Tags &tags_in) : diff --git a/lib/metrics/src/metrics.cc b/lib/metrics/src/metrics.cc index d635b0a..0c3f02c 100644 --- a/lib/metrics/src/metrics.cc +++ b/lib/metrics/src/metrics.cc @@ -12,42 +12,6 @@ namespace metrics { -// factory methods -std::shared_ptr Metrics::create(const InfluxConfig& config) -{ - if (config.url.empty() || config.auth_token.empty()) - { - return nullptr; - } - - // manipulate url properties for use with CURL - std::string adjusted_url = config.url + "/api/v2/write?org=" + config.org + - "&bucket=" + config.bucket + "&precision=ns"; - - std::clog << "influx url:" << adjusted_url << std::endl; - - // initial curl - curl_global_init(CURL_GLOBAL_ALL); - auto handle = curl_easy_init(); - curl_easy_setopt(handle, CURLOPT_POST, 1L); - curl_easy_setopt(handle, CURLOPT_URL, adjusted_url.c_str()); - curl_easy_setopt(handle, CURLOPT_HTTPAUTH, CURLAUTH_ANY); - struct curl_slist *headerlist = nullptr; - std::string token_str = "Authorization: Token " + config.auth_token; - headerlist = curl_slist_append(headerlist, token_str.c_str()); - curl_easy_setopt(handle, CURLOPT_HTTPHEADER, headerlist); - curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, 0L); - curl_easy_setopt(handle, CURLOPT_SSL_VERIFYHOST, 0L); - // do not allow unlimited redirects - curl_easy_setopt(handle, CURLOPT_MAXREDIRS, 50L); - // enable TCP keep-alive probing - curl_easy_setopt(handle, CURLOPT_TCP_KEEPALIVE, 1L); - // Start the service thread - return std::make_shared(handle); - -} - - Metrics::Metrics(CURL* handle_in) { handle = handle_in; @@ -85,6 +49,9 @@ void Metrics::sendMetrics(const std::vector& collected_metrics) payload += statement; } + std::cerr << "[metrics]: " << payload << std::endl; + + // std::clog << "Points\n" << influx_payload << std::endl; // set the payload, which is a collection of influx statements curl_easy_setopt( @@ -173,5 +140,54 @@ void Metrics::push_loop() } } +/// +/// factory +/// + +static const std::string influx_url = ""; +static const std::string influx_auth_token = ""; +static const std::string influx_org = ""; +static const std::string influx_bucket = ""; + +std::map> MetricsFactory::metric_providers = {}; + +// factory methods +std::shared_ptr MetricsFactory::GetInfluxProvider() +{ + if (metric_providers.count(MetricProvider::influx) > 0) { + return metric_providers[MetricProvider::influx]; + } + + if (influx_url.empty() || influx_auth_token.empty()) + { + return nullptr; + } + + // manipulate url properties for use with CURL + std::string adjusted_url = influx_url + "/api/v2/write?org=" + influx_org + + "&bucket=" + influx_bucket + "&precision=ns"; + + std::clog << "influx url:" << adjusted_url << std::endl; + + // initial curl + curl_global_init(CURL_GLOBAL_ALL); + auto handle = curl_easy_init(); + curl_easy_setopt(handle, CURLOPT_POST, 1L); + curl_easy_setopt(handle, CURLOPT_URL, adjusted_url.c_str()); + curl_easy_setopt(handle, CURLOPT_HTTPAUTH, CURLAUTH_ANY); + struct curl_slist *headerlist = nullptr; + std::string token_str = "Authorization: Token " + influx_auth_token; + headerlist = curl_slist_append(headerlist, token_str.c_str()); + curl_easy_setopt(handle, CURLOPT_HTTPHEADER, headerlist); + curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, 0L); + curl_easy_setopt(handle, CURLOPT_SSL_VERIFYHOST, 0L); + // do not allow unlimited redirects + curl_easy_setopt(handle, CURLOPT_MAXREDIRS, 50L); + // enable TCP keep-alive probing + curl_easy_setopt(handle, CURLOPT_TCP_KEEPALIVE, 1L); + metric_providers[MetricProvider::influx] = std::make_shared(handle); + return metric_providers[MetricProvider::influx]; +} + } diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 201308e..a462668 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -19,9 +19,11 @@ add_library(qmedia resampler.cc video_stream.cc) target_link_directories(qmedia PUBLIC ${OpenH264_LIB_DIR}) +add_dependencies(qmedia metrics) target_link_libraries(qmedia PUBLIC ${LIBRARIES} + metrics OpenSSL::Crypto openh264 quicr) diff --git a/src/h264_encoder.cc b/src/h264_encoder.cc index 93bad90..c83cccb 100644 --- a/src/h264_encoder.cc +++ b/src/h264_encoder.cc @@ -7,8 +7,11 @@ #include #include "h264_encoder.hh" +#include "metrics/metrics.hh" +#include "metrics/measurements.hh" using namespace qmedia; +using namespace metrics; static bool debug = false; @@ -46,9 +49,13 @@ H264Encoder::H264Encoder(unsigned int video_max_width, unsigned int video_max_frame_rate, unsigned int video_max_bitrate, std::uint32_t video_pixel_format, - const LoggerPointer &logger_in) + const LoggerPointer &logger_in, + uint64_t client_id_in, + uint64_t stream_id_in) { logger = logger_in; + client_id = client_id_in; + stream_id = stream_id_in; int rv = WelsCreateSVCEncoder(&encoder); assert(rv == cmResultSuccess && encoder); @@ -150,9 +157,7 @@ H264Encoder::encode(const char *input_buffer, } auto now = std::chrono::system_clock::now(); - auto now_ms = std::chrono::duration_cast( - now.time_since_epoch()) - .count(); + auto now_ms = std::chrono::duration_cast(now.time_since_epoch()).count(); // nv12 to i420 planar auto *input = reinterpret_cast( @@ -263,6 +268,19 @@ H264Encoder::encode(const char *input_buffer, total_bytes_encoded += output_bitstream.size(); total_frames_encoded++; + auto now_2 = std::chrono::system_clock::now(); + auto now_ms_2 = std::chrono::duration_cast(now_2.time_since_epoch()).count(); + auto diff = now_ms - now_ms; + + logger->info << "h264Encoder: encode delta:" << (now_ms_2 - now_ms) << std::flush; + + auto metrics = MetricsFactory::GetInfluxProvider(); + auto msmt = InfluxMeasurement::create(measurement_names.at(MeasurementType::EncodeTime), {}); + auto entry = InfluxMeasurement::TimeEntry{{{"clientID", client_id},{"sourceID", stream_id}}, + {{"count", 1}, {"duration", diff}}}; + msmt->set_time_entry(now_2, std::move(entry)); + metrics->add_measurement(measurement_names.at(MeasurementType::EncodeTime), msmt); + // success return toEncodedFrameType(encodedFrame.eFrameType); } diff --git a/src/h264_encoder.hh b/src/h264_encoder.hh index 15e4a60..81565b2 100644 --- a/src/h264_encoder.hh +++ b/src/h264_encoder.hh @@ -19,7 +19,9 @@ public: unsigned int video_max_frame_rate, unsigned int video_max_bitrate, std::uint32_t video_pixel_format, - const LoggerPointer &logger); + const LoggerPointer &logger, + uint64_t client_id, + uint64_t stream_id); ~H264Encoder(); @@ -48,5 +50,7 @@ public: SFrameBSInfo encodedFrame; SSourcePicture inputFrame; LoggerPointer logger; + uint64_t client_id {0}; + uint64_t stream_id {0}; }; } // namespace qmedia \ No newline at end of file diff --git a/src/media_stream.hh b/src/media_stream.hh index 7357346..2cfc381 100644 --- a/src/media_stream.hh +++ b/src/media_stream.hh @@ -5,13 +5,14 @@ #include #include +#include + #include "media_transport.hh" #include "audio_encoder.hh" #include "codec.hh" #include "names.hh" #include "jitter.hh" -#include "metrics.hh" namespace qmedia { @@ -31,8 +32,10 @@ public: config(config_in), logger(logger_in) { + metrics = metrics::MetricsFactory::GetInfluxProvider(); } + virtual ~MediaStream() = default; virtual MediaStreamId id() = 0; @@ -61,7 +64,6 @@ public: protected: std::atomic mutedAudioEmptyFrames = false; - uint64_t domain = 0; uint64_t conference_id = 0; uint64_t client_id = 0; @@ -70,7 +72,7 @@ protected: MediaConfig::MediaDirection media_direction; std::shared_ptr media_transport; LoggerPointer logger; - Metrics::MetricsPtr metrics = nullptr; + std::shared_ptr metrics = nullptr; }; struct AudioStream : public MediaStream diff --git a/src/metrics_helper.cc b/src/metrics_helper.cc new file mode 100644 index 0000000..420c2ef --- /dev/null +++ b/src/metrics_helper.cc @@ -0,0 +1,3 @@ +// +// Created by snandaku on 10/10/22. +// diff --git a/src/metrics_helper.hh b/src/metrics_helper.hh new file mode 100644 index 0000000..c37cbff --- /dev/null +++ b/src/metrics_helper.hh @@ -0,0 +1,13 @@ +#pragma once +#include + +struct MetricsHelper { + std::shared_ptr create(); + +private: + MetricsHelper(uint64_t client_id_in, uint64_t source_id_in); + ~MetricsHelper() = default; + + uint64_t client_id; + uint64_t source_id; +}; \ No newline at end of file diff --git a/src/video_stream.cc b/src/video_stream.cc index 57a858f..7edca65 100644 --- a/src/video_stream.cc +++ b/src/video_stream.cc @@ -29,7 +29,9 @@ void VideoStream::configure() config.video_max_frame_rate, config.video_max_bitrate, (uint32_t) config.video_encode_pixel_format, - logger); + logger, + client_id, + id()); if (!encoder) {