Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ConcurrentModification exception in Workflow Garbage Collection #741

Merged
merged 4 commits into from
Feb 12, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -1043,23 +1043,33 @@ public static void purgeExpiredJobs(String workflow, WorkflowConfig workflowConf
* @param dataProvider
* @param manager
*/
public static void workflowGarbageCollection(WorkflowControllerDataProvider dataProvider,
public static void workflowGarbageCollection(final WorkflowControllerDataProvider dataProvider,
final HelixManager manager) {
// Garbage collections for conditions where workflow context exists but config is missing.
Map<String, ZNRecord> contexts = dataProvider.getContexts();
HelixDataAccessor accessor = manager.getHelixDataAccessor();
HelixPropertyStore<ZNRecord> propertyStore = manager.getHelixPropertyStore();

// toBeDeletedWorkflows is a set that contains the name of the workflows that their contexts
// should be deleted.
Set<String> toBeDeletedWorkflows = new HashSet<>();
for (Map.Entry<String, ZNRecord> entry : contexts.entrySet()) {
if (entry.getValue() != null
&& entry.getValue().getId().equals(TaskUtil.WORKFLOW_CONTEXT_KW)) {
if (dataProvider.getWorkflowConfig(entry.getKey()) == null) {
toBeDeletedWorkflows.add(entry.getKey());
try {
Set<String> existingWorkflowContexts = new HashSet<>(dataProvider.getContexts().keySet());
for (String entry : existingWorkflowContexts) {
if (entry != null) {
WorkflowConfig cfg = dataProvider.getWorkflowConfig(entry);
WorkflowContext ctx = dataProvider.getWorkflowContext(entry);
if (ctx != null && ctx.getId().equals(TaskUtil.WORKFLOW_CONTEXT_KW) && cfg == null) {
toBeDeletedWorkflows.add(entry);
}
}
}
} catch (Exception e) {
LOG.warn(
"Exception occurred while creating a list of all existing contexts with missing config!",
e);
alirezazamani marked this conversation as resolved.
Show resolved Hide resolved
}
alirezazamani marked this conversation as resolved.
Show resolved Hide resolved

HelixDataAccessor accessor = manager.getHelixDataAccessor();
HelixPropertyStore<ZNRecord> propertyStore = manager.getHelixPropertyStore();

for (String workflowName : toBeDeletedWorkflows) {
LOG.warn(String.format(
"WorkflowContext exists for workflow %s. However, Workflow Config is missing! Deleting the WorkflowConfig and IdealState!!",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ record = _manager.getHelixDataAccessor().getBaseDataAccessor().get(workflowConte
Assert.assertTrue(contextDeleted);
}

Workflow.Builder createSimpleWorkflowBuilder(String workflowName) {
private Workflow.Builder createSimpleWorkflowBuilder(String workflowName) {
final long expiryTime = 5000L;
Workflow.Builder builder = new Workflow.Builder(workflowName);

Expand Down