result = new HashMap<>();
+ result.put("jobName", "test_job");
+ result.put("cron", cron);
+ result.put("jobClass", jobClass);
+ result.put("jobType", jobType.name());
+ result.put("scriptCommandLine", "echo \"\"");
+ return result;
+ }
+}
diff --git a/elastic-job-cloud/elastic-job-cloud-executor/src/test/java/com/dangdang/ddframe/job/cloud/executor/fixture/TestJob.java b/elastic-job-cloud/elastic-job-cloud-executor/src/test/java/com/dangdang/ddframe/job/cloud/executor/fixture/TestJob.java
new file mode 100644
index 0000000000..45ef1ac3fe
--- /dev/null
+++ b/elastic-job-cloud/elastic-job-cloud-executor/src/test/java/com/dangdang/ddframe/job/cloud/executor/fixture/TestJob.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 1999-2015 dangdang.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.dangdang.ddframe.job.cloud.executor.fixture;
+
+import com.dangdang.ddframe.job.api.ShardingContext;
+import com.dangdang.ddframe.job.api.simple.SimpleJob;
+
/**
 * No-op {@link SimpleJob} fixture for executor tests.
 */
public final class TestJob implements SimpleJob {
    
    @Override
    public void execute(final ShardingContext shardingContext) {
        // Intentionally empty: tests only need a job type the executor can invoke.
    }
}
diff --git a/elastic-job-cloud/elastic-job-cloud-executor/src/test/java/com/dangdang/ddframe/job/cloud/executor/fixture/TestScriptJobConfiguration.java b/elastic-job-cloud/elastic-job-cloud-executor/src/test/java/com/dangdang/ddframe/job/cloud/executor/fixture/TestScriptJobConfiguration.java
new file mode 100644
index 0000000000..0114a4a175
--- /dev/null
+++ b/elastic-job-cloud/elastic-job-cloud-executor/src/test/java/com/dangdang/ddframe/job/cloud/executor/fixture/TestScriptJobConfiguration.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 1999-2015 dangdang.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.dangdang.ddframe.job.cloud.executor.fixture;
+
+
+import com.dangdang.ddframe.job.config.JobCoreConfiguration;
+import com.dangdang.ddframe.job.config.JobRootConfiguration;
+import com.dangdang.ddframe.job.config.JobTypeConfiguration;
+import com.dangdang.ddframe.job.config.script.ScriptJobConfiguration;
+import com.dangdang.ddframe.job.executor.handler.JobProperties;
+import lombok.RequiredArgsConstructor;
+
+@RequiredArgsConstructor
+public final class TestScriptJobConfiguration implements JobRootConfiguration {
+
+ private final String scriptCommandLine;
+
+ @Override
+ public JobTypeConfiguration getTypeConfig() {
+ return new ScriptJobConfiguration(JobCoreConfiguration.newBuilder("test_script_job", "0/1 * * * * ?", 3)
+ .jobProperties(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER.getKey(), "ignoredExceptionHandler").build(), scriptCommandLine);
+ }
+}
diff --git a/elastic-job-cloud/elastic-job-cloud-executor/src/test/resources/applicationContext.xml b/elastic-job-cloud/elastic-job-cloud-executor/src/test/resources/applicationContext.xml
new file mode 100644
index 0000000000..033830355a
--- /dev/null
+++ b/elastic-job-cloud/elastic-job-cloud-executor/src/test/resources/applicationContext.xml
@@ -0,0 +1,8 @@
+
+
+
+
diff --git a/elastic-job-cloud/elastic-job-cloud-executor/src/test/resources/logback-test.xml b/elastic-job-cloud/elastic-job-cloud-executor/src/test/resources/logback-test.xml
new file mode 100644
index 0000000000..b3571e1bdd
--- /dev/null
+++ b/elastic-job-cloud/elastic-job-cloud-executor/src/test/resources/logback-test.xml
@@ -0,0 +1,23 @@
+
+
+
+
+
+
+ ${log.context.name}
+
+
+
+ ERROR
+
+
+ ${log.pattern}
+
+
+
+
+
+
+
+
+
diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/pom.xml b/elastic-job-cloud/elastic-job-cloud-scheduler/pom.xml
new file mode 100644
index 0000000000..a3683951e0
--- /dev/null
+++ b/elastic-job-cloud/elastic-job-cloud-scheduler/pom.xml
@@ -0,0 +1,100 @@
+
+
+
+ elastic-job-cloud
+ com.dangdang
+ 2.0.6-SNAPSHOT
+
+ 4.0.0
+ elastic-job-cloud-scheduler
+ ${project.artifactId}
+
+
+ 2.5.5
+
+
+
+
+ com.dangdang
+ elastic-job-common-core
+ ${project.parent.version}
+
+
+ com.dangdang
+ elastic-job-common-restful
+ ${project.parent.version}
+
+
+
+ org.apache.commons
+ commons-lang3
+
+
+ org.apache.mesos
+ mesos
+
+
+ com.netflix.fenzo
+ fenzo-core
+
+
+ commons-dbcp
+ commons-dbcp
+
+
+ mysql
+ mysql-connector-java
+
+
+ org.slf4j
+ slf4j-api
+
+
+ org.slf4j
+ jcl-over-slf4j
+
+
+ org.slf4j
+ log4j-over-slf4j
+
+
+ ch.qos.logback
+ logback-classic
+
+
+
+ junit
+ junit
+
+
+ org.unitils
+ unitils-core
+
+
+ org.mockito
+ mockito-core
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-assembly-plugin
+
+
+ src/main/resources/assembly/assembly.xml
+
+
+
+
+ package
+
+ assembly
+
+
+
+
+
+
+
diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/Bootstrap.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/Bootstrap.java
new file mode 100644
index 0000000000..2ff59e5219
--- /dev/null
+++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/Bootstrap.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 1999-2015 dangdang.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.dangdang.ddframe.job.cloud.scheduler;
+
+import com.dangdang.ddframe.job.cloud.scheduler.env.BootstrapEnvironment;
+import com.dangdang.ddframe.job.cloud.scheduler.ha.HANode;
+import com.dangdang.ddframe.job.cloud.scheduler.ha.SchedulerElectionCandidate;
+import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
+import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperElectionService;
+import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.curator.framework.CuratorFramework;
+
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Mesos框架启动器.
+ *
+ * @author caohao
+ */
+@Slf4j
+public final class Bootstrap {
+
+ /**
+ * 启动入口.
+ *
+ * @param args 命令行参数无需传入
+ * @throws InterruptedException 线程中断异常
+ */
+ // CHECKSTYLE:OFF
+ public static void main(final String[] args) throws InterruptedException {
+ // CHECKSTYLE:ON
+ CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(BootstrapEnvironment.getInstance().getZookeeperConfiguration());
+ regCenter.init();
+ final ZookeeperElectionService electionService = new ZookeeperElectionService(
+ BootstrapEnvironment.getInstance().getFrameworkHostPort(), (CuratorFramework) regCenter.getRawClient(), HANode.ELECTION_NODE, new SchedulerElectionCandidate(regCenter));
+ electionService.start();
+ final CountDownLatch latch = new CountDownLatch(1);
+ latch.await();
+ Runtime.getRuntime().addShutdownHook(new Thread("shutdown-hook") {
+
+ @Override
+ public void run() {
+ electionService.stop();
+ latch.countDown();
+ }
+ });
+ }
+}
diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/config/app/CloudAppConfiguration.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/config/app/CloudAppConfiguration.java
new file mode 100644
index 0000000000..f77cff2afc
--- /dev/null
+++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/config/app/CloudAppConfiguration.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 1999-2015 dangdang.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.dangdang.ddframe.job.cloud.scheduler.config.app;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+
/**
 * Cloud job app configuration.
 *
 * @author caohao
 */
@AllArgsConstructor
@RequiredArgsConstructor
@Getter
@ToString
public final class CloudAppConfiguration {
    
    // App name; used as the key of the configuration node in the registry.
    private final String appName;
    
    // NOTE(review): presumably the URL of the downloadable app package — confirm against callers.
    private final String appURL;
    
    // NOTE(review): presumably the script run to bootstrap the app on an agent — confirm.
    private final String bootstrapScript;
    
    // The non-final fields below get these defaults when the @RequiredArgsConstructor
    // (appName, appURL, bootstrapScript) is used; @AllArgsConstructor overrides them.
    private double cpuCount = 1;
    
    private double memoryMB = 128;
    
    private boolean appCacheEnable = true;
    
    // Defaults to 0 under the required-args constructor.
    private int eventTraceSamplingCount;
}
diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/config/app/CloudAppConfigurationGsonFactory.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/config/app/CloudAppConfigurationGsonFactory.java
new file mode 100644
index 0000000000..63e4391b72
--- /dev/null
+++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/config/app/CloudAppConfigurationGsonFactory.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright 1999-2015 dangdang.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.dangdang.ddframe.job.cloud.scheduler.config.app;
+
+import com.dangdang.ddframe.job.util.json.GsonFactory;
+import com.google.gson.TypeAdapter;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import java.io.IOException;
+
+/**
+ * 云作业App配置的Gson工厂.
+ *
+ * @author caohao
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class CloudAppConfigurationGsonFactory {
+
+ static {
+ GsonFactory.registerTypeAdapter(CloudAppConfiguration.class, new CloudAppConfigurationGsonTypeAdapter());
+ }
+
+ /**
+ * 将云作业App配置转换为JSON字符串.
+ *
+ * @param cloudAppConfig 云作业App配置对象
+ * @return 作业配置JSON字符串
+ */
+ public static String toJson(final CloudAppConfiguration cloudAppConfig) {
+ return GsonFactory.getGson().toJson(cloudAppConfig);
+ }
+
+ /**
+ * 将JSON字符串转换为云作业App配置.
+ *
+ * @param cloudAppConfigJson 云作业App配置JSON字符串
+ * @return 作业配置对象
+ */
+ public static CloudAppConfiguration fromJson(final String cloudAppConfigJson) {
+ return GsonFactory.getGson().fromJson(cloudAppConfigJson, CloudAppConfiguration.class);
+ }
+
+ /**
+ * 云作业App配置的Json转换适配器.
+ *
+ * @author caohao
+ */
+ public static final class CloudAppConfigurationGsonTypeAdapter extends TypeAdapter {
+
+ @Override
+ public CloudAppConfiguration read(final JsonReader in) throws IOException {
+ String appURL = "";
+ String appName = "";
+ String bootstrapScript = "";
+ double cpuCount = 1.0d;
+ double memoryMB = 128.0d;
+ boolean appCacheEnable = true;
+ int eventTraceSamplingCount = 0;
+ in.beginObject();
+ while (in.hasNext()) {
+ String jsonName = in.nextName();
+ switch (jsonName) {
+ case "appName":
+ appName = in.nextString();
+ break;
+ case "appURL":
+ appURL = in.nextString();
+ break;
+ case "bootstrapScript":
+ bootstrapScript = in.nextString();
+ break;
+ case "cpuCount":
+ cpuCount = in.nextDouble();
+ break;
+ case "memoryMB":
+ memoryMB = in.nextDouble();
+ break;
+ case "appCacheEnable":
+ appCacheEnable = in.nextBoolean();
+ break;
+ case "eventTraceSamplingCount":
+ eventTraceSamplingCount = in.nextInt();
+ break;
+ default:
+ break;
+ }
+ }
+ in.endObject();
+ return new CloudAppConfiguration(appName, appURL, bootstrapScript, cpuCount, memoryMB, appCacheEnable, eventTraceSamplingCount);
+ }
+
+ @Override
+ public void write(final JsonWriter out, final CloudAppConfiguration value) throws IOException {
+ out.beginObject();
+ out.name("appName").value(value.getAppName());
+ out.name("appURL").value(value.getAppURL());
+ out.name("bootstrapScript").value(value.getBootstrapScript());
+ out.name("cpuCount").value(value.getCpuCount());
+ out.name("memoryMB").value(value.getMemoryMB());
+ out.name("appCacheEnable").value(value.isAppCacheEnable());
+ out.name("eventTraceSamplingCount").value(value.getEventTraceSamplingCount());
+ out.endObject();
+ }
+
+ }
+}
diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/config/app/CloudAppConfigurationNode.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/config/app/CloudAppConfigurationNode.java
new file mode 100644
index 0000000000..f0aa289298
--- /dev/null
+++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/config/app/CloudAppConfigurationNode.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 1999-2015 dangdang.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.dangdang.ddframe.job.cloud.scheduler.config.app;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+/**
+ * 云作业App配置节点路径.
+ *
+ * @author caohao
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class CloudAppConfigurationNode {
+
+ public static final String ROOT = "/config/app";
+
+ private static final String APP_CONFIG = ROOT + "/%s";
+
+ static String getRootNodePath(final String appName) {
+ return String.format(APP_CONFIG, appName);
+ }
+}
diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/config/app/CloudAppConfigurationService.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/config/app/CloudAppConfigurationService.java
new file mode 100644
index 0000000000..d946311e62
--- /dev/null
+++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/config/app/CloudAppConfigurationService.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 1999-2015 dangdang.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.dangdang.ddframe.job.cloud.scheduler.config.app;
+
+import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
+import com.google.common.base.Optional;
+import lombok.RequiredArgsConstructor;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * 云作业App配置服务.
+ *
+ * @author caohao
+ */
+@RequiredArgsConstructor
+public class CloudAppConfigurationService {
+
+ private final CoordinatorRegistryCenter regCenter;
+
+ /**
+ * 添加云作业APP配置.
+ *
+ * @param appConfig 云作业App配置对象
+ */
+ public void add(final CloudAppConfiguration appConfig) {
+ regCenter.persist(CloudAppConfigurationNode.getRootNodePath(appConfig.getAppName()), CloudAppConfigurationGsonFactory.toJson(appConfig));
+ }
+
+ /**
+ * 修改云作业APP配置.
+ *
+ * @param appConfig 云作业App配置对象
+ */
+ public void update(final CloudAppConfiguration appConfig) {
+ regCenter.update(CloudAppConfigurationNode.getRootNodePath(appConfig.getAppName()), CloudAppConfigurationGsonFactory.toJson(appConfig));
+ }
+
+ /**
+ * 根据云作业App名称获取App配置.
+ *
+ * @param appName 云作业App名称
+ * @return 云作业App配置
+ */
+ public Optional load(final String appName) {
+ return Optional.fromNullable(CloudAppConfigurationGsonFactory.fromJson(regCenter.get(CloudAppConfigurationNode.getRootNodePath(appName))));
+ }
+
+ /**
+ * 获取所有注册的云作业App配置.
+ *
+ * @return 注册的云作业App配置
+ */
+ public Collection loadAll() {
+ if (!regCenter.isExisted(CloudAppConfigurationNode.ROOT)) {
+ return Collections.emptyList();
+ }
+ List appNames = regCenter.getChildrenKeys(CloudAppConfigurationNode.ROOT);
+ Collection result = new ArrayList<>(appNames.size());
+ for (String each : appNames) {
+ Optional config = load(each);
+ if (config.isPresent()) {
+ result.add(config.get());
+ }
+ }
+ return result;
+ }
+
+
+ /**
+ * 删除云作业App配置.
+ *
+ * @param appName 云作业App名称
+ */
+ public void remove(final String appName) {
+ regCenter.remove(CloudAppConfigurationNode.getRootNodePath(appName));
+ }
+}
diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/config/job/CloudJobConfiguration.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/config/job/CloudJobConfiguration.java
new file mode 100644
index 0000000000..7444ae8a1c
--- /dev/null
+++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/config/job/CloudJobConfiguration.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 1999-2015 dangdang.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.dangdang.ddframe.job.cloud.scheduler.config.job;
+
+import com.dangdang.ddframe.job.config.JobRootConfiguration;
+import com.dangdang.ddframe.job.config.JobTypeConfiguration;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
/**
 * Cloud job configuration.
 *
 * @author zhangliang
 */
@RequiredArgsConstructor
@AllArgsConstructor
@Getter
public final class CloudJobConfiguration implements JobRootConfiguration {
    
    // NOTE(review): presumably matches a registered CloudAppConfiguration.appName — confirm.
    private final String appName;
    
    // Underlying job type configuration; also the source of the job name (see getJobName()).
    private final JobTypeConfiguration typeConfig;
    
    private final double cpuCount;
    
    private final double memoryMB;
    
    private final CloudJobExecutionType jobExecutionType;
    
    // Optional fields: null when the @RequiredArgsConstructor is used
    // (see CloudJobConfigurationGsonFactory, which only sets them together).
    private String beanName;
    
    private String applicationContext;
    
    /**
     * Get the job name.
     *
     * @return job name, delegated to the core configuration
     */
    public String getJobName() {
        return typeConfig.getCoreConfig().getJobName();
    }
}
diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/config/job/CloudJobConfigurationGsonFactory.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/config/job/CloudJobConfigurationGsonFactory.java
new file mode 100644
index 0000000000..123f15f849
--- /dev/null
+++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/config/job/CloudJobConfigurationGsonFactory.java
@@ -0,0 +1,127 @@
+/*
+ * Copyright 1999-2015 dangdang.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.dangdang.ddframe.job.cloud.scheduler.config.job;
+
+import com.dangdang.ddframe.job.config.JobTypeConfiguration;
+import com.dangdang.ddframe.job.util.json.AbstractJobConfigurationGsonTypeAdapter;
+import com.dangdang.ddframe.job.util.json.GsonFactory;
+import com.google.common.base.Preconditions;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Cloud作业配置的Gson工厂.
+ *
+ * @author zhangliang
+ * @author caohao
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class CloudJobConfigurationGsonFactory {
+
+ static {
+ GsonFactory.registerTypeAdapter(CloudJobConfiguration.class, new CloudJobConfigurationGsonTypeAdapter());
+ }
+
+ /**
+ * 将作业配置转换为JSON字符串.
+ *
+ * @param cloudJobConfig 作业配置对象
+ * @return 作业配置JSON字符串
+ */
+ public static String toJson(final CloudJobConfiguration cloudJobConfig) {
+ return GsonFactory.getGson().toJson(cloudJobConfig);
+ }
+
+ /**
+ * 将JSON字符串转换为作业配置.
+ *
+ * @param cloudJobConfigJson 作业配置JSON字符串
+ * @return 作业配置对象
+ */
+ public static CloudJobConfiguration fromJson(final String cloudJobConfigJson) {
+ return GsonFactory.getGson().fromJson(cloudJobConfigJson, CloudJobConfiguration.class);
+ }
+
+ /**
+ * Cloud作业配置的Json转换适配器.
+ *
+ * @author zhangliang
+ */
+ public static final class CloudJobConfigurationGsonTypeAdapter extends AbstractJobConfigurationGsonTypeAdapter {
+
+ @Override
+ protected void addToCustomizedValueMap(final String jsonName, final JsonReader in, final Map customizedValueMap) throws IOException {
+ switch (jsonName) {
+ case "appName":
+ customizedValueMap.put("appName", in.nextString());
+ break;
+ case "cpuCount":
+ customizedValueMap.put("cpuCount", in.nextDouble());
+ break;
+ case "memoryMB":
+ customizedValueMap.put("memoryMB", in.nextDouble());
+ break;
+ case "jobExecutionType":
+ customizedValueMap.put("jobExecutionType", in.nextString());
+ break;
+ case "beanName":
+ customizedValueMap.put("beanName", in.nextString());
+ break;
+ case "applicationContext":
+ customizedValueMap.put("applicationContext", in.nextString());
+ break;
+ default:
+ in.skipValue();
+ break;
+ }
+ }
+
+ @Override
+ protected CloudJobConfiguration getJobRootConfiguration(final JobTypeConfiguration typeConfig, final Map customizedValueMap) {
+ Preconditions.checkNotNull(customizedValueMap.get("appName"), "appName cannot be null.");
+ Preconditions.checkNotNull(customizedValueMap.get("cpuCount"), "cpuCount cannot be null.");
+ Preconditions.checkArgument((double) customizedValueMap.get("cpuCount") >= 0.001, "cpuCount cannot be less than 0.001");
+ Preconditions.checkNotNull(customizedValueMap.get("memoryMB"), "memoryMB cannot be null.");
+ Preconditions.checkArgument((double) customizedValueMap.get("memoryMB") >= 1, "memory cannot be less than 1");
+ Preconditions.checkNotNull(customizedValueMap.get("jobExecutionType"), "jobExecutionType cannot be null.");
+ if (customizedValueMap.containsKey("beanName") && customizedValueMap.containsKey("applicationContext")) {
+ return new CloudJobConfiguration((String) customizedValueMap.get("appName"), typeConfig, (double) customizedValueMap.get("cpuCount"),
+ (double) customizedValueMap.get("memoryMB"), CloudJobExecutionType.valueOf(customizedValueMap.get("jobExecutionType").toString()),
+ customizedValueMap.get("beanName").toString(), customizedValueMap.get("applicationContext").toString());
+ } else {
+ return new CloudJobConfiguration((String) customizedValueMap.get("appName"), typeConfig, (double) customizedValueMap.get("cpuCount"),
+ (double) customizedValueMap.get("memoryMB"), CloudJobExecutionType.valueOf(customizedValueMap.get("jobExecutionType").toString()));
+ }
+ }
+
+ @Override
+ protected void writeCustomized(final JsonWriter out, final CloudJobConfiguration value) throws IOException {
+ out.name("appName").value(value.getAppName());
+ out.name("cpuCount").value(value.getCpuCount());
+ out.name("memoryMB").value(value.getMemoryMB());
+ out.name("jobExecutionType").value(value.getJobExecutionType().name());
+ out.name("beanName").value(value.getBeanName());
+ out.name("applicationContext").value(value.getApplicationContext());
+ }
+ }
+}
diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/config/job/CloudJobConfigurationListener.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/config/job/CloudJobConfigurationListener.java
new file mode 100644
index 0000000000..ba6c36d47b
--- /dev/null
+++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/config/job/CloudJobConfigurationListener.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright 1999-2015 dangdang.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.dangdang.ddframe.job.cloud.scheduler.config.job;
+
+import com.dangdang.ddframe.job.cloud.scheduler.producer.ProducerManager;
+import com.dangdang.ddframe.job.cloud.scheduler.state.ready.ReadyService;
+import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.TreeCache;
+import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
+import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
+import org.apache.curator.framework.recipes.cache.TreeCacheListener;
+
+import java.util.Collections;
+import java.util.concurrent.Executors;
+
/**
 * Listener for cloud job configuration changes in the registry tree.
 *
 * <p>Reacts to node add/update/remove under {@link CloudJobConfigurationNode#ROOT} by
 * scheduling, rescheduling or unscheduling the job via {@link ProducerManager}.</p>
 *
 * @author zhangliang
 * @author caohao
 */
public class CloudJobConfigurationListener implements TreeCacheListener {
    
    private final CoordinatorRegistryCenter regCenter;
    
    private final ProducerManager producerManager;
    
    private final ReadyService readyService;
    
    public CloudJobConfigurationListener(final CoordinatorRegistryCenter regCenter, final ProducerManager producerManager) {
        this.regCenter = regCenter;
        readyService = new ReadyService(regCenter);
        this.producerManager = producerManager;
    }
    
    @Override
    public void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
        // Events without data (e.g. connection state events) carry no path; use "" so
        // none of the isJobConfigNode checks match.
        String path = null == event.getData() ? "" : event.getData().getPath();
        if (isJobConfigNode(event, path, Type.NODE_ADDED)) {
            CloudJobConfiguration jobConfig = getJobConfig(event);
            if (null != jobConfig) {
                producerManager.schedule(jobConfig);
            }
        } else if (isJobConfigNode(event, path, Type.NODE_UPDATED)) {
            CloudJobConfiguration jobConfig = getJobConfig(event);
            if (null == jobConfig) {
                // Unparseable payload: ignore the update.
                return;
            }
            if (CloudJobExecutionType.DAEMON == jobConfig.getJobExecutionType()) {
                // NOTE(review): presumably daemon jobs must not sit in the ready queue — confirm.
                readyService.remove(Collections.singletonList(jobConfig.getJobName()));
            }
            if (!jobConfig.getTypeConfig().getCoreConfig().isMisfire()) {
                readyService.setMisfireDisabled(jobConfig.getJobName());
            }
            producerManager.reschedule(jobConfig);
        } else if (isJobConfigNode(event, path, Type.NODE_REMOVED)) {
            // Path layout is ROOT + "/" + jobName; strip the prefix to recover the job name.
            String jobName = path.substring(CloudJobConfigurationNode.ROOT.length() + 1, path.length());
            producerManager.unschedule(jobName);
        }
    }
    
    private boolean isJobConfigNode(final TreeCacheEvent event, final String path, final Type type) {
        // The length check excludes the ROOT node itself; only per-job child nodes match.
        return type == event.getType() && path.startsWith(CloudJobConfigurationNode.ROOT) && path.length() > CloudJobConfigurationNode.ROOT.length();
    }
    
    private CloudJobConfiguration getJobConfig(final TreeCacheEvent event) {
        try {
            // NOTE(review): new String(byte[]) uses the platform charset; config JSON is
            // presumably UTF-8 — confirm and consider an explicit charset.
            return CloudJobConfigurationGsonFactory.fromJson(new String(event.getData().getData()));
            // CHECKSTYLE:OFF
        } catch (final Exception ex) {
            // CHECKSTYLE:ON
            // Deliberate best-effort: malformed payloads yield null and the event is skipped.
            return null;
        }
    }
    
    /**
     * Start listening for cloud job configuration changes.
     */
    public void start() {
        // NOTE(review): the single-thread executor is created here and never shut down;
        // stop() only removes the listener. Consider keeping a reference and shutting it
        // down — confirm intended lifecycle.
        getCache().getListenable().addListener(this, Executors.newSingleThreadExecutor());
    }
    
    /**
     * Stop listening for cloud job configuration changes.
     */
    public void stop() {
        getCache().getListenable().removeListener(this);
    }
    
    private TreeCache getCache() {
        // Lazily create the tree cache in the registry center on first use.
        TreeCache result = (TreeCache) regCenter.getRawCache(CloudJobConfigurationNode.ROOT);
        if (null != result) {
            return result;
        }
        regCenter.addCacheData(CloudJobConfigurationNode.ROOT);
        return (TreeCache) regCenter.getRawCache(CloudJobConfigurationNode.ROOT);
    }
}
diff --git a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/env/FakeLocalHostService.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/config/job/CloudJobConfigurationNode.java
similarity index 52%
rename from elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/env/FakeLocalHostService.java
rename to elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/config/job/CloudJobConfigurationNode.java
index 2809223e3a..82efa26031 100644
--- a/elastic-job-core/src/test/java/com/dangdang/ddframe/job/internal/env/FakeLocalHostService.java
+++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/config/job/CloudJobConfigurationNode.java
@@ -1,12 +1,12 @@
-/**
+/*
* Copyright 1999-2015 dangdang.com.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,28 +15,24 @@
*
*/
-package com.dangdang.ddframe.job.internal.env;
+package com.dangdang.ddframe.job.cloud.scheduler.config.job;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
/**
- * 为测试使用的获取本地网络的服务.
- *
+ * 作业配置节点路径.
+ *
* @author zhangliang
*/
-public final class FakeLocalHostService implements LocalHostService {
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class CloudJobConfigurationNode {
- private final String localHostIp;
+ public static final String ROOT = "/config/job";
- public FakeLocalHostService(final String localHostIp) {
- this.localHostIp = localHostIp;
- }
-
- @Override
- public String getIp() {
- return localHostIp;
- }
+ private static final String JOB_CONFIG = ROOT + "/%s";
- @Override
- public String getHostName() {
- return localHostIp + "_hostMame";
+ static String getRootNodePath(final String jobName) {
+ return String.format(JOB_CONFIG, jobName);
}
}
diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/config/job/CloudJobConfigurationService.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/config/job/CloudJobConfigurationService.java
new file mode 100644
index 0000000000..a9c638562a
--- /dev/null
+++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/config/job/CloudJobConfigurationService.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 1999-2015 dangdang.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.dangdang.ddframe.job.cloud.scheduler.config.job;
+
+import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
+import com.google.common.base.Optional;
+import lombok.RequiredArgsConstructor;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * 作业配置服务.
+ *
+ * @author zhangliang
+ */
+@RequiredArgsConstructor
+public class CloudJobConfigurationService {
+
+ private final CoordinatorRegistryCenter regCenter;
+
+ /**
+ * 添加云作业配置.
+ *
+ * @param jobConfig 云作业配置对象
+ */
+ public void add(final CloudJobConfiguration jobConfig) {
+ regCenter.persist(CloudJobConfigurationNode.getRootNodePath(jobConfig.getJobName()), CloudJobConfigurationGsonFactory.toJson(jobConfig));
+ }
+
+ /**
+ * 修改云作业配置.
+ *
+ * @param jobConfig 云作业配置对象
+ */
+ public void update(final CloudJobConfiguration jobConfig) {
+ regCenter.update(CloudJobConfigurationNode.getRootNodePath(jobConfig.getJobName()), CloudJobConfigurationGsonFactory.toJson(jobConfig));
+ }
+
+ /**
+ * 获取所有注册的云作业配置.
+ *
+ * @return 注册的云作业配置
+ */
+ public Collection<CloudJobConfiguration> loadAll() {
+ if (!regCenter.isExisted(CloudJobConfigurationNode.ROOT)) {
+ return Collections.emptyList();
+ }
+ List<String> jobNames = regCenter.getChildrenKeys(CloudJobConfigurationNode.ROOT);
+ Collection<CloudJobConfiguration> result = new ArrayList<>(jobNames.size());
+ for (String each : jobNames) {
+ Optional<CloudJobConfiguration> config = load(each);
+ if (config.isPresent()) {
+ result.add(config.get());
+ }
+ }
+ return result;
+ }
+
+ /**
+ * 根据作业名称获取云作业配置.
+ *
+ * @param jobName 作业名称
+ * @return 云作业配置
+ */
+ public Optional<CloudJobConfiguration> load(final String jobName) {
+ return Optional.fromNullable(CloudJobConfigurationGsonFactory.fromJson(regCenter.get(CloudJobConfigurationNode.getRootNodePath(jobName))));
+ }
+
+ /**
+ * 删除云作业.
+ *
+ * @param jobName 作业名称
+ */
+ public void remove(final String jobName) {
+ regCenter.remove(CloudJobConfigurationNode.getRootNodePath(jobName));
+ }
+}
diff --git a/elastic-job-console/src/main/java/com/dangdang/ddframe/job/console/exception/NoRegistryCenterException.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/config/job/CloudJobExecutionType.java
similarity index 69%
rename from elastic-job-console/src/main/java/com/dangdang/ddframe/job/console/exception/NoRegistryCenterException.java
rename to elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/config/job/CloudJobExecutionType.java
index e62f7512fc..9d577d6220 100644
--- a/elastic-job-console/src/main/java/com/dangdang/ddframe/job/console/exception/NoRegistryCenterException.java
+++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/config/job/CloudJobExecutionType.java
@@ -1,12 +1,12 @@
-/**
+/*
* Copyright 1999-2015 dangdang.com.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,9 +15,15 @@
*
*/
-package com.dangdang.ddframe.job.console.exception;
+package com.dangdang.ddframe.job.cloud.scheduler.config.job;
-public final class NoRegistryCenterException extends RuntimeException {
+/**
+ * 作业执行类型.
+ *
+ * @author zhangliang
+ */
+public enum CloudJobExecutionType {
+
+ DAEMON, TRANSIENT
- private static final long serialVersionUID = -7230151498491198890L;
}
diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/context/JobContext.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/context/JobContext.java
new file mode 100644
index 0000000000..31f4154082
--- /dev/null
+++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/context/JobContext.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 1999-2015 dangdang.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.dangdang.ddframe.job.cloud.scheduler.context;
+
+import com.dangdang.ddframe.job.cloud.scheduler.config.job.CloudJobConfiguration;
+import com.dangdang.ddframe.job.context.ExecutionType;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * 作业运行上下文.
+ *
+ * @author zhangliang
+ */
+@RequiredArgsConstructor
+@Getter
+public final class JobContext {
+
+ private final CloudJobConfiguration jobConfig;
+
+ private final List<Integer> assignedShardingItems;
+
+ private final ExecutionType type;
+
+ /**
+ * 通过作业配置创建作业运行上下文.
+ *
+ * @param jobConfig 作业配置
+ * @param type 执行类型
+ * @return 作业运行上下文
+ */
+ public static JobContext from(final CloudJobConfiguration jobConfig, final ExecutionType type) {
+ int shardingTotalCount = jobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount();
+ List<Integer> shardingItems = new ArrayList<>(shardingTotalCount);
+ for (int i = 0; i < shardingTotalCount; i++) {
+ shardingItems.add(i);
+ }
+ return new JobContext(jobConfig, shardingItems, type);
+ }
+}
diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/env/BootstrapEnvironment.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/env/BootstrapEnvironment.java
new file mode 100644
index 0000000000..f8075ae133
--- /dev/null
+++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/env/BootstrapEnvironment.java
@@ -0,0 +1,199 @@
+/*
+ * Copyright 1999-2015 dangdang.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.dangdang.ddframe.job.cloud.scheduler.env;
+
+import com.dangdang.ddframe.job.event.rdb.JobEventRdbConfiguration;
+import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.dbcp.BasicDataSource;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Properties;
+
+/**
+ * 启动环境对象.
+ *
+ * @author zhangliang
+ */
+@Slf4j
+public final class BootstrapEnvironment {
+
+ @Getter
+ private static BootstrapEnvironment instance = new BootstrapEnvironment();
+
+ private static final String PROPERTIES_PATH = "conf/elastic-job-cloud-scheduler.properties";
+
+ private final Properties properties;
+
+ private BootstrapEnvironment() {
+ properties = getProperties();
+ }
+
+ private Properties getProperties() {
+ Properties result = new Properties();
+ try (FileInputStream fileInputStream = new FileInputStream(PROPERTIES_PATH)) {
+ result.load(fileInputStream);
+ } catch (final IOException ex) {
+ log.warn("Cannot found conf/elastic-job-cloud-scheduler.properties, use default value now.");
+ }
+ return result;
+ }
+
+ /**
+ * 获取Framework的Hostname和Port.
+ *
+ * @return Framework的Hostname和Port
+ */
+ public String getFrameworkHostPort() {
+ return String.format("%s:%d", getMesosConfiguration().getHostname(), getRestfulServerConfiguration().getPort());
+ }
+
+ /**
+ * 获取Mesos配置对象.
+ *
+ * @return Mesos配置对象
+ */
+ public MesosConfiguration getMesosConfiguration() {
+ return new MesosConfiguration(getValue(EnvironmentArgument.USER), getValue(EnvironmentArgument.MESOS_URL), getValue(EnvironmentArgument.HOSTNAME));
+ }
+
+ /**
+ * 获取Zookeeper配置对象.
+ *
+ * @return Zookeeper配置对象
+ */
+ // TODO 其他zkConfig的值可配置
+ public ZookeeperConfiguration getZookeeperConfiguration() {
+ ZookeeperConfiguration result = new ZookeeperConfiguration(getValue(EnvironmentArgument.ZOOKEEPER_SERVERS), getValue(EnvironmentArgument.ZOOKEEPER_NAMESPACE));
+ String digest = getValue(EnvironmentArgument.ZOOKEEPER_DIGEST);
+ if (!Strings.isNullOrEmpty(digest)) {
+ result.setDigest(digest);
+ }
+ return result;
+ }
+
+ /**
+ * 获取Restful服务器配置对象.
+ *
+ * @return Restful服务器配置对象
+ */
+ public RestfulServerConfiguration getRestfulServerConfiguration() {
+ return new RestfulServerConfiguration(Integer.parseInt(getValue(EnvironmentArgument.PORT)));
+ }
+
+ /**
+ * 获取Mesos框架配置对象.
+ *
+ * @return Mesos框架配置对象
+ */
+ public FrameworkConfiguration getFrameworkConfiguration() {
+ return new FrameworkConfiguration(Integer.parseInt(getValue(EnvironmentArgument.JOB_STATE_QUEUE_SIZE)));
+ }
+
+ /**
+ * 获取作业数据库事件配置.
+ *
+ * @return 作业数据库事件配置
+ */
+ public Optional<JobEventRdbConfiguration> getJobEventRdbConfiguration() {
+ String driver = getValue(EnvironmentArgument.EVENT_TRACE_RDB_DRIVER);
+ String url = getValue(EnvironmentArgument.EVENT_TRACE_RDB_URL);
+ String username = getValue(EnvironmentArgument.EVENT_TRACE_RDB_USERNAME);
+ String password = getValue(EnvironmentArgument.EVENT_TRACE_RDB_PASSWORD);
+ if (!Strings.isNullOrEmpty(driver) && !Strings.isNullOrEmpty(url) && !Strings.isNullOrEmpty(username)) {
+ BasicDataSource dataSource = new BasicDataSource();
+ dataSource.setDriverClassName(driver);
+ dataSource.setUrl(url);
+ dataSource.setUsername(username);
+ dataSource.setPassword(password);
+ return Optional.of(new JobEventRdbConfiguration(dataSource));
+ }
+ return Optional.absent();
+ }
+
+ /**
+ * 获取作业数据库事件配置Map.
+ *
+ * @return 作业数据库事件配置Map
+ */
+ // CHECKSTYLE:OFF
+ public HashMap<String, String> getJobEventRdbConfigurationMap() {
+ HashMap<String, String> result = new HashMap<>(4, 1);
+ // CHECKSTYLE:ON
+ result.put(EnvironmentArgument.EVENT_TRACE_RDB_DRIVER.getKey(), getValue(EnvironmentArgument.EVENT_TRACE_RDB_DRIVER));
+ result.put(EnvironmentArgument.EVENT_TRACE_RDB_URL.getKey(), getValue(EnvironmentArgument.EVENT_TRACE_RDB_URL));
+ result.put(EnvironmentArgument.EVENT_TRACE_RDB_USERNAME.getKey(), getValue(EnvironmentArgument.EVENT_TRACE_RDB_USERNAME));
+ result.put(EnvironmentArgument.EVENT_TRACE_RDB_PASSWORD.getKey(), getValue(EnvironmentArgument.EVENT_TRACE_RDB_PASSWORD));
+ return result;
+ }
+
+ private String getValue(final EnvironmentArgument environmentArgument) {
+ String result = properties.getProperty(environmentArgument.getKey(), environmentArgument.getDefaultValue());
+ if (environmentArgument.isRequired()) {
+ Preconditions.checkState(!Strings.isNullOrEmpty(result), String.format("Property `%s` is required.", environmentArgument.getKey()));
+ }
+ return result;
+ }
+
+ /**
+ * 环境参数.
+ *
+ * @author zhangliang
+ */
+ @RequiredArgsConstructor
+ @Getter
+ public enum EnvironmentArgument {
+
+ HOSTNAME("hostname", "localhost", true),
+
+ MESOS_URL("mesos_url", "zk://localhost:2181/mesos", true),
+
+ USER("user", "", false),
+
+ ZOOKEEPER_SERVERS("zk_servers", "localhost:2181", true),
+
+ ZOOKEEPER_NAMESPACE("zk_namespace", "elastic-job-cloud", true),
+
+ ZOOKEEPER_DIGEST("zk_digest", "", false),
+
+ PORT("http_port", "8899", true),
+
+ JOB_STATE_QUEUE_SIZE("job_state_queue_size", "10000", true),
+
+ EVENT_TRACE_RDB_DRIVER("event_trace_rdb_driver", "", false),
+
+ EVENT_TRACE_RDB_URL("event_trace_rdb_url", "", false),
+
+ EVENT_TRACE_RDB_USERNAME("event_trace_rdb_username", "", false),
+
+ EVENT_TRACE_RDB_PASSWORD("event_trace_rdb_password", "", false);
+
+ private final String key;
+
+ private final String defaultValue;
+
+ private final boolean required;
+ }
+}
diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/env/FrameworkConfiguration.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/env/FrameworkConfiguration.java
new file mode 100644
index 0000000000..54a906e54d
--- /dev/null
+++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/env/FrameworkConfiguration.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 1999-2015 dangdang.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.dangdang.ddframe.job.cloud.scheduler.env;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+/**
+ * Mesos框架配置项.
+ *
+ * @author zhangliang
+ */
+@RequiredArgsConstructor
+@Getter
+public final class FrameworkConfiguration {
+
+ private final int jobStateQueueSize;
+}
diff --git a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/api/AbstractJobExecutionShardingContext.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/env/MesosConfiguration.java
similarity index 50%
rename from elastic-job-core/src/main/java/com/dangdang/ddframe/job/api/AbstractJobExecutionShardingContext.java
rename to elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/env/MesosConfiguration.java
index 2471b42f51..3c9de4869e 100644
--- a/elastic-job-core/src/main/java/com/dangdang/ddframe/job/api/AbstractJobExecutionShardingContext.java
+++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/env/MesosConfiguration.java
@@ -1,12 +1,12 @@
-/**
+/*
* Copyright 1999-2015 dangdang.com.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,38 +15,33 @@
*
*/
-package com.dangdang.ddframe.job.api;
+package com.dangdang.ddframe.job.cloud.scheduler.env;
import lombok.Getter;
-import lombok.Setter;
+import lombok.RequiredArgsConstructor;
+/**
+ * Mesos配置项.
+ *
+ * @author zhangliang
+ */
+@RequiredArgsConstructor
@Getter
-@Setter
-abstract class AbstractJobExecutionShardingContext {
+public final class MesosConfiguration {
/**
- * 作业名称.
+ * 框架名称.
*/
- private String jobName;
+ public static final String FRAMEWORK_NAME = "Elastic-Job-Cloud";
/**
- * 分片总数.
+ * 框架Failover超时时间,默认为1周.
*/
- private int shardingTotalCount;
+ public static final double FRAMEWORK_FAILOVER_TIMEOUT = 60 * 60 * 24 * 7;
- /**
- * 作业自定义参数.
- * 可以配置多个相同的作业, 但是用不同的参数作为不同的调度实例.
- */
- private String jobParameter;
+ private final String user;
- /**
- * 监控作业执行时状态.
- */
- private boolean monitorExecution;
+ private final String url;
- /**
- * 每次抓取的数据量.
- */
- private int fetchDataCount;
+ private final String hostname;
}
diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/env/RestfulServerConfiguration.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/env/RestfulServerConfiguration.java
new file mode 100644
index 0000000000..c9f0796964
--- /dev/null
+++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/env/RestfulServerConfiguration.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 1999-2015 dangdang.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.dangdang.ddframe.job.cloud.scheduler.env;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+/**
+ * Restful服务器配置项.
+ *
+ * @author zhangliang
+ */
+@RequiredArgsConstructor
+@Getter
+public final class RestfulServerConfiguration {
+
+ private final int port;
+}
diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/ha/FrameworkIDService.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/ha/FrameworkIDService.java
new file mode 100644
index 0000000000..6d11e6c5d4
--- /dev/null
+++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/ha/FrameworkIDService.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 1999-2015 dangdang.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.dangdang.ddframe.job.cloud.scheduler.ha;
+
+import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+import lombok.RequiredArgsConstructor;
+
+/**
+ * FrameworkID 的保存器.
+ *
+ * @author gaohongtao
+ */
+@RequiredArgsConstructor
+public class FrameworkIDService {
+
+ private final CoordinatorRegistryCenter regCenter;
+
+ /**
+ * 获取FrameworkID,返回值是一个可选的结果.
+ *
+ * @return 获取FrameworkID的可选结果
+ */
+ public Optional<String> fetch() {
+ String frameworkId = regCenter.getDirectly(HANode.FRAMEWORK_ID_NODE);
+ return Strings.isNullOrEmpty(frameworkId) ? Optional.absent() : Optional.of(frameworkId);
+ }
+
+ /**
+ * 保存FrameworkID.
+ *
+ * @param id Framework的ID
+ */
+ public void save(final String id) {
+ if (!regCenter.isExisted(HANode.FRAMEWORK_ID_NODE)) {
+ regCenter.persist(HANode.FRAMEWORK_ID_NODE, id);
+ }
+ }
+}
diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/ha/HANode.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/ha/HANode.java
new file mode 100644
index 0000000000..c212294870
--- /dev/null
+++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/ha/HANode.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 1999-2015 dangdang.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.dangdang.ddframe.job.cloud.scheduler.ha;
+
+/**
+ * 保存HA相关数据节点.
+ *
+ * @author gaohongtao
+ */
+public class HANode {
+
+ /**
+ * HA根节点.
+ */
+ public static final String ROOT = "/ha";
+
+ /**
+ * FrameworkID保存的节点.
+ */
+ public static final String FRAMEWORK_ID_NODE = ROOT + "/framework_id";
+
+ /**
+ * 选举节点.
+ */
+ public static final String ELECTION_NODE = ROOT + "/election";
+}
diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/ha/SchedulerElectionCandidate.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/ha/SchedulerElectionCandidate.java
new file mode 100644
index 0000000000..472124f347
--- /dev/null
+++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/ha/SchedulerElectionCandidate.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 1999-2015 dangdang.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.dangdang.ddframe.job.cloud.scheduler.ha;
+
+import com.dangdang.ddframe.job.cloud.scheduler.mesos.SchedulerService;
+import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
+import com.dangdang.ddframe.job.reg.base.ElectionCandidate;
+
+/**
+ * 调度器选举候选人.
+ *
+ * @author caohao
+ */
+public class SchedulerElectionCandidate implements ElectionCandidate {
+
+ private final CoordinatorRegistryCenter regCenter;
+
+ private SchedulerService schedulerService;
+
+ public SchedulerElectionCandidate(final CoordinatorRegistryCenter regCenter) {
+ this.regCenter = regCenter;
+ }
+
+ @Override
+ public void startLeadership() {
+ schedulerService = new SchedulerService(regCenter);
+ schedulerService.start();
+ }
+
+ @Override
+ public void stopLeadership() {
+ schedulerService.stop();
+ }
+}
diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/FacadeService.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/FacadeService.java
new file mode 100644
index 0000000000..67c5db7b88
--- /dev/null
+++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/FacadeService.java
@@ -0,0 +1,295 @@
+/*
+ * Copyright 1999-2015 dangdang.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.dangdang.ddframe.job.cloud.scheduler.mesos;
+
+import com.dangdang.ddframe.job.cloud.scheduler.config.job.CloudJobConfiguration;
+import com.dangdang.ddframe.job.cloud.scheduler.config.job.CloudJobConfigurationService;
+import com.dangdang.ddframe.job.cloud.scheduler.config.job.CloudJobExecutionType;
+import com.dangdang.ddframe.job.cloud.scheduler.config.app.CloudAppConfiguration;
+import com.dangdang.ddframe.job.cloud.scheduler.config.app.CloudAppConfigurationService;
+import com.dangdang.ddframe.job.cloud.scheduler.context.JobContext;
+import com.dangdang.ddframe.job.cloud.scheduler.state.failover.FailoverService;
+import com.dangdang.ddframe.job.cloud.scheduler.state.failover.FailoverTaskInfo;
+import com.dangdang.ddframe.job.cloud.scheduler.state.ready.ReadyService;
+import com.dangdang.ddframe.job.cloud.scheduler.state.running.RunningService;
+import com.dangdang.ddframe.job.context.ExecutionType;
+import com.dangdang.ddframe.job.context.TaskContext;
+import com.dangdang.ddframe.job.context.TaskContext.MetaInfo;
+import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * 为Mesos提供的门面服务.
+ *
+ * @author zhangliang
+ * @author caohao
+ */
+@Slf4j
+public class FacadeService {
+
+ private final CloudAppConfigurationService appConfigService;
+
+ private final CloudJobConfigurationService jobConfigService;
+
+ private final ReadyService readyService;
+
+ private final RunningService runningService;
+
+ private final FailoverService failoverService;
+
+ public FacadeService(final CoordinatorRegistryCenter regCenter) {
+ appConfigService = new CloudAppConfigurationService(regCenter);
+ jobConfigService = new CloudJobConfigurationService(regCenter);
+ readyService = new ReadyService(regCenter);
+ runningService = new RunningService(regCenter);
+ failoverService = new FailoverService(regCenter);
+ }
+
+ /**
+ * 启动门面服务.
+ */
+ public void start() {
+ log.info("Elastic Job: Start facade service");
+ runningService.start();
+ }
+
+ /**
+ * 获取有资格运行的作业.
+ *
+ * @return 作业上下文集合
+ */
+ public Collection<JobContext> getEligibleJobContext() {
+ Collection<JobContext> failoverJobContexts = failoverService.getAllEligibleJobContexts();
+ Collection<JobContext> readyJobContexts = readyService.getAllEligibleJobContexts(failoverJobContexts);
+ Collection<JobContext> result = new ArrayList<>(failoverJobContexts.size() + readyJobContexts.size());
+ result.addAll(failoverJobContexts);
+ result.addAll(readyJobContexts);
+ return result;
+ }
+
+ /**
+ * 从队列中删除已运行的作业.
+ *
+ * @param taskContexts 任务上下文集合
+ */
+ public void removeLaunchTasksFromQueue(final List<TaskContext> taskContexts) {
+ List<TaskContext> failoverTaskContexts = new ArrayList<>(taskContexts.size());
+ Collection<String> readyJobNames = new HashSet<>(taskContexts.size(), 1);
+ for (TaskContext each : taskContexts) {
+ switch (each.getType()) {
+ case FAILOVER:
+ failoverTaskContexts.add(each);
+ break;
+ case READY:
+ readyJobNames.add(each.getMetaInfo().getJobName());
+ break;
+ default:
+ break;
+ }
+ }
+ failoverService.remove(Lists.transform(failoverTaskContexts, new Function<TaskContext, TaskContext.MetaInfo>() {
+
+ @Override
+ public TaskContext.MetaInfo apply(final TaskContext input) {
+ return input.getMetaInfo();
+ }
+ }));
+ readyService.remove(readyJobNames);
+ }
+
+ /**
+ * 将任务运行时上下文放入运行时队列.
+ *
+ * @param taskContext 任务运行时上下文
+ */
+ public void addRunning(final TaskContext taskContext) {
+ runningService.add(taskContext);
+ }
+
+ /**
+ * 更新常驻作业运行状态.
+ *
+ * @param taskContext 任务运行时上下文
+ * @param isIdle 是否空闲
+ */
+ public void updateDaemonStatus(final TaskContext taskContext, final boolean isIdle) {
+ runningService.updateIdle(taskContext, isIdle);
+ }
+
+ /**
+ * 将任务从运行时队列删除.
+ *
+ * @param taskContext 任务运行时上下文
+ */
+ public void removeRunning(final TaskContext taskContext) {
+ runningService.remove(taskContext);
+ }
+
+ /**
+ * 记录失效转移队列.
+ *
+ * @param taskContext 任务上下文
+ */
+ public void recordFailoverTask(final TaskContext taskContext) {
+ Optional<CloudJobConfiguration> jobConfigOptional = jobConfigService.load(taskContext.getMetaInfo().getJobName());
+ if (!jobConfigOptional.isPresent()) {
+ return;
+ }
+ CloudJobConfiguration jobConfig = jobConfigOptional.get();
+ if (jobConfig.getTypeConfig().getCoreConfig().isFailover() || CloudJobExecutionType.DAEMON == jobConfig.getJobExecutionType()) {
+ failoverService.add(taskContext);
+ }
+ }
+
+ /**
+ * 将瞬时作业放入待执行队列.
+ *
+ * @param jobName 作业名称
+ */
+ public void addTransient(final String jobName) {
+ readyService.addTransient(jobName);
+ }
+
+ /**
+ * 根据作业名称获取云作业配置.
+ *
+ * @param jobName 作业名称
+ * @return 云作业配置
+ */
+ public Optional<CloudJobConfiguration> load(final String jobName) {
+ return jobConfigService.load(jobName);
+ }
+
+ /**
+ * 根据作业应用名称获取云作业应用配置.
+ *
+ * @param appName 作业应用名称
+ * @return 云作业应用配置
+ */
+ public Optional<CloudAppConfiguration> loadAppConfig(final String appName) {
+ return appConfigService.load(appName);
+ }
+
+ /**
+ * 根据作业元信息获取失效转移作业Id.
+ *
+ * @param metaInfo 作业元信息
+ * @return 失效转移作业Id
+ */
+ public Optional<String> getFailoverTaskId(final MetaInfo metaInfo) {
+ return failoverService.getTaskId(metaInfo);
+ }
+
+ /**
+ * 将常驻作业放入待执行队列.
+ *
+ * @param jobName 作业名称
+ */
+ public void addDaemonJobToReadyQueue(final String jobName) {
+ readyService.addDaemon(jobName);
+ }
+
+ /**
+ * 判断作业是否在运行.
+ *
+ * @param jobName 作业名称
+ * @return 作业是否在运行
+ */
+ public boolean isRunning(final String jobName) {
+ return !runningService.getRunningTasks(jobName).isEmpty();
+ }
+
+ /**
+ * 根据作业执行类型判断作业是否在运行.
+ *
+ * READY类型的作业为整体, 任意一片运行都视为作业运行. FAILOVER则仅以当前分片运行为运行依据.
+ *
+ * @param taskContext 任务运行时上下文
+ * @return 作业是否在运行
+ */
+ public boolean isRunning(final TaskContext taskContext) {
+ return ExecutionType.FAILOVER != taskContext.getType() && !runningService.getRunningTasks(taskContext.getMetaInfo().getJobName()).isEmpty()
+ || ExecutionType.FAILOVER == taskContext.getType() && runningService.isTaskRunning(taskContext.getMetaInfo());
+ }
+
+ /**
+ * 添加任务主键和主机名称的映射.
+ *
+ * @param taskId 任务主键
+ * @param hostname 主机名称
+ */
+ public void addMapping(final String taskId, final String hostname) {
+ runningService.addMapping(taskId, hostname);
+ }
+
+ /**
+ * 根据任务主键获取主机名称并清除该任务.
+ *
+ * @param taskId 任务主键
+ * @return 删除任务的主机名称
+ */
+ public String popMapping(final String taskId) {
+ return runningService.popMapping(taskId);
+ }
+
+ /**
+ * 获取待运行的全部任务.
+ *
+ * @return 待运行的全部任务
+ */
+ public Map<String, Integer> getAllReadyTasks() {
+ return readyService.getAllReadyTasks();
+ }
+
+ /**
+ * 获取所有运行中的任务.
+ *
+ * @return 运行中任务集合
+ */
+ public Map<String, Set<TaskContext>> getAllRunningTasks() {
+ return runningService.getAllRunningTasks();
+ }
+
+ /**
+ * 获取待失效转移的全部任务.
+ *
+ * @return 待失效转移的全部任务
+ */
+ public Map<String, Collection<FailoverTaskInfo>> getAllFailoverTasks() {
+ return failoverService.getAllFailoverTasks();
+ }
+
+ /**
+ * 停止门面服务.
+ */
+ public void stop() {
+ log.info("Elastic Job: Stop facade service");
+ // TODO 停止作业调度
+ runningService.clear();
+ }
+}
diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/JobTaskRequest.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/JobTaskRequest.java
new file mode 100644
index 0000000000..1490f782a0
--- /dev/null
+++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/JobTaskRequest.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 1999-2015 dangdang.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.dangdang.ddframe.job.cloud.scheduler.mesos;
+
+import com.dangdang.ddframe.job.cloud.scheduler.config.job.CloudJobConfiguration;
+import com.dangdang.ddframe.job.context.TaskContext;
+import com.netflix.fenzo.ConstraintEvaluator;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.VMTaskFitnessCalculator;
+import lombok.RequiredArgsConstructor;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * 作业任务请求对象.
+ *
+ * @author zhangliang
+ */
+@RequiredArgsConstructor
+public final class JobTaskRequest implements TaskRequest {
+
+ private final TaskContext taskContext;
+
+ private final CloudJobConfiguration jobConfig;
+
+ @Override
+ public String getId() {
+ return taskContext.getId();
+ }
+
+ @Override
+ public String taskGroupName() {
+ return "";
+ }
+
+ @Override
+ public double getCPUs() {
+ return jobConfig.getCpuCount();
+ }
+
+ @Override
+ public double getMemory() {
+ return jobConfig.getMemoryMB();
+ }
+
+ @Override
+ public double getNetworkMbps() {
+ return 0;
+ }
+
+ @Override
+ public double getDisk() {
+ return 10d;
+ }
+
+ @Override
+ public int getPorts() {
+ return 1;
+ }
+
+ @Override
+ public Map getScalarRequests() {
+ return null;
+ }
+
+ @Override
+ public List extends ConstraintEvaluator> getHardConstraints() {
+ return null;
+ }
+
+ @Override
+ public List extends VMTaskFitnessCalculator> getSoftConstraints() {
+ return null;
+ }
+
+ @Override
+ public void setAssignedResources(final AssignedResources assignedResources) {
+ }
+
+ @Override
+ public AssignedResources getAssignedResources() {
+ return null;
+ }
+
+ @Override
+ public Map getCustomNamedResources() {
+ return Collections.emptyMap();
+ }
+}
diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/LaunchingTasks.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/LaunchingTasks.java
new file mode 100644
index 0000000000..30e3b16906
--- /dev/null
+++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/LaunchingTasks.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 1999-2015 dangdang.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.dangdang.ddframe.job.cloud.scheduler.mesos;
+
+import com.dangdang.ddframe.job.cloud.scheduler.context.JobContext;
+import com.dangdang.ddframe.job.context.ExecutionType;
+import com.dangdang.ddframe.job.context.TaskContext;
+import com.netflix.fenzo.TaskAssignmentResult;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.VMAssignmentResult;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * 分配任务行为包.
+ *
+ * @author zhangliang
+ */
+@Slf4j
+public final class LaunchingTasks {
+
+ private final Map eligibleJobContextsMap;
+
+ public LaunchingTasks(final Collection eligibleJobContexts) {
+ eligibleJobContextsMap = new HashMap<>(eligibleJobContexts.size(), 1);
+ for (JobContext each : eligibleJobContexts) {
+ eligibleJobContextsMap.put(each.getJobConfig().getJobName(), each);
+ }
+ }
+
+ List getPendingTasks() {
+ List result = new ArrayList<>(eligibleJobContextsMap.size() * 10);
+ for (JobContext each : eligibleJobContextsMap.values()) {
+ result.addAll(createTaskRequests(each));
+ }
+ return result;
+ }
+
+ private Collection createTaskRequests(final JobContext jobContext) {
+ Collection result = new ArrayList<>(jobContext.getAssignedShardingItems().size());
+ for (int each : jobContext.getAssignedShardingItems()) {
+ result.add(new JobTaskRequest(new TaskContext(jobContext.getJobConfig().getJobName(), Arrays.asList(each), jobContext.getType()), jobContext.getJobConfig()));
+ }
+ return result;
+ }
+
+ Collection getIntegrityViolationJobs(final Collection vmAssignmentResults) {
+ Map assignedJobShardingTotalCountMap = getAssignedJobShardingTotalCountMap(vmAssignmentResults);
+ Collection result = new HashSet<>(assignedJobShardingTotalCountMap.size(), 1);
+ for (Map.Entry entry : assignedJobShardingTotalCountMap.entrySet()) {
+ JobContext jobContext = eligibleJobContextsMap.get(entry.getKey());
+ if (ExecutionType.FAILOVER != jobContext.getType() && !entry.getValue().equals(jobContext.getJobConfig().getTypeConfig().getCoreConfig().getShardingTotalCount())) {
+ log.warn("Job {} is not assigned at this time, because resources not enough to run all sharding instances.", entry.getKey());
+ result.add(entry.getKey());
+ }
+ }
+ return result;
+ }
+
+ private Map getAssignedJobShardingTotalCountMap(final Collection vmAssignmentResults) {
+ Map result = new HashMap<>(eligibleJobContextsMap.size(), 1);
+ for (VMAssignmentResult vmAssignmentResult: vmAssignmentResults) {
+ for (TaskAssignmentResult tasksAssigned: vmAssignmentResult.getTasksAssigned()) {
+ String jobName = TaskContext.from(tasksAssigned.getTaskId()).getMetaInfo().getJobName();
+ if (result.containsKey(jobName)) {
+ result.put(jobName, result.get(jobName) + 1);
+ } else {
+ result.put(jobName, 1);
+ }
+ }
+ }
+ return result;
+ }
+}
diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/LeasesQueue.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/LeasesQueue.java
new file mode 100644
index 0000000000..5654ea7fec
--- /dev/null
+++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/LeasesQueue.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 1999-2015 dangdang.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.dangdang.ddframe.job.cloud.scheduler.mesos;
+
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.plugins.VMLeaseObject;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.mesos.Protos;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * 资源预占队列.
+ *
+ * @author zhangliang
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class LeasesQueue {
+
+ private static final LeasesQueue INSTANCE = new LeasesQueue();
+
+ private final BlockingQueue queue = new LinkedBlockingQueue<>();
+
+ /**
+ * 获取实例.
+ *
+ * @return 单例对象
+ */
+ public static LeasesQueue getInstance() {
+ return INSTANCE;
+ }
+
+ /**
+ * 添加资源至队列预占.
+ *
+ * @param offer 资源
+ */
+ public void offer(final Protos.Offer offer) {
+ queue.offer(new VMLeaseObject(offer));
+ }
+
+ /**
+ * 出栈队列资源.
+ *
+ * @return 队列资源集合
+ */
+ public List drainTo() {
+ List result = new ArrayList<>(queue.size());
+ queue.drainTo(result);
+ return result;
+ }
+}
diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/MesosStateService.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/MesosStateService.java
new file mode 100644
index 0000000000..844d71048a
--- /dev/null
+++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/MesosStateService.java
@@ -0,0 +1,164 @@
+/*
+ * Copyright 1999-2015 dangdang.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.dangdang.ddframe.job.cloud.scheduler.mesos;
+
+import com.dangdang.ddframe.job.cloud.scheduler.ha.FrameworkIDService;
+import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.sun.jersey.api.client.Client;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Mesos状态服务.
+ *
+ * @author gaohongtao
+ */
+@Slf4j
+public class MesosStateService {
+
+ private static String stateUrl;
+
+ private final String frameworkID;
+
+ public MesosStateService(final CoordinatorRegistryCenter regCenter) {
+ Optional frameworkIDOptional = new FrameworkIDService(regCenter).fetch();
+ Preconditions.checkState(frameworkIDOptional.isPresent());
+ this.frameworkID = frameworkIDOptional.get();
+ }
+
+ /**
+ * 注册Mesos的Master信息.
+ *
+ * @param hostName Master的主机名
+ * @param port Master端口
+ */
+ public static synchronized void register(final String hostName, final int port) {
+ stateUrl = String.format("http://%s:%d/state", hostName, port);
+ }
+
+ /**
+ * 注销Mesos的Master信息.
+ */
+ public static synchronized void deregister() {
+ stateUrl = null;
+ }
+
+ /**
+ * 获取沙箱信息.
+ *
+ * @param appName 作业云配置App的名字
+ * @return 沙箱信息
+ * @throws JSONException 解析JSON格式异常
+ */
+ public JsonArray sandbox(final String appName) throws JSONException {
+ JSONObject state = fetch(stateUrl);
+ JsonArray result = new JsonArray();
+ for (JSONObject each : findExecutors(state.getJSONArray("frameworks"), appName)) {
+ JSONArray slaves = state.getJSONArray("slaves");
+ String slaveHost = null;
+ for (int i = 0; i < slaves.length(); i++) {
+ JSONObject slave = slaves.getJSONObject(i);
+ if (each.getString("slave_id").equals(slave.getString("id"))) {
+ slaveHost = slave.getString("pid").split("@")[1];
+ }
+ }
+ Preconditions.checkNotNull(slaveHost);
+ JSONObject slaveState = fetch(String.format("http://%s/state", slaveHost));
+ String workDir = slaveState.getJSONObject("flags").getString("work_dir");
+ Collection executorsOnSlave = findExecutors(slaveState.getJSONArray("frameworks"), appName);
+ for (JSONObject executorOnSlave : executorsOnSlave) {
+ JsonObject r = new JsonObject();
+ r.addProperty("hostname", slaveState.getString("hostname"));
+ r.addProperty("path", executorOnSlave.getString("directory").replace(workDir, ""));
+ result.add(r);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * 查找执行器信息.
+ *
+ * @param appName 作业云配置App的名字
+ * @return 执行器信息
+ * @throws JSONException 解析JSON格式异常
+ */
+ public Collection executors(final String appName) throws JSONException {
+ return Collections2.transform(findExecutors(fetch(stateUrl).getJSONArray("frameworks"), appName), new Function() {
+ @Override
+ public ExecutorInfo apply(final JSONObject input) {
+ try {
+ return ExecutorInfo.builder().id(getExecutorId(input)).slaveId(input.getString("slave_id")).build();
+ } catch (final JSONException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ });
+ }
+
+ private JSONObject fetch(final String url) {
+ Preconditions.checkState(!Strings.isNullOrEmpty(url));
+ return Client.create().resource(url).get(JSONObject.class);
+ }
+
+ private Collection findExecutors(final JSONArray frameworks, final String appName) throws JSONException {
+ List result = Lists.newArrayList();
+ for (int i = 0; i < frameworks.length(); i++) {
+ JSONObject framework = frameworks.getJSONObject(i);
+ if (!framework.getString("id").equals(frameworkID)) {
+ continue;
+ }
+ JSONArray executors = framework.getJSONArray("executors");
+ for (int j = 0; j < executors.length(); j++) {
+ JSONObject executor = executors.getJSONObject(j);
+ if (appName.equals(getExecutorId(executor).split("@-@")[0])) {
+ result.add(executor);
+ }
+ }
+ }
+ return result;
+ }
+
+ private String getExecutorId(final JSONObject executor) throws JSONException {
+ return executor.has("id") ? executor.getString("id") : executor.getString("executor_id");
+ }
+
+ @Builder
+ @Getter
+ public static final class ExecutorInfo {
+
+ private final String id;
+
+ private final String slaveId;
+ }
+}
diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/SchedulerEngine.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/SchedulerEngine.java
new file mode 100644
index 0000000000..24df0c1ced
--- /dev/null
+++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/SchedulerEngine.java
@@ -0,0 +1,161 @@
+/*
+ * Copyright 1999-2015 dangdang.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.dangdang.ddframe.job.cloud.scheduler.mesos;
+
+import com.dangdang.ddframe.job.cloud.scheduler.ha.FrameworkIDService;
+import com.dangdang.ddframe.job.cloud.scheduler.statistics.StatisticManager;
+import com.dangdang.ddframe.job.context.TaskContext;
+import com.dangdang.ddframe.job.event.JobEventBus;
+import com.dangdang.ddframe.job.event.type.JobStatusTraceEvent;
+import com.dangdang.ddframe.job.event.type.JobStatusTraceEvent.Source;
+import com.dangdang.ddframe.job.event.type.JobStatusTraceEvent.State;
+import com.netflix.fenzo.TaskScheduler;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
+
+import java.util.List;
+
+/**
+ * 作业云引擎.
+ *
+ * @author zhangliang
+ */
+@RequiredArgsConstructor
+@Slf4j
+public final class SchedulerEngine implements Scheduler {
+
+ private final TaskScheduler taskScheduler;
+
+ private final FacadeService facadeService;
+
+ private final JobEventBus jobEventBus;
+
+ private final FrameworkIDService frameworkIDService;
+
+ private final StatisticManager statisticManager;
+
+ @Override
+ public void registered(final SchedulerDriver schedulerDriver, final Protos.FrameworkID frameworkID, final Protos.MasterInfo masterInfo) {
+ log.info("call registered");
+ frameworkIDService.save(frameworkID.getValue());
+ taskScheduler.expireAllLeases();
+ MesosStateService.register(masterInfo.getHostname(), masterInfo.getPort());
+ }
+
+ @Override
+ public void reregistered(final SchedulerDriver schedulerDriver, final Protos.MasterInfo masterInfo) {
+ log.info("call reregistered");
+ taskScheduler.expireAllLeases();
+ MesosStateService.register(masterInfo.getHostname(), masterInfo.getPort());
+ }
+
+ @Override
+ public void resourceOffers(final SchedulerDriver schedulerDriver, final List offers) {
+ for (Protos.Offer offer: offers) {
+ log.trace("Adding offer {} from host {}", offer.getId(), offer.getHostname());
+ LeasesQueue.getInstance().offer(offer);
+ }
+ }
+
+ @Override
+ public void offerRescinded(final SchedulerDriver schedulerDriver, final Protos.OfferID offerID) {
+ log.trace("call offerRescinded: {}", offerID);
+ taskScheduler.expireLease(offerID.getValue());
+ }
+
+ @Override
+ public void statusUpdate(final SchedulerDriver schedulerDriver, final Protos.TaskStatus taskStatus) {
+ String taskId = taskStatus.getTaskId().getValue();
+ TaskContext taskContext = TaskContext.from(taskId);
+ String jobName = taskContext.getMetaInfo().getJobName();
+ log.trace("call statusUpdate task state is: {}, task id is: {}", taskStatus.getState(), taskId);
+ jobEventBus.post(new JobStatusTraceEvent(jobName, taskContext.getId(), taskContext.getSlaveId(), Source.CLOUD_SCHEDULER,
+ taskContext.getType(), String.valueOf(taskContext.getMetaInfo().getShardingItems()), State.valueOf(taskStatus.getState().name()), taskStatus.getMessage()));
+ switch (taskStatus.getState()) {
+ case TASK_RUNNING:
+ if (!facadeService.load(jobName).isPresent()) {
+ schedulerDriver.killTask(Protos.TaskID.newBuilder().setValue(taskId).build());
+ }
+ if ("BEGIN".equals(taskStatus.getMessage())) {
+ facadeService.updateDaemonStatus(taskContext, false);
+ } else if ("COMPLETE".equals(taskStatus.getMessage())) {
+ facadeService.updateDaemonStatus(taskContext, true);
+ }
+ break;
+ case TASK_FINISHED:
+ facadeService.removeRunning(taskContext);
+ unAssignTask(taskId);
+ statisticManager.taskRunSuccessfully();
+ break;
+ case TASK_KILLED:
+ log.warn("task id is: {}, status is: {}, message is: {}, source is: {}", taskId, taskStatus.getState(), taskStatus.getMessage(), taskStatus.getSource());
+ facadeService.removeRunning(taskContext);
+ facadeService.addDaemonJobToReadyQueue(jobName);
+ unAssignTask(taskId);
+ break;
+ case TASK_LOST:
+ case TASK_FAILED:
+ case TASK_ERROR:
+ log.warn("task id is: {}, status is: {}, message is: {}, source is: {}", taskId, taskStatus.getState(), taskStatus.getMessage(), taskStatus.getSource());
+ facadeService.removeRunning(taskContext);
+ facadeService.recordFailoverTask(taskContext);
+ unAssignTask(taskId);
+ statisticManager.taskRunFailed();
+ break;
+ default:
+ break;
+ }
+ }
+
+ private void unAssignTask(final String taskId) {
+ String hostname = facadeService.popMapping(taskId);
+ if (null != hostname) {
+ taskScheduler.getTaskUnAssigner().call(TaskContext.getIdForUnassignedSlave(taskId), hostname);
+ }
+ }
+
+ @Override
+ public void frameworkMessage(final SchedulerDriver schedulerDriver, final Protos.ExecutorID executorID, final Protos.SlaveID slaveID, final byte[] bytes) {
+ log.trace("call frameworkMessage slaveID: {}, bytes: {}", slaveID, new String(bytes));
+ }
+
+ @Override
+ public void disconnected(final SchedulerDriver schedulerDriver) {
+ log.warn("call disconnected");
+ MesosStateService.deregister();
+ }
+
+ @Override
+ public void slaveLost(final SchedulerDriver schedulerDriver, final Protos.SlaveID slaveID) {
+ log.warn("call slaveLost slaveID is: {}", slaveID);
+ taskScheduler.expireAllLeasesByVMId(slaveID.getValue());
+ }
+
+ @Override
+ public void executorLost(final SchedulerDriver schedulerDriver, final Protos.ExecutorID executorID, final Protos.SlaveID slaveID, final int i) {
+ log.warn("call executorLost slaveID is: {}, executorID is: {}", slaveID, executorID);
+ }
+
+ @Override
+ public void error(final SchedulerDriver schedulerDriver, final String message) {
+ log.error("call error, message is: {}", message);
+ }
+}
diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/SchedulerService.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/SchedulerService.java
new file mode 100644
index 0000000000..e841d13b52
--- /dev/null
+++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/SchedulerService.java
@@ -0,0 +1,144 @@
+/*
+ * Copyright 1999-2015 dangdang.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.dangdang.ddframe.job.cloud.scheduler.mesos;
+
+import com.dangdang.ddframe.job.cloud.scheduler.config.job.CloudJobConfigurationListener;
+import com.dangdang.ddframe.job.cloud.scheduler.env.BootstrapEnvironment;
+import com.dangdang.ddframe.job.cloud.scheduler.env.MesosConfiguration;
+import com.dangdang.ddframe.job.cloud.scheduler.ha.FrameworkIDService;
+import com.dangdang.ddframe.job.cloud.scheduler.producer.ProducerManager;
+import com.dangdang.ddframe.job.cloud.scheduler.restful.RestfulService;
+import com.dangdang.ddframe.job.cloud.scheduler.statistics.StatisticManager;
+import com.dangdang.ddframe.job.event.JobEventBus;
+import com.dangdang.ddframe.job.event.rdb.JobEventRdbConfiguration;
+import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Service;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.mesos.MesosSchedulerDriver;
+import org.apache.mesos.Protos;
+import org.apache.mesos.SchedulerDriver;
+
+import static com.dangdang.ddframe.job.cloud.scheduler.env.MesosConfiguration.FRAMEWORK_FAILOVER_TIMEOUT;
+import static com.dangdang.ddframe.job.cloud.scheduler.env.MesosConfiguration.FRAMEWORK_NAME;
+
+/**
+ * 调度服务.
+ *
+ * @author zhangliang
+ * @author caohao
+ */
+@Slf4j
+@AllArgsConstructor
+public final class SchedulerService {
+
+ private static final String WEB_UI_PROTOCOL = "http://";
+
+ private final BootstrapEnvironment env;
+
+ private final FacadeService facadeService;
+
+ private final SchedulerDriver schedulerDriver;
+
+ private final ProducerManager producerManager;
+
+ private final StatisticManager statisticManager;
+
+ private final CloudJobConfigurationListener cloudJobConfigurationListener;
+
+ private final Service taskLaunchScheduledService;
+
+ private final RestfulService restfulService;
+
+ public SchedulerService(final CoordinatorRegistryCenter regCenter) {
+ env = BootstrapEnvironment.getInstance();
+ facadeService = new FacadeService(regCenter);
+ statisticManager = StatisticManager.getInstance(regCenter, env.getJobEventRdbConfiguration());
+ TaskScheduler taskScheduler = getTaskScheduler();
+ JobEventBus jobEventBus = getJobEventBus();
+ schedulerDriver = getSchedulerDriver(taskScheduler, jobEventBus, new FrameworkIDService(regCenter));
+ producerManager = new ProducerManager(schedulerDriver, regCenter);
+ cloudJobConfigurationListener = new CloudJobConfigurationListener(regCenter, producerManager);
+ taskLaunchScheduledService = new TaskLaunchScheduledService(schedulerDriver, taskScheduler, facadeService, jobEventBus);
+ restfulService = new RestfulService(regCenter, env.getRestfulServerConfiguration(), producerManager);
+ }
+
+ private SchedulerDriver getSchedulerDriver(final TaskScheduler taskScheduler, final JobEventBus jobEventBus, final FrameworkIDService frameworkIDService) {
+ MesosConfiguration mesosConfig = env.getMesosConfiguration();
+ Optional frameworkIDOptional = frameworkIDService.fetch();
+ Protos.FrameworkInfo.Builder builder = Protos.FrameworkInfo.newBuilder();
+ if (frameworkIDOptional.isPresent()) {
+ builder.setId(Protos.FrameworkID.newBuilder().setValue(frameworkIDOptional.get()).build());
+ }
+ Protos.FrameworkInfo frameworkInfo = builder.setUser(mesosConfig.getUser()).setName(FRAMEWORK_NAME)
+ .setHostname(mesosConfig.getHostname()).setFailoverTimeout(FRAMEWORK_FAILOVER_TIMEOUT)
+ .setWebuiUrl(WEB_UI_PROTOCOL + env.getFrameworkHostPort()).build();
+ return new MesosSchedulerDriver(new SchedulerEngine(taskScheduler, facadeService, jobEventBus, frameworkIDService, statisticManager), frameworkInfo, mesosConfig.getUrl());
+ }
+
+ private TaskScheduler getTaskScheduler() {
+ return new TaskScheduler.Builder()
+ .withLeaseOfferExpirySecs(1000000000L)
+ .withLeaseRejectAction(new Action1() {
+
+ @Override
+ public void call(final VirtualMachineLease lease) {
+ log.warn("Declining offer on '{}'", lease.hostname());
+ schedulerDriver.declineOffer(lease.getOffer().getId());
+ }
+ }).build();
+ }
+
+ private JobEventBus getJobEventBus() {
+ Optional rdbConfig = env.getJobEventRdbConfiguration();
+ if (rdbConfig.isPresent()) {
+ return new JobEventBus(rdbConfig.get());
+ }
+ return new JobEventBus();
+ }
+
+ /**
+ * 以守护进程方式启动.
+ */
+ public void start() {
+ facadeService.start();
+ producerManager.startup();
+ statisticManager.startup();
+ cloudJobConfigurationListener.start();
+ taskLaunchScheduledService.startAsync();
+ restfulService.start();
+ schedulerDriver.start();
+ }
+
+ /**
+ * 停止运行.
+ */
+ public void stop() {
+ restfulService.stop();
+ taskLaunchScheduledService.stopAsync();
+ cloudJobConfigurationListener.stop();
+ statisticManager.shutdown();
+ producerManager.shutdown();
+ schedulerDriver.stop(true);
+ facadeService.stop();
+ }
+}
diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/SupportedExtractionType.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/SupportedExtractionType.java
new file mode 100644
index 0000000000..0ad2d7c381
--- /dev/null
+++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/SupportedExtractionType.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 1999-2015 dangdang.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.dangdang.ddframe.job.cloud.scheduler.mesos;
+
+import lombok.NoArgsConstructor;
+
+import java.util.HashSet;
+import java.util.Set;
+
/**
 * Archive (compression) types supported by the Mesos fetcher.
 *
 * <p>Utility class; the Mesos fetcher automatically extracts application packages
 * whose URL ends with one of these extensions.</p>
 *
 * @author zhangliang
 */
public final class SupportedExtractionType {
    
    private static final Set<String> EXTRACTION_TYPES = new HashSet<>(9, 1);
    
    static {
        EXTRACTION_TYPES.add(".tar");
        EXTRACTION_TYPES.add(".tar.gz");
        EXTRACTION_TYPES.add(".tar.bz2");
        EXTRACTION_TYPES.add(".tar.xz");
        EXTRACTION_TYPES.add(".gz");
        EXTRACTION_TYPES.add(".tgz");
        EXTRACTION_TYPES.add(".tbz2");
        EXTRACTION_TYPES.add(".txz");
        EXTRACTION_TYPES.add(".zip");
    }
    
    private SupportedExtractionType() {
    }
    
    /**
     * Judge whether the file referenced by the URL is in a supported archive format.
     *
     * @param appURL application URL
     * @return whether the file is in a supported archive format
     */
    public static boolean isExtraction(final String appURL) {
        for (String each : EXTRACTION_TYPES) {
            if (appURL.endsWith(each)) {
                return true;
            }
        }
        return false;
    }
}
diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/TaskInfoData.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/TaskInfoData.java
new file mode 100644
index 0000000000..2e80ba9035
--- /dev/null
+++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/TaskInfoData.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 1999-2015 dangdang.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.dangdang.ddframe.job.cloud.scheduler.mesos;
+
+import com.dangdang.ddframe.job.cloud.scheduler.config.job.CloudJobConfiguration;
+import com.dangdang.ddframe.job.cloud.scheduler.config.job.CloudJobExecutionType;
+import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration;
+import com.dangdang.ddframe.job.config.script.ScriptJobConfiguration;
+import com.dangdang.ddframe.job.executor.ShardingContexts;
+import com.dangdang.ddframe.job.executor.handler.JobProperties;
+import lombok.RequiredArgsConstructor;
+import org.apache.commons.lang3.SerializationUtils;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * 随任务传递的数据.
+ *
+ * @author zhangliang
+ */
+@RequiredArgsConstructor
+public final class TaskInfoData {
+
+ private final ShardingContexts shardingContexts;
+
+ private final CloudJobConfiguration jobConfig;
+
+ /**
+ * 序列化.
+ *
+ * @return 序列化后的字节数组
+ */
+ public byte[] serialize() {
+ LinkedHashMap result = new LinkedHashMap<>(2, 1);
+ result.put("shardingContext", shardingContexts);
+ result.put("jobConfigContext", buildJobConfigurationContext());
+ return SerializationUtils.serialize(result);
+ }
+
+ private Map buildJobConfigurationContext() {
+ Map result = new LinkedHashMap<>(16, 1);
+ result.put("jobType", jobConfig.getTypeConfig().getJobType().name());
+ result.put("jobName", jobConfig.getJobName());
+ result.put("jobClass", jobConfig.getTypeConfig().getJobClass());
+ result.put("cron", CloudJobExecutionType.DAEMON == jobConfig.getJobExecutionType() ? jobConfig.getTypeConfig().getCoreConfig().getCron() : "");
+ result.put("jobExceptionHandler", jobConfig.getTypeConfig().getCoreConfig().getJobProperties().get(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER));
+ result.put("executorServiceHandler", jobConfig.getTypeConfig().getCoreConfig().getJobProperties().get(JobProperties.JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER));
+ if (jobConfig.getTypeConfig() instanceof DataflowJobConfiguration) {
+ result.put("streamingProcess", Boolean.toString(((DataflowJobConfiguration) jobConfig.getTypeConfig()).isStreamingProcess()));
+ } else if (jobConfig.getTypeConfig() instanceof ScriptJobConfiguration) {
+ result.put("scriptCommandLine", ((ScriptJobConfiguration) jobConfig.getTypeConfig()).getScriptCommandLine());
+ }
+ result.put("beanName", jobConfig.getBeanName());
+ result.put("applicationContext", jobConfig.getApplicationContext());
+ return result;
+ }
+}
diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/TaskLaunchScheduledService.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/TaskLaunchScheduledService.java
new file mode 100644
index 0000000000..a0fbc72f0f
--- /dev/null
+++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/TaskLaunchScheduledService.java
@@ -0,0 +1,238 @@
+/*
+ * Copyright 1999-2015 dangdang.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.dangdang.ddframe.job.cloud.scheduler.mesos;
+
+import com.dangdang.ddframe.job.api.JobType;
+import com.dangdang.ddframe.job.cloud.scheduler.env.BootstrapEnvironment;
+import com.dangdang.ddframe.job.cloud.scheduler.config.job.CloudJobConfiguration;
+import com.dangdang.ddframe.job.cloud.scheduler.config.job.CloudJobExecutionType;
+import com.dangdang.ddframe.job.cloud.scheduler.config.app.CloudAppConfiguration;
+import com.dangdang.ddframe.job.context.ExecutionType;
+import com.dangdang.ddframe.job.context.TaskContext;
+import com.dangdang.ddframe.job.event.JobEventBus;
+import com.dangdang.ddframe.job.event.type.JobStatusTraceEvent;
+import com.dangdang.ddframe.job.event.type.JobStatusTraceEvent.Source;
+import com.dangdang.ddframe.job.executor.ShardingContexts;
+import com.dangdang.ddframe.job.util.config.ShardingItemParameters;
+import com.dangdang.ddframe.job.util.json.GsonFactory;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.AbstractScheduledService;
+import com.google.protobuf.ByteString;
+import com.netflix.fenzo.TaskAssignmentResult;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VMAssignmentResult;
+import com.netflix.fenzo.VirtualMachineLease;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.exec.CommandLine;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.OfferID;
+import org.apache.mesos.Protos.TaskInfo;
+import org.apache.mesos.SchedulerDriver;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Scheduled service which launches eligible tasks onto Mesos offers.
+ *
+ * @author zhangliang
+ * @author gaohongtao
+ */
+@RequiredArgsConstructor
+@Slf4j
+public class TaskLaunchScheduledService extends AbstractScheduledService {
+    
+    private final SchedulerDriver schedulerDriver;
+    
+    private final TaskScheduler taskScheduler;
+    
+    private final FacadeService facadeService;
+    
+    private final JobEventBus jobEventBus;
+    
+    private final BootstrapEnvironment env = BootstrapEnvironment.getInstance();
+    
+    @Override
+    protected String serviceName() {
+        return "task-launch-processor";
+    }
+    
+    @Override
+    protected Scheduler scheduler() {
+        // Initial delay of 2 seconds, then one launch round every 10 seconds.
+        return Scheduler.newFixedDelaySchedule(2, 10, TimeUnit.SECONDS);
+    }
+    
+    @Override
+    protected void startUp() throws Exception {
+        log.info("Elastic Job: Start {}", serviceName());
+    }
+    
+    @Override
+    protected void shutDown() throws Exception {
+        log.info("Elastic Job: Stop {}", serviceName());
+    }
+    
+    @Override
+    protected void runOneIteration() throws Exception {
+        try {
+            // Match pending tasks against the currently drained Mesos resource offers via Fenzo.
+            LaunchingTasks launchingTasks = new LaunchingTasks(facadeService.getEligibleJobContext());
+            List<VirtualMachineLease> virtualMachineLeases = LeasesQueue.getInstance().drainTo();
+            Collection<VMAssignmentResult> vmAssignmentResults = taskScheduler.scheduleOnce(launchingTasks.getPendingTasks(), virtualMachineLeases).getResultMap().values();
+            List<TaskContext> taskContextsList = new LinkedList<>();
+            Map<List<OfferID>, List<TaskInfo>> offerIdTaskInfoMap = new HashMap<>();
+            for (VMAssignmentResult each: vmAssignmentResults) {
+                List<VirtualMachineLease> leasesUsed = each.getLeasesUsed();
+                List<Protos.TaskInfo> taskInfoList = new ArrayList<>(each.getTasksAssigned().size() * 10);
+                taskInfoList.addAll(getTaskInfoList(launchingTasks.getIntegrityViolationJobs(vmAssignmentResults), each, leasesUsed.get(0).hostname(), leasesUsed.get(0).getOffer().getSlaveId()));
+                for (Protos.TaskInfo taskInfo : taskInfoList) {
+                    taskContextsList.add(TaskContext.from(taskInfo.getTaskId().getValue()));
+                }
+                offerIdTaskInfoMap.put(getOfferIDs(leasesUsed), taskInfoList);
+            }
+            // Record the tasks as running and publish staging trace events before launching.
+            for (TaskContext each : taskContextsList) {
+                facadeService.addRunning(each);
+                jobEventBus.post(createJobStatusTraceEvent(each));
+            }
+            facadeService.removeLaunchTasksFromQueue(taskContextsList);
+            for (Entry<List<OfferID>, List<TaskInfo>> each : offerIdTaskInfoMap.entrySet()) {
+                schedulerDriver.launchTasks(each.getKey(), each.getValue());
+            }
+            //CHECKSTYLE:OFF
+        } catch (Throwable throwable) {
+            //CHECKSTYLE:ON
+            // Swallow everything: a failed round must not terminate the scheduled service.
+            log.error("Launch task error", throwable);
+        }
+    }
+    
+    /**
+     * Build the task info list for one VM assignment, skipping jobs with integrity
+     * violations and tasks that are already running.
+     */
+    private List<Protos.TaskInfo> getTaskInfoList(final Collection<String> integrityViolationJobs, final VMAssignmentResult vmAssignmentResult, final String hostname, final Protos.SlaveID slaveId) {
+        List<Protos.TaskInfo> result = new ArrayList<>(vmAssignmentResult.getTasksAssigned().size());
+        for (TaskAssignmentResult each: vmAssignmentResult.getTasksAssigned()) {
+            TaskContext taskContext = TaskContext.from(each.getTaskId());
+            if (!integrityViolationJobs.contains(taskContext.getMetaInfo().getJobName()) && !facadeService.isRunning(taskContext)) {
+                Protos.TaskInfo taskInfo = getTaskInfo(slaveId, each);
+                if (null != taskInfo) {
+                    result.add(taskInfo);
+                    facadeService.addMapping(taskInfo.getTaskId().getValue(), hostname);
+                    // Inform Fenzo that the task has been placed on this host.
+                    taskScheduler.getTaskAssigner().call(each.getRequest(), hostname);
+                }
+            }
+        }
+        return result;
+    }
+    
+    /**
+     * Create the Mesos task info for one assignment, or return {@code null} when the
+     * job or its app configuration can no longer be loaded.
+     */
+    private Protos.TaskInfo getTaskInfo(final Protos.SlaveID slaveID, final TaskAssignmentResult taskAssignmentResult) {
+        TaskContext taskContext = TaskContext.from(taskAssignmentResult.getTaskId());
+        Optional<CloudJobConfiguration> jobConfigOptional = facadeService.load(taskContext.getMetaInfo().getJobName());
+        if (!jobConfigOptional.isPresent()) {
+            return null;
+        }
+        CloudJobConfiguration jobConfig = jobConfigOptional.get();
+        Optional<CloudAppConfiguration> appConfigOptional = facadeService.loadAppConfig(jobConfig.getAppName());
+        if (!appConfigOptional.isPresent()) {
+            return null;
+        }
+        CloudAppConfiguration appConfig = appConfigOptional.get();
+        taskContext.setSlaveId(slaveID.getValue());
+        ShardingContexts shardingContexts = getShardingContexts(taskContext, appConfig, jobConfig);
+        // Transient script jobs run via the Mesos command executor; everything else uses the custom executor.
+        boolean useDefaultExecutor = CloudJobExecutionType.TRANSIENT == jobConfig.getJobExecutionType() && JobType.SCRIPT == jobConfig.getTypeConfig().getJobType();
+        Protos.CommandInfo.URI uri = buildURI(appConfig, useDefaultExecutor);
+        Protos.CommandInfo command = buildCommand(uri, appConfig.getBootstrapScript(), shardingContexts, useDefaultExecutor);
+        return buildTaskInfo(taskContext, appConfig, jobConfig, shardingContexts, slaveID, command, useDefaultExecutor);
+    }
+    
+    /**
+     * Build the sharding contexts for the single sharding item assigned to this task.
+     */
+    private ShardingContexts getShardingContexts(final TaskContext taskContext, final CloudAppConfiguration appConfig, final CloudJobConfiguration jobConfig) {
+        Map<Integer, String> shardingItemParameters = new ShardingItemParameters(jobConfig.getTypeConfig().getCoreConfig().getShardingItemParameters()).getMap();
+        Map<Integer, String> assignedShardingItemParameters = new HashMap<>(1, 1);
+        int shardingItem = taskContext.getMetaInfo().getShardingItems().get(0);
+        assignedShardingItemParameters.put(shardingItem, shardingItemParameters.containsKey(shardingItem) ? shardingItemParameters.get(shardingItem) : "");
+        return new ShardingContexts(taskContext.getId(), jobConfig.getJobName(), jobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount(),
+                jobConfig.getTypeConfig().getCoreConfig().getJobParameter(), assignedShardingItemParameters, appConfig.getEventTraceSamplingCount());
+    }
+    
+    private Protos.CommandInfo.URI buildURI(final CloudAppConfiguration appConfig, final boolean useDefaultExecutor) {
+        Protos.CommandInfo.URI.Builder result = Protos.CommandInfo.URI.newBuilder().setValue(appConfig.getAppURL()).setCache(appConfig.isAppCacheEnable());
+        // A non-archive app URL run by the default executor must be directly executable; archives get extracted.
+        if (useDefaultExecutor && !SupportedExtractionType.isExtraction(appConfig.getAppURL())) {
+            result.setExecutable(true);
+        } else {
+            result.setExtract(true);
+        }
+        return result.build();
+    }
+    
+    private Protos.CommandInfo buildCommand(final Protos.CommandInfo.URI uri, final String bootstrapScript, final ShardingContexts shardingContexts, final boolean useDefaultExecutor) {
+        Protos.CommandInfo.Builder result = Protos.CommandInfo.newBuilder().addUris(uri).setShell(true);
+        if (useDefaultExecutor) {
+            // Pass the sharding contexts as a JSON argument appended to the bootstrap script.
+            CommandLine commandLine = CommandLine.parse(bootstrapScript);
+            commandLine.addArgument(GsonFactory.getGson().toJson(shardingContexts), false);
+            result.setValue(Joiner.on(" ").join(commandLine.getExecutable(), Joiner.on(" ").join(commandLine.getArguments())));
+        } else {
+            result.setValue(bootstrapScript);
+        }
+        return result.build();
+    }
+    
+    private Protos.TaskInfo buildTaskInfo(final TaskContext taskContext, final CloudAppConfiguration appConfig, final CloudJobConfiguration jobConfig, final ShardingContexts shardingContexts,
+                                          final Protos.SlaveID slaveID, final Protos.CommandInfo command, final boolean useDefaultExecutor) {
+        Protos.TaskInfo.Builder result = Protos.TaskInfo.newBuilder().setTaskId(Protos.TaskID.newBuilder().setValue(taskContext.getId()).build())
+                .setName(taskContext.getTaskName()).setSlaveId(slaveID).addResources(buildResource("cpus", jobConfig.getCpuCount())).addResources(buildResource("mem", jobConfig.getMemoryMB()))
+                .setData(ByteString.copyFrom(new TaskInfoData(shardingContexts, jobConfig).serialize()));
+        if (useDefaultExecutor) {
+            return result.setCommand(command).build();
+        }
+        Protos.ExecutorInfo.Builder executorBuilder = Protos.ExecutorInfo.newBuilder().setExecutorId(Protos.ExecutorID.newBuilder().setValue(taskContext.getExecutorId(jobConfig.getAppName())))
+                .setCommand(command).addResources(buildResource("cpus", appConfig.getCpuCount())).addResources(buildResource("mem", appConfig.getMemoryMB()));
+        if (env.getJobEventRdbConfiguration().isPresent()) {
+            // Ship the RDB event trace configuration to the custom executor. (Former stray .build() call removed: its result was discarded.)
+            executorBuilder.setData(ByteString.copyFrom(SerializationUtils.serialize(env.getJobEventRdbConfigurationMap())));
+        }
+        return result.setExecutor(executorBuilder.build()).build();
+    }
+    
+    private Protos.Resource buildResource(final String type, final double resourceValue) {
+        return Protos.Resource.newBuilder().setName(type).setType(Protos.Value.Type.SCALAR).setScalar(Protos.Value.Scalar.newBuilder().setValue(resourceValue)).build();
+    }
+    
+    private JobStatusTraceEvent createJobStatusTraceEvent(final TaskContext taskContext) {
+        TaskContext.MetaInfo metaInfo = taskContext.getMetaInfo();
+        JobStatusTraceEvent result = new JobStatusTraceEvent(metaInfo.getJobName(), taskContext.getId(), taskContext.getSlaveId(),
+                Source.CLOUD_SCHEDULER, taskContext.getType(), String.valueOf(metaInfo.getShardingItems()), JobStatusTraceEvent.State.TASK_STAGING, "");
+        if (ExecutionType.FAILOVER == taskContext.getType()) {
+            // Link a failover task back to the task it replaces, when it can still be resolved.
+            Optional<String> taskContextOptional = facadeService.getFailoverTaskId(metaInfo);
+            if (taskContextOptional.isPresent()) {
+                result.setOriginalTaskId(taskContextOptional.get());
+            }
+        }
+        return result;
+    }
+    
+    private List<OfferID> getOfferIDs(final List<VirtualMachineLease> leasesUsed) {
+        List<OfferID> result = new ArrayList<>();
+        for (VirtualMachineLease virtualMachineLease: leasesUsed) {
+            result.add(virtualMachineLease.getOffer().getId());
+        }
+        return result;
+    }
+}
diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/producer/ProducerManager.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/producer/ProducerManager.java
new file mode 100644
index 0000000000..68e73e2403
--- /dev/null
+++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/producer/ProducerManager.java
@@ -0,0 +1,221 @@
+/*
+ * Copyright 1999-2015 dangdang.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.dangdang.ddframe.job.cloud.scheduler.producer;
+
+import com.dangdang.ddframe.job.cloud.scheduler.config.job.CloudJobConfiguration;
+import com.dangdang.ddframe.job.cloud.scheduler.config.job.CloudJobConfigurationService;
+import com.dangdang.ddframe.job.cloud.scheduler.config.job.CloudJobExecutionType;
+import com.dangdang.ddframe.job.cloud.scheduler.config.app.CloudAppConfiguration;
+import com.dangdang.ddframe.job.cloud.scheduler.config.app.CloudAppConfigurationService;
+import com.dangdang.ddframe.job.cloud.scheduler.state.ready.ReadyService;
+import com.dangdang.ddframe.job.cloud.scheduler.state.running.RunningService;
+import com.dangdang.ddframe.job.context.TaskContext;
+import com.dangdang.ddframe.job.exception.AppConfigurationException;
+import com.dangdang.ddframe.job.exception.JobConfigurationException;
+import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.mesos.Protos;
+import org.apache.mesos.SchedulerDriver;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * Producer manager: registers, schedules and reconciles cloud jobs.
+ *
+ * @author caohao
+ * @author zhangliang
+ */
+@Slf4j
+public class ProducerManager {
+    
+    private final CloudAppConfigurationService appConfigService;
+    
+    private final CloudJobConfigurationService configService;
+    
+    private final ReadyService readyService;
+    
+    private final RunningService runningService;
+    
+    private final TransientProducerScheduler transientProducerScheduler;
+    
+    private final SchedulerDriver schedulerDriver;
+    
+    public ProducerManager(final SchedulerDriver schedulerDriver, final CoordinatorRegistryCenter regCenter) {
+        this.schedulerDriver = schedulerDriver;
+        appConfigService = new CloudAppConfigurationService(regCenter);
+        configService = new CloudJobConfigurationService(regCenter);
+        readyService = new ReadyService(regCenter);
+        runningService = new RunningService(regCenter);
+        transientProducerScheduler = new TransientProducerScheduler(readyService);
+    }
+    
+    /**
+     * Start the producer manager and schedule every registered job.
+     */
+    public void startup() {
+        log.info("Start producer manager");
+        transientProducerScheduler.start();
+        for (CloudJobConfiguration each : configService.loadAll()) {
+            schedule(each);
+        }
+    }
+    
+    /**
+     * Register a job.
+     *
+     * @param jobConfig job configuration
+     * @throws AppConfigurationException if the job's app is not registered yet
+     * @throws JobConfigurationException if a job with the same name already exists
+     */
+    public void register(final CloudJobConfiguration jobConfig) {
+        Optional<CloudAppConfiguration> appConfigFromZk = appConfigService.load(jobConfig.getAppName());
+        if (!appConfigFromZk.isPresent()) {
+            throw new AppConfigurationException("Register app '%s' firstly.", jobConfig.getAppName());
+        }
+        Optional<CloudJobConfiguration> jobConfigFromZk = configService.load(jobConfig.getJobName());
+        if (jobConfigFromZk.isPresent()) {
+            throw new JobConfigurationException("Job '%s' already existed.", jobConfig.getJobName());
+        }
+        configService.add(jobConfig);
+        schedule(jobConfig);
+    }
+    
+    /**
+     * Update a job configuration.
+     *
+     * @param jobConfig job configuration
+     * @throws JobConfigurationException if the job is not registered
+     */
+    public void update(final CloudJobConfiguration jobConfig) {
+        Optional<CloudJobConfiguration> jobConfigFromZk = configService.load(jobConfig.getJobName());
+        if (!jobConfigFromZk.isPresent()) {
+            throw new JobConfigurationException("Cannot found job '%s', please register first.", jobConfig.getJobName());
+        }
+        configService.update(jobConfig);
+        reschedule(jobConfig);
+    }
+    
+    /**
+     * Deregister a job.
+     *
+     * @param jobName job name
+     */
+    public void deregister(final String jobName) {
+        Optional<CloudJobConfiguration> jobConfig = configService.load(jobName);
+        if (jobConfig.isPresent()) {
+            configService.remove(jobName);
+            transientProducerScheduler.deregister(jobConfig.get());
+        }
+        unschedule(jobName);
+    }
+    
+    /**
+     * Schedule a job: transient jobs go to the cron producer, daemon jobs straight to the ready queue.
+     *
+     * @param jobConfig job configuration
+     */
+    public void schedule(final CloudJobConfiguration jobConfig) {
+        if (CloudJobExecutionType.TRANSIENT == jobConfig.getJobExecutionType()) {
+            transientProducerScheduler.register(jobConfig);
+        } else if (CloudJobExecutionType.DAEMON == jobConfig.getJobExecutionType()) {
+            readyService.addDaemon(jobConfig.getJobName());
+        }
+    }
+    
+    /**
+     * Stop scheduling a job: kill its running tasks and purge its running/ready state.
+     *
+     * @param jobName job name
+     */
+    public void unschedule(final String jobName) {
+        for (TaskContext each : runningService.getRunningTasks(jobName)) {
+            schedulerDriver.killTask(Protos.TaskID.newBuilder().setValue(each.getId()).build());
+        }
+        runningService.remove(jobName);
+        readyService.remove(Lists.newArrayList(jobName));
+    }
+    
+    /**
+     * Reschedule a job.
+     *
+     * @param jobConfig job configuration
+     */
+    public void reschedule(final CloudJobConfiguration jobConfig) {
+        unschedule(jobConfig.getJobName());
+        schedule(jobConfig);
+    }
+    
+    /**
+     * Shut down the producer manager.
+     */
+    public void shutdown() {
+        log.info("Stop producer manager");
+        transientProducerScheduler.shutdown();
+    }
+    
+    // TODO move to ReconcileService
+    /**
+     * Explicitly reconcile a specific task.
+     *
+     * @param taskId task ID
+     */
+    public void explicitReconcile(final String taskId) {
+        explicitReconcile(Collections.singletonList(Iterators.find(runningService.getRunningTasks(TaskContext.from(taskId).getMetaInfo().getJobName()).iterator(), new Predicate<TaskContext>() {
+            @Override
+            public boolean apply(final TaskContext input) {
+                return input.getId().equals(taskId);
+            }
+        })));
+    }
+    
+    private void explicitReconcile(final Collection<TaskContext> taskContexts) {
+        // Ask Mesos to re-send the status of these tasks; TASK_RUNNING is the assumed last-known state.
+        schedulerDriver.reconcileTasks(Collections2.transform(taskContexts, new Function<TaskContext, Protos.TaskStatus>() {
+            @Override
+            public Protos.TaskStatus apply(final TaskContext input) {
+                return Protos.TaskStatus.newBuilder()
+                        .setTaskId(Protos.TaskID.newBuilder().setValue(input.getId()).build())
+                        .setSlaveId(Protos.SlaveID.newBuilder().setValue(input.getSlaveId()).build())
+                        .setState(Protos.TaskState.TASK_RUNNING).build();
+            }
+        }));
+    }
+    
+    // TODO move to ReconcileService
+    /**
+     * Explicitly reconcile all running daemon tasks.
+     */
+    public void explicitReconcile() {
+        Set<TaskContext> runningDaemonTask = runningService.getAllRunningDaemonTasks();
+        if (!runningDaemonTask.isEmpty()) {
+            explicitReconcile(runningDaemonTask);
+        }
+    }
+    
+    // TODO move to ReconcileService
+    /**
+     * Implicit reconciliation: an empty task list asks Mesos for the status of every task.
+     */
+    public void implicitReconcile() {
+        schedulerDriver.reconcileTasks(Collections.<Protos.TaskStatus>emptyList());
+    }
+}
diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/producer/TransientProducerRepository.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/producer/TransientProducerRepository.java
new file mode 100644
index 0000000000..cd63620df3
--- /dev/null
+++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/producer/TransientProducerRepository.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 1999-2015 dangdang.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.dangdang.ddframe.job.cloud.scheduler.producer;
+
+import org.quartz.JobKey;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * In-memory repository mapping each cron job key to the job names it triggers.
+ *
+ * @author caohao
+ * @author zhangliang
+ */
+class TransientProducerRepository {
+    
+    // One entry per distinct cron expression; values are the job names sharing that cron.
+    private final ConcurrentHashMap<JobKey, List<String>> cronTasks = new ConcurrentHashMap<>(256, 1);
+    
+    synchronized void put(final JobKey jobKey, final String jobName) {
+        // A job belongs to exactly one cron: drop any previous registration first.
+        remove(jobName);
+        List<String> taskList = cronTasks.get(jobKey);
+        if (null == taskList) {
+            taskList = new CopyOnWriteArrayList<>();
+            taskList.add(jobName);
+            cronTasks.put(jobKey, taskList);
+            return;
+        }
+        if (!taskList.contains(jobName)) {
+            taskList.add(jobName);
+        }
+    }
+    
+    synchronized void remove(final String jobName) {
+        for (Entry<JobKey, List<String>> each : cronTasks.entrySet()) {
+            JobKey jobKey = each.getKey();
+            List<String> jobNames = each.getValue();
+            jobNames.remove(jobName);
+            // Drop cron entries that no longer trigger any job.
+            if (jobNames.isEmpty()) {
+                cronTasks.remove(jobKey);
+            }
+        }
+    }
+    
+    List<String> get(final JobKey jobKey) {
+        List<String> result = cronTasks.get(jobKey);
+        return null == result ? Collections.<String>emptyList() : result;
+    }
+    
+    boolean containsKey(final JobKey jobKey) {
+        return cronTasks.containsKey(jobKey);
+    }
+    
+    void removeAll() {
+        cronTasks.clear();
+    }
+}
diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/producer/TransientProducerScheduler.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/producer/TransientProducerScheduler.java
new file mode 100644
index 0000000000..88f339d918
--- /dev/null
+++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/producer/TransientProducerScheduler.java
@@ -0,0 +1,157 @@
+/*
+ * Copyright 1999-2015 dangdang.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.dangdang.ddframe.job.cloud.scheduler.producer;
+
+import com.dangdang.ddframe.job.cloud.scheduler.config.job.CloudJobConfiguration;
+import com.dangdang.ddframe.job.cloud.scheduler.state.ready.ReadyService;
+import com.dangdang.ddframe.job.exception.JobSystemException;
+import lombok.Setter;
+import org.quartz.CronScheduleBuilder;
+import org.quartz.Job;
+import org.quartz.JobBuilder;
+import org.quartz.JobDetail;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.JobKey;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.quartz.TriggerBuilder;
+import org.quartz.TriggerKey;
+import org.quartz.impl.StdSchedulerFactory;
+import org.quartz.plugins.management.ShutdownHookPlugin;
+import org.quartz.simpl.SimpleThreadPool;
+
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Quartz-backed scheduler which pushes transient jobs into the ready queue on their cron fire times.
+ *
+ * @author caohao
+ */
+class TransientProducerScheduler {
+    
+    private final TransientProducerRepository repository;
+    
+    private final ReadyService readyService;
+    
+    private Scheduler scheduler;
+    
+    TransientProducerScheduler(final ReadyService readyService) {
+        repository = new TransientProducerRepository();
+        this.readyService = readyService;
+    }
+    
+    void start() {
+        scheduler = getScheduler();
+        try {
+            scheduler.start();
+        } catch (final SchedulerException ex) {
+            throw new JobSystemException(ex);
+        }
+    }
+    
+    private Scheduler getScheduler() {
+        StdSchedulerFactory factory = new StdSchedulerFactory();
+        try {
+            factory.initialize(getQuartzProperties());
+            return factory.getScheduler();
+        } catch (final SchedulerException ex) {
+            throw new JobSystemException(ex);
+        }
+    }
+    
+    private Properties getQuartzProperties() {
+        Properties result = new Properties();
+        result.put("org.quartz.threadPool.class", SimpleThreadPool.class.getName());
+        result.put("org.quartz.threadPool.threadCount", Integer.toString(Runtime.getRuntime().availableProcessors() * 2));
+        result.put("org.quartz.scheduler.instanceName", "ELASTIC_JOB_CLOUD_TRANSIENT_PRODUCER");
+        result.put("org.quartz.plugin.shutdownhook.class", ShutdownHookPlugin.class.getName());
+        result.put("org.quartz.plugin.shutdownhook.cleanShutdown", Boolean.TRUE.toString());
+        return result;
+    }
+    
+    // TODO concurrency optimization
+    synchronized void register(final CloudJobConfiguration jobConfig) {
+        String cron = jobConfig.getTypeConfig().getCoreConfig().getCron();
+        // Jobs sharing a cron expression share one Quartz job; the cron string is the job key.
+        JobKey jobKey = buildJobKey(cron);
+        repository.put(jobKey, jobConfig.getJobName());
+        try {
+            if (!scheduler.checkExists(jobKey)) {
+                scheduler.scheduleJob(buildJobDetail(jobKey), buildTrigger(jobKey.getName()));
+            }
+        } catch (final SchedulerException ex) {
+            throw new JobSystemException(ex);
+        }
+    }
+    
+    private JobDetail buildJobDetail(final JobKey jobKey) {
+        JobDetail result = JobBuilder.newJob(ProducerJob.class).withIdentity(jobKey).build();
+        // Collaborators are handed to ProducerJob instances through the job data map.
+        result.getJobDataMap().put("repository", repository);
+        result.getJobDataMap().put("readyService", readyService);
+        return result;
+    }
+    
+    private Trigger buildTrigger(final String cron) {
+        return TriggerBuilder.newTrigger().withIdentity(cron).withSchedule(CronScheduleBuilder.cronSchedule(cron).withMisfireHandlingInstructionDoNothing()).build();
+    }
+    
+    void deregister(final CloudJobConfiguration jobConfig) {
+        repository.remove(jobConfig.getJobName());
+        String cron = jobConfig.getTypeConfig().getCoreConfig().getCron();
+        // Unschedule the shared cron trigger only when no other job still uses it.
+        if (!repository.containsKey(buildJobKey(cron))) {
+            try {
+                scheduler.unscheduleJob(TriggerKey.triggerKey(cron));
+            } catch (final SchedulerException ex) {
+                throw new JobSystemException(ex);
+            }
+        }
+    }
+    
+    private JobKey buildJobKey(final String cron) {
+        return JobKey.jobKey(cron);
+    }
+    
+    void shutdown() {
+        try {
+            if (null != scheduler && !scheduler.isShutdown()) {
+                scheduler.shutdown();
+            }
+        } catch (final SchedulerException ex) {
+            throw new JobSystemException(ex);
+        }
+        repository.removeAll();
+    }
+    
+    /**
+     * Quartz job which, on each cron fire, marks every job registered under its key as ready.
+     */
+    @Setter
+    public static final class ProducerJob implements Job {
+        
+        private TransientProducerRepository repository;
+        
+        private ReadyService readyService;
+        
+        @Override
+        public void execute(final JobExecutionContext context) throws JobExecutionException {
+            List<String> jobNames = repository.get(context.getJobDetail().getKey());
+            for (String each : jobNames) {
+                readyService.addTransient(each);
+            }
+        }
+    }
+}
diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/restful/CloudAppRestfulApi.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/restful/CloudAppRestfulApi.java
new file mode 100644
index 0000000000..1f07e2d8ff
--- /dev/null
+++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/restful/CloudAppRestfulApi.java
@@ -0,0 +1,130 @@
+/*
+ * Copyright 1999-2015 dangdang.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.dangdang.ddframe.job.cloud.scheduler.restful;
+
+import com.dangdang.ddframe.job.cloud.scheduler.config.app.CloudAppConfiguration;
+import com.dangdang.ddframe.job.cloud.scheduler.config.app.CloudAppConfigurationGsonFactory;
+import com.dangdang.ddframe.job.cloud.scheduler.config.app.CloudAppConfigurationService;
+import com.dangdang.ddframe.job.exception.AppConfigurationException;
+import com.dangdang.ddframe.job.exception.JobSystemException;
+import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
+import com.dangdang.ddframe.job.util.json.GsonFactory;
+import com.google.common.base.Optional;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.core.MediaType;
+import java.util.Collection;
+
+/**
+ * REST API for cloud app configuration.
+ *
+ * @author caohao
+ */
+@Path("/app")
+public final class CloudAppRestfulApi {
+    
+    // Injected once via init(); JAX-RS instantiates this resource per request.
+    private static CoordinatorRegistryCenter regCenter;
+    
+    private final CloudAppConfigurationService configService;
+    
+    public CloudAppRestfulApi() {
+        configService = new CloudAppConfigurationService(regCenter);
+    }
+    
+    /**
+     * Initialize the API with the registry center and register the Gson type adapter.
+     *
+     * @param regCenter registry center
+     */
+    public static void init(final CoordinatorRegistryCenter regCenter) {
+        CloudAppRestfulApi.regCenter = regCenter;
+        GsonFactory.registerTypeAdapter(CloudAppConfiguration.class, new CloudAppConfigurationGsonFactory.CloudAppConfigurationGsonTypeAdapter());
+    }
+    
+    /**
+     * Register a cloud app configuration.
+     *
+     * @param appConfig cloud app configuration
+     * @throws AppConfigurationException if the app already exists
+     */
+    @POST
+    @Consumes(MediaType.APPLICATION_JSON)
+    public void register(final CloudAppConfiguration appConfig) {
+        Optional<CloudAppConfiguration> appConfigFromZk = configService.load(appConfig.getAppName());
+        if (appConfigFromZk.isPresent()) {
+            throw new AppConfigurationException("app '%s' already existed.", appConfig.getAppName());
+        }
+        configService.add(appConfig);
+    }
+    
+    /**
+     * Update a cloud app configuration.
+     *
+     * @param appConfig cloud app configuration
+     */
+    @PUT
+    @Consumes(MediaType.APPLICATION_JSON)
+    public void update(final CloudAppConfiguration appConfig) {
+        configService.update(appConfig);
+    }
+    
+    /**
+     * Query one cloud app configuration.
+     *
+     * @param appName cloud app name
+     * @return cloud app configuration
+     * @throws JobSystemException if the app cannot be found
+     */
+    @GET
+    @Path("/{appName}")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public CloudAppConfiguration detail(@PathParam("appName") final String appName) {
+        Optional<CloudAppConfiguration> config = configService.load(appName);
+        if (config.isPresent()) {
+            return config.get();
+        }
+        throw new JobSystemException("Cannot find app '%s', please check the appName.", appName);
+    }
+    
+    /**
+     * Find all cloud app configurations.
+     *
+     * @return all cloud app configurations
+     */
+    @GET
+    @Path("/list")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Collection<CloudAppConfiguration> findAllApps() {
+        return configService.loadAll();
+    }
+    
+    /**
+     * Deregister a cloud app configuration.
+     *
+     * @param appConfig cloud app name to remove
+     */
+    @DELETE
+    @Consumes(MediaType.APPLICATION_JSON)
+    public void deregister(final String appConfig) {
+        configService.remove(appConfig);
+    }
+}
diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/restful/CloudJobRestfulApi.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/restful/CloudJobRestfulApi.java
new file mode 100644
index 0000000000..21a079dda9
--- /dev/null
+++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/restful/CloudJobRestfulApi.java
@@ -0,0 +1,438 @@
+/*
+ * Copyright 1999-2015 dangdang.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.dangdang.ddframe.job.cloud.scheduler.restful;
+
+import com.dangdang.ddframe.job.cloud.scheduler.env.BootstrapEnvironment;
+import com.dangdang.ddframe.job.cloud.scheduler.config.job.CloudJobConfiguration;
+import com.dangdang.ddframe.job.cloud.scheduler.config.job.CloudJobConfigurationGsonFactory;
+import com.dangdang.ddframe.job.cloud.scheduler.config.job.CloudJobConfigurationService;
+import com.dangdang.ddframe.job.cloud.scheduler.config.job.CloudJobExecutionType;
+import com.dangdang.ddframe.job.cloud.scheduler.mesos.FacadeService;
+import com.dangdang.ddframe.job.cloud.scheduler.producer.ProducerManager;
+import com.dangdang.ddframe.job.cloud.scheduler.state.failover.FailoverTaskInfo;
+import com.dangdang.ddframe.job.cloud.scheduler.statistics.StatisticManager;
+import com.dangdang.ddframe.job.context.TaskContext;
+import com.dangdang.ddframe.job.event.rdb.JobEventRdbConfiguration;
+import com.dangdang.ddframe.job.event.rdb.JobEventRdbSearch;
+import com.dangdang.ddframe.job.event.rdb.JobEventRdbSearch.Condition;
+import com.dangdang.ddframe.job.event.rdb.JobEventRdbSearch.Result;
+import com.dangdang.ddframe.job.event.type.JobExecutionEvent;
+import com.dangdang.ddframe.job.event.type.JobStatusTraceEvent;
+import com.dangdang.ddframe.job.exception.JobConfigurationException;
+import com.dangdang.ddframe.job.exception.JobSystemException;
+import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
+import com.dangdang.ddframe.job.statistics.StatisticInterval;
+import com.dangdang.ddframe.job.statistics.type.job.JobExecutionTypeStatistics;
+import com.dangdang.ddframe.job.statistics.type.job.JobRegisterStatistics;
+import com.dangdang.ddframe.job.statistics.type.job.JobRunningStatistics;
+import com.dangdang.ddframe.job.statistics.type.job.JobTypeStatistics;
+import com.dangdang.ddframe.job.statistics.type.task.TaskResultStatistics;
+import com.dangdang.ddframe.job.statistics.type.task.TaskRunningStatistics;
+import com.dangdang.ddframe.job.util.json.GsonFactory;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.UriInfo;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+/**
+ * RESTful API for cloud jobs.
+ *
+ * @author zhangliang
+ * @author liguangyun
+ */
+@Path("/job")
+@Slf4j
+public final class CloudJobRestfulApi {
+
+    // Shared collaborators injected once via init(); each request-scoped resource instance reads them.
+    private static CoordinatorRegistryCenter regCenter;
+
+    // Null when no RDB event configuration is present (see init()) -- callers must tolerate null.
+    private static JobEventRdbSearch jobEventRdbSearch;
+
+    // Handles job registration/update/deregistration against the scheduler.
+    private static ProducerManager producerManager;
+
+    private final CloudJobConfigurationService configService;
+
+    private final FacadeService facadeService;
+
+    private final StatisticManager statisticManager;
+
+    /**
+     * Create the resource; requires {@link #init} to have been called first.
+     *
+     * @throws NullPointerException if the registry center has not been initialized via init()
+     */
+    public CloudJobRestfulApi() {
+        Preconditions.checkNotNull(regCenter);
+        configService = new CloudJobConfigurationService(regCenter);
+        facadeService = new FacadeService(regCenter);
+        // Parameterized Optional instead of the raw type; no RDB config is supplied here by design.
+        Optional<JobEventRdbConfiguration> jobEventRdbConfiguration = Optional.absent();
+        statisticManager = StatisticManager.getInstance(regCenter, jobEventRdbConfiguration);
+    }
+
+    /**
+     * Initialize the static collaborators shared by every instance of this resource.
+     *
+     * @param regCenter registry center
+     * @param producerManager producer manager
+     */
+    public static void init(final CoordinatorRegistryCenter regCenter, final ProducerManager producerManager) {
+        CloudJobRestfulApi.regCenter = regCenter;
+        CloudJobRestfulApi.producerManager = producerManager;
+        GsonFactory.registerTypeAdapter(CloudJobConfiguration.class, new CloudJobConfigurationGsonFactory.CloudJobConfigurationGsonTypeAdapter());
+        // Parameterized Optional instead of the raw type; event search stays null when no RDB config exists.
+        Optional<JobEventRdbConfiguration> jobEventRdbConfig = BootstrapEnvironment.getInstance().getJobEventRdbConfiguration();
+        jobEventRdbSearch = jobEventRdbConfig.isPresent() ? new JobEventRdbSearch(jobEventRdbConfig.get().getDataSource()) : null;
+    }
+
+    /**
+     * Register a cloud job.
+     *
+     * <p>Delegates to the producer manager, which owns validation and scheduling.
+     *
+     * @param jobConfig cloud job configuration to register
+     */
+    @POST
+    @Path("/register")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public void register(final CloudJobConfiguration jobConfig) {
+        producerManager.register(jobConfig);
+    }
+
+    /**
+     * Update a cloud job configuration.
+     *
+     * @param jobConfig cloud job configuration carrying the new values
+     */
+    @PUT
+    @Path("/update")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public void update(final CloudJobConfiguration jobConfig) {
+        producerManager.update(jobConfig);
+    }
+
+    /**
+     * Deregister a cloud job.
+     *
+     * @param jobName name of the job to remove
+     */
+    @DELETE
+    @Path("/deregister")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public void deregister(final String jobName) {
+        producerManager.deregister(jobName);
+    }
+
+ /**
+ * 触发一次作业.
+ *
+ * @param jobName 作业名称
+ */
+ @POST
+ @Path("/trigger")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public void trigger(final String jobName) {
+ Optional config = configService.load(jobName);
+ if (config.isPresent() && CloudJobExecutionType.DAEMON == config.get().getJobExecutionType()) {
+ throw new JobSystemException("Daemon job '%s' cannot support trigger.", jobName);
+ }
+ facadeService.addTransient(jobName);
+ }
+
+    /**
+     * Query a single cloud job configuration by name.
+     *
+     * @param jobName name of the job
+     * @return the matching cloud job configuration
+     * @throws JobConfigurationException if no job with the given name exists
+     */
+    @GET
+    @Path("/jobs/{jobName}")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public CloudJobConfiguration detail(@PathParam("jobName") final String jobName) {
+        // Parameterized Optional instead of the raw type; the raw form forced an unchecked conversion at config.get().
+        Optional<CloudJobConfiguration> config = configService.load(jobName);
+        if (config.isPresent()) {
+            return config.get();
+        }
+        throw new JobConfigurationException("Cannot find job '%s', please check the jobName.", jobName);
+    }
+
+    /**
+     * Load every registered cloud job configuration.
+     *
+     * @return all cloud job configurations
+     */
+    @GET
+    @Path("/jobs")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Collection<CloudJobConfiguration> findAllJobs() {
+        // Parameterized return type instead of the raw Collection so callers need no casts.
+        return configService.loadAll();
+    }
+
+    /**
+     * Find every currently running task across all jobs.
+     *
+     * @return all running task contexts, flattened from the per-job grouping
+     */
+    @GET
+    @Path("tasks/running")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Collection<TaskContext> findAllRunningTasks() {
+        // Parameterized List/Set instead of raw types; element type assumed TaskContext from the
+        // facade's per-job task grouping -- TODO confirm against FacadeService.getAllRunningTasks().
+        List<TaskContext> result = new LinkedList<>();
+        for (Set<TaskContext> each : facadeService.getAllRunningTasks().values()) {
+            result.addAll(each);
+        }
+        return result;
+    }
+
+ /**
+ * 查找待运行的全部任务.
+ *
+ * @return 待运行的全部任务
+ */
+ @GET
+ @Path("tasks/ready")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Collection