PIP-17: impl offload() for S3ManagedLedgerOffloader (#1746)
* add write for S3ManagedLedgerOffloader

* merge master, change following comments

* change following @ivan's comments

* change following @ivan's comments

* fix conf error
zhaijack authored and sijie committed May 15, 2018
1 parent 75643da commit c4edead
Showing 6 changed files with 367 additions and 29 deletions.
3 changes: 3 additions & 0 deletions conf/broker.conf
@@ -492,6 +492,9 @@ s3ManagedLedgerOffloadBucket=
 # For Amazon S3 ledger offload, Alternative endpoint to connect to (useful for testing)
 s3ManagedLedgerOffloadServiceEndpoint=
 
+# For Amazon S3 ledger offload, Max block size in bytes.
+s3ManagedLedgerOffloadMaxBlockSizeInBytes=67108864
+
 ### --- Deprecated config variables --- ###
 
 # Deprecated. Use configurationStoreServers
ServiceConfiguration.java

@@ -484,6 +484,9 @@ public class ServiceConfiguration implements PulsarConfiguration {
     // For Amazon S3 ledger offload, Alternative endpoint to connect to (useful for testing)
     private String s3ManagedLedgerOffloadServiceEndpoint = null;
 
+    // For Amazon S3 ledger offload, Max block size in bytes.
+    private int s3ManagedLedgerOffloadMaxBlockSizeInBytes = 64 * 1024 * 1024;
+
     public String getZookeeperServers() {
         return zookeeperServers;
     }
@@ -1682,4 +1685,13 @@ public void setS3ManagedLedgerOffloadServiceEndpoint(String endpoint) {
     public String getS3ManagedLedgerOffloadServiceEndpoint() {
         return this.s3ManagedLedgerOffloadServiceEndpoint;
     }
+
+    public void setS3ManagedLedgerOffloadMaxBlockSizeInBytes(int blockSizeInBytes) {
+        this.s3ManagedLedgerOffloadMaxBlockSizeInBytes = blockSizeInBytes;
+    }
+
+    public int getS3ManagedLedgerOffloadMaxBlockSizeInBytes() {
+        return this.s3ManagedLedgerOffloadMaxBlockSizeInBytes;
+    }
+
 }
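For orientation, the broker.conf default 67108864 is exactly 64 * 1024 * 1024, matching the Java-side default above. Below is a minimal sketch of wiring the new setting up programmatically. The region and bucket setters are assumed to mirror the getters used by S3ManagedLedgerOffloader.create() in the next file; only the max-block-size accessors are actually introduced by this commit:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import org.apache.pulsar.broker.ServiceConfiguration;
    import org.apache.pulsar.broker.s3offload.S3ManagedLedgerOffloader;

    public class OffloaderConfigSketch {
        public static void main(String[] args) throws Exception {
            ServiceConfiguration conf = new ServiceConfiguration();
            conf.setS3ManagedLedgerOffloadRegion("us-west-2");      // assumed setter, mirroring getS3ManagedLedgerOffloadRegion()
            conf.setS3ManagedLedgerOffloadBucket("pulsar-offload"); // assumed setter, mirroring getS3ManagedLedgerOffloadBucket()
            // New in this commit: cap each uploaded data block at 32 MB instead of the 64 MB default.
            conf.setS3ManagedLedgerOffloadMaxBlockSizeInBytes(32 * 1024 * 1024);

            ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
            S3ManagedLedgerOffloader offloader = S3ManagedLedgerOffloader.create(conf, scheduler);
        }
    }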
S3ManagedLedgerOffloader.java

@@ -18,34 +18,54 @@
  */
 package org.apache.pulsar.broker.s3offload;
 
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.SdkClientException;
 import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import com.amazonaws.services.s3.model.UploadPartResult;
 import com.google.common.base.Strings;
 
+import java.io.InputStream;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.s3offload.impl.BlockAwareSegmentInputStreamImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class S3ManagedLedgerOffloader implements LedgerOffloader {
+    private static final Logger log = LoggerFactory.getLogger(S3ManagedLedgerOffloader.class);
+
     public static final String DRIVER_NAME = "S3";
     private final ScheduledExecutorService scheduler;
     private final AmazonS3 s3client;
     private final String bucket;
+    // max block size for each data block.
+    private int maxBlockSize;
 
     public static S3ManagedLedgerOffloader create(ServiceConfiguration conf,
                                                   ScheduledExecutorService scheduler)
             throws PulsarServerException {
         String region = conf.getS3ManagedLedgerOffloadRegion();
         String bucket = conf.getS3ManagedLedgerOffloadBucket();
         String endpoint = conf.getS3ManagedLedgerOffloadServiceEndpoint();
+        int maxBlockSize = conf.getS3ManagedLedgerOffloadMaxBlockSizeInBytes();
 
         if (Strings.isNullOrEmpty(region)) {
             throw new PulsarServerException("s3ManagedLedgerOffloadRegion cannot be empty if s3 offload is enabled");
         }
@@ -60,28 +80,110 @@ public static S3ManagedLedgerOffloader create(ServiceConfiguration conf,
         } else {
             builder.setRegion(region);
         }
-        return new S3ManagedLedgerOffloader(builder.build(), bucket, scheduler);
+        return new S3ManagedLedgerOffloader(builder.build(), bucket, scheduler, maxBlockSize);
     }
 
-    S3ManagedLedgerOffloader(AmazonS3 s3client, String bucket, ScheduledExecutorService scheduler) {
+    S3ManagedLedgerOffloader(AmazonS3 s3client, String bucket, ScheduledExecutorService scheduler, int maxBlockSize) {
         this.s3client = s3client;
         this.bucket = bucket;
         this.scheduler = scheduler;
+        this.maxBlockSize = maxBlockSize;
     }
 
+    static String dataBlockOffloadKey(ReadHandle readHandle, UUID uuid) {
+        return String.format("ledger-%d-%s", readHandle.getId(), uuid.toString());
+    }
+
+    static String indexBlockOffloadKey(ReadHandle readHandle, UUID uuid) {
+        return String.format("ledger-%d-%s-index", readHandle.getId(), uuid.toString());
+    }
+
+    // Upload the data block to S3 via multipart upload, and the index block as a separate object.
     @Override
-    public CompletableFuture<Void> offload(ReadHandle ledger,
-                                           UUID uid,
+    public CompletableFuture<Void> offload(ReadHandle readHandle,
+                                           UUID uuid,
                                            Map<String, String> extraMetadata) {
         CompletableFuture<Void> promise = new CompletableFuture<>();
         scheduler.submit(() -> {
-            try {
-                s3client.putObject(bucket, uid.toString(), uid.toString());
-                promise.complete(null);
-            } catch (Throwable t) {
-                promise.completeExceptionally(t);
-            }
-        });
+            OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create()
+                .withMetadata(readHandle.getLedgerMetadata());
+            String dataBlockKey = dataBlockOffloadKey(readHandle, uuid);
+            String indexBlockKey = indexBlockOffloadKey(readHandle, uuid);
+            InitiateMultipartUploadRequest dataBlockReq = new InitiateMultipartUploadRequest(bucket, dataBlockKey);
+            InitiateMultipartUploadResult dataBlockRes = null;
+
+            // initiate multipart upload for the data block.
+            try {
+                dataBlockRes = s3client.initiateMultipartUpload(dataBlockReq);
+            } catch (Throwable t) {
+                promise.completeExceptionally(t);
+                return;
+            }
+
+            // upload the data block parts.
+            try {
+                long startEntry = 0;
+                int partId = 1;
+                long entryBytesWritten = 0;
+                List<PartETag> etags = new LinkedList<>();
+                while (startEntry <= readHandle.getLastAddConfirmed()) {
+                    int blockSize = BlockAwareSegmentInputStreamImpl
+                        .calculateBlockSize(maxBlockSize, readHandle, startEntry, entryBytesWritten);
+
+                    try (BlockAwareSegmentInputStream blockStream = new BlockAwareSegmentInputStreamImpl(
+                            readHandle, startEntry, blockSize)) {
+
+                        UploadPartResult uploadRes = s3client.uploadPart(
+                            new UploadPartRequest()
+                                .withBucketName(bucket)
+                                .withKey(dataBlockKey)
+                                .withUploadId(dataBlockRes.getUploadId())
+                                .withInputStream(blockStream)
+                                .withPartSize(blockSize)
+                                .withPartNumber(partId));
+                        etags.add(uploadRes.getPartETag());
+                        indexBuilder.addBlock(startEntry, partId, blockSize);
+
+                        if (blockStream.getEndEntryId() != -1) {
+                            startEntry = blockStream.getEndEntryId() + 1;
+                        } else {
+                            // could not read any entry from the ledger; stop.
+                            break;
+                        }
+                        entryBytesWritten += blockStream.getBlockEntryBytesCount();
+                        partId++;
+                    }
+                }
+                s3client.completeMultipartUpload(new CompleteMultipartUploadRequest()
+                    .withBucketName(bucket).withKey(dataBlockKey)
+                    .withUploadId(dataBlockRes.getUploadId())
+                    .withPartETags(etags));
+            } catch (Throwable t) {
+                s3client.abortMultipartUpload(
+                    new AbortMultipartUploadRequest(bucket, dataBlockKey, dataBlockRes.getUploadId()));
+                promise.completeExceptionally(t);
+                return;
+            }
+
+            // upload the index block.
+            try (OffloadIndexBlock index = indexBuilder.build();
+                 InputStream indexStream = index.toStream()) {
+                ObjectMetadata metadata = new ObjectMetadata();
+                metadata.setContentLength(indexStream.available());
+                s3client.putObject(new PutObjectRequest(
+                    bucket,
+                    indexBlockKey,
+                    indexStream,
+                    metadata));
+                promise.complete(null);
+            } catch (Throwable t) {
+                // clean up the orphaned data block if the index upload fails.
+                s3client.deleteObject(bucket, dataBlockKey);
+                promise.completeExceptionally(t);
+                return;
+            }
+        });
         return promise;
     }


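As a usage illustration, here is a hedged sketch of how a caller might drive the new offload() method, assuming an already-open ReadHandle (in the broker, the managed-ledger machinery supplies it; that plumbing is outside this diff):

    import java.util.Collections;
    import java.util.UUID;
    import org.apache.bookkeeper.client.api.ReadHandle;
    import org.apache.bookkeeper.mledger.LedgerOffloader;

    public class OffloadCallSketch {
        // How the ReadHandle is obtained is elided; it comes from the managed ledger in practice.
        static void offloadLedger(LedgerOffloader offloader, ReadHandle readHandle) {
            UUID uuid = UUID.randomUUID();
            offloader.offload(readHandle, uuid, Collections.emptyMap())
                    .whenComplete((ignored, exception) -> {
                        if (exception != null) {
                            // by this point the multipart upload was aborted, or the orphaned data block deleted
                            System.err.println("offload failed: " + exception);
                        } else {
                            // both ledger-<id>-<uuid> and ledger-<id>-<uuid>-index now exist in the bucket
                            System.out.println("offloaded ledger " + readHandle.getId() + " as " + uuid);
                        }
                    });
        }
    }

Note the two-object layout: a failure while uploading parts aborts the multipart upload, while a failure on the index upload deletes the already-completed data block, so a completed future implies both objects are present.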
BlockAwareSegmentInputStreamImpl.java

@@ -21,7 +21,6 @@
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.collect.Lists;
-import com.google.common.primitives.Ints;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.CompositeByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
@@ -64,7 +63,7 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStream
     // how many entries to read from the ReadHandle each time.
     private static final int ENTRIES_PER_READ = 100;
     // buffer size for the entry size and entry id.
-    static final int ENTRY_HEADER_SIZE = 4 /* entry size*/ + 8 /* entry id */;
+    static final int ENTRY_HEADER_SIZE = 4 /* entry size */ + 8 /* entry id */;
     // Keep a list of all entries ByteBuf, each ByteBuf contains 2 buf: entry header and entry content.
     private List<ByteBuf> entriesByteBuf = null;
 

@@ -204,5 +203,16 @@ public long getEndEntryId() {
     public int getBlockEntryBytesCount() {
         return dataBlockFullOffset - DataBlockHeaderImpl.getDataStartOffset() - ENTRY_HEADER_SIZE * blockEntryCount;
     }
 
+    // Calculate the block size once `entryBytesAlreadyWritten` bytes of entry data have already been uploaded.
+    public static int calculateBlockSize(int maxBlockSize, ReadHandle readHandle,
+                                         long firstEntryToWrite, long entryBytesAlreadyWritten) {
+        return (int) Math.min(
+            maxBlockSize,
+            (readHandle.getLastAddConfirmed() - firstEntryToWrite + 1) * ENTRY_HEADER_SIZE
+                + (readHandle.getLength() - entryBytesAlreadyWritten)
+                + DataBlockHeaderImpl.getDataStartOffset());
+    }
+
 }
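To make the sizing arithmetic concrete, here is a standalone re-derivation of calculateBlockSize for a toy ledger. DATA_START_OFFSET is a hypothetical stand-in for DataBlockHeaderImpl.getDataStartOffset(), whose real value is defined outside this diff:

    public class BlockSizeSketch {
        static final int ENTRY_HEADER_SIZE = 4 /* entry size */ + 8 /* entry id */;
        static final int DATA_START_OFFSET = 128; // placeholder; the real value comes from DataBlockHeaderImpl

        public static void main(String[] args) {
            int maxBlockSize = 64 * 1024 * 1024; // the broker.conf default
            long lastAddConfirmed = 999;         // ledger holds entries 0..999
            long ledgerLength = 10_000_000;      // total payload bytes, as ReadHandle.getLength() reports
            long firstEntryToWrite = 0;
            long entryBytesAlreadyWritten = 0;

            // Same formula as calculateBlockSize: remaining entry headers + remaining payload + block header,
            // capped at maxBlockSize.
            long needed = (lastAddConfirmed - firstEntryToWrite + 1) * ENTRY_HEADER_SIZE
                    + (ledgerLength - entryBytesAlreadyWritten)
                    + DATA_START_OFFSET;
            int blockSize = (int) Math.min(maxBlockSize, needed);

            // 1000 * 12 + 10,000,000 + 128 = 10,012,128 bytes, well under 64 MB: one block covers the whole ledger.
            System.out.println(blockSize);
        }
    }

Because the formula counts every remaining entry's header plus all remaining payload, the final block shrinks to exactly what is left while earlier blocks stay at maxBlockSize, which is why offload() can pass blockSize directly as the multipart part size.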

