Skip to content

Commit

Permalink
Merge pull request apache#6 from seznam/vasek/windowing-builders
Browse files Browse the repository at this point in the history
[BEAM-4443]  Improve windowing API
  • Loading branch information
VaclavPlajt committed Jun 12, 2018
2 parents e9ea8d5 + 1192ff6 commit 75edc06
Show file tree
Hide file tree
Showing 112 changed files with 2,857 additions and 6,762 deletions.
Expand Up @@ -36,10 +36,10 @@
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.UnaryFunction;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.UnaryFunctor;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.FlatMap;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Operator;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceByKey;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceStateByKey;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Union;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.Operator;
import org.apache.beam.sdk.extensions.euphoria.core.client.type.TypeAwareReduceFunctor;
import org.apache.beam.sdk.extensions.euphoria.core.client.type.TypeAwareUnaryFunction;
import org.apache.beam.sdk.extensions.euphoria.core.client.type.TypeAwareUnaryFunctor;
Expand Down
Expand Up @@ -32,7 +32,7 @@
import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.Dataset;
import org.apache.beam.sdk.extensions.euphoria.core.client.flow.Flow;
import org.apache.beam.sdk.extensions.euphoria.core.client.io.DataSource;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Operator;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.Operator;
import org.apache.beam.sdk.extensions.euphoria.core.executor.graph.DAG;
import org.apache.beam.sdk.extensions.euphoria.core.util.Settings;
import org.apache.beam.sdk.options.PipelineOptions;
Expand Down
Expand Up @@ -24,18 +24,17 @@
import java.util.Map;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.euphoria.beam.io.KryoCoder;
import org.apache.beam.sdk.extensions.euphoria.beam.window.BeamWindowing;
import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.Dataset;
import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Window;
import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Windowing;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.BinaryFunctor;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Join;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Operator;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.Operator;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.hint.SizeHint;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.windowing.WindowingDesc;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Pair;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
Expand All @@ -60,7 +59,7 @@ public PCollection<?> translate(Join operator, BeamExecutorContext context) {
return doTranslate(operator, context);
}

<K, LeftT, RightT, OutputT, W extends Window<W>> PCollection<Pair<K, OutputT>> doTranslate(
<K, LeftT, RightT, OutputT, W extends BoundedWindow> PCollection<Pair<K, OutputT>> doTranslate(
Join<LeftT, RightT, K, OutputT, W> operator, BeamExecutorContext context) {
Coder<K> keyCoder = context.getCoder(operator.getLeftKeyExtractor());

Expand Down Expand Up @@ -131,10 +130,9 @@ && hasFitsInMemoryHint(leftDataset.getProducer()))
/**
* BroadcastHashJoin supports only GlobalWindow or none.
*/
private boolean isAllowedWindowing(Windowing windowing) {
private boolean isAllowedWindowing(WindowingDesc<?, ?> windowing) {
return windowing == null
|| (windowing instanceof BeamWindowing
&& ((BeamWindowing) windowing).getWindowFn() instanceof GlobalWindows);
|| (windowing.getWindowFn() instanceof GlobalWindows);
}

static class BroadcastHashRightJoinFn<K, LeftT, RightT, OutputT>
Expand Down
Expand Up @@ -29,10 +29,10 @@
import org.apache.beam.sdk.extensions.euphoria.core.client.io.DataSink;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.FlatMap;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Join;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Operator;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceByKey;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceStateByKey;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Union;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.Operator;
import org.apache.beam.sdk.extensions.euphoria.core.executor.FlowUnfolder;
import org.apache.beam.sdk.extensions.euphoria.core.executor.graph.DAG;
import org.apache.beam.sdk.extensions.euphoria.core.executor.graph.Node;
Expand Down
Expand Up @@ -17,24 +17,23 @@
*/
package org.apache.beam.sdk.extensions.euphoria.beam;

import static org.apache.beam.sdk.extensions.euphoria.beam.common.OperatorTranslatorUtil.getKVInputCollection;

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.euphoria.beam.common.OperatorTranslatorUtil;
import org.apache.beam.sdk.extensions.euphoria.beam.io.KryoCoder;
import org.apache.beam.sdk.extensions.euphoria.beam.join.FullJoinFn;
import org.apache.beam.sdk.extensions.euphoria.beam.join.InnerJoinFn;
import org.apache.beam.sdk.extensions.euphoria.beam.join.JoinFn;
import org.apache.beam.sdk.extensions.euphoria.beam.join.LeftOuterJoinFn;
import org.apache.beam.sdk.extensions.euphoria.beam.join.RightOuterJoinFn;
import org.apache.beam.sdk.extensions.euphoria.beam.window.WindowingUtils;
import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Window;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.BinaryFunctor;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Join;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Pair;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
Expand All @@ -52,7 +51,7 @@ public PCollection<?> translate(Join operator, BeamExecutorContext context) {
}


public <K, LeftT, RightT, OutputT, W extends Window<W>> PCollection<Pair<K, OutputT>>
public <K, LeftT, RightT, OutputT, W extends BoundedWindow> PCollection<Pair<K, OutputT>>
doTranslate(Join<LeftT, RightT, K, OutputT, W> operator, BeamExecutorContext context) {

Coder<K> keyCoder = context.getCoder(operator.getLeftKeyExtractor());
Expand All @@ -63,12 +62,12 @@ public PCollection<?> translate(Join operator, BeamExecutorContext context) {
@SuppressWarnings("unchecked") final PCollection<RightT> right = (PCollection<RightT>) context
.getInputs(operator).get(1);

PCollection<KV<K, LeftT>> leftKvInput = getKVInputCollection(left,
operator.getLeftKeyExtractor(),
PCollection<KV<K, LeftT>> leftKvInput = OperatorTranslatorUtil.getKVInputCollection(
left, operator.getLeftKeyExtractor(),
keyCoder, new KryoCoder<>(), "::extract-keys-left");

PCollection<KV<K, RightT>> rightKvInput = getKVInputCollection(right,
operator.getRightKeyExtractor(),
PCollection<KV<K, RightT>> rightKvInput = OperatorTranslatorUtil.getKVInputCollection(
right, operator.getRightKeyExtractor(),
keyCoder, new KryoCoder<>(), "::extract-keys-right");

// and apply the same windowing on input PCollections since the documentation states:
Expand All @@ -94,7 +93,7 @@ public PCollection<?> translate(Join operator, BeamExecutorContext context) {
return coGrouped.apply(joinFn.getFnName(), ParDo.of(joinFn));
}

private <K, LeftT, RightT, OutputT, W extends Window<W>> JoinFn<LeftT, RightT, K, OutputT>
private <K, LeftT, RightT, OutputT, W extends BoundedWindow> JoinFn<LeftT, RightT, K, OutputT>
chooseJoinFn(
Join<LeftT, RightT, K, OutputT, W> operator, TupleTag<LeftT> leftTag,
TupleTag<RightT> rightTag) {
Expand Down
Expand Up @@ -17,7 +17,7 @@
*/
package org.apache.beam.sdk.extensions.euphoria.beam;

import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Operator;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.Operator;
import org.apache.beam.sdk.values.PCollection;

/**
Expand Down
Expand Up @@ -26,7 +26,6 @@
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.extensions.euphoria.beam.window.WindowingUtils;
import org.apache.beam.sdk.extensions.euphoria.core.client.accumulators.AccumulatorProvider;
import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Window;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.ReduceFunctor;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.UnaryFunction;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceByKey;
Expand All @@ -38,6 +37,7 @@
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

Expand All @@ -47,7 +47,7 @@
class ReduceByKeyTranslator implements OperatorTranslator<ReduceByKey> {

@SuppressWarnings("unchecked")
private static <InputT, K, V, OutputT, W extends Window<W>> PCollection<Pair<K, OutputT>>
private static <InputT, K, V, OutputT, W extends BoundedWindow> PCollection<Pair<K, OutputT>>
doTranslate(ReduceByKey<InputT, K, V, OutputT, W> operator, BeamExecutorContext context) {

//TODO Could we even do values sorting ?
Expand Down
Expand Up @@ -22,7 +22,7 @@
import java.util.Collections;
import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.Dataset;
import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.Datasets;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Operator;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.Operator;
import org.apache.beam.sdk.values.PCollection;

/**
Expand Down

This file was deleted.

This file was deleted.

0 comments on commit 75edc06

Please sign in to comment.