Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

스프링배치 #16

Open
Hae-Riri opened this issue Aug 12, 2021 · 4 comments
Open

스프링배치 #16

Hae-Riri opened this issue Aug 12, 2021 · 4 comments

Comments

@Hae-Riri
Copy link
Owner

Hae-Riri commented Aug 12, 2021

Spring Batch

  • 일련의 작업을 정해진 로직으로 수행(일괄처리)
  • 정해진 시간에 자동으로 진행되는 것들이 batch이고, 프로그래밍에서 function과 비슷하다. 어떻게 input/output을 정의하느냐에 따라 거의 모든 작업을 스프링 배치화할 수도 있다.
  • 실행 도중 문제가 생기면 그 지점부터 재시작할 수 있다는 특징을 갖고 있다.

배치 사용 시나리오

image

  1. Reads a large number of records from a database, file, or queue.
  2. Processes the data in some fashion.
  3. Writes back data in a modified form.
    읽기 -> 처리 -> 쓰기(수정된 내용 반영)

스프링 배치 계층 구조

image

  • Application : 개발자가 작성한 코드와 모든 배치 작업을 포함
  • Batch Core: 배치 작업을 시작하고 제어하는 데에 필수적인 핵심 런타임 클래스를 포함 (JobLauncher, Job, Step)
  • Batch Infrastructure: application, batch core는 infrastructure 위에 있고 이 infrastructure는 readers, writers, service(RetryTemplate 같은 거)를 포함한다.
    개발자는 Application 계층의 비즈니스 로직에 집중할 수 있고, 배치 동작과 관련된 건 Batch Core에 있는 클래스를 이용해서 제어할 수 있다.

스프링 배치 구성

image

  • Job:Step = 1:N
  • Step:ItemReader, ItemProcessor, ItemWriter = 1:1
  • Job 이라는 하나의 큰 일감 안에 여러 단계(Step)을 두고, 각 단계를 배치의 기본 흐름대로 구성.

Job

  • 여러 Step 인스턴스를 포함하는 컨테이너로, 전체 배치 처리에서 항상 최상단 계층에 있다.
  • Job = 배치 작업. Flow라고도 불리는데 최소 하나의 Step을 가져야 하고 2-10개의 step을 권장.(10개도 많음)
  • Job Instance는 Job이 실행될 때 하나의 Job 실행 단위다. 하루에 한번씩 배치가 실행된다고 가정할 때 어제의 배치와 오늘의 배치 각각이 Job Instance다.
  • Job Execution 은 Job Instance에 대한 한번의 실행을 나타내는 객체로, Job 실행 정보를 담은 도메인 객체를 갖고 있음.
  • Job Prameters 는 Job Instance와 1대 1 관계로, Job Instance를 구분하는 기준이 되기도 한다. Job이 실행될 때 필요한 파라미터들을 Map타입으로 지정하는 객체.
public class JobBuilderFactory {
    private JobRepostiroy jobrepository;

    public JobBuilderFactory(JobRepository jobRepository){
        this.jobrepository = jobrepository;
    }

    public JobBuilder get(String name){
        JobBuilder builder = new JobBuilder(name).repository(jobrepository);
        return builder;
    }
}
  • JobBuilderFactory는 JobBuilder를 생성할 수 있는 get()을 포함하고 있고, get()은 JobBuilder를 만들어서 반환한다.
  • 여기서 만들어지는 모든 JobBuilder가 레포지토리를 사용한다.
  • JobBuilderFactory는 JobBuilder를 생성하는 역할만 하고, 이렇게 생성된 JobBuilder를 통해 Job을 생성해야 한다.
  • JobBuilder가 직접적으로 Job을 만드는 건 아니고, 별도의 구체적인 빌더를 생성해서 반환한다.
public SimpleJobBuilder start(Step step){
    //(1)
    // Step을 추가해서 가장 기본이되는 SimpleJobBuilder를 생성합니다.
    return new SimpleJobBuilder(tihs).start(step);
}

public JobFlowBuilder start(Flow flow){
    //(2)
    // Flow를 실행할 JobFlowBuilder를 생성합니다.
    return new JobFlowBuilder(tihs).start(flow);
}

public JobFlowBuilder flow(Step step){
    //(3)
    // Step을 실행할 FlowJobBuilder를 생성합니다.
    return new JobFlowBuilder(tihs).start(step);
}

Step

  • [읽기 -> 처리 -> 쓰기]의 묶음. 이 묶음을 Chunk processing이라고 부르는데 하나의 트랜잭션과 비슷하다.
  • Step Execution 은 Job에 Job Execution(Job 실행정보)와 비슷하게, Step의 실행정보를 담은 객체를 갖고 있다.
List items = new Arraylist();
for(int i = 0; i < commitInterval; i++){
    Object item = itemReader.read()
    Object processedItem = itemProcessor.process(item);
    items.add(processedItem);
}
itemWriter.write(items);
  • 위 코드는 chunk processing의 컨셉을 보여주는 예제인데, 한꺼번에 다 읽어서 저 chunk processing이라는 한 묶음을 실행하는 게 아니고, commitInterval 만큼만 읽어오고 쓰기 때문에 앞서 언급한 재시작이 가능한 것이다.
  • 그리고 여러 개의 ItemReader를 거치고 여러 개의 ItemProcessor를 거쳐서 여러 개의 ItermWriter를 사용하게 할 수도 있다.

Job Launcher

  • Job, Job Parameter와 함께 배치를 실행하는 인터페이스

ItemReader

  • 데이터 읽기 담당. T read()를 가짐.
Reads a piece of input data and advance to the next one. Implementations must return null at the end of the input data set. In a transactional setting, caller might get the same item twice from successive calls (or otherwise), if the first call was in a transaction that rolled back.
  • null 리턴이면 전체 데이터의 마지막이라는 의미고, 이 read 메소드의 반환값은 ItemProcessor의 Input으로 사용된다.

ItemProcessor

  • ItemReader로부터 Object를 넘겨 받아서 원하는 방식으로 가공/처리한 뒤 ItemWriter에게 넘기는 역할을 한다.
  • 한 번에 하나의 아이템을 처리.

ItemWriter

  • ItemReader나 ItemProcessor가 ItemWriter로 데이터를 넘겨주면 리스트에 차곡차곡 쌓아 놓는다. 이때 commit-interval 프로퍼티에 정의된 개수만큼 데이터가 모이면 write 메소드를 실행하게 된다. commit-interval은 Step의 Chunk에 설정할 수 있다.

스프링 배치 메타 테이블과 Job Repository

  • 메타테이블 : 스프링 배치에서는 작업을 수행하면서 일련의 상태에 관한 메타데이터들을 메타 테이블에 저장해서 실패한 작업에 대한 기록을 남겨 실패에 대비할 수 있게 도와준다.
  • Job Repository : 배치 처리 정보를 담은 매커니즘으로, 어떤 Job이 실행되었으면 몇 번 실행됐고 언제 끝나는지 등 배치 처리에 대한 메타 데이터를 저장한다.
  • 스프링 배치 프레임워크는 아래 6개 테이블을 사용해서 작동한다.

BATCH_JOB_INSTANCE : job이 실행되면 새 row를 여기에 만듦. Job 이름, Job이 시작될 때 넘어온 파라미터를 직렬화해서 저장함.
BATCH_JOB_EXECUTION : job 실행내용을 담음. job의 실패 + 성공 횟수만큼 row가 생성됨.
BATCH_JOB_EXECUTION_PARAMS : Job 파라미터가 저장된다.
BATCH_JOB_EXECUTION_CONTEXT : Job안에 있는 tasklet이나 step 같은 컴포넌트들이 정보를 교환해야 할 때가 있는데 그 때 이걸 사용해서 정보를 넣거나 빼올 수 있음. 이런 정보들이 저장됨.
BATCH_STEP_EXECUTION : Step 실행 내용을 담음. Job Execution과 마찬가지로 성공+실패 횟수만큼 row 생성됨.
image

배치 원칙과 가이드

  • 같은 서비스 환경에서 동작하는 서비스와 배치는 서로한테 영향을 미칠 수 있으니 이걸 최소화할 수 있도록 디자인하기.
  • 가능한 한 단순하게 설계.
  • 데이터 처리하는 곳과 데이터 저장소는 물리적으로 가깝게 두기.
  • 시스템 리소스 사용을 최소화하고, 최대한 많은 데이터를 메모리 위에서 처리하도록 하기.
  • 처리 시간이 긴 작업을 시작하기 전에는 메모리 재할당에 소모되는 시간을 피하도록 충분한 메모리 할당하기.
  • 데이터 무결성을 위해 검사,기록하는 코드를 적절히 추가하기.
@Hae-Riri
Copy link
Owner Author

@Hae-Riri
Copy link
Owner Author

Hae-Riri commented Aug 13, 2021

오늘 한 일

1. 과제 진행

  • 유저, 웹툰동의 정보 캐시 테스트 코드, 테스트
  • Spring Batch 조사
spring active profile을 dev로 변경해서도 테스트하려 했는데 에러가 발생해서 약간의 시간 소요가 있었음. boilerplate에 작성되어 있던 log쓰는 부분에서 해당 path의 디렉토리에 쓸 수 없다는 에러라 내 노트북에서 변경하려 했는데 그 부분은 서버 path에 로그를 쓰는 거라 Local로 진행하면 되는 거였다.
# 이슈 사항

내일 할 일

  • nstore-batch dvd-customer 로직 제거 (dvd-api 삭제)
  • Spring Batch 기술조사
  • 캐시 부분 코드 리뷰 받게 되면 리뷰 반영

공부한 내용

  • Spring Batch 조사

@Hae-Riri
Copy link
Owner Author

예제 : 휴면 회원 배치 설계

가입한 회원 중 1년이 지나도록 상태 변화가 없는 회원을 휴면 처리하는 배치작업.

  1. DB에 저장된 데이터 중 1년간 업데이트되지 않은 사용자를 찾는 로직을 ItemReader에 구현
  2. 사용자 데이터의 상태값을 휴면으로 전환하는 프로세스를 ItemProcessor에 구현
  3. 상태값이 휴면으로 전환된 사용자들을 실제 DB에 저장하는 로직을 ItemWriter에 구현

배치처리 구현 순서

  1. 휴면 회원 Job 설정
  2. 휴면 회원 Step 설정
  3. 휴면 회원 Reader, Processor, Writer 설정

Job 설정

@Configuration
public class InactiveUserJobConfig {
    ...
    @Bean
    public Job inactiveUserJob(JobBuilderFactory jobBuilderFactory, Step inactiveJobStep) { //(1)
        return jobBuilderFactory.get("inactiveUserJob")
                .preventRestart() //(2)
                .start(inactiveJobStep) //(3)
                .build();
    }

(1) JobBuilderFactory를 주입받아서 JobBuilder를 만든다. (inactiveUserJob인데, 빌더를 반환한다.)
(2) preventRestart() : 재실행 막기
(3) start(inactiveJobStep)은 파라미터에서 주입 받은 휴면 회원 관련 Step을 제일 먼저 실행하게 함.

Job 설정을 마치면 Step 설정 진행.

@Bean
public Step inactiveJobStep(StepBuilderFactory stepBuilderFactory) {
    return stepBuilderFactory.get("inactiveUserStep") //(1)
            .<User, User> chunk(10) //(2)
            .reader(inactiveUserReader()) //(3)
            .processor(inactiveUserProcessor())
            .writer(inactiveUserWriter())
            .build();
}

(1) inactiveUserStep이라는 이름의 StepBuilder를 factory를 통해 생성
(2) 제네릭을 통해 chunk()의 입력과 출력 타입을 User로 설정하고, 10으로 인자값을 주어 쓸 때에 write() 메소드를 실행할 단위를 정함. 커밋 단위가 10개임.
(3) step의 reader, processor, writer를 각각 설정.

Reader 설정

@Bean
@StepScope //(1)
public QueueItemReader<User> inactiveUserReader() {
    //(2)
    List<User> oldUsers =
            userRepository.findByUpdatedDateBeforeAndStatusEquals(
                    LocalDateTime.now().minusYears(1),
                    UserStatus.ACTIVE);

    return new QueueItemReader<>(oldUsers); //(3)
}

(1) 기본 빈 생성은 싱글톤이지만 @StepScope을 사용하면 해당 메소드는 Step의 주기에 따라 새로운 빈을 생성한다. 각 Step의 실행마다 새로운 빈을 만들기 때문에 지연 생성이 가능하고, @StepScope을 쓰면 반드시 구현된 반환 타입을 명시해서 반환해야 한다.
(2) findByUpdatedDateBeforeAndStatusEquals()를 통해 휴면 회원의 리스트를 가져옴.
(3) QueueItemReader 객체 생성, 불로운 휴면 회원들을 객체에 넣어서 반환.

Reader 설정 - QueueItemReader()

public class QueueItemReader<T> implements ItemReader<T> {
    private Queue<T> queue;

    public QueueItemReader(List<T> data) {
        this.queue = new LinkedList<>(data); //(1)
    }

    @Override
    public T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        return queue.poll(); //(2)
    }
}

QueueItemReader : queue를 사용해서 저장하는 ItemReader 구현체.
- ItemReader는 기본 반환 타입이 단수형인데 그에 따라 구현하면 User 객체 하나씩을 DB에서 select 하니까 매우 비효율적임. 그래서 Queue를 통해 휴면 회원으로 저장될 타겟 사용자들을 한번에 불러와서 담는다.
(1) 한번에 데이터 불러와서 큐에 담기.
(2) read() 안에서 큐의 poll()을 통해 큐에서 데이터를 하나씩 반환함.

Processor 설정

public ItemProcessor<User, User> inactiveUserProcessor() {
    return user -> user.setInactive();
}

읽어온 타겟 사용자를 휴면 회원으로 전환시키는 코드.

Writer 설정

public ItemWriter inactiveUserWriter() {
return ((List<? extends User> users) -> userRepository.saveAll(users));
}
ItemWriter는 앞서 설정한 청크 단위로 받아서 (아까 10개로 설정했으니 10개 커밋단위를 한 번에 받아서) 한번에 DB에 저장한다.

@Hae-Riri
Copy link
Owner Author

Hae-Riri commented Aug 14, 2021

배치 심화

  • 다양한 ItemReader 구현 클래스, ItemWriter
  • JobParameter 사용
  • 테스트 시에만 H2 DB를 사용하도록
  • 청크 지향 프로세스
  • 배치 인터셉터 리스너 설정하기
  • 어노테이션 기반 리스너 설정하기

다양한 ItemReader 구현 클래스, ItemWriter

  • 수백, 수천 개 이상의 데이터를 한번에 가져와서 메모리에 올려놓으면 좋지 않다. 배치 프로젝트에서 제공하는 것들을 사용하자. PagingItemReader, JdbcPagingItemReader, JpaPagingItemReader, HibernatePagingItemReader, HibernatePagingReader 등을 제공한다.

JobParameter 사용

  • 파라미터를 동적으로 주입시킬 수 있다.

테스트 시에만 H2 데이터베이스를 사용하도록 설정

@AutoConfigureTestDatabase(connection = EmbeddedDatabaseConnection.H2)

  • 청크 지향 프로세싱
  • 트랜잭션 경계 내에서 청크 단위로 데이터를 읽고 생성하는 프로그래밍 기법
  • 청크는 아이템이 트랜잭션에 커밋되는 수인데, Read한 데이터 수가 지정한 청크 단위와 일치하면 write을 수행하고 트랜잭션을 커밋한다.
  • 청크 지향 프로세싱의 장점은, 1000개의 데이터에 대해 배치 로직을 수행한다고 가정할 대 청크로 나누지 않으면 하나만 실패해도 다른 성공한 999개의 데이터가 롤백된다. 하지만 청크 단위로 나눠서 배치처리를 하면 도중에 배치 처리에 실패해도 다른 청크는 영향을 받지 않아서 좀 더 효율적이다.

배치 인터셉터 리스터 설정

  • 배치 흐름에서 전후 처리를 하는 리스너를 설정할 수 있다.
  • Job의 전후 처리, Step의 전후 처리, 각 청크 단위의 전후처리 등 세세한 과정을 싫애할 때 특정 로직을 제어할 수 있음.
  • 로깅 같은 데에 많이 쓰임

어노테이션 기반 리스너 설정

  • @BeforeStep, @afterstep 을 통해 시작 전후에 로그를 남길 수도 있다.

Step의 흐름을 제어하는 Flow

Step의 흐름은 읽기 - 처리 - 쓰기인데, 세부적인 조건에 따라서 Step의 실행 여부를 정할 수 있다. 흐름의 조건에 대항다는 부분을 JobExcecution 라는 인터페이스의 decide 구현을 통해 만들 수 있다.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant