[HUDI-4396] Add a boolean parameter to decide whether the partition is cascade or not when hive table columns changes #6139

Closed
wants to merge 7 commits into from
@@ -62,7 +62,7 @@ public class HiveSyncConfig extends HoodieSyncConfig {
public static final ConfigProperty<Boolean> HIVE_SYNC_BUCKET_SYNC = HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC;
public static final ConfigProperty<String> HIVE_SYNC_BUCKET_SYNC_SPEC = HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC_SPEC;
public static final ConfigProperty<String> HIVE_SYNC_COMMENT = HiveSyncConfigHolder.HIVE_SYNC_COMMENT;

public static final ConfigProperty<Boolean> HIVE_SYNC_PARTITION_CASCADE_WITH_COLUMN_CHANGE = HiveSyncConfigHolder.HIVE_SYNC_PARTITION_CASCADE_WITH_COLUMN_CHANGE;
public static String getBucketSpec(String bucketCols, int bucketNum) {
return "CLUSTERED BY (" + bucketCols + " INTO " + bucketNum + " BUCKETS";
}
@@ -129,6 +129,8 @@ public static class HiveSyncConfigParams {
public Boolean createManagedTable;
@Parameter(names = {"--batch-sync-num"}, description = "The number of partitions one batch when synchronous partitions to hive")
public Integer batchSyncNum;
@Parameter(names = {"--partition-cascade"}, description = "Partition cascade when table columns change.")
public Boolean partitionCascade;
Comment on lines +132 to +133
Member

In general we should avoid adding parameters/configs to tweak the logic inside the sync. There are already so many configs exposing the implementation details and making meta sync hard to use.

For this logic of determining cascade, can you figure out a way to detect it from schema/column changes?

Contributor Author

Hi, good point about the number of configs.

As far as I can tell, whether to cascade cannot be detected from schema/column changes. Table parameters could carry this setting instead, but then the user would need to create/alter the table with TBLPROPERTIES: "hoodie.datasource.hive_sync.partition_cascade=false" through HiveSQL, which is not straightforward to use.

Comparing these two plans, I prefer the original one. What's your opinion?
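
To make the table-parameter alternative mentioned above concrete, here is a minimal sketch in plain Java. It is not code from this PR: the class `TablePropertyCascadeSketch` and the method `cascadeFromTableParams` are hypothetical names, and a plain `Map<String, String>` stands in for the table parameters that would be set through TBLPROPERTIES. The default of `true` follows the default proposed for the config in this PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: resolve the cascade flag from table parameters
// instead of a sync config. Not part of the Hudi code base.
class TablePropertyCascadeSketch {
  // Key taken from the PR; reading it from table parameters is an assumption.
  static final String CASCADE_KEY = "hoodie.datasource.hive_sync.partition_cascade";

  // Returns true when the key is absent, matching the proposed default.
  // Any value other than "true" (case-insensitive) disables cascade.
  static boolean cascadeFromTableParams(Map<String, String> tableParams) {
    String raw = tableParams.get(CASCADE_KEY);
    return raw == null || Boolean.parseBoolean(raw.trim());
  }

  public static void main(String[] args) {
    Map<String, String> params = new HashMap<>();
    System.out.println(cascadeFromTableParams(params)); // key absent
    params.put(CASCADE_KEY, "false");
    System.out.println(cascadeFromTableParams(params)); // explicitly disabled
  }
}
```

The drawback raised in the reply still applies to this sketch: the user has to set the property on the table through HiveSQL before the sync can pick it up.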

@Parameter(names = {"--spark-datasource"}, description = "Whether sync this table as spark data source table.")
public Boolean syncAsSparkDataSourceTable;
@Parameter(names = {"--spark-schema-length-threshold"}, description = "The maximum length allowed in a single cell when storing additional schema information in Hive's metastore.")
@@ -163,6 +165,7 @@ public TypedProperties toProps() {
props.setPropertyIfNonNull(HIVE_TABLE_SERDE_PROPERTIES.key(), serdeProperties);
props.setPropertyIfNonNull(HIVE_SYNC_AS_DATA_SOURCE_TABLE.key(), syncAsSparkDataSourceTable);
props.setPropertyIfNonNull(HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD.key(), sparkSchemaLengthThreshold);
props.setPropertyIfNonNull(HIVE_SYNC_PARTITION_CASCADE_WITH_COLUMN_CHANGE.key(), partitionCascade);
props.setPropertyIfNonNull(HIVE_CREATE_MANAGED_TABLE.key(), createManagedTable);
props.setPropertyIfNonNull(HIVE_BATCH_SYNC_PARTITION_NUM.key(), batchSyncNum);
props.setPropertyIfNonNull(HIVE_SYNC_BUCKET_SYNC.key(), bucketSync);
@@ -125,4 +125,9 @@ public class HiveSyncConfigHolder {
.key("hoodie.datasource.hive_sync.sync_comment")
.defaultValue("false")
.withDocumentation("Whether to sync the table column comments while syncing the table.");

public static final ConfigProperty<Boolean> HIVE_SYNC_PARTITION_CASCADE_WITH_COLUMN_CHANGE = ConfigProperty
.key("hoodie.datasource.hive_sync.partition_cascade")
.defaultValue(true)
.withDocumentation("whether the partition is cascade or not when hive table columns changes");
}
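
For illustration, the way this flag combines with the partition check in the executors changed by this PR can be sketched in plain Java. This is a minimal sketch under stated assumptions, not the Hudi implementation: `CascadeSketch` and `cascadeClause` are hypothetical names, a `List<String>` stands in for the value read from `META_SYNC_PARTITION_FIELDS`, and a nullable `Boolean` stands in for `hoodie.datasource.hive_sync.partition_cascade`, with `null` meaning "not set" and falling back to the default of `true`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of the cascade decision; not part of the Hudi code base.
class CascadeSketch {
  // Returns " cascade" only for partitioned tables with the flag enabled.
  static String cascadeClause(List<String> partitionFields, Boolean partitionCascade) {
    // Unset flag falls back to the default proposed in the PR (true).
    boolean flag = partitionCascade == null || partitionCascade;
    return (!partitionFields.isEmpty() && flag) ? " cascade" : "";
  }

  public static void main(String[] args) {
    List<String> partitioned = Arrays.asList("dt");
    List<String> nonPartitioned = Collections.emptyList();
    // Partitioned table, flag unset: clause is present.
    System.out.println("[" + cascadeClause(partitioned, null) + "]");
    // Partitioned table, flag disabled: clause is omitted.
    System.out.println("[" + cascadeClause(partitioned, false) + "]");
    // Non-partitioned table: clause is omitted regardless of the flag.
    System.out.println("[" + cascadeClause(nonPartitioned, true) + "]");
  }
}
```

With the default left in place, the behavior matches the pre-PR logic, so only users who explicitly set the flag to `false` see a change.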
@@ -55,6 +55,7 @@

import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_PARTITION_CASCADE_WITH_COLUMN_CHANGE;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS;
@@ -141,7 +142,7 @@ public void createTable(String tableName, MessageType storageSchema, String inpu
@Override
public void updateTableDefinition(String tableName, MessageType newSchema) {
try {
- boolean cascade = syncConfig.getSplitStrings(META_SYNC_PARTITION_FIELDS).size() > 0;
+ boolean cascade = (!syncConfig.getSplitStrings(META_SYNC_PARTITION_FIELDS).isEmpty()) && (syncConfig.getBooleanOrDefault(HIVE_SYNC_PARTITION_CASCADE_WITH_COLUMN_CHANGE));
List<FieldSchema> fieldSchema = HiveSchemaUtil.convertParquetSchemaToHiveFieldSchema(newSchema, syncConfig);
Table table = client.getTable(databaseName, tableName);
StorageDescriptor sd = table.getSd();
@@ -38,6 +38,7 @@
import java.util.List;
import java.util.Map;

import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_PARTITION_CASCADE_WITH_COLUMN_CHANGE;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_BATCH_SYNC_PARTITION_NUM;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE;
import static org.apache.hudi.hive.util.HiveSchemaUtil.HIVE_ESCAPE_CHARACTER;
@@ -100,7 +101,7 @@ public void updateTableDefinition(String tableName, MessageType newSchema) {
try {
String newSchemaStr = HiveSchemaUtil.generateSchemaString(newSchema, config.getSplitStrings(META_SYNC_PARTITION_FIELDS), config.getBoolean(HIVE_SUPPORT_TIMESTAMP_TYPE));
// Cascade clause should not be present for non-partitioned tables
- String cascadeClause = config.getSplitStrings(META_SYNC_PARTITION_FIELDS).size() > 0 ? " cascade" : "";
+ String cascadeClause = (!config.getSplitStrings(META_SYNC_PARTITION_FIELDS).isEmpty() && (config.getBooleanOrDefault(HIVE_SYNC_PARTITION_CASCADE_WITH_COLUMN_CHANGE))) ? " cascade" : "";
StringBuilder sqlBuilder = new StringBuilder("ALTER TABLE ").append(HIVE_ESCAPE_CHARACTER)
.append(databaseName).append(HIVE_ESCAPE_CHARACTER).append(".")
.append(HIVE_ESCAPE_CHARACTER).append(tableName)