From e15346e7093096f683e4286dd76ff1217bb056a9 Mon Sep 17 00:00:00 2001 From: "Andy I. Christianson" Date: Tue, 19 Dec 2017 11:29:29 -0500 Subject: [PATCH] MINIFICPP-344 Added initial implementation of TFConvertImageToTensor --- .../tensorflow/TFConvertImageToTensor.cpp | 252 ++++++++++++++++++ .../tensorflow/TFConvertImageToTensor.h | 109 ++++++++ .../test/tensorflow-tests/TensorFlowTests.cpp | 134 ++++++++++ 3 files changed, 495 insertions(+) create mode 100644 extensions/tensorflow/TFConvertImageToTensor.cpp create mode 100644 extensions/tensorflow/TFConvertImageToTensor.h diff --git a/extensions/tensorflow/TFConvertImageToTensor.cpp b/extensions/tensorflow/TFConvertImageToTensor.cpp new file mode 100644 index 0000000000..be5e7a128f --- /dev/null +++ b/extensions/tensorflow/TFConvertImageToTensor.cpp @@ -0,0 +1,252 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "TFConvertImageToTensor.h" + +#include "tensorflow/cc/ops/standard_ops.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +core::Property TFConvertImageToTensor::ImageFormat( // NOLINT + "Input Format", + "The node of the TensorFlow graph to feed tensor inputs to (PNG or RAW). RAW is RGB24.", ""); +core::Property TFConvertImageToTensor::InputWidth( // NOLINT + "Input Width", + "The width, in pixels, of the input image.", ""); +core::Property TFConvertImageToTensor::InputHeight( // NOLINT + "Input Height", + "The height, in pixels, of the input image.", ""); +core::Property TFConvertImageToTensor::OutputWidth( // NOLINT + "Output Width", + "The width, in pixels, of the output image.", ""); +core::Property TFConvertImageToTensor::OutputHeight( // NOLINT + "Output Height", + "The height, in pixels, of the output image.", ""); +core::Property TFConvertImageToTensor::NumChannels( // NOLINT + "Channels", + "The number of channels (e.g. 3 for RGB, 4 for RGBA) in the input image", "3"); + +core::Relationship TFConvertImageToTensor::Success( // NOLINT + "success", + "Successful graph application outputs"); +core::Relationship TFConvertImageToTensor::Failure( // NOLINT + "failure", + "Failures which will not work if retried"); + +void TFConvertImageToTensor::initialize() { + std::set properties; + properties.insert(ImageFormat); + properties.insert(InputWidth); + properties.insert(InputHeight); + properties.insert(OutputWidth); + properties.insert(OutputHeight); + properties.insert(NumChannels); + setSupportedProperties(std::move(properties)); + + std::set relationships; + relationships.insert(Success); + relationships.insert(Failure); + setSupportedRelationships(std::move(relationships)); +} + +void TFConvertImageToTensor::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { + context->getProperty(ImageFormat.getName(), input_format_); + + if (input_format_.empty()) { + logger_->log_error("Invalid image format"); + } + + std::string val; + + if (context->getProperty(InputWidth.getName(), val)) { + core::Property::StringToInt(val, input_width_); + } else { + logger_->log_error("Invalid Input Width"); + } + + if (context->getProperty(InputHeight.getName(), val)) { + core::Property::StringToInt(val, input_height_); + } else { + logger_->log_error("Invalid Input Height"); + } + + if (context->getProperty(OutputWidth.getName(), val)) { + core::Property::StringToInt(val, output_width_); + } else { + logger_->log_error("Invalid Output Width"); + } + + if (context->getProperty(OutputHeight.getName(), val)) { + core::Property::StringToInt(val, output_height_); + } else { + logger_->log_error("Invalid output height"); + } + + if (context->getProperty(NumChannels.getName(), val)) { + core::Property::StringToInt(val, num_channels_); + } else { + logger_->log_error("Invalid channel count"); + } +} + +void TFConvertImageToTensor::onTrigger(const std::shared_ptr &context, + const std::shared_ptr &session) { + auto flow_file = session->get(); + + if (!flow_file) { + return; + } + + try { + // Use an existing context, if one is available + std::shared_ptr ctx; + + if (tf_context_q_.try_dequeue(ctx)) { + logger_->log_debug("Using available TensorFlow context"); + } + + std::string input_tensor_name = "input"; + std::string output_tensor_name = "output"; + + if (!ctx) { + logger_->log_info("Creating new TensorFlow context"); + tensorflow::SessionOptions options; + ctx = std::make_shared(); + ctx->tf_session.reset(tensorflow::NewSession(options)); + + auto root = tensorflow::Scope::NewRootScope(); + auto input = tensorflow::ops::Placeholder(root.WithOpName(input_tensor_name), tensorflow::DT_UINT8); + + // Cast pixel values to floats + auto float_caster = tensorflow::ops::Cast(root.WithOpName("float_caster"), input, tensorflow::DT_FLOAT); + + // Expand into batches (of size 1) + auto dims_expander = tensorflow::ops::ExpandDims(root, float_caster, 0); + + // Resize tensor to output dimensions + auto resize = tensorflow::ops::ResizeBilinear( + root, dims_expander, + tensorflow::ops::Const(root.WithOpName("resize"), {output_height_, output_width_})); + + // Normalize tensor from 0-255 pixel values to 0.0-1.0 values + auto output = tensorflow::ops::Div(root.WithOpName(output_tensor_name), + tensorflow::ops::Sub(root, resize, {0.0f}), + {255.0f}); + tensorflow::GraphDef graph_def; + { + auto status = root.ToGraphDef(&graph_def); + + if (!status.ok()) { + std::string msg = "Failed to create TensorFlow graph: "; + msg.append(status.ToString()); + throw std::runtime_error(msg); + } + } + + { + auto status = ctx->tf_session->Create(graph_def); + + if (!status.ok()) { + std::string msg = "Failed to create TensorFlow session: "; + msg.append(status.ToString()); + throw std::runtime_error(msg); + } + } + } + + // Apply graph + // Read input tensor from flow file + tensorflow::Tensor img_tensor(tensorflow::DT_UINT8, {input_height_, input_width_, num_channels_}); + ImageReadCallback tensor_cb(&img_tensor); + session->read(flow_file, &tensor_cb); + std::vector outputs; + auto status = ctx->tf_session->Run({{input_tensor_name, img_tensor}}, {output_tensor_name + ":0"}, {}, &outputs); + + if (!status.ok()) { + std::string msg = "Failed to apply TensorFlow graph: "; + msg.append(status.ToString()); + throw std::runtime_error(msg); + } + + // Create output flow file for each output tensor + for (const auto &output : outputs) { + auto tensor_proto = std::make_shared(); + output.AsProtoTensorContent(tensor_proto.get()); + logger_->log_info("Writing output tensor flow file"); + TensorWriteCallback write_cb(tensor_proto); + session->write(flow_file, &write_cb); + session->transfer(flow_file, Success); + } + + // Make context available for use again + if (tf_context_q_.size_approx() < getMaxConcurrentTasks()) { + logger_->log_debug("Releasing TensorFlow context"); + tf_context_q_.enqueue(ctx); + } else { + logger_->log_info("Destroying TensorFlow context because it is no longer needed"); + } + } catch (std::exception &exception) { + logger_->log_error("Caught Exception %s", exception.what()); + session->transfer(flow_file, Failure); + this->yield(); + } catch (...) { + logger_->log_error("Caught Exception"); + session->transfer(flow_file, Failure); + this->yield(); + } +} + +int64_t TFConvertImageToTensor::ImageReadCallback::process(std::shared_ptr stream) { + if (tensor_->AllocatedBytes() < stream->getSize()) { + throw std::runtime_error("Tensor is not big enough to hold FlowFile bytes"); + } + + auto num_read = stream->readData(tensor_->flat().data(), + static_cast(stream->getSize())); + + if (num_read != stream->getSize()) { + throw std::runtime_error("TensorReadCallback failed to fully read flow file input stream"); + } + + return num_read; +} + +int64_t TFConvertImageToTensor::TensorWriteCallback::process(std::shared_ptr stream) { + auto tensor_proto_buf = tensor_proto_->SerializeAsString(); + auto num_wrote = stream->writeData(reinterpret_cast(&tensor_proto_buf[0]), + static_cast(tensor_proto_buf.size())); + + if (num_wrote != tensor_proto_buf.size()) { + std::string msg = "TensorWriteCallback failed to fully write flow file output stream; Expected "; + msg.append(std::to_string(tensor_proto_buf.size())); + msg.append(" and wrote "); + msg.append(std::to_string(num_wrote)); + throw std::runtime_error(msg); + } + + return num_wrote; +} + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ diff --git a/extensions/tensorflow/TFConvertImageToTensor.h b/extensions/tensorflow/TFConvertImageToTensor.h new file mode 100644 index 0000000000..63f889cb7a --- /dev/null +++ b/extensions/tensorflow/TFConvertImageToTensor.h @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef NIFI_MINIFI_CPP_TFCONVERTIMAGETOTENSOR_H +#define NIFI_MINIFI_CPP_TFCONVERTIMAGETOTENSOR_H + +#include + +#include +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +class TFConvertImageToTensor : public core::Processor { + public: + explicit TFConvertImageToTensor(const std::string &name, uuid_t uuid = nullptr) + : Processor(name, uuid), + logger_(logging::LoggerFactory::getLogger()) { + } + + static core::Property ImageFormat; + static core::Property NumChannels; + static core::Property InputWidth; + static core::Property InputHeight; + static core::Property OutputWidth; + static core::Property OutputHeight; + + static core::Relationship Success; + static core::Relationship Failure; + + void initialize() override; + void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) override; + void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override { + logger_->log_error("onTrigger invocation with raw pointers is not implemented"); + } + void onTrigger(const std::shared_ptr &context, + const std::shared_ptr &session) override; + + struct TFContext { + std::shared_ptr tf_session; + }; + + class ImageReadCallback : public InputStreamCallback { + public: + explicit ImageReadCallback(tensorflow::Tensor *tensor) + : tensor_(tensor) { + } + ~ImageReadCallback() override = default; + int64_t process(std::shared_ptr stream) override; + + private: + tensorflow::Tensor *tensor_; + }; + + class TensorWriteCallback : public OutputStreamCallback { + public: + explicit TensorWriteCallback(std::shared_ptr tensor_proto) + : tensor_proto_(std::move(tensor_proto)) { + } + ~TensorWriteCallback() override = default; + int64_t process(std::shared_ptr stream) override; + + private: + std::shared_ptr tensor_proto_; + }; + + private: + std::shared_ptr logger_; + + std::string input_format_; + int input_width_; + int input_height_; + int output_width_; + int output_height_; + int num_channels_; + + std::shared_ptr graph_def_; + moodycamel::ConcurrentQueue> tf_context_q_; +}; + +REGISTER_RESOURCE(TFConvertImageToTensor); // NOLINT + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif //NIFI_MINIFI_CPP_TFCONVERTIMAGETOTENSOR_H diff --git a/libminifi/test/tensorflow-tests/TensorFlowTests.cpp b/libminifi/test/tensorflow-tests/TensorFlowTests.cpp index 4478105088..e9499de137 100644 --- a/libminifi/test/tensorflow-tests/TensorFlowTests.cpp +++ b/libminifi/test/tensorflow-tests/TensorFlowTests.cpp @@ -21,7 +21,12 @@ #include #include +#include +#include +#include +#include #include "TFApplyGraph.h" +#include "TFConvertImageToTensor.h" #define CATCH_CONFIG_MAIN @@ -165,3 +170,132 @@ TEST_CASE("TensorFlow: Apply Graph", "[executescriptTensorFlowApplyGraph]") { // REQUIRE(tensor_val == 4.0f); } } + +TEST_CASE("TensorFlow: ConvertImageToTensor", "[executescriptTensorFlowConvertImageToTensor]") { // NOLINT + TestController testController; + + LogTestController::getInstance().setTrace(); + LogTestController::getInstance().setTrace(); + LogTestController::getInstance().setTrace(); + LogTestController::getInstance().setTrace(); + LogTestController::getInstance().setTrace(); + + auto plan = testController.createPlan(); + auto repo = std::make_shared(); + + // Define directory for input protocol buffers + std::string in_dir("/tmp/gt.XXXXXX"); + REQUIRE(testController.createTempDirectory(&in_dir[0]) != nullptr); + + // Define input tensor protocol buffer file + std::string in_img_file(in_dir); + in_img_file.append("/img"); + + // Define directory for output protocol buffers + std::string out_dir("/tmp/gt.XXXXXX"); + REQUIRE(testController.createTempDirectory(&out_dir[0]) != nullptr); + + // Define output tensor protocol buffer file + std::string out_tensor_file(out_dir); + out_tensor_file.append("/img"); + + // Build MiNiFi processing graph + auto get_file = plan->addProcessor( + "GetFile", + "Get Proto"); + plan->setProperty( + get_file, + processors::GetFile::Directory.getName(), in_dir); + plan->setProperty( + get_file, + processors::GetFile::KeepSourceFile.getName(), + "false"); + plan->addProcessor( + "LogAttribute", + "Log Pre Graph Apply", + core::Relationship("success", "description"), + true); + auto tf_apply = plan->addProcessor( + "TFConvertImageToTensor", + "Convert Image", + core::Relationship("success", "description"), + true); + plan->addProcessor( + "LogAttribute", + "Log Post Graph Apply", + core::Relationship("success", "description"), + true); + plan->setProperty( + tf_apply, + processors::TFConvertImageToTensor::ImageFormat.getName(), + "RAW"); + plan->setProperty( + tf_apply, + processors::TFConvertImageToTensor::InputWidth.getName(), + "2"); + plan->setProperty( + tf_apply, + processors::TFConvertImageToTensor::InputHeight.getName(), + "2"); + plan->setProperty( + tf_apply, + processors::TFConvertImageToTensor::OutputWidth.getName(), + "10"); + plan->setProperty( + tf_apply, + processors::TFConvertImageToTensor::OutputHeight.getName(), + "10"); + plan->setProperty( + tf_apply, + processors::TFConvertImageToTensor::NumChannels.getName(), + "1"); + auto put_file = plan->addProcessor( + "PutFile", + "Put Output Tensor", + core::Relationship("success", "description"), + true); + plan->setProperty( + put_file, + processors::PutFile::Directory.getName(), + out_dir); + plan->setProperty( + put_file, + processors::PutFile::ConflictResolution.getName(), + processors::PutFile::CONFLICT_RESOLUTION_STRATEGY_REPLACE); + + // Write test input image + { + // 2x2 single-channel 8 bit per channel + const uint8_t in_img_raw[2*2] = {0, 0, + 0, 0}; + + std::ofstream in_file_stream(in_img_file); + in_file_stream << in_img_raw; + } + + plan->reset(); + plan->runNextProcessor(); // GetFile + plan->runNextProcessor(); // Log + plan->runNextProcessor(); // TFConvertImageToTensor + plan->runNextProcessor(); // Log + plan->runNextProcessor(); // PutFile + + // Read test output tensor + { + std::ifstream out_file_stream(out_tensor_file); + tensorflow::TensorProto tensor_proto; + tensor_proto.ParseFromIstream(&out_file_stream); + tensorflow::Tensor tensor; + tensor.FromProto(tensor_proto); + + // Verify output tensor + auto shape = tensor.shape(); + auto shapeString = shape.DebugString(); + + // Ensure output tensor is of the expected shape + REQUIRE(shape.IsSameSize({1, // Batch size + 10, // Width + 10, // Height + 1})); // Channels + } +}