Description
Describe the problem you faced
Seeing a repetitive error when writing to a single MOR table, causing batch delays of 45 minutes to an hour and a half.
During this time no new files are compacted and ingestion appears to stop until it recovers on its own.
Async compaction is enabled by default.
Configuration as below:
df.writeStream
  .trigger(Trigger.ProcessingTime(s"${triggerTimeInSeconds} seconds"))
  .format(format)
  .option(HoodieWriteConfig.TABLE_NAME, tableName)
  .option(HoodieWriteConfig.FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP, value = false)
  .option(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC, "snappy")
  .option(HoodieStorageConfig.PARQUET_FILE_MAX_BYTES, expectedFileSizeInBytes)
  .option(HoodieStorageConfig.PARQUET_BLOCK_SIZE_BYTES, HoodieStorageConfig.DEFAULT_PARQUET_BLOCK_SIZE_BYTES)
  .option(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT_BYTES, (expectedFileSizeInBytes / 100) * 80)
  .option(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true")
  .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, nDeltaCommits)
  .option(HoodieIndexConfig.BLOOM_INDEX_FILTER_TYPE, value = BloomFilterTypeCode.DYNAMIC_V0.name())
  .option(HoodieIndexConfig.BLOOM_FILTER_NUM_ENTRIES, value = "1000000")
  .option(HoodieIndexConfig.HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES, value = "1000000")
  .option(HoodieMetricsConfig.METRICS_ON, value = "true")
  .option(HoodieMetricsConfig.METRICS_REPORTER_TYPE, MetricsReporterType.DATADOG.name())
  .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "RK")
  .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, classOf[CustomKeyGenerator].getName)
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "P1:SIMPLE,P2:SIMPLE")
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, hudiTablePrecombineKey)
  .option(DataSourceWriteOptions.HIVE_USE_JDBC_OPT_KEY, value = false)
  .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, value = true)
  .option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, value = true)
  .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "p1,p2")
  .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, hiveDb)
  .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
  .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName)
  .outputMode(OutputMode.Append())
  .queryName(queryName)
  .start(tableAbsolutePath)
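Note that the configuration above sets INLINE_COMPACT_PROP to "true" while also relying on async compaction being enabled by default for streaming MOR writes; inline compaction blocks the write commit until compaction finishes, which matches the observed batch stalls. A minimal sketch of the alternative, assuming Hudi 0.9.x exposes the async-compaction switch as DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY (hoodie.datasource.compaction.async.enable) — verify the key name against the version in use:

```scala
// Sketch only: prefer async over inline compaction for a streaming writer.
// ASYNC_COMPACT_ENABLE_OPT_KEY is an assumption for Hudi 0.9.x; check the
// DataSourceWriteOptions of the deployed version before relying on it.
df.writeStream
  // ... same trigger/format/table options as above ...
  .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY, "true")
  // drop INLINE_COMPACT_PROP so the commit is not blocked by compaction;
  // the delta-commit threshold still controls when compaction is scheduled
  .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, nDeltaCommits)
```

With async compaction, the compaction runs in the background of the streaming job instead of on the write path, so a slow compaction should not delay subsequent micro-batches.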
Running on EMR 6.4.0
- Hudi version : 0.9.0
- Spark version : 3.1.2
- Hive version : AWS Glue
- Hadoop version : 3.2.1
- Storage (HDFS/S3/GCS..) : S3
- Running on Docker? (yes/no) : No
Stacktrace