From 5004beb571393c1c6638462908af71c8807b82d7 Mon Sep 17 00:00:00 2001
From: huajsj
Date: Sun, 13 Mar 2022 01:18:29 -0800
Subject: [PATCH] [Runtime][PipelineExecutor] Getting the asynchronous output

This patch creates a new GlobalRuntime to check whether the output data is
ready and to poll the global output of the pipeline. It also removes the
sequential pipeline execution logic, since the asynchronous logic already
covers it.
---
 python/tvm/contrib/pipeline_executor.py      |   4 +-
 src/runtime/pipeline/pipeline_executor.cc    |  11 +-
 src/runtime/pipeline/pipeline_executor.h     |   7 +-
 src/runtime/pipeline/pipeline_scheduler.cc   |  61 +---
 src/runtime/pipeline/pipeline_scheduler.h    |  12 +-
 src/runtime/pipeline/pipeline_struct.h       | 306 ++++++++++++-------
 tests/python/relay/test_pipeline_executor.py |  29 +-
 7 files changed, 222 insertions(+), 208 deletions(-)

diff --git a/python/tvm/contrib/pipeline_executor.py b/python/tvm/contrib/pipeline_executor.py
index b4a853a4ec10..dc276b1b0285 100644
--- a/python/tvm/contrib/pipeline_executor.py
+++ b/python/tvm/contrib/pipeline_executor.py
@@ -125,9 +125,9 @@ def __init__(self, module):
         self._get_input_pipeline_map = self.module["get_input_pipeline_map"]
         self._get_pipe_execute_count = self.module["get_execute_count"]

-    def run(self, sync=False):
+    def run(self):
         """Run the pipeline executor."""
-        self._run(sync)
+        self._run()

     def get_input_pipeline_map(self, name):
         """Using the "name" to get the corresponding subgraph index and also get the "input name"
diff --git a/src/runtime/pipeline/pipeline_executor.cc b/src/runtime/pipeline/pipeline_executor.cc
index 85eab912024f..aff7e5205c94 100644
--- a/src/runtime/pipeline/pipeline_executor.cc
+++ b/src/runtime/pipeline/pipeline_executor.cc
@@ -78,7 +78,7 @@ PackedFunc PipelineExecutor::GetFunction(const std::string& name,
     return PackedFunc(
         [sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { *rv = this->GetOutput(); });
   } else if (name == "run") {
-    return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { this->Run(args[0]); });
+    return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { this->Run(); });
   } else if (name == "get_execute_count") {
     return PackedFunc(
         [sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { *rv = this->GetExecutionCount(); });
@@ -140,13 +140,8 @@ int PipelineExecutor::GetParamsGroupPipelineMap(const std::string& name) {
   return param_connection_config[name];
 }

-/*!
- * \brief Run the pipeline executor.
- * \param serialized_mode Whether run the pipeline executor in serialized mode.
- */
-void PipelineExecutor::Run(bool serialized_mode) {
-  pipeline_scheduler_.PipelineRun(runtimes_, pipeline_config_, serialized_mode);
-}
+/*!\brief Run the pipeline executor.*/
+void PipelineExecutor::Run() { pipeline_scheduler_.PipelineRun(runtimes_, pipeline_config_); }
 /*!
  * \brief return A list of global output data.
  */
diff --git a/src/runtime/pipeline/pipeline_executor.h b/src/runtime/pipeline/pipeline_executor.h
index 1d547f206a95..9a24acdc2741 100644
--- a/src/runtime/pipeline/pipeline_executor.h
+++ b/src/runtime/pipeline/pipeline_executor.h
@@ -113,11 +113,8 @@ class TVM_DLL PipelineExecutor : public ModuleNode {
    * \return The number of outputs.
    */
   int NumOutputs() const { return num_outputs_; }
-  /*!
-   * \brief Run the pipeline executor.
-   * \param serialized_mode Whether run the pipeline executor in serialized mode.
-   */
-  void Run(bool serialized_mode);
+  /*!\brief Run the pipeline executor.*/
+  void Run();
   /*!
    * \brief Get a list output data.
    * \return A list of output data.
diff --git a/src/runtime/pipeline/pipeline_scheduler.cc b/src/runtime/pipeline/pipeline_scheduler.cc
index 760bcd2c07a8..a417feb68301 100644
--- a/src/runtime/pipeline/pipeline_scheduler.cc
+++ b/src/runtime/pipeline/pipeline_scheduler.cc
@@ -32,6 +32,7 @@ std::vector<std::shared_ptr<BackendRuntime>> PipelineScheduler::PipelineInit(
     const std::vector<Module>& modules, const ConfigPipelineExecution& pipeline_config) {
   std::vector<std::shared_ptr<BackendRuntime>> runtimes;
   graph_modules_ = modules;
+  global_runtime_ = std::make_shared<GlobalRuntime>(GLOBAL_MODULE_INDEX);
   // Creating a list of runtimes.
   for (size_t i = 0; i < graph_modules_.size(); i++) {
     auto run_item = std::make_shared<BackendRuntime>(graph_modules_[i], i);
@@ -49,71 +50,25 @@ std::vector<std::shared_ptr<BackendRuntime>> PipelineScheduler::PipelineInit(
   }
   // Initializing and then running the worker thread.
   for (auto runtime : runtimes) {
-    runtime->InitializePipeline(pipeline_config, &runtimes);
+    runtime->InitializePipeline(pipeline_config, &runtimes, global_runtime_);
   }
   return runtimes;
 }
-/*!
- * \brief Running the pipeline logic in the sequential mode.
- * \param runtimes A list of backend runtime modules.
- * \param pipeline_config The dependent configuration of each runtime module.
- */
-void PipelineScheduler::PipelineRunSequential(
-    const std::vector<std::shared_ptr<BackendRuntime>>& runtimes,
-    ConfigPipelineExecution pipeline_config) {
-  for (size_t i = 0; i < runtimes.size(); i++) {
-    // The "runtimes" is a list of runtime sorted by the runtime index which should be
-    // contiguous ascend.
-    if (static_cast<int>(i) != runtimes[i]->GetModuleIndex()) {
-      LOG(FATAL) << "Runtime index " << runtimes[i]->GetModuleIndex()
-                 << " is not as same as vector offset value " << i;
-    }
-
-    if (!pipeline_config.FindModuleInConfig(i)) {
-      LOG(FATAL) << "Not find the configuration for the module " << i;
-    }
-
-    runtimes[i]->Run();
-    // Getting the output then forwarding into other module once it is configured as input of
-    // another module or storaging into the "output_array" when the output is a global one.
-    int outputs_num = runtimes[i]->NumOutputs();
-    for (int j = 0; j < outputs_num; j++) {
-      ConfigBindings& out_binding = pipeline_config[i][j];
-      std::unordered_map<int, std::string>& input_connections = out_binding.Get();
-      NDArray output = runtimes[i]->GetOutput(j);
-      for (auto bind : input_connections) {
-        // "bind.first < 0" means the bind is a global bind, by pass the forwarding for
-        // a global bind.
-        if (bind.first < 0) continue;
-        // Setting the output as an input data into the runtime module.
-        runtimes[bind.first]->SetInput(bind.second, const_cast<DLTensor*>(output.operator->()));
-      }
-      // Store the output.
-      if (out_binding.IsGlobalOutput()) {
-        int global_idx = out_binding.GetGlobalOutputIndex();
-        TVMArrayCopyFromTo(const_cast<DLTensor*>(output.operator->()),
-                           const_cast<DLTensor*>(output_arrays_[global_idx].operator->()), nullptr);
-      }
-    }
-  }
-}
 /*!
  * \brief Running pipeline logic.
  * \param runtimes A list of backend runtime modules.
  * \param pipeline_config The dependency configuration of each runtime module.
- * \param sequential_mode Whether the execution is in a sequential mode.
 */
 void PipelineScheduler::PipelineRun(const std::vector<std::shared_ptr<BackendRuntime>>& runtimes,
-                                    ConfigPipelineExecution pipeline_config, bool sequential_mode) {
-  if (!sequential_mode) {
-    runtimes.front()->RunPipeline();
-  } else {
-    PipelineRunSequential(runtimes, pipeline_config);
-  }
+                                    ConfigPipelineExecution pipeline_config) {
+  runtimes.front()->RunPipeline();
 }
 /*!
  * \brief Get a list of output.
 */
-Array<NDArray> PipelineScheduler::PipelineGetOutput() { return output_arrays_; }
+Array<NDArray> PipelineScheduler::PipelineGetOutput() {
+  bool ret = global_runtime_->GetOutput(&output_arrays_);
+  return ret ? output_arrays_ : Array<NDArray>{};
+}
 }  // namespace runtime
 }  // namespace tvm
diff --git a/src/runtime/pipeline/pipeline_scheduler.h b/src/runtime/pipeline/pipeline_scheduler.h
index 3339d4376083..9fb357b8e9f0 100644
--- a/src/runtime/pipeline/pipeline_scheduler.h
+++ b/src/runtime/pipeline/pipeline_scheduler.h
@@ -47,17 +47,9 @@ class PipelineScheduler {
    * \brief Running the pipeline logic.
    * \param runtimes A list of backend runtime modules.
    * \param pipeline_config The dependency configuration of each runtime module.
-   * \param sequential_mode Whether the execution is in a sequential mode.
    */
   void PipelineRun(const std::vector<std::shared_ptr<BackendRuntime>>& runtimes,
-                   ConfigPipelineExecution pipeline_config, bool sequential_mode = false);
-  /*!
-   * \brief Running the pipeline logic in the sequential mode.
-   * \param runtimes A list of backend runtime modules.
-   * \param pipeline_config The dependent configuration of each runtime module.
-   */
-  void PipelineRunSequential(const std::vector<std::shared_ptr<BackendRuntime>>& runtimes,
-                             ConfigPipelineExecution pipeline_config);
+                   ConfigPipelineExecution pipeline_config);
   /*!
   * \brief Get a list of outputs.
   */
@@ -68,6 +60,8 @@ class PipelineScheduler {
   std::vector<Module> graph_modules_;
   /*!\brief A list of NDArray used to storage outputs.*/
   Array<NDArray> output_arrays_;
+  /*!\brief The global runtime to represent the pipeline executor.*/
+  std::shared_ptr<GlobalRuntime> global_runtime_;
 };
 }  // namespace runtime
 }  // namespace tvm
diff --git a/src/runtime/pipeline/pipeline_struct.h b/src/runtime/pipeline/pipeline_struct.h
index beb4f425e93a..c6d5fa8cc61a 100644
--- a/src/runtime/pipeline/pipeline_struct.h
+++ b/src/runtime/pipeline/pipeline_struct.h
@@ -66,7 +66,7 @@ enum InterfaceType {
   INPUT = 0,
   OUTPUT,
 };
-/*!\The state of the pipeline.*/
+/*!\brief The state of the pipeline.*/
 enum PipelineState {
   STOPPED = 0,
   RUNNING,
@@ -179,6 +179,14 @@ class DataNotify {
 /*!\brief The container used to store the forwarding data of the pipeline.*/
 class QueueData {
  public:
+  explicit QueueData(DLTensor* data) {
+    if (data_ == data) {
+      LOG(FATAL) << "The value of 'data'(" << data << ") is the same as 'data_'(" << data_ << ")";
+    }
+    data_ = data;
+    SetAsDataOwner(false);
+  }
+  QueueData() { SetAsDataOwner(true); }
   /*!\brief Doing a deep copy for the 'QueueData' structure.*/
   QueueData& operator=(const QueueData& data) {
     CreateCopyFrom(data.GetDLData());
@@ -200,26 +208,26 @@ class QueueData {
     }
     size_t fromLen = tvm::runtime::GetDataSize(*from);
     size_t toLen = data_ ? tvm::runtime::GetDataSize(*data_) : 0;
-    if (!(device_type_ == from->device.device_type && device_id_ == from->device.device_id) ||
-        fromLen != toLen) {
-      if (data_) {
-        TVMArrayFree(data_);
-        data_ = nullptr;
+    if (fromLen != toLen) {
+      // If this container owns the variable 'data_', recreate the 'data_' variable.
+      if (IsDataOwner()) {
+        if (data_) {
+          TVMArrayFree(data_);
+          data_ = nullptr;
+        }
+        TVMArrayAlloc(from->shape, from->ndim, from->dtype.code, from->dtype.bits,
+                      from->dtype.lanes, from->device.device_type, from->device.device_id, &data_);
+      } else {
+        LOG(FATAL) << "The 'from' data does not match 'data_'.";
       }
-      TVMArrayAlloc(from->shape, from->ndim, from->dtype.code, from->dtype.bits, from->dtype.lanes,
-                    from->device.device_type, from->device.device_id, &data_);
     }
     TVMArrayCopyFromTo(const_cast<DLTensor*>(from), data_, nullptr);
-    device_type_ = from->device.device_type;
-    device_id_ = from->device.device_id;
     return data_;
   }
   /*!\brief Return a pointer to the 'DLTensor' data.*/
   DLTensor* GetDLData() const { return data_; }
-  const int DeviceType() { return device_type_; }
-  const int DeviceID() { return device_id_; }
   ~QueueData() {
-    if (data_) {
+    if (IsDataOwner() && data_) {
       TVMArrayFree(data_);
       data_ = nullptr;
     }
@@ -228,10 +236,12 @@ class QueueData {
  private:
   /*!\brief Pointer to the forwarding data.*/
   DLTensor* data_ = nullptr;
-  /*!\brief The type of device which generated the QueueData container.*/
-  int device_type_;
-  /*!\brief The id of device which generated the data in this container.*/
-  int device_id_;
+  /*!\brief Whether this container is the owner of the 'data_'.*/
+  bool is_data_owner_ = false;
+  /*!\brief Set the current container as the owner of the 'data_'.*/
+  void SetAsDataOwner(bool is_owner) { is_data_owner_ = is_owner; }
+  /*!\brief Check whether the current container is the owner of the 'data_'.*/
+  bool IsDataOwner() const { return is_data_owner_; }
 };
 /*!
  * \brief All binding information of an output interface.
@@ -253,6 +263,9 @@ class ConfigBindings {
     for (auto output : bindings_) {
       parse_function(output_idx, output.first, output.second);
     }
+    if (IsGlobalOutput()) {
+      parse_function(output_idx, GLOBAL_MODULE_INDEX, std::to_string(global_output_index_));
+    }
   }
   /*!
   * \brief Create a module interface map from JSONReader.
@@ -412,7 +425,7 @@ class ConfigPipelineExecution {
     ICHECK(config_.find(key) != config_.end());
     return config_[key];
   }
-  /**/
+  /*!\brief Get the CPU affinity settings.*/
   std::string GetCPUAffinity(int runtime_idx) {
     auto config = config_.find(runtime_idx);
     if (config == config_.end()) {
@@ -617,41 +630,141 @@ struct ParamConnectionConfig {
  * interfaces of backend cores.
  */
 using ForwardQueue = SPSCLockFreeQueue<QueueData, ModuleInterfaceID>;
-/*
- *!\brief Backend Runtime.
- */
-class BackendRuntime {
-  using ModuleInputPairList = std::vector<std::pair<std::shared_ptr<BackendRuntime>, int>>;
-  using ForwardQueueMap =
-      std::unordered_map<ModuleInterfaceID, std::shared_ptr<ForwardQueue>, ModuleIDHash>;
+using ForwardQueueMap =
+    std::unordered_map<ModuleInterfaceID, std::shared_ptr<ForwardQueue>, ModuleIDHash>;
+/*!\brief The basic class for runtime.*/
+class BasicRuntime {
+  using ModuleInputPairList = std::vector<std::pair<std::shared_ptr<BasicRuntime>, int>>;

- private:
+ public:
+  explicit BasicRuntime(int runtime_idx) : runtime_idx_(runtime_idx) {}
+  /*!\brief Return the index of the current module.*/
+  int GetModuleIndex() { return runtime_idx_; }
+  /*!
+   *\brief Creating a parent notification.
+   *\param input_index The input index of the 'current runtime'.
+   *\param parent_idx The index of 'parent runtime' which will send the notification.
+   *\param parent_output_idx The output index of the 'parent runtime' which will send
+   * the notification.
+   */
+  virtual void CreateParentsNotify(int input_index, int parent_idx, int parent_output_idx) {}
+  /*!
+   * \brief Notifying an input is ready.
+   * \param input_index The index of 'input interface' which is ready for data.
+   */
+  virtual void ParentNotify(int input_index) {}
+
+ protected:
   /*!\brief The index of runtime indicates the runtime position in the pipeline.*/
   int runtime_idx_;
-  /*!*/
-  std::string cpu_affinity_ = "";
-  /*!\brief The Runtime module of a backend graph executor.*/
-  Module module_;
-  /*\brief The thread is associated with the current runtime*/
-  std::thread thread_;
-  /*!\brief The state of the pipeline.*/
-  std::atomic<PipelineState> pipeline_state_{STOPPED};
   /*!\brief A list of runtime which depends on the current runtime.*/
   std::unordered_map<int, ModuleInputPairList> children_;
-  /*!\brief A map including the runtime input index and the notification data structure.*/
-  std::unordered_map<int, std::shared_ptr<DataNotify>> parents_notify_;
-  /*!\brief The execution count of the 'RunPipeline' function. */
-  uint32_t pipeline_execution_count_ = 0;
   /*!
   * \brief A list of SPSC input queues in which the input interface will poll the data sent from
   * other backend cores.
   */
   std::unordered_map<int, std::shared_ptr<ForwardQueue>> input_queue_;
+
   /*!
   * \brief A list of SPSC output queues in which the output interface will push the data to
   * other backend cores.
   */
   std::unordered_map<int, ForwardQueueMap> output_queue_;
+  /*!
+   * \brief Generate the ID of an input queue.
+   * \param runtime_index The index of backend runtime.
+   * \param interface_index The index of the interface.
+   * \param type The type of the interface.
+   */
+  ModuleInterfaceID GenerateQueueID(int runtime_index, int interface_index, InterfaceType type) {
+    return ModuleInterfaceID(runtime_index, interface_index, type);
+  }
+  /*!
+   * \brief Creating a forwarding queue for the pair of an output interface and an input interface.
+   * \param output_idx The index of an output interface which will send the forwarding data.
+   * \param child_runtime The backend runtime which owns the input interface.
+   * \param input_index The index of an input interface which will receive the forwarding data.
+   */
+  void CreateForwardingQueue(int output_idx, std::shared_ptr<BasicRuntime> child_runtime,
+                             int input_index) {
+    auto queue_id = GenerateQueueID(child_runtime->GetModuleIndex(), input_index, INPUT);
+    // The forwarding queue map of a specified output interface.
+    auto& queue_map = output_queue_[output_idx];
+    if (queue_map.find(queue_id) != queue_map.end()) {
+      LOG(FATAL) << "The queue " << queue_id.runtime_idx << "." << queue_id.runtime_interface_idx
+                 << " is already created!";
+      return;
+    }
+    auto queue = std::make_shared<ForwardQueue>(queue_id);
+    queue_map[queue_id] = queue;
+    // Use the created queue as the consumer queue for the input interface of this forwarding
+    // pair.
+    child_runtime->AppendInputQueue(input_index, queue);
+  }
+  /*!
+   * \brief Setting the consumer queue for the input interface.
+   * \param input_index The index of the input interface.
+   * \param queue The consumer queue.
+   */
+  void AppendInputQueue(int input_index, std::shared_ptr<ForwardQueue> queue) {
+    input_queue_[input_index] = queue;
+  }
+};
+/*!
+ * \brief This global runtime represents the pipeline executor and exposes the input and output
+ * interface.
+ */
+class GlobalRuntime : public BasicRuntime {
+ public:
+  explicit GlobalRuntime(int runtime_idx) : BasicRuntime(runtime_idx) {}
+  /*!\brief Whether the output data is ready.*/
+  bool DataIsReady(bool wait_data) {
+    bool data_ready = true;
+    for (auto queue_pair : input_queue_) {
+      auto queue = queue_pair.second;
+      if (queue->Empty()) {
+        data_ready = false;
+        break;
+      }
+    }
+    if (!data_ready && wait_data) {
+      // TODO(huajsj): Waiting for the data-ready message.
+    }
+    return data_ready;
+  }
+  /*!\brief Get the output data.*/
+  bool GetOutput(Array<NDArray>* outputs, bool wait_data = false) {
+    if (!DataIsReady(wait_data)) {
+      return false;
+    }
+    for (auto queue_pair : input_queue_) {
+      auto output_index = queue_pair.first;
+      auto queue = queue_pair.second;
+      QueueData data(const_cast<DLTensor*>(((*outputs)[output_index]).operator->()));
+      if (!queue->Poll(&data)) {
+        LOG(FATAL) << "There is no data in the data queue, this should not happen!";
+      }
+    }
+    return true;
+  }
+};
+/*!
+ * \brief Backend Runtime.
+ */
+class BackendRuntime : public BasicRuntime {
+ private:
+  /*!\brief The CPU affinity settings for this runtime.*/
+  std::string cpu_affinity_ = "";
+  /*!\brief The Runtime module of a backend graph executor.*/
+  Module module_;
+  /*!\brief The thread associated with the current runtime.*/
+  std::thread thread_;
+  /*!\brief The state of the pipeline.*/
+  std::atomic<PipelineState> pipeline_state_{STOPPED};
+  /*!\brief A map including the runtime input index and the notification data structure.*/
+  std::unordered_map<int, std::shared_ptr<DataNotify>> parents_notify_;
+  /*!\brief The execution count of the 'RunPipeline' function. */
+  uint32_t pipeline_execution_count_ = 0;
   /*!
   *\brief In order to transfer data from one backend runtime to another, we need a local
   * tensor variable as a medium. "input_tensor_local_copy_" is a map including
@@ -719,19 +832,14 @@ class BackendRuntime {
       if ((exit_notify = notify->second->GetExitState())) break;
       // Getting the source which sends this notification.
       auto target_input_interface_index = notify->first;
-      auto source_interface_id = notify->second->GetNotifySource();
       // Loading the binding data.
       while (!this->LoadBindingData(target_input_interface_index)) {
         // Waiting for the notification.
         if (!notify->second->Wait()) {
-          VLOG(1) << "runtime index:" << runtime_idx_ << " receive exit notify.";
           exit_notify = true;
           break;
         }
       }
-      VLOG(1) << "Data forwarding from runtime(" << source_interface_id.runtime_idx << ").output("
-              << source_interface_id.runtime_interface_idx << ") to runtime(" << runtime_idx_
-              << ").input(" << target_input_interface_index << ")";
       notifys.erase(notify);
     }
     return exit_notify;
@@ -795,22 +903,6 @@ class BackendRuntime {
     }
     return true;
   }
-  /*!
-   *\brief Creating a parent notification.
-   *\param input_index The input index of the 'current runtime'.
-   *\param parent_idx The index of 'parent runtime' which will send the notification.
-   *\param parent_output_idx The output index of the 'parent runtime' which will send
-   * the notification.
-   */
-  void CreateParentsNotify(int input_index, int parent_idx, int parent_output_idx) {
-    if (parents_notify_.find(input_index) != parents_notify_.end()) {
-      LOG(FATAL) << "The notification associated with the input interface " << input_index
-                 << " in runtime " << runtime_idx_ << " already been created!";
-      return;
-    }
-    parents_notify_[input_index] =
-        std::make_shared<DataNotify>(ModuleInterfaceID(parent_idx, parent_output_idx, OUTPUT));
-  }
   /*!
   * \brief Copying from a given tensor and using 'CPU' as the device.
   */
@@ -867,9 +959,7 @@ class BackendRuntime {
   }

  public:
-  BackendRuntime(Module mod, int mod_idx) {
-    module_ = mod;
-    runtime_idx_ = mod_idx;
+  BackendRuntime(Module mod, int mod_idx) : BasicRuntime(mod_idx), module_(mod) {
     get_input_index_ = module_.GetFunction("get_input_index");
     get_num_output_ = module_.GetFunction("get_num_outputs");
     get_num_inputs_ = module_.GetFunction("get_num_inputs");
@@ -884,8 +974,22 @@ class BackendRuntime {
     }
     StopPipeline();
   }
-  /*!brief Getting the runtime index*/
-  int GetIndex() const { return runtime_idx_; }
+  /*!
+   *\brief Creating a parent notification.
+   *\param input_index The input index of the 'current runtime'.
+   *\param parent_idx The index of 'parent runtime' which will send the notification.
+   *\param parent_output_idx The output index of the 'parent runtime' which will send
+   * the notification.
+   */
+  void CreateParentsNotify(int input_index, int parent_idx, int parent_output_idx) {
+    if (parents_notify_.find(input_index) != parents_notify_.end()) {
+      LOG(FATAL) << "The notification associated with the input interface " << input_index
+                 << " in runtime " << runtime_idx_ << " has already been created!";
+      return;
+    }
+    parents_notify_[input_index] =
+        std::make_shared<DataNotify>(ModuleInterfaceID(parent_idx, parent_output_idx, OUTPUT));
+  }
   /*!
   * \brief Getting the times of using pipeline function.
   * \return The times of using pipeline function.
@@ -897,21 +1001,31 @@ class BackendRuntime {
   * \param runtimes A list of BackendRuntime.
   */
   void InitializePipeline(ConfigPipelineExecution config,
-                          std::vector<std::shared_ptr<BackendRuntime>>* runtimes) {
-    cpu_affinity_ = config.GetCPUAffinity(runtime_idx_);
+                          std::vector<std::shared_ptr<BackendRuntime>>* runtimes,
+                          std::shared_ptr<GlobalRuntime> global_runtime) {
     // Getting the 'binding configuration' for each runtime.
     config.VisitRuntimeOutputConfig(
         [&](int output_idx, int child_idx, std::string child_input_name) {
-          int runtime_idx_max = runtimes->size();
-          if (child_idx < 0 || child_idx >= runtime_idx_max) {
-            LOG(FATAL) << "The runtime index " << child_idx << " is out of the range.";
-          }
-          auto child_runtime = runtimes->at(child_idx);
-          ICHECK(child_runtime->GetModuleIndex() == child_idx);
-          int input_index = child_runtime->GetInputIndex(child_input_name);
-          if (input_index < 0) {
-            LOG(FATAL) << "Can not find the input " << input_index << "in runtime " << child_idx;
+          std::shared_ptr<BasicRuntime> child_runtime = nullptr;
+          int input_index;
+          if (GLOBAL_MODULE_INDEX == child_idx) {
+            int global_output_index = std::stoi(child_input_name);
+            input_index = global_output_index;
+            child_runtime = global_runtime;
+          } else {
+            int runtime_idx_max = runtimes->size();
+            if (child_idx < 0 || child_idx >= runtime_idx_max) {
+              LOG(FATAL) << "The runtime index " << child_idx << " is out of the range.";
+            }
+            auto runtime = runtimes->at(child_idx);
+            ICHECK(runtime->GetModuleIndex() == child_idx);
+            input_index = runtime->GetInputIndex(child_input_name);
+            if (input_index < 0) {
+              LOG(FATAL) << "Cannot find the input " << child_input_name << " in runtime "
+                         << child_idx;
+            }
+            child_runtime = runtime;
          }
+          ICHECK(child_runtime != nullptr);
           children_[output_idx].push_back(std::make_pair(child_runtime, input_index));
           child_runtime->CreateParentsNotify(input_index, runtime_idx_, output_idx);
           VLOG(1) << " parent_idx.output:" << runtime_idx_ << "." << output_idx
@@ -935,54 +1049,12 @@ class BackendRuntime {
       return;
     }
     notify->second->Notify();
-    VLOG(1) << "Notification at runtime_index.input_index:" << runtime_idx_ << "."
-            << input_index;
   }
   /*!\brief Creating a NDArray containing same shape and data type with a module output. */
   NDArray CreateFromOutput(int idx) {
     NDArray data = get_output_(idx);
     return CreateNDArrayFromDLTensor(const_cast<DLTensor*>(data.operator->()));
   }
-  /*!
-   * \brief Generate the ID of an input queue.
-   * \param runtime_index The index of backend runtime.
-   * \param interface_index The index of the interface.
-   * \param type The type of the interface.
-   */
-  ModuleInterfaceID GenerateQueueID(int runtime_index, int interface_index, InterfaceType type) {
-    return ModuleInterfaceID(runtime_index, interface_index, type);
-  }
-  /*!
-   * \brief Creating a forwarding queue for the pair of an output interface and an input interface.
-   * \param output_idx The index of an output interface which will send the forwarding data.
-   * \param child_runtime The backend runtime which owns the input interface.
-   * \param input_index The index of an input interface which will receive the forwarding data.
-   */
-  void CreateForwardingQueue(int output_idx, std::shared_ptr<BackendRuntime> child_runtime,
-                             int input_index) {
-    auto queue_id = GenerateQueueID(child_runtime->GetModuleIndex(), input_index, INPUT);
-    // The forwarding queue map of a specified output interface.
-    auto& queue_map = output_queue_[output_idx];
-    if (queue_map.find(queue_id) != queue_map.end()) {
-      LOG(FATAL) << "The queue " << queue_id.runtime_idx << "." << queue_id.runtime_interface_idx
-                 << " is already created!";
-      return;
-    }
-    auto queue = std::make_shared<ForwardQueue>(queue_id);
-    queue_map[queue_id] = queue;
-    // Use the created queue as the consumer queue for the input interface of this forwarding
-    // pair.
-    child_runtime->AppendInputQueue(input_index, queue);
-  }
-  /*!
-   * \brief Setting the consumer queue for the input interface.
-   * \param input_index The index of the input interface.
-   * \param queue The consumer queue.
-   */
-  void AppendInputQueue(int input_index, std::shared_ptr<ForwardQueue> queue) {
-    input_queue_[input_index] = queue;
-  }
-  /*!\brief Return the index of the current module.*/
-  int GetModuleIndex() { return runtime_idx_; }
   /*!\brief Return the number of output*/
   int NumOutputs() const { return get_num_output_(); }
   /*!\brief Return the number of input*/
diff --git a/tests/python/relay/test_pipeline_executor.py b/tests/python/relay/test_pipeline_executor.py
index 099be056e62c..cc58b8128e24 100644
--- a/tests/python/relay/test_pipeline_executor.py
+++ b/tests/python/relay/test_pipeline_executor.py
@@ -372,7 +372,8 @@ def test_pipeline():
         assert module_index == 0
         # Using the parameters group name to set parameters.
         pipeline_module_test.set_params("param_0", customized_parameters)
-        for data in datas:
+        for round in range(0, len(datas)):
+            data = datas[round]
             # Getting the result without setting customized parameters.
             wrong_output = run_modules(
                 mconfig["module_connection"],
@@ -401,23 +402,23 @@ def test_pipeline():
             pipeline_module_test.set_input("data_b", data)
             input_data = pipeline_module_test.get_input("data_a")
             tvm.testing.assert_allclose(data, input_data.numpy())
-            # Running the pipeline executor in sequential mode.
-            pipeline_module_test.run(True)
+            # Running the pipeline executor in the pipeline mode.
+            pipeline_module_test.run()
+
+            statistic_time = 0
             outputs = pipeline_module_test.get_output()
+            while len(outputs) == 0:
+                outputs = pipeline_module_test.get_output()
+                statistic_time = statistic_time + 1
+                # Setting the timeout to 10 seconds.
+                assert statistic_time < 10
+                time.sleep(1)
+
             for i in range(len(outputs)):
                 tvm.testing.assert_allclose(normal_output[i], outputs[i].numpy())
                 assert not (normal_output[i] == wrong_output[i]).all()
-            # Running the pipeline executor in the pipeline mode.
-            pipeline_module_test.run(False)
-
-            # TODO(huajsj:) Replacing the checking logic with getting output logic.
-            # Checking the statistic value of pipeline.
-            statistic_time = 0
-            while pipeline_module_test.num_executing_pipeline < len(datas):
-                statistic_time = statistic_time + 1
-                # Setting the timeout to 10 seconds.
-                assert statistic_time < 10
-                time.sleep(1)
+
+            assert pipeline_module_test.num_executing_pipeline == round + 1

     # Reset the cpu affinity after a test.
     reset_cpu_affinity(affinity)
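
Note for reviewers: after this patch, run() is asynchronous and returns before the
outputs exist; get_output() returns an empty list until the GlobalRuntime has polled
data from its global output queues. A minimal usage sketch of the resulting calling
convention follows, mirroring the updated test above. The names `pipeline_module`,
`"data_a"`, and `data` are placeholders for an already-built PipelineModule and its
input, not identifiers introduced by this patch.

    import time

    # Feed the input and kick off the pipeline; run() no longer blocks.
    pipeline_module.set_input("data_a", data)
    pipeline_module.run()

    # Poll get_output() with a bounded timeout, as the new test does: an
    # empty list means the global outputs have not been produced yet.
    polls = 0
    outputs = pipeline_module.get_output()
    while len(outputs) == 0:
        outputs = pipeline_module.get_output()
        polls += 1
        assert polls < 10, "timed out waiting for the pipeline outputs"
        time.sleep(1)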