Skip to content
Permalink
Browse files
BATCHEE-131 keep track of running jobs
and actively stop them on shutdown.
  • Loading branch information
struberg committed Mar 20, 2018
1 parent ab0fb6f commit aef149a0afdedd6b3d83079c7494a80c0f8adf5c
Showing 3 changed files with 46 additions and 6 deletions.
@@ -61,7 +61,7 @@
import static org.apache.batchee.container.util.ClassLoaderAwareHandler.makeLoaderAware;


public class JobOperatorImpl implements JobOperator {
public class JobOperatorImpl implements JobOperator, AutoCloseable {
private static final Logger LOGGER = Logger.getLogger(JobOperatorImpl.class.getName());

static {
@@ -115,6 +115,10 @@ public JobOperatorImpl() {
this(ServicesManager.find());
}

public void close() throws Exception {

}

@Override
public long start(final String jobXMLName, final Properties jobParameters) throws JobStartException, JobSecurityException {
/*
@@ -18,12 +18,17 @@

import java.util.Properties;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.batch.operations.JobOperator;
import javax.batch.runtime.BatchRuntime;
import javax.enterprise.context.spi.CreationalContext;
import javax.enterprise.inject.spi.Bean;
import javax.enterprise.inject.spi.BeanManager;

import org.apache.batchee.container.cdi.BatchCDIInjectionExtension;
import org.apache.batchee.container.util.BatchWorkUnit;
import org.apache.batchee.spi.BatchThreadPoolService;

/**
@@ -43,10 +48,12 @@
*
*/
public class AsyncEjbBatchThreadPoolService implements BatchThreadPoolService {


private static final Logger logger = Logger.getLogger(AsyncEjbBatchThreadPoolService.class.getName());

private BeanManager beanManager;
private ThreadExecutorEjb threadExecutorEjb;

@Override
public void init(Properties batchConfig) {
beanManager = BatchCDIInjectionExtension.getInstance().getBeanManager();
@@ -65,8 +72,19 @@ public void executeTask(Runnable work, Object config) {

@Override
public void shutdown() {
// We cannot force an async EJB to shutdown.
// This usually works out of the box if the container EJB
// undeploys or stops the application.
Set<BatchWorkUnit> runningBatchWorkUnits = threadExecutorEjb.getRunningBatchWorkUnits();
if (!runningBatchWorkUnits.isEmpty()) {
JobOperator jobOperator = BatchRuntime.getJobOperator();
for (BatchWorkUnit batchWorkUnit : runningBatchWorkUnits) {
try {
long executionId = batchWorkUnit.getJobExecutionImpl().getExecutionId();
if (executionId >= 0) {
jobOperator.stop(executionId);
}
} catch(Exception e) {
logger.log(Level.SEVERE, "Failure while shutting down execution", e);
}
}
}
}
}
@@ -16,6 +16,8 @@
*/
package org.apache.batchee.tools.services.thread;

import org.apache.batchee.container.util.BatchWorkUnit;

import javax.annotation.Resource;
import javax.ejb.Asynchronous;
import javax.ejb.Lock;
@@ -24,6 +26,9 @@
import javax.ejb.TransactionManagement;
import javax.ejb.TransactionManagementType;
import javax.transaction.UserTransaction;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/**
* Small helper class to allow new threads being created via the
@@ -42,18 +47,31 @@ public class ThreadExecutorEjb {
@Resource
private UserTransaction ut;

private Set<BatchWorkUnit> runningBatchWorkUnits = Collections.synchronizedSet(new HashSet<BatchWorkUnit>());


private static ThreadLocal<UserTransaction> userTransactions = new ThreadLocal<UserTransaction>();

@Asynchronous
public void executeTask(Runnable work, Object config) {
try {
userTransactions.set(ut);
if (work instanceof BatchWorkUnit) {
runningBatchWorkUnits.add((BatchWorkUnit) work);
}

work.run();
} finally {
if (work instanceof BatchWorkUnit) {
runningBatchWorkUnits.remove(work);
}
userTransactions.remove();
}
}

public Set<BatchWorkUnit> getRunningBatchWorkUnits() {
return runningBatchWorkUnits;
}

public static UserTransaction getUserTransaction() {
return userTransactions.get();

0 comments on commit aef149a

Please sign in to comment.