Skip to content

Commit

Permalink
Merge pull request #31711: Set SchemaCoder for key in WithKeys transform
Browse files Browse the repository at this point in the history
  • Loading branch information
aromanenko-dev committed Jul 2, 2024
2 parents 3b8ddda + bacb9ec commit bf8afb8
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
Expand Down Expand Up @@ -129,6 +132,20 @@ public KV<K, V> apply(V element) {
// TODO: Remove when we can set the coder inference context.
result.setCoder(KvCoder.of(keyCoder, in.getCoder()));
} catch (CannotProvideCoderException exc) {
if (keyType != null) {
try {
SchemaRegistry schemaRegistry = SchemaRegistry.createDefault();
SchemaCoder<K> schemaCoder =
SchemaCoder.of(
schemaRegistry.getSchema(keyType),
keyType,
schemaRegistry.getToRowFunction(keyType),
schemaRegistry.getFromRowFunction(keyType));
result.setCoder(KvCoder.of(schemaCoder, in.getCoder()));
} catch (NoSuchSchemaException exception) {
// No Schema.
}
}
// let lazy coder inference have a try
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,23 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.schemas.JavaBeanSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
Expand Down Expand Up @@ -175,4 +184,47 @@ public void withLambdaAndNoTypeDescriptorShouldThrow() {

p.run();
}

@Test
@Category(NeedsRunner.class)
public void testKeySchemaCoderSet() throws NoSuchSchemaException {
PCollection<KV<Pojo, String>> pCollection =
p.apply(Create.of(Lists.newArrayList("1", "2", "3")).withType(TypeDescriptors.strings()))
.apply(
WithKeys.<Pojo, String>of(v -> new Pojo(1, v))
.withKeyType(TypeDescriptor.of(Pojo.class)));

TypeDescriptor<Pojo> keyType = TypeDescriptor.of(Pojo.class);
SchemaRegistry schemaRegistry = SchemaRegistry.createDefault();
SchemaCoder<Pojo> schemaCoder =
SchemaCoder.of(
schemaRegistry.getSchema(keyType),
keyType,
schemaRegistry.getToRowFunction(keyType),
schemaRegistry.getFromRowFunction(keyType));
Coder<KV<Pojo, String>> expectedCoder = KvCoder.of(schemaCoder, StringUtf8Coder.of());
assertEquals(expectedCoder, pCollection.getCoder());

p.run();
}

@DefaultSchema(JavaBeanSchema.class)
private static class Pojo {
private final long num;
private final String str;

@SchemaCreate
public Pojo(long num, String str) {
this.num = num;
this.str = str;
}

public long getNum() {
return this.num;
}

public String getStr() {
return this.str;
}
}
}

0 comments on commit bf8afb8

Please sign in to comment.