Pulsar IO: Allow to develop Sinks that support Schema but without setting it at build time (Sink<GenericObject>) #10034

Merged 6 commits on Apr 11, 2021

@eolivelli eolivelli (Contributor) commented Mar 24, 2021

Motivation

Support users who want to implement a Sink that is aware of the Schema but is not tied to a specific schema type at compile time.
With this patch, such a Sink can be implemented as a Sink<GenericObject>:

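import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;

// open() and close() are omitted here; "log" is assumed to be an SLF4J logger.
public class ObjectSink implements Sink<GenericObject> {

    @Override
    public void write(Record<GenericObject> record) throws Exception {
        try {
            // Schema attached to the record, known only at runtime.
            Schema<?> schema = record.getSchema();
            // Schema type of the payload.
            SchemaType type = record.getValue().getType();
            // Underlying value, unwrapped from the GenericObject.
            Object value = record.getValue().getNativeObject();
            log.info("Record {} type {} schema {} value {}", record, type, schema, value);
            record.ack();
        } catch (Throwable t) {
            // ..... (error handling elided in the original)
        }
    }
}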

Modifications

This change adds support in TopicSchema.java for GenericObject, in the same way as was done for GenericRecord. GenericRecord is a subtype of GenericObject, and AutoConsumeSchema now returns GenericObject instances that wrap primitive data types.
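To illustrate the relationship described above, here is a minimal, self-contained sketch of how a sink might distinguish a structured record from a wrapped primitive. It does not use Pulsar's real classes: `SchemaType`, `GenericObject`, and `GenericRecord` below are simplified stand-ins that only mirror the `getType()` and `getNativeObject()` calls shown in the PR description, and `PrimitiveObject`, `MapRecord`, `fieldValues()`, and `describe()` are hypothetical names introduced for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class GenericObjectSketch {

    // Stand-in for Pulsar's SchemaType enum; only the values used here.
    enum SchemaType { STRING, INT32, AVRO }

    // Stand-in for GenericObject: exposes the schema type and the wrapped value.
    interface GenericObject {
        SchemaType getType();
        Object getNativeObject();
    }

    // Stand-in for GenericRecord: a structured value that is also a GenericObject.
    interface GenericRecord extends GenericObject {
        // Hypothetical accessor; the real GenericRecord API is different.
        Map<String, Object> fieldValues();
    }

    // Hypothetical wrapper around a primitive value such as a String or an Integer.
    static final class PrimitiveObject implements GenericObject {
        private final SchemaType type;
        private final Object value;

        PrimitiveObject(SchemaType type, Object value) {
            this.type = type;
            this.value = value;
        }

        @Override public SchemaType getType() { return type; }
        @Override public Object getNativeObject() { return value; }
    }

    // Hypothetical record backed by an ordered map of field names to values.
    static final class MapRecord implements GenericRecord {
        private final Map<String, Object> fields;

        MapRecord(Map<String, Object> fields) { this.fields = fields; }

        @Override public SchemaType getType() { return SchemaType.AVRO; }
        @Override public Object getNativeObject() { return fields; }
        @Override public Map<String, Object> fieldValues() { return fields; }
    }

    // A sink-style handler: one code path accepts both records and primitives.
    static String describe(GenericObject value) {
        if (value instanceof GenericRecord) {
            GenericRecord record = (GenericRecord) value;
            return "record " + record.getType() + " fields=" + record.fieldValues().keySet();
        }
        return "primitive " + value.getType() + " value=" + value.getNativeObject();
    }

    public static void main(String[] args) {
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("name", "alice");
        fields.put("age", 30);

        System.out.println(describe(new PrimitiveObject(SchemaType.STRING, "hello")));
        // prints "primitive STRING value=hello"
        System.out.println(describe(new MapRecord(fields)));
        // prints "record AVRO fields=[name, age]"
    }
}
```

Because the record type is a subtype of the object type, a handler written against the wider type does not need to know at compile time which kind of payload a topic carries.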

Verifying this change

I will add test cases and integration tests as soon as the community accepts this feature.

Does this pull request potentially affect one of the following parts:

With this feature, Sink developers will be able to write advanced Sinks.

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? docs

I will be happy to provide documentation as soon as this feature lands in Pulsar.

@eolivelli eolivelli changed the title from "Pulsar IO: Allow develop Sinks that support Schema but without setting it at build time (Sink<Object>)" to "Pulsar IO: Allow to develop Sinks that support Schema but without setting it at build time (Sink<Object>)" Mar 24, 2021
@eolivelli eolivelli changed the title from "Pulsar IO: Allow to develop Sinks that support Schema but without setting it at build time (Sink<Object>)" to "Pulsar IO: Allow to develop Sinks that support Schema but without setting it at build time (Sink<GenericObject>)" Mar 30, 2021
@eolivelli eolivelli self-assigned this Mar 31, 2021
@eolivelli eolivelli marked this pull request as ready for review April 8, 2021 07:27
@eolivelli eolivelli added the area/connector, component/schemaregistry, and type/feature labels Apr 8, 2021
conf/standalone.conf (outdated review thread, resolved)
@eolivelli
Contributor Author

@codelipenghui I have reverted the conf file.

@eolivelli
Contributor Author

eolivelli commented Apr 10, 2021

@codelipenghui @jerrypeng CI finally passed

@eolivelli eolivelli added this to the 2.8.0 milestone Apr 11, 2021
@eolivelli eolivelli merged commit 42e92a7 into apache:master Apr 11, 2021
@eolivelli eolivelli deleted the impl/sink-object branch April 11, 2021 14:44
eolivelli added a commit to datastax/pulsar that referenced this pull request May 12, 2021
…ting it at build time (Sink<GenericObject>) (apache#10034)

(cherry picked from commit 42e92a7)