Merge pull request #15 from DUNE-DAQ/mroda/protobuf
Monitoring upgrade
mroda88 committed Jul 12, 2024
2 parents ba83133 + 1f8401c commit 6ef8ca3
Showing 12 changed files with 739 additions and 4 deletions.
13 changes: 9 additions & 4 deletions CMakeLists.txt
@@ -1,5 +1,5 @@
cmake_minimum_required(VERSION 3.12)
project(kafkaopmon VERSION 1.5.1)
project(kafkaopmon VERSION 2.0.0)

find_package(daq-cmake REQUIRED)

@@ -10,17 +10,18 @@ find_package(CPR REQUIRED)
find_package(CURL REQUIRED)
find_package(RdKafka REQUIRED)
find_package(ers REQUIRED)
find_package(Boost COMPONENTS program_options REQUIRED)
find_package(Boost COMPONENTS unit_test_framework program_options REQUIRED)



daq_add_library(*.cpp LINK_LIBRARIES opmonlib::opmonlib)
daq_add_library(*.cpp LINK_LIBRARIES ${Boost_LIBRARIES} opmonlib::opmonlib RdKafka::rdkafka RdKafka::rdkafka++)


##############################################################################
# Plugins

daq_add_plugin(kafkaOpmonService duneOpmonService LINK_LIBRARIES opmonlib::opmonlib RdKafka::rdkafka RdKafka::rdkafka++ ers::ers kafkaopmon)
daq_add_plugin(streamOpMonFacility duneOpMonFacility LINK_LIBRARIES kafkaopmon)
target_include_directories(kafkaopmon_kafkaOpmonService_duneOpmonService PUBLIC)

## The following application is deprecated
@@ -35,12 +36,16 @@ target_include_directories(kafkaopmon_kafkaOpmonService_duneOpmonService PUBLIC)
# No integration tests written

##############################################################################
# No unit tests written
# unit tests
daq_add_unit_test( OpMonPublisher_test LINK_LIBRARIES kafkaopmon )
daq_add_unit_test( stream_OpMonFacility_test LINK_LIBRARIES kafkaopmon )

##############################################################################
# test application

daq_add_application( test_flattener test_flattener.cxx TEST LINK_LIBRARIES kafkaopmon)
daq_add_application( opmon_publisher_test opmon_publisher_test.cxx TEST LINK_LIBRARIES kafkaopmon )
daq_add_application( opmon_publish_to_kafka_test opmon_publish_to_kafka_test.cxx TEST LINK_LIBRARIES kafkaopmon )

daq_install()

96 changes: 96 additions & 0 deletions include/kafkaopmon/OpMonPublisher.hpp
@@ -0,0 +1,96 @@
/**
* @file OpMonPublisher.hpp
*
 * This is the interface to broadcast OpMonEntry objects in our DAQ system
*
* This is part of the DUNE DAQ Application Framework, copyright 2020.
* Licensing/copyright details are in the COPYING file that you should have
* received with this code.
*/


#ifndef KAFKAOPMON_INCLUDE_KAFKAOPMON_OPMONPUBLISHER_HPP_
#define KAFKAOPMON_INCLUDE_KAFKAOPMON_OPMONPUBLISHER_HPP_

#include <librdkafka/rdkafkacpp.h>
#include <nlohmann/json.hpp>
#include <ers/ers.hpp>

#include <string>
#include <memory>

#include "opmonlib/opmon_entry.pb.h"
#include "opmonlib/Utils.hpp"

namespace dunedaq {

ERS_DECLARE_ISSUE( kafkaopmon,
MissingParameter,
"No " << parameter << " in " << conf,
((std::string)parameter)((std::string)conf)
)

ERS_DECLARE_ISSUE( kafkaopmon,
FailedConfiguration,
"Invalid " << parameter << ", cause: " << reason,
((std::string)parameter)((std::string)reason)
)

ERS_DECLARE_ISSUE( kafkaopmon,
FailedProducerCreation,
"Failed creation of a Kafka producer, cause: " << reason,
((std::string)reason)
)

ERS_DECLARE_ISSUE( kafkaopmon,
FailedProduce,
"Failed produce of message with key " << key << ", cause: " << reason,
((std::string)key)((std::string)reason)
)

ERS_DECLARE_ISSUE( kafkaopmon,
TimeoutReachedWhileFlushing,
"Publisher destroyed before all messages were completed, timeout: " << timeout << " ms",
((int)timeout)
)


} // namespace dunedaq




namespace dunedaq::kafkaopmon {

class OpMonPublisher {

public:
OpMonPublisher( const nlohmann::json& conf );

OpMonPublisher() = delete;
OpMonPublisher( const OpMonPublisher & ) = delete;
OpMonPublisher & operator = ( const OpMonPublisher & ) = delete;
OpMonPublisher( OpMonPublisher && ) = delete;
OpMonPublisher & operator = ( OpMonPublisher && ) = delete;

~OpMonPublisher();

void publish( dunedaq::opmon::OpMonEntry && ) const ;

protected:
std::string extract_topic( const dunedaq::opmon::OpMonEntry & ) const noexcept {
return m_default_topic;
}
std::string extract_key( const dunedaq::opmon::OpMonEntry & e) const noexcept {
return dunedaq::opmonlib::to_string(e.origin()) + '/' + e.measurement() ;
}

private:
std::unique_ptr<RdKafka::Producer> m_producer;
std::string m_default_topic = "monitoring.opmon_stream";

};

} // namespace dunedaq::kafkaopmon

#endif //KAFKAOPMON_INCLUDE_KAFKAOPMON_OPMONPUBLISHER_HPP_
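
The Kafka message key built by extract_key() is the entry origin and the measurement name joined by a '/'. As a sketch of the consumer-side matching this enables (the opmon_id and measurement names below are hypothetical):

#!/usr/bin/env python3
# Illustration only: OpMonPublisher::extract_key() produces keys of the form
# "<opmon_id>/<measurement>"; the concrete names here are made up.
import re

key = "my_session.my_app/ErrorInfo"

# split on the first '/' only, as the subscriber in this commit does
opmon_id, measurement = key.split('/', 1)

assert re.compile(r"my_session\..*").match(opmon_id)
assert re.compile(r".*Info").match(measurement)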
52 changes: 52 additions & 0 deletions plugins/streamOpMonFacility.cpp
@@ -0,0 +1,52 @@
/**
* @file streamOpMonFacility.cpp kafkaopmon class implementation
*
* This is part of the DUNE DAQ software, copyright 2020.
* Licensing/copyright details are in the COPYING file that you should have
* received with this code.
*/

#include "streamOpMonFacility.hpp"

using json = nlohmann::json;

namespace dunedaq::kafkaopmon {

streamOpMonFacility::streamOpMonFacility(std::string uri)
: dunedaq::opmonlib::OpMonFacility(uri) {

std::regex uri_re(R"(([a-zA-Z]+):\/\/([^:\/?#\s]+):(\d+)\/([^:\/?#\s]+))");
  //* 1st Capturing Group `([a-zA-Z]+)`: Matches the protocol
  //* 2nd Capturing Group `([^:\/?#\s]+)`: Matches the hostname
  //* 3rd Capturing Group `(\d+)`: Matches the port
  //* 4th Capturing Group `([^:\/?#\s]+)`: Matches the Kafka topic

std::smatch uri_match;
if (!std::regex_match(uri, uri_match, uri_re)) {
throw WrongURI(ERS_HERE, uri);
}

json config;
std::string bootstrap = uri_match[2];
bootstrap += ':' ;
bootstrap += uri_match[3];
config["bootstrap"] = bootstrap;

// optionally set the ID of the application
// But really this is temporary, and in the future we should avoid env variables
if(auto env_p = std::getenv("DUNEDAQ_PARTITION")) {
if (auto app_p = std::getenv("DUNEDAQ_APPLICATION_NAME")) {
config["cliend_id"] = std::string(env_p) + '.' + std::string(app_p);
}
}

config["default_topic"] = uri_match[4];

m_publisher.reset( new OpMonPublisher(config) );
}

} // namespace dunedaq::kafkaopmon


DEFINE_DUNE_OPMON_FACILITY(dunedaq::kafkaopmon::streamOpMonFacility)
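
As an illustration of the accepted URI scheme, the same parsing can be reproduced with Python's re module; the regex below mirrors the std::regex above, and the endpoint is hypothetical:

#!/usr/bin/env python3
import re

# same capturing groups as the std::regex in streamOpMonFacility
uri_re = re.compile(r"([a-zA-Z]+)://([^:/?#\s]+):(\d+)/([^:/?#\s]+)")

uri = "kafka://monkafka.example.org:30092/opmon_stream"  # hypothetical endpoint
m = uri_re.fullmatch(uri)
if m is None:
    raise ValueError(f"Incorrect URI: {uri}")  # plays the role of the WrongURI issue

protocol, host, port, topic = m.groups()
print("bootstrap     =", f"{host}:{port}")  # monkafka.example.org:30092
print("default_topic =", topic)             # opmon_stream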

37 changes: 37 additions & 0 deletions plugins/streamOpMonFacility.hpp
@@ -0,0 +1,37 @@
#ifndef KAFKAOPMON_PLUGIN_STREAMOPMONFACILITY_HPP_
#define KAFKAOPMON_PLUGIN_STREAMOPMONFACILITY_HPP_

#include "opmonlib/OpMonFacility.hpp"
#include "kafkaopmon/OpMonPublisher.hpp"

#include <memory>
#include <nlohmann/json.hpp>
#include <regex>
#include <string>

using json = nlohmann::json;

namespace dunedaq { // namespace dunedaq

ERS_DECLARE_ISSUE(kafkaopmon, WrongURI, "Incorrect URI: " << uri, ((std::string)uri))

} // namespace dunedaq

namespace dunedaq::kafkaopmon {

class streamOpMonFacility : public dunedaq::opmonlib::OpMonFacility
{
std::unique_ptr<OpMonPublisher> m_publisher;

public:
explicit streamOpMonFacility(std::string uri);
void publish( opmon::OpMonEntry && e ) const override {
m_publisher -> publish(std::move(e));
}

};

} // namespace dunedaq::kafkaopmon

#endif // KAFKAOPMON_PLUGIN_STREAMOPMONFACILITY_HPP_

135 changes: 135 additions & 0 deletions python/kafkaopmon/OpMonSubscriber.py
@@ -0,0 +1,135 @@
#!/usr/bin/env python3

from kafka import KafkaConsumer
import threading
import socket
import os
import re
import logging
import getpass
import sys

import opmonlib.opmon_entry_pb2 as entry
import google.protobuf.message as msg

class OpMonFunction :
def __init__(self,
function,
opmon_id : re.Pattern,
measurement : re.Pattern) :
self.function = function
self.opmon_id = opmon_id
self.measurement = measurement

def match(self, key : str) -> bool :
        opmon_id, measure = key.split('/', 1)
if not self.opmon_id.match(opmon_id) : return False
if not self.measurement.match(measure) : return False
return True

def execute(self, e : entry.OpMonEntry ) :
self.function(e)


class OpMonSubscriber:
def __init__(self, bootstrap, group_id=None, timeout_ms=500, topics=["opmon_stream"]) :
## Options from configurations
self.bootstrap = bootstrap
self.group_id = group_id
self.timeout = timeout_ms
if len(topics) == 0 : raise ValueError("topic list is empty")
self.topics = topics
## runtime options
self.running = False
self.functions = dict()
self.thread = threading.Thread(target=self.message_loop)

def default_id(self) -> str:
node = socket.gethostname()
user = getpass.getuser()
process = os.getpid()
thread = threading.get_ident()
id = "{}-{}-{}-{}".format(node, user, process, thread)
return id

def add_callback(self,
name, function,
opmon_id = '.*',
measurement = '.*') -> bool:
if ( name in self.functions ) : return False

was_running = self.running
if (was_running) : self.stop()

f = OpMonFunction( function = function,
opmon_id = re.compile(opmon_id),
measurement = re.compile(measurement) )

self.functions[name] = f

if (was_running) : self.start()
return True

def clear_callbacks(self):
if ( self.running ) :
self.stop()
self.functions.clear()

def remove_callback(self, name) -> bool:
        if ( name not in self.functions ) : return False

was_running = self.running
if (was_running) : self.stop()

self.functions.pop(name)

if ( was_running and len(self.functions)>0 ) : self.start()
return True

    def start(self):
        logging.info("Starting run")
        self.running = True
        # re-create the worker thread so start() also works after a stop()
        self.thread = threading.Thread(target=self.message_loop)
        self.thread.start()

def stop(self) :
self.running = False
self.thread.join()

def message_loop(self) :
if not self.group_id : group_id = self.default_id()
else: group_id = self.group_id

consumer = KafkaConsumer(bootstrap_servers=self.bootstrap,
group_id=group_id,
client_id=self.default_id(),
consumer_timeout_ms=self.timeout)

topics = self.topics
consumer.subscribe(["monitoring." + s for s in topics])

logging.info(f"ID: %s running with functions {('%s, ' * len(self.functions.keys()))[:-2]}",
group_id, *self.functions.keys())

while ( self.running ) :
try:
message_it = iter(consumer)
message = next(message_it)
timestamp = message.timestamp
key = message.key.decode('ascii')
## The key from the message is binary
## In order to correctly match an ascii regex, we have to convert

for function in self.functions.values() :
if function.match(key) :
e = entry.OpMonEntry()
e.ParseFromString( message.value )
function.execute(e)

except msg.DecodeError :
logging.error("Could not parse message")
except StopIteration :
pass
except Exception as e:
logging.error(e)

logging.info("Stop run")