Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINIFICPP-740: Add ability to run NiFi processors from Java and Python #489

Closed
wants to merge 7 commits into from

Conversation

phrocker
Copy link
Contributor

Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

For all changes:

  • Is there a JIRA ticket associated with this PR? Is it referenced
    in the commit message?

  • Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.

  • Has your PR been rebased against the latest commit within the target branch (typically master)?

  • Is your initial contribution a single, squashed commit?

For code changes:

  • If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under ASF 2.0?
  • If applicable, have you updated the LICENSE file?
  • If applicable, have you updated the NOTICE file?

For documentation related changes:

  • Have you ensured that format looks appropriate for the output in which it is rendered?

Note:

Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.

@phrocker phrocker requested a review from apiri February 21, 2019 16:44
@phrocker
Copy link
Contributor Author

not satisfied that I put all java code underneath one package. that was pure laziness. I will move those out at some point ( either in a follow on ticket or in this one )

CMakeLists.txt Outdated Show resolved Hide resolved
@apiri
Copy link
Member

apiri commented Feb 22, 2019

reviewing

@phrocker
Copy link
Contributor Author

reviewing

good luck

@apiri
Copy link
Member

apiri commented Feb 22, 2019

This is going to be the best functionality we add in 2020!

Copy link
Contributor

@arpadboda arpadboda left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added some comments, but nowhere near the end yet, this is A LOT.

if ( registry && registry->getConfigurationProperty( attribute_id , result) ) {
return Value(result);
}
else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This branch is needless. (Maybe it was my bad in my PR)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!


jni_logger_ref_.logger_reference_ = nifi_logger_;

std::string dir_name_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be unreferenced. (Guess it's just left)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

init = java_servicer_->loadClass("org/apache/nifi/processor/JniInitializationContext");

if (context_instance_ != nullptr) {
java_servicer_->attach()->DeleteGlobalRef(context_instance_);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't somehow JniProcessContext be responsible for the ref handling?
It would be nice to have it in RAII.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It cannot. The lifetime of the object is inherently tied to a context, session, and flow file within Java.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh and to give more context: ProcessContext is probably the only object where we might be able to use RAII. The other objects ( flow file, session, and session factory we cannot ). As a result I made the painful decision to return the raw jobject versus a wrapped object. There are some things I'd like to improve in this class but hope to do so in subsequent releases.

if (registered_classes_.find(className) == std::end(registered_classes_)) {
auto cls = servicer->loadClass(className);
cls.registerMethods(env, signatures);
registered_classes_.insert(className);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

std::set::insert supports exactly what you need:

1-2) Returns a pair consisting of an iterator to the inserted element (or to the element that prevented the insertion) 
and a bool value set to true if the insertion took place.

So you can avoid using find.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added a comment as to why find was chosen


JniSessionFactory *factory = new JniSessionFactory(ptr, java_servicer_, obj);

session_factories_.push_back(factory);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Guess the mutex should be locked here, too. (Vector::push_back may result in moving elements)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is something I'm still testing. may not need more than one, in which case I Just applied a setter -- but will lock if this persists.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the lock in the event I forget to remove this code -- but it seems like I can get away with just one ( hence why it was a setter versus being called add ). Please don't resolve this convo so it's a placesetter so I can respond in the even the code gets removed. Thanks.

* @return class name for the processor.
*/
virtual std::vector<std::string> getClassNames() {
std::vector<std::string> class_names;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Static?

Copy link
Contributor Author

@phrocker phrocker Feb 25, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, that won't have any negatives at all and certainly will help if someone were to call it many times thanks.

}
if (S_ISDIR(statbuf.st_mode)) {
// if this is a directory
if (strcmp(d_name.c_str(), "..") != 0 && strcmp(d_name.c_str(), ".") != 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reason of using strcmp here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point. I copied this from elsewhere in the code as it did something similar, but could have easily used stringutils for that.

}

std::string modifiedName = name;
modifiedName = utils::StringUtils::replaceAll(modifiedName, "/", ".");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is exactly what we expect clazz_name to be, isn't it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessarily ( and not always ) but I will add a comment here. Thanks

std::vector<std::string> options;
for (const auto &path : pathVector) {
if (str.str().length() > 0) {
#ifdef WIN32
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#ifdef WIN32
const char * delimiter = ";";
#else 
const char * delimiter = ":";
#endif
stringstream  str;
std::copy(pathVector.begin(), pathVector.end(), std::ostream_iterator<std::string>(str, delimiter));

A bit Pythonic, but I guess easier to read.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, just trying to limit trailing characters.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does pathVector contain empty strs?

arpadboda and others added 2 commits February 25, 2019 15:23
MINIFICPP-740: Fix autoconf location issue

MINIFICPP-740: minor updates

MINIFICPP-740: Topologically sort dependencies

MINIFICPP-740: Updates to package and jni subproject
}
if (S_ISDIR(statbuf.st_mode)) {
// if this is a directory
if (strcmp(d_name.c_str(), "..") != 0 && strcmp(d_name.c_str(), ".") != 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be nearly the same as "JVMCreator::addPath".

Can we make it an util function to avoid duplication?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep! Thanks!


auto ff = ptr->get();
if (ff == nullptr) {
minifi::jni::ThrowJava(env, "Calling function on null flow file");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think using a util func or just a macro, like

checkFlowFile(env, ff); 

would be useful to avoid duplicating the same error message.

Copy link
Contributor Author

@phrocker phrocker Feb 26, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going through and doing cleanup now... and I extracted a lot of those string references to macros -- but I just saw this in an E-mail ( haven't gone through the rest ) -- just wanted to comment that I really like this idea. Thanks!

return true;
}
jboolean Java_org_apache_nifi_processor_JniLogger_isInfoEnabled(JNIEnv *env, jobject obj) {
minifi::jni::JniLogger *logger_ref = minifi::jni::JVMLoader::getPtr<minifi::jni::JniLogger>(env, obj);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you explain what's happening here?
I would expect the to check the logger (to see if [info, error, trace] is enabled) and return that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I did some cleanup after I submitted review. I didn't realize you submitted this. I think these are addressed.

return true;
}

void Java_org_apache_nifi_processor_JniLogger_warn(JNIEnv *env, jobject obj, jstring msg) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a private function with a log level parameter could help here:

void Java_org_apache_nifi_processor_JniLogger_log(JNIEnv *env, jobject obj, jstring msg, spdlog::level log_level) {
    minifi::jni::JniLogger *logger_ref = minifi::jni::JVMLoader::getPtr<minifi::jni::JniLogger>(env, obj);
  if (!logger_ref)
    return;
  const char *msgStr = env->GetStringUTFChars(msg, 0);
  logger_ref->logger_reference_->log(log_level, msgStr);
  env->ReleaseStringUTFChars(msg, msgStr);
}

So the rest can be done in one line, like:

void Java_org_apache_nifi_processor_JniLogger_warn(JNIEnv *env, jobject obj, jstring msg) {
  ava_org_apache_nifi_processor_JniLogger_log(env, ojb, msg, spdlog::level::warn)
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your suggestion.

method_ptr_ = std::unique_ptr<JNINativeMethod>(new JNINativeMethod[methods_.size()]);
size_ = methods_.size();
int i = 0;
for (auto &mthd : methods_) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would make more sense to just increment "i" in the for loop and use that to index methods_ as well, the mixture of the two (range-based for and using indexes) is a bit fuzzy in my opinion.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ha I don't know why I did that. Thanks for finding it!


virtual void remove() override;

std::shared_ptr<core::FlowFile> get() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made some changes before seeing some of these. I think many of these were caught. But I may have missed something.

return ff_object;
}

bool operator==(const JniFlowFile &other) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ugh...Eclipse doesn't auto add the const when it generates these functions. Thanks!

return ref_ == other.ref_;
}

bool empty() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made some changes before seeing some of these. I think many of these were caught, including this one


protected:
bool removed_;
std::mutex session_mutex_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make this mutable and make some member funcs const?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made some changes before seeing some of these. I think many of these were caught, including this one

}
{

jmethodID mthd = env->GetMethodID(class_ref_.getReference(), "getSignature", "(Ljava/lang/String;Ljava/lang/String;)Ljava/lang/String;");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These blocks have so much in common, can it be a loop?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most likely. I'll catch it on another round of cleanup. thanks!

processor.setSupportsDynamicProperties()
processor.addProperty("property name","description","default value", True, False)
```

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made a previous comment here, which I don't see now, so sorry for the duplication in case that happens!

Things I saw in the code and worth documenting in my opinion:
-getProperty call supports getting value of dynamic properties, too. (as it's different in C++, I think it would be beneficial to have it documented)
-Python processors support success and failure relationships, these are the ones the script can send to.

A question: what happens in case there is a Python exception thrown in "onTrigger"?
I wonder if a rollback should be triggered.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will yield, as execute script does, if the python writer doesn't actually do the legwork of rolling back.

}
if (S_ISDIR(statbuf.st_mode)) {
// if this is a directory
if (strcmp(d_name.c_str(), "..") != 0 && strcmp(d_name.c_str(), ".") != 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same strcmp usage here as in Java

@@ -339,6 +339,8 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
// function to load the flow file repo.
void loadFlowRepo();

void initializeJNI();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if JNI extension is not built/added?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a referenced processor isn't there we'll error out and tell them. Otherwise this will be a non issue.

} else {
getExternalMappings()[details.artifact].processors_.push_back(description);
}
return true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function always returns true, does it make sense to have bool return type?

options.erase(std::remove(options.begin(), options.end(), opt), options.end());
}

// even if a white list is configured, remove the black listed fields
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reason behind having both whitelist and blacklist?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment does this a disservice but white listed fields allow selection only, blacklisted fields allow removal of fields from registry. 2 options, either it's an error condition to have both or we just remove blacklisted fields i the white list is specified. that was my preference for COOP.

while (!verify_not_exist(file_name)) {
file_name = "/tmp/" + non_repeating_string_generator_.generate();
file_name = tmpDir + FILE_SEPARATOR + non_repeating_string_generator_.generate();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filesystem in cpp17 is really missed here... :(

@@ -292,7 +292,7 @@ class error_already_set : public std::runtime_error {
/// Constructs a new exception from the current Python error indicator, if any. The current
/// Python error indicator will be cleared.
error_already_set() : std::runtime_error(detail::error_string()) {
PyErr_Fetch(&type.ptr(), &value.ptr(), &trace.ptr());
//PyErr_Fetch(&type.ptr(), &value.ptr(), &trace.ptr());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we need this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I created a follow on ticket to explore this further -- but this was causing problems. I read several old tickets where people removed this line in regards to triggering seg faults. Usually this is an issue elsewhere, but ExecuteScript was causing problems. In most cases it was fine because we weren't really handling python exceptions, but I created a follow on because at some point I would like to better deal with that.

@arpadboda
Copy link
Contributor

arpadboda commented Feb 26, 2019

Looks good as far as I can tell! The comments I considered important are resolved.

@phrocker : could you make an exemption and add cleanups or anything else added later as a separate commit instead of amend?
The PR is very big, so it's difficult to find the changes. Thanks in advance!

@phrocker

This comment has been minimized.

Copy link
Member

@apiri apiri left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

had some minor comments on things but the functionality works well. I performed builds/testing on MacOS, Ubuntu 18.04, and CentOS 7 using both NARs and processors defined in python. Everything worked fairly well with the Ubuntu and CentOS items running for the past several days.

Of my comments, I think we need to tend to the licensing concerns before merge, but everything else can be addressed later if needed. I saw a few comments that seemed to still be open from @arpadboda. If you could give those a scan through and make sure you're good with those, I can take care of licensing and get this merged in.

@@ -0,0 +1,127 @@
<?xml version="1.0" encoding="UTF-8"?>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like we need gitignore for java in our c++ 😱

if (ENABLE_JNI)
set(BASE_DIR "${CMAKE_CURRENT_BINARY_DIR}/thirdparty/jemalloc")

set(DIR "${BASE_DIR}/extensions/jemalloc-src")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will need to capture this in our LICENSE (2-clause bsd)

followed by the description, and default value. The last two arguments are booleans describing if the property is required or requires EL.

```python
def onIitialize(processor):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

onInitialize


Methods that are enabled within the processor are describe, onSchedule, onInitialize, and onTrigger.

Describe is passed the a processor and is a required function. You must set the description like so:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is passed the processor

```


void addProperty(const std::string &name, const std::string &description, const std::string &defaultvalue, bool required, bool el) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a little confused as to why this is here. did you mean to include it as one of the available methods within the processor? There is also a mention of getProperty below.

See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to have NiFi in our L&N


final File bundledDependencies = new File(unpackedNar, "NAR-INF/bundled-dependencies");

unpackBundleDocs(docsDirectory, mapping, bundleCoordinate, bundledDependencies);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't think we need to do this as the docs will not be used/exposed in the minifi context

Copy link
Contributor Author

@phrocker phrocker Feb 28, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The documentation provides useful info in the jar entries ( meta-inf/services/*) that could help us supplement the service building the agent manifest info

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nm i don't think i could really get all the information I need without loading processors anyway. good point re the docs.

@@ -0,0 +1,398 @@
/*
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This and other Java that is substantially based off of NiFi should be treated as a source inclusion in our L&N. This is mostly just enumerating the particular files we are including given the ALv2 license.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh man missed this. thanks


// attempt to delete any docs files that exist so that any components that have been removed
// will no longer have entries in the docs folder
final File[] docsFiles = docsWorkingDir.listFiles();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doc work should be irrelevant

@arpadboda
Copy link
Contributor

arpadboda commented Feb 28, 2019

@phrocker:
I don't like the fact that JNI API doesn't provide RAII wrapper from UTF string handling, so I quickly scratched one:

class JNIUTFString {
 public:
  JNIUTFString(JNIEnv * env, jstring jstr) {
    _env = env;
    _jstr = jstr;
    _cstr = _env->GetStringUTFChars(_jstr, NULL);
  }
  ~JNIUTFString() {
    _env->ReleaseStringUTFChars(_jstr, _cstr);
  }
  
  operator std::string() const { return _cstr; }
  
  const char * c_str() const { return _cstr; }
  
  friend std::ostream& operator<<(std::ostream& os, const JNIUTFString& jutfstr);
  
 private:
  JNIEnv * _env;
  const char * _cstr;
  jstring _jstr;
};

std::ostream& operator<<(std::ostream& os, const JNIUTFString& jutfstr)
{
    os << jutfstr.c_str();
    return os;
}

This can be used as an std::string, but you don't have to deal with conversion, releasing, etc, usage is simple:

  JNIUTFString jutf(env, jstr);
  std::set<std::string> test_set;
  test_set.insert(jutf);
  std::cout << jutf <<std::endl;
  std::cout << jutf.c_str() << std::endl;

Feel free to add if you like it.

@phrocker
Copy link
Contributor Author

@apiri I didn't resolve convos because I believe that is up to the author of that comment; however, I believe I addressed most issues. please take another gander. I'm not sure if that last comment was directed at me or Arpad, though.

@apiri
Copy link
Member

apiri commented Feb 28, 2019

@phrocker sounds fair. will do one last scan/build/test a bit later and we can move on. thanks!

@asfgit asfgit closed this in 8a4c2ac Mar 1, 2019
nghiaxlee pushed a commit to nghiaxlee/nifi-minifi-cpp that referenced this pull request Jul 8, 2019
This closes apache#489.

Signed-off-by: Aldrin Piri <aldrin@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
3 participants