KYLIN-3926 set sourceRecordCount when updating statistics #579

Merged: 2 commits, May 7, 2019

@@ -161,6 +161,10 @@ public int getSamplingPercentage() {
return samplingPercentage;
}

public long getSourceRowCount() {
return sourceRowCount;
}

public Map<Long, Long> getCuboidRowEstimatesHLL() {
return getCuboidRowCountMapFromSampling(cuboidRowEstimatesHLL, samplingPercentage);
}
@@ -43,6 +43,11 @@ public static void writeCuboidStatistics(Configuration conf, Path outputPath, //
writeCuboidStatistics(conf, outputPath, cuboidHLLMap, samplingPercentage, 0, 0, 0);
}

public static void writeCuboidStatistics(Configuration conf, Path outputPath, //
Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, long sourceRecordCoun) throws IOException {
writeCuboidStatistics(conf, outputPath, cuboidHLLMap, samplingPercentage, 0, 0, sourceRecordCoun);

Review comment: typo: should be sourceRecordCount

Contributor Author: Just copied from the existing parameter name. However, it's better to change it from sourceRecordCoun to sourceRecordCount.

}

public static void writeCuboidStatistics(Configuration conf, Path outputPath, //
Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio,
long sourceRecordCoun) throws IOException {
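Following up on the review thread above: a minimal sketch of what the new five-argument overload would look like with the suggested rename applied. This is illustrative only; the code as merged keeps the sourceRecordCoun spelling, and the two zeros stand in for mapperNumber and mapperOverlapRatio, mirroring the existing four-argument overload.

    // Sketch only: the suggested parameter rename (sourceRecordCount) applied to the new overload.
    public static void writeCuboidStatistics(Configuration conf, Path outputPath,
            Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, long sourceRecordCount)
            throws IOException {
        // Delegate to the full overload; mapperNumber and mapperOverlapRatio remain 0 in this path.
        writeCuboidStatistics(conf, outputPath, cuboidHLLMap, samplingPercentage, 0, 0, sourceRecordCount);
    }
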
@@ -133,6 +133,8 @@ protected void doMap(IntWritable key, NullWritable value, Context context)
Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
Configuration conf = null;
int averageSamplingPercentage = 0;
long sourceRecordCount = 0;
long effectiveTimeRange = 0;

for (CubeSegment cubeSegment : mergingSegments) {
String filePath = cubeSegment.getStatisticsResourcePath();
@@ -162,7 +164,14 @@ protected void doMap(IntWritable key, NullWritable value, Context context)
if (keyW.get() == 0L) {
// sampling percentage;
averageSamplingPercentage += Bytes.toInt(valueW.getBytes());
} else if (keyW.get() > 0) {
} else if (keyW.get() == -3) {
long perSourceRecordCount = Bytes.toLong(valueW.getBytes());
if (perSourceRecordCount > 0) {
sourceRecordCount += perSourceRecordCount;
CubeSegment iSegment = cubeInstance.getSegmentById(segmentId);
effectiveTimeRange += iSegment.getTSRange().duration();
}
} else if (keyW.get() > 0) {
HLLCounter hll = new HLLCounter(kylinConfig.getCubeStatsHLLPrecision());
ByteArray byteArray = new ByteArray(valueW.getBytes());
hll.readRegisters(byteArray.asBuffer());
@@ -181,12 +190,13 @@ protected void doMap(IntWritable key, NullWritable value, Context context)
IOUtils.closeStream(reader);
}
}

averageSamplingPercentage = averageSamplingPercentage / mergingSegments.size();
CubeStatsWriter.writeCuboidStatistics(conf, new Path(statOutputPath), cuboidHLLMap,
averageSamplingPercentage);
sourceRecordCount *= effectiveTimeRange == 0 ? 0
: (double) newSegment.getTSRange().duration() / effectiveTimeRange;
Path statisticsFilePath = new Path(statOutputPath,
BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
averageSamplingPercentage = averageSamplingPercentage / mergingSegments.size();
CubeStatsWriter.writeCuboidStatistics(conf, new Path(statOutputPath), cuboidHLLMap,
averageSamplingPercentage, sourceRecordCount);

FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf);
FSDataInputStream fis = fs.open(statisticsFilePath);
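The merge paths touched by this change (the mapper above, the executable step below, and the Spark job at the end) aggregate the per-segment source counts the same way: each merging segment that recorded a count under statistics key -3 contributes its count and its time-range duration, and the partial sum is then extrapolated to the merged segment's full time range. A small, self-contained illustration of that arithmetic; the class name and the numbers are hypothetical and not part of the PR.

    // Hypothetical example of the proration used when merging segment statistics.
    public class SourceCountProrationExample {
        public static void main(String[] args) {
            long dayMillis = 86_400_000L;

            long sourceRecordCount = 0;
            long effectiveTimeRange = 0;

            // Segment A recorded 1,000,000 source rows over a one-day time range.
            sourceRecordCount += 1_000_000L;
            effectiveTimeRange += dayMillis;

            // Segment B carries no source count (older statistics), so it adds nothing to
            // either the running count or the effective time range.

            // The merged segment spans two days; scale the partial count up to that range.
            long mergedSegmentDuration = 2 * dayMillis;
            sourceRecordCount *= effectiveTimeRange == 0 ? 0
                    : (double) mergedSegmentDuration / effectiveTimeRange;

            System.out.println(sourceRecordCount); // prints 2000000
        }
    }
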
@@ -75,8 +75,11 @@ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteExceptio
try {

int averageSamplingPercentage = 0;
long sourceRecordCount = 0;
long effectiveTimeRange = 0;
for (String segmentId : CubingExecutableUtil.getMergingSegmentIds(this.getParams())) {
String fileKey = CubeSegment.getStatisticsResourcePath(CubingExecutableUtil.getCubeName(this.getParams()), segmentId);
String fileKey = CubeSegment
.getStatisticsResourcePath(CubingExecutableUtil.getCubeName(this.getParams()), segmentId);
InputStream is = rs.getResource(fileKey).content();
File tempFile = null;
FileOutputStream tempFileStream = null;
@@ -99,6 +102,13 @@ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteExceptio
if (key.get() == 0L) {
// sampling percentage;
averageSamplingPercentage += Bytes.toInt(value.getBytes());
} else if (key.get() == -3) {
long perSourceRecordCount = Bytes.toLong(value.getBytes());
if (perSourceRecordCount > 0) {
sourceRecordCount += perSourceRecordCount;
CubeSegment iSegment = cube.getSegmentById(segmentId);
effectiveTimeRange += iSegment.getTSRange().duration();
}
} else if (key.get() > 0) {
HLLCounter hll = new HLLCounter(kylinConf.getCubeStatsHLLPrecision());
ByteArray byteArray = new ByteArray(value.getBytes());
@@ -120,9 +130,15 @@ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteExceptio
tempFile.delete();
}
}
averageSamplingPercentage = averageSamplingPercentage / CubingExecutableUtil.getMergingSegmentIds(this.getParams()).size();
CubeStatsWriter.writeCuboidStatistics(conf, new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams())), cuboidHLLMap, averageSamplingPercentage);
Path statisticsFilePath = new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams()), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
sourceRecordCount *= effectiveTimeRange == 0 ? 0
: (double) newSegment.getTSRange().duration() / effectiveTimeRange;
averageSamplingPercentage = averageSamplingPercentage
/ CubingExecutableUtil.getMergingSegmentIds(this.getParams()).size();
CubeStatsWriter.writeCuboidStatistics(conf,
new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams())), cuboidHLLMap,
averageSamplingPercentage, sourceRecordCount);
Path statisticsFilePath = new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams()),
BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf);
FSDataInputStream is = fs.open(statisticsFilePath);
try {
@@ -120,7 +120,7 @@ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteExceptio

String resultDir = CubingExecutableUtil.getMergedStatisticsPath(this.getParams());
CubeStatsWriter.writeCuboidStatistics(conf, new Path(resultDir), resultCuboidHLLMap,
averageSamplingPercentage);
averageSamplingPercentage, oldSegmentStatsReader.getSourceRowCount());

try (FSDataInputStream mergedStats = hdfs
.open(new Path(resultDir, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME))) {
@@ -18,8 +18,13 @@

package org.apache.kylin.engine.spark;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.Serializable;
import java.util.List;
import java.util.Map;

import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
@@ -62,14 +67,11 @@
import org.apache.spark.api.java.function.PairFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import scala.Tuple2;

/**
merge dictionary
@@ -236,14 +238,16 @@ public Tuple2<Text, Text> call(Integer index) throws Exception {
Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
Configuration conf = null;
int averageSamplingPercentage = 0;
long sourceRecordCount = 0;
long effectiveTimeRange = 0;

for (CubeSegment cubeSegment : mergingSegments) {
String filePath = cubeSegment.getStatisticsResourcePath();

File tempFile = File.createTempFile(segmentId, ".seq");

try(InputStream is = rs.getResource(filePath).content();
FileOutputStream tempFileStream = new FileOutputStream(tempFile)) {
try (InputStream is = rs.getResource(filePath).content();
FileOutputStream tempFileStream = new FileOutputStream(tempFile)) {

org.apache.commons.io.IOUtils.copy(is, tempFileStream);
}
@@ -252,15 +256,24 @@ public Tuple2<Text, Text> call(Integer index) throws Exception {

conf = HadoopUtil.getCurrentConfiguration();

try(SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(tempFile.getAbsolutePath()), conf)) {
try (SequenceFile.Reader reader = new SequenceFile.Reader(fs,
new Path(tempFile.getAbsolutePath()), conf)) {
//noinspection deprecation
LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(),
conf);

while (reader.next(key, value)) {
if (key.get() == 0L) {
// sampling percentage
averageSamplingPercentage += Bytes.toInt(value.getBytes());
} else if (key.get() == -3) {
long perSourceRecordCount = Bytes.toLong(value.getBytes());
if (perSourceRecordCount > 0) {
sourceRecordCount += perSourceRecordCount;
CubeSegment iSegment = cubeInstance.getSegmentById(segmentId);
effectiveTimeRange += iSegment.getTSRange().duration();
}
} else if (key.get() > 0) {
HLLCounter hll = new HLLCounter(kylinConfig.getCubeStatsHLLPrecision());
ByteArray byteArray = new ByteArray(value.getBytes());
@@ -276,9 +289,13 @@ public Tuple2<Text, Text> call(Integer index) throws Exception {
}
}

sourceRecordCount *= effectiveTimeRange == 0 ? 0
: (double) newSegment.getTSRange().duration() / effectiveTimeRange;
averageSamplingPercentage = averageSamplingPercentage / mergingSegments.size();
CubeStatsWriter.writeCuboidStatistics(conf, new Path(statOutputPath), cuboidHLLMap, averageSamplingPercentage);
Path statisticsFilePath = new Path(statOutputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
CubeStatsWriter.writeCuboidStatistics(conf, new Path(statOutputPath), cuboidHLLMap,
averageSamplingPercentage, sourceRecordCount);
Path statisticsFilePath = new Path(statOutputPath,
BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);

FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf);
FSDataInputStream fis = fs.open(statisticsFilePath);