Skip to content

Commit

Permalink
ReadHandle implementation backed by S3 (#1790)
Browse files Browse the repository at this point in the history
Implementation of bookkeeper ReadHandle, which reads from an S3
object.

Master Issue: #1511
  • Loading branch information
ivankelly authored and sijie committed May 16, 2018
1 parent b47c059 commit f0b1471
Show file tree
Hide file tree
Showing 7 changed files with 421 additions and 127 deletions.
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -495,6 +495,9 @@ s3ManagedLedgerOffloadServiceEndpoint=
# For Amazon S3 ledger offload, Max block size in bytes. # For Amazon S3 ledger offload, Max block size in bytes.
s3ManagedLedgerOffloadMaxBlockSizeInBytes=67108864 s3ManagedLedgerOffloadMaxBlockSizeInBytes=67108864


# For Amazon S3 ledger offload, Read buffer size in bytes (1MB by default)
s3ManagedLedgerOffloadReadBufferSizeInBytes=1048576

### --- Deprecated config variables --- ### ### --- Deprecated config variables --- ###


# Deprecated. Use configurationStoreServers # Deprecated. Use configurationStoreServers
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -487,6 +487,10 @@ public class ServiceConfiguration implements PulsarConfiguration {
// For Amazon S3 ledger offload, Max block size in bytes. // For Amazon S3 ledger offload, Max block size in bytes.
private int s3ManagedLedgerOffloadMaxBlockSizeInBytes = 64 * 1024 * 1024; private int s3ManagedLedgerOffloadMaxBlockSizeInBytes = 64 * 1024 * 1024;


// For Amazon S3 ledger offload, Read buffer size in bytes.
@FieldContext(minValue = 1024)
private int s3ManagedLedgerOffloadReadBufferSizeInBytes = 1024 * 1024; // 1MB

public String getZookeeperServers() { public String getZookeeperServers() {
return zookeeperServers; return zookeeperServers;
} }
Expand Down Expand Up @@ -1694,4 +1698,12 @@ public int getS3ManagedLedgerOffloadMaxBlockSizeInBytes() {
return this.s3ManagedLedgerOffloadMaxBlockSizeInBytes; return this.s3ManagedLedgerOffloadMaxBlockSizeInBytes;
} }


public void setS3ManagedLedgerOffloadReadBufferSizeInBytes(int readBufferSizeInBytes) {
this.s3ManagedLedgerOffloadReadBufferSizeInBytes = readBufferSizeInBytes;
}

public int getS3ManagedLedgerOffloadReadBufferSizeInBytes() {
return this.s3ManagedLedgerOffloadReadBufferSizeInBytes;
}

} }
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -43,8 +43,13 @@ public interface OffloadIndexEntry {
int getPartId(); int getPartId();


/** /**
* Get the offset of this message entry in code storage. * Get the offset of this block within the object.
*/ */
long getOffset(); long getOffset();

/**
* Get the offset of the block's data within the object.
*/
long getDataOffset();
} }


Original file line number Original file line Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.s3offload.impl.BlockAwareSegmentInputStreamImpl; import org.apache.pulsar.broker.s3offload.impl.BlockAwareSegmentInputStreamImpl;
import org.apache.pulsar.broker.s3offload.impl.S3BackedReadHandleImpl;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


Expand All @@ -57,6 +58,7 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader {
private final String bucket; private final String bucket;
// max block size for each data block. // max block size for each data block.
private int maxBlockSize; private int maxBlockSize;
private final int readBufferSize;


public static S3ManagedLedgerOffloader create(ServiceConfiguration conf, public static S3ManagedLedgerOffloader create(ServiceConfiguration conf,
ScheduledExecutorService scheduler) ScheduledExecutorService scheduler)
Expand All @@ -65,6 +67,7 @@ public static S3ManagedLedgerOffloader create(ServiceConfiguration conf,
String bucket = conf.getS3ManagedLedgerOffloadBucket(); String bucket = conf.getS3ManagedLedgerOffloadBucket();
String endpoint = conf.getS3ManagedLedgerOffloadServiceEndpoint(); String endpoint = conf.getS3ManagedLedgerOffloadServiceEndpoint();
int maxBlockSize = conf.getS3ManagedLedgerOffloadMaxBlockSizeInBytes(); int maxBlockSize = conf.getS3ManagedLedgerOffloadMaxBlockSizeInBytes();
int readBufferSize = conf.getS3ManagedLedgerOffloadReadBufferSizeInBytes();


if (Strings.isNullOrEmpty(region)) { if (Strings.isNullOrEmpty(region)) {
throw new PulsarServerException("s3ManagedLedgerOffloadRegion cannot be empty is s3 offload enabled"); throw new PulsarServerException("s3ManagedLedgerOffloadRegion cannot be empty is s3 offload enabled");
Expand All @@ -80,22 +83,24 @@ public static S3ManagedLedgerOffloader create(ServiceConfiguration conf,
} else { } else {
builder.setRegion(region); builder.setRegion(region);
} }
return new S3ManagedLedgerOffloader(builder.build(), bucket, scheduler, maxBlockSize); return new S3ManagedLedgerOffloader(builder.build(), bucket, scheduler, maxBlockSize, readBufferSize);
} }


S3ManagedLedgerOffloader(AmazonS3 s3client, String bucket, ScheduledExecutorService scheduler, int maxBlockSize) { S3ManagedLedgerOffloader(AmazonS3 s3client, String bucket, ScheduledExecutorService scheduler,
int maxBlockSize, int readBufferSize) {
this.s3client = s3client; this.s3client = s3client;
this.bucket = bucket; this.bucket = bucket;
this.scheduler = scheduler; this.scheduler = scheduler;
this.maxBlockSize = maxBlockSize; this.maxBlockSize = maxBlockSize;
this.readBufferSize = readBufferSize;
} }


static String dataBlockOffloadKey(ReadHandle readHandle, UUID uuid) { static String dataBlockOffloadKey(long ledgerId, UUID uuid) {
return String.format("ledger-%d-%s", readHandle.getId(), uuid.toString()); return String.format("ledger-%d-%s", ledgerId, uuid.toString());
} }


static String indexBlockOffloadKey(ReadHandle readHandle, UUID uuid) { static String indexBlockOffloadKey(long ledgerId, UUID uuid) {
return String.format("ledger-%d-%s-index", readHandle.getId(), uuid.toString()); return String.format("ledger-%d-%s-index", ledgerId, uuid.toString());
} }


// upload DataBlock to s3 using MultiPartUpload, and indexBlock in a new Block, // upload DataBlock to s3 using MultiPartUpload, and indexBlock in a new Block,
Expand All @@ -107,8 +112,8 @@ public CompletableFuture<Void> offload(ReadHandle readHandle,
scheduler.submit(() -> { scheduler.submit(() -> {
OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create() OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create()
.withMetadata(readHandle.getLedgerMetadata()); .withMetadata(readHandle.getLedgerMetadata());
String dataBlockKey = dataBlockOffloadKey(readHandle, uuid); String dataBlockKey = dataBlockOffloadKey(readHandle.getId(), uuid);
String indexBlockKey = indexBlockOffloadKey(readHandle, uuid); String indexBlockKey = indexBlockOffloadKey(readHandle.getId(), uuid);
InitiateMultipartUploadRequest dataBlockReq = new InitiateMultipartUploadRequest(bucket, dataBlockKey); InitiateMultipartUploadRequest dataBlockReq = new InitiateMultipartUploadRequest(bucket, dataBlockKey);
InitiateMultipartUploadResult dataBlockRes = null; InitiateMultipartUploadResult dataBlockRes = null;


Expand Down Expand Up @@ -174,12 +179,12 @@ public CompletableFuture<Void> offload(ReadHandle readHandle,
metadata.setContentLength(indexStream.available()); metadata.setContentLength(indexStream.available());
s3client.putObject(new PutObjectRequest( s3client.putObject(new PutObjectRequest(
bucket, bucket,
indexBlockOffloadKey(readHandle, uuid), indexBlockKey,
indexStream, indexStream,
metadata)); metadata));
promise.complete(null); promise.complete(null);
} catch (Throwable t) { } catch (Throwable t) {
s3client.deleteObject(bucket, dataBlockOffloadKey(readHandle, uuid)); s3client.deleteObject(bucket, dataBlockKey);
promise.completeExceptionally(t); promise.completeExceptionally(t);
return; return;
} }
Expand All @@ -190,7 +195,17 @@ public CompletableFuture<Void> offload(ReadHandle readHandle,
@Override @Override
public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid) { public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid) {
CompletableFuture<ReadHandle> promise = new CompletableFuture<>(); CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
promise.completeExceptionally(new UnsupportedOperationException()); String key = dataBlockOffloadKey(ledgerId, uid);
String indexKey = indexBlockOffloadKey(ledgerId, uid);
scheduler.submit(() -> {
try {
promise.complete(S3BackedReadHandleImpl.open(scheduler, s3client,
bucket, key, indexKey,
ledgerId, readBufferSize));
} catch (Throwable t) {
promise.completeExceptionally(t);
}
});
return promise; return promise;
} }


Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ public int getPartId() {
public long getOffset() { public long getOffset() {
return offset; return offset;
} }
@Override
public long getDataOffset() {
return offset + DataBlockHeaderImpl.getDataStartOffset();
}


public OffloadIndexEntryImpl(long entryId, int partId, long offset) { public OffloadIndexEntryImpl(long entryId, int partId, long offset) {
this.entryId = entryId; this.entryId = entryId;
Expand Down
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,207 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.s3offload.impl;

import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;

import org.apache.pulsar.broker.s3offload.OffloadIndexBlock;
import org.apache.pulsar.broker.s3offload.OffloadIndexBlockBuilder;
import org.apache.pulsar.broker.s3offload.OffloadIndexEntry;
import org.apache.pulsar.broker.s3offload.S3BackedInputStream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A read-only {@link ReadHandle} backed by a ledger that has been offloaded to S3.
 *
 * <p>Entries are located via an {@link OffloadIndexBlock} (read from a separate index
 * object) and then read sequentially from the data object through a buffered
 * {@link S3BackedInputStream}. All potentially blocking work (S3 reads, close) is
 * performed on the supplied executor, never on the caller's thread.
 */
public class S3BackedReadHandleImpl implements ReadHandle {
    private static final Logger log = LoggerFactory.getLogger(S3BackedReadHandleImpl.class);

    private final long ledgerId;
    // Index mapping entry ids to byte offsets within the S3 data object.
    private final OffloadIndexBlock index;
    // Seekable stream over the S3 data object; dataStream wraps it for primitive reads.
    private final S3BackedInputStream inputStream;
    private final DataInputStream dataStream;
    private final ExecutorService executor;

    private S3BackedReadHandleImpl(long ledgerId, OffloadIndexBlock index,
                                   S3BackedInputStream inputStream,
                                   ExecutorService executor) {
        this.ledgerId = ledgerId;
        this.index = index;
        this.inputStream = inputStream;
        this.dataStream = new DataInputStream(inputStream);
        this.executor = executor;
    }

    @Override
    public long getId() {
        return ledgerId;
    }

    @Override
    public LedgerMetadata getLedgerMetadata() {
        // Metadata of the original ledger is stored inside the offload index.
        return index.getLedgerMetadata();
    }

    @Override
    public CompletableFuture<Void> closeAsync() {
        CompletableFuture<Void> promise = new CompletableFuture<>();
        executor.submit(() -> {
            try {
                // Ensure the S3 input stream is closed even if closing the index
                // fails; otherwise the underlying connection would leak.
                try {
                    index.close();
                } finally {
                    inputStream.close();
                }
                promise.complete(null);
            } catch (IOException t) {
                promise.completeExceptionally(t);
            }
        });
        return promise;
    }

    /**
     * Read entries [firstEntry, lastEntry] (inclusive) from the offloaded object.
     *
     * <p>Seeks to the block containing {@code firstEntry}, then scans forward.
     * A negative length marker indicates block padding / a new block header, in
     * which case we re-seek using the index. Entries with ids below the expected
     * id are skipped.
     *
     * @param firstEntry id of the first entry to read (inclusive)
     * @param lastEntry  id of the last entry to read (inclusive)
     * @return future completed with the entries, or exceptionally with a
     *         {@link BKException} on bad parameters or corrupt data
     */
    @Override
    public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) {
        log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry);
        CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
        executor.submit(() -> {
                if (firstEntry > lastEntry
                    || firstEntry < 0
                    || lastEntry > getLastAddConfirmed()) {
                    promise.completeExceptionally(new BKException.BKIncorrectParameterException());
                    return;
                }
                long entriesToRead = (lastEntry - firstEntry) + 1;
                List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
                long nextExpectedId = firstEntry;
                try {
                    OffloadIndexEntry entry = index.getIndexEntryForEntry(firstEntry);
                    inputStream.seek(entry.getDataOffset());

                    while (entriesToRead > 0) {
                        int length = dataStream.readInt();
                        if (length < 0) { // hit padding or new block
                            inputStream.seekForward(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
                            length = dataStream.readInt();
                        }
                        long entryId = dataStream.readLong();

                        if (entryId == nextExpectedId) {
                            ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(length, length);
                            entries.add(LedgerEntryImpl.create(ledgerId, entryId, length, buf));
                            int toWrite = length;
                            while (toWrite > 0) {
                                int bytesRead = buf.writeBytes(dataStream, toWrite);
                                if (bytesRead < 0) {
                                    // EOF before the entry was fully read: the object is
                                    // shorter than the index claims. Without this check the
                                    // loop would spin forever (writeBytes returns -1 at EOF).
                                    throw new BKException.BKUnexpectedConditionException();
                                }
                                toWrite -= bytesRead;
                            }
                            entriesToRead--;
                            nextExpectedId++;
                        } else if (entryId > lastEntry) {
                            // Overshot the requested range — data and index disagree.
                            log.info("Expected to read {}, but read {}, which is greater than last entry {}",
                                     nextExpectedId, entryId, lastEntry);
                            throw new BKException.BKUnexpectedConditionException();
                        } else {
                            // Entry precedes the one we want; skip its payload.
                            inputStream.skip(length);
                        }
                    }

                    promise.complete(LedgerEntriesImpl.create(entries));
                } catch (Throwable t) {
                    promise.completeExceptionally(t);
                    // Release buffers of any entries read before the failure.
                    entries.forEach(LedgerEntry::close);
                }
            });
        return promise;
    }

    @Override
    public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, long lastEntry) {
        // The offloaded ledger is closed, so confirmed and unconfirmed reads are the same.
        return readAsync(firstEntry, lastEntry);
    }

    @Override
    public CompletableFuture<Long> readLastAddConfirmedAsync() {
        return CompletableFuture.completedFuture(getLastAddConfirmed());
    }

    @Override
    public CompletableFuture<Long> tryReadLastAddConfirmedAsync() {
        return CompletableFuture.completedFuture(getLastAddConfirmed());
    }

    @Override
    public long getLastAddConfirmed() {
        return getLedgerMetadata().getLastEntryId();
    }

    @Override
    public long getLength() {
        return getLedgerMetadata().getLength();
    }

    @Override
    public boolean isClosed() {
        return getLedgerMetadata().isClosed();
    }

    @Override
    public CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntryAsync(long entryId,
                                                                                      long timeOutInMillis,
                                                                                      boolean parallel) {
        // Long-poll reads make no sense on an immutable offloaded ledger.
        CompletableFuture<LastConfirmedAndEntry> promise = new CompletableFuture<>();
        promise.completeExceptionally(new UnsupportedOperationException());
        return promise;
    }

    /**
     * Open a read handle over an offloaded ledger.
     *
     * <p>Fetches and parses the index object, then wraps the data object in a
     * buffered seekable stream of {@code readBufferSize} bytes.
     *
     * @throws AmazonClientException if an S3 request fails
     * @throws IOException           if the index cannot be parsed
     */
    public static ReadHandle open(ScheduledExecutorService executor,
                                  AmazonS3 s3client, String bucket, String key, String indexKey,
                                  long ledgerId, int readBufferSize)
            throws AmazonClientException, IOException {
        GetObjectRequest req = new GetObjectRequest(bucket, indexKey);
        try (S3Object obj = s3client.getObject(req)) {
            OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create();
            OffloadIndexBlock index = indexBuilder.fromStream(obj.getObjectContent());

            ObjectMetadata dataMetadata = s3client.getObjectMetadata(bucket, key); // FIXME: this should be part of index
            S3BackedInputStream inputStream = new S3BackedInputStreamImpl(s3client, bucket, key,
                                                                          dataMetadata.getContentLength(),
                                                                          readBufferSize);
            return new S3BackedReadHandleImpl(ledgerId, index, inputStream, executor);
        }
    }
}
Loading

0 comments on commit f0b1471

Please sign in to comment.