Skip to content
Permalink
Browse files
BATCHEE-138 ensure we don't fail when a chunk is stopped with JobOper…
…ator, thanks a lot Christian Berger for his analyzis+report of this issue
  • Loading branch information
rmannibucau committed Mar 11, 2020
1 parent f8a0d46 commit 330870928a54f6b0aee94fb6fcbb4e58f23fcd34
Showing 7 changed files with 169 additions and 13 deletions.
@@ -251,6 +251,8 @@
and it avoids to spread it over all our executions
-->
<systemProperties>
<openejb.additional.exclude>com.ibm</openejb.additional.exclude>

<batchee.service-manager.log>true</batchee.service-manager.log>

<!-- replace properties file -->
@@ -26,7 +26,6 @@
import org.apache.batchee.container.proxy.ProxyFactory;
import org.apache.batchee.container.services.ServicesManager;
import org.apache.batchee.container.util.PartitionDataWrapper;
import org.apache.batchee.container.util.TCCLObjectInputStream;
import org.apache.batchee.jaxb.Chunk;
import org.apache.batchee.jaxb.Property;
import org.apache.batchee.jaxb.Step;
@@ -215,7 +214,6 @@ private List<Object> readAndProcess() {
List<Object> chunkToWrite = new ArrayList<Object>();
Object itemRead;
Object itemProcessed;
int readProcessedCount = 0;

while (true) {
currentItemStatus = new SingleItemStatus();
@@ -641,14 +639,14 @@ private void invokeChunk() {

private void updateNormalMetrics(int writeCount) {
int readCount = currentChunkStatus.getItemsTouchedInCurrentChunk();
if (currentChunkStatus.isFinished()) {
if (currentChunkStatus.isFinished() && !BatchStatus.STOPPING.equals(stepContext.getBatchStatus())) {
readCount--;
}
int filterCount = readCount - writeCount;

if (readCount < 0 || filterCount < 0 || writeCount < 0) {
throw new IllegalStateException("Somehow one of the metrics was zero. Read count: " + readCount +
", Filter count: " + filterCount + ", Write count: " + writeCount);
throw new IllegalStateException("Somehow one of the metrics was less than zero. " +
"Read count: " + readCount + ", Filter count: " + filterCount + ", Write count: " + writeCount);
}
stepContext.getMetric(MetricImpl.MetricType.COMMIT_COUNT).incValue();
stepContext.getMetric(MetricImpl.MetricType.READ_COUNT).incValueBy(readCount);
@@ -1020,7 +1018,6 @@ private void positionWriterAtCheckpoint() {
// check for data in backing store
if (writerData != null) {
byte[] writertoken = writerData.getRestartToken();
TCCLObjectInputStream writerOIS;
try {
writerProxy.open((Serializable) dataRepresentationService.toJavaRepresentation(writertoken));
} catch (Exception ex) {
@@ -30,6 +30,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class SimpleJobExecutionCallbackService implements JobExecutionCallbackService {
private final ConcurrentMap<Long, Collection<CountDownLatch>> waiters = new ConcurrentHashMap<Long, Collection<CountDownLatch>>();
@@ -54,18 +55,18 @@ public void waitFor(final long id) {
toRelease = existing;
}
}

// check before blocking
final InternalJobExecution finalCheckExec = ServicesManager.find().service(BatchKernelService.class).getJobExecution(id);
if (finalCheckExec != null && Batches.isDone(finalCheckExec.getBatchStatus())) {
waiters.remove(id);
if (checkIsDone(id)) {
return;
}

final CountDownLatch latch = new CountDownLatch(1);
toRelease.add(latch);
try {
latch.await();
while (!latch.await(1, TimeUnit.SECONDS)) {
if (checkIsDone(id)) {
return;
}
}
waiters.remove(id);
} catch (final InterruptedException e) {
throw new BatchContainerRuntimeException(e);
@@ -76,4 +77,14 @@ public void waitFor(final long id) {
public void init(final Properties batchConfig) {
// no-op
}

private boolean checkIsDone(final long id) {
// check before blocking
final InternalJobExecution finalCheckExec = ServicesManager.find().service(BatchKernelService.class).getJobExecution(id);
if (finalCheckExec != null && Batches.isDone(finalCheckExec.getBatchStatus())) {
waiters.remove(id);
return true;
}
return false;
}
}
@@ -28,6 +28,7 @@
import javax.xml.stream.XMLStreamReader;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
@@ -57,14 +58,18 @@ public Instance load(final String batchId) {
try {
final Class<?> artifactClass = tccl.loadClass(batchId);
if (artifactClass != null) {
loadedArtifact = artifactClass.newInstance();
loadedArtifact = artifactClass.getConstructor().newInstance();
}
} catch (final ClassNotFoundException e) {
throw new BatchContainerRuntimeException("Tried but failed to load artifact with id: " + batchId, e);
} catch (final NoSuchMethodException e) {
throw new BatchContainerRuntimeException("Tried but failed to load artifact with id: " + batchId, e);
} catch (final InstantiationException e) {
throw new BatchContainerRuntimeException("Tried but failed to load artifact with id: " + batchId, e);
} catch (final IllegalAccessException e) {
throw new BatchContainerRuntimeException("Tried but failed to load artifact with id: " + batchId, e);
} catch (final InvocationTargetException e) {
throw new BatchContainerRuntimeException("Tried but failed to load artifact with id: " + batchId, e.getCause());
}
}

@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.batchee.container.services.transaction;

import java.util.Properties;

import javax.batch.runtime.context.StepContext;

import org.apache.batchee.container.exception.BatchContainerServiceException;
import org.apache.batchee.container.exception.TransactionManagementException;
import org.apache.batchee.spi.TransactionManagementService;
import org.apache.batchee.spi.TransactionManagerAdapter;

public class NoTxMgrBatchTransactionService implements TransactionManagementService {
private final TransactionManagerAdapter adapter = new DefaultNonTransactionalManager();

@Override
public void init(final Properties batchConfig) throws BatchContainerServiceException {
// no-op
}

@Override
public TransactionManagerAdapter getTransactionManager(final StepContext stepContext) throws TransactionManagementException {
return adapter;
}
}
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.batchee.container.impl;

import static org.testng.Assert.assertEquals;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import javax.batch.api.chunk.AbstractItemReader;
import javax.batch.api.chunk.AbstractItemWriter;
import javax.batch.operations.JobOperator;
import javax.batch.runtime.JobExecution;
import javax.batch.runtime.StepExecution;

import org.apache.batchee.container.services.ServicesManager;
import org.apache.batchee.container.services.persistence.MemoryPersistenceManagerService;
import org.apache.batchee.container.services.transaction.NoTxMgrBatchTransactionService;
import org.apache.batchee.spi.PersistenceManagerService;
import org.apache.batchee.spi.TransactionManagementService;
import org.apache.batchee.util.Batches;
import org.testng.annotations.Test;

public class ChunkStepControllerTest {
@Test
public void earlyStop() throws InterruptedException {
final JobOperator operator = new JobOperatorImpl(new ServicesManager() {{
init(new Properties() {{
setProperty(PersistenceManagerService.class.getSimpleName(), MemoryPersistenceManagerService.class.getName());
setProperty(TransactionManagementService.class.getSimpleName(), NoTxMgrBatchTransactionService.class.getName());
}});
}});

final long id = operator.start("stop-chunk", new Properties());
Reader.LATCH.await();
operator.stop(id);
Batches.waitFor(operator, id);
final List<StepExecution> stepExecutions = operator.getStepExecutions(id);
final StepExecution stepExecution = stepExecutions.iterator().next();
assertEquals("STOPPED", stepExecution.getExitStatus()); // before (BATCHEE-138) it was FAILED
}

public static class Reader extends AbstractItemReader {
static final CountDownLatch LATCH = new CountDownLatch(8);

@Override
public Object readItem() throws Exception {
LATCH.countDown();
return new Object();
}
}

public static class Writer extends AbstractItemWriter {
@Override
public void writeItems(final List<Object> list) {
// no-op
}
}
}
@@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
See the NOTICE file distributed with this work for additional information
regarding copyright ownership. Licensed under the Apache License,
Version 2.0 (the "License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<job id="stop-chunk" version="1.0" xmlns="http://xmlns.jcp.org/xml/ns/javaee">
<step id="test">
<chunk>
<reader ref="org.apache.batchee.container.impl.ChunkStepControllerTest$Reader" />
<writer ref="org.apache.batchee.container.impl.ChunkStepControllerTest$Writer" />
</chunk>
</step>
</job>

0 comments on commit 3308709

Please sign in to comment.