Skip to content

Commit

Permalink
Ensure non-trailing parts are of equal size (#50)
Browse files Browse the repository at this point in the history
#### What type of PR is this?

/kind improvement

#### What this PR does / why we need it:

Reshape the DataBuffers into parts of the same size (5MB) except for the last part.

#### Which issue(s) this PR fixes:

Fixes #49

#### Does this PR introduce a user-facing change?

```release-note
保证分片上传时片段大小一致
```
  • Loading branch information
JohnNiang committed Jul 5, 2023
1 parent 8be39b9 commit 00537c1
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 3 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ dependencies {

testImplementation 'run.halo.app:api'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'io.projectreactor:reactor-test'
}

test {
Expand Down
53 changes: 50 additions & 3 deletions src/main/java/run/halo/s3os/S3OsAttachmentHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,21 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.pf4j.Extension;
import org.reactivestreams.Publisher;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.MediaType;
import org.springframework.http.MediaTypeFactory;
import org.springframework.lang.Nullable;
import org.springframework.web.server.ServerErrorException;
import org.springframework.web.server.ServerWebInputException;
import org.springframework.web.util.UriUtils;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.context.Context;
import reactor.util.retry.Retry;
import run.halo.app.core.extension.attachment.Attachment;
import run.halo.app.core.extension.attachment.Attachment.AttachmentSpec;
Expand Down Expand Up @@ -232,23 +237,65 @@ private S3Presigner buildS3Presigner(S3OsProperties properties) {
.build();
}

Flux<DataBuffer> reshape(Publisher<DataBuffer> content, int bufferSize) {
var dataBufferFactory = DefaultDataBufferFactory.sharedInstance;
return Flux.<ByteBuffer>create(sink -> {
var byteBuffer = ByteBuffer.allocate(bufferSize);
Flux.from(content)
.doOnNext(dataBuffer -> {
var count = dataBuffer.readableByteCount();
for (var i = 0; i < count; i++) {
byteBuffer.put(dataBuffer.read());
// Emit the buffer when buffer
if (!byteBuffer.hasRemaining()) {
sink.next(deepCopy(byteBuffer));
byteBuffer.clear();
}
}
})
.doOnComplete(() -> {
// Emit the last part of buffer.
if (byteBuffer.position() > 0) {
sink.next(deepCopy(byteBuffer));
}
})
.subscribe(DataBufferUtils::release, sink::error, sink::complete,
Context.of(sink.contextView()));
})
.map(dataBufferFactory::wrap)
.cast(DataBuffer.class)
.doOnDiscard(DataBuffer.class, DataBufferUtils::release);
}

ByteBuffer deepCopy(ByteBuffer src) {
src.flip();
var dest = ByteBuffer.allocate(src.limit());
dest.put(src);
src.rewind();
dest.flip();
return dest;
}

Mono<ObjectDetail> upload(UploadContext uploadContext, S3OsProperties properties) {
return Mono.using(() -> buildS3Client(properties),
client -> {
var uploadState = new UploadState(properties, uploadContext.file().filename());

var content = uploadContext.file().content();

return checkFileExistsAndRename(uploadState, client)
// init multipart upload
.flatMap(state -> Mono.fromCallable(() -> client.createMultipartUpload(
CreateMultipartUploadRequest.builder()
.bucket(properties.getBucket())
.contentType(state.contentType)
.key(state.objectKey)
.build())).subscribeOn(Schedulers.boundedElastic()))
.flatMapMany((response) -> {
.build())))
.doOnNext((response) -> {
checkResult(response, "createMultipartUpload");
uploadState.uploadId = response.uploadId();
return uploadContext.file().content();
})
.thenMany(reshape(content, MULTIPART_MIN_PART_SIZE))
// buffer to part
.windowUntil((buffer) -> {
uploadState.buffered += buffer.readableByteCount();
Expand Down
60 changes: 60 additions & 0 deletions src/test/java/run/halo/s3os/S3OsAttachmentHandlerTest.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
package run.halo.s3os;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.List;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import run.halo.app.core.extension.attachment.Policy;

class S3OsAttachmentHandlerTest {
Expand All @@ -33,4 +40,57 @@ void acceptHandlingWhenPolicyTemplateIsExpected() {
// policy is null
assertFalse(handler.shouldHandle(null));
}

@Test
void reshapeDataBufferWithSmallerBufferSize() {
var handler = new S3OsAttachmentHandler();
var factory = DefaultDataBufferFactory.sharedInstance;
var content = Flux.<DataBuffer>fromIterable(List.of(factory.wrap("halo".getBytes())));

StepVerifier.create(handler.reshape(content, 2))
.assertNext(dataBuffer -> {
var str = dataBuffer.toString(UTF_8);
assertEquals("ha", str);
})
.assertNext(dataBuffer -> {
var str = dataBuffer.toString(UTF_8);
assertEquals("lo", str);
})
.verifyComplete();
}

@Test
void reshapeDataBufferWithBiggerBufferSize() {
var handler = new S3OsAttachmentHandler();
var factory = DefaultDataBufferFactory.sharedInstance;
var content = Flux.<DataBuffer>fromIterable(List.of(factory.wrap("halo".getBytes())));

StepVerifier.create(handler.reshape(content, 10))
.assertNext(dataBuffer -> {
var str = dataBuffer.toString(UTF_8);
assertEquals("halo", str);
})
.verifyComplete();
}

@Test
void reshapeDataBuffersWithBiggerBufferSize() {
var handler = new S3OsAttachmentHandler();
var factory = DefaultDataBufferFactory.sharedInstance;
var content = Flux.<DataBuffer>fromIterable(List.of(
factory.wrap("ha".getBytes()),
factory.wrap("lo".getBytes())
));

StepVerifier.create(handler.reshape(content, 3))
.assertNext(dataBuffer -> {
var str = dataBuffer.toString(UTF_8);
assertEquals("hal", str);
})
.assertNext(dataBuffer -> {
var str = dataBuffer.toString(UTF_8);
assertEquals("o", str);
})
.verifyComplete();
}
}

0 comments on commit 00537c1

Please sign in to comment.