Skip to content

Commit

Permalink
Cleanup cohort results after delete.
Browse files Browse the repository at this point in the history
Fixes #309.
  • Loading branch information
chrisknoll committed Feb 7, 2018
1 parent 711ea6a commit efaca11
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 9 deletions.
8 changes: 8 additions & 0 deletions src/main/java/org/ohdsi/webapi/DataAccessConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,14 @@ public TransactionTemplate transactionTemplateRequiresNew(PlatformTransactionMan
transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
return transactionTemplate;
}

@Bean
public TransactionTemplate transactionTemplateNoTransaction(PlatformTransactionManager transactionManager) {
TransactionTemplate transactionTemplate = new TransactionTemplate();
transactionTemplate.setTransactionManager(transactionManager);
transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_NOT_SUPPORTED);
return transactionTemplate;
}

/*
public String getSparqlEndpoint()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright 2018 cknoll1.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.ohdsi.webapi.cohortdefinition;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.ohdsi.circe.helper.ResourceHelper;
import org.ohdsi.sql.SqlRender;
import org.ohdsi.sql.SqlTranslate;
import org.ohdsi.webapi.source.Source;
import org.ohdsi.webapi.source.SourceDaimon;
import org.ohdsi.webapi.source.SourceRepository;
import org.ohdsi.webapi.util.SessionUtils;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DriverManagerDataSource;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;

/**
*
* @author cknoll1
*/
public class CleanupCohortTasklet implements Tasklet {


private static final Log log = LogFactory.getLog(CleanupCohortTasklet.class);

private final TransactionTemplate transactionTemplate;
private final SourceRepository sourceRepository;

private final String CLEANUP_TEMPLATE = ResourceHelper.GetResourceAsString("/resources/cohortdefinition/sql/cleanupResults.sql");

public CleanupCohortTasklet(final TransactionTemplate transactionTemplate,
final SourceRepository sourceRepository) {
this.transactionTemplate = transactionTemplate;
this.sourceRepository = sourceRepository;
}

private JdbcTemplate getSourceJdbcTemplate(Source source) {
DriverManagerDataSource dataSource = new DriverManagerDataSource(source.getSourceConnection());
JdbcTemplate template = new JdbcTemplate(dataSource);
return template;
}

private Integer doTask(ChunkContext chunkContext) {
int sourcesUpdated = 0;

Map<String, Object> jobParams = chunkContext.getStepContext().getJobParameters();
Integer cohortId = Integer.valueOf(jobParams.get("cohort_definition_id").toString());
String sessionId = SessionUtils.sessionId();

List<Source> resultsSources = StreamSupport.stream(this.sourceRepository.findAll().spliterator(), false)
.filter(source->source.getDaimons().stream().filter(daimon->daimon.getDaimonType() == SourceDaimon.DaimonType.Results).findAny().isPresent())
.collect(Collectors.toList());

for (Source source : resultsSources) {
try {
String resultSchema = source.getTableQualifier(SourceDaimon.DaimonType.Results);
String deleteSql = SqlRender.renderSql(CLEANUP_TEMPLATE, new String[]{"results_database_schema", "cohort_definition_id"}, new String[]{resultSchema, cohortId.toString()});
deleteSql = SqlTranslate.translateSql(deleteSql, source.getSourceDialect(), sessionId, null);

getSourceJdbcTemplate(source).batchUpdate(deleteSql.split(";")); // use batch update since SQL translation may produce multiple statements
sourcesUpdated++;
} catch (Exception e) {
log.error(String.format("Error deleting results for cohort: %d", cohortId));
}
}
return sourcesUpdated;
}

@Override
public RepeatStatus execute(final StepContribution contribution, final ChunkContext chunkContext) throws Exception {

final Integer ret = this.transactionTemplate.execute(new TransactionCallback<Integer>() {
@Override
public Integer doInTransaction(final TransactionStatus status) {
return doTask(chunkContext);
}
});

return RepeatStatus.FINISHED;
}

}
13 changes: 8 additions & 5 deletions src/main/java/org/ohdsi/webapi/service/AbstractDaoService.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ public ComparativeCohortAnalysisExecutionRepository getComparativeCohortAnalysis
@Autowired
private TransactionTemplate transactionTemplateRequiresNew;

@Autowired
private TransactionTemplate transactionTemplateNoTransaction;

public SourceRepository getSourceRepository() {
return sourceRepository;
}
Expand Down Expand Up @@ -197,13 +200,13 @@ public TransactionTemplate getTransactionTemplateRequiresNew() {
return transactionTemplateRequiresNew;
}

/**
* @param transactionTemplateRequiresNew the transactionTemplateRequiresNew to
* set
/**
* @return the transactionTemplateNoTransaction
*/
public void setTransactionTemplateRequiresNew(TransactionTemplate transactionTemplateRequiresNew) {
this.transactionTemplateRequiresNew = transactionTemplateRequiresNew;
public TransactionTemplate getTransactionTemplateNoTransaction() {
return transactionTemplateNoTransaction;
}


/**
* @return the ohdsiSchema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.ohdsi.webapi.cohortdefinition.ExpressionType;
import org.ohdsi.webapi.cohortdefinition.GenerateCohortTasklet;
import org.ohdsi.webapi.GenerationStatus;
import org.ohdsi.webapi.cohortdefinition.CleanupCohortTasklet;
import org.ohdsi.webapi.cohortdefinition.GenerationJobExecutionListener;
import org.ohdsi.webapi.cohortdefinition.InclusionRuleReport;
import org.ohdsi.webapi.cohortfeatures.GenerateCohortFeaturesTasklet;
Expand All @@ -73,14 +74,14 @@
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.job.builder.SimpleJobBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;

/**
*
Expand Down Expand Up @@ -560,9 +561,37 @@ public CohortDefinitionDTO copy(@PathParam("id") final int id) {
@Produces(MediaType.APPLICATION_JSON)
@Path("/{id}")
public void delete(@PathParam("id") final int id) {
cohortDefinitionRepository.delete(id);
}


// perform the JPA update in a separate transaction
this.getTransactionTemplateRequiresNew().execute(new TransactionCallbackWithoutResult() {
@Override
public void doInTransactionWithoutResult(final TransactionStatus status) {
cohortDefinitionRepository.delete(id);
}
});

JobParametersBuilder builder = new JobParametersBuilder();
builder.addString("jobName", String.format("Cleanup cohort %d.",id));
builder.addString("cohort_definition_id", ("" + id));

final JobParameters jobParameters = builder.toJobParameters();

log.info(String.format("Beginning cohort cleanup for cohort definition id: \n %s", "" + id));

CleanupCohortTasklet cleanupTasklet = new CleanupCohortTasklet(this.getTransactionTemplateNoTransaction(),this.getSourceRepository());

Step cleanupStep = stepBuilders.get("cohortDefinition.cleanupCohort")
.tasklet(cleanupTasklet)
.build();

SimpleJobBuilder cleanupJobBuilder = jobBuilders.get("cleanupCohort")
.start(cleanupStep);

Job cleanupCohortJob = cleanupJobBuilder.build();

this.jobTemplate.launch(cleanupCohortJob, jobParameters);
}

private ArrayList<ConceptSetExport> getConceptSetExports(CohortDefinition def, SourceInfo vocabSource) throws RuntimeException {
ArrayList<ConceptSetExport> exports = new ArrayList<>();
ObjectMapper mapper = new ObjectMapper();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
delete from @results_database_schema.cohort where cohort_definition_id = @cohort_definition_id;
delete from @results_database_schema.cohort_features where cohort_definition_id = @cohort_definition_id;
delete from @results_database_schema.cohort_features_analysis_ref where cohort_definition_id = @cohort_definition_id;
delete from @results_database_schema.cohort_features_dist where cohort_definition_id = @cohort_definition_id;
delete from @results_database_schema.cohort_features_ref where cohort_definition_id = @cohort_definition_id;
delete from @results_database_schema.cohort_inclusion where cohort_definition_id = @cohort_definition_id;
delete from @results_database_schema.cohort_inclusion_result where cohort_definition_id = @cohort_definition_id;
delete from @results_database_schema.cohort_inclusion_stats where cohort_definition_id = @cohort_definition_id;
delete from @results_database_schema.cohort_summary_stats where cohort_definition_id = @cohort_definition_id;
delete from @results_database_schema.heracles_heel_results where cohort_definition_id = @cohort_definition_id;
delete from @results_database_schema.heracles_results where cohort_definition_id = @cohort_definition_id;
delete from @results_database_schema.heracles_results_dist where cohort_definition_id = @cohort_definition_id;
delete from @results_database_schema.ir_analysis_dist where target_id = @cohort_definition_id or outcome_id = @cohort_definition_id;
delete from @results_database_schema.ir_analysis_result where target_id = @cohort_definition_id or outcome_id = @cohort_definition_id;
delete from @results_database_schema.ir_analysis_dist where target_id = @cohort_definition_id or outcome_id = @cohort_definition_id;

0 comments on commit efaca11

Please sign in to comment.