Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,12 @@ public SharedStateRegistryImpl(Executor asyncDisposalExecutor) {

@Override
public StreamStateHandle registerReference(
SharedStateRegistryKey registrationKey,
StreamStateHandle state,
long checkpointID,
boolean preventDiscardingCreatedCheckpoint) {
final SharedStateRegistryKey registrationKey,
final StreamStateHandle newHandle,
final long checkpointID,
final boolean preventDiscardingCreatedCheckpoint) {

checkNotNull(state);
checkNotNull(newHandle, "State handle should not be null.");

StreamStateHandle scheduledStateDeletion = null;
SharedStateEntry entry;
Expand All @@ -95,60 +95,78 @@ public StreamStateHandle registerReference(
entry = registeredStates.get(registrationKey);

if (entry == null) {
// Additional check that should never fail, because only state handles that are not
// placeholders should ever be inserted into the registry.
checkState(
!isPlaceholder(state),
!isPlaceholder(newHandle),
"Attempt to reference unknown state: " + registrationKey);

entry = new SharedStateEntry(state, checkpointID);
LOG.trace(
"Registered new shared state {} under key {}.", newHandle, registrationKey);
entry = new SharedStateEntry(newHandle, checkpointID);
registeredStates.put(registrationKey, entry);
LOG.trace("Registered new shared state {} under key {}.", entry, registrationKey);

} else {
// Delete if this is a real duplicate.
// Note that task (backend) is not required to re-upload state
// if the confirmation notification was missing.
// However, it's also not required to use exactly the same handle or placeholder
if (!Objects.equals(state, entry.stateHandle)) {
if (entry.confirmed || isPlaceholder(state)) {
scheduledStateDeletion = state;
} else {
// Old entry is not in a confirmed checkpoint yet, and the new one differs.
// This might result from (omitted KG range here for simplicity):
// 1. Flink recovers from a failure using a checkpoint 1
// 2. State Backend is initialized to UID xyz and a set of SST: { 01.sst }
// 3. JM triggers checkpoint 2
// 4. TM sends handle: "xyz-002.sst"; JM registers it under "xyz-002.sst"
// 5. TM crashes; everything is repeated from (2)
// 6. TM recovers from CP 1 again: backend UID "xyz", SST { 01.sst }
// 7. JM triggers checkpoint 3
// 8. TM sends NEW state "xyz-002.sst"
// 9. JM discards it as duplicate
// 10. checkpoint completes, but a wrong SST file is used
// So we use a new entry and discard the old one:
scheduledStateDeletion = entry.stateHandle;
entry.stateHandle = state;
}
LOG.trace(
"Identified duplicate state registration under key {}. New state {} was determined to "
+ "be an unnecessary copy of existing state {} and will be dropped.",
registrationKey,
state,
entry.stateHandle);
}
// no further handling
return entry.stateHandle;

} else if (entry.stateHandle == newHandle) {
// might be a bug but state backend is not required to use a place-holder
LOG.info(
"Duplicated registration under key {} with the same object: {}",
registrationKey,
newHandle);
} else if (Objects.equals(entry.stateHandle, newHandle)) {
// might be a bug but state backend is not required to use a place-holder
LOG.info(
"Duplicated registration under key {} with the new object: {}.",
registrationKey,
newHandle);
} else if (isPlaceholder(newHandle)) {
LOG.trace(
"Updating last checkpoint for {} from {} to {}",
"Duplicated registration under key {} with a placeholder (normal case)",
registrationKey);
scheduledStateDeletion = newHandle;
} else if (entry.confirmed) {
LOG.info(
"Duplicated registration under key {} of a new state: {}. "
+ "This might happen if checkpoint confirmation was delayed and state backend re-uploaded the state. "
+ "Discarding the new state and keeping the old one which is included into a completed checkpoint",
registrationKey,
entry.lastUsedCheckpointID,
checkpointID);
entry.advanceLastUsingCheckpointID(checkpointID);
if (preventDiscardingCreatedCheckpoint) {
entry.preventDiscardingCreatedCheckpoint();
}
newHandle);
scheduledStateDeletion = newHandle;
} else {
// Old entry is not in a confirmed checkpoint yet, and the new one differs.
// This might result from (omitted KG range here for simplicity):
// 1. Flink recovers from a failure using a checkpoint 1
// 2. State Backend is initialized to UID xyz and a set of SST: { 01.sst }
// 3. JM triggers checkpoint 2
// 4. TM sends handle: "xyz-002.sst"; JM registers it under "xyz-002.sst"
// 5. TM crashes; everything is repeated from (2)
// 6. TM recovers from CP 1 again: backend UID "xyz", SST { 01.sst }
// 7. JM triggers checkpoint 3
// 8. TM sends NEW state "xyz-002.sst"
// 9. If JM discarded it as a duplicate (keeping the old handle), then
// 10. the checkpoint would complete, but a wrong SST file would be used
// To avoid that, we keep the new handle and discard the old one:
LOG.info(
"Duplicated registration under key {} of a new state: {}. "
+ "This might happen during the task failover if state backend creates different states with the same key before and after the failure. "
+ "Discarding the OLD state and keeping the NEW one which is included into a completed checkpoint",
registrationKey,
newHandle);
scheduledStateDeletion = entry.stateHandle;
entry.stateHandle = newHandle;
}
}

LOG.trace(
"Updating last checkpoint for {} from {} to {}",
registrationKey,
entry.lastUsedCheckpointID,
checkpointID);
entry.advanceLastUsingCheckpointID(checkpointID);

if (preventDiscardingCreatedCheckpoint) {
entry.preventDiscardingCreatedCheckpoint();
}
} // end of synchronized (registeredStates)

scheduleAsyncDelete(scheduledStateDeletion);
return entry.stateHandle;
Expand Down Expand Up @@ -246,7 +264,7 @@ public String toString() {
private void scheduleAsyncDelete(StreamStateHandle streamStateHandle) {
// Small optimization: do not issue discards for placeholders, which are NOPs.
if (streamStateHandle != null && !isPlaceholder(streamStateHandle)) {
LOG.trace("Scheduled delete of state handle {}.", streamStateHandle);
LOG.debug("Scheduled delete of state handle {}.", streamStateHandle);
AsyncDisposalRunnable asyncDisposalRunnable =
new AsyncDisposalRunnable(streamStateHandle);
try {
Expand Down