Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EVA-3543 check if rs with same hash already exist before merging #441

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import uk.ac.ebi.eva.accession.clustering.metric.ClusteringMetric;
import uk.ac.ebi.eva.accession.core.model.IClusteredVariant;
import uk.ac.ebi.eva.accession.core.model.ISubmittedVariant;
import uk.ac.ebi.eva.accession.core.model.dbsnp.DbsnpClusteredVariantEntity;
import uk.ac.ebi.eva.accession.core.model.dbsnp.DbsnpSubmittedVariantEntity;
import uk.ac.ebi.eva.accession.core.model.dbsnp.DbsnpSubmittedVariantOperationEntity;
import uk.ac.ebi.eva.accession.core.model.eva.ClusteredVariantEntity;
Expand Down Expand Up @@ -191,12 +192,39 @@ public void writeRSMerge(SubmittedVariantOperationEntity currentOperation)
ImmutablePair<ClusteredVariantEntity, List<ClusteredVariantEntity>> mergeDestinationAndMergees =
getMergeDestinationAndMergees(mergeCandidates);
ClusteredVariantEntity mergeDestination = mergeDestinationAndMergees.getLeft();

List<ClusteredVariantEntity> mergees = mergeDestinationAndMergees.getRight();

removeMergeesAndInsertMergeDestination(mergeDestination, mergees);

for (ClusteredVariantEntity mergee: mergees) {
logger.info("RS merge operation: Merging rs{} to rs{} due to hash collision...",
mergee.getAccession(), mergeDestination.getAccession());
merge(mergeDestination, mergee, currentOperation);
recordMergeOperations(mergeDestination, mergee, currentOperation);
}
}

private void removeMergeesAndInsertMergeDestination(ClusteredVariantEntity mergeDestination,
tcezard marked this conversation as resolved.
Show resolved Hide resolved
List<ClusteredVariantEntity> mergeeList) {
Query queryForCheckingExistingCVE = query(where(ID_ATTRIBUTE).is(mergeDestination.getHashedMessage()));
List<ClusteredVariantEntity> existingCVEList = mongoTemplate.find(queryForCheckingExistingCVE, ClusteredVariantEntity.class);
List<DbsnpClusteredVariantEntity> existingDbsnpCVEList = mongoTemplate.find(queryForCheckingExistingCVE, DbsnpClusteredVariantEntity.class);
ClusteredVariantEntity existingCVE = null;
if (existingCVEList != null && !existingCVEList.isEmpty()) {
existingCVE = existingCVEList.get(0);
}
if (existingDbsnpCVEList != null && !existingDbsnpCVEList.isEmpty()) {
existingCVE = existingDbsnpCVEList.get(0);
}

if (existingCVE == null) {
insertRSRecordForMergeDestination(mergeDestination);
} else {
if (!existingCVE.getAccession().equals(mergeDestination.getAccession())) {
mongoTemplate.remove(query(where(ID_ATTRIBUTE).is(existingCVE.getHashedMessage())),
clusteringWriter.getClusteredVariantCollection(existingCVE.getAccession()));
metricCompute.addCount(ClusteringMetric.CLUSTERED_VARIANTS_UPDATED, 1);
insertRSRecordForMergeDestination(mergeDestination);
}
}
}

Expand Down Expand Up @@ -259,40 +287,11 @@ private ImmutablePair<ClusteredVariantEntity, List<ClusteredVariantEntity>> getM
return new ImmutablePair<>(targetRS, mergees);
}

protected void merge(ClusteredVariantEntity mergeDestination, ClusteredVariantEntity mergee,
protected void recordMergeOperations(ClusteredVariantEntity mergeDestination, ClusteredVariantEntity mergee,
SubmittedVariantOperationEntity currentOperation) {
Long accessionToBeMerged = mergee.getAccession();
Long accessionToKeep = mergeDestination.getAccession();

//Confine merge updates to the particular assembly where clustering is being performed
Query queryForMergee = query(where(ACCESSION_ATTRIBUTE).is(accessionToBeMerged))
.addCriteria(
where(REFERENCE_ASSEMBLY_FIELD_IN_CLUSTERED_VARIANT_COLLECTION).is(this.assemblyAccession));
Query queryForMergeTarget = query(where(ACCESSION_ATTRIBUTE).is(accessionToKeep))
.addCriteria(
where(REFERENCE_ASSEMBLY_FIELD_IN_CLUSTERED_VARIANT_COLLECTION).is(this.assemblyAccession));

List<? extends ClusteredVariantEntity> clusteredVariantToMerge =
mongoTemplate.find(queryForMergee,
clusteringWriter.getClusteredVariantCollection(accessionToBeMerged));

List<? extends ClusteredVariantEntity> clusteredVariantToKeep =
mongoTemplate.find(queryForMergeTarget, clusteringWriter.getClusteredVariantCollection(
accessionToKeep));

if (clusteringWriter.isMultimap(clusteredVariantToMerge) || clusteringWriter.isMultimap(clusteredVariantToKeep)) {
// multimap! don't merge. see isMultimap() below for more details
return;
}

// Mergee is no longer valid to be present in the main clustered variant collection
mongoTemplate.remove(queryForMergee, clusteringWriter.getClusteredVariantCollection(accessionToBeMerged));
metricCompute.addCount(ClusteringMetric.CLUSTERED_VARIANTS_UPDATED, clusteredVariantToMerge.size());

if (clusteredVariantToKeep.isEmpty()) {
// Insert RS record for destination RS
insertRSRecordForMergeDestination(mergeDestination);
}
// Record merge operation
insertMergeOperation(mergeDestination, mergee);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,82 @@ public void testMultiLevelRSMerges() throws Exception {
assertRSAssociatedWithSS(1L, ss4);
}

@Test
@DirtiesContext
public void testRSWithSameHashAsMergeDestinationAlreadyExistsInDBWithHigherAccession() throws Exception {
ss1 = createSS(1L, 1L, 100L, "C", "T");
ss2 = createSS(2L, 2L, 100L, "C", "A");
ss3 = createSS(3L, 3L, 100L, "A", "G");
this.mongoTemplate.insert(Arrays.asList(ss1, ss2, ss3), DBSNP_SUBMITTED_VARIANT_COLLECTION);

DbsnpClusteredVariantEntity rs2 = this.createRS(ss2);
this.mongoTemplate.insert(rs2);

SubmittedVariantOperationEntity mergeOperation1 = new SubmittedVariantOperationEntity();
mergeOperation1.fill(RSMergeAndSplitCandidatesReaderConfiguration.MERGE_CANDIDATES_EVENT_TYPE,
ss1.getAccession(), null, "Different RS with matching loci",
Stream.of(ss1, ss2, ss3).map(SubmittedVariantInactiveEntity::new).collect(Collectors.toList()));
mergeOperation1.setId(ClusteringWriter.getMergeCandidateId(mergeOperation1));
this.mongoTemplate.insert(Arrays.asList(mergeOperation1), SUBMITTED_VARIANT_OPERATION_COLLECTION);

List<SubmittedVariantOperationEntity> submittedVariantOperationEntities = new ArrayList<>();
SubmittedVariantOperationEntity temp;
rsMergeCandidatesReader.open(new ExecutionContext());
while ((temp = rsMergeCandidatesReader.read()) != null) {
submittedVariantOperationEntities.add(temp);
}

//Perform merge
rsMergeWriter.write(submittedVariantOperationEntities);

// Check rs2,rs3 merged into to rs1
assertMergeOp(ss2.getClusteredVariantAccession(), ss1.getClusteredVariantAccession(), ASSEMBLY);
assertMergeOp(ss3.getClusteredVariantAccession(), ss1.getClusteredVariantAccession(), ASSEMBLY);

// Check rs1 to rs3 all have same rs
assertRSAssociatedWithSS(1L, ss1);
assertRSAssociatedWithSS(1L, ss2);
assertRSAssociatedWithSS(1L, ss3);
}

@Test
@DirtiesContext
public void testRSWithSameHashAsMergeDestinationAlreadyExistsInDBWithLowerAccession() throws Exception {
ss1 = createSS(1L, 1L, 100L, "C", "T");
ss2 = createSS(2L, 2L, 100L, "C", "A");
ss3 = createSS(3L, 3L, 100L, "A", "G");
this.mongoTemplate.insert(Arrays.asList(ss1, ss2, ss3), DBSNP_SUBMITTED_VARIANT_COLLECTION);

DbsnpClusteredVariantEntity rs1 = this.createRS(ss1);
this.mongoTemplate.insert(rs1);

SubmittedVariantOperationEntity mergeOperation1 = new SubmittedVariantOperationEntity();
mergeOperation1.fill(RSMergeAndSplitCandidatesReaderConfiguration.MERGE_CANDIDATES_EVENT_TYPE,
ss1.getAccession(), null, "Different RS with matching loci",
Stream.of(ss1, ss2, ss3).map(SubmittedVariantInactiveEntity::new).collect(Collectors.toList()));
mergeOperation1.setId(ClusteringWriter.getMergeCandidateId(mergeOperation1));
this.mongoTemplate.insert(Arrays.asList(mergeOperation1), SUBMITTED_VARIANT_OPERATION_COLLECTION);

List<SubmittedVariantOperationEntity> submittedVariantOperationEntities = new ArrayList<>();
SubmittedVariantOperationEntity temp;
rsMergeCandidatesReader.open(new ExecutionContext());
while ((temp = rsMergeCandidatesReader.read()) != null) {
submittedVariantOperationEntities.add(temp);
}

//Perform merge
rsMergeWriter.write(submittedVariantOperationEntities);

// Check rs2,rs3 merged into to rs1
assertMergeOp(ss2.getClusteredVariantAccession(), ss1.getClusteredVariantAccession(), ASSEMBLY);
assertMergeOp(ss3.getClusteredVariantAccession(), ss1.getClusteredVariantAccession(), ASSEMBLY);

// Check rs1 to rs3 all have same rs
assertRSAssociatedWithSS(1L, ss1);
assertRSAssociatedWithSS(1L, ss2);
assertRSAssociatedWithSS(1L, ss3);
}

private void assertMergeOp(Long mergee, Long merger, String assemblyToUse) {
assertEquals(0, this.clusteredVariantAccessioningService
.getAllActiveByAssemblyAndAccessionIn(assemblyToUse, Arrays.asList(mergee)).size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,87 +498,6 @@ public void do_not_merge_remapped_multimap_variants() throws Exception {
assertAssembliesPresent(Sets.newTreeSet(asm1, asm2));
}

@Test
@DirtiesContext
public void do_not_merge_multimap_variants_into_remapped_variants() throws Exception {
// given
Long rs1 = 3000000000L;
Long rs2 = 3100000000L;
Long ssToRemap = 5000000000L;
Long ss2 = 5100000000L;
Long ss3 = 5200000000L;
String asm1 = "asm1";
String asm2 = "asm2";
assertDatabaseCounts(0, 0, 0, 0, 0, 0, 0, 0);

mongoTemplate.insert(createClusteredVariantEntity(asm1, 100L, rs1, null), getClusteredTable(rs1));

mongoTemplate.insert(createSubmittedVariantEntity(asm1, 100L, rs1, ssToRemap, NOT_REMAPPED),
getSubmittedCollection(ssToRemap));


// NOTE: rs2 won't be merged into rs1 because rs2 is multimap
mongoTemplate.insert(createClusteredVariantEntity(asm2, 200L, rs2, 3), getClusteredTable(rs2));
mongoTemplate.insert(createClusteredVariantEntity(asm2, 300L, rs2, null), getClusteredTable(rs2));

mongoTemplate.insert(createSubmittedVariantEntity(asm2, 200L, rs2, ss2, NOT_REMAPPED, 3),
getSubmittedCollection(ss2));
mongoTemplate.insert(createSubmittedVariantEntity(asm2, 300L, rs2, ss3, NOT_REMAPPED),
getSubmittedCollection(ss2));

assertDatabaseCounts(0, 3, 0, 0,
0, 3, 0, 0);

// when
SubmittedVariantEntity sve1Remapped = createSubmittedVariantEntity(asm2, 200L, rs1, ssToRemap, asm1);
this.clusterVariants(Collections.singletonList(sve1Remapped));

// then
assertDatabaseCounts(0, 3, 0, 0,
0, 3, 0, 0);

assertAssembliesPresent(Sets.newTreeSet(asm1, asm2));
}

@Test
@DirtiesContext
public void do_not_merge_remapped_variant_into_multimap_variants() throws Exception {
// given
Long rs1 = 3100000000L;
Long rs2 = 3000000000L;
Long ssToRemap = 5500000000L;
Long ss2 = 5100000000L;
Long ss3 = 5200000000L;
String asm1 = "asm1";
String asm2 = "asm2";
assertDatabaseCounts(0, 0, 0, 0, 0, 0, 0, 0);

// NOTE rs1 won't be merged into rs2 because rs2 is multimap
mongoTemplate.insert(createClusteredVariantEntity(asm1, 100L, rs1, null), getClusteredTable(rs1));

mongoTemplate.insert(createSubmittedVariantEntity(asm1, 100L, rs1, ssToRemap, NOT_REMAPPED), getSubmittedCollection(ssToRemap));


mongoTemplate.insert(createClusteredVariantEntity(asm2, 200L, rs2, 3), getClusteredTable(rs2));
mongoTemplate.insert(createClusteredVariantEntity(asm2, 300L, rs2, 3), getClusteredTable(rs2));

mongoTemplate.insert(createSubmittedVariantEntity(asm2, 200L, rs2, ss2, NOT_REMAPPED), getSubmittedCollection(ss2));
mongoTemplate.insert(createSubmittedVariantEntity(asm2, 300L, rs2, ss3, NOT_REMAPPED), getSubmittedCollection(ss2));

assertDatabaseCounts(0, 3, 0, 0,
0, 3, 0, 0);

// when
SubmittedVariantEntity sve1Remapped = createSubmittedVariantEntity(asm2, 200L, rs1, ssToRemap, asm1);
this.clusterVariants(Collections.singletonList(sve1Remapped));

// then
assertDatabaseCounts(0, 3, 0, 0,
0, 3, 0, 0);

assertAssembliesPresent(Sets.newTreeSet(asm1, asm2));
}

@Test
@DirtiesContext
public void merge_into_remapped_multimap_variants_if_single_mapping_per_assembly()
Expand Down
Loading