From 9de03b53ebc5dee13478ea94b19544871673495b Mon Sep 17 00:00:00 2001 From: Suhas Nandakumar Date: Mon, 20 Jun 2022 11:55:06 -0700 Subject: [PATCH 1/3] remove udp/quic transports --- include/netTransportQuic.hh | 138 ------ include/netTransportQuicR.hh | 3 +- include/netTransportUDP.hh | 61 --- include/packet.hh | 2 - include/simple_video_assembler.hh | 2 + include/transport.hh | 2 - include/transport_manager.hh | 49 +-- src/neo.cc | 56 +-- src/netTransportQuic.cc | 690 ------------------------------ src/netTransportQuicR.cc | 5 +- src/netTransportUDP.cc | 400 ----------------- src/transport_manager.cc | 26 +- 12 files changed, 25 insertions(+), 1409 deletions(-) delete mode 100644 include/netTransportQuic.hh delete mode 100644 include/netTransportUDP.hh delete mode 100644 src/netTransportQuic.cc delete mode 100644 src/netTransportUDP.cc diff --git a/include/netTransportQuic.hh b/include/netTransportQuic.hh deleted file mode 100644 index cfc3278..0000000 --- a/include/netTransportQuic.hh +++ /dev/null @@ -1,138 +0,0 @@ -#ifdef ENABLE_QUIC -#pragma once - -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#if defined(__linux__) || defined(__APPLE__) -#include -#include -#elif defined(_WIN32) -#define NOMINMAX -#include -#include -#endif - -#include "netTransportUDP.hh" -#include "transport.hh" - -#include -#include -#include -#include - -namespace neo_media -{ -class TransportManager; - -class NetTransportQUIC; - -// Context shared with th the underlying quic stack -struct TransportContext -{ - TransportManager *transportManager; - NetTransportQUIC *transport; - bool initialized = false; -}; - -// apln for quic handshake -typedef enum -{ - alpn_undef = 0, - alpn_neo_media -} picoquic_alpn_enum; - -typedef struct st_alpn_list_t -{ - picoquic_alpn_enum alpn_code; - char const *alpn_val; -} alpn_list_t; - -static alpn_list_t alpn_list[] = {{alpn_neo_media, "proto-pq-sample"}}; - -// QUIC transport -class NetTransportQUIC : public NetTransport -{ -public: - // Client - NetTransportQUIC(TransportManager *manager, - std::string sfuName_in, - uint16_t sfuPort_in); - - // Server - NetTransportQUIC(TransportManager *manager, uint16_t sfuPort_in); - virtual ~NetTransportQUIC(); - - virtual bool ready() override; - virtual void close() override; - virtual bool doSends() override; - virtual bool doRecvs() override; - virtual NetTransport::PeerConnectionInfo getConnectionInfo() - { - return PeerConnectionInfo{quic_client_ctx.server_address, - quic_client_ctx.server_address_len, - local_connection_id}; - } - - // callback registered with the quic stack on transport and data states - static int datagram_callback(picoquic_cnx_t *cnx, - uint64_t stream_id, - uint8_t *bytes, - size_t length, - picoquic_call_back_event_t fin_or_event, - void *callback_ctx, - void *v_stream_ctx); - - TransportManager *transportManager; - - // Reports if the underlying quic stack is ready - // for application messages - std::mutex quicConnectionReadyMutex; - bool quicConnectionReady; - - // Thread and its function managing quic context. - std::thread quicTransportThread; - int runQuicProcess(); - static int quicTransportThreadFunc(NetTransportQUIC *netTransportQuic) - { - return netTransportQuic->runQuicProcess(); - } - - // Handy storage for few quic client context - struct QuicClientContext - { - std::string server_name; - uint16_t port; - std::string sni; - struct sockaddr_storage server_address; - socklen_t server_address_len; - }; - -private: - // Kick start Quic's connection context as a client - int quic_start_connection(); - void add_connection(bytes &conn_id, picoquic_cnx_t *conn); - void remove_connection(const bytes &conn_id); - - const bool m_isServer; - const std::string alpn = "proto-pq-sample"; - sockaddr_storage local_address; - uint16_t local_port = 0; - bytes local_connection_id; - QuicClientContext quic_client_ctx; - picoquic_quic_t *quicHandle = nullptr; // ref to quic stack - picoquic_cnx_t *client_cnx; // ref to client connection - NetTransportUDP *udp_socket; // underlying transport socket. - std::map connections; -}; - -} // namespace neo_media - -#endif \ No newline at end of file diff --git a/include/netTransportQuicR.hh b/include/netTransportQuicR.hh index 4dd0413..2d621fa 100644 --- a/include/netTransportQuicR.hh +++ b/include/netTransportQuicR.hh @@ -19,9 +19,10 @@ #include #endif -#include "netTransportUDP.hh" +#include #include "transport.hh" #include "logger.hh" +#include "packet.hh" #include "picoquic.h" #include "picoquic_internal.h" diff --git a/include/netTransportUDP.hh b/include/netTransportUDP.hh deleted file mode 100644 index 955353a..0000000 --- a/include/netTransportUDP.hh +++ /dev/null @@ -1,61 +0,0 @@ - -#pragma once - -#include -#include -#include -#include -#include -#include -#include - -#include "transport.hh" -#include "packet.hh" - -namespace neo_media -{ -class TransportManager; -/// -/// TransportManager -/// -class NetTransportUDP : public NetTransport -{ - /* roles and repsobilites of this class : - - encrypts and decrypts things - - sends and revies ack / nacks - - takes cares of congestion controll - - hides encode / decode of packet into protobuf, RTP whatever - */ -public: - NetTransportUDP(TransportManager *, - std::string sfuName_in, - uint16_t sfuPort_in); // create a Client - NetTransportUDP(TransportManager *, uint16_t sfuPort); // create a - // server - virtual ~NetTransportUDP(); - - virtual bool ready(); - virtual void close(); - virtual void shutdown(); - virtual bool doSends(); - virtual bool doRecvs(); - virtual NetTransport::PeerConnectionInfo getConnectionInfo() - { - return PeerConnectionInfo{sfuAddr, sfuAddrLen, {}}; - } - - bool isServer() { return m_isServer; } - - bool read(NetTransport::Data &packet); - bool write(const NetTransport::Data &packet); - - const bool m_isServer; - TransportManager *transportManager; - int fd; - std::string sfuName; - uint16_t sfuPort; - struct sockaddr_storage sfuAddr; - socklen_t sfuAddrLen; -}; - -} // namespace neo_media diff --git a/include/packet.hh b/include/packet.hh index e55c726..4725935 100644 --- a/include/packet.hh +++ b/include/packet.hh @@ -12,9 +12,7 @@ #include #include #endif -#include #include "transport.hh" -using namespace sframe; namespace neo_media { diff --git a/include/simple_video_assembler.hh b/include/simple_video_assembler.hh index 5b395f1..fefa358 100644 --- a/include/simple_video_assembler.hh +++ b/include/simple_video_assembler.hh @@ -1,4 +1,6 @@ #pragma once + +#include #include "frame_assembler.hh" #include "modulus_deque.hh" diff --git a/include/transport.hh b/include/transport.hh index f1ad971..81ac6ee 100644 --- a/include/transport.hh +++ b/include/transport.hh @@ -26,8 +26,6 @@ public: enum Type { - UDP, - PICO_QUIC, QUICR }; diff --git a/include/transport_manager.hh b/include/transport_manager.hh index 7096354..53813e4 100644 --- a/include/transport_manager.hh +++ b/include/transport_manager.hh @@ -8,7 +8,6 @@ #include #include #include -#include #include #include @@ -21,8 +20,6 @@ #include "packet.hh" #include "transport.hh" -#include "netTransportUDP.hh" -#include "netTransportQuic.hh" #include "netTransportQuicR.hh" #include "logger.hh" #include "metrics.hh" @@ -45,47 +42,18 @@ public: const LoggerPointer &logger) { NetTransport *transport = nullptr; - std::string name = ""; switch (type) { - case NetTransport::UDP: - name = "TransportUDP"; - if (sfuName_in.empty()) - { - transport = new NetTransportUDP(transportManager, - sfuPort_in); - } - else - { - transport = new NetTransportUDP( - transportManager, sfuName_in, sfuPort_in); - } - break; - case NetTransport::QUICR: - name = "TransportQuicr"; + case NetTransport::QUICR:{ transport = new NetTransportQUICR( transportManager, sfuName_in, sfuPort_in, logger); - break; - case NetTransport::PICO_QUIC: - name = "TransportQUIC"; -#ifdef ENABLE_QUIC - if (sfuName_in.empty()) - { - // server - transport = NetTransportQUIC(transportManager, sfuPort_in); - } - else - { - // client - transport = NetTransportQUIC( - transportManager, sfuName_in, sfuPort_in); - } -#else - throw std::runtime_error("PICO_QUIC transport support isn't " - "enabled"); -#endif + transport->setLogger("TransportQuicR", logger); + } + break; + default: + throw std::runtime_error("Invalid Transport"); } - transport->setLogger(name, logger); + return transport; } @@ -180,7 +148,6 @@ protected: // Decode bytes to Packet, return nullptr on decode error bool netDecode(const std::string &data_in, Packet *packet_out); - std::unique_ptr mls_context; LoggerPointer logger; bool isLoopback = false; @@ -227,8 +194,6 @@ private: struct sockaddr_in sfuAddr; // struct sockaddr_storage sfuAddr; socklen_t sfuAddrLen; int64_t current_epoch = 0; - sframe::MLSContext::SenderID senderId; - sframe::MLSContext mls_context; LoggerPointer logger; }; diff --git a/src/neo.cc b/src/neo.cc index 6ef3fa1..c71801b 100644 --- a/src/neo.cc +++ b/src/neo.cc @@ -47,59 +47,21 @@ void Neo::init(const std::string &remote_address, transport_type = xport_type; media_dir = dir; - if (transport_type == NetTransport::Type::PICO_QUIC) - { - transport = std::make_unique( - NetTransport::Type::PICO_QUIC, - remote_address, - remote_port, - metrics, - log); - while (!transport->transport_ready()) - { - std::this_thread::sleep_for(std::chrono::seconds(2)); - } - } - else if (transport_type == NetTransport::Type::QUICR) - { - transport = std::make_unique( - NetTransport::Type::QUICR, - remote_address, - remote_port, - metrics, - log); - transport->start(); - } - else - { - // UDP - transport = std::make_unique( - NetTransport::Type::UDP, remote_address, remote_port, metrics, log); - transport->start(); - } - int64_t epoch_id = 1; - transport->setCryptoKey(epoch_id, bytes(8, uint8_t(epoch_id))); + transport = std::make_unique( + transport_type, + remote_address, + remote_port, + metrics, + log); - // Construct and send a Join Packet - if (transport_type != NetTransport::Type::QUICR) - { - PacketPointer joinPacket = std::make_unique(); - assert(joinPacket); - joinPacket->packetType = neo_media::Packet::Type::Join; - joinPacket->clientID = myClientID; - joinPacket->conferenceID = myConferenceID; - joinPacket->echo = echo; - transport->send(std::move(joinPacket)); - } + transport->start(); - // TODO: add audio_encoder + int64_t epoch_id = 1; + transport->setCryptoKey(epoch_id, bytes(8, uint8_t(epoch_id))); workThread = std::thread(neoWorkThread, this); - // Init video pipeline unless disabled with zero width or height or bad - // format - if (!video_max_width || !video_max_height) { log->error << "video disabled" << std::flush; diff --git a/src/netTransportQuic.cc b/src/netTransportQuic.cc deleted file mode 100644 index 5b80ae9..0000000 --- a/src/netTransportQuic.cc +++ /dev/null @@ -1,690 +0,0 @@ -#ifdef ENABLE_QUIC -#include -#include -#include -#include -#include // memcpy -#include -#include - -#if defined(__linux) || defined(__APPLE__) -#include -#include -#endif -#if defined(__linux__) -#include -#include -#include -#elif defined(__APPLE__) -#include -#elif defined(_WIN32) -#include -#include -#endif - -#include "netTransportQuic.hh" -#include "transport_manager.hh" - -#include -#include -#include -#include -#include -#include - -using namespace neo_media; - -#define SERVER_CERT_FILE "cert.pem" -#define SERVER_KEY_FILE "key.pem" - -/// -// Utility -/// - -static std::string to_hex(const std::vector &data) -{ - std::stringstream hex(std::ios_base::out); - hex.flags(std::ios::hex); - for (const auto &byte : data) - { - hex << std::setw(2) << std::setfill('0') << int(byte); - } - return hex.str(); -} - -static void print_sock_info(const std::string &debug_string, - sockaddr_storage *addr) -{ - char hoststr[NI_MAXHOST]; - char portstr[NI_MAXSERV]; - socklen_t len = sizeof(struct sockaddr_storage); - int rc = getnameinfo((struct sockaddr *) addr, - len, - hoststr, - sizeof(hoststr), - portstr, - sizeof(portstr), - NI_NUMERICHOST | NI_NUMERICSERV); - if (rc != 0) - { - std::cout << "getnameinfo error = " << gai_strerror(rc) << "\n"; - // assert(0); - } - else - { - std::cout << debug_string << " host: " << hoststr - << " port: " << portstr << std::endl; - } -} - -/// -// alpn helpers -/// - -static size_t nb_alpn_list = sizeof(alpn_list) / sizeof(alpn_list_t); - -picoquic_alpn_enum parse_alpn_nz(char const *alpn, size_t len) -{ - picoquic_alpn_enum code = alpn_undef; - - if (alpn != nullptr) - { - for (size_t i = 0; i < nb_alpn_list; i++) - { - if (memcmp(alpn, alpn_list[i].alpn_val, len) == 0 && - alpn_list[i].alpn_val[len] == 0) - { - code = alpn_list[i].alpn_code; - break; - } - } - } - - return code; -} - -/* Callback from the TLS stack upon receiving a list of proposed ALPN in the - * Client Hello */ -size_t select_alpn(picoquic_quic_t *quic, ptls_iovec_t *list, size_t count) -{ - size_t ret = count; - - for (size_t i = 0; i < count; i++) - { - if (parse_alpn_nz((const char *) list[i].base, list[i].len) != - alpn_undef) - { - ret = i; - break; - } - } - - return ret; -} - -// quic stack report reason -int transport_close_reason(picoquic_cnx_t *cnx) -{ - uint64_t last_err = 0; - int ret = 0; - if ((last_err = picoquic_get_local_error(cnx)) != 0) - { - fprintf(stdout, - "Connection end with local error 0x%" PRIx64 ".\n", - last_err); - ret = -1; - } - - if ((last_err = picoquic_get_remote_error(cnx)) != 0) - { - fprintf(stdout, - "Connection end with remote error 0x%" PRIx64 ".\n", - last_err); - ret = -1; - } - - if ((last_err = picoquic_get_application_error(cnx)) != 0) - { - fprintf(stdout, - "Connection end with application error 0x%" PRIx64 ".\n", - last_err); - ret = -1; - } - - return ret; -} - -// Callback from quic stack on transport states and data -int NetTransportQUIC::datagram_callback(picoquic_cnx_t *cnx, - uint64_t stream_id, - uint8_t *bytes_in, - size_t length, - picoquic_call_back_event_t fin_or_event, - void *callback_ctx, - void *v_stream_ctx) -{ - // std::cout << "datagram_callback <<<\n"; - int ret = 0; - auto *ctx = (neo_media::TransportContext *) callback_ctx; - if (!ctx->initialized) - { - picoquic_set_callback(cnx, &NetTransportQUIC::datagram_callback, ctx); - ctx->initialized = true; - } - - ret = 0; - - switch (fin_or_event) - { - case picoquic_callback_stream_data: - case picoquic_callback_stream_fin: - case picoquic_callback_stream_reset: /* Client reset stream #x */ - case picoquic_callback_stop_sending: /* Client asks server to reset - stream #x */ - case picoquic_callback_stream_gap: - case picoquic_callback_prepare_to_send: - std::cout << "Unexpected callback: " - "picoquic_callback_prepare_to_send" - << std::endl; - ret = -1; - break; - case picoquic_callback_stateless_reset: - case picoquic_callback_close: /* Received connection close */ - case picoquic_callback_application_close: - { - auto cnx_id = picoquic_get_client_cnxid(cnx); - auto cnx_id_bytes = bytes(cnx_id.id, cnx_id.id + cnx_id.id_len); - std::cout << to_hex(cnx_id_bytes) - << ":picoquic_callback_application_close: " - << transport_close_reason(cnx) << std::endl; - ctx->transport->remove_connection(cnx_id_bytes); - } - break; - case picoquic_callback_version_negotiation: - break; - case picoquic_callback_almost_ready: - std::cout << "picoquic_callback_almost_ready" << std::endl; - break; - case picoquic_callback_ready: - { - std::cout << " Quic Callback: Transport Ready" << std::endl; - if (ctx->transport) - { - std::lock_guard lock( - ctx->transport->quicConnectionReadyMutex); - - ctx->transport->quicConnectionReady = true; - - // save the connection information - auto cnx_id = picoquic_get_client_cnxid(cnx); - auto cnx_id_bytes = bytes(cnx_id.id, cnx_id.id + cnx_id.id_len); - ctx->transport->add_connection(cnx_id_bytes, cnx); - - if (cnx->client_mode) - { - ctx->transport->local_connection_id = std::move( - cnx_id_bytes); - } - } - } - ret = 0; - break; - case picoquic_callback_datagram: - { - /* Process the datagram - */ - auto data = std::string(bytes_in, bytes_in + length); - // std::cout << "picoquic_callback_datagram " << data.size() - // << " bytes\n"; - struct sockaddr *peer_addr = nullptr; - picoquic_get_peer_addr(cnx, &peer_addr); - NetTransport::PeerConnectionInfo peer_info; - // TODO: support IPV6 - memcpy(&peer_info.addr, - (sockaddr_in *) peer_addr, - sizeof(sockaddr_in)); - peer_info.addrLen = sizeof(struct sockaddr_storage); - auto cnx_id = picoquic_get_client_cnxid(cnx); - auto cnx_id_bytes = bytes(cnx_id.id, cnx_id.id + cnx_id.id_len); - // print_sock_info("dg_callbk:", &peer_info.addr); - peer_info.transport_connection_id = std::move(cnx_id_bytes); - ctx->transportManager->recvDataFromNet(data, std::move(peer_info)); - ret = 0; - break; - } - default: - assert(0); - } - - return ret; -} - -// TODO: can this be made instance member -int picoquic_server_callback(picoquic_cnx_t *cnx, - uint64_t stream_id, - uint8_t *bytes, - size_t length, - picoquic_call_back_event_t fin_or_event, - void *callback_ctx, - void *v_stream_ctx) -{ - return NetTransportQUIC::datagram_callback( - cnx, stream_id, bytes, length, fin_or_event, callback_ctx, v_stream_ctx); -} - -NetTransportQUIC::~NetTransportQUIC() -{ - close(); -} - -void NetTransportQUIC::close() -{ - // TODO: implement graceful shutdown - assert(0); -} - -bool NetTransportQUIC::doRecvs() -{ - return false; -} - -bool NetTransportQUIC::doSends() -{ - return false; -} - -/// -/// Private Implementation -/// - -void NetTransportQUIC::add_connection(bytes &conn_id, picoquic_cnx_t *conn) -{ - if (!connections.count(conn_id)) - { - std::cout << conn << ": Connection Saved " << to_hex(conn_id) - << std::endl; - connections.emplace(conn_id, conn); - } -} - -void NetTransportQUIC::remove_connection(const bytes &conn_id) -{ - const auto &pos = connections.find(conn_id); - if (pos != connections.end()) - { - std::cout << "Connection Removed " << to_hex(conn_id) << std::endl; - connections.erase(pos); - } -} - -int NetTransportQUIC::quic_start_connection() -{ - // create client connection context - std::cout << "starting client connection to " << quic_client_ctx.sni - << std::endl; - client_cnx = picoquic_create_cnx( - quicHandle, - picoquic_null_connection_id, - picoquic_null_connection_id, - (struct sockaddr *) &quic_client_ctx.server_address, - picoquic_get_quic_time(quicHandle), - PICOQUIC_TWENTIETH_INTEROP_VERSION, - quic_client_ctx.sni.data(), - alpn.data(), - 1); - - if (client_cnx == nullptr) - { - return -1; - } - - std::cout << "cnx proposed version " << client_cnx->proposed_version - << std::endl; - - // context to be used on callback - auto *xport_ctx = new TransportContext{}; - xport_ctx->transportManager = transportManager; - xport_ctx->transport = this; - picoquic_set_callback(client_cnx, datagram_callback, (void *) xport_ctx); - - // enable quic datagram mode - client_cnx->local_parameters.max_datagram_frame_size = 1500; - - return picoquic_start_client_cnx(client_cnx); -} - -// Client Transport -NetTransportQUIC::NetTransportQUIC(TransportManager *t, - std::string sfuName, - uint16_t sfuPort) : - transportManager(t), quicConnectionReady(false), m_isServer(false) -{ - std::cout << "Quic Client Transport" << std::endl; - udp_socket = new NetTransportUDP{nullptr, sfuName, sfuPort}; - assert(udp_socket); - - // TODO: remove the duplication - std::string sPort = std::to_string(htons(sfuPort)); - struct addrinfo hints = {}, *address_list = nullptr; - hints.ai_family = AF_INET; - hints.ai_socktype = SOCK_DGRAM; - hints.ai_protocol = IPPROTO_UDP; - int err = getaddrinfo( - sfuName.c_str(), sPort.c_str(), &hints, &address_list); - if (err) - { - assert(0); - } - struct addrinfo *item = nullptr, *found_addr = nullptr; - for (item = address_list; item != nullptr; item = item->ai_next) - { - if (item->ai_family == AF_INET && item->ai_socktype == SOCK_DGRAM && - item->ai_protocol == IPPROTO_UDP) - { - found_addr = item; - break; - } - } - - if (found_addr == nullptr) - { - assert(0); - } - - struct sockaddr_in *ipv4_dest = - (struct sockaddr_in *) &quic_client_ctx.server_address; - memcpy(ipv4_dest, found_addr->ai_addr, found_addr->ai_addrlen); - ipv4_dest->sin_port = htons(sfuPort); - quic_client_ctx.server_address_len = sizeof(quic_client_ctx.server_address); - quic_client_ctx.server_name = sfuName; - quic_client_ctx.port = sfuPort; - - // create quic client context - auto ticket_store_filename = "token-store.bin"; - - auto ctx = new TransportContext{}; - ctx->transport = this; - ctx->transportManager = transportManager; - - /* Create QUIC context */ - quicHandle = picoquic_create(1, - NULL, - NULL, - NULL, - alpn.data(), - NULL, - NULL, - NULL, - NULL, - NULL, - picoquic_current_time(), - NULL, - "ticket-store.bin", - NULL, - 0); - - assert(quicHandle != nullptr); - - picoquic_set_default_congestion_algorithm(quicHandle, - picoquic_bbr_algorithm); - - if (picoquic_load_retry_tokens(quicHandle, ticket_store_filename) != 0) - { - fprintf(stderr, - "No token file present. Will create one as <%s>.\n", - ticket_store_filename); - } - - (void) picoquic_set_default_connection_id_length(quicHandle, 4); - - // logging, TODO: configure to on/off - // picoquic_set_textlog(quicHandle, "clientlog.txt"); - picoquic_set_log_level(quicHandle, 2); - - udp_socket = new NetTransportUDP{nullptr, sfuName, sfuPort}; - assert(udp_socket); - - // start the quic thread - quicTransportThread = std::thread(quicTransportThreadFunc, this); -} - -// server -NetTransportQUIC::NetTransportQUIC(TransportManager *t, uint16_t sfuPort) : - transportManager(t), quicConnectionReady(false), m_isServer(true) - -{ - std::cout << "Quic Server Transport" << std::endl; - char default_server_cert_file[512]; - char default_server_key_file[512]; - const char *server_cert_file = nullptr; - const char *server_key_file = nullptr; - - picoquic_get_input_path(default_server_cert_file, - sizeof(default_server_cert_file), - "/tmp", - SERVER_CERT_FILE); - server_cert_file = default_server_cert_file; - - picoquic_get_input_path(default_server_key_file, - sizeof(default_server_key_file), - "/tmp", - SERVER_KEY_FILE); - server_key_file = default_server_key_file; - - auto ctx = new TransportContext{}; - ctx->transport = this; - ctx->transportManager = transportManager; - - quicHandle = picoquic_create(1, - server_cert_file, - server_key_file, - NULL, - alpn.data(), - picoquic_server_callback, - ctx, - NULL, - NULL, - NULL, - picoquic_current_time(), - NULL, - NULL, - NULL, - 0); - - assert(quicHandle != nullptr); - - picoquic_set_alpn_select_fn(quicHandle, select_alpn); - picoquic_set_default_congestion_algorithm(quicHandle, - picoquic_bbr_algorithm); - // logging, TODO: configure to on/off - // picoquic_set_textlog(quicHandle, "serverlog.txt"); - picoquic_set_log_level(quicHandle, 2); - - udp_socket = new NetTransportUDP{nullptr, sfuPort}; - - // kickoff quic process - quicTransportThread = std::thread(quicTransportThreadFunc, this); -} - -bool NetTransportQUIC::ready() -{ - bool ret; - { - std::lock_guard lock(quicConnectionReadyMutex); - ret = quicConnectionReady; - } - if (ret) - { - std::cout << "NetTransportQUIC::ready()" << std::endl; - } - return ret; -} - -// Main quic process thread -// 1. check for incoming packets -// 2. check for outgoing packets -int NetTransportQUIC::runQuicProcess() -{ - // create the quic client connection context - if (!m_isServer) - { - auto ret = quic_start_connection(); - assert(ret == 0); - std::cout << "Quic client connection started ...\n"; - } - - picoquic_quic_t *quic = quicHandle; - int if_index = 0; - picoquic_connection_id_t log_cid; - picoquic_cnx_t *last_cnx = nullptr; - - while (!transportManager->shutDown) - { - Data packet; - // call to next wake delay, pass it to select() - auto got = udp_socket->read(packet); - if (got) - { - if (local_port == 0) - { - if (picoquic_get_local_address(udp_socket->fd, - &local_address) != 0) - { - memset(&local_address, 0, sizeof(struct sockaddr_storage)); - fprintf(stderr, "Could not read local address.\n"); - } - // todo: support AF_INET6 - local_port = ((struct sockaddr_in *) &local_address)->sin_port; - std::cout << "Found local port " << local_port << std::endl; - } - - // std::cout << "Recvd data from net:" << packet.data.size() - // << " bytes\n"; - // let the quic stack know of the incoming packet - uint64_t curr_time = picoquic_get_quic_time(quicHandle); - - // print_sock_info("incoming: peer: ", &packet.peer.addr); - // print_sock_info("incoming: local: ", &local_address); - - int ret = picoquic_incoming_packet( - quic, - reinterpret_cast(packet.data.data()), - packet.data.size(), - (struct sockaddr *) &packet.peer.addr, - (struct sockaddr *) &local_address, - -1, - 0, - curr_time); - assert(ret == 0); - } - - // dequeue from the application - Data send_packet; - std::string data; - NetTransport::PeerConnectionInfo peer_info; - data.resize(1500); - got = transportManager->getDataToSendToNet( - data, &peer_info, &send_packet.peer.addrLen); - uint64_t curr_time_send = picoquic_current_time(); - if (got) - { - picoquic_cnx_t *peer_cnx = nullptr; - if (!peer_info.transport_connection_id.empty()) - { - auto &cnx_id = peer_info.transport_connection_id; - if (!connections.count(cnx_id)) - { - std::cerr << "ConnectionId not found in map " - << to_hex(cnx_id) << std::endl; - if (!m_isServer) - { - // need to know why connection id gets changed, is it - // migration ? - auto *cnx = connections.at(local_connection_id); - remove_connection(local_connection_id); - add_connection(cnx_id, cnx); - } - else - { - assert(0); - } - } // cnxId miss - - peer_cnx = connections.at(cnx_id); - /* - auto peer_cnx_id = picoquic_get_client_cnxid(peer_cnx); - auto cnx_id_bytes = bytes(peer_cnx_id.id, - peer_cnx_id.id + peer_cnx_id.id_len); - std::cout << "peer_cnx: " << peer_cnx - << ", cnxId:" << to_hex(cnx_id) - << ", size: " << data.size() << std::endl; - */ - } - else - { - peer_cnx = connections.at(local_connection_id); - // std::cout << "local:enqueueing datagram: using peer_info cnx: - // cnxId: " << to_hex(local_connection_id) - // << ", size: " << data.size() << std::endl; - } - - int ret = picoquic_queue_datagram_frame( - peer_cnx, - data.size(), - reinterpret_cast(data.data())); - assert(ret == 0); - } - - // verify if there are any packets from the underlying quic context - size_t send_length = 0; - std::string send_buffer; - send_buffer.resize(1500); - struct sockaddr_storage peer_addr; - struct sockaddr_storage local_addr; - - int ret = picoquic_prepare_next_packet( - quicHandle, - curr_time_send, - reinterpret_cast(send_buffer.data()), - send_buffer.size(), - &send_length, - &peer_addr, - &local_addr, - &if_index, - &log_cid, - &last_cnx); - - assert(ret == 0); - - if (send_length > 0) - { - // std::cout << "Sending data returned by quic: " << send_length - // << " bytes\n"; - send_buffer.resize(send_length); - send_packet.data = std::move(send_buffer); - if (!m_isServer) - { - // std::cout << " sending to sfu\n"; - // client sending first packet - send_packet.peer.addrLen = udp_socket->sfuAddrLen; - memcpy(&send_packet.peer.addr, - &udp_socket->sfuAddr, - udp_socket->sfuAddrLen); - } - else - { - // print_sock_info("sending to: ", &send_packet.peer.addr); - send_packet.peer.addrLen = sizeof(peer_addr); - memcpy(&send_packet.peer.addr, &peer_addr, sizeof(peer_addr)); - } - - udp_socket->write(send_packet); - } - } // !transport_shutdown - - std::cout << "DONE" << std::endl; - assert(0); - // return true; -} - -#endif \ No newline at end of file diff --git a/src/netTransportQuicR.cc b/src/netTransportQuicR.cc index 0c4e0fe..76c135d 100644 --- a/src/netTransportQuicR.cc +++ b/src/netTransportQuicR.cc @@ -1,4 +1,3 @@ -#ifdef ENABLE_QUICR #include #include #include @@ -733,6 +732,4 @@ int NetTransportQUICR::runQuicProcess() /* Free the quicrq context */ quicrq_delete(quicr_client_ctx.qr_ctx); return 0; -} - -#endif +} \ No newline at end of file diff --git a/src/netTransportUDP.cc b/src/netTransportUDP.cc deleted file mode 100644 index a594737..0000000 --- a/src/netTransportUDP.cc +++ /dev/null @@ -1,400 +0,0 @@ -#include // memcpy -#include -#include -#include - -#if defined(__linux) || defined(__APPLE__) -#include -#include -#include -#endif -#if defined(__linux__) -#include -#include -#elif defined(__APPLE__) -#include -#elif defined(_WIN32) -#include -#include -#endif - -#include "transport_manager.hh" -#include "netTransportUDP.hh" - -using namespace neo_media; - -NetTransportUDP::~NetTransportUDP() -{ - close(); -} - -bool NetTransportUDP::ready() -{ - return (fd > 0); -} - -void NetTransportUDP::shutdown() -{ -#if defined(_WIN32) - closesocket(fd); -#else - ::shutdown(fd, SHUT_RDWR); -#endif -} - -void NetTransportUDP::close() -{ - if (fd > 0) - { -#if defined(_WIN32) - closesocket(fd); -#else - ::close(fd); -#endif - } - - fd = 0; -} - -bool NetTransportUDP::doSends() -{ - std::string buffer; - buffer.reserve(1500); - struct sockaddr_storage remoteAddr; - socklen_t remoteAddrLen; - PeerConnectionInfo peer_info; - - bool ok = transportManager->getDataToSendToNet( - buffer, &peer_info, &remoteAddrLen); - if ((!ok) || (buffer.size() == 0)) - { - return false; - } - - int numSent = sendto(fd, - buffer.data(), - buffer.size(), - 0 /*flags*/, - (struct sockaddr *) &peer_info.addr, - sizeof(sockaddr_in)); - if (numSent < 0) - { -#if defined(_WIN32) - int error = WSAGetLastError(); - if (error == WSAETIMEDOUT) - { - return false; - } - else - { - std::cerr << "sending on UDP socket got error: " - << WSAGetLastError() << std::endl; - assert(0); - } -#else - // TODO: this drops packet on floor, we need a way to - // requeue/resend - int e = errno; - std::cerr << "sending on UDP socket got error: " << strerror(e) - << std::endl; - assert(0); // TODO -#endif - } - else if (numSent != (int) buffer.size()) - { - assert(0); // TODO - } - - return true; -} - -bool NetTransportUDP::write(const NetTransport::Data &packet) -{ - int numSent = sendto(fd, - packet.data.data(), - packet.data.size(), - 0 /*flags*/, - (struct sockaddr *) &packet.peer.addr, - sizeof(sockaddr_in)); - if (numSent < 0) - { -#if defined(_WIN32) - int error = WSAGetLastError(); - if (error == WSAETIMEDOUT) - { - return false; - } - else - { - std::cerr << "sending on UDP socket got error: " - << WSAGetLastError() << std::endl; - assert(0); - } -#else - // TODO: this drops packet on floor, we need a way to - // requeue/resend - int e = errno; - std::cerr << "sending on UDP socket got error: " << strerror(e) - << std::endl; - assert(0); // TODO -#endif - } - else if (numSent != (int) packet.data.size()) - { - assert(0); // TODO - } - - return true; -} - -bool NetTransportUDP::doRecvs() -{ - const int dataSize = 1500; - std::string buffer(dataSize, 0); - - struct sockaddr_storage remoteAddr; - memset(&remoteAddr, 0, sizeof(remoteAddr)); - socklen_t remoteAddrLen = sizeof(remoteAddr); - - int rLen = recvfrom(fd, - buffer.data(), - buffer.size(), - 0 /*flags*/, - (struct sockaddr *) &remoteAddr, - &remoteAddrLen); - if (rLen < 0) - { -#if defined(_WIN32) - int error = WSAGetLastError(); - if (error == WSAETIMEDOUT || error == WSAECONNRESET) - { - return false; - } - else - { - std::cerr << "reading from UDP socket got error: " - << WSAGetLastError() << std::endl; - assert(0); - } -#else - int e = errno; - if (e == EAGAIN) - { - // timeout on read - return false; - } - else - { - std::cerr << "reading from UDP socket got error: " << strerror(e) - << std::endl; - assert(0); // TODO - } -#endif - } - - if (rLen == 0) - { - return false; - } - buffer.resize(rLen); - auto peer_info = NetTransport::PeerConnectionInfo{ - remoteAddr, remoteAddrLen, {}}; - - transportManager->recvDataFromNet(buffer, peer_info); - - return true; -} - -// TODO: normalize the API -bool NetTransportUDP::read(NetTransport::Data &packet) -{ - const int dataSize = 1500; - std::string buffer; - buffer.resize(dataSize); - struct sockaddr_storage remoteAddr; - memset(&remoteAddr, 0, sizeof(remoteAddr)); - socklen_t remoteAddrLen = sizeof(remoteAddr); - int rLen = recvfrom(fd, - buffer.data(), - buffer.size(), - 0 /*flags*/, - (struct sockaddr *) &remoteAddr, - &remoteAddrLen); - if (rLen < 0) - { -#if defined(_WIN32) - int error = WSAGetLastError(); - if (error == WSAETIMEDOUT) - { - return false; - } - else - { - std::cerr << "reading from UDP socket got error: " - << WSAGetLastError() << std::endl; - assert(0); - } -#else - int e = errno; - if (e == EAGAIN) - { - // timeout on read - return false; - } - else - { - std::cerr << "reading from UDP socket got error: " << strerror(e) - << std::endl; - assert(0); // TODO - } -#endif - } - - if (rLen == 0) - { - return false; - } - - buffer.resize(rLen); - packet.data = std::move(buffer); - packet.peer.addrLen = remoteAddrLen; - memcpy(&(packet.peer.addr), &remoteAddr, remoteAddrLen); - return true; -} - -NetTransportUDP::NetTransportUDP(TransportManager *t, - std::string sfuName_in, - uint16_t sfuPort_in) : - m_isServer(false), - transportManager(t), - sfuName(std::move(sfuName_in)), - sfuPort(sfuPort_in) -{ - // create a Client -#if defined(_WIN32) - WSADATA wsaData; - int wsa_err = WSAStartup(MAKEWORD(2, 2), &wsaData); - if (wsa_err) - { - assert(0); - } -#endif - - fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); - if (fd == -1) - { - assert(0); // TODO - } - - // make socket non blocking IO - struct timeval timeOut; - timeOut.tv_sec = 0; - timeOut.tv_usec = 2000; // 2 ms - int err = 0; - /*int err = setsockopt( - fd, SOL_SOCKET, SO_RCVTIMEO, (char *) &timeOut, sizeof(timeOut)); - if (err) - { - assert(0); // TODO - }*/ - - struct sockaddr_in srvAddr; - srvAddr.sin_family = AF_INET; - srvAddr.sin_addr.s_addr = htonl(INADDR_ANY); - srvAddr.sin_port = 0; - err = bind(fd, (struct sockaddr *) &srvAddr, sizeof(srvAddr)); - if (err) - { - assert(0); - } - - std::string sPort = std::to_string(htons(sfuPort)); - struct addrinfo hints = {}, *address_list = NULL; - hints.ai_family = AF_INET; - hints.ai_socktype = SOCK_DGRAM; - hints.ai_protocol = IPPROTO_UDP; - err = getaddrinfo(sfuName.c_str(), sPort.c_str(), &hints, &address_list); - if (err) - { - assert(0); - } - struct addrinfo *item = NULL, *found_addr = NULL; - for (item = address_list; item != NULL; item = item->ai_next) - { - if (item->ai_family == AF_INET && item->ai_socktype == SOCK_DGRAM && - item->ai_protocol == IPPROTO_UDP) - { - found_addr = item; - break; - } - } - if (found_addr == NULL) - { - assert(0); - } - - // TODO: make it work for IPv6 - struct sockaddr_in *ipv4 = (struct sockaddr_in *) &sfuAddr; - memcpy(ipv4, found_addr->ai_addr, found_addr->ai_addrlen); - ipv4->sin_port = htons(sfuPort); - sfuAddrLen = sizeof(sfuAddr); -} - -/// -/// Server TransportManager -/// - -NetTransportUDP::NetTransportUDP(TransportManager *t, uint16_t sfuPort) : - // create a server - m_isServer(true), - transportManager(t) -{ -#if defined(_WIN32) - WSADATA wsaData; - int wsa_err = WSAStartup(MAKEWORD(2, 2), &wsaData); - if (wsa_err) - { - assert(0); - } -#endif - - fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); - if (fd < 0) - { - assert(0); // TODO - } - - // set for re-use - int one = 1; - int err = setsockopt( - fd, SOL_SOCKET, SO_REUSEADDR, (const char *) &one, sizeof(one)); - if (err != 0) - { - assert(0); // TODO - } - - // make socket non blocking IO - struct timeval timeOut; - timeOut.tv_sec = 0; - timeOut.tv_usec = 2000; // 2 ms - err = setsockopt( - fd, SOL_SOCKET, SO_RCVTIMEO, (const char *) &timeOut, sizeof(timeOut)); - if (err) - { - assert(0); // TODO - } - - struct sockaddr_in srvAddr; - memset((char *) &srvAddr, 0, sizeof(srvAddr)); - srvAddr.sin_port = htons(sfuPort); - srvAddr.sin_family = AF_INET; - srvAddr.sin_addr.s_addr = htonl(INADDR_ANY); - - err = bind(fd, (struct sockaddr *) &srvAddr, sizeof(srvAddr)); - if (err < 0) - { - assert(0); // TODO - } - - std::cout << "UdpSocket: port " << sfuPort << ", fd " << fd << std::endl; -} diff --git a/src/transport_manager.cc b/src/transport_manager.cc index b9b1c32..f3d8437 100644 --- a/src/transport_manager.cc +++ b/src/transport_manager.cc @@ -4,10 +4,8 @@ #include #include #include -#include #include "transport_manager.hh" -#include "netTransportUDP.hh" namespace neo_media { @@ -170,9 +168,7 @@ void TransportManager::runNetSend() // Used for testing only ClientTransportManager::ClientTransportManager() : - TransportManager(NetTransport::Type::UDP, "localhost", -1, nullptr, nullptr), - senderId(sframe::MLSContext::SenderID(0x0000)), - mls_context(sframe::CipherSuite::AES_GCM_128_SHA256, 8), + TransportManager(NetTransport::Type::QUICR, "localhost", -1, nullptr, nullptr), current_epoch(0) { rtx_mgr = std::make_unique(false, this, nullptr); @@ -187,17 +183,8 @@ ClientTransportManager::ClientTransportManager( TransportManager(type, sfuName_in, sfuPort_in, metricsPtr, parent_logger), sfuName(std::move(sfuName_in)), sfuPort(sfuPort_in), - senderId(sframe::MLSContext::SenderID(0x1234)), - mls_context(sframe::CipherSuite::AES_GCM_128_SHA256, 8), current_epoch(0) -{ - if (type == NetTransport::Type::UDP) - { - // quic/quicr have their own rtx mechanisms, hence enable rtx - // just for udp transport. - rtx_mgr = std::make_unique(true, this, metricsPtr); - } -} +{} void ClientTransportManager::start() { @@ -256,21 +243,16 @@ void ClientTransportManager::setCryptoKey(uint64_t epoch, const bytes &epoch_secret) { current_epoch = epoch; - mls_context.add_epoch(epoch, epoch_secret); } bytes ClientTransportManager::protect(const bytes &plaintext) { - auto ct_out = bytes(plaintext.size() + sframe::max_overhead); - auto ct = mls_context.protect(current_epoch, senderId, ct_out, plaintext); - return bytes(ct.begin(), ct.end()); + throw std::runtime_error("Protect Not Supported"); } bytes ClientTransportManager::unprotect(const bytes &ciphertext) { - auto pt_out = bytes(ciphertext.size()); - auto pt = mls_context.unprotect(pt_out, ciphertext); - return bytes(pt.begin(), pt.end()); + throw std::runtime_error("Unprotect Not Supported"); } /// From add6f96db7b8a1738253143dd00e3cafd803c07c Mon Sep 17 00:00:00 2001 From: Suhas Nandakumar Date: Mon, 20 Jun 2022 12:07:29 -0700 Subject: [PATCH 2/3] upate cmake files --- CMakeLists.txt | 26 ++------------------------ cmd/CMakeLists.txt | 4 ++-- cmd/forty-bytes.cc | 35 +++++++---------------------------- src/extern/CMakeLists.txt | 2 +- 4 files changed, 12 insertions(+), 55 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 9f9dd59..e1a56c2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -23,7 +23,7 @@ endif(NOT EXISTS ${CMAKE_TOOLCHAIN_FILE}) ################################################################################ # top-level project name, version, and languages - this project will have # several projects, but this is declared as the top level project. -project(neoMedia VERSION 0.1 LANGUAGES C CXX) +project(qmedia VERSION 0.1 LANGUAGES C CXX) ################################################################################ # If this is a Makefile generator and the build type was not defined, then the @@ -44,8 +44,6 @@ option(BUILD_EXTERN "build external library" ON) option(BUILD_TESTS "build tests" ON) option(BUILD_SEND_VIDEO_FRAME "build sendVideoFrame cmd" ON) option(CLANG_TIDY "Perform linting with clang-tidy" OFF) -option(BUILD_WITH_QUIC "Build with quic transport" OFF) -option(BUILD_WITH_QUICR "Build with quicr transport" ON) ################################################################################ # dependencies @@ -176,27 +174,7 @@ target_include_directories(${LIB_NAME} $ ) -if(BUILD_WITH_QUIC) - # picoquic and friends - if (WIN32) - target_link_libraries(${LIB_NAME} picotls-openssl picotls-core picotls-fusion picoquic) - else(WIN32) - target_link_libraries(${LIB_NAME} picotls-openssl picotls-core picotls-fusion picoquic-core) - endif(WIN32) - add_compile_definitions(${LIB_NAME} ENABLE_QUIC) -endif() - -if(BUILD_WITH_QUICR) - message(STATUS "PICO = ${Picoquic_LIBRARIES}") - message(STATUS "PICO_TLS = ${PTLS_LIBRARIES}") - # picoquic and friends - if (WIN32) - target_link_libraries(${LIB_NAME} ${QUICR_LIBRARIES} ${Picoquic_LIBRARIES} ${PTLS_LIBRARIES}) - else(WIN32) - target_link_libraries(${LIB_NAME} ${QUICR_LIBRARIES} ${Picoquic_LIBRARIES} ${PTLS_LIBRARIES}) - endif(WIN32) - add_compile_definitions(${LIB_NAME} ENABLE_QUICR) -endif() +target_link_libraries(${LIB_NAME} ${QUICR_LIBRARIES} ${Picoquic_LIBRARIES} ${PTLS_LIBRARIES}) ### diff --git a/cmd/CMakeLists.txt b/cmd/CMakeLists.txt index 67ff1c9..bb87aeb 100644 --- a/cmd/CMakeLists.txt +++ b/cmd/CMakeLists.txt @@ -1,10 +1,10 @@ add_executable(forty forty-bytes.cc) -target_link_libraries( forty PUBLIC neoMedia) +target_link_libraries( forty PUBLIC qmedia) find_package(portaudio QUIET) if(portaudio_FOUND) add_executable( sound sound.cc) - target_link_libraries( sound PUBLIC neoMedia) + target_link_libraries( sound PUBLIC qmedia) if (WIN32) target_link_libraries( sound PUBLIC portaudio) diff --git a/cmd/forty-bytes.cc b/cmd/forty-bytes.cc index 2381f70..b05611f 100644 --- a/cmd/forty-bytes.cc +++ b/cmd/forty-bytes.cc @@ -90,44 +90,23 @@ int main(int argc, char *argv[]) std::cerr << "Must provide mode of operation" << std::endl; std::cerr << "Usage: forty " << std::endl; - std::cerr << "Transport: q (for quic), r (udp), qr(quicr)" - << std::endl; std::cerr << "Mode: sendrecv/send/recv" << std::endl; std::cerr << "ClientID - a sensible +ve 32 bit integer value" << std::endl; return -1; } - - transport_type.assign(argv[1]); - LoggerPointer logger = std::make_shared("FORTY_BYTES"); logger->SetLogFacility(LogFacility::CONSOLE); - if (transport_type == "q") - { - std::cout << "Transport is Quic [ !!! Not under active developmenr " - "!!!]\n"; - transportManager = new ClientTransportManager( - neo_media::NetTransport::PICO_QUIC, "localhost", 5004); - } - else if (transport_type == "qr") - { - std::cout << "Transport is QuicR\n"; - transportManager = new ClientTransportManager( - neo_media::NetTransport::QUICR, "127.0.0.1", 7777, nullptr, logger); - transportManager->start(); - } - else - { - std::cout << "Transport is UDP\n"; - transportManager = new ClientTransportManager( - neo_media::NetTransport::UDP, "localhost", 5004); - transportManager->start(); - } + transportManager = new ClientTransportManager( neo_media::NetTransport::QUICR, + "127.0.0.1", + 7777, + nullptr, logger); + transportManager->start(); - mode.assign(argv[2]); + mode.assign(argv[1]); if (mode != "send" && mode != "recv" && mode != "sendrecv") { std::cout << "Bad choice for mode.. Bye" << std::endl; @@ -135,7 +114,7 @@ int main(int argc, char *argv[]) } std::string client_id_str; - client_id_str.assign(argv[3]); + client_id_str.assign(argv[2]); if (client_id_str.empty()) { std::cout << "Bad choice for clientId .. Bye" << std::endl; diff --git a/src/extern/CMakeLists.txt b/src/extern/CMakeLists.txt index fc1ef9c..9051d6d 100644 --- a/src/extern/CMakeLists.txt +++ b/src/extern/CMakeLists.txt @@ -3,4 +3,4 @@ project(neo_media_client) set(headers neo_media_client.hh) set(sources neo_media_client.cc) add_library(${PROJECT_NAME} SHARED ${sources} ${headers}) -target_link_libraries(${PROJECT_NAME} PUBLIC neoMedia) +target_link_libraries(${PROJECT_NAME} PUBLIC qmedia) From 53f064eb1920968bd78b597434a2966a4dc09016 Mon Sep 17 00:00:00 2001 From: Suhas Nandakumar Date: Mon, 20 Jun 2022 15:19:48 -0700 Subject: [PATCH 3/3] cleanup and formatting --- cmd/forty-bytes.cc | 13 ++-- cmd/sound.cc | 69 ++++++++++--------- include/h264_encoder.hh | 2 +- include/neo.hh | 6 +- include/netTransportQuicR.hh | 12 ++-- include/transport_manager.hh | 5 +- src/h264_decoder.cc | 23 ++++--- src/h264_encoder.cc | 129 ++++++++++++++++++++--------------- src/jitter.cc | 71 +++++++++++-------- src/neo.cc | 50 ++++++++------ src/netTransportQuicR.cc | 122 ++++++++++++++++++--------------- src/transport_manager.cc | 25 ++++--- 12 files changed, 295 insertions(+), 232 deletions(-) diff --git a/cmd/forty-bytes.cc b/cmd/forty-bytes.cc index b05611f..07d70e2 100644 --- a/cmd/forty-bytes.cc +++ b/cmd/forty-bytes.cc @@ -77,7 +77,6 @@ void send_loop(uint64_t client_id, uint64_t source_id) } } - int main(int argc, char *argv[]) { std::string mode; @@ -99,11 +98,8 @@ int main(int argc, char *argv[]) LoggerPointer logger = std::make_shared("FORTY_BYTES"); logger->SetLogFacility(LogFacility::CONSOLE); - - transportManager = new ClientTransportManager( neo_media::NetTransport::QUICR, - "127.0.0.1", - 7777, - nullptr, logger); + transportManager = new ClientTransportManager( + neo_media::NetTransport::QUICR, "127.0.0.1", 7777, nullptr, logger); transportManager->start(); mode.assign(argv[1]); @@ -128,9 +124,8 @@ int main(int argc, char *argv[]) std::weak_ptr tmp = std::static_pointer_cast(transport.lock()); auto quicr_transport = tmp.lock(); - quicr_transport->subscribe(source_id, - Packet::MediaType::Opus, - "forty_bytes_alice"); + quicr_transport->subscribe( + source_id, Packet::MediaType::Opus, "forty_bytes_alice"); // start the transport quicr_transport->start(); diff --git a/cmd/sound.cc b/cmd/sound.cc index 92205f1..0c6f17b 100644 --- a/cmd/sound.cc +++ b/cmd/sound.cc @@ -66,21 +66,24 @@ void recordThreadFunc(Neo *neo) PaError err; { std::lock_guard lock(audioReadMutex); - while( ( err = Pa_IsStreamActive( audioStream ) ) == 1 ) + while ((err = Pa_IsStreamActive(audioStream)) == 1) { long toRead = Pa_GetStreamReadAvailable(audioStream); - printf("available: %ld frames_per_buffer: %d\n", toRead, frames_per_buffer); - if (toRead == 0) { + printf("available: %ld frames_per_buffer: %d\n", + toRead, + frames_per_buffer); + if (toRead == 0) + { Pa_Sleep(10); continue; } - if (toRead > frames_per_buffer) - toRead = frames_per_buffer; - - if (toRead == frames_per_buffer) { + if (toRead > frames_per_buffer) toRead = frames_per_buffer; - // You may get underruns or overruns if the output is not primed by PortAudio. - err = Pa_ReadStream( audioStream, audioBuff, toRead ); + if (toRead == frames_per_buffer) + { + // You may get underruns or overruns if the output is not + // primed by PortAudio. + err = Pa_ReadStream(audioStream, audioBuff, toRead); if (err) { logger->error << "Failed to read PA stream: " @@ -93,17 +96,16 @@ void recordThreadFunc(Neo *neo) logger->debug << "0" << std::flush; } - timestamp = std::chrono::duration_cast( - std::chrono::system_clock::now().time_since_epoch()) - .count(); + timestamp = + std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); neo->sendAudio(audioBuff, buff_size, timestamp, sourceID); logger->debug << "-" << std::flush; Pa_Sleep(10); } - } - } free(audioBuff); } @@ -281,7 +283,7 @@ void streamCallBack(uint64_t clientID, int main(int argc, char *argv[]) { - const uint64_t conference_id = 123456; + const uint64_t conference_id = 123456; #if defined(_WIN32) timeBeginPeriod(1); // timerstonk - push minimum resolution down to 1 // ms @@ -292,12 +294,10 @@ int main(int argc, char *argv[]) std::cerr << "Usage: sound " << std::endl; std::cerr << "Mode: pub/sub/pubsub" << std::endl; - std::cerr << "" - << std::endl; + std::cerr << "" << std::endl; return -1; } - std::string remote_address; remote_address.assign(argv[1]); @@ -325,7 +325,6 @@ int main(int argc, char *argv[]) exit(-1); } - std::ostringstream oss; std::chrono::steady_clock::time_point timePoint = std::chrono::steady_clock::now(); @@ -367,7 +366,6 @@ int main(int argc, char *argv[]) assert(0); // TODO } - PaStreamParameters inputParameters; PaStreamParameters outputParameters; @@ -416,35 +414,39 @@ int main(int argc, char *argv[]) assert(0); // TODO } - - if(mode == "pub") { + if (mode == "pub") + { std::thread recordThread(recordThreadFunc, &neo); - } else if (mode == "sub") { + } + else if (mode == "sub") + { std::thread playThread(playThreadFunc, &neo); playThread.detach(); } std::cout << "Mode:" << mode << std::endl; - if(mode == "pub") + if (mode == "pub") { // todo : use stringstream inseatd - auto url = "quicr://" + std::to_string(conference_id) + "/" + std::to_string(clientID) - + "/" + name + "/" + source; + auto url = "quicr://" + std::to_string(conference_id) + "/" + + std::to_string(clientID) + "/" + name + "/" + source; std::cout << "quicr publish url:" << url << std::endl; neo.publish(1, Packet::MediaType::Opus, url); - } else if (mode == "sub") + } + else if (mode == "sub") { - auto url = "quicr://" + std::to_string(conference_id) + "/" + std::to_string(clientID) - + "/" + name + "/" + source; + auto url = "quicr://" + std::to_string(conference_id) + "/" + + std::to_string(clientID) + "/" + name + "/" + source; std::cout << "quicr subscribe url:" << url << std::endl; neo.subscribe(1, Packet::MediaType::Opus, url); - } else { + } + else + { // pub/sub mode std::cout << "Pub and Sub together isn't supported\n"; exit(-1); } - logger->info << "Starting" << std::flush; int count = 0; while (!shutDown) @@ -459,9 +461,8 @@ int main(int argc, char *argv[]) } logger->info << "Shutting down" << std::flush; - - //recordThread.join(); - //playThread.join(); + // recordThread.join(); + // playThread.join(); err = Pa_StopStream(audioStream); if (err != paNoError) diff --git a/include/h264_encoder.hh b/include/h264_encoder.hh index 8a206a2..445c54f 100644 --- a/include/h264_encoder.hh +++ b/include/h264_encoder.hh @@ -18,7 +18,7 @@ public: unsigned int video_max_frame_rate, unsigned int video_max_bitrate, std::uint32_t video_pixel_format, - const LoggerPointer& logger); + const LoggerPointer &logger); ~H264Encoder(); diff --git a/include/neo.hh b/include/neo.hh index fe1832a..aa4887f 100644 --- a/include/neo.hh +++ b/include/neo.hh @@ -40,9 +40,9 @@ public: enum struct MediaDirection { - publish_only = 1, // sendonly - subscribe_only, // recvonly - publish_subscribe, //sendrecv + publish_only = 1, // sendonly + subscribe_only, // recvonly + publish_subscribe, // sendrecv unknown }; diff --git a/include/netTransportQuicR.hh b/include/netTransportQuicR.hh index 2d621fa..ac52bdc 100644 --- a/include/netTransportQuicR.hh +++ b/include/netTransportQuicR.hh @@ -66,7 +66,8 @@ struct PublisherContext Packet::MediaType media_type; std::string url; quicrq_media_source_ctx_t *source_ctx; - quicrq_media_object_source_ctx_t *object_source_ctx; // used with object api + quicrq_media_object_source_ctx_t *object_source_ctx; // used with + // object api TransportManager *transportManager; NetTransportQUICR *transport; }; @@ -74,9 +75,10 @@ struct PublisherContext struct ConsumerContext { Packet::MediaType media_type; - std::string url; // quicr name + std::string url; // quicr name quicrq_reassembly_context_t reassembly_ctx; - quicrq_object_stream_consumer_ctx* object_consumer_ctx; // used with object api + quicrq_object_stream_consumer_ctx *object_consumer_ctx; // used with + // object api quicrq_cnx_ctx_t *cnx_ctx; TransportManager *transportManager; NetTransportQUICR *transport; @@ -90,7 +92,7 @@ public: NetTransportQUICR(TransportManager *manager, std::string sfuName_in, uint16_t sfuPort_in, - const LoggerPointer& logger_in); + const LoggerPointer &logger_in); virtual ~NetTransportQUICR(); @@ -152,6 +154,7 @@ public: void wake_up_all_sources(); LoggerPointer logger; + private: const std::string alpn = "quicrq-h10"; TransportContext xport_ctx; @@ -165,7 +168,6 @@ private: std::map publishers = {}; // source_id -> consumer_ctx std::map consumers = {}; - }; } // namespace neo_media diff --git a/include/transport_manager.hh b/include/transport_manager.hh index 53813e4..a63b684 100644 --- a/include/transport_manager.hh +++ b/include/transport_manager.hh @@ -44,7 +44,8 @@ public: NetTransport *transport = nullptr; switch (type) { - case NetTransport::QUICR:{ + case NetTransport::QUICR: + { transport = new NetTransportQUICR( transportManager, sfuName_in, sfuPort_in, logger); transport->setLogger("TransportQuicR", logger); @@ -85,7 +86,7 @@ public: NetTransport::PeerConnectionInfo *info, socklen_t *addrLen); - bool getDataToSendToNet(NetTransport::Data& data); + bool getDataToSendToNet(NetTransport::Data &data); size_t hasDataToSendToNet(); virtual bool transport_ready() const = 0; diff --git a/src/h264_decoder.cc b/src/h264_decoder.cc index 1affca3..f253b4a 100644 --- a/src/h264_decoder.cc +++ b/src/h264_decoder.cc @@ -83,12 +83,15 @@ int H264Decoder::decode(const char *input_buffer, height = dst_info.UsrData.sSystemBuffer.iHeight; auto color_fmt = dst_info.UsrData.sSystemBuffer.iFormat; - if (debug) { + if (debug) + { std::cerr << "Decoded Width : " << width << std::endl; std::cerr << "Decoded height : " << height << std::endl; std::cerr << "Decoded Format : " << color_fmt << std::endl; - std::cerr << "Decoded Stride 0 : " << dst_info.UsrData.sSystemBuffer.iStride[0] << std::endl; - std::cerr << "Decoded Stride 1 : " << dst_info.UsrData.sSystemBuffer.iStride[1] << std::endl; + std::cerr << "Decoded Stride 0 : " + << dst_info.UsrData.sSystemBuffer.iStride[0] << std::endl; + std::cerr << "Decoded Stride 1 : " + << dst_info.UsrData.sSystemBuffer.iStride[1] << std::endl; } auto y_size = width * height; @@ -99,7 +102,7 @@ int H264Decoder::decode(const char *input_buffer, size_t i = 0; auto pPtr = dst[0]; - for(i = 0; i < height; i++) + for (i = 0; i < height; i++) { memcpy(outp, pPtr, width); outp += width; @@ -107,18 +110,18 @@ int H264Decoder::decode(const char *input_buffer, } pPtr = dst[1]; - for(i = 0; i < height/2; i++) + for (i = 0; i < height / 2; i++) { - memcpy(outp, pPtr, width/2); - outp += width/2; + memcpy(outp, pPtr, width / 2); + outp += width / 2; pPtr += dst_info.UsrData.sSystemBuffer.iStride[1]; } pPtr = dst[2]; - for(i = 0; i < height/2; i++) + for (i = 0; i < height / 2; i++) { - memcpy(outp, pPtr, width/2); - outp += width/2; + memcpy(outp, pPtr, width / 2); + outp += width / 2; pPtr += dst_info.UsrData.sSystemBuffer.iStride[1]; } } diff --git a/src/h264_encoder.cc b/src/h264_encoder.cc index 0ff08cf..64c40f1 100644 --- a/src/h264_encoder.cc +++ b/src/h264_encoder.cc @@ -12,7 +12,7 @@ using namespace neo_media; static bool debug = false; -static std::string to_hex(unsigned char* data, int stop) +static std::string to_hex(unsigned char *data, int stop) { std::stringstream hex(std::ios_base::out); hex.flags(std::ios::hex); @@ -23,13 +23,12 @@ static std::string to_hex(unsigned char* data, int stop) return hex.str(); } - H264Encoder::H264Encoder(unsigned int video_max_width, unsigned int video_max_height, unsigned int video_max_frame_rate, unsigned int video_max_bitrate, std::uint32_t video_pixel_format, - const LoggerPointer& logger_in) + const LoggerPointer &logger_in) { logger = logger_in; int rv = WelsCreateSVCEncoder(&encoder); @@ -40,8 +39,9 @@ H264Encoder::H264Encoder(unsigned int video_max_width, int logLevel = WELS_LOG_ERROR; encoder->SetOption(ENCODER_OPTION_TRACE_LEVEL, &logLevel); - - // ./h264enc -org your_input_I420.yuv -numl 1 numtl 1 -sw 1280 -sh 720 -dw 0 1280 -dh 0 720 -frin 30 -frout 0 30 -rc -1 -lqp 0 24 -utype 0 -iper 128 -nalSize 1300 -complexity 1 -denoise -1 -bf test.264 + // ./h264enc -org your_input_I420.yuv -numl 1 numtl 1 -sw 1280 -sh 720 -dw 0 + // 1280 -dh 0 720 -frin 30 -frout 0 30 -rc -1 -lqp 0 24 -utype 0 -iper 128 + // -nalSize 1300 -complexity 1 -denoise -1 -bf test.264 rv = encoder->GetDefaultParams(&encParmExt); assert(rv == cmResultSuccess); @@ -63,13 +63,14 @@ H264Encoder::H264Encoder(unsigned int video_max_width, encParmExt.sSpatialLayers[0].sSliceArgument.uiSliceMode = SM_SINGLE_SLICE; encParmExt.sSpatialLayers[0].sSliceArgument.uiSliceSizeConstraint = 1300; encParmExt.sSpatialLayers[0].iDLayerQp = 24; - //encParmExt.iMultipleThreadIdc = 1; // multi-threading not tested for mode SM_SIZELIMITED_SLICE - encParmExt.uiMaxNalSize = 1300; // max NAL size must fit in MTU limit + // encParmExt.iMultipleThreadIdc = 1; // multi-threading not + // tested for mode SM_SIZELIMITED_SLICE + encParmExt.uiMaxNalSize = 1300; // max NAL size must fit in MTU limit rv = encoder->InitializeExt(&encParmExt); assert(rv == cmResultSuccess); - int videoFormat = videoFormatI420; // openH264 only supports I420 + int videoFormat = videoFormatI420; // openH264 only supports I420 rv = encoder->SetOption(ENCODER_OPTION_DATAFORMAT, &videoFormat); assert(rv == cmResultSuccess); @@ -102,7 +103,6 @@ H264Encoder::~H264Encoder() } } - int H264Encoder::encode(const char *input_buffer, std::uint32_t input_length, std::uint32_t width, @@ -119,45 +119,53 @@ int H264Encoder::encode(const char *input_buffer, static std::uint64_t total_frames_encoded = 0; static std::uint64_t total_bytes_encoded = 0; static std::uint64_t total_time_encoded = 0; // microseconds - static unsigned char uv_array[1280 * 720] = {0}; + static unsigned char uv_array[1280 * 720] = {0}; - if (stride_uv != 1280 || height != 720) { + if (stride_uv != 1280 || height != 720) + { logger->warning << "!!!! image dimension change: stride_uv:" - << stride_uv << ", height:" << height - << std::flush; + << stride_uv << ", height:" << height << std::flush; } auto now = std::chrono::system_clock::now(); - auto now_ms = std::chrono::duration_cast(now.time_since_epoch()).count(); + auto now_ms = std::chrono::duration_cast( + now.time_since_epoch()) + .count(); // nv12 to i420 planar - auto* input = reinterpret_cast(const_cast(input_buffer)); - auto *p = input + (stride_y * height); // data buffer of inpt UV plane (Y+Ystride*Yheight) - int w = width/2; - int h = height/2; // width,height of input UV plane (Ywidth/2,Yheight/2) - int suv = stride_uv; // stride of input UV plane (often same as Y) - int su = stride_uv/2; // stride of output U plane (often half of Y) - int sv = stride_uv/2; // stride of output V plane (often half of Y) - - //auto uv = (unsigned char*) malloc(suv*h); // temp buffer to copy input UV plane + auto *input = reinterpret_cast( + const_cast(input_buffer)); + auto *p = input + (stride_y * height); // data buffer of inpt UV + // plane (Y+Ystride*Yheight) + int w = width / 2; + int h = height / 2; // width,height of input UV plane + // (Ywidth/2,Yheight/2) + int suv = stride_uv; // stride of input UV plane (often same as Y) + int su = stride_uv / 2; // stride of output U plane (often half of Y) + int sv = stride_uv / 2; // stride of output V plane (often half of Y) + + // auto uv = (unsigned char*) malloc(suv*h); // temp buffer to copy input UV + // plane auto uv = uv_array; - auto *u=p; // U plane of output (overwrites input) + auto *u = p; // U plane of output (overwrites input) auto orig_u = p; - auto *v=u+su*h; // V plane of output (overwrites input) - auto orig_v = orig_u+su*h; - memcpy(uv,p,suv*h); // copy input UV plane to temp buffer + auto *v = u + su * h; // V plane of output (overwrites input) + auto orig_v = orig_u + su * h; + memcpy(uv, p, suv * h); // copy input UV plane to temp buffer // de-interleave UV plane - for (int y=0; yinfo << "h264Encoder:: Force IDR, total_frames_encoded: " << total_frames_encoded << std::flush; + if (genKeyFrame || idr_frame) + { + logger->info << "h264Encoder:: Force IDR, total_frames_encoded: " + << total_frames_encoded << std::flush; auto ret = encoder->ForceIntraFrame(true); - logger->error << "h264Encoder:: IDR Frame Generation Result " << ret << std::flush; + logger->error << "h264Encoder:: IDR Frame Generation Result " << ret + << std::flush; } auto now_2 = std::chrono::system_clock::now(); - auto now_ms_2 = std::chrono::duration_cast(now_2.time_since_epoch()).count(); - logger->info << "h264Encoder:DeInterleave Delta:" << (now_ms_2 - now_ms) << std::flush; + auto now_ms_2 = std::chrono::duration_cast( + now_2.time_since_epoch()) + .count(); + logger->info << "h264Encoder:DeInterleave Delta:" << (now_ms_2 - now_ms) + << std::flush; - memset(&encodedFrame, 0, sizeof (SFrameBSInfo)); + memset(&encodedFrame, 0, sizeof(SFrameBSInfo)); int ret = encoder->EncodeFrame(&sourcePicture, &encodedFrame); if (ret == 0) { switch (encodedFrame.eFrameType) { case videoFrameTypeSkip: - logger->info << "h264Encoder:: videoFrameTypeSkip" << std::flush; + logger->info << "h264Encoder:: videoFrameTypeSkip" + << std::flush; return 0; case videoFrameTypeInvalid: logger->info << "h264Encoder: failed: " << ret << std::flush; @@ -203,29 +218,36 @@ int H264Encoder::encode(const char *input_buffer, } auto now_3 = std::chrono::system_clock::now(); - auto now_ms_3 = std::chrono::duration_cast(now_3.time_since_epoch()).count(); - logger->info << "h264Encoder: encode delta:" << (now_ms_3 - now_ms) << std::flush; + auto now_ms_3 = std::chrono::duration_cast( + now_3.time_since_epoch()) + .count(); + logger->info << "h264Encoder: encode delta:" << (now_ms_3 - now_ms) + << std::flush; // encode worked - if(debug) { - logger->debug << "Encoded iFrameSizeInBytes: " << encodedFrame.iFrameSizeInBytes << std::flush; - logger->debug << "Encoded num_layer: " << encodedFrame.iLayerNum << std::flush; - logger->debug << "Frame Type: " << encodedFrame.eFrameType << std::flush; + if (debug) + { + logger->debug << "Encoded iFrameSizeInBytes: " + << encodedFrame.iFrameSizeInBytes << std::flush; + logger->debug << "Encoded num_layer: " << encodedFrame.iLayerNum + << std::flush; + logger->debug << "Frame Type: " << encodedFrame.eFrameType + << std::flush; } // move the encoded data to output bitstream output_bitstream.resize(encodedFrame.iFrameSizeInBytes); unsigned char *out_bits = output_bitstream.data(); - for(int i = 0; i < encodedFrame.iLayerNum; i++) { + for (int i = 0; i < encodedFrame.iLayerNum; i++) + { auto len = 0; // pNalLengthInByte[0]+pNalLengthInByte[1]+…+pNalLengthInByte[iNalCount-1]. - for(int j =0; j < encodedFrame.sLayerInfo[i].iNalCount; j++) { + for (int j = 0; j < encodedFrame.sLayerInfo[i].iNalCount; j++) + { len += encodedFrame.sLayerInfo[i].pNalLengthInByte[j]; } - memcpy(out_bits, - encodedFrame.sLayerInfo[i].pBsBuf, - len); + memcpy(out_bits, encodedFrame.sLayerInfo[i].pBsBuf, len); out_bits += len; } @@ -234,8 +256,7 @@ int H264Encoder::encode(const char *input_buffer, total_frames_encoded++; logger->info << "h264Encoder: Output Frame type: " - << encodedFrame.eFrameType - << std::flush; + << encodedFrame.eFrameType << std::flush; // success return encodedFrame.eFrameType == videoFrameTypeIDR ? 1 : 0; } \ No newline at end of file diff --git a/src/jitter.cc b/src/jitter.cc index 4e4b744..684b170 100644 --- a/src/jitter.cc +++ b/src/jitter.cc @@ -33,7 +33,6 @@ Jitter::~Jitter() video.mq.flushPackets(); } - void Jitter::recordMetrics(MetaQueue &q, MetaQueue::media_type type, uint64_t clientID, @@ -49,14 +48,15 @@ void Jitter::recordMetrics(MetaQueue &q, if (measurement != nullptr) { - try { + try + { measurement->set(std::chrono::system_clock::now(), q.getMetricFields(type)); - } catch (std::exception & e) { + } + catch (std::exception &e) + { logger->warning << "metrics exception, ignore" << std::flush; } - - } } } @@ -90,20 +90,21 @@ bool Jitter::push(PacketPointer packet, case Packet::MediaType::AV1: case Packet::MediaType::Raw: { - - if (!video.sourceID) { + if (!video.sourceID) + { new_stream = true; - logger->info << "New video sourceID: " << sourceID << std::flush; + logger->info << "New video sourceID: " << sourceID + << std::flush; video.sourceID = packet->sourceID; } - if (packet->fragmentCount == 1) { + if (packet->fragmentCount == 1) + { // packet wasn't fragmented by us - logger->info << "[jitter-v: no assembly needed:" << std::flush; + logger->info << "[jitter-v: no assembly needed:" + << std::flush; // we got assembled frame, add it to jitter queue - video.push(std::move(packet), - sync.video_seq_popped, - now); + video.push(std::move(packet), sync.video_seq_popped, now); break; } @@ -115,7 +116,8 @@ bool Jitter::push(PacketPointer packet, PacketPointer raw = video.assembler->push(std::move(packet)); if (raw != nullptr) { - logger->info << "[jitter-v: assembled full frame:" << std::flush; + logger->info << "[jitter-v: assembled full frame:" + << std::flush; // we got assembled frame, add it to jitter queue video.push(std::move(raw), sync.video_seq_popped, now); } @@ -171,7 +173,8 @@ PacketPointer Jitter::popAudio(uint64_t sourceID, PacketPointer packet = nullptr; bool unableToThrowSilence = false; - if (idle_client) { + if (idle_client) + { idle_client = false; } @@ -182,10 +185,14 @@ PacketPointer Jitter::popAudio(uint64_t sourceID, return nullptr; } - logger->info << "[J-PopAudio: Q-depth:" << audio.getMsInQueue() << "]" << std::flush; - logger->info << "[J-PopAudio: Jitter-ms:" << audio_jitter.getJitterMs() << "]" << std::flush; - logger->info << "[J-PopAudio: Asking Length:" << length << "]" << std::flush; - logger->info << "[J-PopAudio: Playing total in buffers" << audio.playout.getTotalInBuffers() << "]" << std::flush; + logger->info << "[J-PopAudio: Q-depth:" << audio.getMsInQueue() << "]" + << std::flush; + logger->info << "[J-PopAudio: Jitter-ms:" << audio_jitter.getJitterMs() + << "]" << std::flush; + logger->info << "[J-PopAudio: Asking Length:" << length << "]" + << std::flush; + logger->info << "[J-PopAudio: Playing total in buffers" + << audio.playout.getTotalInBuffers() << "]" << std::flush; QueueMonitor(now); int num_depth_adjustments = 1; @@ -257,8 +264,8 @@ PacketPointer Jitter::popAudio(uint64_t sourceID, sync.audio_popped(packet->sourceRecordTime, packet->encodedSequenceNum, now); - logger->info << "[A:" << packet->encodedSequenceNum - << "]" << std::flush; + logger->info << "[A:" << packet->encodedSequenceNum + << "]" << std::flush; } } } @@ -294,7 +301,8 @@ PacketPointer Jitter::popAudio(uint64_t sourceID, audio.playout.fill(packet->data, length, timestamp); packet->encodedSequenceNum = sync.audio_seq_popped; packet->sourceRecordTime = timestamp; - logger->info << "PopAudio-final- encoded-seq-no" << packet->encodedSequenceNum << std::flush; + logger->info << "PopAudio-final- encoded-seq-no" + << packet->encodedSequenceNum << std::flush; logger->info << "[QA: " << audio.mq.size() << "]" << std::flush; return packet; } @@ -408,10 +416,11 @@ int Jitter::popVideo(uint64_t sourceID, return len; } - logger->info << "[Jitter: popVideo: queue has: " - << video.mq.size() << " ]" << std::flush; + logger->info << "[Jitter: popVideo: queue has: " << video.mq.size() << " ]" + << std::flush; - if (idle_client) { + if (idle_client) + { idle_client = false; } @@ -462,7 +471,8 @@ int Jitter::popVideo(uint64_t sourceID, for (unsigned int pops = 0; pops < num_pop; pops++) { packet = video.pop(now); - if (packet) { + if (packet) + { sourceRecordTime = packet->sourceRecordTime; packet.reset(nullptr); } @@ -524,16 +534,17 @@ void Jitter::QueueMonitor(std::chrono::steady_clock::time_point now) unsigned int plcs = 0; unsigned int lost_in_queue = audio.mq.lostInQueue(plcs, sync.audio_seq_popped); - logger->debug <<"[JQM: lostInQueue:" << lost_in_queue << ",plcs:" << plcs << "]" << std::flush; + logger->debug << "[JQM: lostInQueue:" << lost_in_queue << ",plcs:" << plcs + << "]" << std::flush; // total number of audio frames in queue unsigned int queue_size = audio.getMsInQueue(); - logger->debug <<"[JQM: Q-Size:" << queue_size << "]" << std::flush; + logger->debug << "[JQM: Q-Size:" << queue_size << "]" << std::flush; // unsigned int queue_size = audio.mq.size(); // average jitter since the last pop unsigned int jitter_ms = audio_jitter.getJitterMs(); - logger->debug <<"[JQM: audio-jitter-ms:" << jitter_ms << "]" << std::flush; + logger->debug << "[JQM: audio-jitter-ms:" << jitter_ms << "]" << std::flush; unsigned int ms_per_audio = audio.getMsPerAudioPacket(); - logger->debug <<"[JQM: ms_per_audio:" << ms_per_audio << "]" << std::flush; + logger->debug << "[JQM: ms_per_audio:" << ms_per_audio << "]" << std::flush; unsigned int client_fps = audio.fps.getFps(); bucket.tick( now, queue_size, lost_in_queue, jitter_ms, ms_per_audio, client_fps); diff --git a/src/neo.cc b/src/neo.cc index c71801b..fc007b6 100644 --- a/src/neo.cc +++ b/src/neo.cc @@ -47,13 +47,9 @@ void Neo::init(const std::string &remote_address, transport_type = xport_type; media_dir = dir; - + log->info << "Transport Type " << (int) transport_type << std::flush; transport = std::make_unique( - transport_type, - remote_address, - remote_port, - metrics, - log); + transport_type, remote_address, remote_port, metrics, log); transport->start(); @@ -86,7 +82,8 @@ void Neo::init(const std::string &remote_address, log->info << "MediaDirection:" << (int) media_dir << std::flush; - if (media_dir == MediaDirection::publish_only) { + if (media_dir == MediaDirection::publish_only) + { video_encoder = std::make_unique( video_max_width, video_max_height, @@ -117,10 +114,13 @@ void Neo::publish(std::uint64_t source_id, url += "/" + std::to_string((int) media_type); quicr_transport->publish(source_id, media_type, url); - log->info << "SourceID: " << source_id << ", Publish Url:" << url << std::flush; + log->info << "SourceID: " << source_id << ", Publish Url:" << url + << std::flush; } -void Neo::subscribe(uint64_t source_id, Packet::MediaType media_type, std::string url) +void Neo::subscribe(uint64_t source_id, + Packet::MediaType media_type, + std::string url) { auto pkt_transport = transport->transport(); std::weak_ptr tmp = @@ -209,15 +209,15 @@ void Neo::sendAudio(const char *buffer, uint64_t timestamp, uint64_t sourceID) { - - if (media_dir == MediaDirection::publish_only || media_dir == MediaDirection::publish_subscribe) + if (media_dir == MediaDirection::publish_only || + media_dir == MediaDirection::publish_subscribe) { std::shared_ptr audio_encoder = getAudioEncoder(sourceID); if (audio_encoder != nullptr) { log->debug << "sendAudio: SourceId:" << sourceID - << ", length:" << length << std::flush; + << ", length:" << length << std::flush; audio_encoder->encodeFrame( buffer, length, timestamp, mutedAudioEmptyFrames); } @@ -236,7 +236,8 @@ void Neo::sendVideoFrame(const char *buffer, uint64_t timestamp, uint64_t sourceID) { - if (video_encoder == nullptr) { + if (video_encoder == nullptr) + { log->debug << "Video Encoder, unavailable" << std::flush; } // TODO:implement clone() @@ -254,7 +255,9 @@ void Neo::sendVideoFrame(const char *buffer, Packet::MediaType::AV1; auto now = std::chrono::system_clock::now(); - auto now_ms = std::chrono::duration_cast(now.time_since_epoch()).count(); + auto now_ms = std::chrono::duration_cast( + now.time_since_epoch()) + .count(); // encode and packetize encodeVideoFrame(buffer, @@ -274,9 +277,10 @@ void Neo::sendVideoFrame(const char *buffer, return; } - auto now_2 = std::chrono::system_clock::now(); - auto now_ms_2 = std::chrono::duration_cast(now_2.time_since_epoch()).count(); + auto now_ms_2 = std::chrono::duration_cast( + now_2.time_since_epoch()) + .count(); video_seq_no++; @@ -286,10 +290,11 @@ void Neo::sendVideoFrame(const char *buffer, return; } - if (transport_type == NetTransport::Type::QUICR) { + if (transport_type == NetTransport::Type::QUICR) + { // quicr transport handles its own fragmentation and reassemble log->debug << "SendVideoFrame: Sending full object:" - << packet->data.size() << std::flush; + << packet->data.size() << std::flush; packet->fragmentCount = 1; transport->send(std::move(packet)); return; @@ -447,7 +452,8 @@ int Neo::getAudio(uint64_t clientID, PacketPointer packet; JitterInterface::JitterIntPtr jitter = getJitter(clientID); - if (jitter == nullptr) { + if (jitter == nullptr) + { return 0; } @@ -473,7 +479,8 @@ std::uint32_t Neo::getVideoFrame(uint64_t clientID, { int recv_length = 0; JitterInterface::JitterIntPtr jitter_instance = getJitter(clientID); - if (jitter_instance == nullptr) { + if (jitter_instance == nullptr) + { return 0; } @@ -520,7 +527,8 @@ void Neo::audioEncoderCallback(PacketPointer packet) } // send it over the network - log->debug << "Opus Encoded Audio Size:" << packet->data.size() << std::flush; + log->debug << "Opus Encoded Audio Size:" << packet->data.size() + << std::flush; transport->send(std::move(packet)); } diff --git a/src/netTransportQuicR.cc b/src/netTransportQuicR.cc index 76c135d..3179ae8 100644 --- a/src/netTransportQuicR.cc +++ b/src/netTransportQuicR.cc @@ -183,9 +183,11 @@ static int media_frame_publisher_fn(quicrq_media_source_action_enum action, } *data_length = pub_ctx->transportManager->hasDataToSendToNet(); - if (*data_length > data_max_size) { - logger->debug << "Transport Buffer Small: transport buffer size=" << data_max_size - << ", " << "data size=" << *data_length << std::flush; + if (*data_length > data_max_size) + { + logger->debug << "Transport Buffer Small: transport buffer size=" + << data_max_size << ", " + << "data size=" << *data_length << std::flush; *data_length = 0; *is_still_active = 1; return 0; @@ -199,15 +201,21 @@ static int media_frame_publisher_fn(quicrq_media_source_action_enum action, send_packet.data, &send_packet.peer, &send_packet.peer.addrLen); if (got) { - logger->debug << "Copied data to the quicr transport:" << *data_length << std::flush; + logger->debug + << "Copied data to the quicr transport:" << *data_length + << std::flush; std::copy( send_packet.data.begin(), send_packet.data.end(), data); *is_last_segment = 1; *is_still_active = 1; - } else { + } + else + { *is_still_active = 0; } - } else { + } + else + { *is_last_segment = 1; } ret = 0; @@ -228,18 +236,19 @@ media_consumer_frame_ready(void *media_ctx, size_t data_length, quicrq_reassembly_object_mode_enum frame_mode) { - int ret = 0; auto *cons_ctx = (ConsumerContext *) media_ctx; auto logger = cons_ctx->transport->logger; logger->debug << "[frame_ready: id:" << frame_id - << ", frame_mode:" << (int) frame_mode - << ",data_len:" << data_length << "]" << std::flush; + << ", frame_mode:" << (int) frame_mode + << ",data_len:" << data_length << "]" << std::flush; - if (frame_mode == quicrq_reassembly_object_mode_enum::quicrq_reassembly_object_peek) + if (frame_mode == + quicrq_reassembly_object_mode_enum::quicrq_reassembly_object_peek) { - logger->debug << "[frame_ready:quicrq_reassembly_frame_peek, ignoring" << std::flush; + logger->debug << "[frame_ready:quicrq_reassembly_frame_peek, ignoring" + << std::flush; return 0; } @@ -288,7 +297,7 @@ int media_consumer_learn_final_frame_id(void *media_ctx, int ret = 0; auto *cons_ctx = (ConsumerContext *) media_ctx; ret = quicrq_reassembly_learn_final_object_id(&cons_ctx->reassembly_ctx, - final_frame_id); + final_frame_id); return ret; } @@ -341,24 +350,26 @@ int media_consumer_fn(quicrq_media_consumer_enum action, // media consumer object callback from quicr stack int object_stream_consumer_fn( quicrq_media_consumer_enum action, - void* object_consumer_ctx, + void *object_consumer_ctx, uint64_t current_time, uint64_t object_id, - const uint8_t* data, + const uint8_t *data, size_t data_length, - quicrq_object_stream_consumer_properties_t* properties) + quicrq_object_stream_consumer_properties_t *properties) { - auto cons_ctx = (ConsumerContext *) object_consumer_ctx; - auto& logger = cons_ctx->transport->logger; - logger->info << cons_ctx->url << ": object_stream_consumer_fn: action:" - << (int) action << ",data_length:" << data_length<< std::flush; + auto &logger = cons_ctx->transport->logger; + logger->info << cons_ctx->url + << ": object_stream_consumer_fn: action:" << (int) action + << ",data_length:" << data_length << std::flush; int ret = 0; - switch (action) { + switch (action) + { case quicrq_media_datagram_ready: { - // logger->info << "quicrq_media_datagram_ready, object:" << object_id - // << std::flush; + // logger->info << "quicrq_media_datagram_ready, object:" << + // object_id + // << std::flush; struct sockaddr_storage stored_addr; struct sockaddr *peer_addr = nullptr; quicrq_get_peer_address(cons_ctx->cnx_ctx, &stored_addr); @@ -374,9 +385,10 @@ int object_stream_consumer_fn( cons_ctx->transportManager->recvDataFromNet(recv_data, std::move(peer_info)); } - break; + break; case quicrq_media_close: - /* Remove the reference to the media context, as the caller will free it. */ + /* Remove the reference to the media context, as the caller will + * free it. */ cons_ctx->object_consumer_ctx = nullptr; /* Close streams and other resource */ assert(0); @@ -389,7 +401,6 @@ int object_stream_consumer_fn( return ret; } - // main packet loop for the application int quicrq_app_loop_cb(picoquic_quic_t *quic, picoquic_packet_loop_cb_enum cb_mode, @@ -398,7 +409,7 @@ int quicrq_app_loop_cb(picoquic_quic_t *quic, { int ret = 0; auto *cb_ctx = (TransportContext *) callback_ctx; - auto& logger = cb_ctx->transport->logger; + auto &logger = cb_ctx->transport->logger; if (cb_ctx == nullptr) { @@ -411,7 +422,8 @@ int quicrq_app_loop_cb(picoquic_quic_t *quic, case picoquic_packet_loop_ready: if (callback_arg != nullptr) { - auto *options = (picoquic_packet_loop_options_t *) callback_arg; + auto *options = (picoquic_packet_loop_options_t *) + callback_arg; options->do_time_check = 1; } if (cb_ctx->transport) @@ -456,7 +468,8 @@ int quicrq_app_loop_cb(picoquic_quic_t *quic, #if defined(USE_OBJECT_API) NetTransport::Data send_packet; - auto got = cb_ctx->transportManager->getDataToSendToNet(send_packet); + auto got = cb_ctx->transportManager->getDataToSendToNet( + send_packet); if (!got || send_packet.empty()) { break; @@ -476,14 +489,13 @@ int quicrq_app_loop_cb(picoquic_quic_t *quic, assert(ret == 0); #endif } - break; + break; default: ret = PICOQUIC_ERROR_UNEXPECTED_ERROR; break; } } - return ret; } @@ -530,19 +542,20 @@ void NetTransportQUICR::publish(uint64_t source_id, #if defined(USE_OBJECT_API) // TODO: Set object source property - auto obj_src_context = quicrq_publish_object_source(quicr_ctx, - reinterpret_cast(const_cast(url.data())), - url.length(), - nullptr); + auto obj_src_context = quicrq_publish_object_source( + quicr_ctx, + reinterpret_cast(const_cast(url.data())), + url.length(), + nullptr); assert(obj_src_context); pub_context->object_source_ctx = obj_src_context; // enable publishing auto ret = quicrq_cnx_post_media( - cnx_ctx, - reinterpret_cast(const_cast(url.data())), - url.length(), - true); -assert(ret == 0); + cnx_ctx, + reinterpret_cast(const_cast(url.data())), + url.length(), + true); + assert(ret == 0); #else quicrq_media_source_ctx_t *src_ctx = nullptr; src_ctx = quicrq_publish_source( @@ -564,17 +577,17 @@ assert(ret == 0); true); assert(ret == 0); #endif - logger->info << "Added source [" << source_id - << " Url: " << url - << "]" << std::flush; + logger->info << "Added source [" << source_id << " Url: " << url << "]" + << std::flush; publishers[source_id] = *pub_context; } void NetTransportQUICR::remove_source(uint64_t source_id) { -#if defined (USE_OBJECT_API) +#if defined(USE_OBJECT_API) auto src_ctx = publishers[source_id]; - if (src_ctx.object_source_ctx) { + if (src_ctx.object_source_ctx) + { quicrq_publish_object_fin(src_ctx.object_source_ctx); quicrq_delete_object_source(src_ctx.object_source_ctx); logger->info << "Removed source [" << source_id << std::flush; @@ -584,7 +597,8 @@ void NetTransportQUICR::remove_source(uint64_t source_id) #endif } -void NetTransportQUICR::subscribe(uint64_t source_id, Packet::MediaType media_type, +void NetTransportQUICR::subscribe(uint64_t source_id, + Packet::MediaType media_type, const std::string &url) { auto consumer_media_ctx = new ConsumerContext{}; @@ -597,14 +611,14 @@ void NetTransportQUICR::subscribe(uint64_t source_id, Packet::MediaType media_ty #if defined(USE_OBJECT_API) constexpr auto use_datagram = true; constexpr auto in_order = true; - consumer_media_ctx->object_consumer_ctx = - quicrq_subscribe_object_stream(cnx_ctx, - reinterpret_cast(const_cast(url.data())), - url.length(), - use_datagram, - in_order, - object_stream_consumer_fn, - consumer_media_ctx); + consumer_media_ctx->object_consumer_ctx = quicrq_subscribe_object_stream( + cnx_ctx, + reinterpret_cast(const_cast(url.data())), + url.length(), + use_datagram, + in_order, + object_stream_consumer_fn, + consumer_media_ctx); assert(consumer_media_ctx->object_consumer_ctx); #else quicrq_reassembly_init(&consumer_media_ctx->reassembly_ctx); @@ -623,7 +637,7 @@ void NetTransportQUICR::subscribe(uint64_t source_id, Packet::MediaType media_ty NetTransportQUICR::NetTransportQUICR(TransportManager *t, std::string sfuName, uint16_t sfuPort, - const LoggerPointer& logger_in) : + const LoggerPointer &logger_in) : transportManager(t), quicConnectionReady(false), quicr_ctx(quicrq_create_empty()), diff --git a/src/transport_manager.cc b/src/transport_manager.cc index f3d8437..830d8d4 100644 --- a/src/transport_manager.cc +++ b/src/transport_manager.cc @@ -168,7 +168,11 @@ void TransportManager::runNetSend() // Used for testing only ClientTransportManager::ClientTransportManager() : - TransportManager(NetTransport::Type::QUICR, "localhost", -1, nullptr, nullptr), + TransportManager(NetTransport::Type::QUICR, + "localhost", + -1, + nullptr, + nullptr), current_epoch(0) { rtx_mgr = std::make_unique(false, this, nullptr); @@ -184,7 +188,8 @@ ClientTransportManager::ClientTransportManager( sfuName(std::move(sfuName_in)), sfuPort(sfuPort_in), current_epoch(0) -{} +{ +} void ClientTransportManager::start() { @@ -422,7 +427,8 @@ bool TransportManager::recvDataFromNet( return false; } - logger->info << "[R]: Type:" << packet->packetType << "," << packet->encodedSequenceNum << std::flush; + logger->info << "[R]: Type:" << packet->packetType << "," + << packet->encodedSequenceNum << std::flush; #if 0 // decrypt if its client transportManager if (Type::Client == type() && !packet->data.empty()) { @@ -458,7 +464,8 @@ bool TransportManager::recvDataFromNet( return true; } -bool TransportManager::getDataToSendToNet(NetTransport::Data& data) { +bool TransportManager::getDataToSendToNet(NetTransport::Data &data) +{ // get packet to send from Q PacketPointer packet = nullptr; { @@ -484,13 +491,12 @@ bool TransportManager::getDataToSendToNet(NetTransport::Data& data) { data.source_id = packet->sourceID; data.peer.addrLen = packet->peer_info.addrLen; - memcpy(&data.peer.addr, - &(packet->peer_info.addr), - packet->peer_info.addrLen); + memcpy( + &data.peer.addr, &(packet->peer_info.addr), packet->peer_info.addrLen); if (packet->mediaType == Packet::MediaType::AV1) { - logger->info << "[S]: SeqNo " << packet->transportSequenceNumber + logger->info << "[S]: SeqNo " << packet->transportSequenceNumber << " video_frame_type: " << (int) packet->videoFrameType << std::flush; } @@ -553,7 +559,8 @@ bool TransportManager::getDataToSendToNet( memcpy( &peer_info->addr, &(packet->peer_info.addr), packet->peer_info.addrLen); *addrLen = peer_info->addrLen; - logger->info << "[S]: Type:" << packet->packetType << ", " << packet->encodedSequenceNum << std::flush; + logger->info << "[S]: Type:" << packet->packetType << ", " + << packet->encodedSequenceNum << std::flush; data_out = std::move(packet->encoded_data);