Skip to content

Commit

Permalink
Fix ConcurrentModification exception in Workflow Garbage Collection (#…
Browse files Browse the repository at this point in the history
…741)

In workflow Garbage collection, there is possibility that we encounter
ConcurrentMod exception while looping through the workflow contexts.
This commit fixes this issue by adding a try-catch.
  • Loading branch information
Ali Reza Zamani Zadeh Najari committed Feb 12, 2020
1 parent afc62ab commit 12f11a3
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 11 deletions.
37 changes: 27 additions & 10 deletions helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -1043,23 +1043,40 @@ public static void purgeExpiredJobs(String workflow, WorkflowConfig workflowConf
* @param dataProvider
* @param manager
*/
public static void workflowGarbageCollection(WorkflowControllerDataProvider dataProvider,
public static void workflowGarbageCollection(final WorkflowControllerDataProvider dataProvider,
final HelixManager manager) {
// Garbage collections for conditions where workflow context exists but config is missing.
Map<String, ZNRecord> contexts = dataProvider.getContexts();
HelixDataAccessor accessor = manager.getHelixDataAccessor();
HelixPropertyStore<ZNRecord> propertyStore = manager.getHelixPropertyStore();

Set<String> existingContexts;
/*
* Here try-catch is used to avoid concurrent modification exception while doing deep copy.
* Map.keySet() can produce concurrent modification exception.
* Reason: If the map is modified while an iteration over the set is in progress, concurrent
* modification exception will be thrown.
*/
try {
existingContexts = new HashSet<>(dataProvider.getContexts().keySet());
} catch (Exception e) {
LOG.warn(
"Exception occurred while creating a list of all workflow/job context names!",
e);
return;
}

// toBeDeletedWorkflows is a set that contains the name of the workflows that their contexts
// should be deleted.
Set<String> toBeDeletedWorkflows = new HashSet<>();
for (Map.Entry<String, ZNRecord> entry : contexts.entrySet()) {
if (entry.getValue() != null
&& entry.getValue().getId().equals(TaskUtil.WORKFLOW_CONTEXT_KW)) {
if (dataProvider.getWorkflowConfig(entry.getKey()) == null) {
toBeDeletedWorkflows.add(entry.getKey());
}
for (String entry : existingContexts) {
WorkflowConfig cfg = dataProvider.getWorkflowConfig(entry);
WorkflowContext ctx = dataProvider.getWorkflowContext(entry);
if (ctx != null && ctx.getId().equals(TaskUtil.WORKFLOW_CONTEXT_KW) && cfg == null) {
toBeDeletedWorkflows.add(entry);
}
}

HelixDataAccessor accessor = manager.getHelixDataAccessor();
HelixPropertyStore<ZNRecord> propertyStore = manager.getHelixPropertyStore();

for (String workflowName : toBeDeletedWorkflows) {
LOG.warn(String.format(
"WorkflowContext exists for workflow %s. However, Workflow Config is missing! Deleting the WorkflowConfig and IdealState!!",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ record = _manager.getHelixDataAccessor().getBaseDataAccessor().get(workflowConte
Assert.assertTrue(contextDeleted);
}

Workflow.Builder createSimpleWorkflowBuilder(String workflowName) {
private Workflow.Builder createSimpleWorkflowBuilder(String workflowName) {
final long expiryTime = 5000L;
Workflow.Builder builder = new Workflow.Builder(workflowName);

Expand Down

0 comments on commit 12f11a3

Please sign in to comment.