New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added mechanism to rampup Containerized from POLL dispatch #2779
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,6 +16,7 @@ | |
|
||
package azkaban.executor; | ||
|
||
import azkaban.DispatchMethod; | ||
import azkaban.db.DatabaseOperator; | ||
import azkaban.db.EncodingType; | ||
import azkaban.db.SQLTransaction; | ||
|
@@ -71,9 +72,11 @@ public void uploadExecutableFlow(final ExecutableFlow flow) | |
|
||
final String INSERT_EXECUTABLE_FLOW = "INSERT INTO execution_flows " | ||
+ "(project_id, flow_id, version, status, submit_time, submit_user, update_time, " | ||
+ "use_executor, flow_priority, execution_source) values (?,?,?,?,?,?,?,?,?,?)"; | ||
+ "use_executor, flow_priority, execution_source, dispatch_method) values (?,?,?,?,?,?,?," | ||
+ "?,?,?,?)"; | ||
final long submitTime = flow.getSubmitTime(); | ||
final String executionSource = flow.getExecutionSource(); | ||
final DispatchMethod dispatchMethod = flow.getDispatchMethod(); | ||
|
||
/** | ||
* Why we need a transaction to get last insert ID? | ||
|
@@ -84,7 +87,8 @@ public void uploadExecutableFlow(final ExecutableFlow flow) | |
final SQLTransaction<Long> insertAndGetLastID = transOperator -> { | ||
transOperator.update(INSERT_EXECUTABLE_FLOW, flow.getProjectId(), | ||
flow.getFlowId(), flow.getVersion(), flow.getStatus().getNumVal(), | ||
submitTime, flow.getSubmitUser(), submitTime, executorId, flowPriority, executionSource); | ||
submitTime, flow.getSubmitUser(), submitTime, executorId, flowPriority, executionSource | ||
, dispatchMethod.getNumVal()); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I assume you are storing the numeric value rather than the enum name because an int column is more efficient on the database side. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes. |
||
transOperator.getConnection().commit(); | ||
return transOperator.getLastInsertId(); | ||
}; | ||
|
@@ -380,7 +384,8 @@ public void unsetExecutorIdForExecution(final int executionId) throws ExecutorMa | |
} | ||
} | ||
|
||
public int selectAndUpdateExecution(final int executorId, final boolean isActive) | ||
public int selectAndUpdateExecution(final int executorId, final boolean isActive, | ||
final DispatchMethod dispatchMethod) | ||
throws ExecutorManagerException { | ||
final String UPDATE_EXECUTION = "UPDATE execution_flows SET executor_id = ?, update_time = ?, status=? " | ||
+ "where exec_id = ?"; | ||
|
@@ -392,7 +397,7 @@ public int selectAndUpdateExecution(final int executorId, final boolean isActive | |
transOperator.getConnection().setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); | ||
|
||
final List<Integer> execIds = transOperator.query(selectExecutionForUpdate, | ||
new SelectFromExecutionFlows(), Status.READY.getNumVal(), executorId); | ||
new SelectFromExecutionFlows(), Status.READY.getNumVal(), dispatchMethod.getNumVal(), executorId); | ||
|
||
int execId = -1; | ||
if (!execIds.isEmpty()) { | ||
|
@@ -412,7 +417,8 @@ public int selectAndUpdateExecution(final int executorId, final boolean isActive | |
} | ||
} | ||
|
||
public int selectAndUpdateExecutionWithLocking(final int executorId, final boolean isActive) | ||
public int selectAndUpdateExecutionWithLocking(final int executorId, final boolean isActive, | ||
final DispatchMethod dispatchMethod) | ||
throws ExecutorManagerException { | ||
final String UPDATE_EXECUTION = "UPDATE execution_flows SET executor_id = ?, update_time = ?, status=? " | ||
+ "where exec_id = ?"; | ||
|
@@ -427,7 +433,8 @@ public int selectAndUpdateExecutionWithLocking(final int executorId, final boole | |
if (hasLocked) { | ||
try { | ||
final List<Integer> execIds = transOperator.query(selectExecutionForUpdate, | ||
new SelectFromExecutionFlows(), Status.READY.getNumVal(), executorId); | ||
new SelectFromExecutionFlows(), Status.READY.getNumVal(), dispatchMethod.getNumVal(), | ||
executorId); | ||
if (CollectionUtils.isNotEmpty(execIds)) { | ||
execId = execIds.get(0); | ||
transOperator.update(UPDATE_EXECUTION, executorId, System.currentTimeMillis(), | ||
|
@@ -464,7 +471,8 @@ public int selectAndUpdateExecutionWithLocking(final int executorId, final boole | |
*/ | ||
public Set<Integer> selectAndUpdateExecutionWithLocking(final boolean batchEnabled, | ||
final int limit, | ||
final Status updatedStatus) | ||
final Status updatedStatus, | ||
final DispatchMethod dispatchMethod) | ||
throws ExecutorManagerException { | ||
final String UPDATE_EXECUTION = "UPDATE execution_flows SET status = ?, update_time = ? " | ||
+ "where exec_id = ?"; | ||
|
@@ -479,11 +487,11 @@ public Set<Integer> selectAndUpdateExecutionWithLocking(final boolean batchEnabl | |
if (batchEnabled) { | ||
execIds = transOperator.query(String | ||
.format(SelectFromExecutionFlows.SELECT_EXECUTION_IN_BATCH_FOR_UPDATE_FORMAT, ""), | ||
new SelectFromExecutionFlows(), Status.READY.getNumVal(), limit); | ||
new SelectFromExecutionFlows(), Status.READY.getNumVal(), dispatchMethod.getNumVal(), limit); | ||
} else { | ||
execIds = transOperator.query( | ||
String.format(SelectFromExecutionFlows.SELECT_EXECUTION_FOR_UPDATE_FORMAT, ""), | ||
new SelectFromExecutionFlows(), Status.READY.getNumVal()); | ||
new SelectFromExecutionFlows(), Status.READY.getNumVal(), dispatchMethod.getNumVal()); | ||
} | ||
if (CollectionUtils.isNotEmpty(execIds)) { | ||
executions.addAll(execIds); | ||
|
@@ -544,14 +552,14 @@ public static class SelectFromExecutionFlows implements | |
|
||
private static final String SELECT_EXECUTION_FOR_UPDATE_FORMAT = | ||
"SELECT exec_id from execution_flows WHERE exec_id = (SELECT exec_id from execution_flows" | ||
+ " WHERE status = ?" | ||
+ " WHERE status = ? and dispatch_method = ?" | ||
+ " and executor_id is NULL and flow_data is NOT NULL %s" | ||
+ " ORDER BY flow_priority DESC, update_time ASC, exec_id ASC LIMIT 1) and " | ||
+ "executor_id is NULL FOR UPDATE"; | ||
|
||
private static final String SELECT_EXECUTION_IN_BATCH_FOR_UPDATE_FORMAT = | ||
"SELECT exec_id from execution_flows WHERE exec_id in (SELECT exec_id from execution_flows" | ||
+ " WHERE status = ?" | ||
+ " WHERE status = ? and dispatch_method = ?" | ||
+ " and executor_id is NULL and flow_data is NOT NULL %s ) " | ||
+ " ORDER BY flow_priority DESC, update_time ASC, exec_id ASC " | ||
+ " LIMIT ? FOR UPDATE"; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,13 +33,15 @@ | |
import azkaban.utils.FileIOUtils.LogData; | ||
import azkaban.utils.Pair; | ||
import azkaban.utils.Props; | ||
import com.google.common.annotations.VisibleForTesting; | ||
import com.google.common.util.concurrent.RateLimiter; | ||
import java.io.IOException; | ||
import java.lang.Thread.State; | ||
import java.util.ArrayList; | ||
import java.util.Collection; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Random; | ||
import java.util.Set; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.Executors; | ||
|
@@ -60,7 +62,7 @@ | |
*/ | ||
@Singleton | ||
public class ContainerizedDispatchManager extends AbstractExecutorManagerAdapter { | ||
|
||
private int rampUp; | ||
private final ContainerizedImpl containerizedImpl; | ||
private QueueProcessorThread queueProcessor; | ||
private final RateLimiter rateLimiter; | ||
|
@@ -76,6 +78,19 @@ public ContainerizedDispatchManager(final Props azkProps, final ExecutorLoader e | |
RateLimiter.create(azkProps | ||
.getInt(ContainerizedDispatchManagerProperties.CONTAINERIZED_CREATION_RATE_LIMIT, 20)); | ||
this.containerizedImpl = containerizedImpl; | ||
int rampUp = azkProps.getInt(ContainerizedDispatchManagerProperties.CONTAINERIZED_RAMPUP, 100); | ||
if (rampUp > 100 || rampUp < 0) { | ||
String errorMessage = "RampUp must be an integer between [0, 100]: " + rampUp; | ||
logger.error(errorMessage); | ||
throw new ExecutorManagerException(errorMessage); | ||
} else { | ||
this.rampUp = rampUp; | ||
} | ||
} | ||
|
||
@VisibleForTesting | ||
public void setRampUp(int rampUp) { | ||
this.rampUp = rampUp; | ||
} | ||
|
||
/** | ||
|
@@ -123,7 +138,18 @@ public long getQueuedFlowSize() { | |
*/ | ||
@Override | ||
public DispatchMethod getDispatchMethod() { | ||
return DispatchMethod.CONTAINERIZED; | ||
if (this.rampUp == 0) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. DispatchMethod is very useful: at the moment it controls the behaviour by turning the feature on and off. DispatchMethod is currently specified through configs, so the dispatch method is initialized during web server startup. The containerization feature is turned off if the dispatch method is POLL. Only if the dispatch method is set to CONTAINERIZED are all the Guice bindings for containerization initialized. As we are dynamically toggling between POLL and CONTAINERIZED based on ramp-up, we need to reload the bindings as well. In other words, we need to either always initialize the bindings or look at a different way of handling this. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. +1. I didn't see any changes related to injection here. How will the implementation for the POLL method be injected in the case of ramp-up? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The dispatch method specified through the configs is a separate matter, and it will always be DispatchMethod.CONTAINERIZED for the ramp-up feature. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Got it. And specifying the dispatch method as POLL on the executor side will take care of polling executions. Thank you for clarifying this. |
||
return DispatchMethod.POLL; | ||
} else if (this.rampUp == 100) { | ||
return DispatchMethod.CONTAINERIZED; | ||
} | ||
Random rand = new Random(); | ||
int randomInt = rand.nextInt(100); | ||
if (randomInt < this.rampUp) { | ||
return DispatchMethod.CONTAINERIZED; | ||
} else { | ||
return DispatchMethod.POLL; | ||
} | ||
} | ||
|
||
/** | ||
|
@@ -240,7 +266,7 @@ private void processQueuedFlows() throws ExecutorManagerException { | |
final Set<Integer> executionIds = | ||
executorLoader.selectAndUpdateExecutionWithLocking(this.executionsBatchProcessingEnabled, | ||
this.executionsBatchSize, | ||
Status.DISPATCHING); | ||
Status.DISPATCHING, DispatchMethod.CONTAINERIZED); | ||
|
||
for (final int executionId : executionIds) { | ||
rateLimiter.acquire(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any need to add a method that returns the enum for a given numeric value?
What is the significance of this numeric value? Can you please explain it in a comment?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So far we don't have a use case for converting a numeric value back to the enum.
We are using Int as the data type of the dispatch_method column in the execution_flows table, hence getNumVal is needed.