// Copyright 2019 The MediaPipe Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
// Declares CalculatorGraph, which links Calculators into a directed acyclic
// graph, and allows its evaluation.
#include <atomic>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include "absl/base/macros.h"
#include "absl/container/fixed_array.h"
#include "absl/synchronization/mutex.h"
#include "mediapipe/framework/calculator.pb.h"
#include "mediapipe/framework/calculator_base.h"
#include "mediapipe/framework/calculator_node.h"
#include "mediapipe/framework/counter_factory.h"
#include "mediapipe/framework/executor.h"
#include "mediapipe/framework/graph_output_stream.h"
#include "mediapipe/framework/graph_service.h"
#include "mediapipe/framework/mediapipe_profiling.h"
#include "mediapipe/framework/output_side_packet_impl.h"
#include "mediapipe/framework/output_stream.h"
#include "mediapipe/framework/output_stream_manager.h"
#include "mediapipe/framework/output_stream_poller.h"
#include "mediapipe/framework/output_stream_shard.h"
#include "mediapipe/framework/packet.h"
#include "mediapipe/framework/packet_generator.pb.h"
#include "mediapipe/framework/packet_generator_graph.h"
#include "mediapipe/framework/port.h"
#include "mediapipe/framework/port/integral_types.h"
#include "mediapipe/framework/port/status.h"
#include "mediapipe/framework/scheduler.h"
#include "mediapipe/framework/thread_pool_executor.pb.h"
namespace mediapipe {
class GpuResources;
struct GpuSharedData;
} // namespace mediapipe
#endif // !defined(MEDIAPIPE_DISABLE_GPU)
namespace mediapipe {
// Result type of AddOutputStreamPoller(): holds either an OutputStreamPoller
// or an error status.
using StatusOrPoller = ::mediapipe::StatusOr<OutputStreamPoller>;
// The class representing a DAG of calculator nodes.
// CalculatorGraph is the primary API for the MediaPipe Framework.
// In general, CalculatorGraph should be used if the only thing you need
// to do is run the graph (without pushing data in or extracting it as
// the graph runs).
// Example:
// // Build dependency "//mediapipe/framework:calculator_framework".
// #include "mediapipe/framework/calculator_framework.h"
// mediapipe::CalculatorGraphConfig config;
// RETURN_IF_ERROR(mediapipe::tool::ParseGraphFromString(kGraphStr, &config));
// mediapipe::CalculatorGraph graph;
// RETURN_IF_ERROR(graph.Initialize(config));
// std::map<std::string, mediapipe::Packet> extra_side_packets;
// extra_side_packets["video_id"] = mediapipe::MakePacket<std::string>(
// "3edb9503834e9b42");
// RETURN_IF_ERROR(graph.Run(extra_side_packets));
// // Run again (demonstrating the more concise initializer list syntax).
// RETURN_IF_ERROR(graph.Run(
// {{"video_id", mediapipe::MakePacket<std::string>("Ex-uGhDzue4")}}));
// // See mediapipe/framework/graph_runner.h for an interface
// // to insert and extract packets from a graph as it runs.
class CalculatorGraph {
// Defines possible modes for adding a packet to a graph input stream.
// WAIT_TILL_NOT_FULL can be used to control the memory usage of a graph by
// avoiding adding a new packet until all dependent input streams fall below
// the maximum queue size specified in the graph configuration.
// ADD_IF_NOT_FULL could also be used to control the latency if used in a
// real-time graph (e.g. drop camera frames if the MediaPipe graph queues are
// full).
enum class GraphInputStreamAddMode {
// Blocks and waits until none of the affected streams
// are full. Note that if max_queue_size is set to -1, the packet will be
// added regardless of queue size.
// Returns and does not add packet if any affected input
// stream is full.
// Creates an uninitialized graph.
// CalculatorGraph is neither copy-constructible nor copy-assignable.
CalculatorGraph(const CalculatorGraph&) = delete;
CalculatorGraph& operator=(const CalculatorGraph&) = delete;
// Initializes the graph from its proto description (using Initialize())
// and crashes if something goes wrong.
explicit CalculatorGraph(const CalculatorGraphConfig& config);
virtual ~CalculatorGraph();
// Initializes the graph from its proto description.
// side_packets that are provided at this stage are common across all Run()
// invocations and could be used to execute PacketGenerators immediately.
::mediapipe::Status Initialize(
const CalculatorGraphConfig& config,
const std::map<std::string, Packet>& side_packets);
// Convenience version which does not take side packets.
::mediapipe::Status Initialize(const CalculatorGraphConfig& config);
// Initializes the CalculatorGraph from the specified graph and subgraph
// configs. Template graph and subgraph configs can be specified through
// |input_templates|. Every subgraph must have its graph type specified in
// CalculatorGraphConfig.type. A subgraph can be instantiated directly by
// specifying its type in |graph_type|. A template graph can be instantiated
// directly by specifying its template arguments in |options|.
::mediapipe::Status Initialize(
const std::vector<CalculatorGraphConfig>& configs,
const std::vector<CalculatorGraphTemplate>& templates,
const std::map<std::string, Packet>& side_packets = {},
const std::string& graph_type = "",
const Subgraph::SubgraphOptions* options = nullptr);
// Returns the canonicalized CalculatorGraphConfig for this graph.
const CalculatorGraphConfig& Config() const {
return validated_graph_->Config();
// Observes the named output stream. packet_callback will be invoked on every
// packet emitted by the output stream. Can only be called before Run() or
// StartRun().
// TODO: Rename to AddOutputStreamCallback.
::mediapipe::Status ObserveOutputStream(
const std::string& stream_name,
std::function<::mediapipe::Status(const Packet&)> packet_callback);
// Adds an OutputStreamPoller for a stream. This provides a synchronous,
// polling API for accessing a stream's output. For asynchronous output, use
// ObserveOutputStream. See also the helpers in tool/sink.h.
StatusOrPoller AddOutputStreamPoller(const std::string& stream_name);
// Gets output side packet by name after the graph is done. However, base
// packets (generated by PacketGenerators) can be retrieved before
// graph is done. Returns error if the graph is still running (for non-base
// packets) or the output side packet is not found or empty.
::mediapipe::StatusOr<Packet> GetOutputSidePacket(
const std::string& packet_name);
// Runs the graph after adding the given extra input side packets. All
// arguments are forgotten after Run() returns.
// Run() is a blocking call and will return when all calculators are done.
virtual ::mediapipe::Status Run(
const std::map<std::string, Packet>& extra_side_packets);
// Run the graph without adding any input side packets.
::mediapipe::Status Run() { return Run({}); }
// Start a run of the graph. StartRun, WaitUntilDone, HasError,
// AddPacketToInputStream, and CloseInputStream allow more control over
// the execution of the graph run. You can insert packets directly into
// a stream while the graph is running. Once StartRun has been called,
// the graph will continue to run until WaitUntilDone() is called.
// If StartRun returns an error, then the graph is not started and a
// subsequent call to StartRun can be attempted.
// Example:
// RETURN_IF_ERROR(graph.StartRun(...));
// while (true) {
// if (graph.HasError() || want_to_stop) break;
// RETURN_IF_ERROR(graph.AddPacketToInputStream(...));
// }
// for (const std::string& stream : streams) {
// RETURN_IF_ERROR(graph.CloseInputStream(stream));
// }
// RETURN_IF_ERROR(graph.WaitUntilDone());
::mediapipe::Status StartRun(
const std::map<std::string, Packet>& extra_side_packets) {
return StartRun(extra_side_packets, {});
// In addition to the above StartRun, add additional parameter to set the
// stream header before running.
// Note: We highly discourage the use of stream headers, this is added for the
// compatibility of existing calculators that use headers during Open().
::mediapipe::Status StartRun(
const std::map<std::string, Packet>& extra_side_packets,
const std::map<std::string, Packet>& stream_headers);
// Wait for the current run to finish (block the current thread
// until all source calculators have returned StatusStop(), all
// graph_input_streams_ have been closed, and no more calculators can
// be run). This function can be called only after StartRun().
::mediapipe::Status WaitUntilDone();
// Wait until the running graph is in the idle mode, which is when nothing can
// be scheduled and nothing is running in the worker threads. This function
// can be called only after StartRun().
// NOTE: The graph must not have any source nodes because source nodes prevent
// the running graph from becoming idle until the source nodes are done.
::mediapipe::Status WaitUntilIdle();
// Wait until a packet is emitted on one of the observed output streams.
// Returns immediately if a packet has already been emitted since the last
// call to this function.
// Returns OutOfRangeError if the graph terminated while waiting.
::mediapipe::Status WaitForObservedOutput();
// Quick non-locking means of checking if the graph has encountered an error.
bool HasError() const {
// Atomic read of the error flag; no mutex is taken.
return has_error_.load();
}
// Add a Packet to a graph input stream based on the graph input stream add
// mode. If the mode is ADD_IF_NOT_FULL, the packet will not be added if any
// queue exceeds max_queue_size specified by the graph config and will return
// StatusUnavailable. The WAIT_TILL_NOT_FULL mode (default) will block until
// the queues fall below the max_queue_size before adding the packet. If
// max_queue_size is -1, then the packet is added regardless of the
// sizes of the queues in the graph. The input stream must have been specified
// in the configuration as a graph level input_stream. On error, nothing is
// added.
::mediapipe::Status AddPacketToInputStream(const std::string& stream_name,
const Packet& packet);
// Same as the l-value version of this function by the same name, but moves
// the r-value referenced packet into the stream instead of copying it over.
// This allows the graph to take exclusive ownership of the packet, which may
// allow more memory optimizations. Note that, if an error is returned, the
// packet may remain valid. In particular, when using the ADD_IF_NOT_FULL
// mode with a full queue, this will return StatusUnavailable and the caller
// may try adding the packet again later.
::mediapipe::Status AddPacketToInputStream(const std::string& stream_name,
Packet&& packet);
// Sets the queue size of a graph input stream, overriding the graph default.
::mediapipe::Status SetInputStreamMaxQueueSize(const std::string& stream_name,
int max_queue_size);
// Check if an input stream exists in the graph
bool HasInputStream(const std::string& name);
// Close a graph input stream. If the graph has any graph input streams
// then Run() will not return until all the graph input streams have
// been closed (and all packets propagate through the graph).
// Note that multiple threads cannot call CloseInputStream() on the same
// stream_name at the same time.
::mediapipe::Status CloseInputStream(const std::string& stream_name);
// Closes all the graph input streams.
// TODO: deprecate this function in favor of CloseAllPacketSources.
::mediapipe::Status CloseAllInputStreams();
// Closes all the graph input streams and source calculator nodes.
::mediapipe::Status CloseAllPacketSources();
// Returns the pointer to the stream with the given name, or dies if none
// exists. The result remains owned by the CalculatorGraph.
"Prefer using a Calculator to get information of all sorts out of the "
const OutputStreamManager* FindOutputStreamManager(const std::string& name);
// Returns the ProfilingContext associated with the CalculatorGraph.
ProfilingContext* profiler() { return profiler_.get(); }
// Collects the runtime profile for Open(), Process(), and Close() of each
// calculator in the graph. May be called at any time after the graph has been
// initialized.
ABSL_DEPRECATED("Use profiler()->GetCalculatorProfiles() instead")
::mediapipe::Status GetCalculatorProfiles(
std::vector<CalculatorProfile>*) const;
// Set the type of counter used in this graph.
void SetCounterFactory(CounterFactory* factory) {
CounterFactory* GetCounterFactory() { return counter_factory_.get(); }
// Callback when an error is encountered.
// Adds the error to the vector of errors.
void RecordError(const ::mediapipe::Status& error)
// Returns the maximum input stream queue size.
int GetMaxInputStreamQueueSize();
// Get the mode for adding packets to an input stream.
GraphInputStreamAddMode GetGraphInputStreamAddMode() const;
// Set the mode for adding packets to an input stream.
void SetGraphInputStreamAddMode(GraphInputStreamAddMode mode);
// Aborts the scheduler if the graph is not terminated; no-op otherwise.
void Cancel();
// Pauses the scheduler. Only used by calculator graph testing.
"CalculatorGraph will not allow external callers to explictly pause and "
"resume a graph.")
void Pause();
// Resumes the scheduler. Only used by calculator graph testing.
"CalculatorGraph will not allow external callers to explictly pause and "
"resume a graph.")
void Resume();
// Sets the executor that will run the nodes assigned to the executor
// named |name|. If |name| is empty, this sets the default executor. Must
// be called before the graph is initialized.
::mediapipe::Status SetExecutor(const std::string& name,
std::shared_ptr<Executor> executor);
// WARNING: the following public methods are exposed to Scheduler only.
// Return true if all the graph input streams have been closed.
bool GraphInputStreamsClosed() {
return num_closed_graph_input_streams_ == graph_input_streams_.size();
// Returns true if this node or graph input stream is connected to
// any input stream whose queue has hit maximum capacity.
bool IsNodeThrottled(int node_id) LOCKS_EXCLUDED(full_input_streams_mutex_);
// If any active source node or graph input stream is throttled and not yet
// closed, increases the max_queue_size for each full input stream in the
// graph.
// Returns true if at least one max_queue_size has been grown.
bool UnthrottleSources() LOCKS_EXCLUDED(full_input_streams_mutex_);
// Returns the scheduler's runtime measures for overhead measurement.
// Only meant for test purposes.
internal::SchedulerTimes GetSchedulerTimes() {
return scheduler_.GetSchedulerTimes();
// Returns a pointer to the GpuResources in use, if any.
// Only meant for internal use.
std::shared_ptr<::mediapipe::GpuResources> GetGpuResources() const;
// Counterpart of GetGpuResources(): takes a shared GpuResources instance and
// returns a status. Presumably installs |resources| as the instance this
// graph uses -- TODO confirm, including any ordering requirement relative to
// Initialize()/StartRun().
::mediapipe::Status SetGpuResources(
std::shared_ptr<::mediapipe::GpuResources> resources);
// Helper for PrepareForRun. If it returns a non-empty map, those packets
// must be added to the existing side packets, replacing existing values
// that have the same key.
::mediapipe::StatusOr<std::map<std::string, Packet>> PrepareGpu(
const std::map<std::string, Packet>& side_packets);
#endif // !defined(MEDIAPIPE_DISABLE_GPU)
// Typed convenience wrapper around SetServicePacket() for |service|; the
// status returned by that call is returned to the caller.
template <typename T>
::mediapipe::Status SetServiceObject(const GraphService<T>& service,
std::shared_ptr<T> object) {
return SetServicePacket(service,
// Looks up the packet for |service| via GetServicePacket() and returns the
// std::shared_ptr<T> it holds, or nullptr if that packet is empty.
template <typename T>
std::shared_ptr<T> GetServiceObject(const GraphService<T>& service) {
Packet p = GetServicePacket(service);
// An empty packet is reported to the caller as a null pointer.
if (p.IsEmpty()) return nullptr;
return p.Get<std::shared_ptr<T>>();
// Only the Java API should call this directly.
::mediapipe::Status SetServicePacket(const GraphServiceBase& service,
Packet p);
// GraphRunState is used as a parameter in the function CallStatusHandlers.
enum class GraphRunState {
// State of the graph before the run; see status_handler.h for details.
// State of the graph after the run; set by CleanupAfterRun.
// The graph input streams (which have packets added to them from
// outside the graph). Since these will be connected directly to a
// node's input streams they are implemented as "output" streams.
// Based on the assumption that all the graph input packets must be added to a
// graph input stream sequentially, a GraphInputStream object only contains
// one reusable output stream shard.
class GraphInputStream {
// Binds this graph input stream to |manager|; the pointer is stored in
// manager_.
explicit GraphInputStream(OutputStreamManager* manager)
: manager_(manager) {
// Takes the callback used to report errors as a ::mediapipe::Status.
void PrepareForRun(
std::function<void(::mediapipe::Status)> error_callback) {
void SetMaxQueueSize(int max_queue_size) {
// Presumably installs |header| as the header of this stream (compare the
// stream_headers argument of StartRun) -- TODO confirm.
void SetHeader(const Packet& header);
// Adds |packet| to shard_ (copying overload).
void AddPacket(const Packet& packet) { shard_.AddPacket(packet); }
// Adds |packet| to shard_, moving it in.
void AddPacket(Packet&& packet) { shard_.AddPacket(std::move(packet)); }
void PropagateUpdatesToMirrors();
void Close();
// Reports whether the underlying stream manager is closed.
bool IsClosed() const { return manager_->IsClosed(); }
// Returns the stream manager this object was constructed with.
OutputStreamManager* GetManager() { return manager_; }
// Stream manager supplied to the constructor; null until then.
OutputStreamManager* manager_ = nullptr;
// The single output stream shard that AddPacket() writes into.
OutputStreamShard shard_;
// Initializes the graph from a ValidatedGraphConfig object.
::mediapipe::Status Initialize(
std::unique_ptr<ValidatedGraphConfig> validated_graph,
const std::map<std::string, Packet>& side_packets);
// AddPacketToInputStreamInternal template is called by either
// AddPacketToInputStream(Packet&& packet) or
// AddPacketToInputStream(const Packet& packet).
template <typename T>
::mediapipe::Status AddPacketToInputStreamInternal(
const std::string& stream_name, T&& packet);
// Sets the executor that will run the nodes assigned to the executor
// named |name|. If |name| is empty, this sets the default executor.
// Does not check that the graph is uninitialized and |name| is not a
// reserved executor name.
::mediapipe::Status SetExecutorInternal(const std::string& name,
std::shared_ptr<Executor> executor);
// If the num_threads field in default_executor_options is not specified,
// assigns a reasonable value based on system configuration and the graph.
// Then, creates the default thread pool if appropriate.
// Only called by InitializeExecutors().
::mediapipe::Status InitializeDefaultExecutor(
const ThreadPoolExecutorOptions& default_executor_options,
bool use_application_thread);
// Creates a thread pool as the default executor. The num_threads argument
// overrides the num_threads field in default_executor_options.
::mediapipe::Status CreateDefaultThreadPool(
const ThreadPoolExecutorOptions& default_executor_options,
int num_threads);
// Returns true if |name| is a reserved executor name.
static bool IsReservedExecutorName(const std::string& name);
// Helper functions for Initialize().
::mediapipe::Status InitializeExecutors();
::mediapipe::Status InitializePacketGeneratorGraph(
const std::map<std::string, Packet>& side_packets);
::mediapipe::Status InitializeStreams();
::mediapipe::Status InitializeProfiler();
::mediapipe::Status InitializeCalculatorNodes();
// Iterates through all nodes and schedules any that can be opened.
void ScheduleAllOpenableNodes();
// Does the bulk of the work for StartRun but does not start the scheduler.
::mediapipe::Status PrepareForRun(
const std::map<std::string, Packet>& extra_side_packets,
const std::map<std::string, Packet>& stream_headers);
// Cleans up any remaining state after the run and returns any errors that may
// have occurred during the run. Called after the scheduler has terminated.
::mediapipe::Status FinishRun();
// Cleans up any remaining state after the run. All status handlers run here
// if their requested input side packets exist.
// The original |*status| is passed to all the status handlers. If any status
// handler fails, it appends its error to errors_, and CleanupAfterRun sets
// |*status| to the new combined errors on return.
void CleanupAfterRun(::mediapipe::Status* status)
// Combines errors into a status. Returns true if the vector of errors is
// non-empty.
bool GetCombinedErrors(const std::string& error_prefix,
::mediapipe::Status* error_status);
// Convenience overload which specifies a default error prefix.
bool GetCombinedErrors(::mediapipe::Status* error_status);
// Calls HandlePreRunStatus or HandleStatus on the StatusHandlers. Which one
// is called depends on the GraphRunState parameter (PRE_RUN or POST_RUN).
// current_run_side_packets_ must be set before this function is called.
// On error, has_error_ will be set.
void CallStatusHandlers(GraphRunState graph_run_state,
const ::mediapipe::Status& status);
// Callback function to throttle or unthrottle source nodes when a stream
// becomes full or non-full. A node is throttled (i.e. prevented from being
// scheduled) if it has caused a downstream input queue to become full. Note
// that all sources (including graph input streams) that affect this stream
// will be throttled. A node is unthrottled (i.e. added to the scheduler
// queue) if all downstream input queues have become non-full.
// This method is invoked from an input stream when its queue becomes full or
// non-full. However, since streams are not allowed to hold any locks while
// invoking a callback, this method must re-lock the stream and query its
// status before taking any action.
void UpdateThrottledNodes(InputStreamManager* stream, bool* stream_was_full);
// Returns the packet associated with |service|. GetServiceObject() treats an
// empty returned packet as "no object available".
Packet GetServicePacket(const GraphServiceBase& service);
// Owns the legacy GpuSharedData if we need to create one for backwards
// compatibility.
std::unique_ptr<::mediapipe::GpuSharedData> legacy_gpu_shared_;
#endif // !defined(MEDIAPIPE_DISABLE_GPU)
// True if the graph was initialized.
bool initialized_ = false;
// A packet type that has SetAny() called on it.
PacketType any_packet_type_;
// The ValidatedGraphConfig object defining this CalculatorGraph.
std::unique_ptr<ValidatedGraphConfig> validated_graph_;
// The PacketGeneratorGraph to use to generate all the input side packets.
PacketGeneratorGraph packet_generator_graph_;
// True if the graph has source nodes.
bool has_sources_ = false;
// A flat array of InputStreamManager/OutputStreamManager/
// OutputSidePacketImpl/CalculatorNode corresponding to the input/output
// stream indexes, output side packet indexes, and calculator indexes
// respectively in validated_graph_.
// Once allocated these structures must not be reallocated since
// internal structures may point to individual entries in the array.
std::unique_ptr<InputStreamManager[]> input_stream_managers_;
std::unique_ptr<OutputStreamManager[]> output_stream_managers_;
std::unique_ptr<OutputSidePacketImpl[]> output_side_packets_;
std::unique_ptr<absl::FixedArray<CalculatorNode>> nodes_;
// The graph output streams.
// Maximum queue size for an input stream. This is used by the scheduler to
// restrict memory usage.
int max_queue_size_ = -1;
// Mode for adding packets to a graph input stream. Set to block until all
// affected input streams are not full by default.
GraphInputStreamAddMode graph_input_stream_add_mode_
// For a source node or graph input stream (specified using id),
// this stores the set of dependent input streams that have hit their
// maximum capacity. Graph input streams are also treated as nodes.
// A node is scheduled only if this set is empty. Similarly, a packet
// is added to a graph input stream only if this set is empty.
// Note that this vector contains an unused entry for each non-source node.
std::vector<std::unordered_set<InputStreamManager*>> full_input_streams_
// Maps stream names to graph input stream objects.
std::unordered_map<std::string, std::unique_ptr<GraphInputStream>>
// Maps graph input streams to their virtual node ids.
std::unordered_map<std::string, int> graph_input_stream_node_ids_;
// Maps graph input streams to their max queue size.
std::unordered_map<std::string, int> graph_input_stream_max_queue_size_;
// The factory for making counters associated with this graph.
std::unique_ptr<CounterFactory> counter_factory_;
// Executors for the scheduler, keyed by the executor's name. The default
// executor's name is the empty string.
std::map<std::string, std::shared_ptr<Executor>> executors_;
// The processed input side packet map for this run.
std::map<std::string, Packet> current_run_side_packets_;
// Packets holding graph service objects, keyed by string. Presumably the key
// is the service's name and entries are written by SetServicePacket() --
// TODO confirm against the implementation.
std::map<std::string, Packet> service_packets_;
// Vector of errors encountered while running graph. Always use RecordError()
// to add an error to this vector.
std::vector<::mediapipe::Status> errors_ GUARDED_BY(error_mutex_);
// True if the default executor uses the application thread.
bool use_application_thread_ = false;
// Condition variable that waits until all input streams that depend on a
// graph input stream are below the maximum queue size.
absl::CondVar wait_to_add_packet_cond_var_
// Mutex for the vector of errors.
absl::Mutex error_mutex_;
// Status variable to indicate if the graph has encountered an error.
// Explicitly initialized to false: a default-constructed std::atomic holds an
// indeterminate value before C++20, and HasError() reads this flag without
// requiring that a run has been started first.
std::atomic<bool> has_error_{false};
// Mutex for full_input_streams_.
mutable absl::Mutex full_input_streams_mutex_;
// Number of closed graph input streams. This is a separate variable because
// it is not safe to hold a lock on the scheduler while calling Close() on an
// input stream. Hence, we decouple the closing of the stream and checking its
// status.
// TODO: update this comment.
// Explicitly zero-initialized so the counter is well defined even if
// GraphInputStreamsClosed() is consulted before a run has reset it (a
// default-constructed std::atomic is indeterminate before C++20).
std::atomic<unsigned int> num_closed_graph_input_streams_{0};
// The graph tracing and profiling interface. It is owned by the
// CalculatorGraph using a shared_ptr in order to allow threadsafe access
// to the ProfilingContext from clients that may outlive the CalculatorGraph
// such as GlContext. It is declared here before the Scheduler so that it
// remains available during the Scheduler destructor.
std::shared_ptr<ProfilingContext> profiler_;
// The scheduler for this graph. Keep this declared after profiler_: members
// are destroyed in reverse declaration order, so the profiler is still alive
// while the Scheduler destructor runs.
internal::Scheduler scheduler_;
} // namespace mediapipe
