Skip to content

Commit

Permalink
ElasticSearch Sink: handle Cassandra cql_varint and cql_decimal logical types
Browse files Browse the repository at this point in the history

- add support for dealing with custom logical types 'cql_varint' and 'cql_decimal'
- add test case for 'cql_duration'

(cherry picked from commit e162dec)
  • Loading branch information
eolivelli committed Sep 28, 2021
1 parent c1c297f commit 4f7c838
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,19 @@
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.avro.Conversion;
import org.apache.avro.Conversions;
import org.apache.avro.Schema;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.generic.GenericRecord;

import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.math.BigInteger;
import java.util.*;

/**
* Convert an AVRO GenericRecord to a JsonNode.
*/
public class JsonConverter {

private static Map<String, LogicalTypeConverter<?>> logicalTypeConverters = new HashMap<>();
private static Map<String, LogicalTypeConverter> logicalTypeConverters = new HashMap<>();
private static final JsonNodeFactory jsonNodeFactory = JsonNodeFactory.withExactBigDecimals(true);

public static JsonNode toJson(GenericRecord genericRecord) {
Expand All @@ -54,135 +49,142 @@ public static JsonNode toJson(GenericRecord genericRecord) {
}

public static JsonNode toJson(Schema schema, Object value) {
if (schema.getLogicalType() != null && logicalTypeConverters.containsKey(schema.getLogicalType().getName())) {
return logicalTypeConverters.get(schema.getLogicalType().getName()).toJson(schema, value);
}
if (value == null) {
return jsonNodeFactory.nullNode();
}
switch(schema.getType()) {
case INT:
return jsonNodeFactory.numberNode((Integer) value);
case LONG:
return jsonNodeFactory.numberNode((Long) value);
case DOUBLE:
return jsonNodeFactory.numberNode((Double) value);
case FLOAT:
return jsonNodeFactory.numberNode((Float) value);
case BOOLEAN:
return jsonNodeFactory.booleanNode((Boolean) value);
case BYTES:
return jsonNodeFactory.binaryNode((byte[]) value);
case STRING:
return jsonNodeFactory.textNode(value.toString()); // can be a String or org.apache.avro.util.Utf8
case ARRAY: {
Schema elementSchema = schema.getElementType();
ArrayNode arrayNode = jsonNodeFactory.arrayNode();
for(Object elem : (Object[]) value) {
JsonNode fieldValue = toJson(elementSchema, elem);
arrayNode.add(fieldValue);
if (schema.getLogicalType() != null && logicalTypeConverters.containsKey(schema.getLogicalType().getName())) {
return logicalTypeConverters.get(schema.getLogicalType().getName()).toJson(schema, value);
}
try {
switch (schema.getType()) {
case INT:
return jsonNodeFactory.numberNode((Integer) value);
case LONG:
return jsonNodeFactory.numberNode((Long) value);
case DOUBLE:
return jsonNodeFactory.numberNode((Double) value);
case FLOAT:
return jsonNodeFactory.numberNode((Float) value);
case BOOLEAN:
return jsonNodeFactory.booleanNode((Boolean) value);
case BYTES:
return jsonNodeFactory.binaryNode((byte[]) value);
case STRING:
return jsonNodeFactory.textNode(value.toString()); // can be a String or org.apache.avro.util.Utf8
case ARRAY: {
Schema elementSchema = schema.getElementType();
ArrayNode arrayNode = jsonNodeFactory.arrayNode();
for (Object elem : (Object[]) value) {
JsonNode fieldValue = toJson(elementSchema, elem);
arrayNode.add(fieldValue);
}
return arrayNode;
}
return arrayNode;
}
case MAP: {
Map<String, Object> map = (Map<String, Object>) value;
ObjectNode objectNode = jsonNodeFactory.objectNode();
for(Map.Entry<String, Object> entry : map.entrySet()) {
JsonNode jsonNode = toJson(schema.getValueType(), entry.getValue());
objectNode.set(entry.getKey(), jsonNode);
case MAP: {
Map<String, Object> map = (Map<String, Object>) value;
ObjectNode objectNode = jsonNodeFactory.objectNode();
for (Map.Entry<String, Object> entry : map.entrySet()) {
JsonNode jsonNode = toJson(schema.getValueType(), entry.getValue());
objectNode.set(entry.getKey(), jsonNode);
}
return objectNode;
}
return objectNode;
case RECORD:
return toJson((GenericRecord) value);
case UNION:
for (Schema s : schema.getTypes()) {
if (s.getType() == Schema.Type.NULL)
continue;
return toJson(s, value);
}
default:
throw new UnsupportedOperationException("Unknown AVRO schema type=" + schema.getType());
}
case RECORD:
return toJson((GenericRecord) value);
case UNION:
for(Schema s: schema.getTypes()) {
if (s.getType() == Schema.Type.NULL)
continue;
return toJson(s, value);
}
default:
throw new UnsupportedOperationException("Unknown AVRO schema type="+schema.getType());
} catch (ClassCastException error) {
throw new IllegalArgumentException("Error while converting a value of type " + value.getClass() + " to a " + schema.getType()
+ ": " + error, error);
}
}

abstract static class LogicalTypeConverter<T> {
final Conversion<T> conversion;

public LogicalTypeConverter(Conversion<T> conversion) {
this.conversion = conversion;
}
/**
 * Strategy for turning a value whose AVRO schema carries a logical type into a JsonNode.
 * Instances are registered in {@code logicalTypeConverters} under the logical type name
 * and looked up through {@code schema.getLogicalType().getName()}.
 */
abstract static class LogicalTypeConverter {

/**
 * Convert one value to JSON.
 *
 * @param schema the AVRO schema of the value (the one carrying the logical type)
 * @param value  the raw value read from the record
 * @return the JSON representation of {@code value}
 */
abstract JsonNode toJson(Schema schema, Object value);
}

/**
 * Verify that {@code value} is a non-null instance of {@code expected}.
 *
 * @param value    the value to validate
 * @param name     label of the logical type (or field) being converted, used in the error message
 * @param expected the class {@code value} must be an instance of
 * @throws IllegalArgumentException if {@code value} is null or is not an instance of {@code expected}
 */
private static void checkType(Object value, String name, Class<?> expected) {
    // getTypeName() equals getName() for ordinary classes but renders array types
    // readably ("byte[]" instead of the JVM descriptor "[B").
    final String expectedName = expected.getTypeName();
    if (value == null) {
        throw new IllegalArgumentException("Invalid type for " + name + ", expected " + expectedName + " but was NULL");
    }
    if (!expected.isInstance(value)) {
        throw new IllegalArgumentException("Invalid type for " + name + ", expected " + expectedName + " but was " + value.getClass());
    }
}

static {
logicalTypeConverters.put("decimal", new JsonConverter.LogicalTypeConverter<BigDecimal>(new Conversions.DecimalConversion()) {
logicalTypeConverters.put("cql_varint", new JsonConverter.LogicalTypeConverter() {
@Override
JsonNode toJson(Schema schema, Object value) {
if (!(value instanceof BigDecimal)) {
throw new IllegalArgumentException("Invalid type for Decimal, expected BigDecimal but was " + value.getClass());
}
BigDecimal decimal = (BigDecimal)value;
return jsonNodeFactory.numberNode(decimal);
checkType(value, "cql_varint", byte[].class);
return jsonNodeFactory.numberNode(new BigInteger((byte[]) value));
}
});
logicalTypeConverters.put("date", new JsonConverter.LogicalTypeConverter<LocalDate>(new TimeConversions.DateConversion()) {
logicalTypeConverters.put("cql_decimal", new JsonConverter.LogicalTypeConverter() {
@Override
JsonNode toJson(Schema schema, Object value) {
if (!(value instanceof Integer)) {
throw new IllegalArgumentException("Invalid type for date, expected Integer but was " + value.getClass());
}
checkType(value, "cql_decimal", GenericRecord.class);
GenericRecord record = (GenericRecord) value;
Object bigint = record.get("bigint");
checkType(bigint, "cql_decimal - bigint", byte[].class);
Object scale = record.get("scale");
checkType(scale, "cql_decimal - scale", Integer.class);
BigInteger asBigint = new BigInteger((byte[]) record.get("bigint"));
return jsonNodeFactory.numberNode(new BigDecimal(asBigint, (Integer) scale));
}
});
logicalTypeConverters.put("date", new JsonConverter.LogicalTypeConverter() {
@Override
JsonNode toJson(Schema schema, Object value) {
checkType(value, "date", Integer.class);
Integer daysFromEpoch = (Integer)value;
return jsonNodeFactory.numberNode(daysFromEpoch);
}
});
logicalTypeConverters.put("time-millis", new JsonConverter.LogicalTypeConverter<LocalTime>(new TimeConversions.TimeMillisConversion()) {
logicalTypeConverters.put("time-millis", new JsonConverter.LogicalTypeConverter() {
@Override
JsonNode toJson(Schema schema, Object value) {
if (!(value instanceof Integer)) {
throw new IllegalArgumentException("Invalid type for time-millis, expected Integer but was " + value.getClass());
}
checkType(value, "time-millis", Integer.class);
Integer timeMillis = (Integer)value;
return jsonNodeFactory.numberNode(timeMillis);
}
});
logicalTypeConverters.put("time-micros", new JsonConverter.LogicalTypeConverter<LocalTime>(new TimeConversions.TimeMicrosConversion()) {
logicalTypeConverters.put("time-micros", new JsonConverter.LogicalTypeConverter() {
@Override
JsonNode toJson(Schema schema, Object value) {
if (!(value instanceof Long)) {
throw new IllegalArgumentException("Invalid type for time-micros, expected Long but was " + value.getClass());
}
checkType(value, "time-micros", Long.class);
Long timeMicro = (Long)value;
return jsonNodeFactory.numberNode(timeMicro);
}
});
logicalTypeConverters.put("timestamp-millis", new JsonConverter.LogicalTypeConverter<Instant>(new TimeConversions.TimestampMillisConversion()) {
logicalTypeConverters.put("timestamp-millis", new JsonConverter.LogicalTypeConverter() {
@Override
JsonNode toJson(Schema schema, Object value) {
if (!(value instanceof Long)) {
throw new IllegalArgumentException("Invalid type for timestamp-millis, expected Long but was " + value.getClass());
}
checkType(value, "timestamp-millis", Long.class);
Long epochMillis = (Long)value;
return jsonNodeFactory.numberNode(epochMillis);
}
});
logicalTypeConverters.put("timestamp-micros", new JsonConverter.LogicalTypeConverter<Instant>(new TimeConversions.TimestampMicrosConversion()) {
logicalTypeConverters.put("timestamp-micros", new JsonConverter.LogicalTypeConverter() {
@Override
JsonNode toJson(Schema schema, Object value) {
if (!(value instanceof Long)) {
throw new IllegalArgumentException("Invalid type for timestamp-micros, expected Long but was " + value.getClass());
}
checkType(value, "timestamp-micros", Long.class);
Long epochMillis = (Long)value;
return jsonNodeFactory.numberNode(epochMillis);
}
});
logicalTypeConverters.put("uuid", new JsonConverter.LogicalTypeConverter<UUID>(new Conversions.UUIDConversion()) {
logicalTypeConverters.put("uuid", new JsonConverter.LogicalTypeConverter() {
@Override
JsonNode toJson(Schema schema, Object value) {
if (!(value instanceof String)) {
throw new IllegalArgumentException("Invalid type for uuid, expected String but was " + value.getClass());
}
checkType(value, "uuid", String.class);
String uuidString = (String)value;
return jsonNodeFactory.textNode(uuidString);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.NullNode;
import com.google.common.collect.ImmutableMap;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
Expand All @@ -29,7 +30,7 @@
import org.testng.annotations.Test;

import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.util.*;

Expand Down Expand Up @@ -81,7 +82,24 @@ public void testAvroToJson() throws IOException {

@Test
public void testLogicalTypesToJson() {
Schema decimalType = LogicalTypes.decimal(3,3).addToSchema(Schema.create(Schema.Type.BYTES));
org.apache.avro.Schema decimalType = new LogicalType("cql_decimal").addToSchema(
org.apache.avro.SchemaBuilder.record("record")
.fields()
.name("bigint").type().bytesType().noDefault()
.name("scale").type().intType().noDefault()
.endRecord()
);
org.apache.avro.Schema varintType = new LogicalType("cql_varint").addToSchema(
org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BYTES)
);
org.apache.avro.Schema durationType = new LogicalType("cql_duration").addToSchema(
org.apache.avro.SchemaBuilder.record("record")
.fields()
.name("months").type().intType().noDefault()
.name("days").type().intType().noDefault()
.name("nanoseconds").type().longType().noDefault()
.endRecord()
);
Schema dateType = LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
Schema timestampMillisType = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
Schema timestampMicrosType = LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
Expand All @@ -97,12 +115,24 @@ public void testLogicalTypesToJson() {
.name("timemillis").type(timeMillisType).noDefault()
.name("timemicros").type(timeMicrosType).noDefault()
.name("myuuid").type(uuidType).noDefault()
.name("myvarint").type(varintType).noDefault()
.name("myduration").type(durationType).noDefault()
.endRecord();

final long MILLIS_PER_DAY = 24 * 60 * 60 * 1000;
BigDecimal myDecimal = new BigDecimal("10.34");

UUID myUuid = UUID.randomUUID();
Calendar calendar = new GregorianCalendar(TimeZone.getTimeZone("Europe/Copenhagen"));

GenericRecord myDecimal = new GenericData.Record(decimalType);
myDecimal.put("bigint", BigInteger.valueOf(123).toByteArray());
myDecimal.put("scale", 2);

GenericRecord myduration = new GenericData.Record(durationType);
myduration.put("months", 5);
myduration.put("days", 2);
myduration.put("nanoseconds", 1000 * 1000 * 1000 * 30L); // 30 seconds

GenericRecord genericRecord = new GenericData.Record(schema);
genericRecord.put("amount", myDecimal);
genericRecord.put("mydate", (int)calendar.toInstant().getEpochSecond());
Expand All @@ -111,13 +141,20 @@ public void testLogicalTypesToJson() {
genericRecord.put("timemillis", (int)(calendar.getTimeInMillis() % MILLIS_PER_DAY));
genericRecord.put("timemicros", (calendar.getTimeInMillis() %MILLIS_PER_DAY) * 1000);
genericRecord.put("myuuid", myUuid.toString());
genericRecord.put("myvarint", BigInteger.valueOf(12234).toByteArray());
genericRecord.put("myduration", myduration);

JsonNode jsonNode = JsonConverter.toJson(genericRecord);
assertEquals(new BigDecimal(jsonNode.get("amount").asText()), myDecimal);
assertEquals(jsonNode.get("amount").asText(), "1.23");
assertEquals(jsonNode.get("myvarint").asLong(), 12234);
assertEquals(jsonNode.get("mydate").asInt(), calendar.toInstant().getEpochSecond());
assertEquals(jsonNode.get("tsmillis").asInt(), (int)calendar.getTimeInMillis());
assertEquals(jsonNode.get("tsmicros").asLong(), calendar.getTimeInMillis() * 1000);
assertEquals(jsonNode.get("timemillis").asInt(), (int)(calendar.getTimeInMillis() % MILLIS_PER_DAY));
assertEquals(jsonNode.get("timemicros").asLong(), (calendar.getTimeInMillis() %MILLIS_PER_DAY) * 1000);
assertEquals(jsonNode.get("myduration").get("months").asInt(), 5);
assertEquals(jsonNode.get("myduration").get("days").asInt(), 2);
assertEquals(jsonNode.get("myduration").get("nanoseconds").asLong(), 30000000000L);
assertEquals(UUID.fromString(jsonNode.get("myuuid").asText()), myUuid);
}
}

0 comments on commit 4f7c838

Please sign in to comment.