From 2a851ad091ab6035b84a97259bebf9cab03886e8 Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Tue, 31 Jul 2018 10:22:41 -0700 Subject: [PATCH 1/2] Add convenience methods for pojo and javabean schema registration. --- .../beam/sdk/schemas/SchemaRegistry.java | 88 ++++++++++++++++++- .../beam/sdk/schemas/SchemaRegistryTest.java | 21 +++++ 2 files changed, 107 insertions(+), 2 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaRegistry.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaRegistry.java index 895d8abb787f4..af3d9cb3e7bc3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaRegistry.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaRegistry.java @@ -28,6 +28,7 @@ import java.util.ServiceLoader; import java.util.Set; import java.util.function.Function; +import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -67,8 +68,40 @@ private static class SchemaEntry { private final Map entries = Maps.newHashMap(); private final ArrayDeque providers; + class PerTypeRegisteredProvider implements SchemaProvider { + private Map providers = Maps.newHashMap(); + + void registerProvider(TypeDescriptor typeDescriptor, SchemaProvider schemaProvider) { + providers.put(typeDescriptor, schemaProvider); + } + + @Nullable + @Override + public Schema schemaFor(TypeDescriptor typeDescriptor) { + SchemaProvider schemaProvider = providers.get(typeDescriptor); + return (schemaProvider != null) ? schemaProvider.schemaFor(typeDescriptor) : null; + } + + @Nullable + @Override + public SerializableFunction toRowFunction(TypeDescriptor typeDescriptor) { + SchemaProvider schemaProvider = providers.get(typeDescriptor); + return (schemaProvider != null) ? schemaProvider.toRowFunction(typeDescriptor) : null; + } + + @Nullable + @Override + public SerializableFunction fromRowFunction(TypeDescriptor typeDescriptor) { + SchemaProvider schemaProvider = providers.get(typeDescriptor); + return (schemaProvider != null) ? schemaProvider.fromRowFunction(typeDescriptor) : null; + } + } + + private PerTypeRegisteredProvider perTypeRegisteredProviders = new PerTypeRegisteredProvider(); + private SchemaRegistry() { providers = new ArrayDeque<>(REGISTERED_SCHEMA_PROVIDERS); + providers.addFirst(perTypeRegisteredProviders); } public static SchemaRegistry createDefault() { @@ -104,6 +137,57 @@ public void registerSchemaProvider(SchemaProvider schemaProvider) { providers.addFirst(schemaProvider); } + /** Register a {@link SchemaProvider} to be used for a specific type. * */ + public void registerSchemaProvider(Class clazz, SchemaProvider schemaProvider) { + registerSchemaProvider(TypeDescriptor.of(clazz), schemaProvider); + } + + /** Register a {@link SchemaProvider} to be used for a specific type. * */ + public void registerSchemaProvider( + TypeDescriptor typeDescriptor, SchemaProvider schemaProvider) { + perTypeRegisteredProviders.registerProvider(typeDescriptor, schemaProvider); + } + + /** + * Register a POJO type for automatic schema inference. + * + *

Currently schema field names will match field names in the POJO, and all fields must be + * mutable (i.e. no final fields). + */ + public void registerPOJO(Class clazz) { + registerPOJO(TypeDescriptor.of(clazz)); + } + + /** + * Register a POJO type for automatic schema inference. + * + *

Currently schema field names will match field names in the POJO, and all fields must be + * mutable (i.e. no final fields). + */ + public void registerPOJO(TypeDescriptor typeDescriptor) { + registerSchemaProvider(typeDescriptor, new JavaFieldSchema()); + } + + /** + * Register a JavaBean type for automatic schema inference. + * + *

Currently schema field names will match getter names in the bean, and all getters must have + * matching setters. + */ + public void registerJavaBean(Class clazz) { + registerJavaBean(TypeDescriptor.of(clazz)); + } + + /** + * Register a JavaBean type for automatic schema inference. + * + *

Currently schema field names will match getter names in the bean, and all getters must have + * matching setters. + */ + public void registerJavaBean(TypeDescriptor typeDescriptor) { + registerSchemaProvider(typeDescriptor, new JavaBeanSchema()); + } + /** * Get a schema for a given {@link Class} type. If no schema exists, throws {@link * NoSuchSchemaException}. @@ -151,13 +235,13 @@ public SerializableFunction getToRowFunction(TypeDescriptor typeD return getProviderResult((SchemaProvider p) -> p.toRowFunction(typeDescriptor)); } - /** Rerieve the function that converts a {@link Row} object to an object of the specified type. */ + /** Retrieve the function that converts a {@link Row} object to the specified type. */ public SerializableFunction getFromRowFunction(Class clazz) throws NoSuchSchemaException { return getFromRowFunction(TypeDescriptor.of(clazz)); } - /** Rerieve the function that converts a {@link Row} object to an object of the specified type. */ + /** Retrieve the function that converts a {@link Row} object to the specified type. */ public SerializableFunction getFromRowFunction(TypeDescriptor typeDescriptor) throws NoSuchSchemaException { SchemaEntry entry = entries.get(typeDescriptor); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaRegistryTest.java index dbfe21a6e04c7..28b56fb89b08d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaRegistryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaRegistryTest.java @@ -18,11 +18,16 @@ package org.apache.beam.sdk.schemas; +import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.SIMPLE_BEAN_SCHEMA; +import static org.apache.beam.sdk.schemas.utils.TestPOJOs.SIMPLE_POJO_SCHEMA; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import com.google.auto.service.AutoService; import com.google.common.collect.ImmutableList; import java.util.List; +import org.apache.beam.sdk.schemas.utils.TestJavaBeans.SimpleBean; +import org.apache.beam.sdk.schemas.utils.TestPOJOs.SimplePOJO; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TypeDescriptor; @@ -202,4 +207,20 @@ public void testDefaultSchemaProvider() throws NoSuchSchemaException { SchemaRegistry registry = SchemaRegistry.createDefault(); assertEquals(EMPTY_SCHEMA, registry.getSchema(TestDefaultSchemaClass.class)); } + + @Test + public void testRegisterPojo() throws NoSuchSchemaException { + SchemaRegistry registry = SchemaRegistry.createDefault(); + registry.registerPOJO(SimplePOJO.class); + Schema schema = registry.getSchema(SimplePOJO.class); + assertTrue(SIMPLE_POJO_SCHEMA.equivalent(schema)); + } + + @Test + public void testRegisterJavaBean() throws NoSuchSchemaException { + SchemaRegistry registry = SchemaRegistry.createDefault(); + registry.registerJavaBean(SimpleBean.class); + Schema schema = registry.getSchema(SimpleBean.class); + assertTrue(SIMPLE_BEAN_SCHEMA.equivalent(schema)); + } } From a1a07a5d9856fac0c52eaa28898736acaad44ebe Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Tue, 31 Jul 2018 17:40:58 -0700 Subject: [PATCH 2/2] Address code-review comments. --- .../java/org/apache/beam/sdk/schemas/SchemaRegistry.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaRegistry.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaRegistry.java index af3d9cb3e7bc3..ec26c421937af 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaRegistry.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaRegistry.java @@ -68,8 +68,8 @@ private static class SchemaEntry { private final Map entries = Maps.newHashMap(); private final ArrayDeque providers; - class PerTypeRegisteredProvider implements SchemaProvider { - private Map providers = Maps.newHashMap(); + private static class PerTypeRegisteredProvider implements SchemaProvider { + private final Map providers = Maps.newHashMap(); void registerProvider(TypeDescriptor typeDescriptor, SchemaProvider schemaProvider) { providers.put(typeDescriptor, schemaProvider); @@ -97,7 +97,8 @@ public SerializableFunction fromRowFunction(TypeDescriptor typeDe } } - private PerTypeRegisteredProvider perTypeRegisteredProviders = new PerTypeRegisteredProvider(); + private final PerTypeRegisteredProvider perTypeRegisteredProviders = + new PerTypeRegisteredProvider(); private SchemaRegistry() { providers = new ArrayDeque<>(REGISTERED_SCHEMA_PROVIDERS);