
[SUPPORT] HoodieStreamer: HoodieKeyException error when using SQL Transformer #11635

Closed
YousifS7 opened this issue Jul 15, 2024 · 8 comments

@YousifS7

Hello,
We are using the org.apache.hudi.utilities.streamer.HoodieStreamer class to extract data from Kafka and write it to a Hudi table. The Kafka topic is populated by Debezium from a SQL Server table. We are using EMR 7.1.0 to run spark-submit.

To Reproduce

Steps to reproduce the behavior:

  1. Sync SQL Server table to Kafka via Debezium (AvroConverter)
  2. Provision EMR 7.1.0
  3. Run the spark-submit command below:

-- Spark-Submit

spark-submit \
--packages org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0 \
--conf spark.streaming.kafka.allowNonConsecutiveOffsets=true \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--class org.apache.hudi.utilities.streamer.HoodieStreamer s3://some_bucket/jars/hudi-utilities-slim-bundle.jar \
--schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
--props s3://some_bucket/configs/some_table.properties \
--source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
--enable-sync \
--source-ordering-field ts_ms \
--target-base-path s3://some_bucket/some_folder/some_table \
--target-table dbo.some_table \
--table-type COPY_ON_WRITE \
--sync-tool-classes org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool \
--op BULK_INSERT \
--transformer-class org.apache.hudi.utilities.transform.SqlFileBasedTransformer

-- Properties File

hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
hoodie.datasource.write.recordkey.field=hudi_key
hoodie.datasource.write.partitionpath.field=partition_path
hoodie.datasource.write.precombine.field=ts_ms
hoodie.streamer.transformer.sql.file=s3://some_bucket/configs/some_table.sql
hoodie.streamer.schemaprovider.registry.url=http://localhost:8081/subjects/test.dbo.some_table-value/versions/latest
schema.registry.url=http://localhost:8081
hoodie.streamer.source.kafka.topic=test.dbo.some_table
bootstrap.servers=localhost:9092
auto.offset.reset=earliest

-- SQL Transformer File

CACHE TABLE dbz_filtered AS
SELECT ts_ms, op, before, after FROM <SRC> WHERE op IN ('d', 'u', 'c', 'r');

CACHE TABLE dbz_events AS
SELECT ts_ms, CASE WHEN op = 'd' THEN before ELSE after END AS source_fields, CASE WHEN op = 'd' THEN true ELSE false END AS is_deleted FROM dbz_filtered;

CACHE TABLE dbz_fields AS
SELECT ts_ms, source_fields.* FROM dbz_events;

SELECT s.*, Concat(s.col1, s.col2) AS hudi_key, YEAR(FROM_UNIXTIME(s.col2 / 1000)) AS partition_path FROM dbz_fields s;

Environment Description

  • Hudi version : 0.15

  • Spark version : 3.5.0

  • Hive version : N/A

  • Hadoop version : N/A

  • Storage : S3

  • Running on Docker? : No

Error Message

 org.apache.hudi.exception.HoodieKeyException: recordKey value: "null" for field: "hudi_key" cannot be null or empty.

I have confirmed that the transformer SQL file is extracting the data correctly. Could you please verify whether the transformer runs before the Hudi writer and if it is supplying the writer with the final transformed data? If so, could you help determine why the 'hudi_key' field is not being passed correctly? I can confirm that the 'hudi_key' is generated by concatenating two non-null fields.
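
For reference, Spark SQL's concat returns NULL if any of its arguments is NULL, so a single NULL in col1 or col2 would yield a NULL hudi_key; a null-safe sketch of the final select, using concat_ws (which skips NULL inputs), would look like:

SELECT s.*, CONCAT_WS('', CAST(s.col1 AS STRING), CAST(s.col2 AS STRING)) AS hudi_key, YEAR(FROM_UNIXTIME(s.col2 / 1000)) AS partition_path FROM dbz_fields s;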

Thank you

@wombatu-kun
Contributor

Try CACHE TABLE dbz_fields AS SELECT ts_ms, source_fields FROM dbz_events; (i.e. source_fields without .*).

@YousifS7
Author

Hi @wombatu-kun, Thank you for getting back to me.

I have modified the SQL File to the following:

CACHE TABLE dbz_filtered AS
SELECT ts_ms, op, before, after FROM <SRC> WHERE op IN ('d', 'u', 'c', 'r');

CACHE TABLE dbz_events AS
SELECT ts_ms, CASE WHEN op = 'd' THEN before ELSE after END AS source_fields, CASE WHEN op = 'd' THEN true ELSE false END AS is_deleted FROM dbz_filtered;

CACHE TABLE dbz_fields AS
SELECT ts_ms, source_fields FROM dbz_events;

SELECT source_fields.*, Concat(source_fields.col1, source_fields.col2) AS hudi_key, YEAR(FROM_UNIXTIME(source_fields.col2 / 1000)) AS partition_path FROM dbz_fields;

I'm still getting the same error:

HoodieKeyException: recordKey value: "null" for field: "hudi_key" cannot be null or empty.

Not sure if I'm missing something else.

Thank you

@wombatu-kun
Contributor

What if you try this?

CACHE TABLE dbz_fields AS
SELECT ts_ms, source_fields FROM dbz_events;

SELECT s.*, Concat(s.col1, s.col2) AS hudi_key, YEAR(FROM_UNIXTIME(s.col2 / 1000)) AS partition_path FROM dbz_fields s;

@YousifS7
Author

Trying the suggested SQL is giving the following error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `s`.`col1` cannot be resolved. Did you mean one of the following? [`s`.`ts_ms`, `s`.`source_fields`].
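
For reference, since dbz_fields only exposes ts_ms and the source_fields struct, the nested columns would presumably need to be referenced through the struct; a sketch, assuming col1 and col2 live inside source_fields:

SELECT s.source_fields.*, Concat(s.source_fields.col1, s.source_fields.col2) AS hudi_key, YEAR(FROM_UNIXTIME(s.source_fields.col2 / 1000)) AS partition_path FROM dbz_fields s;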

@YousifS7
Author

Following up on this: I was able to run the Spark app without errors. However, even when specifying columns that live inside the 'before' and 'after' struct types extracted from Kafka, only the following columns were written to the Hudi table:

ts_ms, op, before, after, source

Transformer File Query

CACHE TABLE dbz_filtered AS
SELECT a.ts_ms, a.op, a.before, a.after, a.source FROM <SRC> a WHERE a.op IN ('d', 'u', 'c', 'r');

SELECT b.before.col1 AS before_col1, b.after.col1 AS after_col1, b.ts_ms, b.op, b.before, b.after, b.source FROM dbz_filtered b;

It seems the additional columns extracted from the 'before' and 'after' envelopes are being ignored for some reason. I even tried specifying a static column like 'test' AS test_columns, but it was not written to the Hudi table either.

Properties File

hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
hoodie.datasource.write.recordkey.field=op
hoodie.datasource.write.partitionpath.field=op
hoodie.datasource.write.precombine.field=ts_ms
hoodie.streamer.transformer.sql.file=s3://some_bucket/configs/some_table.sql
hoodie.streamer.schemaprovider.registry.url=http://localhost:8081/subjects/test.dbo.some_table-value/versions/latest
schema.registry.url=http://localhost:8081
hoodie.streamer.source.kafka.topic=test.dbo.some_table
bootstrap.servers=localhost:9092
auto.offset.reset=earliest

Spark Submit

spark-submit \
--packages org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0,org.apache.hudi:hudi-aws-bundle:0.15.0 \
--conf spark.streaming.kafka.allowNonConsecutiveOffsets=true \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--class org.apache.hudi.utilities.streamer.HoodieStreamer s3://some_bucket/jars/hudi-utilities-slim-bundle.jar \
--schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
--props s3://some_bucket/configs/some_table.properties \
--source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
--enable-sync \
--source-ordering-field ts_ms \
--target-base-path s3://some_bucket/some_folder/some_table \
--target-table dbo.some_table \
--table-type COPY_ON_WRITE \
--sync-tool-classes org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool \
--op BULK_INSERT \
--transformer-class org.apache.hudi.utilities.transform.SqlFileBasedTransformer

Is there something else that I might be missing?

Thank you

@wombatu-kun
Contributor

As I see from the documentation here: https://hudi.apache.org/docs/hoodie_streaming_ingestion#schema-providers

By default, Spark will infer the schema of the source and use that inferred schema when writing to a table...

Maybe you should add a target schema with the additional fields to the schema registry and provide the URL to that schema via the config hoodie.streamer.schemaprovider.registry.targetUrl.
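
Something along these lines in the properties file (the subject name below is only an illustration, not your actual subject):

hoodie.streamer.schemaprovider.registry.targetUrl=http://localhost:8081/subjects/test.dbo.some_table-flattened-value/versions/latest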

@YousifS7
Author

@wombatu-kun Couldn't that be extracted from the Transformer?

SqlFileBasedTransformer

As I understand it, the Transformer sits between the Source and the Target, yet for some reason my additional fields are being ignored. I included several fields in my SQL transformer, but none of the additional fields were written to the Hudi table.

Thank you

@YousifS7
Author

Hi @wombatu-kun,

I wanted to provide an update on the issue. After some experimentation with the hoodie.streamer.schemaprovider.registry.targetUrl configuration, I was able to resolve the problem. Initially, I assumed that Spark would infer the schema from the transformed data. Instead, I registered a custom flattened schema in my schema registry to match the desired output and pointed the hoodie.streamer.schemaprovider.registry.targetUrl setting at it. Once I ran the job with these configurations, it worked.
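
For anyone hitting the same issue: the registered target schema was a flattened Avro record along the lines of the sketch below (the field names and types here are only illustrative, not the actual schema), with hoodie.streamer.schemaprovider.registry.targetUrl pointed at its latest version in the registry.

{
  "type": "record",
  "name": "some_table_flattened",
  "namespace": "test.dbo",
  "fields": [
    {"name": "col1", "type": ["null", "string"], "default": null},
    {"name": "col2", "type": ["null", "long"], "default": null},
    {"name": "ts_ms", "type": "long"},
    {"name": "op", "type": "string"},
    {"name": "hudi_key", "type": "string"},
    {"name": "partition_path", "type": "string"}
  ]
}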

Thank you
