Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the read performance issue in the offload readAsync #12443

Merged
merged 2 commits into from
Oct 22, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,9 @@ public void seekForward(long position) throws IOException {
public void close() {
buffer.release();
}

@Override
public int available() throws IOException {
return (int)(objectLen - cursor) + buffer.readableBytes();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,13 @@ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntr
long entriesToRead = (lastEntry - firstEntry) + 1;
long nextExpectedId = firstEntry;

// seek the position to the first entry position, otherwise we will get the unexpected entry ID when doing
// the first read, that would cause read an unexpected entry id which is out of range between firstEntry
// and lastEntry
// for example, when we get 1-10 entries at first, then the next request is get 2-9, the following code
// will read the entry id from the stream and that is not the correct entry id, so it will seek to the
// correct position then read the stream as normal. But the entry id may exceed the last entry id, that
// will cause we are hardly to know the edge of the request range.
inputStream.seek(index.getIndexEntryForEntry(firstEntry).getDataOffset());
// checking the data stream has enough data to read to avoid throw EOF exception when reading data.
// 12 bytes represent the stream have the length and entryID to read.
if (dataStream.available() < 12) {
log.warn("There hasn't enough data to read, current available data has {} bytes,"
+ " seek to the first entry {} to avoid EOF exception", inputStream.available(), firstEntry);
inputStream.seek(index.getIndexEntryForEntry(firstEntry).getDataOffset());
}

while (entriesToRead > 0) {
if (state == State.Closed) {
Expand Down Expand Up @@ -161,6 +160,8 @@ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntr
nextExpectedId, entryId, lastEntry);
throw new BKException.BKUnexpectedConditionException();
} else {
log.warn("Skip {} size to continue to read, first {}, last {}, current {}, next {}",
length, firstEntry, lastEntry, entryId, nextExpectedId);
long ignore = inputStream.skip(length);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,4 +260,27 @@ public void testSeekForward() throws Exception {
toTest.seekForward(after);
assertStreamsMatch(toTest, toCompare);
}

@Test
public void testAvailable() throws IOException {
String objectKey = "testAvailable";
int objectSize = 2048;
RandomInputStream toWrite = new RandomInputStream(0, objectSize);
Payload payload = Payloads.newInputStreamPayload(toWrite);
payload.getContentMetadata().setContentLength((long)objectSize);
Blob blob = blobStore.blobBuilder(objectKey)
.payload(payload)
.contentLength(objectSize)
.build();
String ret = blobStore.putBlob(BUCKET, blob);
BackedInputStream bis = new BlobStoreBackedInputStreamImpl(
blobStore, BUCKET, objectKey, (k, md) -> {}, objectSize, 512);
Assert.assertEquals(bis.available(), objectSize);
bis.seek(500);
Assert.assertEquals(bis.available(), objectSize - 500);
bis.seek(1024);
Assert.assertEquals(bis.available(), 1024);
bis.seek(2048);
Assert.assertEquals(bis.available(), 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.fail;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -477,4 +478,22 @@ public void testReadUnknownIndexVersion() throws Exception {
Assert.assertTrue(e.getCause().getMessage().contains("Invalid object version"));
}
}

@Test
public void testReadEOFException() throws Throwable {
ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 1);
LedgerOffloader offloader = getOffloader();
UUID uuid = UUID.randomUUID();
offloader.offload(toWrite, uuid, new HashMap<>()).get();

ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid, Collections.emptyMap()).get();
Assert.assertEquals(toTest.getLastAddConfirmed(), toWrite.getLastAddConfirmed());
toTest.readAsync(0, toTest.getLastAddConfirmed()).get();

try {
toTest.readAsync(0, 0).get();
} catch (Exception e) {
fail("Get unexpected exception when reading entries", e);
}
}
}