Skip to content

Commit

Permalink
[Pulsar IO] ElasticSearch Sink: support Fixed and ENUM datatypes
Browse files Browse the repository at this point in the history
(cherry picked from commit dc7af79)
  • Loading branch information
eolivelli committed Oct 1, 2021
1 parent 4f7c838 commit 8d0a967
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericEnumSymbol;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;

import java.math.BigDecimal;
Expand Down Expand Up @@ -57,6 +59,8 @@ public static JsonNode toJson(Schema schema, Object value) {
}
try {
switch (schema.getType()) {
case NULL: // this should not happen
return jsonNodeFactory.nullNode();
case INT:
return jsonNodeFactory.numberNode((Integer) value);
case LONG:
Expand All @@ -69,8 +73,8 @@ public static JsonNode toJson(Schema schema, Object value) {
return jsonNodeFactory.booleanNode((Boolean) value);
case BYTES:
return jsonNodeFactory.binaryNode((byte[]) value);
case STRING:
return jsonNodeFactory.textNode(value.toString()); // can be a String or org.apache.avro.util.Utf8
case FIXED:
return jsonNodeFactory.binaryNode(((GenericFixed) value).bytes());
case ARRAY: {
Schema elementSchema = schema.getElementType();
ArrayNode arrayNode = jsonNodeFactory.arrayNode();
Expand All @@ -97,7 +101,12 @@ public static JsonNode toJson(Schema schema, Object value) {
continue;
return toJson(s, value);
}
default:
// this case should not happen
return jsonNodeFactory.textNode(value.toString());
case ENUM: // GenericEnumSymbol
case STRING: // can be a String or org.apache.avro.util.Utf8
return jsonNodeFactory.textNode(value.toString());
default: // do not fail the write
throw new UnsupportedOperationException("Unknown AVRO schema type=" + schema.getType());
}
} catch (ClassCastException error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public void testAvroToJson() throws IOException {
.name("d").type().doubleType().doubleDefault(10.0)
.name("f").type().floatType().floatDefault(10.0f)
.name("s").type().stringType().stringDefault("titi")
.name("fi").type().fixed("fi").size(3).fixedDefault(new byte[]{1,2,3})
.name("en").type().enumeration("en").symbols("a","b","c").enumDefault("b")
.name("array").type().optional().array().items(SchemaBuilder.builder().stringType())
.name("map").type().optional().map().values(SchemaBuilder.builder().intType())
.endRecord();
Expand All @@ -62,6 +64,8 @@ public void testAvroToJson() throws IOException {
genericRecord.put("d", 10.0);
genericRecord.put("f", 10.0f);
genericRecord.put("s", "toto");
genericRecord.put("fi", GenericData.get().createFixed(null, new byte[]{'a','b','c'}, schema.getField("fi").schema()));
genericRecord.put("en", GenericData.get().createEnum("b", schema.getField("en").schema()));
genericRecord.put("array", new String[] {"toto"});
genericRecord.put("map", ImmutableMap.of("a",10));
JsonNode jsonNode = JsonConverter.toJson(genericRecord);
Expand All @@ -70,6 +74,8 @@ public void testAvroToJson() throws IOException {
assertEquals(jsonNode.get("i").asInt(), 1);
assertEquals(jsonNode.get("b").asBoolean(), true);
assertEquals(jsonNode.get("bb").binaryValue(), "10".getBytes(StandardCharsets.UTF_8));
assertEquals(jsonNode.get("fi").binaryValue(), "abc".getBytes(StandardCharsets.UTF_8));
assertEquals(jsonNode.get("en").textValue(), "b");
assertEquals(jsonNode.get("d").asDouble(), 10.0);
assertEquals(jsonNode.get("f").numberValue(), 10.0f);
assertEquals(jsonNode.get("s").asText(), "toto");
Expand Down

0 comments on commit 8d0a967

Please sign in to comment.