Skip to content
Permalink
Browse files
BATCHEE-113 added test for improved scope handling
  • Loading branch information
rsandtner committed Dec 20, 2016
1 parent ce60c30 commit 1a6634f10952f2aee626741c327410878f6ffbe5
Showing 5 changed files with 225 additions and 3 deletions.
@@ -19,20 +19,29 @@
import org.apache.batchee.cdi.component.Holder;
import org.apache.batchee.cdi.component.JobScopedBean;
import org.apache.batchee.cdi.component.StepScopedBean;
import org.apache.batchee.cdi.partitioned.PartitionedJobScopedReader;
import org.apache.batchee.cdi.testng.CdiContainerLifecycle;
import org.apache.batchee.util.Batches;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Listeners;
import org.testng.annotations.Test;

import javax.batch.operations.JobOperator;
import javax.batch.runtime.BatchRuntime;
import javax.batch.runtime.BatchStatus;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotSame;
import static org.testng.Assert.assertTrue;
import java.util.Properties;

import static org.testng.Assert.*;

@Listeners(CdiContainerLifecycle.class)
public class BatchScopesTest {

@BeforeMethod
public void resetJobScopedBean() {
JobScopedBean.reset();
}

@Test
public void test() {
final JobOperator jobOperator = BatchRuntime.getJobOperator();
@@ -47,4 +56,30 @@ public void test() {
assertTrue(JobScopedBean.isDestroyed());
assertTrue(StepScopedBean.isDestroyed());
}

@Test
public void testPartitionedJobScoped() throws Exception {

JobOperator jobOperator = BatchRuntime.getJobOperator();

long executionId = jobOperator.start("partitioned-job-scoped", new Properties());

Thread.sleep(100);

assertEquals(PartitionedJobScopedReader.currentBeanId(), PartitionedJobScopedReader.originalBeanId());

PartitionedJobScopedReader.stop();

Thread.sleep(100);

assertEquals(PartitionedJobScopedReader.currentBeanId(), PartitionedJobScopedReader.originalBeanId());
assertFalse(JobScopedBean.isDestroyed(), "JobScopedBean must not be destroyed -> partition 2 is still running :(");

PartitionedJobScopedReader.stopPartition2();

assertEquals(Batches.waitFor(executionId), BatchStatus.COMPLETED);

assertEquals(PartitionedJobScopedReader.currentBeanId(), PartitionedJobScopedReader.originalBeanId());
assertTrue(JobScopedBean.isDestroyed());
}
}
@@ -41,4 +41,8 @@ public static boolean isDestroyed() {
void destroy() {
destroyed = true;
}

public static void reset() {
destroyed = false;
}
}
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.batchee.cdi.partitioned;

import org.apache.batchee.cdi.component.JobScopedBean;

import javax.batch.api.BatchProperty;
import javax.batch.api.chunk.AbstractItemReader;
import javax.enterprise.context.Dependent;
import javax.inject.Inject;
import javax.inject.Named;
import java.util.logging.Level;
import java.util.logging.Logger;

@Named
@Dependent
public class PartitionedJobScopedReader extends AbstractItemReader {

private static final Logger LOG = Logger.getLogger(PartitionedJobScopedReader.class.getName());

private static volatile boolean stop;
private static volatile boolean stopPartition2;

private static long originalBeanId;
private static long currentBeanId;

private boolean firstRun = true;

@Inject
@BatchProperty
private Integer partition;

@Inject
private JobScopedBean jobScopedBean;


@Override
public Object readItem() throws Exception {

if (firstRun) {
originalBeanId = jobScopedBean.getId();
}

currentBeanId = jobScopedBean.getId();

if (partition == 2 && stopPartition2 || partition != 2 && stop) {
LOG.log(Level.INFO, "Finished partition "+ partition);
return null;
}

Thread.sleep(10);
return "Partition " + partition + ": JobScopedBean destroyed? " + JobScopedBean.isDestroyed();
}


public static void stop() {
LOG.log(Level.INFO, "Stopping all partitions except Partition 2");
stop = true;
}

public static void stopPartition2() {
stopPartition2 = true;
}

public static long originalBeanId() {
return originalBeanId;
}

public static long currentBeanId() {
return currentBeanId;
}
}
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.batchee.cdi.partitioned;

import org.apache.batchee.cdi.component.JobScopedBean;

import javax.batch.api.BatchProperty;
import javax.batch.api.chunk.AbstractItemWriter;
import javax.enterprise.context.Dependent;
import javax.inject.Inject;
import javax.inject.Named;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

@Named
@Dependent
public class PartitionedJobScopedWriter extends AbstractItemWriter {

private static final Logger LOG = Logger.getLogger(PartitionedJobScopedWriter.class.getName());

@Inject
@BatchProperty
private Integer partition;


@Override
public void writeItems(List<Object> items) throws Exception {
LOG.log(Level.INFO, "Writing {0} items from partition {1}", new Object[]{items.size(), partition});
LOG.log(Level.INFO, "JobScopedBean destroyed? {0}", JobScopedBean.isDestroyed());
}
}
@@ -0,0 +1,51 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
See the NOTICE file distributed with this work for additional information
regarding copyright ownership. Licensed under the Apache License,
Version 2.0 (the "License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<job id="partitioned-job-scoped-job" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">

<step id="partitioned-step">

<chunk item-count="100">
<reader ref="partitionedJobScopedReader">
<properties>
<property name="partition" value="#{partitionPlan['index']}" />
</properties>
</reader>
<writer ref="partitionedJobScopedWriter">
<properties>
<property name="partition" value="#{partitionPlan['index']}" />
</properties>
</writer>
</chunk>

<partition>

<plan partitions="2" threads="2">

<properties partition="0">
<property name="index" value="1" />
</properties>

<properties partition="1">
<property name="index" value="2" />
</properties>

</plan>

</partition>

</step>

</job>

0 comments on commit 1a6634f

Please sign in to comment.