[SUPPORT] illegal Hive partition name assigned when using the Stream API, referencing HoodieFlinkStreamer #5465

@baisui1981

Description

Describe the problem you faced

I have built a Hudi Hive table sink using the Flink Stream API, referencing the demo HoodieFlinkStreamer.java.

However, I found that the final Hive table partition column name is assigned an illegal value; it seems the config value hiveSyncPartitionFields, which is set on the FlinkStreamerConfig instance, is ignored.

To Reproduce

Steps to reproduce the behavior:

by building an instance of FlinkStreamerConfig as below:

    cfg.sourceAvroSchemaPath = "hdfs://namenode/user/admin/default/20220429173545/base/meta/schema.avsc"    
    cfg.targetBasePath = "hdfs://namenode/user/admin/default/20220429173545/base/hudi"    
    cfg.targetTableName = "base"    
    cfg.tableType = "COPY_ON_WRITE"    
    cfg.preCombine = true    
    cfg.sourceOrderingField = "update_date"    
    cfg.recordKeyField = "base_id"    
    cfg.keygenType = "TIMESTAMP"    
    cfg.partitionPathField = "start_time"    
    cfg.hiveSyncPartitionFields = "pt"    
    cfg.hiveSyncPartitionExtractorClass = "org.apache.hudi.hive.SlashEncodedHourPartitionValueExtractor"    
    cfg.setString("hoodie.deltastreamer.keygen.timebased.timestamp.type" , "EPOCHMILLISECONDS")    
    cfg.setString("hoodie.deltastreamer.keygen.timebased.output.dateformat" , "yyyy/MM/dd/HH")    
    cfg.setString("hoodie.deltastreamer.keygen.timebased.timezone" , "Asia/Shanghai")    
    cfg.writeRateLimit = 200L
    cfg.hiveSyncEnabled = true    
    cfg.hiveSyncDb = "default"    
    cfg.hiveSyncTable = "base"    
    cfg.hiveSyncMode = "hms"    
    cfg.hiveSyncMetastoreUri = "thrift://192.168.28.201:9083"    
    cfg.payloadClassName = "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload"    
    cfg.compactionTargetIo = 512000    
    cfg.compactionTriggerStrategy = "num_commits"    
    cfg.compactionDeltaCommits = 5    
    cfg.compactionDeltaSeconds = 3600    
    cfg.cleanAsyncEnabled = true    
    cfg.cleanRetainCommits = 10    
    cfg.archiveMinCommits = 20    
    cfg.archiveMaxCommits = 30    

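As a side note on the TIMESTAMP keygen settings above: they should turn an epoch-milliseconds value in the partition path field into a slash-encoded `yyyy/MM/dd/HH` path in the Asia/Shanghai timezone. A minimal plain-JDK sketch of that expected mapping (this is an illustration, not Hudi's actual key generator code; the class and method names here are hypothetical):

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical illustration of how the TIMESTAMP key generator is expected
// to map an EPOCHMILLISECONDS value to a partition path, per the settings:
//   hoodie.deltastreamer.keygen.timebased.output.dateformat = yyyy/MM/dd/HH
//   hoodie.deltastreamer.keygen.timebased.timezone          = Asia/Shanghai
public class PartitionPathDemo {
    static final DateTimeFormatter FMT =
        DateTimeFormatter.ofPattern("yyyy/MM/dd/HH")    // output.dateformat
            .withZone(ZoneId.of("Asia/Shanghai"));      // keygen timezone

    static String partitionPath(long epochMillis) {
        return FMT.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        // 1651190400000L is 2022-04-29T00:00:00Z, i.e. 08:00 in Asia/Shanghai
        System.out.println(partitionPath(1651190400000L));
    }
}
```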
The key config property is cfg.hiveSyncPartitionFields = "pt".

After launching the Flink job, and after the checkpoint completed, I found the Hive table structure (via desc base) as below:

+--------------------------+-----------------------+-----------------------+--+
|         col_name         |       data_type       |        comment        |
+--------------------------+-----------------------+-----------------------+--+
| _hoodie_commit_time      | string                |                       |
| _hoodie_commit_seqno     | string                |                       |
| _hoodie_record_key       | string                |                       |
| _hoodie_partition_path   | string                |                       |
| _hoodie_file_name        | string                |                       |
| base_id                  | int                   |                       |
| start_time               | bigint                |                       |
| update_date              | date                  |                       |
| update_time              | bigint                |                       |
| price                    | decimal(5,2)          |                       |
| json_content             | string                |                       |
| col_blob                 | binary                |                       |
| col_text                 | string                |                       |
| start_time               | string                |                       |
|                          | NULL                  | NULL                  |
| # Partition Information  | NULL                  | NULL                  |
| # col_name               | data_type             | comment               |
|                          | NULL                  | NULL                  |
| start_time               | string                |                       |
+--------------------------+-----------------------+-----------------------+--+

The source table base create-table DDL (MySQL):

CREATE TABLE `base` (
  `base_id` int(11) NOT NULL ,
  `start_time` datetime DEFAULT NULL,
  `update_date` date DEFAULT NULL,
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `price` decimal(5,2) DEFAULT NULL,
  `json_content` json DEFAULT NULL,
  `col_blob` blob,
  `col_text` text,
  PRIMARY KEY (`base_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

Expected behavior
As I expected, the partition column name of the Hive table should be 'pt', as shown below:

+--------------------------+-----------------------+-----------------------+--+
|         col_name         |       data_type       |        comment        |
+--------------------------+-----------------------+-----------------------+--+
| _hoodie_commit_time      | string                |                       |
| _hoodie_commit_seqno     | string                |                       |
| _hoodie_record_key       | string                |                       |
| _hoodie_partition_path   | string                |                       |
| _hoodie_file_name        | string                |                       |
| base_id                  | int                   |                       |
| start_time               | bigint                |                       |
| update_date              | date                  |                       |
| update_time              | bigint                |                       |
| price                    | decimal(5,2)          |                       |
| json_content             | string                |                       |
| col_blob                 | binary                |                       |
| col_text                 | string                |                       |
| pt                       | string                |                       |
|                          | NULL                  | NULL                  |
| # Partition Information  | NULL                  | NULL                  |
| # col_name               | data_type             | comment               |
|                          | NULL                  | NULL                  |
| pt                       | string                |                       |
+--------------------------+-----------------------+-----------------------+--+

I have checked the Hudi source code and found that the key point is:

hiveSyncConfig.partitionFields = Arrays.asList(FilePathUtils.extractPartitionKeys(conf));

It seems that assigning hiveSyncConfig.partitionFields from FilePathUtils.extractPartitionKeys(conf) is wrong: it yields the write-side partition key (start_time) instead of the configured hiveSyncPartitionFields value.

I then fixed the assignment to:

hiveSyncConfig.partitionFields = 
Arrays.asList(org.apache.hadoop.util.StringUtils.split(conf.getString(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS)));

After repackaging the relevant jar and relaunching the Flink job, the fix seems to take effect.
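The essence of the fix can be sketched with plain JDK code (not the Hadoop StringUtils used above, and the class/method names here are hypothetical): take the comma-separated hiveSyncPartitionFields string as configured, and split it into the list handed to the Hive sync config, rather than re-deriving the names from the write path's partition keys.

```java
import java.util.Arrays;
import java.util.List;

// Minimal sketch of what the fix does: split the configured
// comma-separated hive sync partition fields (e.g. "pt") into a list,
// instead of reusing the write path's partition key ("start_time").
// Hadoop's StringUtils.split also uses ',' as its default separator,
// so String.split(",") behaves the same for this simple case.
public class PartitionFieldsFix {
    static List<String> partitionFields(String hiveSyncPartitionFields) {
        return Arrays.asList(hiveSyncPartitionFields.split(","));
    }

    public static void main(String[] args) {
        System.out.println(partitionFields("pt"));
    }
}
```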

Environment Description

  • Hudi version : 0.10.1

  • Spark version :

  • Hive version : 2.3.1

  • Hadoop version : 2.7.3

  • Storage (HDFS/S3/GCS..) : HDFS

  • Running on Docker? (yes/no) : yes
