Add direct operator calls in debug mode #3734

Merged on Apr 5, 2022 (24 commits)

Commits
1f31170  Add direct operators prototype (ksztenderski, Mar 8, 2022)
94c4978  Add PipelineDebug class in the backend and general cleanup (ksztenderski, Mar 9, 2022)
c58d62c  Add template of cuda stream support in PipelineDebug (ksztenderski, Mar 10, 2022)
83fe0a0  Remove copy of outputs for TL (ksztenderski, Mar 15, 2022)
6405be9  Remove experimental ops exposure and direct_operator_call_test (ksztenderski, Mar 15, 2022)
aef8a73  Typo fix and cuda_stream fix (ksztenderski, Mar 15, 2022)
da04779  Add aritm_op support in direct operator for debug mode (ksztenderski, Mar 16, 2022)
596bafe  Add default layout support in direct op (ksztenderski, Mar 16, 2022)
cc7a4dc  Clean up (ksztenderski, Mar 16, 2022)
e843e85  Remove python eager operator exposure add operator device check (ksztenderski, Mar 22, 2022)
554c698  Add SetDefaultLayout usage in eager operator (ksztenderski, Mar 22, 2022)
23d4795  Clean up (ksztenderski, Mar 22, 2022)
d0091be  Add layout set fix (ksztenderski, Mar 23, 2022)
c1a8edc  Revert PresentAsTensorList (ksztenderski, Mar 23, 2022)
aecbc75  Merge branch 'NVIDIA:main' into eager_operator_calls (ksztenderski, Mar 23, 2022)
da27a7e  Change default device for shared thread pool in eager operator (ksztenderski, Mar 23, 2022)
b134aaa  Fix operator exposure in python (ksztenderski, Mar 23, 2022)
8148a56  Add OperatorManager util class for debug mode (ksztenderski, Mar 29, 2022)
2da6e95  Rename back to _debug_mode (ksztenderski, Mar 29, 2022)
92bfc65  Fix multiple input sets support in debug mode (ksztenderski, Mar 30, 2022)
aa0da7f  Revert to fn stype names for operators in debug mode (ksztenderski, Mar 30, 2022)
ea94e0e  Clean up (ksztenderski, Mar 30, 2022)
8ff9509  Fix input sets len check and adding inputs to OpSpec (ksztenderski, Apr 1, 2022)
b3846c4  Clean up (ksztenderski, Apr 1, 2022)
10 changes: 1 addition & 9 deletions dali/pipeline/executor/executor.h
@@ -38,6 +38,7 @@
#include "dali/pipeline/graph/op_graph_verifier.h"
#include "dali/pipeline/operator/batch_size_provider.h"
#include "dali/pipeline/operator/common.h"
#include "dali/pipeline/util/batch_utils.h"
#include "dali/pipeline/util/event_pool.h"
#include "dali/pipeline/util/stream_pool.h"
#include "dali/pipeline/util/thread_pool.h"
@@ -355,15 +356,6 @@ class DLL_PUBLIC Executor : public ExecutorBase, public QueuePolicy {
WorkspacePolicy ws_policy_;

private:
template <typename InputRef>
static bool SetDefaultLayoutIfNeeded(InputRef &in, const OpSchema &schema, int in_idx) {
if (!in.GetLayout().empty()) return false;
auto default_layout = schema.GetInputLayout(in_idx, in.shape().sample_dim(), in.GetLayout());
if (default_layout.empty()) return false;
in.SetLayout(default_layout);
return true;
}

template <typename Workspace>
void RunHelper(OpNode &op_node, Workspace &ws);

239 changes: 239 additions & 0 deletions dali/pipeline/operator/eager_operator.h
@@ -0,0 +1,239 @@
// Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef DALI_PIPELINE_OPERATOR_EAGER_OPERATOR_H_
#define DALI_PIPELINE_OPERATOR_EAGER_OPERATOR_H_

#include <memory>
#include <string>
#include <unordered_map>
#include <vector>
#include "dali/core/cuda_stream_pool.h"
#include "dali/pipeline/data/tensor_list.h"
#include "dali/pipeline/operator/op_spec.h"
#include "dali/pipeline/operator/operator.h"
#include "dali/pipeline/util/backend2workspace_map.h"
#include "dali/pipeline/util/batch_utils.h"
#include "dali/pipeline/util/thread_pool.h"
#include "dali/pipeline/workspace/workspace.h"

namespace dali {

template <typename Backend>
std::shared_ptr<TensorList<Backend>> AsTensorList(const std::shared_ptr<TensorList<Backend>> &in) {
return in;
}

template <typename Backend>
std::shared_ptr<TensorList<Backend>> AsTensorList(
const std::shared_ptr<TensorVector<Backend>> &in) {
if (in->IsContiguous()) {
// Filled contiguous TensorVector, we can return TensorList directly.
return in->AsTensorList(false);
}

auto tl = std::make_shared<TensorList<Backend>>();
tl->Copy(*in);
return tl;
}

/**
* @brief Direct operator providing eager execution of an operator in Run.
*/
template <typename Backend>
class DLL_PUBLIC EagerOperator {
public:
DLL_PUBLIC inline EagerOperator(const OpSpec &spec)
: batch_size_(spec.GetArgument<int>("max_batch_size")),
op_spec_(spec),
op_(InstantiateOperator(spec)) {
num_outputs_ = op_spec_.GetSchema().CalculateOutputs(op_spec_) +
op_spec_.GetSchema().CalculateAdditionalOutputs(op_spec_);
}

// Runs operator using shared thread pool and shared CUDA stream.
template <typename InBackend, typename OutBackend>
DLL_PUBLIC std::vector<std::shared_ptr<TensorList<OutBackend>>> Run(
const std::vector<std::shared_ptr<TensorList<InBackend>>> &inputs,
const std::unordered_map<std::string, std::shared_ptr<TensorList<CPUBackend>>> &kwargs) {
DALI_FAIL("Unsupported backends in EagerOperator.Run().");
}

// Runs operator using specified thread pool.
template <typename InBackend, typename OutBackend>
DLL_PUBLIC std::vector<std::shared_ptr<TensorList<OutBackend>>> Run(
const std::vector<std::shared_ptr<TensorList<InBackend>>> &inputs,
const std::unordered_map<std::string, std::shared_ptr<TensorList<CPUBackend>>> &kwargs,
ThreadPool *tp) {
DALI_FAIL("Unsupported backends in EagerOperator.Run() with thread pool.");
}

// Runs operator using specified CUDA stream.
template <typename InBackend, typename OutBackend>
DLL_PUBLIC std::vector<std::shared_ptr<TensorList<OutBackend>>> Run(
const std::vector<std::shared_ptr<TensorList<InBackend>>> &inputs,
const std::unordered_map<std::string, std::shared_ptr<TensorList<CPUBackend>>> &kwargs,
CUDAStreamLease &cuda_stream) {
DALI_FAIL("Unsupported backends in EagerOperator.Run() with CUDA stream");
}

// Update shared thread pool used for all direct operators.
DLL_PUBLIC inline static void UpdateThreadPool(int num_threads, int device_id,
bool set_affinity) {
shared_thread_pool = std::make_unique<ThreadPool>(num_threads, device_id, set_affinity);
}

// Update shared CUDA stream used for all direct operators.
DLL_PUBLIC inline static void UpdateCudaStream(int device_id) {
Review comment (Contributor):
Hmm, the naming and semantics are a bit unfortunate here. We may need to think about how we work on multi-GPU systems - the GPU operators should use a specific device and stream pair. For now we are using an internal stream, but the device should probably be assigned to the PipelineDebug instance.

Reply (ksztenderski, Contributor Author, Mar 29, 2022):
Probably an oversight - we already have a specific device and stream for a single PipelineDebug instance.
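
As a rough illustration of the per-instance device/stream binding discussed above (not part of this diff; PipelineDebug's members are not shown in this excerpt, so the names below are assumptions):

// Illustration only: a per-instance device/stream pair, as opposed to the
// class-static shared_cuda_stream further down in this file. Names are hypothetical.
struct DebugDeviceContext {
  int device_id = CPU_ONLY_DEVICE_ID;
  CUDAStreamLease stream;
};

inline DebugDeviceContext MakeDebugDeviceContext(int device_id) {
  DebugDeviceContext ctx;
  ctx.device_id = device_id;
  if (device_id != CPU_ONLY_DEVICE_ID) {
    DeviceGuard g(device_id);
    ctx.stream = CUDAStreamPool::instance().Get(device_id);
  }
  return ctx;
}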

if (device_id != CPU_ONLY_DEVICE_ID) {
DeviceGuard g(device_id);
shared_cuda_stream = CUDAStreamPool::instance().Get(device_id);
}
}

private:
template <typename InBackend, typename OutBackend, typename WSInputType, typename WSOutputType>
std::vector<std::shared_ptr<TensorList<OutBackend>>> RunImpl(
const std::vector<std::shared_ptr<TensorList<InBackend>>> &inputs,
const std::unordered_map<std::string, std::shared_ptr<TensorList<CPUBackend>>> &kwargs);

int batch_size_;
size_t num_outputs_;
workspace_t<Backend> ws_;
OpSpec op_spec_;
std::unique_ptr<OperatorBase> op_;

static CUDAStreamLease shared_cuda_stream;
static std::unique_ptr<ThreadPool> shared_thread_pool;
};

template <>
template <>
std::vector<std::shared_ptr<TensorList<CPUBackend>>> EagerOperator<CPUBackend>::Run(
const std::vector<std::shared_ptr<TensorList<CPUBackend>>> &inputs,
const std::unordered_map<std::string, std::shared_ptr<TensorList<CPUBackend>>> &kwargs,
ThreadPool *thread_pool) {
ws_.Clear();
ws_.SetThreadPool(thread_pool);

return RunImpl<CPUBackend, CPUBackend, TensorVector<CPUBackend>, TensorVector<CPUBackend>>(
inputs, kwargs);
}

template <>
template <>
std::vector<std::shared_ptr<TensorList<GPUBackend>>> EagerOperator<GPUBackend>::Run(
const std::vector<std::shared_ptr<TensorList<GPUBackend>>> &inputs,
const std::unordered_map<std::string, std::shared_ptr<TensorList<CPUBackend>>> &kwargs,
CUDAStreamLease &cuda_stream) {
ws_.Clear();
ws_.set_stream(cuda_stream);
auto output = RunImpl<GPUBackend, GPUBackend, TensorList<GPUBackend>, TensorList<GPUBackend>>(
inputs, kwargs);
CUDA_CALL(cudaStreamSynchronize(cuda_stream));
return output;
}

template <>
template <>
std::vector<std::shared_ptr<TensorList<GPUBackend>>> EagerOperator<MixedBackend>::Run(
const std::vector<std::shared_ptr<TensorList<CPUBackend>>> &inputs,
const std::unordered_map<std::string, std::shared_ptr<TensorList<CPUBackend>>> &kwargs,
CUDAStreamLease &cuda_stream) {
ws_.Clear();
ws_.set_stream(cuda_stream);
auto output = RunImpl<CPUBackend, GPUBackend, TensorVector<CPUBackend>, TensorList<GPUBackend>>(
inputs, kwargs);
CUDA_CALL(cudaStreamSynchronize(cuda_stream));
return output;
}

template <>
template <>
std::vector<std::shared_ptr<TensorList<CPUBackend>>> EagerOperator<CPUBackend>::Run(
const std::vector<std::shared_ptr<TensorList<CPUBackend>>> &inputs,
const std::unordered_map<std::string, std::shared_ptr<TensorList<CPUBackend>>> &kwargs) {
return Run<CPUBackend, CPUBackend>(inputs, kwargs, shared_thread_pool.get());
}

template <>
template <>
std::vector<std::shared_ptr<TensorList<GPUBackend>>> EagerOperator<GPUBackend>::Run(
const std::vector<std::shared_ptr<TensorList<GPUBackend>>> &inputs,
const std::unordered_map<std::string, std::shared_ptr<TensorList<CPUBackend>>> &kwargs) {
return Run<GPUBackend, GPUBackend>(inputs, kwargs, shared_cuda_stream);
}

template <>
template <>
std::vector<std::shared_ptr<TensorList<GPUBackend>>> EagerOperator<MixedBackend>::Run(
const std::vector<std::shared_ptr<TensorList<CPUBackend>>> &inputs,
const std::unordered_map<std::string, std::shared_ptr<TensorList<CPUBackend>>> &kwargs) {
return Run<CPUBackend, GPUBackend>(inputs, kwargs, shared_cuda_stream);
}

template <typename Backend>
template <typename InBackend, typename OutBackend, typename WSInputType, typename WSOutputType>
std::vector<std::shared_ptr<TensorList<OutBackend>>> EagerOperator<Backend>::RunImpl(
const std::vector<std::shared_ptr<TensorList<InBackend>>> &inputs,
const std::unordered_map<std::string, std::shared_ptr<TensorList<CPUBackend>>> &kwargs) {
// Convert and add inputs to the workspace.
for (size_t in_idx = 0; in_idx < inputs.size(); ++in_idx) {
auto tensor_in = std::make_shared<WSInputType>();
tensor_in->ShareData(*inputs[in_idx]);
SetDefaultLayoutIfNeeded(*tensor_in, op_spec_.GetSchema(), in_idx);
ws_.AddInput(tensor_in);
}

for (auto &arg : kwargs) {
ws_.AddArgumentInput(arg.first, arg.second);
}

std::vector<OutputDesc> output_desc{};
std::vector<std::shared_ptr<TensorList<OutBackend>>> outputs{};

outputs.reserve(num_outputs_);

for (size_t i = 0; i < num_outputs_; ++i) {
ws_.AddOutput(std::make_shared<WSOutputType>(batch_size_));
}

ws_.SetBatchSizes(batch_size_);
Review comment (Contributor):
Hmm, we should probably take the batch size from the inputs in this iteration, to support iteration-to-iteration variable batch size, which is driven via the external source. Of course, a user would then be able to abuse it in debug mode and run with different batch sizes among the operators in one iteration.

The problem is that the batch size is inferred from the external source, so we would need to run the external sources ahead of the other operators to infer it, store it in PipelineDebug, and pass it here, since it also affects generator operators that are inputless.

Reply (ksztenderski, Contributor Author):
For now we are staying with the current implementation; support for variable batch size might be added in the future.

Review comment (Contributor):
Maybe we should highlight this as a limitation and check that the external source in debug mode doesn't change the batch size.

Review comment (Contributor):
OK, I see the check is there.
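
As a rough sketch only (not part of this diff), the per-iteration batch size discussed above could be inferred from the inputs along these lines, falling back to the fixed maximum for inputless operators; the exact accessors are assumptions:

// Illustration only: derive this iteration's batch size from the first input
// instead of always using the fixed batch_size_.
int iter_batch_size = inputs.empty() ? batch_size_ : inputs[0]->shape().num_samples();
ws_.SetBatchSizes(iter_batch_size);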


// Setup outputs.
if (op_->Setup(output_desc, ws_) && op_->CanInferOutputs()) {
for (size_t i = 0; i < num_outputs_; ++i) {
ws_.template Output<OutBackend>(i).Resize(output_desc[i].shape, output_desc[i].type);
}
}

op_->Run(ws_);

for (size_t i = 0; i < num_outputs_; ++i) {
outputs.push_back(AsTensorList<OutBackend>(ws_.template OutputPtr<OutBackend>(i)));
}

return outputs;
}

template <typename Backend>
std::unique_ptr<ThreadPool> EagerOperator<Backend>::shared_thread_pool =
std::make_unique<ThreadPool>(1, CPU_ONLY_DEVICE_ID, false);

template <typename Backend>
CUDAStreamLease EagerOperator<Backend>::shared_cuda_stream{};

} // namespace dali

#endif // DALI_PIPELINE_OPERATOR_EAGER_OPERATOR_H_
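
For orientation, a minimal sketch of driving the new EagerOperator directly from C++; the operator name, arguments, and input preparation are illustrative and not taken from this PR:

// Hypothetical usage sketch; the chosen operator ("Copy") and its arguments are assumptions.
OpSpec spec("Copy");
spec.AddArg("device", "cpu")
    .AddArg("max_batch_size", 8)
    .AddArg("num_threads", 4);

EagerOperator<CPUBackend> op(spec);

// Inputs and named tensor arguments are prepared by the caller (in this PR, by the
// debug-mode Python bindings); they are declared empty here for brevity.
std::vector<std::shared_ptr<TensorList<CPUBackend>>> inputs;
std::unordered_map<std::string, std::shared_ptr<TensorList<CPUBackend>>> kwargs;

// Runs on the shared thread pool; one TensorList is returned per operator output.
auto outputs = op.Run<CPUBackend, CPUBackend>(inputs, kwargs);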
21 changes: 1 addition & 20 deletions dali/pipeline/operator/operator.h
@@ -33,6 +33,7 @@
#include "dali/pipeline/operator/op_schema.h"
#include "dali/pipeline/operator/op_spec.h"
#include "dali/pipeline/operator/operator_factory.h"
#include "dali/pipeline/util/batch_utils.h"
#include "dali/pipeline/util/backend2workspace_map.h"
#include "dali/pipeline/workspace/device_workspace.h"
#include "dali/pipeline/workspace/sample_workspace.h"
@@ -61,26 +62,6 @@ const std::string kSeed = "seed";  // NOLINT
const std::string kDtype = "dtype"; // NOLINT
} // namespace arg_names

/**
* @brief Verifies that the inputs in the workspace satisfy the layout
* constraints imposed by the schema.
*/
template <typename Workspace>
inline void CheckInputLayouts(const Workspace &ws, const OpSpec &spec) {
auto &schema = spec.GetSchema();
for (int i = 0; i < spec.NumRegularInput(); ++i) {
if (ws.template InputIsType<CPUBackend>(i)) {
auto &input = ws.template Input<CPUBackend>(i);
(void) schema.GetInputLayout(i, input.shape().sample_dim(), input.GetLayout());
} else if (ws.template InputIsType<GPUBackend>(i)) {
auto &input = ws.template Input<GPUBackend>(i);
(void) schema.GetInputLayout(i, input.shape().sample_dim(), input.GetLayout());
} else {
DALI_FAIL(make_string("Input ", i, " has an unknown backend"));
}
}
}

/**
* @brief Baseclass for the basic unit of computation in the pipeline.
*