Skip to content

Commit

Permalink
Add JsonRecordBuilder implementation (apache#10052)
Browse files Browse the repository at this point in the history
Provide a JSON GenericRecord builder allowing to produce JsonGenericRecord from the GenericJsonSchema.

Add a new class org.apache.pulsar.client.impl.schema.generic.JsonRecordBuilderImpl
Modify the org.apache.pulsar.client.impl.schema.generic.GenericJsonSchema.newRecordBuilder()

Add a unit test in org.apache.pulsar.client.impl.schema.JSONSchemaTest

(cherry picked from commit 08cbdc7)
  • Loading branch information
Vincent Royer authored and eolivelli committed May 12, 2021
1 parent ea5b676 commit fe980e7
Show file tree
Hide file tree
Showing 4 changed files with 247 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,13 @@ Field build() {
case TIMESTAMP:
baseSchema = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
break;
case JSON:
checkArgument(genericSchema.getSchemaInfo().getType() == SchemaType.JSON,
"The field is expected to be using JSON schema but "
+ genericSchema.getSchemaInfo().getType() + " schema is found");
AvroBaseStructSchema genericJsonSchema = (AvroBaseStructSchema) genericSchema;
baseSchema = genericJsonSchema.getAvroSchema();
break;
case AVRO:
checkArgument(genericSchema.getSchemaInfo().getType() == SchemaType.AVRO,
"The field is expected to be using AVRO schema but "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.pulsar.client.impl.schema.generic;

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
import org.apache.pulsar.common.schema.SchemaInfo;

Expand All @@ -41,6 +43,18 @@ public GenericJsonSchema(SchemaInfo schemaInfo) {

@Override
public GenericRecordBuilder newRecordBuilder() {
throw new UnsupportedOperationException("Json Schema doesn't support record builder yet");
return new JsonRecordBuilderImpl(this);
}

public boolean supportSchemaVersioning() {
return true;
}

public Schema<GenericRecord> clone() {
Schema<GenericRecord> schema = of(this.schemaInfo, ((AbstractMultiVersionGenericReader)this.reader).useProvidedSchemaAsReaderSchema);
if (this.schemaInfoProvider != null) {
schema.setSchemaInfoProvider(this.schemaInfoProvider);
}
return schema;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.schema.generic;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericRecordBuilder;

import java.util.HashMap;
import java.util.Map;

public class JsonRecordBuilderImpl implements GenericRecordBuilder {

private static ObjectMapper objectMapper = new ObjectMapper();

private final GenericSchemaImpl genericSchema;
private Map<String, Object> map = new HashMap<>();

public JsonRecordBuilderImpl(GenericSchemaImpl genericSchema) {
this.genericSchema = genericSchema;
}

/**
* Sets the value of a field.
*
* @param fieldName the name of the field to set.
* @param value the value to set.
* @return a reference to the RecordBuilder.
*/
@Override
public GenericRecordBuilder set(String fieldName, Object value) {
if (value instanceof GenericRecord) {
if (!(value instanceof GenericJsonRecord))
throw new IllegalArgumentException("JSON Record Builder doesn't support non-JSON record as a field");
GenericJsonRecord genericJsonRecord = (GenericJsonRecord) value;
value = genericJsonRecord.getJsonNode();
}

map.put(fieldName, value);
return this;
}

/**
* Sets the value of a field.
*
* @param field the field to set.
* @param value the value to set.
* @return a reference to the RecordBuilder.
*/
@Override
public GenericRecordBuilder set(Field field, Object value) {
set(field.getName(), value);
return this;
}

/**
* Clears the value of the given field.
*
* @param fieldName the name of the field to clear.
* @return a reference to the RecordBuilder.
*/
@Override
public GenericRecordBuilder clear(String fieldName) {
map.remove(fieldName);
return this;
}

/**
* Clears the value of the given field.
*
* @param field the field to clear.
* @return a reference to the RecordBuilder.
*/
@Override
public GenericRecordBuilder clear(Field field) {
clear(field.getName());
return this;
}

@Override
public GenericRecord build() {
JsonNode jn = objectMapper.valueToTree(map);
return new GenericJsonRecord(
null,
genericSchema.getFields(),
jn,
null
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,30 @@

import java.util.Collections;
import java.util.List;
import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.FOO_FIELDS;
import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.SCHEMA_JSON_ALLOW_NULL;
import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.SCHEMA_JSON_NOT_ALLOW_NULL;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.AssertJUnit.assertSame;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
import org.apache.pulsar.client.api.schema.SchemaBuilder;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Bar;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.DerivedFoo;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.NestedBar;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.NestedBarList;
import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.skyscreamer.jsonassert.JSONAssert;
import org.testng.Assert;
Expand Down Expand Up @@ -332,4 +344,109 @@ public void testDecodeByteBuf() {
assertEquals(jsonSchema.decode(byteBuf), foo1);

}

@Data
@AllArgsConstructor
@NoArgsConstructor
private static class Seller {
public String state;
public String street;
public long zipCode;
}

@Data
@AllArgsConstructor
@NoArgsConstructor
private static class PC {
public String brand;
public String model;
public int year;
public GPU gpu;
public Seller seller;
}

private enum GPU {
AMD, NVIDIA
}

@Test
public void testEncodeAndDecodeObject() throws JsonProcessingException {
JSONSchema<PC> jsonSchema = JSONSchema.of(SchemaDefinition.<PC>builder().withPojo(PC.class).build());
PC pc = new PC("dell", "alienware", 2021, GPU.AMD,
new Seller("WA", "street", 98004));
byte[] encoded = jsonSchema.encode(pc);
PC roundtrippedPc = jsonSchema.decode(encoded);
assertEquals(roundtrippedPc, pc);
}

@Test
public void testGetNativeSchema() throws SchemaValidationException {
JSONSchema<PC> schema2 = JSONSchema.of(PC.class);
org.apache.avro.Schema avroSchema2 = (Schema) schema2.getNativeSchema().get();
assertSame(schema2.schema, avroSchema2);
}

@Test
public void testJsonGenericRecordBuilder() {
JSONSchema<Seller> sellerJsonSchema = JSONSchema.of(Seller.class);

RecordSchemaBuilder sellerSchemaBuilder = SchemaBuilder.record("seller");
sellerSchemaBuilder.field("state").type(SchemaType.STRING);
sellerSchemaBuilder.field("street").type(SchemaType.STRING);
sellerSchemaBuilder.field("zipCode").type(SchemaType.INT64);
SchemaInfo sellerSchemaInfo = sellerSchemaBuilder.build(SchemaType.JSON);
GenericSchemaImpl sellerGenericSchema = GenericSchemaImpl.of(sellerSchemaInfo);

JSONSchema<PC> pcJsonSchema = JSONSchema.of(PC.class);

RecordSchemaBuilder pcSchemaBuilder = SchemaBuilder.record("pc");
pcSchemaBuilder.field("brand").type(SchemaType.STRING);
pcSchemaBuilder.field("model").type(SchemaType.STRING);
pcSchemaBuilder.field("gpu").type(SchemaType.STRING);
pcSchemaBuilder.field("year").type(SchemaType.INT64);
pcSchemaBuilder.field("seller", sellerGenericSchema).type(SchemaType.JSON).optional();
SchemaInfo pcGenericSchemaInfo = pcSchemaBuilder.build(SchemaType.JSON);
GenericSchemaImpl pcGenericSchema = GenericSchemaImpl.of(pcGenericSchemaInfo);

Seller seller = new Seller("USA","oakstreet",9999);
PC pc = new PC("dell","g3",2020, GPU.AMD, seller);

byte[] bytes = pcJsonSchema.encode(pc);
Assert.assertTrue(bytes.length > 0);

Object pc2 = pcJsonSchema.decode(bytes);
assertEquals(pc, pc2);

GenericRecord sellerRecord = sellerGenericSchema.newRecordBuilder()
.set("state", "USA")
.set("street", "oakstreet")
.set("zipCode", 9999)
.build();

GenericRecord pcRecord = pcGenericSchema.newRecordBuilder()
.set("brand", "dell")
.set("model","g3")
.set("year", 2020)
.set("gpu", GPU.AMD)
.set("seller", sellerRecord)
.build();

byte[] bytes3 = pcGenericSchema.encode(pcRecord);
Assert.assertTrue(bytes3.length > 0);
GenericRecord pc3Record = pcGenericSchema.decode(bytes3);

for(Field field : pc3Record.getFields()) {
assertTrue(pcGenericSchema.getFields().contains(field));
}
assertEquals("dell", pc3Record.getField("brand"));
assertEquals("g3", pc3Record.getField("model"));
assertEquals(2020, pc3Record.getField("year"));
assertEquals(GPU.AMD.toString(), pc3Record.getField("gpu"));


GenericRecord seller3Record = (GenericRecord) pc3Record.getField("seller");
assertEquals("USA", seller3Record.getField("state"));
assertEquals("oakstreet", seller3Record.getField("street"));
assertEquals(9999, seller3Record.getField("zipCode"));
}
}

0 comments on commit fe980e7

Please sign in to comment.