Skip to content

Commit

Permalink
Added mechanism to ramp up CONTAINERIZED dispatch from POLL dispatch
Browse files Browse the repository at this point in the history
  • Loading branch information
aditya1105 committed Feb 26, 2021
1 parent 270defa commit c3060d7
Show file tree
Hide file tree
Showing 16 changed files with 182 additions and 82 deletions.
2 changes: 2 additions & 0 deletions az-core/src/main/java/azkaban/Constants.java
Expand Up @@ -524,6 +524,8 @@ public static class ContainerizedDispatchManagerProperties {
AZKABAN_CONTAINERIZED_PREFIX + "execution.processing.thread.pool.size";
public static final String CONTAINERIZED_CREATION_RATE_LIMIT =
AZKABAN_CONTAINERIZED_PREFIX + "creation.rate.limit";
public static final String CONTAINERIZED_RAMPUP =
AZKABAN_CONTAINERIZED_PREFIX + "rampup";

// Kubernetes related properties
public static final String AZKABAN_KUBERNETES_PREFIX = "azkaban.kubernetes.";
Expand Down
15 changes: 12 additions & 3 deletions azkaban-common/src/main/java/azkaban/DispatchMethod.java
Expand Up @@ -22,10 +22,19 @@
* This enum lists the dispatch methods implemented in Azkaban.
*/
public enum DispatchMethod {
PUSH,
POLL,
CONTAINERIZED;
PUSH(0),
POLL(1),
CONTAINERIZED(2);
private static final Logger logger = LoggerFactory.getLogger(DispatchMethod.class);
private final int numVal;

/**
 * Creates a dispatch-method constant carrying the given numeric value.
 *
 * @param numVal numeric value stored for this constant and exposed by {@link #getNumVal()}
 */
DispatchMethod(final int numVal) {
this.numVal = numVal;
}

/**
 * Returns the numeric value that was passed to this constant's constructor
 * (0 for PUSH, 1 for POLL, 2 for CONTAINERIZED).
 *
 * @return the numeric value of this dispatch method
 */
public int getNumVal() {
return this.numVal;
}

public static DispatchMethod getDispatchMethod(String value) {
try {
Expand Down
Expand Up @@ -280,6 +280,7 @@ protected String uploadExecutableFlow(
exflow.setSubmitUser(userId);
exflow.setStatus(getStartStatus());
exflow.setSubmitTime(System.currentTimeMillis());
exflow.setDispatchMethod(getDispatchMethod());

// Get collection of running flows given a project and a specific flow name
final List<Integer> running = getRunningFlows(projectId, flowId);
Expand Down
10 changes: 10 additions & 0 deletions azkaban-common/src/main/java/azkaban/executor/ExecutableFlow.java
Expand Up @@ -15,6 +15,7 @@
*/
package azkaban.executor;

import azkaban.DispatchMethod;
import azkaban.flow.Flow;
import azkaban.project.Project;
import azkaban.sla.SlaOption;
Expand Down Expand Up @@ -72,6 +73,7 @@ public class ExecutableFlow extends ExecutableFlowBase {
// For Flow_Status_Changed event
private String failedJobId = "unknown";
private String modifiedBy = "unknown";
private DispatchMethod dispatchMethod;

// For slaOption information
private String slaOptionStr = "null";
Expand Down Expand Up @@ -101,6 +103,14 @@ public static ExecutableFlow createExecutableFlow(final Object obj, final Status
return exFlow;
}

/**
 * Returns the dispatch method stored on this flow.
 *
 * @return the value last passed to {@link #setDispatchMethod(DispatchMethod)}
 */
// NOTE(review): the field has no initializer in the visible code, so this is presumably null
// until the setter is called - confirm against the constructors.
public DispatchMethod getDispatchMethod() {
return this.dispatchMethod;
}

/**
 * Stores the dispatch method on this flow.
 *
 * @param dispatchMethod the dispatch method to record; no validation is performed here
 */
public void setDispatchMethod(final DispatchMethod dispatchMethod) {
this.dispatchMethod = dispatchMethod;
}

@Override
public String getId() {
return getFlowId();
Expand Down
Expand Up @@ -45,7 +45,7 @@ public class ExecutionController extends AbstractExecutorManagerAdapter {


@Inject
protected ExecutionController(final Props azkProps, final ExecutorLoader executorLoader,
public ExecutionController(final Props azkProps, final ExecutorLoader executorLoader,
final CommonMetrics commonMetrics,
final ExecutorApiGateway apiGateway, final AlerterHolder alerterHolder, final
ExecutorHealthChecker executorHealthChecker) {
Expand Down
30 changes: 19 additions & 11 deletions azkaban-common/src/main/java/azkaban/executor/ExecutionFlowDao.java
Expand Up @@ -16,6 +16,7 @@

package azkaban.executor;

import azkaban.DispatchMethod;
import azkaban.db.DatabaseOperator;
import azkaban.db.EncodingType;
import azkaban.db.SQLTransaction;
Expand Down Expand Up @@ -71,9 +72,11 @@ public void uploadExecutableFlow(final ExecutableFlow flow)

final String INSERT_EXECUTABLE_FLOW = "INSERT INTO execution_flows "
+ "(project_id, flow_id, version, status, submit_time, submit_user, update_time, "
+ "use_executor, flow_priority, execution_source) values (?,?,?,?,?,?,?,?,?,?)";
+ "use_executor, flow_priority, execution_source, dispatch_method) values (?,?,?,?,?,?,?,"
+ "?,?,?,?)";
final long submitTime = flow.getSubmitTime();
final String executionSource = flow.getExecutionSource();
final DispatchMethod dispatchMethod = flow.getDispatchMethod();

/**
* Why do we need a transaction to get the last insert ID?
Expand All @@ -84,7 +87,8 @@ public void uploadExecutableFlow(final ExecutableFlow flow)
final SQLTransaction<Long> insertAndGetLastID = transOperator -> {
transOperator.update(INSERT_EXECUTABLE_FLOW, flow.getProjectId(),
flow.getFlowId(), flow.getVersion(), flow.getStatus().getNumVal(),
submitTime, flow.getSubmitUser(), submitTime, executorId, flowPriority, executionSource);
submitTime, flow.getSubmitUser(), submitTime, executorId, flowPriority, executionSource
, dispatchMethod.getNumVal());
transOperator.getConnection().commit();
return transOperator.getLastInsertId();
};
Expand Down Expand Up @@ -380,7 +384,8 @@ public void unsetExecutorIdForExecution(final int executionId) throws ExecutorMa
}
}

public int selectAndUpdateExecution(final int executorId, final boolean isActive)
public int selectAndUpdateExecution(final int executorId, final boolean isActive,
final DispatchMethod dispatchMethod)
throws ExecutorManagerException {
final String UPDATE_EXECUTION = "UPDATE execution_flows SET executor_id = ?, update_time = ?, status=? "
+ "where exec_id = ?";
Expand All @@ -392,7 +397,7 @@ public int selectAndUpdateExecution(final int executorId, final boolean isActive
transOperator.getConnection().setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);

final List<Integer> execIds = transOperator.query(selectExecutionForUpdate,
new SelectFromExecutionFlows(), Status.READY.getNumVal(), executorId);
new SelectFromExecutionFlows(), Status.READY.getNumVal(), dispatchMethod.getNumVal(), executorId);

int execId = -1;
if (!execIds.isEmpty()) {
Expand All @@ -412,7 +417,8 @@ public int selectAndUpdateExecution(final int executorId, final boolean isActive
}
}

public int selectAndUpdateExecutionWithLocking(final int executorId, final boolean isActive)
public int selectAndUpdateExecutionWithLocking(final int executorId, final boolean isActive,
final DispatchMethod dispatchMethod)
throws ExecutorManagerException {
final String UPDATE_EXECUTION = "UPDATE execution_flows SET executor_id = ?, update_time = ?, status=? "
+ "where exec_id = ?";
Expand All @@ -427,7 +433,8 @@ public int selectAndUpdateExecutionWithLocking(final int executorId, final boole
if (hasLocked) {
try {
final List<Integer> execIds = transOperator.query(selectExecutionForUpdate,
new SelectFromExecutionFlows(), Status.READY.getNumVal(), executorId);
new SelectFromExecutionFlows(), Status.READY.getNumVal(), dispatchMethod.getNumVal(),
executorId);
if (CollectionUtils.isNotEmpty(execIds)) {
execId = execIds.get(0);
transOperator.update(UPDATE_EXECUTION, executorId, System.currentTimeMillis(),
Expand Down Expand Up @@ -464,7 +471,8 @@ public int selectAndUpdateExecutionWithLocking(final int executorId, final boole
*/
public Set<Integer> selectAndUpdateExecutionWithLocking(final boolean batchEnabled,
final int limit,
final Status updatedStatus)
final Status updatedStatus,
final DispatchMethod dispatchMethod)
throws ExecutorManagerException {
final String UPDATE_EXECUTION = "UPDATE execution_flows SET status = ?, update_time = ? "
+ "where exec_id = ?";
Expand All @@ -479,11 +487,11 @@ public Set<Integer> selectAndUpdateExecutionWithLocking(final boolean batchEnabl
if (batchEnabled) {
execIds = transOperator.query(String
.format(SelectFromExecutionFlows.SELECT_EXECUTION_IN_BATCH_FOR_UPDATE_FORMAT, ""),
new SelectFromExecutionFlows(), Status.READY.getNumVal(), limit);
new SelectFromExecutionFlows(), Status.READY.getNumVal(), dispatchMethod.getNumVal(), limit);
} else {
execIds = transOperator.query(
String.format(SelectFromExecutionFlows.SELECT_EXECUTION_FOR_UPDATE_FORMAT, ""),
new SelectFromExecutionFlows(), Status.READY.getNumVal());
new SelectFromExecutionFlows(), Status.READY.getNumVal(), dispatchMethod.getNumVal());
}
if (CollectionUtils.isNotEmpty(execIds)) {
executions.addAll(execIds);
Expand Down Expand Up @@ -544,14 +552,14 @@ public static class SelectFromExecutionFlows implements

private static final String SELECT_EXECUTION_FOR_UPDATE_FORMAT =
"SELECT exec_id from execution_flows WHERE exec_id = (SELECT exec_id from execution_flows"
+ " WHERE status = ?"
+ " WHERE status = ? and dispatch_method = ?"
+ " and executor_id is NULL and flow_data is NOT NULL %s"
+ " ORDER BY flow_priority DESC, update_time ASC, exec_id ASC LIMIT 1) and "
+ "executor_id is NULL FOR UPDATE";

private static final String SELECT_EXECUTION_IN_BATCH_FOR_UPDATE_FORMAT =
"SELECT exec_id from execution_flows WHERE exec_id in (SELECT exec_id from execution_flows"
+ " WHERE status = ?"
+ " WHERE status = ? and dispatch_method = ?"
+ " and executor_id is NULL and flow_data is NOT NULL %s ) "
+ " ORDER BY flow_priority DESC, update_time ASC, exec_id ASC "
+ " LIMIT ? FOR UPDATE";
Expand Down
Expand Up @@ -15,6 +15,7 @@
*/
package azkaban.executor;

import azkaban.DispatchMethod;
import azkaban.executor.ExecutorLogEvent.EventType;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.Pair;
Expand Down Expand Up @@ -309,10 +310,10 @@ int removeExecutionLogsByTime(long millis, int recordCleanupLimit)

void unsetExecutorIdForExecution(final int executionId) throws ExecutorManagerException;

int selectAndUpdateExecution(final int executorId, boolean isActive)
int selectAndUpdateExecution(final int executorId, boolean isActive, final DispatchMethod dispatchMethod)
throws ExecutorManagerException;

int selectAndUpdateExecutionWithLocking(final int executorId, boolean isActive)
int selectAndUpdateExecutionWithLocking(final int executorId, boolean isActive, final DispatchMethod dispatchMethod)
throws ExecutorManagerException;

/**
Expand All @@ -328,7 +329,7 @@ int selectAndUpdateExecutionWithLocking(final int executorId, boolean isActive)
* @throws ExecutorManagerException
*/
Set<Integer> selectAndUpdateExecutionWithLocking(final boolean batchEnabled, final int limit,
Status updatedStatus) throws ExecutorManagerException;
Status updatedStatus, final DispatchMethod dispatchMethod) throws ExecutorManagerException;

ExecutableRampMap fetchExecutableRampMap()
throws ExecutorManagerException;
Expand Down
Expand Up @@ -15,6 +15,7 @@
*/
package azkaban.executor;

import azkaban.DispatchMethod;
import azkaban.executor.ExecutorLogEvent.EventType;
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.Pair;
Expand Down Expand Up @@ -368,22 +369,24 @@ public void unassignExecutor(final int executionId) throws ExecutorManagerExcept
}

@Override
public int selectAndUpdateExecution(final int executorId, final boolean isActive)
public int selectAndUpdateExecution(final int executorId, final boolean isActive,
final DispatchMethod dispatchMethod)
throws ExecutorManagerException {
return this.executionFlowDao.selectAndUpdateExecution(executorId, isActive);
return this.executionFlowDao.selectAndUpdateExecution(executorId, isActive, dispatchMethod);
}

@Override
public int selectAndUpdateExecutionWithLocking(final int executorId, final boolean isActive)
public int selectAndUpdateExecutionWithLocking(final int executorId, final boolean isActive,
final DispatchMethod dispatchMethod)
throws ExecutorManagerException {
return this.executionFlowDao.selectAndUpdateExecutionWithLocking(executorId, isActive);
return this.executionFlowDao.selectAndUpdateExecutionWithLocking(executorId, isActive, dispatchMethod);
}

@Override
public Set<Integer> selectAndUpdateExecutionWithLocking(final boolean batchEnabled, int limit,
Status updatedStatus) throws ExecutorManagerException {
Status updatedStatus, final DispatchMethod dispatchMethod) throws ExecutorManagerException {
return this.executionFlowDao.selectAndUpdateExecutionWithLocking(batchEnabled, limit,
updatedStatus);
updatedStatus, dispatchMethod);
}

@Override
Expand Down
Expand Up @@ -33,13 +33,15 @@
import azkaban.utils.FileIOUtils.LogData;
import azkaban.utils.Pair;
import azkaban.utils.Props;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.RateLimiter;
import java.io.IOException;
import java.lang.Thread.State;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
Expand All @@ -60,7 +62,7 @@
*/
@Singleton
public class ContainerizedDispatchManager extends AbstractExecutorManagerAdapter {

private int rampUp;
private final ContainerizedImpl containerizedImpl;
private QueueProcessorThread queueProcessor;
private final RateLimiter rateLimiter;
Expand All @@ -76,6 +78,19 @@ public ContainerizedDispatchManager(final Props azkProps, final ExecutorLoader e
RateLimiter.create(azkProps
.getInt(ContainerizedDispatchManagerProperties.CONTAINERIZED_CREATION_RATE_LIMIT, 20));
this.containerizedImpl = containerizedImpl;
int rampUp = azkProps.getInt(ContainerizedDispatchManagerProperties.CONTAINERIZED_RAMPUP, 100);
if (rampUp > 100 || rampUp < 0) {
String errorMessage = "RampUp must be an integer between [0, 100]: " + rampUp;
logger.error(errorMessage);
throw new ExecutorManagerException(errorMessage);
} else {
this.rampUp = rampUp;
}
}

/**
 * Overrides the ramp-up percentage. Exposed for tests only.
 *
 * <p>The same [0, 100] range that the constructor enforces on the configured value is enforced
 * here, so the field can never hold a percentage that the constructor would have rejected.
 *
 * @param rampUp percentage of flows that should be dispatched as CONTAINERIZED, in [0, 100]
 * @throws IllegalArgumentException if {@code rampUp} is outside [0, 100]
 */
@VisibleForTesting
public void setRampUp(int rampUp) {
  if (rampUp < 0 || rampUp > 100) {
    throw new IllegalArgumentException(
        "RampUp must be an integer between [0, 100]: " + rampUp);
  }
  this.rampUp = rampUp;
}

/**
Expand Down Expand Up @@ -123,7 +138,18 @@ public long getQueuedFlowSize() {
*/
@Override
public DispatchMethod getDispatchMethod() {
  // rampUp is the percentage of flows to dispatch as CONTAINERIZED; the remainder use POLL.
  // The two extremes are decided without a random draw.
  if (this.rampUp == 0) {
    return DispatchMethod.POLL;
  }
  if (this.rampUp == 100) {
    return DispatchMethod.CONTAINERIZED;
  }
  // Uniform draw from [0, 100): values below rampUp select CONTAINERIZED. ThreadLocalRandom
  // avoids allocating and seeding a new Random instance on every call.
  final int randomInt = ThreadLocalRandom.current().nextInt(100);
  return randomInt < this.rampUp ? DispatchMethod.CONTAINERIZED : DispatchMethod.POLL;
}

/**
Expand Down Expand Up @@ -240,7 +266,7 @@ private void processQueuedFlows() throws ExecutorManagerException {
final Set<Integer> executionIds =
executorLoader.selectAndUpdateExecutionWithLocking(this.executionsBatchProcessingEnabled,
this.executionsBatchSize,
Status.DISPATCHING);
Status.DISPATCHING, DispatchMethod.CONTAINERIZED);

for (final int executionId : executionIds) {
rateLimiter.acquire();
Expand Down
Expand Up @@ -116,6 +116,21 @@ public void setup() throws Exception {
executionReferencePair));
}

@Test
public void testRampUpDispatchMethod() throws Exception {
  initializeContainerizedDispatchImpl();

  // Ramp-up of 0: each of the 100 calls is expected to return POLL.
  this.containerizedDispatchManager.setRampUp(0);
  int attempt = 0;
  while (attempt < 100) {
    final DispatchMethod chosen = this.containerizedDispatchManager.getDispatchMethod();
    assertThat(chosen).isEqualTo(DispatchMethod.POLL);
    attempt++;
  }

  // Ramp-up of 100: each of the 100 calls is expected to return CONTAINERIZED.
  this.containerizedDispatchManager.setRampUp(100);
  attempt = 0;
  while (attempt < 100) {
    final DispatchMethod chosen = this.containerizedDispatchManager.getDispatchMethod();
    assertThat(chosen).isEqualTo(DispatchMethod.CONTAINERIZED);
    attempt++;
  }
}

@Test
public void testFetchAllActiveFlows() throws Exception {
initializeContainerizedDispatchImpl();
Expand Down

0 comments on commit c3060d7

Please sign in to comment.