diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 637502ef05b..025a982bdf5 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -426,7 +426,6 @@ public double getExtTableSnapshotLocalCacheMaxSizeGB() { return Double.parseDouble(getOptional("kylin.snapshot.ext.local.cache.max-size-gb", "200")); } - // ============================================================================ // CUBE // ============================================================================ @@ -449,7 +448,11 @@ public double getJobCuboidSizeMemHungryRatio() { } public double getJobCuboidSizeCountDistinctRatio() { - return Double.parseDouble(getOptional("kylin.cube.size-estimate-countdistinct-ratio", "0.05")); + return Double.parseDouble(getOptional("kylin.cube.size-estimate-countdistinct-ratio", "0.5")); + } + + public double getJobCuboidSizeTopNRatio() { + return Double.parseDouble(getOptional("kylin.cube.size-estimate-topn-ratio", "0.5")); } public String getCubeAlgorithm() { @@ -872,7 +875,7 @@ public int getSqoopMapperNum() { public Map getSqoopConfigOverride() { return getPropertiesByPrefix("kylin.source.jdbc.sqoop-config-override."); } - + public String getJdbcSourceFieldDelimiter() { return getOptional("kylin.source.jdbc.field-delimiter", "|"); } @@ -1223,11 +1226,11 @@ public boolean isSparkSanityCheckEnabled() { public Boolean isEnumerableRulesEnabled() { return Boolean.parseBoolean(getOptional("kylin.query.calcite.enumerable-rules-enabled", "false")); } - + public boolean isReduceExpressionsRulesEnabled() { return Boolean.parseBoolean(getOptional("kylin.query.calcite.reduce-rules-enabled", "true")); } - + public boolean isConvertCreateTableToWith() { return Boolean.valueOf(getOptional("kylin.query.convert-create-table-to-with", "false")); } @@ -1328,12 +1331,13 @@ 
public int getBadQueryHistoryNum() { public int getBadQueryDefaultAlertingSeconds() { return Integer.parseInt(getOptional("kylin.query.badquery-alerting-seconds", "90")); } + public double getBadQueryDefaultAlertingCoefficient() { return Double.parseDouble(getOptional("kylin.query.timeout-seconds-coefficient", "0.5")); } public int getBadQueryDefaultDetectIntervalSeconds() { - int time =(int) (getQueryTimeoutSeconds() * getBadQueryDefaultAlertingCoefficient()); // half of query timeout + int time = (int) (getQueryTimeoutSeconds() * getBadQueryDefaultAlertingCoefficient()); // half of query timeout if (time == 0) { time = 60; // 60 sec } diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java index 1c138761f31..29a25e9b5bf 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java @@ -30,6 +30,7 @@ public class BitmapSerializer extends DataTypeSerializer { private static final int IS_RESULT_FLAG = 1; private static final int RESULT_SIZE = 12; + private static final int DEFAULT_MAX_SIZE = 1024; // called by reflection public BitmapSerializer(DataType type) { @@ -85,6 +86,19 @@ public int getStorageBytesEstimate() { return 8 * 1024; } + @Override + protected double getStorageBytesEstimate(double averageNumOfElementsInCounter) { + // MappeableArrayContainer DEFAULT_MAX_SIZE = 4096 + if (averageNumOfElementsInCounter < DEFAULT_MAX_SIZE) { + // 8 = 4 + 4 for SERIAL_COOKIE_NO_RUNCONTAINER + size + // size * 8 = 2 * size + 2 * size + 4 * size as keys + values Cardinality + startOffsets + // size * 8 for values array + return 8 + averageNumOfElementsInCounter * 16; + } else { + return getStorageBytesEstimate(); + } + } + @Override public boolean supportDirectReturnResult() { return true; diff --git 
a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java index 98bc5cf772a..93108649162 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java @@ -80,4 +80,16 @@ public int getStorageBytesEstimate() { return current().maxLength(); } + @Override + protected double getStorageBytesEstimate(double averageNumOfElementsInCounter) { + int registerIndexSize = current().getRegisterIndexSize(); + int m = 1 << precision; + if (!current().isDense((int) averageNumOfElementsInCounter) + || averageNumOfElementsInCounter < (m - 5.0) / (1 + registerIndexSize)) { + // 5 = 1 + 4 for scheme and size (5.0 in the check above keeps the division in floating point) + // size * (getRegisterIndexSize + 1) + return 5 + averageNumOfElementsInCounter * (registerIndexSize + 1); + } + return getStorageBytesEstimate(); + } } diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java index b79346503ed..80bbb2a9c17 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java @@ -78,7 +78,7 @@ public HLLCounter(int p, RegisterType type) { } } - private boolean isDense(int size) { + public boolean isDense(int size) { double over = OVERFLOW_FACTOR * m; return size > (int) over; } @@ -358,7 +358,7 @@ public int maxLength() { return 1 + m; } - private int getRegisterIndexSize() { + public int getRegisterIndexSize() { return (p - 1) / 8 + 1; // 2 when p=16, 3 when p=17 } diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java index 77a69cf9357..eff510f8d0f 100644 --- 
a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java @@ -61,12 +61,12 @@ public int peekLength(ByteBuffer in) { @Override public int maxLength() { - return Math.max(precision * TopNCounter.EXTRA_SPACE_RATE * (scale + 8), 1024 * 1024); // use at least 1M + return Math.max(precision * TopNCounter.EXTRA_SPACE_RATE * storageBytesEstimatePerCounter(), 1024 * 1024); // use at least 1M } @Override public int getStorageBytesEstimate() { - return precision * TopNCounter.EXTRA_SPACE_RATE * (scale + 8); + return precision * TopNCounter.EXTRA_SPACE_RATE * storageBytesEstimatePerCounter(); } @Override @@ -107,4 +107,17 @@ public TopNCounter deserialize(ByteBuffer in) { return counter; } + @Override + protected double getStorageBytesEstimate(double averageNumOfElementsInCounter) { + if (averageNumOfElementsInCounter < precision * TopNCounter.EXTRA_SPACE_RATE) { + return averageNumOfElementsInCounter * storageBytesEstimatePerCounter() + 12; + } else { + return getStorageBytesEstimate(); + } + } + + private int storageBytesEstimatePerCounter() { + return (scale + 8); + } + } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java index 3c054a32839..6b8934abb4a 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java @@ -55,6 +55,7 @@ import org.apache.kylin.cube.kv.RowKeyEncoder; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.measure.hllc.HLLCounter; +import org.apache.kylin.measure.topn.TopNMeasureType; import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MeasureDesc; @@ -79,6 +80,7 @@ public class CubeStatsReader 
{ final double mapperOverlapRatioOfFirstBuild; // becomes meaningless after merge final Map cuboidRowEstimatesHLL; final CuboidScheduler cuboidScheduler; + final long sourceRowCount; public CubeStatsReader(CubeSegment cubeSegment, KylinConfig kylinConfig) throws IOException { this(cubeSegment, cubeSegment.getCuboidScheduler(), kylinConfig); @@ -94,7 +96,7 @@ public CubeStatsReader(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler, RawResource resource = store.getResource(statsKey); if (resource == null) throw new IllegalStateException("Missing resource at " + statsKey); - + File tmpSeqFile = writeTmpSeqFile(resource.inputStream); Path path = new Path(HadoopUtil.fixWindowsPath("file://" + tmpSeqFile.getAbsolutePath())); @@ -107,6 +109,7 @@ public CubeStatsReader(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler, this.mapperNumberOfFirstBuild = cubeStatsResult.getMapperNumber(); this.mapperOverlapRatioOfFirstBuild = cubeStatsResult.getMapperOverlapRatio(); this.cuboidRowEstimatesHLL = cubeStatsResult.getCounterMap(); + this.sourceRowCount = cubeStatsResult.getSourceRecordCount(); } /** @@ -129,6 +132,7 @@ public CubeStatsReader(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler, this.mapperNumberOfFirstBuild = cubeStatsResult.getMapperNumber(); this.mapperOverlapRatioOfFirstBuild = cubeStatsResult.getMapperOverlapRatio(); this.cuboidRowEstimatesHLL = cubeStatsResult.getCounterMap(); + this.sourceRowCount = cubeStatsResult.getSourceRecordCount(); } private File writeTmpSeqFile(InputStream inputStream) throws IOException { @@ -158,7 +162,7 @@ public Map getCuboidRowEstimatesHLL() { // return map of Cuboid ID => MB public Map getCuboidSizeMap() { - return getCuboidSizeMapFromRowCount(seg, getCuboidRowEstimatesHLL()); + return getCuboidSizeMapFromRowCount(seg, getCuboidRowEstimatesHLL(), sourceRowCount); } public double estimateCubeSize() { @@ -184,7 +188,8 @@ public static Map getCuboidRowCountMapFromSampling(Map 
getCuboidSizeMapFromRowCount(CubeSegment cubeSegment, Map rowCountMap) { + public static Map getCuboidSizeMapFromRowCount(CubeSegment cubeSegment, Map rowCountMap, + long sourceRowCount) { final CubeDesc cubeDesc = cubeSegment.getCubeDesc(); final List rowkeyColumnSize = Lists.newArrayList(); final Cuboid baseCuboid = Cuboid.getBaseCuboid(cubeDesc); @@ -199,7 +204,7 @@ public static Map getCuboidSizeMapFromRowCount(CubeSegment cubeSeg Map sizeMap = Maps.newHashMap(); for (Map.Entry entry : rowCountMap.entrySet()) { sizeMap.put(entry.getKey(), estimateCuboidStorageSize(cubeSegment, entry.getKey(), entry.getValue(), - baseCuboid.getId(), baseCuboidRowCount, rowkeyColumnSize)); + baseCuboid.getId(), baseCuboidRowCount, rowkeyColumnSize, sourceRowCount)); } return sizeMap; } @@ -210,7 +215,7 @@ public static Map getCuboidSizeMapFromRowCount(CubeSegment cubeSeg * @return the cuboid size in M bytes */ private static double estimateCuboidStorageSize(CubeSegment cubeSegment, long cuboidId, long rowCount, - long baseCuboidId, long baseCuboidCount, List rowKeyColumnLength) { + long baseCuboidId, long baseCuboidCount, List rowKeyColumnLength, long sourceRowCount) { int rowkeyLength = cubeSegment.getRowKeyPreambleSize(); KylinConfig kylinConf = cubeSegment.getConfig(); @@ -228,12 +233,21 @@ private static double estimateCuboidStorageSize(CubeSegment cubeSegment, long cu int normalSpace = rowkeyLength; int countDistinctSpace = 0; double percentileSpace = 0; + int topNSpace = 0; for (MeasureDesc measureDesc : cubeSegment.getCubeDesc().getMeasures()) { + if (rowCount == 0) + break; DataType returnType = measureDesc.getFunction().getReturnDataType(); if (measureDesc.getFunction().getExpression().equals(FunctionDesc.FUNC_COUNT_DISTINCT)) { - countDistinctSpace += returnType.getStorageBytesEstimate(); + long estimateDistinctCount = sourceRowCount / rowCount; + estimateDistinctCount = estimateDistinctCount == 0 ? 
1L : estimateDistinctCount; + countDistinctSpace += returnType.getStorageBytesEstimate(estimateDistinctCount); } else if (measureDesc.getFunction().getExpression().equals(FunctionDesc.FUNC_PERCENTILE)) { percentileSpace += returnType.getStorageBytesEstimate(baseCuboidCount * 1.0 / rowCount); + } else if (measureDesc.getFunction().getExpression().equals(TopNMeasureType.FUNC_TOP_N)) { + long estimateTopNCount = sourceRowCount / rowCount; + estimateTopNCount = estimateTopNCount == 0 ? 1L : estimateTopNCount; + topNSpace += returnType.getStorageBytesEstimate(estimateTopNCount); } else { normalSpace += returnType.getStorageBytesEstimate(); } @@ -241,9 +255,11 @@ private static double estimateCuboidStorageSize(CubeSegment cubeSegment, long cu double cuboidSizeRatio = kylinConf.getJobCuboidSizeRatio(); double cuboidSizeMemHungryRatio = kylinConf.getJobCuboidSizeCountDistinctRatio(); + double cuboidSizeTopNRatio = kylinConf.getJobCuboidSizeTopNRatio(); + double ret = (1.0 * normalSpace * rowCount * cuboidSizeRatio - + 1.0 * countDistinctSpace * rowCount * cuboidSizeMemHungryRatio + 1.0 * percentileSpace * rowCount) - / (1024L * 1024L); + + 1.0 * countDistinctSpace * rowCount * cuboidSizeMemHungryRatio + 1.0 * percentileSpace * rowCount + + 1.0 * topNSpace * rowCount * cuboidSizeTopNRatio) / (1024L * 1024L); return ret; } @@ -351,6 +367,7 @@ private static String formatDouble(double input) { public static class CubeStatsResult { private int percentage = 100; private double mapperOverlapRatio = 0; + private long sourceRecordCount = 0; private int mapperNumber = 0; private Map counterMap = Maps.newHashMap(); @@ -367,6 +384,8 @@ public CubeStatsResult(Path path, int precision) throws IOException { mapperOverlapRatio = Bytes.toDouble(value.getBytes()); } else if (key.get() == -2) { mapperNumber = Bytes.toInt(value.getBytes()); + } else if (key.get() == -3) { + sourceRecordCount = Bytes.toLong(value.getBytes()); } else if (key.get() > 0) { HLLCounter hll = new 
HLLCounter(precision); ByteArray byteArray = new ByteArray(value.getBytes()); @@ -392,6 +411,10 @@ public int getMapperNumber() { public Map getCounterMap() { return Collections.unmodifiableMap(counterMap); } + + public long getSourceRecordCount() { + return sourceRecordCount; + } } public static void main(String[] args) throws IOException { diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java index f50a4beb97d..c3d6042b1dd 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java @@ -40,14 +40,15 @@ public class CubeStatsWriter { public static void writeCuboidStatistics(Configuration conf, Path outputPath, // Map cuboidHLLMap, int samplingPercentage) throws IOException { - writeCuboidStatistics(conf, outputPath, cuboidHLLMap, samplingPercentage, 0, 0); + writeCuboidStatistics(conf, outputPath, cuboidHLLMap, samplingPercentage, 0, 0, 0); } public static void writeCuboidStatistics(Configuration conf, Path outputPath, // - Map cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio) throws IOException { + Map cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio, + long sourceRecordCount) throws IOException { Path seqFilePath = new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME); writeCuboidStatisticsInner(conf, seqFilePath, cuboidHLLMap, samplingPercentage, mapperNumber, - mapperOverlapRatio); + mapperOverlapRatio, sourceRecordCount); } //Be care of that the file name for partial cuboid statistics should start with BatchConstants.CFG_OUTPUT_STATISTICS, @@ -57,12 +58,12 @@ public static void writePartialCuboidStatistics(Configuration conf, Path outputP int shard) throws IOException { Path seqFilePath = new Path(outputPath, 
BatchConstants.CFG_OUTPUT_STATISTICS + "_" + shard); writeCuboidStatisticsInner(conf, seqFilePath, cuboidHLLMap, samplingPercentage, mapperNumber, - mapperOverlapRatio); + mapperOverlapRatio, 0); } private static void writeCuboidStatisticsInner(Configuration conf, Path outputFilePath, // - Map cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio) - throws IOException { + Map cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio, + long sourceRecordCount) throws IOException { List allCuboids = Lists.newArrayList(); allCuboids.addAll(cuboidHLLMap.keySet()); Collections.sort(allCuboids); @@ -80,6 +81,9 @@ private static void writeCuboidStatisticsInner(Configuration conf, Path outputFi // sampling percentage at key 0 writer.append(new LongWritable(0L), new BytesWritable(Bytes.toBytes(samplingPercentage))); + // flat table source_count at key -3 + writer.append(new LongWritable(-3), new BytesWritable(Bytes.toBytes(sourceRecordCount))); + for (long i : allCuboids) { valueBuf.clear(); cuboidHLLMap.get(i).writeRegisters(valueBuf); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java index b532360f368..1f79539f6d3 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java @@ -122,12 +122,15 @@ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteExceptio totalRowsBeforeMerge); } double mapperOverlapRatio = grantTotal == 0 ? 
0 : (double) totalRowsBeforeMerge / grantTotal; + CubingJob cubingJob = (CubingJob) getManager() + .getJob(CubingExecutableUtil.getCubingJobId(this.getParams())); + long sourceRecordCount = cubingJob.findSourceRecordCount(); CubeStatsWriter.writeCuboidStatistics(hadoopConf, statisticsDir, cuboidHLLMap, samplingPercentage, - mapperNumber, mapperOverlapRatio); + mapperNumber, mapperOverlapRatio, sourceRecordCount); Path statisticsFile = new Path(statisticsDir, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME); logger.info(newSegment + " stats saved to hdfs " + statisticsFile); - + FSDataInputStream is = fs.open(statisticsFile); try { // put the statistics to metadata store @@ -135,8 +138,6 @@ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteExceptio rs.putResource(resPath, is, System.currentTimeMillis()); logger.info(newSegment + " stats saved to resource " + resPath); - CubingJob cubingJob = (CubingJob) getManager() - .getJob(CubingExecutableUtil.getCubingJobId(this.getParams())); StatisticsDecisionUtil.decideCubingAlgorithm(cubingJob, newSegment); StatisticsDecisionUtil.optimizeCubingPlan(newSegment); } finally {