[SUPPORT] SqlQueryBasedTransformer new field issue with PostgresDebeziumSource #11468

Open
ashwinagalcha-ps opened this issue Jun 18, 2024 · 8 comments

Comments


ashwinagalcha-ps commented Jun 18, 2024

When using Kafka + Debezium + Streamer without a transformer, we are able to write data and the job works fine. When we add the SqlQueryBasedTransformer, it initially writes data to S3 with the new field, but the job ultimately fails.

query=SELECT *, extract(year from a.created_at) as year FROM <SRC> a

Below are the Hudi Deltastreamer job configs:
"--table-type", "COPY_ON_WRITE",
"--source-class", "org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource",
"--transformer-class", "org.apache.hudi.utilities.transform.SqlQueryBasedTransformer",
"--hoodie-conf", f"hoodie.streamer.transformer.sql={query}",
"--source-ordering-field", output["source_ordering_field"],
"--target-base-path", f"s3a://{env_params['deltastreamer_bucket']}/{db_name}/{schema}/{output['table_name']}/",
"--target-table", output["table_name"],
"--auto.offset.reset=earliest
"--props", properties_file,
"--payload-class", "org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload",
"--enable-hive-sync",
"--hoodie-conf", "hoodie.datasource.hive_sync.mode=hms",
"--hoodie-conf", "hoodie.datasource.write.schema.allow.auto.evolution.column.drop=true",
"--hoodie-conf", f"hoodie.deltastreamer.source.kafka.topic={connector_name}.{schema}.{output['table_name']}",
"--hoodie-conf", f"schema.registry.url={env_params['schema_registry_url']}",
"--hoodie-conf", f"hoodie.deltastreamer.schemaprovider.registry.url={env_params['schema_registry_url']}/subjects/{connector_name}.{schema}.{output['table_name']}-value/versions/latest",
"--hoodie-conf", "hoodie.deltastreamer.source.kafka.value.deserializer.class=io.confluent.kafka.serializers.KafkaAvroDeserializer",
"--hoodie-conf", "hoodie.datasource.hive_sync.use_jdbc=false",
"--hoodie-conf", f"hoodie.datasource.hive_sync.database={output['hive_database']}",
"--hoodie-conf", f"hoodie.datasource.hive_sync.table={output['table_name']}",
"--hoodie-conf", "hoodie.datasource.hive_sync.metastore.uris=",
"--hoodie-conf", "hoodie.datasource.hive_sync.enable=true",
"--hoodie-conf", "hoodie.datasource.hive_sync.support_timestamp=true",
"--hoodie-conf", "hoodie.deltastreamer.source.kafka.maxEvents=100000",
"--hoodie-conf", f"hoodie.datasource.write.recordkey.field={output['record_key']}",
"--hoodie-conf", f"hoodie.datasource.write.precombine.field={output['precombine_field']}",
"--hoodie-conf", f"hoodie.datasource.hive_sync.glue_database={output['hive_database']}",
"--continuous"

Properties file:
bootstrap.servers=host:port
auto.offset.reset=earliest
schema.registry.url=http://host:8081

Expected behavior: To be able to extract a new field (year) in the target hudi table with the help of SqlQueryBasedTransformer.

Environment Description

  • Hudi version : 0.14.0

  • Spark version : 3.4.1

  • Hadoop version : 3.3.4

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : no

  • Base image & jars:
    public.ecr.aws/ocean-spark/spark:platform-3.4.1-hadoop-3.3.4-java-11-scala-2.12-python-3.10-gen21

https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark3.4-bundle_2.12/0.14.0/hudi-spark3.4-bundle_2.12-0.14.0.jar https://repo1.maven.org/maven2/org/apache/hudi/hudi-utilities-bundle_2.12/0.14.0/hudi-utilities-bundle_2.12-0.14.0.jar

Stacktrace

2024-06-14T14:16:17.562785897Z org.apache.hudi.utilities.exception.HoodieTransformExecutionException: Failed to apply sql query based transformer
2024-06-14T14:16:17.562797467Z 	at org.apache.hudi.utilities.transform.SqlQueryBasedTransformer.apply(SqlQueryBasedTransformer.java:68)
2024-06-14T14:16:17.562805097Z 	at org.apache.hudi.utilities.transform.ChainedTransformer.apply(ChainedTransformer.java:105)
2024-06-14T14:16:17.562812197Z 	at org.apache.hudi.utilities.streamer.StreamSync.lambda$fetchFromSource$0(StreamSync.java:530)
2024-06-14T14:16:17.562819517Z 	at org.apache.hudi.common.util.Option.map(Option.java:108)
2024-06-14T14:16:17.562826327Z 	at org.apache.hudi.utilities.streamer.StreamSync.fetchFromSource(StreamSync.java:530)
2024-06-14T14:16:17.562836838Z 	at org.apache.hudi.utilities.streamer.StreamSync.readFromSource(StreamSync.java:495)
2024-06-14T14:16:17.562844648Z 	at org.apache.hudi.utilities.streamer.StreamSync.syncOnce(StreamSync.java:405)
2024-06-14T14:16:17.562852958Z 	at org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.lambda$startService$1(HoodieStreamer.java:757)
2024-06-14T14:16:17.562860358Z 	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
2024-06-14T14:16:17.562868059Z 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
2024-06-14T14:16:17.562875549Z 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
2024-06-14T14:16:17.562883938Z 	at java.base/java.lang.Thread.run(Thread.java:829)
2024-06-14T14:16:17.562893178Z Caused by: org.apache.spark.sql.AnalysisException: [UNRESOLVED_COLUMN.WITHOUT_SUGGESTION] A column or function parameter with name `a`.`created_at` cannot be resolved. ; line 1 pos 10;
2024-06-14T14:16:17.562900609Z 'Project ['a.created_at AS year#0]
2024-06-14T14:16:17.562907429Z +- SubqueryAlias a
2024-06-14T14:16:17.562914209Z    +- SubqueryAlias hoodie_src_tmp_table_42422003_d19b_4f7b_8f34_51304c6aca57
2024-06-14T14:16:17.562920689Z       +- View (`HOODIE_SRC_TMP_TABLE_42422003_d19b_4f7b_8f34_51304c6aca57`, [])
2024-06-14T14:16:17.562927469Z          +- LocalRelation <empty>
2024-06-14T14:16:17.562933689Z 
2024-06-14T14:16:17.562939999Z 	at org.apache.spark.sql.errors.QueryCompilationErrors$.unresolvedAttributeError(QueryCompilationErrors.scala:221)
2024-06-14T14:16:17.562947379Z 	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.org$apache$spark$sql$catalyst$analysis$CheckAnalysis$$failUnresolvedAttribute(CheckAnalysis.scala:143)
2024-06-14T14:16:17.562965380Z 	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$5(CheckAnalysis.scala:258)
2024-06-14T14:16:17.562972330Z 	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$5$adapted(CheckAnalysis.scala:256)
2024-06-14T14:16:17.562978850Z 	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:295)
2024-06-14T14:16:17.562985980Z 	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:294)
2024-06-14T14:16:17.562993190Z 	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:294)
2024-06-14T14:16:17.563000240Z 	at scala.collection.Iterator.foreach(Iterator.scala:943)
2024-06-14T14:16:17.563007110Z 	at scala.collection.Iterator.foreach$(Iterator.scala:943)
2024-06-14T14:16:17.563014090Z 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
2024-06-14T14:16:17.563021020Z 	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
2024-06-14T14:16:17.563027680Z 	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
2024-06-14T14:16:17.563035050Z 	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
2024-06-14T14:16:17.563041790Z 	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:294)
2024-06-14T14:16:17.563048220Z 	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$4(CheckAnalysis.scala:256)
2024-06-14T14:16:17.563055380Z 	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$4$adapted(CheckAnalysis.scala:256)
2024-06-14T14:16:17.563062311Z 	at scala.collection.immutable.Stream.foreach(Stream.scala:533)
2024-06-14T14:16:17.563069851Z 	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$1(CheckAnalysis.scala:256)
2024-06-14T14:16:17.563077001Z 	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$1$adapted(CheckAnalysis.scala:163)
2024-06-14T14:16:17.563084861Z 	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:295)
2024-06-14T14:16:17.563091541Z 	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis0(CheckAnalysis.scala:163)
2024-06-14T14:16:17.563098181Z 	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis0$(CheckAnalysis.scala:160)
2024-06-14T14:16:17.563105001Z 	at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis0(Analyzer.scala:188)
2024-06-14T14:16:17.563111721Z 	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:156)
2024-06-14T14:16:17.563118621Z 	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:146)
2024-06-14T14:16:17.563125082Z 	at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:188)
2024-06-14T14:16:17.563138102Z 	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:211)
2024-06-14T14:16:17.563145552Z 	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
2024-06-14T14:16:17.563213833Z 	at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:208)
2024-06-14T14:16:17.563224434Z 	at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:76)
2024-06-14T14:16:17.563231294Z 	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
2024-06-14T14:16:17.563238014Z 	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:202)
2024-06-14T14:16:17.563245154Z 	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
2024-06-14T14:16:17.563258994Z 	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:202)
2024-06-14T14:16:17.563266284Z 	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
2024-06-14T14:16:17.563273604Z 	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:201)
2024-06-14T14:16:17.563280974Z 	at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:76)
2024-06-14T14:16:17.563288444Z 	at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
2024-06-14T14:16:17.563295604Z 	at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
2024-06-14T14:16:17.563302205Z 	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:98)
2024-06-14T14:16:17.563309075Z 	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
2024-06-14T14:16:17.563315785Z 	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
2024-06-14T14:16:17.563322805Z 	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:640)
2024-06-14T14:16:17.563329675Z 	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
2024-06-14T14:16:17.563336375Z 	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:630)
2024-06-14T14:16:17.563343275Z 	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:671)
2024-06-14T14:16:17.563350755Z 	at org.apache.hudi.utilities.transform.SqlQueryBasedTransformer.apply(SqlQueryBasedTransformer.java:64)

@soumilshah1995

Slack thread: https://apache-hudi.slack.com/archives/C4D716NPQ/p1718691646054409

This issue arises when attempting to use a transformer with:

--source-class org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource \
--transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
--hoodie-conf 'hoodie.deltastreamer.transformer.sql=SELECT * FROM <SRC>'

It throws an error even with a plain SELECT *.

@ad1happy2go (Collaborator)

@ashwinagalcha-ps The SQL should have <SRC> in it as well, right?
"--hoodie-conf", "hoodie.streamer.transformer.sql=SELECT *, extract(year from a.created_at) as year FROM <SRC> a",

https://hudi.apache.org/docs/transforms/
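
For reference, <SRC> is a placeholder that the transformer replaces with the name of a temp view it registers over the incoming batch (the hoodie_src_tmp_table_* alias visible in the stack trace above). A minimal PySpark sketch of that substitution, with illustrative names rather than Hudi's actual Java source:

```python
# Illustrative sketch (not Hudi's actual source) of how SqlQueryBasedTransformer
# appears to handle the <SRC> placeholder, based on the plan in the stack trace.
import uuid

def apply_sql_transform(spark, source_df, transformer_sql):
    # Register the incoming batch under a unique temp-view name, e.g.
    # HOODIE_SRC_TMP_TABLE_<uuid>, as seen in the SubqueryAlias above.
    tmp_table = "HOODIE_SRC_TMP_TABLE_" + str(uuid.uuid4()).replace("-", "_")
    source_df.createOrReplaceTempView(tmp_table)
    # Substitute the temp-view name for the <SRC> placeholder and run the query.
    return spark.sql(transformer_sql.replace("<SRC>", tmp_table))
```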

@ashwinagalcha-ps (Author)

@ad1happy2go I missed adding it here, but we did test with <SRC> in the query. Sorry for the confusion; I'll update the description.

@ashwinagalcha-ps (Author)

@ad1happy2go It was already in the query here, but since it was plain text and not code, GitHub stripped the <SRC> tag when saving.

@soumilshah1995

Hey Aditya, I personally verified that the SQL transformer throws an error.

Here is the lab: https://github.com/soumilshah1995/universal-datalakehouse-postgres-ingestion-deltastreamer

I changed the Streamer command as follows:

========================================================
spark-submit \
    --class org.apache.hudi.utilities.streamer.HoodieStreamer \
    --packages 'org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.0,org.apache.hadoop:hadoop-aws:3.3.2' \
    --properties-file spark-config.properties \
    --master 'local[*]' \
    --executor-memory 1g \
     /Users/soumilshah/IdeaProjects/SparkProject/DeltaStreamer/jar/hudi-utilities-slim-bundle_2.12-0.14.0.jar \
    --table-type COPY_ON_WRITE \
    --target-base-path 's3a://warehouse/database=default/table_name=salestest'  \
    --target-table salestest \
    --op UPSERT \
    --source-limit 4000000 \
    --source-class org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource \
    --min-sync-interval-seconds 10 \
    --continuous \
    --source-ordering-field _event_origin_ts_ms \
    --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
    --hoodie-conf bootstrap.servers=localhost:7092 \
    --hoodie-conf schema.registry.url=http://localhost:8081 \
    --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/hive.public.sales-value/versions/latest \
    --hoodie-conf hoodie.deltastreamer.source.kafka.value.deserializer.class=io.confluent.kafka.serializers.KafkaAvroDeserializer \
    --hoodie-conf hoodie.deltastreamer.source.kafka.topic=hive.public.sales \
    --hoodie-conf auto.offset.reset=earliest \
    --hoodie-conf 'hoodie.deltastreamer.transformer.sql=SELECT * FROM <SRC>' \
    --hoodie-conf hoodie.datasource.write.recordkey.field=salesid \
    --hoodie-conf 'hoodie.datasource.write.partitionpath.field=' \
    --hoodie-conf hoodie.datasource.write.precombine.field=_event_origin_ts_ms


As you can see, we just tried a simple SELECT *:

    --hoodie-conf 'hoodie.deltastreamer.transformer.sql=SELECT * FROM <SRC>' \

@ad1happy2go (Collaborator)

Thanks @soumilshah1995. Will check it out.

@soumilshah1995

aye aye captain


GlassCanon91 commented Jul 17, 2024

Has anyone solved it? I also get this error when using Kafka + Debezium + Streamer (with transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer). The error only happens when the checkpoint is at the end of the Kafka topic and hoodie.deltastreamer.transformer.sql specifically references a column name.

hoodie.deltastreamer.transformer.sql=SELECT *, DATE_FORMAT(a.created_at, 'yyyyMM') as partition_date FROM <SRC> a

It seems that when the transformer class is applied in spark-submit, even when there are 0 new records, the Streamer still runs my configured SQL against the temp table (which is empty); see the sketch after the stack trace below.

Caused by: org.apache.spark.sql.AnalysisException: [UNRESOLVED_COLUMN.WITHOUT_SUGGESTION] A column or function parameter with name `a`.`created_at` cannot be resolved. ; line 1 pos 22;
'Project ['DATE_FORMAT('a.created_at, yyyyMM) AS partition_date#0]
+- SubqueryAlias a
   +- SubqueryAlias hoodie_src_tmp_table_563d459b_083f_4fae_8929_34d181d152d7
      +- View (`HOODIE_SRC_TMP_TABLE_563d459b_083f_4fae_8929_34d181d152d7`, [])
         +- LocalRelation <empty>
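
For anyone wanting to reproduce this outside of Hudi, here is a minimal PySpark sketch of the same failure mode (the view name is illustrative): a temp view backed by an empty, schema-less DataFrame, like the LocalRelation <empty> in the plan above, cannot resolve any column the query references.

```python
# Minimal PySpark repro of the failure mode described above: a temp view
# over an empty, schema-less DataFrame fails analysis for any column reference.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType

spark = SparkSession.builder.master("local[*]").getOrCreate()

# Zero rows AND zero columns, mirroring an empty Debezium batch.
empty_df = spark.createDataFrame([], StructType([]))
empty_df.createOrReplaceTempView("hoodie_src_tmp_table")  # illustrative name

# Raises AnalysisException: [UNRESOLVED_COLUMN.WITHOUT_SUGGESTION]
# A column or function parameter with name `a`.`created_at` cannot be resolved.
spark.sql(
    "SELECT *, DATE_FORMAT(a.created_at, 'yyyyMM') AS partition_date "
    "FROM hoodie_src_tmp_table a"
).show()
```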

Status: 🚧 Needs Repro