Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Optional;
import java.util.Set;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.fineract.cob.exceptions.CustomJobParameterNotFoundException;
import org.apache.fineract.infrastructure.core.serialization.GoogleGsonSerializerHelper;
import org.apache.fineract.infrastructure.jobs.data.JobParameterDTO;
Expand All @@ -37,8 +38,10 @@
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.stereotype.Component;

@Slf4j
@RequiredArgsConstructor
@Component
public class CustomJobParameterResolver {
Expand All @@ -47,15 +50,21 @@ public class CustomJobParameterResolver {

protected Gson gson = GoogleGsonSerializerHelper.createSimpleGson();

public void resolve(StepContribution contribution, ChunkContext chunkContext, String customJobParameterKey,
String parameterNameInExecutionContext) {
Set<JobParameterDTO> jobParameterDTOList = getCustomJobParameterSet(chunkContext.getStepContext().getStepExecution())
public void resolveToJobExecutionContext(final StepContribution contribution, final ChunkContext chunkContext,
final String[] requiredParameterNames, final String[] optionalParameterNames) {
final Set<JobParameterDTO> jobParameterDTOList = getCustomJobParameterSet(chunkContext.getStepContext().getStepExecution())
.orElseThrow(() -> new CustomJobParameterNotFoundException(SpringBatchJobConstants.CUSTOM_JOB_PARAMETER_ID_KEY));
JobParameterDTO businessDateParameter = jobParameterDTOList.stream()
.filter(jobParameterDTO -> customJobParameterKey.equals(jobParameterDTO.getParameterName())) //
.findFirst().orElseThrow(() -> new CustomJobParameterNotFoundException(customJobParameterKey));
contribution.getStepExecution().getExecutionContext().put(parameterNameInExecutionContext,
businessDateParameter.getParameterValue());
final ExecutionContext jobExecutionContext = contribution.getStepExecution().getJobExecution().getExecutionContext();
for (String parameterName : requiredParameterNames) {
final JobParameterDTO dto = jobParameterDTOList.stream().filter(p -> parameterName.equals(p.getParameterName())).findFirst()
.orElseThrow(() -> new CustomJobParameterNotFoundException(parameterName));
jobExecutionContext.put(parameterName, dto.getParameterValue());
}
for (String parameterName : optionalParameterNames) {
jobParameterDTOList.stream().filter(p -> parameterName.equals(p.getParameterName())).findFirst().ifPresentOrElse(
dto -> jobExecutionContext.put(parameterName, dto.getParameterValue()),
() -> log.warn("Optional custom job parameter '{}' not found in custom parameter table.", parameterName));
}
}

/**
Expand All @@ -70,12 +79,6 @@ public Optional<Set<JobParameterDTO>> getCustomJobParameterSet(StepExecution ste
.map(json -> gson.fromJson(json, new TypeToken<HashSet<JobParameterDTO>>() {}.getType()));
}

public Optional<String> getCustomJobParameterById(StepExecution stepExecution, String key) {
return getCustomJobParameterSet(stepExecution)
.flatMap(paramterList -> paramterList.stream().filter(dto -> dto.getParameterName().equals(key)).findFirst())
.map(JobParameterDTO::getParameterValue);
}

/**
* Resolve job parameters from step execution context,
* like @org.springframework.batch.core.scope.context.StepContext#getJobParameters()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.Objects;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.fineract.cob.loan.LoanCOBConstant;
import org.apache.fineract.cob.COBConstant;
import org.apache.fineract.infrastructure.businessdate.domain.BusinessDateType;
import org.apache.fineract.infrastructure.core.domain.ActionContext;
import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
Expand Down Expand Up @@ -56,7 +56,7 @@ public RepeatStatus execute(@NonNull StepContribution contribution, @NonNull Chu
ThreadLocalContextUtil.setActionContext(ActionContext.COB);

String businessDateString = Objects.requireNonNull((String) chunkContext.getStepContext().getStepExecution().getJobExecution()
.getExecutionContext().get(LoanCOBConstant.BUSINESS_DATE_PARAMETER_NAME));
.getExecutionContext().get(COBConstant.BUSINESS_DATE_PARAMETER_NAME));
LocalDate businessDate = LocalDate.parse(businessDateString, DateTimeFormatter.ISO_DATE);

businessDates.put(BusinessDateType.COB_DATE, businessDate);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.fineract.cob.listener;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.scope.context.StepContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.lang.NonNull;

/**
* Runs COB worker setup and teardown without a {@link org.springframework.batch.core.job.flow.FlowStep}. A flow uses
* {@link org.springframework.batch.core.job.SimpleStepHandler}, which persists the job execution context after every
* sub-step and causes lock contention on {@code BATCH_JOB_EXECUTION_CONTEXT} during remote partitioning.
*/
@Slf4j
@RequiredArgsConstructor
public class CobWorkerStepListener implements StepExecutionListener {

private final Tasklet initialisationTasklet;

private final Tasklet applyLockTasklet;

private final Tasklet resetContextTasklet;

@Override
public void beforeStep(@NonNull final StepExecution stepExecution) {
runTasklet(initialisationTasklet, stepExecution);
try {
runTasklet(applyLockTasklet, stepExecution);
} catch (Exception e) {
log.error("Failed to apply lock in beforeStep for '{}'; resetting thread-local context.", stepExecution.getStepName(), e);
runTasklet(resetContextTasklet, stepExecution);
throw e;
}
}

@Override
public ExitStatus afterStep(@NonNull final StepExecution stepExecution) {
runTasklet(resetContextTasklet, stepExecution);
return stepExecution.getExitStatus();
}

private void runTasklet(final Tasklet tasklet, final StepExecution stepExecution) {
RepeatStatus status = RepeatStatus.CONTINUABLE;
while (RepeatStatus.CONTINUABLE.equals(status)) {
final StepContribution contribution = new StepContribution(stepExecution);
final ChunkContext chunkContext = new ChunkContext(new StepContext(stepExecution));
try {
status = tasklet.execute(contribution, chunkContext);
if (status == null) {
status = RepeatStatus.FINISHED;
}
stepExecution.incrementCommitCount();
} catch (Exception exception) {
throw new IllegalStateException("COB worker step listener failed for step " + stepExecution.getStepName(), exception);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.listener.ExecutionContextPromotionListener;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.integration.config.annotation.EnableBatchIntegration;
Expand Down Expand Up @@ -99,7 +98,7 @@ public Step loanCOBStep(LoanCOBPartitioner partitioner) {
@Bean
public Step resolveCustomJobParametersStep() {
return new StepBuilder("Resolve custom job parameters - Step", jobRepository)
.tasklet(resolveCustomJobParametersTasklet(), transactionManager).listener(customJobParametersPromotionListener()).build();
.tasklet(resolveCustomJobParametersTasklet(), transactionManager).build();
}

@Bean
Expand Down Expand Up @@ -141,10 +140,4 @@ public Job loanCOBJob(LoanCOBPartitioner partitioner) {
.build();
}

@Bean
public ExecutionContextPromotionListener customJobParametersPromotionListener() {
ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
listener.setKeys(new String[] { LoanCOBConstant.BUSINESS_DATE_PARAMETER_NAME, LoanCOBConstant.IS_CATCH_UP_PARAMETER_NAME });
return listener;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.fineract.cob.domain.LoanAccountLock;
import org.apache.fineract.cob.domain.LockingService;
import org.apache.fineract.cob.listener.ChunkProcessingLoanItemListener;
import org.apache.fineract.cob.listener.CobWorkerStepListener;
import org.apache.fineract.cob.service.BeforeStepLockingItemReaderHelper;
import org.apache.fineract.cob.service.RetrieveLoanIdService;
import org.apache.fineract.infrastructure.core.config.FineractProperties;
Expand All @@ -36,14 +37,10 @@
import org.apache.fineract.useradministration.domain.AppUserRepositoryWrapper;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.integration.partition.RemotePartitioningWorkerStepBuilderFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
Expand Down Expand Up @@ -89,43 +86,8 @@ public class LoanCOBWorkerConfiguration {
private ProgressiveLoanModelProcessingService progressiveLoanModelProcessingService;

@Bean(name = LoanCOBConstant.LOAN_COB_WORKER_STEP)
public Step loanCOBWorkerStep(Flow cobFlow) {
return stepBuilderFactory.get("Loan COB worker - Step").inputChannel(inboundRequests).flow(cobFlow).build();
}

@Bean("cobFlow")
public Flow flow(Step initialisationStep, Step applyLockStep, Step loanBusinessStep, Step resetContextStep) {
return new FlowBuilder<Flow>("cobFlow").start(initialisationStep).next(applyLockStep).next(loanBusinessStep).next(resetContextStep)
.build();
}

@Bean("initialisationStep")
@StepScope
public Step initialisationStep(@Value("#{stepExecutionContext['partition']}") String partitionName) {
return new StepBuilder("Initialisation - Step:" + partitionName, jobRepository).tasklet(initialiseContext(), transactionManager)
.build();
}

@Bean
public TaskExecutor cobTaskExecutor() {
if (propertyService.getThreadPoolMaxPoolSize(LoanCOBConstant.JOB_NAME) == 1) {
return new SyncTaskExecutor();
}
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setThreadNamePrefix("COB-Thread-");
taskExecutor.setThreadGroupName("COB-Thread");
taskExecutor.setCorePoolSize(propertyService.getThreadPoolCorePoolSize(JobName.LOAN_COB.name()));
taskExecutor.setMaxPoolSize(propertyService.getThreadPoolMaxPoolSize(JobName.LOAN_COB.name()));
taskExecutor.setQueueCapacity(propertyService.getThreadPoolQueueCapacity(JobName.LOAN_COB.name()));
taskExecutor.setAllowCoreThreadTimeOut(true);
taskExecutor.setTaskDecorator(new ContextAwareTaskDecorator());
return taskExecutor;
}

@Bean("loanBusinessStep")
@StepScope
public Step loanBusinessStep(@Value("#{stepExecutionContext['partition']}") String partitionName, TaskExecutor cobTaskExecutor) {
SimpleStepBuilder<Loan, Loan> stepBuilder = new StepBuilder("Loan Business - Step:" + partitionName, jobRepository)
public Step loanCOBWorkerStep() {
final SimpleStepBuilder<Loan, Loan> stepBuilder = stepBuilderFactory.get("Loan COB worker - Step").inputChannel(inboundRequests)
.<Loan, Loan>chunk(propertyService.getChunkSize(JobName.LOAN_COB.name()), transactionManager) //
.reader(cobWorkerItemReader()) //
.processor(cobWorkerItemProcessor()) //
Expand All @@ -136,25 +98,35 @@ public Step loanBusinessStep(@Value("#{stepExecutionContext['partition']}") Stri
.skip(Exception.class) //
.skipLimit(propertyService.getChunkSize(LoanCOBConstant.JOB_NAME) + 1) //
.listener(loanItemListener()) //
.listener(cobWorkerStepListener()) //
.transactionManager(transactionManager);

if (propertyService.getThreadPoolMaxPoolSize(LoanCOBConstant.JOB_NAME) > 1) {
stepBuilder.taskExecutor(cobTaskExecutor);
stepBuilder.taskExecutor(cobTaskExecutor());
}

return stepBuilder.build();
}

@Bean("applyLockStep")
@StepScope
public Step applyLockStep(@Value("#{stepExecutionContext['partition']}") String partitionName) {
return new StepBuilder("Apply lock - Step:" + partitionName, jobRepository).tasklet(applyLock(), transactionManager).build();
@Bean
public TaskExecutor cobTaskExecutor() {
if (propertyService.getThreadPoolMaxPoolSize(LoanCOBConstant.JOB_NAME) == 1) {
return new SyncTaskExecutor();
}
final ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setThreadNamePrefix("COB-Thread-");
taskExecutor.setThreadGroupName("COB-Thread");
taskExecutor.setCorePoolSize(propertyService.getThreadPoolCorePoolSize(JobName.LOAN_COB.name()));
taskExecutor.setMaxPoolSize(propertyService.getThreadPoolMaxPoolSize(JobName.LOAN_COB.name()));
taskExecutor.setQueueCapacity(propertyService.getThreadPoolQueueCapacity(JobName.LOAN_COB.name()));
taskExecutor.setAllowCoreThreadTimeOut(true);
taskExecutor.setTaskDecorator(new ContextAwareTaskDecorator());
return taskExecutor;
}

@Bean("resetContextStep")
@StepScope
public Step resetContextStep(@Value("#{stepExecutionContext['partition']}") String partitionName) {
return new StepBuilder("Reset context - Step:" + partitionName, jobRepository).tasklet(resetContext(), transactionManager).build();
@Bean
public CobWorkerStepListener cobWorkerStepListener() {
return new CobWorkerStepListener(initialiseContext(), applyLock(), resetContext());
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ public class ResolveLoanCOBCustomJobParametersTasklet implements Tasklet {
private final CustomJobParameterResolver customJobParameterResolver;

@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
customJobParameterResolver.resolve(contribution, chunkContext, LoanCOBConstant.BUSINESS_DATE_PARAMETER_NAME,
LoanCOBConstant.BUSINESS_DATE_PARAMETER_NAME);
public RepeatStatus execute(final StepContribution contribution, final ChunkContext chunkContext) throws Exception {
customJobParameterResolver.resolveToJobExecutionContext(contribution, chunkContext,
new String[] { LoanCOBConstant.BUSINESS_DATE_PARAMETER_NAME }, new String[] { LoanCOBConstant.IS_CATCH_UP_PARAMETER_NAME });
return RepeatStatus.FINISHED;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.fineract.cob.workingcapitalloan;

import static org.apache.fineract.cob.COBConstant.BUSINESS_DATE_PARAMETER_NAME;
import static org.apache.fineract.cob.COBConstant.IS_CATCH_UP_PARAMETER_NAME;

import lombok.RequiredArgsConstructor;
import org.apache.fineract.cob.common.CustomJobParameterResolver;
Expand All @@ -35,8 +36,9 @@ public class WorkingCapitalLoanCOBCustomJobParametersResolverTasklet implements

@Nullable
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
customJobParameterResolver.resolve(contribution, chunkContext, BUSINESS_DATE_PARAMETER_NAME, BUSINESS_DATE_PARAMETER_NAME);
public RepeatStatus execute(final StepContribution contribution, final ChunkContext chunkContext) throws Exception {
customJobParameterResolver.resolveToJobExecutionContext(contribution, chunkContext, new String[] { BUSINESS_DATE_PARAMETER_NAME },
new String[] { IS_CATCH_UP_PARAMETER_NAME });
return RepeatStatus.FINISHED;
}
}
Loading