Skip to content

Commit

Permalink
[FLINK-32888] Handle hasNext() throwing EndOfDataDecoderException
Browse files Browse the repository at this point in the history
This _probably_ happens when a non-empty http content is received that does not contain any attribute data.
  • Loading branch information
zentol committed Aug 18, 2023
1 parent 329ee55 commit 9546f82
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms
currentHttpPostRequestDecoder.offer(httpContent);

while (httpContent != LastHttpContent.EMPTY_LAST_CONTENT
&& currentHttpPostRequestDecoder.hasNext()) {
&& hasNext(currentHttpPostRequestDecoder)) {
final InterfaceHttpData data = currentHttpPostRequestDecoder.next();
if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.FileUpload) {
final DiskFileUpload fileUpload = (DiskFileUpload) data;
Expand Down Expand Up @@ -212,6 +212,16 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms
}
}

private static boolean hasNext(HttpPostRequestDecoder decoder) {
try {
return decoder.hasNext();
} catch (HttpPostRequestDecoder.EndOfDataDecoderException e) {
// this can occur if the final chuck wasn't empty, but didn't contain any attribute data
// unfortunately the Netty APIs don't give us any way to check this
return false;
}
}

private void handleError(
ChannelHandlerContext ctx,
String errorMessage,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.util.RestMapperUtils;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.function.BiConsumerWithException;

Expand All @@ -42,10 +43,12 @@

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.StringWriter;
import java.lang.reflect.Field;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedHashSet;
Expand Down Expand Up @@ -123,6 +126,15 @@ private Request buildJsonRequest(
return finalizeRequest(builder, headerUrl);
}

private Request buildMixedRequest(
String headerUrl, MultipartUploadExtension.TestRequestBody json, File file)
throws IOException {
MultipartBody.Builder builder = new MultipartBody.Builder();
builder = addJsonPart(builder, json, FileUploadHandler.HTTP_ATTRIBUTE_REQUEST);
builder = addFilePart(builder, file, file.getName());
return finalizeRequest(builder, headerUrl);
}

private Request buildMixedRequest(
String headerUrl, MultipartUploadExtension.TestRequestBody json) throws IOException {
MultipartBody.Builder builder = new MultipartBody.Builder();
Expand Down Expand Up @@ -227,6 +239,50 @@ void testMixedMultipart() throws Exception {
verifyNoFileIsRegisteredToDeleteOnExitHook();
}

/**
* This test checks for a specific multipart request chunk layout using a magic number.
*
* <p>These things are very susceptible to interference from other requests or parts of the
* payload; for example if the JSON payload increases by a single byte it can already break the
* number. Do not reuse the client.
*
* <p>To find the magic number you can define a static counter, and loop the test in the IDE
* (without forking!) while incrementing the counter on each run.
*/
@Test
void testMixedMultipartEndOfDataDecoderExceptionHandling(@TempDir Path tmp) throws Exception {
OkHttpClient client = createOkHttpClientWithNoTimeouts();

MultipartUploadExtension.MultipartMixedHandler mixedHandler =
multipartUpdateExtensionWrapper.getCustomExtension().getMixedHandler();

MultipartUploadExtension.TestRequestBody json =
new MultipartUploadExtension.TestRequestBody();

File file = TempDirUtils.newFile(tmp);
try (RandomAccessFile rw = new RandomAccessFile(file, "rw")) {
// magic value that reliably reproduced EndOfDataDecoderException in hasNext()
rw.setLength(1424);
}
multipartUpdateExtensionWrapper
.getCustomExtension()
.setFileUploadVerifier(
(handlerRequest, restfulGateway) ->
MultipartUploadExtension.assertUploadedFilesEqual(
handlerRequest, Collections.singleton(file)));

Request singleFileMixedRequest =
buildMixedRequest(
mixedHandler.getMessageHeaders().getTargetRestEndpointURL(), json, file);
try (Response response = client.newCall(singleFileMixedRequest).execute()) {
assertThat(response.code())
.isEqualTo(mixedHandler.getMessageHeaders().getResponseStatusCode().code());
assertThat(mixedHandler.lastReceivedRequest).isEqualTo(json);
}

verifyNoFileIsRegisteredToDeleteOnExitHook();
}

@Test
void testJsonMultipart() throws Exception {
OkHttpClient client = createOkHttpClientWithNoTimeouts();
Expand Down

0 comments on commit 9546f82

Please sign in to comment.