NIFI-4946 nifi-spark-bundle: Adding support for pyfiles, file, jars options (#2521)
Conversation
Team, this PR was created in line with our current requirements. We would like these changes to go into the mainline, for which we need the community's help with reviewing and adding test code, since we are new to NiFi extensions. The next plan is to add test cases: could anyone point out existing test cases that do similar testing?
zenfenan left a comment
Thank you @Mageswaran1989 for the contribution. I haven't done actual testing with these changes yet; I'll do that and update the review as well. Thanks.
```java
properties.add(MAIN_PY_FILE);
properties.add(NAME);
properties.add(CODE);
// properties.add(ARGS);
```
Only the pyfiles and file options are tested; the rest are yet to be tested. The plan is to implement test modules and use them to test the other features, since the current manual testing requires a long routine of compiling, copying, and restarting NiFi.
```java
log.debug(" ====> jsonResponse: " + jsonResponse);
```
Cosmetic change: it would be great if this log.debug message could be changed to something of a proper standard, like "JSON Response: ", i.e. remove the ====>.
Sure, this will be removed in the next commit.
```java
        break;
    default:
        log.debug(" ====> default State: " + state);
        session.transfer(flowFile, REL_WAIT);
```
Same as above for these log.debug messages as well.
```java
public static final PropertyDescriptor JAR_FILES = new PropertyDescriptor.Builder()
        .name("exec-spark-iactive-jarfiles")
        .displayName("jars")
```
displayName is what will be rendered on the UI, so let's change it to "JARs" or "Application JARs".
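To make the suggestion concrete, a descriptor with a UI-friendly display name might look like the sketch below. The description wording and validator choice are assumptions for illustration, not part of the PR (this fragment requires the NiFi API on the classpath):

```java
// Sketch only: same property with a readable displayName, per the review
// comment. Description text and validator are illustrative assumptions.
public static final PropertyDescriptor JAR_FILES = new PropertyDescriptor.Builder()
        .name("exec-spark-iactive-jarfiles")
        .displayName("Application JARs")
        .description("Comma-separated list of JAR files to submit to Livy with the job") // assumed wording
        .required(false)
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .expressionLanguageSupported(false)
        .build();
```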
```java
        .expressionLanguageSupported(false)
        .build();
```
```java
public static final PropertyDescriptor NAME = new PropertyDescriptor.Builder()
```
Is this supposed to be the Spark app name? It looks like it is never used anywhere other than being added to the PropertyDescriptor list.
As said before, this is not yet considered. We just wanted to get a hang of the code with our basic requirements.
```java
public static final PropertyDescriptor MAIN_PY_FILE = new PropertyDescriptor.Builder()
        .name("exec-spark-iactive-main-py-file")
        .displayName("file")
```
Same as the JARs case. Most of the PropertyDescriptors here use all-lowercase characters for displayName. Please change it.
```java
String jsonResponse = null;
```
```java
if (StringUtils.isEmpty(jsonResponse)) {
```
This will be true all the time, right?
Once the current approach is accepted, this can be taken care of.
```java
//Incoming flow file is not an JSON file hence consider it to be an triggering point
```
```java
String batchPayload = "{ \"pyFiles\": [\"" + context.getProperty(PY_FILES).getValue() + "\"], " +
        "\"file\" : \"" + context.getProperty(MAIN_PY_FILE).getValue() + "\" }";
```
This is confusing to me. Why are we saying that if the incoming flowfile is not valid JSON, we go ahead with the assumption that it is going to be PySpark? The assumption here lacks clarity. Please correct me if I'm wrong.
Could you please check the description at https://issues.apache.org/jira/browse/NIFI-4946.
The assumption was made so that it doesn't break the existing code flow, while at the same time we wanted to know the status of the submitted job.
So the naive idea was to re-route the Livy JSON response back to the Spark processor itself, so that it can get the last submitted URL from the custom (tampered) JSON response, wait for the user-specified wait time, and query Livy again for the job status in a loop until it succeeds or fails.
So when the processor is configured to submit a Spark job, it expects the incoming flowfile to be a custom JSON response with a URL field to query Livy; if not, it is considered a triggering point, nothing else.
I am open to any ideas from your end.
Thanks.
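On a side note about the batchPayload construction quoted earlier: plain string concatenation produces invalid JSON as soon as a path contains a quote or backslash. Below is a minimal, hypothetical sketch of a safer builder (class and method names are invented for illustration; in the processor itself a real JSON library such as org.json, whose JSONException the code already catches, would be the better choice):

```java
// Hypothetical helper: builds the Livy batch payload with basic JSON escaping.
// Field names "pyFiles" and "file" follow the Livy batches API used in the PR.
class LivyBatchPayload {

    // Escape backslashes and double quotes so arbitrary paths remain valid JSON.
    static String escapeJson(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    static String buildBatchPayload(String pyFiles, String mainPyFile) {
        return "{ \"pyFiles\": [\"" + escapeJson(pyFiles) + "\"], "
                + "\"file\": \"" + escapeJson(mainPyFile) + "\" }";
    }

    public static void main(String[] args) {
        // Prints: { "pyFiles": ["/tmp/deps.zip"], "file": "/tmp/main.py" }
        System.out.println(buildBatchPayload("/tmp/deps.zip", "/tmp/main.py"));
    }
}
```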
@zenfenan could you please review the above logic and suggest a way to handle plain Scala/Python code and packaged source files for pyfiles/jars?
The code can be submitted using the CODE property, right? That used to work. Or are you asking for a way to upload/send files or jars through Livy?
As per the code flow at https://issues.apache.org/jira/browse/NIFI-4946, currently I am able to send *.zip files (Python modules) through Livy. My question was: what should we do with the flowfile when we are using the processor to submit a batch job?
Sometime this week I am planning to add support for JAR files, args, and the application name over the Livy options.
The catch here is that, unlike plain Spark code, batch process code takes time to finish, which is expected, as we know. So, as a hack, I was re-routing the JSON response after batch submission back to the processor itself, where I poll the incoming flowfile and check whether it is a JSON file; if so, I try to get the Livy URL to post again and learn the status of the batch job for as long as it runs. Once the job is known to have finished, the success route is triggered.
That was the reason I made the assumption that if the incoming file is JSON, it is from a previous batch job submission.
In short, the flow file:
- Is considered a triggering point, (or)
- Is considered plain Spark code that compiles over Livy, (or)
- Is a Livy JSON response, which can further be used to check the status of a long-running Spark batch job

I was looking for the right NiFi way of handling this. I feel I am being too conservative, trying to fit all the functionality into one processor:
- Flow file/property can be used to run Spark code
- Pyfiles can be used to run a Spark batch job
- Jars can be used to run a Spark batch job
- Args options for batch mode
- By re-routing success to itself, it can monitor the long-running job over the Livy REST APIs
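The three-way decision described above could be sketched as a small check on the incoming flowfile content. This is purely illustrative (hypothetical class name, regex-based detection); the actual processor would parse the content with a JSON library instead:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of the routing decision: if the content carries a Livy
// "url" field, it is treated as a previous batch-submission response to poll;
// otherwise it is treated as a plain trigger (or code to run).
class LivyResponseCheck {

    private static final Pattern URL_FIELD =
            Pattern.compile("\"url\"\\s*:\\s*\"([^\"]+)\"");

    // Returns the Livy polling URL, or null when the content is not a response.
    static String extractLivyUrl(String flowFileContent) {
        if (flowFileContent == null) {
            return null;
        }
        Matcher m = URL_FIELD.matcher(flowFileContent);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        // Prints: http://livy:8998/batches/1
        System.out.println(extractLivyUrl("{\"id\":1,\"url\":\"http://livy:8998/batches/1\"}"));
        // Prints: null
        System.out.println(extractLivyUrl("plain trigger content"));
    }
}
```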
Since the current processor is called ExecuteSparkInteractive, you could move the batch functionality out into something called SubmitSparkJob or similar. The outgoing flow file could contain the application ID and/or any other information that would allow you to monitor the job downstream (perhaps with InvokeHTTP through Livy, for example). Are you still interested in working on this? It seems like very nice functionality to have in NiFi!
```java
} catch (JSONException | InterruptedException e) {
```
```java
//Incoming flow file is not an JSON file hence consider it to be an triggering point
```
Cosmetic change: multiple empty lines were left. IMHO, one empty line should be enough for better readability.
We're marking this PR as stale due to lack of updates in the past few months. If after another couple of weeks the stale label has not been removed, this PR will be closed. This stale marker and eventual auto-close do not indicate a judgment of the PR, just a lack of reviewer bandwidth, and help us keep the PR queue manageable. If you would like this PR re-opened, you can do so and a committer can remove the stale tag. Or you can open a new PR. Try to help review other PRs to increase PR review bandwidth, which in turn helps yours.
Thank you for submitting a contribution to Apache NiFi.
In order to streamline the review of the contribution we ask you to ensure the following steps have been taken:

For all changes:
- Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
- Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
- Has your PR been rebased against the latest commit within the target branch (typically master)?
- Is your initial contribution a single, squashed commit?

For code changes:

For documentation related changes:

Note:
Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.