File: KylinConfigBase.java
@@ -426,7 +426,6 @@ public double getExtTableSnapshotLocalCacheMaxSizeGB() {
return Double.parseDouble(getOptional("kylin.snapshot.ext.local.cache.max-size-gb", "200"));
}

-
// ============================================================================
// CUBE
// ============================================================================
@@ -449,7 +448,11 @@ public double getJobCuboidSizeMemHungryRatio() {
}

public double getJobCuboidSizeCountDistinctRatio() {
- return Double.parseDouble(getOptional("kylin.cube.size-estimate-countdistinct-ratio", "0.05"));
+ return Double.parseDouble(getOptional("kylin.cube.size-estimate-countdistinct-ratio", "0.5"));
}
+
+ public double getJobCuboidSizeTopNRatio() {
+ return Double.parseDouble(getOptional("kylin.cube.size-estimate-topn-ratio", "0.5"));
+ }

public String getCubeAlgorithm() {
@@ -872,7 +875,7 @@ public int getSqoopMapperNum() {
public Map<String, String> getSqoopConfigOverride() {
return getPropertiesByPrefix("kylin.source.jdbc.sqoop-config-override.");
}

public String getJdbcSourceFieldDelimiter() {
return getOptional("kylin.source.jdbc.field-delimiter", "|");
}
@@ -1223,11 +1226,11 @@ public boolean isSparkSanityCheckEnabled() {
public Boolean isEnumerableRulesEnabled() {
return Boolean.parseBoolean(getOptional("kylin.query.calcite.enumerable-rules-enabled", "false"));
}

public boolean isReduceExpressionsRulesEnabled() {
return Boolean.parseBoolean(getOptional("kylin.query.calcite.reduce-rules-enabled", "true"));
}

public boolean isConvertCreateTableToWith() {
return Boolean.valueOf(getOptional("kylin.query.convert-create-table-to-with", "false"));
}
@@ -1328,12 +1331,13 @@ public int getBadQueryHistoryNum() {
public int getBadQueryDefaultAlertingSeconds() {
return Integer.parseInt(getOptional("kylin.query.badquery-alerting-seconds", "90"));
}

public double getBadQueryDefaultAlertingCoefficient() {
return Double.parseDouble(getOptional("kylin.query.timeout-seconds-coefficient", "0.5"));
}

public int getBadQueryDefaultDetectIntervalSeconds() {
- int time =(int) (getQueryTimeoutSeconds() * getBadQueryDefaultAlertingCoefficient()); // half of query timeout
+ int time = (int) (getQueryTimeoutSeconds() * getBadQueryDefaultAlertingCoefficient()); // half of query timeout
if (time == 0) {
time = 60; // 60 sec
}
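The two size-estimate ratios above discount how much a memory-hungry measure contributes to a projected cuboid size. As a minimal standalone sketch of the getOptional(key, default) lookup pattern these getters rely on (a hypothetical Properties-backed config; the real class resolves overrides from kylin.properties):

import java.util.Properties;

public class RatioDefaultsSketch {
    // Mirrors the getOptional(key, default) pattern used by the getters above.
    static double getDouble(Properties conf, String key, String dft) {
        return Double.parseDouble(conf.getProperty(key, dft));
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        conf.setProperty("kylin.cube.size-estimate-topn-ratio", "0.3"); // user override

        // Unset keys fall back to the defaults introduced in this change.
        System.out.println(getDouble(conf, "kylin.cube.size-estimate-countdistinct-ratio", "0.5")); // 0.5
        System.out.println(getDouble(conf, "kylin.cube.size-estimate-topn-ratio", "0.5"));          // 0.3
    }
}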
File: BitmapSerializer.java
@@ -30,6 +30,7 @@ public class BitmapSerializer extends DataTypeSerializer<BitmapCounter> {

private static final int IS_RESULT_FLAG = 1;
private static final int RESULT_SIZE = 12;
+ private static final int DEFAULT_MAX_SIZE = 1024;

// called by reflection
public BitmapSerializer(DataType type) {
@@ -85,6 +86,19 @@ public int getStorageBytesEstimate() {
return 8 * 1024;
}

+ @Override
+ protected double getStorageBytesEstimate(double averageNumOfElementsInCounter) {
+ // MappeableArrayContainer DEFAULT_MAX_SIZE = 4096
+ if (averageNumOfElementsInCounter < DEFAULT_MAX_SIZE) {
+ // 8 = 4 + 4 for SERIAL_COOKIE_NO_RUNCONTAINER + size
+ // size * 8 = 2 * size + 2 * size + 4 * size as keys + values Cardinality + startOffsets
+ // size * 8 for values array
+ return 8 + averageNumOfElementsInCounter * 16;
+ } else {
+ return getStorageBytesEstimate();
+ }
+ }
+
@Override
public boolean supportDirectReturnResult() {
return true;
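For intuition, a self-contained sketch of the branch added above: below the DEFAULT_MAX_SIZE threshold a counter is costed as an array-style container (a small fixed header plus roughly 16 bytes per element, per the comments in the diff); at or above it, the estimate falls back to the flat 8 KB of getStorageBytesEstimate(). The element counts in main are made up for illustration.

public class BitmapEstimateSketch {
    private static final int DEFAULT_MAX_SIZE = 1024;  // threshold used in the diff
    private static final int FULL_ESTIMATE = 8 * 1024; // flat fallback, as in getStorageBytesEstimate()

    static double estimate(double averageNumOfElementsInCounter) {
        if (averageNumOfElementsInCounter < DEFAULT_MAX_SIZE) {
            return 8 + averageNumOfElementsInCounter * 16; // header + ~16 bytes per element
        }
        return FULL_ESTIMATE;
    }

    public static void main(String[] args) {
        System.out.println(estimate(100));  // 1608.0 -- far below the 8 KB worst case
        System.out.println(estimate(5000)); // 8192.0 -- reverts to the old flat estimate
    }
}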
File: HLLCSerializer.java
@@ -80,4 +80,16 @@ public int getStorageBytesEstimate() {
return current().maxLength();
}

+ @Override
+ protected double getStorageBytesEstimate(double averageNumOfElementsInCounter) {
+ int registerIndexSize = current().getRegisterIndexSize();
+ int m = 1 << precision;
+ if (!current().isDense((int) averageNumOfElementsInCounter)
+ || averageNumOfElementsInCounter < (m - 5) / (1 + registerIndexSize)) {
+ // 5 = 1 + 4 for scheme and size
+ // size * (getRegisterIndexSize + 1)
+ return 5 + averageNumOfElementsInCounter * (registerIndexSize + 1);
+ }
+ return getStorageBytesEstimate();
+ }
}
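A worked instance of the sparse HLL estimate above, assuming precision p = 14 (so getRegisterIndexSize() = (14 - 1) / 8 + 1 = 2 and m = 2^14); the break-even point (m - 5) / (1 + registerIndexSize) is about 5459 elements, beyond which the dense maxLength() figure applies:

public class HLLCEstimateSketch {
    public static void main(String[] args) {
        int p = 14;                              // assumed HLL precision
        int m = 1 << p;                          // 16384 registers
        int registerIndexSize = (p - 1) / 8 + 1; // 2, as computed by HLLCounter.getRegisterIndexSize()

        double n = 1000; // assumed average distinct values per counter
        // Sparse layout: 1 scheme byte + 4 size bytes, then (index + register) bytes per entry
        double sparse = 5 + n * (registerIndexSize + 1);
        int dense = 1 + m; // HLLCounter.maxLength(): scheme byte + one byte per register

        System.out.println(sparse);                            // 3005.0
        System.out.println(dense);                             // 16385
        System.out.println((m - 5) / (1 + registerIndexSize)); // 5459 -- sparse/dense break-even
    }
}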
File: HLLCounter.java
@@ -78,7 +78,7 @@ public HLLCounter(int p, RegisterType type) {
}
}

- private boolean isDense(int size) {
+ public boolean isDense(int size) {
double over = OVERFLOW_FACTOR * m;
return size > (int) over;
}
@@ -358,7 +358,7 @@ public int maxLength() {
return 1 + m;
}

- private int getRegisterIndexSize() {
+ public int getRegisterIndexSize() {
return (p - 1) / 8 + 1; // 2 when p=16, 3 when p=17
}

File: TopNCounterSerializer.java
@@ -61,12 +61,12 @@ public int peekLength(ByteBuffer in) {

@Override
public int maxLength() {
- return Math.max(precision * TopNCounter.EXTRA_SPACE_RATE * (scale + 8), 1024 * 1024); // use at least 1M
+ return Math.max(precision * TopNCounter.EXTRA_SPACE_RATE * storageBytesEstimatePerCounter(), 1024 * 1024); // use at least 1M
}

@Override
public int getStorageBytesEstimate() {
- return precision * TopNCounter.EXTRA_SPACE_RATE * (scale + 8);
+ return precision * TopNCounter.EXTRA_SPACE_RATE * storageBytesEstimatePerCounter();
}

@Override
@@ -107,4 +107,17 @@ public TopNCounter<ByteArray> deserialize(ByteBuffer in) {
return counter;
}

+ @Override
+ protected double getStorageBytesEstimate(double averageNumOfElementsInCounter) {
+ if (averageNumOfElementsInCounter < precision * TopNCounter.EXTRA_SPACE_RATE) {
+ return averageNumOfElementsInCounter * storageBytesEstimatePerCounter() + 12;
+ } else {
+ return getStorageBytesEstimate();
+ }
+ }
+
+ private int storageBytesEstimatePerCounter() {
+ return (scale + 8);
+ }
+
}
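Same idea for TopN, sketched with assumed constants (EXTRA_SPACE_RATE is taken here as 50 and the encoded key width scale as 4 bytes; both are placeholders, not values confirmed by this diff). Each stored entry costs scale + 8 bytes (key plus a double count), and a counter that is not yet full is charged only for its actual entries plus a small header:

public class TopNEstimateSketch {
    public static void main(String[] args) {
        int precision = 100;     // e.g. a TOP_N(100) measure
        int extraSpaceRate = 50; // assumed stand-in for TopNCounter.EXTRA_SPACE_RATE
        int scale = 4;           // assumed encoded key width in bytes

        int perCounter = scale + 8;                // storageBytesEstimatePerCounter()
        int capacity = precision * extraSpaceRate; // entries a full counter holds

        double n = 200; // assumed average entries per counter
        double estimate = (n < capacity)
                ? n * perCounter + 12             // sparse case from the override above
                : (double) capacity * perCounter; // full counter, as in getStorageBytesEstimate()

        System.out.println(estimate);              // 2412.0
        System.out.println(capacity * perCounter); // 60000 -- the old flat charge
    }
}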
File: CubeStatsReader.java
@@ -55,6 +55,7 @@
import org.apache.kylin.cube.kv.RowKeyEncoder;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.measure.hllc.HLLCounter;
+ import org.apache.kylin.measure.topn.TopNMeasureType;
import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
@@ -79,6 +80,7 @@ public class CubeStatsReader {
final double mapperOverlapRatioOfFirstBuild; // becomes meaningless after merge
final Map<Long, HLLCounter> cuboidRowEstimatesHLL;
final CuboidScheduler cuboidScheduler;
+ final long sourceRowCount;

public CubeStatsReader(CubeSegment cubeSegment, KylinConfig kylinConfig) throws IOException {
this(cubeSegment, cubeSegment.getCuboidScheduler(), kylinConfig);
@@ -94,7 +96,7 @@ public CubeStatsReader(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler,
RawResource resource = store.getResource(statsKey);
if (resource == null)
throw new IllegalStateException("Missing resource at " + statsKey);

File tmpSeqFile = writeTmpSeqFile(resource.inputStream);
Path path = new Path(HadoopUtil.fixWindowsPath("file://" + tmpSeqFile.getAbsolutePath()));

@@ -107,6 +109,7 @@ CubeStatsReader(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler,
this.mapperNumberOfFirstBuild = cubeStatsResult.getMapperNumber();
this.mapperOverlapRatioOfFirstBuild = cubeStatsResult.getMapperOverlapRatio();
this.cuboidRowEstimatesHLL = cubeStatsResult.getCounterMap();
+ this.sourceRowCount = cubeStatsResult.getSourceRecordCount();
}

/**
@@ -129,6 +132,7 @@ public CubeStatsReader(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler,
this.mapperNumberOfFirstBuild = cubeStatsResult.getMapperNumber();
this.mapperOverlapRatioOfFirstBuild = cubeStatsResult.getMapperOverlapRatio();
this.cuboidRowEstimatesHLL = cubeStatsResult.getCounterMap();
+ this.sourceRowCount = cubeStatsResult.getSourceRecordCount();
}

private File writeTmpSeqFile(InputStream inputStream) throws IOException {
@@ -158,7 +162,7 @@ public Map<Long, Long> getCuboidRowEstimatesHLL() {

// return map of Cuboid ID => MB
public Map<Long, Double> getCuboidSizeMap() {
- return getCuboidSizeMapFromRowCount(seg, getCuboidRowEstimatesHLL());
+ return getCuboidSizeMapFromRowCount(seg, getCuboidRowEstimatesHLL(), sourceRowCount);
}

public double estimateCubeSize() {
@@ -184,7 +188,8 @@ public static Map<Long, Long> getCuboidRowCountMapFromSampling(Map<Long, HLLCoun
return cuboidRowCountMap;
}

- public static Map<Long, Double> getCuboidSizeMapFromRowCount(CubeSegment cubeSegment, Map<Long, Long> rowCountMap) {
+ public static Map<Long, Double> getCuboidSizeMapFromRowCount(CubeSegment cubeSegment, Map<Long, Long> rowCountMap,
+ long sourceRowCount) {
final CubeDesc cubeDesc = cubeSegment.getCubeDesc();
final List<Integer> rowkeyColumnSize = Lists.newArrayList();
final Cuboid baseCuboid = Cuboid.getBaseCuboid(cubeDesc);
@@ -199,7 +204,7 @@ public static Map<Long, Double> getCuboidSizeMapFromRowCount(CubeSegment cubeSeg
Map<Long, Double> sizeMap = Maps.newHashMap();
for (Map.Entry<Long, Long> entry : rowCountMap.entrySet()) {
sizeMap.put(entry.getKey(), estimateCuboidStorageSize(cubeSegment, entry.getKey(), entry.getValue(),
- baseCuboid.getId(), baseCuboidRowCount, rowkeyColumnSize));
+ baseCuboid.getId(), baseCuboidRowCount, rowkeyColumnSize, sourceRowCount));
}
return sizeMap;
}
@@ -210,7 +215,7 @@ public static Map<Long, Double> getCuboidSizeMapFromRowCount(CubeSegment cubeSeg
* @return the cuboid size in M bytes
*/
private static double estimateCuboidStorageSize(CubeSegment cubeSegment, long cuboidId, long rowCount,
- long baseCuboidId, long baseCuboidCount, List<Integer> rowKeyColumnLength) {
+ long baseCuboidId, long baseCuboidCount, List<Integer> rowKeyColumnLength, long sourceRowCount) {

int rowkeyLength = cubeSegment.getRowKeyPreambleSize();
KylinConfig kylinConf = cubeSegment.getConfig();
@@ -228,22 +233,33 @@ private static double estimateCuboidStorageSize(CubeSegment cubeSegment, long cu
int normalSpace = rowkeyLength;
int countDistinctSpace = 0;
double percentileSpace = 0;
+ int topNSpace = 0;
for (MeasureDesc measureDesc : cubeSegment.getCubeDesc().getMeasures()) {
if (rowCount == 0)
break;
DataType returnType = measureDesc.getFunction().getReturnDataType();
if (measureDesc.getFunction().getExpression().equals(FunctionDesc.FUNC_COUNT_DISTINCT)) {
- countDistinctSpace += returnType.getStorageBytesEstimate();
+ long estimateDistinctCount = sourceRowCount / rowCount;
+ estimateDistinctCount = estimateDistinctCount == 0 ? 1L : estimateDistinctCount;
+ countDistinctSpace += returnType.getStorageBytesEstimate(estimateDistinctCount);
} else if (measureDesc.getFunction().getExpression().equals(FunctionDesc.FUNC_PERCENTILE)) {
percentileSpace += returnType.getStorageBytesEstimate(baseCuboidCount * 1.0 / rowCount);
+ } else if (measureDesc.getFunction().getExpression().equals(TopNMeasureType.FUNC_TOP_N)) {
+ long estimateTopNCount = sourceRowCount / rowCount;
+ estimateTopNCount = estimateTopNCount == 0 ? 1L : estimateTopNCount;
+ topNSpace += returnType.getStorageBytesEstimate(estimateTopNCount);
} else {
normalSpace += returnType.getStorageBytesEstimate();
}
}

double cuboidSizeRatio = kylinConf.getJobCuboidSizeRatio();
double cuboidSizeMemHungryRatio = kylinConf.getJobCuboidSizeCountDistinctRatio();
+ double cuboidSizeTopNRatio = kylinConf.getJobCuboidSizeTopNRatio();

double ret = (1.0 * normalSpace * rowCount * cuboidSizeRatio
- + 1.0 * countDistinctSpace * rowCount * cuboidSizeMemHungryRatio + 1.0 * percentileSpace * rowCount)
- / (1024L * 1024L);
+ + 1.0 * countDistinctSpace * rowCount * cuboidSizeMemHungryRatio + 1.0 * percentileSpace * rowCount
+ + 1.0 * topNSpace * rowCount * cuboidSizeTopNRatio) / (1024L * 1024L);
return ret;
}

@@ -351,6 +367,7 @@ private static String formatDouble(double input) {
public static class CubeStatsResult {
private int percentage = 100;
private double mapperOverlapRatio = 0;
+ private long sourceRecordCount = 0;
private int mapperNumber = 0;
private Map<Long, HLLCounter> counterMap = Maps.newHashMap();

@@ -367,6 +384,8 @@ public CubeStatsResult(Path path, int precision) throws IOException {
mapperOverlapRatio = Bytes.toDouble(value.getBytes());
} else if (key.get() == -2) {
mapperNumber = Bytes.toInt(value.getBytes());
+ } else if (key.get() == -3) {
+ sourceRecordCount = Bytes.toLong(value.getBytes());
} else if (key.get() > 0) {
HLLCounter hll = new HLLCounter(precision);
ByteArray byteArray = new ByteArray(value.getBytes());
@@ -392,6 +411,10 @@ public int getMapperNumber() {
public Map<Long, HLLCounter> getCounterMap() {
return Collections.unmodifiableMap(counterMap);
}
+
+ public long getSourceRecordCount() {
+ return sourceRecordCount;
+ }
}

public static void main(String[] args) throws IOException {
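Putting the pieces together, a hedged rendering of the updated estimateCuboidStorageSize() arithmetic with made-up inputs: a cuboid holding 1M of 10M source rows gives sourceRowCount / rowCount = 10 expected elements per COUNT_DISTINCT or TopN counter, so those measures are now charged their sparse serialized size rather than the flat worst case. The 0.25 base ratio is assumed to be the stock kylin.cube.size-estimate-ratio default; the per-row byte counts are illustrative.

public class CuboidSizeSketch {
    public static void main(String[] args) {
        long sourceRowCount = 10_000_000L;
        long rowCount = 1_000_000L; // estimated rows in this cuboid
        int normalSpace = 30;       // rowkey + plain measure bytes per row (made up)

        long perCounter = Math.max(1L, sourceRowCount / rowCount); // 10, clamped to 1 as in the diff

        double countDistinctSpace = 8 + perCounter * 16; // sparse bitmap estimate: 168 bytes
        double topNSpace = perCounter * (4 + 8) + 12;    // sparse TopN estimate: 132 bytes

        double cuboidSizeRatio = 0.25;   // assumed kylin.cube.size-estimate-ratio default
        double countDistinctRatio = 0.5; // kylin.cube.size-estimate-countdistinct-ratio (this PR)
        double topNRatio = 0.5;          // kylin.cube.size-estimate-topn-ratio (this PR)

        double mb = (normalSpace * rowCount * cuboidSizeRatio
                + countDistinctSpace * rowCount * countDistinctRatio
                + topNSpace * rowCount * topNRatio) / (1024.0 * 1024.0);
        System.out.printf("~%.1f MB%n", mb); // ~150.2 MB
    }
}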
File: CubeStatsWriter.java
@@ -40,14 +40,15 @@ public class CubeStatsWriter {

public static void writeCuboidStatistics(Configuration conf, Path outputPath, //
Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage) throws IOException {
- writeCuboidStatistics(conf, outputPath, cuboidHLLMap, samplingPercentage, 0, 0);
+ writeCuboidStatistics(conf, outputPath, cuboidHLLMap, samplingPercentage, 0, 0, 0);
}

public static void writeCuboidStatistics(Configuration conf, Path outputPath, //
- Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio) throws IOException {
+ Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio,
+ long sourceRecordCoun) throws IOException {
Path seqFilePath = new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
writeCuboidStatisticsInner(conf, seqFilePath, cuboidHLLMap, samplingPercentage, mapperNumber,
- mapperOverlapRatio);
+ mapperOverlapRatio, sourceRecordCoun);
}

//Be careful that the file name for partial cuboid statistics should start with BatchConstants.CFG_OUTPUT_STATISTICS,
@@ -57,12 +58,12 @@ public static void writePartialCuboidStatistics(Configuration conf, Path outputP
int shard) throws IOException {
Path seqFilePath = new Path(outputPath, BatchConstants.CFG_OUTPUT_STATISTICS + "_" + shard);
writeCuboidStatisticsInner(conf, seqFilePath, cuboidHLLMap, samplingPercentage, mapperNumber,
- mapperOverlapRatio);
+ mapperOverlapRatio, 0);
}

private static void writeCuboidStatisticsInner(Configuration conf, Path outputFilePath, //
- Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio)
- throws IOException {
+ Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio,
+ long sourceRecordCount) throws IOException {
List<Long> allCuboids = Lists.newArrayList();
allCuboids.addAll(cuboidHLLMap.keySet());
Collections.sort(allCuboids);
@@ -80,6 +81,9 @@ private static void writeCuboidStatisticsInner(Configuration conf, Path outputFi
// sampling percentage at key 0
writer.append(new LongWritable(0L), new BytesWritable(Bytes.toBytes(samplingPercentage)));

+ // flat table source_count at key -3
+ writer.append(new LongWritable(-3), new BytesWritable(Bytes.toBytes(sourceRecordCount)));
+
for (long i : allCuboids) {
valueBuf.clear();
cuboidHLLMap.get(i).writeRegisters(valueBuf);
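The statistics sequence file packs its metadata under reserved keys: 0 holds the sampling percentage, -1 the mapper overlap ratio, -2 the mapper number, and now -3 the flat-table source record count, while positive keys carry per-cuboid HLL registers. A trimmed-down reader for just the new entry, modeled on CubeStatsResult above (path handling and error recovery elided; assumes the Hadoop 2 SequenceFile API):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.kylin.common.util.Bytes;

public class SourceCountReader {
    public static long readSourceRecordCount(String statsFile) throws Exception {
        Configuration conf = new Configuration();
        try (SequenceFile.Reader reader = new SequenceFile.Reader(conf,
                SequenceFile.Reader.file(new Path(statsFile)))) {
            LongWritable key = new LongWritable();
            BytesWritable value = new BytesWritable();
            while (reader.next(key, value)) {
                if (key.get() == -3) { // flat-table source count, as written above
                    return Bytes.toLong(value.getBytes());
                }
            }
        }
        return 0L; // older stats files carry no -3 entry
    }
}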
File: SaveStatisticsStep.java
@@ -122,21 +122,22 @@ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteExceptio
totalRowsBeforeMerge);
}
double mapperOverlapRatio = grantTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grantTotal;
+ CubingJob cubingJob = (CubingJob) getManager()
+ .getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
+ long sourceRecordCount = cubingJob.findSourceRecordCount();
CubeStatsWriter.writeCuboidStatistics(hadoopConf, statisticsDir, cuboidHLLMap, samplingPercentage,
- mapperNumber, mapperOverlapRatio);
+ mapperNumber, mapperOverlapRatio, sourceRecordCount);

Path statisticsFile = new Path(statisticsDir, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
logger.info(newSegment + " stats saved to hdfs " + statisticsFile);

FSDataInputStream is = fs.open(statisticsFile);
try {
// put the statistics to metadata store
String resPath = newSegment.getStatisticsResourcePath();
rs.putResource(resPath, is, System.currentTimeMillis());
logger.info(newSegment + " stats saved to resource " + resPath);

- CubingJob cubingJob = (CubingJob) getManager()
- .getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
StatisticsDecisionUtil.decideCubingAlgorithm(cubingJob, newSegment);
StatisticsDecisionUtil.optimizeCubingPlan(newSegment);
} finally {