Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EVA-3543 check if rs with same hash already exist before merging #441

Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.mongodb.MongoBulkWriteException;

import com.mongodb.client.result.DeleteResult;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -35,6 +36,7 @@
import uk.ac.ebi.eva.accession.clustering.metric.ClusteringMetric;
import uk.ac.ebi.eva.accession.core.model.IClusteredVariant;
import uk.ac.ebi.eva.accession.core.model.ISubmittedVariant;
import uk.ac.ebi.eva.accession.core.model.dbsnp.DbsnpClusteredVariantEntity;
import uk.ac.ebi.eva.accession.core.model.dbsnp.DbsnpSubmittedVariantEntity;
import uk.ac.ebi.eva.accession.core.model.dbsnp.DbsnpSubmittedVariantOperationEntity;
import uk.ac.ebi.eva.accession.core.model.eva.ClusteredVariantEntity;
Expand Down Expand Up @@ -191,15 +193,40 @@ public void writeRSMerge(SubmittedVariantOperationEntity currentOperation)
ImmutablePair<ClusteredVariantEntity, List<ClusteredVariantEntity>> mergeDestinationAndMergees =
getMergeDestinationAndMergees(mergeCandidates);
ClusteredVariantEntity mergeDestination = mergeDestinationAndMergees.getLeft();

List<ClusteredVariantEntity> mergees = mergeDestinationAndMergees.getRight();

removeMergeesAndInsertMergeDestination(mergeDestination, mergees);

for (ClusteredVariantEntity mergee: mergees) {
logger.info("RS merge operation: Merging rs{} to rs{} due to hash collision...",
mergee.getAccession(), mergeDestination.getAccession());
merge(mergeDestination, mergee, currentOperation);
tcezard marked this conversation as resolved.
Show resolved Hide resolved
}
}

private void removeMergeesAndInsertMergeDestination(ClusteredVariantEntity mergeDestination,
tcezard marked this conversation as resolved.
Show resolved Hide resolved
List<ClusteredVariantEntity> mergeeList) {
List<Long> mergeeAccList = mergeeList.stream().map(cve->cve.getAccession()).collect(Collectors.toList());
Query queryForMergee = query(where(ACCESSION_ATTRIBUTE).in(mergeeAccList))
.addCriteria(where(REFERENCE_ASSEMBLY_FIELD_IN_CLUSTERED_VARIANT_COLLECTION).is(this.assemblyAccession));
DeleteResult deleteResultCVE = mongoTemplate.remove(queryForMergee, ClusteredVariantEntity.class);
DeleteResult deleteResultDbsnpSVE = mongoTemplate.remove(queryForMergee, DbsnpClusteredVariantEntity.class);

metricCompute.addCount(ClusteringMetric.CLUSTERED_VARIANTS_UPDATED,
deleteResultCVE.getDeletedCount() + deleteResultDbsnpSVE.getDeletedCount());

Query queryForMergeDestination = query(where(ID_ATTRIBUTE).is(mergeDestination.getHashedMessage())
.and(ACCESSION_ATTRIBUTE).is(mergeDestination.getAccession()))
.addCriteria(where(REFERENCE_ASSEMBLY_FIELD_IN_CLUSTERED_VARIANT_COLLECTION).is(this.assemblyAccession));
List<? extends ClusteredVariantEntity> clusteredVariantToKeep =
mongoTemplate.find(queryForMergeDestination, clusteringWriter.getClusteredVariantCollection(
mergeDestination.getAccession()));
if (clusteredVariantToKeep.isEmpty()) {
// Insert RS record for destination RS
insertRSRecordForMergeDestination(mergeDestination);
}
}

private ImmutablePair<String, Long> getHashedMessageAndAccessionForSVIE(SubmittedVariantInactiveEntity svie) {
return new ImmutablePair<>(svie.getHashedMessage(), svie.getAccession());
}
Expand Down Expand Up @@ -264,35 +291,6 @@ protected void merge(ClusteredVariantEntity mergeDestination, ClusteredVariantEn
Long accessionToBeMerged = mergee.getAccession();
Long accessionToKeep = mergeDestination.getAccession();

//Confine merge updates to the particular assembly where clustering is being performed
Query queryForMergee = query(where(ACCESSION_ATTRIBUTE).is(accessionToBeMerged))
.addCriteria(
where(REFERENCE_ASSEMBLY_FIELD_IN_CLUSTERED_VARIANT_COLLECTION).is(this.assemblyAccession));
Query queryForMergeTarget = query(where(ACCESSION_ATTRIBUTE).is(accessionToKeep))
.addCriteria(
where(REFERENCE_ASSEMBLY_FIELD_IN_CLUSTERED_VARIANT_COLLECTION).is(this.assemblyAccession));

List<? extends ClusteredVariantEntity> clusteredVariantToMerge =
mongoTemplate.find(queryForMergee,
clusteringWriter.getClusteredVariantCollection(accessionToBeMerged));

List<? extends ClusteredVariantEntity> clusteredVariantToKeep =
mongoTemplate.find(queryForMergeTarget, clusteringWriter.getClusteredVariantCollection(
accessionToKeep));

if (clusteringWriter.isMultimap(clusteredVariantToMerge) || clusteringWriter.isMultimap(clusteredVariantToKeep)) {
// multimap! don't merge. see isMultimap() below for more details
return;
}

// Mergee is no longer valid to be present in the main clustered variant collection
mongoTemplate.remove(queryForMergee, clusteringWriter.getClusteredVariantCollection(accessionToBeMerged));
metricCompute.addCount(ClusteringMetric.CLUSTERED_VARIANTS_UPDATED, clusteredVariantToMerge.size());

if (clusteredVariantToKeep.isEmpty()) {
// Insert RS record for destination RS
insertRSRecordForMergeDestination(mergeDestination);
}
// Record merge operation
insertMergeOperation(mergeDestination, mergee);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,82 @@ public void testMultiLevelRSMerges() throws Exception {
assertRSAssociatedWithSS(1L, ss4);
}

@Test
@DirtiesContext
public void testRSWithSameHashAsMergeDestinationAlreadyExistsInDBWithHigherAccession() throws Exception {
ss1 = createSS(1L, 1L, 100L, "C", "T");
ss2 = createSS(2L, 2L, 100L, "C", "A");
ss3 = createSS(3L, 3L, 100L, "A", "G");
this.mongoTemplate.insert(Arrays.asList(ss1, ss2, ss3), DBSNP_SUBMITTED_VARIANT_COLLECTION);

DbsnpClusteredVariantEntity rs2 = this.createRS(ss2);
this.mongoTemplate.insert(rs2);

SubmittedVariantOperationEntity mergeOperation1 = new SubmittedVariantOperationEntity();
mergeOperation1.fill(RSMergeAndSplitCandidatesReaderConfiguration.MERGE_CANDIDATES_EVENT_TYPE,
ss1.getAccession(), null, "Different RS with matching loci",
Stream.of(ss1, ss2, ss3).map(SubmittedVariantInactiveEntity::new).collect(Collectors.toList()));
mergeOperation1.setId(ClusteringWriter.getMergeCandidateId(mergeOperation1));
this.mongoTemplate.insert(Arrays.asList(mergeOperation1), SUBMITTED_VARIANT_OPERATION_COLLECTION);

List<SubmittedVariantOperationEntity> submittedVariantOperationEntities = new ArrayList<>();
SubmittedVariantOperationEntity temp;
rsMergeCandidatesReader.open(new ExecutionContext());
while ((temp = rsMergeCandidatesReader.read()) != null) {
submittedVariantOperationEntities.add(temp);
}

//Perform merge
rsMergeWriter.write(submittedVariantOperationEntities);

// Check rs2,rs3 merged into to rs1
assertMergeOp(ss2.getClusteredVariantAccession(), ss1.getClusteredVariantAccession(), ASSEMBLY);
assertMergeOp(ss3.getClusteredVariantAccession(), ss1.getClusteredVariantAccession(), ASSEMBLY);

// Check rs1 to rs3 all have same rs
assertRSAssociatedWithSS(1L, ss1);
assertRSAssociatedWithSS(1L, ss2);
assertRSAssociatedWithSS(1L, ss3);
}

@Test
@DirtiesContext
public void testRSWithSameHashAsMergeDestinationAlreadyExistsInDBWithLowerAccession() throws Exception {
ss1 = createSS(1L, 1L, 100L, "C", "T");
ss2 = createSS(2L, 2L, 100L, "C", "A");
ss3 = createSS(3L, 3L, 100L, "A", "G");
this.mongoTemplate.insert(Arrays.asList(ss1, ss2, ss3), DBSNP_SUBMITTED_VARIANT_COLLECTION);

DbsnpClusteredVariantEntity rs1 = this.createRS(ss1);
this.mongoTemplate.insert(rs1);

SubmittedVariantOperationEntity mergeOperation1 = new SubmittedVariantOperationEntity();
mergeOperation1.fill(RSMergeAndSplitCandidatesReaderConfiguration.MERGE_CANDIDATES_EVENT_TYPE,
ss1.getAccession(), null, "Different RS with matching loci",
Stream.of(ss1, ss2, ss3).map(SubmittedVariantInactiveEntity::new).collect(Collectors.toList()));
mergeOperation1.setId(ClusteringWriter.getMergeCandidateId(mergeOperation1));
this.mongoTemplate.insert(Arrays.asList(mergeOperation1), SUBMITTED_VARIANT_OPERATION_COLLECTION);

List<SubmittedVariantOperationEntity> submittedVariantOperationEntities = new ArrayList<>();
SubmittedVariantOperationEntity temp;
rsMergeCandidatesReader.open(new ExecutionContext());
while ((temp = rsMergeCandidatesReader.read()) != null) {
submittedVariantOperationEntities.add(temp);
}

//Perform merge
rsMergeWriter.write(submittedVariantOperationEntities);

// Check rs2,rs3 merged into to rs1
assertMergeOp(ss2.getClusteredVariantAccession(), ss1.getClusteredVariantAccession(), ASSEMBLY);
assertMergeOp(ss3.getClusteredVariantAccession(), ss1.getClusteredVariantAccession(), ASSEMBLY);

// Check rs1 to rs3 all have same rs
assertRSAssociatedWithSS(1L, ss1);
assertRSAssociatedWithSS(1L, ss2);
assertRSAssociatedWithSS(1L, ss3);
}

private void assertMergeOp(Long mergee, Long merger, String assemblyToUse) {
assertEquals(0, this.clusteredVariantAccessioningService
.getAllActiveByAssemblyAndAccessionIn(assemblyToUse, Arrays.asList(mergee)).size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,87 +498,6 @@ public void do_not_merge_remapped_multimap_variants() throws Exception {
assertAssembliesPresent(Sets.newTreeSet(asm1, asm2));
}

@Test
@DirtiesContext
public void do_not_merge_multimap_variants_into_remapped_variants() throws Exception {
// given
Long rs1 = 3000000000L;
Long rs2 = 3100000000L;
Long ssToRemap = 5000000000L;
Long ss2 = 5100000000L;
Long ss3 = 5200000000L;
String asm1 = "asm1";
String asm2 = "asm2";
assertDatabaseCounts(0, 0, 0, 0, 0, 0, 0, 0);

mongoTemplate.insert(createClusteredVariantEntity(asm1, 100L, rs1, null), getClusteredTable(rs1));

mongoTemplate.insert(createSubmittedVariantEntity(asm1, 100L, rs1, ssToRemap, NOT_REMAPPED),
getSubmittedCollection(ssToRemap));


// NOTE: rs2 won't be merged into rs1 because rs2 is multimap
mongoTemplate.insert(createClusteredVariantEntity(asm2, 200L, rs2, 3), getClusteredTable(rs2));
mongoTemplate.insert(createClusteredVariantEntity(asm2, 300L, rs2, null), getClusteredTable(rs2));

mongoTemplate.insert(createSubmittedVariantEntity(asm2, 200L, rs2, ss2, NOT_REMAPPED, 3),
getSubmittedCollection(ss2));
mongoTemplate.insert(createSubmittedVariantEntity(asm2, 300L, rs2, ss3, NOT_REMAPPED),
getSubmittedCollection(ss2));

assertDatabaseCounts(0, 3, 0, 0,
0, 3, 0, 0);

// when
SubmittedVariantEntity sve1Remapped = createSubmittedVariantEntity(asm2, 200L, rs1, ssToRemap, asm1);
this.clusterVariants(Collections.singletonList(sve1Remapped));

// then
assertDatabaseCounts(0, 3, 0, 0,
0, 3, 0, 0);

assertAssembliesPresent(Sets.newTreeSet(asm1, asm2));
}

@Test
@DirtiesContext
public void do_not_merge_remapped_variant_into_multimap_variants() throws Exception {
// given
Long rs1 = 3100000000L;
Long rs2 = 3000000000L;
Long ssToRemap = 5500000000L;
Long ss2 = 5100000000L;
Long ss3 = 5200000000L;
String asm1 = "asm1";
String asm2 = "asm2";
assertDatabaseCounts(0, 0, 0, 0, 0, 0, 0, 0);

// NOTE rs1 won't be merged into rs2 because rs2 is multimap
mongoTemplate.insert(createClusteredVariantEntity(asm1, 100L, rs1, null), getClusteredTable(rs1));

mongoTemplate.insert(createSubmittedVariantEntity(asm1, 100L, rs1, ssToRemap, NOT_REMAPPED), getSubmittedCollection(ssToRemap));


mongoTemplate.insert(createClusteredVariantEntity(asm2, 200L, rs2, 3), getClusteredTable(rs2));
mongoTemplate.insert(createClusteredVariantEntity(asm2, 300L, rs2, 3), getClusteredTable(rs2));

mongoTemplate.insert(createSubmittedVariantEntity(asm2, 200L, rs2, ss2, NOT_REMAPPED), getSubmittedCollection(ss2));
mongoTemplate.insert(createSubmittedVariantEntity(asm2, 300L, rs2, ss3, NOT_REMAPPED), getSubmittedCollection(ss2));

assertDatabaseCounts(0, 3, 0, 0,
0, 3, 0, 0);

// when
SubmittedVariantEntity sve1Remapped = createSubmittedVariantEntity(asm2, 200L, rs1, ssToRemap, asm1);
this.clusterVariants(Collections.singletonList(sve1Remapped));

// then
assertDatabaseCounts(0, 3, 0, 0,
0, 3, 0, 0);

assertAssembliesPresent(Sets.newTreeSet(asm1, asm2));
}

@Test
@DirtiesContext
public void merge_into_remapped_multimap_variants_if_single_mapping_per_assembly()
Expand Down
Loading