Skip to content

Commit

Permalink
review comments incorporated
Browse files Browse the repository at this point in the history
  • Loading branch information
nitin-ebi committed Apr 29, 2024
1 parent dab4e0f commit 8488d97
Showing 1 changed file with 21 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -194,20 +194,7 @@ public void writeRSMerge(SubmittedVariantOperationEntity currentOperation)
ClusteredVariantEntity mergeDestination = mergeDestinationAndMergees.getLeft();
List<ClusteredVariantEntity> mergees = mergeDestinationAndMergees.getRight();

if (mergees.size() > 1) {
// check if any variant with same hash as mergeDestination already exist in DB
ClusteredVariantEntity existingClusteredVariantEntity = getClusteredVariantEntityWithHash(mergeDestination);
if (existingClusteredVariantEntity != null && existingClusteredVariantEntity.getAccession() != mergeDestination.getAccession()) {
if (mergeDestination.getAccession() == ClusteredVariantMergingPolicy.prioritise(mergeDestination.getAccession(),
existingClusteredVariantEntity.getAccession()).accessionToKeep) {
merge(mergeDestination, existingClusteredVariantEntity, currentOperation);
mergees = mergees.stream()
.filter(cve -> !(cve.equals(existingClusteredVariantEntity)
&& cve.getAccession() == existingClusteredVariantEntity.getAccession()))
.collect(Collectors.toList());
}
}
}
removeMergeesAndInsertMergeDestination(mergeDestination, mergees);

for (ClusteredVariantEntity mergee: mergees) {
logger.info("RS merge operation: Merging rs{} to rs{} due to hash collision...",
Expand All @@ -216,17 +203,27 @@ public void writeRSMerge(SubmittedVariantOperationEntity currentOperation)
}
}

private ClusteredVariantEntity getClusteredVariantEntityWithHash(ClusteredVariantEntity cve) {
ClusteredVariantEntity existingClusteredVariantEntity = null;
existingClusteredVariantEntity = mongoTemplate.findOne(query(where(ID_ATTRIBUTE).is(cve.getHashedMessage())),
ClusteredVariantEntity.class);
if (existingClusteredVariantEntity != null) {
return existingClusteredVariantEntity;
} else {
existingClusteredVariantEntity = mongoTemplate.findOne(query(where(ID_ATTRIBUTE).is(cve.getHashedMessage())),
DbsnpClusteredVariantEntity.class);
private void removeMergeesAndInsertMergeDestination(ClusteredVariantEntity mergeDestination,
List<ClusteredVariantEntity> mergeeList) {
List<Long> mergeeAccList = mergeeList.stream().map(cve->cve.getAccession()).collect(Collectors.toList());
Query queryForMergee = query(where(ID_ATTRIBUTE).is(mergeeList.get(0).getHashedMessage())
.and(ACCESSION_ATTRIBUTE).in(mergeeAccList))
.addCriteria(where(REFERENCE_ASSEMBLY_FIELD_IN_CLUSTERED_VARIANT_COLLECTION).is(this.assemblyAccession));
mongoTemplate.remove(queryForMergee, ClusteredVariantEntity.class);
mongoTemplate.remove(queryForMergee, DbsnpClusteredVariantEntity.class);

metricCompute.addCount(ClusteringMetric.CLUSTERED_VARIANTS_UPDATED, mergeeList.size());

Query queryForMergeDestination = query(where(ID_ATTRIBUTE).is(mergeDestination.getHashedMessage())
.and(ACCESSION_ATTRIBUTE).is(mergeDestination.getAccession()))
.addCriteria(where(REFERENCE_ASSEMBLY_FIELD_IN_CLUSTERED_VARIANT_COLLECTION).is(this.assemblyAccession));
List<? extends ClusteredVariantEntity> clusteredVariantToKeep =
mongoTemplate.find(queryForMergeDestination, clusteringWriter.getClusteredVariantCollection(
mergeDestination.getAccession()));
if (clusteredVariantToKeep.isEmpty()) {
// Insert RS record for destination RS
insertRSRecordForMergeDestination(mergeDestination);
}
return existingClusteredVariantEntity;
}

private ImmutablePair<String, Long> getHashedMessageAndAccessionForSVIE(SubmittedVariantInactiveEntity svie) {
Expand Down Expand Up @@ -293,35 +290,6 @@ protected void merge(ClusteredVariantEntity mergeDestination, ClusteredVariantEn
Long accessionToBeMerged = mergee.getAccession();
Long accessionToKeep = mergeDestination.getAccession();

//Confine merge updates to the particular assembly where clustering is being performed
Query queryForMergee = query(where(ACCESSION_ATTRIBUTE).is(accessionToBeMerged))
.addCriteria(
where(REFERENCE_ASSEMBLY_FIELD_IN_CLUSTERED_VARIANT_COLLECTION).is(this.assemblyAccession));
Query queryForMergeTarget = query(where(ACCESSION_ATTRIBUTE).is(accessionToKeep))
.addCriteria(
where(REFERENCE_ASSEMBLY_FIELD_IN_CLUSTERED_VARIANT_COLLECTION).is(this.assemblyAccession));

List<? extends ClusteredVariantEntity> clusteredVariantToMerge =
mongoTemplate.find(queryForMergee,
clusteringWriter.getClusteredVariantCollection(accessionToBeMerged));

List<? extends ClusteredVariantEntity> clusteredVariantToKeep =
mongoTemplate.find(queryForMergeTarget, clusteringWriter.getClusteredVariantCollection(
accessionToKeep));

if (clusteringWriter.isMultimap(clusteredVariantToMerge) || clusteringWriter.isMultimap(clusteredVariantToKeep)) {
// multimap! don't merge. see isMultimap() below for more details
return;
}

// Mergee is no longer valid to be present in the main clustered variant collection
mongoTemplate.remove(queryForMergee, clusteringWriter.getClusteredVariantCollection(accessionToBeMerged));
metricCompute.addCount(ClusteringMetric.CLUSTERED_VARIANTS_UPDATED, clusteredVariantToMerge.size());

if (clusteredVariantToKeep.isEmpty()) {
// Insert RS record for destination RS
insertRSRecordForMergeDestination(mergeDestination);
}
// Record merge operation
insertMergeOperation(mergeDestination, mergee);

Expand Down

0 comments on commit 8488d97

Please sign in to comment.