Skip to content

Commit

Permalink
Used OrderedScheduler for ledger offload (#1808)
Browse files Browse the repository at this point in the history
We want to ensure that operations for the same ledger do not
overlap. This is particularly important for the ReadHandle as it has
state(the buffer) which could be corrupted by multiple concurrent
calls.

To avoid this overlap, we use an OrderedScheduler, so all operations
on a single ledger will run on the same single thread executor.

Master Issue: #1511
  • Loading branch information
ivankelly authored and sijie committed May 21, 2018
1 parent 79b0e28 commit fae2343
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 18 deletions.
Expand Up @@ -43,6 +43,7 @@

import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
Expand Down Expand Up @@ -136,7 +137,7 @@ public class PulsarService implements AutoCloseable {
.build();
private final ScheduledExecutorService loadManagerExecutor;
private ScheduledExecutorService compactorExecutor;
private ScheduledExecutorService offloaderScheduler;
private OrderedScheduler offloaderScheduler;
private LedgerOffloader offloader;
private ScheduledFuture<?> loadReportTask = null;
private ScheduledFuture<?> loadSheddingTask = null;
Expand Down Expand Up @@ -723,11 +724,11 @@ public synchronized Compactor getCompactor() throws PulsarServerException {
return this.compactor;
}

protected synchronized ScheduledExecutorService getOffloaderScheduler(ServiceConfiguration conf) {
protected synchronized OrderedScheduler getOffloaderScheduler(ServiceConfiguration conf) {
if (this.offloaderScheduler == null) {
this.offloaderScheduler = Executors.newScheduledThreadPool(
conf.getManagedLedgerOffloadMaxThreads(),
new DefaultThreadFactory("offloader-"));
this.offloaderScheduler = OrderedScheduler.newSchedulerBuilder()
.numThreads(conf.getManagedLedgerOffloadMaxThreads())
.name("offloader").build();
}
return this.offloaderScheduler;
}
Expand Down
Expand Up @@ -38,8 +38,8 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
Expand All @@ -52,15 +52,15 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader {
private static final Logger log = LoggerFactory.getLogger(S3ManagedLedgerOffloader.class);

public static final String DRIVER_NAME = "S3";
private final ScheduledExecutorService scheduler;
private final OrderedScheduler scheduler;
private final AmazonS3 s3client;
private final String bucket;
// max block size for each data block.
private int maxBlockSize;
private final int readBufferSize;

public static S3ManagedLedgerOffloader create(ServiceConfiguration conf,
ScheduledExecutorService scheduler)
OrderedScheduler scheduler)
throws PulsarServerException {
String region = conf.getS3ManagedLedgerOffloadRegion();
String bucket = conf.getS3ManagedLedgerOffloadBucket();
Expand All @@ -85,7 +85,7 @@ public static S3ManagedLedgerOffloader create(ServiceConfiguration conf,
return new S3ManagedLedgerOffloader(builder.build(), bucket, scheduler, maxBlockSize, readBufferSize);
}

S3ManagedLedgerOffloader(AmazonS3 s3client, String bucket, ScheduledExecutorService scheduler,
S3ManagedLedgerOffloader(AmazonS3 s3client, String bucket, OrderedScheduler scheduler,
int maxBlockSize, int readBufferSize) {
this.s3client = s3client;
this.bucket = bucket;
Expand All @@ -108,7 +108,7 @@ public CompletableFuture<Void> offload(ReadHandle readHandle,
UUID uuid,
Map<String, String> extraMetadata) {
CompletableFuture<Void> promise = new CompletableFuture<>();
scheduler.submit(() -> {
scheduler.chooseThread(readHandle.getId()).submit(() -> {
OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create()
.withLedgerMetadata(readHandle.getLedgerMetadata());
String dataBlockKey = dataBlockOffloadKey(readHandle.getId(), uuid);
Expand Down Expand Up @@ -199,9 +199,10 @@ public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid) {
CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
String key = dataBlockOffloadKey(ledgerId, uid);
String indexKey = indexBlockOffloadKey(ledgerId, uid);
scheduler.submit(() -> {
scheduler.chooseThread(ledgerId).submit(() -> {
try {
promise.complete(S3BackedReadHandleImpl.open(scheduler, s3client,
promise.complete(S3BackedReadHandleImpl.open(scheduler.chooseThread(ledgerId),
s3client,
bucket, key, indexKey,
ledgerId, readBufferSize));
} catch (Throwable t) {
Expand All @@ -214,16 +215,14 @@ public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid) {
@Override
public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid) {
CompletableFuture<Void> promise = new CompletableFuture<>();
scheduler.submit(() -> {
scheduler.chooseThread(ledgerId).submit(() -> {
try {

s3client.deleteObjects(new DeleteObjectsRequest(bucket)
.withKeys(dataBlockOffloadKey(ledgerId, uid), indexBlockOffloadKey(ledgerId, uid)));
promise.complete(null);
} catch (Throwable t) {
log.error("Failed delete s3 Object ", t);
promise.completeExceptionally(t);
return;
}
});

Expand Down
Expand Up @@ -32,7 +32,6 @@
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException;
Expand All @@ -44,6 +43,7 @@
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
Expand All @@ -57,11 +57,11 @@
class S3ManagedLedgerOffloaderTest extends S3TestBase {
private static final int DEFAULT_BLOCK_SIZE = 5*1024*1024;
private static final int DEFAULT_READ_BUFFER_SIZE = 1*1024*1024;
final ScheduledExecutorService scheduler;
final OrderedScheduler scheduler;
final MockBookKeeper bk;

S3ManagedLedgerOffloaderTest() throws Exception {
scheduler = Executors.newScheduledThreadPool(1, new DefaultThreadFactory("offloader-"));
scheduler = OrderedScheduler.newSchedulerBuilder().numThreads(1).name("offloader").build();
bk = new MockBookKeeper(MockedPulsarServiceBaseTest.createMockZooKeeper());
}

Expand Down

0 comments on commit fae2343

Please sign in to comment.