Skip to content

Commit

Permalink
Moved jobClass config to job configuration context.
Browse files Browse the repository at this point in the history
  • Loading branch information
haocao committed Aug 4, 2016
1 parent 983a1a1 commit 0664ccd
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 17 deletions.
Empty file modified README_1.x.md
100755 → 100644
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -50,24 +50,25 @@ public final class ArgumentsParser {
* @throws JobExecutionEnvironmentException 作业执行环境异常
*/
public static ArgumentsParser parse(final String[] args) throws JobExecutionEnvironmentException {
int argumentsLength = 3;
int argumentsLength = 2;
if (argumentsLength != args.length) {
throw new JobExecutionEnvironmentException("Elastic-Job: Arguments parse failure, should have %s arguments.", argumentsLength);
}
ArgumentsParser result = new ArgumentsParser();
result.jobConfig = new JobConfigurationContext(GsonFactory.getGson().fromJson(args[1], Map.class));
String jobClass = result.jobConfig.getTypeConfig().getJobClass();
try {
Class<?> elasticJobClass = Class.forName(args[0]);
Class<?> elasticJobClass = Class.forName(jobClass);
if (!ElasticJob.class.isAssignableFrom(elasticJobClass)) {
throw new JobExecutionEnvironmentException("Elastic-Job: Class '%s' must implements ElasticJob interface.", args[0]);
throw new JobExecutionEnvironmentException("Elastic-Job: Class '%s' must implements ElasticJob interface.", jobClass);
}
if (elasticJobClass != ScriptJob.class) {
result.elasticJob = (ElasticJob) elasticJobClass.newInstance();
}
} catch (final ReflectiveOperationException ex) {
throw new JobExecutionEnvironmentException("Elastic-Job: Class '%s' initialize failure, the error message is '%s'.", args[0], ex.getMessage());
throw new JobExecutionEnvironmentException("Elastic-Job: Class '%s' initialize failure, the error message is '%s'.", jobClass, ex.getMessage());
}
result.shardingContext = GsonFactory.getGson().fromJson(args[1], ShardingContext.class);
result.jobConfig = new JobConfigurationContext(GsonFactory.getGson().fromJson(args[2], Map.class));
result.shardingContext = GsonFactory.getGson().fromJson(args[0], ShardingContext.class);
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import com.dangdang.ddframe.job.api.type.dataflow.api.DataflowJobConfiguration.DataflowType;
import com.dangdang.ddframe.job.api.type.script.api.ScriptJobConfiguration;
import com.dangdang.ddframe.job.api.type.simple.api.SimpleJobConfiguration;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;

import java.util.Map;

Expand All @@ -43,7 +45,11 @@ class JobConfigurationContext implements JobRootConfiguration {
int ignoredShardingTotalCount = 1;
String jobClass = jobConfigurationMap.get("jobClass");
String jobType = jobConfigurationMap.get("jobType");
JobCoreConfiguration jobCoreConfig = JobCoreConfiguration.newBuilder(jobConfigurationMap.get("jobName"), ignoredCron, ignoredShardingTotalCount).build();
String jobName = jobConfigurationMap.get("jobName");
Preconditions.checkArgument(!Strings.isNullOrEmpty(jobName), "jobName can not be empty.");
Preconditions.checkArgument(!Strings.isNullOrEmpty(jobType), "jobType can not be empty.");
Preconditions.checkArgument(!Strings.isNullOrEmpty(jobClass), "jobClass can not be empty.");
JobCoreConfiguration jobCoreConfig = JobCoreConfiguration.newBuilder(jobName, ignoredCron, ignoredShardingTotalCount).build();
jobCoreConfig.getJobProperties().put(JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER.name(), jobConfigurationMap.get("executorServiceHandler"));
jobCoreConfig.getJobProperties().put(JobPropertiesEnum.JOB_EXCEPTION_HANDLER.name(), jobConfigurationMap.get("jobExceptionHandler"));
if (JobType.DATAFLOW.name().equals(jobType)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.dangdang.ddframe.job.api.type.script.api.ScriptJob;
import com.dangdang.ddframe.job.cloud.api.fixture.TestJob;
import org.junit.Test;
import org.omg.CORBA.Object;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertNotNull;
Expand All @@ -33,34 +32,34 @@ public final class ArgumentsParserTest {
private String shardingContextJson = "{\"jobName\":\"test_job\",\"shardingTotalCount\":1,\"jobParameter\":\"\",\"shardingItemParameters\":{\"0\":\"\"}}";

private String configContextJson = "{\"jobType\":\"SIMPLE\",\"executorServiceHandler\":\"com.dangdang.ddframe.job.api.internal.executor.DefaultExecutorServiceHandler\","
+ "\"jobExceptionHandler\":\"com.dangdang.ddframe.job.api.internal.executor.DefaultJobExceptionHandler\",\"jobName\":\"test_job\"}";
+ "\"jobExceptionHandler\":\"com.dangdang.ddframe.job.api.internal.executor.DefaultJobExceptionHandler\",\"jobName\":\"test_job\",\"jobClass\":\"%s\"}";

@Test(expected = JobExecutionEnvironmentException.class)
public void assertParseWhenArgumentsIsNotEnough() throws JobExecutionEnvironmentException {
ArgumentsParser.parse(new String[] {"", ""});
ArgumentsParser.parse(new String[] {""});
}

@Test(expected = JobExecutionEnvironmentException.class)
public void assertParseWhenClassIsNotFound() throws JobExecutionEnvironmentException {
ArgumentsParser.parse(new String[] {"Object1", "", ""});
ArgumentsParser.parse(new String[] {"", "{\"jobType\":\"SIMPLE\",\"jobName\":\"test_job\",\"jobClass\":\"testClass\"}"});
}

@Test(expected = JobExecutionEnvironmentException.class)
public void assertParseWhenClassIsNotElasticJob() throws JobExecutionEnvironmentException {
ArgumentsParser.parse(new String[] {Object.class.getCanonicalName(), "", ""});
ArgumentsParser.parse(new String[]{"", configContextJson});
}

@Test
public void assertParse() throws JobExecutionEnvironmentException {
ArgumentsParser actual = ArgumentsParser.parse(new String[] {TestJob.class.getCanonicalName(), shardingContextJson, configContextJson});
ArgumentsParser actual = ArgumentsParser.parse(new String[] {shardingContextJson, String.format(configContextJson, TestJob.class.getCanonicalName())});
assertThat(actual.getElasticJob(), instanceOf(TestJob.class));
assertNotNull(actual.getShardingContext());
assertNotNull(actual.getJobConfig());
}

@Test
public void assertParseScriptJob() throws JobExecutionEnvironmentException {
ArgumentsParser actual = ArgumentsParser.parse(new String[] {ScriptJob.class.getCanonicalName(), shardingContextJson, configContextJson});
ArgumentsParser actual = ArgumentsParser.parse(new String[] {shardingContextJson, String.format(configContextJson, ScriptJob.class.getCanonicalName())});
assertNull(actual.getElasticJob());
assertNotNull(actual.getShardingContext());
assertNotNull(actual.getJobConfig());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
@EqualsAndHashCode(of = "offerId")
public final class HardwareResource {

private static final String RUN_COMMAND = "sh %s %s '%s' '%s'";
private static final String RUN_COMMAND = "sh %s '%s' '%s'";

private final Protos.Offer offer;

Expand Down Expand Up @@ -138,7 +138,7 @@ public Protos.TaskInfo createTaskInfo(final JobContext jobContext, final int sha
// TODO 更改cache为elastic-job-cloud.properties配置
Protos.CommandInfo.URI uri = Protos.CommandInfo.URI.newBuilder().setValue(jobConfig.getAppURL()).setExtract(true).setCache(false).build();
Protos.CommandInfo command = Protos.CommandInfo.newBuilder().addUris(uri).setShell(true).setValue(
String.format(RUN_COMMAND, jobConfig.getBootstrapScript(), jobConfig.getTypeConfig().getJobClass(), GsonFactory.getGson().toJson(shardingContext),
String.format(RUN_COMMAND, jobConfig.getBootstrapScript(), GsonFactory.getGson().toJson(shardingContext),
GsonFactory.getGson().toJson(buildJobConfigurationContext(jobConfig)))).build();
return Protos.TaskInfo.newBuilder()
.setName(taskId.getValue())
Expand All @@ -154,6 +154,7 @@ private Map<String, String> buildJobConfigurationContext(final CloudJobConfigura
Map<String, String> result = new HashMap<>();
result.put("jobType", jobConfig.getTypeConfig().getJobType().name());
result.put("jobName", jobConfig.getJobName());
result.put("jobClass", jobConfig.getTypeConfig().getJobClass());
result.put("jobExceptionHandler", jobConfig.getTypeConfig().getCoreConfig().getJobProperties().get(JobPropertiesEnum.JOB_EXCEPTION_HANDLER));
result.put("executorServiceHandler", jobConfig.getTypeConfig().getCoreConfig().getJobProperties().get(JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER));
if (jobConfig.getTypeConfig() instanceof DataflowJobConfiguration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.dangdang.ddframe.job.cloud.mesos.fixture.OfferBuilder;
import com.dangdang.ddframe.job.cloud.state.fixture.CloudJobConfigurationBuilder;
import org.apache.mesos.Protos;
import org.apache.mesos.Protos.TaskInfo;
import org.junit.Test;
import org.unitils.util.ReflectionUtils;

Expand Down Expand Up @@ -87,15 +88,32 @@ public void assertCommitReservedResources() {
public void assertCreateTaskInfo() {
HardwareResource hardwareResource = new HardwareResource(OfferBuilder.createOffer(10d, 1280d));
Protos.TaskInfo actual = hardwareResource.createTaskInfo(JobContext.from(CloudJobConfigurationBuilder.createCloudJobConfiguration("test_job"), ExecutionType.READY), 0);
assertGetBasicTaskInfo(actual);
assertGetResources(actual);
assertGetCommand(actual);
}

private void assertGetBasicTaskInfo(final TaskInfo actual) {
assertThat(actual.getTaskId().getValue(), startsWith("test_job@-@0@-@READY@-@"));
assertThat(actual.getName(), startsWith("test_job@-@0@-@READY@-@"));
assertThat(actual.getSlaveId().getValue(), is("slave-offer_id_0"));
}

private void assertGetResources(final TaskInfo actual) {
assertThat(actual.getResources(0).getName(), is("cpus"));
assertThat(actual.getResources(0).getScalar().getValue(), is(1d));
assertThat(actual.getResources(1).getName(), is("mem"));
assertThat(actual.getResources(1).getScalar().getValue(), is(128d));
}

private void assertGetCommand(final TaskInfo actual) {
assertThat(actual.getCommand().getUris(0).getValue(), is("http://localhost/app.jar"));
assertThat(actual.getCommand().getValue(), is("sh bin/start.sh '{\"jobName\":\"test_job\",\"shardingTotalCount\":10,\"jobParameter\":\"\",\"shardingItemParameters\":{\"0\":\"\"}}' "
+ "'{\"jobType\":\"SIMPLE\",\"executorServiceHandler\":\"com.dangdang.ddframe.job.api.internal.executor.DefaultExecutorServiceHandler\","
+ "\"jobExceptionHandler\":\"com.dangdang.ddframe.job.api.internal.executor.DefaultJobExceptionHandler\",\"jobName\":\"test_job\","
+ "\"jobClass\":\"com.dangdang.ddframe.job.cloud.state.fixture.TestSimpleJob\"}'"));
}

@Test
public void assertEquals() {
assertThat(new HardwareResource(OfferBuilder.createOffer("offer_id_0", 10d, 1280d)), is(new HardwareResource(OfferBuilder.createOffer("offer_id_0", 1d, 128d))));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
#!/bin/bash
java -classpath lib/*:. com.dangdang.ddframe.job.cloud.example.Main $1 $2 $3
java -classpath lib/*:. com.dangdang.ddframe.job.cloud.example.Main $1 $2

0 comments on commit 0664ccd

Please sign in to comment.