Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.bookkeeper.mledger.offload.jcloud.impl;

import io.netty.buffer.ByteBuf;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -121,6 +122,9 @@ private boolean refillBufferIfNeeded() throws IOException {
if (null != this.offloaderStats) {
this.offloaderStats.recordReadOffloadError(this.topicName);
}
if (!blobStore.blobExists(bucket, key)) {
throw new FileNotFoundException("The file in the blobstore does not exist!");
}
throw new IOException("Error reading from BlobStore", e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.cache.CacheBuilder;
import io.netty.buffer.ByteBuf;
import java.io.DataInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
Expand Down Expand Up @@ -195,7 +196,12 @@ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntr
} catch (Throwable t) {
log.error("Failed to read entries {} - {} from the offloader in ledger {}",
firstEntry, lastEntry, ledgerId, t);
promise.completeExceptionally(t);
if (t instanceof FileNotFoundException) {
promise.completeExceptionally(new ManagedLedgerException
.NonRecoverableLedgerException("The blobstore file does not exist for ledger:" + ledgerId));
} else {
promise.completeExceptionally(t);
}
entries.forEach(LedgerEntry::close);
}
});
Expand Down Expand Up @@ -278,8 +284,11 @@ public static ReadHandle open(ScheduledExecutorService executor,
try (InputStream payLoadStream = blob.getPayload().openStream()) {
index = (OffloadIndexBlock) indexBuilder.fromStream(payLoadStream);
} catch (IOException e) {
if (!blobStore.blobExists(bucket, indexKey)) {
throw new FileNotFoundException("The index file in the blob store does not exist!");
}
// retry to avoid the network issue caused read failure
log.warn("Failed to get index block from the offoaded index file {}, still have {} times to retry",
log.warn("Failed to get index block from the offloaded index file {}, still have {} times to retry",
indexKey, retryCount, e);
lastException = e;
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import io.netty.buffer.ByteBuf;
import java.io.DataInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
Expand Down Expand Up @@ -216,7 +217,13 @@ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntr
}
}
} catch (Throwable t) {
promise.completeExceptionally(t);
if (t instanceof FileNotFoundException) {
promise.completeExceptionally(new ManagedLedgerException
.NonRecoverableLedgerException(
"The blobstore file does not exist for ledger:" + ledgerId));
} else {
promise.completeExceptionally(t);
}
entries.forEach(LedgerEntry::close);
}

Expand Down Expand Up @@ -310,9 +317,22 @@ public static ReadHandle open(ScheduledExecutorService executor,
log.debug("indexKey blob: {} {}", indexKey, blob);
versionCheck.check(indexKey, blob);
OffloadIndexBlockV2Builder indexBuilder = OffloadIndexBlockV2Builder.create();
OffloadIndexBlockV2 index;
try (InputStream payloadStream = blob.getPayload().openStream()) {
index = indexBuilder.fromStream(payloadStream);
OffloadIndexBlockV2 index = null;
int retryCount = 3;
while (retryCount > 0) {
try (InputStream payloadStream = blob.getPayload().openStream()) {
index = indexBuilder.fromStream(payloadStream);
} catch (IOException e) {
if (!blobStore.blobExists(bucket, indexKey)) {
throw new FileNotFoundException("The index file in the blob store does not exist!");
}
// retry to avoid the network issue caused read failure
log.warn("Failed to get index block from the offloaded index file {}, still have {} times to retry",
indexKey, retryCount, e);
if (--retryCount == 0) {
throw e;
}
}
}

BackedInputStream inputStream = new BlobStoreBackedInputStreamImpl(blobStore, bucket, key,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,36 @@
package org.apache.bookkeeper.mledger.offload.jcloud.impl;

import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.Executors;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.LedgerOffloader.OffloadHandle;
import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2;
import org.apache.bookkeeper.mledger.offload.jcloud.provider.JCloudBlobStoreProvider;
import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
Expand Down Expand Up @@ -445,4 +457,29 @@ public void testInvalidEntryIds() throws Exception {
} catch (Exception e) {
}
}

@Test
public void testThrowNonRecoverableLedgerExceptionAvoidEOFAndNPE() throws Exception {
BackedInputStream mockInputStream = spy(BackedInputStream.class);
OffloadIndexBlockV2 mockIndex = spy(OffloadIndexBlockV2.class);
LedgerMetadata mockMetadata = spy(LedgerMetadata.class);
when(mockMetadata.getLastEntryId()).thenReturn(100L);
when(mockIndex.getStartEntryId(anyLong())).thenReturn(0L);
when(mockIndex.getLedgerMetadata(anyLong())).thenReturn(mockMetadata);
Constructor<BlobStoreBackedReadHandleImplV2>[] constructors =
(Constructor<BlobStoreBackedReadHandleImplV2>[]) BlobStoreBackedReadHandleImplV2.class
.getDeclaredConstructors();
Constructor<BlobStoreBackedReadHandleImplV2> constructor = constructors[0];
constructor.setAccessible(true);
BlobStoreBackedReadHandleImplV2 blobStoreBackedReadHandleImplV2 = constructor.newInstance(1,
List.of(mockIndex, mockIndex, mockIndex),
List.of(mockInputStream, mockInputStream, mockInputStream),
Executors.newSingleThreadExecutor());
try {
blobStoreBackedReadHandleImplV2.read(1, 3);
fail();
} catch (Exception e) {
assertTrue(e.getCause() instanceof ManagedLedgerException.NonRecoverableLedgerException);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@
*/
package org.apache.bookkeeper.mledger.offload.jcloud.impl;

import static org.apache.bookkeeper.mledger.offload.jcloud.impl.OffloadIndexTest.createLedgerMetadata;
import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -37,16 +41,21 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.LedgerOffloaderStatsImpl;
import org.apache.bookkeeper.mledger.OffloadedLedgerMetadata;
import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder;
import org.apache.bookkeeper.mledger.offload.jcloud.provider.JCloudBlobStoreProvider;
import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
import org.apache.pulsar.common.naming.TopicName;
Expand Down Expand Up @@ -447,6 +456,33 @@ public void testOffloadEmpty() throws Exception {
}
}

@Test
public void testThrowNonRecoverableLedgerExceptionAvoidEOFAndNPE() throws Exception {
BackedInputStream mockInputStream = spy(BackedInputStream.class);
OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create();
LedgerMetadata metadata = createLedgerMetadata(1);
indexBuilder.withLedgerMetadata(metadata).withDataObjectLength(1)
.withDataBlockHeaderLength(23455);

indexBuilder.addBlock(0, 2, 64 * 1024 * 1024);
indexBuilder.addBlock(1000, 3, 64 * 1024 * 1024);
indexBuilder.addBlock(2000, 4, 64 * 1024 * 1024);
Constructor<BlobStoreBackedReadHandleImpl> constructMethod = BlobStoreBackedReadHandleImpl.class
.getDeclaredConstructor(long.class, OffloadIndexBlock.class,
BackedInputStream.class, ExecutorService.class);
constructMethod.setAccessible(true);
BlobStoreBackedReadHandleImpl readHandle = constructMethod.newInstance(1,
indexBuilder.build(),
mockInputStream, Executors.newSingleThreadExecutor());

try {
readHandle.read(1, 3);
fail();
} catch (Exception e) {
assertTrue(e.getCause() instanceof ManagedLedgerException.NonRecoverableLedgerException);
}
}

@Test
public void testReadUnknownDataVersion() throws Exception {
ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 1);
Expand Down