From f68cb95af1c27dd8bf99e1a2374969915dd567f9 Mon Sep 17 00:00:00 2001 From: Stefano Baghino Date: Thu, 21 Jan 2016 12:20:49 +0100 Subject: [PATCH 01/17] [FLINK-2021] Rework the K-Means examples to use built-in argument parsing facilities instead of ad-hoc functions --- .../examples/java/clustering/KMeans.java | 71 +++++++++++++------ .../examples/scala/clustering/KMeans.scala | 71 ++++++++++++------- 2 files changed, 94 insertions(+), 48 deletions(-) diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java index 1730e2a5ec42b..28517829708a8 100644 --- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java +++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java @@ -20,6 +20,8 @@ import java.io.Serializable; import java.util.Collection; +import java.util.List; +import java.util.function.Consumer; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; @@ -27,6 +29,10 @@ import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.utils.Option; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.api.java.utils.RequiredParameters; +import org.apache.flink.api.java.utils.RequiredParametersException; import org.apache.flink.configuration.Configuration; import org.apache.flink.examples.java.clustering.util.KMeansData; import org.apache.flink.api.java.DataSet; @@ -83,7 +89,7 @@ public class KMeans { public static void main(String[] args) throws Exception { - if(!parseParameters(args)) { + if(!parseParameters(ParameterTool.fromArgs(args))) { return; } @@ -291,31 +297,54 @@ public Centroid map(Tuple3 value) { private static String centersPath = null; private static String outputPath = null; private static int numIterations = 10; - - private static boolean parseParameters(String[] programArguments) { - - if(programArguments.length > 0) { - // parse input arguments + + private static final Option POINTS_PATH_OPTION = + new Option("pointsPath").alt("P").help("The path to the input points"); + private static final Option CENTERS_PATH_OPTION = + new Option("centersPath").alt("C").help("The path to the input centroids"); + private static final Option OUTPUT_PATH_OPTION = + new Option("outputPath").alt("O").help("The path where the output will be written"); + private static final Option NUM_ITERATIONS_OPTION = + new Option("numIterations").alt("N").help("The number of iteration performed by the K-Means algorithm"); + + private static boolean parseParameters(final ParameterTool params) throws RequiredParametersException { + + final RequiredParameters requiredParameters = new RequiredParameters(); + boolean parseStatus = false; + + requiredParameters.add(POINTS_PATH_OPTION); + requiredParameters.add(CENTERS_PATH_OPTION); + requiredParameters.add(OUTPUT_PATH_OPTION); + requiredParameters.add(NUM_ITERATIONS_OPTION); + + try { + requiredParameters.applyTo(params); + pointsPath = params.get(POINTS_PATH_OPTION.getName()); + centersPath = params.get(CENTERS_PATH_OPTION.getName()); + outputPath = params.get(OUTPUT_PATH_OPTION.getName()); + numIterations = params.getInt(NUM_ITERATIONS_OPTION.getName()); fileOutput = true; - if(programArguments.length == 4) { - pointsPath = programArguments[0]; - centersPath = programArguments[1]; - outputPath = programArguments[2]; - numIterations = Integer.parseInt(programArguments[3]); + parseStatus = true; + } catch (RequiredParametersException e) { + if (params.getNumberOfParameters() == 0) { + printRunWithDefaultParams(); + parseStatus = true; } else { - System.err.println("Usage: KMeans "); - return false; + System.out.println(requiredParameters.getHelp(e.getMissingArguments())); } - } else { - System.out.println("Executing K-Means example with default parameters and built-in default data."); - System.out.println(" Provide parameters to read input data from files."); - System.out.println(" See the documentation for the correct format of input files."); - System.out.println(" We provide a data generator to create synthetic input files for this program."); - System.out.println(" Usage: KMeans "); } - return true; + + return parseStatus; } - + + private static void printRunWithDefaultParams() { + System.out.println("Executing K-Means example with default parameters and built-in default data."); + System.out.println(" Provide parameters to read input data from files."); + System.out.println(" See the documentation for the correct format of input files."); + System.out.println(" We provide a data generator to create synthetic input files for this program."); + System.out.println(" Usage: KMeans "); + } + private static DataSet getPointDataSet(ExecutionEnvironment env) { if(fileOutput) { // read points from CSV file diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala index 08a3e62517421..42003df1864bf 100644 --- a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala +++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala @@ -19,6 +19,7 @@ package org.apache.flink.examples.scala.clustering import org.apache.flink.api.common.functions._ import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields +import org.apache.flink.api.java.utils.{Option, ParameterTool, RequiredParameters, RequiredParametersException} import org.apache.flink.api.scala._ import org.apache.flink.configuration.Configuration import org.apache.flink.examples.java.clustering.util.KMeansData @@ -72,7 +73,7 @@ import scala.collection.JavaConverters._ object KMeans { def main(args: Array[String]) { - if (!parseParameters(args)) { + if (!parseParameters(ParameterTool.fromArgs(args))) { return } @@ -104,36 +105,52 @@ object KMeans { } - private def parseParameters(programArguments: Array[String]): Boolean = { - if (programArguments.length > 0) { + private val POINTS_PATH_OPTION: Option = + new Option("pointsPath").alt("P").help("The path to the input points") + private val CENTERS_PATH_OPTION: Option = + new Option("centersPath").alt("C").help("The path to the input centroids") + private val OUTPUT_PATH_OPTION: Option = + new Option("outputPath").alt("O").help("The path where the output will be written") + private val NUM_ITERATIONS_OPTION: Option = + new Option("numIterations").alt("N").help("The number of iteration performed by the K-Means algorithm") + + @throws(classOf[RequiredParametersException]) + private def parseParameters(params: ParameterTool): Boolean = { + val requiredParameters: RequiredParameters = new RequiredParameters + var parseStatus: Boolean = false + requiredParameters.add(POINTS_PATH_OPTION) + requiredParameters.add(CENTERS_PATH_OPTION) + requiredParameters.add(OUTPUT_PATH_OPTION) + requiredParameters.add(NUM_ITERATIONS_OPTION) + try { + requiredParameters.applyTo(params) + pointsPath = params.get(POINTS_PATH_OPTION.getName) + centersPath = params.get(CENTERS_PATH_OPTION.getName) + outputPath = params.get(OUTPUT_PATH_OPTION.getName) + numIterations = params.getInt(NUM_ITERATIONS_OPTION.getName) fileOutput = true - if (programArguments.length == 4) { - pointsPath = programArguments(0) - centersPath = programArguments(1) - outputPath = programArguments(2) - numIterations = Integer.parseInt(programArguments(3)) - - true - } - else { - System.err.println("Usage: KMeans ") - - false + parseStatus = true + } + catch { + case e: RequiredParametersException => { + if (params.getNumberOfParameters == 0) { + printRunWithDefaultParams() + parseStatus = true + } + else { + println(requiredParameters.getHelp(e.getMissingArguments)) + } } } - else { - System.out.println("Executing K-Means example with default parameters and built-in default " + - "data.") - System.out.println(" Provide parameters to read input data from files.") - System.out.println(" See the documentation for the correct format of input files.") - System.out.println(" We provide a data generator to create synthetic input files for this " + - "program.") - System.out.println(" Usage: KMeans ") + return parseStatus + } - true - } + private def printRunWithDefaultParams() { + println("Executing K-Means example with default parameters and built-in default data.") + println(" Provide parameters to read input data from files.") + println(" See the documentation for the correct format of input files.") + println(" We provide a data generator to create synthetic input files for this program.") + println(" Usage: KMeans ") } private def getPointDataSet(env: ExecutionEnvironment): DataSet[Point] = { From a432cc204741dd0a2d505886b987c4113e781c84 Mon Sep 17 00:00:00 2001 From: Stefano Baghino Date: Thu, 21 Jan 2016 12:33:15 +0100 Subject: [PATCH 02/17] [FLINK-2021] Renamed user-facing names for options --- .../org/apache/flink/examples/java/clustering/KMeans.java | 8 ++++---- .../apache/flink/examples/scala/clustering/KMeans.scala | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java index 28517829708a8..c0e4adc8cbff7 100644 --- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java +++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java @@ -299,13 +299,13 @@ public Centroid map(Tuple3 value) { private static int numIterations = 10; private static final Option POINTS_PATH_OPTION = - new Option("pointsPath").alt("P").help("The path to the input points"); + new Option("points").alt("P").help("The path to the input points"); private static final Option CENTERS_PATH_OPTION = - new Option("centersPath").alt("C").help("The path to the input centroids"); + new Option("centroids").alt("C").help("The path to the input centroids"); private static final Option OUTPUT_PATH_OPTION = - new Option("outputPath").alt("O").help("The path where the output will be written"); + new Option("output").alt("O").help("The path where the output will be written"); private static final Option NUM_ITERATIONS_OPTION = - new Option("numIterations").alt("N").help("The number of iteration performed by the K-Means algorithm"); + new Option("iterations").alt("I").help("The number of iteration performed by the K-Means algorithm"); private static boolean parseParameters(final ParameterTool params) throws RequiredParametersException { diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala index 42003df1864bf..b457ac36049aa 100644 --- a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala +++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala @@ -106,13 +106,13 @@ object KMeans { } private val POINTS_PATH_OPTION: Option = - new Option("pointsPath").alt("P").help("The path to the input points") + new Option("points").alt("P").help("The path to the input points") private val CENTERS_PATH_OPTION: Option = - new Option("centersPath").alt("C").help("The path to the input centroids") + new Option("centroids").alt("C").help("The path to the input centroids") private val OUTPUT_PATH_OPTION: Option = - new Option("outputPath").alt("O").help("The path where the output will be written") + new Option("output").alt("O").help("The path where the output will be written") private val NUM_ITERATIONS_OPTION: Option = - new Option("numIterations").alt("N").help("The number of iteration performed by the K-Means algorithm") + new Option("iterations").alt("I").help("The number of iteration performed by the K-Means algorithm") @throws(classOf[RequiredParametersException]) private def parseParameters(params: ParameterTool): Boolean = { From a6d7f38a0a41f5e7e1290afd267eb6b260f3ae06 Mon Sep 17 00:00:00 2001 From: Stefano Baghino Date: Thu, 21 Jan 2016 13:07:39 +0100 Subject: [PATCH 03/17] [FLINK-2021] Updated documentation to be in sync with reworked examples --- docs/quickstart/run_example_quickstart.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/quickstart/run_example_quickstart.md b/docs/quickstart/run_example_quickstart.md index 3fd2801908280..169f82de87433 100644 --- a/docs/quickstart/run_example_quickstart.md +++ b/docs/quickstart/run_example_quickstart.md @@ -100,9 +100,9 @@ The Flink web interface allows to submit Flink programs using a graphical user i 5. Enter the arguments and options in the lower box:
Leave the Entry Class and Parallelism form empty
Enter the following program arguments:
- (KMeans expects the following args: <points path> <centers path> <result path> <num iterations> + (KMeans expects the following args: --points <points path> --centroids <centers path> --output <result path> --iterations <num iterations> {% highlight bash %} - /tmp/kmeans/points /tmp/kmeans/centers /tmp/kmeans/result 10 + --points /tmp/kmeans/points --centroids /tmp/kmeans/centers --output /tmp/kmeans/result --iterations 10 {% endhighlight %}
6. Press Submit to start the job From 31a6a1e4d1bae900866fc74aff9402b2cd3cee94 Mon Sep 17 00:00:00 2001 From: Stefano Baghino Date: Thu, 21 Jan 2016 14:40:39 +0100 Subject: [PATCH 04/17] [FLINK-2021] Removed return statement in Scala, updated usage message to reflect the changes --- .../org/apache/flink/examples/java/clustering/KMeans.java | 2 +- .../org/apache/flink/examples/scala/clustering/KMeans.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java index c0e4adc8cbff7..f18f54e1ce01c 100644 --- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java +++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java @@ -342,7 +342,7 @@ private static void printRunWithDefaultParams() { System.out.println(" Provide parameters to read input data from files."); System.out.println(" See the documentation for the correct format of input files."); System.out.println(" We provide a data generator to create synthetic input files for this program."); - System.out.println(" Usage: KMeans "); + System.out.println(" Usage: KMeans --points --centroids --output --iterations "); } private static DataSet getPointDataSet(ExecutionEnvironment env) { diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala index b457ac36049aa..c194aed182e9d 100644 --- a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala +++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala @@ -142,7 +142,7 @@ object KMeans { } } } - return parseStatus + parseStatus } private def printRunWithDefaultParams() { @@ -150,7 +150,7 @@ object KMeans { println(" Provide parameters to read input data from files.") println(" See the documentation for the correct format of input files.") println(" We provide a data generator to create synthetic input files for this program.") - println(" Usage: KMeans ") + println(" Usage: KMeans --points --centroids --output --iterations ") } private def getPointDataSet(env: ExecutionEnvironment): DataSet[Point] = { From 2c4978dce0d4a8a8cd11c2e32319a71b2b57427a Mon Sep 17 00:00:00 2001 From: Stefano Baghino Date: Thu, 21 Jan 2016 15:01:45 +0100 Subject: [PATCH 05/17] [FLINK-2021] Delegating the usage line to the the object --- .../org/apache/flink/examples/java/clustering/KMeans.java | 4 +--- .../apache/flink/examples/scala/clustering/KMeans.scala | 8 ++------ 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java index f18f54e1ce01c..3a4bfe3eec9c9 100644 --- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java +++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java @@ -329,9 +329,8 @@ private static boolean parseParameters(final ParameterTool params) throws Requir if (params.getNumberOfParameters() == 0) { printRunWithDefaultParams(); parseStatus = true; - } else { - System.out.println(requiredParameters.getHelp(e.getMissingArguments())); } + System.out.println(requiredParameters.getHelp(e.getMissingArguments())); } return parseStatus; @@ -342,7 +341,6 @@ private static void printRunWithDefaultParams() { System.out.println(" Provide parameters to read input data from files."); System.out.println(" See the documentation for the correct format of input files."); System.out.println(" We provide a data generator to create synthetic input files for this program."); - System.out.println(" Usage: KMeans --points --centroids --output --iterations "); } private static DataSet getPointDataSet(ExecutionEnvironment env) { diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala index c194aed182e9d..f695f71e5aa31 100644 --- a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala +++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala @@ -130,16 +130,13 @@ object KMeans { numIterations = params.getInt(NUM_ITERATIONS_OPTION.getName) fileOutput = true parseStatus = true - } - catch { + } catch { case e: RequiredParametersException => { if (params.getNumberOfParameters == 0) { printRunWithDefaultParams() parseStatus = true } - else { - println(requiredParameters.getHelp(e.getMissingArguments)) - } + println(requiredParameters.getHelp(e.getMissingArguments)) } } parseStatus @@ -150,7 +147,6 @@ object KMeans { println(" Provide parameters to read input data from files.") println(" See the documentation for the correct format of input files.") println(" We provide a data generator to create synthetic input files for this program.") - println(" Usage: KMeans --points --centroids --output --iterations ") } private def getPointDataSet(env: ExecutionEnvironment): DataSet[Point] = { From 65b64746a006c01b92b02f4549d744776ed00980 Mon Sep 17 00:00:00 2001 From: Stefano Baghino Date: Thu, 21 Jan 2016 15:05:04 +0100 Subject: [PATCH 06/17] [FLINK-2021] Removed unused imports (checkstyle compliance) --- .../java/org/apache/flink/examples/java/clustering/KMeans.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java index 3a4bfe3eec9c9..e7adc44e1f3d8 100644 --- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java +++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java @@ -20,8 +20,6 @@ import java.io.Serializable; import java.util.Collection; -import java.util.List; -import java.util.function.Consumer; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; From 47c0d26f3fa7b3d9913114e45e8895b70c80e538 Mon Sep 17 00:00:00 2001 From: Stefano Baghino Date: Thu, 21 Jan 2016 15:47:00 +0100 Subject: [PATCH 07/17] [FLINK-2021] Removed the parseParameters and printRunWithDefaultParams methods --- .../examples/java/clustering/KMeans.java | 69 +++++----- .../examples/scala/clustering/KMeans.scala | 119 +++++++++++------- 2 files changed, 106 insertions(+), 82 deletions(-) diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java index e7adc44e1f3d8..f22f2b6e9675f 100644 --- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java +++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java @@ -86,8 +86,37 @@ public class KMeans { // ************************************************************************* public static void main(String[] args) throws Exception { - - if(!parseParameters(ParameterTool.fromArgs(args))) { + + // Checking input parameters + final ParameterTool params = ParameterTool.fromArgs(args); + final RequiredParameters requiredParameters = new RequiredParameters(); + boolean paramsOk = false; + + requiredParameters.add(POINTS_PATH_OPTION); + requiredParameters.add(CENTERS_PATH_OPTION); + requiredParameters.add(OUTPUT_PATH_OPTION); + requiredParameters.add(NUM_ITERATIONS_OPTION); + + try { + requiredParameters.applyTo(params); + pointsPath = params.get(POINTS_PATH_OPTION.getName()); + centersPath = params.get(CENTERS_PATH_OPTION.getName()); + outputPath = params.get(OUTPUT_PATH_OPTION.getName()); + numIterations = params.getInt(NUM_ITERATIONS_OPTION.getName()); + fileOutput = true; + paramsOk = true; + } catch (RequiredParametersException e) { + if (params.getNumberOfParameters() == 0) { + System.out.println("Executing K-Means example with default parameters and built-in default data."); + System.out.println(" Provide parameters to read input data from files."); + System.out.println(" See the documentation for the correct format of input files."); + System.out.println(" We provide a data generator to create synthetic input files for this program."); + paramsOk = true; + } + System.out.println(requiredParameters.getHelp(e.getMissingArguments())); + } + + if(!paramsOk) { return; } @@ -305,42 +334,6 @@ public Centroid map(Tuple3 value) { private static final Option NUM_ITERATIONS_OPTION = new Option("iterations").alt("I").help("The number of iteration performed by the K-Means algorithm"); - private static boolean parseParameters(final ParameterTool params) throws RequiredParametersException { - - final RequiredParameters requiredParameters = new RequiredParameters(); - boolean parseStatus = false; - - requiredParameters.add(POINTS_PATH_OPTION); - requiredParameters.add(CENTERS_PATH_OPTION); - requiredParameters.add(OUTPUT_PATH_OPTION); - requiredParameters.add(NUM_ITERATIONS_OPTION); - - try { - requiredParameters.applyTo(params); - pointsPath = params.get(POINTS_PATH_OPTION.getName()); - centersPath = params.get(CENTERS_PATH_OPTION.getName()); - outputPath = params.get(OUTPUT_PATH_OPTION.getName()); - numIterations = params.getInt(NUM_ITERATIONS_OPTION.getName()); - fileOutput = true; - parseStatus = true; - } catch (RequiredParametersException e) { - if (params.getNumberOfParameters() == 0) { - printRunWithDefaultParams(); - parseStatus = true; - } - System.out.println(requiredParameters.getHelp(e.getMissingArguments())); - } - - return parseStatus; - } - - private static void printRunWithDefaultParams() { - System.out.println("Executing K-Means example with default parameters and built-in default data."); - System.out.println(" Provide parameters to read input data from files."); - System.out.println(" See the documentation for the correct format of input files."); - System.out.println(" We provide a data generator to create synthetic input files for this program."); - } - private static DataSet getPointDataSet(ExecutionEnvironment env) { if(fileOutput) { // read points from CSV file diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala index f695f71e5aa31..f200d429d74ed 100644 --- a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala +++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala @@ -27,53 +27,84 @@ import org.apache.flink.examples.java.clustering.util.KMeansData import scala.collection.JavaConverters._ /** - * This example implements a basic K-Means clustering algorithm. - * - * K-Means is an iterative clustering algorithm and works as follows: - * K-Means is given a set of data points to be clustered and an initial set of ''K'' cluster - * centers. - * In each iteration, the algorithm computes the distance of each data point to each cluster center. - * Each point is assigned to the cluster center which is closest to it. - * Subsequently, each cluster center is moved to the center (''mean'') of all points that have - * been assigned to it. - * The moved cluster centers are fed into the next iteration. - * The algorithm terminates after a fixed number of iterations (as in this implementation) - * or if cluster centers do not (significantly) move in an iteration. - * This is the Wikipedia entry for the [[http://en.wikipedia - * .org/wiki/K-means_clustering K-Means Clustering algorithm]]. - * - * This implementation works on two-dimensional data points. - * It computes an assignment of data points to cluster centers, i.e., - * each data point is annotated with the id of the final cluster (center) it belongs to. - * - * Input files are plain text files and must be formatted as follows: - * - * - Data points are represented as two double values separated by a blank character. - * Data points are separated by newline characters. - * For example `"1.2 2.3\n5.3 7.2\n"` gives two data points (x=1.2, y=2.3) and (x=5.3, - * y=7.2). - * - Cluster centers are represented by an integer id and a point value. - * For example `"1 6.2 3.2\n2 2.9 5.7\n"` gives two centers (id=1, x=6.2, - * y=3.2) and (id=2, x=2.9, y=5.7). - * - * Usage: - * {{{ - * KMeans - * }}} - * If no parameters are provided, the program is run with default data from - * [[org.apache.flink.examples.java.clustering.util.KMeansData]] - * and 10 iterations. - * - * This example shows how to use: - * - * - Bulk iterations - * - Broadcast variables in bulk iterations - * - Custom Java objects (PoJos) - */ + * This example implements a basic K-Means clustering algorithm. + * + * K-Means is an iterative clustering algorithm and works as follows: + * K-Means is given a set of data points to be clustered and an initial set of ''K'' cluster + * centers. + * In each iteration, the algorithm computes the distance of each data point to each cluster center. + * Each point is assigned to the cluster center which is closest to it. + * Subsequently, each cluster center is moved to the center (''mean'') of all points that have + * been assigned to it. + * The moved cluster centers are fed into the next iteration. + * The algorithm terminates after a fixed number of iterations (as in this implementation) + * or if cluster centers do not (significantly) move in an iteration. + * This is the Wikipedia entry for the [[http://en.wikipedia + * .org/wiki/K-means_clustering K-Means Clustering algorithm]]. + * + * This implementation works on two-dimensional data points. + * It computes an assignment of data points to cluster centers, i.e., + * each data point is annotated with the id of the final cluster (center) it belongs to. + * + * Input files are plain text files and must be formatted as follows: + * + * - Data points are represented as two double values separated by a blank character. + * Data points are separated by newline characters. + * For example `"1.2 2.3\n5.3 7.2\n"` gives two data points (x=1.2, y=2.3) and (x=5.3, + * y=7.2). + * - Cluster centers are represented by an integer id and a point value. + * For example `"1 6.2 3.2\n2 2.9 5.7\n"` gives two centers (id=1, x=6.2, + * y=3.2) and (id=2, x=2.9, y=5.7). + * + * Usage: + * {{{ + * KMeans + * }}} + * If no parameters are provided, the program is run with default data from + * [[org.apache.flink.examples.java.clustering.util.KMeansData]] + * and 10 iterations. + * + * This example shows how to use: + * + * - Bulk iterations + * - Broadcast variables in bulk iterations + * - Custom Java objects (PoJos) + */ object KMeans { def main(args: Array[String]) { - if (!parseParameters(ParameterTool.fromArgs(args))) { + + // Checking input parameters + val params: ParameterTool = ParameterTool.fromArgs(args) + val requiredParameters: RequiredParameters = new RequiredParameters + var paramsOk: Boolean = false + + requiredParameters.add(POINTS_PATH_OPTION) + requiredParameters.add(CENTERS_PATH_OPTION) + requiredParameters.add(OUTPUT_PATH_OPTION) + requiredParameters.add(NUM_ITERATIONS_OPTION) + + try { + requiredParameters.applyTo(params) + pointsPath = params.get(POINTS_PATH_OPTION.getName) + centersPath = params.get(CENTERS_PATH_OPTION.getName) + outputPath = params.get(OUTPUT_PATH_OPTION.getName) + numIterations = params.getInt(NUM_ITERATIONS_OPTION.getName) + fileOutput = true + paramsOk = true + } catch { + case e: RequiredParametersException => + if (params.getNumberOfParameters == 0) { + println("Executing K-Means example with default parameters and built-in default data.") + println(" Provide parameters to read input data from files.") + println(" See the documentation for the correct format of input files.") + println(" We provide a data generator to create synthetic input files for this program.") + paramsOk = true + } + println(requiredParameters.getHelp(e.getMissingArguments)) + } + + if (!paramsOk) { return } From 63bb710babcd1b0fb8d7a5253e442612ef3a86e6 Mon Sep 17 00:00:00 2001 From: Stefano Baghino Date: Thu, 21 Jan 2016 18:08:47 +0100 Subject: [PATCH 08/17] [FLINK-2021] Removing static members, ditching RequiredParameters to improve readability --- .../examples/java/clustering/KMeans.java | 226 +++++++----------- 1 file changed, 87 insertions(+), 139 deletions(-) diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java index f22f2b6e9675f..8cf46b43874ae 100644 --- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java +++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java @@ -27,10 +27,7 @@ import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.utils.Option; import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.api.java.utils.RequiredParameters; -import org.apache.flink.api.java.utils.RequiredParametersException; import org.apache.flink.configuration.Configuration; import org.apache.flink.examples.java.clustering.util.KMeansData; import org.apache.flink.api.java.DataSet; @@ -39,23 +36,23 @@ /** * This example implements a basic K-Means clustering algorithm. - * + * *

* K-Means is an iterative clustering algorithm and works as follows:
* K-Means is given a set of data points to be clustered and an initial set of K cluster centers. * In each iteration, the algorithm computes the distance of each data point to each cluster center. * Each point is assigned to the cluster center which is closest to it. * Subsequently, each cluster center is moved to the center (mean) of all points that have been assigned to it. - * The moved cluster centers are fed into the next iteration. - * The algorithm terminates after a fixed number of iterations (as in this implementation) + * The moved cluster centers are fed into the next iteration. + * The algorithm terminates after a fixed number of iterations (as in this implementation) * or if cluster centers do not (significantly) move in an iteration.
* This is the Wikipedia entry for the K-Means Clustering algorithm. - * + * *

* This implementation works on two-dimensional data points.
- * It computes an assignment of data points to cluster centers, i.e., + * It computes an assignment of data points to cluster centers, i.e., * each data point is annotated with the id of the final cluster (center) it belongs to. - * + * *

* Input files are plain text files and must be formatted as follows: *

    @@ -65,11 +62,11 @@ *
  • Cluster centers are represented by an integer id and a point value.
    * For example "1 6.2 3.2\n2 2.9 5.7\n" gives two centers (id=1, x=6.2, y=3.2) and (id=2, x=2.9, y=5.7). *
- * + * *

- * Usage: KMeans <points path> <centers path> <result path> <num iterations>
- * If no parameters are provided, the program is run with default data from {@link org.apache.flink.examples.java.clustering.util.KMeansData} and 10 iterations. - * + * Usage: KMeans --points --centroids --output --iterations
+ * If no parameters are provided, the program is run with default data from {@link org.apache.flink.examples.java.clustering.util.KMeansData} and 10 iterations. + * *

* This example shows how to use: *

    @@ -80,56 +77,52 @@ */ @SuppressWarnings("serial") public class KMeans { - - // ************************************************************************* - // PROGRAM - // ************************************************************************* - + public static void main(String[] args) throws Exception { // Checking input parameters final ParameterTool params = ParameterTool.fromArgs(args); - final RequiredParameters requiredParameters = new RequiredParameters(); - boolean paramsOk = false; - - requiredParameters.add(POINTS_PATH_OPTION); - requiredParameters.add(CENTERS_PATH_OPTION); - requiredParameters.add(OUTPUT_PATH_OPTION); - requiredParameters.add(NUM_ITERATIONS_OPTION); - - try { - requiredParameters.applyTo(params); - pointsPath = params.get(POINTS_PATH_OPTION.getName()); - centersPath = params.get(CENTERS_PATH_OPTION.getName()); - outputPath = params.get(OUTPUT_PATH_OPTION.getName()); - numIterations = params.getInt(NUM_ITERATIONS_OPTION.getName()); - fileOutput = true; - paramsOk = true; - } catch (RequiredParametersException e) { - if (params.getNumberOfParameters() == 0) { - System.out.println("Executing K-Means example with default parameters and built-in default data."); - System.out.println(" Provide parameters to read input data from files."); - System.out.println(" See the documentation for the correct format of input files."); - System.out.println(" We provide a data generator to create synthetic input files for this program."); - paramsOk = true; - } - System.out.println(requiredParameters.getHelp(e.getMissingArguments())); + if (params.getNumberOfParameters() < 4) { + System.out.println("Executing K-Means example with default parameters and built-in default data."); + System.out.println(" Provide parameters to read input data from files."); + System.out.println(" See the documentation for the correct format of input files."); + System.out.println(" We provide a data generator to create synthetic input files for this program."); + System.out.println(" Usage: KMeans --points --centroids --output --iterations"); } - if(!paramsOk) { - return; - } - // set up execution environment ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - // get input data - DataSet points = getPointDataSet(env); - DataSet centroids = getCentroidDataSet(env); - + env.getConfig().setGlobalJobParameters(params); // make parameters available in the web interface + + // get input data: + // points + DataSet points; + if(params.has("points")) { + // read points from CSV file + points = env.readCsvFile(params.get("points")) + .fieldDelimiter(" ") + .includeFields(true, true) + .types(Double.class, Double.class) + .map(new TuplePointConverter()); + } else { + points = KMeansData.getDefaultPointDataSet(env); + } + + // centroids + DataSet centroids; + if(params.has("centroids")) { + centroids = env.readCsvFile(params.get("centroids")) + .fieldDelimiter(" ") + .includeFields(true, true, true) + .types(Integer.class, Double.class, Double.class) + .map(new TupleCentroidConverter()); + } else { + centroids = KMeansData.getDefaultCentroidDataSet(env); + } + // set number of bulk iterations for KMeans algorithm - IterativeDataSet loop = centroids.iterate(numIterations); - + IterativeDataSet loop = centroids.iterate(params.getInt("iterations", 10)); + DataSet newCentroids = points // compute closest centroid for each point .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids") @@ -138,99 +131,98 @@ public static void main(String[] args) throws Exception { .groupBy(0).reduce(new CentroidAccumulator()) // compute new centroids from point counts and coordinate sums .map(new CentroidAverager()); - + // feed new centroids back into next iteration DataSet finalCentroids = loop.closeWith(newCentroids); - + DataSet> clusteredPoints = points - // assign points to final clusters - .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids"); - + // assign points to final clusters + .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids"); + // emit result - if (fileOutput) { - clusteredPoints.writeAsCsv(outputPath, "\n", " "); + if (params.has("output")) { + clusteredPoints.writeAsCsv(params.get("output"), "\n", " "); // since file sinks are lazy, we trigger the execution explicitly env.execute("KMeans Example"); - } - else { + } else { clusteredPoints.print(); } } - + // ************************************************************************* // DATA TYPES // ************************************************************************* - + /** * A simple two-dimensional point. */ public static class Point implements Serializable { - + public double x, y; - + public Point() {} public Point(double x, double y) { this.x = x; this.y = y; } - + public Point add(Point other) { x += other.x; y += other.y; return this; } - + public Point div(long val) { x /= val; y /= val; return this; } - + public double euclideanDistance(Point other) { return Math.sqrt((x-other.x)*(x-other.x) + (y-other.y)*(y-other.y)); } - + public void clear() { x = y = 0.0; } - + @Override public String toString() { return x + " " + y; } } - + /** - * A simple two-dimensional centroid, basically a point with an ID. + * A simple two-dimensional centroid, basically a point with an ID. */ public static class Centroid extends Point { - + public int id; - + public Centroid() {} - + public Centroid(int id, double x, double y) { super(x,y); this.id = id; } - + public Centroid(int id, Point p) { super(p.x, p.y); this.id = id; } - + @Override public String toString() { return id + " " + super.toString(); } } - + // ************************************************************************* // USER FUNCTIONS // ************************************************************************* - + /** Converts a {@code Tuple2} into a Point. */ @ForwardedFields("0->x; 1->y") public static final class TuplePointConverter implements MapFunction, Point> { @@ -240,7 +232,7 @@ public Point map(Tuple2 t) throws Exception { return new Point(t.f0, t.f1); } } - + /** Converts a {@code Tuple3} into a Centroid. */ @ForwardedFields("0->id; 1->x; 2->y") public static final class TupleCentroidConverter implements MapFunction, Centroid> { @@ -250,7 +242,7 @@ public Centroid map(Tuple3 t) throws Exception { return new Centroid(t.f0, t.f1, t.f2); } } - + /** Determines the closest cluster center for a data point. */ @ForwardedFields("*->1") public static final class SelectNearestCenter extends RichMapFunction> { @@ -261,19 +253,19 @@ public static final class SelectNearestCenter extends RichMapFunction map(Point p) throws Exception { - + double minDistance = Double.MAX_VALUE; int closestCentroidId = -1; - + // check all cluster centers for (Centroid centroid : centroids) { // compute distance double distance = p.euclideanDistance(centroid); - - // update nearest cluster if necessary + + // update nearest cluster if necessary if (distance < minDistance) { minDistance = distance; closestCentroidId = centroid.id; @@ -281,30 +273,30 @@ public Tuple2 map(Point p) throws Exception { } // emit a new record with the center id and the data point. - return new Tuple2(closestCentroidId, p); + return new Tuple2<>(closestCentroidId, p); } } - + /** Appends a count variable to the tuple. */ @ForwardedFields("f0;f1") public static final class CountAppender implements MapFunction, Tuple3> { @Override public Tuple3 map(Tuple2 t) { - return new Tuple3(t.f0, t.f1, 1L); - } + return new Tuple3<>(t.f0, t.f1, 1L); + } } - + /** Sums and counts point coordinates. */ @ForwardedFields("0") public static final class CentroidAccumulator implements ReduceFunction> { @Override public Tuple3 reduce(Tuple3 val1, Tuple3 val2) { - return new Tuple3(val1.f0, val1.f1.add(val2.f1), val1.f2 + val2.f2); + return new Tuple3<>(val1.f0, val1.f1.add(val2.f1), val1.f2 + val2.f2); } } - + /** Computes new centroid from coordinate sum and count of points. */ @ForwardedFields("0->id") public static final class CentroidAverager implements MapFunction, Centroid> { @@ -314,48 +306,4 @@ public Centroid map(Tuple3 value) { return new Centroid(value.f0, value.f1.div(value.f2)); } } - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private static boolean fileOutput = false; - private static String pointsPath = null; - private static String centersPath = null; - private static String outputPath = null; - private static int numIterations = 10; - - private static final Option POINTS_PATH_OPTION = - new Option("points").alt("P").help("The path to the input points"); - private static final Option CENTERS_PATH_OPTION = - new Option("centroids").alt("C").help("The path to the input centroids"); - private static final Option OUTPUT_PATH_OPTION = - new Option("output").alt("O").help("The path where the output will be written"); - private static final Option NUM_ITERATIONS_OPTION = - new Option("iterations").alt("I").help("The number of iteration performed by the K-Means algorithm"); - - private static DataSet getPointDataSet(ExecutionEnvironment env) { - if(fileOutput) { - // read points from CSV file - return env.readCsvFile(pointsPath) - .fieldDelimiter(" ") - .includeFields(true, true) - .types(Double.class, Double.class) - .map(new TuplePointConverter()); - } else { - return KMeansData.getDefaultPointDataSet(env); - } - } - - private static DataSet getCentroidDataSet(ExecutionEnvironment env) { - if(fileOutput) { - return env.readCsvFile(centersPath) - .fieldDelimiter(" ") - .includeFields(true, true, true) - .types(Integer.class, Double.class, Double.class) - .map(new TupleCentroidConverter()); - } else { - return KMeansData.getDefaultCentroidDataSet(env); - } - } } From 857e625d29490112bfd63d0595aa6bcfe4ab8890 Mon Sep 17 00:00:00 2001 From: Stefano Baghino Date: Fri, 22 Jan 2016 10:11:46 +0100 Subject: [PATCH 09/17] [FLINK-2021] Reworked the Scala example to keep it up to date with the Java one --- .../examples/scala/clustering/KMeans.scala | 195 +++++------------- 1 file changed, 56 insertions(+), 139 deletions(-) diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala index f200d429d74ed..d97c545c97134 100644 --- a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala +++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala @@ -19,7 +19,7 @@ package org.apache.flink.examples.scala.clustering import org.apache.flink.api.common.functions._ import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields -import org.apache.flink.api.java.utils.{Option, ParameterTool, RequiredParameters, RequiredParametersException} +import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.api.scala._ import org.apache.flink.configuration.Configuration import org.apache.flink.examples.java.clustering.util.KMeansData @@ -58,7 +58,7 @@ import scala.collection.JavaConverters._ * * Usage: * {{{ - * KMeans + * KMeans --points --centroids --output --iterations * }}} * If no parameters are provided, the program is run with default data from * [[org.apache.flink.examples.java.clustering.util.KMeansData]] @@ -68,52 +68,58 @@ import scala.collection.JavaConverters._ * * - Bulk iterations * - Broadcast variables in bulk iterations - * - Custom Java objects (PoJos) + * - Custom Scala objects */ object KMeans { def main(args: Array[String]) { - // Checking input parameters + // checking input parameters val params: ParameterTool = ParameterTool.fromArgs(args) - val requiredParameters: RequiredParameters = new RequiredParameters - var paramsOk: Boolean = false - - requiredParameters.add(POINTS_PATH_OPTION) - requiredParameters.add(CENTERS_PATH_OPTION) - requiredParameters.add(OUTPUT_PATH_OPTION) - requiredParameters.add(NUM_ITERATIONS_OPTION) - - try { - requiredParameters.applyTo(params) - pointsPath = params.get(POINTS_PATH_OPTION.getName) - centersPath = params.get(CENTERS_PATH_OPTION.getName) - outputPath = params.get(OUTPUT_PATH_OPTION.getName) - numIterations = params.getInt(NUM_ITERATIONS_OPTION.getName) - fileOutput = true - paramsOk = true - } catch { - case e: RequiredParametersException => - if (params.getNumberOfParameters == 0) { - println("Executing K-Means example with default parameters and built-in default data.") - println(" Provide parameters to read input data from files.") - println(" See the documentation for the correct format of input files.") - println(" We provide a data generator to create synthetic input files for this program.") - paramsOk = true - } - println(requiredParameters.getHelp(e.getMissingArguments)) - } - - if (!paramsOk) { - return + if (params.getNumberOfParameters < 4) { + println("Executing K-Means example with default parameters and built-in default data.") + println(" Provide parameters to read input data from files.") + println(" See the documentation for the correct format of input files.") + println(" We provide a data generator to create synthetic input files for this program.") + println(" Usage: KMeans --points --centroids --output --iterations ") + } + + // set up execution environment + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + + // get input data: + // points + var points: DataSet[Point] = null + if (params.has("points")) { + points = + env.readCsvFile[(Double, Double)]( + params.get("points"), + fieldDelimiter = " ", + includedFields = Array(0, 1)) + .map { x => new Point(x._1, x._2) } + } else { + env.fromCollection(KMeansData.POINTS map { + case Array(x, y) => new Point(x.asInstanceOf[Double], y.asInstanceOf[Double]) + }) + } + + // centroids + var centroids: DataSet[KMeans.Centroid] = null + if (params.has("centroids")) { + centroids = + env.readCsvFile[(Int, Double, Double)]( + params.get("centroids"), + fieldDelimiter = " ", + includedFields = Array(0, 1, 2)) + .map { x => new Centroid(x._1, x._2, x._3) } + } else { + centroids = env.fromCollection(KMeansData.CENTROIDS map { + case Array(id, x, y) => + new Centroid(id.asInstanceOf[Int], x.asInstanceOf[Double], y.asInstanceOf[Double]) + }) } - val env = ExecutionEnvironment.getExecutionEnvironment - - val points: DataSet[Point] = getPointDataSet(env) - val centroids: DataSet[Centroid] = getCentroidDataSet(env) - - val finalCentroids = centroids.iterate(numIterations) { currentCentroids => + val finalCentroids = centroids.iterate(params.getInt("iterations", 10)) { currentCentroids => val newCentroids = points .map(new SelectNearestCenter).withBroadcastSet(currentCentroids, "centroids") .map { x => (x._1, x._2, 1L) }.withForwardedFields("_1; _2") @@ -126,106 +132,19 @@ object KMeans { val clusteredPoints: DataSet[(Int, Point)] = points.map(new SelectNearestCenter).withBroadcastSet(finalCentroids, "centroids") - if (fileOutput) { - clusteredPoints.writeAsCsv(outputPath, "\n", " ") + if (params.has("output")) { + clusteredPoints.writeAsCsv(params.get("output"), "\n", " ") env.execute("Scala KMeans Example") - } - else { + } else { clusteredPoints.print() } } - private val POINTS_PATH_OPTION: Option = - new Option("points").alt("P").help("The path to the input points") - private val CENTERS_PATH_OPTION: Option = - new Option("centroids").alt("C").help("The path to the input centroids") - private val OUTPUT_PATH_OPTION: Option = - new Option("output").alt("O").help("The path where the output will be written") - private val NUM_ITERATIONS_OPTION: Option = - new Option("iterations").alt("I").help("The number of iteration performed by the K-Means algorithm") - - @throws(classOf[RequiredParametersException]) - private def parseParameters(params: ParameterTool): Boolean = { - val requiredParameters: RequiredParameters = new RequiredParameters - var parseStatus: Boolean = false - requiredParameters.add(POINTS_PATH_OPTION) - requiredParameters.add(CENTERS_PATH_OPTION) - requiredParameters.add(OUTPUT_PATH_OPTION) - requiredParameters.add(NUM_ITERATIONS_OPTION) - try { - requiredParameters.applyTo(params) - pointsPath = params.get(POINTS_PATH_OPTION.getName) - centersPath = params.get(CENTERS_PATH_OPTION.getName) - outputPath = params.get(OUTPUT_PATH_OPTION.getName) - numIterations = params.getInt(NUM_ITERATIONS_OPTION.getName) - fileOutput = true - parseStatus = true - } catch { - case e: RequiredParametersException => { - if (params.getNumberOfParameters == 0) { - printRunWithDefaultParams() - parseStatus = true - } - println(requiredParameters.getHelp(e.getMissingArguments)) - } - } - parseStatus - } - - private def printRunWithDefaultParams() { - println("Executing K-Means example with default parameters and built-in default data.") - println(" Provide parameters to read input data from files.") - println(" See the documentation for the correct format of input files.") - println(" We provide a data generator to create synthetic input files for this program.") - } - - private def getPointDataSet(env: ExecutionEnvironment): DataSet[Point] = { - if (fileOutput) { - env.readCsvFile[(Double, Double)]( - pointsPath, - fieldDelimiter = " ", - includedFields = Array(0, 1)) - .map { x => new Point(x._1, x._2)} - } - else { - val points = KMeansData.POINTS map { - case Array(x, y) => new Point(x.asInstanceOf[Double], y.asInstanceOf[Double]) - } - env.fromCollection(points) - } - } - - private def getCentroidDataSet(env: ExecutionEnvironment): DataSet[Centroid] = { - if (fileOutput) { - env.readCsvFile[(Int, Double, Double)]( - centersPath, - fieldDelimiter = " ", - includedFields = Array(0, 1, 2)) - .map { x => new Centroid(x._1, x._2, x._3)} - } - else { - val centroids = KMeansData.CENTROIDS map { - case Array(id, x, y) => - new Centroid(id.asInstanceOf[Int], x.asInstanceOf[Double], y.asInstanceOf[Double]) - } - env.fromCollection(centroids) - } - } - - private var fileOutput: Boolean = false - private var pointsPath: String = null - private var centersPath: String = null - private var outputPath: String = null - private var numIterations: Int = 10 - /** - * A simple two-dimensional point. - */ - class Point(var x: Double, var y: Double) extends Serializable { - def this() { - this(0, 0) - } + * A simple two-dimensional point. + */ + class Point(var x: Double = 0, var y: Double = 0) extends Serializable { def add(other: Point): Point = { x += other.x @@ -254,12 +173,9 @@ object KMeans { } /** - * A simple two-dimensional centroid, basically a point with an ID. - */ - class Centroid(var id: Int, x: Double, y: Double) extends Point(x, y) { - def this() { - this(0, 0, 0) - } + * A simple two-dimensional centroid, basically a point with an ID. + */ + class Centroid(var id: Int = 0, x: Double = 0, y: Double = 0) extends Point(x, y) { def this(id: Int, p: Point) { this(id, p.x, p.y) @@ -294,6 +210,7 @@ object KMeans { } } + } From c7a29835dcdda0fba56e0ce552c0076090551fcf Mon Sep 17 00:00:00 2001 From: Stefano Baghino Date: Tue, 26 Jan 2016 12:44:13 +0100 Subject: [PATCH 10/17] [FLINK-2021] Restored input source reading in separate methods --- .../examples/java/clustering/KMeans.java | 66 +++++++++++-------- 1 file changed, 39 insertions(+), 27 deletions(-) diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java index 8cf46b43874ae..6b95c8bab7688 100644 --- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java +++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java @@ -64,7 +64,7 @@ *
* *

- * Usage: KMeans --points --centroids --output --iterations
+ * Usage: KMeans --points <path> --centroids <path> --output <path> --iterations <n>
* If no parameters are provided, the program is run with default data from {@link org.apache.flink.examples.java.clustering.util.KMeansData} and 10 iterations. * *

@@ -72,7 +72,7 @@ *

    *
  • Bulk iterations *
  • Broadcast variables in bulk iterations - *
  • Custom Java objects (PoJos) + *
  • Custom Java objects (POJOs) *
*/ @SuppressWarnings("serial") @@ -87,7 +87,7 @@ public static void main(String[] args) throws Exception { System.out.println(" Provide parameters to read input data from files."); System.out.println(" See the documentation for the correct format of input files."); System.out.println(" We provide a data generator to create synthetic input files for this program."); - System.out.println(" Usage: KMeans --points --centroids --output --iterations"); + System.out.println(" Usage: KMeans --points --centroids --output --iterations "); } // set up execution environment @@ -95,30 +95,9 @@ public static void main(String[] args) throws Exception { env.getConfig().setGlobalJobParameters(params); // make parameters available in the web interface // get input data: - // points - DataSet points; - if(params.has("points")) { - // read points from CSV file - points = env.readCsvFile(params.get("points")) - .fieldDelimiter(" ") - .includeFields(true, true) - .types(Double.class, Double.class) - .map(new TuplePointConverter()); - } else { - points = KMeansData.getDefaultPointDataSet(env); - } - - // centroids - DataSet centroids; - if(params.has("centroids")) { - centroids = env.readCsvFile(params.get("centroids")) - .fieldDelimiter(" ") - .includeFields(true, true, true) - .types(Integer.class, Double.class, Double.class) - .map(new TupleCentroidConverter()); - } else { - centroids = KMeansData.getDefaultCentroidDataSet(env); - } + // read the points and centroids from the provided paths or fall back to default data + DataSet points = getPointDataSet(params, env); + DataSet centroids = getCentroidDataSet(params, env); // set number of bulk iterations for KMeans algorithm IterativeDataSet loop = centroids.iterate(params.getInt("iterations", 10)); @@ -150,6 +129,39 @@ public static void main(String[] args) throws Exception { } } + // ************************************************************************* + // DATA SOURCE READING (POINTS AND CENTROIDS) + // ************************************************************************* + + private static DataSet getCentroidDataSet(ParameterTool params, ExecutionEnvironment env) { + DataSet centroids; + if(params.has("centroids")) { + centroids = env.readCsvFile(params.get("centroids")) + .fieldDelimiter(" ") + .includeFields(true, true, true) + .types(Integer.class, Double.class, Double.class) + .map(new TupleCentroidConverter()); + } else { + centroids = KMeansData.getDefaultCentroidDataSet(env); + } + return centroids; + } + + private static DataSet getPointDataSet(ParameterTool params, ExecutionEnvironment env) { + DataSet points; + if(params.has("points")) { + // read points from CSV file + points = env.readCsvFile(params.get("points")) + .fieldDelimiter(" ") + .includeFields(true, true) + .types(Double.class, Double.class) + .map(new TuplePointConverter()); + } else { + points = KMeansData.getDefaultPointDataSet(env); + } + return points; + } + // ************************************************************************* // DATA TYPES // ************************************************************************* From dd4e278545182f0d4cbf403911f550c1badde125 Mon Sep 17 00:00:00 2001 From: Stefano Baghino Date: Tue, 26 Jan 2016 14:46:09 +0100 Subject: [PATCH 11/17] [FLINK-2021] Using POJO types instead of tuple converters --- .../examples/java/clustering/KMeans.java | 32 +++---------------- 1 file changed, 4 insertions(+), 28 deletions(-) diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java index 6b95c8bab7688..ab817afaca4b1 100644 --- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java +++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java @@ -135,12 +135,10 @@ public static void main(String[] args) throws Exception { private static DataSet getCentroidDataSet(ParameterTool params, ExecutionEnvironment env) { DataSet centroids; - if(params.has("centroids")) { + if (params.has("centroids")) { centroids = env.readCsvFile(params.get("centroids")) .fieldDelimiter(" ") - .includeFields(true, true, true) - .types(Integer.class, Double.class, Double.class) - .map(new TupleCentroidConverter()); + .pojoType(Centroid.class, "id", "x", "y"); } else { centroids = KMeansData.getDefaultCentroidDataSet(env); } @@ -149,13 +147,11 @@ private static DataSet getCentroidDataSet(ParameterTool params, Execut private static DataSet getPointDataSet(ParameterTool params, ExecutionEnvironment env) { DataSet points; - if(params.has("points")) { + if (params.has("points")) { // read points from CSV file points = env.readCsvFile(params.get("points")) .fieldDelimiter(" ") - .includeFields(true, true) - .types(Double.class, Double.class) - .map(new TuplePointConverter()); + .pojoType(Point.class, "x", "y"); } else { points = KMeansData.getDefaultPointDataSet(env); } @@ -235,26 +231,6 @@ public String toString() { // USER FUNCTIONS // ************************************************************************* - /** Converts a {@code Tuple2} into a Point. */ - @ForwardedFields("0->x; 1->y") - public static final class TuplePointConverter implements MapFunction, Point> { - - @Override - public Point map(Tuple2 t) throws Exception { - return new Point(t.f0, t.f1); - } - } - - /** Converts a {@code Tuple3} into a Centroid. */ - @ForwardedFields("0->id; 1->x; 2->y") - public static final class TupleCentroidConverter implements MapFunction, Centroid> { - - @Override - public Centroid map(Tuple3 t) throws Exception { - return new Centroid(t.f0, t.f1, t.f2); - } - } - /** Determines the closest cluster center for a data point. */ @ForwardedFields("*->1") public static final class SelectNearestCenter extends RichMapFunction> { From fa631f3698300b0c4f353024e223297a943255f2 Mon Sep 17 00:00:00 2001 From: Stefano Baghino Date: Fri, 29 Jan 2016 12:35:44 +0100 Subject: [PATCH 12/17] [FLINK-2021] Restored source reading methods in Scala examples, introduced case classes --- .../examples/scala/clustering/KMeans.scala | 104 ++++++++++-------- 1 file changed, 60 insertions(+), 44 deletions(-) diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala index d97c545c97134..aa8da357c7977 100644 --- a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala +++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala @@ -68,7 +68,7 @@ import scala.collection.JavaConverters._ * * - Bulk iterations * - Broadcast variables in bulk iterations - * - Custom Scala objects + * - Scala case classes */ object KMeans { @@ -88,36 +88,9 @@ object KMeans { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment // get input data: - // points - var points: DataSet[Point] = null - if (params.has("points")) { - points = - env.readCsvFile[(Double, Double)]( - params.get("points"), - fieldDelimiter = " ", - includedFields = Array(0, 1)) - .map { x => new Point(x._1, x._2) } - } else { - env.fromCollection(KMeansData.POINTS map { - case Array(x, y) => new Point(x.asInstanceOf[Double], y.asInstanceOf[Double]) - }) - } - - // centroids - var centroids: DataSet[KMeans.Centroid] = null - if (params.has("centroids")) { - centroids = - env.readCsvFile[(Int, Double, Double)]( - params.get("centroids"), - fieldDelimiter = " ", - includedFields = Array(0, 1, 2)) - .map { x => new Centroid(x._1, x._2, x._3) } - } else { - centroids = env.fromCollection(KMeansData.CENTROIDS map { - case Array(id, x, y) => - new Centroid(id.asInstanceOf[Int], x.asInstanceOf[Double], y.asInstanceOf[Double]) - }) - } + // read the points and centroids from the provided paths or fall back to default data + val points: DataSet[Point] = getPointDataSet(params, env) + val centroids: DataSet[Centroid] = getCentroidDataSet(params, env) val finalCentroids = centroids.iterate(params.getInt("iterations", 10)) { currentCentroids => val newCentroids = points @@ -141,49 +114,92 @@ object KMeans { } + // ************************************************************************* + // DATA SOURCE READING (POINTS AND CENTROIDS) + // ************************************************************************* + + def getCentroidDataSet(params: ParameterTool, env: ExecutionEnvironment): DataSet[Centroid] = { + if (params.has("centroids")) { + env.readCsvFile[Centroid]( + params.get("centroids"), + fieldDelimiter = " ", + includedFields = Array(0, 1, 2)) + } else { + env.fromCollection(KMeansData.CENTROIDS map { + case Array(id, x, y) => + new Centroid(id.asInstanceOf[Int], x.asInstanceOf[Double], y.asInstanceOf[Double]) + }) + } + } + + def getPointDataSet(params: ParameterTool, env: ExecutionEnvironment): DataSet[Point] = { + if (params.has("points")) { + env.readCsvFile[Point]( + params.get("points"), + fieldDelimiter = " ", + includedFields = Array(0, 1)) + } else { + env.fromCollection(KMeansData.POINTS map { + case Array(x, y) => new Point(x.asInstanceOf[Double], y.asInstanceOf[Double]) + }) + } + } + + // ************************************************************************* + // DATA TYPES + // ************************************************************************* + /** - * A simple two-dimensional point. + * Common trait for operations supported by both points and centroids + * Note: case class inheritance is not allowed in Scala */ - class Point(var x: Double = 0, var y: Double = 0) extends Serializable { + trait Coordinate extends Serializable { - def add(other: Point): Point = { + var x: Double + var y: Double + + def add(other: Coordinate): this.type = { x += other.x y += other.y this } - def div(other: Long): Point = { + def div(other: Long): this.type = { x /= other y /= other this } - def euclideanDistance(other: Point): Double = { + def euclideanDistance(other: Coordinate): Double = Math.sqrt((x - other.x) * (x - other.x) + (y - other.y) * (y - other.y)) - } def clear(): Unit = { x = 0 y = 0 } - override def toString: String = { - x + " " + y - } + override def toString: String = + s"$x $y" + } + /** + * A simple two-dimensional point. + */ + case class Point(var x: Double = 0, var y: Double = 0) extends Coordinate + /** * A simple two-dimensional centroid, basically a point with an ID. */ - class Centroid(var id: Int = 0, x: Double = 0, y: Double = 0) extends Point(x, y) { + case class Centroid(var id: Int = 0, var x: Double = 0, var y: Double = 0) extends Coordinate { def this(id: Int, p: Point) { this(id, p.x, p.y) } - override def toString: String = { - id + " " + super.toString - } + override def toString: String = + s"$id ${super.toString}" + } /** Determines the closest cluster center for a data point. */ From f1d8e9f30f61ce1958ee4a272f471dcf472d5542 Mon Sep 17 00:00:00 2001 From: Stefano Baghino Date: Fri, 29 Jan 2016 12:39:54 +0100 Subject: [PATCH 13/17] [FLINK-2021] Documentation updated to reflect changes in the example --- docs/quickstart/run_example_quickstart.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/quickstart/run_example_quickstart.md b/docs/quickstart/run_example_quickstart.md index 169f82de87433..ff0fa8b16ed63 100644 --- a/docs/quickstart/run_example_quickstart.md +++ b/docs/quickstart/run_example_quickstart.md @@ -100,7 +100,7 @@ The Flink web interface allows to submit Flink programs using a graphical user i 5. Enter the arguments and options in the lower box:
Leave the Entry Class and Parallelism form empty
Enter the following program arguments:
- (KMeans expects the following args: --points <points path> --centroids <centers path> --output <result path> --iterations <num iterations> + (KMeans expects the following args: --points <path> --centroids <path> --output <path> --iterations <n> {% highlight bash %} --points /tmp/kmeans/points --centroids /tmp/kmeans/centers --output /tmp/kmeans/result --iterations 10 {% endhighlight %}
From 51eef63d5640db37a64a2258d252d479d8399641 Mon Sep 17 00:00:00 2001 From: Stefano Baghino Date: Fri, 29 Jan 2016 14:32:19 +0100 Subject: [PATCH 14/17] [FLINK-2021] Indentation problem in docs fixed --- docs/quickstart/run_example_quickstart.md | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/docs/quickstart/run_example_quickstart.md b/docs/quickstart/run_example_quickstart.md index ff0fa8b16ed63..7e3d999d1f38c 100644 --- a/docs/quickstart/run_example_quickstart.md +++ b/docs/quickstart/run_example_quickstart.md @@ -101,9 +101,7 @@ The Flink web interface allows to submit Flink programs using a graphical user i Leave the Entry Class and Parallelism form empty
Enter the following program arguments:
(KMeans expects the following args: --points <path> --centroids <path> --output <path> --iterations <n> - {% highlight bash %} - --points /tmp/kmeans/points --centroids /tmp/kmeans/centers --output /tmp/kmeans/result --iterations 10 - {% endhighlight %}
+ {% highlight bash %}--points /tmp/kmeans/points --centroids /tmp/kmeans/centers --output /tmp/kmeans/result --iterations 10{% endhighlight %}
6. Press Submit to start the job From 9d66b8dced7cfe154d1ccf418d09a6633b9026bd Mon Sep 17 00:00:00 2001 From: Stefano Baghino Date: Fri, 29 Jan 2016 14:33:41 +0100 Subject: [PATCH 15/17] [FLINK-2021] ScalaDoc comment re-indented to comply with coding style --- .../examples/scala/clustering/KMeans.scala | 86 +++++++++---------- 1 file changed, 43 insertions(+), 43 deletions(-) diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala index aa8da357c7977..04123f70d46bd 100644 --- a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala +++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala @@ -27,49 +27,49 @@ import org.apache.flink.examples.java.clustering.util.KMeansData import scala.collection.JavaConverters._ /** - * This example implements a basic K-Means clustering algorithm. - * - * K-Means is an iterative clustering algorithm and works as follows: - * K-Means is given a set of data points to be clustered and an initial set of ''K'' cluster - * centers. - * In each iteration, the algorithm computes the distance of each data point to each cluster center. - * Each point is assigned to the cluster center which is closest to it. - * Subsequently, each cluster center is moved to the center (''mean'') of all points that have - * been assigned to it. - * The moved cluster centers are fed into the next iteration. - * The algorithm terminates after a fixed number of iterations (as in this implementation) - * or if cluster centers do not (significantly) move in an iteration. - * This is the Wikipedia entry for the [[http://en.wikipedia - * .org/wiki/K-means_clustering K-Means Clustering algorithm]]. - * - * This implementation works on two-dimensional data points. - * It computes an assignment of data points to cluster centers, i.e., - * each data point is annotated with the id of the final cluster (center) it belongs to. - * - * Input files are plain text files and must be formatted as follows: - * - * - Data points are represented as two double values separated by a blank character. - * Data points are separated by newline characters. - * For example `"1.2 2.3\n5.3 7.2\n"` gives two data points (x=1.2, y=2.3) and (x=5.3, - * y=7.2). - * - Cluster centers are represented by an integer id and a point value. - * For example `"1 6.2 3.2\n2 2.9 5.7\n"` gives two centers (id=1, x=6.2, - * y=3.2) and (id=2, x=2.9, y=5.7). - * - * Usage: - * {{{ - * KMeans --points --centroids --output --iterations - * }}} - * If no parameters are provided, the program is run with default data from - * [[org.apache.flink.examples.java.clustering.util.KMeansData]] - * and 10 iterations. - * - * This example shows how to use: - * - * - Bulk iterations - * - Broadcast variables in bulk iterations - * - Scala case classes - */ + * This example implements a basic K-Means clustering algorithm. + * + * K-Means is an iterative clustering algorithm and works as follows: + * K-Means is given a set of data points to be clustered and an initial set of ''K'' cluster + * centers. + * In each iteration, the algorithm computes the distance of each data point to each cluster center. + * Each point is assigned to the cluster center which is closest to it. + * Subsequently, each cluster center is moved to the center (''mean'') of all points that have + * been assigned to it. + * The moved cluster centers are fed into the next iteration. + * The algorithm terminates after a fixed number of iterations (as in this implementation) + * or if cluster centers do not (significantly) move in an iteration. + * This is the Wikipedia entry for the [[http://en.wikipedia + * .org/wiki/K-means_clustering K-Means Clustering algorithm]]. + * + * This implementation works on two-dimensional data points. + * It computes an assignment of data points to cluster centers, i.e., + * each data point is annotated with the id of the final cluster (center) it belongs to. + * + * Input files are plain text files and must be formatted as follows: + * + * - Data points are represented as two double values separated by a blank character. + * Data points are separated by newline characters. + * For example `"1.2 2.3\n5.3 7.2\n"` gives two data points (x=1.2, y=2.3) and (x=5.3, + * y=7.2). + * - Cluster centers are represented by an integer id and a point value. + * For example `"1 6.2 3.2\n2 2.9 5.7\n"` gives two centers (id=1, x=6.2, + * y=3.2) and (id=2, x=2.9, y=5.7). + * + * Usage: + * {{{ + * KMeans --points --centroids --output --iterations + * }}} + * If no parameters are provided, the program is run with default data from + * [[org.apache.flink.examples.java.clustering.util.KMeansData]] + * and 10 iterations. + * + * This example shows how to use: + * + * - Bulk iterations + * - Broadcast variables in bulk iterations + * - Scala case classes + */ object KMeans { def main(args: Array[String]) { From 4895a7ed5a9291d6df77fa5b71794ece5fba9af5 Mon Sep 17 00:00:00 2001 From: Stefano Baghino Date: Fri, 29 Jan 2016 15:50:42 +0100 Subject: [PATCH 16/17] [FLINK-2021] Tests updated to reflect usage of ParameterTools in KMeans --- .../flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java | 6 +++++- .../test/optimizer/jsonplan/JsonJobGraphGenerationTest.java | 6 +++++- .../flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java | 6 +++++- 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java index dad2f12330dec..534bff531ab11 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java @@ -80,7 +80,11 @@ public void dumpIterativeKMeans() { PreviewPlanEnvironment env = new PreviewPlanEnvironment(); env.setAsContext(); try { - KMeans.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "123"}); + KMeans.main(new String[] { + "--points ", IN_FILE, + "--centroids ", IN_FILE, + "--output ", OUT_FILE, + "--iterations", "123"}); } catch(OptimizerPlanEnvironment.ProgramAbortException pae) { // all good. } catch (Exception e) { diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java index a9ade6a5a1d08..604839b6f8d7c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java @@ -165,7 +165,11 @@ public void testKMeans() { TestingExecutionEnvironment.setAsNext(validator, parallelism); String tmpDir = ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH; - KMeans.main(new String[] {tmpDir, tmpDir, tmpDir, "100"}); + KMeans.main(new String[] { + "--points ", tmpDir, + "--centroids ", tmpDir, + "--output ", tmpDir, + "--iterations", "100"}); } catch (AbortError ignored) {} diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java index 57c82b513050a..fe59c29d97d5f 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java @@ -81,7 +81,11 @@ public void dumpIterativeKMeans() { PreviewPlanEnvironment env = new PreviewPlanEnvironment(); env.setAsContext(); try { - KMeans.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "123"}); + KMeans.main(new String[] { + "--points ", IN_FILE, + "--centroids ", IN_FILE, + "--output ", OUT_FILE, + "--iterations", "123"}); } catch(OptimizerPlanEnvironment.ProgramAbortException pae) { // all good. } catch (Exception e) { From 89cd989870c0d3f186879d740a2c4f162b09381c Mon Sep 17 00:00:00 2001 From: Stefano Baghino Date: Fri, 29 Jan 2016 17:44:20 +0100 Subject: [PATCH 17/17] [FLINK-2021] Boy Scout rule: fixed typo in method name --- .../test/optimizer/jsonplan/JsonJobGraphGenerationTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java index 604839b6f8d7c..678d5d80ccb11 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java @@ -217,7 +217,7 @@ public void testConnectedComponents() { private static interface JsonValidator { - void validtateJson(String json) throws Exception; + void validateJson(String json) throws Exception; } private static class GenericValidator implements JsonValidator { @@ -231,7 +231,7 @@ private static class GenericValidator implements JsonValidator { } @Override - public void validtateJson(String json) throws Exception { + public void validateJson(String json) throws Exception { final Map idToNode = new HashMap<>(); // validate the produced JSON @@ -334,7 +334,7 @@ public JobExecutionResult execute(String jobName) throws Exception { JsonParser parser = new JsonFactory().createJsonParser(jsonPlan); while (parser.nextToken() != null); - validator.validtateJson(jsonPlan); + validator.validateJson(jsonPlan); throw new AbortError(); }