
[SUPPORT] Custom HoodieRecordPayload for use in flink sql #7100

Closed
yabha-isomap opened this issue Oct 31, 2022 · 6 comments
Labels
flink Issues related to flink priority:major degraded perf; unable to move forward; potential bugs

Comments

@yabha-isomap

  1. I am trying to use Apache Hudi with Flink SQL by following Hudi's Flink guide.
  2. The basics are working, but now I need to provide a custom implementation of HoodieRecordPayload, as suggested in this FAQ (see the sketch after this question).
  3. But when I pass this config as shown in the following listing, it doesn't work: my custom class (MyHudiPoc.Poc) doesn't get picked up and the default behaviour continues instead.
CREATE TABLE t1(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = '/tmp/hudi',
  'hoodie.compaction.payload.class' = 'MyHudiPoc.Poc', -- My custom class
  'hoodie.datasource.write.payload.class' = 'MyHudiPoc.Poc',  -- My custom class
  'write.payload.class' = 'MyHudiPoc.Poc',  -- My custom class
  'table.type' = 'MERGE_ON_READ'
);

INSERT INTO t1 VALUES
  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
  ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
  ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
  ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
  ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
  ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
  ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
  ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');


INSERT INTO t1 VALUES
  ('id1','Danny1',27,TIMESTAMP '1970-01-01 00:00:01','par1');
  4. I even tried passing it through /etc/hudi/conf/hudi-default.conf:
---
"hoodie.compaction.payload.class": MyHudiPoc.Poc
"hoodie.datasource.write.payload.class": MyHudiPoc.Poc
"write.payload.class": MyHudiPoc.Poc

I am also passing my custom jar when starting the Flink SQL client.

/bin/sql-client.sh embedded \
    -j ../jars/hudi-flink1.15-bundle-0.12.1.jar \
    -j ./plugins/flink-s3-fs-hadoop-1.15.1.jar \
    -j ./plugins/parquet-hive-bundle-1.8.1.jar \
    -j ./plugins/flink-sql-connector-kafka-1.15.1.jar \
    -j my-hudi-poc-1.0-SNAPSHOT.jar \
    shell
  5. I am able to pass my custom class in the Spark example, but not in Flink.
  6. Tried with both COW and MOR table types.

Any idea what I am doing wrong?

See listing in the question.
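
For reference, here is a minimal sketch of the kind of payload class being discussed, assuming it extends Hudi's built-in OverwriteWithLatestAvroPayload. The package and class name match the MyHudiPoc.Poc value from the listings above, and the println calls are only there to show whether Hudi picks the class up (Hudi instantiates payloads reflectively through the two constructors shown):

package MyHudiPoc;

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.Option;

// Minimal custom payload: keeps the default "latest write wins" behaviour,
// but logs from the constructors so it is visible when Hudi loads the class.
public class Poc extends OverwriteWithLatestAvroPayload {

  // Hudi creates payload instances reflectively via these two constructors.
  public Poc(GenericRecord record, Comparable orderingVal) {
    super(record, orderingVal);
    System.out.println("Poc payload constructed (record + ordering value)");
  }

  public Poc(Option<GenericRecord> record) {
    super(record);
    System.out.println("Poc payload constructed (optional record)");
  }

  @Override
  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema)
      throws IOException {
    // Custom merge logic would go here; this sketch simply defers to the parent.
    return super.combineAndGetUpdateValue(currentValue, schema);
  }
}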

@kaori-seasons
Contributor

kaori-seasons commented Nov 1, 2022

Maybe you should use payload.class?
FallbackKey is a Flink 1.15 API, so it may not be suitable here.

@yabha-isomap
Author

Thanks @complone. Tried that as well, but no luck.

CREATE TABLE t1(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = '/tmp/hudi',
  'hoodie.compaction.payload.class' = 'gsHudiPoc.Poc', -- My custom class
  'write.payload.class' = 'gsHudiPoc.Poc',  -- My custom class
  'payload.class' = 'gsHudiPoc.Poc',  -- My custom class
  'hoodie.datasource.write.payload.class' = 'gsHudiPoc.Poc',  -- My custom class
  'table.type' = 'COPY_ON_WRITE'
);

Let me try looking into the code of FlinkOptions.java

@nsivabalan added the flink (Issues related to flink) label on Nov 2, 2022
@nsivabalan
Contributor

@yuzhaojing: can you assist here, please?

@nsivabalan added the priority:major (degraded perf; unable to move forward; potential bugs) label on Nov 2, 2022
@danny0405
Contributor

Did you try to use payload.class instead of write.payload.class? We changed the option key recently:
write.payload.class is now a fallback option key, and fallback keys only work on Flink 1.15.x.

Feel free to re-open it if you still have a problem here.

@yabha-isomap
Author

Thanks. I was able to get it to work with the DataStream API.
One tip for anyone facing this issue: put a debug message in the constructor of the class (and not in any method) to verify whether your class is getting picked up.
In my case, the class was getting picked up, but the method was not being called because of a code issue.
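
For anyone looking for the DataStream route mentioned above, here is a rough sketch, assuming Hudi 0.12.x and the HoodiePipeline helper from Hudi's Flink quickstart; the table name, schema, and MyHudiPoc.Poc class are the illustrative values from this issue, and it uses the payload.class key discussed above:

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.hudi.util.HoodiePipeline;

// Wires a RowData stream into a Hudi MOR table with a custom payload class.
public class HudiPayloadSink {

  public static void addSink(DataStream<RowData> rows) {
    Map<String, String> options = new HashMap<>();
    options.put("path", "/tmp/hudi");
    options.put("table.type", "MERGE_ON_READ");
    // 'payload.class' is the current option key; 'write.payload.class' is only a
    // fallback key, and fallback keys require Flink 1.15.x (see the comments above).
    options.put("payload.class", "MyHudiPoc.Poc");

    HoodiePipeline.Builder builder = HoodiePipeline.builder("t1")
        .column("uuid VARCHAR(20)")
        .column("name VARCHAR(10)")
        .column("age INT")
        .column("ts TIMESTAMP(3)")
        .column("`partition` VARCHAR(20)")
        .pk("uuid")
        .partition("partition")
        .options(options);

    builder.sink(rows, false); // bounded = false for a streaming job
  }
}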

@lucienoz

lucienoz commented Aug 2, 2023

For a Spark SQL COW table, how do you set payload.class?
