KYLIN-4185: optimize CuboidSizeMap by using historical segments
zhoukangcn authored and nichunen committed Feb 17, 2020
1 parent 908ea30 commit a979a49
Showing 7 changed files with 199 additions and 1 deletion.
4 changes: 4 additions & 0 deletions core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -613,6 +613,10 @@ public String getSegmentAdvisor() {
return getOptional("kylin.cube.segment-advisor", "org.apache.kylin.cube.CubeSegmentAdvisor");
}

public boolean enableJobCuboidSizeOptimize() {
return Boolean.parseBoolean(getOptional("kylin.cube.size-estimate-enable-optimize", "false"));
}

public double getJobCuboidSizeRatio() {
return Double.parseDouble(getOptional("kylin.cube.size-estimate-ratio", "0.25"));
}
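The new flag is off by default; only the property name and its "false" default come from the diff above. A hypothetical snippet for conf/kylin.properties to turn the optimization on:

# Correct cuboid size estimates per level using ratios learned from historical segments
kylin.cube.size-estimate-enable-optimize=true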
1 change: 1 addition & 0 deletions core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -837,6 +837,7 @@ public CubeSegment mergeSegments(CubeInstance cube, TSRange tsRange, SegmentRang
}

CubeSegment newSegment = newSegment(cubeCopy, tsRange, segRange);
newSegment.setMerged(true);

Segments<CubeSegment> mergingSegments = cubeCopy.getMergingSegments(newSegment);
if (mergingSegments.size() <= 1)
20 changes: 20 additions & 0 deletions core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -82,6 +82,10 @@ public class CubeSegment implements IBuildable, ISegment, Serializable {
private SegmentStatusEnum status;
@JsonProperty("size_kb")
private long sizeKB;
@JsonProperty("is_merged")
private boolean isMerged;
@JsonProperty("estimate_ratio")
private List<Double> estimateRatio;
@JsonProperty("input_records")
private long inputRecords;
@JsonProperty("input_records_size")
@@ -224,6 +228,22 @@ public void setSizeKB(long sizeKB) {
this.sizeKB = sizeKB;
}

public boolean isMerged() {
return isMerged;
}

public void setMerged(boolean isMerged) {
this.isMerged = isMerged;
}

public List<Double> getEstimateRatio() {
return estimateRatio;
}

public void setEstimateRatio(List<Double> estimateRatio) {
this.estimateRatio = estimateRatio;
}

public long getInputRecords() {
return inputRecords;
}
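The two new fields are serialized into segment metadata through the @JsonProperty annotations above. A hypothetical fragment of a segment's JSON (field names from the annotations, values illustrative; -1.0 marks a level with no usable measurement, as computed by CubingJob.findEstimateRatio below):

"is_merged": false,
"estimate_ratio": [0.5, 1.25, -1.0],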
66 changes: 66 additions & 0 deletions engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
@@ -18,6 +18,7 @@

package org.apache.kylin.engine.mr;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
@@ -26,12 +27,18 @@
import java.util.TimeZone;
import java.util.regex.Matcher;

import com.google.common.collect.Lists;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.engine.mr.common.CubeStatsReader;
import org.apache.kylin.engine.mr.common.MapReduceExecutable;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.job.constant.ExecutableConstants;
@@ -361,4 +368,63 @@ public long findCubeSizeBytes() {
return Long.parseLong(findExtraInfoBackward(CUBE_SIZE_BYTES, "0"));
}

public List<Double> findEstimateRatio(CubeSegment seg, KylinConfig config) {
CubeInstance cubeInstance = seg.getCubeInstance();
CuboidScheduler cuboidScheduler = cubeInstance.getCuboidScheduler();
List<List<Long>> layeredCuboids = cuboidScheduler.getCuboidsByLayer();
int totalLevels = cuboidScheduler.getBuildLevel();

List<Double> result = Lists.newArrayList();

Map<Long, Double> estimatedSizeMap;

String cuboidRootPath = getCuboidRootPath(seg, config);

try {
estimatedSizeMap = new CubeStatsReader(seg, config).getCuboidSizeMap(true);
} catch (IOException e) {
logger.warn("Cannot get segment {} estimated size map", seg.getName());

return null;
}

for (int level = 0; level <= totalLevels; level++) {
double levelEstimatedSize = 0;
for (Long cuboidId : layeredCuboids.get(level)) {
Double estimated = estimatedSizeMap.get(cuboidId);
levelEstimatedSize += estimated == null ? 0.0 : estimated;
}

double levelRealSize = getRealSizeByLevel(cuboidRootPath, level);

if (levelEstimatedSize == 0.0 || levelRealSize == 0.0) {
result.add(level, -1.0);
} else {
result.add(level, levelRealSize / levelEstimatedSize);
}
}

return result;
}


private double getRealSizeByLevel(String rootPath, int level) {
try {
String levelPath = JobBuilderSupport.getCuboidOutputPathsByLevel(rootPath, level);
FileSystem fs = HadoopUtil.getFileSystem(levelPath);
// use floating-point division so levels smaller than 1 MB are not truncated to zero
return fs.getContentSummary(new Path(levelPath)).getLength() / (1024.0 * 1024.0);
} catch (Exception e) {
logger.warn("Failed to get real size of level " + level, e);
return 0.0;
}
}

private String getCuboidRootPath(CubeSegment seg, KylinConfig kylinConfig) {
String rootDir = kylinConfig.getHdfsWorkingDirectory();
if (!rootDir.endsWith("/")) {
rootDir = rootDir + "/";
}
String jobID = this.getId();
return rootDir + "kylin-" + jobID + "/" + seg.getRealization().getName() + "/cuboid/";
}

}
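For intuition, here is a minimal, self-contained sketch of the per-level ratio computation above. All sizes are hypothetical; the real method takes its estimates from CubeStatsReader and measures the actual per-level output directories on HDFS:

import java.util.ArrayList;
import java.util.List;

public class EstimateRatioSketch {
    public static void main(String[] args) {
        // Hypothetical per-level sizes in MB: statistical estimate vs. measured on HDFS.
        double[] estimatedMB = {120.0, 80.0, 30.0};
        double[] realMB = {60.0, 100.0, 0.0}; // assume level 2 produced no output

        List<Double> ratios = new ArrayList<>();
        for (int level = 0; level < estimatedMB.length; level++) {
            if (estimatedMB[level] == 0.0 || realMB[level] == 0.0) {
                ratios.add(-1.0); // -1 marks a level with no usable measurement
            } else {
                ratios.add(realMB[level] / estimatedMB[level]);
            }
        }
        System.out.println(ratios); // prints [0.5, 1.25, -1.0]
    }
}

A ratio below 1.0 means the estimator was pessimistic for that level (the real data compressed better than predicted); above 1.0 means it was optimistic.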
99 changes: 98 additions & 1 deletion engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
@@ -64,6 +64,7 @@
import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -171,7 +172,11 @@ public Map<Long, Long> getCuboidRowEstimatesHLL() {

// return map of Cuboid ID => MB
public Map<Long, Double> getCuboidSizeMap() {
-return getCuboidSizeMapFromRowCount(seg, getCuboidRowEstimatesHLL(), sourceRowCount);
+return getCuboidSizeMap(false);
}

public Map<Long, Double> getCuboidSizeMap(boolean origin) {
return getCuboidSizeMapFromRowCount(seg, getCuboidRowEstimatesHLL(), sourceRowCount, origin);
}

public double estimateCubeSize() {
@@ -199,6 +204,11 @@ public static Map<Long, Long> getCuboidRowCountMapFromSampling(Map<Long, HLLCoun

public static Map<Long, Double> getCuboidSizeMapFromRowCount(CubeSegment cubeSegment, Map<Long, Long> rowCountMap,
long sourceRowCount) {
return getCuboidSizeMapFromRowCount(cubeSegment, rowCountMap, sourceRowCount, true);
}

private static Map<Long, Double> getCuboidSizeMapFromRowCount(CubeSegment cubeSegment, Map<Long, Long> rowCountMap,
long sourceRowCount, boolean origin) {
final CubeDesc cubeDesc = cubeSegment.getCubeDesc();
final List<Integer> rowkeyColumnSize = Lists.newArrayList();
final Cuboid baseCuboid = Cuboid.getBaseCuboid(cubeDesc);
@@ -215,9 +225,96 @@ public static Map<Long, Double> getCuboidSizeMapFromRowCount(CubeSegment cubeSeg
sizeMap.put(entry.getKey(), estimateCuboidStorageSize(cubeSegment, entry.getKey(), entry.getValue(),
baseCuboid.getId(), baseCuboidRowCount, rowkeyColumnSize, sourceRowCount));
}

if (!origin && cubeSegment.getConfig().enableJobCuboidSizeOptimize()) {
optimizeSizeMap(sizeMap, cubeSegment);
}

return sizeMap;
}
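Note the meaning of the new origin flag: true returns the raw statistical estimate, which is what CubingJob.findEstimateRatio compares against measured sizes (so ratios are never derived from already-corrected numbers), while the no-argument getCuboidSizeMap() passes false and therefore applies the historical correction whenever kylin.cube.size-estimate-enable-optimize is on.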

private static Double harmonicMean(List<Double> data) {
if (data == null || data.isEmpty()) {
return 1.0;
}
double sum = 0.0;
for (Double item : data) {
sum += 1.0 / item;
}
return data.size() / sum;
}
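Worked example: for per-segment ratios {0.5, 2.0} the harmonic mean is 2 / (1/0.5 + 1/2.0) = 2 / 2.5 = 0.8, whereas the arithmetic mean would be 1.25. The harmonic mean leans toward the smaller values, so a single segment with an inflated ratio cannot dominate the correction.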

private static List<Double> getHistoricalRating(CubeSegment cubeSegment,
CubeInstance cubeInstance,
int totalLevels) {
boolean isMerged = cubeSegment.isMerged();

Map<Integer, List<Double>> layerRatio = Maps.newHashMap();
List<Double> result = Lists.newArrayList();

for (CubeSegment seg : cubeInstance.getSegments(SegmentStatusEnum.READY)) {
if (seg.isMerged() != isMerged || seg.getEstimateRatio() == null) {
continue;
}

logger.info("get ratio from {} with: {}", seg.getName(), StringUtils.join(seg.getEstimateRatio(), ","));

for (int level = 0; level <= totalLevels; level++) {
if (seg.getEstimateRatio().get(level) <= 0) {
continue;
}

List<Double> temp = layerRatio.get(level) == null ? Lists.newArrayList() : layerRatio.get(level);

temp.add(seg.getEstimateRatio().get(level));
layerRatio.put(level, temp);
}
}

if (layerRatio.size() == 0) {
logger.info("Fail to get historical rating.");
return null;
} else {
for (int level = 0; level <= totalLevels; level++) {
logger.debug("level {}: {}", level, StringUtils.join(layerRatio.get(level), ","));
result.add(level, harmonicMean(layerRatio.get(level)));
}

logger.info("Finally estimate ratio is {}", StringUtils.join(result, ","));

return result;
}
}

private static void optimizeSizeMap(Map<Long, Double> sizeMap, CubeSegment cubeSegment) {
CubeInstance cubeInstance = cubeSegment.getCubeInstance();
int totalLevels = cubeInstance.getCuboidScheduler().getBuildLevel();
List<List<Long>> layeredCuboids = cubeInstance.getCuboidScheduler().getCuboidsByLayer();

logger.info("cube size is {} before optimize", SumHelper.sumDouble(sizeMap.values()));

List<Double> levelRating = getHistoricalRating(cubeSegment, cubeInstance, totalLevels);

if (levelRating == null) {
logger.info("Fail to optimize, use origin.");
return;
}

for (int level = 0; level <= totalLevels; level++) {
Double rate = levelRating.get(level);

for (Long cuboidId : layeredCuboids.get(level)) {
Double oriValue = sizeMap.get(cuboidId);
sizeMap.put(cuboidId, (oriValue == null ? 0.0 : oriValue) * rate);
}
}

logger.info("cube size is {} after optimize", SumHelper.sumDouble(sizeMap.values()));

return;
}
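The scaling step in isolation, as a minimal sketch; the cuboid IDs and sizes are hypothetical, and the real method walks CuboidScheduler.getCuboidsByLayer() so that every cuboid is scaled by its own level's ratio:

import java.util.HashMap;
import java.util.Map;

public class OptimizeSizeMapSketch {
    public static void main(String[] args) {
        // Hypothetical estimates in MB for three cuboids that sit on the same level.
        Map<Long, Double> sizeMap = new HashMap<>();
        sizeMap.put(7L, 40.0);
        sizeMap.put(6L, 25.0);
        sizeMap.put(5L, 15.0);

        double levelRatio = 0.8; // harmonic-mean ratio learned from historical segments

        // Scale every cuboid on this level by the level's ratio.
        sizeMap.replaceAll((cuboidId, mb) -> mb * levelRatio);

        System.out.println(sizeMap); // e.g. {5=12.0, 6=20.0, 7=32.0}
    }
}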


/**
* Estimate the cuboid's size
*
5 changes: 5 additions & 0 deletions engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
@@ -30,6 +30,7 @@
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeInstance;
@@ -76,11 +77,15 @@ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteExceptio
long sourceSizeBytes = cubingJob.findSourceSizeBytes();
long cubeSizeBytes = cubingJob.findCubeSizeBytes();

KylinConfig config = KylinConfig.getInstanceFromEnv();
List<Double> cuboidEstimateRatio = cubingJob.findEstimateRatio(segment, config);

segment.setLastBuildJobID(CubingExecutableUtil.getCubingJobId(this.getParams()));
segment.setLastBuildTime(System.currentTimeMillis());
segment.setSizeKB(cubeSizeBytes / 1024);
segment.setInputRecords(sourceCount);
segment.setInputRecordsSize(sourceSizeBytes);
segment.setEstimateRatio(cuboidEstimateRatio);

try {
saveExtSnapshotIfNeeded(cubeManager, cube, segment);
5 changes: 5 additions & 0 deletions engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
@@ -22,6 +22,7 @@
import java.util.List;
import java.util.Map;

import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
@@ -92,6 +93,9 @@ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteExceptio
}
}

KylinConfig config = KylinConfig.getInstanceFromEnv();
List<Double> cuboidEstimateRatio = cubingJob.findEstimateRatio(mergedSegment, config);

// update segment info
mergedSegment.setSizeKB(cubeSizeBytes / 1024);
mergedSegment.setInputRecords(sourceCount);
@@ -100,6 +104,7 @@ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteExceptio
mergedSegment.setLastBuildTime(System.currentTimeMillis());
mergedSegment.setDimensionRangeInfoMap(mergedSegDimRangeMap);
mergedSegment.setStreamSourceCheckpoint(lastMergedSegment != null ? lastMergedSegment.getStreamSourceCheckpoint() : null);
mergedSegment.setEstimateRatio(cuboidEstimateRatio);

if (isOffsetCube) {
SegmentRange.TSRange tsRange = new SegmentRange.TSRange(tsStartMin, tsEndMax);
