NIFI-2854: Refactor repositories and swap files to use schema-based s… #1202
Conversation
Reviewing.
Looks like the PR failed the RAT check in Travis. Here are the failures:

Hey @markap14, I took a look at the supporting classes to get a base understanding of the changes and will start reviewing the changes in the repos later tonight or tomorrow morning. I figured I'd post the feedback I have so far, though.
* Provides the natural ordering for ResourceClaim objects. By default they are sorted by their id, then container, then section
*
* @param other other claim
* @return x such that x <=1 if this is less than other;

I believe this should be "-1".

Hmmm, this is a very odd comment in the code. Will address.
import java.io.InputStream;

/**
 * This class is a slight modification of the BufferedInputStream in the java.io package. The modification is that this implementation does not provide synchronization on method calls, which means
 * that this class is not suitable for use by multiple threads. However, the absence of these synchronized blocks results in potentially much better performance.
 */
- public class BufferedInputStream extends java.io.BufferedInputStream {
+ public class BufferedInputStream extends InputStream {

Can this be renamed to explicitly call out that it is not synchronized? This is part of nifi-utils and I could easily see someone using it unknowingly because their IDE gives them the option. Or, since it is part of nifi-utils, does that make it public API that can't be changed?

This is not explicitly public, but I do believe that others are depending on it, so I would be very hesitant to rename it.

I agree with Joe... we've had instances where Eclipse (and I assume IntelliJ / other IDEs) suggests the NiFi version of BufferedInputStream, resulting in bad, unexpected behavior. The name is really unfortunate. Perhaps move the functionality to "UnsynchronizedBufferedInputStream", modify NiFi's BufferedInputStream to be an empty extension, and deprecate it. Then complete the rename/removal in a future release (like a major version bump, if the breaking change is the concern).
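The rename-and-deprecate approach suggested above could be sketched as follows. This is a hypothetical illustration, not code from the PR: the class bodies are reduced to a passthrough, and only the inheritance-and-deprecation structure matters.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical sketch: the unsynchronized logic moves to a clearly named class,
// while the old class name survives as an empty, deprecated extension so that
// existing callers keep compiling until a future major release removes it.
class UnsynchronizedBufferedInputStream extends InputStream {
    private final InputStream wrapped;

    UnsynchronizedBufferedInputStream(final InputStream wrapped) {
        this.wrapped = wrapped;
    }

    @Override
    public int read() throws IOException {
        // The real buffering logic would live here; a passthrough keeps the sketch short.
        return wrapped.read();
    }
}

/** @deprecated use {@link UnsynchronizedBufferedInputStream} instead. */
@Deprecated
class BufferedInputStream extends UnsynchronizedBufferedInputStream {
    BufferedInputStream(final InputStream wrapped) {
        super(wrapped);
    }
}
```

With this shape, IDEs flag every use of the old name as deprecated, which directly addresses the accidental-import problem without breaking anyone immediately.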
@Override
public SerDe<T> createSerDe(final String encodingName) {
    return serde;

Ignoring the "encodingName" parameter seems wrong. Is there any way to check whether the serde that was set matches the passed encoding name?
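The check being asked for might look something like the sketch below. The class and field names here are assumptions for illustration (the PR's actual factory type is not shown in this excerpt), and a plain Object stands in for the SerDe:

```java
// Hypothetical sketch: instead of silently ignoring encodingName, fail fast
// when the requested encoding does not match the one the factory's single
// serde actually supports.
class SingleEncodingSerDeFactory {
    private final String supportedEncodingName;
    private final Object serde; // stands in for SerDe<T>

    SingleEncodingSerDeFactory(final String supportedEncodingName, final Object serde) {
        this.supportedEncodingName = supportedEncodingName;
        this.serde = serde;
    }

    Object createSerDe(final String encodingName) {
        if (encodingName != null && !supportedEncodingName.equals(encodingName)) {
            throw new IllegalArgumentException("Requested encoding " + encodingName
                + " but only " + supportedEncodingName + " is supported");
        }
        return serde;
    }
}
```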
this(fieldName, repetition, Arrays.asList(possibilities));
}

public UnionRecordField(final String fieldName, final Repetition repetition, final List<RecordField> possibilities) {

ComplexRecordField and SimpleRecordField call "Objects.requireNonNull" on their three inputs but "UnionRecordField" and "MapRecordField" do not. Is that intended?

Good call. Will add requireNonNull checks.
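The agreed fix follows the pattern already used in ComplexRecordField and SimpleRecordField. A minimal sketch, with String standing in for the Repetition and RecordField types so it is self-contained:

```java
import java.util.List;
import java.util.Objects;

// Sketch of the requireNonNull checks being added: every constructor argument
// is validated up front, so a null surfaces at construction time with a clear
// message instead of as a later NullPointerException deep in serialization.
class UnionRecordFieldSketch {
    private final String fieldName;
    private final String repetition;          // stands in for Repetition
    private final List<String> possibilities; // stands in for List<RecordField>

    UnionRecordFieldSketch(final String fieldName, final String repetition, final List<String> possibilities) {
        this.fieldName = Objects.requireNonNull(fieldName, "fieldName");
        this.repetition = Objects.requireNonNull(repetition, "repetition");
        this.possibilities = Objects.requireNonNull(possibilities, "possibilities");
    }

    String getFieldName() {
        return fieldName;
    }
}
```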
package org.apache.nifi.repository.schema;

public enum Requirement {

I believe this is leftover from design iterations and its functionality has been merged into "Repetition".

I believe you are correct.
}

private void writeField(final RecordField field, final DataOutputStream dos) throws IOException {
    dos.writeInt(4); // 4 fields.

This comment is/was very confusing to me. I believe this is writing the 4 values that define a field, not actually writing 4 fields. This goes hand-in-hand with the loop in "readField" (it's not reading multiple fields, but reading the values that define a field).

Thanks for pointing this out. Will re-word the comments.
Have you considered using a library such as Google Protocol Buffers to reduce the debt on NiFi in maintaining custom serialization logic?

Hey @markap14, took a bit longer than anticipated, but I just finished reviewing the code. I'll let you address comments as necessary and I'll start testing soon.
}
ffBuilder.entryDate(in.readLong());

if (version > 1) {

Is this versioning information documented anywhere?
final FlowFileQueue queue = getFlowFileQueue(connectionId);
final StandardRepositoryRecord standardRepoRecord = new StandardRepositoryRecord(queue, flowFile);
if (swapLocation != null) {

The logic here is slightly different from the old WriteAheadRepositorySerde. In the old code, it would check if the FlowFile queue was null and only do this logic of checking the swap location, connection ID, and queue if it wasn't (line 637 of WriteAheadFlowFileRepository). Just wanted to draw attention to the slight difference to be sure it's correct.

Before, it was checking if the flowFileQueueMap was null, I believe, not the FlowFile queue itself. So this is okay - at this point, it's guaranteed non-null.
- private final AtomicLong bytesRead = new AtomicLong(0L);
- private final AtomicLong bytesWritten = new AtomicLong(0L);
+ private long bytesRead = 0L;
+ private long bytesWritten = 0L;

Why change these from AtomicLongs? Could none of the places where they're written/read be accessed from multiple threads?

ProcessSession is not thread-safe. AtomicLong was used only because it was passed to ByteCountingInputStream / ByteCountingOutputStream. However, that was refactored, so there is no longer a need to use AtomicLongs.
final String queueIdentifier = splits[1];
if (!queueIdentifier.equals(flowFileQueue.getIdentifier())) {
    continue;
if (splits.length > 6) {

Was this broken before? The ID scheme for queues hasn't changed (still a UUID).

Yes - this was broken before. It was never noticed because it was a simple performance tweak, but I noticed it as I was stepping through the code.
+ swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)");
}
private SwapDeserializer createSwapDeserializer(final DataInputStream dis) throws IOException {
    dis.mark(4);

Probably best to use MAGIC_HEADER.length here instead of 4, like the init for the byte array uses.

Good call.
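The suggested change could look like the sketch below. The header bytes here are an assumption for illustration (the PR's actual MAGIC_HEADER value is not shown in this excerpt); the point is that deriving both the mark limit and the buffer size from MAGIC_HEADER.length keeps them in sync if the header ever changes.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Sketch: mark limit and buffer size both come from MAGIC_HEADER.length,
// so there is a single source of truth instead of a hard-coded 4.
class MagicHeaderSketch {
    static final byte[] MAGIC_HEADER = "SWAP".getBytes(StandardCharsets.UTF_8); // assumed value

    static boolean startsWithMagicHeader(final DataInputStream dis) throws IOException {
        dis.mark(MAGIC_HEADER.length);
        final byte[] buffer = new byte[MAGIC_HEADER.length];
        dis.readFully(buffer);
        if (Arrays.equals(buffer, MAGIC_HEADER)) {
            return true;
        }
        dis.reset(); // rewind so a legacy deserializer can re-read these bytes
        return false;
    }
}
```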
}

@Test
public void testWritePerformance() throws IOException, InterruptedException {

Should this be ignored so that it isn't run every build? There aren't any assertions, just the println at the end.

Yes - good catch.
headerLength = repoClassName.getBytes(StandardCharsets.UTF_8).length + 2 + 4; // 2 bytes for string length, 4 for integer.

if (serializationVersion < 1 || serializationVersion > 9) {
    throw new IllegalArgumentException("Unable to deserialize record because the version is " + serializationVersion + " and supported versions are 1-9");

Wouldn't this be version 10?

Really, this logic shouldn't exist at all. The version is really local to the specific reader/writer being used. Will move it there.
final long curOffset = rawInputStream.getBytesConsumed();

final long bytesToSkip = offset - curOffset;
if (bytesToSkip >= 0) {

Should this throw an exception when bytesToSkip is < 0 (i.e., it has already passed the block)? Currently it silently does nothing.

No, we want to silently do nothing. This allows us to just call skipToBlock(curBlock) for subsequent records without having to add logic to determine if we are already on the proper block. I.e., this allows us to do something like skipToBlock(1); readRecord(); skipToBlock(1); readRecord(); If we threw an Exception when bytesToSkip < 0, then the second call to skipToBlock(1) would fail, since we would have already consumed data past the beginning of the block.
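The forward-only, no-op-on-repeat behavior being defended can be captured in a tiny model (this is not NiFi's actual reader; names and structure are simplified for illustration):

```java
// Model of the behavior discussed above: skipping is forward-only, and asking
// to skip to an offset already reached or passed is a deliberate no-op rather
// than an error, so repeated skipToOffset(x) calls between reads are safe.
class ForwardOnlySkipper {
    private long bytesConsumed;

    void skipToOffset(final long offset) {
        final long bytesToSkip = offset - bytesConsumed;
        if (bytesToSkip > 0) {
            bytesConsumed += bytesToSkip; // a real reader would skip the underlying stream here
        }
        // bytesToSkip <= 0: already at or beyond the offset; silently do nothing,
        // which is what makes skipToOffset(x); read(); skipToOffset(x); read(); work.
    }

    long getBytesConsumed() {
        return bytesConsumed;
    }
}
```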
import org.apache.nifi.provenance.ProvenanceEventRecord;

public interface FieldSerializer {

IntelliJ tells me this class isn't used - a remnant of design iterations?

Yes - good catch.
@@ -1914,112 +1916,6 @@ public void testFailureToCreateWriterDoesNotPreventSubsequentRollover() throws I
}

@Test
public void testBehaviorOnOutOfMemory() throws IOException, InterruptedException {

Why was this test removed?

This test was assuming that the Record Writer was the SimpleRecordWriter and really made some other assumptions that it shouldn't make. It wasn't a well-written unit test, in general, as it was assuming dependencies between classes, so it was really more of an integration test. I considered refactoring it to make it work but then decided to simply remove it, since we already have a JIRA for fixing how NiFi handles OOME by letting the process die and having the bootstrap restart it.
@joshelser I did actually look into using both Protocol Buffers and Avro to perform the serialization/deserialization. That really would be preferred, as they are both very stable libraries and much more "robust"/feature-rich than what we have here. Unfortunately, though, because of the way that their readers/writers work, using them would have required some pretty intense refactoring of some of the core repository code. This is largely because the API that was created for the repository wasn't thought through well enough. For example, the RecordWriter has a ...
…erialization so that nifi can be rolled back to a previous version after an upgrade.
Thanks for the thoughtful explanation, @markap14! It's very apparent that you have put thought into this one. Sorry for doubting :)
@joshelser no worries - I am glad that someone at least doubted that decision :)
@JPercivall I have pushed a new commit that I believe should address your feedback. Thanks!
private final List<RecordField> subFields;

public ComplexRecordField(final String fieldName, final Repetition repetition, final RecordField... subFields) {
    this(fieldName, repetition, Stream.of(subFields).collect(Collectors.toList()));

NPE if null explicitly passed as the last argument

Yes - it is assumed non-null.
}

@Override
public boolean equals(final Object obj) {

The logic here appears almost identical to the one in ComplexRecordField. I'd consider some utility or base class with default implementations of equals, toString, and hashCode.

It could be, but it would require exposing private member variables, and the logic is trivial enough that I don't think it's necessary.
import java.util.ArrayList;
import java.util.List;

public class MapRecordField implements RecordField {

Same comment as above... consider an abstract class.
package org.apache.nifi.repository.schema;

public class NamedValue {

Do we actually need this? We already have Tuple<A, B>.

We do have Tuple, which I would be inclined to use for a private variable, but I would prefer not to expose that publicly, as NamedValue is, I believe, more explicit in indicating its purpose than a Tuple<String, Object> would be.
}

public RecordSchema(final RecordField... fields) {
    this(Arrays.asList(fields));

NPE if null explicitly passed
    return new UnionRecordField(fieldName, repetition, subFields);
} else if (FieldType.MAP.name().equals(fieldTypeName)) {
    if (subFields.size() != 2) {
        throw new IOException("Found a Map that did not have a 'Key' field and a 'Value' field but instead had " + subFields.size() + " fields: " + subFields);

IMHO the message is a bit confusing. I'd say "A field of type Map must contain exactly 2 elements representing the key and value. Was: ..."
    ((buffer[2] & 0xFF) << 8) +
    (buffer[3] & 0xFF);

return value;

We should probably fall back on using ByteBuffer (just to avoid extra code to maintain). The above could be eliminated with this:
return ByteBuffer.wrap(buffer).getInt();

That's a good call. Will update.
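The two forms are equivalent: ByteBuffer.getInt() reads big-endian by default, which is exactly what the hand-rolled shift-and-mask reconstruction computes. A self-contained sketch of the equivalence:

```java
import java.nio.ByteBuffer;

// Both methods decode a big-endian int from the first 4 bytes of the buffer;
// ByteBuffer.wrap(buffer).getInt() can therefore replace the manual shifts.
class ReadIntSketch {
    static int manual(final byte[] buffer) {
        return ((buffer[0] & 0xFF) << 24)
             + ((buffer[1] & 0xFF) << 16)
             + ((buffer[2] & 0xFF) << 8)
             +  (buffer[3] & 0xFF);
    }

    static int viaByteBuffer(final byte[] buffer) {
        return ByteBuffer.wrap(buffer).getInt(); // big-endian by default
    }
}
```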
import java.util.List;
import java.util.Objects;

public class SimpleRecordField implements RecordField {

Same as the previous comment about an abstract class. This one is actually identical to ComplexRecordField. The only difference is in toString(), and even there the string representing the class name (i.e., return "SimpleRecordField[f...) could itself derive from this.getClass().getSimpleName().
@@ -16,19 +16,445 @@
 */
package org.apache.nifi.stream.io;

import java.io.IOException;
import java.io.InputStream;

/**
 * This class is a slight modification of the BufferedInputStream in the java.io package. The modification is that this implementation does not provide synchronization on method calls, which means
 * that this class is not suitable for use by multiple threads. However, the absence of these synchronized blocks results in potentially much better performance.

I wonder if the performance statement above is actually true. Synchronization by itself does not cause significant performance concerns (if any); it only comes into play when more than one thread is involved. Perhaps investigate, deprecate, and use java.io's BufferedInputStream?

You're welcome to create your own benchmarks. My experience is that it causes tremendous performance implications when used in 'critical sections' of code. For example, if you have code that reads 1 GB of data, one byte at a time, you'd be crossing a synchronization barrier over 1 billion times. This can equate to several seconds spent simply crossing that barrier, even when using a single thread.

Well, FWIW, I just read the 5GB file 1 byte at a time and here are the elapsed times (in millis):
121714
124480
123031
121378
The first two runs used java.io, the second two used NiFi's.
There are differences, but as you can see none can be attributed to sync, especially if you look at the first and last numbers. In fact, one can argue that the second, third, and fourth runs were already on a pre-warmed page cache.
All I am questioning is whether it's worth it for us to maintain extra code when one is available in core Java.
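For anyone wanting to reproduce the byte-at-a-time comparison above, a minimal harness might look like this. This is a sketch, not the benchmark either participant ran: it uses a small in-memory buffer so it finishes quickly, whereas the debate above involved multi-GB files, where any synchronization or page-cache effects would actually be measurable. Swapping in NiFi's stream where java.io.BufferedInputStream appears would compare the two implementations.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

// Minimal byte-at-a-time read harness; each read() call crosses whatever
// synchronization barrier the wrapped stream imposes, once per byte.
class ByteAtATimeHarness {
    static long countBytes(final InputStream in) throws IOException {
        long count = 0;
        while (in.read() != -1) {
            count++;
        }
        return count;
    }

    public static void main(final String[] args) throws IOException {
        final byte[] data = new byte[1_000_000]; // tiny compared to the 5GB runs above
        final long start = System.nanoTime();
        final long count = countBytes(new java.io.BufferedInputStream(new ByteArrayInputStream(data)));
        final long elapsedMillis = (System.nanoTime() - start) / 1_000_000;
        System.out.println(count + " bytes read in " + elapsedMillis + " ms");
    }
}
```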
    return oldValue;
}

return new TimedCountSize(oldValue.getCount() + toAdd.getCount(), oldValue.getSize() + toAdd.getSize());

Isn't this dead code? I think the code below would do the same:
if (oldValue == null && toAdd == null) {
    return new TimedCountSize(0L, 0L);
} else if (oldValue == null) {
    return toAdd;
} else {
    return oldValue;
}

I don't believe those are the same...

You're right, if both are non-null. My bad!
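The distinction the thread lands on can be made concrete. In the sketch below, TimedCountSize is modeled as a long[] {count, size} to keep it self-contained; the proposed simplification would have returned oldValue when both arguments are non-null, silently dropping toAdd's contribution, which is exactly the branch the final line handles:

```java
// Sketch of why the four-way aggregate() is not dead code: the both-non-null
// branch sums the two values, which the proposed three-way rewrite would lose.
class AggregateSketch {
    static long[] aggregate(final long[] oldValue, final long[] toAdd) {
        if (oldValue == null && toAdd == null) {
            return new long[] {0L, 0L};
        } else if (oldValue == null) {
            return toAdd;
        } else if (toAdd == null) {
            return oldValue;
        }
        // Both non-null: sum counts and sizes - this is the branch the
        // simplification dropped.
        return new long[] {oldValue[0] + toAdd[0], oldValue[1] + toAdd[1]};
    }
}
```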
@Override
public TimedCountSize aggregate(final TimedCountSize oldValue, final TimedCountSize toAdd) {
    if (oldValue == null && toAdd == null) {
        return new TimedCountSize(0L, 0L);

To make it more concise, consider delegating to createNew()
// does get thrown when calling close(), we don't need to blacklist the partition, as the stream that was getting
// closed is not the stream being written to for the partition anyway.
for (final OutputStream partitionStream : partitionStreams) {
    partitionStream.close();

What if close() fails (i.e., throws an IOException) on one of the streams in the loop? Perhaps wrap it in a try/catch?

If close() fails, we do want to throw an Exception. But you're right - we should catch the Exception first and close the other streams so that there is no resource leak. Will update that.
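The fix agreed on above, close everything first and rethrow afterwards, is a common pattern; a self-contained sketch (not the PR's actual code):

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.List;

// Sketch: attempt to close every stream, remember the first failure (adding
// later ones as suppressed exceptions), and rethrow only after the loop, so a
// single failing close() cannot leak the remaining streams.
class CloseAllSketch {
    static void closeAll(final List<? extends Closeable> streams) throws IOException {
        IOException firstFailure = null;
        for (final Closeable stream : streams) {
            try {
                stream.close();
            } catch (final IOException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure; // the caller still sees the failure, but every stream got a close() attempt
        }
    }
}
```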
@olegz I pushed a commit that I believe addresses the main concerns here. I did not make some of the stylistic changes proposed.
if (tocReader != null) {
    tocReader.close();
}
}

Should all close() calls be wrapped in try/catch? Worried about resource leaks if any (other than the last) fail.

This is a good call. Pushed another commit to address.
import org.apache.nifi.repository.schema.RecordField;
import org.apache.nifi.repository.schema.RecordSchema;

public class ProvenanceEventSchema {

Perhaps change the visibility to package-private, since this is the only place it is used, while also ensuring that it is not for public consumption until it needs to be.
import org.apache.nifi.repository.schema.RecordField;
import org.apache.nifi.repository.schema.RecordSchema;

public class EventRecord implements Record {

The same visibility comment as for ProvenanceEventSchema.
logger.trace("Closing Record Writer for {}", file == null ? null : file.getName());

lock();

Wondering what the purpose of the explicit locking is when synchronized is used?

I'm confused about the call to lock() as well... I thought it might be to guard tocWriter and the other methods used within close(), but methods like isDirty() are called elsewhere without calling lock(), and there's a getter for tocWriter. What is actually being protected by the lock that's not covered by the method being synchronized?

The 'synchronized' and the lock are there to protect two different things. The writer exposes an ability to lock it externally so that multiple operations (such as writeRecord, flush, etc.) can be called atomically without anything else being written. The synchronized protects a few different member variables. Essentially, it's employing two completely disparate synchronization barriers in order to improve throughput (no need to wait for a writer to finish writing many records and flush before returning the number of records written via getRecordsWritten()).
I believe the code was clearer before I separated the writers out into abstract classes. As they are now, it is a bit confusing, and perhaps it is worth simply using the lock and not synchronizing, for simplicity. However, I would be very hesitant to refactor something like that now, as this ticket is blocking the 1.1.0 release, and I believe it is correct as-is. It is simply an abstraction of existing classes to produce more layers to avoid code repetition.
import org.apache.nifi.repository.schema.RecordField;
import org.apache.nifi.repository.schema.RecordSchema;

public class SwapSummaryFieldMap implements Record {

Not used outside the package; visibility may be reduced.
private final RecordSchema schema;

public ResourceClaimFieldMap(final ResourceClaim resourceClaim, final RecordSchema schema) {
    this.resourceClaim = resourceClaim;

Perhaps add null checks.
…Reader.close() even if an IOException is thrown when closing one of them
LGTM, merging
…erialization so that nifi can be rolled back to a previous version after an upgrade. NIFI-2854: Incorporated PR review feedback NIFI-2854: Implemented feedback from PR Review NIFI-2854: Ensure that all resources are closed on CompressableRecordReader.close() even if an IOException is thrown when closing one of them This closes apache#1202