Skip to content
Permalink
Browse files

Particionado

  • Loading branch information...
jonasurbano committed Jan 29, 2018
1 parent e2cf935 commit 502b9fb26bc04acad6d3ad407812cf891e92bebc
@@ -0,0 +1,41 @@
package com.example.demo.configuration;

import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.core.io.Resource;

import java.util.HashMap;
import java.util.Map;

public class CustomMultiResourcePartitioner implements Partitioner {

private static final String DEFAULT_KEY_NAME = "fileName";

private static final String PARTITION_KEY = "partition";

private Resource[] resources = new Resource[0];

private String keyName = DEFAULT_KEY_NAME;

public void setResources(Resource[] resources) {
this.resources = resources;
}

public void setKeyName(String keyName) {
this.keyName = keyName;
}

public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> map = new HashMap<>(gridSize);
int i = 0, k = 1;
for (Resource resource : resources) {
ExecutionContext context = new ExecutionContext();
context.putString(keyName, resource.getFilename());

map.put(PARTITION_KEY + i, context);
i++;
}
return map;
}

}
@@ -4,6 +4,7 @@
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.item.ItemReader;
@@ -16,9 +17,13 @@
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.batch.item.file.transform.FlatFileFormatException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.validation.BindException;

import java.util.List;
@@ -34,10 +39,10 @@
private StepBuilderFactory stepBuilderFactory;

@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
public Step slaveStep() {
return stepBuilderFactory.get("slaveStep")
.<Person,Person>chunk(2)
.reader(csvReader())
.reader(csvReader(null))
.writer(writer())
.faultTolerant()
.skipLimit(2)
@@ -52,6 +57,33 @@ public Step step1() {
.build();
}

@Bean
public Step partitionStep() {
return stepBuilderFactory.get("partitionStep")
.partitioner("slaveStep", partitioner())
.step(slaveStep())
.taskExecutor(taskExecutor())
.build();
}

@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setMaxPoolSize(5);
taskExecutor.setCorePoolSize(5);
taskExecutor.setQueueCapacity(5);
taskExecutor.afterPropertiesSet();
return taskExecutor;
}

@Bean
public CustomMultiResourcePartitioner partitioner() {
CustomMultiResourcePartitioner partitioner = new CustomMultiResourcePartitioner();
Resource[] resources = new Resource[] { new ClassPathResource("names0.csv"), new ClassPathResource("names1.csv") };
partitioner.setResources(resources);
return partitioner;
}

@Bean
public JobExecutionListener jobListener() {
return new JobExecutionListener() {
@@ -126,7 +158,8 @@ public void onSkipInProcess(Person item, Throwable t) {
}

@Bean
public ItemReader<Person> csvReader() {
@StepScope
public FlatFileItemReader<Person> csvReader(@Value("#{stepExecutionContext[fileName]}") String filename) {
DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
lineTokenizer.setNames(new String[]{"name", "age"});

@@ -135,7 +168,7 @@ public void onSkipInProcess(Person item, Throwable t) {
lineMapper.setFieldSetMapper(personMapper());

FlatFileItemReader<Person> itemReader = new FlatFileItemReader<>();
itemReader.setResource(new ClassPathResource("names.csv"));
itemReader.setResource(new ClassPathResource(filename));
itemReader.setLineMapper(lineMapper);
return itemReader;
}
@@ -169,10 +202,10 @@ public void write(List<? extends Person> people) throws Exception {
}

@Bean
public Job job(Step step1) throws Exception {
public Job job() throws Exception {
return jobBuilderFactory.get("job1")
.incrementer(new RunIdIncrementer())
.start(step1)
.start(partitionStep())
.listener(jobListener())
.build();
}

This file was deleted.

@@ -0,0 +1,4 @@
JUAN,20
CARLOS,40
PILAR,22
PEDRO,30
@@ -0,0 +1,4 @@
LUCAS,30
IRENE,29
PABLO,34
CRISTIAN,29

0 comments on commit 502b9fb

Please sign in to comment.
You can’t perform that action at this time.