Skip to content

Commit

Permalink
Automatize the reuse of the CUDA stream of an input product (cms-sw#305)
Browse files Browse the repository at this point in the history
  • Loading branch information
makortel authored and fwyzard committed Apr 8, 2019
1 parent ccb7761 commit 219ef3c
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 23 deletions.
35 changes: 34 additions & 1 deletion CUDADataFormats/Common/interface/CUDAProductBase.h
@@ -1,6 +1,7 @@
#ifndef CUDADataFormats_Common_CUDAProductBase_h
#define CUDADataFormats_Common_CUDAProductBase_h

#include <atomic>
#include <memory>

#include <cuda/api_wrappers.h>
Expand All @@ -13,14 +14,27 @@ class CUDAProductBase {
public:
CUDAProductBase() = default; // Needed only for ROOT dictionary generation

// Move constructor. Needed explicitly because std::atomic is neither
// copyable nor movable; the flag value is copied via load().
// noexcept: shared_ptr moves, atomic load and int copy cannot throw,
// and declaring it lets containers move (not copy) on reallocation.
CUDAProductBase(CUDAProductBase&& other) noexcept:
  stream_{std::move(other.stream_)},
  event_{std::move(other.event_)},
  mayReuseStream_{other.mayReuseStream_.load()},
  device_{other.device_}
{}
// Move assignment; see the move constructor for why it is user-defined
// (std::atomic member). noexcept for the same reason: no operation here
// can throw. Guards against self-move, which would otherwise leave
// stream_/event_ in a valid-but-unspecified (likely empty) state.
CUDAProductBase& operator=(CUDAProductBase&& other) noexcept {
  if(this != &other) {
    stream_ = std::move(other.stream_);
    event_ = std::move(other.event_);
    mayReuseStream_ = other.mayReuseStream_.load();
    device_ = other.device_;
  }
  return *this;
}

// True if this product carries a CUDA stream (i.e. was really produced,
// not just default-constructed for ROOT).
bool isValid() const { return stream_.get() != nullptr; }
// True when the asynchronous work producing this product has completed
// (defined in the .cc file).
bool isAvailable() const;

// CUDA device on which this product was produced (-1 if default-constructed).
int device() const { return device_; }

// The CUDA stream the product's work was queued on.
const cuda::stream_t<>& stream() const { return *stream_; }
cuda::stream_t<>& stream() { return *stream_; }
const std::shared_ptr<cuda::stream_t<>>& streamPtr() const { return stream_; }

// Event recorded at the end of the producing work; may be null.
const cuda::event_t *event() const { return event_.get(); }
cuda::event_t *event() { return event_.get(); }
Expand All @@ -29,12 +43,31 @@ class CUDAProductBase {
explicit CUDAProductBase(int device, std::shared_ptr<cuda::stream_t<>> stream, std::shared_ptr<cuda::event_t> event);

private:
friend class CUDAScopedContext;

// Intended to be used only from CUDAScopedContext
const std::shared_ptr<cuda::stream_t<>>& streamPtr() const { return stream_; }

// Atomically claims the right to reuse this product's CUDA stream.
// Returns true for exactly one caller (the first consumer); every
// subsequent caller gets false and must create its own stream.
// exchange(false) is equivalent to the original
// compare_exchange_strong(true -> false): it returns the previous
// value, which is true only for the thread that flips the flag.
bool mayReuseStream() const {
  return mayReuseStream_.exchange(false);
}

// The cuda::stream_t is really shared among edm::Event products, so
// using shared_ptr also here
std::shared_ptr<cuda::stream_t<>> stream_; //!
// shared_ptr because of caching in CUDAService
std::shared_ptr<cuda::event_t> event_; //!

// This flag tells whether the CUDA stream may be reused by a
// consumer or not. The goal is to have a "chain" of modules to
// queue their work to the same stream.
mutable std::atomic<bool> mayReuseStream_ = true; //!

// The CUDA device associated with this product
int device_ = -1; //!
};

Expand Down
28 changes: 17 additions & 11 deletions HeterogeneousCore/CUDACore/README.md
Expand Up @@ -202,15 +202,9 @@ void ProducerInputCUDA::acquire(edm::Event const& iEvent, edm::EventSetup& iSetu
CUDAProduct<InputData> const& inputDataWrapped = iEvent.get(inputToken_);

// Set the current device to the same that was used to produce
// InputData, and also use the same CUDA stream
// InputData, and possibly use the same CUDA stream
CUDAScopedContext ctx{inputDataWrapped, std::move(waitingTaskHolder)};

// Alternatively a new CUDA stream can be created here. This is for
// a case where there are two (or more) consumers of
// CUDAProduct<InputData> whose work is independent and thus can be run
// in parallel.
CUDAScopedContext ctx{iEvent.streamID(), std::move(waitingTaskHolder)};

// Grab the real input data. Checks that the input data is on the
// current device. If the input data was produced in a different CUDA
// stream than the CUDAScopedContext holds, create an inter-stream
Expand Down Expand Up @@ -240,6 +234,12 @@ void ProducerInputCUDA::produce(edm::Event& iEvent, edm::EventSetup& iSetup) {
}
```
See [further below](#setting-the-current-device) for the conditions
under which the `CUDAScopedContext` constructor reuses the CUDA stream. Note
that the `CUDAScopedContext` constructor taking `edm::StreamID` is
still allowed; it will just always create a new CUDA stream.
### Producer with CUDA input and output (with ExternalWork)
```cpp
Expand Down Expand Up @@ -319,8 +319,8 @@ void ProducerInputOutputCUDA::produce(edm::StreamID streamID, edm::Event& iEvent
CUDAProduct<InputData> const& inputDataWrapped = iEvent.get(inputToken_);

// Set the current device to the same that was used to produce
// InputData, and also use the same CUDA stream
CUDAScopedContext ctx{streamID};
// InputData, and possibly use the same CUDA stream
CUDAScopedContext ctx{inputDataWrapped};

// Grab the real input data. Checks that the input data is on the
// current device. If the input data was produced in a different CUDA
Expand Down Expand Up @@ -368,7 +368,7 @@ void AnalyzerInputCUDA::analyze(edm::Event const& iEvent, edm::EventSetup& iSetu
CUDAProduct<InputData> const& inputDataWrapped = iEvent.get(inputToken_);
// Set the current device to the same that was used to produce
// InputData, and also use the same CUDA stream
// InputData, and possibly use the same CUDA stream
CUDAScopedContext ctx{inputDataWrapped};
// Alternatively a new CUDA stream can be created here. This is for
Expand Down Expand Up @@ -540,7 +540,13 @@ CUDAScopedContext ctx{cclus};
`CUDAScopedContext` works in the RAII way and does the following
* Sets the current device for the current scope
- If constructed from the `edm::StreamID`, chooses the device and creates a new CUDA stream
- If constructed from the `CUDAProduct<T>`, uses the same device and CUDA stream as was used to produce the `CUDAProduct<T>`
- If constructed from the `CUDAProduct<T>`, uses the same device and possibly the same CUDA stream as was used to produce the `CUDAProduct<T>`
* The CUDA stream is reused if this producer is the first consumer
of the `CUDAProduct<T>`, otherwise a new CUDA stream is created.
This approach is a simple compromise that automatically expresses the work of
parallel producers in different CUDA streams, and at the same
time allows a chain of producers to queue their work to the same
CUDA stream.
* Gives access to the CUDA stream the algorithm should use to queue asynchronous work
* Calls `edm::WaitingTaskWithArenaHolder::doneWaiting()` when necessary
* [Synchronizes between CUDA streams if necessary](#synchronizing-between-cuda-streams)
Expand Down
10 changes: 2 additions & 8 deletions HeterogeneousCore/CUDACore/interface/CUDAScopedContext.h
Expand Up @@ -34,21 +34,15 @@ class CUDAScopedContext {
stream_(std::move(token.streamPtr()))
{}

template<typename T>
explicit CUDAScopedContext(const CUDAProduct<T>& data):
currentDevice_(data.device()),
setDeviceForThisScope_(currentDevice_),
stream_(data.streamPtr())
{}
explicit CUDAScopedContext(const CUDAProductBase& data);

// Constructor for the ExternalWork acquire() phase: delegates to the
// edm::StreamID constructor (chooses device, creates a new CUDA stream)
// and additionally stores the WaitingTaskWithArenaHolder so the
// framework can be notified when the asynchronous work completes.
explicit CUDAScopedContext(edm::StreamID streamID, edm::WaitingTaskWithArenaHolder waitingTaskHolder):
  CUDAScopedContext(streamID)
{
  waitingTaskHolder_ = std::move(waitingTaskHolder);
}

template <typename T>
explicit CUDAScopedContext(const CUDAProduct<T>& data, edm::WaitingTaskWithArenaHolder waitingTaskHolder):
explicit CUDAScopedContext(const CUDAProductBase& data, edm::WaitingTaskWithArenaHolder waitingTaskHolder):
CUDAScopedContext(data)
{
waitingTaskHolder_ = std::move(waitingTaskHolder);
Expand Down
14 changes: 14 additions & 0 deletions HeterogeneousCore/CUDACore/src/CUDAScopedContext.cc
Expand Up @@ -16,6 +16,20 @@ CUDAScopedContext::CUDAScopedContext(edm::StreamID streamID):
stream_ = cs->getCUDAStream();
}

// Construct from an input product: run on the device that produced the
// data, and reuse its CUDA stream when this is the product's first
// consumer. Later consumers get a fresh stream so that independent
// work on the same product can proceed in parallel.
CUDAScopedContext::CUDAScopedContext(const CUDAProductBase& data):
  currentDevice_(data.device()),
  setDeviceForThisScope_(currentDevice_)
{
  if(data.mayReuseStream()) {
    // First consumer: continue the producer's stream to form a chain.
    stream_ = data.streamPtr();
    return;
  }
  // Already claimed by another consumer: obtain a new stream.
  edm::Service<CUDAService> cs;
  stream_ = cs->getCUDAStream();
}


CUDAScopedContext::CUDAScopedContext(int device, std::unique_ptr<cuda::stream_t<>> stream, std::unique_ptr<cuda::event_t> event):
currentDevice_(device),
setDeviceForThisScope_(device),
Expand Down
5 changes: 5 additions & 0 deletions HeterogeneousCore/CUDACore/test/BuildFile.xml
@@ -1,6 +1,11 @@
<!-- Catch2-based unit tests. ParameterSet/PluginManager/ServiceRegistry are
     needed so the tests can instantiate CUDAService from a Python
     configuration string (see test_main.cc). -->
<bin file="test_*.cc test_*.cu" name="testHeterogeneousCoreCUDACore">
<use name="CUDADataFormats/Common"/>
<use name="FWCore/TestProcessor"/>
<use name="FWCore/ParameterSet"/>
<use name="FWCore/PluginManager"/>
<use name="FWCore/ServiceRegistry"/>
<use name="HeterogeneousCore/CUDACore"/>
<use name="HeterogeneousCore/CUDAServices"/>
<use name="catch2"/>
<use name="cuda"/>
</bin>
Expand Down
11 changes: 8 additions & 3 deletions HeterogeneousCore/CUDACore/test/test_CUDAScopedContext.cc
@@ -1,6 +1,8 @@
#include "catch.hpp"

#include "CUDADataFormats/Common/interface/CUDAProduct.h"
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
#include "HeterogeneousCore/CUDACore/interface/CUDAScopedContext.h"
#include "HeterogeneousCore/CUDAUtilities/interface/cudaCheck.h"
#include "HeterogeneousCore/CUDAUtilities/interface/exitSansCUDADevices.h"
Expand Down Expand Up @@ -59,6 +61,11 @@ TEST_CASE("Use of CUDAScopedContext", "[CUDACore]") {
CUDAScopedContext ctx2{data};
REQUIRE(cuda::device::current::get().id() == data.device());
REQUIRE(ctx2.stream().id() == data.stream().id());

// Second use of a product should lead to new stream
CUDAScopedContext ctx3{data};
REQUIRE(cuda::device::current::get().id() == data.device());
REQUIRE(ctx3.stream().id() != data.stream().id());
}

SECTION("Storing state as CUDAContextToken") {
Expand Down Expand Up @@ -118,9 +125,7 @@ TEST_CASE("Use of CUDAScopedContext", "[CUDACore]") {
}
}

// Destroy and clean up all resources so that the next test can
// assume to start from a clean state.
cudaCheck(cudaSetDevice(defaultDevice));
cudaCheck(cudaDeviceSynchronize());
cudaDeviceReset();
// Note: CUDA resources are cleaned up by CUDAService destructor
}
27 changes: 27 additions & 0 deletions HeterogeneousCore/CUDACore/test/test_main.cc
@@ -1,2 +1,29 @@
#define CATCH_CONFIG_MAIN
#include "catch.hpp"

#include "FWCore/PluginManager/interface/standard.h"
#include "FWCore/PluginManager/interface/PluginManager.h"
#include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"

// Catch2 event listener that brings up the EDM service system once,
// before any test runs, so that tests can rely on edm::Service<CUDAService>.
class ServiceRegistryListener: public Catch::TestEventListenerBase {
public:
  using Catch::TestEventListenerBase::TestEventListenerBase; // inherit constructor

  // Configure the plugin manager and create the services (CUDAService)
  // from an inline Python configuration.
  void testRunStarting(Catch::TestRunInfo const& testRunInfo) override {
    edmplugin::PluginManager::configure(edmplugin::standard::config());

    const std::string config{
R"_(import FWCore.ParameterSet.Config as cms
process = cms.Process('Test')
process.CUDAService = cms.Service('CUDAService')
)_"
    };

    edm::ServiceToken tempToken = edm::ServiceRegistry::createServicesFromConfig(config);
    // make_unique instead of reset(new ...): no raw new, exception-safe.
    operate_ = std::make_unique<edm::ServiceRegistry::Operate>(tempToken);
  }

private:
  // Keeps the created services registered and usable for the whole test run.
  std::unique_ptr<edm::ServiceRegistry::Operate> operate_;
};
CATCH_REGISTER_LISTENER(ServiceRegistryListener);

0 comments on commit 219ef3c

Please sign in to comment.