@@ -90,9 +90,14 @@ public Short getShardId() {


public long parseCuboid(byte[] bytes) {
return getCuboidId(bytes, enableSharding);
}

public static long getCuboidId(byte[] bytes, boolean enableSharding) {
int offset = enableSharding ? RowConstants.ROWKEY_SHARDID_LEN : 0;
return Bytes.toLong(bytes, offset, RowConstants.ROWKEY_CUBOIDID_LEN);
}

/**
* @param bytes
* @return cuboid ID
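As a quick illustration of the new static helper above: a self-contained sketch of the same parsing logic, assuming Kylin's usual row-key layout of a 2-byte shard prefix (when sharding is enabled) followed by an 8-byte big-endian cuboid id. The constant values here are assumed stand-ins for RowConstants, not taken from this patch.

import java.nio.ByteBuffer;

// Illustration only: mimics RowKeySplitter.getCuboidId without Kylin dependencies.
public class CuboidIdParsingSketch {

    private static final int SHARDID_LEN = 2;  // assumed value of RowConstants.ROWKEY_SHARDID_LEN
    private static final int CUBOIDID_LEN = 8; // assumed value of RowConstants.ROWKEY_CUBOIDID_LEN

    // Skip the shard prefix when sharding is enabled, then read the cuboid id
    // as a big-endian long, which is also what Bytes.toLong does.
    static long getCuboidId(byte[] rowKey, boolean enableSharding) {
        int offset = enableSharding ? SHARDID_LEN : 0;
        return ByteBuffer.wrap(rowKey, offset, CUBOIDID_LEN).getLong();
    }

    public static void main(String[] args) {
        // 2-byte shard id (0x0001) followed by cuboid id 255; dimension bytes omitted.
        byte[] shardedKey = { 0x00, 0x01, 0, 0, 0, 0, 0, 0, 0, (byte) 0xFF };
        System.out.println(getCuboidId(shardedKey, true));   // 255
        byte[] plainKey = { 0, 0, 0, 0, 0, 0, 0, (byte) 0xFF };
        System.out.println(getCuboidId(plainKey, false));    // 255
    }
}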
@@ -77,6 +77,8 @@ public interface BatchConstants {

String CFG_SHARD_NUM = "shard.num";

String CFG_CONVERGE_CUBOID_PARTITION_PARAM = "converge.cuboid.partition.param";

/**
* command line ARGuments
*/
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kylin.engine.mr.common;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.steps.ConvergeCuboidDataPartitioner;
import org.apache.kylin.engine.mr.steps.ConvergeCuboidDataReducer;

public class ConvergeCuboidDataUtil {

public static void setupReducer(Job job, CubeSegment cubeSegment, Path output) throws IOException {
// Output
//// prevent creating zero-sized default output files
LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
FileOutputFormat.setOutputPath(job, output);

// Reducer
job.setReducerClass(ConvergeCuboidDataReducer.class);
job.setPartitionerClass(ConvergeCuboidDataPartitioner.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

Pair<Integer, Integer> numReduceTasks = MapReduceUtil.getConvergeCuboidDataReduceTaskNums(cubeSegment);
job.setNumReduceTasks(numReduceTasks.getFirst());

int nBaseReduceTasks = numReduceTasks.getSecond();
boolean enableSharding = cubeSegment.isEnableSharding();
long baseCuboidId = cubeSegment.getCuboidScheduler().getBaseCuboidId();
String partiParams = enableSharding + "," + baseCuboidId + "," + nBaseReduceTasks;
job.getConfiguration().set(BatchConstants.CFG_CONVERGE_CUBOID_PARTITION_PARAM, partiParams);
}
}
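For reference, the contract between this utility and the partitioner further down is just a comma-joined string in the job configuration. A minimal sketch of the round trip, with made-up values:

// Illustration only: round-trips the CFG_CONVERGE_CUBOID_PARTITION_PARAM format
// ("<enableSharding>,<baseCuboidId>,<nBaseReduceTasks>") with assumed values.
public class ConvergePartitionParamSketch {
    public static void main(String[] args) {
        boolean enableSharding = true; // assumed segment setting
        long baseCuboidId = 255L;      // assumed base cuboid id
        int nBaseReduceTasks = 2;      // assumed reducer count reserved for the base cuboid

        // what ConvergeCuboidDataUtil.setupReducer writes into the job configuration
        String partiParams = enableSharding + "," + baseCuboidId + "," + nBaseReduceTasks;
        System.out.println(partiParams);                      // true,255,2

        // what ConvergeCuboidDataPartitioner.setConf parses back out
        String[] params = partiParams.split(",");
        System.out.println(Boolean.parseBoolean(params[0]));  // true
        System.out.println(Long.parseLong(params[1]));        // 255
        System.out.println(Integer.parseInt(params[2]));      // 2
    }
}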
@@ -135,6 +135,12 @@ private static void readCuboidStatsFromSegments(Set<Long> cuboidSet, List<CubeSe

public static Map<Long, Long> readCuboidStatsFromSegment(Set<Long> cuboidIds, CubeSegment cubeSegment)
throws IOException {
Pair<Map<Long, Long>, Long> stats = readCuboidStatsWithSourceFromSegment(cuboidIds, cubeSegment);
return stats == null ? null : stats.getFirst();
}

public static Pair<Map<Long, Long>, Long> readCuboidStatsWithSourceFromSegment(Set<Long> cuboidIds,
CubeSegment cubeSegment) throws IOException {
if (cubeSegment == null) {
logger.warn("The cube segment can not be " + null);
return null;
@@ -157,7 +163,6 @@ public static Map<Long, Long> readCuboidStatsFromSegment(Set<Long> cuboidIds, Cu
cuboidsWithStats.put(cuboid, rowEstimate);
}
}
return cuboidsWithStats;
return new Pair<>(cuboidsWithStats, cubeStatsReader.sourceRowCount);
}

}
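A hedged usage sketch of the new Pair-returning variant. It assumes a Kylin classpath and an already-resolved segment, and the package/import paths outside this diff are inferred from the surrounding code rather than guaranteed by the patch: the pair's first element maps each cuboid id to its estimated row count, the second is the segment's source row count.

import java.io.IOException;
import java.util.Map;
import java.util.Set;

import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.common.CuboidStatsReaderUtil; // assumed location of the class above

// Illustration only: how a caller can unpack the stats-with-source pair.
public class CuboidStatsUsageSketch {

    static void printStats(Set<Long> cuboidIds, CubeSegment cubeSegment) throws IOException {
        Pair<Map<Long, Long>, Long> stats =
                CuboidStatsReaderUtil.readCuboidStatsWithSourceFromSegment(cuboidIds, cubeSegment);
        if (stats == null) {
            return; // the segment was null, so there are no stats to read
        }
        Map<Long, Long> rowCountByCuboid = stats.getFirst(); // cuboid id -> estimated row count
        long sourceRowCount = stats.getSecond();             // row count of the segment's source data
        System.out.println("source rows: " + sourceRowCount
                + ", cuboids with stats: " + rowCountByCuboid.size());
    }
}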
@@ -20,9 +20,11 @@

import java.io.IOException;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.mapreduce.Reducer;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
@@ -31,6 +33,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Sets;

public class MapReduceUtil {

private static final Logger logger = LoggerFactory.getLogger(MapReduceUtil.class);
@@ -112,7 +116,35 @@ public static int getInmemCubingReduceTaskNum(CubeSegment cubeSeg, CuboidSchedul
for (Double cuboidSize : cubeSizeMap.values()) {
totalSizeInM += cuboidSize;
}
return getReduceTaskNum(totalSizeInM, kylinConfig);
}

// @return a pair: the first element is the total reducer number, the second is the reducer number for the base cuboid
public static Pair<Integer, Integer> getConvergeCuboidDataReduceTaskNums(CubeSegment cubeSeg) throws IOException {
long baseCuboidId = cubeSeg.getCuboidScheduler().getBaseCuboidId();

Set<Long> overlapCuboids = Sets.newHashSet(cubeSeg.getCuboidScheduler().getAllCuboidIds());
overlapCuboids.retainAll(cubeSeg.getCubeInstance().getCuboidsRecommend());
overlapCuboids.add(baseCuboidId);

Pair<Map<Long, Long>, Long> cuboidStats = CuboidStatsReaderUtil
.readCuboidStatsWithSourceFromSegment(overlapCuboids, cubeSeg);
Map<Long, Double> cubeSizeMap = CubeStatsReader.getCuboidSizeMapFromRowCount(cubeSeg, cuboidStats.getFirst(),
cuboidStats.getSecond());
double totalSizeInM = 0;
for (Double cuboidSize : cubeSizeMap.values()) {
totalSizeInM += cuboidSize;
}

double baseSizeInM = cubeSizeMap.get(baseCuboidId);

KylinConfig kylinConfig = cubeSeg.getConfig();
int nBase = getReduceTaskNum(baseSizeInM, kylinConfig);
int nOther = getReduceTaskNum(totalSizeInM - baseSizeInM, kylinConfig);
return new Pair<>(nBase + nOther, nBase);
}

private static int getReduceTaskNum(double totalSizeInM, KylinConfig kylinConfig) {
double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();

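To make the new split concrete, a small arithmetic sketch with assumed sizes and config values. The real getReduceTaskNum (its body is cut off above) also applies Kylin's configured reducer bounds, which this stand-in omits.

// Illustration only: all numbers below are assumed, not taken from a real cube.
public class ConvergeReducerCountSketch {
    public static void main(String[] args) {
        double perReduceInputMB = 500.0; // assumed getDefaultHadoopJobReducerInputMB()
        double reduceCountRatio = 1.0;   // assumed getDefaultHadoopJobReducerCountRatio()

        double baseSizeInM = 1200.0;     // assumed estimated size of the base cuboid
        double totalSizeInM = 3500.0;    // assumed total size of all converged cuboids

        int nBase = estimate(baseSizeInM, perReduceInputMB, reduceCountRatio);                 // 2
        int nOther = estimate(totalSizeInM - baseSizeInM, perReduceInputMB, reduceCountRatio); // 5
        System.out.println("total reducers = " + (nBase + nOther) + ", base reducers = " + nBase);
    }

    // Rough stand-in for getReduceTaskNum: size divided by MB-per-reducer, scaled by
    // the ratio, at least one reducer; the real method also enforces min/max bounds.
    static int estimate(double sizeInM, double perReduceInputMB, double reduceCountRatio) {
        return (int) Math.max(1L, Math.round(sizeInM / perReduceInputMB * reduceCountRatio));
    }
}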
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kylin.engine.mr.steps;

import java.util.Random;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.kylin.cube.common.RowKeySplitter;
import org.apache.kylin.engine.mr.common.BatchConstants;

import com.google.common.base.Preconditions;

public class ConvergeCuboidDataPartitioner extends Partitioner<Text, Text> implements Configurable {

private Random rand = new Random();

private Configuration conf;
private boolean enableSharding;
private long baseCuboidID;
private int numReduceBaseCuboid;

@Override
public int getPartition(Text key, Text value, int numReduceTasks) {
long cuboidID = RowKeySplitter.getCuboidId(key.getBytes(), enableSharding);
// the first numReduceBaseCuboid partitions are reserved for the base cuboid
if (cuboidID == baseCuboidID) {
return rand.nextInt(numReduceBaseCuboid);
} else {
return numReduceBaseCuboid + rand.nextInt(numReduceTasks - numReduceBaseCuboid);
}
}

@Override
public void setConf(Configuration conf) {
this.conf = conf;
String partiParam = conf.get(BatchConstants.CFG_CONVERGE_CUBOID_PARTITION_PARAM);
String[] params = partiParam.split(",");
Preconditions.checkArgument(params.length >= 3);
this.enableSharding = Boolean.parseBoolean(params[0]);
this.baseCuboidID = Long.parseLong(params[1]);
this.numReduceBaseCuboid = Integer.parseInt(params[2]);
}

@Override
public Configuration getConf() {
return conf;
}
}
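Continuing with the same assumed numbers, a standalone sketch of how getPartition splits the reducer space: base-cuboid rows go to one of the first numReduceBaseCuboid partitions, everything else is spread over the remaining ones.

import java.util.Random;

// Illustration only: replays the getPartition rule above with assumed values.
public class ConvergePartitionAssignmentSketch {
    public static void main(String[] args) {
        Random rand = new Random();
        int numReduceTasks = 7;       // assumed total reducer count (nBase + nOther)
        int numReduceBaseCuboid = 2;  // assumed reducers reserved for the base cuboid
        long baseCuboidID = 255L;     // assumed base cuboid id

        for (long cuboidID : new long[] { 255L, 189L, 63L }) {
            int partition = (cuboidID == baseCuboidID)
                    ? rand.nextInt(numReduceBaseCuboid)                                         // 0..1
                    : numReduceBaseCuboid + rand.nextInt(numReduceTasks - numReduceBaseCuboid); // 2..6
            System.out.println("cuboid " + cuboidID + " -> partition " + partition);
        }
    }
}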
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kylin.engine.mr.steps;

import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidBase;
import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidOld;

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.common.RowKeySplitter;
import org.apache.kylin.engine.mr.KylinReducer;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;

public class ConvergeCuboidDataReducer extends KylinReducer<Text, Text, Text, Text> {

private MultipleOutputs mos;

private boolean enableSharding;
private long baseCuboid;

@Override
protected void doSetup(Context context) throws IOException {
super.bindCurrentConfiguration(context.getConfiguration());
mos = new MultipleOutputs(context);

String cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
String segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);

KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();

CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
CubeSegment cubeSegment = cube.getSegmentById(segmentID);
CubeSegment oldSegment = cube.getOriginalSegmentToOptimize(cubeSegment);

this.enableSharding = oldSegment.isEnableSharding();
this.baseCuboid = cube.getCuboidScheduler().getBaseCuboidId();
}

@Override
public void doReduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
long cuboidID = RowKeySplitter.getCuboidId(key.getBytes(), enableSharding);

String baseOutputPath = cuboidID == baseCuboid ? PathNameCuboidBase : PathNameCuboidOld;
int n = 0;
for (Text value : values) {
mos.write(key, value, generateFileName(baseOutputPath));
n++;
}
if (n > 1) {
throw new RuntimeException(
"multiple records share the same key in aggregated cuboid data for cuboid " + cuboidID);
}
}

@Override
public void doCleanup(Context context) throws IOException, InterruptedException {
mos.close();

Path outputDirBase = new Path(context.getConfiguration().get(FileOutputFormat.OUTDIR), PathNameCuboidBase);
FileSystem fs = FileSystem.get(context.getConfiguration());
if (!fs.exists(outputDirBase)) {
fs.mkdirs(outputDirBase);
SequenceFile
.createWriter(context.getConfiguration(),
SequenceFile.Writer.file(new Path(outputDirBase, "part-m-00000")),
SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(Text.class))
.close();
}
}

private String generateFileName(String subDir) {
return subDir + "/part";
}
}
@@ -19,21 +19,20 @@
package org.apache.kylin.engine.mr.steps;

import java.util.Locale;

import org.apache.commons.cli.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.ConvergeCuboidDataUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -69,26 +68,21 @@ public int run(String[] args) throws Exception {

// Mapper
job.setMapperClass(FilterRecommendCuboidDataMapper.class);

// Reducer
job.setNumReduceTasks(0);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

// Input
job.setInputFormatClass(SequenceFileInputFormat.class);
FileInputFormat.setInputPaths(job, input);
// Output
//// prevent creating zero-sized default output files
LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
FileOutputFormat.setOutputPath(job, output);

// Reducer
ConvergeCuboidDataUtil.setupReducer(job, originalSegment, output);

// set job configuration
job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
// add metadata to distributed cache
attachSegmentMetadataWithDict(originalSegment, job.getConfiguration());
attachSegmentMetadata(originalSegment, job.getConfiguration(), false, false);

this.deletePath(job.getConfiguration(), output);
