Skip to content
Permalink
Browse files
BATCHEE-115 implemented portable Job/StepContext
  • Loading branch information
rsandtner committed Jan 13, 2017
1 parent 2be8d22 commit f2136adfc587d12d15a639c1290899d7672765e5
Show file tree
Hide file tree
Showing 15 changed files with 337 additions and 36 deletions.
@@ -33,10 +33,17 @@
</properties>

<dependencies>
<!-- optional since we are protable and can be used with other jbatch implementations as well -->
<dependency>
<groupId>org.apache.batchee</groupId>
<artifactId>batchee-jbatch</artifactId>
<version>${project.version}</version>
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-servlet_3.0_spec</artifactId>
<version>1.0</version>
<scope>test</scope>
</dependency>

@@ -16,25 +16,39 @@
*/
package org.apache.batchee.cdi.impl;

import javax.batch.runtime.BatchRuntime;
import javax.enterprise.context.ContextNotActiveException;
import javax.enterprise.context.spi.Context;
import javax.enterprise.context.spi.Contextual;
import javax.enterprise.context.spi.CreationalContext;
import javax.enterprise.inject.spi.BeanManager;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public abstract class BaseContext implements Context {

protected final BeanManager bm;

/**
* key == either the stepExecutionId or the jobExecutionId
*/
private ConcurrentMap<Long, ConcurrentMap<Contextual<?>, Instance<?>>> storages = new ConcurrentHashMap<Long, ConcurrentMap<Contextual<?>, Instance<?>>>();

private ContextResolver contextResolver;


public BaseContext(BeanManager bm) {
this.bm = bm;
}


/**
* @return current keys (we inherit contexts here) sorted by order (the last is the most specific)
*/
protected abstract Long currentKey();


@Override
public <T> T get(final Contextual<T> component, final CreationalContext<T> creationalContext) {
checkActive();
@@ -89,6 +103,23 @@ public void endContext(Long key) {
}


protected ContextResolver getContextResolver() {

// lazy initialisation to ensure BatchRuntime.getJobOperator()
// and all dependents are there and ready

if (contextResolver == null) {

if (BatchRuntime.getJobOperator().getClass().getName().contains("batchee")) {
contextResolver = new ThreadLocalContextResolver();
} else {
contextResolver = new DynamicContextResolver(bm);
}
}

return contextResolver;
}

private void checkActive() {
if (!isActive()) {
throw new ContextNotActiveException("CDI context with scope annotation @" + getScope().getName() + " is not active with respect to the current thread");
@@ -27,8 +27,8 @@ public class BatchEEScopeExtension implements Extension {
private StepContextImpl stepContext;

void addBatchScopes(final @Observes AfterBeanDiscovery afterBeanDiscovery, final BeanManager bm) {
jobContext = new JobContextImpl();
stepContext = new StepContextImpl();
jobContext = new JobContextImpl(bm);
stepContext = new StepContextImpl(bm);

afterBeanDiscovery.addContext(jobContext);
afterBeanDiscovery.addContext(stepContext);
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.batchee.cdi.impl;

import javax.batch.runtime.context.JobContext;
import javax.batch.runtime.context.StepContext;

/**
* Provides methods to resolve {@link JobContext} and {@link StepContext}
* for setting up {@link JobContextImpl} and {@link StepContextImpl}.
*/
interface ContextResolver {

/**
* @return the current {@link JobContext} for this {@link Thread}
*/
JobContext getJobContext();

/**
* @return the current {@link StepContext} for this {@link Thread}
*/
StepContext getStepContext();

}
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.batchee.cdi.impl;

import javax.batch.runtime.context.JobContext;
import javax.batch.runtime.context.StepContext;
import javax.enterprise.inject.spi.Bean;
import javax.enterprise.inject.spi.BeanManager;

/**
* This implementation of {@link ContextResolver} is needed to enable BatchEE CDI-Scopes
* for other JBatch implementations then BatchEE.
* <p>
* It uses {@link BeanManager} for the current {@link JobContext} and {@link StepContext} lookup.
* <p>
* Every time a *Context is needed, it will be resolved via {@link BeanManager}.
*/
class DynamicContextResolver implements ContextResolver {

private final BeanManager bm;


DynamicContextResolver(BeanManager bm) {
this.bm = bm;
}


@Override
public JobContext getJobContext() {
return resolve(JobContext.class);
}

@Override
public StepContext getStepContext() {
return resolve(StepContext.class);
}


private <T> T resolve(Class<T> contextClass) {
Bean<?> bean = bm.resolve(bm.getBeans(contextClass));
return (T) bm.getReference(bean, contextClass, bm.createCreationalContext(bean));

}
}
@@ -24,13 +24,18 @@
import java.util.concurrent.atomic.AtomicInteger;

import javax.enterprise.inject.Typed;

import javax.enterprise.inject.spi.BeanManager;

@Typed
public class JobContextImpl extends BaseContext {

private ConcurrentMap<Long, AtomicInteger> jobReferences = new ConcurrentHashMap<Long, AtomicInteger>();
private ThreadLocal<Long> currentJobExecutionId = new ThreadLocal<Long>();


JobContextImpl(BeanManager bm) {
super(bm);
}


@Override
public Class<? extends Annotation> getScope() {
@@ -39,11 +44,14 @@ public Class<? extends Annotation> getScope() {

@Override
protected Long currentKey() {
return currentJobExecutionId.get();
return getContextResolver().getJobContext().getExecutionId();
}


public void enterJobExecution(Long jobExecutionId) {
public void enterJobExecution() {

Long jobExecutionId = currentKey();

AtomicInteger jobRefs = jobReferences.get(jobExecutionId);
if (jobRefs == null) {
jobRefs = new AtomicInteger(0);
@@ -53,21 +61,18 @@ public void enterJobExecution(Long jobExecutionId) {
}
}
jobRefs.incrementAndGet();

currentJobExecutionId.set(jobExecutionId);
}

public void exitJobExecution() {
Long jobExecutionId = currentJobExecutionId.get();

Long jobExecutionId = currentKey();

AtomicInteger jobRefs = jobReferences.get(jobExecutionId);
if (jobRefs != null) {
int references = jobRefs.decrementAndGet();
if (references == 0) {
endContext(jobExecutionId);
}
}

currentJobExecutionId.set(null);
currentJobExecutionId.remove();
}
}
@@ -21,11 +21,16 @@
import java.lang.annotation.Annotation;

import javax.enterprise.inject.Typed;
import javax.enterprise.inject.spi.BeanManager;

@Typed
public class StepContextImpl extends BaseContext {

private ThreadLocal<Long> currentStepContext = new ThreadLocal<Long>();

StepContextImpl(BeanManager bm) {
super(bm);
}


@Override
public Class<? extends Annotation> getScope() {
@@ -34,20 +39,12 @@ public Class<? extends Annotation> getScope() {

@Override
protected Long currentKey() {
return currentStepContext.get();
}

public void enterStep(final Long stepContextId) {
currentStepContext.set(stepContextId);
return getContextResolver().getStepContext().getStepExecutionId();
}

public void exitStep() {
Long stepContextId = currentKey();
endContext(stepContextId);
currentStepContext.set(null);
currentStepContext.remove();
endContext(currentKey());
}



}
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.batchee.cdi.impl;

import org.apache.batchee.container.proxy.InjectionReferences;
import org.apache.batchee.container.proxy.ProxyFactory;

import javax.batch.runtime.context.JobContext;
import javax.batch.runtime.context.StepContext;

/**
* Implementation of {@link ContextResolver} which is used to enable BatchEE CDI-Scopes for BatchEE.
* <p>
* Uses {@link ProxyFactory#getInjectionReferences()}, which are basically {@link ThreadLocal ThreadLocals} to resolve
* the current {@link JobContext} and {@link StepContext}
*/
class ThreadLocalContextResolver implements ContextResolver {

@Override
public JobContext getJobContext() {

InjectionReferences references = getInjectionReferences();
if (references != null) {
return references.getJobContext();
}

return null;
}


@Override
public StepContext getStepContext() {

InjectionReferences references = getInjectionReferences();
if (references != null) {
return references.getStepContext();
}

return null;
}


private static InjectionReferences getInjectionReferences() {
return ProxyFactory.getInjectionReferences();
}
}
@@ -23,6 +23,10 @@
import javax.inject.Inject;
import javax.inject.Named;

/**
* This Listener is important for cleanup the {@link org.apache.batchee.cdi.impl.StepContextImpl}.
* Otherwise the {@link org.apache.batchee.cdi.impl.StepContextImpl} will leak.
*/
@Named
@Dependent
public class AfterStepScopeListener implements StepListener {
@@ -19,7 +19,6 @@
import org.apache.batchee.cdi.impl.BatchEEScopeExtension;

import javax.batch.api.listener.JobListener;
import javax.batch.runtime.context.JobContext;
import javax.enterprise.context.Dependent;
import javax.inject.Inject;
import javax.inject.Named;
@@ -28,12 +27,11 @@
@Dependent
public class BeforeJobScopeListener implements JobListener {

private @Inject JobContext jobContext;
private @Inject BatchEEScopeExtension scopeExtension;

@Override
public void beforeJob() throws Exception {
scopeExtension.getJobContext().enterJobExecution(jobContext.getExecutionId());
scopeExtension.getJobContext().enterJobExecution();
}

@Override

0 comments on commit f2136ad

Please sign in to comment.