Skip to content

Commit

Permalink
Store offloaded data object size in index (#1810)
Browse files Browse the repository at this point in the history
We need the size of the data object to set a bound on the stream we
read from S3. Without the size in the index, we need to make an extra
call to S3, which is undesirable.

Master Issue: #1511
  • Loading branch information
ivankelly authored and sijie committed May 21, 2018
1 parent 2e4362d commit 79b0e28
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 10 deletions.
Expand Up @@ -36,7 +36,7 @@ public interface OffloadIndexBlock extends Closeable {
* Get the content of the index block as InputStream. * Get the content of the index block as InputStream.
* Read out in format: * Read out in format:
* | index_magic_header | index_block_len | index_entry_count | * | index_magic_header | index_block_len | data_object_size |
* |segment_metadata_length | segment metadata | index entries | * | segment_metadata_length | index_entry_count | segment metadata | index entries ... |
*/ */
InputStream toStream() throws IOException; InputStream toStream() throws IOException;


Expand All @@ -59,5 +59,9 @@ public interface OffloadIndexBlock extends Closeable {
*/ */
LedgerMetadata getLedgerMetadata(); LedgerMetadata getLedgerMetadata();


/**
* Get the total size of the data object.
*/
long getDataObjectLength();
} }


Expand Up @@ -37,7 +37,7 @@ public interface OffloadIndexBlockBuilder {
* *
* @param metadata the ledger metadata * @param metadata the ledger metadata
*/ */
OffloadIndexBlockBuilder withMetadata(LedgerMetadata metadata); OffloadIndexBlockBuilder withLedgerMetadata(LedgerMetadata metadata);


/** /**
* Add one payload block related information into index block. * Add one payload block related information into index block.
Expand All @@ -51,6 +51,12 @@ public interface OffloadIndexBlockBuilder {
*/ */
OffloadIndexBlockBuilder addBlock(long firstEntryId, int partId, int blockSize); OffloadIndexBlockBuilder addBlock(long firstEntryId, int partId, int blockSize);


/**
* Specify the length of the data object this index is associated with.
* @param dataObjectLength the length of the data object
*/
OffloadIndexBlockBuilder withDataObjectLength(long dataObjectLength);

/** /**
* Finalize the immutable OffloadIndexBlock * Finalize the immutable OffloadIndexBlock
*/ */
Expand Down
Expand Up @@ -110,7 +110,7 @@ public CompletableFuture<Void> offload(ReadHandle readHandle,
CompletableFuture<Void> promise = new CompletableFuture<>(); CompletableFuture<Void> promise = new CompletableFuture<>();
scheduler.submit(() -> { scheduler.submit(() -> {
OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create() OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create()
.withMetadata(readHandle.getLedgerMetadata()); .withLedgerMetadata(readHandle.getLedgerMetadata());
String dataBlockKey = dataBlockOffloadKey(readHandle.getId(), uuid); String dataBlockKey = dataBlockOffloadKey(readHandle.getId(), uuid);
String indexBlockKey = indexBlockOffloadKey(readHandle.getId(), uuid); String indexBlockKey = indexBlockOffloadKey(readHandle.getId(), uuid);
InitiateMultipartUploadRequest dataBlockReq = new InitiateMultipartUploadRequest(bucket, dataBlockKey); InitiateMultipartUploadRequest dataBlockReq = new InitiateMultipartUploadRequest(bucket, dataBlockKey);
Expand All @@ -124,6 +124,7 @@ public CompletableFuture<Void> offload(ReadHandle readHandle,
return; return;
} }


long dataObjectLength = 0;
// start multi part upload for data block. // start multi part upload for data block.
try { try {
long startEntry = 0; long startEntry = 0;
Expand Down Expand Up @@ -157,6 +158,8 @@ public CompletableFuture<Void> offload(ReadHandle readHandle,
entryBytesWritten += blockStream.getBlockEntryBytesCount(); entryBytesWritten += blockStream.getBlockEntryBytesCount();
partId++; partId++;
} }

dataObjectLength += blockSize;
} }


s3client.completeMultipartUpload(new CompleteMultipartUploadRequest() s3client.completeMultipartUpload(new CompleteMultipartUploadRequest()
Expand All @@ -171,7 +174,7 @@ public CompletableFuture<Void> offload(ReadHandle readHandle,
} }


// upload index block // upload index block
try (OffloadIndexBlock index = indexBuilder.build(); try (OffloadIndexBlock index = indexBuilder.withDataObjectLength(dataObjectLength).build();
InputStream indexStream = index.toStream()) { InputStream indexStream = index.toStream()) {
// write the index block // write the index block
ObjectMetadata metadata = new ObjectMetadata(); ObjectMetadata metadata = new ObjectMetadata();
Expand Down
Expand Up @@ -34,6 +34,7 @@
public class OffloadIndexBlockBuilderImpl implements OffloadIndexBlockBuilder { public class OffloadIndexBlockBuilderImpl implements OffloadIndexBlockBuilder {


private LedgerMetadata ledgerMetadata; private LedgerMetadata ledgerMetadata;
private long dataObjectLength;
private List<OffloadIndexEntryImpl> entries; private List<OffloadIndexEntryImpl> entries;
private int lastBlockSize; private int lastBlockSize;


Expand All @@ -42,7 +43,13 @@ public OffloadIndexBlockBuilderImpl() {
} }


@Override @Override
public OffloadIndexBlockBuilder withMetadata(LedgerMetadata metadata) { public OffloadIndexBlockBuilder withDataObjectLength(long dataObjectLength) {
this.dataObjectLength = dataObjectLength;
return this;
}

@Override
public OffloadIndexBlockBuilder withLedgerMetadata(LedgerMetadata metadata) {
this.ledgerMetadata = metadata; this.ledgerMetadata = metadata;
return this; return this;
} }
Expand Down Expand Up @@ -73,7 +80,8 @@ public OffloadIndexBlock fromStream(InputStream is) throws IOException {
public OffloadIndexBlock build() { public OffloadIndexBlock build() {
checkState(ledgerMetadata != null); checkState(ledgerMetadata != null);
checkState(!entries.isEmpty()); checkState(!entries.isEmpty());
return OffloadIndexBlockImpl.get(ledgerMetadata, entries); checkState(dataObjectLength > 0);
return OffloadIndexBlockImpl.get(ledgerMetadata, dataObjectLength, entries);
} }


} }
Expand Up @@ -54,6 +54,7 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock {
private static final int INDEX_MAGIC_WORD = 0xDE47DE47; private static final int INDEX_MAGIC_WORD = 0xDE47DE47;


private LedgerMetadata segmentMetadata; private LedgerMetadata segmentMetadata;
private long dataObjectLength;
private TreeMap<Long, OffloadIndexEntryImpl> indexEntries; private TreeMap<Long, OffloadIndexEntryImpl> indexEntries;


private final Handle<OffloadIndexBlockImpl> recyclerHandle; private final Handle<OffloadIndexBlockImpl> recyclerHandle;
Expand All @@ -69,12 +70,14 @@ private OffloadIndexBlockImpl(Handle<OffloadIndexBlockImpl> recyclerHandle) {
this.recyclerHandle = recyclerHandle; this.recyclerHandle = recyclerHandle;
} }


public static OffloadIndexBlockImpl get(LedgerMetadata metadata, List<OffloadIndexEntryImpl> entries) { public static OffloadIndexBlockImpl get(LedgerMetadata metadata, long dataObjectLength,
List<OffloadIndexEntryImpl> entries) {
OffloadIndexBlockImpl block = RECYCLER.get(); OffloadIndexBlockImpl block = RECYCLER.get();
block.indexEntries = Maps.newTreeMap(); block.indexEntries = Maps.newTreeMap();
entries.forEach(entry -> block.indexEntries.putIfAbsent(entry.getEntryId(), entry)); entries.forEach(entry -> block.indexEntries.putIfAbsent(entry.getEntryId(), entry));
checkState(entries.size() == block.indexEntries.size()); checkState(entries.size() == block.indexEntries.size());
block.segmentMetadata = metadata; block.segmentMetadata = metadata;
block.dataObjectLength = dataObjectLength;
return block; return block;
} }


Expand All @@ -86,6 +89,7 @@ public static OffloadIndexBlockImpl get(InputStream stream) throws IOException {
} }


public void recycle() { public void recycle() {
dataObjectLength = -1;
segmentMetadata = null; segmentMetadata = null;
indexEntries.clear(); indexEntries.clear();
indexEntries = null; indexEntries = null;
Expand Down Expand Up @@ -116,6 +120,11 @@ public LedgerMetadata getLedgerMetadata() {
return this.segmentMetadata; return this.segmentMetadata;
} }


@Override
public long getDataObjectLength() {
return this.dataObjectLength;
}

private static byte[] buildLedgerMetadataFormat(LedgerMetadata metadata) { private static byte[] buildLedgerMetadataFormat(LedgerMetadata metadata) {
LedgerMetadataFormat.Builder builder = LedgerMetadataFormat.newBuilder(); LedgerMetadataFormat.Builder builder = LedgerMetadataFormat.newBuilder();
builder.setQuorumSize(metadata.getWriteQuorumSize()) builder.setQuorumSize(metadata.getWriteQuorumSize())
Expand Down Expand Up @@ -159,6 +168,7 @@ public InputStream toStream() throws IOException {


indexBlockLength = 4 /* magic header */ indexBlockLength = 4 /* magic header */
+ 4 /* index block length */ + 4 /* index block length */
+ 8 /* data object length */
+ 4 /* segment metadata length */ + 4 /* segment metadata length */
+ 4 /* index entry count */ + 4 /* index entry count */
+ segmentMetadataLength + segmentMetadataLength
Expand All @@ -168,6 +178,7 @@ public InputStream toStream() throws IOException {


out.writeInt(INDEX_MAGIC_WORD) out.writeInt(INDEX_MAGIC_WORD)
.writeInt(indexBlockLength) .writeInt(indexBlockLength)
.writeLong(dataObjectLength)
.writeInt(segmentMetadataLength) .writeInt(segmentMetadataLength)
.writeInt(indexEntryCount); .writeInt(indexEntryCount);


Expand Down Expand Up @@ -306,6 +317,7 @@ private OffloadIndexBlock fromStream(InputStream stream) throws IOException {
magic, INDEX_MAGIC_WORD)); magic, INDEX_MAGIC_WORD));
} }
int indexBlockLength = dis.readInt(); int indexBlockLength = dis.readInt();
this.dataObjectLength = dis.readLong();
int segmentMetadataLength = dis.readInt(); int segmentMetadataLength = dis.readInt();
int indexEntryCount = dis.readInt(); int indexEntryCount = dis.readInt();


Expand Down
Expand Up @@ -197,9 +197,8 @@ public static ReadHandle open(ScheduledExecutorService executor,
OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create(); OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create();
OffloadIndexBlock index = indexBuilder.fromStream(obj.getObjectContent()); OffloadIndexBlock index = indexBuilder.fromStream(obj.getObjectContent());


ObjectMetadata dataMetadata = s3client.getObjectMetadata(bucket, key); // FIXME: this should be part of index
S3BackedInputStream inputStream = new S3BackedInputStreamImpl(s3client, bucket, key, S3BackedInputStream inputStream = new S3BackedInputStreamImpl(s3client, bucket, key,
dataMetadata.getContentLength(), index.getDataObjectLength(),
readBufferSize); readBufferSize);
return new S3BackedReadHandleImpl(ledgerId, index, inputStream, executor); return new S3BackedReadHandleImpl(ledgerId, index, inputStream, executor);
} }
Expand Down
Expand Up @@ -108,7 +108,7 @@ public void offloadIndexBlockImplTest() throws Exception {
LedgerMetadata metadata = createLedgerMetadata(); LedgerMetadata metadata = createLedgerMetadata();
log.debug("created metadata: {}", metadata.toString()); log.debug("created metadata: {}", metadata.toString());


blockBuilder.withMetadata(metadata); blockBuilder.withLedgerMetadata(metadata).withDataObjectLength(1);


blockBuilder.addBlock(0, 2, 64 * 1024 * 1024); blockBuilder.addBlock(0, 2, 64 * 1024 * 1024);
blockBuilder.addBlock(1000, 3, 64 * 1024 * 1024); blockBuilder.addBlock(1000, 3, 64 * 1024 * 1024);
Expand Down Expand Up @@ -161,13 +161,15 @@ public void offloadIndexBlockImplTest() throws Exception {
ByteBuf wrapper = Unpooled.wrappedBuffer(b); ByteBuf wrapper = Unpooled.wrappedBuffer(b);
int magic = wrapper.readInt(); int magic = wrapper.readInt();
int indexBlockLength = wrapper.readInt(); int indexBlockLength = wrapper.readInt();
long dataObjectLength = wrapper.readLong();
int segmentMetadataLength = wrapper.readInt(); int segmentMetadataLength = wrapper.readInt();
int indexEntryCount = wrapper.readInt(); int indexEntryCount = wrapper.readInt();


// verify counter // verify counter
assertEquals(magic, OffloadIndexBlockImpl.getIndexMagicWord()); assertEquals(magic, OffloadIndexBlockImpl.getIndexMagicWord());
assertEquals(indexBlockLength, readoutLen); assertEquals(indexBlockLength, readoutLen);
assertEquals(indexEntryCount, 3); assertEquals(indexEntryCount, 3);
assertEquals(dataObjectLength, 1);


wrapper.readBytes(segmentMetadataLength); wrapper.readBytes(segmentMetadataLength);
log.debug("magic: {}, blockLength: {}, metadataLength: {}, indexCount: {}", log.debug("magic: {}, blockLength: {}, metadataLength: {}, indexCount: {}",
Expand Down

0 comments on commit 79b0e28

Please sign in to comment.