Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue #6842] feat(schema): allow for schema reader and writer registration on SchemaDefinition #6905

Merged
merged 5 commits into from
May 18, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.pulsar.client.api.schema;

import java.util.Map;
import java.util.Optional;

import org.apache.pulsar.client.internal.DefaultImplementation;

/**
Expand Down Expand Up @@ -76,4 +78,18 @@ static <T> SchemaDefinitionBuilder<T> builder() {
* @return the flag of supportSchemaVersioning
*/
boolean getSupportSchemaVersioning();

/**
* Get a configured schema reader.
*
* @return optional containing configured schema reader or empty optional if none is configure
*/
Optional<SchemaReader<T>> getSchemaReaderOpt();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This interface is used for more than just JSON schemas, right? If so, it will be confusing for users to be able to set a reader and writer for any type of schema when they will only be used for JSON schemas and ignored otherwise. So we either need to support this broadly, or instead we can overload JSONSchema.of() for a more targeted approach.

Copy link
Contributor Author

@Persi Persi May 7, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This interface can be used for other schemas as well, but both SchemaReader and SchemaWriter as well. I see what you mean, did not recognize the Producer and Consumer instantiators with Schema as parameter. But I am not sure if Schema class is meant to be used like that from outside the framework. As they already introduced some default Schema instantiators in the client api Schema class, I would not promote Schema.of for public use, as it is an internal of the framework.

Copy link

@hansenc hansenc May 7, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, you'll probably want to overload Schema.JSON(Class<T>) and Schema.JSON(SchemaDefinition) as well as JSONSchema.of(). All of these are referenced in the docs so presumably they are meant for public use. (Though I would have guessed JSONSchema.of is not meant for public use since it's in the impl package. 🤷)


/**
* Get a configured schema writer.
*
* @return optional containing configured schema writer or empty optional if none is configure
*/
Optional<SchemaWriter<T>> getSchemaWriterOpt();
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,24 @@ public interface SchemaDefinitionBuilder<T> {
*/
SchemaDefinitionBuilder<T> withSupportSchemaVersioning(boolean supportSchemaVersioning);

/**
* Set schema reader for deserialization of object data.
*
* @param reader reader for object data
*
* @return schema definition builder
*/
SchemaDefinitionBuilder<T> withSchemaReader(SchemaReader<T> reader);

/**
* Set schema writer for serialization of objects.
*
* @param writer writer for objects
*
* @return schema definition builder
*/
SchemaDefinitionBuilder<T> withSchemaWriter(SchemaWriter<T> writer);

/**
* Build the schema definition.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.api.schema.SchemaReader;
import org.apache.pulsar.client.impl.schema.reader.JsonReader;
import org.apache.pulsar.client.impl.schema.writer.JsonWriter;
import org.apache.pulsar.client.api.schema.SchemaWriter;
import org.apache.pulsar.client.impl.schema.reader.JacksonJsonReader;
import org.apache.pulsar.client.impl.schema.writer.JacksonJsonWriter;
import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
Expand All @@ -51,11 +52,11 @@ public class JSONSchema<T> extends StructSchema<T> {

private final Class<T> pojo;

private JSONSchema(SchemaInfo schemaInfo, Class<T> pojo) {
private JSONSchema(SchemaInfo schemaInfo, Class<T> pojo, SchemaReader<T> reader, SchemaWriter<T> writer) {
super(schemaInfo);
this.pojo = pojo;
setWriter(new JsonWriter<>(JSON_MAPPER.get()));
setReader(new JsonReader<>(JSON_MAPPER.get(), pojo));
setWriter(writer);
setReader(reader);
}

@Override
Expand Down Expand Up @@ -88,7 +89,11 @@ public SchemaInfo getBackwardsCompatibleJsonSchemaInfo() {
}

public static <T> JSONSchema<T> of(SchemaDefinition<T> schemaDefinition) {
return new JSONSchema<>(parseSchemaInfo(schemaDefinition, SchemaType.JSON), schemaDefinition.getPojo());
SchemaReader<T> reader = schemaDefinition.getSchemaReaderOpt()
.orElseGet(() -> new JacksonJsonReader<>(JSON_MAPPER.get(), schemaDefinition.getPojo()));
SchemaWriter<T> writer = schemaDefinition.getSchemaWriterOpt()
.orElseGet(() -> new JacksonJsonWriter<>(JSON_MAPPER.get()));
return new JSONSchema<>(parseSchemaInfo(schemaDefinition, SchemaType.JSON), schemaDefinition.getPojo(), reader, writer);
}

public static <T> JSONSchema<T> of(Class<T> pojo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.api.schema.SchemaDefinitionBuilder;
import org.apache.pulsar.client.api.schema.SchemaReader;
import org.apache.pulsar.client.api.schema.SchemaWriter;

import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -72,6 +74,10 @@ public class SchemaDefinitionBuilderImpl<T> implements SchemaDefinitionBuilder<T
*/
private boolean supportSchemaVersioning = false;

private SchemaReader<T> reader;

private SchemaWriter<T> writer;

@Override
public SchemaDefinitionBuilder<T> withAlwaysAllowNull(boolean alwaysAllowNull) {
this.alwaysAllowNull = alwaysAllowNull;
Expand Down Expand Up @@ -114,6 +120,18 @@ public SchemaDefinitionBuilder<T> withProperties(Map<String,String> properties)
return this;
}

@Override
public SchemaDefinitionBuilder<T> withSchemaReader(SchemaReader<T> reader) {
this.reader=reader;
return this;
}

@Override
public SchemaDefinitionBuilder<T> withSchemaWriter(SchemaWriter<T> writer) {
this.writer = writer;
return this;
}

@Override
public SchemaDefinition<T> build() {
checkArgument(StringUtils.isNotBlank(jsonDef) || clazz != null,
Expand All @@ -125,7 +143,7 @@ public SchemaDefinition<T> build() {
properties.put(ALWAYS_ALLOW_NULL, String.valueOf(this.alwaysAllowNull));
properties.put(JSR310_CONVERSION_ENABLED, String.valueOf(this.jsr310ConversionEnabled));
return new SchemaDefinitionImpl(clazz, jsonDef, alwaysAllowNull, properties, supportSchemaVersioning,
jsr310ConversionEnabled);
jsr310ConversionEnabled, reader, writer);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,23 @@


import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.api.schema.SchemaReader;
import org.apache.pulsar.client.api.schema.SchemaWriter;

import java.util.Collections;
import java.util.Map;
import java.util.Optional;

/**
* A json schema definition
* {@link org.apache.pulsar.client.api.schema.SchemaDefinition} for the json schema definition.
*/
public class SchemaDefinitionImpl<T> implements SchemaDefinition<T>{
public class SchemaDefinitionImpl<T> implements SchemaDefinition<T> {

/**
* the schema definition class
*/
private Class<T> pojo;
private Class<T> pojo;
/**
* The flag of schema type always allow null
*
Expand All @@ -52,15 +55,22 @@ public class SchemaDefinitionImpl<T> implements SchemaDefinition<T>{

private final boolean jsr310ConversionEnabled;

public SchemaDefinitionImpl(Class<T> pojo, String jsonDef, boolean alwaysAllowNull, Map<String,String> properties,
boolean supportSchemaVersioning, boolean jsr310ConversionEnabled) {
private final SchemaReader<T> reader;

private final SchemaWriter<T> writer;

public SchemaDefinitionImpl(Class<T> pojo, String jsonDef, boolean alwaysAllowNull, Map<String, String> properties,
boolean supportSchemaVersioning, boolean jsr310ConversionEnabled, SchemaReader<T> reader, SchemaWriter<T> writer) {
this.alwaysAllowNull = alwaysAllowNull;
this.properties = properties;
this.jsonDef = jsonDef;
this.pojo = pojo;
this.supportSchemaVersioning = supportSchemaVersioning;
this.jsr310ConversionEnabled = jsr310ConversionEnabled;
this.reader = reader;
this.writer = writer;
}

/**
* get schema whether always allow null or not
*
Expand All @@ -83,6 +93,7 @@ public boolean isJsr310ConversionEnabled() {
public String getJsonDef() {
return jsonDef;
}

/**
* Get pojo schema definition
*
Expand All @@ -98,6 +109,16 @@ public boolean getSupportSchemaVersioning() {
return supportSchemaVersioning;
}

@Override
public Optional<SchemaReader<T>> getSchemaReaderOpt() {
return Optional.ofNullable(reader);
}

@Override
public Optional<SchemaWriter<T>> getSchemaWriterOpt() {
return Optional.ofNullable(writer);
}

/**
* Get schema class
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.schema.reader;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.SchemaReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;

/**
* Reader implementation for reading objects from JSON.
*
* @param <T> object type to read
*/
public class JacksonJsonReader<T> implements SchemaReader<T> {
Persi marked this conversation as resolved.
Show resolved Hide resolved
private final Class<T> pojo;
private final ObjectMapper objectMapper;

public JacksonJsonReader(ObjectMapper objectMapper, Class<T> pojo) {
this.pojo = pojo;
this.objectMapper = objectMapper;
}

@Override
public T read(byte[] bytes, int offset, int length) {
try {
return objectMapper.readValue(bytes, offset, length, this.pojo);
} catch (IOException e) {
throw new SchemaSerializationException(e);
}
}

@Override
public T read(InputStream inputStream) {
try {
return objectMapper.readValue(inputStream, pojo);
} catch (IOException e) {
throw new SchemaSerializationException(e);
} finally {
try {
inputStream.close();
} catch (IOException e) {
log.error("JsonReader close inputStream close error", e);
}
}
}

private static final Logger log = LoggerFactory.getLogger(JacksonJsonReader.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@
import java.io.IOException;
import java.io.InputStream;

/**
* Reader implementation for reading objects from JSON.
*
* @param <T> object type to read
* @deprecated use {@link JacksonJsonReader} instead.
*/
@Deprecated
public class JsonReader<T> implements SchemaReader<T> {
private final Class<T> pojo;
private final ObjectMapper objectMapper;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.schema.writer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.SchemaWriter;

/**
* Writer implementation for writing objects as JSON.
*
* @param <T>
*/
public class JacksonJsonWriter<T> implements SchemaWriter<T> {

private final ObjectMapper objectMapper;

public JacksonJsonWriter(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}

@Override
public byte[] write(T message) {
try {
return objectMapper.writeValueAsBytes(message);
} catch (JsonProcessingException e) {
throw new SchemaSerializationException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.SchemaWriter;

/**
* Writer implementation for writing objects as JSON.
*
* @param <T>
* @deprecated use {@link JacksonJsonWriter} instead.
*/
@Deprecated
public class JsonWriter<T> implements SchemaWriter<T> {

private final ObjectMapper objectMapper;
Expand Down