Skip to content
Permalink
Browse files
Fix race condition between instance drop and participant history update (#2073)

Fix race condition between instance drop and participant history update
  • Loading branch information
qqu0127 committed May 3, 2022
1 parent 88d615d commit 7c521dda9ff48f3a6456041d0f0335288e9c8e4f
Showing 5 changed files with 129 additions and 54 deletions.
@@ -82,8 +82,8 @@ public class BaseControllerDataProvider implements ControlContextProvider {
.asList(HelixConstants.ChangeType.EXTERNAL_VIEW,
HelixConstants.ChangeType.TARGET_EXTERNAL_VIEW);

private String _clusterName = AbstractDataCache.UNKNOWN_CLUSTER;
private String _pipelineName = AbstractDataCache.UNKNOWN_PIPELINE;
private final String _clusterName;
private final String _pipelineName;
private String _clusterEventId = AbstractDataCache.UNKNOWN_EVENT_ID;
private ClusterConfig _clusterConfig;

@@ -106,17 +106,17 @@ public class BaseControllerDataProvider implements ControlContextProvider {
private final PropertyCache<StateModelDefinition> _stateModelDefinitionCache;

// Special caches
private CurrentStateCache _currentStateCache;
private final CurrentStateCache _currentStateCache;
protected TaskCurrentStateCache _taskCurrentStateCache;
private InstanceMessagesCache _instanceMessagesCache;
private final InstanceMessagesCache _instanceMessagesCache;

// Other miscellaneous caches
private Map<String, Long> _instanceOfflineTimeMap;
private Map<String, Map<String, String>> _idealStateRuleMap;
private Map<String, Map<String, Set<String>>> _disabledInstanceForPartitionMap = new HashMap<>();
private Set<String> _disabledInstanceSet = new HashSet<>();
private final Map<String, Map<String, Set<String>>> _disabledInstanceForPartitionMap = new HashMap<>();
private final Set<String> _disabledInstanceSet = new HashSet<>();
private final Map<String, MonitoredAbnormalResolver> _abnormalStateResolverMap = new HashMap<>();
private Set<String> _timedOutInstanceDuringMaintenance = new HashSet<>();
private final Set<String> _timedOutInstanceDuringMaintenance = new HashSet<>();
private Map<String, LiveInstance> _liveInstanceExcludeTimedOutForMaintenance = new HashMap<>();

public BaseControllerDataProvider() {
@@ -777,12 +777,15 @@ private void updateOfflineInstanceHistory(HelixDataAccessor accessor) {
PropertyKey propertyKey = keyBuilder.participantHistory(instance);
ParticipantHistory history = accessor.getProperty(propertyKey);
if (history == null) {
history = new ParticipantHistory(instance);
// this means the instance has been removed, skip history update
continue;
}
if (history.getLastOfflineTime() == ParticipantHistory.ONLINE) {
history.reportOffline();
// persist history back to ZK.
if (!accessor.setProperty(propertyKey, history)) {
// persist history back to ZK only if the node still exists
boolean succeed = accessor.updateProperty(propertyKey,
currentData -> (currentData == null) ? null : history.getRecord(), history);
if (!succeed) {
LogUtil.logError(logger, getClusterEventId(),
"Fails to persist participant online history back to ZK!");
}
@@ -822,6 +825,10 @@ private boolean isInstanceTimedOutDuringMaintenance(HelixDataAccessor accessor,
long timeOutWindow) {
ParticipantHistory history =
accessor.getProperty(accessor.keyBuilder().participantHistory(instance));
if (history == null) {
LogUtil.logWarn(logger, getClusterEventId(), "Participant history is null for instance " + instance);
return false;
}
List<Long> onlineTimestamps = history.getOnlineTimestampsAsMilliseconds();
List<Long> offlineTimestamps = history.getOfflineTimestampsAsMilliseconds();

@@ -351,7 +351,7 @@ private void createLiveInstance() {

ParticipantHistory history = getHistory();
history.reportOnline(_sessionId, _manager.getVersion());
persistHistory(history);
persistHistory(history, false);
}

/**
@@ -494,15 +494,16 @@ private void setupMsgHandler() throws Exception {
private ParticipantHistory getHistory() {
PropertyKey propertyKey = _keyBuilder.participantHistory(_instanceName);
ParticipantHistory history = _dataAccessor.getProperty(propertyKey);
if (history == null) {
history = new ParticipantHistory(_instanceName);
}
return history;
return history == null ? new ParticipantHistory(_instanceName) : history;
}

private void persistHistory(ParticipantHistory history) {
private void persistHistory(ParticipantHistory history, boolean skipOnEmptyPath) {
PropertyKey propertyKey = _keyBuilder.participantHistory(_instanceName);
if (!_dataAccessor.setProperty(propertyKey, history)) {
boolean result = skipOnEmptyPath
? _dataAccessor.updateProperty(
propertyKey, currentData -> (currentData == null) ? null : history.getRecord(), history)
: _dataAccessor.setProperty(propertyKey, history);
if (!result) {
LOG.error("Failed to persist participant history to zk!");
}
}
@@ -514,7 +515,7 @@ public void disconnect() {
try {
ParticipantHistory history = getHistory();
history.reportOffline();
persistHistory(history);
persistHistory(history, true);
} catch (Exception e) {
LOG.error("Failed to report participant offline.", e);
}
@@ -205,6 +205,10 @@ public void addInstance(String clusterName, InstanceConfig instanceConfig) {
_zkClient.createPersistent(PropertyPathBuilder.instanceError(clusterName, nodeId), true);
_zkClient.createPersistent(PropertyPathBuilder.instanceStatusUpdate(clusterName, nodeId), true);
_zkClient.createPersistent(PropertyPathBuilder.instanceHistory(clusterName, nodeId), true);
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
accessor.setProperty(keyBuilder.participantHistory(nodeId), new ParticipantHistory(nodeId));
}

@Override
@@ -25,8 +25,6 @@
import org.apache.helix.HelixCloudProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.manager.zk.CallbackHandler;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.mock.participant.DummyProcess.DummyLeaderStandbyStateModelFactory;
import org.apache.helix.mock.participant.DummyProcess.DummyOnlineOfflineStateModelFactory;
import org.apache.helix.mock.participant.MockMSModelFactory;
@@ -39,7 +37,7 @@
import org.slf4j.LoggerFactory;

public class MockParticipantManager extends ClusterManager {
private static Logger LOG = LoggerFactory.getLogger(MockParticipantManager.class);
private static final Logger LOG = LoggerFactory.getLogger(MockParticipantManager.class);

protected int _transDelay = 10;

0 comments on commit 7c521dd

Please sign in to comment.