Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue 11007] Schema.NATIVE_AVRO: add a version of AUTO_PRODUCE_BYTES that doesn't validate the message in encode #11238

Merged
merged 18 commits into from
Jul 22, 2021

Conversation

Zhen-hao
Copy link
Contributor

@Zhen-hao Zhen-hao commented Jul 6, 2021

(If this PR fixes a github issue, please add Fixes #<xyz>.)

Fixes #11007

Motivation

When ingesting event/message data from external systems such as Kafka and Cassandra, the events very often are already serialized with Avro with the schemas also available. In such cases, a Pulsar producer doesn't need to perform the validation step again when sending the events to a topic.

Modifications

Introduce a new class AutoProduceValidatedAvroBytesSchema that extends AutoProduceBytesSchema implements Schema<byte[]>.

TODO: make the public AutoProduceValidatedAvroBytesSchema(org.apache.avro.Schema schema) constructor accessible to the client.

Add NATIVE_AVRO method to org.apache.pulsar.client.api.Schema which calls AutoProduceValidatedAvroBytesSchema's constructor via reflection.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API: (yes / no)
  • The schema: (yes / no / don't know)
  • The default values of configurations: (yes / no)
  • The wire protocol: (yes / no)
  • The rest endpoints: (yes / no)
  • The admin cli options: (yes / no)
  • Anything that affects deployment: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
  • If a feature is not applicable for documentation, explain why?
  • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation

@Zhen-hao Zhen-hao changed the title Issue 11007 [Issue 11007] add a version of AUTO_PRODUCE_BYTES that doesn't validate message in encode Jul 6, 2021
@Zhen-hao Zhen-hao changed the title [Issue 11007] add a version of AUTO_PRODUCE_BYTES that doesn't validate message in encode [Issue 11007] add a version of AUTO_PRODUCE_BYTES that doesn't validate the message in encode Jul 6, 2021
Copy link
Contributor

@eolivelli eolivelli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you for sharing this work.
I left one comment

@Anonymitaet
Copy link
Member

@Zhen-hao thanks for your contribution. For this PR, do we need to update docs?

@Zhen-hao
Copy link
Contributor Author

Zhen-hao commented Jul 7, 2021

@Zhen-hao thanks for your contribution. For this PR, do we need to update docs?

I think so. Maybe next to that of AUTO_PRODUCE_BYTES?

I need to figure out why the build is failing and where to add tests.
Any guidance is welcome!

@Zhen-hao
Copy link
Contributor Author

Zhen-hao commented Jul 8, 2021

@Anonymitaet I see site2/docs/io-develop.md, site2/website/versioned_docs/version-2.8.0/io-develop.md, and site2/website/versioned_docs/version-2.8.1/io-develop.md.
Are the latter two manually maintained?

Should I only change site2/docs/io-develop.md?

@Zhen-hao
Copy link
Contributor Author

Zhen-hao commented Jul 8, 2021

how often does the CI run? I find the feedback loop too long.

@Anonymitaet
Copy link
Member

@Anonymitaet I see site2/docs/io-develop.md, site2/website/versioned_docs/version-2.8.0/io-develop.md, and site2/website/versioned_docs/version-2.8.1/io-develop.md.
Are the latter two manually maintained?

Should I only change site2/docs/io-develop.md?

They are updated manually. If the doc changes apply to several versions, we need to update them all.
But we can update one version first, after the contents get approved, then copy them to the rest versions.

Would you like to add docs accordingly? Then you can ping me to review, thanks

@Anonymitaet Anonymitaet added the doc-required Your PR changes impact docs and you will update later. label Jul 8, 2021
@Anonymitaet Anonymitaet added this to the 2.9.0 milestone Jul 8, 2021
@Zhen-hao
Copy link
Contributor Author

Zhen-hao commented Jul 8, 2021

I don't understand why the build is failing with

Error:  Medium: Unread field: org.apache.pulsar.client.impl.schema.AutoProduceValidatedAvroBytesSchema.schema [org.apache.pulsar.client.impl.schema.AutoProduceValidatedAvroBytesSchema] At AutoProduceValidatedAvroBytesSchema.java:[line 43] URF_UNREAD_FIELD

In AutoProduceValidatedAvroBytesSchema's super class AutoProduceBytesSchema, the schema field is used in

@Override
    public SchemaInfo getSchemaInfo() {
        ensureSchemaInitialized();

        return schema.getSchemaInfo();
    }

@eolivelli
Copy link
Contributor

I don't understand why the build is failing with

Error:  Medium: Unread field: org.apache.pulsar.client.impl.schema.AutoProduceValidatedAvroBytesSchema.schema [org.apache.pulsar.client.impl.schema.AutoProduceValidatedAvroBytesSchema] At AutoProduceValidatedAvroBytesSchema.java:[line 43] URF_UNREAD_FIELD

In AutoProduceValidatedAvroBytesSchema's super class AutoProduceBytesSchema, the schema field is used in

@Override
    public SchemaInfo getSchemaInfo() {
        ensureSchemaInitialized();

        return schema.getSchemaInfo();
    }

AutoProduceBytesSchema already has a "schema" field, and you are adding a new field that hides the inherited one

@Zhen-hao
Copy link
Contributor Author

Zhen-hao commented Jul 8, 2021

AutoProduceBytesSchema already has a "schema" field, and you are adding a new field that hides the inherited one

OK. I think the easiest to do is copy the same code to the new class.

@Zhen-hao
Copy link
Contributor Author

Zhen-hao commented Jul 8, 2021

OK. I think the easiest to do is copy the same code to the new class.

Or, only use super.schema in the new class.
Never mind. schema in the super class is private...

Copy link
Contributor

@eolivelli eolivelli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my opinion we are on our way.
We have to add tests.
You can find examples like SimpleSchemaTest or SchemaTest....

@sijie is current approach following your expectations?

Tagging @codelipenghui @congbobo184 @gaoran10

@Zhen-hao
Copy link
Contributor Author

Zhen-hao commented Jul 9, 2021

In my opinion we are on our way.
We have to add tests.
You can find examples like SimpleSchemaTest or SchemaTest....

@sijie is current approach following your expectations?

Tagging @codelipenghui @congbobo184 @gaoran10

I will find time over the weekend to add the tests.

Copy link
Contributor

@eolivelli eolivelli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very good!

I left one last comment about getNativeSchema


@Override
public Optional<Object> getNativeSchema() {
return Optional.ofNullable(schema);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here you have to return the original org.apache.avro.Schema object.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this comment looks not addressed yet

@Zhen-hao
Copy link
Contributor Author

there is one test error I can't figure out:

org.apache.pulsar.client.api.SchemaSerializationException: java.io.EOFException
        at org.apache.pulsar.client.impl.schema.reader.AvroReader.read(AvroReader.java:82)
        at org.apache.pulsar.client.api.schema.SchemaReader.read(SchemaReader.java:41)
        at org.apache.pulsar.client.api.SimpleSchemaTest.newNativeAvroProducerForMessageSchemaWithBatch(SimpleSchemaTest.java:597)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:132)
        at org.testng.internal.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:45)
        at org.testng.internal.InvokeMethodRunnable.call(InvokeMethodRunnable.java:73)
        at org.testng.internal.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.io.EOFException
        at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:542)
        at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:173)
        at org.apache.avro.io.BinaryDecoder.readIndex(BinaryDecoder.java:493)
        at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:282)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
        at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:291)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
        at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at org.apache.pulsar.client.impl.schema.reader.AvroReader.read(AvroReader.java:80)
        ... 14 more

from this code

what am I missing here? @eolivelli

Copy link
Contributor

@eolivelli eolivelli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you try to use the debugger with a break point in that point ? probably you have not overridden every method in the Schema interface.

by the way I believe that your new NativeAvroBytesSchema is useful only for the Producer.
if you want to read raw bytes you simply do not set a Schema in the consumer or you use Schema.BYTES.

NativeAvroBytesSchemadoes not perform validation and it does not decode the payload, so it is not worth to be used on the Consumer side.

I suggest you to:

  • throw UnsupportedOperationException in all "decode" methods of NativeAvroBytesSchema
  • write explicitly in the Javadoc that this is a Schema to be used on the Producer side
  • use Schema.BYTES (or no Schema) in the Consumer in your test


@Override
public Optional<Object> getNativeSchema() {
return Optional.ofNullable(this.nativeSchema);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it cannot be null


@Override
public Schema<byte[]> clone() {
return new AutoProduceBytesSchema<>(schema.clone());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return new NativeAvroBytesSchema(nativeSchema)

@Zhen-hao
Copy link
Contributor Author

Did you try to use the debugger with a break point in that point ? probably you have not overridden every method in the Schema interface.

by the way I believe that your new NativeAvroBytesSchema is useful only for the Producer.
if you want to read raw bytes you simply do not set a Schema in the consumer or you use Schema.BYTES.

NativeAvroBytesSchemadoes not perform validation and it does not decode the payload, so it is not worth to be used on the Consumer side.

I suggest you to:

  • throw UnsupportedOperationException in all "decode" methods of NativeAvroBytesSchema
  • write explicitly in the Javadoc that this is a Schema to be used on the Producer side
  • use Schema.BYTES (or no Schema) in the Consumer in your test

You are right. I've made the changes as you suggested.

Copy link
Contributor

@eolivelli eolivelli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@sijie can you take another look please ?

@eolivelli
Copy link
Contributor

@Zhen-hao I believe you can remove the "draft" flag from this PR

@Zhen-hao Zhen-hao marked this pull request as ready for review July 14, 2021 12:44
@Zhen-hao
Copy link
Contributor Author

@Zhen-hao I believe you can remove the "draft" flag from this PR

Cool. I still need to update some documentation page before we can merge.

@eolivelli
Copy link
Contributor

Cool. I still need to update some documentation page before we can merge.

where do you want to add docs apart from the JavaDocs ?

@Zhen-hao
Copy link
Contributor Author

I see site2/docs/io-develop.md has a section Handle schema information which mentions AUTO_PRODUCE_BYTES.
I'm thinking of adding some words about NATIVE_AVRO.

maybe @Anonymitaet knows better if this is necessary.

@eolivelli
Copy link
Contributor

it is a good idea, btw we can separate code changes from documentation changes. you can follow up with a separate PR.

@Zhen-hao
Copy link
Contributor Author

Zhen-hao commented Jul 14, 2021

it is a good idea, btw we can separate code changes from documentation changes. you can follow up with a separate PR.

If other reviewers agree, I don't mind doing that;)

@Zhen-hao Zhen-hao requested a review from sijie July 16, 2021 06:41
@sijie
Copy link
Member

sijie commented Jul 20, 2021

@Zhen-hao looks good to me!

@sijie sijie merged commit a78b029 into apache:master Jul 22, 2021
Technoboy- pushed a commit to Technoboy-/pulsar that referenced this pull request Jul 22, 2021
…te the message in `encode` (apache#11238)

Fixes apache#11007

### Motivation

When ingesting event/message data from external systems such as Kafka and Cassandra, the events very often are already serialized with Avro with the schemas also available. In such cases, a Pulsar producer doesn't need to perform the validation step again when sending the events to a topic.

### Modifications

Introduce a new class `AutoProduceValidatedAvroBytesSchema` that ~~extends `AutoProduceBytesSchema`~~ implements `Schema<byte[]>`.

~~TODO: make the `public AutoProduceValidatedAvroBytesSchema(org.apache.avro.Schema schema)` constructor accessible to the client.~~

Add `NATIVE_AVRO` method to `org.apache.pulsar.client.api.Schema` which calls `AutoProduceValidatedAvroBytesSchema`'s constructor via reflection.
@Anonymitaet
Copy link
Member

@Zhen-hao thanks for your great work. Would you like to add docs accordingly? Then you can ping me to review, thanks

@Zhen-hao
Copy link
Contributor Author

Zhen-hao commented Aug 4, 2021

@Zhen-hao thanks for your great work. Would you like to add docs accordingly? Then you can ping me to review, thanks

That's the plan. I was overwhelmed by work. I will find time later this week or next week to make a new PR.

@Anonymitaet
Copy link
Member

@Zhen-hao thanks for your great work. Would you like to add docs accordingly? Then you can ping me to review, thanks

That's the plan. I was overwhelmed by work. I will find time later this week or next week to make a new PR.

Hi @Zhen-hao any progress on the docs? Thanks

@Zhen-hao
Copy link
Contributor Author

@Anonymitaet unfortunately, I haven't got to it yet. but it is on my list...

@Anonymitaet
Copy link
Member

@Zhen-hao thanks for your feedback. Please do not forget to add docs to let users know the changes.

@eolivelli eolivelli changed the title [Issue 11007] add a version of AUTO_PRODUCE_BYTES that doesn't validate the message in encode [Issue 11007] Schema.NATIVE_AVRO: add a version of AUTO_PRODUCE_BYTES that doesn't validate the message in encode Sep 23, 2021
eolivelli pushed a commit to datastax/pulsar that referenced this pull request Sep 23, 2021
…te the message in `encode` (apache#11238)

Fixes apache#11007

### Motivation

When ingesting event/message data from external systems such as Kafka and Cassandra, the events very often are already serialized with Avro with the schemas also available. In such cases, a Pulsar producer doesn't need to perform the validation step again when sending the events to a topic.

### Modifications

Introduce a new class `AutoProduceValidatedAvroBytesSchema` that ~~extends `AutoProduceBytesSchema`~~ implements `Schema<byte[]>`.

~~TODO: make the `public AutoProduceValidatedAvroBytesSchema(org.apache.avro.Schema schema)` constructor accessible to the client.~~

Add `NATIVE_AVRO` method to `org.apache.pulsar.client.api.Schema` which calls `AutoProduceValidatedAvroBytesSchema`'s constructor via reflection.

(cherry picked from commit a78b029)
eolivelli pushed a commit to datastax/pulsar that referenced this pull request Sep 23, 2021
…te the message in `encode` (apache#11238)

Fixes apache#11007

When ingesting event/message data from external systems such as Kafka and Cassandra, the events very often are already serialized with Avro with the schemas also available. In such cases, a Pulsar producer doesn't need to perform the validation step again when sending the events to a topic.

Introduce a new class `AutoProduceValidatedAvroBytesSchema` that ~~extends `AutoProduceBytesSchema`~~ implements `Schema<byte[]>`.

~~TODO: make the `public AutoProduceValidatedAvroBytesSchema(org.apache.avro.Schema schema)` constructor accessible to the client.~~

Add `NATIVE_AVRO` method to `org.apache.pulsar.client.api.Schema` which calls `AutoProduceValidatedAvroBytesSchema`'s constructor via reflection.

(cherry picked from commit a78b029)
@Anonymitaet Anonymitaet removed the doc-required Your PR changes impact docs and you will update later. label Oct 14, 2021
nicoloboschi pushed a commit to datastax/pulsar that referenced this pull request Feb 28, 2022
…te the message in `encode` (apache#11238)

Fixes apache#11007

### Motivation

When ingesting event/message data from external systems such as Kafka and Cassandra, the events very often are already serialized with Avro with the schemas also available. In such cases, a Pulsar producer doesn't need to perform the validation step again when sending the events to a topic.

### Modifications

Introduce a new class `AutoProduceValidatedAvroBytesSchema` that ~~extends `AutoProduceBytesSchema`~~ implements `Schema<byte[]>`.

~~TODO: make the `public AutoProduceValidatedAvroBytesSchema(org.apache.avro.Schema schema)` constructor accessible to the client.~~

Add `NATIVE_AVRO` method to `org.apache.pulsar.client.api.Schema` which calls `AutoProduceValidatedAvroBytesSchema`'s constructor via reflection.

(cherry picked from commit a78b029)
(cherry picked from commit f4ef4f3)
bharanic-dev pushed a commit to bharanic-dev/pulsar that referenced this pull request Mar 18, 2022
…te the message in `encode` (apache#11238)

Fixes apache#11007

### Motivation

When ingesting event/message data from external systems such as Kafka and Cassandra, the events very often are already serialized with Avro with the schemas also available. In such cases, a Pulsar producer doesn't need to perform the validation step again when sending the events to a topic.

### Modifications

Introduce a new class `AutoProduceValidatedAvroBytesSchema` that ~~extends `AutoProduceBytesSchema`~~ implements `Schema<byte[]>`.

~~TODO: make the `public AutoProduceValidatedAvroBytesSchema(org.apache.avro.Schema schema)` constructor accessible to the client.~~

Add `NATIVE_AVRO` method to `org.apache.pulsar.client.api.Schema` which calls `AutoProduceValidatedAvroBytesSchema`'s constructor via reflection.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

add direct support for message values already serialized in Apache Avro
4 participants