[CARBONDATA-2895] Fix Query result count is more than actual csv rows with Batch-sort in save to disk (sort temp files) scenario

problem: Query result mismatch with Batch-sort in the save-to-disk (sort
temp files) scenario.

scenario:
a) Configure batch sort with a batch size larger than
UnsafeMemoryManager.INSTANCE.getUsableMemory().
b) Load data that is larger than the batch size. Observe that sort data
is saved to disk, because the unsafe memory manager cannot process one
batch in memory.
c) The load therefore happens in 2 batches.
d) Query the table. The result contains more rows than the expected
number of data rows.

root cause:

createSortDataRows() is called once for each batch.
Files saved to disk while sorting the previous batch were also
considered when merging the current batch.

solution:
Files saved to disk while sorting the previous batch should not be
considered for the current batch.
Hence, use the batch ID as the rangeId field of the sort temp files,
so that getFilesToMergeSort() selects only the files of the current batch.

This closes #2664
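
The idea behind the fix can be sketched in isolation. The example below is a simplified model, not CarbonData code: `BatchTempFileDemo`, `TempFile`, `spill`, `rowsMergedWithoutFilter` and `rowsMergedForBatch` are hypothetical names, and an in-memory list stands in for the sort temp directory. It only illustrates why tagging each spilled file with a per-batch ID keeps rows from an earlier batch out of a later merge.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model of batch sort spilling to temp files (hypothetical, not CarbonData code).
class BatchTempFileDemo {

    // Stand-in for one sort temp file: the batch that wrote it and the rows it holds.
    static final class TempFile {
        final int rangeId;
        final int rows;

        TempFile(int rangeId, int rows) {
            this.rangeId = rangeId;
            this.rows = rows;
        }
    }

    // Stand-in for the shared sort temp directory; files from all batches end up here.
    private final List<TempFile> tempDir = new ArrayList<>();

    // Incremented once per batch, mirroring the batchId counter added in this commit.
    private final AtomicInteger batchId = new AtomicInteger(0);

    int startBatch() {
        return batchId.incrementAndGet();
    }

    void spill(int rangeId, int rows) {
        tempDir.add(new TempFile(rangeId, rows));
    }

    // Behaviour before the fix: every file in the directory is merged.
    int rowsMergedWithoutFilter() {
        int total = 0;
        for (TempFile f : tempDir) {
            total += f.rows;
        }
        return total;
    }

    // Behaviour after the fix: only files tagged with this batch's ID are merged.
    int rowsMergedForBatch(int rangeId) {
        int total = 0;
        for (TempFile f : tempDir) {
            if (f.rangeId == rangeId) {
                total += f.rows;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        BatchTempFileDemo demo = new BatchTempFileDemo();

        int first = demo.startBatch();
        demo.spill(first, 100);
        int firstMerged = demo.rowsMergedForBatch(first);

        int second = demo.startBatch();
        demo.spill(second, 50);

        // Without the filter, the second merge also reads the first batch's file.
        System.out.println("unfiltered total: " + (firstMerged + demo.rowsMergedWithoutFilter()));
        System.out.println("filtered total:   " + (firstMerged + demo.rowsMergedForBatch(second)));
    }
}
```

With 100 rows spilled in the first batch and 50 in the second, the unfiltered merge reports 250 rows in total while the filtered merge reports the 150 rows that were actually loaded, which matches the symptom described in the scenario.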
ajantha-bhat authored and kumarvishal09 committed Sep 5, 2018
1 parent 94d2089 commit 50248f5
Showing 1 changed file with 13 additions and 2 deletions.
@@ -62,12 +62,17 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter

private AtomicLong rowCounter;

+ /* will be incremented for each batch. This ID is used in sort temp files name,
+ to identify files of that batch */
+ private AtomicInteger batchId;

public UnsafeBatchParallelReadMergeSorterImpl(AtomicLong rowCounter) {
this.rowCounter = rowCounter;
}

@Override public void initialize(SortParameters sortParameters) {
this.sortParameters = sortParameters;
+ batchId = new AtomicInteger(0);

}

@@ -172,7 +177,7 @@ public void run() {

}

- private static class SortBatchHolder
+ private class SortBatchHolder
extends CarbonIterator<UnsafeSingleThreadFinalSortFilesMerger> {

private SortParameters sortParameters;
@@ -193,7 +198,7 @@ private static class SortBatchHolder

private final Object lock = new Object();

- public SortBatchHolder(SortParameters sortParameters, int numberOfThreads,
+ SortBatchHolder(SortParameters sortParameters, int numberOfThreads,
ThreadStatusObserver threadStatusObserver) {
this.sortParameters = sortParameters.getCopy();
this.iteratorCount = new AtomicInteger(numberOfThreads);
@@ -203,6 +208,12 @@ public SortBatchHolder(SortParameters sortParameters, int numberOfThreads,
}

private void createSortDataRows() {
+ // For each batch, createSortDataRows() will be called.
+ // Files saved to disk during sorting of the previous batch should not be considered
+ // for this batch.
+ // Hence use batchId as the rangeId field of the sort temp files,
+ // so getFilesToMergeSort() will select only the files of this batch.
+ this.sortParameters.setRangeId(batchId.incrementAndGet());
int inMemoryChunkSizeInMB = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
setTempLocation(sortParameters);
this.finalMerger = new UnsafeSingleThreadFinalSortFilesMerger(sortParameters,