Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-4453] Add convenience methods for pojo and javabean schema registration. #6114

Merged
merged 2 commits into from Aug 1, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -28,6 +28,7 @@
import java.util.ServiceLoader;
import java.util.Set;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.transforms.SerializableFunction;
Expand Down Expand Up @@ -67,8 +68,40 @@ private static class SchemaEntry<T> {
private final Map<TypeDescriptor, SchemaEntry> entries = Maps.newHashMap();
private final ArrayDeque<SchemaProvider> providers;

class PerTypeRegisteredProvider implements SchemaProvider {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

private Map<TypeDescriptor, SchemaProvider> providers = Maps.newHashMap();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


void registerProvider(TypeDescriptor typeDescriptor, SchemaProvider schemaProvider) {
providers.put(typeDescriptor, schemaProvider);
}

@Nullable
@Override
public <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor) {
SchemaProvider schemaProvider = providers.get(typeDescriptor);
return (schemaProvider != null) ? schemaProvider.schemaFor(typeDescriptor) : null;
}

@Nullable
@Override
public <T> SerializableFunction<T, Row> toRowFunction(TypeDescriptor<T> typeDescriptor) {
SchemaProvider schemaProvider = providers.get(typeDescriptor);
return (schemaProvider != null) ? schemaProvider.toRowFunction(typeDescriptor) : null;
}

@Nullable
@Override
public <T> SerializableFunction<Row, T> fromRowFunction(TypeDescriptor<T> typeDescriptor) {
SchemaProvider schemaProvider = providers.get(typeDescriptor);
return (schemaProvider != null) ? schemaProvider.fromRowFunction(typeDescriptor) : null;
}
}

private PerTypeRegisteredProvider perTypeRegisteredProviders = new PerTypeRegisteredProvider();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


private SchemaRegistry() {
providers = new ArrayDeque<>(REGISTERED_SCHEMA_PROVIDERS);
providers.addFirst(perTypeRegisteredProviders);
}

public static SchemaRegistry createDefault() {
Expand Down Expand Up @@ -104,6 +137,57 @@ public void registerSchemaProvider(SchemaProvider schemaProvider) {
providers.addFirst(schemaProvider);
}

/** Register a {@link SchemaProvider} to be used for a specific type. * */
public <T> void registerSchemaProvider(Class<T> clazz, SchemaProvider schemaProvider) {
registerSchemaProvider(TypeDescriptor.of(clazz), schemaProvider);
}

/** Register a {@link SchemaProvider} to be used for a specific type. * */
public <T> void registerSchemaProvider(
TypeDescriptor<T> typeDescriptor, SchemaProvider schemaProvider) {
perTypeRegisteredProviders.registerProvider(typeDescriptor, schemaProvider);
}

/**
* Register a POJO type for automatic schema inference.
*
* <p>Currently schema field names will match field names in the POJO, and all fields must be
* mutable (i.e. no final fields).
*/
public <T> void registerPOJO(Class<T> clazz) {
registerPOJO(TypeDescriptor.of(clazz));
}

/**
* Register a POJO type for automatic schema inference.
*
* <p>Currently schema field names will match field names in the POJO, and all fields must be
* mutable (i.e. no final fields).
*/
public <T> void registerPOJO(TypeDescriptor<T> typeDescriptor) {
registerSchemaProvider(typeDescriptor, new JavaFieldSchema());
}

/**
* Register a JavaBean type for automatic schema inference.
*
* <p>Currently schema field names will match getter names in the bean, and all getters must have
* matching setters.
*/
public <T> void registerJavaBean(Class<T> clazz) {
registerJavaBean(TypeDescriptor.of(clazz));
}

/**
* Register a JavaBean type for automatic schema inference.
*
* <p>Currently schema field names will match getter names in the bean, and all getters must have
* matching setters.
*/
public <T> void registerJavaBean(TypeDescriptor<T> typeDescriptor) {
registerSchemaProvider(typeDescriptor, new JavaBeanSchema());
}

/**
* Get a schema for a given {@link Class} type. If no schema exists, throws {@link
* NoSuchSchemaException}.
Expand Down Expand Up @@ -151,13 +235,13 @@ public <T> SerializableFunction<T, Row> getToRowFunction(TypeDescriptor<T> typeD
return getProviderResult((SchemaProvider p) -> p.toRowFunction(typeDescriptor));
}

/** Rerieve the function that converts a {@link Row} object to an object of the specified type. */
/** Retrieve the function that converts a {@link Row} object to the specified type. */
public <T> SerializableFunction<Row, T> getFromRowFunction(Class<T> clazz)
throws NoSuchSchemaException {
return getFromRowFunction(TypeDescriptor.of(clazz));
}

/** Rerieve the function that converts a {@link Row} object to an object of the specified type. */
/** Retrieve the function that converts a {@link Row} object to the specified type. */
public <T> SerializableFunction<Row, T> getFromRowFunction(TypeDescriptor<T> typeDescriptor)
throws NoSuchSchemaException {
SchemaEntry entry = entries.get(typeDescriptor);
Expand Down
Expand Up @@ -18,11 +18,16 @@

package org.apache.beam.sdk.schemas;

import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.SIMPLE_BEAN_SCHEMA;
import static org.apache.beam.sdk.schemas.utils.TestPOJOs.SIMPLE_POJO_SCHEMA;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import com.google.auto.service.AutoService;
import com.google.common.collect.ImmutableList;
import java.util.List;
import org.apache.beam.sdk.schemas.utils.TestJavaBeans.SimpleBean;
import org.apache.beam.sdk.schemas.utils.TestPOJOs.SimplePOJO;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
Expand Down Expand Up @@ -202,4 +207,20 @@ public void testDefaultSchemaProvider() throws NoSuchSchemaException {
SchemaRegistry registry = SchemaRegistry.createDefault();
assertEquals(EMPTY_SCHEMA, registry.getSchema(TestDefaultSchemaClass.class));
}

@Test
public void testRegisterPojo() throws NoSuchSchemaException {
SchemaRegistry registry = SchemaRegistry.createDefault();
registry.registerPOJO(SimplePOJO.class);
Schema schema = registry.getSchema(SimplePOJO.class);
assertTrue(SIMPLE_POJO_SCHEMA.equivalent(schema));
}

@Test
public void testRegisterJavaBean() throws NoSuchSchemaException {
SchemaRegistry registry = SchemaRegistry.createDefault();
registry.registerJavaBean(SimpleBean.class);
Schema schema = registry.getSchema(SimpleBean.class);
assertTrue(SIMPLE_BEAN_SCHEMA.equivalent(schema));
}
}