Skip to content

Commit

Permalink
support parallel pruning for non-default datamaps
Browse files Browse the repository at this point in the history
  • Loading branch information
ajantha-bhat committed Nov 24, 2018
1 parent 8e776b2 commit 946c740
Show file tree
Hide file tree
Showing 9 changed files with 69 additions and 18 deletions.
Expand Up @@ -1408,7 +1408,7 @@ private CarbonCommonConstants() {
public static final String CARBON_MAX_DRIVER_THREADS_FOR_BLOCK_PRUNING_DEFAULT = "4";

// block prune in multi-thread if files size more than 100K files.
public static final int CARBON_DRIVER_PRUNING_MULTI_THREAD_ENABLE_FILES_COUNT = 100000;
public static final int CARBON_DRIVER_PRUNING_MULTI_THREAD_ENABLE_FILES_COUNT = 1;

//////////////////////////////////////////////////////////////////////////////////////////
// Datamap parameter start here
Expand Down
Expand Up @@ -41,7 +41,6 @@
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
import org.apache.carbondata.core.indexstore.blockletindex.BlockDataMap;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
import org.apache.carbondata.core.scan.expression.Expression;
Expand Down Expand Up @@ -145,20 +144,11 @@ public List<ExtendedBlocklet> prune(List<Segment> segments, final FilterResolver
// for filter queries
int totalFiles = 0;
int datamapsCount = 0;
boolean isBlockDataMapType = true;
for (Segment segment : segments) {
for (DataMap dataMap : dataMaps.get(segment)) {
if (!(dataMap instanceof BlockDataMap)) {
isBlockDataMapType = false;
break;
}
totalFiles += ((BlockDataMap) dataMap).getTotalBlocks();
totalFiles += dataMap.getNumberOfEntries();
datamapsCount++;
}
if (!isBlockDataMapType) {
// totalFiles fill be 0 for non-BlockDataMap Type. ex: lucene, bloom datamap. use old flow.
break;
}
}
int numOfThreadsForPruning = getNumOfThreadsForPruning();
if (numOfThreadsForPruning == 1 || datamapsCount < numOfThreadsForPruning || totalFiles
Expand Down Expand Up @@ -205,26 +195,53 @@ private List<ExtendedBlocklet> pruneWithFilterMultiThread(List<Segment> segments
final FilterResolverIntf filterExp, final List<PartitionSpec> partitions,
List<ExtendedBlocklet> blocklets, final Map<Segment, List<DataMap>> dataMaps,
int totalFiles) {
/*
*********************************************************************************
* Below is the example of how this part of code works.
* consider a scenario of having 5 segments, 10 datamaps in each segment,
* and each datamap has one record. So total 50 records.
*
* Datamaps in each segment looks like below.
* s0 [0-9], s1 [0-9], s2 [0-9], s3[0-9], s4[0-9]
*
* If number of threads are 4. so filesPerEachThread = 50/4 = 12 files per each thread.
*
* SegmentDataMapGroup look like below: [SegmentId, fromIndex, toIndex]
* In each segment only those datamaps are processed between fromIndex and toIndex.
*
* Final result will be: (4 list created as numOfThreadsForPruning is 4)
* Thread1 list: s0 [0-9], s1 [0-1] : 12 files
* Thread2 list: s1 [2-9], s2 [0-3] : 12 files
* Thread3 list: s2 [4-9], s3 [0-5] : 12 files
* Thread4 list: s3 [6-9], s4 [0-9] : 14 files
* so each thread will process almost equal number of records.
*
*********************************************************************************
*/

int numOfThreadsForPruning = getNumOfThreadsForPruning();
LOG.info(
"number of threads selected for multi-thread block pruning is: " + numOfThreadsForPruning);
int filesPerEachThread = totalFiles / numOfThreadsForPruning;
int prev;
int filesCount = 0;
int processedFileCount = 0;
List<List<SegmentDataMapGroup>> segmentList = new ArrayList<>(numOfThreadsForPruning);
List<List<SegmentDataMapGroup>> datamapListForEachThread =
new ArrayList<>(numOfThreadsForPruning);
List<SegmentDataMapGroup> segmentDataMapGroupList = new ArrayList<>();
for (Segment segment : segments) {
List<DataMap> eachSegmentDataMapList = dataMaps.get(segment);
prev = 0;
for (int i = 0; i < eachSegmentDataMapList.size(); i++) {
DataMap dataMap = eachSegmentDataMapList.get(i);
filesCount += ((BlockDataMap) dataMap).getTotalBlocks();
filesCount += dataMap.getNumberOfEntries();
if (filesCount >= filesPerEachThread) {
if (segmentList.size() != numOfThreadsForPruning - 1) {
if (datamapListForEachThread.size() != numOfThreadsForPruning - 1) {
// not the last segmentList
segmentDataMapGroupList.add(new SegmentDataMapGroup(segment, prev, i));
// save the last value to process in next thread
prev = i + 1;
segmentList.add(segmentDataMapGroupList);
datamapListForEachThread.add(segmentDataMapGroupList);
segmentDataMapGroupList = new ArrayList<>();
processedFileCount += filesCount;
filesCount = 0;
Expand All @@ -243,7 +260,7 @@ private List<ExtendedBlocklet> pruneWithFilterMultiThread(List<Segment> segments
}
}
// adding the last segmentList data
segmentList.add(segmentDataMapGroupList);
datamapListForEachThread.add(segmentDataMapGroupList);
processedFileCount += filesCount;
if (processedFileCount != totalFiles) {
// this should not happen
Expand All @@ -254,7 +271,7 @@ private List<ExtendedBlocklet> pruneWithFilterMultiThread(List<Segment> segments
final ExecutorService executorService = Executors.newFixedThreadPool(numOfThreadsForPruning);
final String threadName = Thread.currentThread().getName();
for (int i = 0; i < numOfThreadsForPruning; i++) {
final List<SegmentDataMapGroup> segmentDataMapGroups = segmentList.get(i);
final List<SegmentDataMapGroup> segmentDataMapGroups = datamapListForEachThread.get(i);
results.add(executorService.submit(new Callable<Void>() {
@Override public Void call() throws IOException {
Thread.currentThread().setName(threadName);
Expand Down
Expand Up @@ -70,4 +70,6 @@ List<T> prune(Expression filter, SegmentProperties segmentProperties,
*/
void finish();

// can return , number of records information that are stored in datamap.
int getNumberOfEntries();
}
Expand Up @@ -1039,4 +1039,18 @@ public void setSegmentPropertiesIndex(int segmentPropertiesIndex) {
public int getSegmentPropertiesIndex() {
return segmentPropertiesIndex;
}

@Override public int getNumberOfEntries() {
if (memoryDMStore != null) {
if (memoryDMStore.getRowCount() == 0) {
// so that one datamap considered as one record
return 1;
} else {
return memoryDMStore.getRowCount();
}
} else {
// legacy store
return 1;
}
}
}
Expand Up @@ -436,4 +436,9 @@ public String toString() {
public void finish() {

}

@Override public int getNumberOfEntries() {
// keep default, one record in one datamap
return 1;
}
}
Expand Up @@ -175,4 +175,8 @@ public void finish() {

}

@Override public int getNumberOfEntries() {
// keep default, one record in one datamap
return 1;
}
}
Expand Up @@ -440,4 +440,9 @@ public void finish() {
}
}
}

@Override public int getNumberOfEntries() {
// keep default, one record in one datamap
return 1;
}
}
Expand Up @@ -251,6 +251,8 @@ class CGDataMap extends CoarseGrainDataMap {
override def finish() = {
???
}

override def getNumberOfEntries: Int = 1
}

class CGDataMapWriter(
Expand Down
Expand Up @@ -274,6 +274,8 @@ class FGDataMap extends FineGrainDataMap {
override def finish() = {

}

override def getNumberOfEntries: Int = 1
}

class FGDataMapWriter(carbonTable: CarbonTable,
Expand Down

0 comments on commit 946c740

Please sign in to comment.