Unable to set "scrub.invalid.names" to "true" when using the JSONSchema Converter #2947

Closed
richardhuang11 opened this issue Jan 23, 2024 · 2 comments

richardhuang11 commented Jan 23, 2024

Background

We've been running into the same issue as described in #1861, where we're trying to use the JSON Schema converter to convert ingested JSON messages to Parquet in S3. As noted in that issue:

> The JsonSchemaConverter outputs schemas with field names such as "io.confluent.connect.json.OneOf.field.0", which clash with the official Avro library. So eventually, when such schemas are converted to Avro in the AvroData class, it throws an exception.

PR #1873 by @rayokota addresses this issue and exposes the fix behind the `scrub.invalid.names` config.

Problem

We're unable to set `scrub.invalid.names` to `true` when using the JsonSchemaConverter, as the option is only exposed for the AvroConverter and the ProtobufConverter (docs). However, the JsonSchemaConverter uses avro-data under the hood, and the fix is implemented in avro-data. We've tried to work around this by shading the avro-data jar and setting the config manually, but without success.

Is there a way to set `scrub.invalid.names` when using the JsonSchemaConverter?
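For reference, this is roughly our sink configuration (connector, topic, bucket, and registry URL are placeholders); the last property is the one that appears to have no effect:

```json
{
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "topics": "example-topic",
  "s3.bucket.name": "example-bucket",
  "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
  "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081",
  "value.converter.scrub.invalid.names": "true"
}
```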

rayokota (Member) commented

@richardhuang11, can you try setting `generalized.sum.type.support=true` to see if that works for you?
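If you're configuring the converter at the connector level, that would be prefixed with the converter namespace, e.g.:

```json
{
  "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
  "value.converter.generalized.sum.type.support": "true"
}
```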

richardhuang11 (Author) commented

Nice, that seems to have fixed the issue. Thanks!
The field name is now `connect_union_field_0` rather than `io.confluent.connect.json.OneOf.field.0`.
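For anyone else hitting this: a minimal hypothetical schema that triggers the renaming (not our real schema) is a value with a `oneOf` field, e.g.

```json
{
  "type": "object",
  "properties": {
    "payload": {
      "oneOf": [
        { "type": "string" },
        { "type": "integer" }
      ]
    }
  }
}
```

Without the flag, the union members become Connect fields named `io.confluent.connect.json.OneOf.field.0`, `io.confluent.connect.json.OneOf.field.1`, and so on, which contain dots and are therefore invalid Avro field names; with `generalized.sum.type.support=true` they come out as `connect_union_field_0`, `connect_union_field_1`, etc. instead.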

Running into this error now, but it’s probably unrelated to the previous issue.
https://stackoverflow.com/questions/74811594/avro-schema-must-be-a-record

Caused by: java.lang.IllegalArgumentException: Avro schema must be a record.
	at org.apache.parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:124)
	at org.apache.parquet.avro.AvroParquetWriter.writeSupport(AvroParquetWriter.java:150)
	at org.apache.parquet.avro.AvroParquetWriter.access$200(AvroParquetWriter.java:36)
	at org.apache.parquet.avro.AvroParquetWriter$Builder.getWriteSupport(AvroParquetWriter.java:182)
	at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:563)
	at io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider$1.write(ParquetRecordWriterProvider.java:102)
	at io.confluent.connect.s3.format.S3RetriableRecordWriter.write(S3RetriableRecordWriter.java:46)
	at io.confluent.connect.s3.format.KeyValueHeaderRecordWriterProvider$1.write(KeyValueHeaderRecordWriterProvider.java:107)
	at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:562)
	at io.confluent.connect.s3.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:311)
	at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:254)
	at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:205)
	at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:234)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
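If it helps anyone searching for this later: judging from the stack trace, parquet-avro's AvroSchemaConverter only accepts a record as the top-level schema, so a value whose converted schema is a primitive, array, or union at the top level can't be written as Parquet. A hypothetical example of a schema that would fail this check:

```json
{
  "type": "string",
  "description": "Top-level primitive: converts to an Avro string, not a record, so Parquet rejects it"
}
```

Wrapping the value in a top-level `object` type should avoid this particular exception.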
