Skip to content

Commit

Permalink
#43
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Feb 1, 2016
1 parent 5128e30 commit d8a94d8
Show file tree
Hide file tree
Showing 9 changed files with 38 additions and 18 deletions.
Expand Up @@ -20,11 +20,14 @@
import java.util.List;
import java.util.concurrent.CountDownLatch;

import lombok.extern.slf4j.Slf4j;
import org.quartz.JobExecutionException;

import com.dangdang.ddframe.job.api.DataFlowElasticJob;
import com.dangdang.ddframe.job.exception.JobException;
import com.dangdang.ddframe.job.internal.statistics.ProcessCountStatistics;

import lombok.extern.slf4j.Slf4j;

/**
* 用于处理数据流程的作业抽象类.
*
Expand All @@ -42,7 +45,13 @@ public final void updateOffset(final int item, final String offset) {
getOffsetService().updateOffset(item, offset);
}

@Override
public void handleJobExecutionException(final JobExecutionException jobExecutionException) throws JobExecutionException {
log.error("Elastic job: exception occur in job processing...", jobExecutionException.getCause());
}

protected final void processDataWithStatistics(final C shardingContext, final List<T> data) {
Exception firstException = null;
for (T each : data) {
boolean isSuccess = false;
try {
Expand All @@ -51,7 +60,9 @@ protected final void processDataWithStatistics(final C shardingContext, final Li
} catch (final Exception ex) {
// CHECKSTYLE:ON
ProcessCountStatistics.incrementProcessFailureCount(shardingContext.getJobName());
log.error("Elastic job: exception occur in job processing...", ex);
if (null == firstException) {
firstException = ex;
}
continue;
}
if (isSuccess) {
Expand All @@ -60,6 +71,9 @@ protected final void processDataWithStatistics(final C shardingContext, final Li
ProcessCountStatistics.incrementProcessFailureCount(shardingContext.getJobName());
}
}
if (null != firstException) {
throw new JobException(firstException);
}
}

protected final void latchAwait(final CountDownLatch latch) {
Expand Down
Expand Up @@ -19,6 +19,8 @@

import lombok.extern.slf4j.Slf4j;

import org.quartz.JobExecutionException;

import com.dangdang.ddframe.job.api.JobExecutionMultipleShardingContext;
import com.dangdang.ddframe.job.internal.job.AbstractElasticJob;

Expand All @@ -36,13 +38,12 @@ public abstract class AbstractSimpleElasticJob extends AbstractElasticJob {

@Override
protected final void executeJob(final JobExecutionMultipleShardingContext shardingContext) {
try {
process(shardingContext);
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
log.error("Elastic job: exception occur in job processing...", ex);
}
process(shardingContext);
}

@Override
public void handleJobExecutionException(final JobExecutionException jobExecutionException) throws JobExecutionException {
log.error("Elastic job: exception occur in job processing...", jobExecutionException.getCause());
}

/**
Expand Down
Expand Up @@ -139,6 +139,7 @@ private Properties loadLocalProperties() {
public void close() {
for (Entry<String, TreeCache> each : caches.entrySet()) {
each.getValue().close();

}
waitForCacheClose();
CloseableUtils.closeQuietly(client);
Expand Down
Expand Up @@ -23,7 +23,6 @@
import static org.junit.Assert.assertTrue;

import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.quartz.SchedulerException;
Expand Down Expand Up @@ -91,11 +90,6 @@ public static void init() {
REG_CENTER.init();
}

@AfterClass
public static void destory() {
REG_CENTER.close();
}

@Before
public void setUp() {
ProcessCountStatistics.reset(jobName);
Expand Down
Expand Up @@ -20,6 +20,8 @@
import java.util.Arrays;
import java.util.List;

import org.quartz.JobExecutionException;

import lombok.Getter;

import com.dangdang.ddframe.job.api.JobExecutionMultipleShardingContext;
Expand Down Expand Up @@ -50,6 +52,10 @@ public boolean isStreamingProcess() {
return true;
}

@Override
public void handleJobExecutionException(final JobExecutionException jobExecutionException) throws JobExecutionException {
}

public static void reset() {
completed = false;
}
Expand Down
Expand Up @@ -18,6 +18,7 @@
package com.dangdang.ddframe.job.integrate.std.dataflow.throughput;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

Expand Down Expand Up @@ -49,6 +50,6 @@ public void assertJobInit() {
}
assertTrue(REG_CENTER.isExisted("/" + getJobName() + "/execution"));
assertThat(ProcessCountStatistics.getProcessSuccessCount(getJobName()), is(0));
assertThat(ProcessCountStatistics.getProcessFailureCount(getJobName()), is(1));
assertThat(ProcessCountStatistics.getProcessFailureCount(getJobName()), not(0));
}
}
Expand Up @@ -18,6 +18,7 @@
package com.dangdang.ddframe.job.integrate.std.dataflow.throughput;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

Expand Down Expand Up @@ -49,6 +50,6 @@ public void assertJobInit() {
}
assertTrue(REG_CENTER.isExisted("/" + getJobName() + "/execution"));
assertThat(ProcessCountStatistics.getProcessSuccessCount(getJobName()), is(0));
assertThat(ProcessCountStatistics.getProcessFailureCount(getJobName()), is(1));
assertThat(ProcessCountStatistics.getProcessFailureCount(getJobName()), not(0));
}
}
Expand Up @@ -31,8 +31,9 @@ public void setUp() {
NestedZookeeperServers.getInstance().startServerIfNotStarted();
}

@SuppressWarnings("resource")
@Test(expected = BeanCreationException.class)
public void assertJobClassNotFound() {
public void assertNotElasticJob() {
try {
new ClassPathXmlApplicationContext("classpath:META-INF/job/classNotElasticJob.xml");
} catch (final BeanCreationException ex) {
Expand Down
Expand Up @@ -31,6 +31,7 @@ public void setUp() {
NestedZookeeperServers.getInstance().startServerIfNotStarted();
}

@SuppressWarnings("resource")
@Test(expected = BeanCreationException.class)
public void assertJobClassNotFound() {
try {
Expand Down

0 comments on commit d8a94d8

Please sign in to comment.