diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 94cbd2c9d0f..2e16f4736f2 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2691,4 +2691,17 @@ public boolean jobStatusWriteKafka() {
     public Map<String, String> getJobStatusKafkaConfig() {
         return getPropertiesByPrefix("kylin.engine.job-status.kafka.");
     }
+
+    public boolean isHFileDistCP() {
+        return Boolean.parseBoolean(getOptional("kylin.storage.hfile-distcp-enable", "false"));
+    }
+
+    public int getDistCPMapBandWidth() {
+        return Integer.parseInt(getOptional("kylin.storage.distcp-map-bandwidth", "20"));
+    }
+
+    public int getDistCPMaxMapNum() {
+        return Integer.parseInt(getOptional("kylin.storage.distcp-max-map-num", "50"));
+    }
+
 }
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index c6cbe5384c7..4deab999938 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -57,6 +57,7 @@ private ExecutableConstants() {
     public static final String STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION = "Calculate HTable Region Splits";
     public static final String STEP_NAME_CREATE_HBASE_TABLE = "Create HTable";
     public static final String STEP_NAME_CONVERT_CUBOID_TO_HFILE = "Convert Cuboid Data to HFile";
+    public static final String STEP_NAME_HFILE_DISTCP = "HFile Distcp To HBase";
     public static final String STEP_NAME_BULK_LOAD_HFILE = "Load HFile to HBase Table";
     public static final String STEP_NAME_COPY_DICTIONARY = "Copy dictionary from Old Segment";
     public static final String STEP_NAME_MERGE_DICTIONARY = "Merge Cuboid Dictionary";
diff --git a/storage-hbase/pom.xml b/storage-hbase/pom.xml
index 2e0e6d12dcd..4ae15de56df 100644
--- a/storage-hbase/pom.xml
+++ b/storage-hbase/pom.xml
@@ -80,6 +80,11 @@
             <artifactId>hbase-common</artifactId>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-distcp</artifactId>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.hbase</groupId>
             <artifactId>hbase-client</artifactId>
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseFlinkOutputTransition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseFlinkOutputTransition.java
index b9712a6a4b9..3f57407761d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseFlinkOutputTransition.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseFlinkOutputTransition.java
@@ -55,6 +55,9 @@ public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow) {
     @Override
     public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
         jobFlow.addTask(steps.createConvertCuboidToHfileStep(jobFlow.getId()));
+        if (seg.getConfig().isHFileDistCP()) {
+            jobFlow.addTask(steps.createDistcpHFileStep(jobFlow.getId()));
+        }
         jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
     }

@@ -80,6 +83,9 @@ public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
     public void addStepPhase2_BuildCube(CubeSegment seg, List<CubeSegment> mergingSegments,
             DefaultChainedExecutable jobFlow) {
         jobFlow.addTask(steps.createConvertCuboidToHfileStep(jobFlow.getId()));
+        if (seg.getConfig().isHFileDistCP()) {
+            jobFlow.addTask(steps.createDistcpHFileStep(jobFlow.getId()));
+        }
         jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
     }
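For reference, enabling the new path in kylin.properties would look like the snippet below. The keys and defaults come straight from the accessors added above; the comments are my reading of how DistCp interprets the two numeric options.

```properties
# Copy HFiles to the HBase cluster with DistCp before bulk load (off by default)
kylin.storage.hfile-distcp-enable=true
# Per-map bandwidth cap in MB/s (default 20)
kylin.storage.distcp-map-bandwidth=20
# Upper bound on the number of DistCp map tasks (default 50)
kylin.storage.distcp-max-map-num=50
```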
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseFlinkSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseFlinkSteps.java
index 727cce3eb7c..cea665318dc 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseFlinkSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseFlinkSteps.java
@@ -46,10 +46,18 @@ public AbstractExecutable createConvertCuboidToHfileStep(String jobId) {
         flinkExecutable.setParam(FlinkCubeHFile.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
         flinkExecutable.setParam(FlinkCubeHFile.OPTION_META_URL.getOpt(),
                 jobBuilder2.getSegmentMetadataUrl(seg.getConfig(), jobId));
-        flinkExecutable.setParam(FlinkCubeHFile.OPTION_OUTPUT_PATH.getOpt(), getHFilePath(jobId));
         flinkExecutable.setParam(FlinkCubeHFile.OPTION_INPUT_PATH.getOpt(), inputPath);
-        flinkExecutable.setParam(FlinkCubeHFile.OPTION_PARTITION_FILE_PATH.getOpt(),
-                getRowkeyDistributionOutputPath(jobId) + "/part-r-00000_hfile");
+
+        String rootPath = getRealizationRootPath(jobId);
+        String outputPath = getHFilePath(jobId);
+        String partitionOutputPath = getRowkeyDistributionOutputPath(jobId) + "/part-r-00000_hfile";
+        if (this.seg.getConfig().isHFileDistCP()) {
+            partitionOutputPath = rootPath + "/rowkey_stats/part-r-00000_hfile";
+            outputPath = rootPath + "/hfile/";
+        }
+
+        flinkExecutable.setParam(FlinkCubeHFile.OPTION_OUTPUT_PATH.getOpt(), outputPath);
+        flinkExecutable.setParam(FlinkCubeHFile.OPTION_PARTITION_FILE_PATH.getOpt(), partitionOutputPath);
         flinkExecutable.setParam(AbstractHadoopJob.OPTION_HBASE_CONF_PATH.getOpt(), getHBaseConfFilePath(jobId));
         flinkExecutable.setJobId(jobId);

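To make the rerouting concrete, here is a sketch of the two layouts with a hypothetical job working directory. It assumes, as in the rest of HBaseJobSteps, that getHFilePath() is qualified against the HBase cluster's file system while getRealizationRootPath() stays on the build cluster; all paths shown are illustrative.

```java
// Illustrative only: how the two modes lay out the HFile output.
String rootPath = "hdfs://build-cluster/kylin/kylin_metadata/kylin-job001/my_cube"; // hypothetical

// Default mode: the convert step writes HFiles straight to the HBase cluster.
String directOutput = "hdfs://hbase-cluster/kylin/kylin_metadata/kylin-job001/my_cube/hfile/";

// DistCp mode: HFiles are first written locally on the build cluster...
String localOutput = rootPath + "/hfile/";
// ...and the new DistCp step later copies localOutput -> directOutput before bulk load.
```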
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java
index c05e707869d..d2e3ba3f388 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java
@@ -27,6 +27,7 @@
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.CuboidModeEnum;
+import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
@@ -63,6 +64,12 @@ public HadoopShellExecutable createCreateHTableStep(String jobId, CuboidModeEnum
         appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
-        appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION, getRowkeyDistributionOutputPath(jobId) + "/part-r-00000");
+        String partitionOutputPath;
+        if (this.seg.getConfig().isHFileDistCP()) {
+            partitionOutputPath = getRealizationRootPath(jobId) + "/rowkey_stats/part-r-00000_hfile";
+        } else {
+            partitionOutputPath = getRowkeyDistributionOutputPath(jobId) + "/part-r-00000";
+        }
+        appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION, partitionOutputPath);
         appendExecCmdParameters(cmd, BatchConstants.ARG_CUBOID_MODE, cuboidMode.toString());
         appendExecCmdParameters(cmd, BatchConstants.ARG_HBASE_CONF_PATH, getHBaseConfFilePath(jobId));
@@ -72,6 +80,26 @@ public HadoopShellExecutable createCreateHTableStep(String jobId, CuboidModeEnum
         return createHtableStep;
     }

+    public AbstractExecutable createDistcpHFileStep(String jobId) {
+        String inputPath = getRealizationRootPath(jobId) + "/hfile";
+
+        MapReduceExecutable createHFilesStep = new MapReduceExecutable();
+        createHFilesStep.setName(ExecutableConstants.STEP_NAME_HFILE_DISTCP);
+        StringBuilder cmd = new StringBuilder();
+
+        appendMapReduceParameters(cmd);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, getHFilePath(jobId));
+        appendExecCmdParameters(cmd, BatchConstants.ARG_HTABLE_NAME, seg.getStorageLocationIdentifier());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_HFile_Distcp_" + seg.getRealization().getName() + "_Step");
+
+        createHFilesStep.setMapReduceParams(cmd.toString());
+        createHFilesStep.setMapReduceJobClass(HFileDistcpJob.class);
+        createHFilesStep.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
+        return createHFilesStep;
+    }
+
     // TODO make it abstract
     public MapReduceExecutable createMergeCuboidDataStep(CubeSegment seg, List<CubeSegment> mergingSegments, String jobID,
             Class<? extends AbstractHadoopJob> clazz) {
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
index fcc375410f0..9001bc6198c 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
@@ -78,6 +78,9 @@ public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow) {
     @Override
     public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
         jobFlow.addTask(steps.createConvertCuboidToHfileStep(jobFlow.getId()));
+        if (seg.getConfig().isHFileDistCP()) {
+            jobFlow.addTask(steps.createDistcpHFileStep(jobFlow.getId()));
+        }
         jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
     }

@@ -143,6 +146,9 @@ public void addStepPhase2_BuildCube(CubeSegment seg, List<CubeSegment> mergingSegments,
         jobFlow.addTask(
                 steps.createMergeCuboidDataStep(seg, mergingSegments, jobFlow.getId(), MergeCuboidJob.class));
         jobFlow.addTask(steps.createConvertCuboidToHfileStep(jobFlow.getId()));
+        if (seg.getConfig().isHFileDistCP()) {
+            jobFlow.addTask(steps.createDistcpHFileStep(jobFlow.getId()));
+        }
         jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
     }

@@ -198,6 +204,9 @@ public void addStepPhase2_CreateHTable(DefaultChainedExecutable jobFlow) {
     @Override
     public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
         jobFlow.addTask(steps.createConvertCuboidToHfileStep(jobFlow.getId()));
+        if (seg.getConfig().isHFileDistCP()) {
+            jobFlow.addTask(steps.createDistcpHFileStep(jobFlow.getId()));
+        }
         jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
     }
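The effect on the build job is easiest to see in isolation. A minimal sketch of the conditional wiring (class and step names are illustrative, not Kylin APIs; DefaultChainedExecutable runs its tasks in insertion order, so the DistCp step simply lands between HFile generation and bulk load):

```java
import java.util.ArrayList;
import java.util.List;

// Models how the flag changes the tail of the cube-build step chain.
public class StepChainSketch {
    public static void main(String[] args) {
        boolean hfileDistcpEnabled = true; // kylin.storage.hfile-distcp-enable

        List<String> chain = new ArrayList<>();
        chain.add("Convert Cuboid Data to HFile");
        if (hfileDistcpEnabled) {
            chain.add("HFile Distcp To HBase");
        }
        chain.add("Load HFile to HBase Table");

        System.out.println(String.join(" -> ", chain));
    }
}
```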
(cuboidRootPath.endsWith("/") ? "" : "/") + "*";
+        String rootPath = getRealizationRootPath(jobId);
+        String outputPath = getHFilePath(jobId);
+        String partitionOutputPath = getRowkeyDistributionOutputPath(jobId) + "/part-r-00000_hfile";
+
+        if (this.seg.getConfig().isHFileDistCP()) {
+            partitionOutputPath = rootPath + "/rowkey_stats/part-r-00000_hfile";
+            outputPath = rootPath + "/hfile/";
+        }

         MapReduceExecutable createHFilesStep = new MapReduceExecutable();
         createHFilesStep.setName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE);
@@ -47,9 +55,9 @@ public AbstractExecutable createConvertCuboidToHfileStep(String jobId) {

         appendMapReduceParameters(cmd);
         appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
-        appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION, getRowkeyDistributionOutputPath(jobId) + "/part-r-00000_hfile");
+        appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION, partitionOutputPath);
         appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath);
-        appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, getHFilePath(jobId));
+        appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
         appendExecCmdParameters(cmd, BatchConstants.ARG_HTABLE_NAME, seg.getStorageLocationIdentifier());
         appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_HFile_Generator_" + seg.getRealization().getName() + "_Step");

diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java
index 43babfd5d0d..3702a92f976 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java
@@ -55,6 +55,9 @@ public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow) {
     @Override
     public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
         jobFlow.addTask(steps.createConvertCuboidToHfileStep(jobFlow.getId()));
+        if (seg.getConfig().isHFileDistCP()) {
+            jobFlow.addTask(steps.createDistcpHFileStep(jobFlow.getId()));
+        }
         jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
     }

@@ -80,6 +83,9 @@ public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
     public void addStepPhase2_BuildCube(CubeSegment seg, List<CubeSegment> mergingSegments,
             DefaultChainedExecutable jobFlow) {
         jobFlow.addTask(steps.createConvertCuboidToHfileStep(jobFlow.getId()));
+        if (seg.getConfig().isHFileDistCP()) {
+            jobFlow.addTask(steps.createDistcpHFileStep(jobFlow.getId()));
+        }
         jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
     }
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
index 9b4a7490f09..2eb0f3afce6 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
@@ -91,7 +91,8 @@ private void dropHdfsPathOnCluster(List<String> oldHdfsPaths, FileSystem fileSystem)
             }
             // If hbase was deployed on another cluster, the job dir is empty and should be dropped,
             // because of rowkey_stats and hfile dirs are both dropped.
-            if (fileSystem.listStatus(oldPath.getParent()).length == 0) {
+            Path parentPath = oldPath.getParent();
+            if (fileSystem.exists(parentPath) && fileSystem.listStatus(parentPath).length == 0) {
                 Path emptyJobPath = new Path(JobBuilderSupport.getJobWorkingDir(config, getJobId()));
                 emptyJobPath = Path.getPathWithoutSchemeAndAuthority(emptyJobPath);
                 if (fileSystem.exists(emptyJobPath)) {
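The exists() guard matters because Hadoop's FileSystem.listStatus() throws FileNotFoundException for a missing path rather than returning an empty array, which would fail the GC step once the parent dir has already been removed. A minimal sketch (the path is hypothetical):

```java
import java.io.FileNotFoundException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ListStatusGuardSketch {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path parent = new Path("/kylin/kylin_metadata/kylin-job001/my_cube"); // hypothetical

        // Unguarded: throws FileNotFoundException if the dir is already gone.
        try {
            FileStatus[] children = fs.listStatus(parent);
            System.out.println("children: " + children.length);
        } catch (FileNotFoundException e) {
            System.out.println("parent already gone: " + e.getMessage());
        }

        // Guarded, as in the patched step:
        if (fs.exists(parent) && fs.listStatus(parent).length == 0) {
            System.out.println("parent is empty, safe to drop the job dir");
        }
    }
}
```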
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileDistcpJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileDistcpJob.java
new file mode 100644
index 00000000000..dfc8653a97a
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileDistcpJob.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase.steps;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.tools.DistCp;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Copies generated HFiles to the HBase cluster with DistCp before bulk load.
+ *
+ * @author fengpod
+ */
+public class HFileDistcpJob extends AbstractHadoopJob {
+
+    protected static final Logger logger = LoggerFactory.getLogger(HFileDistcpJob.class);
+
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_INPUT_PATH);
+            options.addOption(OPTION_OUTPUT_PATH);
+            options.addOption(OPTION_HTABLE_NAME);
+            parseOptions(options, args);
+
+            // use current hbase configuration
+            Configuration configuration = new Configuration();
+            HBaseConnection.addHBaseClusterNNHAConfiguration(configuration);
+
+            Path input = new Path(getOptionValue(OPTION_INPUT_PATH));
+            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+            FileSystem fs = HadoopUtil.getFileSystem(output, configuration);
+            if (!fs.exists(output)) {
+                fs.mkdirs(output);
+            }
+
+            String cubeName = getOptionValue(OPTION_CUBE_NAME);
+            CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+            CubeInstance cube = cubeMgr.getCube(cubeName);
+
+            List<Path> sourceList = Lists.newArrayList();
+            sourceList.add(input);
+            DistCpOptions distCpOptions = new DistCpOptions(sourceList, output);
+            distCpOptions.setMapBandwidth(cube.getConfig().getDistCPMapBandWidth());
+            distCpOptions.setMaxMaps(cube.getConfig().getDistCPMaxMapNum());
+            distCpOptions.setOverwrite(true);
+            distCpOptions.setBlocking(true);
+
+            configuration.set(JobContext.JOB_NAME, getOptionValue(OPTION_JOB_NAME));
+            DistCp distCp = new DistCp(configuration, distCpOptions);
+
+            job = distCp.execute();
+
+            // set job configuration
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+            // add metadata to distributed cache
+            attachCubeMetadata(cube, job.getConfiguration());
+        } finally {
+            if (job != null)
+                cleanupTempConfFile(job.getConfiguration());
+        }
+        return 0;
+    }
+
+    public static void main(String[] args) throws Exception {
+        int exitCode = ToolRunner.run(new HFileDistcpJob(), args);
+        System.exit(exitCode);
+    }
+}
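For completeness, a standalone sketch of driving DistCp programmatically the same way this job does, which can be handy for testing the copy outside of a Kylin build. The paths and numeric values are hypothetical; the DistCpOptions list constructor and setters shown are the Hadoop 2.x API this class compiles against.

```java
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.tools.DistCpOptions;

import com.google.common.collect.Lists;

// Copy a source dir to a target cluster with the same options HFileDistcpJob
// sets: overwrite, blocking, and capped per-map bandwidth / map count.
public class DistcpSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        List<Path> sources = Lists.newArrayList(new Path("hdfs://build-cluster/kylin/job001/my_cube/hfile"));
        Path target = new Path("hdfs://hbase-cluster/kylin/job001/my_cube/hfile");

        DistCpOptions options = new DistCpOptions(sources, target); // Hadoop 2.x constructor
        options.setMapBandwidth(20); // MB/s per map
        options.setMaxMaps(50);      // cap on the number of copy mappers
        options.setOverwrite(true);  // replace any existing files at the target
        options.setBlocking(true);   // wait for the MR job to finish

        new DistCp(conf, options).execute();
    }
}
```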