Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1454,6 +1454,11 @@ public int getSparkOutputMaxSize() {
return Integer.valueOf(getOptional("kylin.engine.spark.output.max-size", "10485760"));
}

/**
 * Whether building dimension dictionaries with Spark is enabled.
 * Controlled by the "kylin.engine.spark-dimension-dictionary" property; defaults to {@code false}.
 */
public boolean isSparkDimensionDictionaryEnabled() {
    final String flag = getOptional("kylin.engine.spark-dimension-dictionary", "false");
    return Boolean.parseBoolean(flag);
}


// ============================================================================
// ENGINE.LIVY
// ============================================================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ private ExecutableConstants() {
// NOTE(review): names suggest these are counter/metric keys recorded by build steps — confirm against writers.
public static final String HDFS_BYTES_WRITTEN = "hdfs_bytes_written";
public static final String SOURCE_RECORDS_COUNT = "source_records_count";
public static final String SOURCE_RECORDS_SIZE = "source_records_size";
// Key used to carry the segment id for the Spark dimension-dictionary step.
public static final String SPARK_DIMENSION_DIC_SEGMENT_ID = "spark_dimension_dic_segment_id";

// Human-readable step names, assigned to executable steps via setName(...).
public static final String STEP_NAME_EXTRACT_DICTIONARY_FROM_GLOBAL = "Extract Dictionary from Global Dictionary";
public static final String STEP_NAME_BUILD_DICTIONARY = "Build Dimension Dictionary";
// Spark-based variant of the dimension-dictionary build step.
public static final String STEP_NAME_BUILD_SPARK_DICTIONARY = "Build Dimension Dictionary with Spark";
public static final String STEP_NAME_BUILD_UHC_DICTIONARY = "Build UHC Dictionary";
public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table";
public static final String STEP_NAME_SQOOP_TO_FLAT_HIVE_TABLE = "Sqoop To Flat Hive Table";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,11 @@ public String getColumWithTableAndSchema() {
return (getTableWithSchema() + "." + column.getName()).toUpperCase(Locale.ROOT);
}


/**
 * Returns the column qualified by its table only (no schema), e.g. "TABLE.COLUMN",
 * upper-cased with a locale-independent conversion.
 * Counterpart of {@code getColumWithTableAndSchema()}, which also prefixes the schema.
 */
public String getColumnWithTable() {
    final String qualifiedName = getTable() + "." + column.getName();
    return qualifiedName.toUpperCase(Locale.ROOT);
}

// used by projection rewrite, see OLAPProjectRel
public enum InnerDataTypeEnum {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,10 @@ public boolean isEnableUHCDictStep() {
return true;
}

/**
 * Whether this job should build dimension dictionaries with Spark
 * rather than the default dictionary-building step.
 * Delegates to the segment config's {@code isSparkDimensionDictionaryEnabled()}.
 */
public boolean isEnabledSparkDimensionDictionary() {
    final boolean sparkDictEnabled = config.getConfig().isSparkDimensionDictionaryEnabled();
    return sparkDictEnabled;
}

public LookupMaterializeContext addMaterializeLookupTableSteps(final CubingJob result) {
LookupMaterializeContext lookupMaterializeContext = new LookupMaterializeContext(result);
CubeDesc cubeDesc = seg.getCubeDesc();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,13 @@ public CubingJob build() {
result.addTask(createBuildUHCDictStep(jobId));
}

result.addTask(createBuildDictionaryStep(jobId));

if (isEnabledSparkDimensionDictionary()) {
result.addTask(createBuildDictionarySparkStep(jobId));
} else {
result.addTask(createBuildDictionaryStep(jobId));
}

result.addTask(createSaveStatisticsStep(jobId));

// add materialize lookup tables if needed
Expand Down Expand Up @@ -127,6 +133,28 @@ public SparkExecutable createFactDistinctColumnsSparkStep(String jobId) {
return sparkExecutable;
}

/**
 * Creates the Spark executable step that builds dimension dictionaries for this segment.
 * The step runs {@link SparkBuildDictionary}, reading the fact-distinct-columns output of
 * this job and writing dictionaries under the job's dictionary root path.
 *
 * @param jobId id of the enclosing cubing job
 * @return the fully configured Spark step
 */
public SparkExecutable createBuildDictionarySparkStep(String jobId) {
    final SparkExecutable step = SparkExecutableFactory.instance(seg.getConfig());
    step.setClassName(SparkBuildDictionary.class.getName());

    // Command-line options consumed by SparkBuildDictionary.
    step.setParam(SparkBuildDictionary.OPTION_META_URL.getOpt(), getSegmentMetadataUrl(seg.getConfig(), jobId));
    step.setParam(SparkBuildDictionary.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
    step.setParam(SparkBuildDictionary.OPTION_INPUT_PATH.getOpt(), getFactDistinctColumnsPath(jobId));
    step.setParam(SparkBuildDictionary.OPTION_DICT_PATH.getOpt(), getDictRootPath(jobId));
    step.setParam(SparkBuildDictionary.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
    step.setParam(SparkBuildDictionary.OPTION_CUBING_JOB_ID.getOpt(), jobId);

    step.setJobId(jobId);
    step.setName(ExecutableConstants.STEP_NAME_BUILD_SPARK_DICTIONARY);
    step.setCounterSaveAs(CubingJob.SOURCE_SIZE_BYTES, getCounterOutputPath(jobId));

    // Ship any additional user-configured jars with the Spark job.
    final StringBuilder extraJars = new StringBuilder();
    StringUtil.appendWithSeparator(extraJars, seg.getConfig().getSparkAdditionalJars());
    step.setJars(extraJars.toString());
    return step;
}


protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
final SparkExecutable sparkExecutable = SparkExecutableFactory.instance(seg.getConfig());
sparkExecutable.setClassName(SparkCubingByLayer.class.getName());
Expand Down
Loading