Skip to content
Permalink
Browse files
Merge pull request #3 from chberger/BATCHEE-140
BATCHEE-140 Propagate JobOperatorImpl instance to SimpleJobExecutionCallbackService
  • Loading branch information
rmannibucau committed Apr 20, 2020
2 parents d066900 + b2591ab commit 0071edabb6b340cc16f8e9d0e17ff25720b4d0fe
Showing 4 changed files with 13 additions and 11 deletions.
@@ -297,6 +297,6 @@ public void stop(final long executionId) throws NoSuchJobExecutionException, Job
}

public void waitFor(final long id) {
callbackService.waitFor(id);
callbackService.waitFor(this,id);
}
}
@@ -18,9 +18,6 @@

import org.apache.batchee.container.exception.BatchContainerRuntimeException;
import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
import org.apache.batchee.container.services.BatchKernelService;
import org.apache.batchee.container.services.InternalJobExecution;
import org.apache.batchee.container.services.ServicesManager;
import org.apache.batchee.spi.JobExecutionCallbackService;
import org.apache.batchee.util.Batches;

@@ -32,6 +29,9 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import javax.batch.operations.JobOperator;
import javax.batch.runtime.JobExecution;

public class SimpleJobExecutionCallbackService implements JobExecutionCallbackService {
private final ConcurrentMap<Long, Collection<CountDownLatch>> waiters = new ConcurrentHashMap<Long, Collection<CountDownLatch>>();

@@ -46,7 +46,7 @@ public void onJobExecutionDone(final RuntimeJobExecution jobExecution) {
}

@Override
public void waitFor(final long id) {
public void waitFor(final JobOperator jobOperator, final long id) {
Collection<CountDownLatch> toRelease = waiters.get(id);
if (toRelease == null) {
toRelease = new CopyOnWriteArrayList<CountDownLatch>();
@@ -55,15 +55,15 @@ public void waitFor(final long id) {
toRelease = existing;
}
}
if (checkIsDone(id)) {
if (checkIsDone(jobOperator, id)) {
return;
}

final CountDownLatch latch = new CountDownLatch(1);
toRelease.add(latch);
try {
while (!latch.await(1, TimeUnit.SECONDS)) {
if (checkIsDone(id)) {
if (checkIsDone(jobOperator, id)) {
return;
}
}
@@ -78,9 +78,9 @@ public void init(final Properties batchConfig) {
// no-op
}

private boolean checkIsDone(final long id) {
private boolean checkIsDone(final JobOperator jobOperator, final long id) {
// check before blocking
final InternalJobExecution finalCheckExec = ServicesManager.find().service(BatchKernelService.class).getJobExecution(id);
final JobExecution finalCheckExec = jobOperator.getJobExecution(id);
if (finalCheckExec != null && Batches.isDone(finalCheckExec.getBatchStatus())) {
waiters.remove(id);
return true;
@@ -16,9 +16,11 @@
*/
package org.apache.batchee.spi;

import javax.batch.operations.JobOperator;

import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;

public interface JobExecutionCallbackService extends BatchService {
void onJobExecutionDone(RuntimeJobExecution jobExecution);
void waitFor(long id);
void waitFor(JobOperator jobOperator, long id);
}
@@ -34,7 +34,7 @@ public class JobOperatorImplTest {
public void runningExecutionMemory_BATCHEE112() {
final JobOperator operator = new JobOperatorImpl(new ServicesManager() {{
init(new Properties() {{
setProperty(PersistenceManagerService.class.getName(), MemoryPersistenceManagerService.class.getName());
setProperty(PersistenceManagerService.class.getSimpleName(), MemoryPersistenceManagerService.class.getName());
}});
}});
for (int i = 0; i < 10; i++) {

0 comments on commit 0071eda

Please sign in to comment.