Skip to content
Permalink
Browse files
BATCHEE-113 improve CDI scope for Jobs and Steps
  • Loading branch information
struberg committed Dec 20, 2016
1 parent 8268e3d commit ce60c30e89d92f756d1c0eaf6e26fa0d5e393e14
Showing 9 changed files with 109 additions and 185 deletions.
@@ -24,13 +24,16 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public abstract class BaseContext<K> implements Context {
private final ConcurrentMap<K, ConcurrentMap<Contextual<?>, Instance<?>>> storages = new ConcurrentHashMap<K, ConcurrentMap<Contextual<?>, Instance<?>>>();
public abstract class BaseContext implements Context {
/**
* key == either the stepExecutionId or the jobExecutionId
*/
private ConcurrentMap<Long, ConcurrentMap<Contextual<?>, Instance<?>>> storages = new ConcurrentHashMap<Long, ConcurrentMap<Contextual<?>, Instance<?>>>();

/**
* @return current keys (we inherit contexts here) sorted by order (the last is the most specific)
*/
protected abstract K[] currentKeys();
protected abstract Long currentKey();

@Override
public <T> T get(final Contextual<T> component, final CreationalContext<T> creationalContext) {
@@ -56,45 +59,35 @@ public <T> T get(final Contextual<T> component, final CreationalContext<T> creat
public <T> T get(final Contextual<T> component) {
checkActive();

for (final K key : currentKeys()) {
final ConcurrentMap<Contextual<?>, Instance<?>> storage = storages.get(key);
if (storage != null) {
final Instance<?> instance = storage.get(component);
if (instance == null) {
return null;
}
return (T) instance.value;
final ConcurrentMap<Contextual<?>, Instance<?>> storage = storages.get(currentKey());
if (storage != null) {
final Instance<?> instance = storage.get(component);
if (instance == null) {
return null;
}
return (T) instance.value;
}
return null;
}

@Override
public boolean isActive() {
final K[] ks = currentKeys();
return ks != null && ks.length != 0;
return currentKey() != null;
}

public void endContext() {
final ConcurrentMap<Contextual<?>, Instance<?>> storage = storages.remove(lastKey());
public void endContext(Long key) {
final ConcurrentMap<Contextual<?>, Instance<?>> storage = storages.remove(key);
if (storage == null) {
return;
}

for (final Map.Entry<Contextual<?>, Instance<?>> entry : storage.entrySet()) {
final Instance<?> instance = entry.getValue();
Contextual.class.cast(entry.getKey()).destroy(instance.value, instance.context);
Contextual.class.cast(entry.getKey()).destroy(instance.value, instance.cc);
}
storage.clear();
}

private K lastKey() {
final K[] keys = currentKeys();
if (keys == null || keys.length == 0) {
return null;
}
return keys[keys.length - 1];
}

private void checkActive() {
if (!isActive()) {
@@ -103,7 +96,7 @@ private void checkActive() {
}

private ConcurrentMap<Contextual<?>, Instance<?>> getOrCreateCurrentStorage() {
final K key = lastKey();
final Long key = currentKey();

ConcurrentMap<Contextual<?>, Instance<?>> storage = storages.get(key);
if (storage == null) {
@@ -118,11 +111,11 @@ private ConcurrentMap<Contextual<?>, Instance<?>> getOrCreateCurrentStorage() {

private static class Instance<T> {
private final T value;
private final CreationalContext<T> context;
private final CreationalContext<T> cc;

private Instance(final T value, final CreationalContext<T> context) {
this.value = value;
this.context = context;
this.cc = context;
}
}
}
@@ -20,17 +20,25 @@
import javax.enterprise.inject.spi.AfterBeanDiscovery;
import javax.enterprise.inject.spi.BeanManager;
import javax.enterprise.inject.spi.Extension;
import javax.enterprise.inject.spi.ProcessAnnotatedType;

public class BatchEEScopeExtension implements Extension {
void vetoInternalBeans(final @Observes ProcessAnnotatedType<?> pat) {
if (pat.getAnnotatedType().getJavaClass().getName().startsWith(BatchEEScopeExtension.class.getPackage().getName())) {
pat.veto();
}
}

private JobContextImpl jobContext;
private StepContextImpl stepContext;

void addBatchScopes(final @Observes AfterBeanDiscovery afterBeanDiscovery, final BeanManager bm) {
afterBeanDiscovery.addContext(JobContextImpl.INSTANCE);
afterBeanDiscovery.addContext(StepContextImpl.INSTANCE);
jobContext = new JobContextImpl();
stepContext = new StepContextImpl();

afterBeanDiscovery.addContext(jobContext);
afterBeanDiscovery.addContext(stepContext);
}

public JobContextImpl getJobContext() {
return jobContext;
}

public StepContextImpl getStepContext() {
return stepContext;
}
}
@@ -19,47 +19,52 @@
import org.apache.batchee.cdi.scope.JobScoped;

import java.lang.annotation.Annotation;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.batchee.cdi.impl.LocationHolder.currentJob;

public class JobContextImpl extends BaseContext<JobContextImpl.JobKey> {
public static final BaseContext<?> INSTANCE = new JobContextImpl();
public class JobContextImpl extends BaseContext {

private JobContextImpl() {
// no-op
}
private ConcurrentMap<Long, AtomicInteger> jobReferences = new ConcurrentHashMap<Long, AtomicInteger>();
private ThreadLocal<Long> currentJobExecutionId = new ThreadLocal<Long>();

@Override
public Class<? extends Annotation> getScope() {
return JobScoped.class;
}

@Override
protected JobKey[] currentKeys() {
return new JobKey[] { new JobKey(currentJob().getExecutionId()) };
protected Long currentKey() {
return currentJobExecutionId.get();
}

public static class JobKey {
private final long executionId;

private final int hashCode;

public JobKey(final long executionId) {
this.executionId = executionId;

hashCode = (int) (executionId ^ (executionId >>> 32));
public void enterJobExecution(Long jobExecutionId) {
AtomicInteger jobRefs = jobReferences.get(jobExecutionId);
if (jobRefs == null) {
jobRefs = new AtomicInteger(0);
AtomicInteger oldJobRefs = jobReferences.putIfAbsent(jobExecutionId, jobRefs);
if (oldJobRefs != null) {
jobRefs = oldJobRefs;
}
}
jobRefs.incrementAndGet();

@Override
public boolean equals(final Object o) {
return this == o
|| (!(o == null || getClass() != o.getClass()) && executionId == JobKey.class.cast(o).executionId);
currentJobExecutionId.set(jobExecutionId);
}

public void exitJobExecution() {
Long jobExecutionId = currentJobExecutionId.get();
AtomicInteger jobRefs = jobReferences.get(jobExecutionId);
if (jobRefs != null) {
int references = jobRefs.decrementAndGet();
if (references == 0) {
endContext(jobExecutionId);
}
}

@Override
public int hashCode() {
return hashCode;
}
currentJobExecutionId.set(null);
currentJobExecutionId.remove();
}
}

This file was deleted.

@@ -18,57 +18,33 @@

import org.apache.batchee.cdi.scope.StepScoped;

import javax.batch.runtime.context.StepContext;
import java.lang.annotation.Annotation;
import java.util.List;

import static org.apache.batchee.cdi.impl.LocationHolder.currentSteps;
public class StepContextImpl extends BaseContext {

public class StepContextImpl extends BaseContext<StepContextImpl.StepKey> {
public static final BaseContext<?> INSTANCE = new StepContextImpl();

private StepContextImpl() {
// no-op
}
private ThreadLocal<Long> currentStepContext = new ThreadLocal<Long>();

@Override
public Class<? extends Annotation> getScope() {
return StepScoped.class;
}

@Override
protected StepKey[] currentKeys() {
final List<StepContext> stepContexts = currentSteps();
final StepKey[] keys = new StepKey[stepContexts.size()];

int i = 0;
for (final StepContext stepContext : stepContexts) {
keys[i++] = new StepKey(stepContext.getStepExecutionId());
}
return keys;
protected Long currentKey() {
return currentStepContext.get();
}

public static class StepKey {
private final long stepExecutionId;

private final int hashCode;

public StepKey(final long stepExecutionId) {
this.stepExecutionId = stepExecutionId;
public void enterStep(final Long stepContextId) {
currentStepContext.set(stepContextId);
}

hashCode = (int) (stepExecutionId ^ (stepExecutionId >>> 32));
}
public void exitStep() {
Long stepContextId = currentKey();
endContext(stepContextId);
currentStepContext.set(null);
currentStepContext.remove();
}

@Override
public boolean equals(final Object o) {
return this == o
|| (!(o == null || getClass() != o.getClass()) && stepExecutionId == StepKey.class.cast(o).stepExecutionId);

}

@Override
public int hashCode() {
return hashCode;
}
}
}
@@ -16,21 +16,24 @@
*/
package org.apache.batchee.cdi.listener;

import org.apache.batchee.cdi.impl.JobContextImpl;
import org.apache.batchee.cdi.impl.LocationHolder;
import org.apache.batchee.cdi.impl.BatchEEScopeExtension;

import javax.batch.api.listener.JobListener;
import javax.inject.Inject;
import javax.inject.Named;

@Named
public class AfterJobScopeListener extends LocationHolder implements JobListener {
public class AfterJobScopeListener implements JobListener {

private @Inject BatchEEScopeExtension scopeExtension;

@Override
public void beforeJob() throws Exception {
// no-op
}

@Override
public void afterJob() throws Exception {
exitJob(JobContextImpl.INSTANCE);
scopeExtension.getJobContext().exitJobExecution();
}
}

0 comments on commit ce60c30

Please sign in to comment.