Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a generic mechanism to specify compute accelerators to use in the configuration #36699

Merged
merged 9 commits into from Feb 23, 2022
Merged
6 changes: 6 additions & 0 deletions Configuration/StandardSequences/python/Accelerators_cff.py
@@ -0,0 +1,6 @@
import FWCore.ParameterSet.Config as cms

# This fragment is intended to collect all ProcessAccelerator objects
# used in production

from HeterogeneousCore.CUDACore.ProcessAcceleratorCUDA_cfi import ProcessAcceleratorCUDA
11 changes: 5 additions & 6 deletions Configuration/StandardSequences/python/Services_cff.py
Expand Up @@ -8,15 +8,14 @@
# DQM store service
from DQMServices.Core.DQMStore_cfi import *

# load CUDA services when the "gpu" or "pixelNtupletFit" modifiers are enabled
def _addCUDAServices(process):
    """Load the CUDA service configuration into ``process``.

    Loads the ``CUDAService_cfi`` and ``MessageLogger_cfi`` fragments and
    attaches an empty untracked PSet named ``CUDAService`` to the
    MessageLogger configuration.
    """
    process.load("HeterogeneousCore.CUDAServices.CUDAService_cfi")
    process.load("FWCore.MessageService.MessageLogger_cfi")
    process.MessageLogger.CUDAService = cms.untracked.PSet()
# load ProcessAccelerators (that set up e.g. the necessary CUDA
# stuff) when the "gpu" or "pixelNtupletFit" modifiers are enabled
def _addProcessAccelerators(process):
    """Load the fragment that collects the ProcessAccelerator objects into ``process``."""
    process.load("Configuration.StandardSequences.Accelerators_cff")

from Configuration.ProcessModifiers.gpu_cff import gpu
from Configuration.ProcessModifiers.pixelNtupletFit_cff import pixelNtupletFit
modifyConfigurationStandardSequencesServicesAddCUDAServices_ = (gpu | pixelNtupletFit).makeProcessModifier(_addCUDAServices)
modifyConfigurationStandardSequencesServicesAddProcessAccelerators_ = (gpu | pixelNtupletFit).makeProcessModifier(_addProcessAccelerators)

# load TritonService when SONIC workflow is enabled
def _addTritonService(process):
Expand Down
5 changes: 5 additions & 0 deletions FWCore/Framework/interface/ensureAvailableAccelerators.h
@@ -0,0 +1,5 @@
#ifndef FWCore_Framework_ensureAvailableAccelerators_h
#define FWCore_Framework_ensureAvailableAccelerators_h

#include "FWCore/ParameterSet/interface/ParameterSet.h"

namespace edm {
  /**
   * Check that at least one compute accelerator was selected for the job.
   *
   * \param parameterSet  top-level process ParameterSet.
   *
   * Throws an edm::Exception of category errors::UnavailableAccelerator when
   * the untracked "@selected_accelerators" parameter is empty.
   */
  void ensureAvailableAccelerators(edm::ParameterSet const& parameterSet);
}  // namespace edm

#endif  // FWCore_Framework_ensureAvailableAccelerators_h
2 changes: 2 additions & 0 deletions FWCore/Framework/src/EventProcessor.cc
Expand Up @@ -37,6 +37,7 @@
#include "FWCore/Framework/interface/SharedResourcesRegistry.h"
#include "FWCore/Framework/interface/streamTransitionAsync.h"
#include "FWCore/Framework/interface/TransitionInfoTypes.h"
#include "FWCore/Framework/interface/ensureAvailableAccelerators.h"
#include "FWCore/Framework/interface/globalTransitionAsync.h"

#include "FWCore/MessageLogger/interface/MessageLogger.h"
Expand Down Expand Up @@ -372,6 +373,7 @@ namespace edm {
fileModeNoMerge_ = (fileMode == "NOMERGE");
}
forceESCacheClearOnNewRun_ = optionsPset.getUntrackedParameter<bool>("forceEventSetupCacheClearOnNewRun");
ensureAvailableAccelerators(*parameterSet);

//threading
unsigned int nThreads = optionsPset.getUntrackedParameter<unsigned int>("numberOfThreads");
Expand Down
30 changes: 30 additions & 0 deletions FWCore/Framework/src/ensureAvailableAccelerators.cc
@@ -0,0 +1,30 @@
#include "FWCore/Framework/interface/ensureAvailableAccelerators.h"
#include "FWCore/Utilities/interface/EDMException.h"

#include <algorithm>
#include <vector>

namespace edm {
// Verify that the job configuration ended up with at least one selected
// compute accelerator; otherwise throw an edm::Exception of category
// errors::UnavailableAccelerator whose message lists both the requested
// patterns and the accelerators that are available.
//
// parameterSet: process-level ParameterSet; must carry the untracked
//               parameters "@selected_accelerators", "@available_accelerators"
//               and the untracked PSet "options" (with "accelerators").
void ensureAvailableAccelerators(edm::ParameterSet const& parameterSet) {
// Accelerators selected for this job, read from the "@selected_accelerators" parameter.
auto const& selectedAccelerators =
parameterSet.getUntrackedParameter<std::vector<std::string>>("@selected_accelerators");
// Only read in the error path below, to report the user-specified patterns.
ParameterSet const& optionsPset(parameterSet.getUntrackedParameterSet("options"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also for a follow-up PR, this could be moved inside the if block

// Nothing selected: build a descriptive exception and abort.
if (selectedAccelerators.empty()) {
Exception ex(errors::UnavailableAccelerator);
ex << "The system has no compute accelerators that match the patterns specified in "
"process.options.accelerators:\n";
// List the patterns that were requested in process.options.accelerators.
auto const& patterns = optionsPset.getUntrackedParameter<std::vector<std::string>>("accelerators");
for (auto const& pat : patterns) {
ex << " " << pat << "\n";
}
ex << "\nThe following compute accelerators are available:\n";
// List the contents of "@available_accelerators" so the user can see the valid choices.
auto const& availableAccelerators =
parameterSet.getUntrackedParameter<std::vector<std::string>>("@available_accelerators");
for (auto const& acc : availableAccelerators) {
ex << " " << acc << "\n";
}

throw ex;
}
}
} // namespace edm
4 changes: 2 additions & 2 deletions FWCore/Framework/test/test_module_delete_cfg.py
Expand Up @@ -118,8 +118,8 @@ class SwitchProducerTest(cms.SwitchProducer):
def __init__(self, **kargs):
super(SwitchProducerTest,self).__init__(
dict(
test1 = lambda: (True, -10),
test2 = lambda: (True, -9)
test1 = lambda accelerators: (True, -10),
test2 = lambda accelerators: (True, -9)
), **kargs)
process.producerEventSwitchProducerNotConsumed = cms.EDProducer("edmtest::TestModuleDeleteProducer")
process.producerEventSwitchProducerConsumed = intEventProducerMustRun.clone()
Expand Down
12 changes: 12 additions & 0 deletions FWCore/Integration/test/BuildFile.xml
Expand Up @@ -147,6 +147,8 @@
<use name="FWCore/Utilities"/>
</bin>

<test name="TestIntegrationProcessAccelerator" command="run_TestProcessAccelerator.sh"/>

<test name="CatchStdExceptiontest" command="CatchStdExceptiontest.sh"/>

<test name="CatchCmsExceptiontest" command="CatchCmsExceptiontest.sh"/>
Expand Down Expand Up @@ -177,6 +179,16 @@
<use name="catch2"/>
</bin>

<bin file="ProcessAccelerator_t.cpp">
<use name="FWCore/Framework"/>
<use name="FWCore/ParameterSet"/>
<use name="FWCore/ParameterSetReader"/>
<use name="FWCore/TestProcessor"/>
<use name="DataFormats/Provenance"/>
<use name="catch2"/>
<use name="fmt"/>
</bin>

<bin file="EDAlias_t.cpp">
<use name="FWCore/Framework"/>
<use name="FWCore/ParameterSet"/>
Expand Down
158 changes: 158 additions & 0 deletions FWCore/Integration/test/ProcessAccelerator_t.cpp
@@ -0,0 +1,158 @@
#define CATCH_CONFIG_MAIN
#include "catch.hpp"

#include "DataFormats/TestObjects/interface/ToyProducts.h"
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/ParameterSetReader/interface/ParameterSetReader.h"
#include "FWCore/TestProcessor/interface/TestProcessor.h"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Glad to see TestProcessor being used. Out of curiosity, what drove you to use TestProcessor as opposed to a shell script running cmsRun?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I started with a similar TestProcessor test of SwitchProducer as a base, although the necessary tests here turned out to be much simpler than those for SwitchProducer alone. For simple things, like testing all the combinations of the configuration parameter x "available hardware", I found the test setup more concise than with script running cmsRun. I also like Catch's ability to ensure the content of the exception message.


#include <fmt/format.h>

#include <iostream>
#include <string_view>

static constexpr auto s_tag = "[ProcessAccelerator]";

namespace {
// Build the Python configuration text for one TestProcessor job by filling
// four "{}" placeholders of the template below, in this order:
//   1. an optional statement appending 'test2' to the enabled labels of
//      ProcessAcceleratorTest (empty when test2Enabled is false),
//   2. the pattern assigned to process.options.accelerators,
//   3. the module definition used for the SwitchProducer "test1" case,
//   4. the module definition used for the SwitchProducer "test2" case.
//
// test2Enabled: whether ProcessAcceleratorTest reports "test2" as enabled.
// test1, test2: Python expressions creating the modules for the two cases.
// accelerator:  single pattern placed in process.options.accelerators.
// Returns the formatted configuration as a std::string.
//
// NOTE(review): the embedded Python needs its indentation to be valid; the
// template appears without indentation here — confirm against the real file.
std::string makeConfig(bool test2Enabled,
std::string_view test1,
std::string_view test2,
std::string_view accelerator) {
// Python statement substituted into ProcessAcceleratorTest.__init__ (first placeholder).
const std::string appendTest2 = test2Enabled ? "self._enabled.append('test2')" : "";
return fmt::format(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice use of fmt::format!

R"_(from FWCore.TestProcessor.TestProcess import *
import FWCore.ParameterSet.Config as cms

class ProcessAcceleratorTest(cms.ProcessAccelerator):
def __init__(self):
super(ProcessAcceleratorTest,self).__init__()
self._labels = ["test1", "test2"]
self._enabled = ["test1"]
{}
def labels(self):
return self._labels
def enabledLabels(self):
return self._enabled

class SwitchProducerTest(cms.SwitchProducer):
def __init__(self, **kargs):
super(SwitchProducerTest,self).__init__(
dict(
cpu = cms.SwitchProducer.getCpu(),
test1 = lambda accelerators: ("test1" in accelerators, 2),
test2 = lambda accelerators: ("test2" in accelerators, 3),
), **kargs)

process = TestProcess()
process.options.accelerators = ["{}"]
process.ProcessAcceleratorTest = ProcessAcceleratorTest()
process.s = SwitchProducerTest(
cpu = cms.EDProducer('IntProducer', ivalue = cms.int32(0)),
test1 = {},
test2 = {}
)
process.moduleToTest(process.s)
)_",
// Arguments follow the order in which the placeholders appear in the template.
appendTest2,
accelerator,
test1,
test2);
}
} // namespace

// Exercises the ProcessAccelerator configuration for every combination of
// "test2 enabled / disabled" and process.options.accelerators in
// {"*", "test1", "test2"}.
//
// The SwitchProducer "test1" case produces the value 1 and the "test2" case
// produces the value 2, so the value read back from the event tells which
// case was chosen.
TEST_CASE("Configuration", s_tag) {
  const std::string test1{"cms.EDProducer('IntProducer', ivalue = cms.int32(1))"};
  const std::string test2{"cms.EDProducer('ManyIntProducer', ivalue = cms.int32(2), values = cms.VPSet())"};

  // Configurations where ProcessAcceleratorTest reports test2 as enabled.
  const std::string baseConfig_auto = makeConfig(true, test1, test2, "*");
  const std::string baseConfig_test1 = makeConfig(true, test1, test2, "test1");
  const std::string baseConfig_test2 = makeConfig(true, test1, test2, "test2");
  // Configurations where only test1 is reported as enabled.
  const std::string baseConfigTest2Disabled_auto = makeConfig(false, test1, test2, "*");
  const std::string baseConfigTest2Disabled_test1 = makeConfig(false, test1, test2, "test1");
  const std::string baseConfigTest2Disabled_test2 = makeConfig(false, test1, test2, "test2");

  // All six configurations must register with the same ParameterSet ID.
  SECTION("Configuration hash is not changed") {
    auto pset_auto = edm::readConfig(baseConfig_auto);
    auto pset_test1 = edm::readConfig(baseConfig_test1);
    auto pset_test2 = edm::readConfig(baseConfig_test2);
    auto psetTest2Disabled_auto = edm::readConfig(baseConfigTest2Disabled_auto);
    auto psetTest2Disabled_test1 = edm::readConfig(baseConfigTest2Disabled_test1);
    auto psetTest2Disabled_test2 = edm::readConfig(baseConfigTest2Disabled_test2);
    pset_auto->registerIt();
    pset_test1->registerIt();
    pset_test2->registerIt();
    psetTest2Disabled_auto->registerIt();
    psetTest2Disabled_test1->registerIt();
    psetTest2Disabled_test2->registerIt();
    REQUIRE(pset_auto->id() == pset_test1->id());
    REQUIRE(pset_auto->id() == pset_test2->id());
    REQUIRE(pset_auto->id() == psetTest2Disabled_auto->id());
    REQUIRE(pset_auto->id() == psetTest2Disabled_test1->id());
    REQUIRE(pset_auto->id() == psetTest2Disabled_test2->id());
  }

  edm::test::TestProcessor::Config config_auto{baseConfig_auto};
  edm::test::TestProcessor::Config config_test1{baseConfig_test1};
  edm::test::TestProcessor::Config config_test2{baseConfig_test2};
  edm::test::TestProcessor::Config configTest2Disabled_auto{baseConfigTest2Disabled_auto};
  edm::test::TestProcessor::Config configTest2Disabled_test1{baseConfigTest2Disabled_test1};
  edm::test::TestProcessor::Config configTest2Disabled_test2{baseConfigTest2Disabled_test2};

  // Basic transitions must not throw with the accelerators="*" configuration.
  SECTION("Base configuration is OK") { REQUIRE_NOTHROW(edm::test::TestProcessor(config_auto)); }

  SECTION("No event data") {
    edm::test::TestProcessor tester(config_auto);
    REQUIRE_NOTHROW(tester.test());
  }

  SECTION("beginJob and endJob only") {
    edm::test::TestProcessor tester(config_auto);
    REQUIRE_NOTHROW(tester.testBeginAndEndJobOnly());
  }

  SECTION("Run with no LuminosityBlocks") {
    edm::test::TestProcessor tester(config_auto);
    REQUIRE_NOTHROW(tester.testRunWithNoLuminosityBlocks());
  }

  SECTION("LuminosityBlock with no Events") {
    edm::test::TestProcessor tester(config_auto);
    REQUIRE_NOTHROW(tester.testLuminosityBlockWithNoEvents());
  }

  // With both accelerators enabled and no restriction, the test2 case is expected.
  SECTION("Test2 enabled, accelerators=*") {
    edm::test::TestProcessor tester(config_auto);
    auto event = tester.test();
    REQUIRE(event.get<edmtest::IntProduct>()->value == 2);
  }

  SECTION("Test2 enabled, accelerators=test1") {
    edm::test::TestProcessor tester(config_test1);
    auto event = tester.test();
    REQUIRE(event.get<edmtest::IntProduct>()->value == 1);
  }

  SECTION("Test2 enabled, accelerators=test2") {
    edm::test::TestProcessor tester(config_test2);
    auto event = tester.test();
    REQUIRE(event.get<edmtest::IntProduct>()->value == 2);
  }

  // With test2 not enabled, only the test1 case can be chosen.
  SECTION("Test2 disabled, accelerators=*") {
    edm::test::TestProcessor tester(configTest2Disabled_auto);
    auto event = tester.test();
    REQUIRE(event.get<edmtest::IntProduct>()->value == 1);
  }

  SECTION("Test2 disabled, accelerators=test1") {
    edm::test::TestProcessor tester(configTest2Disabled_test1);
    auto event = tester.test();
    REQUIRE(event.get<edmtest::IntProduct>()->value == 1);
  }

  // Requesting only test2 while it is not enabled must fail at construction,
  // and the message must contain the given phrase as well as "test1".
  SECTION("Test2 disabled, accelerators=test2") {
    REQUIRE_THROWS_WITH(
        edm::test::TestProcessor(configTest2Disabled_test2),
        Catch::Contains("The system has no compute accelerators that match the patterns") && Catch::Contains("test1"));
  }
}
4 changes: 2 additions & 2 deletions FWCore/Integration/test/SwitchProducer_t.cpp
Expand Up @@ -31,8 +31,8 @@ class SwitchProducerTest(cms.SwitchProducer):
def __init__(self, **kargs):
super(SwitchProducerTest,self).__init__(
dict(
test1 = lambda: (True, -10),
test2 = lambda: ()_"} +
test1 = lambda accelerators: (True, -10),
test2 = lambda accelerators: ()_"} +
(test2Enabled ? "True" : "False") + ", -9)\n" +
R"_( ), **kargs)
process = TestProcess()
Expand Down
38 changes: 38 additions & 0 deletions FWCore/Integration/test/run_TestProcessAccelerator.sh
@@ -0,0 +1,38 @@
#!/bin/bash
# Runs testProcessAccelerator_cfg.py through cmsRun with several combinations
# of --accelerators and --enableTest2. Every combination is expected to
# succeed except accelerators=test2 without --enableTest2, which must fail
# and report exit code 8035 in the framework job report.

test=testProcessAccelerator
LOCAL_TEST_DIR="${CMSSW_BASE}/src/FWCore/Integration/test"
LOCAL_TMP_DIR="${CMSSW_BASE}/tmp/${SCRAM_ARCH}"

# Print a failure message ($1) together with a status ($2), then exit with that status.
function die { echo "Failure $1: status $2" ; exit "$2" ; }

# Work in the temporary directory; abort if it cannot be entered so that the
# job report below is not written to (or read from) an unexpected location.
pushd "${LOCAL_TMP_DIR}" || die "pushd ${LOCAL_TMP_DIR}" $?

echo "*************************************************"
echo "accelerators=*"
cmsRun "${LOCAL_TEST_DIR}/${test}_cfg.py" || die "cmsRun ${test}_cfg.py" $?

echo "*************************************************"
echo "accelerators=*, enableTest2"
cmsRun "${LOCAL_TEST_DIR}/${test}_cfg.py" -- --enableTest2 || die "cmsRun ${test}_cfg.py -- --enableTest2" $?

echo "*************************************************"
echo "accelerators=test1"
cmsRun "${LOCAL_TEST_DIR}/${test}_cfg.py" -- --accelerators=test1 || die "cmsRun ${test}_cfg.py -- --accelerators=test1" $?

echo "*************************************************"
echo "accelerators=test2"
# This job must fail: a successful cmsRun is the error condition here.
cmsRun -j testProcessAccelerators_jobreport.xml "${LOCAL_TEST_DIR}/${test}_cfg.py" -- --accelerators=test2 && die "cmsRun ${test}_cfg.py -- --accelerators=test2 did not fail" 1
# The failure must be reported with the expected exit code in the job report.
EXIT_CODE=$(edmFjrDump --exitCode testProcessAccelerators_jobreport.xml)
if [ "x${EXIT_CODE}" != "x8035" ]; then
    echo "ProcessAccelerator test for unavailable accelerator reported exit code ${EXIT_CODE} which is different from the expected 8035"
    exit 1
fi

echo "*************************************************"
echo "accelerators=test1, enableTest2"
cmsRun "${LOCAL_TEST_DIR}/${test}_cfg.py" -- --accelerators=test1 --enableTest2 || die "cmsRun ${test}_cfg.py -- --accelerators=test1 --enableTest2" $?

echo "*************************************************"
echo "accelerators=test2, enableTest2"
cmsRun "${LOCAL_TEST_DIR}/${test}_cfg.py" -- --accelerators=test2 --enableTest2 || die "cmsRun ${test}_cfg.py -- --accelerators=test2 --enableTest2" $?
72 changes: 72 additions & 0 deletions FWCore/Integration/test/testProcessAccelerator_cfg.py
@@ -0,0 +1,72 @@
import FWCore.ParameterSet.Config as cms

import argparse
import sys

# Command-line options understood by this test configuration.
parser = argparse.ArgumentParser(prog=sys.argv[0], description='Test ProcessAccelerator.')

parser.add_argument("--enableTest2", help="Enable test2 accelerator", action="store_true")
parser.add_argument("--accelerators", type=str, help="Comma-separated string for accelerators to enable")

# Work on a copy of sys.argv and drop the first "--" separator, if present,
# so that the options following it are seen by argparse as options.
argv = sys.argv[:]
if '--' in argv:
    argv.remove("--")
# The full argument list is passed in, so anything this parser does not
# define is collected in `unknown` instead of raising an error.
args, unknown = parser.parse_known_args(argv)

class ProcessAcceleratorTest(cms.ProcessAccelerator):
    """Test accelerator that knows the labels "test1" and "test2".

    "test1" is always reported as enabled; "test2" is reported as enabled
    only when the --enableTest2 command-line option was given.
    """
    def __init__(self):
        super(ProcessAcceleratorTest,self).__init__()
        self._labels = ["test1", "test2"]
        self._enabled = ["test1"]
        if args.enableTest2:
            self._enabled.append("test2")
    def labels(self):
        """Return all accelerator labels this object knows about."""
        return self._labels
    def enabledLabels(self):
        """Return the labels reported as enabled."""
        return self._enabled

class SwitchProducerTest(cms.SwitchProducer):
    """SwitchProducer with the cases "cpu", "test1" and "test2".

    Each of the test cases is given a function that receives the
    accelerators and returns a tuple of (whether its label is among them,
    an integer: 2 for test1, 3 for test2).
    """
    def __init__(self, **kargs):
        super(SwitchProducerTest,self).__init__(
            dict(
                cpu = cms.SwitchProducer.getCpu(),
                test1 = lambda accelerators: ("test1" in accelerators, 2),
                test2 = lambda accelerators: ("test2" in accelerators, 3),
            ), **kargs)

process = cms.Process("PROD1")

# Register the test accelerator with the process.
process.add_(ProcessAcceleratorTest())

process.source = cms.Source("EmptySource")
process.maxEvents.input = 3
# Only override process.options.accelerators when --accelerators was given.
if args.accelerators is not None:
    process.options.accelerators = args.accelerators.split(",")

process.intProducer1 = cms.EDProducer("ManyIntProducer", ivalue = cms.int32(1))
process.intProducer2 = cms.EDProducer("ManyIntProducer", ivalue = cms.int32(2))
process.failIntProducer = cms.EDProducer("ManyIntProducer", ivalue = cms.int32(-1), throw = cms.untracked.bool(True))

# Exactly one of intProducer1 / intProducer2 is configured to throw:
# intProducer1 (read by the "test1" case) when test2 is enabled and selected
# by the accelerator patterns, intProducer2 (read by the "test2" case)
# otherwise. Presumably the job then only succeeds when the SwitchProducer
# picks the expected case — confirm against the framework behaviour.
if args.enableTest2 and ("test2" in process.options.accelerators or "*" in process.options.accelerators):
    process.intProducer1.throw = cms.untracked.bool(True)
else:
    process.intProducer2.throw = cms.untracked.bool(True)

# The "cpu" case reads failIntProducer, which is always configured to throw.
process.intProducer = SwitchProducerTest(
    cpu = cms.EDProducer("AddIntsProducer", labels = cms.VInputTag("failIntProducer")),
    test1 = cms.EDProducer("AddIntsProducer", labels = cms.VInputTag("intProducer1")),
    test2 = cms.EDProducer("AddIntsProducer", labels = cms.VInputTag("intProducer2"))
)

# Reads the product of the SwitchProducer.
process.intConsumer = cms.EDProducer("AddIntsProducer", labels = cms.VInputTag("intProducer"))

process.t = cms.Task(
    process.failIntProducer,
    process.intProducer1,
    process.intProducer2,
    process.intProducer,
)
process.p = cms.Path(
    process.intConsumer,
    process.t
)