[HUDI-1023] Add validation error messages in delta sync #1710

Merged · 3 commits · Jun 19, 2020
@@ -54,55 +54,55 @@
public class HoodieWriteConfig extends DefaultHoodieConfig {

public static final String TABLE_NAME = "hoodie.table.name";
- private static final String TIMELINE_LAYOUT_VERSION = "hoodie.timeline.layout.version";
- private static final String BASE_PATH_PROP = "hoodie.base.path";
- private static final String AVRO_SCHEMA = "hoodie.avro.schema";
- private static final String AVRO_SCHEMA_VALIDATE = "hoodie.avro.schema.validate";
- private static final String DEFAULT_AVRO_SCHEMA_VALIDATE = "false";
- private static final String DEFAULT_PARALLELISM = "1500";
- private static final String INSERT_PARALLELISM = "hoodie.insert.shuffle.parallelism";
- private static final String BULKINSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism";
- private static final String BULKINSERT_USER_DEFINED_PARTITIONER_CLASS = "hoodie.bulkinsert.user.defined.partitioner.class";
- private static final String UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism";
- private static final String DELETE_PARALLELISM = "hoodie.delete.shuffle.parallelism";
- private static final String DEFAULT_ROLLBACK_PARALLELISM = "100";
- private static final String ROLLBACK_PARALLELISM = "hoodie.rollback.parallelism";
- private static final String WRITE_BUFFER_LIMIT_BYTES = "hoodie.write.buffer.limit.bytes";
- private static final String DEFAULT_WRITE_BUFFER_LIMIT_BYTES = String.valueOf(4 * 1024 * 1024);
- private static final String COMBINE_BEFORE_INSERT_PROP = "hoodie.combine.before.insert";
- private static final String DEFAULT_COMBINE_BEFORE_INSERT = "false";
- private static final String COMBINE_BEFORE_UPSERT_PROP = "hoodie.combine.before.upsert";
- private static final String DEFAULT_COMBINE_BEFORE_UPSERT = "true";
- private static final String COMBINE_BEFORE_DELETE_PROP = "hoodie.combine.before.delete";
- private static final String DEFAULT_COMBINE_BEFORE_DELETE = "true";
+ public static final String TIMELINE_LAYOUT_VERSION = "hoodie.timeline.layout.version";
+ public static final String BASE_PATH_PROP = "hoodie.base.path";
+ public static final String AVRO_SCHEMA = "hoodie.avro.schema";
+ public static final String AVRO_SCHEMA_VALIDATE = "hoodie.avro.schema.validate";
+ public static final String DEFAULT_AVRO_SCHEMA_VALIDATE = "false";
+ public static final String DEFAULT_PARALLELISM = "1500";
+ public static final String INSERT_PARALLELISM = "hoodie.insert.shuffle.parallelism";
+ public static final String BULKINSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism";
+ public static final String BULKINSERT_USER_DEFINED_PARTITIONER_CLASS = "hoodie.bulkinsert.user.defined.partitioner.class";
+ public static final String UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism";
+ public static final String DELETE_PARALLELISM = "hoodie.delete.shuffle.parallelism";
+ public static final String DEFAULT_ROLLBACK_PARALLELISM = "100";
+ public static final String ROLLBACK_PARALLELISM = "hoodie.rollback.parallelism";
+ public static final String WRITE_BUFFER_LIMIT_BYTES = "hoodie.write.buffer.limit.bytes";
+ public static final String DEFAULT_WRITE_BUFFER_LIMIT_BYTES = String.valueOf(4 * 1024 * 1024);
+ public static final String COMBINE_BEFORE_INSERT_PROP = "hoodie.combine.before.insert";
+ public static final String DEFAULT_COMBINE_BEFORE_INSERT = "false";
+ public static final String COMBINE_BEFORE_UPSERT_PROP = "hoodie.combine.before.upsert";
+ public static final String DEFAULT_COMBINE_BEFORE_UPSERT = "true";
+ public static final String COMBINE_BEFORE_DELETE_PROP = "hoodie.combine.before.delete";
+ public static final String DEFAULT_COMBINE_BEFORE_DELETE = "true";
public static final String WRITE_STATUS_STORAGE_LEVEL = "hoodie.write.status.storage.level";
- private static final String DEFAULT_WRITE_STATUS_STORAGE_LEVEL = "MEMORY_AND_DISK_SER";
- private static final String HOODIE_AUTO_COMMIT_PROP = "hoodie.auto.commit";
- private static final String DEFAULT_HOODIE_AUTO_COMMIT = "true";
- private static final String HOODIE_ASSUME_DATE_PARTITIONING_PROP = "hoodie.assume.date.partitioning";
- private static final String DEFAULT_ASSUME_DATE_PARTITIONING = "false";
- private static final String HOODIE_WRITE_STATUS_CLASS_PROP = "hoodie.writestatus.class";
- private static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = WriteStatus.class.getName();
- private static final String FINALIZE_WRITE_PARALLELISM = "hoodie.finalize.write.parallelism";
- private static final String DEFAULT_FINALIZE_WRITE_PARALLELISM = DEFAULT_PARALLELISM;
-
- private static final String EMBEDDED_TIMELINE_SERVER_ENABLED = "hoodie.embed.timeline.server";
- private static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED = "true";
-
- private static final String FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP = "hoodie.fail.on.timeline.archiving";
- private static final String DEFAULT_FAIL_ON_TIMELINE_ARCHIVING_ENABLED = "true";
+ public static final String DEFAULT_WRITE_STATUS_STORAGE_LEVEL = "MEMORY_AND_DISK_SER";
+ public static final String HOODIE_AUTO_COMMIT_PROP = "hoodie.auto.commit";
+ public static final String DEFAULT_HOODIE_AUTO_COMMIT = "true";
+ public static final String HOODIE_ASSUME_DATE_PARTITIONING_PROP = "hoodie.assume.date.partitioning";
+ public static final String DEFAULT_ASSUME_DATE_PARTITIONING = "false";
+ public static final String HOODIE_WRITE_STATUS_CLASS_PROP = "hoodie.writestatus.class";
+ public static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = WriteStatus.class.getName();
+ public static final String FINALIZE_WRITE_PARALLELISM = "hoodie.finalize.write.parallelism";
+ public static final String DEFAULT_FINALIZE_WRITE_PARALLELISM = DEFAULT_PARALLELISM;
+
+ public static final String EMBEDDED_TIMELINE_SERVER_ENABLED = "hoodie.embed.timeline.server";
+ public static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED = "true";
+
+ public static final String FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP = "hoodie.fail.on.timeline.archiving";
+ public static final String DEFAULT_FAIL_ON_TIMELINE_ARCHIVING_ENABLED = "true";
// time between successive attempts to ensure written data's metadata is consistent on storage
- private static final String INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP =
+ public static final String INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP =
"hoodie.consistency.check.initial_interval_ms";
- private static long DEFAULT_INITIAL_CONSISTENCY_CHECK_INTERVAL_MS = 2000L;
+ public static long DEFAULT_INITIAL_CONSISTENCY_CHECK_INTERVAL_MS = 2000L;

// max interval time
- private static final String MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP = "hoodie.consistency.check.max_interval_ms";
- private static long DEFAULT_MAX_CONSISTENCY_CHECK_INTERVAL_MS = 300000L;
+ public static final String MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP = "hoodie.consistency.check.max_interval_ms";
+ public static long DEFAULT_MAX_CONSISTENCY_CHECK_INTERVAL_MS = 300000L;

// maximum number of checks, for consistency of written data. Will wait upto 256 Secs
- private static final String MAX_CONSISTENCY_CHECKS_PROP = "hoodie.consistency.check.max_checks";
- private static int DEFAULT_MAX_CONSISTENCY_CHECKS = 7;
+ public static final String MAX_CONSISTENCY_CHECKS_PROP = "hoodie.consistency.check.max_checks";
+ public static int DEFAULT_MAX_CONSISTENCY_CHECKS = 7;

/**
* HUDI-858 : There are users who had been directly using RDD APIs and have relied on a behavior in 0.4.x to allow
@@ -114,9 +114,9 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
* Given the importance of supporting such cases for the user's migration to 0.5.x, we are proposing a safety flag
* (disabled by default) which will allow this old behavior.
*/
- private static final String ALLOW_MULTI_WRITE_ON_SAME_INSTANT =
+ public static final String ALLOW_MULTI_WRITE_ON_SAME_INSTANT =
"_.hoodie.allow.multi.write.on.same.instant";
- private static final String DEFAULT_ALLOW_MULTI_WRITE_ON_SAME_INSTANT = "false";
+ public static final String DEFAULT_ALLOW_MULTI_WRITE_ON_SAME_INSTANT = "false";

private ConsistencyGuardConfig consistencyGuardConfig;

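The change to HoodieWriteConfig above is purely a visibility bump: the write-config keys and defaults move from private to public so that callers such as the deltastreamer can quote the exact property names in their validation error messages. A minimal sketch of that usage, with a hypothetical caller class that is not part of this PR:

```java
import org.apache.hudi.config.HoodieWriteConfig;

// Hypothetical caller (not part of this PR) showing why the keys were made public:
// the error message references the real property name via the constant instead of
// a hand-copied string that could drift from the config class.
public class WriteConfigCheckExample {
  public static void main(String[] args) {
    boolean autoCommit = true; // pretend this was read from a built HoodieWriteConfig
    if (autoCommit) {
      throw new IllegalArgumentException(String.format("%s should be set to %s",
          HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP, false));
    }
  }
}
```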
@@ -33,12 +33,10 @@
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
- import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;
- import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
@@ -74,6 +72,10 @@

import scala.collection.JavaConversions;

+ import static org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT_PROP;
+ import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP;
+ import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_UPSERT_PROP;
+ import static org.apache.hudi.config.HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP;
import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE;
import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME;

@@ -497,25 +499,30 @@ private void setupWriteClient() {
* @param schemaProvider Schema Provider
*/
private HoodieWriteConfig getHoodieClientConfig(SchemaProvider schemaProvider) {
+ final boolean combineBeforeUpsert = true;
+ final boolean autoCommit = false;
HoodieWriteConfig.Builder builder =
-     HoodieWriteConfig.newBuilder().withPath(cfg.targetBasePath).combineInput(cfg.filterDupes, true)
+     HoodieWriteConfig.newBuilder().withPath(cfg.targetBasePath).combineInput(cfg.filterDupes, combineBeforeUpsert)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withPayloadClass(cfg.payloadClassName)
// Inline compaction is disabled for continuous mode. otherwise enabled for MOR
.withInlineCompaction(cfg.isInlineCompactionEnabled()).build())
.forTable(cfg.targetTableName)
-         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
-         .withAutoCommit(false).withProps(props);
+         .withAutoCommit(autoCommit).withProps(props);
Review thread on the removed index config line (resolved):

Member: but this will override this... the withProps(), no?

Member Author (xushiyan): @vinothchandar you're right, that was my oversight. But I guess this line is redundant anyway: the index defaults to BLOOM, so setting the index config here may just cause confusion.

Member: ok, makes sense.

(A small sketch of overriding the index type through properties follows at the end of this diff.)

if (null != schemaProvider && null != schemaProvider.getTargetSchema()) {
builder = builder.withSchema(schemaProvider.getTargetSchema().toString());
}
HoodieWriteConfig config = builder.build();

// Validate what deltastreamer assumes of write-config to be really safe
-     ValidationUtils.checkArgument(config.isInlineCompaction() == cfg.isInlineCompactionEnabled());
-     ValidationUtils.checkArgument(!config.shouldAutoCommit());
-     ValidationUtils.checkArgument(config.shouldCombineBeforeInsert() == cfg.filterDupes);
-     ValidationUtils.checkArgument(config.shouldCombineBeforeUpsert());
+     ValidationUtils.checkArgument(config.isInlineCompaction() == cfg.isInlineCompactionEnabled(),
+         String.format("%s should be set to %s", INLINE_COMPACT_PROP, cfg.isInlineCompactionEnabled()));
+     ValidationUtils.checkArgument(!config.shouldAutoCommit(),
+         String.format("%s should be set to %s", HOODIE_AUTO_COMMIT_PROP, autoCommit));
+     ValidationUtils.checkArgument(config.shouldCombineBeforeInsert() == cfg.filterDupes,
+         String.format("%s should be set to %s", COMBINE_BEFORE_INSERT_PROP, cfg.filterDupes));
+     ValidationUtils.checkArgument(config.shouldCombineBeforeUpsert(),
+         String.format("%s should be set to %s", COMBINE_BEFORE_UPSERT_PROP, combineBeforeUpsert));

return config;
}
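
The reworked checks above are the core of HUDI-1023: each ValidationUtils.checkArgument call now carries a message built from the public property keys, so a misconfigured deltastreamer fails with an actionable error instead of a bare exception. A self-contained sketch of the same pattern, using a local helper that only assumes checkArgument(boolean, String) semantics rather than the Hudi class itself:

```java
// Sketch of the fail-fast-with-message pattern used in the validation block above.
public class ValidationMessageExample {

  // Local stand-in that mirrors the assumed ValidationUtils.checkArgument(boolean, String) behavior.
  static void checkArgument(boolean condition, String errorMessage) {
    if (!condition) {
      throw new IllegalArgumentException(errorMessage);
    }
  }

  public static void main(String[] args) {
    // "hoodie.auto.commit" matches HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP above.
    String autoCommitProp = "hoodie.auto.commit";
    boolean shouldAutoCommit = true; // pretend the built config resolved to true
    // Throws IllegalArgumentException("hoodie.auto.commit should be set to false").
    checkArgument(!shouldAutoCommit,
        String.format("%s should be set to %s", autoCommitProp, false));
  }
}
```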
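Following up on the review thread about the removed index config: since DeltaSync no longer pins the index type, a deployment that needs something other than the BLOOM default can still supply it through the properties passed to withProps(props). A hedged sketch, assuming the standard "hoodie.index.type" key and using java.util.Properties as a stand-in for the deltastreamer's typed properties:

```java
import java.util.Properties;

// Hypothetical illustration, not part of this PR: overriding the index type via
// properties instead of hard-coding it in DeltaSync's builder chain.
public class IndexTypeOverrideExample {
  public static void main(String[] args) {
    Properties props = new Properties(); // stand-in for the properties handed to withProps(props)
    // Assumed key/value: "hoodie.index.type" with a valid HoodieIndex.IndexType name.
    props.setProperty("hoodie.index.type", "GLOBAL_BLOOM");
    System.out.println("index type override: " + props.getProperty("hoodie.index.type"));
  }
}
```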