KYLIN-4035 Calculate column cardinality by using spark engine
majic31 committed Jun 19, 2019
1 parent 1fb000e commit 2148fec
Showing 4 changed files with 204 additions and 16 deletions.
@@ -1430,6 +1430,10 @@ public boolean isSparkFactDistinctEnable() {
return Boolean.parseBoolean(getOptional("kylin.engine.spark-fact-distinct", "false"));
}

public boolean isSparkCardinalityEnabled() {
return Boolean.parseBoolean(getOptional("kylin.engine.spark-cardinality", "false"));
}
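For context, a minimal sketch (not part of the commit) of how this flag is meant to be consumed: set kylin.engine.spark-cardinality=true in kylin.properties and read it back through KylinConfig. The class name below is only illustrative; isSparkCardinalityEnabled() is the method added above.

import org.apache.kylin.common.KylinConfig;

// Illustrative only: reports which engine a cardinality job would use.
public class CardinalityEngineCheck {
    public static void main(String[] args) {
        // kylin.engine.spark-cardinality defaults to "false"; override it in kylin.properties.
        KylinConfig config = KylinConfig.getInstanceFromEnv();
        if (config.isSparkCardinalityEnabled()) {
            System.out.println("Column cardinality will be calculated with the Spark engine.");
        } else {
            System.out.println("Column cardinality will be calculated with the MapReduce engine.");
        }
    }
}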

// ============================================================================
// ENGINE.LIVY
// ============================================================================
@@ -0,0 +1,159 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kylin.engine.spark;

import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.kylin.common.util.AbstractApplication;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.measure.hllc.HLLCounter;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class SparkColumnCardinality extends AbstractApplication implements Serializable {
protected static final Logger logger = LoggerFactory.getLogger(SparkColumnCardinality.class);

public static final Option OPTION_TABLE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_TABLE_NAME).hasArg()
.isRequired(true).withDescription("Table Name").create(BatchConstants.ARG_TABLE_NAME);
public static final Option OPTION_OUTPUT = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
.isRequired(true).withDescription("Output").create(BatchConstants.ARG_OUTPUT);
public static final Option OPTION_PRJ = OptionBuilder.withArgName(BatchConstants.ARG_PROJECT).hasArg()
.isRequired(true).withDescription("Project name").create(BatchConstants.ARG_PROJECT);
public static final Option OPTION_COLUMN_COUNT = OptionBuilder.withArgName(BatchConstants.CFG_OUTPUT_COLUMN).hasArg()
.isRequired(true).withDescription("column count").create(BatchConstants.CFG_OUTPUT_COLUMN);

private Options options;

public SparkColumnCardinality() {
options = new Options();
options.addOption(OPTION_TABLE_NAME);
options.addOption(OPTION_OUTPUT);
options.addOption(OPTION_PRJ);
options.addOption(OPTION_COLUMN_COUNT);
}

@Override
protected Options getOptions() {
return options;
}

@Override
protected void execute(OptionsHelper optionsHelper) throws Exception {
String tableName = optionsHelper.getOptionValue(OPTION_TABLE_NAME);
String output = optionsHelper.getOptionValue(OPTION_OUTPUT);
int columnCnt = Integer.valueOf(optionsHelper.getOptionValue(OPTION_COLUMN_COUNT));

Class[] kryoClassArray = new Class[]{Class.forName("scala.reflect.ClassTag$$anon$1"),
Class.forName("org.apache.kylin.engine.mr.steps.SelfDefineSortableKey")};

SparkConf conf = new SparkConf().setAppName("Calculate table:" + tableName);
// Set spark.sql.catalogImplementation=hive; if it is not set, SparkSession cannot read the Hive metadata and throws org.apache.spark.sql.AnalysisException.
conf.set("spark.sql.catalogImplementation", "hive");
// Serialization settings: use Kryo with Kylin's registrator and require explicit class registration.
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);

KylinSparkJobListener jobListener = new KylinSparkJobListener();
try (JavaSparkContext sc = new JavaSparkContext(conf)) {
sc.sc().addSparkListener(jobListener);
HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(output));
// The table is loaded through Spark SQL, so isSequenceFile is set to false.
final JavaRDD<String[]> recordRDD = SparkUtil.hiveRecordInputRDD(false, sc, null, tableName);
JavaPairRDD<Integer, Long> resultRdd = recordRDD.mapPartitionsToPair(new BuildHllCounter())
.reduceByKey((x, y) -> {
x.merge(y);
return x;
})
.mapToPair(record -> {
return new Tuple2<>(record._1, record._2.getCountEstimate());
})
.sortByKey(true, 1)
.cache();
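// If the table yielded no rows, still write an explicit cardinality of 0 for every column;
// otherwise save the computed estimates directly.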

if (resultRdd.count() == 0) {
ArrayList<Tuple2<Integer, Long>> list = new ArrayList<>();
for (int i = 0; i < columnCnt; ++i) {
list.add(new Tuple2<>(i, 0L));
}
JavaPairRDD<Integer, Long> nullRdd = sc.parallelizePairs(list).repartition(1);
nullRdd.saveAsNewAPIHadoopFile(output, IntWritable.class, LongWritable.class, TextOutputFormat.class);
} else {
resultRdd.saveAsNewAPIHadoopFile(output, IntWritable.class, LongWritable.class, TextOutputFormat.class);
}
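// Note: with IntWritable keys, LongWritable values and TextOutputFormat, each output
// line should look like "<columnIndex>\t<cardinality>" (assuming TextOutputFormat's
// default tab separator).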
}
}

static class BuildHllCounter implements
PairFlatMapFunction<Iterator<String[]>, Integer, HLLCounter> {

public BuildHllCounter() {
logger.info("BuildHllCounter init here.");
}

@Override
public Iterator<Tuple2<Integer, HLLCounter>> call(Iterator<String[]> iterator) throws Exception {
HashMap<Integer, HLLCounter> hllmap = new HashMap<>();
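// Maintain one HLLCounter per column position and feed it every value seen in this
// partition; null field values are counted under the literal string "NULL".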
while (iterator.hasNext()) {
String[] values = iterator.next();
for (int m = 0; m < values.length; ++m) {
String fieldValue = values[m];
if (fieldValue == null) {
fieldValue = "NULL";
}
getHllc(hllmap, m).add(Bytes.toBytes(fieldValue));
}
}
// Convert the HashMap entries into Scala Tuple2 pairs.
List<Tuple2<Integer, HLLCounter>> result = new ArrayList<>();
for (Map.Entry<Integer, HLLCounter> entry : hllmap.entrySet()) {
result.add(new Tuple2<>(entry.getKey(), entry.getValue()));
}
return result.iterator();
}

private HLLCounter getHllc(HashMap<Integer, HLLCounter> hllcMap, Integer key) {
if (!hllcMap.containsKey(key)) {
hllcMap.put(key, new HLLCounter());
}
return hllcMap.get(key);
}
}
}
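As a side note, here is a minimal sketch (not part of the commit) of the HLLCounter behaviour the reduceByKey step above relies on: counters built independently, for example on different partitions, can be merged and still yield a single approximate distinct count. Only methods already used in the class above (add, merge, getCountEstimate) are assumed; the class name is illustrative.

import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.measure.hllc.HLLCounter;

// Illustrative only: mimics merging per-partition counters for one column.
public class HllMergeSketch {
    public static void main(String[] args) {
        HLLCounter partitionA = new HLLCounter();
        HLLCounter partitionB = new HLLCounter();
        for (int i = 0; i < 1000; i++) {
            partitionA.add(Bytes.toBytes("value-" + i));   // values seen on partition A
        }
        for (int i = 500; i < 1500; i++) {
            partitionB.add(Bytes.toBytes("value-" + i));   // overlapping values on partition B
        }
        partitionA.merge(partitionB);                      // what reduceByKey does per column
        // Roughly 1500 distinct values, within HLL's approximation error.
        System.out.println(partitionA.getCountEstimate());
    }
}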
@@ -56,6 +56,8 @@
import org.apache.kylin.job.execution.ExecuteResult;
import org.apache.kylin.job.execution.Output;
import org.apache.kylin.metadata.model.Segments;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.project.ProjectManager;
import org.slf4j.LoggerFactory;

/**
@@ -201,10 +203,21 @@ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException
return onResumed(sparkJobId, mgr);
} else {
String cubeName = this.getParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt());
CubeInstance cube = CubeManager.getInstance(context.getConfig()).getCube(cubeName);
final KylinConfig config = cube.getConfig();

setAlgorithmLayer();
CubeInstance cube;
if (cubeName != null) {
cube = CubeManager.getInstance(context.getConfig()).getCube(cubeName);
} else { // The cube name is not available when the job is loading a Hive table
cube = null;
}
final KylinConfig config;
if (cube != null) {
config = cube.getConfig();
} else {
// When loading a Hive table there is no cube name or cube config, so fall back to the project-level config.
String projectName = this.getParam(SparkColumnCardinality.OPTION_PRJ.getOpt());
ProjectInstance projectInst = ProjectManager.getInstance(context.getConfig()).getProject(projectName);
config = projectInst.getConfig();
}

if (KylinConfig.getSparkHome() == null) {
throw new NullPointerException();
@@ -229,11 +242,13 @@ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException
if (StringUtils.isEmpty(jars)) {
jars = jobJar;
}

String segmentID = this.getParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt());
CubeSegment segment = cube.getSegmentById(segmentID);
Segments<CubeSegment> mergingSeg = cube.getMergingSegments(segment);
dumpMetadata(segment, mergingSeg);
if (cube != null) {
setAlgorithmLayer();
String segmentID = this.getParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt());
CubeSegment segment = cube.getSegmentById(segmentID);
Segments<CubeSegment> mergingSeg = cube.getMergingSegments(segment);
dumpMetadata(segment, mergingSeg);
}

StringBuilder stringBuilder = new StringBuilder();
if (Shell.osType == Shell.OSType.OS_TYPE_WIN) {
@@ -45,6 +45,8 @@
import org.apache.kylin.dict.lookup.SnapshotTable;
import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
import org.apache.kylin.engine.mr.common.MapReduceExecutable;
import org.apache.kylin.engine.spark.SparkColumnCardinality;
import org.apache.kylin.engine.spark.SparkExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
@@ -486,13 +488,21 @@ public void calculateCardinality(String tableName, String submitter, String prj)
String outPath = getConfig().getHdfsWorkingDirectory() + "cardinality/" + job.getId() + "/" + tableName;
String param = "-table " + tableName + " -output " + outPath + " -project " + prj;

MapReduceExecutable step1 = new MapReduceExecutable();

step1.setMapReduceJobClass(HiveColumnCardinalityJob.class);
step1.setMapReduceParams(param);
step1.setParam("segmentId", tableName);

job.addTask(step1);
if (getConfig().isSparkCardinalityEnabled()) { // use the Spark engine to calculate cardinality
SparkExecutable step1 = new SparkExecutable();
step1.setClassName(SparkColumnCardinality.class.getName());
step1.setParam(SparkColumnCardinality.OPTION_OUTPUT.getOpt(), outPath);
step1.setParam(SparkColumnCardinality.OPTION_PRJ.getOpt(), prj);
step1.setParam(SparkColumnCardinality.OPTION_TABLE_NAME.getOpt(), tableName);
step1.setParam(SparkColumnCardinality.OPTION_COLUMN_COUNT.getOpt(), String.valueOf(table.getColumnCount()));
job.addTask(step1);
} else {
MapReduceExecutable step1 = new MapReduceExecutable();
step1.setMapReduceJobClass(HiveColumnCardinalityJob.class);
step1.setMapReduceParams(param);
step1.setParam("segmentId", tableName);
job.addTask(step1);
}

HadoopShellExecutable step2 = new HadoopShellExecutable();
