Skip to content

Commit

Permalink
Implement getDefaultOutputCoder in DirectGroupByKey
Browse files Browse the repository at this point in the history
This uses the standard coder inference path to set coders, rather than
explicitly setting the output coders for the intermediate PCollections.
  • Loading branch information
tgroh committed Dec 12, 2016
1 parent bfd21d7 commit 52d29c5
Showing 1 changed file with 20 additions and 16 deletions.
Expand Up @@ -22,6 +22,7 @@

import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItemCoder;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
Expand All @@ -46,9 +47,6 @@ public PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> delega

@Override
public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
@SuppressWarnings("unchecked")
KvCoder<K, V> inputCoder = (KvCoder<K, V>) input.getCoder();

// This operation groups by the combination of key and window,
// merging windows as needed, using the windows assigned to the
// key/value input elements and the window merge operation of the
Expand All @@ -61,19 +59,11 @@ public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
// By default, implement GroupByKey via a series of lower-level operations.
return input
.apply(new DirectGroupByKeyOnly<K, V>())
.setCoder(
KeyedWorkItemCoder.of(
inputCoder.getKeyCoder(),
inputCoder.getValueCoder(),
inputWindowingStrategy.getWindowFn().windowCoder()))

// Group each key's values by window, merging windows as needed.
.apply(
"GroupAlsoByWindow",
new DirectGroupAlsoByWindow<K, V>(inputWindowingStrategy, outputWindowingStrategy))

.setCoder(
KvCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getValueCoder())));
new DirectGroupAlsoByWindow<K, V>(inputWindowingStrategy, outputWindowingStrategy));
}

static final class DirectGroupByKeyOnly<K, V>
Expand All @@ -85,6 +75,16 @@ public PCollection<KeyedWorkItem<K, V>> expand(PCollection<KV<K, V>> input) {
}

DirectGroupByKeyOnly() {}

/**
 * Infers the coder for this transform's {@link KeyedWorkItem} output from its input.
 *
 * <p>The result is a {@link KeyedWorkItemCoder} assembled from three pieces: the key coder and
 * the input value coder that {@code GroupByKey} extracts from the input's coder, and the window
 * coder of the {@code WindowFn} in the input's windowing strategy.
 *
 * @param input the input collection whose coder and windowing strategy are consulted
 * @return the coder to use for the output of this transform
 * @throws CannotProvideCoderException declared by the signature for cases where no coder can be
 *     provided
 */
@Override
protected Coder<?> getDefaultOutputCoder(PCollection<KV<K, V>> input)
    throws CannotProvideCoderException {
  // The parameter is read below, so no "unused" suppression is needed on it.
  return KeyedWorkItemCoder.of(
      GroupByKey.getKeyCoder(input.getCoder()),
      GroupByKey.getInputValueCoder(input.getCoder()),
      input.getWindowingStrategy().getWindowFn().windowCoder());
}
}

static final class DirectGroupAlsoByWindow<K, V>
Expand Down Expand Up @@ -117,14 +117,18 @@ private KeyedWorkItemCoder<K, V> getKeyedWorkItemCoder(Coder<KeyedWorkItem<K, V>
return kvCoder;
}

/**
 * Returns the key coder held by the given input coder.
 *
 * <p>The input coder is first narrowed to a {@link KeyedWorkItemCoder} via {@code
 * getKeyedWorkItemCoder}.
 *
 * @param inputCoder the coder of the {@link KeyedWorkItem} input
 * @return the key component coder of {@code inputCoder}
 */
public Coder<K> getKeyCoder(Coder<KeyedWorkItem<K, V>> inputCoder) {
  KeyedWorkItemCoder<K, V> workItemCoder = getKeyedWorkItemCoder(inputCoder);
  return workItemCoder.getKeyCoder();
}

/**
 * Returns the element (value) coder held by the given input coder.
 *
 * <p>The input coder is first narrowed to a {@link KeyedWorkItemCoder} via {@code
 * getKeyedWorkItemCoder}.
 *
 * @param inputCoder the coder of the {@link KeyedWorkItem} input
 * @return the element component coder of {@code inputCoder}
 */
public Coder<V> getValueCoder(Coder<KeyedWorkItem<K, V>> inputCoder) {
  KeyedWorkItemCoder<K, V> workItemCoder = getKeyedWorkItemCoder(inputCoder);
  return workItemCoder.getElementCoder();
}

/**
 * Infers the coder for this transform's grouped output from its input.
 *
 * <p>The input's coder is narrowed to a {@link KeyedWorkItemCoder}. The result is a {@link
 * KvCoder} pairing that coder's key coder with an {@link IterableCoder} over its element coder.
 *
 * @param input the input collection whose coder is consulted
 * @return the coder to use for the output of this transform
 * @throws CannotProvideCoderException declared by the signature for cases where no coder can be
 *     provided
 */
@Override
protected Coder<?> getDefaultOutputCoder(PCollection<KeyedWorkItem<K, V>> input)
    throws CannotProvideCoderException {
  // The parameter is read below, so no "unused" suppression is needed on it.
  KeyedWorkItemCoder<K, V> inputCoder = getKeyedWorkItemCoder(input.getCoder());
  return KvCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getElementCoder()));
}

@Override
public PCollection<KV<K, Iterable<V>>> expand(PCollection<KeyedWorkItem<K, V>> input) {
return PCollection.createPrimitiveOutputInternal(
Expand Down

0 comments on commit 52d29c5

Please sign in to comment.