From f2136adfc587d12d15a639c1290899d7672765e5 Mon Sep 17 00:00:00 2001 From: Reinhard Sandtner Date: Fri, 13 Jan 2017 11:24:41 +0100 Subject: [PATCH] BATCHEE-115 implemented portable Job/StepContext --- extensions/cdi/pom.xml | 9 ++- .../apache/batchee/cdi/impl/BaseContext.java | 31 ++++++++++ .../cdi/impl/BatchEEScopeExtension.java | 4 +- .../batchee/cdi/impl/ContextResolver.java | 38 ++++++++++++ .../cdi/impl/DynamicContextResolver.java | 58 ++++++++++++++++++ .../batchee/cdi/impl/JobContextImpl.java | 25 ++++---- .../batchee/cdi/impl/StepContextImpl.java | 19 +++--- .../cdi/impl/ThreadLocalContextResolver.java | 60 +++++++++++++++++++ .../cdi/listener/AfterStepScopeListener.java | 4 ++ .../cdi/listener/BeforeJobScopeListener.java | 4 +- .../cdi/listener/BeforeStepScopeListener.java | 15 +++-- .../apache/batchee/cdi/BatchScopesTest.java | 9 +++ .../PartitionedStepScopedReader.java | 45 ++++++++++++++ .../batch-jobs/partitioned-step-scoped.xml | 51 ++++++++++++++++ .../cdi/src/test/resources/batchee.properties | 1 - 15 files changed, 337 insertions(+), 36 deletions(-) create mode 100644 extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/ContextResolver.java create mode 100644 extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/DynamicContextResolver.java create mode 100644 extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/ThreadLocalContextResolver.java create mode 100644 extensions/cdi/src/test/java/org/apache/batchee/cdi/partitioned/PartitionedStepScopedReader.java create mode 100644 extensions/cdi/src/test/resources/META-INF/batch-jobs/partitioned-step-scoped.xml diff --git a/extensions/cdi/pom.xml b/extensions/cdi/pom.xml index ab44f8c..28937ca 100644 --- a/extensions/cdi/pom.xml +++ b/extensions/cdi/pom.xml @@ -33,10 +33,17 @@ + + + org.apache.batchee + batchee-jbatch + ${project.version} + true + + org.apache.geronimo.specs geronimo-servlet_3.0_spec - 1.0 test diff --git a/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/BaseContext.java b/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/BaseContext.java index 82abffc..c8fe047 100644 --- a/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/BaseContext.java +++ b/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/BaseContext.java @@ -16,25 +16,39 @@ */ package org.apache.batchee.cdi.impl; +import javax.batch.runtime.BatchRuntime; import javax.enterprise.context.ContextNotActiveException; import javax.enterprise.context.spi.Context; import javax.enterprise.context.spi.Contextual; import javax.enterprise.context.spi.CreationalContext; +import javax.enterprise.inject.spi.BeanManager; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; public abstract class BaseContext implements Context { + + protected final BeanManager bm; + /** * key == either the stepExecutionId or the jobExecutionId */ private ConcurrentMap, Instance>> storages = new ConcurrentHashMap, Instance>>(); + private ContextResolver contextResolver; + + + public BaseContext(BeanManager bm) { + this.bm = bm; + } + + /** * @return current keys (we inherit contexts here) sorted by order (the last is the most specific) */ protected abstract Long currentKey(); + @Override public T get(final Contextual component, final CreationalContext creationalContext) { checkActive(); @@ -89,6 +103,23 @@ public void endContext(Long key) { } + protected ContextResolver getContextResolver() { + + // lazy initialisation to ensure BatchRuntime.getJobOperator() + // and all dependents are there and ready + + if (contextResolver == null) { + + if (BatchRuntime.getJobOperator().getClass().getName().contains("batchee")) { + contextResolver = new ThreadLocalContextResolver(); + } else { + contextResolver = new DynamicContextResolver(bm); + } + } + + return contextResolver; + } + private void checkActive() { if (!isActive()) { throw new ContextNotActiveException("CDI context with scope annotation @" + getScope().getName() + " is not active with respect to the current thread"); diff --git a/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/BatchEEScopeExtension.java b/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/BatchEEScopeExtension.java index e872b9a..92b6d4a 100644 --- a/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/BatchEEScopeExtension.java +++ b/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/BatchEEScopeExtension.java @@ -27,8 +27,8 @@ public class BatchEEScopeExtension implements Extension { private StepContextImpl stepContext; void addBatchScopes(final @Observes AfterBeanDiscovery afterBeanDiscovery, final BeanManager bm) { - jobContext = new JobContextImpl(); - stepContext = new StepContextImpl(); + jobContext = new JobContextImpl(bm); + stepContext = new StepContextImpl(bm); afterBeanDiscovery.addContext(jobContext); afterBeanDiscovery.addContext(stepContext); diff --git a/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/ContextResolver.java b/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/ContextResolver.java new file mode 100644 index 0000000..a18c4b9 --- /dev/null +++ b/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/ContextResolver.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.batchee.cdi.impl; + +import javax.batch.runtime.context.JobContext; +import javax.batch.runtime.context.StepContext; + +/** + * Provides methods to resolve {@link JobContext} and {@link StepContext} + * for setting up {@link JobContextImpl} and {@link StepContextImpl}. + */ +interface ContextResolver { + + /** + * @return the current {@link JobContext} for this {@link Thread} + */ + JobContext getJobContext(); + + /** + * @return the current {@link StepContext} for this {@link Thread} + */ + StepContext getStepContext(); + +} diff --git a/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/DynamicContextResolver.java b/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/DynamicContextResolver.java new file mode 100644 index 0000000..c9e64a4 --- /dev/null +++ b/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/DynamicContextResolver.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.batchee.cdi.impl; + +import javax.batch.runtime.context.JobContext; +import javax.batch.runtime.context.StepContext; +import javax.enterprise.inject.spi.Bean; +import javax.enterprise.inject.spi.BeanManager; + +/** + * This implementation of {@link ContextResolver} is needed to enable BatchEE CDI-Scopes + * for other JBatch implementations then BatchEE. + *

+ * It uses {@link BeanManager} for the current {@link JobContext} and {@link StepContext} lookup. + *

+ * Every time a *Context is needed, it will be resolved via {@link BeanManager}. + */ +class DynamicContextResolver implements ContextResolver { + + private final BeanManager bm; + + + DynamicContextResolver(BeanManager bm) { + this.bm = bm; + } + + + @Override + public JobContext getJobContext() { + return resolve(JobContext.class); + } + + @Override + public StepContext getStepContext() { + return resolve(StepContext.class); + } + + + private T resolve(Class contextClass) { + Bean bean = bm.resolve(bm.getBeans(contextClass)); + return (T) bm.getReference(bean, contextClass, bm.createCreationalContext(bean)); + + } +} diff --git a/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/JobContextImpl.java b/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/JobContextImpl.java index cfec9b2..79f51e5 100644 --- a/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/JobContextImpl.java +++ b/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/JobContextImpl.java @@ -24,13 +24,18 @@ import java.util.concurrent.atomic.AtomicInteger; import javax.enterprise.inject.Typed; - +import javax.enterprise.inject.spi.BeanManager; @Typed public class JobContextImpl extends BaseContext { private ConcurrentMap jobReferences = new ConcurrentHashMap(); - private ThreadLocal currentJobExecutionId = new ThreadLocal(); + + + JobContextImpl(BeanManager bm) { + super(bm); + } + @Override public Class getScope() { @@ -39,11 +44,14 @@ public Class getScope() { @Override protected Long currentKey() { - return currentJobExecutionId.get(); + return getContextResolver().getJobContext().getExecutionId(); } - public void enterJobExecution(Long jobExecutionId) { + public void enterJobExecution() { + + Long jobExecutionId = currentKey(); + AtomicInteger jobRefs = jobReferences.get(jobExecutionId); if (jobRefs == null) { jobRefs = new AtomicInteger(0); @@ -53,12 +61,12 @@ public void enterJobExecution(Long jobExecutionId) { } } jobRefs.incrementAndGet(); - - currentJobExecutionId.set(jobExecutionId); } public void exitJobExecution() { - Long jobExecutionId = currentJobExecutionId.get(); + + Long jobExecutionId = currentKey(); + AtomicInteger jobRefs = jobReferences.get(jobExecutionId); if (jobRefs != null) { int references = jobRefs.decrementAndGet(); @@ -66,8 +74,5 @@ public void exitJobExecution() { endContext(jobExecutionId); } } - - currentJobExecutionId.set(null); - currentJobExecutionId.remove(); } } diff --git a/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/StepContextImpl.java b/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/StepContextImpl.java index ae09379..e06b27b 100644 --- a/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/StepContextImpl.java +++ b/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/StepContextImpl.java @@ -21,11 +21,16 @@ import java.lang.annotation.Annotation; import javax.enterprise.inject.Typed; +import javax.enterprise.inject.spi.BeanManager; @Typed public class StepContextImpl extends BaseContext { - private ThreadLocal currentStepContext = new ThreadLocal(); + + StepContextImpl(BeanManager bm) { + super(bm); + } + @Override public Class getScope() { @@ -34,20 +39,12 @@ public Class getScope() { @Override protected Long currentKey() { - return currentStepContext.get(); - } - - public void enterStep(final Long stepContextId) { - currentStepContext.set(stepContextId); + return getContextResolver().getStepContext().getStepExecutionId(); } public void exitStep() { - Long stepContextId = currentKey(); - endContext(stepContextId); - currentStepContext.set(null); - currentStepContext.remove(); + endContext(currentKey()); } - } diff --git a/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/ThreadLocalContextResolver.java b/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/ThreadLocalContextResolver.java new file mode 100644 index 0000000..78d567e --- /dev/null +++ b/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/ThreadLocalContextResolver.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.batchee.cdi.impl; + +import org.apache.batchee.container.proxy.InjectionReferences; +import org.apache.batchee.container.proxy.ProxyFactory; + +import javax.batch.runtime.context.JobContext; +import javax.batch.runtime.context.StepContext; + +/** + * Implementation of {@link ContextResolver} which is used to enable BatchEE CDI-Scopes for BatchEE. + *

+ * Uses {@link ProxyFactory#getInjectionReferences()}, which are basically {@link ThreadLocal ThreadLocals} to resolve + * the current {@link JobContext} and {@link StepContext} + */ +class ThreadLocalContextResolver implements ContextResolver { + + @Override + public JobContext getJobContext() { + + InjectionReferences references = getInjectionReferences(); + if (references != null) { + return references.getJobContext(); + } + + return null; + } + + + @Override + public StepContext getStepContext() { + + InjectionReferences references = getInjectionReferences(); + if (references != null) { + return references.getStepContext(); + } + + return null; + } + + + private static InjectionReferences getInjectionReferences() { + return ProxyFactory.getInjectionReferences(); + } +} diff --git a/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/AfterStepScopeListener.java b/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/AfterStepScopeListener.java index bcaaacb..b051c25 100644 --- a/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/AfterStepScopeListener.java +++ b/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/AfterStepScopeListener.java @@ -23,6 +23,10 @@ import javax.inject.Inject; import javax.inject.Named; +/** + * This Listener is important for cleanup the {@link org.apache.batchee.cdi.impl.StepContextImpl}. + * Otherwise the {@link org.apache.batchee.cdi.impl.StepContextImpl} will leak. + */ @Named @Dependent public class AfterStepScopeListener implements StepListener { diff --git a/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/BeforeJobScopeListener.java b/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/BeforeJobScopeListener.java index 7b010dd..f8e1a35 100644 --- a/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/BeforeJobScopeListener.java +++ b/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/BeforeJobScopeListener.java @@ -19,7 +19,6 @@ import org.apache.batchee.cdi.impl.BatchEEScopeExtension; import javax.batch.api.listener.JobListener; -import javax.batch.runtime.context.JobContext; import javax.enterprise.context.Dependent; import javax.inject.Inject; import javax.inject.Named; @@ -28,12 +27,11 @@ @Dependent public class BeforeJobScopeListener implements JobListener { - private @Inject JobContext jobContext; private @Inject BatchEEScopeExtension scopeExtension; @Override public void beforeJob() throws Exception { - scopeExtension.getJobContext().enterJobExecution(jobContext.getExecutionId()); + scopeExtension.getJobContext().enterJobExecution(); } @Override diff --git a/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/BeforeStepScopeListener.java b/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/BeforeStepScopeListener.java index 6ffc75c..3341ddb 100644 --- a/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/BeforeStepScopeListener.java +++ b/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/BeforeStepScopeListener.java @@ -16,25 +16,24 @@ */ package org.apache.batchee.cdi.listener; -import org.apache.batchee.cdi.impl.BatchEEScopeExtension; - import javax.batch.api.listener.StepListener; -import javax.batch.runtime.context.StepContext; import javax.enterprise.context.Dependent; -import javax.inject.Inject; import javax.inject.Named; +import java.util.logging.Level; +import java.util.logging.Logger; +/** + * @deprecated only kept for compatibility - will be removed in 1.0 + */ @Named @Dependent public class BeforeStepScopeListener implements StepListener { - private @Inject StepContext stepContext; - private @Inject BatchEEScopeExtension scopeExtension; - + private static final Logger LOG = Logger.getLogger(BeforeJobScopeListener.class.getName()); @Override public void beforeStep() throws Exception { - scopeExtension.getStepContext().enterStep(stepContext.getStepExecutionId()); + LOG.log(Level.WARNING, "BeforeStepScopeListener is not required to enable @StepScoped! This Listener will removed in future versions!"); } @Override diff --git a/extensions/cdi/src/test/java/org/apache/batchee/cdi/BatchScopesTest.java b/extensions/cdi/src/test/java/org/apache/batchee/cdi/BatchScopesTest.java index 380c111..4cee993 100644 --- a/extensions/cdi/src/test/java/org/apache/batchee/cdi/BatchScopesTest.java +++ b/extensions/cdi/src/test/java/org/apache/batchee/cdi/BatchScopesTest.java @@ -82,4 +82,13 @@ public void testPartitionedJobScoped() throws Exception { assertEquals(PartitionedJobScopedReader.currentBeanId(), PartitionedJobScopedReader.originalBeanId()); assertTrue(JobScopedBean.isDestroyed()); } + + @Test + public void testPartitionedStepScoped() { + + JobOperator jobOperator = BatchRuntime.getJobOperator(); + + BatchStatus status = Batches.waitFor(jobOperator.start("partitioned-step-scoped", new Properties())); + assertEquals(status, BatchStatus.COMPLETED); + } } diff --git a/extensions/cdi/src/test/java/org/apache/batchee/cdi/partitioned/PartitionedStepScopedReader.java b/extensions/cdi/src/test/java/org/apache/batchee/cdi/partitioned/PartitionedStepScopedReader.java new file mode 100644 index 0000000..fc851a9 --- /dev/null +++ b/extensions/cdi/src/test/java/org/apache/batchee/cdi/partitioned/PartitionedStepScopedReader.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.batchee.cdi.partitioned; + +import org.apache.batchee.cdi.component.StepScopedBean; +import org.apache.batchee.cdi.scope.StepScoped; + +import javax.batch.api.chunk.AbstractItemReader; +import javax.enterprise.context.Dependent; +import javax.inject.Inject; +import javax.inject.Named; + +@Named +@Dependent +public class PartitionedStepScopedReader extends AbstractItemReader { + + @Inject + private StepScopedBean bean; + + private int count; + + @Override + public Object readItem() throws Exception { + + if (++count < 11) { + return "continue - BeanId: " + bean.getId(); + } + + return null; + } +} diff --git a/extensions/cdi/src/test/resources/META-INF/batch-jobs/partitioned-step-scoped.xml b/extensions/cdi/src/test/resources/META-INF/batch-jobs/partitioned-step-scoped.xml new file mode 100644 index 0000000..e87e7dd --- /dev/null +++ b/extensions/cdi/src/test/resources/META-INF/batch-jobs/partitioned-step-scoped.xml @@ -0,0 +1,51 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/extensions/cdi/src/test/resources/batchee.properties b/extensions/cdi/src/test/resources/batchee.properties index e9a67fb..efe1da4 100644 --- a/extensions/cdi/src/test/resources/batchee.properties +++ b/extensions/cdi/src/test/resources/batchee.properties @@ -16,5 +16,4 @@ org.apache.batchee.job.listeners.before = beforeJobScopeListener org.apache.batchee.job.listeners.after = afterJobScopeListener -org.apache.batchee.step.listeners.before = beforeStepScopeListener org.apache.batchee.step.listeners.after = afterStepScopeListener