diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java index 58f0e664197..e9351737163 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java @@ -161,6 +161,10 @@ public int getSamplingPercentage() { return samplingPercentage; } + public long getSourceRowCount() { + return sourceRowCount; + } + public Map getCuboidRowEstimatesHLL() { return getCuboidRowCountMapFromSampling(cuboidRowEstimatesHLL, samplingPercentage); } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java index c3d6042b1dd..06a02cb5a20 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java @@ -43,6 +43,11 @@ public static void writeCuboidStatistics(Configuration conf, Path outputPath, // writeCuboidStatistics(conf, outputPath, cuboidHLLMap, samplingPercentage, 0, 0, 0); } + public static void writeCuboidStatistics(Configuration conf, Path outputPath, // + Map cuboidHLLMap, int samplingPercentage, long sourceRecordCount) throws IOException { + writeCuboidStatistics(conf, outputPath, cuboidHLLMap, samplingPercentage, 0, 0, sourceRecordCount); + } + public static void writeCuboidStatistics(Configuration conf, Path outputPath, // Map cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio, long sourceRecordCoun) throws IOException { diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java index 64ceebe19b0..5d4b35d112e 100644 --- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java @@ -75,6 +75,8 @@ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteExceptio try { int averageSamplingPercentage = 0; + long sourceRecordCount = 0; + long effectiveTimeRange = 0; for (String segmentId : CubingExecutableUtil.getMergingSegmentIds(this.getParams())) { String fileKey = CubeSegment.getStatisticsResourcePath(CubingExecutableUtil.getCubeName(this.getParams()), segmentId); InputStream is = rs.getResource(fileKey).content(); @@ -99,6 +101,13 @@ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteExceptio if (key.get() == 0L) { // sampling percentage; averageSamplingPercentage += Bytes.toInt(value.getBytes()); + } else if (key.get() == -3) { + long perSourceRecordCount = Bytes.toLong(value.getBytes()); + if (perSourceRecordCount > 0) { + sourceRecordCount += perSourceRecordCount; + CubeSegment iSegment = cube.getSegmentById(segmentId); + effectiveTimeRange += iSegment.getTSRange().duration(); + } } else if (key.get() > 0) { HLLCounter hll = new HLLCounter(kylinConf.getCubeStatsHLLPrecision()); ByteArray byteArray = new ByteArray(value.getBytes()); @@ -120,8 +129,11 @@ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteExceptio tempFile.delete(); } } + /* Divide in floating point so a fractional duration ratio is not truncated before scaling the count. */ sourceRecordCount *= effectiveTimeRange == 0 ? 
0 : (double) newSegment.getTSRange().duration() / effectiveTimeRange; averageSamplingPercentage = averageSamplingPercentage / CubingExecutableUtil.getMergingSegmentIds(this.getParams()).size(); - CubeStatsWriter.writeCuboidStatistics(conf, new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams())), cuboidHLLMap, averageSamplingPercentage); + CubeStatsWriter.writeCuboidStatistics(conf, + new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams())), cuboidHLLMap, + averageSamplingPercentage, sourceRecordCount); Path statisticsFilePath = new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams()), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME); FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf); FSDataInputStream is = fs.open(statisticsFilePath); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java index 434892c3e3f..8dd73414e13 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java @@ -120,7 +120,7 @@ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteExceptio String resultDir = CubingExecutableUtil.getMergedStatisticsPath(this.getParams()); CubeStatsWriter.writeCuboidStatistics(conf, new Path(resultDir), resultCuboidHLLMap, - averageSamplingPercentage); + averageSamplingPercentage, oldSegmentStatsReader.getSourceRowCount()); try (FSDataInputStream mergedStats = hdfs .open(new Path(resultDir, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME))) {