From a0e8aafdfbcda9ff6589f0033e90224f889eee65 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Tue, 20 Feb 2024 13:43:08 +0100 Subject: [PATCH 01/19] Initial draft of the Publihser --- include/kafkaopmon/OpMonPublisher.hpp | 51 +++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) create mode 100644 include/kafkaopmon/OpMonPublisher.hpp diff --git a/include/kafkaopmon/OpMonPublisher.hpp b/include/kafkaopmon/OpMonPublisher.hpp new file mode 100644 index 0000000..8707c58 --- /dev/null +++ b/include/kafkaopmon/OpMonPublisher.hpp @@ -0,0 +1,51 @@ +/** + * @file OpMonPublisher.hpp + * + * This is the interface to broadcast OpMon entries object in our DAQ system + * + * This is part of the DUNE DAQ Application Framework, copyright 2020. + * Licensing/copyright details are in the COPYING file that you should have + * received with this code. + */ + + +#ifndef KAFKAOPMON_INCLUDE_KAFKAOPMON_OPMONPUBLISHER_HPP_ +#define KAFKAOPMON_INCLUDE_KAFKAOPMON_OPMONPUBLISHER_HPP_ + +#include +#include + +#include +#include + +#include "opmonlib/info/test.pb.h" + +namespace dunedaq::kafkaopmon { + + class OpMonPublihser { + + OpMonPublihser( const nlohmann::json& conf ); + + OpMonPublihser() = delete; + OpMonPublihser( const OpMonPublihser & ) = delete; + OpMonPublihser & operator = ( const OpMonPublihser & ) = delete; + OpMonPublihser( OpMonPublihser && ) = delete; + OpMonPublihser & operator = ( OpMonPublihser && ) = delete; + + ~OpMonPublihser() {;} + + bool publish( dunedaq::opmon::OpMonEntry && ); + + protected: + std::string topic( const dunedaq::opmon::OpMonEntry & ) const; + std::string key( const dunedaq::opmon::OpMonEntry & ) const; + + private: + std::unique_ptr m_producer; + std::string m_default_topic = "monitoring.opmon_stream"; + + }; + +} + +#endif //KAFKAOPMON_INCLUDE_KAFKAOPMON_OPMONPUBLISHER_HPP_ From d9c6b7558e9293a6d5740c097ef7ff87eee208d1 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Tue, 20 Feb 2024 14:35:36 +0100 Subject: [PATCH 02/19] Initial skeleton for producer --- src/OpMonPublisher.cpp | 70 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) create mode 100644 src/OpMonPublisher.cpp diff --git a/src/OpMonPublisher.cpp b/src/OpMonPublisher.cpp new file mode 100644 index 0000000..c49202f --- /dev/null +++ b/src/OpMonPublisher.cpp @@ -0,0 +1,70 @@ +/** + * @file OpMonPublisher.cpp OpMonPublisher Class Implementation + * + * This is part of the DUNE DAQ Software Suite, copyright 2020. + * Licensing/copyright details are in the COPYING file that you should have + * received with this code. + */ + + +#include "kafkaopmon/OpMonPublisher.hpp" + +using namespace dunedaq::kafkaopmon; + +OpMonPublisher::OpMonPublisher( const nlohmann::json& conf) { + + RdKafka::Conf * k_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); + std::string errstr; + + // Good observations here https://www.confluent.io/blog/modern-cpp-kafka-api-for-safe-easy-messaging/ + + // auto it = conf.find("bootstrap"); + // if ( it == conf.end() ) { + // std::cerr << "Missing bootstrap from json file"; + // throw std::runtime_error( "Missing bootstrap from json file" ); + // } + + // k_conf->set("bootstrap.servers", *it, errstr); + // if(errstr != ""){ + // throw std::runtime_error( errstr ); + // } + + // std::string client_id; + // it = conf.find( "cliend_id" ); + // if ( it != conf.end() ) + // client_id = *it; + // else if(const char* env_p = std::getenv("DUNEDAQ_APPLICATION_NAME")) + // client_id = env_p; + // else + // client_id = "erskafkaproducerdefault"; + + // k_conf->set("client.id", client_id, errstr); + // if(errstr != ""){ + // throw std::runtime_error( errstr ); + // } + + // //Create producer instance + // m_producer.reset(RdKafka::Producer::create(k_conf, errstr)); + + // if(errstr != ""){ + // throw std::runtime_error( errstr ); + // } + + // it = conf.find("default_topic"); + // if (it != conf.end()) m_default_topic = *it; + +} + + +bool OpMonPublisher::publish( dunedaq::opmon::OpMonEntry && entry ) { + + std::string binary; + entry.SerializeToString( & binary ); + + auto topic = extract_topic( entry ); + auto key = extractkey( entry ); + + + + +} From bdedf5835073dbe255e9ac45318b8d716d7f1390 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Fri, 23 Feb 2024 18:44:43 +0100 Subject: [PATCH 03/19] Compiling draft of the publisher --- CMakeLists.txt | 2 +- include/kafkaopmon/OpMonPublisher.hpp | 69 +++++++++++++--- src/OpMonPublisher.cpp | 109 ++++++++++++++++++-------- 3 files changed, 134 insertions(+), 46 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 8903068..3e6d66e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -14,7 +14,7 @@ find_package(Boost COMPONENTS program_options REQUIRED) -daq_add_library(*.cpp LINK_LIBRARIES opmonlib::opmonlib) +daq_add_library(*.cpp LINK_LIBRARIES opmonlib::opmonlib RdKafka::rdkafka RdKafka::rdkafka++) ############################################################################## diff --git a/include/kafkaopmon/OpMonPublisher.hpp b/include/kafkaopmon/OpMonPublisher.hpp index 8707c58..88aad2c 100644 --- a/include/kafkaopmon/OpMonPublisher.hpp +++ b/include/kafkaopmon/OpMonPublisher.hpp @@ -14,31 +14,74 @@ #include #include +#include #include #include -#include "opmonlib/info/test.pb.h" +#include "opmonlib/opmon_entry.pb.h" + +namespace dunedaq { + + ERS_DECLARE_ISSUE( kafkaopmon, + MissingParameter, + "No " << parameter << " in " << conf, + ((std::string)parameter)((std::string)conf) + ) + + ERS_DECLARE_ISSUE( kafkaopmon, + FailedConfiguration, + "Invalid " << parameter << ", cause: " << reason, + ((std::string)parameter)((std::string)reason) + ) + + ERS_DECLARE_ISSUE( kafkaopmon, + FailedProducerCreation, + "Failed creation of a Kafka producer, cause: " << reason, + ((std::string)reason) + ) + + ERS_DECLARE_ISSUE( kafkaopmon, + FailedProduce, + "Failed produce of message with key " << key << ", cause: " << reason, + ((std::string)key)((std::string)reason) + ) + + ERS_DECLARE_ISSUE( kafkaopmon, + TimeoutReachedWhileFlushing, + "Publisher destroyed before all messages were completed, timeout: " << timeout << " ms", + ((int)timeout) + ) + + +} // dunedaq namespace + + + namespace dunedaq::kafkaopmon { - class OpMonPublihser { + class OpMonPublisher { - OpMonPublihser( const nlohmann::json& conf ); + OpMonPublisher( const nlohmann::json& conf ); - OpMonPublihser() = delete; - OpMonPublihser( const OpMonPublihser & ) = delete; - OpMonPublihser & operator = ( const OpMonPublihser & ) = delete; - OpMonPublihser( OpMonPublihser && ) = delete; - OpMonPublihser & operator = ( OpMonPublihser && ) = delete; + OpMonPublisher() = delete; + OpMonPublisher( const OpMonPublisher & ) = delete; + OpMonPublisher & operator = ( const OpMonPublisher & ) = delete; + OpMonPublisher( OpMonPublisher && ) = delete; + OpMonPublisher & operator = ( OpMonPublisher && ) = delete; - ~OpMonPublihser() {;} + ~OpMonPublisher(); - bool publish( dunedaq::opmon::OpMonEntry && ); + bool publish( dunedaq::opmon::OpMonEntry && ) noexcept ; protected: - std::string topic( const dunedaq::opmon::OpMonEntry & ) const; - std::string key( const dunedaq::opmon::OpMonEntry & ) const; + std::string extract_topic( const dunedaq::opmon::OpMonEntry & e) const noexcept { + return e.opmon_id() + '/' + e.measurement() ; + } + std::string extract_key( const dunedaq::opmon::OpMonEntry & ) const noexcept { + return m_default_topic; + } private: std::unique_ptr m_producer; @@ -46,6 +89,6 @@ namespace dunedaq::kafkaopmon { }; -} +} // namespace dunedaq::kafkaopmon #endif //KAFKAOPMON_INCLUDE_KAFKAOPMON_OPMONPUBLISHER_HPP_ diff --git a/src/OpMonPublisher.cpp b/src/OpMonPublisher.cpp index c49202f..02d8acf 100644 --- a/src/OpMonPublisher.cpp +++ b/src/OpMonPublisher.cpp @@ -16,55 +16,100 @@ OpMonPublisher::OpMonPublisher( const nlohmann::json& conf) { RdKafka::Conf * k_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); std::string errstr; - // Good observations here https://www.confluent.io/blog/modern-cpp-kafka-api-for-safe-easy-messaging/ + // Good observations on threadsafety here + // https://www.confluent.io/blog/modern-cpp-kafka-api-for-safe-easy-messaging/ - // auto it = conf.find("bootstrap"); - // if ( it == conf.end() ) { - // std::cerr << "Missing bootstrap from json file"; - // throw std::runtime_error( "Missing bootstrap from json file" ); - // } + auto it = conf.find("bootstrap"); + if ( it == conf.end() ) { + throw MissingParameter(ERS_HERE, + "bootstrap", + nlohmann::to_string(conf) ); + } - // k_conf->set("bootstrap.servers", *it, errstr); - // if(errstr != ""){ - // throw std::runtime_error( errstr ); - // } + k_conf->set("bootstrap.servers", *it, errstr); + if( ! errstr.empty() ) { + throw FailedConfiguration(ERS_HERE, + "bootstrap.servers", + errstr); + } - // std::string client_id; - // it = conf.find( "cliend_id" ); - // if ( it != conf.end() ) - // client_id = *it; - // else if(const char* env_p = std::getenv("DUNEDAQ_APPLICATION_NAME")) - // client_id = env_p; - // else - // client_id = "erskafkaproducerdefault"; + std::string client_id; + it = conf.find( "cliend_id" ); + if ( it != conf.end() ) + client_id = *it; + else + client_id = "kafkaopmon_default_producer"; - // k_conf->set("client.id", client_id, errstr); - // if(errstr != ""){ - // throw std::runtime_error( errstr ); - // } + k_conf->set("client.id", client_id, errstr); + if ( ! errstr.empty() ) { + ers::error( FailedConfiguration(ERS_HERE, "client.id", errstr ) ); + } - // //Create producer instance - // m_producer.reset(RdKafka::Producer::create(k_conf, errstr)); + // Create producer instance + m_producer.reset(RdKafka::Producer::create(k_conf, errstr)); - // if(errstr != ""){ - // throw std::runtime_error( errstr ); - // } + if( ! m_producer ){ + throw FailedProducerCreation(ERS_HERE, errstr); + } + + it = conf.find("default_topic"); + if (it != conf.end()) m_default_topic = *it; - // it = conf.find("default_topic"); - // if (it != conf.end()) m_default_topic = *it; +} + + +OpMonPublisher::~OpMonPublisher() { + int timeout_ms = 500; + RdKafka::ErrorCode err = m_producer -> flush( timeout_ms ); + + if ( err == RdKafka::ERR__TIMED_OUT ) { + ers::warning( TimeoutReachedWhileFlushing( ERS_HERE, timeout_ms ) ); + } + } -bool OpMonPublisher::publish( dunedaq::opmon::OpMonEntry && entry ) { +bool OpMonPublisher::publish( dunedaq::opmon::OpMonEntry && entry ) noexcept { std::string binary; entry.SerializeToString( & binary ); auto topic = extract_topic( entry ); - auto key = extractkey( entry ); + auto key = extract_key( entry ); - + RdKafka::ErrorCode err = m_producer -> produce( topic, + RdKafka::Topic::PARTITION_UA, + RdKafka::Producer::RK_MSG_COPY, + const_cast(binary.c_str()), binary.size(), + key.c_str(), key.size(), + 0, + nullptr + ); + + if ( err == RdKafka::ERR_NO_ERROR ) return true; + std::string err_cause; + switch( err ) { + case RdKafka::ERR__QUEUE_FULL : + err_cause = "maximum number of outstanding messages reached"; + break; + case RdKafka::ERR_MSG_SIZE_TOO_LARGE : + err_cause = "message too large"; + break; + case RdKafka::ERR__UNKNOWN_PARTITION : + err_cause = "Unknown partition"; + break; + case RdKafka::ERR__UNKNOWN_TOPIC : + err_cause = "Unknown topic"; + break; + default: + err_cause = "unknown"; + break; + } + + ers::error( FailedProduce(ERS_HERE, key, err_cause)); + + return false; } From e2f9807cec08990fa95f87da1ebd7afb2858c053 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Tue, 27 Feb 2024 19:35:26 +0100 Subject: [PATCH 04/19] Add publisher test app --- CMakeLists.txt | 3 +- include/kafkaopmon/OpMonPublisher.hpp | 11 ++--- src/OpMonPublisher.cpp | 4 +- test/apps/opmon_publisher_test.cxx | 59 +++++++++++++++++++++++++++ 4 files changed, 70 insertions(+), 7 deletions(-) create mode 100644 test/apps/opmon_publisher_test.cxx diff --git a/CMakeLists.txt b/CMakeLists.txt index 3e6d66e..4f4d15a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -14,7 +14,7 @@ find_package(Boost COMPONENTS program_options REQUIRED) -daq_add_library(*.cpp LINK_LIBRARIES opmonlib::opmonlib RdKafka::rdkafka RdKafka::rdkafka++) +daq_add_library(*.cpp LINK_LIBRARIES ${Boost_LIBRARIES} opmonlib::opmonlib RdKafka::rdkafka RdKafka::rdkafka++) ############################################################################## @@ -41,6 +41,7 @@ target_include_directories(kafkaopmon_kafkaOpmonService_duneOpmonService PUBLIC) # test application daq_add_application( test_flattener test_flattener.cxx TEST LINK_LIBRARIES kafkaopmon) +daq_add_application( opmon_publisher_test opmon_publisher_test.cxx TEST LINK_LIBRARIES kafkaopmon ) daq_install() diff --git a/include/kafkaopmon/OpMonPublisher.hpp b/include/kafkaopmon/OpMonPublisher.hpp index 88aad2c..eadca93 100644 --- a/include/kafkaopmon/OpMonPublisher.hpp +++ b/include/kafkaopmon/OpMonPublisher.hpp @@ -63,8 +63,9 @@ namespace dunedaq::kafkaopmon { class OpMonPublisher { + public: OpMonPublisher( const nlohmann::json& conf ); - + OpMonPublisher() = delete; OpMonPublisher( const OpMonPublisher & ) = delete; OpMonPublisher & operator = ( const OpMonPublisher & ) = delete; @@ -76,11 +77,11 @@ namespace dunedaq::kafkaopmon { bool publish( dunedaq::opmon::OpMonEntry && ) noexcept ; protected: - std::string extract_topic( const dunedaq::opmon::OpMonEntry & e) const noexcept { - return e.opmon_id() + '/' + e.measurement() ; + std::string extract_topic( const dunedaq::opmon::OpMonEntry & ) const noexcept { + return m_default_topic; } - std::string extract_key( const dunedaq::opmon::OpMonEntry & ) const noexcept { - return m_default_topic; + std::string extract_key( const dunedaq::opmon::OpMonEntry & e) const noexcept { + return e.opmon_id() + '/' + e.measurement() ; } private: diff --git a/src/OpMonPublisher.cpp b/src/OpMonPublisher.cpp index 02d8acf..c7acc3b 100644 --- a/src/OpMonPublisher.cpp +++ b/src/OpMonPublisher.cpp @@ -102,7 +102,9 @@ bool OpMonPublisher::publish( dunedaq::opmon::OpMonEntry && entry ) noexcept { err_cause = "Unknown partition"; break; case RdKafka::ERR__UNKNOWN_TOPIC : - err_cause = "Unknown topic"; + err_cause = "Unknown topic (" ; + err_cause += topic; + err_cause += ')'; break; default: err_cause = "unknown"; diff --git a/test/apps/opmon_publisher_test.cxx b/test/apps/opmon_publisher_test.cxx new file mode 100644 index 0000000..ae252d9 --- /dev/null +++ b/test/apps/opmon_publisher_test.cxx @@ -0,0 +1,59 @@ +/** + * @brief test application for the opmon publisher + */ + +#include +#include + +#include +#include "opmonlib/Utils.hpp" +#include "opmonlib/info/test.pb.h" + +#include + +using nlohmann::json; +namespace po = boost::program_options; + +using namespace dunedaq::kafkaopmon; +using namespace dunedaq::opmonlib; +int +main(int argc, char const* argv[]) +{ + po::options_description desc("Allowed options"); + desc.add_options() + ("help,h", "produce help message") + ("bootstrap", po::value()->default_value("monkafka.cern.ch:30092"), "kafka bootstrap server") + ("topic,t", po::value(), "Optional specification of a topic" ) + ; + + po::variables_map input_map; + po::store(po::parse_command_line(argc, argv, desc), input_map); + po::notify(input_map); + + if ( input_map.count("help") ) { + std::cout << desc << std::endl; + return 0; + } + + json conf; + conf["bootstrap"] = input_map["bootstrap"].as(); + conf["cliend_id"] = "opmon_publisher_test"; + if ( input_map.count("topic") ) { + conf["default_topic"] = input_map["topic"].as() ; + } + + OpMonPublisher p(conf); + + + + for( auto i = 0 ; i < 50; ++i ) { + dunedaq::opmon::TestInfo ti; + ti.set_int_example( 10*i ); + ti.set_string_example( "test" ); + p.publish( to_entry( ti ) ); + + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + } + + return 0; +} From a8eae5a310fb563f23e0ca3a0f330133ac2b22b8 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Tue, 27 Feb 2024 20:48:23 +0100 Subject: [PATCH 05/19] adding draft of OpMonSubscriber --- python/kafkaopmon/OpMonSubscriber.py | 130 +++++++++++++++++++++++++++ 1 file changed, 130 insertions(+) create mode 100644 python/kafkaopmon/OpMonSubscriber.py diff --git a/python/kafkaopmon/OpMonSubscriber.py b/python/kafkaopmon/OpMonSubscriber.py new file mode 100644 index 0000000..b9e6ab4 --- /dev/null +++ b/python/kafkaopmon/OpMonSubscriber.py @@ -0,0 +1,130 @@ +#!/usr/bin/env python3 + +from kafka import KafkaConsumer +import threading +import socket +import os +import re +import logging +import getpass + +import opmonlib.opmon_entry_pb2 as entry +import google.protobuf.message as msg + +class OpMonFunction : + def __init(self, + function, + opmon_id : re.Pattern, + measurement : re.Patern) : + self.function = function + self.opmon_id = opmon_id + self.measurement = measurement + + def match(key : str) -> bool : + temp_opmon_id,temp_measurement = key.split('/',1); + if not self.opmon_id.match(temp_opmon_id) return False + if not self.measurement.match(temp_measurment) return False + return True + + def execute( e : entry.OpMonEntry ) : + self.function(e) + +class OpMonSubscriber: + def __init__(self, bootstrap, group_id=None, timeout_ms=500, topics=[]) : + ## Options from configurations + self.bootstrap = bootstrap + self.group_id = group_id + self.timeout = timeout_ms + self.topics = topics + ## runtime options + self.running = False + self.functions = dict() + self.thread = threading.Thread(target=self.message_loop) + + def default_id(self) -> str: + node = socket.gethostname() + user = getpass.getuser() + process = os.getpid() + thread = threading.get_ident() + id = "{}-{}-{}-{}".format(node, user, process, thread) + return id + + def add_callback(self, + name, function, opmon_id = '.*', measurement = '.*') -> bool: + if ( name in self.functions ) : return False + + was_running = self.running + if (was_running) : self.stop() + + f = OpMonFunction( function = function, + opmon_id = re.compile(opmon_id) + measurement = re.compile(measurement) ) + + self.functions[name] = f + + if (was_running) : self.start() + return True + + def clear_callbacks(self): + if ( self.running ) : + self.stop() + self.functions.clear() + + def remove_callback(self, name) -> bool: + if ( name not in sef.functions.keys() ) : return False + + was_running = self.running + if (was_running) : self.stop() + + self.functions.pop(name) + + if ( was_running and len(self.functions)>0 ) : self.start() + return True + + def start(self): + logging.info("Starting run") + self.running = True + self.thread.start() + + def stop(self) : + self.running = False + self.thread.join() + + def message_loop(self) : + if not self.group : group_id = self.default_id() + else: group_id = self.group + + consumer = KafkaConsumer(bootstrap_servers=self.bootstrap, + group_id=group_id, + client_id=self.default_id(), + consumer_timeout_ms=self.timeout) + + topics = self.topics + if len(topics) == 0 : topics = ["ers_stream"] + consumer.subscribe(["monitoring." + s for s in topics]) + + logging.info("ID:", group_id, "running with functions:", *self.functions.keys()) + + while ( self.running ) : + try: + message_it = iter(consumer) + message = next(message_it) + timestamp = message.timestamp + key = message.key.decode('ascii') + ## The key from the message is binary + ## In order to correctly match an ascii regex, we have to convert + + for function in self.functions.values() : + if function.match(key) : + e = entry.OpMonEntry() + e.ParseFromString( message.value ) + function.execute(e) + + except msg.DecodeError : + logging.error("Could not parse message") + except StopIteration : + pass + except Exception as e: + logging.error(e) + + logging.info("Stop run") From cdd9d79615bc5a14d4831b605ed50c7a14ac635c Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Wed, 28 Feb 2024 15:40:14 +0100 Subject: [PATCH 06/19] Initial draft of the test for the subscriber --- python/kafkaopmon/OpMonSubscriber.py | 5 +-- test/scripts/__init__.py | 0 test/scripts/opmon_subscriber_test | 52 ++++++++++++++++++++++++++++ 3 files changed, 55 insertions(+), 2 deletions(-) create mode 100644 test/scripts/__init__.py create mode 100755 test/scripts/opmon_subscriber_test diff --git a/python/kafkaopmon/OpMonSubscriber.py b/python/kafkaopmon/OpMonSubscriber.py index b9e6ab4..f3067ca 100644 --- a/python/kafkaopmon/OpMonSubscriber.py +++ b/python/kafkaopmon/OpMonSubscriber.py @@ -7,6 +7,7 @@ import re import logging import getpass +import sys import opmonlib.opmon_entry_pb2 as entry import google.protobuf.message as msg @@ -30,11 +31,12 @@ def execute( e : entry.OpMonEntry ) : self.function(e) class OpMonSubscriber: - def __init__(self, bootstrap, group_id=None, timeout_ms=500, topics=[]) : + def __init__(self, bootstrap, group_id=None, timeout_ms=500, topics=["opmon_stream"]) : ## Options from configurations self.bootstrap = bootstrap self.group_id = group_id self.timeout = timeout_ms + if len(topics) == 0 : raise ValueError("topic list is empty") self.topics = topics ## runtime options self.running = False @@ -100,7 +102,6 @@ def message_loop(self) : consumer_timeout_ms=self.timeout) topics = self.topics - if len(topics) == 0 : topics = ["ers_stream"] consumer.subscribe(["monitoring." + s for s in topics]) logging.info("ID:", group_id, "running with functions:", *self.functions.keys()) diff --git a/test/scripts/__init__.py b/test/scripts/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/scripts/opmon_subscriber_test b/test/scripts/opmon_subscriber_test new file mode 100755 index 0000000..6a140cb --- /dev/null +++ b/test/scripts/opmon_subscriber_test @@ -0,0 +1,52 @@ +#!/usr/bin/env python3 + +import kafkaopmon.OpMonSubscriber as opmon_sub + +import google.protobuf.json_format as pb_json +import opmonlib.opmon_entry_pb2 as opmon_schema +import json +import click +import time +import logging + +CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help']) + +@click.command(context_settings=CONTEXT_SETTINGS) +@click.option('--kafka-address', type=click.STRING, default="monkafka.cern.ch", help="address of the kafka broker") +@click.option('--kafka-port', type=click.INT, default=30092, help='port of the kafka broker') +@click.option('--topic', type=click.STRING, multiple=True, default=["opmon_stream"]) +@click.option('--running-seconds', type=click.INT, default=15, help='Number of seconds of the run') + +def cli(kafka_address, kafka_port, topic, running_seconds) : + + logging.basicConfig( format='%(asctime)s %(levelname)-8s %(message)s', + level=logging.DEBUG, + datefmt='%Y-%m-%d %H:%M:%S') + + bootstrap = f"{kafka_address}:{kafka_port}" + + sub = opmon_sub.OpMonSubscriber( bootstrap=bootstrap, + topics=topic ) + + sub.add_callback(name="acceptable", + function=acceptable_function ) + sub.add_callback(name="failure", + function=rejected_function, + opmon_id='[0-9]+' ) + + sub.start() + time.sleep(running_seconds) + sub.stop() + + +def acceptable_function( entry : opmon_schema.OpMonEntry ) : + logging.info(pb_json.MessageToJson(entry) + + +def rejected_function( entry : opmon_schema.OpMonEntry ) : + raise RuntimeError("This should never be callsed") + + + +if __name__ == '__main__': + cli(show_default=True, standalone_mode=True) \ No newline at end of file From 8ba2645d165422518c8d2ae757d17f121ef7c17e Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Wed, 28 Feb 2024 17:10:51 +0100 Subject: [PATCH 07/19] Messages seen correctly in the subscriber --- python/kafkaopmon/OpMonSubscriber.py | 34 ++++++++++++++++------------ test/apps/opmon_publisher_test.cxx | 8 ++++--- test/scripts/opmon_subscriber_test | 7 +++--- 3 files changed, 28 insertions(+), 21 deletions(-) diff --git a/python/kafkaopmon/OpMonSubscriber.py b/python/kafkaopmon/OpMonSubscriber.py index f3067ca..150d518 100644 --- a/python/kafkaopmon/OpMonSubscriber.py +++ b/python/kafkaopmon/OpMonSubscriber.py @@ -13,23 +13,24 @@ import google.protobuf.message as msg class OpMonFunction : - def __init(self, - function, - opmon_id : re.Pattern, - measurement : re.Patern) : + def __init__(self, + function, + opmon_id : re.Pattern, + measurement : re.Pattern) : self.function = function self.opmon_id = opmon_id self.measurement = measurement - def match(key : str) -> bool : - temp_opmon_id,temp_measurement = key.split('/',1); - if not self.opmon_id.match(temp_opmon_id) return False - if not self.measurement.match(temp_measurment) return False + def match(self, key : str) -> bool : + opmon_id,measure = key.split('/',1); + if not self.opmon_id.match(opmon_id) : return False + if not self.measurement.match(measure) : return False return True - def execute( e : entry.OpMonEntry ) : + def execute(self, e : entry.OpMonEntry ) : self.function(e) + class OpMonSubscriber: def __init__(self, bootstrap, group_id=None, timeout_ms=500, topics=["opmon_stream"]) : ## Options from configurations @@ -52,14 +53,16 @@ def default_id(self) -> str: return id def add_callback(self, - name, function, opmon_id = '.*', measurement = '.*') -> bool: + name, function, + opmon_id = '.*', + measurement = '.*') -> bool: if ( name in self.functions ) : return False was_running = self.running if (was_running) : self.stop() f = OpMonFunction( function = function, - opmon_id = re.compile(opmon_id) + opmon_id = re.compile(opmon_id), measurement = re.compile(measurement) ) self.functions[name] = f @@ -93,8 +96,8 @@ def stop(self) : self.thread.join() def message_loop(self) : - if not self.group : group_id = self.default_id() - else: group_id = self.group + if not self.group_id : group_id = self.default_id() + else: group_id = self.group_id consumer = KafkaConsumer(bootstrap_servers=self.bootstrap, group_id=group_id, @@ -104,7 +107,8 @@ def message_loop(self) : topics = self.topics consumer.subscribe(["monitoring." + s for s in topics]) - logging.info("ID:", group_id, "running with functions:", *self.functions.keys()) + logging.info(f"ID: %s running with functions {('%s, ' * len(self.functions.keys()))[:-2]}", + group_id, *self.functions.keys()) while ( self.running ) : try: @@ -114,7 +118,7 @@ def message_loop(self) : key = message.key.decode('ascii') ## The key from the message is binary ## In order to correctly match an ascii regex, we have to convert - + for function in self.functions.values() : if function.match(key) : e = entry.OpMonEntry() diff --git a/test/apps/opmon_publisher_test.cxx b/test/apps/opmon_publisher_test.cxx index ae252d9..2e8dea6 100644 --- a/test/apps/opmon_publisher_test.cxx +++ b/test/apps/opmon_publisher_test.cxx @@ -43,17 +43,19 @@ main(int argc, char const* argv[]) } OpMonPublisher p(conf); - - for( auto i = 0 ; i < 50; ++i ) { dunedaq::opmon::TestInfo ti; ti.set_int_example( 10*i ); ti.set_string_example( "test" ); - p.publish( to_entry( ti ) ); + auto e = to_entry( ti ); + e.set_opmon_id("test.app"); + p.publish( std::move(e) ); std::this_thread::sleep_for(std::chrono::milliseconds(5)); } + + std::this_thread::sleep_for(std::chrono::seconds(2)); return 0; } diff --git a/test/scripts/opmon_subscriber_test b/test/scripts/opmon_subscriber_test index 6a140cb..21d5352 100755 --- a/test/scripts/opmon_subscriber_test +++ b/test/scripts/opmon_subscriber_test @@ -26,7 +26,8 @@ def cli(kafka_address, kafka_port, topic, running_seconds) : bootstrap = f"{kafka_address}:{kafka_port}" sub = opmon_sub.OpMonSubscriber( bootstrap=bootstrap, - topics=topic ) + topics=topic, + group_id = "subscriber_tester") sub.add_callback(name="acceptable", function=acceptable_function ) @@ -40,7 +41,7 @@ def cli(kafka_address, kafka_port, topic, running_seconds) : def acceptable_function( entry : opmon_schema.OpMonEntry ) : - logging.info(pb_json.MessageToJson(entry) + logging.info(pb_json.MessageToJson(entry)) def rejected_function( entry : opmon_schema.OpMonEntry ) : @@ -49,4 +50,4 @@ def rejected_function( entry : opmon_schema.OpMonEntry ) : if __name__ == '__main__': - cli(show_default=True, standalone_mode=True) \ No newline at end of file + cli(show_default=True, standalone_mode=True) From 6105c96e61d4ca4132edae0aa7566da5b94acfb8 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Wed, 28 Feb 2024 17:51:18 +0100 Subject: [PATCH 08/19] Update publisher test with multi threaded inputs --- test/apps/opmon_publisher_test.cxx | 37 ++++++++++++++++++++++-------- 1 file changed, 28 insertions(+), 9 deletions(-) diff --git a/test/apps/opmon_publisher_test.cxx b/test/apps/opmon_publisher_test.cxx index 2e8dea6..7ed4b23 100644 --- a/test/apps/opmon_publisher_test.cxx +++ b/test/apps/opmon_publisher_test.cxx @@ -4,6 +4,7 @@ #include #include +#include #include #include "opmonlib/Utils.hpp" @@ -16,6 +17,11 @@ namespace po = boost::program_options; using namespace dunedaq::kafkaopmon; using namespace dunedaq::opmonlib; + + + + + int main(int argc, char const* argv[]) { @@ -24,6 +30,7 @@ main(int argc, char const* argv[]) ("help,h", "produce help message") ("bootstrap", po::value()->default_value("monkafka.cern.ch:30092"), "kafka bootstrap server") ("topic,t", po::value(), "Optional specification of a topic" ) + ("n_threads,n", po::value()->default_value(10), "Number of threads used for test") ; po::variables_map input_map; @@ -43,18 +50,30 @@ main(int argc, char const* argv[]) } OpMonPublisher p(conf); + + auto pub_func = [&](int i){ + auto opmon_id = "test.app.thread_" + std::to_string(i); + for (auto j = 0; j < 20; ++j ) { + dunedaq::opmon::TestInfo ti; + ti.set_int_example( j*1000 + i ); + ti.set_string_example( "test" ); + auto e = to_entry( ti ); + e.set_opmon_id(opmon_id); + p.publish( std::move(e) ); + } + }; + + auto n = input_map["n_threads"].as() ; + std::vector> threads(n); - for( auto i = 0 ; i < 50; ++i ) { - dunedaq::opmon::TestInfo ti; - ti.set_int_example( 10*i ); - ti.set_string_example( "test" ); - auto e = to_entry( ti ); - e.set_opmon_id("test.app"); - p.publish( std::move(e) ); - - std::this_thread::sleep_for(std::chrono::milliseconds(5)); + for( auto i = 0 ; i < n; ++i ) { + threads[i] = async(std::launch::async, pub_func, i); } + for ( auto & t : threads ) { + t.get(); + } + std::this_thread::sleep_for(std::chrono::seconds(2)); return 0; From cc02269d217421b20073fd62e364fc950e5dc946 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Fri, 1 Mar 2024 15:57:59 +0100 Subject: [PATCH 09/19] Initial unittest --- CMakeLists.txt | 5 ++-- include/kafkaopmon/OpMonPublisher.hpp | 2 +- src/OpMonPublisher.cpp | 7 +++--- unittest/OpMonPublisher_test.cxx | 33 +++++++++++++++++++++++++++ 4 files changed, 40 insertions(+), 7 deletions(-) create mode 100644 unittest/OpMonPublisher_test.cxx diff --git a/CMakeLists.txt b/CMakeLists.txt index 4f4d15a..dd229c9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -10,7 +10,7 @@ find_package(CPR REQUIRED) find_package(CURL REQUIRED) find_package(RdKafka REQUIRED) find_package(ers REQUIRED) -find_package(Boost COMPONENTS program_options REQUIRED) +find_package(Boost COMPONENTS unit_test_framework program_options REQUIRED) @@ -35,7 +35,8 @@ target_include_directories(kafkaopmon_kafkaOpmonService_duneOpmonService PUBLIC) # No integration tests written ############################################################################## -# No unit tests written +# unit tests +daq_add_unit_test( OpMonPublisher_test LINK_LIBRARIES kafkaopmon ) ############################################################################## # test application diff --git a/include/kafkaopmon/OpMonPublisher.hpp b/include/kafkaopmon/OpMonPublisher.hpp index eadca93..d185222 100644 --- a/include/kafkaopmon/OpMonPublisher.hpp +++ b/include/kafkaopmon/OpMonPublisher.hpp @@ -74,7 +74,7 @@ namespace dunedaq::kafkaopmon { ~OpMonPublisher(); - bool publish( dunedaq::opmon::OpMonEntry && ) noexcept ; + void publish( dunedaq::opmon::OpMonEntry && ) const ; protected: std::string extract_topic( const dunedaq::opmon::OpMonEntry & ) const noexcept { diff --git a/src/OpMonPublisher.cpp b/src/OpMonPublisher.cpp index c7acc3b..c1975d7 100644 --- a/src/OpMonPublisher.cpp +++ b/src/OpMonPublisher.cpp @@ -70,7 +70,7 @@ OpMonPublisher::~OpMonPublisher() { } -bool OpMonPublisher::publish( dunedaq::opmon::OpMonEntry && entry ) noexcept { +void OpMonPublisher::publish( dunedaq::opmon::OpMonEntry && entry ) const { std::string binary; entry.SerializeToString( & binary ); @@ -87,7 +87,7 @@ bool OpMonPublisher::publish( dunedaq::opmon::OpMonEntry && entry ) noexcept { nullptr ); - if ( err == RdKafka::ERR_NO_ERROR ) return true; + if ( err == RdKafka::ERR_NO_ERROR ) return ; std::string err_cause; @@ -111,7 +111,6 @@ bool OpMonPublisher::publish( dunedaq::opmon::OpMonEntry && entry ) noexcept { break; } - ers::error( FailedProduce(ERS_HERE, key, err_cause)); + throw FailedProduce(ERS_HERE, key, err_cause) ; - return false; } diff --git a/unittest/OpMonPublisher_test.cxx b/unittest/OpMonPublisher_test.cxx new file mode 100644 index 0000000..567caf5 --- /dev/null +++ b/unittest/OpMonPublisher_test.cxx @@ -0,0 +1,33 @@ +/** + * @file OpMonPublisher_test.cxx Test application that tests invalid constructions of OpMonPublisher + * + * This is part of the DUNE DAQ Application Framework, copyright 2020. + * Licensing/copyright details are in the COPYING file that you should have + * received with this code. + */ + +#define BOOST_TEST_MODULE opmon_publisher_test // NOLINT + +#include "boost/test/unit_test.hpp" + +#include + +using namespace dunedaq::kafkaopmon; +using namespace dunedaq::opmon; + +BOOST_AUTO_TEST_SUITE(OpMonPublisher_Test) + +BOOST_AUTO_TEST_CASE(Invalid_Creation) { + + nlohmann::json conf; + + BOOST_CHECK_THROW( OpMonPublisher p(conf), + ers::Issue ); + +} + +BOOST_AUTO_TEST_SUITE_END() + + + + From 17eefc1ffac79f1884758163474e69ceabdcfede Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Fri, 1 Mar 2024 18:32:47 +0100 Subject: [PATCH 10/19] Complete unit test for OpMonPublisher --- src/OpMonPublisher.cpp | 4 ++-- unittest/OpMonPublisher_test.cxx | 10 +++++++++- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/src/OpMonPublisher.cpp b/src/OpMonPublisher.cpp index c1975d7..a83f0f6 100644 --- a/src/OpMonPublisher.cpp +++ b/src/OpMonPublisher.cpp @@ -44,10 +44,10 @@ OpMonPublisher::OpMonPublisher( const nlohmann::json& conf) { if ( ! errstr.empty() ) { ers::error( FailedConfiguration(ERS_HERE, "client.id", errstr ) ); } - + // Create producer instance m_producer.reset(RdKafka::Producer::create(k_conf, errstr)); - + if( ! m_producer ){ throw FailedProducerCreation(ERS_HERE, errstr); } diff --git a/unittest/OpMonPublisher_test.cxx b/unittest/OpMonPublisher_test.cxx index 567caf5..20bab61 100644 --- a/unittest/OpMonPublisher_test.cxx +++ b/unittest/OpMonPublisher_test.cxx @@ -22,8 +22,16 @@ BOOST_AUTO_TEST_CASE(Invalid_Creation) { nlohmann::json conf; BOOST_CHECK_THROW( OpMonPublisher p(conf), - ers::Issue ); + MissingParameter ); + conf["bootstrap"] = "invalid.address.none:1234"; + + BOOST_CHECK_NO_THROW( OpMonPublisher p(conf) ); + + // this is a bit annoyting, but it is what it is + // Kakfa creates a producer but the check of the correctness is done asynchronously + // As a result, even with an invalid address, the best we get in a very silent error message + } BOOST_AUTO_TEST_SUITE_END() From 259af3ef8c09511f083f11a19c981627acb34427 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Tue, 2 Apr 2024 18:29:16 +0200 Subject: [PATCH 11/19] OpMonId is a class --- include/kafkaopmon/OpMonPublisher.hpp | 3 ++- test/apps/opmon_publisher_test.cxx | 9 ++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/include/kafkaopmon/OpMonPublisher.hpp b/include/kafkaopmon/OpMonPublisher.hpp index d185222..cf63761 100644 --- a/include/kafkaopmon/OpMonPublisher.hpp +++ b/include/kafkaopmon/OpMonPublisher.hpp @@ -20,6 +20,7 @@ #include #include "opmonlib/opmon_entry.pb.h" +#include "opmonlib/Utils.hpp" namespace dunedaq { @@ -81,7 +82,7 @@ namespace dunedaq::kafkaopmon { return m_default_topic; } std::string extract_key( const dunedaq::opmon::OpMonEntry & e) const noexcept { - return e.opmon_id() + '/' + e.measurement() ; + return dunedaq::opmonlib::to_string(e.origin()) + '/' + e.measurement() ; } private: diff --git a/test/apps/opmon_publisher_test.cxx b/test/apps/opmon_publisher_test.cxx index 7ed4b23..df8bc1b 100644 --- a/test/apps/opmon_publisher_test.cxx +++ b/test/apps/opmon_publisher_test.cxx @@ -52,13 +52,16 @@ main(int argc, char const* argv[]) OpMonPublisher p(conf); auto pub_func = [&](int i){ - auto opmon_id = "test.app.thread_" + std::to_string(i); + dunedaq::opmon::OpMonId id; + id.set_session("test"); + id.set_application("app"); + id.set_element("thread_" + std::to_string(i)); for (auto j = 0; j < 20; ++j ) { dunedaq::opmon::TestInfo ti; ti.set_int_example( j*1000 + i ); ti.set_string_example( "test" ); auto e = to_entry( ti ); - e.set_opmon_id(opmon_id); + *e.mutable_origin() = id; p.publish( std::move(e) ); } }; @@ -66,7 +69,7 @@ main(int argc, char const* argv[]) auto n = input_map["n_threads"].as() ; std::vector> threads(n); - for( auto i = 0 ; i < n; ++i ) { + for( size_t i = 0 ; i < n; ++i ) { threads[i] = async(std::launch::async, pub_func, i); } From 7da6457ee9ef49ab1c824fbf5ca7ba48ad4240ea Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Thu, 2 May 2024 13:31:51 +0200 Subject: [PATCH 12/19] Update for CustomOrigin --- test/apps/opmon_publisher_test.cxx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/apps/opmon_publisher_test.cxx b/test/apps/opmon_publisher_test.cxx index df8bc1b..14362ec 100644 --- a/test/apps/opmon_publisher_test.cxx +++ b/test/apps/opmon_publisher_test.cxx @@ -60,7 +60,7 @@ main(int argc, char const* argv[]) dunedaq::opmon::TestInfo ti; ti.set_int_example( j*1000 + i ); ti.set_string_example( "test" ); - auto e = to_entry( ti ); + auto e = to_entry( ti, {} ); *e.mutable_origin() = id; p.publish( std::move(e) ); } From 911e97cc649f5ae9c3ad698e42ca1294d4a75c03 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Thu, 2 May 2024 17:05:25 +0200 Subject: [PATCH 13/19] Draft of the streamOpMonFacility --- CMakeLists.txt | 1 + plugins/streamOpMonFacility.cpp | 76 +++++++++++++++++++++++++++++++++ 2 files changed, 77 insertions(+) create mode 100644 plugins/streamOpMonFacility.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index dd229c9..808d921 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -21,6 +21,7 @@ daq_add_library(*.cpp LINK_LIBRARIES ${Boost_LIBRARIES} opmonlib::opmonlib RdKaf # Plugins daq_add_plugin(kafkaOpmonService duneOpmonService LINK_LIBRARIES opmonlib::opmonlib RdKafka::rdkafka RdKafka::rdkafka++ ers::ers kafkaopmon) +daq_add_plugin(streamOpMonFacility duneOpMonFacility LINK_LIBRARIES kafkaopmon) target_include_directories(kafkaopmon_kafkaOpmonService_duneOpmonService PUBLIC) ## The following application is deprecated diff --git a/plugins/streamOpMonFacility.cpp b/plugins/streamOpMonFacility.cpp new file mode 100644 index 0000000..515612b --- /dev/null +++ b/plugins/streamOpMonFacility.cpp @@ -0,0 +1,76 @@ +/** + * @file streamOpMonFacility.cpp kafkaopmon class implementation + * + * This is part of the DUNE DAQ software, copyright 2020. + * Licensing/copyright details are in the COPYING file that you should have + * received with this code. + */ + +#include "opmonlib/OpMonFacility.hpp" +#include "kafkaopmon/OpMonPublisher.hpp" + +#include +#include +#include +#include + +using json = nlohmann::json; + +namespace dunedaq { // namespace dunedaq + + ERS_DECLARE_ISSUE(kafkaopmon, WrongURI, "Incorrect URI: " << uri, ((std::string)uri)) + +} // namespace dunedaq + +namespace dunedaq::kafkaopmon { // namespace dunedaq + +class streamOpMonFacility : public dunedaq::opmonlib::OpMonFacility +{ + std::unique_ptr m_publisher; + +public: + explicit streamOpMonFacility(std::string uri) + : dunedaq::opmonlib::OpMonFacility(uri) + { + + std::regex uri_re(R"(([a-zA-Z]+):\/\/([^:\/?#\s]+):(\d+)\/([^:\/?#\s]+))"); + //* 1st Capturing Group `([a-zA-Z])`: Matches protocol + //* 2nd Capturing Group `([^:\/?#\s])+`: Matches hostname + //* 3rd Capturing Group `(\d)`: Matches port + //* 4th Capturing Group `([^\/?#]+)?`: Matches kafka topic + + std::smatch uri_match; + if (!std::regex_match(uri, uri_match, uri_re)) { + ers::fatal(WrongURI(ERS_HERE, uri)); + } + + json config; + std::string bootstrap = uri_match[2]; + bootstrap += ':' ; + bootstrap += uri_match[3]; + config["bootstrap"] = bootstrap; + + // optionally set the ID of the application + // But really this is temporary, and in the future we should avoid env variables + if(auto env_p = std::getenv("DUNEDAQ_PARTITION")) { + if (auto app_p = std::getenv("DUNEDAQ_APPLICATION_NAME")) { + config["cliend_id"] = std::string(env_p) + '.' + std::string(app_p); + } + } + + config["default_topic"] = uri_match[4]; + + m_publisher.reset( new OpMonPublisher(config) ); + } + + void publish( opmon::OpMonEntry && e ) const override { + m_publisher -> publish(std::move(e)); + } + +}; + +} // namespace dunedaq::kafkaopmon + + +DEFINE_DUNE_OPMON_FACILITY(dunedaq::kafkaopmon::streamOpMonFacility) + From cf5332cf6dca0c3fbb4a01729c7af75ec3f57377 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Thu, 2 May 2024 17:41:05 +0200 Subject: [PATCH 14/19] adding stream facility test --- CMakeLists.txt | 3 ++- plugins/streamOpMonFacility.cpp | 34 ++++------------------- plugins/streamOpMonFacility.hpp | 37 ++++++++++++++++++++++++++ unittest/stream_OpMonFacility_test.cxx | 28 +++++++++++++++++++ 4 files changed, 72 insertions(+), 30 deletions(-) create mode 100644 plugins/streamOpMonFacility.hpp create mode 100644 unittest/stream_OpMonFacility_test.cxx diff --git a/CMakeLists.txt b/CMakeLists.txt index 808d921..d406a05 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -37,7 +37,8 @@ target_include_directories(kafkaopmon_kafkaOpmonService_duneOpmonService PUBLIC) ############################################################################## # unit tests -daq_add_unit_test( OpMonPublisher_test LINK_LIBRARIES kafkaopmon ) +daq_add_unit_test( OpMonPublisher_test LINK_LIBRARIES kafkaopmon ) +daq_add_unit_test( stream_OpMonFacility_test LINK_LIBRARIES kafkaopmon ) ############################################################################## # test application diff --git a/plugins/streamOpMonFacility.cpp b/plugins/streamOpMonFacility.cpp index 515612b..0579d0a 100644 --- a/plugins/streamOpMonFacility.cpp +++ b/plugins/streamOpMonFacility.cpp @@ -6,32 +6,14 @@ * received with this code. */ -#include "opmonlib/OpMonFacility.hpp" -#include "kafkaopmon/OpMonPublisher.hpp" - -#include -#include -#include -#include +#include "streamOpMonFacility.hpp" using json = nlohmann::json; -namespace dunedaq { // namespace dunedaq - - ERS_DECLARE_ISSUE(kafkaopmon, WrongURI, "Incorrect URI: " << uri, ((std::string)uri)) - -} // namespace dunedaq - namespace dunedaq::kafkaopmon { // namespace dunedaq -class streamOpMonFacility : public dunedaq::opmonlib::OpMonFacility -{ - std::unique_ptr m_publisher; - -public: - explicit streamOpMonFacility(std::string uri) - : dunedaq::opmonlib::OpMonFacility(uri) - { + streamOpMonFacility::streamOpMonFacility(std::string uri) + : dunedaq::opmonlib::OpMonFacility(uri) { std::regex uri_re(R"(([a-zA-Z]+):\/\/([^:\/?#\s]+):(\d+)\/([^:\/?#\s]+))"); //* 1st Capturing Group `([a-zA-Z])`: Matches protocol @@ -41,7 +23,7 @@ class streamOpMonFacility : public dunedaq::opmonlib::OpMonFacility std::smatch uri_match; if (!std::regex_match(uri, uri_match, uri_re)) { - ers::fatal(WrongURI(ERS_HERE, uri)); + throw WrongURI(ERS_HERE, uri); } json config; @@ -62,13 +44,7 @@ class streamOpMonFacility : public dunedaq::opmonlib::OpMonFacility m_publisher.reset( new OpMonPublisher(config) ); } - - void publish( opmon::OpMonEntry && e ) const override { - m_publisher -> publish(std::move(e)); - } - -}; - + } // namespace dunedaq::kafkaopmon diff --git a/plugins/streamOpMonFacility.hpp b/plugins/streamOpMonFacility.hpp new file mode 100644 index 0000000..ec0edf2 --- /dev/null +++ b/plugins/streamOpMonFacility.hpp @@ -0,0 +1,37 @@ +#ifndef KAFKAOPMON_PLUGIN_STREAMOPMONFACILITY_HPP_ +#define KAFKAOPMON_PLUGIN_STREAMOPMONFACILITY_HPP_ + +#include "opmonlib/OpMonFacility.hpp" +#include "kafkaopmon/OpMonPublisher.hpp" + +#include +#include +#include +#include + +using json = nlohmann::json; + +namespace dunedaq { // namespace dunedaq + + ERS_DECLARE_ISSUE(kafkaopmon, WrongURI, "Incorrect URI: " << uri, ((std::string)uri)) + +} // namespace dunedaq + +namespace dunedaq::kafkaopmon { // namespace dunedaq + +class streamOpMonFacility : public dunedaq::opmonlib::OpMonFacility +{ + std::unique_ptr m_publisher; + +public: + explicit streamOpMonFacility(std::string uri); + void publish( opmon::OpMonEntry && e ) const override { + m_publisher -> publish(std::move(e)); + } + +}; + +} // namespace dunedaq::kafkaopmon + +#endif // KAFKAOPMON_PLUGIN_STREAMOPMONFACILITY_HPP_ + diff --git a/unittest/stream_OpMonFacility_test.cxx b/unittest/stream_OpMonFacility_test.cxx new file mode 100644 index 0000000..60466d1 --- /dev/null +++ b/unittest/stream_OpMonFacility_test.cxx @@ -0,0 +1,28 @@ +/** + * @file stream_OpMoFacility_test.cxx Test application that tests and demonstrates + * basic functionality of the the streamOpMonFacility + * + * This is part of the DUNE DAQ Application Framework, copyright 2020. + * Licensing/copyright details are in the COPYING file that you should have + * received with this code. + */ + +#include "opmonlib/OpMonFacility.hpp" +#include "opmonlib/Utils.hpp" +#include "opmonlib/info/test.pb.h" + +#define BOOST_TEST_MODULE stream_opmon_facility_test // NOLINT + +#include "boost/test/unit_test.hpp" + +using namespace dunedaq::opmonlib; +using namespace dunedaq::opmon; + +BOOST_AUTO_TEST_CASE(Invalid_Creation) { + + // failure due to wrong formatting + BOOST_CHECK_THROW( auto service = makeOpMonFacility("stream://bla_bla"), + OpMonFacilityCreationFailed ); + + +} From 851a7d737500a93af060b20ee5bca8ef1ab9f647 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Thu, 2 May 2024 17:44:39 +0200 Subject: [PATCH 15/19] Initial draft for test complete --- unittest/stream_OpMonFacility_test.cxx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/unittest/stream_OpMonFacility_test.cxx b/unittest/stream_OpMonFacility_test.cxx index 60466d1..83e672a 100644 --- a/unittest/stream_OpMonFacility_test.cxx +++ b/unittest/stream_OpMonFacility_test.cxx @@ -24,5 +24,5 @@ BOOST_AUTO_TEST_CASE(Invalid_Creation) { BOOST_CHECK_THROW( auto service = makeOpMonFacility("stream://bla_bla"), OpMonFacilityCreationFailed ); - + BOOST_CHECK_NO_THROW( auto service = makeOpMonFacility("stream://test.website.com:5005/no_topic") ); } From f39be10eacf38c0ba2a312cbf0bcecd55a0cf5a9 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Mon, 8 Jul 2024 14:39:04 +0200 Subject: [PATCH 16/19] OpMonId more flexible --- test/apps/opmon_publisher_test.cxx | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/apps/opmon_publisher_test.cxx b/test/apps/opmon_publisher_test.cxx index 14362ec..4ad1309 100644 --- a/test/apps/opmon_publisher_test.cxx +++ b/test/apps/opmon_publisher_test.cxx @@ -53,9 +53,9 @@ main(int argc, char const* argv[]) auto pub_func = [&](int i){ dunedaq::opmon::OpMonId id; - id.set_session("test"); - id.set_application("app"); - id.set_element("thread_" + std::to_string(i)); + id += "test"; + id += "app"; + id += "thread_" + std::to_string(i); for (auto j = 0; j < 20; ++j ) { dunedaq::opmon::TestInfo ti; ti.set_int_example( j*1000 + i ); From e000226f553154ddfa2635aac49866df555500e7 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Wed, 10 Jul 2024 14:57:20 +0200 Subject: [PATCH 17/19] Add second test app --- CMakeLists.txt | 1 + test/apps/opmon_publish_to_kafka_test.cxx | 83 +++++++++++++++++++++++ 2 files changed, 84 insertions(+) create mode 100644 test/apps/opmon_publish_to_kafka_test.cxx diff --git a/CMakeLists.txt b/CMakeLists.txt index d406a05..31dc6af 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -45,6 +45,7 @@ daq_add_unit_test( stream_OpMonFacility_test LINK_LIBRARIES kafkaopmon ) daq_add_application( test_flattener test_flattener.cxx TEST LINK_LIBRARIES kafkaopmon) daq_add_application( opmon_publisher_test opmon_publisher_test.cxx TEST LINK_LIBRARIES kafkaopmon ) +daq_add_application( opmon_publish_to_kafka_test opmon_publish_to_kafka_test.cxx TEST LINK_LIBRARIES kafkaopmon ) daq_install() diff --git a/test/apps/opmon_publish_to_kafka_test.cxx b/test/apps/opmon_publish_to_kafka_test.cxx new file mode 100644 index 0000000..4ad1309 --- /dev/null +++ b/test/apps/opmon_publish_to_kafka_test.cxx @@ -0,0 +1,83 @@ +/** + * @brief test application for the opmon publisher + */ + +#include +#include +#include + +#include +#include "opmonlib/Utils.hpp" +#include "opmonlib/info/test.pb.h" + +#include + +using nlohmann::json; +namespace po = boost::program_options; + +using namespace dunedaq::kafkaopmon; +using namespace dunedaq::opmonlib; + + + + + +int +main(int argc, char const* argv[]) +{ + po::options_description desc("Allowed options"); + desc.add_options() + ("help,h", "produce help message") + ("bootstrap", po::value()->default_value("monkafka.cern.ch:30092"), "kafka bootstrap server") + ("topic,t", po::value(), "Optional specification of a topic" ) + ("n_threads,n", po::value()->default_value(10), "Number of threads used for test") + ; + + po::variables_map input_map; + po::store(po::parse_command_line(argc, argv, desc), input_map); + po::notify(input_map); + + if ( input_map.count("help") ) { + std::cout << desc << std::endl; + return 0; + } + + json conf; + conf["bootstrap"] = input_map["bootstrap"].as(); + conf["cliend_id"] = "opmon_publisher_test"; + if ( input_map.count("topic") ) { + conf["default_topic"] = input_map["topic"].as() ; + } + + OpMonPublisher p(conf); + + auto pub_func = [&](int i){ + dunedaq::opmon::OpMonId id; + id += "test"; + id += "app"; + id += "thread_" + std::to_string(i); + for (auto j = 0; j < 20; ++j ) { + dunedaq::opmon::TestInfo ti; + ti.set_int_example( j*1000 + i ); + ti.set_string_example( "test" ); + auto e = to_entry( ti, {} ); + *e.mutable_origin() = id; + p.publish( std::move(e) ); + } + }; + + auto n = input_map["n_threads"].as() ; + std::vector> threads(n); + + for( size_t i = 0 ; i < n; ++i ) { + threads[i] = async(std::launch::async, pub_func, i); + } + + for ( auto & t : threads ) { + t.get(); + } + + std::this_thread::sleep_for(std::chrono::seconds(2)); + + return 0; +} From 9e70c381e165f5d17da06d231864cddf857009f7 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Wed, 10 Jul 2024 15:38:44 +0200 Subject: [PATCH 18/19] Full test --- src/OpMonPublisher.cpp | 2 +- test/apps/opmon_publish_to_kafka_test.cxx | 52 +++++++++++++---------- 2 files changed, 30 insertions(+), 24 deletions(-) diff --git a/src/OpMonPublisher.cpp b/src/OpMonPublisher.cpp index a83f0f6..d09d775 100644 --- a/src/OpMonPublisher.cpp +++ b/src/OpMonPublisher.cpp @@ -53,7 +53,7 @@ OpMonPublisher::OpMonPublisher( const nlohmann::json& conf) { } it = conf.find("default_topic"); - if (it != conf.end()) m_default_topic = *it; + if (it != conf.end()) m_default_topic = "monitoring." + it->get(); } diff --git a/test/apps/opmon_publish_to_kafka_test.cxx b/test/apps/opmon_publish_to_kafka_test.cxx index 4ad1309..4b2661a 100644 --- a/test/apps/opmon_publish_to_kafka_test.cxx +++ b/test/apps/opmon_publish_to_kafka_test.cxx @@ -1,24 +1,29 @@ /** - * @brief test application for the opmon publisher + * @brief test application for the the whole chain from MonitorableObject to kafka */ #include #include #include -#include -#include "opmonlib/Utils.hpp" #include "opmonlib/info/test.pb.h" #include -using nlohmann::json; +#include "opmonlib/OpMonManager.hpp" + namespace po = boost::program_options; -using namespace dunedaq::kafkaopmon; using namespace dunedaq::opmonlib; +class TestObject : public MonitorableObject { + + public: + using MonitorableObject::register_child; + using MonitorableObject::publish; + TestObject() : MonitorableObject() {;} +}; @@ -29,7 +34,7 @@ main(int argc, char const* argv[]) desc.add_options() ("help,h", "produce help message") ("bootstrap", po::value()->default_value("monkafka.cern.ch:30092"), "kafka bootstrap server") - ("topic,t", po::value(), "Optional specification of a topic" ) + ("topic,t", po::value()->default_value("opmon_stream"), "Optional specification of a topic" ) ("n_threads,n", po::value()->default_value(10), "Number of threads used for test") ; @@ -42,35 +47,36 @@ main(int argc, char const* argv[]) return 0; } - json conf; - conf["bootstrap"] = input_map["bootstrap"].as(); - conf["cliend_id"] = "opmon_publisher_test"; - if ( input_map.count("topic") ) { - conf["default_topic"] = input_map["topic"].as() ; + std::string uri = "stream://"; + uri += input_map["bootstrap"].as(); + uri += '/'; + uri += input_map["topic"].as(); + + OpMonManager man( "test", + "application", + uri ); + + const auto n = input_map["n_threads"].as() ; + std::vector> objs(n); + for ( size_t i = 0; i < n; ++i ) { + auto p = objs[i] = std::make_shared(); + man.register_child( "element_" + std::to_string(i), p ); } - OpMonPublisher p(conf); - - auto pub_func = [&](int i){ - dunedaq::opmon::OpMonId id; - id += "test"; - id += "app"; - id += "thread_" + std::to_string(i); + auto pub_func = [&](int i, std::shared_ptr p){ + for (auto j = 0; j < 20; ++j ) { dunedaq::opmon::TestInfo ti; ti.set_int_example( j*1000 + i ); ti.set_string_example( "test" ); - auto e = to_entry( ti, {} ); - *e.mutable_origin() = id; - p.publish( std::move(e) ); + p->publish( std::move(ti) ); } }; - auto n = input_map["n_threads"].as() ; std::vector> threads(n); for( size_t i = 0 ; i < n; ++i ) { - threads[i] = async(std::launch::async, pub_func, i); + threads[i] = async(std::launch::async, pub_func, i, objs[i]); } for ( auto & t : threads ) { From 1f8401c10960678f295bfad9645c01e4f3d79075 Mon Sep 17 00:00:00 2001 From: Marco Roda Date: Fri, 12 Jul 2024 14:15:45 +0200 Subject: [PATCH 19/19] Update version --- CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 31dc6af..b7af903 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,5 +1,5 @@ cmake_minimum_required(VERSION 3.12) -project(kafkaopmon VERSION 1.5.1) +project(kafkaopmon VERSION 2.0.0) find_package(daq-cmake REQUIRED)