From a868da834b4b583037003e3d33c4a7dbab543e78 Mon Sep 17 00:00:00 2001
From: Aljoscha Krettek
Date: Sat, 11 Oct 2014 11:05:02 +0200
Subject: [PATCH] [doc] Update programming guide for scala expression keys

---
 docs/programming_guide.md | 139 +++++++++++++++++++++++++++++---------
 1 file changed, 106 insertions(+), 33 deletions(-)

diff --git a/docs/programming_guide.md b/docs/programming_guide.md
index fdea5993a9402..85d518a99b5fc 100644
--- a/docs/programming_guide.md
+++ b/docs/programming_guide.md
@@ -872,6 +872,7 @@ values. Keys are "virtual": they are defined as functions over the actual data
 to guide the grouping operator.
 
 ### Define keys for Tuples
+{:.no_toc}
 
 The simplest case is grouping a data set of Tuples on one or more
 fields of the Tuple:
@@ -904,6 +905,8 @@ DataSet<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;
 Specifying `groupBy(0)` will cause the system to use the full `Tuple2` as a key (with the Integer and Float being the key). If you want to "navigate" into the nested `Tuple2`, you have to use a string-based expression, as explained below. For this particular example, you would have to specify `f0.f0`.
 
 ### Define key using a String Expression
+{:.no_toc}
+
 Starting from release 0.7-incubating, you can use String-based key expressions to select keys.
 
 The String expressions allow to specify the name of the field in a class you want to group by.
@@ -964,6 +967,7 @@ These are valid expressions for the example POJO above:
 Please note that you can only use types inside POJOs that Flink is able to serialize. Currently, we are using [Avro](http://avro.apache.org) to serialize arbitrary objects (such as `Date`).
 
 ### Define key using a Key Selector Function
+{:.no_toc}
 
 An additional way to define keys are "key selector" functions, which takes as argument one dataset element and returns a key of an arbitrary data type by performing an arbitrary computation on this element. For example:
@@ -1027,64 +1031,133 @@ you do not need to physically pack the data set types into keys and
 values. Keys are "virtual": they are defined as functions over the actual data
 to guide the grouping operator.
 
-The simplest case is grouping a data set of Case Classes on one or more
-of it's fields:
+### Define keys for Tuples
+{:.no_toc}
+
+The simplest case is grouping a data set of Tuples on one or more
+fields of the Tuple:
 
 {% highlight scala %}
-case class WordCount(docId: Int, word: String, count: Int)
-val input: DataSet[WordCount] = // [...]
+val input: DataSet[(Int, String, Long)] = // [...]
 val grouped = input
-  .groupBy("word")
+  .groupBy(0)
   .reduceGroup(/*do something*/)
 {% endhighlight %}
 
-The data set is grouped on the second field of the Case Class (the one of
-String type). The group reduce function will thus receive groups of elements with
-the same value in the second field.
+The data set is grouped on the first field of the tuples (the one of
+Int type). The group-reduce function will thus receive groups of tuples with
+the same value in the first field.
 
 {% highlight scala %}
-val input: DataSet[WordCount] = // [...]
+val input: DataSet[(Int, String, Long)] = // [...]
 val grouped = input
-  .groupBy("docId", "word")
-  .reduceGroup(/*do something*/);
+  .groupBy(0,1)
+  .reduce(/*do something*/)
 {% endhighlight %}
 
-Here the DataSet is grouped on the composite key consisting of the first and the
-second fields, therefore the group reduce function will receive groups
-with the same value in both fields.
+The data set is grouped on the composite key consisting of the first and the
+second fields, therefore the group-reduce function will receive groups
+with the same value for both fields.
 
-As a special case, fields of Tuple DataSets can also be specified by (zero-based) index:
+A note on nested Tuples: If you have a DataSet with a nested tuple, such as:
 
 {% highlight scala %}
-val input: DataSet[(Int, String, Int)] = // [...]
-val grouped = input
-  .groupBy(0, 1)
-  .reduceGroup(/*do something*/);
+val ds: DataSet[((Int, Float), String, Long)]
+{% endhighlight %}
+
+Specifying `groupBy(0)` will cause the system to use the full `Tuple2` as a key (with the Int and
+Float being the key). If you want to "navigate" into the nested `Tuple2`, you have to use a
+string-based expression, as explained below. For this particular example, you would have to specify
+`"_1._1"`.
+
+### Define key using a String Expression
+{:.no_toc}
+
+Starting from release 0.7-incubating, you can use String-based key expressions to select keys.
+
+The String expressions allow you to specify the name of the field in a class you want to group by.
+
+In the example below, we have a `WC` POJO with two fields "word" and "count". To group by the field
+"word", we just pass this name to the `groupBy()` function.
+
+{% highlight scala %}
+// some ordinary POJO (Plain old Java Object)
+class WC(var word: String, var count: Int) {
+  def this() { this("", 0) }
+}
+val words: DataSet[WC] = // [...]
+val wordCounts = words.groupBy("word").reduce(/*do something*/)
+
+// or, as a case class, which is less typing
+case class WC(word: String, count: Int)
+val words: DataSet[WC] = // [...]
+val wordCounts = words.groupBy("word").reduce(/*do something*/)
 {% endhighlight %}
 
-For DataSets that don't contain Case Classes or Tuples, key definition is done via a "key selector"
-function, which takes as argument one dataset element and must return a key of an
-arbitrary data type. For example:
+**Conditions** for a class to be usable with field selection expressions:
+
+- The class must be public.
+- It must have a public constructor without arguments or be a case class.
+- All fields either have to be public or there must be getters and setters for all non-public
+  fields. If the field name is `foo`, the getter and setter must be called `foo` and `foo_=`. This
+  is what normally gets generated when you have a `var foo` in your class. This also automatically
+  applies to case classes since the getters and setters are automatically generated.
+
+**Valid Expressions**:
+
+- You can select POJO fields by their field name.
+- You can select Tuple fields by their field name as well. For example `_1` or `_6`.
+- You can select nested fields in POJOs and Tuples. Expressions like `user.zip` or `user.groupId`
+  are valid. Flink also supports POJOs inside Tuples: `_2.user.zip`.
+- You can select all fields at each level. To select all fields, specify `*`. This also works for
+  the nested case: `user.*`.
+
+**Example for nested POJO**
+
 {% highlight scala %}
-// some ordinary object
-class WC {
-  val word: String
-  val count: Int
+class WC(var complex: ComplexNestedClass, var count: Int) {
+  def this() { this(null, 0) }
+}
+class ComplexNestedClass(
+    var someNumber: Int,
+    var someFloat: Float,
+    var word: (Long, Long, String),
+    var hadoopCitizen: IntWritable) {
+  def this() { this(0, 0, (0, 0, ""), new IntWritable(0)) }
 }
+{% endhighlight %}
+
+These are valid expressions for the example POJO above:
+
+- `count`: The count field in the `WC` class.
+- `complex.*`: Selects all fields in the `ComplexNestedClass`.
+- `complex.word._3`: Selects the last field in the Tuple3.
+- `complex.hadoopCitizen`: Selects a Hadoop-`Writable` type as a key.
+
+Please note that you can only use types inside POJOs that Flink is able to serialize. Currently,
+we are using [Avro](http://avro.apache.org) to serialize arbitrary objects (such as `Date`).
+
+### Define key using a Key Selector Function
+{:.no_toc}
+
+An additional way to define keys is to use "key selector" functions, which
+take as argument one dataset element and return a key of an
+arbitrary data type by performing an arbitrary computation on this
+element. For example:
 {% highlight scala %}
+// some ordinary case class
+case class WC(word: String, count: Int)
 val words: DataSet[WC] = // [...]
-val counts: DataSet[WC] = words groupBy { _.word } reduce { /*do something*/}
+val wordCounts = words
+  .groupBy(_.word).reduce(/*do something*/)
 {% endhighlight %}
 
 Remember that keys are not only used for grouping, but also joining and matching data sets:
 
 {% highlight scala %}
-// some object
+// some case class
 case class Rating(name: String, category: String, points: Int)
+
 val ratings: DataSet[Rating] = // [...]
 val weights: DataSet[(String, Double)] = // [...]
-
-// Tuples are also Case Classes in Scala, so we could use this:
 val weightedRatings = ratings.join(weights).where("category").equalTo("_1")
-
-// Or This:
-val weightedRatings2 = ratings.join(weights).where("category").equalTo(0)
 {% endhighlight %}
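For reference, here is a minimal, self-contained sketch (not taken from the patch) that exercises the key-definition styles the updated Scala section describes: tuple positions, string expressions, and a key selector function. It assumes the Flink Scala DataSet API (`ExecutionEnvironment`, `fromElements`); the object name and sample data are made up, and on older releases such as 0.7-incubating an explicit `env.execute()` may be needed after defining the sinks.

{% highlight scala %}
import org.apache.flink.api.scala._

object ExpressionKeysSketch {
  // Illustrative case classes, mirroring the WC and Rating examples in the docs.
  case class WC(word: String, count: Int)
  case class Rating(name: String, category: String, points: Int)

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Tuple position keys: group by the first tuple field.
    val tuples = env.fromElements((1, "a", 10L), (1, "b", 20L), (2, "c", 30L))
    val sums = tuples.groupBy(0).reduce((a, b) => (a._1, a._2, a._3 + b._3))

    // String expression keys: group a case class data set by field name.
    val words = env.fromElements(WC("hello", 1), WC("world", 1), WC("hello", 1))
    val byName = words.groupBy("word").reduce((a, b) => WC(a.word, a.count + b.count))

    // Key selector function: compute the key from each element.
    val bySelector = words.groupBy(_.word).reduce((a, b) => WC(a.word, a.count + b.count))

    // Keys also drive joins: match the "category" field against the tuple's first field.
    val ratings = env.fromElements(Rating("r1", "books", 3), Rating("r2", "games", 5))
    val weights = env.fromElements(("books", 1.0), ("games", 2.0))
    val weighted = ratings.join(weights).where("category").equalTo("_1")

    sums.print()
    byName.print()
    bySelector.print()
    weighted.print()
  }
}
{% endhighlight %}

The `where("category").equalTo("_1")` line mirrors the join example at the end of the patch; everything else in the sketch is illustrative only.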