From 53498501bece0fdaed80ff0a9cdd57e210af90dc Mon Sep 17 00:00:00 2001 From: Arpad Boda Date: Mon, 26 Nov 2018 08:50:28 +0100 Subject: [PATCH 1/5] MINIFICPP-682 - C API: provide functions to create custom processors --- nanofi/include/api/nanofi.h | 20 +++++++- nanofi/include/core/cstructs.h | 6 +++ nanofi/include/core/cxxstructs.h | 4 ++ nanofi/include/cxx/CallbackProcessor.h | 17 ++++--- nanofi/include/cxx/Plan.h | 15 +++++- nanofi/src/api/nanofi.cpp | 70 ++++++++++++++++++++++---- nanofi/src/cxx/CallbackProcessor.cpp | 16 ++++-- nanofi/src/cxx/Plan.cpp | 61 +++++++++++++++++++--- nanofi/tests/CAPITests.cpp | 60 ++++++++++++++++++++++ 9 files changed, 238 insertions(+), 31 deletions(-) diff --git a/nanofi/include/api/nanofi.h b/nanofi/include/api/nanofi.h index e25a3a09c4..ba2ed37e69 100644 --- a/nanofi/include/api/nanofi.h +++ b/nanofi/include/api/nanofi.h @@ -37,6 +37,9 @@ extern "C" { */ #define API_VERSION "0.02" +#define SUCCESS_RELATIONSHIP "success" +#define FAILURE_RELATIONSHIP "failure" + void enable_logging(); void set_terminate_callback(void (*terminate_callback)()); @@ -97,7 +100,7 @@ int add_failure_callback(flow *flow, void (*onerror_callback)(flow_file_record*) * Set failure strategy. Please use the enum defined in cstructs.h * Return values: 0 (success), -1 (strategy cannot be set - no failure callback added?) * Can be changed runtime. -* The defailt strategy is AS IS. +* The default strategy is AS IS. */ int set_failure_strategy(flow *flow, FailureStrategy strategy); @@ -107,6 +110,8 @@ int set_standalone_property(standalone_processor*, const char*, const char *); int set_instance_property(nifi_instance *instance, const char*, const char *); +char * get_property(const processor_context * context, const char * name); + int free_flow(flow *); flow_file_record *get_next_flow_file(nifi_instance *, flow *); @@ -135,13 +140,18 @@ flow_file_record* create_ff_object(const char *file, const size_t len, const uin flow_file_record* create_ff_object_na(const char *file, const size_t len, const uint64_t size); +/** + * Get incoming flow file. To be used in processor logic callbacks. + */ +flow_file_record* get_flowfile(processor_session* session, processor_context* context); + void free_flowfile(flow_file_record*); uint8_t add_attribute(flow_file_record*, const char *key, void *value, size_t size); void update_attribute(flow_file_record*, const char *key, void *value, size_t size); -uint8_t get_attribute(flow_file_record *ff, attribute *caller_attribute); +uint8_t get_attribute(const flow_file_record *ff, attribute *caller_attribute); int get_attribute_qty(const flow_file_record* ff); @@ -165,6 +175,12 @@ uint8_t remove_attribute(flow_file_record*, char *key); int transmit_flowfile(flow_file_record *, nifi_instance *); +int add_custom_processor(const char * name, processor_logic* logic); + +int delete_custom_processor(const char * name); + +int transfer_to_relationship(flow_file_record * ffr, processor_session * ps, const char * relationship); + /**** * ################################################################## * Persistence Operations diff --git a/nanofi/include/core/cstructs.h b/nanofi/include/core/cstructs.h index e49316600a..dbb194f000 100644 --- a/nanofi/include/core/cstructs.h +++ b/nanofi/include/core/cstructs.h @@ -71,6 +71,8 @@ typedef struct standalone_processor standalone_processor; typedef struct processor_session processor_session; +typedef struct processor_context processor_context; + /**** * ################################################################## * FLOWFILE OPERATIONS @@ -105,6 +107,8 @@ typedef struct { void *ffp; + uint8_t keepContent; + } flow_file_record; typedef struct flow flow; @@ -114,4 +118,6 @@ typedef enum FS { ROLLBACK } FailureStrategy; +typedef void (processor_logic)(processor_session*, processor_context *); + #endif /* LIBMINIFI_SRC_CAPI_CSTRUCTS_H_ */ diff --git a/nanofi/include/core/cxxstructs.h b/nanofi/include/core/cxxstructs.h index dfa327c5b1..d32b720e2d 100644 --- a/nanofi/include/core/cxxstructs.h +++ b/nanofi/include/core/cxxstructs.h @@ -38,4 +38,8 @@ struct processor_session : public core::ProcessSession { using core::ProcessSession::ProcessSession; }; +struct processor_context : public core::ProcessContext { + using core::ProcessContext::ProcessContext; +}; + #endif //NIFI_MINIFI_CPP_CXXSTRUCTS_H diff --git a/nanofi/include/cxx/CallbackProcessor.h b/nanofi/include/cxx/CallbackProcessor.h index 7cfcaf22db..81c55467e9 100644 --- a/nanofi/include/cxx/CallbackProcessor.h +++ b/nanofi/include/cxx/CallbackProcessor.h @@ -46,6 +46,8 @@ namespace processors { // CallbackProcessor Class class CallbackProcessor : public core::Processor { public: + static core::Relationship Success; + static core::Relationship Failure; // Constructor /*! * Create a new processor @@ -65,24 +67,23 @@ class CallbackProcessor : public core::Processor { public: - void setCallback(void *obj,std::function ontrigger_callback) { + void setCallback(void *obj,std::function ontrigger_callback) { objref_ = obj; callback_ = ontrigger_callback; } // OnTrigger method, implemented by NiFi CallbackProcessor - virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session); + virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session); // override; // Initialize, over write by NiFi CallbackProcessor - virtual void initialize() { - std::set relationships; - core::Relationship Success("success", "description"); - relationships.insert(Success); - setSupportedRelationships(relationships); + virtual void initialize(); // override; + + virtual bool supportsDynamicProperties() /*override*/ { + return true; } protected: void *objref_; - std::function callback_; + std::function callback_; private: // Logger std::shared_ptr logger_; diff --git a/nanofi/include/cxx/Plan.h b/nanofi/include/cxx/Plan.h index e2cb827664..96f858f8b4 100644 --- a/nanofi/include/cxx/Plan.h +++ b/nanofi/include/cxx/Plan.h @@ -46,6 +46,8 @@ #include "core/reporting/SiteToSiteProvenanceReportingTask.h" #include "api/nanofi.h" +static const std::string CallbackProcessorName = "CallbackProcessor"; + using failure_callback_type = std::function; using content_repo_sptr = std::shared_ptr; @@ -70,7 +72,7 @@ namespace { auto path = claim->getContentFullPath(); auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize()); ffr->attributes = ff->getAttributesPtr(); - ffr->ffp = ff.get(); + ffr->ffp = static_cast(new std::shared_ptr(ff)); auto content_repo_ptr = static_cast*>(ffr->crp); *content_repo_ptr = cr_ptr; user_callback(ffr); @@ -92,7 +94,9 @@ class ExecutionPlan { explicit ExecutionPlan(std::shared_ptr content_repo, std::shared_ptr flow_repo, std::shared_ptr prov_repo); - std::shared_ptr addCallback(void *, std::function); + std::shared_ptr addSimpleCallback(void *, std::function); + + std::shared_ptr addCallback(void *obj, std::function fp); std::shared_ptr addProcessor(const std::shared_ptr &processor, const std::string &name, core::Relationship relationship = core::Relationship("success", "description"), @@ -143,6 +147,8 @@ class ExecutionPlan { static std::shared_ptr createProcessor(const std::string &processor_name, const std::string &name); + static std::shared_ptr createCallback(void *obj, std::function fp); + static std::shared_ptr getPlan(const std::string& uuid) { auto it = proc_plan_map_.find(uuid); return it != proc_plan_map_.end() ? it->second : nullptr; @@ -160,6 +166,10 @@ class ExecutionPlan { return proc_plan_map_.size(); } + static bool addCustomProcessor(const char * name, processor_logic* logic); + + static int deleteCustomProcessor(const char * name); + protected: class FailureHandler { public: @@ -224,6 +234,7 @@ class ExecutionPlan { std::shared_ptr logger_; std::shared_ptr failure_handler_; static std::unordered_map> proc_plan_map_; + static std::map custom_processors; }; #endif /* LIBMINIFI_CAPI_PLAN_H_ */ diff --git a/nanofi/src/api/nanofi.cpp b/nanofi/src/api/nanofi.cpp index 290a0ec13f..a7e1125fed 100644 --- a/nanofi/src/api/nanofi.cpp +++ b/nanofi/src/api/nanofi.cpp @@ -27,6 +27,7 @@ #include "core/expect.h" #include "cxx/Instance.h" #include "cxx/Plan.h" +#include "cxx/CallbackProcessor.h" #include "ResourceClaim.h" #include "processors/GetFile.h" #include "core/logging/LoggerConfiguration.h" @@ -187,6 +188,7 @@ flow_file_record* create_flowfile(const char *file, const size_t len) { std::ifstream in(file, std::ifstream::ate | std::ifstream::binary); // set the size of the flow file. new_ff->size = in.tellg(); + new_ff->keepContent = 0; return new_ff; } @@ -212,6 +214,7 @@ flow_file_record* create_ff_object_na(const char *file, const size_t len, const // set the size of the flow file. new_ff->size = size; new_ff->crp = static_cast(new std::shared_ptr); + new_ff->keepContent = 0; return new_ff; } /** @@ -223,13 +226,16 @@ void free_flowfile(flow_file_record *ff) { return; } auto content_repo_ptr = static_cast*>(ff->crp); - if (content_repo_ptr->get()) { + if (content_repo_ptr->get() && (ff->keepContent == 0)) { std::shared_ptr claim = std::make_shared(ff->contentLocation, *content_repo_ptr); (*content_repo_ptr)->remove(claim); } if (ff->ffp == nullptr) { auto map = static_cast(ff->attributes); delete map; + } else { + auto ff_sptr = reinterpret_cast*>(ff->ffp); + delete ff_sptr; } free(ff->contentLocation); free(ff); @@ -269,7 +275,7 @@ void update_attribute(flow_file_record *ff, const char *key, void *value, size_t * @param caller_attribute caller supplied object in which we will copy the data ptr * @return 0 if successful, -1 if the key does not exist */ -uint8_t get_attribute(flow_file_record * ff, attribute * caller_attribute) { +uint8_t get_attribute(const flow_file_record * ff, attribute * caller_attribute) { if (ff == nullptr) { return -1; } @@ -403,7 +409,7 @@ processor *add_python_processor(flow *flow, void (*ontrigger_callback)(processor auto lambda = [ontrigger_callback](core::ProcessSession *ps) { ontrigger_callback(static_cast(ps)); //Meh, sorry for this }; - auto proc = flow->addCallback(nullptr, lambda); + auto proc = flow->addSimpleCallback(nullptr, lambda); return static_cast(proc.get()); } @@ -460,6 +466,18 @@ int set_standalone_property(standalone_processor *proc, const char *name, const return -1; } +char * get_property(const processor_context * context, const char * name) { + std::string value; + if(!context->getDynamicProperty(name, value)) { + return nullptr; + } + size_t len = value.length(); + char * ret_val = (char*)malloc((len +1) * sizeof(char)); + strncpy(ret_val, value.data(), len); + ret_val[len] = '\0'; + return ret_val; +} + int free_flow(flow *flow) { if (flow == nullptr) return -1; @@ -467,10 +485,7 @@ int free_flow(flow *flow) { return 0; } -flow_file_record* flowfile_to_record(std::shared_ptr ff, ExecutionPlan* plan) { - if (ff == nullptr) { - return nullptr; - } +flow_file_record* flowfile_to_record(std::shared_ptr ff, const std::shared_ptr& crp) { auto claim = ff->getResourceClaim(); if(claim == nullptr) { return nullptr; @@ -480,10 +495,30 @@ flow_file_record* flowfile_to_record(std::shared_ptr ff, Executi claim->increaseFlowFileRecordOwnedCount(); auto path = claim->getContentFullPath(); auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize()); - ffr->ffp = ff.get(); + ffr->ffp = static_cast(new std::shared_ptr(ff)); ffr->attributes = ff->getAttributesPtr(); auto content_repo_ptr = static_cast*>(ffr->crp); - *content_repo_ptr = plan->getContentRepo(); + *content_repo_ptr = crp; + return ffr; +} + +flow_file_record* flowfile_to_record(std::shared_ptr ff, ExecutionPlan* plan) { + if (ff == nullptr) { + return nullptr; + } + + return flowfile_to_record(ff, plan->getContentRepo()); +} + +flow_file_record* get_flowfile(processor_session* session, processor_context* context) { + auto ff = session->get(); + if(!ff) { + return nullptr; + } + + auto ffr = flowfile_to_record(ff, context->getContentRepository()); + // The content of the flow file must be kept in a processor logic + ffr->keepContent = 1; return ffr; } @@ -610,3 +645,20 @@ int transfer(processor_session* session, flow *flow, const char *rel) { session->transfer(ff, relationship); return 0; } + +int add_custom_processor(const char * name, processor_logic* logic) { + return ExecutionPlan::addCustomProcessor(name, logic) ? 0 : -1; +} + +int delete_custom_processor(const char * name) { + return ExecutionPlan::deleteCustomProcessor(name) - 1; +} + +int transfer_to_relationship(flow_file_record * ffr, processor_session * ps, const char * relationship) { + if(ffr == nullptr || ffr->ffp == nullptr || ps == nullptr || relationship == nullptr || strlen(relationship) == 0) { + return -1; + } + auto ff_sptr = reinterpret_cast*>(ffr->ffp); + ps->transfer(*ff_sptr, core::Relationship(relationship, "desc")); + return 0; +} diff --git a/nanofi/src/cxx/CallbackProcessor.cpp b/nanofi/src/cxx/CallbackProcessor.cpp index 013ec47b2c..c527f865f3 100644 --- a/nanofi/src/cxx/CallbackProcessor.cpp +++ b/nanofi/src/cxx/CallbackProcessor.cpp @@ -23,10 +23,20 @@ namespace nifi { namespace minifi { namespace processors { +core::Relationship CallbackProcessor::Success("success", "All files are routed to success"); +core::Relationship CallbackProcessor::Failure("failure", "Failed files (based on callback logic) are transferred to failure"); + +void CallbackProcessor::initialize() { + std::set relationships; + relationships.insert(Success); + relationships.insert(Failure); + setSupportedRelationships(relationships); +} + void CallbackProcessor::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { - if (callback_ != nullptr) { - callback_(session); - } + if (callback_ != nullptr) { + callback_(session, context); + } } } /* namespace processors */ diff --git a/nanofi/src/cxx/Plan.cpp b/nanofi/src/cxx/Plan.cpp index b2b4690f14..2ab464c551 100644 --- a/nanofi/src/cxx/Plan.cpp +++ b/nanofi/src/cxx/Plan.cpp @@ -25,6 +25,7 @@ std::shared_ptr ExecutionPlan::id_generator_ = utils::IdGenerator::getIdGenerator(); std::unordered_map> ExecutionPlan::proc_plan_map_ = {}; +std::map ExecutionPlan::custom_processors = {}; ExecutionPlan::ExecutionPlan(std::shared_ptr content_repo, std::shared_ptr flow_repo, std::shared_ptr prov_repo) : content_repo_(content_repo), @@ -41,19 +42,27 @@ ExecutionPlan::ExecutionPlan(std::shared_ptr content_re * Add a callback to obtain and pass processor session to a generated processor * */ -std::shared_ptr ExecutionPlan::addCallback(void *obj, std::function fp) { +std::shared_ptr ExecutionPlan::addSimpleCallback(void *obj, std::function fp) { if (finalized) { return nullptr; } - auto ptr = createProcessor("CallbackProcessor", "CallbackProcessor"); - if (!ptr) + auto simple_func_wrapper = [fp](core::ProcessSession *session, core::ProcessContext *context)->void { fp(session); }; + + return addCallback(obj, simple_func_wrapper); +} + +std::shared_ptr ExecutionPlan::addCallback(void *obj, std::function fp) { + if (finalized) { return nullptr; + } - std::shared_ptr processor = std::static_pointer_cast(ptr); - processor->setCallback(obj, fp); + auto proc = createCallback(obj, fp); - return addProcessor(processor, "CallbackProcessor", core::Relationship("success", "description"), true); + if (!proc) + return nullptr; + + return addProcessor(proc, CallbackProcessorName, core::Relationship("success", "description"), true); } bool ExecutionPlan::setProperty(const std::shared_ptr proc, const std::string &prop, const std::string &value) { @@ -198,7 +207,7 @@ std::shared_ptr ExecutionPlan::buildFinalConnection(std::sha void ExecutionPlan::finalize() { if (failure_handler_) { - auto failure_proc = createProcessor("CallbackProcessor", "CallbackProcessor"); + auto failure_proc = createProcessor(CallbackProcessorName, CallbackProcessorName); std::shared_ptr callback_proc = std::static_pointer_cast(failure_proc); callback_proc->setCallback(nullptr, std::bind(&FailureHandler::operator(), failure_handler_, std::placeholders::_1)); @@ -237,6 +246,17 @@ std::shared_ptr ExecutionPlan::createProcessor(const std::strin utils::Identifier uuid; id_generator_->generate(uuid); + auto custom_proc = custom_processors.find(processor_name); + + if(custom_proc != custom_processors.end()) { + auto c_func = custom_proc->second; + auto wrapper_func = [c_func](core::ProcessSession * session, core::ProcessContext * context) { + return c_func(reinterpret_cast(session), reinterpret_cast(context)); + }; + return createCallback(nullptr, wrapper_func); + } + + auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(processor_name, uuid); if (nullptr == ptr) { return nullptr; @@ -247,6 +267,17 @@ std::shared_ptr ExecutionPlan::createProcessor(const std::strin return processor; } +std::shared_ptr ExecutionPlan::createCallback(void *obj, std::function fp) { + auto ptr = createProcessor(CallbackProcessorName, CallbackProcessorName); + if (!ptr) + return nullptr; + + std::shared_ptr processor = std::static_pointer_cast(ptr); + processor->setCallback(obj, fp); + + return ptr; +} + std::shared_ptr ExecutionPlan::connectProcessors(std::shared_ptr src_proc, std::shared_ptr dst_proc, core::Relationship relationship, bool set_dst) { std::stringstream connection_name; @@ -292,3 +323,19 @@ bool ExecutionPlan::setFailureStrategy(FailureStrategy start) { return true; } +bool ExecutionPlan::addCustomProcessor(const char * name, processor_logic* logic) { + if(CallbackProcessorName == name) { + return false; // This name cannot be registered + } + + if (custom_processors.count(name) > 0 ) { + return false; // Already exists + } + custom_processors[name] = logic; + return true; +} + +int ExecutionPlan::deleteCustomProcessor(const char * name) { + return custom_processors.erase(name); +} + diff --git a/nanofi/tests/CAPITests.cpp b/nanofi/tests/CAPITests.cpp index 1c94b87156..a61aae8c8d 100644 --- a/nanofi/tests/CAPITests.cpp +++ b/nanofi/tests/CAPITests.cpp @@ -54,6 +54,36 @@ void big_failure_counter(flow_file_record * fr) { free_flowfile(fr); } +void custom_processor_logic(processor_session * ps, processor_context * ctx) { + flow_file_record * ffr = get_flowfile(ps, ctx); + REQUIRE(ffr != nullptr); + uint8_t * buffer = (uint8_t*)malloc(ffr->size* sizeof(uint8_t)); + get_content(ffr, buffer, ffr->size); + REQUIRE(strncmp(reinterpret_cast(buffer), test_file_content.c_str(), test_file_content.size()) == 0); + + attribute attr; + attr.key = "filename"; + attr.value_size = 0; + REQUIRE(get_attribute(ffr, &attr) == 0); + REQUIRE(attr.value_size > 0); + + const char * custom_value = "custom value"; + + REQUIRE(add_attribute(ffr, "custom attribute", (void*)custom_value, strlen(custom_value)) == 0); + + char * prop_value = get_property(ctx, "Some test propery"); + + REQUIRE(prop_value != nullptr); + REQUIRE(strncmp("test value", prop_value, strlen(prop_value)) == 0); + + free(prop_value); + + transfer_to_relationship(ffr, ps, SUCCESS_RELATIONSHIP); + + free_flowfile(ffr); + free(buffer); +} + std::string create_testfile_for_getfile(const char* sourcedir, const std::string& filename = test_file_name) { std::fstream file; std::stringstream ss; @@ -392,3 +422,33 @@ TEST_CASE("Test standalone processors with file input", "[testStandaloneWithFile free_flowfile(ffr); free_standalone_processor(extract_test); } + +TEST_CASE("Test custom processor", "[TestCutomProcessor]") { + TestController testController; + + char src_format[] = "/tmp/gt.XXXXXX"; + const char *sourcedir = testController.createTempDirectory(src_format); + + create_testfile_for_getfile(sourcedir); + + add_custom_processor("myproc", custom_processor_logic); + + auto instance = create_instance_obj(); + REQUIRE(instance != nullptr); + flow *test_flow = create_flow(instance, nullptr); + REQUIRE(test_flow != nullptr); + + processor *get_proc = add_processor(test_flow, "GetFile"); + REQUIRE(get_proc != nullptr); + + REQUIRE(set_property(get_proc, "Input Directory", sourcedir) == 0); + + processor *my_proc = add_processor(test_flow, "myproc"); + REQUIRE(my_proc != nullptr); + + REQUIRE(set_property(my_proc, "Some test propery", "test value") == 0); + + flow_file_record *record = get_next_flow_file(instance, test_flow); + + REQUIRE(record != nullptr); +} \ No newline at end of file From 6739e3b92e3af8a2d0ba61d3919089c1c3cbdbb5 Mon Sep 17 00:00:00 2001 From: Arpad Boda Date: Fri, 30 Nov 2018 18:28:22 +0100 Subject: [PATCH 2/5] MINIFICPP-682 - add doxygen style comments to C API --- nanofi/include/api/nanofi.h | 259 +++++++++++++++++++++++++++++---- nanofi/include/core/cstructs.h | 8 + nanofi/src/api/nanofi.cpp | 7 +- 3 files changed, 238 insertions(+), 36 deletions(-) diff --git a/nanofi/include/api/nanofi.h b/nanofi/include/api/nanofi.h index ba2ed37e69..7c57441c03 100644 --- a/nanofi/include/api/nanofi.h +++ b/nanofi/include/api/nanofi.h @@ -40,8 +40,15 @@ extern "C" { #define SUCCESS_RELATIONSHIP "success" #define FAILURE_RELATIONSHIP "failure" +/** + * Enables logging (disabled by default) + */ void enable_logging(); +/** + * Sets terminate callback. The callback is executed upon termination (undhandled exception in C++ backend) + * @param terminate_callback the callback to execute + */ void set_terminate_callback(void (*terminate_callback)()); /**** @@ -50,11 +57,27 @@ void set_terminate_callback(void (*terminate_callback)()); * ################################################################## */ +/** + * Creates a new MiNiFi instance + * @param url remote URL the instance connects to + * @param port remote port the instance connects to + * @return pointer to the new instance + */ nifi_instance *create_instance(const char *url, nifi_port *port); -void initialize_instance(nifi_instance *); +/** + * Initialize remote connection of instance for transfers + * @param instance + */ +void initialize_instance(nifi_instance * instance); -void free_instance(nifi_instance*); +/** + * Frees instance + * @attention Any action on flows that belong to the freed instance are undefined after this is done! + * It's recommended to free all flows before freeing the instance. + * @param instance instance to be freed + */ +void free_instance(nifi_instance * instance); /**** * ################################################################## @@ -71,62 +94,173 @@ typedef int c2_start_callback(char *); void enable_async_c2(nifi_instance *, C2_Server *, c2_stop_callback *, c2_start_callback *, c2_update_callback *); +/** + * Creates a new, empty flow + * @param instance the instance new flow will belong to + * @return a pointer to the created flow + **/ +flow *create_new_flow(nifi_instance * instance); -uint8_t run_processor(const processor *processor); - -flow *create_new_flow(nifi_instance *); -flow *create_flow(nifi_instance *, const char *); +/** + * Creates new flow and adds the first processor in case a valid name is provided + * @deprecated as there is no proper indication of processor adding errors, + * usage of "create_new_flow" and "add_processor is recommended instead + * @param instance the instance new flow will belong to + * @param first_processor name of the first processor to be instanciated + * @attention in case first processor is empty or doesn't name any existing processor, an empty flow is returned. + * @return a pointer to the created flow + **/ +DEPRECATED flow *create_flow(nifi_instance * instance, const char * first_processor); -flow *create_getfile(nifi_instance *instance, flow *parent, GetFileConfig *c); +/** + * Add a getfile processor to "parent" flow. + * Creates new flow in instance in case "parent" is nullptr + * @deprecated as getfile processor can be added using "add_processor" function, + * properties can be set using "set_property". + * @param instance the instance the flow belongs to + * @param parent the flow to be extended with a new getfile processor + * @param c configuration of the new processor + * @return parent in case it wasn't null, otherwise a pointer to a new flow + */ +DEPRECATED flow *create_getfile(nifi_instance *instance, flow *parent, GetFileConfig *c); -processor *add_processor(flow *, const char *); +/** + * Extend a flow with a new processor + * @param flow the flow to be extended with the new processor + * @param name name of the new processor + * @return pointer to the new processor or nullptr in case it cannot be instantiated (wrong name?) + */ +processor *add_processor(flow * flow, const char * name); processor *add_python_processor(flow *, void (*ontrigger_callback)(processor_session *session)); -standalone_processor *create_processor(const char *); +/** + * Create a standalone instance of the given processor. + * Standalone instances can be invoked without having an instance/flow that contains them. + * @param name the name of the processor to instanciate + * @return pointer to the new processor or nullptr in case it cannot be instantiated (wrong name?) + **/ +standalone_processor *create_processor(const char * name); -void free_standalone_processor(standalone_processor*); +/** + * Free a standalone processor + * @param processor the processor to be freed + */ +void free_standalone_processor(standalone_processor* processor); /** -* Register your callback to received flow files that the flow failed to process -* The flow file ownership is transferred to the caller! -* The first callback should be registered before the flow is used. Can be changed later during runtime. -*/ + * Register your callback to received flow files that the flow failed to process + * The flow file ownership is transferred to the caller! + * The first callback should be registered before the flow is used. Can be changed later during runtime. + * @param flow flow the callback belongs to + * @param onerror_callback callback to execute in case of failure + * @return 0 in case of success, -1 otherwise (flow is already in use) + **/ int add_failure_callback(flow *flow, void (*onerror_callback)(flow_file_record*)); - /** -* Set failure strategy. Please use the enum defined in cstructs.h -* Return values: 0 (success), -1 (strategy cannot be set - no failure callback added?) -* Can be changed runtime. -* The default strategy is AS IS. -*/ + * Set failure strategy. Please use the enum defined in cstructs.h + * Can be changed runtime. + * The default strategy is AS IS. + * @param flow the flow to set strategy for + * @param strategy the strategy to be set + * @return 0 (success), -1 (strategy cannot be set - no failure callback added?) + **/ int set_failure_strategy(flow *flow, FailureStrategy strategy); -int set_property(processor *, const char *, const char *); +/** + * Set property for a processor + * @param processor the processor the property is set for + * @param name name of the property + * @param value value of the property + * @return 0 in case of success, -1 otherwise (the processor doesn't support such property) + **/ +int set_property(processor * processor, const char * name, const char * value); -int set_standalone_property(standalone_processor*, const char*, const char *); +/** + * Set property for a standalone processor + * @param processor the processor the property is set for + * @param name name of the property + * @param value value of the property + * @return 0 in case of success, -1 otherwise (the processor doesn't support such property) + **/ +int set_standalone_property(standalone_processor * processor, const char * name, const char * value); -int set_instance_property(nifi_instance *instance, const char*, const char *); +/** + * Set property for an instance + * @param instance the instance the property is set for + * @param name name of the property + * @param value value of the property + * @return 0 in case of success, -1 otherwise. Always succeeds unless instance or name is nullptr/emtpy. + **/ +int set_instance_property(nifi_instance *instance, const char * name, const char * value); -char * get_property(const processor_context * context, const char * name); +/** + * Get a property. Should be used in custom processor logic callbacks. + * @attention The returned value transfers ownership, it's the callers responsibility to free it! + * @param context the current processor context + * @param name name of the property + * @return null-terminated char* in case of success, nullptr otherwise + **/ +char * get_property(const processor_context * context, const char * name); -int free_flow(flow *); +/** + * Free a flow + * @param flow the flow to free + * @attention All the processor in the flow are freed, too! Actions performed on freed processors are undefined! + * @return 0 in case of success, -1 otherwise. Always succeeds unless flow is nullptr. + **/ +int free_flow(flow * flow); +/** + * Get the next flow file of the given flow + * @param instance the instance the flow belongs to + * @param flow the flow to get flowfile from + * @return a flow file record or nullptr in case no flowfile was generated by the flow + **/ flow_file_record *get_next_flow_file(nifi_instance *, flow *); -size_t get_flow_files(nifi_instance *, flow *, flow_file_record **, size_t); +/** + * Get all flow files of the given flow + * @param instance the instance the flow belongs to + * @param flow the flow to get flowfiles from + * @param flowfiles target area to copy the flowfiles to + * @param size the maximum number of flowfiles to copy to target (size of target) + * @return the number of flow files copies to target. Less or equal to size. + **/ +size_t get_flow_files(nifi_instance * instance, flow * flow, flow_file_record ** flowfiles, size_t size); flow_file_record *get(nifi_instance *,flow *, processor_session *); +/** + * Invoke a standalone processor without input data. + * The processor is expected to generate flow file. + * @return a flow file record or nullptr in case no flowfile was generated + **/ flow_file_record *invoke(standalone_processor* proc); +/** + * Invoke a standalone processor with input flow file + * @param input_ff input flow file, which can belong be the output of another processor or flow + * @return a flow file record or nullptr in case no flowfile was generated + **/ flow_file_record *invoke_ff(standalone_processor* proc, const flow_file_record *input_ff); +/** + * Invoke a standalone processor with file input + * @param path specifies the file system path of the input file + * @return a flow file record or nullptr in case no flowfile was generated + **/ flow_file_record *invoke_file(standalone_processor* proc, const char* path); -flow_file_record *invoke_chunck(standalone_processor* proc, uint8_t* buf, uint64_t); +/** + * Invoke a standalone processor with some in-memory data + * @param buf specifies the beginning of the input buffer + * @param size specifies the size of the buffer + * @return a flow file record or nullptr in case no flowfile was generated + **/ +flow_file_record *invoke_chunck(standalone_processor* proc, uint8_t* buf, uint64_t size); int transfer(processor_session* session, flow *flow, const char *rel); @@ -142,19 +276,60 @@ flow_file_record* create_ff_object_na(const char *file, const size_t len, const /** * Get incoming flow file. To be used in processor logic callbacks. - */ + * @param session current processor session + * @param context current processor context + * @return a flow file record or nullptr in case there is none in the session + **/ flow_file_record* get_flowfile(processor_session* session, processor_context* context); -void free_flowfile(flow_file_record*); +/** + * Free flow file + * @param ff flow file + **/ +void free_flowfile(flow_file_record* ff); + +/** + * Adds an attribute, fails in case there is already an attribute with the given key. + * @param ff flow file + * @param key name of attribute + * @param value location of value + * @size size size of the data pointed by "value" + * @return 0 in case of success, -1 otherwise (already existed) + **/ uint8_t add_attribute(flow_file_record*, const char *key, void *value, size_t size); -void update_attribute(flow_file_record*, const char *key, void *value, size_t size); +/** + * Updates an attribute (adds if it hasn't existed before) + * @param ff flow file + * @param key name of attribute + * @param value location of value + * @size size size of the data pointed by "value" + **/ +void update_attribute(flow_file_record* ff, const char *key, void *value, size_t size); +/** + * Get the value of an attribute. Value and value size are written to parameter "caller_attribute" + * @param ff flow file + * @param caller_attribute attribute structure to provide name and get value, size + * @return 0 in case of success, -1 otherwise (no such attribute) + **/ uint8_t get_attribute(const flow_file_record *ff, attribute *caller_attribute); +/** + * Get the quantity of attributes + * @param ff flow file + * @return the number of attributes + **/ int get_attribute_qty(const flow_file_record* ff); + +/** + * Copies all attributes of the flowfile that fits target. + * @param ff flow file + * @param target attribute set to copy to. target->size determines the maximum number of attributes copied + * @return the number of attributes copied, which is the minimum of attribute quantity and target size + **/ int get_all_attributes(const flow_file_record* ff, attribute_set *target); /** @@ -165,6 +340,11 @@ int get_all_attributes(const flow_file_record* ff, attribute_set *target); **/ int get_content(const flow_file_record* ff, uint8_t* target, int size); +/** + * Removes an attribute + * @param name name of the attribute + * @return 0 on success, -1 otherwise (doesn't exist) + **/ uint8_t remove_attribute(flow_file_record*, char *key); /**** @@ -175,10 +355,29 @@ uint8_t remove_attribute(flow_file_record*, char *key); int transmit_flowfile(flow_file_record *, nifi_instance *); +/** + * Adds a custom processor for later instantiation + * @param name name of the processor + * @param logic the callback to be invoked when the processor is triggered + * @return 0 on success, -1 otherwise (name already in use for eg.) + **/ int add_custom_processor(const char * name, processor_logic* logic); +/** + * Removes a custom processor + * @param name name of the processor + * @return 0 on success, -1 otherwise (didn't exist) + **/ int delete_custom_processor(const char * name); +/** + * Transfers a flowfile to the given relationship + * This function is only to be used within processor logic callback + * @param ffr flow file to be transfered + * @param ps processor session the transfer happens within + * @param relationship name of the relationship ("success" and "failure" are supported currently) + * @return 0 on success, -1 otherwise (didn't exist) + **/ int transfer_to_relationship(flow_file_record * ffr, processor_session * ps, const char * relationship); /**** diff --git a/nanofi/include/core/cstructs.h b/nanofi/include/core/cstructs.h index dbb194f000..d85c856d36 100644 --- a/nanofi/include/core/cstructs.h +++ b/nanofi/include/core/cstructs.h @@ -22,6 +22,14 @@ #include #include +#ifdef _MSC_VER +#define DEPRECATED __declspec(deprecated) +#elif defined(__GNUC__) | defined(__clang__) +#define DEPRECATED __attribute__((__deprecated__)) +#else +#define DEPRECATED +#endif + /** * NiFi Port struct */ diff --git a/nanofi/src/api/nanofi.cpp b/nanofi/src/api/nanofi.cpp index a7e1125fed..11f5f4e3bc 100644 --- a/nanofi/src/api/nanofi.cpp +++ b/nanofi/src/api/nanofi.cpp @@ -376,12 +376,7 @@ int transmit_flowfile(flow_file_record *ff, nifi_instance *instance) { } flow * create_new_flow(nifi_instance * instance) { - auto minifi_instance_ref = static_cast(instance->instance_ptr); - flow * area = static_cast(malloc(1*sizeof(flow))); - if(area == nullptr) { - return nullptr; - } - return new(area) flow(minifi_instance_ref->getContentRepository(), minifi_instance_ref->getNoOpRepository(), minifi_instance_ref->getNoOpRepository()); + return create_flow(instance, ""); } flow *create_flow(nifi_instance *instance, const char *first_processor) { From 175fc40ec4d24f1c463a0ba5b14ee7143ac5218c Mon Sep 17 00:00:00 2001 From: Arpad Boda Date: Mon, 3 Dec 2018 16:55:23 +0100 Subject: [PATCH 3/5] MINIFICPP-682 - fix leaking ExecutionPlans --- nanofi/src/api/nanofi.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/nanofi/src/api/nanofi.cpp b/nanofi/src/api/nanofi.cpp index 11f5f4e3bc..d050c3e2aa 100644 --- a/nanofi/src/api/nanofi.cpp +++ b/nanofi/src/api/nanofi.cpp @@ -476,6 +476,7 @@ char * get_property(const processor_context * context, const char * name) { int free_flow(flow *flow) { if (flow == nullptr) return -1; + flow->~flow(); free(flow); return 0; } From e3a71d59f0791738f322f2a83ab92cd4c84ce837 Mon Sep 17 00:00:00 2001 From: Arpad Boda Date: Tue, 4 Dec 2018 17:24:14 +0100 Subject: [PATCH 4/5] MINIFICPP-682 - Remove usage of deprecated functions --- nanofi/src/api/nanofi.cpp | 18 +++++++++--------- nanofi/tests/CAPITests.cpp | 16 ++++++++-------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/nanofi/src/api/nanofi.cpp b/nanofi/src/api/nanofi.cpp index d050c3e2aa..37ca9fc882 100644 --- a/nanofi/src/api/nanofi.cpp +++ b/nanofi/src/api/nanofi.cpp @@ -96,7 +96,6 @@ nifi_instance *create_instance(const char *url, nifi_port *port) { } standalone_processor *create_processor(const char *name) { - static int proc_counter = 0; auto ptr = ExecutionPlan::createProcessor(name, name); if (!ptr) { return nullptr; @@ -107,8 +106,7 @@ standalone_processor *create_processor(const char *name) { port.port_id = portnum; standalone_instance = create_instance("internal_standalone", &port); } - std::string flow_name = std::to_string(proc_counter++); - auto flow = create_flow(standalone_instance, flow_name.c_str()); + auto flow = create_new_flow(standalone_instance); std::shared_ptr plan(flow); plan->addProcessor(ptr, name); ExecutionPlan::addProcessorWithPlan(ptr->getUUIDStr(), plan); @@ -376,10 +374,6 @@ int transmit_flowfile(flow_file_record *ff, nifi_instance *instance) { } flow * create_new_flow(nifi_instance * instance) { - return create_flow(instance, ""); -} - -flow *create_flow(nifi_instance *instance, const char *first_processor) { if (nullptr == instance || nullptr == instance->instance_ptr) { return nullptr; } @@ -388,9 +382,15 @@ flow *create_flow(nifi_instance *instance, const char *first_processor) { if(area == nullptr) { return nullptr; } + flow *new_flow = new(area) flow(minifi_instance_ref->getContentRepository(), minifi_instance_ref->getNoOpRepository(), minifi_instance_ref->getNoOpRepository()); + return new_flow; +} + +flow *create_flow(nifi_instance *instance, const char *first_processor) { + auto new_flow = create_new_flow(instance); - if (first_processor != nullptr && strlen(first_processor) > 0) { + if(new_flow != nullptr && first_processor != nullptr && strlen(first_processor) > 0) { // automatically adds it with success new_flow->addProcessor(first_processor, first_processor); } @@ -410,7 +410,7 @@ processor *add_python_processor(flow *flow, void (*ontrigger_callback)(processor flow * create_getfile(nifi_instance * instance, flow * parent_flow, GetFileConfig * c) { static const std::string first_processor = "GetFile"; - flow *new_flow = parent_flow == nullptr ? create_flow(instance, nullptr) : parent_flow; + flow *new_flow = parent_flow == nullptr ? create_new_flow(instance) : parent_flow; // automatically adds it with success auto getFile = new_flow->addProcessor(first_processor, first_processor); diff --git a/nanofi/tests/CAPITests.cpp b/nanofi/tests/CAPITests.cpp index a61aae8c8d..1c46e0651f 100644 --- a/nanofi/tests/CAPITests.cpp +++ b/nanofi/tests/CAPITests.cpp @@ -97,7 +97,7 @@ std::string create_testfile_for_getfile(const char* sourcedir, const std::string TEST_CASE("Test Creation of instance, one processor", "[createInstanceAndFlow]") { auto instance = create_instance_obj(); REQUIRE(instance != nullptr); - flow *test_flow = create_flow(instance, nullptr); + flow *test_flow = create_new_flow(instance); REQUIRE(test_flow != nullptr); processor *test_proc = add_processor(test_flow, "GenerateFlowFile"); REQUIRE(test_proc != nullptr); @@ -108,7 +108,7 @@ TEST_CASE("Test Creation of instance, one processor", "[createInstanceAndFlow]") TEST_CASE("Invalid processor returns null", "[addInvalidProcessor]") { auto instance = create_instance_obj(); REQUIRE(instance != nullptr); - flow *test_flow = create_flow(instance, nullptr); + flow *test_flow = create_new_flow(instance); processor *test_proc = add_processor(test_flow, "NeverExisted"); REQUIRE(test_proc == nullptr); processor *no_proc = add_processor(test_flow, ""); @@ -120,7 +120,7 @@ TEST_CASE("Invalid processor returns null", "[addInvalidProcessor]") { TEST_CASE("Set valid and invalid properties", "[setProcesssorProperties]") { auto instance = create_instance_obj(); REQUIRE(instance != nullptr); - flow *test_flow = create_flow(instance, nullptr); + flow *test_flow = create_new_flow(instance); REQUIRE(test_flow != nullptr); processor *test_proc = add_processor(test_flow, "GenerateFlowFile"); REQUIRE(test_proc != nullptr); @@ -144,7 +144,7 @@ TEST_CASE("get file and put file", "[getAndPutFile]") { const char *putfiledir = testController.createTempDirectory(put_format); auto instance = create_instance_obj(); REQUIRE(instance != nullptr); - flow *test_flow = create_flow(instance, nullptr); + flow *test_flow = create_new_flow(instance); REQUIRE(test_flow != nullptr); processor *get_proc = add_processor(test_flow, "GetFile"); REQUIRE(get_proc != nullptr); @@ -194,7 +194,7 @@ TEST_CASE("Test manipulation of attributes", "[testAttributes]") { auto instance = create_instance_obj(); REQUIRE(instance != nullptr); - flow *test_flow = create_flow(instance, nullptr); + flow *test_flow = create_new_flow(instance); REQUIRE(test_flow != nullptr); processor *get_proc = add_processor(test_flow, "GetFile"); @@ -267,7 +267,7 @@ TEST_CASE("Test error handling callback", "[errorHandling]") { auto instance = create_instance_obj(); REQUIRE(instance != nullptr); - flow *test_flow = create_flow(instance, nullptr); + flow *test_flow = create_new_flow(instance); REQUIRE(test_flow != nullptr); // Failure strategy cannot be set before a valid callback is added @@ -364,7 +364,7 @@ TEST_CASE("Test interaction of flow and standlone processors", "[testStandaloneW auto instance = create_instance_obj(); REQUIRE(instance != nullptr); - flow *test_flow = create_flow(instance, nullptr); + flow *test_flow = create_new_flow(instance); REQUIRE(test_flow != nullptr); processor *get_proc = add_processor(test_flow, "GetFile"); @@ -435,7 +435,7 @@ TEST_CASE("Test custom processor", "[TestCutomProcessor]") { auto instance = create_instance_obj(); REQUIRE(instance != nullptr); - flow *test_flow = create_flow(instance, nullptr); + flow *test_flow = create_new_flow(instance); REQUIRE(test_flow != nullptr); processor *get_proc = add_processor(test_flow, "GetFile"); From d915ba37b4d176174d83c22d74b38d87ef32a859 Mon Sep 17 00:00:00 2001 From: Arpad Boda Date: Wed, 5 Dec 2018 17:21:39 +0100 Subject: [PATCH 5/5] MINIFICPP-682 - review comment fixes --- nanofi/examples/terminate_handler.c | 4 +++- nanofi/include/api/nanofi.h | 24 +++++++++++------------- nanofi/src/api/nanofi.cpp | 2 +- nanofi/tests/CAPITests.cpp | 8 ++++---- 4 files changed, 19 insertions(+), 19 deletions(-) diff --git a/nanofi/examples/terminate_handler.c b/nanofi/examples/terminate_handler.c index 1d5150d077..8b8989b92c 100644 --- a/nanofi/examples/terminate_handler.c +++ b/nanofi/examples/terminate_handler.c @@ -40,7 +40,9 @@ int main(int argc, char **argv) { nifi_instance *instance = create_instance("random instance", &port); - flow *new_flow = create_flow(instance, "GenerateFlowFile"); + flow *new_flow = create_new_flow(instance); + + processor *generate_proc = add_processor(new_flow, "GenerateFlowFile"); processor *put_proc = add_processor(new_flow, "PutFile"); diff --git a/nanofi/include/api/nanofi.h b/nanofi/include/api/nanofi.h index 7c57441c03..99eadf830a 100644 --- a/nanofi/include/api/nanofi.h +++ b/nanofi/include/api/nanofi.h @@ -42,13 +42,13 @@ extern "C" { /** * Enables logging (disabled by default) - */ + **/ void enable_logging(); /** * Sets terminate callback. The callback is executed upon termination (undhandled exception in C++ backend) * @param terminate_callback the callback to execute - */ + **/ void set_terminate_callback(void (*terminate_callback)()); /**** @@ -62,13 +62,13 @@ void set_terminate_callback(void (*terminate_callback)()); * @param url remote URL the instance connects to * @param port remote port the instance connects to * @return pointer to the new instance - */ + **/ nifi_instance *create_instance(const char *url, nifi_port *port); /** * Initialize remote connection of instance for transfers * @param instance - */ + **/ void initialize_instance(nifi_instance * instance); /** @@ -76,7 +76,7 @@ void initialize_instance(nifi_instance * instance); * @attention Any action on flows that belong to the freed instance are undefined after this is done! * It's recommended to free all flows before freeing the instance. * @param instance instance to be freed - */ + **/ void free_instance(nifi_instance * instance); /**** @@ -116,21 +116,19 @@ DEPRECATED flow *create_flow(nifi_instance * instance, const char * first_proces /** * Add a getfile processor to "parent" flow. * Creates new flow in instance in case "parent" is nullptr - * @deprecated as getfile processor can be added using "add_processor" function, - * properties can be set using "set_property". * @param instance the instance the flow belongs to * @param parent the flow to be extended with a new getfile processor * @param c configuration of the new processor * @return parent in case it wasn't null, otherwise a pointer to a new flow */ -DEPRECATED flow *create_getfile(nifi_instance *instance, flow *parent, GetFileConfig *c); +flow *create_getfile(nifi_instance *instance, flow *parent, GetFileConfig *c); /** * Extend a flow with a new processor * @param flow the flow to be extended with the new processor * @param name name of the new processor * @return pointer to the new processor or nullptr in case it cannot be instantiated (wrong name?) - */ + **/ processor *add_processor(flow * flow, const char * name); processor *add_python_processor(flow *, void (*ontrigger_callback)(processor_session *session)); @@ -151,8 +149,8 @@ void free_standalone_processor(standalone_processor* processor); /** * Register your callback to received flow files that the flow failed to process - * The flow file ownership is transferred to the caller! - * The first callback should be registered before the flow is used. Can be changed later during runtime. + * @attention The flow file ownership is transferred to the callback! + * @attention The first callback should be registered before the flow is used. Can be changed later during runtime. * @param flow flow the callback belongs to * @param onerror_callback callback to execute in case of failure * @return 0 in case of success, -1 otherwise (flow is already in use) @@ -260,7 +258,7 @@ flow_file_record *invoke_file(standalone_processor* proc, const char* path); * @param size specifies the size of the buffer * @return a flow file record or nullptr in case no flowfile was generated **/ -flow_file_record *invoke_chunck(standalone_processor* proc, uint8_t* buf, uint64_t size); +flow_file_record *invoke_chunk(standalone_processor *proc, uint8_t *buf, uint64_t size); int transfer(processor_session* session, flow *flow, const char *rel); @@ -321,7 +319,7 @@ uint8_t get_attribute(const flow_file_record *ff, attribute *caller_attribute); * @param ff flow file * @return the number of attributes **/ -int get_attribute_qty(const flow_file_record* ff); +int get_attribute_quantity(const flow_file_record *ff); /** diff --git a/nanofi/src/api/nanofi.cpp b/nanofi/src/api/nanofi.cpp index 37ca9fc882..78d1c09e08 100644 --- a/nanofi/src/api/nanofi.cpp +++ b/nanofi/src/api/nanofi.cpp @@ -290,7 +290,7 @@ uint8_t get_attribute(const flow_file_record * ff, attribute * caller_attribute) return -1; } -int get_attribute_qty(const flow_file_record* ff) { +int get_attribute_quantity(const flow_file_record *ff) { if (ff == nullptr) { return 0; } diff --git a/nanofi/tests/CAPITests.cpp b/nanofi/tests/CAPITests.cpp index 1c46e0651f..721a0cdc83 100644 --- a/nanofi/tests/CAPITests.cpp +++ b/nanofi/tests/CAPITests.cpp @@ -45,7 +45,7 @@ static int failure_count = 0; void failure_counter(flow_file_record * fr) { failure_count++; - REQUIRE(get_attribute_qty(fr) > 0); + REQUIRE(get_attribute_quantity(fr) > 0); free_flowfile(fr); } @@ -231,7 +231,7 @@ TEST_CASE("Test manipulation of attributes", "[testAttributes]") { // Update overwrites values update_attribute(record, test_attr.key, (void*) new_testattr_value, strlen(new_testattr_value)); // NOLINT - int attr_size = get_attribute_qty(record); + int attr_size = get_attribute_quantity(record); REQUIRE(attr_size > 0); attribute_set attr_set; @@ -319,7 +319,7 @@ TEST_CASE("Test standalone processors", "[testStandalone]") { flow_file_record* ffr = invoke(getfile_proc); REQUIRE(ffr != nullptr); - REQUIRE(get_attribute_qty(ffr) > 0); + REQUIRE(get_attribute_quantity(ffr) > 0); standalone_processor* extract_test = create_processor("ExtractText"); REQUIRE(extract_test != nullptr); @@ -331,7 +331,7 @@ TEST_CASE("Test standalone processors", "[testStandalone]") { // Verify the transfer of attributes REQUIRE(ffr2 != nullptr); - REQUIRE(get_attribute_qty(ffr2) > 0); + REQUIRE(get_attribute_quantity(ffr2) > 0); char filename_key[] = "filename"; attribute attr;