Skip to content

Commit

Permalink
[tiered storage] store driver name and driver specific metadata in o…
Browse files Browse the repository at this point in the history
…riginal ledger metadata (#2398)



 ### Motivation

1) Currently the location of an offloaded ledger isn't stored in the original ledger metadata.
That means that if the configuration is changed or modified by mistake, we might potentially cause data loss.

2) The location of an offloaded ledger is needed by Pulsar SQL, so it is very inconvenient to
have the location information stored in a configuration, and that approach is also problematic.

 ### Changes

Store `driverName` and driver-specific metadata (e.g. bucket name, region name, endpoint) in the
original ledger metadata. Change ManagedLedgerImpl to use the driver-specific metadata to read
the offloaded ledger. If the driver-specific metadata is missing, it will fall back to using the configuration.

 ### Tests

This change doesn't change the behavior. Existing unit tests and integration tests already covered the logic.

 ### NOTES

Currently the driver name in the metadata is not used. We need to use the driver name to load a different offloader driver
after #2393 is implemented.
  • Loading branch information
sijie committed Aug 21, 2018
1 parent d804701 commit 5980169
Show file tree
Hide file tree
Showing 11 changed files with 419 additions and 84 deletions.
Expand Up @@ -20,6 +20,7 @@

import com.google.common.annotations.Beta;

import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
Expand All @@ -31,6 +32,25 @@
*/
@Beta
public interface LedgerOffloader {

/**
* Get offload driver name.
*
* <p>The managed ledger records this name, together with the driver metadata, in the
* offload context of the original ledger when a ledger is offloaded.
*
* <p>NOTE(review): per the accompanying commit notes, the recorded name is not yet used to
* select an offloader implementation when reading or deleting — confirm before relying on it.
*
* @return offload driver name.
*/
String getOffloadDriverName();

/**
* Get offload driver metadata.
*
* <p>The driver metadata will be recorded as part of the metadata of the original ledger,
* and is later handed back to the offloader when the offloaded ledger is read or deleted.
* Typical entries are driver-specific location details (e.g. bucket name, region name,
* endpoint).
*
* <p>The default implementation returns an immutable empty map, i.e. the driver records
* no additional metadata.
*
* @return offload driver metadata.
*/
default Map<String, String> getOffloadDriverMetadata() {
return Collections.emptyMap();
}

/**
* Offload the passed in ledger to longterm storage.
* Metadata passed in is for inspection purposes only and should be stored
Expand All @@ -51,10 +71,9 @@ public interface LedgerOffloader {
*
* @param ledger the ledger to offload
* @param uid unique id to identity this offload attempt
* @param extraMetadata metadata to be stored with the ledger for informational
* @param extraMetadata metadata to be stored with the offloaded ledger for informational
* purposes
* @return a future, which when completed, denotes that the offload has been
* successful
* @return a future, which when completed, denotes that the offload has been successful.
*/
CompletableFuture<Void> offload(ReadHandle ledger,
UUID uid,
Expand All @@ -69,9 +88,11 @@ CompletableFuture<Void> offload(ReadHandle ledger,
*
* @param ledgerId the ID of the ledger to load from longterm storage
* @param uid unique ID for previous successful offload attempt
* @param offloadDriverMetadata offload driver metadata
* @return a future, which when completed, returns a ReadHandle
*/
CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid);
CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid,
Map<String, String> offloadDriverMetadata);

/**
* Delete a ledger from long term storage.
Expand All @@ -81,9 +102,11 @@ CompletableFuture<Void> offload(ReadHandle ledger,
*
* @param ledgerId the ID of the ledger to delete from longterm storage
* @param uid unique ID for previous offload attempt
* @param offloadDriverMetadata offload driver metadata
* @return a future, which when completed, signifies that the ledger has
* been deleted
*/
CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid);
CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid,
Map<String, String> offloadDriverMetadata);
}

Expand Up @@ -96,6 +96,7 @@
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.impl.MetaStore.Stat;
import org.apache.bookkeeper.mledger.offload.OffloadUtils;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.NestedPositionInfo;
Expand Down Expand Up @@ -1390,7 +1391,9 @@ CompletableFuture<ReadHandle> getLedgerHandle(long ledgerId) {
if (info != null && info.hasOffloadContext() && info.getOffloadContext().getComplete()) {
UUID uid = new UUID(info.getOffloadContext().getUidMsb(),
info.getOffloadContext().getUidLsb());
openFuture = config.getLedgerOffloader().readOffloaded(ledgerId, uid);
// TODO: improve this to load ledger offloader by driver name recorded in metadata
openFuture = config.getLedgerOffloader()
.readOffloaded(ledgerId, uid, OffloadUtils.getOffloadDriverMetadata(info));
} else {
openFuture = bookKeeper.newOpenLedgerOp()
.withRecovery(!isReadOnly())
Expand Down Expand Up @@ -1771,7 +1774,16 @@ void internalTrimConsumedLedgers(CompletableFuture<?> promise) {
}
for (LedgerInfo ls : offloadedLedgersToDelete) {
LedgerInfo.Builder newInfoBuilder = ls.toBuilder();
newInfoBuilder.getOffloadContextBuilder().setBookkeeperDeleted(true);
newInfoBuilder.getOffloadContextBuilder()
.setBookkeeperDeleted(true);
String driverName = OffloadUtils.getOffloadDriverName(
ls, config.getLedgerOffloader().getOffloadDriverName());
Map<String, String> driverMetadata = OffloadUtils.getOffloadDriverMetadata(
ls, config.getLedgerOffloader().getOffloadDriverMetadata());
OffloadUtils.setOffloadDriverMetadata(
newInfoBuilder,
driverName, driverMetadata
);
ledgers.put(ls.getLedgerId(), newInfoBuilder.build());
}

Expand Down Expand Up @@ -1903,7 +1915,11 @@ private void asyncDeleteLedger(long ledgerId, LedgerInfo info) {
if (info.getOffloadContext().hasUidMsb()) {
UUID uuid = new UUID(info.getOffloadContext().getUidMsb(),
info.getOffloadContext().getUidLsb());
cleanupOffloaded(ledgerId, uuid, "Trimming");
cleanupOffloaded(
ledgerId, uuid,
OffloadUtils.getOffloadDriverName(info, config.getLedgerOffloader().getOffloadDriverName()),
OffloadUtils.getOffloadDriverMetadata(info, config.getLedgerOffloader().getOffloadDriverMetadata()),
"Trimming");
}
}

Expand Down Expand Up @@ -2105,7 +2121,10 @@ private void offloadLoop(CompletableFuture<PositionImpl> promise, Queue<LedgerIn
UUID uuid = UUID.randomUUID();
Map<String, String> extraMetadata = ImmutableMap.of("ManagedLedgerName", name);

prepareLedgerInfoForOffloaded(ledgerId, uuid)
String driverName = config.getLedgerOffloader().getOffloadDriverName();
Map<String, String> driverMetadata = config.getLedgerOffloader().getOffloadDriverMetadata();

prepareLedgerInfoForOffloaded(ledgerId, uuid, driverName, driverMetadata)
.thenCompose((ignore) -> getLedgerHandle(ledgerId))
.thenCompose(readHandle -> config.getLedgerOffloader().offload(readHandle, uuid, extraMetadata))
.thenCompose((ignore) -> {
Expand All @@ -2116,7 +2135,10 @@ private void offloadLoop(CompletableFuture<PositionImpl> promise, Queue<LedgerIn
scheduledExecutor, name)
.whenComplete((ignore2, exception) -> {
if (exception != null) {
cleanupOffloaded(ledgerId, uuid, "Metastore failure");
cleanupOffloaded(
ledgerId, uuid,
driverName, driverMetadata,
"Metastore failure");
}
});
})
Expand Down Expand Up @@ -2216,7 +2238,10 @@ public void operationFailed(MetaStoreException e) {
}
}

private CompletableFuture<Void> prepareLedgerInfoForOffloaded(long ledgerId, UUID uuid) {
private CompletableFuture<Void> prepareLedgerInfoForOffloaded(long ledgerId,
UUID uuid,
String offloadDriverName,
Map<String, String> offloadDriverMetadata) {
log.info("[{}] Preparing metadata to offload ledger {} with uuid {}", name, ledgerId, uuid);
return transformLedgerInfo(ledgerId,
(oldInfo) -> {
Expand All @@ -2225,12 +2250,24 @@ private CompletableFuture<Void> prepareLedgerInfoForOffloaded(long ledgerId, UUI
oldInfo.getOffloadContext().getUidLsb());
log.info("[{}] Found previous offload attempt for ledger {}, uuid {}"
+ ", cleaning up", name, ledgerId, uuid);
cleanupOffloaded(ledgerId, oldUuid, "Previous failed offload");
cleanupOffloaded(
ledgerId,
oldUuid,
OffloadUtils.getOffloadDriverName(oldInfo,
config.getLedgerOffloader().getOffloadDriverName()),
OffloadUtils.getOffloadDriverMetadata(oldInfo,
config.getLedgerOffloader().getOffloadDriverMetadata()),
"Previous failed offload");
}
LedgerInfo.Builder builder = oldInfo.toBuilder();
builder.getOffloadContextBuilder()
.setUidMsb(uuid.getMostSignificantBits())
.setUidLsb(uuid.getLeastSignificantBits());
OffloadUtils.setOffloadDriverMetadata(
builder,
offloadDriverName,
offloadDriverMetadata
);
return builder.build();
})
.whenComplete((result, exception) -> {
Expand All @@ -2254,6 +2291,16 @@ private CompletableFuture<Void> completeLedgerInfoForOffloaded(long ledgerId, UU
builder.getOffloadContextBuilder()
.setTimestamp(clock.millis())
.setComplete(true);

String driverName = OffloadUtils.getOffloadDriverName(
oldInfo, config.getLedgerOffloader().getOffloadDriverName());
Map<String, String> driverMetadata = OffloadUtils.getOffloadDriverMetadata(
oldInfo, config.getLedgerOffloader().getOffloadDriverMetadata());
OffloadUtils.setOffloadDriverMetadata(
builder,
driverName,
driverMetadata
);
return builder.build();
} else {
throw new OffloadConflict(
Expand All @@ -2272,10 +2319,14 @@ private CompletableFuture<Void> completeLedgerInfoForOffloaded(long ledgerId, UU
});
}

private void cleanupOffloaded(long ledgerId, UUID uuid, String cleanupReason) {
private void cleanupOffloaded(long ledgerId,
UUID uuid,
String offloadDriverName, /* TODO: use driver name to identify offloader */
Map<String, String> offloadDriverMetadata,
String cleanupReason) {
Retries.run(Backoff.exponentialJittered(TimeUnit.SECONDS.toMillis(1), TimeUnit.SECONDS.toHours(1)).limit(10),
Retries.NonFatalPredicate,
() -> config.getLedgerOffloader().deleteOffloaded(ledgerId, uuid),
() -> config.getLedgerOffloader().deleteOffloaded(ledgerId, uuid, offloadDriverMetadata),
scheduledExecutor, name)
.whenComplete((ignored, exception) -> {
if (exception != null) {
Expand Down
Expand Up @@ -31,6 +31,11 @@
public class NullLedgerOffloader implements LedgerOffloader {
public static NullLedgerOffloader INSTANCE = new NullLedgerOffloader();

/**
* Returns the fixed driver name of this offloader.
*
* @return the constant string {@code "NullLedgerOffloader"}
*/
@Override
public String getOffloadDriverName() {
return "NullLedgerOffloader";
}

@Override
public CompletableFuture<Void> offload(ReadHandle ledger,
UUID uid,
Expand All @@ -41,14 +46,16 @@ public CompletableFuture<Void> offload(ReadHandle ledger,
}

@Override
public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid) {
public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid,
Map<String, String> offloadDriverMetadata) {
CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
promise.completeExceptionally(new UnsupportedOperationException());
return promise;
}

@Override
public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid) {
public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid,
Map<String, String> offloadDriverMetadata) {
CompletableFuture<Void> promise = new CompletableFuture<>();
promise.completeExceptionally(new UnsupportedOperationException());
return promise;
Expand Down
@@ -0,0 +1,91 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.offload;

import com.google.common.collect.Maps;
import java.util.Map;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.KeyValue;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadContext;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadDriverMetadata;

public final class OffloadUtils {

private OffloadUtils() {}

public static Map<String, String> getOffloadDriverMetadata(LedgerInfo ledgerInfo) {
Map<String, String> metadata = Maps.newHashMap();
if (ledgerInfo.hasOffloadContext()) {
OffloadContext ctx = ledgerInfo.getOffloadContext();
if (ctx.hasDriverMetadata()) {
OffloadDriverMetadata driverMetadata = ctx.getDriverMetadata();
if (driverMetadata.getPropertiesCount() > 0) {
driverMetadata.getPropertiesList().forEach(kv -> metadata.put(kv.getKey(), kv.getValue()));
}
}
}
return metadata;
}

public static Map<String, String> getOffloadDriverMetadata(LedgerInfo ledgerInfo,
Map<String, String> defaultOffloadDriverMetadata) {
if (ledgerInfo.hasOffloadContext()) {
OffloadContext ctx = ledgerInfo.getOffloadContext();
if (ctx.hasDriverMetadata()) {
OffloadDriverMetadata driverMetadata = ctx.getDriverMetadata();
if (driverMetadata.getPropertiesCount() > 0) {
Map<String, String> metadata = Maps.newHashMap();
driverMetadata.getPropertiesList().forEach(kv -> metadata.put(kv.getKey(), kv.getValue()));
return metadata;
}
}
}
return defaultOffloadDriverMetadata;
}

public static String getOffloadDriverName(LedgerInfo ledgerInfo, String defaultDriverName) {
if (ledgerInfo.hasOffloadContext()) {
OffloadContext ctx = ledgerInfo.getOffloadContext();
if (ctx.hasDriverMetadata()) {
OffloadDriverMetadata driverMetadata = ctx.getDriverMetadata();
if (driverMetadata.hasName()) {
return driverMetadata.getName();
}
}
}
return defaultDriverName;
}

public static void setOffloadDriverMetadata(LedgerInfo.Builder infoBuilder,
String driverName,
Map<String, String> offloadDriverMetadata) {
infoBuilder.getOffloadContextBuilder()
.getDriverMetadataBuilder()
.setName(driverName);
offloadDriverMetadata.forEach((k, v) -> {
infoBuilder.getOffloadContextBuilder()
.getDriverMetadataBuilder()
.addProperties(KeyValue.newBuilder()
.setKey(k)
.setValue(v)
.build());
});
}

}
11 changes: 11 additions & 0 deletions managed-ledger/src/main/proto/MLDataFormats.proto
Expand Up @@ -21,12 +21,23 @@ syntax = "proto2";
option java_package = "org.apache.bookkeeper.mledger.proto";
option optimize_for = SPEED;

// A single string key/value pair.
// Used as the element type of OffloadDriverMetadata.properties.
message KeyValue {
required string key = 1;
required string value = 2;
}

// Identifies the offload driver used for a ledger, plus driver-specific properties
// (e.g. bucket name, region name, endpoint) describing where the data was offloaded.
message OffloadDriverMetadata {
// Name of the offload driver.
required string name = 1;
// Driver-specific key/value properties; may be empty.
repeated KeyValue properties = 2;
}

// Offload state recorded for a ledger in the managed ledger metadata.
message OffloadContext {
// Most / least significant 64 bits of the UUID identifying the offload attempt.
optional int64 uidMsb = 1;
optional int64 uidLsb = 2;
// Set to true once the offload has been recorded as complete.
optional bool complete = 3;
// NOTE(review): presumably marks that the BookKeeper copy of the ledger has been
// deleted after offload — confirm against the trimming logic.
optional bool bookkeeperDeleted = 4;
// Wall-clock time in milliseconds, set when the offload is marked complete.
optional int64 timestamp = 5;
// Driver name and driver-specific metadata describing where the ledger was offloaded.
optional OffloadDriverMetadata driverMetadata = 6;
}

message ManagedLedgerInfo {
Expand Down

0 comments on commit 5980169

Please sign in to comment.