Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ public int getSamplingPercentage() {
return samplingPercentage;
}

/**
 * Returns the raw source row count recorded for this segment's statistics.
 *
 * @return the source row count as read from the statistics resource
 */
public long getSourceRowCount() {
    return this.sourceRowCount;
}

public Map<Long, Long> getCuboidRowEstimatesHLL() {
return getCuboidRowCountMapFromSampling(cuboidRowEstimatesHLL, samplingPercentage);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ public static void writeCuboidStatistics(Configuration conf, Path outputPath, //
writeCuboidStatistics(conf, outputPath, cuboidHLLMap, samplingPercentage, 0, 0, 0);
}

/**
 * Writes cuboid statistics along with the source record count, using default
 * values (0) for mapper number and mapper overlap ratio.
 *
 * @param conf               Hadoop configuration used for the write
 * @param outputPath         destination path for the statistics file
 * @param cuboidHLLMap       per-cuboid HyperLogLog counters
 * @param samplingPercentage sampling percentage applied when the counters were built
 * @param sourceRecordCount  total number of source records backing the statistics
 * @throws IOException if writing the statistics file fails
 */
public static void writeCuboidStatistics(Configuration conf, Path outputPath, //
        Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, long sourceRecordCount) throws IOException {
    // Delegate to the full variant; mapper number and overlap ratio are unknown here.
    writeCuboidStatistics(conf, outputPath, cuboidHLLMap, samplingPercentage, 0, 0, sourceRecordCount);
}

public static void writeCuboidStatistics(Configuration conf, Path outputPath, //
Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio,
long sourceRecordCoun) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteExceptio
try {

int averageSamplingPercentage = 0;
long sourceRecordCount = 0;
long effectiveTimeRange = 0;
for (String segmentId : CubingExecutableUtil.getMergingSegmentIds(this.getParams())) {
String fileKey = CubeSegment.getStatisticsResourcePath(CubingExecutableUtil.getCubeName(this.getParams()), segmentId);
InputStream is = rs.getResource(fileKey).content();
Expand All @@ -99,6 +101,13 @@ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteExceptio
if (key.get() == 0L) {
// sampling percentage;
averageSamplingPercentage += Bytes.toInt(value.getBytes());
} else if (key.get() == -3) {
long perSourceRecordCount = Bytes.toLong(value.getBytes());
if (perSourceRecordCount > 0) {
sourceRecordCount += perSourceRecordCount;
CubeSegment iSegment = cube.getSegmentById(segmentId);
effectiveTimeRange += iSegment.getTSRange().duration();
}
} else if (key.get() > 0) {
HLLCounter hll = new HLLCounter(kylinConf.getCubeStatsHLLPrecision());
ByteArray byteArray = new ByteArray(value.getBytes());
Expand All @@ -120,8 +129,11 @@ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteExceptio
tempFile.delete();
}
}
sourceRecordCount *= effectiveTimeRange == 0 ? 0 : newSegment.getTSRange().duration() / effectiveTimeRange;
averageSamplingPercentage = averageSamplingPercentage / CubingExecutableUtil.getMergingSegmentIds(this.getParams()).size();
CubeStatsWriter.writeCuboidStatistics(conf, new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams())), cuboidHLLMap, averageSamplingPercentage);
CubeStatsWriter.writeCuboidStatistics(conf,
new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams())), cuboidHLLMap,
averageSamplingPercentage, sourceRecordCount);
Path statisticsFilePath = new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams()), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf);
FSDataInputStream is = fs.open(statisticsFilePath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteExceptio

String resultDir = CubingExecutableUtil.getMergedStatisticsPath(this.getParams());
CubeStatsWriter.writeCuboidStatistics(conf, new Path(resultDir), resultCuboidHLLMap,
averageSamplingPercentage);
averageSamplingPercentage, oldSegmentStatsReader.getSourceRowCount());

try (FSDataInputStream mergedStats = hdfs
.open(new Path(resultDir, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME))) {
Expand Down