diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 6f73024cdd2..6fcfd4a78ce 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1454,6 +1454,11 @@ public int getSparkOutputMaxSize() {
         return Integer.valueOf(getOptional("kylin.engine.spark.output.max-size", "10485760"));
     }
 
+    public boolean isSparkDimensionDictionaryEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.engine.spark-dimension-dictionary", "false"));
+    }
+
+
     // ============================================================================
     // ENGINE.LIVY
     // ============================================================================
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 334ed0e385a..1e001900bdf 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -33,9 +33,11 @@ private ExecutableConstants() {
     public static final String HDFS_BYTES_WRITTEN = "hdfs_bytes_written";
     public static final String SOURCE_RECORDS_COUNT = "source_records_count";
     public static final String SOURCE_RECORDS_SIZE = "source_records_size";
+    public static final String SPARK_DIMENSION_DIC_SEGMENT_ID = "spark_dimension_dic_segment_id";
 
     public static final String STEP_NAME_EXTRACT_DICTIONARY_FROM_GLOBAL = "Extract Dictionary from Global Dictionary";
     public static final String STEP_NAME_BUILD_DICTIONARY = "Build Dimension Dictionary";
+    public static final String STEP_NAME_BUILD_SPARK_DICTIONARY = "Build Dimension Dictionary with Spark";
     public static final String STEP_NAME_BUILD_UHC_DICTIONARY = "Build UHC Dictionary";
     public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table";
     public static final String STEP_NAME_SQOOP_TO_FLAT_HIVE_TABLE = "Sqoop To Flat Hive Table";
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
index 960cc192f67..635e35e8ee4 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
@@ -279,6 +279,11 @@ public String getColumWithTableAndSchema() {
         return (getTableWithSchema() + "." + column.getName()).toUpperCase(Locale.ROOT);
     }
 
+    public String getColumnWithTable() {
+        return (getTable() + "." + column.getName()).toUpperCase(Locale.ROOT);
+    }
+
     // used by projection rewrite, see OLAPProjectRel
     public enum InnerDataTypeEnum {
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 74d35350cd8..273dea71c7d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -251,6 +251,10 @@ public boolean isEnableUHCDictStep() {
         return true;
     }
 
+    public boolean isEnabledSparkDimensionDictionary() {
+        return config.getConfig().isSparkDimensionDictionaryEnabled();
+    }
+
     public LookupMaterializeContext addMaterializeLookupTableSteps(final CubingJob result) {
         LookupMaterializeContext lookupMaterializeContext = new LookupMaterializeContext(result);
         CubeDesc cubeDesc = seg.getCubeDesc();
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index fe34461f0c3..208c7e835e4 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -76,7 +76,13 @@ public CubingJob build() {
             result.addTask(createBuildUHCDictStep(jobId));
         }
 
-        result.addTask(createBuildDictionaryStep(jobId));
+
+        if (isEnabledSparkDimensionDictionary()) {
+            result.addTask(createBuildDictionarySparkStep(jobId));
+        } else {
+            result.addTask(createBuildDictionaryStep(jobId));
+        }
+
         result.addTask(createSaveStatisticsStep(jobId));
 
         // add materialize lookup tables if needed
@@ -127,6 +133,28 @@ public SparkExecutable createFactDistinctColumnsSparkStep(String jobId) {
         return sparkExecutable;
     }
 
+    public SparkExecutable createBuildDictionarySparkStep(String jobId) {
+        final SparkExecutable sparkExecutable = SparkExecutableFactory.instance(seg.getConfig());
+
+        sparkExecutable.setClassName(SparkBuildDictionary.class.getName());
+        sparkExecutable.setParam(SparkBuildDictionary.OPTION_META_URL.getOpt(), getSegmentMetadataUrl(seg.getConfig(), jobId));
+        sparkExecutable.setParam(SparkBuildDictionary.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
+        sparkExecutable.setParam(SparkBuildDictionary.OPTION_INPUT_PATH.getOpt(), getFactDistinctColumnsPath(jobId));
+        sparkExecutable.setParam(SparkBuildDictionary.OPTION_DICT_PATH.getOpt(), getDictRootPath(jobId));
+        sparkExecutable.setParam(SparkBuildDictionary.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
+        sparkExecutable.setParam(SparkBuildDictionary.OPTION_CUBING_JOB_ID.getOpt(), jobId);
+
+        sparkExecutable.setJobId(jobId);
+        sparkExecutable.setName(ExecutableConstants.STEP_NAME_BUILD_SPARK_DICTIONARY);
+        sparkExecutable.setCounterSaveAs(CubingJob.SOURCE_SIZE_BYTES, getCounterOutputPath(jobId));
+
+        StringBuilder jars = new StringBuilder();
+        StringUtil.appendWithSeparator(jars, seg.getConfig().getSparkAdditionalJars());
+        sparkExecutable.setJars(jars.toString());
+        return sparkExecutable;
+    }
+
+
     protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
         final SparkExecutable sparkExecutable = SparkExecutableFactory.instance(seg.getConfig());
         sparkExecutable.setClassName(SparkCubingByLayer.class.getName());
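Note on the job-builder wiring above: the Spark dictionary step is opt-in. build() swaps in createBuildDictionarySparkStep() only when isEnabledSparkDimensionDictionary() returns true, i.e. when kylin.engine.spark-dimension-dictionary=true is set in kylin.properties (the key and its false default come from the KylinConfigBase hunk); otherwise the existing MapReduce "Build Dimension Dictionary" step is kept. The new step reads the fact-distinct-columns output produced earlier in the job, writes under the dictionary root path, and appears in the job as the "Build Dimension Dictionary with Spark" step added to ExecutableConstants.
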
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBuildDictionary.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBuildDictionary.java
new file mode 100644
index 00000000000..dbd9f798b79
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBuildDictionary.java
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayPrimitiveWritable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.ByteBufferBackedInputStream;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.DimensionDesc;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryInfoSerializer;
+import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.dict.lookup.SnapshotTable;
+import org.apache.kylin.dict.lookup.SnapshotTableSerializer;
+import org.apache.kylin.engine.mr.SortedColumnDFSFile;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.engine.mr.steps.FactDistinctColumnsReducer;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.metadata.model.JoinDesc;
+import org.apache.kylin.metadata.model.TableRef;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.IReadableTable;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.util.LongAccumulator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+
+
+public class SparkBuildDictionary extends AbstractApplication implements Serializable {
+
+    protected static final Logger logger = LoggerFactory.getLogger(SparkBuildDictionary.class);
+
+    public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
+            .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
+    public static final Option OPTION_DICT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_DICT_PATH).hasArg()
+            .isRequired(true).withDescription("Cube dictionary output path").create(BatchConstants.ARG_DICT_PATH);
+    public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segmentId").hasArg().isRequired(true)
+            .withDescription("Cube Segment Id").create("segmentId");
+    public static final Option OPTION_CUBING_JOB_ID = OptionBuilder
+            .withArgName(BatchConstants.ARG_CUBING_JOB_ID).hasArg().isRequired(true)
+            .withDescription("Cubing job id").create(BatchConstants.ARG_CUBING_JOB_ID);
+    public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
+            .isRequired(true).withDescription("Hive Intermediate Table PATH").create(BatchConstants.ARG_INPUT);
+    public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true)
+            .withDescription("HDFS metadata url").create("metaUrl");
+    public static final Option OPTION_COUNTER_PATH = OptionBuilder.withArgName(BatchConstants.ARG_COUNTER_OUTPUT).hasArg()
+            .isRequired(true).withDescription("counter output path").create(BatchConstants.ARG_COUNTER_OUTPUT);
+
+    private Options options;
+
+    public SparkBuildDictionary() {
+        options = new Options();
+        options.addOption(OPTION_CUBE_NAME);
+        options.addOption(OPTION_DICT_PATH);
+        options.addOption(OPTION_INPUT_PATH);
+        options.addOption(OPTION_SEGMENT_ID);
+        options.addOption(OPTION_CUBING_JOB_ID);
+        options.addOption(OPTION_META_URL);
+        options.addOption(OPTION_COUNTER_PATH);
+    }
+
+    @Override
+    protected Options getOptions() {
+        return options;
+    }
+
+    @Override
+    protected void execute(OptionsHelper optionsHelper) throws Exception {
+        String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
+        String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
+        String dictPath = optionsHelper.getOptionValue(OPTION_DICT_PATH);
+        String factColumnsInputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
+        String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
+        final String jobId = optionsHelper.getOptionValue(OPTION_CUBING_JOB_ID);
+        final String counterPath = optionsHelper.getOptionValue(OPTION_COUNTER_PATH);
+
+        Class[] kryoClassArray = new Class[] { Class.forName("scala.reflect.ClassTag$$anon$1"),
+                Class.forName("org.apache.kylin.engine.mr.steps.SelfDefineSortableKey"),
+                Class.forName("scala.collection.mutable.WrappedArray$ofRef") };
+
+        SparkConf sparkConf = new SparkConf().setAppName("Build Dimension Dictionary for: " + cubeName + " segment " + segmentId);
+
+        //serialization conf
+        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+        sparkConf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
+        sparkConf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
+
+        KylinSparkJobListener jobListener = new KylinSparkJobListener();
+        try (JavaSparkContext sc = new JavaSparkContext(sparkConf)) {
+            sc.sc().addSparkListener(jobListener);
+            HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(dictPath));
+
+            // calculate source record bytes size
+            final LongAccumulator bytesWritten = sc.sc().longAccumulator();
+            final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
+            KylinConfig config = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+
+            CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
+            CubeSegment cubeSegment = cube.getSegmentById(segmentId);
+
+            Set<TblColRef> tblColRefs = cubeSegment.getCubeDesc().getAllColumnsNeedDictionaryBuilt();
+            JavaRDD<TblColRef> tblColRefRDD = sc.parallelize(Lists.newArrayList(tblColRefs));
+            logger.info("Dimensions all is :" + cubeSegment.getCubeDesc().getDimensions().toString());
+
+            List<TblColRef> uhcColumns = cube.getDescriptor().getAllUHCColumns();
+            logger.info("Spark build dict uhc columns is " + uhcColumns.size());
+
+            JavaPairRDD<Boolean, TblColRef> tblColRefJavaPairRDD = tblColRefRDD.mapPartitionsToPair(
+                    new TableColumnRefFunction(cubeName, segmentId, config, factColumnsInputPath, dictPath, uhcColumns));
+
+            long unfinishedNumber = tblColRefJavaPairRDD.filter(tuple -> !tuple._1).count();
+            if (unfinishedNumber > 0) {
+                logger.warn("Not all dict build! {} columns dictionary should be build, but {} not built", tblColRefs.size(), unfinishedNumber);
+            }
+
+            cubeSegment = CubeManager.getInstance(config).reloadCube(cubeName).getSegmentById(segmentId);
+            JavaRDD<DimensionDesc> dimensionDescRDD = sc.parallelize(cubeSegment.getCubeDesc().getDimensions());
+
+            JavaPairRDD<String, Iterable<TableRef>> snapShots = dimensionDescRDD.filter(new DimensionDescFilterFunction(cubeName, segmentId, config))
+                    .mapToPair(new PairFunction<DimensionDesc, String, TableRef>() {
+                        @Override
+                        public Tuple2<String, TableRef> call(DimensionDesc dimensionDesc) throws Exception {
+                            TableRef table = dimensionDesc.getTableRef();
+                            return new Tuple2<>(table.getTableIdentity(), table);
+                        }
+                    }).groupByKey();
+
+            snapShots.mapToPair(new DimensionDescPairFunction(cubeName, segmentId, jobId, config))
+                    .filter(tuple -> tuple._2).count();
+            Boolean dictsAndSnapshotsBuildState = isAllDictsAndSnapshotsReady(config, cubeName, segmentId);
+            if (!dictsAndSnapshotsBuildState) {
+                logger.error("Not all dictionaries and snapshots ready for cube segment: {}", segmentId);
+            } else {
+                logger.info("Succeed to build all dictionaries and snapshots for cube segment: {}", segmentId);
+            }
+
+            long recordCount = tblColRefRDD.count();
+            logger.info("Map input records={}", recordCount);
+            logger.info("HDFS Read: {} HDFS Write", bytesWritten.value());
+            logger.info("HDFS: Number of bytes written={}", jobListener.metrics.getBytesWritten());
+
+            Map<String, String> counterMap = Maps.newHashMap();
+            counterMap.put(ExecutableConstants.HDFS_BYTES_WRITTEN, String.valueOf(jobListener.metrics.getBytesWritten()));
+            counterMap.put(ExecutableConstants.SPARK_DIMENSION_DIC_SEGMENT_ID, segmentId);
+
+            // save counter to hdfs
+            HadoopUtil.writeToSequenceFile(sc.hadoopConfiguration(), counterPath, counterMap);
+        }
+    }
+
+
+    static class TableColumnRefFunction implements PairFlatMapFunction<Iterator<TblColRef>, Boolean, TblColRef> {
+        private transient volatile boolean initialized = false;
+        private String cubeName;
+        private String segmentId;
+        private CubeSegment cubeSegment;
+        private CubeManager cubeManager;
+        private KylinConfig config;
+        private String factColumnsInputPath;
+        private String dictPath;
+        List<TblColRef> uhcColumns;
+
+        public TableColumnRefFunction(String cubeName, String segmentId, KylinConfig config, String factColumnsInputPath, String dictPath, List<TblColRef> uhcColumns) {
+            this.cubeName = cubeName;
+            this.segmentId = segmentId;
+            this.config = config;
+            this.factColumnsInputPath = factColumnsInputPath;
+            this.dictPath = dictPath;
+            this.uhcColumns = uhcColumns;
+            logger.info("Cube name is {}, segment id is {}", cubeName, segmentId);
+            logger.info("Fact columns input path is " + factColumnsInputPath);
+            logger.info("Dict path is " + dictPath);
+        }
+
+        private void init() {
+            try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                    .setAndUnsetThreadLocalConfig(config)) {
+                cubeManager = CubeManager.getInstance(config);
+                cubeSegment = cubeManager.getCube(cubeName).getSegmentById(segmentId);
+            }
+            initialized = true;
+        }
+
+        @Override
+        public Iterator<Tuple2<Boolean, TblColRef>> call(Iterator<TblColRef> cols) throws Exception {
+            if (initialized == false) {
+                synchronized (SparkBuildDictionary.class) {
+                    if (initialized == false) {
+                        init();
+                    }
+                }
+            }
+
+            List<Tuple2<Boolean, TblColRef>> result = Lists.newArrayList();
+            try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                    .setAndUnsetThreadLocalConfig(config)) {
+                while (cols.hasNext()) {
+                    TblColRef col = cols.next();
+                    logger.info("Building dictionary for column {}", col);
+                    IReadableTable inpTable = getDistinctValuesFor(col);
+
+                    Dictionary<String> preBuiltDict = getDictionary(col);
+
+                    if (preBuiltDict != null) {
+                        logger.info("Dict for '{}' has already been built, save it", col.getName());
+                        cubeManager.saveDictionary(cubeSegment, col, inpTable, preBuiltDict);
+                    } else {
+                        logger.info("Dict for '{}' not pre-built, build it from {}", col.getName(), inpTable);
+                        cubeManager.buildDictionary(cubeSegment, col, inpTable);
+                    }
+
+                    result.add(new Tuple2<>(true, col));
+                }
+            }
+            return result.iterator();
+        }
+
+        public IReadableTable getDistinctValuesFor(TblColRef col) {
+            return new SortedColumnDFSFile(factColumnsInputPath + "/" + col.getIdentity(), col.getType());
+        }
+
+        public Dictionary<String> getDictionary(TblColRef col) throws IOException {
+            Path colDir;
+            if (config.isBuildUHCDictWithMREnabled() && uhcColumns.contains(col)) {
+                colDir = new Path(dictPath, col.getIdentity());
+            } else {
+                colDir = new Path(factColumnsInputPath, col.getIdentity());
+            }
+            FileSystem fs = HadoopUtil.getWorkingFileSystem();
+
+            Path dictFile = HadoopUtil.getFilterOnlyPath(fs, colDir,
+                    col.getName() + FactDistinctColumnsReducer.DICT_FILE_POSTFIX);
+            if (dictFile == null) {
+                logger.info("Dict for '{}' not pre-built.", col.getName());
+                return null;
+            }
+
+            try (SequenceFile.Reader reader = new SequenceFile.Reader(HadoopUtil.getCurrentConfiguration(),
+                    SequenceFile.Reader.file(dictFile))) {
+                NullWritable key = NullWritable.get();
+                ArrayPrimitiveWritable value = new ArrayPrimitiveWritable();
+                reader.next(key, value);
+
+                ByteBuffer buffer = new ByteArray((byte[]) value.get()).asBuffer();
+                try (DataInputStream is = new DataInputStream(new ByteBufferBackedInputStream(buffer))) {
+                    String dictClassName = is.readUTF();
+                    Dictionary<String> dict = (Dictionary<String>) ClassUtil.newInstance(dictClassName);
+                    dict.readFields(is);
+                    logger.info("DictionaryProvider read dict from file: {}", dictFile);
+                    return dict;
+                }
+            }
+        }
+    }
+
+    static class DimensionDescPairFunction implements PairFunction<Tuple2<String, Iterable<TableRef>>, String, Boolean> {
+
+        private String cubeName;
+        private String segmentId;
+        private String jobId;
+        private KylinConfig config;
+        private CubeManager cubeManager;
+        private CubeSegment cubeSegment;
+        private transient volatile boolean initialized = false;
+
+        public DimensionDescPairFunction(String cubeName, String segmentId, String jobId, KylinConfig config) {
+            this.cubeName = cubeName;
+            this.segmentId = segmentId;
+            this.jobId = jobId;
+            this.config = config;
+        }
+
+        private void init() {
+            try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                    .setAndUnsetThreadLocalConfig(config)) {
+                cubeManager = CubeManager.getInstance(config);
+                cubeSegment = cubeManager.getCube(cubeName).getSegmentById(segmentId);
+            }
+            initialized = true;
+        }
+
+        @Override
+        public Tuple2<String, Boolean> call(Tuple2<String, Iterable<TableRef>> snapShot) throws Exception {
+            if (initialized == false) {
+                synchronized (SparkBuildDictionary.class) {
+                    if (initialized == false) {
+                        init();
+                    }
+                }
+            }
+            String tableIdentity = snapShot._1();
+            boolean status = true;
+            try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                    .setAndUnsetThreadLocalConfig(config)) {
+                logger.info("Building snapshot of {}", tableIdentity);
+
+                if (!cubeSegment.getModel().isLookupTable(tableIdentity) || cubeSegment.getCubeDesc().isExtSnapshotTable(tableIdentity)) {
+                    return new Tuple2<>(tableIdentity, false);
+                }
+
+                cubeManager.buildSnapshotTable(cubeSegment, tableIdentity, jobId);
+            }
+
+            try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                    .setAndUnsetThreadLocalConfig(config)) {
+                Iterator<TableRef> tableRefIterator = snapShot._2().iterator();
+
+                CubeInstance updatedCube = cubeManager.getCube(cubeSegment.getCubeInstance().getName());
+                CubeSegment cubeSeg = updatedCube.getSegmentById(cubeSegment.getUuid());
+
+                while (tableRefIterator.hasNext()) {
+                    TableRef tableRef = tableRefIterator.next();
+                    logger.info("Checking snapshot of {}", tableRef);
+                    try {
+                        JoinDesc join = cubeSeg.getModel().getJoinsTree().getJoinByPKSide(tableRef);
+                        ILookupTable table = cubeManager.getLookupTable(cubeSeg, join);
+                        if (table != null) {
+                            IOUtils.closeStream(table);
+                        }
+                    } catch (Exception e) {
+                        status = false;
+                        logger.error(String.format(Locale.ROOT, "Checking snapshot of %s failed, exception is %s.", tableRef, e.getMessage()));
+                    }
+                }
+            }
+            return new Tuple2<>(tableIdentity, status);
+        }
+    }
+
+    static class DimensionDescFilterFunction implements Function<DimensionDesc, Boolean> {
+        private String cubeName;
+        private String segmentId;
+        private KylinConfig config;
+        private CubeSegment cubeSegment;
+        private transient volatile boolean initialized = false;
+
+        public DimensionDescFilterFunction(String cubeName, String segmentId, KylinConfig config) {
+            this.cubeName = cubeName;
+            this.segmentId = segmentId;
+            this.config = config;
+        }
+
+        private void init() {
+            try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                    .setAndUnsetThreadLocalConfig(config)) {
+                cubeSegment = CubeManager.getInstance(config).getCube(cubeName).getSegmentById(segmentId);
+            }
+            initialized = true;
+        }
+
+        @Override
+        public Boolean call(DimensionDesc dimensionDesc) throws Exception {
+            if (initialized == false) {
+                synchronized (SparkBuildDictionary.class) {
+                    if (initialized == false) {
+                        init();
+                    }
+                }
+            }
+
+            return !cubeSegment.getCubeDesc().isExtSnapshotTable(dimensionDesc.getTableRef().getTableIdentity());
+        }
+    }
+
+    private boolean isAllDictsAndSnapshotsReady(KylinConfig config, String cubeName, String segmentID) {
+        final CubeManager cubeManager = CubeManager.getInstance(config);
+        CubeInstance cube = cubeManager.reloadCube(cubeName);
+        CubeSegment segment = cube.getSegmentById(segmentID);
+        ResourceStore store = ResourceStore.getStore(config);
+
+        // check dicts
+        logger.info("Begin to check if all dictionaries exist of Segment: {}", segmentID);
+        Map<String, String> dictionaries = segment.getDictionaries();
+        logger.info("Get dictionaries number: {}", dictionaries.size());
+        for (Map.Entry<String, String> entry : dictionaries.entrySet()) {
+            String dictResPath = entry.getValue();
+            String dictKey = entry.getKey();
+            try {
+                DictionaryInfo dictInfo = store.getResource(dictResPath, DictionaryInfoSerializer.INFO_SERIALIZER);
+                if (dictInfo == null) {
+                    logger.warn("Dictionary=[key: {}, resource path: {}] doesn't exist in resource store", dictKey,
+                            dictResPath);
+                    return false;
+                }
+            } catch (IOException e) {
+                logger.warn("Dictionary=[key: {}, path: {}] failed to check, details: {}", dictKey, dictResPath, e);
+                return false;
+            }
+        }
+
+        // check snapshots
+        logger.info("Begin to check if all snapshots exist of Segment: {}", segmentID);
+        Map<String, String> snapshots = segment.getSnapshots();
+        logger.info("Get snapshot number: {}", snapshots.size());
+        for (Map.Entry<String, String> entry : snapshots.entrySet()) {
+            String snapshotKey = entry.getKey();
+            String snapshotResPath = entry.getValue();
+            try {
+                SnapshotTable snapshot = store.getResource(snapshotResPath, SnapshotTableSerializer.INFO_SERIALIZER);
+                if (snapshot == null) {
+                    logger.info("SnapshotTable=[key: {}, resource path: {}] doesn't exist in resource store",
+                            snapshotKey, snapshotResPath);
+                    return false;
+                }
+            } catch (IOException e) {
+                logger.warn("SnapshotTable=[key: {}, resource path: {}] failed to check, details: {}", snapshotKey,
+                        snapshotResPath, e);
+                return false;
+            }
+        }
+
+        logger.info("All dictionaries and snapshots exist checking succeed for Cube Segment: {}", segmentID);
+
+        return true;
+    }
+}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 00dab71b4cd..3a93a16811e 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -42,7 +43,9 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
 import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.JobRelatedMetaUtil;
 import org.apache.kylin.engine.spark.exception.SparkException;
@@ -367,6 +370,12 @@ public Pair call() throws Exception {
                     }
                     readCounters(joblogInfo);
                     getManager().addJobInfo(getId(), joblogInfo);
+                    if (joblogInfo.containsKey(ExecutableConstants.SPARK_DIMENSION_DIC_SEGMENT_ID)) {
+                        updateSparkDimensionDicMetadata(config, cube, joblogInfo.get(ExecutableConstants.SPARK_DIMENSION_DIC_SEGMENT_ID));
+                        logger.info("Finished update dictionaries and snapshot info from {} to {}.",
+                                this.getParam(SparkBuildDictionary.OPTION_META_URL.getOpt()),
+                                config.getMetadataUrl());
+                    }
                     return new ExecuteResult(ExecuteResult.State.SUCCEED, patternedLogger.getBufferedLog());
                 }
                 // clear SPARK_JOB_ID on job failure.
@@ -403,6 +412,27 @@ protected void dumpMetadata(CubeSegment segment, List mergingSeg) t
         }
     }
 
+    // Update metadata from HDFS: the Spark dimension dictionary step dumps its dictionaries and snapshots to HDFS metadata, which must be synced back to the job server.
+    private void updateSparkDimensionDicMetadata(KylinConfig config, CubeInstance cube, String segmentId) throws IOException {
+        KylinConfig hdfsConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(this.getParam(SparkBuildDictionary.OPTION_META_URL.getOpt()));
+        CubeInstance cubeInstance = CubeManager.getInstance(hdfsConfig).reloadCube(cube.getName());
+        CubeSegment segment = cubeInstance.getSegmentById(segmentId);
+
+        CubeSegment oldSeg = cube.getSegmentById(segmentId);
+        oldSeg.setDictionaries((ConcurrentHashMap<String, String>) segment.getDictionaries());
+        oldSeg.setSnapshots((ConcurrentHashMap<String, String>) segment.getSnapshots());
+        CubeInstance cubeCopy = cube.latestCopyForWrite();
+        CubeUpdate update = new CubeUpdate(cubeCopy);
+        update.setToUpdateSegs(oldSeg);
+        CubeManager.getInstance(config).updateCube(update);
+
+        Set<String> dumpList = new LinkedHashSet<>();
+        dumpList.addAll(segment.getDictionaryPaths());
+        dumpList.addAll(segment.getSnapshotPaths());
+
+        JobRelatedMetaUtil.dumpAndUploadKylinPropsAndMetadata(dumpList, (KylinConfigExt) segment.getConfig(), config.getMetadataUrl().toString());
+    }
+
     // Spark Cubing can only work in layer algorithm
     protected void setAlgorithmLayer() {
         ExecutableManager execMgr = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
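
For context, below is a minimal sketch of driving the new step by hand (for example from a test), assuming this patch is applied. In a normal build, SparkExecutable launches the class through spark-submit and SparkEntry forwards the same "-option value" pairs that createBuildDictionarySparkStep() assembles; the example class name, all literal argument values, and the in-process call to AbstractApplication.execute(String[]) are illustrative assumptions rather than part of the patch.

package org.apache.kylin.engine.spark;

// Illustrative sketch only: feeds SparkBuildDictionary the same option keys the job builder uses,
// but in-process instead of via spark-submit. Every literal value below is a placeholder.
public class SparkBuildDictionaryExample {

    public static void main(String[] args) throws Exception {
        String[] appArgs = new String[] {
                "-" + SparkBuildDictionary.OPTION_CUBE_NAME.getOpt(), "kylin_sales_cube",
                "-" + SparkBuildDictionary.OPTION_SEGMENT_ID.getOpt(), "segment-uuid-placeholder",
                "-" + SparkBuildDictionary.OPTION_META_URL.getOpt(), "kylin_metadata@hdfs,path=hdfs:///kylin/meta-dump-placeholder",
                "-" + SparkBuildDictionary.OPTION_INPUT_PATH.getOpt(), "hdfs:///kylin/job-placeholder/fact_distinct_columns",
                "-" + SparkBuildDictionary.OPTION_DICT_PATH.getOpt(), "hdfs:///kylin/job-placeholder/dict",
                "-" + SparkBuildDictionary.OPTION_CUBING_JOB_ID.getOpt(), "job-placeholder",
                "-" + SparkBuildDictionary.OPTION_COUNTER_PATH.getOpt(), "hdfs:///kylin/job-placeholder/counters"
        };
        // AbstractApplication.execute(String[]) parses these options and then invokes the
        // SparkBuildDictionary.execute(OptionsHelper) method defined in the patch above.
        new SparkBuildDictionary().execute(appArgs);
    }
}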