Skip to content

Commit

Permalink
EVA-3570 Add jobs for recovering blocks through recovery agent for ca…
Browse files Browse the repository at this point in the history
…tegory ss and rs (#447)

 add recovery jobs using monotonic accession recovery agent to recover blocks for category ss and rs
  • Loading branch information
nitin-ebi committed May 14, 2024
1 parent 948cf3e commit cf8bf77
Show file tree
Hide file tree
Showing 39 changed files with 1,285 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package uk.ac.ebi.eva.accession.clustering.batch.recovery;

import org.springframework.batch.core.JobExecution;
import uk.ac.ebi.ampt2d.commons.accession.generators.monotonic.MonotonicAccessionRecoveryAgent;

import java.time.LocalDateTime;

public class RSAccessionRecoveryService {
private final static String CATEGORY_ID = "rs";
private MonotonicAccessionRecoveryAgent monotonicAccessionRecoveryAgent;
private JobExecution jobExecution;
private long recoveryCutOffDays;

public RSAccessionRecoveryService(MonotonicAccessionRecoveryAgent monotonicAccessionRecoveryAgent,
long recoveryCutOffDays) {
this.monotonicAccessionRecoveryAgent = monotonicAccessionRecoveryAgent;
this.recoveryCutOffDays = recoveryCutOffDays;
}

public void runRecoveryForCategoryRS() {
LocalDateTime recoveryCutOffTime = LocalDateTime.now().minusDays(recoveryCutOffDays);
monotonicAccessionRecoveryAgent.runRecovery(CATEGORY_ID, jobExecution.getJobId().toString(), recoveryCutOffTime);
}

public void setJobExecution(JobExecution jobExecution) {
this.jobExecution = jobExecution;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public class BeanNames {

public static final String PROGRESS_LISTENER = "PROGRESS_LISTENER";

public static final String JOB_EXECUTION_LISTENER = "JOB_EXECUTION_LISTENER";

public static final String ACCESSIONING_SHUTDOWN_STEP = "ACCESSIONING_SHUTDOWN_STEP";

public static final String CLUSTERING_FROM_VCF_STEP = "CLUSTERING_FROM_VCF_STEP";
Expand Down Expand Up @@ -95,4 +97,12 @@ public class BeanNames {
public static final String CLUSTERED_CLUSTERING_WRITER_JOB_EXECUTION_SETTER = "CLUSTERED_CLUSTERING_WRITER_JOB_EXECUTION_SETTER";

public static final String RS_SPLIT_WRITER_JOB_EXECUTION_SETTER = "RS_SPLIT_WRITER_JOB_EXECUTION_SETTER";

public static final String RS_ACCESSION_RECOVERY_SERVICE = "RS_ACCESSION_RECOVERY_SERVICE";

public static final String RS_ACCESSION_RECOVERY_STEP = "RS_ACCESSION_RECOVERY_STEP";

public static final String RS_ACCESSION_RECOVERY_JOB = "RS_ACCESSION_RECOVERY_JOB";

public static final String RS_ACCESSION_RECOVERY_JOB_LISTENER = "RS_ACCESSION_RECOVERY_JOB_LISTENER";
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package uk.ac.ebi.eva.accession.clustering.configuration.batch.jobs;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
Expand All @@ -28,6 +29,7 @@
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.CLEAR_RS_MERGE_AND_SPLIT_CANDIDATES_STEP;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.CLUSTERING_NON_CLUSTERED_VARIANTS_FROM_MONGO_STEP;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.CLUSTER_UNCLUSTERED_VARIANTS_JOB;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.JOB_EXECUTION_LISTENER;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.PROCESS_RS_MERGE_CANDIDATES_STEP;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.PROCESS_RS_SPLIT_CANDIDATES_STEP;

Expand All @@ -44,6 +46,7 @@ public Job clusteringFromMongoJob(
@Qualifier(CLEAR_RS_MERGE_AND_SPLIT_CANDIDATES_STEP) Step clearRSMergeAndSplitCandidatesStep,
@Qualifier(CLUSTERING_NON_CLUSTERED_VARIANTS_FROM_MONGO_STEP) Step clusteringNonClusteredVariantsFromMongoStep,
@Qualifier(ACCESSIONING_SHUTDOWN_STEP) Step accessioningShutdownStep,
@Qualifier(JOB_EXECUTION_LISTENER) JobExecutionListener jobExecutionListener,
JobBuilderFactory jobBuilderFactory) {
return jobBuilderFactory.get(CLUSTER_UNCLUSTERED_VARIANTS_JOB)
.incrementer(new RunIdIncrementer())
Expand All @@ -52,6 +55,7 @@ public Job clusteringFromMongoJob(
.next(clearRSMergeAndSplitCandidatesStep)
.next(clusteringNonClusteredVariantsFromMongoStep)
.next(accessioningShutdownStep)
.listener(jobExecutionListener)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import htsjdk.samtools.util.StringUtil;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
Expand Down Expand Up @@ -47,6 +48,7 @@
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.CLUSTERING_CLUSTERED_VARIANTS_FROM_MONGO_STEP;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.CLUSTERING_FROM_MONGO_JOB;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.CLUSTERING_NON_CLUSTERED_VARIANTS_FROM_MONGO_STEP;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.JOB_EXECUTION_LISTENER;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.PROCESS_RS_MERGE_CANDIDATES_STEP;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.PROCESS_RS_SPLIT_CANDIDATES_STEP;

Expand Down Expand Up @@ -86,6 +88,7 @@ public Job clusteringFromMongoJob(@Qualifier(CLUSTERING_CLUSTERED_VARIANTS_FROM_
// Back-propagate RS in the remapped assembly that were split or merged
@Qualifier(BACK_PROPAGATE_SPLIT_OR_MERGED_RS_STEP)
Step backPropagateSplitMergedRSStep,
@Qualifier(JOB_EXECUTION_LISTENER) JobExecutionListener jobExecutionListener,
StepBuilderFactory stepBuilderFactory,
JobBuilderFactory jobBuilderFactory,
InputParameters inputParameters) {
Expand All @@ -95,6 +98,7 @@ public Job clusteringFromMongoJob(@Qualifier(CLUSTERING_CLUSTERED_VARIANTS_FROM_
.incrementer(new RunIdIncrementer())
//We need the dummy step here because Spring won't conditionally start the first step
.start(dummyStep)
.listener(jobExecutionListener)
.next(jobExecutionDecider)
.on("TRUE")
.to(new FlowBuilder<SimpleFlow>("remappedAssemblyClusteringFlow")
Expand All @@ -105,7 +109,8 @@ public Job clusteringFromMongoJob(@Qualifier(CLUSTERING_CLUSTERED_VARIANTS_FROM_
.next(clusteringNonClusteredVariantsFromMongoStep)
.next(accessioningShutdownStep)
.next(backPropagateNewRSStep)
.next(backPropagateSplitMergedRSStep).build())
.next(backPropagateSplitMergedRSStep)
.build())
.on("*").end()
.from(jobExecutionDecider)
.on("FALSE")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package uk.ac.ebi.eva.accession.clustering.configuration.batch.jobs;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.RS_ACCESSION_RECOVERY_JOB;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.RS_ACCESSION_RECOVERY_JOB_LISTENER;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.RS_ACCESSION_RECOVERY_STEP;

@Configuration
@EnableBatchProcessing
public class RSAccessionRecoveryJobConfiguration {

@Autowired
@Qualifier(RS_ACCESSION_RECOVERY_STEP)
private Step monotonicAccessionRecoveryAgentCategoryRSStep;

@Autowired
@Qualifier(RS_ACCESSION_RECOVERY_JOB_LISTENER)
private JobExecutionListener monotonicAccessionRecoveryAgentCategoryRSJobListener;

@Bean(RS_ACCESSION_RECOVERY_JOB)
public Job createMonotonicAccessionRecoveryAgentCategoryRSJob(JobBuilderFactory jobBuilderFactory) {
return jobBuilderFactory.get(RS_ACCESSION_RECOVERY_JOB)
.incrementer(new RunIdIncrementer())
.start(monotonicAccessionRecoveryAgentCategoryRSStep)
.listener(monotonicAccessionRecoveryAgentCategoryRSJobListener)
.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package uk.ac.ebi.eva.accession.clustering.configuration.batch.jobs;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
Expand All @@ -25,6 +26,7 @@
import org.springframework.context.annotation.Configuration;

import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.ACCESSIONING_SHUTDOWN_STEP;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.JOB_EXECUTION_LISTENER;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.STUDY_CLUSTERING_JOB;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.STUDY_CLUSTERING_STEP;

Expand All @@ -35,11 +37,13 @@ public class StudyClusteringJobConfiguration {
@Bean(STUDY_CLUSTERING_JOB)
public Job studyClusteringJob(@Qualifier(STUDY_CLUSTERING_STEP) Step clusteringStep,
@Qualifier(ACCESSIONING_SHUTDOWN_STEP) Step accessioningShutdownStep,
@Qualifier(JOB_EXECUTION_LISTENER) JobExecutionListener jobExecutionListener,
JobBuilderFactory jobBuilderFactory) {
return jobBuilderFactory.get(STUDY_CLUSTERING_JOB)
.incrementer(new RunIdIncrementer())
.start(clusteringStep)
.next(accessioningShutdownStep)
.listener(jobExecutionListener)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package uk.ac.ebi.eva.accession.clustering.batch.listeners;
package uk.ac.ebi.eva.accession.clustering.configuration.batch.listeners;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
Expand All @@ -17,7 +17,7 @@
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.RS_SPLIT_WRITER_JOB_EXECUTION_SETTER;

@Configuration
public class JobExecutionSetter {
public class JobExecutionSetterConfiguration {
@Bean(NON_CLUSTERED_CLUSTERING_WRITER_JOB_EXECUTION_SETTER)
public StepExecutionListener getNonClusteredClusteringWriterJobExecutionSetter(
@Qualifier(NON_CLUSTERED_CLUSTERING_WRITER) ClusteringWriter nonClusteredClusteringWriter) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package uk.ac.ebi.eva.accession.clustering.configuration.batch.listeners;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand All @@ -8,16 +10,18 @@
import uk.ac.ebi.eva.accession.clustering.batch.listeners.ClusteringProgressListener;
import uk.ac.ebi.eva.accession.clustering.metric.ClusteringMetricCompute;
import uk.ac.ebi.eva.accession.clustering.parameters.InputParameters;
import uk.ac.ebi.eva.accession.core.service.nonhuman.ClusteredVariantAccessioningService;
import uk.ac.ebi.eva.accession.core.service.nonhuman.SubmittedVariantAccessioningService;
import uk.ac.ebi.eva.metrics.configuration.MetricConfiguration;
import uk.ac.ebi.eva.metrics.count.CountServiceParameters;
import uk.ac.ebi.eva.metrics.metric.MetricCompute;

import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.JOB_EXECUTION_LISTENER;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.PROGRESS_LISTENER;

@Configuration
@Import({MetricConfiguration.class})
public class ListenersConfiguration {

@Bean(PROGRESS_LISTENER)
public ClusteringProgressListener clusteringProgressListener(InputParameters parameters, MetricCompute metricCompute) {
return new ClusteringProgressListener(parameters, metricCompute);
Expand All @@ -28,6 +32,21 @@ public MetricCompute getClusteringMetricCompute(CountServiceParameters countServ
@Qualifier("COUNT_STATS_REST_TEMPLATE") RestTemplate restTemplate,
InputParameters inputParameters) {
return new ClusteringMetricCompute(countServiceParameters, restTemplate, inputParameters.getAssemblyAccession(),
inputParameters.getProjects());
inputParameters.getProjects());
}

@Bean(JOB_EXECUTION_LISTENER)
public JobExecutionListener jobExecutionListener(SubmittedVariantAccessioningService submittedVariantAccessioningService,
ClusteredVariantAccessioningService clusteredVariantAccessioningService) {
return new JobExecutionListener() {
@Override
public void beforeJob(JobExecution jobExecution) {}

@Override
public void afterJob(JobExecution jobExecution) {
submittedVariantAccessioningService.shutDownAccessionGenerator();
clusteredVariantAccessioningService.shutDownAccessionGenerator();
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package uk.ac.ebi.eva.accession.clustering.configuration.batch.listeners;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import uk.ac.ebi.eva.accession.clustering.batch.recovery.RSAccessionRecoveryService;

import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.RS_ACCESSION_RECOVERY_JOB_LISTENER;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.RS_ACCESSION_RECOVERY_SERVICE;

@Configuration
public class RSAccessionRecoveryJobListenerConfiguration {
@Bean(RS_ACCESSION_RECOVERY_JOB_LISTENER)
public JobExecutionListener jobExecutionListener(@Qualifier(RS_ACCESSION_RECOVERY_SERVICE)
RSAccessionRecoveryService RSAccessionRecoveryService) {
return new JobExecutionListener() {
@Override
public void beforeJob(JobExecution jobExecution) {
RSAccessionRecoveryService.setJobExecution(jobExecution);
}

@Override
public void afterJob(JobExecution jobExecution) {
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package uk.ac.ebi.eva.accession.clustering.configuration.batch.recovery;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import uk.ac.ebi.ampt2d.commons.accession.generators.monotonic.MonotonicAccessionRecoveryAgent;
import uk.ac.ebi.ampt2d.commons.accession.persistence.jpa.monotonic.service.ContiguousIdBlockService;
import uk.ac.ebi.eva.accession.clustering.batch.recovery.RSAccessionRecoveryService;
import uk.ac.ebi.eva.accession.core.service.nonhuman.eva.ClusteredVariantAccessioningDatabaseService;

import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.RS_ACCESSION_RECOVERY_SERVICE;

@Configuration
public class RSAccessionRecoveryServiceConfiguration {
@Autowired
private ContiguousIdBlockService blockService;
@Autowired
private ClusteredVariantAccessioningDatabaseService clusteredVariantAccessioningDatabaseService;

@Value("${recovery.cutoff.days}")
private long recoveryCutOffDays;

@Bean(RS_ACCESSION_RECOVERY_SERVICE)
public RSAccessionRecoveryService getMonotonicAccessionRecoveryAgentCategoryRSService() {
return new RSAccessionRecoveryService(getMonotonicAccessionRecoveryAgent(), recoveryCutOffDays);
}

private MonotonicAccessionRecoveryAgent getMonotonicAccessionRecoveryAgent() {
return new MonotonicAccessionRecoveryAgent(blockService, clusteredVariantAccessioningDatabaseService);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package uk.ac.ebi.eva.accession.clustering.configuration.batch.steps;

import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import uk.ac.ebi.eva.accession.clustering.batch.recovery.RSAccessionRecoveryService;

import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.RS_ACCESSION_RECOVERY_SERVICE;
import static uk.ac.ebi.eva.accession.clustering.configuration.BeanNames.RS_ACCESSION_RECOVERY_STEP;

@Configuration
@EnableBatchProcessing
public class RSAccessionRecoveryStepConfiguration {
@Autowired
@Qualifier(RS_ACCESSION_RECOVERY_SERVICE)
private RSAccessionRecoveryService RSAccessionRecoveryService;

@Bean(RS_ACCESSION_RECOVERY_STEP)
public Step monotonicAccessionRecoveryAgentCategoryRSStep(StepBuilderFactory stepBuilderFactory) {
return stepBuilderFactory.get(RS_ACCESSION_RECOVERY_STEP)
.tasklet((contribution, chunkContext) -> {
RSAccessionRecoveryService.runRecoveryForCategoryRS();
return null;
})
.build();
}
}
Loading

0 comments on commit cf8bf77

Please sign in to comment.