[HUDI-4855] Add missing table configs for bootstrap in Deltastreamer (a…
yihua authored and Zouxxyy committed Oct 19, 2022
1 parent 215e62b commit 5c8db8b
Showing 4 changed files with 78 additions and 32 deletions.
docker/demo/sparksql-bootstrap-prep-source.commands (2 additions, 0 deletions)
@@ -18,5 +18,7 @@
import org.apache.spark.sql.functions.col

val df = spark.read.format("org.apache.hudi").load("/user/hive/warehouse/stock_ticks_cow/*/*/*").drop("_hoodie_commit_time", "_hoodie_record_key", "_hoodie_file_name", "_hoodie_commit_seqno", "_hoodie_partition_path")
// TODO(HUDI-4944): fix the test to use a partition column with slashes (`/`) included
// in the value. Currently it fails the tests due to slash encoding.
df.write.format("parquet").save("/user/hive/warehouse/stock_ticks_cow_bs_src/2018/08/31/")
System.exit(0)
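
A note on the TODO above: with URL encoding of partition paths enabled (hoodie.datasource.write.partitionpath.urlencode), a partition value that contains slashes is written as a single encoded path segment rather than as nested directories, which is the kind of mismatch the slash-encoding TODO refers to. A minimal plain-Java illustration, not part of this commit:

import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class PartitionValueEncodingDemo {
  public static void main(String[] args) throws Exception {
    // A partition value that itself contains slashes...
    String partitionValue = "2018/08/31";
    // ...URL-encodes to a single segment, not three nested directories.
    String encoded = URLEncoder.encode(partitionValue, StandardCharsets.UTF_8.toString());
    System.out.println(encoded); // prints 2018%2F08%2F31
  }
}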
@@ -231,33 +231,33 @@ private void ingestFirstBatchAndHiveSync() throws Exception {
executeSparkSQLCommand(SPARKSQL_BS_PREP_COMMANDS, true);
List<String> bootstrapCmds = CollectionUtils.createImmutableList(
"spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer " + HUDI_UTILITIES_BUNDLE
+ " --table-type COPY_ON_WRITE "
+ " --run-bootstrap "
+ " --source-class org.apache.hudi.utilities.sources.JsonDFSSource --source-ordering-field ts "
+ " --target-base-path " + COW_BOOTSTRAPPED_BASE_PATH + " --target-table " + COW_BOOTSTRAPPED_TABLE_NAME
+ " --props /var/demo/config/dfs-source.properties"
+ " --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider "
+ " --initial-checkpoint-provider"
+ " org.apache.hudi.utilities.checkpointing.InitialCheckpointFromAnotherHoodieTimelineProvider"
+ " --hoodie-conf hoodie.bootstrap.base.path=" + BOOTSTRAPPED_SRC_PATH
+ " --hoodie-conf hoodie.deltastreamer.checkpoint.provider.path=" + COW_BASE_PATH
+ " --hoodie-conf hoodie.bootstrap.parallelism=2 "
+ " --hoodie-conf hoodie.bootstrap.keygen.class=" + SimpleKeyGenerator.class.getName()
+ String.format(HIVE_SYNC_CMD_FMT, "dt", COW_BOOTSTRAPPED_TABLE_NAME),
"spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer " + HUDI_UTILITIES_BUNDLE
+ " --table-type MERGE_ON_READ "
+ " --run-bootstrap "
+ " --source-class org.apache.hudi.utilities.sources.JsonDFSSource --source-ordering-field ts "
+ " --target-base-path " + MOR_BOOTSTRAPPED_BASE_PATH + " --target-table " + MOR_BOOTSTRAPPED_TABLE_NAME
+ " --props /var/demo/config/dfs-source.properties"
+ " --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider "
+ " --initial-checkpoint-provider"
+ " org.apache.hudi.utilities.checkpointing.InitialCheckpointFromAnotherHoodieTimelineProvider"
+ " --hoodie-conf hoodie.bootstrap.base.path=" + BOOTSTRAPPED_SRC_PATH
+ " --hoodie-conf hoodie.deltastreamer.checkpoint.provider.path=" + COW_BASE_PATH
+ " --hoodie-conf hoodie.bootstrap.parallelism=2 "
+ " --hoodie-conf hoodie.bootstrap.keygen.class=" + SimpleKeyGenerator.class.getName()
+ String.format(HIVE_SYNC_CMD_FMT, "dt", MOR_BOOTSTRAPPED_TABLE_NAME));
executeCommandStringsInDocker(ADHOC_1_CONTAINER, bootstrapCmds);
}
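
For orientation only, a hedged sketch of how the same bootstrap run could be configured programmatically through HoodieDeltaStreamer.Config instead of spark-submit flags; the paths and table names below are placeholders, not the demo's actual values:

import java.util.Arrays;

import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;

public class BootstrapRunConfigSketch {
  public static void main(String[] args) {
    HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
    cfg.tableType = "COPY_ON_WRITE";                               // --table-type
    cfg.runBootstrap = true;                                       // --run-bootstrap
    cfg.targetBasePath = "/tmp/hudi/stock_ticks_bootstrapped";     // placeholder for --target-base-path
    cfg.targetTableName = "stock_ticks_bootstrapped";              // placeholder for --target-table
    cfg.propsFilePath = "/var/demo/config/dfs-source.properties";  // --props
    cfg.configs.addAll(Arrays.asList(                              // --hoodie-conf entries
        "hoodie.bootstrap.base.path=/tmp/hudi/stock_ticks_bs_src", // placeholder source path
        "hoodie.bootstrap.parallelism=2",
        "hoodie.bootstrap.keygen.class=" + SimpleKeyGenerator.class.getName()));
  }
}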

@@ -22,9 +22,11 @@
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieTimelineTimeZone;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
@@ -33,6 +35,9 @@
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.util.SparkKeyGenUtils;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.schema.SchemaProvider;

Expand All @@ -48,8 +53,16 @@
import java.util.HashMap;

import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
import static org.apache.hudi.common.table.HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT;
import static org.apache.hudi.common.table.HoodieTableConfig.POPULATE_META_FIELDS;
import static org.apache.hudi.common.table.HoodieTableConfig.TIMELINE_TIMEZONE;
import static org.apache.hudi.config.HoodieBootstrapConfig.KEYGEN_CLASS_NAME;
import static org.apache.hudi.config.HoodieWriteConfig.PRECOMBINE_FIELD_NAME;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC_SPEC;
import static org.apache.hudi.keygen.constant.KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE;
import static org.apache.hudi.keygen.constant.KeyGeneratorOptions.RECORDKEY_FIELD_NAME;
import static org.apache.hudi.keygen.constant.KeyGeneratorOptions.URL_ENCODE_PARTITIONING;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;

@@ -187,16 +200,44 @@ private void initializeTable() throws IOException {
+ ". Cannot bootstrap data on top of an existing table");
}
}
HoodieTableMetaClient.withPropertyBuilder()

HoodieTableMetaClient.PropertyBuilder builder = HoodieTableMetaClient.withPropertyBuilder()
.fromProperties(props)
.setTableType(cfg.tableType)
.setTableName(cfg.targetTableName)
.setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
.setRecordKeyFields(props.getString(RECORDKEY_FIELD_NAME.key()))
.setPreCombineField(props.getString(
PRECOMBINE_FIELD_NAME.key(), PRECOMBINE_FIELD_NAME.defaultValue()))
.setPopulateMetaFields(props.getBoolean(
POPULATE_META_FIELDS.key(), POPULATE_META_FIELDS.defaultValue()))
.setArchiveLogFolder(props.getString(
ARCHIVELOG_FOLDER.key(), ARCHIVELOG_FOLDER.defaultValue()))
.setPayloadClassName(cfg.payloadClassName)
.setBaseFileFormat(cfg.baseFileFormat)
.setBootstrapIndexClass(cfg.bootstrapIndexClass)
.setBootstrapBasePath(bootstrapBasePath)
.initTable(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath);
.setHiveStylePartitioningEnable(props.getBoolean(
HIVE_STYLE_PARTITIONING_ENABLE.key(),
Boolean.parseBoolean(HIVE_STYLE_PARTITIONING_ENABLE.defaultValue())
))
.setUrlEncodePartitioning(props.getBoolean(
URL_ENCODE_PARTITIONING.key(),
Boolean.parseBoolean(URL_ENCODE_PARTITIONING.defaultValue())))
.setCommitTimezone(HoodieTimelineTimeZone.valueOf(
props.getString(
TIMELINE_TIMEZONE.key(),
String.valueOf(TIMELINE_TIMEZONE.defaultValue()))))
.setPartitionMetafileUseBaseFormat(props.getBoolean(
PARTITION_METAFILE_USE_BASE_FORMAT.key(),
PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()));
String partitionColumns = SparkKeyGenUtils.getPartitionColumns(props);
if (!StringUtils.isNullOrEmpty(partitionColumns)) {
builder.setPartitionFields(partitionColumns).setKeyGeneratorClassProp(props.getString(KEYGEN_CLASS_NAME.key(), SimpleKeyGenerator.class.getName()));
} else {
builder.setKeyGeneratorClassProp(props.getString(KEYGEN_CLASS_NAME.key(), NonpartitionedKeyGenerator.class.getName()));
}

builder.initTable(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath);
}
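
The partition handling added above boils down to one rule: if SparkKeyGenUtils resolves partition columns from the writer properties, persist them together with the configured key generator (defaulting to SimpleKeyGenerator); otherwise record NonpartitionedKeyGenerator. A condensed sketch of that decision, simplified from the code above:

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
import org.apache.hudi.keygen.SimpleKeyGenerator;

import static org.apache.hudi.config.HoodieBootstrapConfig.KEYGEN_CLASS_NAME;

public class BootstrapKeyGenSelectionSketch {
  // Returns the key generator class name the bootstrap table config would record,
  // given the partition columns resolved from the writer properties.
  static String chooseKeyGenClass(TypedProperties props, String partitionColumns) {
    if (!StringUtils.isNullOrEmpty(partitionColumns)) {
      // Partitioned table: keep the user-supplied keygen, defaulting to SimpleKeyGenerator.
      return props.getString(KEYGEN_CLASS_NAME.key(), SimpleKeyGenerator.class.getName());
    }
    // No partition columns: fall back to NonpartitionedKeyGenerator.
    return props.getString(KEYGEN_CLASS_NAME.key(), NonpartitionedKeyGenerator.class.getName());
  }
}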

public HoodieWriteConfig getBootstrapConfig() {
@@ -46,9 +46,9 @@
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -630,11 +630,14 @@ public void testBulkInsertsAndUpsertsWithBootstrap() throws Exception {
Dataset<Row> sourceDf = sqlContext.read()
.format("org.apache.hudi")
.load(tableBasePath);
sourceDf.write().format("parquet").save(bootstrapSourcePath);
// TODO(HUDI-4944): fix the test to use a partition column with slashes (`/`) included
// in the value. Currently it fails the tests due to slash encoding.
sourceDf.write().format("parquet").partitionBy("rider").save(bootstrapSourcePath);

String newDatasetBasePath = dfsBasePath + "/test_dataset_bootstrapped";
cfg.runBootstrap = true;
cfg.configs.add(String.format("hoodie.bootstrap.base.path=%s", bootstrapSourcePath));
cfg.configs.add(String.format("%s=%s", DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "rider"));
cfg.configs.add(String.format("hoodie.bootstrap.keygen.class=%s", SimpleKeyGenerator.class.getName()));
cfg.configs.add("hoodie.bootstrap.parallelism=5");
cfg.targetBasePath = newDatasetBasePath;
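
With the partition path field now supplied, the bootstrapped table's hoodie.properties should carry the partition fields and key generator class. A hedged verification sketch; the HoodieTableConfig constants and meta client builder calls used here are assumed from the Hudi API and may need adjusting:

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;

public class BootstrapTableConfigCheckSketch {
  static void printPersistedTableConfigs(String basePath) {
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setConf(new Configuration())
        .setBasePath(basePath)
        .build();
    HoodieTableConfig tableConfig = metaClient.getTableConfig();
    // Partition fields and key generator persisted by initializeTable during bootstrap.
    System.out.println(tableConfig.getString(HoodieTableConfig.PARTITION_FIELDS));
    System.out.println(tableConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME));
  }
}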
