Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[schema] expose the native record for struct schema #9614

Merged
merged 16 commits into from
Mar 10, 2021

Conversation

eolivelli
Copy link
Contributor

@eolivelli eolivelli commented Feb 18, 2021

Motivation

Allow GenericRecord consumers to access the underlying implementation, such as Avro GenericRecord, Protobug DynamicMessage or JSON JSONNode.

Modifications

This is patch introduces support for retrieving such information, with two methods:

  • GenericRecord#getSchemaType: return the type of schema (JSON,ProfobufNative,AVRO)
  • GenericRecord#getNativeRecod(Class expectedType): return the internal record
Consumer<GenericRecord> consumer = client.newConsumer(Schema.AUTO_CONSUME());
GenericRecord value = consumer.receive().getValue();
assertEquals(SchemaType.AVRO, res.getSchemaType());
org.apache.avro.generic.GenericRecord nativeRecord = res.getNativeRecord(org.apache.avro.generic.GenericRecord.class);
org.apache.avro.Schema schema = nativeRecord.getSchema();

Verifying this change

New tests cases are added

Does this pull request potentially affect one of the following parts:

This change introduces a new API

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? JavaDocs

@eolivelli
Copy link
Contributor Author

@sijie @shiv4289 @aahmed-se please take a look

@eolivelli
Copy link
Contributor Author

I will spend time in adding tests only when we reach consensus on the best API to provide to our users

@eolivelli eolivelli marked this pull request as ready for review February 19, 2021 08:16
@eolivelli
Copy link
Contributor Author

@sijie I have added tests, please take a look, the patch is ready

@eolivelli
Copy link
Contributor Author

/pulsarbot run-failure-checks

@eolivelli
Copy link
Contributor Author

@sijie the patch passed CI, please take a look.
This way we can close this topic

@codelipenghui @rdhabalia PTAL as well

@codelipenghui
Copy link
Contributor

@congbobo184 Could you please help review this PR?

Copy link
Contributor

@codelipenghui codelipenghui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the consumer uses AUTO_CONSUME schema, how to distinguish the unwrap type? Seems the implementation is assuming the users already know the schema is Avro or protobuf. Users can use the ALWAYS_COMPATIBLE policy, the schemas of the topic might have avro and protobuf schema. So shall we need to get the Avro type from the GenericRecord first?

And, for the GenericAvroRecord, GenericProtobufNativeRecord, etc. They have exposed the native schema, I think it's easy to get the field type from the native schema since the field exposed the index and the name.

@congbobo184
Copy link
Contributor

I also have the same confusion as @codelipenghui said.

@eolivelli
Copy link
Contributor Author

@codelipenghui @congbobo184

When you are inside a Sink you have this situation and you can access all of the schema info from the Record

Record<GenericRecord> record = ....;
GenericRecord value = record.getValue();
SchemaInfo schemaInfo = record.getSchema().getSchemaInfo();
SchemaType type = schemaInfo.getType();
value.getFields().forEach( f -> {
    org.apache.avro.Schema.Field avroField = f.unwrap(org.apache.avro.Schema.Field.class);
    org.apache.avro.Schema fieldSchema = avroField.schema();
    log.info("Field {} unwrapped as {} schema {}", f, avroField, fieldSchema);
});

Unfortunately GenericRecord is used both to receive and to send values from/to Pulsar.

We cannot easily add GenericRecord#getSchema
We already have GenericRecord#getSchemaVersion and this is already akward for people that use GenericRecord to produce messages.

Do you think we should add something like

interface GenericRecord {
     /**
       * Access Schema information from the underlying Schema implementation.
       * This API is useful only while receiving messages.
       * Returns null in case that the requested Schema type is not available
      */
     default <T> T unwrapSchema(Class<T> schemaType) {
         return null;
     }
     /**
       * Access Schema information.
       * This API is useful only while receiving messages.
       * Returns null in case that the requested information is not available.
      */
     default Optional<SchemaInfo> getSchemaInfo() {
         return Options.empty();
     }
}

I will also like it, as it will help users that are not inside a Sink and they are receiving messages with the Consumer or Reader API.

In that case I would make getSchemaVersion() an optional method by adding a default implementation and adding a note on the JavaDoc that the method is intended to be used only while receiving messages.

@codelipenghui
Copy link
Contributor

@eolivelli Currently, the PulsarClient maintains a schema cache for all topics, you can see getSchemaProviderLoadingCache method in the PulsarClientImpl, so that you can get the schema by the topic name and the schema version.

@eolivelli
Copy link
Contributor Author

@codelipenghui I cannot get your point.
Are you suggesting to expose an API in the PulsarClient to access the Schema given a topic and a SchemaVersion ?

@sijie
Copy link
Member

sijie commented Feb 24, 2021

@eolivelli I shared the same question with @codelipenghui.

Why can't you use a code snippet like the below?

Record<GenericRecord> record = ....;
GenericRecord value = record.getValue();
SchemaInfo schemaInfo = record.getSchema().getSchemaInfo();
SchemaType type = schemaInfo.getType();

if (SchemaTye.AVRO == avro) {
   // the record is an AVRO record
    org.apache.avro.generic.GenericRecord avroRecord = ((GenericAvroRecord) value).getAvroRecord();
    handleAvroRecord(avro);
} else if (SchemaType.JSON == avro) {
   // the record is a JSON record
   JsonNode jsonRecord = ((GenericJsonRecord) value).getJsonNode();
   handleJsonRecord(jsonRecord);
} else if (SchemaType....) {
   ...
}

@eolivelli
Copy link
Contributor Author

@sijie @codelipenghui
The answer is that GenericAvroRecord is not part of the public API, it is in org.apache.pulsar.client.impl.schema.generic package.
GenericRecord is in org.apache.pulsar.client.api.schema and so it is safe to be used by clients and by Pulsar Functions/Pulsar IO modules.

I believe that we need a strong and feature complete public API, that we can maintain in the future and can be used by users.

@eolivelli
Copy link
Contributor Author

@sijie @codelipenghui ping

@codelipenghui
Copy link
Contributor

@eolivelli @sijie Is it better to add method getNativeRecord or getOriginalRecord in the GenericRecord interface?

@sijie
Copy link
Member

sijie commented Mar 4, 2021

I think what @codelipenghui suggested can be a good solution.

@eolivelli
Copy link
Contributor Author

Good idea. I will update the patch soon

@eolivelli
Copy link
Contributor Author

/pulsarbot run-failure-tests

* @return the internal representation of the record, or null if the requested information is not available.
*/
default <T> T getNativeRecord(Class<T> clazz) {
return null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am confused why do you throw UnsupportedOperationException for getSchemaType and return null for this method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because we do not have a good "default value" for getSchemaType

but for this method the expected behaviour is that you get a null value if you are asking for an unsupported interface, so returning null here is the expected behaviour.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not throw UnsupportedOperationException for getNativeRecord? It is a much clear behavior than returning null.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that UnsupportedOperationException does not work well,
because code that calls that method will have to catch it, and catching UnsupportedOperationException smells.

I thinking about generic code that handles multiple different types of records.

*
* @return the internal representation of the record, or null if the requested information is not available.
*/
default <T> T getNativeRecord(Class<T> clazz) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
default <T> T getNativeRecord(Class<T> clazz) {
default <T> T getNativeRecord() {

Why do we need Class<T> here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A couple of reasons:

  • I am adding this specification because this way the user won't have to deal with ClassCastExceptions or to use 'instanceof'.
  • Also the implementation of GenericRecord will be allowed to implement some "compatibility" feature.
    For instance if in the future we will move away from JsonNode but the client code still expects to receive a JsonNode, we will be able to return a properly crafted JsonNode instance and to not break clients.
    I am not saying we will do this soon, but if we do not add such support we won't be able to add it in the future.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure about that. Having Class<T> clazz means that the users need to know what exactly class that each schema type will use. This will be exactly the same as what I comment on #9614 (comment). There is no difference between these two approaches.

Because people anyway need to know what exactly class to use, I will instead suggest keeping the interface as simple as Object getNativeRecord(). Returning Object is very commonly seen in a lot of data processing engines that are dealing with different object types. I don't see it is a big problem to do the same thing here.

@eolivelli eolivelli requested a review from sijie March 8, 2021 10:45
Copy link
Contributor

@dlg99 dlg99 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

* @return the internal representation of the record, or null if the requested information is not available.
*/
default <T> T getNativeRecord(Class<T> clazz) {
return null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not throw UnsupportedOperationException for getNativeRecord? It is a much clear behavior than returning null.

*
* @return the internal representation of the record, or null if the requested information is not available.
*/
default <T> T getNativeRecord(Class<T> clazz) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure about that. Having Class<T> clazz means that the users need to know what exactly class that each schema type will use. This will be exactly the same as what I comment on #9614 (comment). There is no difference between these two approaches.

Because people anyway need to know what exactly class to use, I will instead suggest keeping the interface as simple as Object getNativeRecord(). Returning Object is very commonly seen in a lot of data processing engines that are dealing with different object types. I don't see it is a big problem to do the same thing here.

@eolivelli
Copy link
Contributor Author

@sijie I have updated getNativeRecord to returning Object with a "throw new UnsupportedOperationException" as default implementation.

PTAL

@codelipenghui @dlg99 you already Approved this patch in the old form, please take a look again.

@eolivelli
Copy link
Contributor Author

/pulsarbot rerun-failure-checks

1 similar comment
@eolivelli
Copy link
Contributor Author

/pulsarbot rerun-failure-checks

@sijie sijie changed the title Schema API: expose underlying Schema information for Fields [schema] expose the native record for struct schema Mar 10, 2021
@sijie sijie merged commit 487d2af into apache:master Mar 10, 2021
@eolivelli eolivelli deleted the fix/field-types-2 branch March 10, 2021 08:17
@eolivelli
Copy link
Contributor Author

Thank you @sijie @codelipenghui and @dlg99 for your reviews and suggestions

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants