Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,8 @@ void createAndRestoreKeyedStateBackend() {
keyedStateBackend = stateBackend.createKeyedStateBackend(
env, new JobID(), "test", StringSerializer.INSTANCE, 10,
new KeyGroupRange(0, 9), env.getTaskKvStateRegistry(), timeProvider);
keyedStateBackend.setCurrentKey("defaultKey");
} catch (Exception e) {
throw new RuntimeException("unexpected");
throw new RuntimeException("unexpected", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ private void initTest(StateTtlConfig ttlConfig) throws Exception {
this.ttlConfig = ttlConfig;
sbetc.createAndRestoreKeyedStateBackend();
sbetc.restoreSnapshot(null);
sbetc.setCurrentKey("defaultKey");
createState();
ctx().initTestValues();
}
Expand All @@ -129,6 +130,7 @@ private void takeAndRestoreSnapshot() throws Exception {
KeyedStateHandle snapshot = sbetc.takeSnapshot();
sbetc.createAndRestoreKeyedStateBackend();
sbetc.restoreSnapshot(snapshot);
sbetc.setCurrentKey("defaultKey");
createState();
}

Expand Down Expand Up @@ -397,6 +399,7 @@ public void testRestoreTtlAndRegisterNonTtlStateCompatFailure() throws Exception
sbetc.createAndRestoreKeyedStateBackend();

sbetc.restoreSnapshot(snapshot);
sbetc.setCurrentKey("defaultKey");
sbetc.createState(ctx().createStateDescriptor(), "");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,14 @@ <K, N, SV, S extends State, IS extends S> IS createState(
/** The native metrics monitor. */
private RocksDBNativeMetricMonitor nativeMetricMonitor;

/** Helper to build the byte arrays of composite keys to address data in RocksDB. Shared across all states.*/
private final RocksDBSerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder;
/**
* Helper to build the byte arrays of composite keys to address data in RocksDB. Shared across all states.
*
* <p>We create the builder after the restore phase in the {@link #restore(Object)} method. The timing of
* the creation is important, because only after the restore we are certain that the key serializer
* is final after potential reconfigurations during the restore.
*/
private RocksDBSerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder;

public RocksDBKeyedStateBackend(
String operatorIdentifier,
Expand Down Expand Up @@ -297,7 +303,6 @@ public RocksDBKeyedStateBackend(
this.kvStateInformation = new LinkedHashMap<>();

this.writeOptions = new WriteOptions().setDisableWAL(true);
this.sharedRocksKeyBuilder = new RocksDBSerializedCompositeKeyBuilder<>(keySerializer, keyGroupPrefixBytes, 32);

this.metricOptions = metricOptions;
this.metricGroup = metricGroup;
Expand Down Expand Up @@ -512,10 +517,6 @@ public void restore(Collection<KeyedStateHandle> restoreState) throws Exception

LOG.info("Initializing RocksDB keyed state backend.");

if (LOG.isDebugEnabled()) {
LOG.debug("Restoring snapshot from state handles: {}, will use {} thread(s) to download files from DFS.", restoreState, restoringThreadNum);
}

// clear all meta data
kvStateInformation.clear();

Expand All @@ -524,6 +525,10 @@ public void restore(Collection<KeyedStateHandle> restoreState) throws Exception
if (restoreState == null || restoreState.isEmpty()) {
createDB();
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Restoring snapshot from state handles: {}, will use {} thread(s) to download files from DFS.", restoreState, restoringThreadNum);
}

KeyedStateHandle firstStateHandle = restoreState.iterator().next();
if (firstStateHandle instanceof IncrementalKeyedStateHandle
|| firstStateHandle instanceof IncrementalLocalKeyedStateHandle) {
Expand All @@ -535,6 +540,14 @@ public void restore(Collection<KeyedStateHandle> restoreState) throws Exception
}
}

// it is important that we only create the key builder after the restore, and not before;
// restore operations may reconfigure the key serializer, so accessing the key serializer
// only now we can be certain that the key serializer used in the builder is final.
this.sharedRocksKeyBuilder = new RocksDBSerializedCompositeKeyBuilder<>(
getKeySerializer(),
keyGroupPrefixBytes,
32);

initializeSnapshotStrategy(incrementalRestoreOperation);
} catch (Exception ex) {
dispose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,4 @@ boolean isAmbiguousCompositeKeyPossible(TypeSerializer<?> namespaceSerializer) {
return keySerializerTypeVariableSized &
RocksDBKeySerializationUtils.isSerializerTypeVariableSized(namespaceSerializer);
}

@VisibleForTesting
boolean isKeySerializerTypeVariableSized() {
return keySerializerTypeVariableSized;
}
}