From c659b51803545f067192de2760e0d7df958ef236 Mon Sep 17 00:00:00 2001 From: Greg Hogan Date: Thu, 14 Jul 2016 14:42:55 -0400 Subject: [PATCH] [FLINK-3630] [docs] Little mistake in documentation --- docs/apis/batch/dataset_transformations.md | 112 ++++++++++++++++----- 1 file changed, 88 insertions(+), 24 deletions(-) diff --git a/docs/apis/batch/dataset_transformations.md b/docs/apis/batch/dataset_transformations.md index 8e65389f81753..9be9bc0ac5411 100644 --- a/docs/apis/batch/dataset_transformations.md +++ b/docs/apis/batch/dataset_transformations.md @@ -246,6 +246,13 @@ This problem can be overcome by hinting the return type of `project` operator li DataSet> ds2 = ds.>project(0).distinct(0); ~~~ + +
+ +~~~scala +Not supported. +~~~ +
@@ -777,11 +784,15 @@ DataSet> combinedWords = input .combineGroup(new GroupCombineFunction() { public void combine(Iterable words, Collector>) { // combine + String key = null; int count = 0; + for (String word : words) { + key = word; count++; } - out.collect(new Tuple2(word, count)); + // emit tuple with word and count + out.collect(new Tuple2(key, count)); } }); @@ -790,11 +801,15 @@ DataSet> output = combinedWords .reduceGroup(new GroupReduceFunction() { // group reduce with full data exchange public void reduce(Iterable>, Collector>) { + String key = null; int count = 0; + for (Tuple2 word : words) { + key = word; count++; } - out.collect(new Tuple2(word, count)); + // emit tuple with word and count + out.collect(new Tuple2(key, count)); } }); ~~~ @@ -809,26 +824,39 @@ val combinedWords: DataSet[(String, Int)] = input .groupBy(0) .combineGroup { (words, out: Collector[(String, Int)]) => + var key: String = null var count = 0 + for (word <- words) { - count++ + key = word + count += 1 } - out.collect(word, count) + out.collect((key, count)) } val output: DataSet[(String, Int)] = combinedWords .groupBy(0) .reduceGroup { (words, out: Collector[(String, Int)]) => - var count = 0 - for ((word, Int) <- words) { - count++ + var key: String = null + var sum = 0 + + for ((word, count) <- words) { + key = word + sum += count } - out.collect(word, count) + out.collect((key, sum)) } ~~~ +
+
+ +~~~python +Not supported. +~~~ +
@@ -1418,10 +1446,35 @@ DataSet> ratings.join(weights) // [...] ~~~ + +
+ +~~~scala +case class Rating(name: String, category: String, points: Int) + +val ratings: DataSet[Ratings] = // [...] +val weights: DataSet[(String, Double)] = // [...] + +val weightedRatings = ratings.join(weights).where("category").equalTo(0) { + (rating, weight, out: Collector[(String, Double)]) => + if (weight._2 > 0.1) out.collect(rating.name, rating.points * weight._2) +} + +~~~ + +
+
+Not supported. +
+ + #### Join with Projection (Java/Python Only) A Join transformation can construct result tuples using a projection as shown here: +
+
+ ~~~java DataSet> input1 = // [...] DataSet> input2 = // [...] @@ -1443,25 +1496,12 @@ The join projection works also for non-Tuple DataSets. In this case, `projectFir
~~~scala -case class Rating(name: String, category: String, points: Int) - -val ratings: DataSet[Ratings] = // [...] -val weights: DataSet[(String, Double)] = // [...] - -val weightedRatings = ratings.join(weights).where("category").equalTo(0) { - (rating, weight, out: Collector[(String, Double)]) => - if (weight._2 > 0.1) out.collect(rating.name, rating.points * weight._2) -} - +Not supported. ~~~
-#### Join with Projection (Java/Python Only) - -A Join transformation can construct result tuples using a projection as shown here: - ~~~python result = input1.join(input2).where(0).equal_to(0) \ .project_first(0,2).project_second(1).project_first(1); @@ -1709,6 +1749,23 @@ DataSet> movies.leftOuterJoin(ratings) // [...] ~~~ +
+
+ +~~~scala +Not supported. +~~~ + +
+
+ +~~~python +Not supported. +~~~ + +
+
+ #### Join Algorithm Hints The Flink runtime can execute outer joins in various ways. Each possible way outperforms the others under @@ -2200,7 +2257,7 @@ DataSet> in = // [...] // in descending order on the first String field. // Apply a MapPartition transformation on the sorted partitions. DataSet> out = in.sortPartition(1, Order.ASCENDING) - .sortPartition(0, Order.DESCENDING) + .sortPartition(0, Order.DESCENDING) .mapPartition(new PartitionMapper()); ~~~ @@ -2213,10 +2270,17 @@ val in: DataSet[(String, Int)] = // [...] // in descending order on the first String field. // Apply a MapPartition transformation on the sorted partitions. val out = in.sortPartition(1, Order.ASCENDING) - .sortPartition(0, Order.DESCENDING) + .sortPartition(0, Order.DESCENDING) .mapPartition { ... } ~~~ +
+
+ +~~~python +Not supported. +~~~ +