diff --git a/eva-accession-clustering/src/main/java/uk/ac/ebi/eva/accession/clustering/batch/io/RSMergeWriter.java b/eva-accession-clustering/src/main/java/uk/ac/ebi/eva/accession/clustering/batch/io/RSMergeWriter.java index 0423fb39b..08c50c380 100644 --- a/eva-accession-clustering/src/main/java/uk/ac/ebi/eva/accession/clustering/batch/io/RSMergeWriter.java +++ b/eva-accession-clustering/src/main/java/uk/ac/ebi/eva/accession/clustering/batch/io/RSMergeWriter.java @@ -35,6 +35,7 @@ import uk.ac.ebi.eva.accession.clustering.metric.ClusteringMetric; import uk.ac.ebi.eva.accession.core.model.IClusteredVariant; import uk.ac.ebi.eva.accession.core.model.ISubmittedVariant; +import uk.ac.ebi.eva.accession.core.model.dbsnp.DbsnpClusteredVariantEntity; import uk.ac.ebi.eva.accession.core.model.dbsnp.DbsnpSubmittedVariantEntity; import uk.ac.ebi.eva.accession.core.model.dbsnp.DbsnpSubmittedVariantOperationEntity; import uk.ac.ebi.eva.accession.core.model.eva.ClusteredVariantEntity; @@ -191,12 +192,39 @@ public void writeRSMerge(SubmittedVariantOperationEntity currentOperation) ImmutablePair> mergeDestinationAndMergees = getMergeDestinationAndMergees(mergeCandidates); ClusteredVariantEntity mergeDestination = mergeDestinationAndMergees.getLeft(); - List mergees = mergeDestinationAndMergees.getRight(); + + removeMergeesAndInsertMergeDestination(mergeDestination, mergees); + for (ClusteredVariantEntity mergee: mergees) { logger.info("RS merge operation: Merging rs{} to rs{} due to hash collision...", mergee.getAccession(), mergeDestination.getAccession()); - merge(mergeDestination, mergee, currentOperation); + recordMergeOperations(mergeDestination, mergee, currentOperation); + } + } + + private void removeMergeesAndInsertMergeDestination(ClusteredVariantEntity mergeDestination, + List mergeeList) { /* mergeeList is not read in this method; both lookups below match only on mergeDestination.getHashedMessage() */ + Query queryForCheckingExistingCVE = query(where(ID_ATTRIBUTE).is(mergeDestination.getHashedMessage())); + List existingCVEList = mongoTemplate.find(queryForCheckingExistingCVE, 
ClusteredVariantEntity.class); + List existingDbsnpCVEList = mongoTemplate.find(queryForCheckingExistingCVE, DbsnpClusteredVariantEntity.class); + ClusteredVariantEntity existingCVE = null; + if (existingCVEList != null && !existingCVEList.isEmpty()) { + existingCVE = existingCVEList.get(0); + } + if (existingDbsnpCVEList != null && !existingDbsnpCVEList.isEmpty()) { /* when both lists are non-empty, this assignment replaces the entity taken from existingCVEList above */ + existingCVE = existingDbsnpCVEList.get(0); + } + + if (existingCVE == null) { + insertRSRecordForMergeDestination(mergeDestination); + } else { + if (!existingCVE.getAccession().equals(mergeDestination.getAccession())) { + mongoTemplate.remove(query(where(ID_ATTRIBUTE).is(existingCVE.getHashedMessage())), + clusteringWriter.getClusteredVariantCollection(existingCVE.getAccession())); + metricCompute.addCount(ClusteringMetric.CLUSTERED_VARIANTS_UPDATED, 1); + insertRSRecordForMergeDestination(mergeDestination); + } } } @@ -259,40 +287,11 @@ private ImmutablePair> getM return new ImmutablePair<>(targetRS, mergees); } - protected void merge(ClusteredVariantEntity mergeDestination, ClusteredVariantEntity mergee, + protected void recordMergeOperations(ClusteredVariantEntity mergeDestination, ClusteredVariantEntity mergee, SubmittedVariantOperationEntity currentOperation) { Long accessionToBeMerged = mergee.getAccession(); Long accessionToKeep = mergeDestination.getAccession(); - //Confine merge updates to the particular assembly where clustering is being performed - Query queryForMergee = query(where(ACCESSION_ATTRIBUTE).is(accessionToBeMerged)) - .addCriteria( - where(REFERENCE_ASSEMBLY_FIELD_IN_CLUSTERED_VARIANT_COLLECTION).is(this.assemblyAccession)); - Query queryForMergeTarget = query(where(ACCESSION_ATTRIBUTE).is(accessionToKeep)) - .addCriteria( - where(REFERENCE_ASSEMBLY_FIELD_IN_CLUSTERED_VARIANT_COLLECTION).is(this.assemblyAccession)); - - List clusteredVariantToMerge = - mongoTemplate.find(queryForMergee, - clusteringWriter.getClusteredVariantCollection(accessionToBeMerged)); - - List 
clusteredVariantToKeep = - mongoTemplate.find(queryForMergeTarget, clusteringWriter.getClusteredVariantCollection( - accessionToKeep)); - - if (clusteringWriter.isMultimap(clusteredVariantToMerge) || clusteringWriter.isMultimap(clusteredVariantToKeep)) { - // multimap! don't merge. see isMultimap() below for more details - return; - } - - // Mergee is no longer valid to be present in the main clustered variant collection - mongoTemplate.remove(queryForMergee, clusteringWriter.getClusteredVariantCollection(accessionToBeMerged)); - metricCompute.addCount(ClusteringMetric.CLUSTERED_VARIANTS_UPDATED, clusteredVariantToMerge.size()); - - if (clusteredVariantToKeep.isEmpty()) { - // Insert RS record for destination RS - insertRSRecordForMergeDestination(mergeDestination); - } // Record merge operation insertMergeOperation(mergeDestination, mergee); diff --git a/eva-accession-clustering/src/test/java/uk/ac/ebi/eva/accession/clustering/batch/io/RSMergeWriterTest.java b/eva-accession-clustering/src/test/java/uk/ac/ebi/eva/accession/clustering/batch/io/RSMergeWriterTest.java index 77403c812..7533077fe 100644 --- a/eva-accession-clustering/src/test/java/uk/ac/ebi/eva/accession/clustering/batch/io/RSMergeWriterTest.java +++ b/eva-accession-clustering/src/test/java/uk/ac/ebi/eva/accession/clustering/batch/io/RSMergeWriterTest.java @@ -432,6 +432,82 @@ public void testMultiLevelRSMerges() throws Exception { assertRSAssociatedWithSS(1L, ss4); } + @Test + @DirtiesContext + public void testRSWithSameHashAsMergeDestinationAlreadyExistsInDBWithHigherAccession() throws Exception { /* the only clustered variant this test inserts before the merge is the one created from ss2 */ + ss1 = createSS(1L, 1L, 100L, "C", "T"); + ss2 = createSS(2L, 2L, 100L, "C", "A"); + ss3 = createSS(3L, 3L, 100L, "A", "G"); + this.mongoTemplate.insert(Arrays.asList(ss1, ss2, ss3), DBSNP_SUBMITTED_VARIANT_COLLECTION); + + DbsnpClusteredVariantEntity rs2 = this.createRS(ss2); + this.mongoTemplate.insert(rs2); + + SubmittedVariantOperationEntity mergeOperation1 = new SubmittedVariantOperationEntity(); + 
mergeOperation1.fill(RSMergeAndSplitCandidatesReaderConfiguration.MERGE_CANDIDATES_EVENT_TYPE, + ss1.getAccession(), null, "Different RS with matching loci", + Stream.of(ss1, ss2, ss3).map(SubmittedVariantInactiveEntity::new).collect(Collectors.toList())); + mergeOperation1.setId(ClusteringWriter.getMergeCandidateId(mergeOperation1)); + this.mongoTemplate.insert(Arrays.asList(mergeOperation1), SUBMITTED_VARIANT_OPERATION_COLLECTION); + + List submittedVariantOperationEntities = new ArrayList<>(); + SubmittedVariantOperationEntity temp; + rsMergeCandidatesReader.open(new ExecutionContext()); + while ((temp = rsMergeCandidatesReader.read()) != null) { + submittedVariantOperationEntities.add(temp); + } + + //Perform merge + rsMergeWriter.write(submittedVariantOperationEntities); + + // Check rs2,rs3 merged into rs1 + assertMergeOp(ss2.getClusteredVariantAccession(), ss1.getClusteredVariantAccession(), ASSEMBLY); + assertMergeOp(ss3.getClusteredVariantAccession(), ss1.getClusteredVariantAccession(), ASSEMBLY); + + // Check rs1 to rs3 all have same rs + assertRSAssociatedWithSS(1L, ss1); + assertRSAssociatedWithSS(1L, ss2); + assertRSAssociatedWithSS(1L, ss3); + } + + @Test + @DirtiesContext + public void testRSWithSameHashAsMergeDestinationAlreadyExistsInDBWithLowerAccession() throws Exception { /* the only clustered variant this test inserts before the merge is the one created from ss1 */ + ss1 = createSS(1L, 1L, 100L, "C", "T"); + ss2 = createSS(2L, 2L, 100L, "C", "A"); + ss3 = createSS(3L, 3L, 100L, "A", "G"); + this.mongoTemplate.insert(Arrays.asList(ss1, ss2, ss3), DBSNP_SUBMITTED_VARIANT_COLLECTION); + + DbsnpClusteredVariantEntity rs1 = this.createRS(ss1); + this.mongoTemplate.insert(rs1); + + SubmittedVariantOperationEntity mergeOperation1 = new SubmittedVariantOperationEntity(); + mergeOperation1.fill(RSMergeAndSplitCandidatesReaderConfiguration.MERGE_CANDIDATES_EVENT_TYPE, + ss1.getAccession(), null, "Different RS with matching loci", + Stream.of(ss1, ss2, ss3).map(SubmittedVariantInactiveEntity::new).collect(Collectors.toList())); + 
mergeOperation1.setId(ClusteringWriter.getMergeCandidateId(mergeOperation1)); + this.mongoTemplate.insert(Arrays.asList(mergeOperation1), SUBMITTED_VARIANT_OPERATION_COLLECTION); + + List submittedVariantOperationEntities = new ArrayList<>(); + SubmittedVariantOperationEntity temp; + rsMergeCandidatesReader.open(new ExecutionContext()); + while ((temp = rsMergeCandidatesReader.read()) != null) { + submittedVariantOperationEntities.add(temp); + } + + //Perform merge + rsMergeWriter.write(submittedVariantOperationEntities); + + // Check rs2,rs3 merged into rs1 + assertMergeOp(ss2.getClusteredVariantAccession(), ss1.getClusteredVariantAccession(), ASSEMBLY); + assertMergeOp(ss3.getClusteredVariantAccession(), ss1.getClusteredVariantAccession(), ASSEMBLY); + + // Check rs1 to rs3 all have same rs + assertRSAssociatedWithSS(1L, ss1); + assertRSAssociatedWithSS(1L, ss2); + assertRSAssociatedWithSS(1L, ss3); + } + private void assertMergeOp(Long mergee, Long merger, String assemblyToUse) { assertEquals(0, this.clusteredVariantAccessioningService .getAllActiveByAssemblyAndAccessionIn(assemblyToUse, Arrays.asList(mergee)).size()); diff --git a/eva-accession-clustering/src/test/java/uk/ac/ebi/eva/accession/clustering/batch/io/clustering_writer/MergeAccessionClusteringWriterTest.java b/eva-accession-clustering/src/test/java/uk/ac/ebi/eva/accession/clustering/batch/io/clustering_writer/MergeAccessionClusteringWriterTest.java index 6f3909312..b0ef3d4e2 100644 --- a/eva-accession-clustering/src/test/java/uk/ac/ebi/eva/accession/clustering/batch/io/clustering_writer/MergeAccessionClusteringWriterTest.java +++ b/eva-accession-clustering/src/test/java/uk/ac/ebi/eva/accession/clustering/batch/io/clustering_writer/MergeAccessionClusteringWriterTest.java @@ -498,87 +498,6 @@ public void do_not_merge_remapped_multimap_variants() throws Exception { assertAssembliesPresent(Sets.newTreeSet(asm1, asm2)); } - @Test - @DirtiesContext - public void 
do_not_merge_multimap_variants_into_remapped_variants() throws Exception { - // given - Long rs1 = 3000000000L; - Long rs2 = 3100000000L; - Long ssToRemap = 5000000000L; - Long ss2 = 5100000000L; - Long ss3 = 5200000000L; - String asm1 = "asm1"; - String asm2 = "asm2"; - assertDatabaseCounts(0, 0, 0, 0, 0, 0, 0, 0); - - mongoTemplate.insert(createClusteredVariantEntity(asm1, 100L, rs1, null), getClusteredTable(rs1)); - - mongoTemplate.insert(createSubmittedVariantEntity(asm1, 100L, rs1, ssToRemap, NOT_REMAPPED), - getSubmittedCollection(ssToRemap)); - - - // NOTE: rs2 won't be merged into rs1 because rs2 is multimap - mongoTemplate.insert(createClusteredVariantEntity(asm2, 200L, rs2, 3), getClusteredTable(rs2)); - mongoTemplate.insert(createClusteredVariantEntity(asm2, 300L, rs2, null), getClusteredTable(rs2)); - - mongoTemplate.insert(createSubmittedVariantEntity(asm2, 200L, rs2, ss2, NOT_REMAPPED, 3), - getSubmittedCollection(ss2)); - mongoTemplate.insert(createSubmittedVariantEntity(asm2, 300L, rs2, ss3, NOT_REMAPPED), - getSubmittedCollection(ss2)); - - assertDatabaseCounts(0, 3, 0, 0, - 0, 3, 0, 0); - - // when - SubmittedVariantEntity sve1Remapped = createSubmittedVariantEntity(asm2, 200L, rs1, ssToRemap, asm1); - this.clusterVariants(Collections.singletonList(sve1Remapped)); - - // then - assertDatabaseCounts(0, 3, 0, 0, - 0, 3, 0, 0); - - assertAssembliesPresent(Sets.newTreeSet(asm1, asm2)); - } - - @Test - @DirtiesContext - public void do_not_merge_remapped_variant_into_multimap_variants() throws Exception { - // given - Long rs1 = 3100000000L; - Long rs2 = 3000000000L; - Long ssToRemap = 5500000000L; - Long ss2 = 5100000000L; - Long ss3 = 5200000000L; - String asm1 = "asm1"; - String asm2 = "asm2"; - assertDatabaseCounts(0, 0, 0, 0, 0, 0, 0, 0); - - // NOTE rs1 won't be merged into rs2 because rs2 is multimap - mongoTemplate.insert(createClusteredVariantEntity(asm1, 100L, rs1, null), getClusteredTable(rs1)); - - 
mongoTemplate.insert(createSubmittedVariantEntity(asm1, 100L, rs1, ssToRemap, NOT_REMAPPED), getSubmittedCollection(ssToRemap)); - - - mongoTemplate.insert(createClusteredVariantEntity(asm2, 200L, rs2, 3), getClusteredTable(rs2)); - mongoTemplate.insert(createClusteredVariantEntity(asm2, 300L, rs2, 3), getClusteredTable(rs2)); - - mongoTemplate.insert(createSubmittedVariantEntity(asm2, 200L, rs2, ss2, NOT_REMAPPED), getSubmittedCollection(ss2)); - mongoTemplate.insert(createSubmittedVariantEntity(asm2, 300L, rs2, ss3, NOT_REMAPPED), getSubmittedCollection(ss2)); - - assertDatabaseCounts(0, 3, 0, 0, - 0, 3, 0, 0); - - // when - SubmittedVariantEntity sve1Remapped = createSubmittedVariantEntity(asm2, 200L, rs1, ssToRemap, asm1); - this.clusterVariants(Collections.singletonList(sve1Remapped)); - - // then - assertDatabaseCounts(0, 3, 0, 0, - 0, 3, 0, 0); - - assertAssembliesPresent(Sets.newTreeSet(asm1, asm2)); - } - @Test @DirtiesContext public void merge_into_remapped_multimap_variants_if_single_mapping_per_assembly()