From 7bca344adca9eb3386c1754cf99868f12761ef91 Mon Sep 17 00:00:00 2001 From: Greg Hogan Date: Wed, 6 Apr 2016 15:44:47 -0400 Subject: [PATCH] [FLINK-3469] Improve documentation for grouping keys --- docs/apis/batch/dataset_transformations.md | 120 ++++++++++++++++++--- 1 file changed, 105 insertions(+), 15 deletions(-) diff --git a/docs/apis/batch/dataset_transformations.md b/docs/apis/batch/dataset_transformations.md index 31a1dfa1c73f0..be9691c114f0f 100644 --- a/docs/apis/batch/dataset_transformations.md +++ b/docs/apis/batch/dataset_transformations.md @@ -275,6 +275,69 @@ element using a user-defined reduce function. For each group of input elements, a reduce function successively combines pairs of elements into one element until only a single element for each group remains. +#### Reduce on DataSet Grouped by Key Expression + +Key expressions specify one or more fields of each element of a DataSet. Each key expression is +either the name of a public field or a getter method. A dot can be used to drill down into objects. +The key expression "*" selects all fields. +The following code shows how to group a POJO DataSet using key expressions and to reduce it +with a reduce function. + +
+
+ +~~~java +// some ordinary POJO +public class WC { + public String word; + public int count; + // [...] +} + +// ReduceFunction that sums Integer attributes of a POJO +public class WordCounter implements ReduceFunction<WC> { + @Override + public WC reduce(WC in1, WC in2) { + return new WC(in1.word, in1.count + in2.count); + } +} + +// [...] +DataSet<WC> words = // [...] +DataSet<WC> wordCounts = words + // DataSet grouping on field "word" + .groupBy("word") + // apply ReduceFunction on grouped DataSet + .reduce(new WordCounter()); +~~~ + +
+
+ +~~~scala +// some ordinary POJO +class WC(val word: String, val count: Int) { + def this() { + this(null, -1) + } + // [...] +} + +val words: DataSet[WC] = // [...] +val wordCounts = words.groupBy("word").reduce { + (w1, w2) => new WC(w1.word, w1.count + w2.count) +} +~~~ + +
+
+ +~~~python +Not supported. +~~~ +
+
+ #### Reduce on DataSet Grouped by KeySelector Function A key-selector function extracts a key value from each element of a DataSet. The extracted key @@ -305,9 +368,16 @@ public class WordCounter implements ReduceFunction { DataSet words = // [...] DataSet wordCounts = words // DataSet grouping on field "word" - .groupBy("word") + .groupBy(new SelectWord()) // apply ReduceFunction on grouped DataSet .reduce(new WordCounter()); + +public class SelectWord implements KeySelector<WC, String> { + @Override + public String getKey(WC w) { + return w.word; + } +} ~~~ @@ -332,7 +402,14 @@ val wordCounts = words.groupBy { _.word } reduce {
~~~python -Not supported. +class WordCounter(ReduceFunction): + def reduce(self, in1, in2): + return (in1[0], in1[1] + in2[1]) + +words = # [...] +wordCounts = words \ + .group_by(lambda x: x[0]) \ + .reduce(WordCounter()) ~~~
@@ -347,10 +424,9 @@ The following code shows how to use field position keys and apply a reduce funct ~~~java DataSet> tuples = // [...] -DataSet> reducedTuples = - tuples +DataSet> reducedTuples = tuples // group DataSet on first and second field of Tuple - .groupBy(0,1) + .groupBy(0, 1) // apply ReduceFunction on grouped DataSet .reduce(new MyTupleReducer()); ~~~ @@ -364,11 +440,29 @@ val tuples = DataSet[(String, Int, Double)] = // [...] val reducedTuples = tuples.groupBy(0, 1).reduce { ... } ~~~ + +
+ +~~~python +reducedTuples = tuples.group_by(0, 1).reduce( ... ) +~~~ + +
+ #### Reduce on DataSet grouped by Case Class Fields When using Case Classes you can also specify the grouping key using the names of the fields: +
+
+ +~~~java +Not supported. +~~~ +
+
+ ~~~scala case class MyClass(val a: String, b: Int, c: Double) val tuples = DataSet[MyClass] = // [...] @@ -380,9 +474,8 @@ val reducedTuples = tuples.groupBy("a", "b").reduce { ... }
~~~python - reducedTuples = tuples.group_by(0, 1).reduce( ... ) +Not supported. ~~~ -
@@ -442,11 +535,6 @@ val output = input.groupBy(0).reduceGroup { } ~~~ -#### GroupReduce on DataSet Grouped by Case Class Fields - -Works analogous to grouping by Case Class fields in *Reduce* transformations. - -
@@ -462,13 +550,15 @@ Works analogous to grouping by Case Class fields in *Reduce* transformations. output = data.group_by(0).reduce_group(DistinctReduce()) ~~~ -
-#### GroupReduce on DataSet Grouped by KeySelector Function +#### GroupReduce on DataSet Grouped by Key Expression, KeySelector Function, or Case Class Fields + +Works analogously to grouping by [key expressions](#reduce-on-dataset-grouped-by-key-expression), +[key-selector functions](#reduce-on-dataset-grouped-by-keyselector-function), +and [case class fields](#reduce-on-dataset-grouped-by-case-class-fields) in *Reduce* transformations. -Works analogous to key-selector functions in *Reduce* transformations. #### GroupReduce on sorted groups