
[SUPPORT] HoodieMultiTableDeltastreamer - Bypassing SchemaProvider-Class requirement for ParquetDFS #2406

Closed
SureshK-T2S opened this issue Jan 5, 2021 · 16 comments
Labels
hudistreamer issues related to Hudi streamer (formerly deltastreamer) priority:major degraded perf; unable to move forward; potential bugs

Comments

@SureshK-T2S

SureshK-T2S commented Jan 5, 2021

I am attempting to create a Hudi table from a parquet file on S3. The motivation for this approach comes from this Hudi blog:
https://cwiki.apache.org/confluence/display/HUDI/2020/01/20/Change+Capture+Using+AWS+Database+Migration+Service+and+Hudi

To first test using DeltaStreamer to ingest a full initial batch load, I used the parquet files from an AWS blog, located at s3://athena-examples-us-west-2/elb/parquet/year=2015/month=1/day=1/
https://aws.amazon.com/blogs/aws/new-insert-update-delete-data-on-s3-with-amazon-emr-and-apache-hudi/

At first I used the spark shell on EMR to load the data into a dataframe and view it; this works with no issues:

[screenshot: the dataframe loads and displays in spark-shell without issues]

I then attempted to use Hudi DeltaStreamer as per my understanding of the documentation; however, I ran into a couple of issues.

Steps to reproduce the behavior:

  1. Ran the following:
spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  --packages org.apache.hudi:hudi-spark-bundle_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.4 \
  --master yarn --deploy-mode client \
  /usr/lib/hudi/hudi-utilities-bundle.jar --table-type MERGE_ON_READ \
  --source-ordering-field request_timestamp \
  --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
  --target-base-path s3://mysqlcdc-stream-prod/hudi_tryout/hudi_aws_test --target-table hudi_aws_test \
  --hoodie-conf hoodie.datasource.write.recordkey.field=request_timestamp,hoodie.deltastreamer.source.dfs.root=s3://athena-examples-us-west-2/elb/parquet/year=2015/month=1/day=1,hoodie.datasource.write.partitionpath.field=request_timestamp:TIMESTAMP

Stacktrace:

Exception in thread "main" java.io.IOException: Could not load key generator class org.apache.hudi.keygen.SimpleKeyGenerator
	at org.apache.hudi.DataSourceUtils.createKeyGenerator(DataSourceUtils.java:94)
	at org.apache.hudi.utilities.deltastreamer.DeltaSync.<init>(DeltaSync.java:190)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.<init>(HoodieDeltaStreamer.java:552)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:129)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:99)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:464)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:853)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:928)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:937)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.hudi.exception.HoodieException: Unable to instantiate class 
	at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:89)
	at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:98)
	at org.apache.hudi.DataSourceUtils.createKeyGenerator(DataSourceUtils.java:92)
	... 17 more
Caused by: java.lang.reflect.InvocationTargetException
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:87)
	... 19 more
Caused by: java.lang.IllegalArgumentException: Property hoodie.datasource.write.partitionpath.field not found
	at org.apache.hudi.common.config.TypedProperties.checkKey(TypedProperties.java:42)
	at org.apache.hudi.common.config.TypedProperties.getString(TypedProperties.java:47)
	at org.apache.hudi.keygen.SimpleKeyGenerator.<init>(SimpleKeyGenerator.java:36)
	... 24 more
  2. I understand that for a timestamp-based partition field it is recommended to use a CustomKeyGenerator:
spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  --packages org.apache.hudi:hudi-spark-bundle_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.4 \
  --master yarn --deploy-mode client \
  /usr/lib/hudi/hudi-utilities-bundle.jar --table-type MERGE_ON_READ \
  --source-ordering-field request_timestamp \
  --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
  --target-base-path s3://mysqlcdc-stream-prod/hudi_tryout/hudi_aws_test --target-table hudi_aws_test \
  --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator,hoodie.datasource.write.recordkey.field=request_timestamp,hoodie.deltastreamer.source.dfs.root=s3://athena-examples-us-west-2/elb/parquet/year=2015/month=1/day=1,hoodie.datasource.write.partitionpath.field=request_timestamp:TIMESTAMP

This gives rise to a different error:

Exception in thread "main" java.io.IOException: Could not load key generator class org.apache.hudi.keygen.CustomKeyGenerator,hoodie.datasource.write.recordkey.field=request_timestamp,hoodie.deltastreamer.source.dfs.root=s3://athena-examples-us-west-2/elb/parquet/year=2015/month=1/day=1,hoodie.datasource.write.partitionpath.field=request_timestamp:TIMESTAMP
	at org.apache.hudi.DataSourceUtils.createKeyGenerator(DataSourceUtils.java:94)
	at org.apache.hudi.utilities.deltastreamer.DeltaSync.<init>(DeltaSync.java:190)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.<init>(HoodieDeltaStreamer.java:552)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:129)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:99)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:464)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:853)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:928)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:937)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.hudi.exception.HoodieException: Unable to load class
	at org.apache.hudi.common.util.ReflectionUtils.getClass(ReflectionUtils.java:56)
	at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:87)
	at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:98)
	at org.apache.hudi.DataSourceUtils.createKeyGenerator(DataSourceUtils.java:92)
	... 17 more
Caused by: java.lang.ClassNotFoundException: org.apache.hudi.keygen.CustomKeyGenerator,hoodie.datasource.write.recordkey.field=request_timestamp,hoodie.deltastreamer.source.dfs.root=s3://athena-examples-us-west-2/elb/parquet/year=2015/month=1/day=1,hoodie.datasource.write.partitionpath.field=request_timestamp:TIMESTAMP
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:264)
	at org.apache.hudi.common.util.ReflectionUtils.getClass(ReflectionUtils.java:53)
	... 20 more

Expected behavior
I've clearly specified the partition path field in hoodie.datasource.write.partitionpath.field=request_timestamp:TIMESTAMP. However, this consistently fails for me, even on other parquet files. I assumed the problem might be that it needs to be added to the dfs-source.properties file, so I added the following to that file:

include=base.properties
hoodie.datasource.write.recordkey.field=request_timestamp
hoodie.datasource.write.partitionpath.field=request_timestamp

However, that didn't fix anything. I also passed the location of the file under --props, but it couldn't find the file, even though I am able to display the file's contents in the terminal using cat.

Suspecting the choice of key generator was the issue, I tried several other key generators, including the Custom, Complex, and TimestampBased ones. However, the class could not be loaded for any of them.

Please let me know if I am doing anything wrong here.

Environment Description

  • Hudi version : 0.6.0

  • Spark version : 2.4.7-amzn-0 (Scala 2.11.12)

  • Hive version :

  • Hadoop version : Hadoop 2.10.1-amzn-0

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : no

@bvaradar
Contributor

bvaradar commented Jan 6, 2021

I think the command-line parameters are not being passed correctly; each property needs its own --hoodie-conf flag:

--hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator
--hoodie-conf hoodie.datasource.write.recordkey.field=request_timestamp:TIMESTAMP
--hoodie-conf hoodie.deltastreamer.source.dfs.root=s3://athena-examples-us-west-2/elb/parquet/year=2015/month=1/day=1
--hoodie-conf hoodie.datasource.write.partitionpath.field=request_timestamp:TIMESTAMP

On a related note, your record key and partition path are both the same field. This is OK if you are testing out a sample dataset, but it won't scale in the real world, as you would end up with one record per directory.
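For instance, a minimal sketch of a more scalable split (field names are hypothetical: a unique id as record key, a coarser date field as partition path):

# Sketch only: 'id' and 'created_date' are hypothetical field names
hoodie.datasource.write.recordkey.field=id
hoodie.datasource.write.partitionpath.field=created_date:TIMESTAMP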

@SureshK-T2S
Author

Thanks for your response @bvaradar. Entering the parameters in that way did the trick for this particular issue.

spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer  \
>   --packages org.apache.hudi:hudi-spark-bundle_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.4\
>   --master yarn --deploy-mode client \
> /usr/lib/hudi/hudi-utilities-bundle.jar --table-type MERGE_ON_READ \
>   --source-ordering-field updated_at \
>   --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
>   --target-base-path s3://mysqlcdc-stream-prod/kinesis-glue-original/hudi_order_table --target-table hudi_order_table \
> --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator\
>   --hoodie-conf hoodie.datasource.write.recordkey.field=id:TIMESTAMP\
>   --hoodie-conf hoodie.deltastreamer.source.dfs.root=s3://mysqlcdc-stream-prod/kinesis-glue-original/order_table\
>   --hoodie-conf hoodie.datasource.write.partitionpath.field=order_placed_on:TIMESTAMP

However, I ran into another error shortly after (a NullPointerException) that I assume is the same as the one seen here: https://issues.apache.org/jira/browse/HUDI-1200

Caused by: java.lang.NullPointerException
	at org.apache.hudi.keygen.SimpleKeyGenerator.<init>(SimpleKeyGenerator.java:35)
	at org.apache.hudi.keygen.CustomKeyGenerator.getRecordKey(CustomKeyGenerator.java:128)

As suggested in that ticket, I pivoted to using TimestampBasedKeyGenerator, but again ran into another issue:

spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  --packages org.apache.hudi:hudi-spark-bundle_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.4 \
  --master yarn --deploy-mode client \
  /usr/lib/hudi/hudi-utilities-bundle.jar --table-type MERGE_ON_READ \
  --source-ordering-field updated_at \
  --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
  --target-base-path s3://mysqlcdc-stream-prod/kinesis-glue-original/hudi_order_table_2 --target-table hudi_order_table_2 \
  --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator \
  --hoodie-conf hoodie.datasource.write.recordkey.field=id \
  --hoodie-conf hoodie.deltastreamer.source.dfs.root=s3://mysqlcdc-stream-prod/kinesis-glue-original/order_table \
  --hoodie-conf hoodie.datasource.write.partitionpath.field=order_placed_on:TIMESTAMP \
  --hoodie-conf hoodie.deltastreamer.keygen.timebased.timestamp.type=DATE_STRING \
  --hoodie-conf hoodie.deltastreamer.keygen.timebased.input.dateformat="yyyy-MM-dd hh:mm:ss" \
  --hoodie-conf hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd

Caused by: java.lang.RuntimeException: hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit is not specified but scalar it supplied as time value
	at org.apache.hudi.keygen.TimestampBasedKeyGenerator.convertLongTimeToMillis(TimestampBasedKeyGenerator.java:205)

I believe this is the same as the issue seen here: https://issues.apache.org/jira/browse/HUDI-1150

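In case it helps anyone hitting the same thing: the error message itself names the missing property, so here is a sketch of the extra flag the scalar code path appears to require (SECONDS is just a hypothetical unit):

# Assumption: values routed through convertLongTimeToMillis need an explicit time unit
--hoodie-conf hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit=SECONDS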
As per that ticket, this might be caused by null values in the timestamp partition field. So I pivoted again and opted for ComplexKeyGenerator on the same data, with three simple partition fields instead, which worked great!

spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  --packages org.apache.hudi:hudi-spark-bundle_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.4 \
  --master yarn --deploy-mode client \
  /usr/lib/hudi/hudi-utilities-bundle.jar --table-type MERGE_ON_READ \
  --source-ordering-field updated_at \
  --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
  --target-base-path s3://mysqlcdc-stream-prod/kinesis-glue-original/hudi_order_table_2 --target-table hudi_order_table_2 \
  --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.utilities.keygen.ComplexKeyGenerator \
  --hoodie-conf hoodie.datasource.write.recordkey.field=id \
  --hoodie-conf hoodie.deltastreamer.source.dfs.root=s3://mysqlcdc-stream-prod/kinesis-glue-original/order_table \
  --hoodie-conf hoodie.datasource.write.partitionpath.field=year,month,day

But when I loaded the data in Spark and checked the field in question, there were no null values, only 1970-01-01 values. Could the null values have been automatically converted to those values at some point in the process?
[screenshot: query results showing only 1970-01-01 values in the partition field]
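The check I ran in spark-shell was roughly the following; a sketch assuming order_placed_on is a string column in the DATE_STRING format configured above:

// Count true nulls vs. epoch-default values in the partition column
val df = spark.read.parquet("s3://mysqlcdc-stream-prod/kinesis-glue-original/order_table")
df.filter(df("order_placed_on").isNull).count()                    // true nulls (came back 0 for me)
df.filter(df("order_placed_on").startsWith("1970-01-01")).count()  // epoch placeholders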

Is it fine if I keep this ticket open while I troubleshoot further? If you'd rather I close this and open another ticket for a different issue, please let me know.

@bvaradar
Contributor

bvaradar commented Jan 7, 2021

No worries. It should be fine to keep this open.

@n3nash
Contributor

n3nash commented Feb 2, 2021

@SureshK-T2S Is there anything else related to this issue that needs to be discussed further?

@n3nash n3nash self-assigned this Feb 2, 2021
@vinothchandar vinothchandar added the hudistreamer issues related to Hudi streamer (formerly deltastreamer) label Feb 6, 2021
@SureshK-T2S
Author

SureshK-T2S commented Feb 6, 2021

Hello, thank you for giving me your time on this. I have since had an issue with MultiTableDeltaStreamer, in particular getting it to work with the ParquetDFS data source. I am getting an error due to the SchemaProvider, or the lack of one.

Command:

spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer \
  --packages org.apache.hudi:hudi-spark-bundle_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.4 \
  --master yarn --deploy-mode client \
  /usr/lib/hudi/hudi-utilities-bundle.jar --table-type COPY_ON_WRITE \
  --props s3:///temp/config/s3-source.properties \
  --config-folder s3:///temp/hudi-ingestion-config/ \
  --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
  --continuous --source-ordering-field updated_at \
  --base-path-prefix s3://hudi-data-lake --target-table dummy_table --op UPSERT

S3 properties:

hoodie.deltastreamer.ingestion.tablesToBeIngested=db.table1,db.table2
hoodie.deltastreamer.ingestion.db.table1.configFile=s3://hudi-data-lake/configs/db/table1.properties
hoodie.deltastreamer.ingestion.db.table2.configFile=s3://hudi-data-lake/configs/db/table2.properties

Table1 properties:

hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
hoodie.deltastreamer.source.dfs.root=s3://root_folder_1
hoodie.datasource.write.recordkey.field=id
hoodie.datasource.write.partitionpath.field=year,month,day

Table2 properties:

hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
hoodie.deltastreamer.source.dfs.root=s3://root_folder_2
hoodie.datasource.write.recordkey.field=id
hoodie.datasource.write.partitionpath.field=year,month,day

Error:

Exception in thread "main" java.lang.NullPointerException
	at org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer.populateSchemaProviderProps(HoodieMultiTableDeltaStreamer.java:148)
	at org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer.populateTableExecutionContextList(HoodieMultiTableDeltaStreamer.java:128)
	at org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer.<init>(HoodieMultiTableDeltaStreamer.java:78)
	at org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer.main(HoodieMultiTableDeltaStreamer.java:201)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:853)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:928)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:937)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

I am reaching the final steps of my setup and really hoping to get this resolved and go live soon!

@nsivabalan
Contributor

It looks like you are required to set "--schemaprovider-class".
This blog covers the multi-table DeltaStreamer with an example; it might be of help to you.
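For example, a minimal sketch using the file-based provider (FilebasedSchemaProvider and the schema-file config are standard hudi-utilities options; the s3 path here is hypothetical):

--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--hoodie-conf hoodie.deltastreamer.schemaprovider.source.schema.file=s3://your-bucket/schemas/table1.avsc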

@pratyakshsharma: can you please follow up on this ticket?

@nsivabalan nsivabalan added the priority:major degraded perf; unable to move forward; potential bugs label Feb 6, 2021
@SureshK-T2S
Author

SureshK-T2S commented Feb 6, 2021

Thanks for your response. Until now, using HoodieDeltaStreamer, I have not had to specify the schema provider class when using the ParquetDFS source.

Looking at the schema providers here, I thought NullTargetSchemaRegistryProvider would be a good fit, but I encountered the following error:

java.io.IOException: Could not load schema provider class org.apache.hudi.utilities.schema.NullTargetSchemaRegistryProvider
	at org.apache.hudi.utilities.UtilHelpers.createSchemaProvider(UtilHelpers.java:107)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.<init>(HoodieDeltaStreamer.java:550)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:129)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:104)
	at org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer.sync(HoodieMultiTableDeltaStreamer.java:354)
	at org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer.main(HoodieMultiTableDeltaStreamer.java:201)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:853)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:928)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:937)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.hudi.exception.HoodieException: Unable to instantiate class 
	at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:89)
	at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:98)
	at org.apache.hudi.utilities.UtilHelpers.createSchemaProvider(UtilHelpers.java:105)
	... 17 more
Caused by: java.lang.reflect.InvocationTargetException
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:87)
	... 19 more
Caused by: org.apache.hudi.exception.HoodieNotSupportedException: Required property hoodie.deltastreamer.schemaprovider.registry.url is missing
	at org.apache.hudi.DataSourceUtils.lambda$checkRequiredProperties$0(DataSourceUtils.java:144)
	at java.util.Collections$SingletonList.forEach(Collections.java:4824)
	at org.apache.hudi.DataSourceUtils.checkRequiredProperties(DataSourceUtils.java:142)
	at org.apache.hudi.utilities.schema.SchemaRegistryProvider.<init>(SchemaRegistryProvider.java:63)
	at org.apache.hudi.utilities.schema.NullTargetSchemaRegistryProvider.<init>(NullTargetSchemaRegistryProvider.java:33)
	... 24 more

I tried adding hoodie.deltastreamer.schemaprovider.registry.url to the props with a blank value, but it gave me a malformed-URL error.

Please let me know if I should be using a different schema provider class or approach.

@SureshK-T2S SureshK-T2S changed the title [SUPPORT] Deltastreamer - Property hoodie.datasource.write.partitionpath.field not found [SUPPORT] HoodieMultiTableDeltastreamer - Bypassing SchemaProvider-Class requirement for ParquetDFS Feb 8, 2021
@vinothchandar
Member

cc @pratyakshsharma, could you please help out with this issue?

@SureshK-T2S
Author

Hi guys, any ideas on this one?

@nsivabalan
Contributor

nsivabalan commented Feb 14, 2021

It looks like there could be a bug. Here is the reason:
DeltaStreamer works fine for Dataset<Row> sources without providing a schema provider, but it looks like the multi-table DeltaStreamer failed to carry over that assumption. The tests written for the multi-table DeltaStreamer use Dataset<GenericRecord> sources, for which schema providers are mandatory.

git diff
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
index 9d5ca3ca..91742ec0 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
@@ -147,7 +147,7 @@ public class HoodieMultiTableDeltaStreamer {
   }
 
   private void populateSchemaProviderProps(HoodieDeltaStreamer.Config cfg, TypedProperties typedProperties) {
-    if (cfg.schemaProviderClassName.equals(SchemaRegistryProvider.class.getName())) {
+    if (cfg.schemaProviderClassName != null && cfg.schemaProviderClassName.equals(SchemaRegistryProvider.class.getName())) {
       String schemaRegistryBaseUrl = typedProperties.getString(Constants.SCHEMA_REGISTRY_BASE_URL_PROP);
       String schemaRegistrySuffix = typedProperties.getString(Constants.SCHEMA_REGISTRY_URL_SUFFIX_PROP);
       typedProperties.setProperty(Constants.SOURCE_SCHEMA_REGISTRY_URL_PROP, schemaRegistryBaseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) + schemaRegistrySuffix);

As you might have figured out, I don't have prior experience with this part of the code base, so I will have to write tests to ensure the fix works. But in the meantime, if you have access to the StructType (schema), you can try using RowBasedSchemaProvider to unblock yourself for now.

@nsivabalan
Contributor

I have put up a fix here: #2577. The aforementioned fix works.

@nsivabalan
Contributor

Also, it looks like we don't have good test coverage for the multi-table DeltaStreamer; I have added a JIRA here.

@nsivabalan
Contributor

Closing this for now. Please do reach out to us if you need more help; happy to help you out. Thanks for helping make Hudi better :)

@gopi-t2s

Hi @nsivabalan,
@SureshK-T2S and I are working together to set up the multi-table DeltaStreamer.

I downloaded the latest version (0.7.0) of hudi-utilities-bundle.jar from the Maven repository (https://mvnrepository.com/artifact/org.apache.hudi/hudi-utilities-bundle_2.11/0.7.0) and tried to run the spark-submit multi-table DeltaStreamer command without providing the schema provider class (hoping this is no longer mandatory after fix #2577).

But I am still receiving the same error mentioned above by Suresh.
ERROR LOG:
Exception in thread "main" java.lang.NullPointerException
	at org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer.populateSchemaProviderProps(HoodieMultiTableDeltaStreamer.java:150)
	at org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer.populateTableExecutionContextList(HoodieMultiTableDeltaStreamer.java:130)
	at org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer.<init>(HoodieMultiTableDeltaStreamer.java:80)
	at org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer.main(HoodieMultiTableDeltaStreamer.java:203)

SPARK SUBMIT COMMAND
spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer \
  ~/hudi/hudi-utilities-bundle_2.11-0.7.0.jar \
  --table-type COPY_ON_WRITE \
  --props s3://path/s3_source.properties \
  --config-folder s3://folder-path \
  --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
  --source-ordering-field updated_at \
  --base-path-prefix s3://object --target-table dummy_table --op UPSERT

Am I missing anything here, or is the above PR not merged into the 0.7.0 Maven jar?
Could you share your thoughts?

Thank you.

@nsivabalan
Contributor

Yes, as you can see from the commit, it was merged two to three weeks back. We have an upcoming release in a week or two, so you should have it in 0.8.0. If you want to verify the fix, you can pull the latest master and try it out. 0.7.0 does not have this fix.
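If you go that route, building the bundle from master is the standard Maven build; a quick sketch (the exact bundle path can vary by version):

# Build Hudi from the latest master; the utilities bundle lands under packaging/
git clone https://github.com/apache/hudi.git && cd hudi
mvn clean package -DskipTests
# e.g. packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_*.jar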

@gopi-t2s

Thanks @nsivabalan for confirming.
