[BEAM-9384] Add SchemaRegistry.getSchemaCoder to get SchemaCoders for registered types #10974

Merged
@@ -22,6 +22,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+ import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import javax.annotation.Nullable;
@@ -59,7 +60,7 @@
@Experimental(Kind.SCHEMAS)
public class SchemaCoder<T> extends CustomCoder<T> {
// This contains a map of primitive types to their coders.
- public static final ImmutableMap<TypeName, Coder> CODER_MAP =
+ public static final Map<TypeName, Coder> CODER_MAP =
Member Author

@iemejia iemejia Feb 27, 2020

This change is required because we should not expose Guava (even our own vendored version) in the public API. The problem surfaced as a GcpCoreApiSurfaceTest failure.
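
To illustrate the point of this comment, here is a minimal sketch using only the JDK. The class `ApiSurfaceSketch` and the field `SIZE_BY_TYPE` are hypothetical stand-ins, not Beam code. The idea is that a public constant declared with the `java.util.Map` interface keeps the implementation type out of the signatures callers compile against, while the map itself can stay read-only.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a class that exposes a constant lookup table.
final class ApiSurfaceSketch {
  // Declared as the JDK interface, so no third-party type appears in the public signature.
  public static final Map<String, Integer> SIZE_BY_TYPE = buildSizes();

  private static Map<String, Integer> buildSizes() {
    Map<String, Integer> sizes = new LinkedHashMap<>();
    sizes.put("BYTE", 1);
    sizes.put("INT16", 2);
    sizes.put("INT32", 4);
    // The returned view rejects writes; only the declared type is generic.
    return Collections.unmodifiableMap(sizes);
  }

  public static void main(String[] args) {
    System.out.println(SIZE_BY_TYPE.get("INT32"));
    try {
      SIZE_BY_TYPE.put("INT64", 8);
    } catch (UnsupportedOperationException e) {
      System.out.println("map is read-only");
    }
  }
}
```

In the PR itself the map is still built with the vendored `ImmutableMap` builder; only the declared field type changes.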

ImmutableMap.<TypeName, Coder>builder()
.put(TypeName.BYTE, ByteCoder.of())
.put(TypeName.BYTES, ByteArrayCoder.of())
@@ -226,27 +226,16 @@ public <T> void registerJavaBean(TypeDescriptor<T> typeDescriptor) {
}

/**
- * Get a schema for a given {@link Class} type. If no schema exists, throws {@link
+ * Retrieve a {@link Schema} for a given {@link Class} type. If no schema exists, throws {@link
* NoSuchSchemaException}.
*/
public <T> Schema getSchema(Class<T> clazz) throws NoSuchSchemaException {
return getSchema(TypeDescriptor.of(clazz));
}

- private <ReturnT> ReturnT getProviderResult(Function<SchemaProvider, ReturnT> f)
- throws NoSuchSchemaException {
- for (SchemaProvider provider : providers) {
- ReturnT result = f.apply(provider);
- if (result != null) {
- return result;
- }
- }
- throw new NoSuchSchemaException();
- }
-
/**
- * Retrieve a schema for a given {@link TypeDescriptor} type. If no schema exists, throws {@link
- * NoSuchSchemaException}.
+ * Retrieve a {@link Schema} for a given {@link TypeDescriptor} type. If no schema exists, throws
+ * {@link NoSuchSchemaException}.
*/
public <T> Schema getSchema(TypeDescriptor<T> typeDescriptor) throws NoSuchSchemaException {
SchemaEntry entry = entries.get(typeDescriptor);
@@ -256,13 +245,17 @@ public <T> Schema getSchema(TypeDescriptor<T> typeDescriptor) throws NoSuchSchem
return getProviderResult((SchemaProvider p) -> p.schemaFor(typeDescriptor));
}

- /** Rerieve the function that converts an object of the specified type to a {@link Row} object. */
+ /**
+ * Retrieve the function that converts an object of the specified type to a {@link Row} object.
+ */
public <T> SerializableFunction<T, Row> getToRowFunction(Class<T> clazz)
throws NoSuchSchemaException {
return getToRowFunction(TypeDescriptor.of(clazz));
}

- /** Rerieve the function that converts an object of the specified type to a {@link Row} object. */
+ /**
+ * Retrieve the function that converts an object of the specified type to a {@link Row} object.
+ */
public <T> SerializableFunction<T, Row> getToRowFunction(TypeDescriptor<T> typeDescriptor)
throws NoSuchSchemaException {
SchemaEntry entry = entries.get(typeDescriptor);
@@ -288,6 +281,38 @@ public <T> SerializableFunction<Row, T> getFromRowFunction(TypeDescriptor<T> typ
return getProviderResult((SchemaProvider p) -> p.fromRowFunction(typeDescriptor));
}

+ /**
+ * Retrieve a {@link SchemaCoder} for a given {@link Class} type. If no schema exists, throws
+ * {@link NoSuchSchemaException}.
+ */
+ public <T> SchemaCoder<T> getSchemaCoder(Class<T> clazz) throws NoSuchSchemaException {
+ return getSchemaCoder(TypeDescriptor.of(clazz));
+ }
+
+ /**
+ * Retrieve a {@link SchemaCoder} for a given {@link TypeDescriptor} type. If no schema exists,
+ * throws {@link NoSuchSchemaException}.
+ */
+ public <T> SchemaCoder<T> getSchemaCoder(TypeDescriptor<T> typeDescriptor)
+ throws NoSuchSchemaException {
+ return SchemaCoder.of(
+ getSchema(typeDescriptor),
+ typeDescriptor,
+ getToRowFunction(typeDescriptor),
+ getFromRowFunction(typeDescriptor));
Contributor

This looks fine, but can you explain the use case? The goal was for users to never have to deal with SchemaCoder (the fact that schemas are implemented via a special coder should be a Beam implementation detail), but I'd understand if we have cases where the coder is still needed.

Member Author

See #10978 for a concrete case. PTransform authors may benefit from being able to infer a SchemaCoder for a given type from the SchemaRegistry.
We could also add this to PubsubIO to read JavaBeans and query them with Beam SQL.
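
The lookup being discussed can be sketched without Beam on the classpath. Everything below is a hypothetical miniature (`MiniSchemaRegistry`, `MiniSchemaCoder`, `Point`, and a "row" modeled as a plain `Map`), not the real `SchemaRegistry` API. It only shows the shape of the new method: three existing lookups are bundled into one object, and a missing registration surfaces as a checked exception.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Every type here is a hypothetical stand-in; none of them is a Beam class.
final class MiniSchemaRegistry {
  static final class NoSuchSchemaException extends Exception {}

  /** Bundle of a schema plus both conversions, loosely analogous to a schema coder. */
  static final class MiniSchemaCoder<T> {
    final List<String> schema;
    final Function<T, Map<String, Object>> toRow;
    final Function<Map<String, Object>, T> fromRow;

    MiniSchemaCoder(
        List<String> schema,
        Function<T, Map<String, Object>> toRow,
        Function<Map<String, Object>, T> fromRow) {
      this.schema = schema;
      this.toRow = toRow;
      this.fromRow = fromRow;
    }
  }

  static final class Point {
    final int x;
    final int y;

    Point(int x, int y) {
      this.x = x;
      this.y = y;
    }
  }

  private final Map<Class<?>, MiniSchemaCoder<?>> entries = new HashMap<>();

  <T> void register(
      Class<T> type,
      List<String> schema,
      Function<T, Map<String, Object>> toRow,
      Function<Map<String, Object>, T> fromRow) {
    entries.put(type, new MiniSchemaCoder<>(schema, toRow, fromRow));
  }

  // The cast is safe because register() stores each entry under its own type token.
  @SuppressWarnings("unchecked")
  private <T> MiniSchemaCoder<T> lookup(Class<T> type) throws NoSuchSchemaException {
    MiniSchemaCoder<?> entry = entries.get(type);
    if (entry == null) {
      throw new NoSuchSchemaException();
    }
    return (MiniSchemaCoder<T>) entry;
  }

  <T> List<String> getSchema(Class<T> type) throws NoSuchSchemaException {
    return lookup(type).schema;
  }

  <T> Function<T, Map<String, Object>> getToRowFunction(Class<T> type)
      throws NoSuchSchemaException {
    return lookup(type).toRow;
  }

  <T> Function<Map<String, Object>, T> getFromRowFunction(Class<T> type)
      throws NoSuchSchemaException {
    return lookup(type).fromRow;
  }

  /** Mirrors the shape of the new method: one call that combines the three lookups. */
  <T> MiniSchemaCoder<T> getSchemaCoder(Class<T> type) throws NoSuchSchemaException {
    return new MiniSchemaCoder<>(
        getSchema(type), getToRowFunction(type), getFromRowFunction(type));
  }

  /** Builds a registry that knows only about Point. */
  static MiniSchemaRegistry withPoint() {
    MiniSchemaRegistry registry = new MiniSchemaRegistry();
    registry.register(
        Point.class,
        Arrays.asList("x", "y"),
        point -> {
          Map<String, Object> row = new LinkedHashMap<>();
          row.put("x", point.x);
          row.put("y", point.y);
          return row;
        },
        row -> new Point((Integer) row.get("x"), (Integer) row.get("y")));
    return registry;
  }

  public static void main(String[] args) throws NoSuchSchemaException {
    MiniSchemaRegistry registry = withPoint();
    MiniSchemaCoder<Point> coder = registry.getSchemaCoder(Point.class);
    Point copy = coder.fromRow.apply(coder.toRow.apply(new Point(3, 4)));
    System.out.println(coder.schema + " " + copy.x + "," + copy.y);
    try {
      registry.getSchemaCoder(Double.class);
    } catch (NoSuchSchemaException e) {
      System.out.println("no schema registered for Double");
    }
  }
}
```

A transform author would then need only the type token to obtain the bundle, instead of threading the schema and both functions through separately.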

Member Author

I agree that SchemaCoder is an internal detail that regular users (pipeline authors) should not have to care about. I was tempted to mark this method as @Internal, but PTransform authors (e.g. IO authors) will find it useful, as I did in the PR mentioned above for KafkaIO schema support, so it is probably worth keeping it available. I also cannot think of a better place for this method than here.

Contributor

It's useful when integrating schema code with code that does not yet understand schemas.

In the KafkaIO example, I think the ideal solution would be to allow a Schema on KafkaRecord (though this probably requires us to add Java generic type awareness to schema inference), in which case the keyCoder and valueCoder aren't needed. I agree that allowing easy inference of a SchemaCoder allows for lower-effort integration of schemas in code like this, though hopefully this is just a short-term solution.
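
As a rough illustration of "code that does not yet understand schemas", the sketch below uses only hypothetical names (`SimpleCoder`, `LegacyBuffer`, `rowBackedCoder`, `User`) and a deliberately naive text encoding. It assumes a component that accepts nothing but a coder, and shows how a pair of to-row and from-row functions can be wrapped so that the component works with a schema-described type without knowing about schemas.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical types only; this is not how Beam encodes rows.
final class LegacyCoderBridgeSketch {
  /** All the schema-unaware component understands: values to bytes and back. */
  interface SimpleCoder<T> {
    byte[] encode(T value);

    T decode(byte[] bytes);
  }

  /** A component that asks only for a coder and knows nothing about schemas. */
  static final class LegacyBuffer<T> {
    private final SimpleCoder<T> coder;
    private final List<byte[]> stored = new ArrayList<>();

    LegacyBuffer(SimpleCoder<T> coder) {
      this.coder = coder;
    }

    void write(T value) {
      stored.add(coder.encode(value));
    }

    T read(int index) {
      return coder.decode(stored.get(index));
    }
  }

  /**
   * Wraps the two row conversions in a coder. Rows are field name to string value, written as
   * "name=value" lines; values containing newlines are not supported in this sketch.
   */
  static <T> SimpleCoder<T> rowBackedCoder(
      Function<T, Map<String, String>> toRow, Function<Map<String, String>, T> fromRow) {
    return new SimpleCoder<T>() {
      @Override
      public byte[] encode(T value) {
        StringBuilder out = new StringBuilder();
        for (Map.Entry<String, String> field : toRow.apply(value).entrySet()) {
          out.append(field.getKey()).append('=').append(field.getValue()).append('\n');
        }
        return out.toString().getBytes(StandardCharsets.UTF_8);
      }

      @Override
      public T decode(byte[] bytes) {
        Map<String, String> row = new LinkedHashMap<>();
        for (String line : new String(bytes, StandardCharsets.UTF_8).split("\n")) {
          if (line.isEmpty()) {
            continue;
          }
          int eq = line.indexOf('=');
          row.put(line.substring(0, eq), line.substring(eq + 1));
        }
        return fromRow.apply(row);
      }
    };
  }

  static final class User {
    final String name;
    final String city;

    User(String name, String city) {
      this.name = name;
      this.city = city;
    }
  }

  /** Coder for User built purely from its row conversions. */
  static SimpleCoder<User> userCoder() {
    return LegacyCoderBridgeSketch.<User>rowBackedCoder(
        user -> {
          Map<String, String> row = new LinkedHashMap<>();
          row.put("name", user.name);
          row.put("city", user.city);
          return row;
        },
        row -> new User(row.get("name"), row.get("city")));
  }

  public static void main(String[] args) {
    LegacyBuffer<User> buffer = new LegacyBuffer<>(userCoder());
    buffer.write(new User("Ada", "London"));
    User copy = buffer.read(0);
    System.out.println(copy.name + " / " + copy.city);
  }
}
```

The trade-off raised in this comment still applies: the legacy component remains unaware of the schema, so this kind of bridge is a stopgap rather than full schema support.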

Member Author

Interesting, good point. I had not thought about making KafkaRecord schema-aware. Some consequences of that are still not clear to me, such as how we would deal with the runtime resolution of Schemas for KV that we do now with the Confluent Schema Registry support. I am going to give it a try and ping you once I have something in the other PR, #10978. Let's continue that discussion there.

+ }
+
+ private <ReturnT> ReturnT getProviderResult(Function<SchemaProvider, ReturnT> f)
+ throws NoSuchSchemaException {
+ for (SchemaProvider provider : providers) {
+ ReturnT result = f.apply(provider);
+ if (result != null) {
+ return result;
+ }
+ }
+ throw new NoSuchSchemaException();
+ }
+
static {
// find all statically-registered SchemaProviders.
List<SchemaProvider> providersToRegister = Lists.newArrayList();
@@ -225,4 +225,24 @@ public void testRegisterJavaBean() throws NoSuchSchemaException {
Schema schema = registry.getSchema(SimpleBean.class);
assertTrue(SIMPLE_BEAN_SCHEMA.equivalent(schema));
}

+ @Test
+ public void testGetSchemaCoder() throws NoSuchSchemaException {
+ SchemaRegistry registry = SchemaRegistry.createDefault();
+ registry.registerJavaBean(SimpleBean.class);
+
+ Schema schema = registry.getSchema(SimpleBean.class);
+ SerializableFunction<SimpleBean, Row> toRowFunction =
+ registry.getToRowFunction(SimpleBean.class);
+ SerializableFunction<Row, SimpleBean> fromRowFunction =
+ registry.getFromRowFunction(SimpleBean.class);
+ SchemaCoder schemaCoder = registry.getSchemaCoder(SimpleBean.class);
+
+ assertTrue(schema.equivalent(schemaCoder.getSchema()));
+ assertTrue(toRowFunction.equals(schemaCoder.getToRowFunction()));
+ assertTrue(fromRowFunction.equals(schemaCoder.getFromRowFunction()));
+
+ thrown.expect(NoSuchSchemaException.class);
+ registry.getSchemaCoder(Double.class);
+ }
}