From d5aeb665c9d6c11219809a21801fb575b6ee33ed Mon Sep 17 00:00:00 2001 From: Vincent Royer Date: Thu, 29 Apr 2021 00:03:47 +0200 Subject: [PATCH] Add AvroSchema UUID support --- .../pulsar/client/impl/schema/AvroSchema.java | 1 + .../client/impl/schema/util/SchemaUtil.java | 8 ++++++-- .../pulsar/client/impl/schema/AvroSchemaTest.java | 15 +++++++++++++++ 3 files changed, 22 insertions(+), 2 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java index d5a99f82ff384..e405497b83cd5 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java @@ -122,6 +122,7 @@ public static void addLogicalTypeConversions(ReflectData reflectData, boolean js // Skip if have not provide joda-time dependency. } } + reflectData.addLogicalTypeConversion(new Conversions.UUIDConversion()); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/util/SchemaUtil.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/util/SchemaUtil.java index 7e1e1c052f5eb..b73fb03f346c1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/util/SchemaUtil.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/util/SchemaUtil.java @@ -18,10 +18,12 @@ */ package org.apache.pulsar.client.impl.schema.util; +import org.apache.avro.Conversions; import org.apache.avro.Schema; import org.apache.avro.reflect.ReflectData; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.schema.SchemaDefinition; +import org.apache.pulsar.client.impl.schema.AvroSchema; import org.apache.pulsar.client.impl.schema.SchemaDefinitionBuilderImpl; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; @@ -88,8 +90,10 @@ public static Schema extractAvroSchema(SchemaDefinition schemaDefinition, Class try { return parseAvroSchema(pojo.getDeclaredField("SCHEMA$").get(null).toString()); } catch (NoSuchFieldException | IllegalAccessException | IllegalArgumentException ignored) { - return schemaDefinition.getAlwaysAllowNull() ? ReflectData.AllowNull.get().getSchema(pojo) - : ReflectData.get().getSchema(pojo); + ReflectData reflectData = schemaDefinition.getAlwaysAllowNull() ? ReflectData.AllowNull.get() + : ReflectData.get(); + AvroSchema.addLogicalTypeConversions(reflectData, schemaDefinition.isJsr310ConversionEnabled()); + return reflectData.getSchema(pojo); } } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java index dcf3abe7adfcc..340c53c897175 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java @@ -36,6 +36,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import java.util.UUID; + import lombok.Data; import lombok.extern.slf4j.Slf4j; @@ -426,4 +428,17 @@ public void testAvroSchemaUserDefinedReadAndWriter() { assertEquals(field1, foo.getField1()); } + static class MyPojo { + public UUID uid; + } + + @Test + public void testAvroUUID() { + org.apache.pulsar.client.api.Schema schema = org.apache.pulsar.client.api.Schema.AVRO(MyPojo.class); + MyPojo pojo1 = new MyPojo(); + pojo1.uid = UUID.randomUUID(); + MyPojo pojo2 = schema.decode(schema.encode(pojo1)); + assertEquals(pojo1.uid, pojo2.uid); + } + }