diff --git a/fineract-cob/src/main/java/org/apache/fineract/cob/common/CustomJobParameterResolver.java b/fineract-cob/src/main/java/org/apache/fineract/cob/common/CustomJobParameterResolver.java index 06328b14614..05896b47859 100644 --- a/fineract-cob/src/main/java/org/apache/fineract/cob/common/CustomJobParameterResolver.java +++ b/fineract-cob/src/main/java/org/apache/fineract/cob/common/CustomJobParameterResolver.java @@ -27,6 +27,7 @@ import java.util.Optional; import java.util.Set; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.apache.fineract.cob.exceptions.CustomJobParameterNotFoundException; import org.apache.fineract.infrastructure.core.serialization.GoogleGsonSerializerHelper; import org.apache.fineract.infrastructure.jobs.data.JobParameterDTO; @@ -37,8 +38,10 @@ import org.springframework.batch.core.StepContribution; import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.scope.context.ChunkContext; +import org.springframework.batch.item.ExecutionContext; import org.springframework.stereotype.Component; +@Slf4j @RequiredArgsConstructor @Component public class CustomJobParameterResolver { @@ -47,15 +50,21 @@ public class CustomJobParameterResolver { protected Gson gson = GoogleGsonSerializerHelper.createSimpleGson(); - public void resolve(StepContribution contribution, ChunkContext chunkContext, String customJobParameterKey, - String parameterNameInExecutionContext) { - Set jobParameterDTOList = getCustomJobParameterSet(chunkContext.getStepContext().getStepExecution()) + public void resolveToJobExecutionContext(final StepContribution contribution, final ChunkContext chunkContext, + final String[] requiredParameterNames, final String[] optionalParameterNames) { + final Set jobParameterDTOList = getCustomJobParameterSet(chunkContext.getStepContext().getStepExecution()) .orElseThrow(() -> new CustomJobParameterNotFoundException(SpringBatchJobConstants.CUSTOM_JOB_PARAMETER_ID_KEY)); - JobParameterDTO businessDateParameter = jobParameterDTOList.stream() - .filter(jobParameterDTO -> customJobParameterKey.equals(jobParameterDTO.getParameterName())) // - .findFirst().orElseThrow(() -> new CustomJobParameterNotFoundException(customJobParameterKey)); - contribution.getStepExecution().getExecutionContext().put(parameterNameInExecutionContext, - businessDateParameter.getParameterValue()); + final ExecutionContext jobExecutionContext = contribution.getStepExecution().getJobExecution().getExecutionContext(); + for (String parameterName : requiredParameterNames) { + final JobParameterDTO dto = jobParameterDTOList.stream().filter(p -> parameterName.equals(p.getParameterName())).findFirst() + .orElseThrow(() -> new CustomJobParameterNotFoundException(parameterName)); + jobExecutionContext.put(parameterName, dto.getParameterValue()); + } + for (String parameterName : optionalParameterNames) { + jobParameterDTOList.stream().filter(p -> parameterName.equals(p.getParameterName())).findFirst().ifPresentOrElse( + dto -> jobExecutionContext.put(parameterName, dto.getParameterValue()), + () -> log.warn("Optional custom job parameter '{}' not found in custom parameter table.", parameterName)); + } } /** @@ -70,12 +79,6 @@ public Optional> getCustomJobParameterSet(StepExecution ste .map(json -> gson.fromJson(json, new TypeToken>() {}.getType())); } - public Optional getCustomJobParameterById(StepExecution stepExecution, String key) { - return getCustomJobParameterSet(stepExecution) - .flatMap(paramterList -> paramterList.stream().filter(dto -> dto.getParameterName().equals(key)).findFirst()) - .map(JobParameterDTO::getParameterValue); - } - /** * Resolve job parameters from step execution context, * like @org.springframework.batch.core.scope.context.StepContext#getJobParameters() diff --git a/fineract-provider/src/main/java/org/apache/fineract/cob/common/InitialisationTasklet.java b/fineract-cob/src/main/java/org/apache/fineract/cob/common/InitialisationTasklet.java similarity index 95% rename from fineract-provider/src/main/java/org/apache/fineract/cob/common/InitialisationTasklet.java rename to fineract-cob/src/main/java/org/apache/fineract/cob/common/InitialisationTasklet.java index a642756795e..4093608b7cd 100644 --- a/fineract-provider/src/main/java/org/apache/fineract/cob/common/InitialisationTasklet.java +++ b/fineract-cob/src/main/java/org/apache/fineract/cob/common/InitialisationTasklet.java @@ -24,7 +24,7 @@ import java.util.Objects; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.fineract.cob.loan.LoanCOBConstant; +import org.apache.fineract.cob.COBConstant; import org.apache.fineract.infrastructure.businessdate.domain.BusinessDateType; import org.apache.fineract.infrastructure.core.domain.ActionContext; import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil; @@ -56,7 +56,7 @@ public RepeatStatus execute(@NonNull StepContribution contribution, @NonNull Chu ThreadLocalContextUtil.setActionContext(ActionContext.COB); String businessDateString = Objects.requireNonNull((String) chunkContext.getStepContext().getStepExecution().getJobExecution() - .getExecutionContext().get(LoanCOBConstant.BUSINESS_DATE_PARAMETER_NAME)); + .getExecutionContext().get(COBConstant.BUSINESS_DATE_PARAMETER_NAME)); LocalDate businessDate = LocalDate.parse(businessDateString, DateTimeFormatter.ISO_DATE); businessDates.put(BusinessDateType.COB_DATE, businessDate); diff --git a/fineract-cob/src/main/java/org/apache/fineract/cob/listener/CobWorkerStepListener.java b/fineract-cob/src/main/java/org/apache/fineract/cob/listener/CobWorkerStepListener.java new file mode 100644 index 00000000000..7a29dbf8e3f --- /dev/null +++ b/fineract-cob/src/main/java/org/apache/fineract/cob/listener/CobWorkerStepListener.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.fineract.cob.listener; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.ExitStatus; +import org.springframework.batch.core.StepContribution; +import org.springframework.batch.core.StepExecution; +import org.springframework.batch.core.StepExecutionListener; +import org.springframework.batch.core.scope.context.ChunkContext; +import org.springframework.batch.core.scope.context.StepContext; +import org.springframework.batch.core.step.tasklet.Tasklet; +import org.springframework.batch.repeat.RepeatStatus; +import org.springframework.lang.NonNull; + +/** + * Runs COB worker setup and teardown without a {@link org.springframework.batch.core.job.flow.FlowStep}. A flow uses + * {@link org.springframework.batch.core.job.SimpleStepHandler}, which persists the job execution context after every + * sub-step and causes lock contention on {@code BATCH_JOB_EXECUTION_CONTEXT} during remote partitioning. + */ +@Slf4j +@RequiredArgsConstructor +public class CobWorkerStepListener implements StepExecutionListener { + + private final Tasklet initialisationTasklet; + + private final Tasklet applyLockTasklet; + + private final Tasklet resetContextTasklet; + + @Override + public void beforeStep(@NonNull final StepExecution stepExecution) { + runTasklet(initialisationTasklet, stepExecution); + try { + runTasklet(applyLockTasklet, stepExecution); + } catch (Exception e) { + log.error("Failed to apply lock in beforeStep for '{}'; resetting thread-local context.", stepExecution.getStepName(), e); + runTasklet(resetContextTasklet, stepExecution); + throw e; + } + } + + @Override + public ExitStatus afterStep(@NonNull final StepExecution stepExecution) { + runTasklet(resetContextTasklet, stepExecution); + return stepExecution.getExitStatus(); + } + + private void runTasklet(final Tasklet tasklet, final StepExecution stepExecution) { + RepeatStatus status = RepeatStatus.CONTINUABLE; + while (RepeatStatus.CONTINUABLE.equals(status)) { + final StepContribution contribution = new StepContribution(stepExecution); + final ChunkContext chunkContext = new ChunkContext(new StepContext(stepExecution)); + try { + status = tasklet.execute(contribution, chunkContext); + if (status == null) { + status = RepeatStatus.FINISHED; + } + stepExecution.incrementCommitCount(); + } catch (Exception exception) { + throw new IllegalStateException("COB worker step listener failed for step " + stepExecution.getStepName(), exception); + } + } + } +} diff --git a/fineract-security/src/main/java/org/apache/fineract/infrastructure/security/domain/PlatformUserRepository.java b/fineract-core/src/main/java/org/apache/fineract/infrastructure/security/domain/PlatformUserRepository.java similarity index 100% rename from fineract-security/src/main/java/org/apache/fineract/infrastructure/security/domain/PlatformUserRepository.java rename to fineract-core/src/main/java/org/apache/fineract/infrastructure/security/domain/PlatformUserRepository.java diff --git a/fineract-provider/src/main/java/org/apache/fineract/useradministration/domain/AppUserRepository.java b/fineract-core/src/main/java/org/apache/fineract/useradministration/domain/AppUserRepository.java similarity index 100% rename from fineract-provider/src/main/java/org/apache/fineract/useradministration/domain/AppUserRepository.java rename to fineract-core/src/main/java/org/apache/fineract/useradministration/domain/AppUserRepository.java diff --git a/fineract-provider/src/main/java/org/apache/fineract/useradministration/domain/AppUserRepositoryWrapper.java b/fineract-core/src/main/java/org/apache/fineract/useradministration/domain/AppUserRepositoryWrapper.java similarity index 100% rename from fineract-provider/src/main/java/org/apache/fineract/useradministration/domain/AppUserRepositoryWrapper.java rename to fineract-core/src/main/java/org/apache/fineract/useradministration/domain/AppUserRepositoryWrapper.java diff --git a/fineract-provider/src/main/java/org/apache/fineract/useradministration/exception/UserNotFoundException.java b/fineract-core/src/main/java/org/apache/fineract/useradministration/exception/UserNotFoundException.java similarity index 100% rename from fineract-provider/src/main/java/org/apache/fineract/useradministration/exception/UserNotFoundException.java rename to fineract-core/src/main/java/org/apache/fineract/useradministration/exception/UserNotFoundException.java diff --git a/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanCOBManagerConfiguration.java b/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanCOBManagerConfiguration.java index 38c679946c1..464ce7dedda 100644 --- a/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanCOBManagerConfiguration.java +++ b/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanCOBManagerConfiguration.java @@ -37,7 +37,6 @@ import org.springframework.batch.core.job.builder.JobBuilder; import org.springframework.batch.core.launch.JobOperator; import org.springframework.batch.core.launch.support.RunIdIncrementer; -import org.springframework.batch.core.listener.ExecutionContextPromotionListener; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.step.builder.StepBuilder; import org.springframework.batch.integration.config.annotation.EnableBatchIntegration; @@ -99,7 +98,7 @@ public Step loanCOBStep(LoanCOBPartitioner partitioner) { @Bean public Step resolveCustomJobParametersStep() { return new StepBuilder("Resolve custom job parameters - Step", jobRepository) - .tasklet(resolveCustomJobParametersTasklet(), transactionManager).listener(customJobParametersPromotionListener()).build(); + .tasklet(resolveCustomJobParametersTasklet(), transactionManager).build(); } @Bean @@ -141,10 +140,4 @@ public Job loanCOBJob(LoanCOBPartitioner partitioner) { .build(); } - @Bean - public ExecutionContextPromotionListener customJobParametersPromotionListener() { - ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener(); - listener.setKeys(new String[] { LoanCOBConstant.BUSINESS_DATE_PARAMETER_NAME, LoanCOBConstant.IS_CATCH_UP_PARAMETER_NAME }); - return listener; - } } diff --git a/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanCOBWorkerConfiguration.java b/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanCOBWorkerConfiguration.java index b84b8001593..95434300a55 100644 --- a/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanCOBWorkerConfiguration.java +++ b/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanCOBWorkerConfiguration.java @@ -25,6 +25,7 @@ import org.apache.fineract.cob.domain.LoanAccountLock; import org.apache.fineract.cob.domain.LockingService; import org.apache.fineract.cob.listener.ChunkProcessingLoanItemListener; +import org.apache.fineract.cob.listener.CobWorkerStepListener; import org.apache.fineract.cob.service.BeforeStepLockingItemReaderHelper; import org.apache.fineract.cob.service.RetrieveLoanIdService; import org.apache.fineract.infrastructure.core.config.FineractProperties; @@ -36,14 +37,10 @@ import org.apache.fineract.useradministration.domain.AppUserRepositoryWrapper; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.StepScope; -import org.springframework.batch.core.job.builder.FlowBuilder; -import org.springframework.batch.core.job.flow.Flow; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.step.builder.SimpleStepBuilder; -import org.springframework.batch.core.step.builder.StepBuilder; import org.springframework.batch.integration.partition.RemotePartitioningWorkerStepBuilderFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.Configuration; @@ -89,43 +86,8 @@ public class LoanCOBWorkerConfiguration { private ProgressiveLoanModelProcessingService progressiveLoanModelProcessingService; @Bean(name = LoanCOBConstant.LOAN_COB_WORKER_STEP) - public Step loanCOBWorkerStep(Flow cobFlow) { - return stepBuilderFactory.get("Loan COB worker - Step").inputChannel(inboundRequests).flow(cobFlow).build(); - } - - @Bean("cobFlow") - public Flow flow(Step initialisationStep, Step applyLockStep, Step loanBusinessStep, Step resetContextStep) { - return new FlowBuilder("cobFlow").start(initialisationStep).next(applyLockStep).next(loanBusinessStep).next(resetContextStep) - .build(); - } - - @Bean("initialisationStep") - @StepScope - public Step initialisationStep(@Value("#{stepExecutionContext['partition']}") String partitionName) { - return new StepBuilder("Initialisation - Step:" + partitionName, jobRepository).tasklet(initialiseContext(), transactionManager) - .build(); - } - - @Bean - public TaskExecutor cobTaskExecutor() { - if (propertyService.getThreadPoolMaxPoolSize(LoanCOBConstant.JOB_NAME) == 1) { - return new SyncTaskExecutor(); - } - ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); - taskExecutor.setThreadNamePrefix("COB-Thread-"); - taskExecutor.setThreadGroupName("COB-Thread"); - taskExecutor.setCorePoolSize(propertyService.getThreadPoolCorePoolSize(JobName.LOAN_COB.name())); - taskExecutor.setMaxPoolSize(propertyService.getThreadPoolMaxPoolSize(JobName.LOAN_COB.name())); - taskExecutor.setQueueCapacity(propertyService.getThreadPoolQueueCapacity(JobName.LOAN_COB.name())); - taskExecutor.setAllowCoreThreadTimeOut(true); - taskExecutor.setTaskDecorator(new ContextAwareTaskDecorator()); - return taskExecutor; - } - - @Bean("loanBusinessStep") - @StepScope - public Step loanBusinessStep(@Value("#{stepExecutionContext['partition']}") String partitionName, TaskExecutor cobTaskExecutor) { - SimpleStepBuilder stepBuilder = new StepBuilder("Loan Business - Step:" + partitionName, jobRepository) + public Step loanCOBWorkerStep() { + final SimpleStepBuilder stepBuilder = stepBuilderFactory.get("Loan COB worker - Step").inputChannel(inboundRequests) .chunk(propertyService.getChunkSize(JobName.LOAN_COB.name()), transactionManager) // .reader(cobWorkerItemReader()) // .processor(cobWorkerItemProcessor()) // @@ -136,25 +98,35 @@ public Step loanBusinessStep(@Value("#{stepExecutionContext['partition']}") Stri .skip(Exception.class) // .skipLimit(propertyService.getChunkSize(LoanCOBConstant.JOB_NAME) + 1) // .listener(loanItemListener()) // + .listener(cobWorkerStepListener()) // .transactionManager(transactionManager); if (propertyService.getThreadPoolMaxPoolSize(LoanCOBConstant.JOB_NAME) > 1) { - stepBuilder.taskExecutor(cobTaskExecutor); + stepBuilder.taskExecutor(cobTaskExecutor()); } return stepBuilder.build(); } - @Bean("applyLockStep") - @StepScope - public Step applyLockStep(@Value("#{stepExecutionContext['partition']}") String partitionName) { - return new StepBuilder("Apply lock - Step:" + partitionName, jobRepository).tasklet(applyLock(), transactionManager).build(); + @Bean + public TaskExecutor cobTaskExecutor() { + if (propertyService.getThreadPoolMaxPoolSize(LoanCOBConstant.JOB_NAME) == 1) { + return new SyncTaskExecutor(); + } + final ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); + taskExecutor.setThreadNamePrefix("COB-Thread-"); + taskExecutor.setThreadGroupName("COB-Thread"); + taskExecutor.setCorePoolSize(propertyService.getThreadPoolCorePoolSize(JobName.LOAN_COB.name())); + taskExecutor.setMaxPoolSize(propertyService.getThreadPoolMaxPoolSize(JobName.LOAN_COB.name())); + taskExecutor.setQueueCapacity(propertyService.getThreadPoolQueueCapacity(JobName.LOAN_COB.name())); + taskExecutor.setAllowCoreThreadTimeOut(true); + taskExecutor.setTaskDecorator(new ContextAwareTaskDecorator()); + return taskExecutor; } - @Bean("resetContextStep") - @StepScope - public Step resetContextStep(@Value("#{stepExecutionContext['partition']}") String partitionName) { - return new StepBuilder("Reset context - Step:" + partitionName, jobRepository).tasklet(resetContext(), transactionManager).build(); + @Bean + public CobWorkerStepListener cobWorkerStepListener() { + return new CobWorkerStepListener(initialiseContext(), applyLock(), resetContext()); } @Bean diff --git a/fineract-provider/src/main/java/org/apache/fineract/cob/loan/ResolveLoanCOBCustomJobParametersTasklet.java b/fineract-provider/src/main/java/org/apache/fineract/cob/loan/ResolveLoanCOBCustomJobParametersTasklet.java index 62d3a339f84..f0924f1a016 100644 --- a/fineract-provider/src/main/java/org/apache/fineract/cob/loan/ResolveLoanCOBCustomJobParametersTasklet.java +++ b/fineract-provider/src/main/java/org/apache/fineract/cob/loan/ResolveLoanCOBCustomJobParametersTasklet.java @@ -33,9 +33,9 @@ public class ResolveLoanCOBCustomJobParametersTasklet implements Tasklet { private final CustomJobParameterResolver customJobParameterResolver; @Override - public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { - customJobParameterResolver.resolve(contribution, chunkContext, LoanCOBConstant.BUSINESS_DATE_PARAMETER_NAME, - LoanCOBConstant.BUSINESS_DATE_PARAMETER_NAME); + public RepeatStatus execute(final StepContribution contribution, final ChunkContext chunkContext) throws Exception { + customJobParameterResolver.resolveToJobExecutionContext(contribution, chunkContext, + new String[] { LoanCOBConstant.BUSINESS_DATE_PARAMETER_NAME }, new String[] { LoanCOBConstant.IS_CATCH_UP_PARAMETER_NAME }); return RepeatStatus.FINISHED; } } diff --git a/fineract-working-capital-loan/src/main/java/org/apache/fineract/cob/workingcapitalloan/WorkingCapitalLoanCOBCustomJobParametersResolverTasklet.java b/fineract-working-capital-loan/src/main/java/org/apache/fineract/cob/workingcapitalloan/WorkingCapitalLoanCOBCustomJobParametersResolverTasklet.java index 042351ee352..32622c13d34 100644 --- a/fineract-working-capital-loan/src/main/java/org/apache/fineract/cob/workingcapitalloan/WorkingCapitalLoanCOBCustomJobParametersResolverTasklet.java +++ b/fineract-working-capital-loan/src/main/java/org/apache/fineract/cob/workingcapitalloan/WorkingCapitalLoanCOBCustomJobParametersResolverTasklet.java @@ -19,6 +19,7 @@ package org.apache.fineract.cob.workingcapitalloan; import static org.apache.fineract.cob.COBConstant.BUSINESS_DATE_PARAMETER_NAME; +import static org.apache.fineract.cob.COBConstant.IS_CATCH_UP_PARAMETER_NAME; import lombok.RequiredArgsConstructor; import org.apache.fineract.cob.common.CustomJobParameterResolver; @@ -35,8 +36,9 @@ public class WorkingCapitalLoanCOBCustomJobParametersResolverTasklet implements @Nullable @Override - public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { - customJobParameterResolver.resolve(contribution, chunkContext, BUSINESS_DATE_PARAMETER_NAME, BUSINESS_DATE_PARAMETER_NAME); + public RepeatStatus execute(final StepContribution contribution, final ChunkContext chunkContext) throws Exception { + customJobParameterResolver.resolveToJobExecutionContext(contribution, chunkContext, new String[] { BUSINESS_DATE_PARAMETER_NAME }, + new String[] { IS_CATCH_UP_PARAMETER_NAME }); return RepeatStatus.FINISHED; } } diff --git a/fineract-working-capital-loan/src/main/java/org/apache/fineract/cob/workingcapitalloan/WorkingCapitalLoanCOBManagerConfiguration.java b/fineract-working-capital-loan/src/main/java/org/apache/fineract/cob/workingcapitalloan/WorkingCapitalLoanCOBManagerConfiguration.java index e704ae4a6e6..30312cdea86 100644 --- a/fineract-working-capital-loan/src/main/java/org/apache/fineract/cob/workingcapitalloan/WorkingCapitalLoanCOBManagerConfiguration.java +++ b/fineract-working-capital-loan/src/main/java/org/apache/fineract/cob/workingcapitalloan/WorkingCapitalLoanCOBManagerConfiguration.java @@ -41,7 +41,6 @@ import org.springframework.batch.core.job.builder.JobBuilder; import org.springframework.batch.core.launch.JobOperator; import org.springframework.batch.core.launch.support.RunIdIncrementer; -import org.springframework.batch.core.listener.ExecutionContextPromotionListener; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.step.builder.StepBuilder; import org.springframework.batch.integration.config.annotation.EnableBatchIntegration; @@ -79,11 +78,11 @@ public WorkingCapitalLoanCOBPartitioner workingCapitalLoanCOBPartitioner(@Value( } @Bean(WORKING_CAPITAL_JOB_HUMAN_READABLE_NAME) - public Job workingCapitalLoanCOBJob(WorkingCapitalLoanCOBPartitioner workingCapitalLoanCOBPartitioner, - ExecutionContextPromotionListener customJobParametersPromotionListener, ApplicationContext applicationContext) { + public Job workingCapitalLoanCOBJob(final WorkingCapitalLoanCOBPartitioner workingCapitalLoanCOBPartitioner, + final ApplicationContext applicationContext) { return new JobBuilder(WORKING_CAPITAL_LOAN_COB_JOB.name(), jobRepository) // .listener(new COBExecutionListenerRunner(applicationContext, WORKING_CAPITAL_LOAN_COB_JOB.name())) // - .start(resolveCustomJobParametersForWorkingCapitalStep(customJobParametersPromotionListener)) // + .start(resolveCustomJobParametersForWorkingCapitalStep()) // .next(workingCapitalLoanCOBStep(workingCapitalLoanCOBPartitioner)) // .next(unlockProcessedWorkingCapitalLoansStep()) // .incrementer(new RunIdIncrementer()) // @@ -107,10 +106,9 @@ public WorkingCapitalLoanCOBCustomJobParametersResolverTasklet resolveCustomJobP } @Bean - public Step resolveCustomJobParametersForWorkingCapitalStep(ExecutionContextPromotionListener customJobParametersPromotionListener) { + public Step resolveCustomJobParametersForWorkingCapitalStep() { return new StepBuilder("Resolve custom job parameters - Step", jobRepository) - .tasklet(resolveCustomJobParametersForWorkingCapitalTasklet(), transactionManager) - .listener(customJobParametersPromotionListener).build(); + .tasklet(resolveCustomJobParametersForWorkingCapitalTasklet(), transactionManager).build(); } @Bean(WORKING_CAPITAL_LOAN_COB_STEP) diff --git a/fineract-working-capital-loan/src/main/java/org/apache/fineract/cob/workingcapitalloan/WorkingCapitalLoanCOBWorkerConfiguration.java b/fineract-working-capital-loan/src/main/java/org/apache/fineract/cob/workingcapitalloan/WorkingCapitalLoanCOBWorkerConfiguration.java index bf3e609f205..d74e3886e97 100644 --- a/fineract-working-capital-loan/src/main/java/org/apache/fineract/cob/workingcapitalloan/WorkingCapitalLoanCOBWorkerConfiguration.java +++ b/fineract-working-capital-loan/src/main/java/org/apache/fineract/cob/workingcapitalloan/WorkingCapitalLoanCOBWorkerConfiguration.java @@ -19,15 +19,16 @@ package org.apache.fineract.cob.workingcapitalloan; import static org.apache.fineract.cob.workingcapitalloan.WorkingCapitalLoanCOBConstant.WORKING_CAPITAL_JOB_NAME; -import static org.apache.fineract.cob.workingcapitalloan.WorkingCapitalLoanCOBConstant.WORKING_CAPITAL_LOAN_COB_BUSINESS_STEP; -import static org.apache.fineract.cob.workingcapitalloan.WorkingCapitalLoanCOBConstant.WORKING_CAPITAL_LOAN_COB_FLOW; import static org.apache.fineract.cob.workingcapitalloan.WorkingCapitalLoanCOBConstant.WORKING_CAPITAL_LOAN_COB_WORKER_STEP; import lombok.RequiredArgsConstructor; import org.apache.fineract.cob.COBBusinessStepService; +import org.apache.fineract.cob.common.InitialisationTasklet; +import org.apache.fineract.cob.common.ResetContextTasklet; import org.apache.fineract.cob.conditions.BatchWorkerCondition; import org.apache.fineract.cob.domain.LockingService; import org.apache.fineract.cob.domain.WorkingCapitalLoanAccountLock; +import org.apache.fineract.cob.listener.CobWorkerStepListener; import org.apache.fineract.cob.loan.ContextAwareTaskDecorator; import org.apache.fineract.cob.service.BeforeStepLockingItemReaderHelper; import org.apache.fineract.infrastructure.core.config.FineractProperties; @@ -36,14 +37,9 @@ import org.apache.fineract.portfolio.workingcapitalloan.domain.WorkingCapitalLoan; import org.apache.fineract.portfolio.workingcapitalloan.repository.WorkingCapitalLoanRepository; import org.springframework.batch.core.Step; -import org.springframework.batch.core.configuration.annotation.StepScope; -import org.springframework.batch.core.job.builder.FlowBuilder; -import org.springframework.batch.core.job.flow.Flow; -import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.step.builder.SimpleStepBuilder; -import org.springframework.batch.core.step.builder.StepBuilder; import org.springframework.batch.integration.partition.RemotePartitioningWorkerStepBuilderFactory; -import org.springframework.beans.factory.annotation.Value; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.Configuration; @@ -61,7 +57,6 @@ public class WorkingCapitalLoanCOBWorkerConfiguration { private final RemotePartitioningWorkerStepBuilderFactory stepBuilderFactory; private final MessageChannel inboundRequests; - private final JobRepository jobRepository; private final PropertyService propertyService; private final PlatformTransactionManager transactionManager; private final TransactionTemplate transactionTemplate; @@ -69,26 +64,15 @@ public class WorkingCapitalLoanCOBWorkerConfiguration { private final FineractProperties fineractProperties; private final WorkingCapitalLoanRetrieveIdService retrieveIdService; private final WorkingCapitalLoanRepository workingCapitalLoanRepository; - - @Bean(WORKING_CAPITAL_LOAN_COB_FLOW) - public Flow workingCapitalLoanCOBFlow(Step initialisationStep, Step applyWorkingCapitalLockStep, Step workingCapitalLoanCOBBusinessStep, - Step resetContextStep) { - return new FlowBuilder(WORKING_CAPITAL_LOAN_COB_FLOW).start(initialisationStep).next(applyWorkingCapitalLockStep) - .next(workingCapitalLoanCOBBusinessStep).next(resetContextStep).build(); - } + @Qualifier("initialiseContext") + private final InitialisationTasklet initialisationTasklet; + @Qualifier("resetContext") + private final ResetContextTasklet resetContextTasklet; @Bean(WORKING_CAPITAL_LOAN_COB_WORKER_STEP) - public Step workingCapitalLoanCOBWorkerStep(Flow workingCapitalLoanCOBFlow) { - return stepBuilderFactory.get(WORKING_CAPITAL_LOAN_COB_WORKER_STEP).inputChannel(inboundRequests).flow(workingCapitalLoanCOBFlow) - .build(); - } - - @Bean(WORKING_CAPITAL_LOAN_COB_BUSINESS_STEP) - @StepScope - public Step workingCapitalLoanCOBBusinessStep(@Value("#{stepExecutionContext['partition']}") String partitionName, - TaskExecutor workingCapitalCobTaskExecutor, COBBusinessStepService cobBusinessStepService) { - SimpleStepBuilder stepBuilder = new StepBuilder("Loan Business - Step:" + partitionName, - jobRepository) + public Step workingCapitalLoanCOBWorkerStep(final COBBusinessStepService cobBusinessStepService) { + final SimpleStepBuilder stepBuilder = stepBuilderFactory + .get(WORKING_CAPITAL_LOAN_COB_WORKER_STEP).inputChannel(inboundRequests) .chunk(propertyService.getChunkSize(JobName.LOAN_COB.name()), transactionManager) // .reader(new WorkingCapitalLoanCOBWorkerItemReader(workingCapitalLoanRepository, new BeforeStepLockingItemReaderHelper<>(retrieveIdService, wpcLoanLockingService))) // @@ -100,15 +84,21 @@ public Step workingCapitalLoanCOBBusinessStep(@Value("#{stepExecutionContext['pa .skip(Exception.class) // .skipLimit(propertyService.getChunkSize(WORKING_CAPITAL_JOB_NAME) + 1) // .listener(workingCapitalLoanItemListener()) // + .listener(workingCapitalCobWorkerStepListener()) // .transactionManager(transactionManager); if (propertyService.getThreadPoolMaxPoolSize(WORKING_CAPITAL_JOB_NAME) > 1) { - stepBuilder.taskExecutor(workingCapitalCobTaskExecutor); + stepBuilder.taskExecutor(workingCapitalCobTaskExecutor()); } return stepBuilder.build(); } + @Bean + public CobWorkerStepListener workingCapitalCobWorkerStepListener() { + return new CobWorkerStepListener(initialisationTasklet, applyWorkingCapitalLoanLock(), resetContextTasklet); + } + @Bean public TaskExecutor workingCapitalCobTaskExecutor() { if (propertyService.getThreadPoolMaxPoolSize(WORKING_CAPITAL_JOB_NAME) == 1) { @@ -125,19 +115,11 @@ public TaskExecutor workingCapitalCobTaskExecutor() { return taskExecutor; } - // Lock @Bean public WorkingCapitalLoanCOBWorkerItemListener workingCapitalLoanItemListener() { return new WorkingCapitalLoanCOBWorkerItemListener(wpcLoanLockingService, transactionTemplate); } - @Bean("applyWorkingCapitalLockStep") - @StepScope - public Step applyWorkingCapitalLockStep(@Value("#{stepExecutionContext['partition']}") String partitionName) { - return new StepBuilder("Apply lock - Step:" + partitionName, jobRepository) - .tasklet(applyWorkingCapitalLoanLock(), transactionManager).build(); - } - @Bean public ApplyWorkingCapitalLoanLockTasklet applyWorkingCapitalLoanLock() { return new ApplyWorkingCapitalLoanLockTasklet(fineractProperties, wpcLoanLockingService, retrieveIdService, transactionTemplate);