
[SUPPORT] Flink failed to restore the task from the checkpoint: Unable to acquire the lock. Current lock owner information #12945

@Toroidals

Description


Tips before filing an issue

  • Have you gone through our FAQs? Yes.

  • Join the mailing list to engage in conversations and get faster support at dev-subscribe@hudi.apache.org.

  • If you have triaged this as a bug, then file an issue directly.

Describe the problem you faced

We use Flink to write to a Hudi MOR table and synchronize it to Hive. After the task stops abnormally, restarting it from the most recent complete checkpoint triggers the following exception: `java.io.FileNotFoundException: File does not exist: /apps/hive/warehouse/hudi.db/hudi_wer_werfprd_cmf_clm_claim_bill_lines_cdc/.hoodie/.aux/lock`.
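For context on the error: `.hoodie/.aux/lock` is, to my understanding, the default lock-file location used by Hudi's `FileSystemBasedLockProvider`, so the restore appears to fail because the lock file disappeared (e.g. the auxiliary folder was cleaned up) while the lock was still considered held. A hedged sketch of how the lock provider and an alternative lock directory could be configured explicitly; the config keys are standard Hudi write-lock settings, but the lock directory value is an illustrative assumption, not taken from this report:

```java
import java.util.HashMap;
import java.util.Map;

public class LockOptionsSketch {
    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        // Pin the lock provider explicitly instead of relying on defaults.
        options.put("hoodie.write.lock.provider",
                "org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider");
        // Hypothetical alternative lock directory, so the lock file does not
        // live under .hoodie/.aux where auxiliary-folder cleanup can remove it.
        options.put("hoodie.write.lock.filesystem.path", "hdfs:///tmp/hudi-locks/");

        System.out.println(options.get("hoodie.write.lock.provider"));
    }
}
```

These entries would be merged into the same `options` map that is passed to `builder.options(options)` below.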

To Reproduce

Steps to reproduce the behavior:
1. Build the Hudi pipeline with the following builder method:

```java
public static HoodiePipeline.Builder getHoodieBuilder(HashMap<String, String> infoMap, HashMap<String, String> connectInfo) {
    HoodiePipeline.Builder builder = HoodiePipeline.builder(infoMap.get("hudi_table_name"));
    Map<String, String> options = new HashMap<>();
    options.put(FlinkOptions.DATABASE_NAME.key(), infoMap.get("hudi_database_name"));
    options.put(FlinkOptions.TABLE_NAME.key(), infoMap.get("hudi_table_name"));
    options.put(FlinkOptions.PATH.key(), infoMap.get("hudi_hdfs_path"));
    options.put("catalog.path", "hdfs:///apps/hudi/catalog/");

    String hudiFieldMap = infoMap.get("hudi_field_map").toLowerCase(Locale.ROOT);
    ArrayList<ArrayList<String>> fieldList = JSON.parseObject(hudiFieldMap, new TypeReference<ArrayList<ArrayList<String>>>() {
    });
    log.info("fieldList: {}", fieldList);
    for (ArrayList<String> columnList : fieldList) {
        builder.column("`" + columnList.get(0) + "` " + columnList.get(1));
    }
    String[] hudiPrimaryKeys = infoMap.get("hudi_primary_key").split(",");
    builder.pk(hudiPrimaryKeys);

    options.put(FlinkOptions.PRECOMBINE_FIELD.key(), infoMap.get("hudi_precombine_field"));
    options.put(FlinkOptions.PAYLOAD_CLASS_NAME.key(), OverwriteWithLatestAvroPayload.class.getName());
    options.put(FlinkOptions.RECORD_MERGER_IMPLS.key(), OverwriteWithLatestMerger.class.getName());
    options.put(FlinkOptions.RECORD_MERGER_STRATEGY_ID.key(), "ce9acb64-bde0-424c-9b91-f6ebba25356d");

    options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
    options.put(FlinkOptions.INDEX_TYPE.key(), HoodieIndex.IndexType.BUCKET.name());
    options.put(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(), infoMap.get("hudi_bucket_index_num_buckets"));
    options.put(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE.key(), infoMap.get("hudi_bucket_index_engine_type"));

    options.put(FlinkOptions.COMPACTION_TRIGGER_STRATEGY.key(), infoMap.get("hudi_compaction_trigger_strategy"));
    options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), infoMap.get("hudi_compaction_delta_commits"));
    options.put(FlinkOptions.COMPACTION_DELTA_SECONDS.key(), infoMap.get("hudi_compaction_delta_seconds"));
    options.put(FlinkOptions.COMPACTION_MAX_MEMORY.key(), infoMap.get("hudi_compaction_max_memory"));

    options.put(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), "true");
    options.put(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), "150");
    options.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), "true");
    options.put(FlinkOptions.HIVE_SYNC_MODE.key(), "hms");
    options.put(FlinkOptions.HIVE_SYNC_DB.key(), "hudi");
    options.put(FlinkOptions.HIVE_SYNC_TABLE.key(), "mor_test_01");
    options.put(FlinkOptions.HIVE_SYNC_CONF_DIR.key(), "/etc/hive/conf");
    options.put(FlinkOptions.HIVE_SYNC_METASTORE_URIS.key(), "thrift://xx01:9083,thrift://xx02:9083,thrift://xx03:9083");
    options.put(FlinkOptions.HIVE_SYNC_JDBC_URL.key(), "jdbc:hive2://xx01:21181,xx02:21181,xx03:21181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2");
    options.put(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP.key(), "true");
    options.put(FlinkOptions.HIVE_SYNC_SKIP_RO_SUFFIX.key(), "true");

    options.put(FlinkOptions.PARTITION_PATH_FIELD.key(), "part_dt");
    options.put(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS.key(), "part_dt");

    options.put(FlinkOptions.WRITE_RATE_LIMIT.key(), "20000");

    // options is a Map<String, String>, so numeric values must be strings
    options.put(FlinkOptions.WRITE_TASKS.key(), "8");

    options.put(FlinkOptions.OPERATION.key(), WriteOperationType.UPSERT.value());

    builder.options(options);
    return builder;
}
```
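One compile-level detail in the builder above: `options` is a `Map<String, String>`, so numeric settings such as the write-task parallelism must be passed as strings. A minimal standalone illustration (plain string keys are used here so it runs without Hudi on the classpath; `write.tasks` is the key I believe `FlinkOptions.WRITE_TASKS` resolves to):

```java
import java.util.HashMap;
import java.util.Map;

public class WriteTasksOption {
    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        // options.put("write.tasks", 8);  // would not compile: int is not a String
        options.put("write.tasks", "8");   // numeric options go in as strings
        int writeTasks = Integer.parseInt(options.get("write.tasks"));
        System.out.println("write.tasks = " + writeTasks); // prints "write.tasks = 8"
    }
}
```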

Expected behavior

After an abnormal stop, the task should restore successfully from the most recent complete checkpoint instead of ending up in a failed state again.

Environment Description

  • Hudi version : 1.0.0

  • Flink version : 1.15.2

  • Hive version : 3.1.3

  • Hadoop version : 3.3.4

  • Storage (HDFS/S3/GCS..) : HDFS

  • Running on Docker? (yes/no) : no


Stacktrace

https://gist.github.com/Toroidals/b53cd8f0b7ef54d16f9dd0e14d242fb7
