NIFI-5231 Added RecordStats processor. #2737
@MikeThomsen this is cool. The only thing it makes me wonder is whether this kind of data could be generated automatically and sent to a repository, almost like a new (or actual) reporting task. It seems to lend itself to time series analysis, like other things. NiFi doesn't necessarily have to provide that repo.

I'm not sure how that'd work out, because you need to actually read the flowfiles and calculate the stats.

Probably the reader and writer would have to be passed some context where they can track state or increment stats, and then be configured with a 'reporting' task to send the stats from a given context to

Not sure if I like that approach, because it could get pretty complicated to make the hand-off not impact the processing to any meaningful degree. The beauty of what we did was that it just puts the data into the provenance repository, and there aren't that many flowfiles to track. Maybe a few hundred thousand over the entire data set if we use appropriately-sized batches from

That makes sense, just thinking it through, and obviously I don't understand everything as well ;)

I'm not too familiar with the deep internals of the framework either. What we've seen is that with the records API it just makes sense to leverage the provenance system, because it already tracks the attributes in a clean way that you can use for stuff like giving managers a nice little ELK dashboard for the warm fuzzies.
ottobackwards
left a comment
@MikeThomsen thanks for the contribution, just a couple of nits.
    protected Map<String, String> getStats(FlowFile input, Map<String, RecordPath> paths, ProcessContext context, ProcessSession session) {
        try (InputStream is = session.read(input)) {
            RecordReaderFactory factory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
'input' and 'is' as variable names are confusing here; can we name them closer to what they are, to keep them straight?
flowFile and inputStream?
There is only one flowFile to track in this processor, so no need to get fancy ;)
Done.
    for (Map.Entry<String, RecordPath> entry : paths.entrySet()) {
        RecordPathResult result = entry.getValue().evaluate(record);
        Optional<FieldValue> value = result.getSelectedFields().findFirst();
        if (value.isPresent() && value.get().getValue() != null) {
Why is fieldValue needed? There are a lot of *values in this loop.
I guess we can drop that.
        }

        recordCount++;
    }
Magic Strings
?
"record_count" => constant field maybe?
not a biggie
Done.
@ijokarumawak can you review?
ijokarumawak
left a comment
@MikeThomsen Thanks for adding this, it looks really useful! I haven't run it yet, but I looked through the code and added a few comments as the first review cycle. Please check those out. Thanks!
    .name("record-stats-reader")
    .displayName("Record Reader")
    .description("A record reader to use for reading the records.")
    .addValidator(Validator.VALID)
No validator is required for a ControllerService property.
Done.
    String approxValue = value.get().getValue().toString();
    String key = String.format("%s.%s", entry.getKey(), approxValue);
    Integer stat = retVal.containsKey(key) ? retVal.get(key) : 0;
    Integer baseStat = retVal.containsKey(entry.getKey()) ? retVal.get(entry.getKey()) : 0;
Trivial: the Map.getOrDefault method can make these statements simpler.
Done.
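For illustration, here is a minimal standalone sketch of the simplification suggested above. It is not the code that was merged: the class name ValueCounter and the method count are hypothetical, and plain strings stand in for the values selected by the RecordPath.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of the NiFi code base.
class ValueCounter {
    // Counts each non-null value under "<field>.<value>" and keeps a running
    // total under "<field>", the same key shapes as the loop under review.
    static Map<String, Integer> count(String field, List<String> values) {
        Map<String, Integer> retVal = new HashMap<>();
        for (String value : values) {
            if (value == null) {
                continue; // mirrors the null check in the reviewed loop
            }
            String key = String.format("%s.%s", field, value);
            // getOrDefault replaces the containsKey/get ternary expressions.
            retVal.put(key, retVal.getOrDefault(key, 0) + 1);
            retVal.put(field, retVal.getOrDefault(field, 0) + 1);
        }
        return retVal;
    }

    public static void main(String[] args) {
        System.out.println(count("sport", Arrays.asList("Soccer", "Football", null, "Soccer")));
    }
}
```

Map.merge(key, 1, Integer::sum) would be an equivalent single-call alternative to the put/getOrDefault pair.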
    RecordPathResult result = entry.getValue().evaluate(record);
    Optional<FieldValue> value = result.getSelectedFields().findFirst();
    if (value.isPresent() && value.get().getValue() != null) {
        String approxValue = value.get().getValue().toString();
Idea for improvement: if the RecordPath result is a number, would counts per value be useful? Rather, in addition to the count (the current baseStat), I'd like to see min, max, sum and optionally the sum of squared x.
Let's do that as a separate ticket if you don't mind.
Yes, we can add this later.
I had hoped it would be included when this processor is released, since changing it afterwards would require adding a new configuration property. Well, the current behavior makes sense in some cases, too. E.g. if a number field represents some category information, or acts as an enum, then the user would expect the number of occurrences per numeric value.
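The numeric statistics proposed here were deferred to a later ticket, so the following is only a sketch of how min, max, sum and sum of squares could be accumulated in a single pass. NumericStats and all of its members are hypothetical names and are not part of the processor.

```java
// Hypothetical accumulator for illustration; not part of the NiFi code base.
class NumericStats {
    long count;
    double min = Double.POSITIVE_INFINITY;
    double max = Double.NEGATIVE_INFINITY;
    double sum;
    double sumOfSquares;

    // Folds one numeric field value into the running statistics.
    void accept(double x) {
        count++;
        min = Math.min(min, x);
        max = Math.max(max, x);
        sum += x;
        sumOfSquares += x * x;
    }

    // Population variance derived from the running sums: E[x^2] - (E[x])^2.
    // Returns NaN when no values have been accepted.
    double variance() {
        double mean = sum / count;
        return sumOfSquares / count - mean * mean;
    }

    public static void main(String[] args) {
        NumericStats stats = new NumericStats();
        for (double age : new double[] {1, 2, 3}) {
            stats.accept(age);
        }
        System.out.println(stats.min + " " + stats.max + " " + stats.sum + " " + stats.sumOfSquares);
    }
}
```

Keeping the sum of squares is what makes a variance available without a second pass over the records, although this simple formula can lose precision when the values are large relative to their spread.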
    return retVal.entrySet().stream()
        .collect(Collectors.toMap(
            e -> e.getKey(),
            e -> e.getValue().toString()
For non-numeric values, such as the 'sport' dataset, the list of distinct values can be huge, on the order of thousands or more. That can be too big to store as FlowFile attributes. At the least, we need a fixed limit on the number of attributes this processor can add. It would be more helpful if we could limit the number of values (i.e. soccer, football, basketball, etc.) to N and report the N most frequent ones, which gives better stats for a long-tail dataset.
Ok. We can add a limit there.
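One way such a limit could work is sketched below, assuming the per-value counts are already collected in a map. TopValues and topN are hypothetical names, and the tie-breaking by key is an assumption made only to keep the result deterministic; the thread does not say how the merged processor implements its limit.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper for illustration; not part of the NiFi code base.
class TopValues {
    // Keeps only the `limit` most frequent values, highest count first.
    // Ties are broken alphabetically by key (an assumption of this sketch).
    static Map<String, Integer> topN(Map<String, Integer> counts, int limit) {
        Comparator<Map.Entry<String, Integer>> order =
                Map.Entry.<String, Integer>comparingByValue(Comparator.reverseOrder())
                        .thenComparing(Map.Entry.<String, Integer>comparingByKey());
        return counts.entrySet().stream()
                .sorted(order)
                .limit(limit)
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        Map.Entry::getValue,
                        (a, b) -> a,
                        LinkedHashMap::new)); // preserves the sorted order
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        counts.put("Soccer", 3);
        counts.put("Football", 2);
        counts.put("Curling", 1);
        System.out.println(topN(counts, 2));
    }
}
```

Capping the map this way bounds the number of attributes written per FlowFile regardless of how many distinct values the field contains.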
@MikeThomsen Thanks for the updates. I will continue with a closer review of this when I have time, probably tomorrow.

@ijokarumawak Anything to add now or can we close this out?
ijokarumawak
left a comment
@MikeThomsen I have built and tested it locally. I posted several comments, but I believe this will be the final review cycle before getting this merged. Thanks!
    @WritesAttributes({
        @WritesAttribute(attribute = RecordStats.RECORD_COUNT_ATTR, description = "A count of the records in the record set in the flowfile.")
    })
    public class RecordStats extends AbstractProcessor {
Since NiFi uses a naming convention in which processor names start with a verb, this processor should be named something like 'CalculateRecordStats'.
Done.
    @WritesAttributes({
        @WritesAttribute(attribute = RecordStats.RECORD_COUNT_ATTR, description = "A count of the records in the record set in the flowfile.")
    })
    public class RecordStats extends AbstractProcessor {
The org.apache.nifi.processor.Processor file has not been updated, so this new processor cannot be used from a NiFi flow.
Done.
    .displayName("Record Reader")
    .description("A record reader to use for reading the records.")
    .identifiesControllerService(RecordReaderFactory.class)
    .build();
'Record Reader' should be required.
Done.
        "user-defined criteria on subsets of the record set.")
    @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    @WritesAttributes({
        @WritesAttribute(attribute = RecordStats.RECORD_COUNT_ATTR, description = "A count of the records in the record set in the flowfile.")
Please add recordStats.<User Defined Property Name>.count and recordStats.<User Defined Property Name>.count.<value>
Changed the names, but done.
    <li>record_count: 5</li>
    <li>sport: 5</li>
    <li>sport.Soccer: 3</li>
    <li>sport.Football: 2</li>
These property names should be more self-descriptive and should not overlap with other property namespaces.
I suggest the following names:
| current | suggestion |
|---|---|
| record_count | recordStats.count |
| sport | recordStats.sport.count |
| sport.Soccer | recordStats.sport.count.Soccer |
| sport.Football | recordStats.sport.count.Football |
Then we can add more stats later, such as recordStats.age.min or recordStats.age.max ... etc
Done.
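The naming scheme in the table above could be captured with constants and small helpers, which also addresses the earlier magic-string comment. This is a sketch that follows the reviewer's suggested names only; StatAttributeNames, fieldCount and valueCount are hypothetical, and the names actually merged may differ (an earlier reply notes that the names were changed).

```java
// Hypothetical naming helper for illustration; not part of the NiFi code base.
class StatAttributeNames {
    // Shared prefix that keeps these attributes out of other namespaces.
    static final String PREFIX = "recordStats";
    // Total record count, e.g. "recordStats.count".
    static final String RECORD_COUNT_ATTR = PREFIX + ".count";

    // Count of records with a value for the property, e.g. "recordStats.sport.count".
    static String fieldCount(String property) {
        return PREFIX + "." + property + ".count";
    }

    // Count per distinct value, e.g. "recordStats.sport.count.Soccer".
    static String valueCount(String property, String value) {
        return fieldCount(property) + "." + value;
    }

    public static void main(String[] args) {
        System.out.println(RECORD_COUNT_ATTR);
        System.out.println(fieldCount("sport"));
        System.out.println(valueCount("sport", "Soccer"));
    }
}
```

Placing the statistic name before the value leaves room for later additions such as a ".min" or ".max" suffix without colliding with value names.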
    protected Map<String, RecordPath> getRecordPaths(ProcessContext context) {
        return context.getProperties().keySet()
            .stream().filter(p -> p.isDynamic() && !p.getName().contains(RECORD_READER.getName()))
I think the !p.getName().contains(RECORD_READER.getName()) part is not necessary.
Done.
    .collect(Collectors.toMap(
        e -> e.getName(),
        e -> {
            String val = context.getProperty(e).getValue();
I'd expect the dynamic properties to support Expression Language (EL) with FlowFile attributes.
Done.
@MikeThomsen Thanks for the updates. LGTM, +1! Merging.

For future work, I've submitted this to add stats for numerical values.