-
Notifications
You must be signed in to change notification settings - Fork 2.5k
Closed
Labels
engine:flink — Flink integration
Description
Hi,
I have following code snippet, that I want to
- read the kafka with flink kafka connector
- write the data from kafka into Hudi table
- query the hudi table and print on the console
I am sure that the Kafka source can read data from Kafka (I have tested this in the code), but no data is written into the Hudi table directory/files, and only an empty set is printed on the console.
Could someone help me out? Thanks very much.
package org.example.streaming
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.{StreamTableEnvironment, _}
import org.apache.flink.types.Row
/**
 * Demo job: read JSON records from Kafka, upsert them into a Hudi
 * COPY_ON_WRITE table on the local filesystem, then query the table
 * and print the result to the console.
 *
 * Root cause of the "empty set" symptom in the original code: the Hudi
 * Flink writer only commits data on Flink checkpoints. Checkpointing was
 * never enabled, so the sink buffered records indefinitely and nothing
 * ever reached the table files on disk.
 */
object Test002_ReadKafkaAndWriteHudi {

  val hudi_table_name = "Test002_ReadKafkaAndWriteHudi"

  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // FIX: Hudi commits only on checkpoints -- without this line no data is
    // ever written to the table. Checkpoint (and thus Hudi commit) every 10s.
    env.enableCheckpointing(10 * 1000L)

    val tenv = StreamTableEnvironment.create(env, settings)

    val topic = "topic1"
    val groupId = "Test001_ReadKafka"
    val startupMode = "latest-offset"

    // Kafka source: JSON-encoded records with two string fields.
    // (Original used stripMargin(' '), which is a no-op: stripMargin first
    // skips leading whitespace, so a space margin char is never found.
    // Rewritten with the conventional '|' margin.)
    val source_ddl =
      s"""CREATE TABLE kafka_table (
         |  a STRING,
         |  b STRING
         |) WITH (
         |  'connector' = 'kafka',
         |  'topic' = '$topic',
         |  'properties.bootstrap.servers' = 'localhost:9092',
         |  'properties.group.id' = '$groupId',
         |  'scan.startup.mode' = '$startupMode',
         |  'value.format' = 'json',
         |  'value.json.fail-on-missing-field' = 'false',
         |  'value.fields-include' = 'EXCEPT_KEY'
         |)""".stripMargin
    tenv.executeSql(source_ddl)

    // Hudi sink table: COPY_ON_WRITE, keyed on `a`, precombined on `b`.
    // (Fixed local-variable typo: was `target_dll`.)
    val target_ddl =
      s"""CREATE TABLE ${hudi_table_name} (
         |  a VARCHAR(20),
         |  b VARCHAR(20),
         |  PRIMARY KEY (a) NOT ENFORCED
         |) WITH (
         |  'connector' = 'hudi'
         |  ,'path' = 'file:///D:/data/hudi_demo/${hudi_table_name}'
         |  ,'table.type' = 'COPY_ON_WRITE'
         |  ,'table.name' = 'test_table'
         |  ,'write.insert.drop.duplicates' = 'true'
         |  ,'write.recordkey.field' = 'a'
         |  ,'write.precombine.field' = 'b'
         |  ,'write.shuffle.parallelism' = '1'
         |  ,'write.commit.max.retries' = '5'
         |  ,'write.cleaner.policy' = 'KEEP_LATEST_COMMITS'
         |)""".stripMargin
    tenv.executeSql(target_ddl)

    // Continuous INSERT job (Kafka -> Hudi). executeSql submits the job
    // asynchronously and returns immediately; the job keeps running in the
    // background while the code below executes.
    val insertSql = s"insert into $hudi_table_name select a, b from kafka_table"
    tenv.executeSql(insertSql)

    // NOTE(review): this is a snapshot read of the Hudi table, racing the
    // async insert above -- it can only show data committed by an earlier
    // checkpoint. For a continuously updating view, set
    // 'read.streaming.enabled' = 'true' on the Hudi table; otherwise allow
    // at least one checkpoint interval to elapse before expecting rows here.
    val query = s"select * from $hudi_table_name"
    val result = tenv.sqlQuery(query)
    result.execute().print()
  }
}
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
engine:flink — Flink integration
Type
Projects
Status
✅ Done