Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement GenericObject - Allow GenericRecord to wrap any Java Object #10057

Merged
merged 21 commits into from Apr 8, 2021

Conversation

eolivelli
Copy link
Contributor

@eolivelli eolivelli commented Mar 27, 2021

Contents:

  • introduce new high level interface GenericObject, that represents every logical value on the Pulsar topic
  • allow AutoConsumeSchema to deal with every Schema type
  • handle schema less topics with AutoConsumeSchema (return Bytes schema and byte[] payload)
  • rename getNativeRecord to getNativeObject

How it works:

  • in case of non struct schema, like for primitives and for KeyValue, wrap the result into a PrimitiveRecord class, that is an implementation of GenericRecord that wraps a Object and a SchemaType, it returns an empty list of "fields"

Intended audience:

  • allow users to implement Sink, that is to write Sinks that work with every Schema type (the patch for Pulsar IO will follow up as a separate PR)
interface GenericObject {
      Object getNativeObject();
      SchemaType getSchemaType();
}
interface GenericRecord extends GenericObject

@eolivelli eolivelli changed the title Implement PulsarObject - Allow GenericRecord to wrap any Java Object Implement GenericObject - Allow GenericRecord to wrap any Java Object Mar 28, 2021
@eolivelli eolivelli marked this pull request as ready for review March 28, 2021 08:37
Copy link
Contributor Author

@eolivelli eolivelli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jerrypeng @lhotari I have answered to your questions and renamed PrimitiveRecord to GenericObjectWrapper.

PTAL again

Copy link
Contributor

@codelipenghui codelipenghui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@sijie sijie added this to the 2.8.0 milestone Mar 30, 2021
Copy link
Contributor Author

@eolivelli eolivelli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sijie I have addresses your comments
PTAL again

@eolivelli eolivelli requested a review from sijie March 30, 2021 17:41
@eolivelli
Copy link
Contributor Author

@sijie CI passed.
can you please take a look again ?

* This is an abstraction over the logical value that is store into a Message.
* Pulsar decodes the payload of the Message using the Schema that is configured for the topic.
*/
public interface GenericObject {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this interface suppose to be used as a schema type for producers or consumers, .e.g. Consumer ? If so, can you write some tests that has consumers / producers that uses this new type of Schema?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason why we need this interface as all. Shouldn't having these additional methods in GenericRecord suffice?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jerrypeng
the idea is to let users user "GenericObject" instead of GenericRecord while dealing with "Any object", on the Consumer/Sink side, because GenericRecord for many users is related to a Struct type (like AVRO GenericRecord), and it carries "getFields()"

If so, can you write some tests that has consumers / producers that uses this new type of Schema?
I added tests about GenericObjectWrapper.

The main goal is to have Sink and having the ability to write Sinks that are not bound to a specific Schema at compile time.

@codelipenghui codelipenghui merged commit a2d39c0 into apache:master Apr 8, 2021
@eolivelli eolivelli deleted the impl/pulsar-object branch April 8, 2021 05:23
wangjialing218 pushed a commit to wangjialing218/pulsar that referenced this pull request Apr 9, 2021
…apache#10057)

Contents:
- introduce new high level interface GenericObject, that represents every logical value on the Pulsar topic
- allow AutoConsumeSchema to deal with every Schema type
- handle schema less topics with AutoConsumeSchema (return Bytes schema and byte[] payload)
- rename `getNativeRecord` to `getNativeObject`

How it works:
- in case of non struct schema, like for primitives and for KeyValue, wrap the result into a PrimitiveRecord class, that is an implementation of GenericRecord that wraps a Object and a SchemaType, it returns an empty list of "fields"

Intended audience:
- allow users to implement Sink<GenericObject>, that is to write Sinks that work with every Schema type (the patch for Pulsar IO will follow up as a separate PR)


```
interface GenericObject {
      Object getNativeObject();
      SchemaType getSchemaType();
}
interface GenericRecord extends GenericObject
```
eolivelli added a commit to datastax/pulsar that referenced this pull request May 12, 2021
…apache#10057)

Contents:
- introduce new high level interface GenericObject, that represents every logical value on the Pulsar topic
- allow AutoConsumeSchema to deal with every Schema type
- handle schema less topics with AutoConsumeSchema (return Bytes schema and byte[] payload)
- rename `getNativeRecord` to `getNativeObject`

How it works:
- in case of non struct schema, like for primitives and for KeyValue, wrap the result into a PrimitiveRecord class, that is an implementation of GenericRecord that wraps a Object and a SchemaType, it returns an empty list of "fields"

Intended audience:
- allow users to implement Sink<GenericObject>, that is to write Sinks that work with every Schema type (the patch for Pulsar IO will follow up as a separate PR)

```
interface GenericObject {
      Object getNativeObject();
      SchemaType getSchemaType();
}
interface GenericRecord extends GenericObject
```
hangc0276 added a commit to streamnative/pulsar-io-lakehouse that referenced this pull request May 9, 2022
Due to apache/pulsar#10057

We use GenericObject instead of GenericRecord to receive records.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

6 participants