Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
e20d17f
add delete flow test
yunheli Nov 14, 2017
c7902b9
stop running job test
yunheli Nov 14, 2017
36d8dec
find credential and pass with cmd env
Nov 14, 2017
f5f9b03
Merge pull request #160 from FlowCI/feature/api/credential_not_pass
Nov 14, 2017
50c8e83
Merge pull request #157 from FlowCI/feature/api/delete_flow_stop_curr…
Nov 14, 2017
c0f84d7
fix env variable without slash for new line character
Nov 14, 2017
a340c4e
check dup agent
yunheli Nov 15, 2017
e7301b0
add 注释
yunheli Nov 15, 2017
28a79eb
Merge pull request #165 from FlowCI/feature/api/agent_dup_error
Nov 15, 2017
c25b9ae
do not check agent session when update agent status from cmd
Nov 15, 2017
2d898ee
update agent status when delete session from cmd callback
Nov 15, 2017
da597de
fix session clean up logic
Nov 15, 2017
d2284e4
get rid of system print in agent
Nov 15, 2017
4df4974
do not wait cmd thread pool, stop it directly after send killed status
Nov 15, 2017
e7e32a3
Merge pull request #166 from FlowCI/fix/agent/164
Nov 15, 2017
642791e
apply priority message for queue item
Nov 16, 2017
4a3c98c
add unit test for queue priority
Nov 16, 2017
e4b39f8
set default queue priority and range
Nov 16, 2017
ad44e91
Merge pull request #167 from FlowCI/fix/cc/156
Nov 16, 2017
25d035d
add flow name to env
Nov 16, 2017
57720d3
fix failure unit test since num of env variables been changed
Nov 16, 2017
47e556b
add test && add flow name include -
yunheli Nov 16, 2017
9a1b6df
合并代码
yunheli Nov 16, 2017
e3c3db7
add flow name when create job since existing flow not included this env
Nov 16, 2017
e7cf57c
Merge pull request #168 from FlowCI/feature/163
Nov 16, 2017
ea268d3
补充单元测试
yunheli Nov 16, 2017
d045072
delete no use code
yunheli Nov 16, 2017
41bf37c
Merge pull request #170 from FlowCI/feature/162
yunheli Nov 16, 2017
18b8deb
optimize logic where env variables in run shell cmd
Nov 16, 2017
4893934
set seconds digit on crontab task
Nov 16, 2017
66d2989
Merge pull request #172 from FlowCI/feature/155
Nov 16, 2017
3a4c960
do not check available agent when retry
Nov 17, 2017
984e9dc
get rid of cmd queue item instead of cmd id as queue item
Nov 17, 2017
3666604
fix unit test for job timeout
Nov 17, 2017
308037a
Merge pull request #174 from FlowCI/hotfix/173
Nov 17, 2017
cf43bad
fix delete nodeResult error and add test
yunheli Nov 17, 2017
dd40502
add enqueue retry times and add item priority
yunheli Nov 17, 2017
2260f2f
modify exception
yunheli Nov 17, 2017
0a01de0
add callback item test
yunheli Nov 17, 2017
aadfd46
change retryTimes dec and set priority param
yunheli Nov 18, 2017
7be5f0c
refactor code
yunheli Nov 18, 2017
5e4cf40
Merge pull request #175 from FlowCI/feature/171
Nov 18, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions config/app-api.properties
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ system.email = admin@flow.ci
system.username = admin
system.password = 123456

task.job.toggle.execution_timeout = false
## 6s expire job
task.job.toggle.execution_create_session_duration = 600
## 1h expire job
task.job.toggle.execution_timeout = true
## expired in 1800 seconds for create session
task.job.toggle.execution_create_session_duration = 1800
## expired in 3600 seconds for job running
task.job.toggle.execution_running_duration = 3600
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import com.flow.platform.domain.Cmd;
import com.flow.platform.domain.Jsonable;
import com.flow.platform.util.Logger;
import com.flow.platform.util.zk.*;

import com.flow.platform.util.zk.ZKClient;
import com.flow.platform.util.zk.ZkException;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -48,6 +48,9 @@ public class AgentManager implements Runnable, TreeCacheListener, AutoCloseable
private int zkTimeout;
private ZKClient zkClient;

// whether close() is allowed to remove the zk node, default true; cleared by exit()
private Boolean canDeleted = true;

private String zone; // agent running zone
private String name; // agent name, can be machine name

Expand Down Expand Up @@ -92,6 +95,10 @@ public void stop() {
public void run() {
// init zookeeper
zkClient.start();

// exit if the zk node for this agent already exists
checkNodePathExistAndExit();

registerZkNodeAndWatch();

synchronized (STATUS_LOCKER) {
Expand All @@ -103,6 +110,21 @@ public void run() {
}
}

/**
 * Exits the agent when its zookeeper node path already exists.
 *
 * Does nothing when the node path is absent; otherwise delegates to {@link #exit()}.
 */
private void checkNodePathExistAndExit() {
    boolean nodeAlreadyExists = zkClient.exist(nodePath);
    if (!nodeAlreadyExists) {
        return;
    }

    exit();
}

/**
 * Logs a notice and terminates the JVM with exit status 1.
 *
 * The canDeleted flag is cleared before exiting because close() only calls
 * removeZkNode() while the flag is true.
 * NOTE(review): presumably close() runs during JVM shutdown - confirm against the caller.
 */
private void exit() {
    // clear the flag first so close() skips removeZkNode()
    canDeleted = false;

    LOGGER.info("One Agent is running in other place. Please first to stop another agent, thx!");

    // System.exit delegates to Runtime.getRuntime().exit
    System.exit(1);
}

@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
ChildData eventData = event.getData();
Expand Down Expand Up @@ -143,7 +165,11 @@ public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exc

@Override
public void close() throws IOException {
removeZkNode();
// only this node can delete
if (this.canDeleted) {
removeZkNode();
}

stop();
}

Expand Down Expand Up @@ -194,8 +220,13 @@ private void onDataChanged(String path) {
* @return path of zookeeper or null if failure
*/
private String registerZkNodeAndWatch() {
String path = zkClient.createEphemeral(nodePath, null);
zkClient.watchTree(path, this);
String path = null;
try {
path = zkClient.createEphemeral(nodePath);
zkClient.watchTree(path, this);
} catch (ZkException e) {
exit();
}
return path;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,27 +173,23 @@ public void run() {
ProcEventHandler procEventHandler =
new ProcEventHandler(getCmd(), extraProcEventListeners, running, finished);

CmdExecutor executor;
try {
executor = new CmdExecutor(
CmdExecutor executor = new CmdExecutor(
procEventHandler,
logListener,
cmd.getInputs(),
cmd.getWorkingDir(),
cmd.getOutputEnvFilter(),
cmd.getTimeout(),
Lists.newArrayList(getCmd().getCmd()));

executor.run();
} catch (Throwable e) {
LOGGER.errorMarker("execute", "Cannot init CmdExecutor for cmd " + cmd, e);

CmdResult result = new CmdResult();
result.getExceptions().add(e);
procEventHandler.onException(result);

return;
}

executor.run();
}
});

Expand Down Expand Up @@ -262,6 +258,7 @@ public synchronized void kill() {
for (Map.Entry<Cmd, CmdResult> entry : running.entrySet()) {
CmdResult r = entry.getValue();
Cmd cmd = entry.getKey();
finished.put(cmd, r);

r.getProcess().destroy();

Expand All @@ -270,12 +267,9 @@ public synchronized void kill() {
}

try {
if (!cmdExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
cmdExecutor.shutdownNow();
LOGGER.warn("Force to terminate CmdExecutor since been waiting 10 seconds");
}
} catch (Throwable e) {
LOGGER.error("Exception while waiting for all cmd thread finish", e);
cmdExecutor.shutdownNow();
} catch (Throwable ignore) {

} finally {
cmdExecutor = createExecutor(); // reset cmd executor
LOGGER.trace("Cmd thread terminated");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,15 @@
import com.flow.platform.domain.Cmd;
import com.flow.platform.domain.CmdType;
import com.flow.platform.util.zk.ZKClient;
import com.flow.platform.util.zk.ZkException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.ZKUtil;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
Expand All @@ -32,10 +39,6 @@
import org.junit.Test;
import org.junit.runners.MethodSorters;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;

/**
* @author gy@fir.im
*/
Expand Down Expand Up @@ -102,6 +105,7 @@ public void should_receive_command() throws Throwable {

/**
 * Test teardown: deletes the zookeeper node at ZK_ROOT/ZONE/MACHINE and closes the client.
 *
 * @throws Throwable if deleting the node or closing the client fails
 */
@After
public void after() throws Throwable {
    try {
        zkClient.delete(ZKPaths.makePath(ZK_ROOT, ZONE, MACHINE), true);
    } finally {
        // always release the client, even when the delete above throws
        zkClient.close();
    }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.flow.platform.core.config.DatabaseConfig;
import com.flow.platform.core.queue.InMemoryQueue;
import com.flow.platform.core.queue.PlatformQueue;
import com.flow.platform.core.queue.PriorityMessage;
import com.flow.platform.core.util.ThreadUtil;
import com.flow.platform.util.Logger;
import java.io.IOException;
Expand All @@ -34,15 +35,12 @@
import org.apache.velocity.app.VelocityEngine;
import org.apache.velocity.runtime.RuntimeConstants;
import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.event.ApplicationEventMulticaster;
import org.springframework.context.event.SimpleApplicationEventMulticaster;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
Expand Down Expand Up @@ -136,7 +134,7 @@ public ThreadPoolTaskExecutor taskExecutor() {
* Queue to process cmd callback task
*/
@Bean
public PlatformQueue<CmdCallbackQueueItem> cmdCallbackQueue() {
public PlatformQueue<PriorityMessage> cmdCallbackQueue() {
return new InMemoryQueue<>(executor, 50, "CmdCallbackQueue");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@

import com.flow.platform.api.domain.CmdCallbackQueueItem;
import com.flow.platform.api.service.job.JobService;
import com.flow.platform.core.exception.FlowException;
import com.flow.platform.core.exception.NotFoundException;
import com.flow.platform.core.queue.PlatformQueue;
import com.flow.platform.core.queue.PriorityMessage;
import com.flow.platform.core.queue.QueueListener;
import com.flow.platform.core.util.ThreadUtil;
import com.flow.platform.util.Logger;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -30,12 +33,15 @@
* @author yh@firim
*/
@Component
public class CmdCallbackQueueConsumer implements QueueListener<CmdCallbackQueueItem> {
public class CmdCallbackQueueConsumer implements QueueListener<PriorityMessage> {

private final static Logger LOGGER = new Logger(CmdCallbackQueueConsumer.class);

// delay before re-enqueueing a callback: 1000 ms (1 s)
private final static int REQUEUE_DELAY_TIME = 1000;

@Autowired
private PlatformQueue<CmdCallbackQueueItem> cmdCallbackQueue;
private PlatformQueue<PriorityMessage> cmdCallbackQueue;

@Autowired
private JobService jobService;
Expand All @@ -46,28 +52,45 @@ public void init() {
}

@Override
public void onQueueItem(CmdCallbackQueueItem item) {
if (item == null) {
public void onQueueItem(PriorityMessage message) {
if (message == null) {
return;
}

CmdCallbackQueueItem item = CmdCallbackQueueItem.parse(message.getBody(), CmdCallbackQueueItem.class);

try {
jobService.callback(item);
} catch (NotFoundException notFoundException) {

// detect retry times is reach the limit or not
detectRetryTimes(item);

// re-enqueue cmd callback if job not found since transaction problem
reEnqueueJobCallback(item, 1000);
reEnqueueJobCallback(item, REQUEUE_DELAY_TIME, message.getMessageProperties().getPriority());

} catch (Throwable throwable) {
LOGGER.traceMarker("onQueueItem", String.format("exception - %s", throwable));
}
}

private void reEnqueueJobCallback(CmdCallbackQueueItem item, long wait) {
try {
Thread.sleep(wait);
} catch (Throwable ignore) {
/**
 * Rejects a callback item whose retry counter is used up.
 *
 * @param item callback item whose retry times are inspected
 * @throws FlowException if the item's retry times is zero or negative
 */
private void detectRetryTimes(CmdCallbackQueueItem item) {
    boolean hasRetriesLeft = item.getRetryTimes() > 0;
    if (hasRetriesLeft) {
        return;
    }

    throw new FlowException("retry times has reach the limit");
}

private void reEnqueueJobCallback(CmdCallbackQueueItem item, long wait, int priority) {

// sleep seconds
ThreadUtil.sleep(wait);

// set retry times
item.setRetryTimes(item.getRetryTimes() - 1);

//priority inc 1
priority = priority + 1;

jobService.enterQueue(item);
jobService.enterQueue(item, priority);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void execute(@RequestBody Cmd cmd, @RequestParam String identifier) {
cmd.getStatus(),
cmd.getId(),
cmd.getCmdResult());
jobService.enterQueue(new CmdCallbackQueueItem(jobId, cmd));
jobService.enterQueue(new CmdCallbackQueueItem(jobId, cmd), 1);
} catch (NumberFormatException warn) {
LOGGER.warn("Invalid job id format");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import javax.persistence.criteria.Predicate;
import javax.persistence.criteria.Root;
import org.hibernate.Session;
import org.hibernate.query.Query;
import org.springframework.stereotype.Repository;

/**
Expand Down Expand Up @@ -100,11 +99,8 @@ public int update(BigInteger jobId, NodeStatus target) {

@Override
public void delete(List<BigInteger> jobIds) {
execute((Session session) -> {
String delete = String.format("delete from NodeResult where job_id in (:list)");
Query query = session.createQuery(delete);
query.setParameterList("list", jobIds);
return true;
});
execute((Session session) -> session.createQuery("delete from NodeResult where key.jobId in ( :jobIds )")
.setParameterList("jobIds", jobIds)
.executeUpdate());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,33 @@
package com.flow.platform.api.domain;

import com.flow.platform.domain.Cmd;
import com.flow.platform.domain.Jsonable;
import java.math.BigInteger;

/**
* @author yh@firim
*/
public class CmdCallbackQueueItem {
public class CmdCallbackQueueItem extends Jsonable {

private final BigInteger jobId;

private final String path; // node path

private final Cmd cmd;

private Integer retryTimes = 0;
// default retry times 5
private Integer retryTimes = 5;

/**
 * Creates a callback queue item for the given job and cmd.
 *
 * @param jobId id of the job this callback belongs to
 * @param cmd cmd carried by the callback; its extra value is stored as the node path
 */
public CmdCallbackQueueItem(BigInteger jobId, Cmd cmd) {
    // node path is taken from the cmd's extra field
    this.path = cmd.getExtra();
    this.cmd = cmd;
    this.jobId = jobId;
}

/**
 * Sets the retry counter of this item.
 *
 * @param retryTimes new retry times value
 */
public void setRetryTimes(Integer retryTimes) {
this.retryTimes = retryTimes;
}

/**
 * @return id of the job this callback item was created for
 */
public BigInteger getJobId() {
return jobId;
}
Expand Down
Loading