Skip to content
Permalink
Branch: master
Find file Copy path
Find file Copy path
MediaPipe Team Project import generated by Copybara. 2946872 Aug 17, 2019
0 contributors

Users who have contributed to this file

118 lines (106 sloc) 4.05 KB
// Copyright 2019 The MediaPipe Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// This takes packets from N+1 streams, A_1, A_2, ..., A_N, B.
// For every packet that appears in B, outputs the most recent packet from each
// of the A_i on a separate stream.
#include <vector>
#include "absl/strings/str_cat.h"
#include "mediapipe/calculators/core/packet_cloner_calculator.pb.h"
#include "mediapipe/framework/calculator_framework.h"
namespace mediapipe {
// For every packet received on the last stream, output the latest packet
// obtained on all other streams. Therefore, if the last stream outputs at a
// higher rate than the others, this effectively clones the packets from the
// other streams to match the last.
//
// Example config:
// node {
// calculator: "PacketClonerCalculator"
// input_stream: "first_base_signal"
// input_stream: "second_base_signal"
// input_stream: "tick_signal"
// output_stream: "cloned_first_base_signal"
// output_stream: "cloned_second_base_signal"
// }
//
// Related:
// packet_cloner_calculator.proto: Options for this calculator.
// merge_input_streams_calculator.cc: One output stream.
// packet_inner_join_calculator.cc: Don't output unless all inputs are new.
class PacketClonerCalculator : public CalculatorBase {
public:
static ::mediapipe::Status GetContract(CalculatorContract* cc) {
const int tick_signal_index = cc->Inputs().NumEntries() - 1;
for (int i = 0; i < tick_signal_index; ++i) {
cc->Inputs().Index(i).SetAny();
cc->Outputs().Index(i).SetSameAs(&cc->Inputs().Index(i));
}
cc->Inputs().Index(tick_signal_index).SetAny();
return ::mediapipe::OkStatus();
}
::mediapipe::Status Open(CalculatorContext* cc) final {
// Load options.
const auto calculator_options =
cc->Options<mediapipe::PacketClonerCalculatorOptions>();
output_only_when_all_inputs_received_ =
calculator_options.output_only_when_all_inputs_received();
// Parse input streams.
tick_signal_index_ = cc->Inputs().NumEntries() - 1;
current_.resize(tick_signal_index_);
// Pass along the header for each stream if present.
for (int i = 0; i < tick_signal_index_; ++i) {
if (!cc->Inputs().Index(i).Header().IsEmpty()) {
cc->Outputs().Index(i).SetHeader(cc->Inputs().Index(i).Header());
}
}
return ::mediapipe::OkStatus();
}
::mediapipe::Status Process(CalculatorContext* cc) final {
// Store input signals.
for (int i = 0; i < tick_signal_index_; ++i) {
if (!cc->Inputs().Index(i).Value().IsEmpty()) {
current_[i] = cc->Inputs().Index(i).Value();
}
}
// Output according to the TICK signal.
if (!cc->Inputs().Index(tick_signal_index_).Value().IsEmpty()) {
if (output_only_when_all_inputs_received_) {
// Return if one of the input is null.
for (int i = 0; i < tick_signal_index_; ++i) {
if (current_[i].IsEmpty()) {
return ::mediapipe::OkStatus();
}
}
}
// Output each stream.
for (int i = 0; i < tick_signal_index_; ++i) {
if (!current_[i].IsEmpty()) {
cc->Outputs().Index(i).AddPacket(
current_[i].At(cc->InputTimestamp()));
} else {
cc->Outputs().Index(i).SetNextTimestampBound(
cc->InputTimestamp().NextAllowedInStream());
}
}
}
return ::mediapipe::OkStatus();
}
private:
std::vector<Packet> current_;
int tick_signal_index_;
bool output_only_when_all_inputs_received_;
};
REGISTER_CALCULATOR(PacketClonerCalculator);
} // namespace mediapipe
You can’t perform that action at this time.