
Commit

KYLIN-4833 create hfile on hadoop hdfs and distcp to hbase hdfs (#1494)
* create hfile on hadoop hdfs and distcp to hbase hdfs

* code style

Co-authored-by: zhangrusong <zhangrusong@ke.com>
2 people authored and zhangayqian committed Nov 5, 2021
1 parent 3f4c3d8 commit f8385fd
Showing 11 changed files with 197 additions and 6 deletions.
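
In short, when kylin.storage.hfile-distcp-enable is set to true, HFiles are first written to the compute (Hadoop) cluster's HDFS and then copied to the HBase cluster's HDFS with DistCp before bulk load. A condensed sketch of the resulting phase-3 task chain, using the builder calls added below:

jobFlow.addTask(steps.createConvertCuboidToHfileStep(jobFlow.getId())); // write HFiles on the compute cluster
jobFlow.addTask(steps.createDistcpHFileStep(jobFlow.getId()));          // "HFile Distcp To HBase" (new step)
jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));             // load the HFiles into the HTable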
@@ -2691,4 +2691,17 @@ public boolean jobStatusWriteKafka() {
public Map<String, String> getJobStatusKafkaConfig() {
return getPropertiesByPrefix("kylin.engine.job-status.kafka.");
}

public boolean isHFileDistCP() {
return Boolean.parseBoolean(getOptional("kylin.storage.hfile-distcp-enable", "false"));
}

public int getDistCPMapBandWidth() {
return Integer.valueOf(getOptional("kylin.storage.distcp-map-bandwidth", "20"));
}

public int getDistCPMaxMapNum() {
return Integer.valueOf(getOptional("kylin.storage.distcp-max-map-num", "50"));
}

}
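
For illustration, a minimal sketch (not part of the commit) of how these settings are read; the property keys and defaults are exactly those used by the getters above:

KylinConfig config = KylinConfig.getInstanceFromEnv();
if (config.isHFileDistCP()) {                                // kylin.storage.hfile-distcp-enable, default false
    int bandwidthMbPerMap = config.getDistCPMapBandWidth();  // kylin.storage.distcp-map-bandwidth, default 20
    int maxMaps = config.getDistCPMaxMapNum();                // kylin.storage.distcp-max-map-num, default 50
    // these two values feed DistCpOptions.setMapBandwidth / setMaxMaps in HFileDistcpJob below
}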
@@ -57,6 +57,7 @@ private ExecutableConstants() {
public static final String STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION = "Calculate HTable Region Splits";
public static final String STEP_NAME_CREATE_HBASE_TABLE = "Create HTable";
public static final String STEP_NAME_CONVERT_CUBOID_TO_HFILE = "Convert Cuboid Data to HFile";
public static final String STEP_NAME_HFILE_DISTCP = "HFile Distcp To HBase";
public static final String STEP_NAME_BULK_LOAD_HFILE = "Load HFile to HBase Table";
public static final String STEP_NAME_COPY_DICTIONARY = "Copy dictionary from Old Segment";
public static final String STEP_NAME_MERGE_DICTIONARY = "Merge Cuboid Dictionary";
5 changes: 5 additions & 0 deletions storage-hbase/pom.xml
@@ -80,6 +80,11 @@
<artifactId>hbase-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-distcp</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
@@ -55,6 +55,9 @@ public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow) {
@Override
public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
jobFlow.addTask(steps.createConvertCuboidToHfileStep(jobFlow.getId()));
if (seg.getConfig().isHFileDistCP()) {
jobFlow.addTask(steps.createDistcpHFileStep(jobFlow.getId()));
}
jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
}

@@ -80,6 +83,9 @@ public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
public void addStepPhase2_BuildCube(CubeSegment seg, List<CubeSegment> mergingSegments,
DefaultChainedExecutable jobFlow) {
jobFlow.addTask(steps.createConvertCuboidToHfileStep(jobFlow.getId()));
if (seg.getConfig().isHFileDistCP()) {
jobFlow.addTask(steps.createDistcpHFileStep(jobFlow.getId()));
}
jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
}

@@ -46,10 +46,18 @@ public AbstractExecutable createConvertCuboidToHfileStep(String jobId) {
flinkExecutable.setParam(FlinkCubeHFile.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
flinkExecutable.setParam(FlinkCubeHFile.OPTION_META_URL.getOpt(),
jobBuilder2.getSegmentMetadataUrl(seg.getConfig(), jobId));
- flinkExecutable.setParam(FlinkCubeHFile.OPTION_OUTPUT_PATH.getOpt(), getHFilePath(jobId));
flinkExecutable.setParam(FlinkCubeHFile.OPTION_INPUT_PATH.getOpt(), inputPath);
- flinkExecutable.setParam(FlinkCubeHFile.OPTION_PARTITION_FILE_PATH.getOpt(),
- getRowkeyDistributionOutputPath(jobId) + "/part-r-00000_hfile");
+
+ String rootPath = getRealizationRootPath(jobId);
+ String outputPath = getHFilePath(jobId);
+ String partitionOutputPath = getRowkeyDistributionOutputPath(jobId) + "/part-r-00000_hfile";
+ if (this.seg.getConfig().isHFileDistCP()) {
+ partitionOutputPath = rootPath + "/rowkey_stats/part-r-00000_hfile";
+ outputPath = rootPath + "/hfile/";
+ }
+
+ flinkExecutable.setParam(FlinkCubeHFile.OPTION_OUTPUT_PATH.getOpt(), outputPath);
+ flinkExecutable.setParam(FlinkCubeHFile.OPTION_PARTITION_FILE_PATH.getOpt(), partitionOutputPath);
flinkExecutable.setParam(AbstractHadoopJob.OPTION_HBASE_CONF_PATH.getOpt(), getHBaseConfFilePath(jobId));
flinkExecutable.setJobId(jobId);

@@ -27,6 +27,7 @@
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.CuboidModeEnum;
import org.apache.kylin.engine.mr.CubingJob;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
@@ -63,6 +64,13 @@ public HadoopShellExecutable createCreateHTableStep(String jobId, CuboidModeEnum
appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION,
getRowkeyDistributionOutputPath(jobId) + "/part-r-00000");
+ String partitionOutputPath = null;
+ if (this.seg.getConfig().isHFileDistCP()) {
+ partitionOutputPath = getRealizationRootPath(jobId) + "/rowkey_stats/part-r-00000_hfile";
+ } else {
+ partitionOutputPath = getRowkeyDistributionOutputPath(jobId) + "/part-r-00000";
+ }
+ appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION, partitionOutputPath);
appendExecCmdParameters(cmd, BatchConstants.ARG_CUBOID_MODE, cuboidMode.toString());
appendExecCmdParameters(cmd, BatchConstants.ARG_HBASE_CONF_PATH, getHBaseConfFilePath(jobId));

@@ -72,6 +80,26 @@ public HadoopShellExecutable createCreateHTableStep(String jobId, CuboidModeEnum
return createHtableStep;
}

public AbstractExecutable createDistcpHFileStep(String jobId) {
String inputPath = getRealizationRootPath(jobId) + "/hfile";
MapReduceExecutable createHFilesStep = new MapReduceExecutable();
createHFilesStep.setName(ExecutableConstants.STEP_NAME_HFILE_DISTCP);
StringBuilder cmd = new StringBuilder();

appendMapReduceParameters(cmd);
appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
// appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION, getRowkeyDistributionOutputPath(jobId) + "/part-r-00000_hfile");
appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath);
appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, getHFilePath(jobId));
appendExecCmdParameters(cmd, BatchConstants.ARG_HTABLE_NAME, seg.getStorageLocationIdentifier());
appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_HFile_Distcp_" + seg.getRealization().getName() + "_Step");

createHFilesStep.setMapReduceParams(cmd.toString());
createHFilesStep.setMapReduceJobClass(HFileDistcpJob.class);
createHFilesStep.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
return createHFilesStep;
}

// TODO make it abstract
public MapReduceExecutable createMergeCuboidDataStep(CubeSegment seg, List<CubeSegment> mergingSegments,
String jobID, Class<? extends AbstractHadoopJob> clazz) {
@@ -78,6 +78,9 @@ public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow) {
@Override
public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
jobFlow.addTask(steps.createConvertCuboidToHfileStep(jobFlow.getId()));
if (seg.getConfig().isHFileDistCP()) {
jobFlow.addTask(steps.createDistcpHFileStep(jobFlow.getId()));
}
jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
}

@@ -143,6 +146,9 @@ public void addStepPhase2_BuildCube(CubeSegment seg, List<CubeSegment> mergingSe
jobFlow.addTask(
steps.createMergeCuboidDataStep(seg, mergingSegments, jobFlow.getId(), MergeCuboidJob.class));
jobFlow.addTask(steps.createConvertCuboidToHfileStep(jobFlow.getId()));
if (seg.getConfig().isHFileDistCP()) {
jobFlow.addTask(steps.createDistcpHFileStep(jobFlow.getId()));
}
jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
}

@@ -198,6 +204,9 @@ public void addStepPhase2_CreateHTable(DefaultChainedExecutable jobFlow) {
@Override
public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
jobFlow.addTask(steps.createConvertCuboidToHfileStep(jobFlow.getId()));
if (seg.getConfig().isHFileDistCP()) {
jobFlow.addTask(steps.createDistcpHFileStep(jobFlow.getId()));
}
jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
}

@@ -40,16 +40,24 @@ public HadoopShellExecutable createCreateHTableStep(String jobId) {
public AbstractExecutable createConvertCuboidToHfileStep(String jobId) {
String cuboidRootPath = getCuboidRootPath(jobId);
String inputPath = cuboidRootPath + (cuboidRootPath.endsWith("/") ? "" : "/") + "*";
+ String rootPath = getRealizationRootPath(jobId);
+ String outputPath = getHFilePath(jobId);
+ String partitionOutputPath = getRowkeyDistributionOutputPath(jobId) + "/part-r-00000_hfile";
+
+ if (this.seg.getConfig().isHFileDistCP()) {
+ partitionOutputPath = rootPath + "/rowkey_stats/part-r-00000_hfile";
+ outputPath = rootPath + "/hfile/";
+ }

MapReduceExecutable createHFilesStep = new MapReduceExecutable();
createHFilesStep.setName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE);
StringBuilder cmd = new StringBuilder();

appendMapReduceParameters(cmd);
appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
- appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION, getRowkeyDistributionOutputPath(jobId) + "/part-r-00000_hfile");
+ appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION, partitionOutputPath);
appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath);
- appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, getHFilePath(jobId));
+ appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
appendExecCmdParameters(cmd, BatchConstants.ARG_HTABLE_NAME, seg.getStorageLocationIdentifier());
appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_HFile_Generator_" + seg.getRealization().getName() + "_Step");

@@ -55,6 +55,9 @@ public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow) {
@Override
public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
jobFlow.addTask(steps.createConvertCuboidToHfileStep(jobFlow.getId()));
if (seg.getConfig().isHFileDistCP()) {
jobFlow.addTask(steps.createDistcpHFileStep(jobFlow.getId()));
}
jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
}

@@ -80,6 +83,9 @@ public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
public void addStepPhase2_BuildCube(CubeSegment seg, List<CubeSegment> mergingSegments,
DefaultChainedExecutable jobFlow) {
jobFlow.addTask(steps.createConvertCuboidToHfileStep(jobFlow.getId()));
if (seg.getConfig().isHFileDistCP()) {
jobFlow.addTask(steps.createDistcpHFileStep(jobFlow.getId()));
}
jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
}

@@ -91,7 +91,8 @@ private void dropHdfsPathOnCluster(List<String> oldHdfsPaths, FileSystem fileSys
}
// If hbase was deployed on another cluster, the job dir is empty and should be dropped,
// because the rowkey_stats and hfile dirs are both dropped.
- if (fileSystem.listStatus(oldPath.getParent()).length == 0) {
+ Path parentPath = oldPath.getParent();
+ if (fileSystem.exists(parentPath) && fileSystem.listStatus(parentPath).length == 0) {
Path emptyJobPath = new Path(JobBuilderSupport.getJobWorkingDir(config, getJobId()));
emptyJobPath = Path.getPathWithoutSchemeAndAuthority(emptyJobPath);
if (fileSystem.exists(emptyJobPath)) {
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kylin.storage.hbase.steps;

import com.google.common.collect.Lists;
import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/**
* @author fengpod
*/
public class HFileDistcpJob extends AbstractHadoopJob {

protected static final Logger logger = LoggerFactory.getLogger(HFileDistcpJob.class);

public int run(String[] args) throws Exception {
Options options = new Options();

try {
options.addOption(OPTION_CUBE_NAME);
options.addOption(OPTION_JOB_NAME);
options.addOption(OPTION_INPUT_PATH);
options.addOption(OPTION_OUTPUT_PATH);
options.addOption(OPTION_HTABLE_NAME);
parseOptions(options, args);

// use current hbase configuration
Configuration configuration = new Configuration();
HBaseConnection.addHBaseClusterNNHAConfiguration(configuration);

Path input = new Path(getOptionValue(OPTION_INPUT_PATH));
Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
FileSystem fs = HadoopUtil.getFileSystem(output, configuration);
if (!fs.exists(output)) {
fs.mkdirs(output);
}

String cubeName = getOptionValue(OPTION_CUBE_NAME);
CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
CubeInstance cube = cubeMgr.getCube(cubeName);

List<Path> sourceList = Lists.newArrayList();
sourceList.add(input);
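// DistCp throttling: setMapBandwidth is MB/s per map task and setMaxMaps caps the
// number of concurrent copy maps; both values come from the cube-level KylinConfig
// options introduced in this commit (defaults 20 and 50).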
DistCpOptions distCpOptions = new DistCpOptions(sourceList, output);
distCpOptions.setMapBandwidth(cube.getConfig().getDistCPMapBandWidth());
distCpOptions.setMaxMaps(cube.getConfig().getDistCPMaxMapNum());
distCpOptions.setOverwrite(true);
distCpOptions.setBlocking(true);

configuration.set(JobContext.JOB_NAME, getOptionValue(OPTION_JOB_NAME));
DistCp distCp = new DistCp(configuration, distCpOptions);

job = distCp.execute();

// set job configuration
job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
// add metadata to distributed cache
attachCubeMetadata(cube, job.getConfiguration());
} catch (Exception e) {
throw e;
} finally {
if (job != null)
cleanupTempConfFile(job.getConfiguration());
}
return 0;
}

public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new HFileDistcpJob(), args);
System.exit(exitCode);
}

}
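
For reference, a hedged usage sketch: the "HFile Distcp To HBase" step runs this class as a MapReduce job via createDistcpHFileStep, but it can also be launched standalone through its main method, roughly as below. The flag names are assumed to match the BatchConstants argument names appended by that step (-cubename, -input, -output, -htablename, -jobname); the cube, table, and paths are placeholders:

String[] args = new String[] {
        "-cubename", "my_cube",
        "-input", "hdfs://compute-nn:8020/kylin/kylin_metadata/kylin-<job_id>/my_cube/hfile",
        "-output", "hdfs://hbase-nn:8020/kylin/kylin_metadata/kylin-<job_id>/my_cube/hfile",
        "-htablename", "KYLIN_EXAMPLE",
        "-jobname", "Kylin_HFile_Distcp_my_cube_Step"
};
System.exit(ToolRunner.run(new HFileDistcpJob(), args));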
