Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make metrics its own library #36

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ endif()
###
### Library Source
###
# add_subdirectory(lib)
add_subdirectory(lib)

add_subdirectory(src)

Expand Down
2 changes: 1 addition & 1 deletion cmd/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
find_package(portaudio QUIET)
if(portaudio_FOUND)
add_executable(sound sound.cc)
target_link_libraries(sound PUBLIC qmedia)
target_link_libraries(sound PUBLIC metrics qmedia)

if (WIN32)
target_link_libraries(sound PUBLIC portaudio)
Expand Down
9 changes: 8 additions & 1 deletion cmd/sendVideoFrame.cc
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,14 @@ int main(int argc, char **argv)
std::uint32_t height = 0;
std::uint32_t format = dec_format; // I420
auto received_image_size = MediaClient_getVideoFrame(
client, stream_id, &ts, &width, &height, &format, &buffer, nullptr);
client,
stream_id,
&ts,
&width,
&height,
&format,
&buffer,
nullptr);
std::cerr << " r " << ts << " " << received_image_size
<< std::endl;
}
Expand Down
23 changes: 14 additions & 9 deletions cmd/sound.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ void recordThreadFunc(MediaClient *client, MediaStreamId stream_id)
.count();

client->send_audio(stream_id,
reinterpret_cast<uint8_t *>(const_cast<char *>(audioBuff)),
reinterpret_cast<uint8_t *>(
const_cast<char *>(audioBuff)),
buff_size,
timestamp);
logger->debug << "-" << std::flush;
Expand All @@ -113,7 +114,7 @@ void recordThreadFunc(MediaClient *client, MediaStreamId stream_id)
free(zerobuff);
}

void playThreadFunc(MediaClient* client, MediaStreamId stream_id)
void playThreadFunc(MediaClient *client, MediaStreamId stream_id)
{
std::chrono::steady_clock::time_point loop_time =
std::chrono::steady_clock::now();
Expand Down Expand Up @@ -166,7 +167,8 @@ void playThreadFunc(MediaClient* client, MediaStreamId stream_id)

std::chrono::steady_clock::time_point get_audio =
std::chrono::steady_clock::now();
int recv_actual = client->get_audio(stream_id, timestamp, &raw_data, buff_size, nullptr);
int recv_actual = client->get_audio(
stream_id, timestamp, &raw_data, buff_size, nullptr);
auto audio_delta = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - get_audio);
logger->info << "{A:" << audio_delta.count() << "}" << std::flush;
Expand Down Expand Up @@ -271,7 +273,8 @@ int main(int argc, char *argv[])
if (argc < 4)
{
std::cerr << "Must provide mode of operation" << std::endl;
std::cerr << "Usage: sound <remote-address> <port> <mode> <name> <source-id> "
std::cerr << "Usage: sound <remote-address> <port> <mode> <name> "
"<source-id> "
<< std::endl;
std::cerr << "Mode: pub/sub/pubsub" << std::endl;
std::cerr << "" << std::endl;
Expand Down Expand Up @@ -313,9 +316,9 @@ int main(int argc, char *argv[])

// configure new stream callback
auto wrapped_stream_callback = [](uint64_t client_id,
uint64_t source_id,
uint64_t source_ts,
MediaType media_type)
uint64_t source_id,
uint64_t source_ts,
MediaType media_type)
{
logger->info << "[Sound]: New Sorcce" << source_id << std::flush;
recvMedia = true;
Expand Down Expand Up @@ -385,7 +388,7 @@ int main(int argc, char *argv[])
}

std::vector<std::thread> threads;
uint64_t stream_id {0};
uint64_t stream_id{0};
if (mode == "send")
{
MediaConfig config{};
Expand All @@ -408,7 +411,9 @@ int main(int argc, char *argv[])
stream_id = client.add_audio_stream(0x1000, 0x2000, 0x3000, config);
threads.emplace_back(playThreadFunc, &client, stream_id);
threads.at(0).detach();
} else if (mode == "sendrecv") {
}
else if (mode == "sendrecv")
{
MediaConfig config{};
config.media_direction = MediaConfig::MediaDirection::sendrecv;
config.media_codec = MediaConfig::CodecType::opus;
Expand Down
6 changes: 3 additions & 3 deletions include/qmedia/media_client.hh
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ enum struct TransportType
enum MediaType
{
invalid = 0,
audio = 1,
video = 2
audio = 1,
video = 2
};

struct AudioConfig
Expand Down Expand Up @@ -147,7 +147,7 @@ public:
unsigned char **buffer,
void **to_free);

void release_media_buffer(void* buffer);
void release_media_buffer(void *buffer);

private:
// Should this be exposed in public header ??
Expand Down
23 changes: 10 additions & 13 deletions lib/metrics/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
set(CURRENT_LIB_NAME metrics)

###
### Dependencies
###
Expand All @@ -10,22 +12,17 @@ find_package(OpenSSL 1.1 REQUIRED)
# curl for influx
find_package(CURL REQUIRED)

add_library(metrics
src/measurements.cc
src/metrics.cc)
target_link_libraries(metrics
PUBLIC
${CURL_LIBRARIES}
OpenSSL::SSL
OpenSSL::Crypto)
target_include_directories(metrics
PUBLIC
${CMAKE_SOURCE_DIR}/include)
target_compile_options(metrics
file(GLOB_RECURSE LIB_HEADERS CONFIGURE_DEPENDS "${CMAKE_CURRENT_SOURCE_DIR}/include/*.hh")
file(GLOB_RECURSE LIB_SOURCES CONFIGURE_DEPENDS "${CMAKE_CURRENT_SOURCE_DIR}/src/*.cc")

add_library(${CURRENT_LIB_NAME} ${LIB_HEADERS} ${LIB_SOURCES})
target_link_libraries(${CURRENT_LIB_NAME} PUBLIC ${CURL_LIBRARIES} OpenSSL::SSL OpenSSL::Crypto)
target_include_directories(${CURRENT_LIB_NAME} PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include)
target_compile_options(${CURRENT_LIB_NAME}
PRIVATE
$<$<OR:$<CXX_COMPILER_ID:Clang>,$<CXX_COMPILER_ID:AppleClang>,$<CXX_COMPILER_ID:GNU>>: -Wpedantic -Wextra -Wall>
$<$<CXX_COMPILER_ID:MSVC>: >)
set_target_properties(metrics
set_target_properties(${CURRENT_LIB_NAME}
PROPERTIES
CXX_STANDARD 17
CXX_STANDARD_REQUIRED YES
Expand Down
30 changes: 0 additions & 30 deletions lib/metrics/include/metrics.hh

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,17 @@ namespace metrics {

enum struct MeasurementType
{
PacketRate_Tx,
PacketRate_Rx,
FrameRate_Tx,
FrameRate_Rx,
QDepth_Tx,
QDepth_Rx,
FrameRate_Tx, // frame count before transmit
FrameRate_Rx, // frame count from transport after re-assembly
EncodeTime, // time elapsed for encoding a raw sample (ms)
EndToEndFrameDelay, // tx to rx latency (ms)
};

const auto measurement_names = std::map<MeasurementType, std::string>{
{MeasurementType::PacketRate_Tx, "TxPacketCount"},
{MeasurementType::PacketRate_Rx, "RxPacketCount"},
{MeasurementType::FrameRate_Tx, "TxFrameCount"},
{MeasurementType::FrameRate_Rx, "RxFrameCount"},
{MeasurementType::QDepth_Tx, "TxQueueDepth"},
{MeasurementType::QDepth_Rx, "RxQueueDepth"},
{MeasurementType::EncodeTime, "EncodeTimeInMs"},
{MeasurementType::EndToEndFrameDelay, "EndToEndFrameDelayInMs"},
};

///
Expand All @@ -32,12 +28,10 @@ const auto measurement_names = std::map<MeasurementType, std::string>{
///

struct Measurement {
virtual std::string toString() = 0;
virtual ~Measurement() = default;
virtual std::string serialize() = 0;
};

/// Influx Measurement and Helpers
///

// handy defines
using Field = std::pair<std::string, uint64_t>;
Expand All @@ -46,23 +40,23 @@ using Tag = std::pair<std::string, uint64_t>;
using Tags = std::list<Tag>;
using TimePoint = std::pair<long long, Fields>;

class InfluxMeasurement : public Measurement
class InfluxMeasurement : public Measurement
{
public:
static std::shared_ptr<InfluxMeasurement> create(std::string name, Tags tags);

static std::unique_ptr<InfluxMeasurement> createMeasurement(std::string name, Tags tags);

InfluxMeasurement(std::string &name, Tags &tags);
InfluxMeasurement(std::string &name_in, Tags &tags_in);
~InfluxMeasurement() = default;

// Measurement - to line protocol
std::string toString() override;
std::string serialize() override;

struct TimeEntry
{
Tags tags;
Fields fields;
};

using TimeSeriesEntry = std::pair<long long, TimeEntry>;

// Setters for the measurement
Expand Down
62 changes: 62 additions & 0 deletions lib/metrics/include/metrics/metrics.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#pragma once

#include <curl/curl.h>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <map>
#include <list>
#include <chrono>

#include "measurements.hh"

namespace metrics
{

struct InfluxConfig
{
static const std::string url;
static const std::string org;
static const std::string bucket;
static const std::string auth_token;
};

class Metrics
{
public:
void add_measurement(const std::string& name, std::shared_ptr<Measurement> measurement);
Metrics(CURL* handle);
~Metrics();
private:

void emitMetrics();
void sendMetrics(const std::vector<std::string>& collected_metrics);
void push_loop();

bool shutdown = false;
std::mutex metrics_mutex;
std::condition_variable cv;
std::mutex push_mutex;
std::thread metrics_thread;
std::map<std::string, std::shared_ptr<Measurement>> measurements;
// make it RAII
CURL *handle;
};

///
/// Factory
///


enum class MetricProvider {
influx = 0
};

struct MetricsFactory
{
static std::shared_ptr<Metrics> GetInfluxProvider();
static std::map<MetricProvider, std::shared_ptr<Metrics>> metric_providers;
};

}

Loading