[BEAM-1211] Make Coders more efficient by removing unnecessary nesting #1680
R: @lukecwik This is a first prototype to see if we should generalize the discussion we had on BEAM-469 about |
I'm hoping all the ITs will pass. Because nothing should depend on the value coder being force-nested, I think |
Refer to this link for build results (access rights to CI server needed):
Build result: FAILURE
[...truncated 16953 lines...]
^
/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall@2/runners/direct-java/src/main/java/org/apache/beam/runners/direct/Clock.java:27: error: reference not found
* Returns the current time as an {@link Instant}.
^
Command line was: /usr/local/asfpackages/java/jdk1.8.0_102/jre/../bin/javadoc @options @packages
Refer to the generated Javadoc files in '/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_MavenInstall@2/runners/direct-java/target/apidocs' dir.
at org.apache.maven.plugin.javadoc.AbstractJavadocMojo.executeJavadocCommandLine(AbstractJavadocMojo.java:5163)
at org.apache.maven.plugin.javadoc.AbstractJavadocMojo.executeReport(AbstractJavadocMojo.java:2075)
at org.apache.maven.plugin.javadoc.JavadocJar.execute(JavadocJar.java:188)
... 33 more
2016-12-22T00:18:15.351 [ERROR]
2016-12-22T00:18:15.351 [ERROR] Re-run Maven using the -X switch to enable full debug logging.
2016-12-22T00:18:15.351 [ERROR]
2016-12-22T00:18:15.351 [ERROR] For more information about the errors and possible solutions, please read the following articles:
2016-12-22T00:18:15.351 [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException
2016-12-22T00:18:15.351 [ERROR]
2016-12-22T00:18:15.351 [ERROR] After correcting the problems, you can resume the build with the command
2016-12-22T00:18:15.351 [ERROR] mvn -rf :beam-runners-direct-java
channel stopped
Setting status of 621e825 to FAILURE with url https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/6200/ and message: 'Build finished. '
Using context: Jenkins: Maven clean install
--none--
retest this please (failure caused by snappy, which is a transient thing we've seen for unknown reasons)
Refer to this link for build results (access rights to CI server needed):
Failed Tests: 2
beam_PreCommit_Java_MavenInstall/org.apache.beam:beam-sdks-java-core: 2
--none--
There are some wire format tests that ensure the encoding matches a predetermined, hardcoded value.
(The "stash work" commit is meant to be ignored.) KVCoder's tests didn't break because they used a value coder that isn't context-sensitive.
See [BEAM-469] for more information about why this is correct.
1dea97e to b5b18ac
Partial review of my own PR :)
VarIntCoder.of().decode(inStream, context),
VarLongCoder.of().decode(inStream, context),
VarIntCoder.of().decode(inStream, context.nested()),
VarLongCoder.of().decode(inStream, context.nested()),
VarLongCoder.of().decode(inStream, context));
this code was broken, but I think none of the coders are context-sensitive.
yup
valueCoder.registerByteSizeObserver(entry.getValue(), observer, context.nested());
} else {
valueCoder.registerByteSizeObserver(entry.getValue(), observer, context);
}
Relatively complex changes, but since single-element maps are likely to be used in global combining, I suspect they are worth it.
Use the pattern I showed above about how to use an iterator to get special treatment for the last value without needing to check whether it's the last value more than once.
done
@@ -109,7 +109,7 @@ public void encode(ValueInSingleWindow<T> windowedElem, OutputStream outStream,
 T value = valueCoder.decode(inStream, nestedContext);
 Instant timestamp = InstantCoder.of().decode(inStream, nestedContext);
 BoundedWindow window = windowCoder.decode(inStream, nestedContext);
-PaneInfo pane = PaneInfo.PaneInfoCoder.INSTANCE.decode(inStream, nestedContext);
+PaneInfo pane = PaneInfo.PaneInfoCoder.INSTANCE.decode(inStream, context);
Note I only updated decode; encode already had the changes. Suspect this was not a bug in practice b/c the PaneInfoCoder is not context-sensitive.
Any time we know that all but one of the coders are not context-sensitive, we should place the context-sensitive coder last.
Done here by putting valueCoder last.
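To illustrate why ordering matters, here is a minimal sketch that does not use Beam at all. It assumes a toy string encoding that writes a 4-byte length prefix only when nested (Beam's own coders may use a different prefix format), paired with a fixed-width long that encodes the same way in either context. The class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a composite coder; these names are not Beam APIs.
class ContextSensitiveLastSketch {

  // Writes a string. The length prefix is only needed when more data follows,
  // which is what makes this toy encoding context-sensitive.
  static void encodeString(String s, DataOutputStream out, boolean nested) throws IOException {
    byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
    if (nested) {
      out.writeInt(bytes.length);
    }
    out.write(bytes);
  }

  // String first: it must be nested because the long follows it.
  static byte[] encodeStringFirst(String s, long n) {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buffer);
    try {
      encodeString(s, out, true);
      out.writeLong(n); // fixed width: same bytes in either context
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return buffer.toByteArray();
  }

  // String last: it may run to the end of the stream, so no prefix is written.
  static byte[] encodeStringLast(String s, long n) {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buffer);
    try {
      out.writeLong(n);
      encodeString(s, out, false);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return buffer.toByteArray();
  }

  public static void main(String[] args) {
    System.out.println("string first: " + encodeStringFirst("hello", 7L).length + " bytes");
    System.out.println("string last:  " + encodeStringLast("hello", 7L).length + " bytes");
  }
}
```

In this toy setup the only saving is the length prefix, and it is only available when the context-sensitive component comes last.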
coders.get(i).encode(value[i], outStream, nestedContext);
}
coders.get(codersCount - 1).encode(value[codersCount - 1], outStream, context);
broken in the case of length == 0. verify that this is impossible or fix.
rewrote code to avoid the issue.
tagListCoder(unionTag).encode(value.valueMap.get(unionTag), outStream, Context.NESTED);
}
tagListCoder(lastTag).encode(value.valueMap.get(lastTag), outStream, context);
broken if size == 0. verify this is impossible or fix.
rewrote code to avoid the issue.
@@ -671,7 +671,7 @@ public void encode(WindowedValue<T> windowedElem,
 Instant timestamp = InstantCoder.of().decode(inStream, nestedContext);
 Collection<? extends BoundedWindow> windows =
 windowsCoder.decode(inStream, nestedContext);
-PaneInfo pane = PaneInfoCoder.INSTANCE.decode(inStream, nestedContext);
+PaneInfo pane = PaneInfoCoder.INSTANCE.decode(inStream, context);
Fixing the same "bug" as in the other use of PaneInfoCoder. Likely not context-sensitive.
Updated this coder to put value last, and fixed the registerByteSizeObserver that I missed the last time around.
+ "AAAAAA", | ||
"AAn_______-wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" | ||
+ "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" | ||
+ "AAAAAA"); |
verifying that we save a small number of bytes.
@@ -85,7 +85,7 @@ public void testEncodingId() throws Exception {
 */
 private static final List<String> TEST_ENCODINGS = Arrays.asList(
 "AAAAAA",
-"AAAAAv____8PA2ZvbwEFaGVsbG8");
+"AAAAAv____8PA2ZvbwFoZWxsbw");
verifying that we can save a small number of bytes.
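One way to check the saving by hand is to decode both test strings and compare lengths. The sketch below assumes the hardcoded encodings are unpadded URL-safe Base64 (the `_` characters suggest that alphabet); the class name is hypothetical and only `java.util.Base64` is used.

```java
import java.util.Base64;

// Hypothetical helper for inspecting the hardcoded wire-format strings.
class WireFormatSavingsSketch {

  // Assumption: the test strings are unpadded URL-safe Base64.
  static int decodedLength(String encoded) {
    return Base64.getUrlDecoder().decode(encoded).length;
  }

  public static void main(String[] args) {
    int before = decodedLength("AAAAAv____8PA2ZvbwEFaGVsbG8");
    int after = decodedLength("AAAAAv____8PA2ZvbwFoZWxsbw");
    System.out.println(before + " bytes before, " + after + " bytes after");
  }
}
```

Under that assumption the second string decodes to one byte fewer than the first, which is consistent with dropping a single length prefix from the last value.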
@@ -905,14 +905,14 @@ public CountSum createAccumulator() {
 @Override
 public void encode(CountSum value, OutputStream outStream,
 Context context) throws CoderException, IOException {
-LONG_CODER.encode(value.count, outStream, context);
+LONG_CODER.encode(value.count, outStream, context.nested());
 DOUBLE_CODER.encode(value.sum, outStream, context);
bug the other way, but in practice LONG_CODER is not context-sensitive.
Filed a proper JIRA issue, ready for review. I may make a test-improvement pass next week.
Refer to this link for build results (access rights to CI server needed):
Failed Tests: 2
beam_PreCommit_Java_MavenInstall/org.apache.beam:beam-examples-java: 2
--none--
retest this please
Swapping to use the iterator pattern, other than that looks good to me.
@@ -115,7 +123,12 @@ public void encode(
Map<K, V> retval = Maps.newHashMapWithExpectedSize(size);
for (int i = 0; i < size; ++i) {
Update to remove the if from the loop:
if (size == 0) {
return empty map
}
for (i = 0; i < size - 1; ++i) {
decode nested key and value
}
decode nested key
decode value using context passed in
done
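The decode pseudocode above could look roughly like the following self-contained sketch. It is not the MapCoder code from this PR: it assumes string keys and values, a 4-byte length prefix for nested strings, and an outer value that consumes the rest of the stream. All names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch; the class and helper names are not part of Beam.
class MapDecodeSketch {

  // Nested string: 4-byte length prefix, then that many UTF-8 bytes.
  static String decodeNested(DataInputStream in) throws IOException {
    byte[] bytes = new byte[in.readInt()];
    in.readFully(bytes);
    return new String(bytes, StandardCharsets.UTF_8);
  }

  // Outer string: no prefix, the value is whatever remains in the stream.
  static String decodeOuter(DataInputStream in) throws IOException {
    ByteArrayOutputStream rest = new ByteArrayOutputStream();
    byte[] chunk = new byte[256];
    int read;
    while ((read = in.read(chunk)) != -1) {
      rest.write(chunk, 0, read);
    }
    return new String(rest.toByteArray(), StandardCharsets.UTF_8);
  }

  static Map<String, String> decode(byte[] encoded) {
    Map<String, String> result = new LinkedHashMap<>();
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded))) {
      int size = in.readInt();
      if (size == 0) {
        return result; // nothing follows the size, so there is no last entry
      }
      // Every entry except the last is fully nested.
      for (int i = 0; i < size - 1; ++i) {
        String key = decodeNested(in);
        result.put(key, decodeNested(in));
      }
      // Last entry: nested key, value in the context passed in (outer here).
      String lastKey = decodeNested(in);
      result.put(lastKey, decodeOuter(in));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return result;
  }
}
```

The early return for size == 0 is what keeps the "decode the last entry" step from running on an empty map.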
} else {
valueCoder.encode(entry.getValue(), outStream, context);
}
++i;
}
dataOutStream.flush();
Flushing is not needed or expected since DataOutputStream is not a buffering output stream.
done
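A small stdlib-only sketch of the point about buffering, using hypothetical class and method names: bytes written through a DataOutputStream reach the underlying stream immediately, whereas a BufferedOutputStream in between would hold them until a flush.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical demo class; not part of the PR.
class NoFlushNeededSketch {

  // DataOutputStream directly over the sink: no flush is called.
  static int visibleAfterDirectWrite() {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    DataOutputStream data = new DataOutputStream(sink);
    try {
      data.writeInt(42);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return sink.size();
  }

  // Same write, but through a BufferedOutputStream that is never flushed.
  static int visibleAfterBufferedWrite() {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    DataOutputStream data = new DataOutputStream(new BufferedOutputStream(sink));
    try {
      data.writeInt(42);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return sink.size();
  }

  public static void main(String[] args) {
    System.out.println("direct: " + visibleAfterDirectWrite());
    System.out.println("buffered, unflushed: " + visibleAfterBufferedWrite());
  }
}
```

Whether a flush matters therefore depends on what sits underneath the DataOutputStream, not on the DataOutputStream itself.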
dataOutStream.writeInt(map.size());
int size = map.size();
dataOutStream.writeInt(size);
int i = 0;
for (Entry<K, V> entry : map.entrySet()) {
Might be cleaner using an iterator:
write out size
if size == 0; return
Iterator<Map.Entry> iterator = map.entrySet().iterator();
Map.Entry<K, V> entry = iterator.next();
while (iterator.hasNext()) {
encode key and value using nested context
entry = iterator.next();
}
encode key using nested context
encode value using passed in context
done
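For reference, the iterator pattern suggested above might be written along these lines. This is a sketch under assumptions, not the MapCoder code that was merged: it uses string keys and values, a 4-byte length prefix for nested strings, and no prefix for the final value. The names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Map;

// Hypothetical sketch; the class and helper names are not part of Beam.
class MapEncodeSketch {

  // Nested string: 4-byte length prefix, then the UTF-8 bytes.
  static void encodeNested(String s, DataOutputStream out) throws IOException {
    byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
    out.writeInt(bytes.length);
    out.write(bytes);
  }

  // Outer string: just the bytes, since nothing follows them.
  static void encodeOuter(String s, DataOutputStream out) throws IOException {
    out.write(s.getBytes(StandardCharsets.UTF_8));
  }

  static byte[] encode(Map<String, String> map) {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buffer);
    try {
      out.writeInt(map.size());
      if (map.isEmpty()) {
        return buffer.toByteArray();
      }
      Iterator<Map.Entry<String, String>> iterator = map.entrySet().iterator();
      Map.Entry<String, String> entry = iterator.next();
      // Loop body only ever sees entries that are followed by another entry.
      while (iterator.hasNext()) {
        encodeNested(entry.getKey(), out);
        encodeNested(entry.getValue(), out);
        entry = iterator.next();
      }
      // entry is now the last one: nested key, outer value.
      encodeNested(entry.getKey(), out);
      encodeOuter(entry.getValue(), out);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return buffer.toByteArray();
  }
}
```

The loop never has to ask whether it is on the last element; the `hasNext()` check that drives the loop is the only test.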
@@ -996,7 +996,11 @@ public void encode(Object[] value, OutputStream outStream, Context context)
checkArgument(value.length == codersCount);
Context nestedContext = context.nested();
for (int i = 0; i < codersCount; ++i) {
Ditto on using iterator to differentiate the loop from the last element case as done in MapCoder
Done
@@ -1006,7 +1010,11 @@ public void encode(Object[] value, OutputStream outStream, Context context)
Object[] ret = new Object[codersCount];
Context nestedContext = context.nested();
for (int i = 0; i < codersCount; ++i) {
ret[i] = coders.get(i).decode(inStream, nestedContext);
Ditto on using iterator to differentiate loop from last element
Done
@@ -269,7 +269,11 @@ public void encode(
throw new CoderException("input schema does not match coder schema");
}
for (int unionTag = 0; unionTag < schema.size(); unionTag++) {
Use iterator to differentiate loop from last element
done
@@ -280,7 +284,11 @@ public CoGbkResult decode(
throws CoderException, IOException {
List<Iterable<?>> valueMap = new ArrayList<>();
for (int unionTag = 0; unionTag < schema.size(); unionTag++) {
Use iterator to differentiate loop from last element
done
@@ -175,7 +175,7 @@ public void encode(TestType value, OutputStream outStream, Context context)
Context subContext = context.nested();
nit: rename subContext to nestedContext here and below
done
…nsitive coders last
PTAL
Which coders are cross-Fn-API and need to stay fixed? Should have a list. E.g., is WindowedValueCoder okay to change?
No point in worrying about binary representations right now. Someone will have to make a pass through on a coder by coder basis per SDK to make them align.
Minor nits
LGTM
return;
}

// Since we handled size == 0 above, entry is guaranteed to exist after before and after loop
exist after before -> exist before
@@ -162,15 +172,17 @@ public void registerByteSizeObserver(
Map<K, V> map, ElementByteSizeObserver observer, Context context)
throws Exception {
observer.update(4L);
if (map.isEmpty()){
spacing around ){
if (schema.size() == 0) {
return;
}
int nested = schema.size() - 1;
rename nested -> lastIndex
} else {
coders.get(i).encode(value[i], outStream, context);
}
for (int i = 0; i < codersCount - 1; ++i) {
I like how you used the placeholder variable to store the lastIndex in CoGbkResult.java; I would recommend using it here and in decode.
if (schema.size() == 0) {
return new CoGbkResult(schema, ImmutableList.<Iterable<?>>of());
}
int nested = schema.size() - 1;
rename nested -> lastIndex
Thanks Luke! Fixed nits and merging.
See BEAM-469 for more information about why this is correct.