Skip to content

Commit

Permalink
Thread-safe property resolution for concurrent execution of the same …
Browse files Browse the repository at this point in the history
…job: Added JobBuilder.createJob(String, Consumer<PropertyResolver>>) and deprecated JobProperties
  • Loading branch information
chrisgleissner committed May 17, 2020
1 parent d8189c0 commit da85749
Show file tree
Hide file tree
Showing 16 changed files with 355 additions and 194 deletions.
57 changes: 44 additions & 13 deletions README.md
Expand Up @@ -8,7 +8,7 @@

REST API for <a href="https://spring.io/projects/spring-batch">Spring Batch</a> based on <a href="https://github.com/spring-projects/spring-boot">Spring Boot 2.2</a> and <a href="https://github.com/spring-projects/spring-hateoas">Spring HATOEAS</a>. It comes with an OpenAPI 3 documentation provided by <a href="https://github.com/springdoc/springdoc-openapi">Springdoc</a>.

Supports Java 8 and above. Tested using OpenJDK 8, 11, and 14.
Supports Java 8 and above. Tested on OpenJDK 8, 11, and 14.

## Features
- Get information on jobs, job executions, and Quartz schedules
Expand Down Expand Up @@ -227,29 +227,60 @@ This disables the global exception handling via `com.github.chrisgleissner.sprin

## Job Property Overrides

Properties can be overridden when starting a job via REST. These overrides can then be accessed from a job either via:
Properties can be overridden when starting a job via REST. You can then access these overrides in one of the following ways.

### @Value

Annotate your Spring bean method with `@StepScope` and use the `@Value("#{jobParameters['name']}")` annotation on a method parameter
to specify the desired job parameter name.

Please note that this approach won't transparently fall back to Spring environment properties. Thus,
if this is desired, you should manually check if a job parameter is `null` and in this case return it from the Spring `Environment` instance.

Example:
```java
@Bean @StepScope
ItemWriter<Object> writer(@Value("#{jobParameters['propName']}") String prop) {
// ...
}
```

### PropertyResolver

When using `AdhocStarter`, you can create a `Job` using a `JobBuilder` and pass in a `Consumer<PropertyResolver>`.

Properties looked up from this `PropertyResolver` transparently fall back to the Spring environment if properties can't be found in the job parameters.

Example:
```java
Job job = jobBuilder.createJob("sampleJob", propertyResolver -> {
String propertyValue = propertyResolver.getProperty("sampleProperty");
...
});
```
### JobProperties

In case you don't execute the same job concurrently, you may also look up properties from the `JobProperties` singleton.

Properties looked up from this singleton transparently fall back to the Spring environment if properties can't be found in the
job parameters.

This approach is *deprecated* as it doesn't work with concurrent execution of the same job. Therefore, it is recommended to use one
of the other approaches.

Example:
```java
@Bean
ItemWriter<Object> writer() {
return new ItemWriter<Object>() {
@Override
public void write(List<?> items) throws Exception {
String prop = JobPropertyResolvers.JobProperties.of("jobName").getProperty("propName");
String prop = JobPropertyResolvers.JobProperties.of("sampleJob").getProperty("sampleProperty");
// ...
}
}
}
```
or alternatively by using `@StepScope`-annotated beans:
```java
@StepScope
@Bean
ItemWriter<Object> writer(@Value("#{jobParameters['propName']}") String prop) {
// ...
}
```

If a property is not overridden, it is resolved against the Spring environment. All overrides are reverted on job completion.

## Utilities

Expand Down
Expand Up @@ -5,7 +5,6 @@
import com.github.chrisgleissner.springbatchrest.util.core.JobBuilder;
import com.github.chrisgleissner.springbatchrest.util.core.JobConfig;
import com.github.chrisgleissner.springbatchrest.util.core.config.AdHocBatchConfig;

import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -26,7 +25,6 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.github.chrisgleissner.springbatchrest.util.core.property.JobPropertyResolvers.JobProperties;
import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.batch.core.ExitStatus.COMPLETED;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;
Expand Down Expand Up @@ -56,11 +54,11 @@ public class RestTest {
@Before
public void setUp() {
if (firstExecution.compareAndSet(true, false)) {
Job job = jobBuilder.createJob(JOB_NAME, () -> {
String propertyValue = JobProperties.of(JOB_NAME).getProperty(PROPERTY_NAME);
Job job = jobBuilder.createJob(JOB_NAME, propertyResolver -> {
String propertyValue = propertyResolver.getProperty(PROPERTY_NAME);
propertyValues.add(propertyValue);

String exceptionMessage = JobProperties.of(JOB_NAME).getProperty(EXCEPTION_MESSAGE_PROPERTY_NAME);
String exceptionMessage = propertyResolver.getProperty(EXCEPTION_MESSAGE_PROPERTY_NAME);
if (exceptionMessage != null)
throw new RuntimeException(exceptionMessage);

Expand Down
Expand Up @@ -4,11 +4,10 @@
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.DuplicateJobException;
import org.springframework.batch.core.configuration.JobFactory;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
Expand All @@ -22,39 +21,32 @@
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.function.FunctionItemProcessor;
import org.springframework.batch.item.support.CompositeItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.core.io.ClassPathResource;

import java.util.LinkedList;
import java.util.List;
import java.util.Optional;

import static com.github.chrisgleissner.springbatchrest.util.core.property.JobPropertyResolvers.JobProperties;
import static com.google.common.collect.Lists.newArrayList;
import static java.util.Collections.synchronizedList;

@Slf4j
@Configuration
@EnableBatchProcessing
@Configuration @EnableBatchProcessing @RequiredArgsConstructor @Slf4j
public class PersonJobConfig {

static final String JOB_NAME = "personJob";
static final String LAST_NAME_PREFIX = "lastNamePrefix";

@Autowired
JobBuilderFactory jobs;

@Autowired
StepBuilderFactory steps;

@Autowired
JobRegistry jobRegistry;
private final JobBuilderFactory jobs;
private final StepBuilderFactory steps;
private final JobRegistry jobRegistry;
private final Environment environment;

@Bean
Job personJob(@Qualifier("personStep") Step personStep) throws DuplicateJobException {
Job personJob(@Qualifier("personStep") Step personStep) {
return JobBuilder.registerJob(jobRegistry, jobs.get(JOB_NAME)
.incrementer(new RunIdIncrementer())
.start(personStep)
Expand All @@ -78,31 +70,32 @@ FlatFileItemReader<Person> personReader() {
.name("personItemReader")
.resource(new ClassPathResource("person.csv"))
.delimited()
.names(new String[]{"firstName", "lastName"})
.names("firstName", "lastName")
.fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
setTargetType(Person.class);
}})
.build();
}

@Bean
ItemProcessor personProcessor(@Qualifier("personNameCaseChange") ItemProcessor personNameCaseChange) {
@Bean @StepScope
ItemProcessor personProcessor(
@Qualifier("personNameCaseChange") ItemProcessor personNameCaseChange,
@Value("#{jobParameters['" + LAST_NAME_PREFIX + "']}") String lastNamePrefix) {
CompositeItemProcessor p = new CompositeItemProcessor();
p.setDelegates(newArrayList(personNameFilter(), personNameCaseChange));
p.setDelegates(newArrayList(
personNameFilter(Optional.ofNullable(lastNamePrefix).orElseGet(() -> environment.getProperty(LAST_NAME_PREFIX))),
personNameCaseChange));
return p;
}

@Bean
ItemProcessor personNameFilter() {
private ItemProcessor personNameFilter(String lastNamePrefix) {
return new FunctionItemProcessor<Person, Person>(p -> {
String lastNamePrefix = JobProperties.of(PersonJobConfig.JOB_NAME).getProperty(LAST_NAME_PREFIX);
log.info("Last name prefix: {}", lastNamePrefix);
return p.lastName != null && p.lastName.startsWith(lastNamePrefix) ? p : null;
});
}

@StepScope
@Bean
@Bean @StepScope
ItemProcessor personNameCaseChange(@Value("#{jobParameters['upperCase']}") Boolean upperCaseParam) {
boolean upperCase = upperCaseParam == null ? false : upperCaseParam;
log.info("personNameCaseChange(upperCase={})", upperCase);
Expand All @@ -116,8 +109,14 @@ CacheItemWriter<Person> personWriter() {
return new CacheItemWriter<>();
}

public class CacheItemWriter<T> implements ItemWriter<T> {
private List<T> items = synchronizedList(new LinkedList<>());
@Data @NoArgsConstructor @AllArgsConstructor
public static class Person {
private String firstName;
private String lastName;
}

public static class CacheItemWriter<T> implements ItemWriter<T> {
private final List<T> items = synchronizedList(new LinkedList<>());

@Override
public void write(List<? extends T> items) {
Expand All @@ -132,12 +131,4 @@ public void clear() {
items.clear();
}
}

@Data
@NoArgsConstructor
@AllArgsConstructor
public static class Person {
private String firstName;
private String lastName;
}
}
Expand Up @@ -31,18 +31,12 @@
@SpringBootTest(webEnvironment = DEFINED_PORT)
@EnableAutoConfiguration
public class PersonJobTest {

@LocalServerPort
private int port;

@Autowired
private TestRestTemplate restTemplate;

@Autowired
private PersonJobConfig.CacheItemWriter<PersonJobConfig.Person> cacheItemWriter;
@LocalServerPort private int port;
@Autowired private TestRestTemplate restTemplate;
@Autowired private PersonJobConfig.CacheItemWriter<PersonJobConfig.Person> cacheItemWriter;

@Test
public void canStartJob() throws NoSuchJobException {
public void canStartJob() {
cacheItemWriter.clear();
startJob(Optional.empty(), Optional.empty());
assertThat(cacheItemWriter.getItems()).hasSize(5);
Expand Down
Expand Up @@ -4,6 +4,7 @@
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
Expand All @@ -23,30 +24,25 @@
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.core.io.ClassPathResource;

import java.util.LinkedList;
import java.util.List;
import java.util.Optional;

import static com.github.chrisgleissner.springbatchrest.util.core.property.JobPropertyResolvers.JobProperties;
import static com.google.common.collect.Lists.newArrayList;
import static java.util.Collections.synchronizedList;

@Slf4j
@Configuration
@Configuration @RequiredArgsConstructor @Slf4j
public class PersonJobConfig {

static final String JOB_NAME = "personJob";
static final String LAST_NAME_PREFIX = "lastNamePrefix";

@Autowired
JobBuilderFactory jobs;

@Autowired
StepBuilderFactory steps;

@Autowired
private AdHocScheduler adHocScheduler;
private final JobBuilderFactory jobs;
private final StepBuilderFactory steps;
private final AdHocScheduler adHocScheduler;
private final Environment environment;

@Bean
Job personJob(@Qualifier("personStep") Step personStep) {
Expand Down Expand Up @@ -81,24 +77,25 @@ FlatFileItemReader<Person> personReader() {
.build();
}

@Bean
ItemProcessor personProcessor(@Qualifier("personNameCaseChange") ItemProcessor personNameCaseChange) {
@Bean @StepScope
ItemProcessor personProcessor(
@Qualifier("personNameCaseChange") ItemProcessor personNameCaseChange,
@Value("#{jobParameters['" + LAST_NAME_PREFIX + "']}") String lastNamePrefix) {
CompositeItemProcessor p = new CompositeItemProcessor();
p.setDelegates(newArrayList(personNameFilter(), personNameCaseChange));
p.setDelegates(newArrayList(
personNameFilter(Optional.ofNullable(lastNamePrefix).orElseGet(() -> environment.getProperty(LAST_NAME_PREFIX))),
personNameCaseChange));
return p;
}

@Bean
ItemProcessor personNameFilter() {
private ItemProcessor personNameFilter(String lastNamePrefix) {
return new FunctionItemProcessor<Person, Person>(p -> {
String lastNamePrefix = JobProperties.of(PersonJobConfig.JOB_NAME).getProperty(LAST_NAME_PREFIX);
log.info("Last name prefix: {}", lastNamePrefix);
return p.lastName != null && p.lastName.startsWith(lastNamePrefix) ? p : null;
});
}

@StepScope
@Bean
@Bean @StepScope
ItemProcessor personNameCaseChange(@Value("#{jobParameters['upperCase']}") Boolean upperCaseParam) {
boolean upperCase = upperCaseParam == null ? false : upperCaseParam;
log.info("personNameCaseChange(upperCase={})", upperCase);
Expand Down
Expand Up @@ -23,18 +23,10 @@
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = DEFINED_PORT)
public class PersonJobTest {

@LocalServerPort
private int port;

@Autowired
private TestRestTemplate restTemplate;

@Autowired
private PersonJobConfig.CacheItemWriter<PersonJobConfig.Person> cacheItemWriter;

@Autowired
private JobRegistry jobRegistry;
@LocalServerPort private int port;
@Autowired private TestRestTemplate restTemplate;
@Autowired private PersonJobConfig.CacheItemWriter<PersonJobConfig.Person> cacheItemWriter;
@Autowired private JobRegistry jobRegistry;

@Test
public void canStartJob() throws NoSuchJobException {
Expand All @@ -58,16 +50,11 @@ public void canStartJob() throws NoSuchJobException {
}

private JobExecution startJob(Optional<String> lastNamePrefix, Optional<Boolean> upperCase) {
JobConfig.JobConfigBuilder jobConfigBuilder = JobConfig.builder()
.name(PersonJobConfig.JOB_NAME).asynchronous(false);
if (lastNamePrefix.isPresent())
jobConfigBuilder.property(PersonJobConfig.LAST_NAME_PREFIX, lastNamePrefix.get());
if (upperCase.isPresent())
jobConfigBuilder.property("upperCase", "" + upperCase.get());
JobConfig jobConfig = jobConfigBuilder.build();

JobConfig.JobConfigBuilder jobConfigBuilder = JobConfig.builder().name(PersonJobConfig.JOB_NAME).asynchronous(false);
lastNamePrefix.ifPresent(s -> jobConfigBuilder.property(PersonJobConfig.LAST_NAME_PREFIX, s));
upperCase.ifPresent(aBoolean -> jobConfigBuilder.property("upperCase", "" + aBoolean));
ResponseEntity<JobExecutionResource> responseEntity = restTemplate.postForEntity("http://localhost:" + port + "/jobExecutions",
jobConfig, JobExecutionResource.class);
jobConfigBuilder.build(), JobExecutionResource.class);
assertThat(responseEntity.getStatusCode()).isEqualTo(HttpStatus.OK);
return responseEntity.getBody().getJobExecution();
}
Expand Down

0 comments on commit da85749

Please sign in to comment.