Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add adiak import/export support #188

Merged
merged 11 commits into from Jul 11, 2019
28 changes: 28 additions & 0 deletions CMakeLists.txt
Expand Up @@ -42,6 +42,7 @@ option(WITH_GOTCHA "Enable GOTCHA wrapping" TRUE)
option(WITH_SOS "Enable SOSFlow data management" FALSE)
option(WITH_TAU "Enable TAU service (TAU Performance System)" FALSE)
option(WITH_VTUNE "Enable Intel(R) VTune(tm) annotation bindings" FALSE)
option(WITH_ADIAK "Enable adiak support" FALSE)

option(USE_EXTERNAL_GOTCHA "Use pre-installed gotcha instead of building our own" FALSE)

Expand Down Expand Up @@ -198,6 +199,32 @@ if (WITH_GOTCHA)
endif()
endif()

if (WITH_ADIAK)
# find_library(adiak_LIBRARIES
# NAMES adiak
# HINTS ${adiak_DIR}/lib)
# find_path(adiak_INCLUDE_DIRS
# NAMES adiak.h
# HINTS ${adiak_DIR}/include)
# include(FindPackageHandleStandardArgs)
# find_package_handle_standard_args(adiak DEFAULT_MSG
# adiak_LIBRARIES
# adiak_INCLUDE_DIRS)
# mark_as_advanced(adiak_LIBRARIES adiak_INCLUDE_DIRS)
find_package(adiak)
if (adiak_FOUND)
get_target_property(_adiak_location adiak LOCATION)
set(CALIPER_adiak_CMAKE_MSG "Yes, using ${_adiak_location}")
set(CALIPER_HAVE_ADIAK TRUE)
list(APPEND CALIPER_EXTERNAL_LIBS ${adiak_LIBRARIES})
if (${CMAKE_HOST_SYSTEM_NAME} STREQUAL "Linux")
list(APPEND CALIPER_EXTERNAL_LIBS "-ldl")
endif()
else()
message(WARNING "Adiak support was requested but Adiak was not found")
endif()
endif()

if (WITH_SOS)
include(FindSOSFlow)
if (SOSFlow_FOUND)
Expand Down Expand Up @@ -359,6 +386,7 @@ message(STATUS "Build Caliper tools : ${WITH_TOOLS}")

set(CALIPER_MODULES
cpuinfo
adiak
GOTCHA
PAPI
Libpfm
Expand Down
1 change: 1 addition & 0 deletions caliper-config.h.in
Expand Up @@ -18,6 +18,7 @@
#cmakedefine CALIPER_HAVE_CUPTI
#cmakedefine CALIPER_HAVE_LIBDW
#cmakedefine CALIPER_HAVE_VTUNE
#cmakedefine CALIPER_HAVE_ADIAK

#cmakedefine CALIPER_REDUCED_CONSTEXPR_USAGE

Expand Down
4 changes: 4 additions & 0 deletions include/caliper/reader/CaliperMetadataDB.h
Expand Up @@ -143,6 +143,10 @@ class CaliperMetadataDB : public CaliperMetadataAccessInterface
/// \brief Import global entries from metadata DB \a db into this
/// metadata DB
std::vector<Entry> import_globals(CaliperMetadataAccessInterface& db);

/// \brief Import globals in record \a rec from metadata DB \a db into this
/// metadata DB
std::vector<Entry> import_globals(CaliperMetadataAccessInterface& db, const std::vector<Entry>& globals);
};

}
Expand Down
4 changes: 4 additions & 0 deletions src/mpi-rt/controllers/CMakeLists.txt
Expand Up @@ -7,3 +7,7 @@ add_library(caliper-mpi-controllers OBJECT ${CALIPER_MPI_CONTROLLER_SOURCES})
if (${BUILD_SHARED_LIBS})
set_property(TARGET caliper-mpi-controllers PROPERTY POSITION_INDEPENDENT_CODE TRUE)
endif()

if (${CALIPER_HAVE_ADIAK})
target_include_directories(caliper-mpi-controllers PRIVATE ${adiak_INCLUDE_DIRS})
endif()
94 changes: 78 additions & 16 deletions src/mpi-rt/controllers/SpotController.cpp
Expand Up @@ -8,19 +8,25 @@
#include <caliper/reader/CaliperMetadataDB.h>
#include <caliper/reader/CalQLParser.h>
#include <caliper/reader/FormatProcessor.h>
#include <caliper/reader/NestedExclusiveRegionProfile.h>

#include <caliper/common/Log.h>
#include <caliper/common/OutputStream.h>
#include <caliper/common/StringConverter.h>

#include <unistd.h>

#include <algorithm>
#include <ctime>

#include "caliper/cali-mpi.h"

#include <mpi.h>

#ifdef CALIPER_HAVE_ADIAK
#include <adiak.hpp>
#endif

using namespace cali;

namespace
Expand All @@ -36,15 +42,49 @@ make_filename()
return std::string(timestr) + std::to_string(getpid()) + ".cali";
}

#ifdef CALIPER_HAVE_ADIAK
void
write_adiak(CaliperMetadataDB& db, Aggregator& output_agg)
{
Log(2).stream() << "[spot controller]: Writing adiak output" << std::endl;

// Extract / calculate avg inclusive time for paths.
// Use exclusive processor b/c values are inclusive already.

NestedExclusiveRegionProfile rp(db, "avg#inclusive#sum#time.duration");
output_agg.flush(db, rp);

std::map<std::string, double> nested_region_times;
double total_time;

std::tie(nested_region_times, std::ignore, total_time) =
rp.result();

// export time profile into adiak

adiak::value("total_time", total_time, adiak_performance);

// copy our map into a vector that adiak can process
std::vector< std::tuple<std::string, double> > tmpvec(nested_region_times.size());

std::transform(nested_region_times.begin(), nested_region_times.end(), tmpvec.begin(),
[](std::pair<std::string,double>&& p){
return std::make_tuple(std::move(p.first), p.second);
});

adiak::value("avg#inclusive#sum#time.duration", tmpvec, adiak_performance);
}
#endif

class SpotController : public cali::ChannelController
{
std::string m_output;
bool m_use_mpi;

public:

void
flush() {
flush() {
Log(1).stream() << "[spot controller]: Flushing Caliper data" << std::endl;

// --- Setup output reduction aggregator
Expand Down Expand Up @@ -109,26 +149,43 @@ class SpotController : public cali::ChannelController
Log(2).stream() << "[spot controller]: Writing output" << std::endl;

// import globals from Caliper runtime object
db.import_globals(c);
db.import_globals(c, c.get_globals(channel()));

std::string spot_metrics =
"avg#inclusive#sum#time.duration"
",min#inclusive#sum#time.duration"
",max#inclusive#sum#time.duration";

// set the spot.metrics value
db.set_global(db.create_attribute("spot.metrics", CALI_TYPE_STRING, CALI_ATTR_GLOBAL),
Variant(CALI_TYPE_STRING, spot_metrics.data(), spot_metrics.length()));

std::string output = m_output;

if (m_output.empty())
output = ::make_filename();

OutputStream stream;
stream.set_filename(output.c_str(), c, c.get_globals());
if (output == "adiak") {
#ifdef CALIPER_HAVE_ADIAK
::write_adiak(db, output_agg);
#else
Log(0).stream() << "[spot controller]: cannot use adiak output: adiak is not enabled!" << std::endl;
#endif
} else {
if (m_output.empty())
output = ::make_filename();

OutputStream stream;
stream.set_filename(output.c_str(), c, c.get_globals());

FormatProcessor formatter(output_spec, stream);
FormatProcessor formatter(output_spec, stream);

output_agg.flush(db, formatter);
formatter.flush(db);
output_agg.flush(db, formatter);
formatter.flush(db);
}
}
}

SpotController(bool use_mpi, const char* output)
: ChannelController("spot", 0, {
{ "CALI_SERVICES_ENABLE", "aggregate,env,event,timestamp" },
{ "CALI_SERVICES_ENABLE", "aggregate,event,timestamp" },
{ "CALI_EVENT_ENABLE_SNAPSHOT_INFO", "false" },
{ "CALI_TIMER_INCLUSIVE_DURATION", "false" },
{ "CALI_TIMER_SNAPSHOT_DURATION", "true" },
Expand All @@ -138,8 +195,13 @@ class SpotController : public cali::ChannelController
}),
m_output(output),
m_use_mpi(use_mpi)
{ }

{
#ifdef CALIPER_HAVE_ADIAK
if (output != "adiak")
config()["CALI_SERVICES_ENABLE"].append(",adiak_import");
#endif
}

~SpotController()
{ }
};
Expand Down
13 changes: 9 additions & 4 deletions src/reader/CaliperMetadataDB.cpp
Expand Up @@ -478,9 +478,7 @@ struct CaliperMetadataDB::CaliperMetadataDBImpl
}
}

std::vector<Entry> import_globals(CaliperMetadataAccessInterface& db) {
std::vector<Entry> import_globals = db.get_globals();

std::vector<Entry> import_globals(CaliperMetadataAccessInterface& db, const std::vector<Entry>& import_globals) {
std::lock_guard<std::mutex>
g(m_globals_lock);

Expand Down Expand Up @@ -646,5 +644,12 @@ CaliperMetadataDB::set_global(const Attribute& attr, const Variant& value)
std::vector<Entry>
CaliperMetadataDB::import_globals(CaliperMetadataAccessInterface& db)
{
return mP->import_globals(db);
return mP->import_globals(db, db.get_globals());
}


std::vector<Entry>
CaliperMetadataDB::import_globals(CaliperMetadataAccessInterface& db, const std::vector<Entry>& globals)
{
return mP->import_globals(db, globals);
}
10 changes: 8 additions & 2 deletions src/services/CMakeLists.txt
Expand Up @@ -18,8 +18,11 @@ macro(add_service_sources)
endmacro()

macro(add_caliper_service)
string(REPLACE " " ";" NEW_SERVICE ${ARGV0})
set(CALIPER_SERVICE_NAMES "${CALIPER_SERVICE_NAMES} ${NEW_SERVICE}" PARENT_SCOPE)
foreach (_srv ${ARGN})
string(REPLACE " " ":" NEW_SERVICE ${_srv})
list(APPEND CALIPER_SERVICE_NAMES ${NEW_SERVICE})
endforeach()
set(CALIPER_SERVICE_NAMES ${CALIPER_SERVICE_NAMES} PARENT_SCOPE)
endmacro()
# A macro to include service modules as object libs in the caliper runtime lib.
# Used when service subdirectories needs additional includes etc.
Expand All @@ -35,6 +38,9 @@ endmacro()
# Service subdirectories

add_subdirectory(alloc)
if (CALIPER_HAVE_ADIAK)
add_subdirectory(adiak)
endif()
add_subdirectory(aggregate)
if (CALIPER_HAVE_LIBUNWIND)
add_subdirectory(callpath)
Expand Down
111 changes: 111 additions & 0 deletions src/services/adiak/AdiakExport.cpp
@@ -0,0 +1,111 @@
// Copyright (c) 2019, Lawrence Livermore National Security, LLC.
// See top-level LICENSE file for details.

#include "caliper/CaliperService.h"

#include "caliper/Caliper.h"

#include "caliper/common/Log.h"

#include <adiak.hpp>

#include <algorithm>
#include <map>
#include <vector>

using namespace cali;

namespace
{

std::map< cali_id_t, std::vector<Variant> >
get_caliper_globals(Caliper* c, Channel* chn)
{
std::map< cali_id_t, std::vector<Variant> > ret;
auto globals = c->get_globals(chn);

// expand globals

for (const Entry& e : globals) {
if (e.is_reference()) {
for (const Node* node = e.node(); node; node = node->parent())
ret[node->attribute()].push_back(node->data());
} else if (e.is_immediate()) {
ret[e.attribute()].push_back(e.value());
}
}

return ret;
}

void
export_globals_to_adiak(Caliper* c, Channel* chn)
{
auto globals_map = get_caliper_globals(c, chn);

for (auto p : globals_map) {
if (p.second.empty())
continue;

Attribute attr = c->get_attribute(p.first);

switch (attr.type()) {
case CALI_TYPE_INT:
case CALI_TYPE_BOOL:
{
std::vector<int> values(p.second.size());
std::transform(p.second.begin(), p.second.end(), values.begin(), [](const Variant& v){
return v.to_int();
});
if (values.size() > 1)
adiak::value(attr.name(), values);
else
adiak::value(attr.name(), values.front());
}
break;
case CALI_TYPE_UINT:
{
std::vector<unsigned long> values(p.second.size());
std::transform(p.second.begin(), p.second.end(), values.begin(), [](const Variant& v){
return static_cast<unsigned long>(v.to_uint());
});
if (values.size() > 1)
adiak::value(attr.name(), values);
else
adiak::value(attr.name(), values.front());
}
break;
default:
{
std::vector<std::string> values(p.second.size());
std::transform(p.second.begin(), p.second.end(), values.begin(), [](const Variant& v){
return v.to_string();
});
if (values.size() > 1)
adiak::value(attr.name(), values);
else
adiak::value(attr.name(), values.front());
}
}
}
}

void
register_adiak_export(Caliper* c, Channel* chn)
{
chn->events().pre_flush_evt.connect(
[](Caliper* c, Channel* chn, const SnapshotRecord*){
export_globals_to_adiak(c, chn);
});

Log(1).stream() << chn->name() << ": Registered adiak_export service" << std::endl;
}

} // namespace [anonymous]

namespace cali
{

CaliperService adiak_export_service { "adiak_export", ::register_adiak_export };

}