From 8488d97ecab5d06e5e9738b2aa5a9941c1e54819 Mon Sep 17 00:00:00 2001 From: nkumar2 Date: Mon, 29 Apr 2024 11:57:10 +0100 Subject: [PATCH] review comments incorporated --- .../clustering/batch/io/RSMergeWriter.java | 74 ++++++------------- 1 file changed, 21 insertions(+), 53 deletions(-) diff --git a/eva-accession-clustering/src/main/java/uk/ac/ebi/eva/accession/clustering/batch/io/RSMergeWriter.java b/eva-accession-clustering/src/main/java/uk/ac/ebi/eva/accession/clustering/batch/io/RSMergeWriter.java index 069565f06..d2f4a365a 100644 --- a/eva-accession-clustering/src/main/java/uk/ac/ebi/eva/accession/clustering/batch/io/RSMergeWriter.java +++ b/eva-accession-clustering/src/main/java/uk/ac/ebi/eva/accession/clustering/batch/io/RSMergeWriter.java @@ -194,20 +194,7 @@ public void writeRSMerge(SubmittedVariantOperationEntity currentOperation) ClusteredVariantEntity mergeDestination = mergeDestinationAndMergees.getLeft(); List mergees = mergeDestinationAndMergees.getRight(); - if (mergees.size() > 1) { - // check if any variant with same hash as mergeDestination already exist in DB - ClusteredVariantEntity existingClusteredVariantEntity = getClusteredVariantEntityWithHash(mergeDestination); - if (existingClusteredVariantEntity != null && existingClusteredVariantEntity.getAccession() != mergeDestination.getAccession()) { - if (mergeDestination.getAccession() == ClusteredVariantMergingPolicy.prioritise(mergeDestination.getAccession(), - existingClusteredVariantEntity.getAccession()).accessionToKeep) { - merge(mergeDestination, existingClusteredVariantEntity, currentOperation); - mergees = mergees.stream() - .filter(cve -> !(cve.equals(existingClusteredVariantEntity) - && cve.getAccession() == existingClusteredVariantEntity.getAccession())) - .collect(Collectors.toList()); - } - } - } + removeMergeesAndInsertMergeDestination(mergeDestination, mergees); for (ClusteredVariantEntity mergee: mergees) { logger.info("RS merge operation: Merging rs{} to rs{} due to hash collision...", @@ -216,17 +203,27 @@ public void writeRSMerge(SubmittedVariantOperationEntity currentOperation) } } - private ClusteredVariantEntity getClusteredVariantEntityWithHash(ClusteredVariantEntity cve) { - ClusteredVariantEntity existingClusteredVariantEntity = null; - existingClusteredVariantEntity = mongoTemplate.findOne(query(where(ID_ATTRIBUTE).is(cve.getHashedMessage())), - ClusteredVariantEntity.class); - if (existingClusteredVariantEntity != null) { - return existingClusteredVariantEntity; - } else { - existingClusteredVariantEntity = mongoTemplate.findOne(query(where(ID_ATTRIBUTE).is(cve.getHashedMessage())), - DbsnpClusteredVariantEntity.class); + private void removeMergeesAndInsertMergeDestination(ClusteredVariantEntity mergeDestination, + List mergeeList) { + List mergeeAccList = mergeeList.stream().map(cve->cve.getAccession()).collect(Collectors.toList()); + Query queryForMergee = query(where(ID_ATTRIBUTE).is(mergeeList.get(0).getHashedMessage()) + .and(ACCESSION_ATTRIBUTE).in(mergeeAccList)) + .addCriteria(where(REFERENCE_ASSEMBLY_FIELD_IN_CLUSTERED_VARIANT_COLLECTION).is(this.assemblyAccession)); + mongoTemplate.remove(queryForMergee, ClusteredVariantEntity.class); + mongoTemplate.remove(queryForMergee, DbsnpClusteredVariantEntity.class); + + metricCompute.addCount(ClusteringMetric.CLUSTERED_VARIANTS_UPDATED, mergeeList.size()); + + Query queryForMergeDestination = query(where(ID_ATTRIBUTE).is(mergeDestination.getHashedMessage()) + .and(ACCESSION_ATTRIBUTE).is(mergeDestination.getAccession())) + .addCriteria(where(REFERENCE_ASSEMBLY_FIELD_IN_CLUSTERED_VARIANT_COLLECTION).is(this.assemblyAccession)); + List clusteredVariantToKeep = + mongoTemplate.find(queryForMergeDestination, clusteringWriter.getClusteredVariantCollection( + mergeDestination.getAccession())); + if (clusteredVariantToKeep.isEmpty()) { + // Insert RS record for destination RS + insertRSRecordForMergeDestination(mergeDestination); } - return existingClusteredVariantEntity; } private ImmutablePair getHashedMessageAndAccessionForSVIE(SubmittedVariantInactiveEntity svie) { @@ -293,35 +290,6 @@ protected void merge(ClusteredVariantEntity mergeDestination, ClusteredVariantEn Long accessionToBeMerged = mergee.getAccession(); Long accessionToKeep = mergeDestination.getAccession(); - //Confine merge updates to the particular assembly where clustering is being performed - Query queryForMergee = query(where(ACCESSION_ATTRIBUTE).is(accessionToBeMerged)) - .addCriteria( - where(REFERENCE_ASSEMBLY_FIELD_IN_CLUSTERED_VARIANT_COLLECTION).is(this.assemblyAccession)); - Query queryForMergeTarget = query(where(ACCESSION_ATTRIBUTE).is(accessionToKeep)) - .addCriteria( - where(REFERENCE_ASSEMBLY_FIELD_IN_CLUSTERED_VARIANT_COLLECTION).is(this.assemblyAccession)); - - List clusteredVariantToMerge = - mongoTemplate.find(queryForMergee, - clusteringWriter.getClusteredVariantCollection(accessionToBeMerged)); - - List clusteredVariantToKeep = - mongoTemplate.find(queryForMergeTarget, clusteringWriter.getClusteredVariantCollection( - accessionToKeep)); - - if (clusteringWriter.isMultimap(clusteredVariantToMerge) || clusteringWriter.isMultimap(clusteredVariantToKeep)) { - // multimap! don't merge. see isMultimap() below for more details - return; - } - - // Mergee is no longer valid to be present in the main clustered variant collection - mongoTemplate.remove(queryForMergee, clusteringWriter.getClusteredVariantCollection(accessionToBeMerged)); - metricCompute.addCount(ClusteringMetric.CLUSTERED_VARIANTS_UPDATED, clusteredVariantToMerge.size()); - - if (clusteredVariantToKeep.isEmpty()) { - // Insert RS record for destination RS - insertRSRecordForMergeDestination(mergeDestination); - } // Record merge operation insertMergeOperation(mergeDestination, mergee);