Skip to content

Commit

Permalink
Fix batching of mdm-clear tasks (#3295)
Browse files Browse the repository at this point in the history
* Fix batching of mdm-clear tasks.

* Fix batching of mdm-clear tasks.

Co-authored-by: ianmarshall <ian@simpatico.ai>
  • Loading branch information
IanMMarshall and ianmarshall committed Jan 13, 2022
1 parent 55ad98d commit e6fd5c9
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 4 deletions.
@@ -0,0 +1,5 @@
---
type: fix
issue: 3295
title: "When $mdm-clear operation batch was split into multiple threads, ResourceVersionConflictExceptions were being
thrown. This issue has been fixed."
Expand Up @@ -676,8 +676,8 @@ This operation takes two optional Parameters.
<td>Integer</td>
<td>0..1</td>
<td>
The number of links that should be deleted at a time. If ommitted, then the batch size will be determined by the value
of [Expunge Batch Size](/apidocs/hapi-fhir-storage/ca/uhn/fhir/jpa/api/config/DaoConfig.html#getExpungeBatchSize())
The number of links that should be deleted at a time. If omitted, then the batch size will be determined by the value
of [Reindex Batch Size](/apidocs/hapi-fhir-storage/ca/uhn/fhir/jpa/api/config/DaoConfig.html#getReindexBatchSize())
property.
</td>
</tr>
Expand Down
Expand Up @@ -58,7 +58,7 @@ public class MdmLinkDeleter implements ItemProcessor<List<Long>, List<Long>> {
public List<Long> process(List<Long> thePidList) throws Exception {
ConcurrentLinkedQueue<Long> goldenPidAggregator = new ConcurrentLinkedQueue<>();
PartitionRunner partitionRunner = new PartitionRunner(PROCESS_NAME, THREAD_PREFIX, myDaoConfig.getReindexBatchSize(), myDaoConfig.getReindexThreadCount());
partitionRunner.runInPartitionedThreads(new SliceImpl<>(thePidList), pids -> removeLinks(thePidList, goldenPidAggregator));
partitionRunner.runInPartitionedThreads(new SliceImpl<>(thePidList), pids -> removeLinks(pids, goldenPidAggregator));
return new ArrayList<>(goldenPidAggregator);
}

Expand Down
Expand Up @@ -95,7 +95,11 @@ public void runInPartitionedThreads(Slice<Long> theResourceIds, Consumer<List<Lo
private List<Callable<Void>> buildCallableTasks(Slice<Long> theResourceIds, Consumer<List<Long>> partitionConsumer) {
List<Callable<Void>> retval = new ArrayList<>();

ourLog.info("Splitting batch job of {} entries into chunks of {}", theResourceIds.getContent().size(), myBatchSize);
if (myBatchSize > theResourceIds.getContent().size()) {
ourLog.info("Splitting batch job of {} entries into chunks of {}", theResourceIds.getContent().size(), myBatchSize);
} else {
ourLog.info("Creating batch job of {} entries", theResourceIds.getContent().size());
}
List<List<Long>> partitions = Lists.partition(theResourceIds.getContent(), myBatchSize);

for (List<Long> nextPartition : partitions) {
Expand Down
@@ -0,0 +1,63 @@
package ca.uhn.fhir.jpa.batch.mdm.job;

import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.dao.data.IMdmLinkDao;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.transaction.PlatformTransactionManager;

import java.util.ArrayList;
import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
public class MdmLinkDeleterTest {
@Mock
private DaoConfig myDaoConfig;
@Mock
private PlatformTransactionManager myPlatformTransactionManager;
@Mock
private IMdmLinkDao myMdmLinkDao;

@Captor
ArgumentCaptor<List<Long>> myPidListCaptor;

@InjectMocks
private MdmLinkDeleter myMdmLinkDeleter;

@Test
public void testMdmLinkDeleterSplitsPidList() throws Exception {
int threadCount = 4;
int batchSize = 5;
when(myDaoConfig.getReindexBatchSize()).thenReturn(batchSize);
when(myDaoConfig.getReindexThreadCount()).thenReturn(threadCount);

List<Long> allPidsList = new ArrayList<>();
int count = threadCount*batchSize;
for (long i = 0; i < count; ++i) {
allPidsList.add(i);
}
myMdmLinkDeleter.process(allPidsList);

verify(myMdmLinkDao, times(threadCount)).findAllById(myPidListCaptor.capture());
verify(myMdmLinkDao, times(threadCount)).deleteAll(anyList());
verifyNoMoreInteractions(myMdmLinkDao);
List<List<Long>> pidListList = myPidListCaptor.getAllValues();
assertEquals(threadCount, pidListList.size());
for (List<Long> pidList : pidListList) {
assertEquals(batchSize, pidList.size());
}
}

}

0 comments on commit e6fd5c9

Please sign in to comment.