Skip to content

Commit

Permalink
fix graceful shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
Edvard Fonsell committed Apr 7, 2016
1 parent 8d1cf15 commit e9613d6
Showing 1 changed file with 25 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.springframework.core.env.Environment;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.PreparedStatementCreator;
import org.springframework.jdbc.core.PreparedStatementSetter;
Expand All @@ -40,8 +41,7 @@
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

/**
* Use setter injection because constructor injection may not work when nFlow is
* used in some legacy systems.
* Use setter injection because constructor injection may not work when nFlow is used in some legacy systems.
*/
@Component
public class ExecutorDao {
Expand Down Expand Up @@ -137,9 +137,10 @@ private int allocateExecutorId() {
KeyHolder keyHolder = new GeneratedKeyHolder();
jdbc.update(new PreparedStatementCreator() {
@Override
@SuppressFBWarnings(value="OBL_UNSATISFIED_OBLIGATION_EXCEPTION_EDGE", justification="findbugs does not trust jdbctemplate")
@SuppressFBWarnings(value = "OBL_UNSATISFIED_OBLIGATION_EXCEPTION_EDGE", justification = "findbugs does not trust jdbctemplate")
public PreparedStatement createPreparedStatement(Connection con) throws SQLException {
PreparedStatement p = con.prepareStatement("insert into nflow_executor(host, pid, executor_group) values (?,?,?)", new String[] { "id" });
PreparedStatement p = con.prepareStatement("insert into nflow_executor(host, pid, executor_group) values (?,?,?)",
new String[] { "id" });
p.setString(1, host);
p.setInt(2, pid);
p.setString(3, executorGroup);
Expand All @@ -150,22 +151,22 @@ public PreparedStatement createPreparedStatement(Connection con) throws SQLExcep
}

public void updateActiveTimestamp() {
updateWithPreparedStatement("update nflow_executor set active=current_timestamp, expires=" + sqlVariants.currentTimePlusSeconds(timeoutSeconds) + " where id = " + getExecutorId());
updateWithPreparedStatement("update nflow_executor set active=current_timestamp, expires="
+ sqlVariants.currentTimePlusSeconds(timeoutSeconds) + " where id = " + getExecutorId());
}

public void recoverWorkflowInstancesFromDeadNodes() {
List<InstanceInfo> instances = jdbc.query(
"select id, state from nflow_workflow where executor_id in (select id from nflow_executor where "
+ executorGroupCondition + " and id <> " + getExecutorId() + " and expires < current_timestamp)",
new RowMapper<InstanceInfo>() {
@Override
public InstanceInfo mapRow(ResultSet rs, int rowNum) throws SQLException {
InstanceInfo instance = new InstanceInfo();
instance.id = rs.getInt("id");
instance.state = rs.getString("state");
return instance;
}
});
String sql = "select id, state from nflow_workflow where executor_id in (select id from nflow_executor where "
+ executorGroupCondition + " and id <> " + getExecutorId() + " and expires < current_timestamp)";
List<InstanceInfo> instances = jdbc.query(sql, new RowMapper<InstanceInfo>() {
@Override
public InstanceInfo mapRow(ResultSet rs, int rowNum) throws SQLException {
InstanceInfo instance = new InstanceInfo();
instance.id = rs.getInt("id");
instance.state = rs.getString("state");
return instance;
}
});
for (InstanceInfo instance : instances) {
WorkflowInstanceAction action = new WorkflowInstanceAction.Builder().setExecutionStart(now()).setExecutionEnd(now())
.setType(recovery).setState(instance.state).setStateText("Recovered").setWorkflowInstanceId(instance.id).build();
Expand All @@ -180,7 +181,7 @@ static final class InstanceInfo {

private void updateWithPreparedStatement(String sql) {
// jdbc.update(sql) won't use prepared statements, this uses.
jdbc.update(sql, (PreparedStatementSetter)null);
jdbc.update(sql, (PreparedStatementSetter) null);
}

public List<WorkflowExecutor> getExecutors() {
Expand All @@ -199,7 +200,11 @@ public WorkflowExecutor mapRow(ResultSet rs, int rowNum) throws SQLException {
}

public void markShutdown() {
jdbc.update("update nflow_executor set expires=current_timestamp where executor_group = ? and id = ?", executorGroup,
getExecutorId());
try {
jdbc.update("update nflow_executor set expires=current_timestamp where executor_group = ? and id = ?", executorGroup,
getExecutorId());
} catch (DataAccessException e) {
logger.warn("Failed to mark executor as expired", e);
}
}
}

0 comments on commit e9613d6

Please sign in to comment.