[CARBONDATA-3718] Support SegmentLevel MinMax for better Pruning and less driver memory usage for cache #3584
Conversation
Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1681/
Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1683/
core/src/main/java/org/apache/carbondata/core/util/SegmentMinMax.java
Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1735/
Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1756/
Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1783/
Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1786/
Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.4/6/
f34d7d2 to 46d59d0
Build Failed with Spark 2.4.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.4/288/
Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1992/
Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1990/
public static final String CARBON_LOAD_ALL_INDEX_TO_CACHE = "carbon.load.all.indexes.to.cache";

public static final String CARBON_LOAD_ALL_INDEX_TO_CACHE_DEFAULT = "true";
Add a comment explaining when to set this to false.
done
@@ -2333,4 +2333,9 @@ private CarbonCommonConstants() {
 * Default first day of week
 */
public static final String CARBON_TIMESERIES_FIRST_DAY_OF_WEEK_DEFAULT = "SUNDAY";
add comment
@@ -85,6 +86,8 @@
 */
private transient Map<String, String> options;

private List<SegmentMinMax> segmentMinMax;
Add a comment describing what it stores.
done
import org.apache.carbondata.core.util.SegmentMinMax;

public class SegmentBlockInfo {
Add class-level and variable-level comments.
private SegmentMinMaxStats() {
}

public static SegmentMinMaxStats getInstance() {
In getInstance, create the object and map only once, then reuse them for each load; just clear the map entries once they are filled into the accumulator.
done
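The reviewer's suggestion can be sketched as a lazily reusable singleton: the instance and its map are allocated once, and per-load cleanup only clears the map entries. This is an illustrative sketch with simplified types, not the actual CarbonData class:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the suggested pattern: one instance, one map, created once;
// cleanup between loads clears the entries so both objects are reused.
class SegmentMinMaxStatsSketch {
  private static final SegmentMinMaxStatsSketch INSTANCE = new SegmentMinMaxStatsSketch();
  private final Map<String, List<String>> segmentMinMaxMap = new HashMap<>();

  private SegmentMinMaxStatsSketch() {
  }

  static SegmentMinMaxStatsSketch getInstance() {
    return INSTANCE;
  }

  Map<String, List<String>> getSegmentMinMaxMap() {
    return segmentMinMaxMap;
  }

  // Called after the entries are copied into the accumulator:
  // clear the contents but keep the map object for the next load.
  void clearEntries() {
    segmentMinMaxMap.clear();
  }
}
```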
  }
}

public void clear() {
this method can be removed
segmentMinMaxList.add(new SegmentMinMax(minValues, maxValues));
this.segmentMinMaxMap.put(segmentId, segmentMinMaxList);
} else {
  this.segmentMinMaxMap.get(segmentId)
The if branch already handles the case where this.segmentMinMaxMap.get(segmentId) is null, so the segment id should be put into the map there; otherwise a NullPointerException can occur here.
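A minimal sketch of the null-safe pattern the reviewer is pointing at, using computeIfAbsent so the list is created on first access and appended to afterwards (a simplified stand-in, with values reduced to strings for illustration):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for the segmentId -> min/max-list bookkeeping.
class SegmentMinMaxMapSketch {
  private final Map<String, List<String>> segmentMinMaxMap = new HashMap<>();

  // computeIfAbsent covers both branches safely: a missing segment id gets
  // a fresh list that is put into the map, an existing id returns its list,
  // so the add can never dereference a null value.
  void addMinMax(String segmentId, String minMax) {
    segmentMinMaxMap.computeIfAbsent(segmentId, k -> new ArrayList<>()).add(minMax);
  }

  List<String> get(String segmentId) {
    return segmentMinMaxMap.get(segmentId);
  }
}
```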
// add segment level minMax to accumulator
accumulator.add(SegmentMinMaxStats.getInstance().getSegmentMinMaxMap.
  asScala.mapValues(_.asScala.toList).toMap)
SegmentMinMaxStats.getInstance().clear()
This can just be map.clear().
@@ -58,6 +61,9 @@ object UpdateDataLoad {
loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
val executor = new DataLoadExecutor
TaskContext.get().addTaskCompletionListener { context =>
  accumulator.add(SegmentMinMaxStats.getInstance().getSegmentMinMaxMap.
    asScala.mapValues(_.asScala.toList).toMap)
  SegmentMinMaxStats.getInstance().clear()
same as above
@@ -315,7 +316,10 @@ object CarbonDataRDDFactory {
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
var status: Array[(String, (LoadMetadataDetails, ExecutionErrors))] = null
var res: Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = null

// accumulator to collect segment minmax
val minMaxAccumulator = sqlContext
rename to segmentMinMaxAccumulator
Build Failed with Spark 2.4.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.4/292/
Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1996/
Build Success with Spark 2.4.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.4/295/
Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1999/
Build Success with Spark 2.4.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.4/297/
Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/2001/
Build Failed with Spark 2.4.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.4/307/
Build Success with Spark 2.4.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.4/759/
Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/2467/
Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/2479/
Build Success with Spark 2.4.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.4/771/
@@ -135,7 +136,7 @@ public CarbonTable getTable() {
int datamapsCount = 0;
// In case if filter has matched partitions, then update the segments with datamap's
// segment list, as getDataMaps will return segments that matches the partition.
if (null != partitions && !partitions.isEmpty()) {
if (isFilterPresent) {
This reset of segments has to happen only when both the partition and the filter are present. Now we are resetting even in the non-partition case with just a filter.
Now we load the cache only for segments matched by the filter (partition or non-partition). For a non-partition table the matching is based on segment-level min/max, so we still have to reset the segments.
@@ -135,7 +136,7 @@ public CarbonTable getTable() {
int datamapsCount = 0;
// In case if filter has matched partitions, then update the segments with datamap's
modify the comment according to new solution.
done
columnSchema.getColumnProperties().get(CarbonCommonConstants.COLUMN_DRIFT);
if (null != isSortColumn) {
  if (isSortColumn.equalsIgnoreCase("true") && !isSortColumnInBlock) {
    modifyColumnSchemaForSortColumn(columnSchema, isColumnDriftInBlock, isColumnDrift);
Please pass the sort info to the modify method as well; both set and unset can be done from that method.
done
ColumnSchema columnSchema = tableColumnSchemas.get(columnMetaData.getKey());
if (null != columnSchema) {
  // get segment sort column and column drift info
  boolean isSortColumnInBlock = columnMetaData.getValue().isSortColumn();
please rename
done
@@ -600,4 +604,20 @@ public boolean isIndexColumn() {
public void setIndexColumn(boolean indexColumn) {
  this.indexColumn = indexColumn;
}

public ColumnSchema clone() {
  try {
You can use try-with-resources; otherwise the stream will stay open if the write fails.
done
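The try-with-resources suggestion can be sketched as a serialization-based deep copy. This is a generic illustrative helper, not the actual ColumnSchema.clone code; the key point is that each stream is closed automatically even when writeObject or readObject throws:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Generic serialization-based deep copy: try-with-resources guarantees
// the streams are released whether or not the copy succeeds.
class DeepCopySketch {
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T deepCopy(T input) {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
      oos.writeObject(input); // stream closed even if this throws
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    try (ObjectInputStream ois =
        new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
      return (T) ois.readObject();
    } catch (IOException | ClassNotFoundException e) {
      throw new RuntimeException(e);
    }
  }
}
```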
BlockColumnMetaDataInfo currentBlockColumnMetaInfo) {
// check if tableName is present in tableSegmentMetaDataInfoMap
if (!this.tableSegmentMetaDataInfoMap.isEmpty() && null != this.tableSegmentMetaDataInfoMap
    .get(tableName) && !this.tableSegmentMetaDataInfoMap.get(tableName).isEmpty()
I think the !this.tableSegmentMetaDataInfoMap.get(tableName).isEmpty() check is not needed; please check again and remove it if not required.
done
@@ -26,12 +26,13 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.TaskContext
import org.apache.spark.util.LongAccumulator
import org.apache.spark.util.{CollectionAccumulator, LongAccumulator}
revert
done
  new Segment(seg.getLoadName, file)
}.filter(_.getSegmentFileName != null).asJava
segmentFilesForIUDCompact = new util.ArrayList[Segment](segmentFilesList)
} else {
  // get segmentMetadata info from accumulator
  val segmentMetaDataInfo = CarbonDataRDDFactory.getSegmentMetaDataInfoFromAccumulator(
getSegmentMetaDataInfoFromAccumulator: move this method to CarbonLoaderUtil, as it is used in all load flows.
done
extends CarbonLoadTaskCompletionListener {
override def onTaskCompletion(context: TaskContext): Unit = {
  try {
    dataLoadExecutor.close()
    // fill segment level minMax to accumulator
    CarbonDataRDDFactory.fillSegmentMetaDataInfoToAccumulator(tableName,
fillSegmentMetaDataInfoToAccumulator: same as the above comment.
done
@@ -203,6 +203,7 @@ object SecondaryIndexUtil {
seg.getLoadName,
segmentIdToLoadStartTimeMapping(seg.getLoadName).toString,
carbonLoadModel.getFactTimeStamp.toString,
null,
Please add a subtask in JIRA to support this for SI.
created
537ecff to cefb94e
Build Success with Spark 2.4.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.4/804/
Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/2509/
LGTM
Is it verified?
docs/configuration-parameters.md
@@ -146,6 +146,7 @@ This section provides the details of all the configurations required for the Car
| carbon.query.prefetch.enable | true | By default this property is true, so prefetch is used in queries to read the next blocklet asynchronously in another thread while the current blocklet is processed in the main thread. This can help to reduce CPU idle time. Setting this property to false will disable the prefetch feature in queries. |
| carbon.query.stage.input.enable | false | Stage input files are data files written by external applications (such as Flink) but not yet loaded into the carbon table. Enabling this configuration makes queries include these files, thus querying the latest data. However, since these files are not indexed, queries may be slower as a full scan is required for them. |
| carbon.driver.pruning.multi.thread.enable.files.count | 100000 | Prune in multiple threads when the total number of segment files for a query increases beyond the configured value. |
| carbon.load.all.indexes.to.cache | true | Setting this configuration to false will prune and load only matched segment indexes to cache, using segment metadata information such as column id and its min/max values, which decreases driver memory usage. |
This is renamed now, right? Please update.
updated
Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/2536/
Build Failed with Spark 2.4.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/829/
retest this please
Build Success with Spark 2.4.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/830/
Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/2537/
getTableBlockUniqueIdentifierWrappers(partitionsToPrune,
    tableBlockIndexUniqueIdentifierWrappers, identifiers);
} else {
  SegmentMetaDataInfo segmentMetaDataInfo = segment.getSegmentMetaDataInfo();
tableBlockIndexUniqueIdentifierWrappers is just a wrapper around TableBlockIndexUniqueIdentifier. So we should avoid creating the TableBlockIndexUniqueIdentifier itself if the filter doesn't match the segment min/max. [line 153 should not be called if isScanRequired is false]
If we keep SegmentFileStore inside the Segment class and read the segment before calling readCommittedScope.committedIndexFiles, we can avoid creating the TableBlockIndexUniqueIdentifier object. That may be more changes now; we can do it in another PR.
OK, will take it up in a future PR.
LGTM
Why is this PR needed?
In cloud scenarios, the index can be too big to store in the Spark driver, since the VM may not have that much memory. Currently, Carbon loads all indexes to cache on the first query. Since the Carbon LRU cache does not support time-based expiration, indexes are evicted from the cache on a least-recently-used basis when the cache is full. In scenarios where a user's table has many segments but the user often queries only a few of them, there is no need to load all indexes to cache. For filter queries, if we prune and load only the matched segments to cache, driver memory is saved.
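The least-recently-used eviction described above behaves like a bounded access-ordered map. A minimal sketch of that mechanism (illustrative only, not the actual CarbonLRUCache implementation):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Minimal LRU cache: an access-ordered LinkedHashMap plus an overridden
// removeEldestEntry evicts the least recently used entry once full.
class LruCacheSketch<K, V> extends LinkedHashMap<K, V> {
  private final int maxEntries;

  LruCacheSketch(int maxEntries) {
    super(16, 0.75f, true); // accessOrder = true: gets refresh recency
    this.maxEntries = maxEntries;
  }

  @Override
  protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
    return size() > maxEntries; // evict eldest when capacity is exceeded
  }
}
```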
What changes were proposed in this PR?
Added all block min/max values, along with column id and sort_column info, to the segment metadata file; segments are pruned based on segment files, and indexes are loaded only for matched segments. Added a configurable carbon property 'carbon.load.all.index.to.cache' to allow users to load all indexes to cache if needed. By default the value is true, which loads all indexes to cache.
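The segment-level pruning described above boils down to an interval check per filter column: a segment whose recorded min/max range excludes the filter value cannot contain matches, so its index need not be loaded. An illustrative sketch, simplified to int values (not the actual CarbonData pruning code):

```java
// Per-column pruning decision: scan the segment only when the filter
// value lies inside the [min, max] range stored in segment metadata.
class SegmentPruneSketch {
  static boolean isScanRequired(int filterValue, int segmentMin, int segmentMax) {
    return filterValue >= segmentMin && filterValue <= segmentMax;
  }
}
```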
Does this PR introduce any user interface change?
Is any new testcase added?