From 59aa16840b3ced96641f6f335c2196bf330c4c06 Mon Sep 17 00:00:00 2001 From: zentol Date: Thu, 19 Nov 2015 13:30:48 +0100 Subject: [PATCH] [FLINK-2440][py] Expand Environment feature coverage --- .../flink/python/api/PythonPlanBinder.java | 4 +- .../python/api/flink/example/TPCHQuery10.py | 2 +- .../python/api/flink/example/TPCHQuery3.py | 2 +- .../api/flink/example/TriangleEnumeration.py | 2 +- .../api/flink/example/WebLogAnalysis.py | 2 +- .../python/api/flink/example/WordCount.py | 2 +- .../python/api/flink/plan/Environment.py | 38 ++++++++++++++----- .../org/apache/flink/python/api/test_main.py | 2 +- .../org/apache/flink/python/api/test_main2.py | 3 +- 9 files changed, 37 insertions(+), 20 deletions(-) diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java index f4f501ac66c88..a27a58985a749 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java @@ -278,9 +278,7 @@ private enum Parameters { } private void receiveParameters() throws IOException { - Integer parameterCount = (Integer) receiver.getRecord(true); - - for (int x = 0; x < parameterCount; x++) { + for (int x = 0; x < 4; x++) { Tuple value = (Tuple) receiver.getRecord(true); switch (Parameters.valueOf(((String) value.getField(0)).toUpperCase())) { case DOP: diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery10.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery10.py index 032ef85389238..cc9e7cf1963d9 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery10.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery10.py @@ -110,6 +110,6 @@ def join(self, value1, value2): result.write_csv(sys.argv[5], '\n', '|', WriteMode.OVERWRITE) - env.set_degree_of_parallelism(1) + env.set_parallelism(1) env.execute(local=True) \ No newline at end of file diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery3.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery3.py index 5fafb017ddbfe..3eb72c9792f77 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery3.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery3.py @@ -99,7 +99,7 @@ def join(self, value1, value2): result.write_csv(sys.argv[4], '\n', '|', WriteMode.OVERWRITE) - env.set_degree_of_parallelism(1) + env.set_parallelism(1) env.execute(local=True) diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TriangleEnumeration.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TriangleEnumeration.py index 27276352579c4..b1b3ef4325ab0 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TriangleEnumeration.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TriangleEnumeration.py @@ -147,6 +147,6 @@ def join(self, value1, value2): triangles.output() - env.set_degree_of_parallelism(1) + env.set_parallelism(1) env.execute(local=True) \ No newline at end of file diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WebLogAnalysis.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WebLogAnalysis.py index d571cf91520ad..676043fdc873c 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WebLogAnalysis.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WebLogAnalysis.py @@ -82,6 +82,6 @@ def co_group(self, iterator1, iterator2, collector): result.write_csv(sys.argv[4], '\n', '|', WriteMode.OVERWRITE) - env.set_degree_of_parallelism(1) + env.set_parallelism(1) env.execute(local=True) \ No newline at end of file diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WordCount.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WordCount.py index 8a89a6fc1ac12..71c2e28337183 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WordCount.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WordCount.py @@ -56,6 +56,6 @@ def reduce(self, iterator, collector): else: result.output() - env.set_degree_of_parallelism(1) + env.set_parallelism(1) env.execute(local=True) \ No newline at end of file diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py index bea6212e7e93c..169e31b4da265 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py @@ -40,7 +40,10 @@ def __init__(self): self._counter = 0 #parameters - self._parameters = [] + self._dop = -1 + self._local_mode = False + self._debug_mode = False + self._retry = 0 #sets self._sources = [] @@ -114,15 +117,28 @@ def from_elements(self, *elements): self._sources.append(child) return child_set - def set_degree_of_parallelism(self, degree): + def set_parallelism(self, parallelism): """ - Sets the degree of parallelism (DOP) for operations executed through this environment. + Sets the parallelism for operations executed through this environment. Setting a DOP of x here will cause all operators (such as join, map, reduce) to run with x parallel instances. - :param degreeOfParallelism: The degree of parallelism + :param parallelism: The degree of parallelism """ - self._parameters.append(("dop", degree)) + self._dop = parallelism + + def get_parallelism(self): + """ + Gets the parallelism with which operation are executed by default. + :return The parallelism used by operations. + """ + return self._dop + + def set_number_of_execution_retries(self, count): + self._retry = count + + def get_number_of_execution_retries(self): + return self._retry def execute(self, local=False, debug=False): """ @@ -132,8 +148,8 @@ def execute(self, local=False, debug=False): """ if debug: local = True - self._parameters.append(("mode", local)) - self._parameters.append(("debug", debug)) + self._local_mode = local + self._debug_mode = debug self._optimize_plan() plan_mode = sys.stdin.readline().rstrip('\n') == "plan" @@ -243,9 +259,11 @@ def _send_plan(self): self._send_broadcast() def _send_parameters(self): - self._collector.collect(len(self._parameters)) - for parameter in self._parameters: - self._collector.collect(parameter) + collect = self._collector.collect + collect(("dop", self._dop)) + collect(("debug", self._debug_mode)) + collect(("mode", self._local_mode)) + collect(("retry", self._retry)) def _send_sources(self): for source in self._sources: diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py index 9a3a5e4b02d6c..16e1a8c3e40ce 100644 --- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py +++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py @@ -227,6 +227,6 @@ def combine(self, iterator, collector): .map_partition(Verify([(4.3, 4.4, 1), (4.1, 4.1, 3)], "ChainedSortedGroupReduce"), STRING).output() #Execution - env.set_degree_of_parallelism(1) + env.set_parallelism(1) env.execute(local=True) diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py index 2f30cdae9dee7..56e325059df33 100644 --- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py +++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py @@ -1,3 +1,4 @@ + # ############################################################################### # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file @@ -140,6 +141,6 @@ def map(self, value): .map_partition(Verify2([(1, 0.5, "hello", True), (2, 0.4, "world", False), (1, 0.5, "hello", True), (1, 0.4, "hello", False), (1, 0.5, "hello", True), (2, 0.4, "world", False)], "Union"), STRING).output() #Execution - env.set_degree_of_parallelism(1) + env.set_parallelism(1) env.execute(local=True)