Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,12 @@ public SharedStateRegistryImpl(Executor asyncDisposalExecutor) {

@Override
public StreamStateHandle registerReference(
SharedStateRegistryKey registrationKey,
StreamStateHandle state,
long checkpointID,
boolean preventDiscardingCreatedCheckpoint) {
final SharedStateRegistryKey registrationKey,
final StreamStateHandle newHandle,
final long checkpointID,
final boolean preventDiscardingCreatedCheckpoint) {

checkNotNull(state);
checkNotNull(newHandle);

StreamStateHandle scheduledStateDeletion = null;
SharedStateEntry entry;
Expand All @@ -95,60 +95,77 @@ public StreamStateHandle registerReference(
entry = registeredStates.get(registrationKey);

if (entry == null) {
// Additional check that should never fail, because only state handles that are not
// placeholders should
// ever be inserted to the registry.
checkState(
!isPlaceholder(state),
!isPlaceholder(newHandle),
"Attempt to reference unknown state: " + registrationKey);

entry = new SharedStateEntry(state, checkpointID);
LOG.trace(
"Registered new shared state {} under key {}.", newHandle, registrationKey);
entry = new SharedStateEntry(newHandle, checkpointID);
registeredStates.put(registrationKey, entry);
LOG.trace("Registered new shared state {} under key {}.", entry, registrationKey);

} else {
// Delete if this is a real duplicate.
// Note that task (backend) is not required to re-upload state
// if the confirmation notification was missing.
// However, it's also not required to use exactly the same handle or placeholder
if (!Objects.equals(state, entry.stateHandle)) {
if (entry.confirmed || isPlaceholder(state)) {
scheduledStateDeletion = state;
} else {
// Old entry is not in a confirmed checkpoint yet, and the new one differs.
// This might result from (omitted KG range here for simplicity):
// 1. Flink recovers from a failure using a checkpoint 1
// 2. State Backend is initialized to UID xyz and a set of SST: { 01.sst }
// 3. JM triggers checkpoint 2
// 4. TM sends handle: "xyz-002.sst"; JM registers it under "xyz-002.sst"
// 5. TM crashes; everything is repeated from (2)
// 6. TM recovers from CP 1 again: backend UID "xyz", SST { 01.sst }
// 7. JM triggers checkpoint 3
// 8. TM sends NEW state "xyz-002.sst"
// 9. JM discards it as duplicate
// 10. checkpoint completes, but a wrong SST file is used
// So we use a new entry and discard the old one:
scheduledStateDeletion = entry.stateHandle;
entry.stateHandle = state;
}
LOG.trace(
"Identified duplicate state registration under key {}. New state {} was determined to "
+ "be an unnecessary copy of existing state {} and will be dropped.",
registrationKey,
state,
entry.stateHandle);
}
// no further handling
return entry.stateHandle;

} else if (entry.stateHandle == newHandle) {
// might be a bug but state backend is not required to use a place-holder
LOG.debug(
"Duplicated registration under key {} with the same object: {}",
registrationKey,
newHandle);
} else if (Objects.equals(entry.stateHandle, newHandle)) {
// might be a bug but state backend is not required to use a place-holder
LOG.debug(
"Duplicated registration under key {} with the new object: {}.",
registrationKey,
newHandle);
} else if (isPlaceholder(newHandle)) {
LOG.trace(
"Updating last checkpoint for {} from {} to {}",
"Duplicated registration under key {} with a placeholder (normal case)",
registrationKey);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe lack of scheduledStateDeletion = newHandle;?

} else if (entry.confirmed) {
LOG.info(
"Duplicated registration under key {} of a new state: {}. "
+ "This might happen if checkpoint confirmation was delayed and state backend re-uploaded the state. "
+ "Discarding the new state and keeping the old one which is included into a completed checkpoint",
registrationKey,
entry.lastUsedCheckpointID,
checkpointID);
entry.advanceLastUsingCheckpointID(checkpointID);
if (preventDiscardingCreatedCheckpoint) {
entry.preventDiscardingCreatedCheckpoint();
}
newHandle);
scheduledStateDeletion = newHandle;
} else {
// Old entry is not in a confirmed checkpoint yet, and the new one differs.
// This might result from (omitted KG range here for simplicity):
// 1. Flink recovers from a failure using a checkpoint 1
// 2. State Backend is initialized to UID xyz and a set of SST: { 01.sst }
// 3. JM triggers checkpoint 2
// 4. TM sends handle: "xyz-002.sst"; JM registers it under "xyz-002.sst"
// 5. TM crashes; everything is repeated from (2)
// 6. TM recovers from CP 1 again: backend UID "xyz", SST { 01.sst }
// 7. JM triggers checkpoint 3
// 8. TM sends NEW state "xyz-002.sst"
// 9. JM discards it as duplicate
// 10. checkpoint completes, but a wrong SST file is used
// So we use a new entry and discard the old one:
LOG.info(
"Duplicated registration under key {} of a new state: {}. "
+ "This might happen during the task failover if state backend creates different states with the same key before and after the failure. "
+ "Discarding the OLD state and keeping the NEW one which is included into a completed checkpoint",
registrationKey,
newHandle);
scheduledStateDeletion = entry.stateHandle;
entry.stateHandle = newHandle;
}
}

LOG.trace(
"Updating last checkpoint for {} from {} to {}",
registrationKey,
entry.lastUsedCheckpointID,
checkpointID);
entry.advanceLastUsingCheckpointID(checkpointID);

if (preventDiscardingCreatedCheckpoint) {
entry.preventDiscardingCreatedCheckpoint();
}
} // end of synchronized (registeredStates)

scheduleAsyncDelete(scheduledStateDeletion);
return entry.stateHandle;
Expand Down