Skip to content

Commit

Permalink
BinaryData. File segment representation. (#29047)
Browse files Browse the repository at this point in the history
* add weird flux.

* that works.

* Handling BinaryData requests. Unbuffered writes.

* do this.

* this is better.

* right constant size.

* more tests.

* call timeout.

* expose call timeout.

* pr feedback.

* undo formatting change.

* un-flaky this test.

* ugh..

* .

* this works.

* don't use side effect operator for this. it's flaky.

* improve flux send.

* validate timeout.

* more pr feedback.

* changelog.

* testing.

* chlog.

* changelog.

* add synchronization.

* fix file transfer.

* pr feedback.

* comment.
  • Loading branch information
kasobol-msft committed May 25, 2022
1 parent 9b0b5d6 commit 8901e4c
Show file tree
Hide file tree
Showing 11 changed files with 834 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ private static NettyOutbound sendFile(

try {
return new ChunkedNioFile(
fc, 0, fileContent.getLength(), fileContent.getChunkSize());
fc, fileContent.getPosition(), fileContent.getLength(), fileContent.getChunkSize());
} catch (IOException e) {
throw LOGGER.logExceptionAsError(new UncheckedIOException(e));
}
Expand All @@ -233,7 +233,7 @@ private static NettyOutbound sendFile(
// Beware of NettyOutbound.sendFile(Path) it involves extra file length lookup.
// This is going to use zero-copy transfer if there's no ssl
return reactorNettyOutbound.sendFile(
fileContent.getFile(), 0, fileContent.getLength());
fileContent.getFile(), fileContent.getPosition(), fileContent.getLength());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,21 @@ public OkHttpFileRequestBody(FileContent content, long effectiveContentLength, M

@Override
public void writeTo(BufferedSink bufferedSink) throws IOException {
long count = effectiveContentLength;
if (count < 0) {
// OkHttp marks chunked encoding as -1.
// The content length is not specified so sending all remaining content.
count = Long.MAX_VALUE;
}
// RequestBody.create(File) does not support position and length.
// BufferedSink implements WritableByteChannel so we can leverage FileChannel as source.
// FileChannel supports positional reads.
try (FileChannel channel = FileChannel.open(content.getFile(), StandardOpenOption.READ)) {
channel.transferTo(0, count, bufferedSink);
// FileContent.getLength always returns non-null.
long pendingTransfer = content.getLength();
long position = content.getPosition();
do {
long transferred = channel.transferTo(position, pendingTransfer, bufferedSink);
if (transferred < 0) {
break;
}
position += transferred;
pendingTransfer -= transferred;
} while (pendingTransfer > 0);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.http.okhttp.implementation;


import com.azure.core.implementation.util.FileContent;
import okio.Buffer;
import okio.BufferedSink;
import okio.ByteString;
import okio.Source;
import okio.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Random;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;

public class OkHttpFileRequestBodyTest {

private static final Random RANDOM = new Random();

@ParameterizedTest
@ValueSource(ints = {1, 10, 127, 1024, 1024 + 113, 10 * 1024 * 1024, 10 * 1024 * 1024 + 113})
public void transferContentTransferAll(int size) throws Exception {
Path file = Files.createTempFile("OkHttpFileRequestBodyTest", null);
file.toFile().deleteOnExit();
byte[] bytes = new byte[size];
RANDOM.nextBytes(bytes);
Files.write(file, bytes);

OkHttpFileRequestBody fileRequestBody = new OkHttpFileRequestBody(
new FileContent(file, 1024, 0L, null), bytes.length, null);

TestSink sink = new TestSink(false);

fileRequestBody.writeTo(sink);

assertArrayEquals(sink.getData(), bytes);
}

@ParameterizedTest
@ValueSource(ints = {1, 10, 127, 1024, 1024 + 113, 10 * 1024 * 1024, 10 * 1024 * 1024 + 113})
public void transferContentWithIncompleteTransferTo(int size) throws Exception {
Path file = Files.createTempFile("OkHttpFileRequestBodyTest", null);
file.toFile().deleteOnExit();
byte[] bytes = new byte[size];
RANDOM.nextBytes(bytes);
Files.write(file, bytes);

OkHttpFileRequestBody fileRequestBody = new OkHttpFileRequestBody(
new FileContent(file, 1024, 0L, null), bytes.length, null);

TestSink sink = new TestSink(true);

fileRequestBody.writeTo(sink);

assertArrayEquals(sink.getData(), bytes);
}

@ParameterizedTest
@ValueSource(ints = {1, 10, 127, 1024, 1024 + 113, 10 * 1024 * 1024, 10 * 1024 * 1024 + 113})
public void transferContentWithIncompleteTransferToWithOversizeContent(int size) throws Exception {
Path file = Files.createTempFile("OkHttpFileRequestBodyTest", null);
file.toFile().deleteOnExit();
byte[] bytes = new byte[size];
RANDOM.nextBytes(bytes);
Files.write(file, bytes);

OkHttpFileRequestBody fileRequestBody = new OkHttpFileRequestBody(
new FileContent(file, 1024, 0L, size + 112L), bytes.length, null);

TestSink sink = new TestSink(true);

fileRequestBody.writeTo(sink);

assertArrayEquals(sink.getData(), bytes);
}

private static final class TestSink implements BufferedSink {

private final boolean simulateIncompleteRead;

ByteArrayOutputStream bos = new ByteArrayOutputStream();

/**
* @param simulateIncompleteRead a flag that enables partial buffer consumption.
* This behavior makes FileChannel abandon transferTo prematurely before reaching
* out to end of file.
*/
private TestSink(boolean simulateIncompleteRead) {
this.simulateIncompleteRead = simulateIncompleteRead;
}

public byte[] getData() {
return bos.toByteArray();
}

@Override
public Buffer getBuffer() {
return null;
}

@SuppressWarnings("deprecation")
@Override
public Buffer buffer() {
return null;
}

@Override
public BufferedSink emit() throws IOException {
return null;
}

@Override
public BufferedSink emitCompleteSegments() throws IOException {
return null;
}

@Override
public void flush() throws IOException {

}

@Override
public OutputStream outputStream() {
return null;
}

@Override
public BufferedSink write(byte[] bytes) throws IOException {
return null;
}

@Override
public BufferedSink write(byte[] bytes, int i, int i1) throws IOException {
return null;
}

@Override
public BufferedSink write(ByteString byteString) throws IOException {
return null;
}

@Override
public BufferedSink write(ByteString byteString, int i, int i1) throws IOException {
return null;
}

@Override
public BufferedSink write(Source source, long l) throws IOException {
return null;
}

@Override
public long writeAll(Source source) throws IOException {
return 0;
}

@Override
public BufferedSink writeByte(int i) throws IOException {
return null;
}

@Override
public BufferedSink writeDecimalLong(long l) throws IOException {
return null;
}

@Override
public BufferedSink writeHexadecimalUnsignedLong(long l) throws IOException {
return null;
}

@Override
public BufferedSink writeInt(int i) throws IOException {
return null;
}

@Override
public BufferedSink writeIntLe(int i) throws IOException {
return null;
}

@Override
public BufferedSink writeLong(long l) throws IOException {
return null;
}

@Override
public BufferedSink writeLongLe(long l) throws IOException {
return null;
}

@Override
public BufferedSink writeShort(int i) throws IOException {
return null;
}

@Override
public BufferedSink writeShortLe(int i) throws IOException {
return null;
}

@Override
public BufferedSink writeString(String s, Charset charset) throws IOException {
return null;
}

@Override
public BufferedSink writeString(String s, int i, int i1, Charset charset) throws IOException {
return null;
}

@Override
public BufferedSink writeUtf8(String s) throws IOException {
return null;
}

@Override
public BufferedSink writeUtf8(String s, int i, int i1) throws IOException {
return null;
}

@Override
public BufferedSink writeUtf8CodePoint(int i) throws IOException {
return null;
}

@Override
public int write(ByteBuffer src) throws IOException {
byte[] buf;

if (simulateIncompleteRead && src.remaining() > 1) {
buf = new byte[src.remaining() - 1];
} else {
buf = new byte[src.remaining()];
}
src.get(buf);
bos.write(buf);
return buf.length;
}

@Override
public boolean isOpen() {
return true;
}

@Override
public void close() throws IOException {

}

@Override
public Timeout timeout() {
return null;
}

@Override
public void write(Buffer buffer, long l) throws IOException {

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -279,6 +280,13 @@ private static Stream<Arguments> getBinaryDataBodyVariants() {
Files.write(wholeFile, bytes);
BinaryData fileData = BinaryData.fromFile(wholeFile);

Path sliceFile = Files.createTempFile("http-client-tests", null);
sliceFile.toFile().deleteOnExit();
Files.write(sliceFile, new byte[size], StandardOpenOption.APPEND);
Files.write(sliceFile, bytes, StandardOpenOption.APPEND);
Files.write(sliceFile, new byte[size], StandardOpenOption.APPEND);
BinaryData sliceFileData = BinaryData.fromFile(sliceFile, Long.valueOf(size), Long.valueOf(size));


return Stream.of(
Arguments.of(Named.named("byte[]", byteArrayData), Named.named("" + size, bytes)),
Expand All @@ -291,7 +299,8 @@ private static Stream<Arguments> getBinaryDataBodyVariants() {
Arguments.of(Named.named("async Flux", asyncFluxBinaryData), Named.named("" + size, bytes)),
Arguments.of(Named.named("async Flux with length", asyncFluxBinaryDataWithLength), Named.named("" + size, bytes)),
Arguments.of(Named.named("Object", objectBinaryData), Named.named("" + size, bytes)),
Arguments.of(Named.named("File", fileData), Named.named("" + size, bytes))
Arguments.of(Named.named("File", fileData), Named.named("" + size, bytes)),
Arguments.of(Named.named("File slice", sliceFileData), Named.named("" + size, bytes))
);
} catch (IOException e) {
throw new RuntimeException(e);
Expand Down
2 changes: 2 additions & 0 deletions sdk/core/azure-core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
- Added `HttpRequest.getBodyAsBinaryData()`.
- Added `HttpRequest.setBody(BinaryData)`.
- Added `BinaryData.fromFlux(Flux<ByteBuffer>, Long, boolean)` that allows both buffered and non-buffered handling of `Flux<ByteBuffer>`.
- Added `BinaryData.fromFile(Path file, Long position, Long length)` and `BinaryData.fromFile(Path file, Long position, Long length, int chunkSize)`
that represents slice of the file.

### Breaking Changes

Expand Down
Loading

0 comments on commit 8901e4c

Please sign in to comment.