-
Notifications
You must be signed in to change notification settings - Fork 39
Add asynchronous input format and bulk key. #119
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| /** | ||
| * Support various Input Formats | ||
| */ | ||
| job.setInputFormatClass(env.getInputFormat()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I made this change as I was submitting the PR so I will run some quick tests
src/main/java/org/apache/accumulo/testing/continuous/ThreadedContinousInputFormat.java
Outdated
Show resolved
Hide resolved
| key.getColumnQualifier(cq); | ||
| key.getColumnVisibility(cv); | ||
|
|
||
| WritableUtils.writeVInt(out, row.getLength()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the bulk ingest data, the row, family, and qualifier are all really small. The length of each of these should fit in a single byte. Could just always store the len in a single byte here instead of using writeVInt. Seems like this could simplify the code in KeyShortCircuitComparator.
Could have a little util method like
void writeLen(OutputStream out, int len){
Preconditions.checkArgument(len<128);
out.write(len);
}There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could take this even further and assume the row, family, and qual are always the same len (with sanity checks) and not bother serializing the len. This could make the compare in KeyShortCircuitComparator really simple and super fast (would not even need to worry about fields).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good ideas. Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@phrocker curious if you ever got a chance to give this a try.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@keith-turner I'm iteratively burning down my PRs, so I haven't jumped on this yet. I will post updates to this on Friday with some testing ( I may get to it tomorrow but have some meetings -- but this is the next one I wanted to finish up and address ). I have a sizable cluster where I can run this and post timing then. thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was able to try it @keith-turner . Made a positive impact! Thanks! I'm actually adding some unit tests in the Key class, so I'll shoot a rebased and updated PR Monday morning since it's gettin a bit late. This one was fun to test.
FYI I took your suggestions of reducing the complexity of the key comparator and removing the timestamp/delete. I added some unit tests that function as regression tests in case any assumptions change in the testing framework. BulkKey was kind of general, but "TestKey" is very specific and focused on the specificities of size/type of keys.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Awesome. Yeah a more specific name for the class would be good.
I added some unit tests that function as regression tests in case any assumptions change in the testing framework.
Nice, much better than runtime sanity checks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One thing I did not change was writing V int that's because it writes smaller sizes. One issue I ran into was spilling. If I use writeInt it writes 4 bytes, whereas writeVInt writes 1 byte for our sizes. While there are more branches it writes significantly less data to the context, resulting in overall greater throughput.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that saved 12 bytes per key, which was enough to keep me from avoiding a second spill and drastically slowing my jobs down.
| WritableUtils.writeVInt(out, cv.getLength()); | ||
| out.write(cv.getBytes(), 0, cv.getLength()); | ||
|
|
||
| WritableUtils.writeVLong(out, key.getTimestamp()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems the current code is not setting the timestamp (not sure if that is a good thing). If its not being set, we could just not serialize it (possible with a sanity check).
Could also not serialize delete and instead do a sanity check to ensure its not a delete.
| @Override | ||
| public void write(DataOutput out) throws IOException { | ||
|
|
||
| key.getRow(row); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method calls Text.set() which copies into the array inside of text. Could call getRowData(), which returns the array w/o copying. Still allocating an object, but its probably no diff than allocating the Text object.
| final BulkKey key = genKey(cksum); | ||
| final Value value = new Value(createValue(uuid, key.getRowData().toArray(), cksum)); | ||
| while (!queue.offer(Maps.immutableEntry(key, value), 1, TimeUnit.SECONDS)) { | ||
| if (!running.get()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does anything ever set this to false after its been set to true?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me double check. Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see a test failure so I'll check those too. Thanks for the review!
|
I didn't create an issue, so allow me to provide justification for this PR. In reviewing running jobs I realized two things: A) spills and sorts were spending an inordinate amount of time in the sort of Keys. To address this BulkKey will have a comparator that will reduce the deserialization time and memory. I ran these on a test system with very positive results. Happy to provide any benchmark results if desired. I think with Keith's ideas I'll likely be able to make some additional improvements. |
I would like to see your results. |
Let me re-gather them and I'll paste the timing and key output here. Thanks! I think the original job dropped from 140 minutes to 70 minutes, but I'd love to relay some more specific information with the most recent changes. |
|
|
||
| public TestKey() {} | ||
|
|
||
| public TestKey(byte[] row, byte[] cf, byte[] cq, byte[] cv, long ts, boolean deleted) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This constructor may not be used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doh! I'm jumping around too much. Thanks for helping!
| * Supports immediate sorting via eager deser of the key object. This has the benefit of reducing | ||
| * the amount of deserialization that may occur when sorting keys in memory | ||
| */ | ||
| public class BulkKey implements WritableComparable<BulkKey> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could this class be removed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes it was only provided as comparison. I don't intend this to be in the final instance. I was just showing you the comparison via the PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's used in the unit tests to provide a point of comparison and reference -- but was planning on removing it since you are the only one with any reference to what it is.
| byte[] b = WritableUtils.toByteArray(genTestKey(adder, adder)); | ||
| int comparisonResult = comparator.compare(a, 0, a.length, b, 0, b.length); | ||
| Assert.assertTrue("Comparision should be negative, not " + comparisonResult, | ||
| comparisonResult < 0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be nice to test when row same and cols differ. Following may do that.
byte[] c = WritableUtils.toByteArray(genTestKey(i, adder));
comparisonResult = comparator.compare(a, 0, a.length, c, 0, c.length);
 Assert.assertTrue("Comparision should be negative, not " + comparisonResult,
 comparisonResult < 0);
// could compare c to b
src/test/java/org/apache/accumulo/testing/continous/KeySerializationComparisonTest.java
Outdated
Show resolved
Hide resolved
src/test/java/org/apache/accumulo/testing/continous/KeySerializationComparisonTest.java
Outdated
Show resolved
Hide resolved
|
@keith-turner I've run this a number of times in cases of three and 30 B keys generated. I tested increasing number of mappers the keys computed her mapper, etc. I'll focus on the three billion rows across 200 mappers for demonstration purposes only Here is a sample of those test runs. I ran three sets of tests. One was the original setup without async threads or TestKey. The second was with async threads and BulkKey ( which I kept in the last commit for you to see that timing is roughly the same and we save a marginal amount of space ). The third set of runs were with async threads and BulkKey. The async threads and specialized key comparator ( scenarios two and three ) won out in every case. The average runtime per mapper for all three scenarios were ( across 5 runs ). The BulkKey and TestKey seem nearly identical; however, the amount of data written is dramatically different. For TestKey (scenario three) the aggregate output bytes were: 154,500,000,000. The other two resulted in 169,499,996,800 of key output. This also meant that when I doubled the number of keys, resulting in a single spill from TestKey, ( runtimes increased ) but the BulkKey and regular key objects actually resulted in a second spill that slowed down the job dramatically. In my opinion async and the key comparator actually helped tremendously but your insight into optimizing TestKey to not output 12 additional bytes has a linear benefit on reducing spills and reducing transfer costs. In the case of the 30B runs the current ( regular job ) took about an hour and ten minutes. The async job with BulkKey took 77 minutes. The TestKey job took 70 minutes on a cluster that ran 200 map tasks. I certainly think the cluster can be optimized/improved -- but I ran these across a separate cluster of 100 VMs and while I obviously didn't have the same timing, the general trends were improved. I wonder if we could/should port BulkKey over to Accumulo Proper for map reduce jobs. I think there is definitely benefit in optimizing the comparison path ( and why I did the testing here). As far as fine grained profiling, Yourkit showed improvements in the write path but spills were the slowest part ( I/O ). |
|
@phrocker thanks for the writeup
I think that is a good idea.
What test configuration was this? |
|
This PR is stale, has conflicts, and hasn't been updated in over a year and a half. I'll close it now. If this still is a desired change, please rebase and re-submit. |
I forgot about this PR ( naturally -- that's what I do ha ). I want to make sure I don't misunderstand you. I'm interpreting this as submitting a new PR with the commits against main. I think this PR is valuable but definitely wanted to make sure Keith didn't have any issues with it. Happy to resolve conflicts and submit a new one or to push back into this PR and re-open. Thanks. |
Either way works. I have no preference for a new PR or re-opening and rebasing in-place. Whatever is more convenient. |
Currently validating some changes on this PR that I made as I submitted it. I am addressing some issues I found while running this on a cluster.