Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Adding configuration to optionally ignore lock acquire failures and c…
Browse files Browse the repository at this point in the history
…ontinue execution.
  • Loading branch information
kishorebanala committed Nov 13, 2019
1 parent 952953e commit b25f608
Show file tree
Hide file tree
Showing 36 changed files with 390 additions and 192 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,4 @@ public ConsistencyLevel getReadConsistencyLevel() {
public ConsistencyLevel getWriteConsistencyLevel() {
return ConsistencyLevel.LOCAL_ONE;
}

@Override
public boolean enableWorkflowExecutionLock(){return true;}
}
18 changes: 9 additions & 9 deletions contribs/dependencies.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"locked": "1.11.86"
},
"com.amazonaws:aws-java-sdk-sqs": {
"locked": "1.11.666",
"locked": "1.11.673",
"requested": "latest.release"
},
"com.fasterxml.jackson.core:jackson-core": {
Expand Down Expand Up @@ -167,7 +167,7 @@
"locked": "1.11.86"
},
"com.amazonaws:aws-java-sdk-sqs": {
"locked": "1.11.666",
"locked": "1.11.673",
"requested": "latest.release"
},
"com.fasterxml.jackson.core:jackson-core": {
Expand Down Expand Up @@ -337,7 +337,7 @@
"locked": "1.11.86"
},
"com.amazonaws:aws-java-sdk-sqs": {
"locked": "1.11.666",
"locked": "1.11.673",
"requested": "latest.release"
},
"com.fasterxml.jackson.core:jackson-core": {
Expand Down Expand Up @@ -507,7 +507,7 @@
"locked": "1.11.86"
},
"com.amazonaws:aws-java-sdk-sqs": {
"locked": "1.11.666",
"locked": "1.11.673",
"requested": "latest.release"
},
"com.fasterxml.jackson.core:jackson-core": {
Expand Down Expand Up @@ -667,7 +667,7 @@
"locked": "1.11.86"
},
"com.amazonaws:aws-java-sdk-sqs": {
"locked": "1.11.666",
"locked": "1.11.673",
"requested": "latest.release"
},
"com.fasterxml.jackson.core:jackson-core": {
Expand Down Expand Up @@ -827,7 +827,7 @@
"locked": "1.11.86"
},
"com.amazonaws:aws-java-sdk-sqs": {
"locked": "1.11.666",
"locked": "1.11.673",
"requested": "latest.release"
},
"com.fasterxml.jackson.core:jackson-core": {
Expand Down Expand Up @@ -1003,7 +1003,7 @@
"locked": "1.11.86"
},
"com.amazonaws:aws-java-sdk-sqs": {
"locked": "1.11.666",
"locked": "1.11.673",
"requested": "latest.release"
},
"com.fasterxml.jackson.core:jackson-core": {
Expand Down Expand Up @@ -1179,7 +1179,7 @@
"locked": "1.11.86"
},
"com.amazonaws:aws-java-sdk-sqs": {
"locked": "1.11.666",
"locked": "1.11.673",
"requested": "latest.release"
},
"com.fasterxml.jackson.core:jackson-core": {
Expand Down Expand Up @@ -1355,7 +1355,7 @@
"locked": "1.11.86"
},
"com.amazonaws:aws-java-sdk-sqs": {
"locked": "1.11.666",
"locked": "1.11.673",
"requested": "latest.release"
},
"com.fasterxml.jackson.core:jackson-core": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,15 @@ public interface Configuration {

String ADDITIONAL_MODULES_PROPERTY_NAME = "conductor.additional.modules";

String EXECUTION_LOCK_ENABLED_PROPERTY_NAME = "decider.locking.enabled";
String EXECUTION_LOCK_ENABLED_PROPERTY_NAME = "workflow.decider.locking.enabled";
boolean EXECUTION_LOCK_ENABLED_DEFAULT_VALUE = false;

String LOCKING_SERVER_PROPERTY_NAME = "locking.server";
String LOCKING_SERVER_PROPERTY_NAME = "workflow.decider.locking.server";
String LOCKING_SERVER_DEFAULT_VALUE = "noop_lock";

String IGNORE_LOCKING_EXCEPTIONS_PROPERTY_NAME = "workflow.decider.locking.exceptions.ignore";
boolean IGNORE_LOCKING_EXCEPTIONS_DEFAULT_VALUE = false;

//TODO add constants for input/output external payload related properties.

default DB getDB() {
Expand All @@ -88,6 +91,17 @@ default String getLockingServerString() {
return getProperty(LOCKING_SERVER_PROPERTY_NAME, LOCKING_SERVER_DEFAULT_VALUE).toUpperCase();
}

default boolean ignoreLockingExceptions() {
return getBooleanProperty(IGNORE_LOCKING_EXCEPTIONS_PROPERTY_NAME, IGNORE_LOCKING_EXCEPTIONS_DEFAULT_VALUE);
}

/**
* @return when set to true(default), locking is enabled for workflow execution
*/
default boolean enableWorkflowExecutionLock() {
return getBooleanProperty(EXECUTION_LOCK_ENABLED_PROPERTY_NAME, EXECUTION_LOCK_ENABLED_DEFAULT_VALUE);
}

/**
* @return time frequency in seconds, at which the workflow sweeper should run to evaluate running workflows.
*/
Expand All @@ -104,11 +118,6 @@ default String getLockingServerString() {
*/
boolean disableAsyncWorkers();

/**
* @return when set to true(default), locking is enabled for workflow execution
*/
boolean enableWorkflowExecutionLock();

/**
* @return ID of the server. Can be host name, IP address or any other meaningful identifier. Used for logging
*/
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,6 @@ public boolean disableAsyncWorkers() {
return Boolean.getBoolean(disable);
}

@Override
public boolean enableWorkflowExecutionLock() {
return getBooleanProperty(EXECUTION_LOCK_ENABLED_PROPERTY_NAME, EXECUTION_LOCK_ENABLED_DEFAULT_VALUE);
}

@Override
public String getServerId() {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016 Netflix, Inc.
* Copyright 2019 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -49,7 +49,6 @@
import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask;
import com.netflix.conductor.core.metadata.MetadataMapperService;
import com.netflix.conductor.core.orchestration.ExecutionDAOFacade;
import com.netflix.conductor.core.utils.ExternalPayloadStorageUtils;
import com.netflix.conductor.core.utils.IDGenerator;
import com.netflix.conductor.core.utils.QueueUtils;
import com.netflix.conductor.dao.MetadataDAO;
Expand Down Expand Up @@ -1036,7 +1035,7 @@ List<Task> dedupAndAddTasks(Workflow workflow, List<Task> tasks) {
*/
public void pauseWorkflow(String workflowId) {
try {
executionLockService.acquireLock(workflowId);
executionLockService.acquireLock(workflowId, 60000);
WorkflowStatus status = WorkflowStatus.PAUSED;
Workflow workflow = executionDAOFacade.getWorkflowById(workflowId, false);
if (workflow.getStatus().isTerminal()) {
Expand All @@ -1057,19 +1056,14 @@ public void pauseWorkflow(String workflowId) {
* @throws IllegalStateException
*/
public void resumeWorkflow(String workflowId) {
try {
executionLockService.acquireLock(workflowId);
Workflow workflow = executionDAOFacade.getWorkflowById(workflowId, false);
if (!workflow.getStatus().equals(WorkflowStatus.PAUSED)) {
throw new IllegalStateException("The workflow " + workflowId + " is not PAUSED so cannot resume. " +
"Current status is " + workflow.getStatus().name());
}
workflow.setStatus(WorkflowStatus.RUNNING);
executionDAOFacade.updateWorkflow(workflow);
decide(workflowId);
} finally {
executionLockService.releaseLock(workflowId);
Workflow workflow = executionDAOFacade.getWorkflowById(workflowId, false);
if (!workflow.getStatus().equals(WorkflowStatus.PAUSED)) {
throw new IllegalStateException("The workflow " + workflowId + " is not PAUSED so cannot resume. " +
"Current status is " + workflow.getStatus().name());
}
workflow.setStatus(WorkflowStatus.RUNNING);
executionDAOFacade.updateWorkflow(workflow);
decide(workflowId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,6 @@ public String createWorkflow(Workflow workflow) {
* @param workflow the workflow tp be updated
* @return the id of the updated workflow
*/
//TODO: include version in workflow object
public String updateWorkflow(Workflow workflow) {
workflow.setUpdateTime(System.currentTimeMillis());
if (workflow.getStatus().isTerminal()) {
Expand Down
26 changes: 25 additions & 1 deletion core/src/main/java/com/netflix/conductor/core/utils/Lock.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright (c) 2019 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.conductor.core.utils;

import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -33,7 +49,7 @@ public interface Lock {
boolean acquireLock(String lockId, long timeToTry, TimeUnit unit);

/**
* acquires a re-entrant lock on lockId, blocks for timeToTry duration before giving up
* acquires a re-entrant lock on lockId with provided leaseTime duration. Blocks for timeToTry duration before giving up
* @param lockId resource to lock on
* @param timeToTry blocks up to timeToTry duration in attempt to acquire the lock
* @param leaseTime Lock lease expiration duration.
Expand All @@ -42,7 +58,15 @@ public interface Lock {
*/
boolean acquireLock(String lockId, long timeToTry, long leaseTime, TimeUnit unit);

/**
* Release a previously acquired lock
* @param lockId resource to lock on
*/
void releaseLock(String lockId);

/**
* Explicitly cleanup lock resources, if releasing it wouldn't do so.
* @param lockId resource to lock on
*/
void deleteLock(String lockId);
}
16 changes: 16 additions & 0 deletions core/src/main/java/com/netflix/conductor/core/utils/NoopLock.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright (c) 2019 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.conductor.core.utils;

import java.util.concurrent.TimeUnit;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2019 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.conductor.core.utils;

import com.google.inject.AbstractModule;
import com.google.inject.Provides;

public class NoopLockModule extends AbstractModule {

@Override
protected void configure() {}

@Provides
protected Lock provideLock() {
return new NoopLock();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,8 @@ public static void recordDiscardedIndexingCount(String queueType) {
getCounter(Monitors.classQualifier, "discarded_index_count", "queueType", queueType).increment();
}

public static void recordAcquireLockUnsuccessful(String lockId) {
counter(classQualifier, "acquire_lock_unsuccessful", "lockId", lockId);
public static void recordAcquireLockUnsuccessful() {
counter(classQualifier, "acquire_lock_unsuccessful");
}

public static void recordAcquireLockFailure(String exceptionClassName) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,21 @@
/*
* Copyright (c) 2019 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.conductor.service;

import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.core.config.Configuration;
import com.netflix.conductor.core.utils.Lock;
import com.netflix.conductor.metrics.Monitors;
Expand All @@ -12,7 +27,6 @@
import java.util.concurrent.TimeUnit;

public class ExecutionLockService {
public static final String LOCK_NAMESPACE = "executionlock";

private static final Logger LOGGER = LoggerFactory.getLogger(ExecutionLockService.class);
private final Configuration config;
Expand All @@ -24,8 +38,8 @@ public class ExecutionLockService {
public ExecutionLockService(Configuration config, Provider<Lock> lockProvider) {
this.config = config;
this.lockProvider = lockProvider;
LOCK_LEASE_TIME = config.getLongProperty("locking.leaseTimeInMilliSeconds", 60000);
LOCK_TIME_TO_TRY = config.getLongProperty("locking.lockTimeToTryInMilliSeconds", 500);
LOCK_LEASE_TIME = config.getLongProperty("workflow.locking.lease.time.ms", 60000);
LOCK_TIME_TO_TRY = config.getLongProperty("workflow.locking.time.to.try.ms", 500);
}

/**
Expand All @@ -47,8 +61,8 @@ public boolean acquireLock(String lockId, long timeToTryMs, long leaseTimeMs) {
if (config.enableWorkflowExecutionLock()) {
Lock lock = lockProvider.get();
if (!lock.acquireLock(lockId, timeToTryMs, leaseTimeMs, TimeUnit.MILLISECONDS)) {
LOGGER.info("Thread {} failed to acquire lock to lockId {}.", Thread.currentThread().getId(), lockId);
Monitors.recordAcquireLockUnsuccessful(lockId);
LOGGER.debug("Thread {} failed to acquire lock to lockId {}.", Thread.currentThread().getId(), lockId);
Monitors.recordAcquireLockUnsuccessful();
return false;
}
LOGGER.debug("Thread {} acquired lock to lockId {}.", Thread.currentThread().getId(), lockId);
Expand Down
Loading

0 comments on commit b25f608

Please sign in to comment.