Skip to content
Permalink
Browse files
BATCHEE-132 ensure ServicesManager represents the 'container' of batc…
…hee and defines its lifecycle
  • Loading branch information
rmannibucau committed Mar 25, 2018
1 parent aef149a commit 86c407c632408c5099318890d17feb08fbbd2a2c
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 104 deletions.
@@ -16,18 +16,16 @@
*/
package org.apache.batchee.container.impl;

import org.apache.batchee.container.Init;
import org.apache.batchee.container.services.BatchKernelService;
import org.apache.batchee.container.services.InternalJobExecution;
import org.apache.batchee.container.services.JobStatusManagerService;
import org.apache.batchee.container.services.ServicesManager;
import org.apache.batchee.container.status.JobStatus;
import org.apache.batchee.jmx.BatchEE;
import org.apache.batchee.jmx.BatchEEMBean;
import org.apache.batchee.jmx.BatchEEMBeanImpl;
import org.apache.batchee.spi.JobExecutionCallbackService;
import org.apache.batchee.spi.JobXMLLoaderService;
import org.apache.batchee.spi.PersistenceManagerService;
import java.io.IOException;
import java.io.StringWriter;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.batch.operations.JobExecutionAlreadyCompleteException;
import javax.batch.operations.JobExecutionIsRunningException;
@@ -44,52 +42,23 @@
import javax.batch.runtime.JobExecution;
import javax.batch.runtime.JobInstance;
import javax.batch.runtime.StepExecution;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.io.IOException;
import java.io.StringWriter;
import java.lang.management.ManagementFactory;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

import static org.apache.batchee.container.util.ClassLoaderAwareHandler.makeLoaderAware;
import org.apache.batchee.container.Init;
import org.apache.batchee.container.services.BatchKernelService;
import org.apache.batchee.container.services.InternalJobExecution;
import org.apache.batchee.container.services.JobStatusManagerService;
import org.apache.batchee.container.services.ServicesManager;
import org.apache.batchee.container.status.JobStatus;
import org.apache.batchee.spi.JobExecutionCallbackService;
import org.apache.batchee.spi.JobXMLLoaderService;
import org.apache.batchee.spi.PersistenceManagerService;


public class JobOperatorImpl implements JobOperator, AutoCloseable {
public class JobOperatorImpl implements JobOperator {
private static final Logger LOGGER = Logger.getLogger(JobOperatorImpl.class.getName());

static {
Init.doInit();

if (Boolean.parseBoolean(ServicesManager.value("org.apache.batchee.jmx", "true"))) {
try {
final MBeanServer platformMBeanServer = ManagementFactory.getPlatformMBeanServer();
final String app = ServicesManager.value("org.apache.batchee.jmx.application", "");
final ObjectName name;
if (app.isEmpty()) {
name = new ObjectName(BatchEEMBean.DEFAULT_OBJECT_NAME);
} else {
name = new ObjectName(BatchEEMBean.DEFAULT_OBJECT_NAME + ",application=" + app);
}

if (platformMBeanServer.isRegistered(name)) {
platformMBeanServer.unregisterMBean(name);
}

platformMBeanServer.registerMBean(
new BatchEE(
makeLoaderAware(BatchEEMBean.class, new Class<?>[]{ BatchEEMBean.class }, BatchEEMBeanImpl.INSTANCE)),
name);
} catch (final Exception e) {
throw new IllegalStateException(e);
}
}
}

private final BatchKernelService kernelService;
@@ -115,10 +84,6 @@ public JobOperatorImpl() {
this(ServicesManager.find());
}

public void close() throws Exception {

}

@Override
public long start(final String jobXMLName, final Properties jobParameters) throws JobStartException, JobSecurityException {
/*
@@ -17,6 +17,23 @@
*/
package org.apache.batchee.container.services;

import static org.apache.batchee.container.util.ClassLoaderAwareHandler.makeLoaderAware;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.lang.management.ManagementFactory;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.management.MBeanServer;
import javax.management.ObjectName;

import org.apache.batchee.container.exception.BatchContainerRuntimeException;
import org.apache.batchee.container.exception.BatchContainerServiceException;
import org.apache.batchee.container.services.callback.SimpleJobExecutionCallbackService;
@@ -31,6 +48,9 @@
import org.apache.batchee.container.services.status.DefaultJobStatusManager;
import org.apache.batchee.container.services.transaction.DefaultBatchTransactionService;
import org.apache.batchee.container.util.BatchContainerConstants;
import org.apache.batchee.jmx.BatchEE;
import org.apache.batchee.jmx.BatchEEMBean;
import org.apache.batchee.jmx.BatchEEMBeanImpl;
import org.apache.batchee.spi.BatchArtifactFactory;
import org.apache.batchee.spi.BatchService;
import org.apache.batchee.spi.BatchThreadPoolService;
@@ -40,15 +60,6 @@
import org.apache.batchee.spi.PersistenceManagerService;
import org.apache.batchee.spi.TransactionManagementService;

import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Logger;

public class ServicesManager implements BatchContainerConstants {
private final static Logger LOGGER = Logger.getLogger(ServicesManager.class.getName());

@@ -77,6 +88,7 @@ public class ServicesManager implements BatchContainerConstants {

private static ServicesManagerLocator servicesManagerLocator;

private ObjectName jmxName;
private ClassLoader loader = null;

// designed to be used from app or a server
@@ -147,12 +159,69 @@ public void init(final Properties props) {

logServices = Boolean.parseBoolean(batchRuntimeConfig.getProperty("batchee.service-manager.log", "false"));

if (Boolean.parseBoolean(batchRuntimeConfig.getProperty("org.apache.batchee.jmx", "true"))) {
try {
final MBeanServer platformMBeanServer = ManagementFactory.getPlatformMBeanServer();
final String app = batchRuntimeConfig.getProperty("org.apache.batchee.jmx.application", "");
if (app.isEmpty()) {
jmxName = new ObjectName(BatchEEMBean.DEFAULT_OBJECT_NAME);
} else {
jmxName = new ObjectName(BatchEEMBean.DEFAULT_OBJECT_NAME + ",application=" + app);
}

if (!platformMBeanServer.isRegistered(jmxName)) {
platformMBeanServer.registerMBean(
new BatchEE(
makeLoaderAware(BatchEEMBean.class, new Class<?>[]{ BatchEEMBean.class },
BatchEEMBeanImpl.INSTANCE)),
jmxName);
} else {
jmxName = null;
LOGGER.warning("You didn't specify org.apache.batchee.jmx.application and JMX is already registered, skipping");
}
} catch (final Exception e) {
throw new IllegalStateException(e);
}
}

isInited = Boolean.TRUE;
}
}
}
}

public void close() {
if (isInited) {
synchronized (isInitedLock) {
if (isInited) {
service(BatchThreadPoolService.class).shutdown();
synchronized (serviceRegistry) {
for (final Object service : serviceRegistry.values()) {
if (Closeable.class.isInstance(service)) {
try {
Closeable.class.cast(service).close();
} catch (IOException e) { // don't make it blocking, on j7 we can use suppressed maybe?
LOGGER.log(Level.SEVERE, e.getMessage(), e);
}
}
}
}


if (jmxName != null) { // unregister jmx bean if deployed in an app
final MBeanServer jmx = ManagementFactory.getPlatformMBeanServer();
try {
jmx.unregisterMBean(jmxName);
} catch (final Exception e) {
// no-op
}
}
isInited = false;
}
}
}
}

public <T extends BatchService> T service(final Class<T> clazz) throws BatchContainerServiceException {
T service = clazz.cast(serviceRegistry.get(clazz.getName()));
if (service == null) {
@@ -22,7 +22,6 @@
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.text.ParseException;
import java.text.SimpleDateFormat;
@@ -46,7 +45,7 @@ public class DefaultDataRepresentationService implements DataRepresentationServi

public static final String BATCHEE_DATA_PREFIX = "BatchEE_data" + BATCHEE_SPLIT_TOKEN;

private static final Charset UTF8_CHARSET = StandardCharsets.UTF_8;
private static final Charset UTF8_CHARSET = Charset.forName("UTF-8");

private static final Logger LOGGER = Logger.getLogger(DefaultDataRepresentationService.class.getName());

@@ -298,7 +297,7 @@ protected Object invokeStaticMethod(String typeVal, String methodName, Class par
Class<?> typeClass = getClassLoader().loadClass(typeVal);
Method method = typeClass.getMethod(methodName, paramType);
return method.invoke(null, valueVal);
} catch (ReflectiveOperationException e) {
} catch (Exception e) {
throw new BatchContainerServiceException("Cannot convert data [" + valueVal + "] of type [" + typeVal + "]", e );
}
}
@@ -41,8 +41,6 @@
public class BatchEEMBeanImpl implements BatchEEMBean {
public static final BatchEEMBeanImpl INSTANCE = new BatchEEMBeanImpl();

private final JobOperator operator = BatchRuntime.getJobOperator();

private static final String[] JOB_INSTANCES_ATTRIBUTES = { "jobName", "instanceId" };
private static final TabularType JOB_INSTANCES_TABULAR_TYPE;
private static final CompositeType JOB_INSTANCES_COMPOSITE_TYPE;
@@ -154,20 +152,24 @@ public BatchEEMBeanImpl() {
// no-op
}

private JobOperator operator() { // lazy since we register jmx with the servicesmanager init
return BatchRuntime.getJobOperator();
}

@Override
public String[] getJobNames() {
final Set<String> jobNames = operator.getJobNames();
final Set<String> jobNames = operator().getJobNames();
return jobNames.toArray(new String[jobNames.size()]);
}

@Override
public int getJobInstanceCount(final String jobName) {
return operator.getJobInstanceCount(jobName);
return operator().getJobInstanceCount(jobName);
}

@Override
public TabularData getJobInstances(final String jobName, final int start, final int count) {
final List<JobInstance> instances = operator.getJobInstances(jobName, start, count);
final List<JobInstance> instances = operator().getJobInstances(jobName, start, count);

try {
final TabularDataSupport data = new TabularDataSupport(JOB_INSTANCES_TABULAR_TYPE);
@@ -183,7 +185,7 @@ public TabularData getJobInstances(final String jobName, final int start, final
@Override
public Long[] getRunningExecutions(final String jobName) {
try {
final List<Long> runningExecutions = operator.getRunningExecutions(jobName);
final List<Long> runningExecutions = operator().getRunningExecutions(jobName);
return runningExecutions.toArray(new Long[runningExecutions.size()]);
} catch (final NoSuchJobException nsje) {
return new Long[0];
@@ -192,7 +194,7 @@ public Long[] getRunningExecutions(final String jobName) {

@Override
public TabularData getParameters(final long executionId) {
final Properties parameters = operator.getParameters(executionId);
final Properties parameters = operator().getParameters(executionId);
try {
final TabularDataSupport data = new TabularDataSupport(PROPERTIES_TABULAR_TYPE);
for (final Map.Entry<Object, Object> entry : parameters.entrySet()) {
@@ -206,7 +208,7 @@ public TabularData getParameters(final long executionId) {

@Override
public TabularData getJobInstance(final long executionId) {
final JobInstance instance = operator.getJobInstance(executionId);
final JobInstance instance = operator().getJobInstance(executionId);
try {
final TabularDataSupport data = new TabularDataSupport(JOB_INSTANCES_TABULAR_TYPE);
data.put(new CompositeDataSupport(JOB_INSTANCES_COMPOSITE_TYPE, JOB_INSTANCES_ATTRIBUTES, new Object[] { instance.getJobName(), instance.getInstanceId() }));
@@ -218,27 +220,27 @@ public TabularData getJobInstance(final long executionId) {

@Override
public void stop(final long executionId) {
operator.stop(executionId);
operator().stop(executionId);
}

@Override
public void abandon(final long executionId) {
operator.abandon(executionId);
operator().abandon(executionId);
}

@Override
public long start(final String jobXMLName, final String jobParameters) {
return operator.start(jobXMLName, toProperties(jobParameters));
return operator().start(jobXMLName, toProperties(jobParameters));
}

@Override
public long restart(final long executionId, final String restartParameters) {
return operator.restart(executionId, toProperties(restartParameters));
return operator().restart(executionId, toProperties(restartParameters));
}

@Override
public TabularData getJobExecutions(final long id, final String name) {
final List<JobExecution> executions = operator.getJobExecutions(new JobInstanceImpl(id, name));
final List<JobExecution> executions = operator().getJobExecutions(new JobInstanceImpl(id, name));
try {
final TabularDataSupport data = new TabularDataSupport(JOB_EXECUTION_TABULAR_TYPE);
for (final JobExecution n : executions) {
@@ -252,7 +254,7 @@ public TabularData getJobExecutions(final long id, final String name) {

@Override
public TabularData getJobExecution(final long executionId) {
final JobExecution execution = operator.getJobExecution(executionId);
final JobExecution execution = operator().getJobExecution(executionId);
try {
final TabularDataSupport data = new TabularDataSupport(JOB_EXECUTION_TABULAR_TYPE);
data.put(new CompositeDataSupport(JOB_EXECUTION_COMPOSITE_TYPE, JOB_EXECUTION_ATTRIBUTES, asArray(execution)));
@@ -264,7 +266,7 @@ public TabularData getJobExecution(final long executionId) {

@Override
public TabularData getStepExecutions(final long jobExecutionId) {
final List<StepExecution> executions = operator.getStepExecutions(jobExecutionId);
final List<StepExecution> executions = operator().getStepExecutions(jobExecutionId);
try {
final TabularDataSupport data = new TabularDataSupport(STEP_EXECUTION_TABULAR_TYPE);
for (final StepExecution n : executions) {

0 comments on commit 86c407c

Please sign in to comment.