Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix batching of mdm-clear tasks #3295

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -0,0 +1,5 @@
---
type: fix
issue: 3295
title: "When a $mdm-clear operation batch was split across multiple threads, ResourceVersionConflictExceptions were
thrown. This issue has been fixed."
Expand Up @@ -676,8 +676,8 @@ This operation takes two optional Parameters.
<td>Integer</td>
<td>0..1</td>
<td>
The number of links that should be deleted at a time. If omitted, then the batch size will be determined by the value
of [Expunge Batch Size](/apidocs/hapi-fhir-storage/ca/uhn/fhir/jpa/api/config/DaoConfig.html#getExpungeBatchSize())
The number of links that should be deleted at a time. If omitted, then the batch size will be determined by the value
of [Reindex Batch Size](/apidocs/hapi-fhir-storage/ca/uhn/fhir/jpa/api/config/DaoConfig.html#getReindexBatchSize())
property.
</td>
</tr>
Expand Down
Expand Up @@ -58,7 +58,7 @@ public class MdmLinkDeleter implements ItemProcessor<List<Long>, List<Long>> {
/**
 * Processes one chunk of pids by splitting it into partitions and removing the links for each
 * partition on a worker thread.
 *
 * <p>The partition size and the number of worker threads are read from {@code myDaoConfig}
 * ({@code getReindexBatchSize()} and {@code getReindexThreadCount()}). Each partition is handed
 * to {@code removeLinks} together with a shared, thread-safe aggregator queue.
 *
 * @param thePidList the pids to process; wrapped in a {@code SliceImpl} before partitioning
 * @return a new list holding everything the workers added to the aggregator
 *         (presumably golden resource pids, going by the variable name; confirm against {@code removeLinks})
 * @throws Exception propagated from the partitioned execution, if any
 */
public List<Long> process(List<Long> thePidList) throws Exception {
	// A concurrent queue is used because several partitions may append to it at the same time.
	ConcurrentLinkedQueue<Long> goldenPidAggregator = new ConcurrentLinkedQueue<>();
	PartitionRunner partitionRunner = new PartitionRunner(PROCESS_NAME, THREAD_PREFIX, myDaoConfig.getReindexBatchSize(), myDaoConfig.getReindexThreadCount());
	// Each worker must receive only its own partition (pids), never the full thePidList.
	// Passing the full list made every thread work on every pid, which is what produced
	// ResourceVersionConflictExceptions when a batch was split across threads.
	partitionRunner.runInPartitionedThreads(new SliceImpl<>(thePidList), pids -> removeLinks(pids, goldenPidAggregator));
	return new ArrayList<>(goldenPidAggregator);
}

Expand Down
Expand Up @@ -95,7 +95,11 @@ public void runInPartitionedThreads(Slice<Long> theResourceIds, Consumer<List<Lo
private List<Callable<Void>> buildCallableTasks(Slice<Long> theResourceIds, Consumer<List<Long>> partitionConsumer) {
List<Callable<Void>> retval = new ArrayList<>();

ourLog.info("Splitting batch job of {} entries into chunks of {}", theResourceIds.getContent().size(), myBatchSize);
if (myBatchSize > theResourceIds.getContent().size()) {
ourLog.info("Splitting batch job of {} entries into chunks of {}", theResourceIds.getContent().size(), myBatchSize);
} else {
ourLog.info("Creating batch job of {} entries", theResourceIds.getContent().size());
}
List<List<Long>> partitions = Lists.partition(theResourceIds.getContent(), myBatchSize);

for (List<Long> nextPartition : partitions) {
Expand Down
@@ -0,0 +1,63 @@
package ca.uhn.fhir.jpa.batch.mdm.job;

import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.dao.data.IMdmLinkDao;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.transaction.PlatformTransactionManager;

import java.util.ArrayList;
import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

/**
 * Unit test for {@link MdmLinkDeleter} verifying that a pid list is split into per-thread
 * partitions rather than being handed in full to every worker.
 */
@ExtendWith(MockitoExtension.class)
public class MdmLinkDeleterTest {
	@Mock
	private DaoConfig myDaoConfig;
	@Mock
	private PlatformTransactionManager myPlatformTransactionManager;
	@Mock
	private IMdmLinkDao myMdmLinkDao;

	@Captor
	private ArgumentCaptor<List<Long>> myPidListCaptor;

	@InjectMocks
	private MdmLinkDeleter myMdmLinkDeleter;

	/**
	 * Feeds exactly {@code threadCount * batchSize} pids through the deleter and checks that the
	 * DAO is queried once per partition, that every partition has {@code batchSize} entries, and
	 * that the partitions together contain each input pid exactly once.
	 */
	@Test
	public void testMdmLinkDeleterSplitsPidList() throws Exception {
		int threadCount = 4;
		int batchSize = 5;
		when(myDaoConfig.getReindexBatchSize()).thenReturn(batchSize);
		when(myDaoConfig.getReindexThreadCount()).thenReturn(threadCount);

		// Pids 0..count-1 in ascending order, so the sorted union can be compared directly below.
		List<Long> allPidsList = new ArrayList<>();
		int count = threadCount * batchSize;
		for (long i = 0; i < count; ++i) {
			allPidsList.add(i);
		}
		myMdmLinkDeleter.process(allPidsList);

		verify(myMdmLinkDao, times(threadCount)).findAllById(myPidListCaptor.capture());
		verify(myMdmLinkDao, times(threadCount)).deleteAll(anyList());
		verifyNoMoreInteractions(myMdmLinkDao);

		List<List<Long>> pidListList = myPidListCaptor.getAllValues();
		assertEquals(threadCount, pidListList.size());

		List<Long> seenPids = new ArrayList<>();
		for (List<Long> pidList : pidListList) {
			assertEquals(batchSize, pidList.size());
			seenPids.addAll(pidList);
		}

		// Size checks alone would still pass if every worker received the same pids.
		// Capture order depends on thread scheduling, so sort before comparing.
		seenPids.sort(Long::compare);
		assertEquals(allPidsList, seenPids);
	}

}