Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added mechanism to rampup Containerized from POLL dispatch #2779

Merged
merged 1 commit into from Mar 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions az-core/src/main/java/azkaban/Constants.java
Expand Up @@ -524,6 +524,8 @@ public static class ContainerizedDispatchManagerProperties {
AZKABAN_CONTAINERIZED_PREFIX + "execution.processing.thread.pool.size";
public static final String CONTAINERIZED_CREATION_RATE_LIMIT =
AZKABAN_CONTAINERIZED_PREFIX + "creation.rate.limit";
public static final String CONTAINERIZED_RAMPUP =
AZKABAN_CONTAINERIZED_PREFIX + "rampup";

// Kubernetes related properties
public static final String AZKABAN_KUBERNETES_PREFIX = "azkaban.kubernetes.";
Expand Down
15 changes: 12 additions & 3 deletions azkaban-common/src/main/java/azkaban/DispatchMethod.java
Expand Up @@ -22,10 +22,19 @@
* This enum contains list of dispatch types implemented in Azkaban.
*/
public enum DispatchMethod {
PUSH,
POLL,
CONTAINERIZED;
PUSH(0),
POLL(1),
CONTAINERIZED(2);
private static final Logger logger = LoggerFactory.getLogger(DispatchMethod.class);
private final int numVal;

DispatchMethod(final int numVal) {
this.numVal = numVal;
}

public int getNumVal() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any need to add method which will return enum for given num value?
What is the significance of this num value? Can you please add it in comment?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Till now we don't have a use-case for num to enum.
We are using Int as the data-type of dispatch_method in the execution_flows table and hence getNumVal is needed.

return this.numVal;
}

public static DispatchMethod getDispatchMethod(String value) {
try {
Expand Down
Expand Up @@ -280,6 +280,7 @@ protected String uploadExecutableFlow(
exflow.setSubmitUser(userId);
exflow.setStatus(getStartStatus());
exflow.setSubmitTime(System.currentTimeMillis());
exflow.setDispatchMethod(getDispatchMethod());

// Get collection of running flows given a project and a specific flow name
final List<Integer> running = getRunningFlows(projectId, flowId);
Expand Down
10 changes: 10 additions & 0 deletions azkaban-common/src/main/java/azkaban/executor/ExecutableFlow.java
Expand Up @@ -15,6 +15,7 @@
*/
package azkaban.executor;

import azkaban.DispatchMethod;
import azkaban.flow.Flow;
import azkaban.project.Project;
import azkaban.sla.SlaOption;
Expand Down Expand Up @@ -72,6 +73,7 @@ public class ExecutableFlow extends ExecutableFlowBase {
// For Flow_Status_Changed event
private String failedJobId = "unknown";
private String modifiedBy = "unknown";
private DispatchMethod dispatchMethod;

// For slaOption information
private String slaOptionStr = "null";
Expand Down Expand Up @@ -101,6 +103,14 @@ public static ExecutableFlow createExecutableFlow(final Object obj, final Status
return exFlow;
}

public DispatchMethod getDispatchMethod() {
return this.dispatchMethod;
}

public void setDispatchMethod(final DispatchMethod dispatchMethod) {
this.dispatchMethod = dispatchMethod;
}

@Override
public String getId() {
return getFlowId();
Expand Down
Expand Up @@ -45,7 +45,7 @@ public class ExecutionController extends AbstractExecutorManagerAdapter {


@Inject
protected ExecutionController(final Props azkProps, final ExecutorLoader executorLoader,
public ExecutionController(final Props azkProps, final ExecutorLoader executorLoader,
final CommonMetrics commonMetrics,
final ExecutorApiGateway apiGateway, final AlerterHolder alerterHolder, final
ExecutorHealthChecker executorHealthChecker) {
Expand Down
30 changes: 19 additions & 11 deletions azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
Expand Up @@ -16,6 +16,7 @@

package azkaban.executor;

import azkaban.DispatchMethod;
import azkaban.db.DatabaseOperator;
import azkaban.db.EncodingType;
import azkaban.db.SQLTransaction;
Expand Down Expand Up @@ -71,9 +72,11 @@ public void uploadExecutableFlow(final ExecutableFlow flow)

final String INSERT_EXECUTABLE_FLOW = "INSERT INTO execution_flows "
+ "(project_id, flow_id, version, status, submit_time, submit_user, update_time, "
+ "use_executor, flow_priority, execution_source) values (?,?,?,?,?,?,?,?,?,?)";
+ "use_executor, flow_priority, execution_source, dispatch_method) values (?,?,?,?,?,?,?,"
+ "?,?,?,?)";
final long submitTime = flow.getSubmitTime();
final String executionSource = flow.getExecutionSource();
final DispatchMethod dispatchMethod = flow.getDispatchMethod();

/**
* Why we need a transaction to get last insert ID?
Expand All @@ -84,7 +87,8 @@ public void uploadExecutableFlow(final ExecutableFlow flow)
final SQLTransaction<Long> insertAndGetLastID = transOperator -> {
transOperator.update(INSERT_EXECUTABLE_FLOW, flow.getProjectId(),
flow.getFlowId(), flow.getVersion(), flow.getStatus().getNumVal(),
submitTime, flow.getSubmitUser(), submitTime, executorId, flowPriority, executionSource);
submitTime, flow.getSubmitUser(), submitTime, executorId, flowPriority, executionSource
, dispatchMethod.getNumVal());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume you are keeping num value for the optimization that int will provide from database side instead of enum name itself.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.

transOperator.getConnection().commit();
return transOperator.getLastInsertId();
};
Expand Down Expand Up @@ -380,7 +384,8 @@ public void unsetExecutorIdForExecution(final int executionId) throws ExecutorMa
}
}

public int selectAndUpdateExecution(final int executorId, final boolean isActive)
public int selectAndUpdateExecution(final int executorId, final boolean isActive,
final DispatchMethod dispatchMethod)
throws ExecutorManagerException {
final String UPDATE_EXECUTION = "UPDATE execution_flows SET executor_id = ?, update_time = ?, status=? "
+ "where exec_id = ?";
Expand All @@ -392,7 +397,7 @@ public int selectAndUpdateExecution(final int executorId, final boolean isActive
transOperator.getConnection().setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);

final List<Integer> execIds = transOperator.query(selectExecutionForUpdate,
new SelectFromExecutionFlows(), Status.READY.getNumVal(), executorId);
new SelectFromExecutionFlows(), Status.READY.getNumVal(), dispatchMethod.getNumVal(), executorId);

int execId = -1;
if (!execIds.isEmpty()) {
Expand All @@ -412,7 +417,8 @@ public int selectAndUpdateExecution(final int executorId, final boolean isActive
}
}

public int selectAndUpdateExecutionWithLocking(final int executorId, final boolean isActive)
public int selectAndUpdateExecutionWithLocking(final int executorId, final boolean isActive,
final DispatchMethod dispatchMethod)
throws ExecutorManagerException {
final String UPDATE_EXECUTION = "UPDATE execution_flows SET executor_id = ?, update_time = ?, status=? "
+ "where exec_id = ?";
Expand All @@ -427,7 +433,8 @@ public int selectAndUpdateExecutionWithLocking(final int executorId, final boole
if (hasLocked) {
try {
final List<Integer> execIds = transOperator.query(selectExecutionForUpdate,
new SelectFromExecutionFlows(), Status.READY.getNumVal(), executorId);
new SelectFromExecutionFlows(), Status.READY.getNumVal(), dispatchMethod.getNumVal(),
executorId);
if (CollectionUtils.isNotEmpty(execIds)) {
execId = execIds.get(0);
transOperator.update(UPDATE_EXECUTION, executorId, System.currentTimeMillis(),
Expand Down Expand Up @@ -464,7 +471,8 @@ public int selectAndUpdateExecutionWithLocking(final int executorId, final boole
*/
public Set<Integer> selectAndUpdateExecutionWithLocking(final boolean batchEnabled,
final int limit,
final Status updatedStatus)
final Status updatedStatus,
final DispatchMethod dispatchMethod)
throws ExecutorManagerException {
final String UPDATE_EXECUTION = "UPDATE execution_flows SET status = ?, update_time = ? "
+ "where exec_id = ?";
Expand All @@ -479,11 +487,11 @@ public Set<Integer> selectAndUpdateExecutionWithLocking(final boolean batchEnabl
if (batchEnabled) {
execIds = transOperator.query(String
.format(SelectFromExecutionFlows.SELECT_EXECUTION_IN_BATCH_FOR_UPDATE_FORMAT, ""),
new SelectFromExecutionFlows(), Status.READY.getNumVal(), limit);
new SelectFromExecutionFlows(), Status.READY.getNumVal(), dispatchMethod.getNumVal(), limit);
} else {
execIds = transOperator.query(
String.format(SelectFromExecutionFlows.SELECT_EXECUTION_FOR_UPDATE_FORMAT, ""),
new SelectFromExecutionFlows(), Status.READY.getNumVal());
new SelectFromExecutionFlows(), Status.READY.getNumVal(), dispatchMethod.getNumVal());
}
if (CollectionUtils.isNotEmpty(execIds)) {
executions.addAll(execIds);
Expand Down Expand Up @@ -544,14 +552,14 @@ public static class SelectFromExecutionFlows implements

private static final String SELECT_EXECUTION_FOR_UPDATE_FORMAT =
"SELECT exec_id from execution_flows WHERE exec_id = (SELECT exec_id from execution_flows"
+ " WHERE status = ?"
+ " WHERE status = ? and dispatch_method = ?"
+ " and executor_id is NULL and flow_data is NOT NULL %s"
+ " ORDER BY flow_priority DESC, update_time ASC, exec_id ASC LIMIT 1) and "
+ "executor_id is NULL FOR UPDATE";

private static final String SELECT_EXECUTION_IN_BATCH_FOR_UPDATE_FORMAT =
"SELECT exec_id from execution_flows WHERE exec_id in (SELECT exec_id from execution_flows"
+ " WHERE status = ?"
+ " WHERE status = ? and dispatch_method = ?"
+ " and executor_id is NULL and flow_data is NOT NULL %s ) "
+ " ORDER BY flow_priority DESC, update_time ASC, exec_id ASC "
+ " LIMIT ? FOR UPDATE";
Expand Down
Expand Up @@ -15,6 +15,7 @@
*/
package azkaban.executor;

import azkaban.DispatchMethod;
import azkaban.executor.ExecutorLogEvent.EventType;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.Pair;
Expand Down Expand Up @@ -309,10 +310,10 @@ int removeExecutionLogsByTime(long millis, int recordCleanupLimit)

void unsetExecutorIdForExecution(final int executionId) throws ExecutorManagerException;

int selectAndUpdateExecution(final int executorId, boolean isActive)
int selectAndUpdateExecution(final int executorId, boolean isActive, final DispatchMethod dispatchMethod)
throws ExecutorManagerException;

int selectAndUpdateExecutionWithLocking(final int executorId, boolean isActive)
int selectAndUpdateExecutionWithLocking(final int executorId, boolean isActive, final DispatchMethod dispatchMethod)
throws ExecutorManagerException;

/**
Expand All @@ -328,7 +329,7 @@ int selectAndUpdateExecutionWithLocking(final int executorId, boolean isActive)
* @throws ExecutorManagerException
*/
Set<Integer> selectAndUpdateExecutionWithLocking(final boolean batchEnabled, final int limit,
Status updatedStatus) throws ExecutorManagerException;
Status updatedStatus, final DispatchMethod dispatchMethod) throws ExecutorManagerException;

ExecutableRampMap fetchExecutableRampMap()
throws ExecutorManagerException;
Expand Down
Expand Up @@ -15,6 +15,7 @@
*/
package azkaban.executor;

import azkaban.DispatchMethod;
import azkaban.executor.ExecutorLogEvent.EventType;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.Pair;
Expand Down Expand Up @@ -368,22 +369,24 @@ public void unassignExecutor(final int executionId) throws ExecutorManagerExcept
}

@Override
public int selectAndUpdateExecution(final int executorId, final boolean isActive)
public int selectAndUpdateExecution(final int executorId, final boolean isActive,
final DispatchMethod dispatchMethod)
throws ExecutorManagerException {
return this.executionFlowDao.selectAndUpdateExecution(executorId, isActive);
return this.executionFlowDao.selectAndUpdateExecution(executorId, isActive, dispatchMethod);
}

@Override
public int selectAndUpdateExecutionWithLocking(final int executorId, final boolean isActive)
public int selectAndUpdateExecutionWithLocking(final int executorId, final boolean isActive,
final DispatchMethod dispatchMethod)
throws ExecutorManagerException {
return this.executionFlowDao.selectAndUpdateExecutionWithLocking(executorId, isActive);
return this.executionFlowDao.selectAndUpdateExecutionWithLocking(executorId, isActive, dispatchMethod);
}

@Override
public Set<Integer> selectAndUpdateExecutionWithLocking(final boolean batchEnabled, int limit,
Status updatedStatus) throws ExecutorManagerException {
Status updatedStatus, final DispatchMethod dispatchMethod) throws ExecutorManagerException {
return this.executionFlowDao.selectAndUpdateExecutionWithLocking(batchEnabled, limit,
updatedStatus);
updatedStatus, dispatchMethod);
}

@Override
Expand Down
Expand Up @@ -33,13 +33,15 @@
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.Pair;
import azkaban.utils.Props;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.RateLimiter;
import java.io.IOException;
import java.lang.Thread.State;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -60,7 +62,7 @@
*/
@Singleton
public class ContainerizedDispatchManager extends AbstractExecutorManagerAdapter {

private int rampUp;
private final ContainerizedImpl containerizedImpl;
private QueueProcessorThread queueProcessor;
private final RateLimiter rateLimiter;
Expand All @@ -76,6 +78,19 @@ public ContainerizedDispatchManager(final Props azkProps, final ExecutorLoader e
RateLimiter.create(azkProps
.getInt(ContainerizedDispatchManagerProperties.CONTAINERIZED_CREATION_RATE_LIMIT, 20));
this.containerizedImpl = containerizedImpl;
int rampUp = azkProps.getInt(ContainerizedDispatchManagerProperties.CONTAINERIZED_RAMPUP, 100);
if (rampUp > 100 || rampUp < 0) {
String errorMessage = "RampUp must be an integer between [0, 100]: " + rampUp;
logger.error(errorMessage);
throw new ExecutorManagerException(errorMessage);
} else {
this.rampUp = rampUp;
}
}

@VisibleForTesting
public void setRampUp(int rampUp) {
this.rampUp = rampUp;
}

/**
Expand Down Expand Up @@ -123,7 +138,18 @@ public long getQueuedFlowSize() {
*/
@Override
public DispatchMethod getDispatchMethod() {
return DispatchMethod.CONTAINERIZED;
if (this.rampUp == 0) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DispatchMethod is very useful and it controlling the behaviour at the moment by turning off and turning on feature. DispatchMethod is currently specified through configs. Hence, dispatch method is initialized during web server startup. The containerization feature is turned off if dispath method is POLL. If dispatch method is set to CONTAINERIZED then only all the guava juice binding for containerization are initialized. As we are dynamically toggling between POLL and CONTAINERIZED based on rampup we need to reload the bindings as well. In other words, we need to always intialized the bindings or look at different way of handling this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1. I didn't see any changes related to injection over here. How will implementation for POLL method be injected in case of ramp up?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a difference between DispatchMethod.CONTAINERIZED specified through the configs and that will always be DispatchMethod.CONTAINERIZED for the ramp-up feature.
Here it is about the return value of getDispatchMethod() which till now is not used anywhere in the code and hence doesn't affect anything. The return value of this is utilized to set the value of dispatch_method column in the execution_flows table while inserting the entry to the table. Hence at no place injections will be affected.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. And specifying dispatch method as POLL from executor side will take care of polling executions. Thank you for clarifying this.

return DispatchMethod.POLL;
} else if (this.rampUp == 100) {
return DispatchMethod.CONTAINERIZED;
}
Random rand = new Random();
int randomInt = rand.nextInt(100);
if (randomInt < this.rampUp) {
return DispatchMethod.CONTAINERIZED;
} else {
return DispatchMethod.POLL;
}
}

/**
Expand Down Expand Up @@ -240,7 +266,7 @@ private void processQueuedFlows() throws ExecutorManagerException {
final Set<Integer> executionIds =
executorLoader.selectAndUpdateExecutionWithLocking(this.executionsBatchProcessingEnabled,
this.executionsBatchSize,
Status.DISPATCHING);
Status.DISPATCHING, DispatchMethod.CONTAINERIZED);

for (final int executionId : executionIds) {
rateLimiter.acquire();
Expand Down
Expand Up @@ -116,6 +116,21 @@ public void setup() throws Exception {
executionReferencePair));
}

@Test
public void testRampUpDispatchMethod() throws Exception {
initializeContainerizedDispatchImpl();
this.containerizedDispatchManager.setRampUp(0);
for (int i = 0; i < 100; i++) {
DispatchMethod dispatchMethod = this.containerizedDispatchManager.getDispatchMethod();
assertThat(dispatchMethod).isEqualTo(DispatchMethod.POLL);
}
this.containerizedDispatchManager.setRampUp(100);
for (int i = 0; i < 100; i++) {
DispatchMethod dispatchMethod = this.containerizedDispatchManager.getDispatchMethod();
assertThat(dispatchMethod).isEqualTo(DispatchMethod.CONTAINERIZED);
}
}

@Test
public void testFetchAllActiveFlows() throws Exception {
initializeContainerizedDispatchImpl();
Expand Down