From 784a602167f396cdcd1201509d3c122a5a85248f Mon Sep 17 00:00:00 2001 From: omaralvarez Date: Thu, 2 Jun 2016 11:09:08 +0200 Subject: [PATCH 1/6] [FLINK-4002] [py] Improve testing infraestructure --- .../org/apache/flink/python/api/test_main.py | 29 +++++++++++++++++-- .../org/apache/flink/python/api/test_main2.py | 29 +++++++++++++++++-- 2 files changed, 54 insertions(+), 4 deletions(-) diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py index 9b0f1442e2905..796c76facd82e 100644 --- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py +++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py @@ -29,6 +29,12 @@ #Utilities class Id(MapFunction): def map(self, value): + """ + Simple map function to forward test results. + + :param value: Input value. + :return: Forwarded value. + """ return value @@ -39,10 +45,19 @@ def __init__(self, expected, name): self.name = name def map_partition(self, iterator, collector): + """ + Compares elements in the expected values list against actual values in resulting DataSet. + + :param iterator: Iterator for the corresponding DataSet partition. + :param collector: Collector for the result records. + """ index = 0 for value in iterator: - if value != self.expected[index]: - raise Exception(self.name + " Test failed. Expected: " + str(self.expected[index]) + " Actual: " + str(value)) + try: + if value != self.expected[index]: + raise Exception(self.name + " Test failed. Expected: " + str(self.expected[index]) + " Actual: " + str(value)) + except IndexError: + raise Exception(self.name + " Test failed. Discrepancy in the number of elements between expected and actual values.") index += 1 #collector.collect(self.name + " successful!") @@ -54,6 +69,16 @@ def __init__(self, expected, name): self.name = name def map_partition(self, iterator, collector): + """ + Compares elements in the expected values list against actual values in resulting DataSet. + + This function does not compare element by element, since for example in a Union order is not guaranteed. + + Elements are removed from the expected values list for the whole DataSet. + + :param iterator: Iterator for the corresponding DataSet partition. + :param collector: Collector for the result records. + """ for value in iterator: if value in self.expected: try: diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py index 787928c597c0e..b52173a04af44 100644 --- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py +++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py @@ -27,6 +27,12 @@ #Utilities class Id(MapFunction): def map(self, value): + """ + Simple map function to forward test results. + + :param value: Input value. + :return: Forwarded value. + """ return value @@ -37,10 +43,19 @@ def __init__(self, expected, name): self.name = name def map_partition(self, iterator, collector): + """ + Compares elements in the expected values list against actual values in resulting DataSet. + + :param iterator: Iterator for the corresponding DataSet partition. + :param collector: Collector for the result records. + """ index = 0 for value in iterator: - if value != self.expected[index]: - raise Exception(self.name + " Test failed. Expected: " + str(self.expected[index]) + " Actual: " + str(value)) + try: + if value != self.expected[index]: + raise Exception(self.name + " Test failed. Expected: " + str(self.expected[index]) + " Actual: " + str(value)) + except IndexError: + raise Exception(self.name + " Test failed. Discrepancy in the number of elements between expected and actual values.") index += 1 #collector.collect(self.name + " successful!") @@ -52,6 +67,16 @@ def __init__(self, expected, name): self.name = name def map_partition(self, iterator, collector): + """ + Compares elements in the expected values list against actual values in resulting DataSet. + + This function does not compare element by element, since for example in a Union order is not guaranteed. + + Elements are removed from the expected values list for the whole DataSet. + + :param iterator: Iterator for the corresponding DataSet partition. + :param collector: Collector for the result records. + """ for value in iterator: if value in self.expected: try: From 4090d425e957fc30b4788022ac24749145e6f9da Mon Sep 17 00:00:00 2001 From: omaralvarez Date: Fri, 3 Jun 2016 14:00:15 +0200 Subject: [PATCH 2/6] [FLINK-4002] [py] Fix Verify2() --- .../python/org/apache/flink/python/api/test_main.py | 11 +++++------ .../python/org/apache/flink/python/api/test_main2.py | 9 ++++----- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py index 796c76facd82e..6c856fd6a24e5 100644 --- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py +++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py @@ -43,7 +43,7 @@ def __init__(self, expected, name): super(Verify, self).__init__() self.expected = expected self.name = name - +#TODO What if there are more expected than actual? def map_partition(self, iterator, collector): """ Compares elements in the expected values list against actual values in resulting DataSet. @@ -80,11 +80,10 @@ def map_partition(self, iterator, collector): :param collector: Collector for the result records. """ for value in iterator: - if value in self.expected: - try: - self.expected.remove(value) - except Exception: - raise Exception(self.name + " failed! Actual value " + str(value) + "not contained in expected values: "+str(self.expected)) + try: + self.expected.remove(value) + except Exception: + raise Exception(self.name + " failed! Actual value " + str(value) + "not contained in expected values: "+str(self.expected)) #collector.collect(self.name + " successful!") diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py index b52173a04af44..a5d80fc3b6573 100644 --- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py +++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py @@ -78,11 +78,10 @@ def map_partition(self, iterator, collector): :param collector: Collector for the result records. """ for value in iterator: - if value in self.expected: - try: - self.expected.remove(value) - except Exception: - raise Exception(self.name + " failed! Actual value " + str(value) + "not contained in expected values: "+str(self.expected)) + try: + self.expected.remove(value) + except Exception: + raise Exception(self.name + " failed! Actual value " + str(value) + "not contained in expected values: "+str(self.expected)) #collector.collect(self.name + " successful!") From b23beac35cd24470b91c531092a8c1eb36baa9d1 Mon Sep 17 00:00:00 2001 From: omar alvarez Date: Sun, 5 Jun 2016 12:20:00 +0200 Subject: [PATCH 3/6] [FLINK-4002] [py] Fix last error in Verify() and Verify2() --- .../test/python/org/apache/flink/python/api/test_main.py | 8 ++++++-- .../test/python/org/apache/flink/python/api/test_main2.py | 6 +++++- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py index 6c856fd6a24e5..647d7faa2eea2 100644 --- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py +++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py @@ -43,7 +43,7 @@ def __init__(self, expected, name): super(Verify, self).__init__() self.expected = expected self.name = name -#TODO What if there are more expected than actual? + def map_partition(self, iterator, collector): """ Compares elements in the expected values list against actual values in resulting DataSet. @@ -59,6 +59,8 @@ def map_partition(self, iterator, collector): except IndexError: raise Exception(self.name + " Test failed. Discrepancy in the number of elements between expected and actual values.") index += 1 + if(index != len(self.expected)): + raise Exception(self.name + " Test failed. Discrepancy in the number of elements between expected and actual values.") #collector.collect(self.name + " successful!") @@ -83,8 +85,10 @@ def map_partition(self, iterator, collector): try: self.expected.remove(value) except Exception: - raise Exception(self.name + " failed! Actual value " + str(value) + "not contained in expected values: "+str(self.expected)) + raise Exception(self.name + " failed! Actual value " + str(value) + " not contained in expected values: " + str(self.expected)) #collector.collect(self.name + " successful!") + if self.expected: + raise Exception(self.name + " Test failed. Discrepancy in the number of elements between expected and actual values.") if __name__ == "__main__": diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py index a5d80fc3b6573..b9074dfa3670a 100644 --- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py +++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py @@ -58,6 +58,8 @@ def map_partition(self, iterator, collector): raise Exception(self.name + " Test failed. Discrepancy in the number of elements between expected and actual values.") index += 1 #collector.collect(self.name + " successful!") + if(index != len(self.expected)): + raise Exception(self.name + " Test failed. Discrepancy in the number of elements between expected and actual values.") class Verify2(MapPartitionFunction): @@ -81,8 +83,10 @@ def map_partition(self, iterator, collector): try: self.expected.remove(value) except Exception: - raise Exception(self.name + " failed! Actual value " + str(value) + "not contained in expected values: "+str(self.expected)) + raise Exception(self.name + " failed! Actual value " + str(value) + " not contained in expected values: " + str(self.expected)) #collector.collect(self.name + " successful!") + if self.expected: + raise Exception(self.name + " Test failed. Discrepancy in the number of elements between expected and actual values.") if __name__ == "__main__": From b1c7fb2295ac2653b9c741d91cc4f9983abc2c6f Mon Sep 17 00:00:00 2001 From: omaralvarez Date: Tue, 7 Jun 2016 19:51:29 +0200 Subject: [PATCH 4/6] [FLINK-4002] [py] Fix PythonPlanBinder error when arguments are not passed and modules are --- .../main/java/org/apache/flink/python/api/PythonPlanBinder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java index f43a4f935e207..0c1781a260452 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java @@ -135,7 +135,7 @@ private void runPlan(String[] args) throws Exception { try { String tmpPath = FLINK_PYTHON_FILE_PATH + r.nextInt(); - prepareFiles(tmpPath, Arrays.copyOfRange(args, 0, split == 0 ? 1 : split)); + prepareFiles(tmpPath, Arrays.copyOfRange(args, 0, split == 0 ? args.length : split)); startPython(tmpPath, Arrays.copyOfRange(args, split == 0 ? args.length : split + 1, args.length)); receivePlan(); From 48ea5b16acbc8ff3c3e32a8360f0eca4cccef080 Mon Sep 17 00:00:00 2001 From: omaralvarez Date: Tue, 7 Jun 2016 20:10:12 +0200 Subject: [PATCH 5/6] [FLINK-4002] [py] Reuse utility functions in tests --- .../python/api/PythonPlanBinderTest.java | 13 ++- .../org/apache/flink/python/api/test_main.py | 66 +-------------- .../org/apache/flink/python/api/test_main2.py | 68 +-------------- .../apache/flink/python/api/utils/utils.py | 82 +++++++++++++++++++ 4 files changed, 95 insertions(+), 134 deletions(-) create mode 100644 flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/utils/utils.py diff --git a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java index 244e6b72da26c..759d3f2807f32 100644 --- a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java +++ b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java @@ -28,6 +28,14 @@ protected boolean skipCollectionExecution() { return true; } + private static String findUtilsFile() throws Exception { + + FileSystem fs = FileSystem.getLocalFileSystem(); + return fs.getWorkingDirectory().toString() + + "/src/test/python/org/apache/flink/python/api/utils/utils.py"; + + } + private static List findTestFiles() throws Exception { List files = new ArrayList(); FileSystem fs = FileSystem.getLocalFileSystem(); @@ -63,14 +71,15 @@ private static boolean isPython3Supported() { @Override protected void testProgram() throws Exception { + String utils = findUtilsFile(); if (isPython2Supported()) { for (String file : findTestFiles()) { - PythonPlanBinder.main(new String[]{ARGUMENT_PYTHON_2, file}); + PythonPlanBinder.main(new String[]{ARGUMENT_PYTHON_2, file, utils}); } } if (isPython3Supported()) { for (String file : findTestFiles()) { - PythonPlanBinder.main(new String[]{ARGUMENT_PYTHON_3, file}); + PythonPlanBinder.main(new String[]{ARGUMENT_PYTHON_3, file, utils}); } } } diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py index 647d7faa2eea2..c0a44149f8cba 100644 --- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py +++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py @@ -25,71 +25,7 @@ from flink.plan.Constants import Order, WriteMode from flink.plan.Constants import INT, STRING import struct - -#Utilities -class Id(MapFunction): - def map(self, value): - """ - Simple map function to forward test results. - - :param value: Input value. - :return: Forwarded value. - """ - return value - - -class Verify(MapPartitionFunction): - def __init__(self, expected, name): - super(Verify, self).__init__() - self.expected = expected - self.name = name - - def map_partition(self, iterator, collector): - """ - Compares elements in the expected values list against actual values in resulting DataSet. - - :param iterator: Iterator for the corresponding DataSet partition. - :param collector: Collector for the result records. - """ - index = 0 - for value in iterator: - try: - if value != self.expected[index]: - raise Exception(self.name + " Test failed. Expected: " + str(self.expected[index]) + " Actual: " + str(value)) - except IndexError: - raise Exception(self.name + " Test failed. Discrepancy in the number of elements between expected and actual values.") - index += 1 - if(index != len(self.expected)): - raise Exception(self.name + " Test failed. Discrepancy in the number of elements between expected and actual values.") - #collector.collect(self.name + " successful!") - - -class Verify2(MapPartitionFunction): - def __init__(self, expected, name): - super(Verify2, self).__init__() - self.expected = expected - self.name = name - - def map_partition(self, iterator, collector): - """ - Compares elements in the expected values list against actual values in resulting DataSet. - - This function does not compare element by element, since for example in a Union order is not guaranteed. - - Elements are removed from the expected values list for the whole DataSet. - - :param iterator: Iterator for the corresponding DataSet partition. - :param collector: Collector for the result records. - """ - for value in iterator: - try: - self.expected.remove(value) - except Exception: - raise Exception(self.name + " failed! Actual value " + str(value) + " not contained in expected values: " + str(self.expected)) - #collector.collect(self.name + " successful!") - if self.expected: - raise Exception(self.name + " Test failed. Discrepancy in the number of elements between expected and actual values.") - +from utils import Id, Verify if __name__ == "__main__": env = get_environment() diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py index b9074dfa3670a..2ea6f91d986ae 100644 --- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py +++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py @@ -18,76 +18,10 @@ ################################################################################ from flink.plan.Environment import get_environment from flink.functions.MapFunction import MapFunction -from flink.functions.MapPartitionFunction import MapPartitionFunction from flink.functions.CrossFunction import CrossFunction from flink.functions.JoinFunction import JoinFunction from flink.functions.CoGroupFunction import CoGroupFunction - - -#Utilities -class Id(MapFunction): - def map(self, value): - """ - Simple map function to forward test results. - - :param value: Input value. - :return: Forwarded value. - """ - return value - - -class Verify(MapPartitionFunction): - def __init__(self, expected, name): - super(Verify, self).__init__() - self.expected = expected - self.name = name - - def map_partition(self, iterator, collector): - """ - Compares elements in the expected values list against actual values in resulting DataSet. - - :param iterator: Iterator for the corresponding DataSet partition. - :param collector: Collector for the result records. - """ - index = 0 - for value in iterator: - try: - if value != self.expected[index]: - raise Exception(self.name + " Test failed. Expected: " + str(self.expected[index]) + " Actual: " + str(value)) - except IndexError: - raise Exception(self.name + " Test failed. Discrepancy in the number of elements between expected and actual values.") - index += 1 - #collector.collect(self.name + " successful!") - if(index != len(self.expected)): - raise Exception(self.name + " Test failed. Discrepancy in the number of elements between expected and actual values.") - - -class Verify2(MapPartitionFunction): - def __init__(self, expected, name): - super(Verify2, self).__init__() - self.expected = expected - self.name = name - - def map_partition(self, iterator, collector): - """ - Compares elements in the expected values list against actual values in resulting DataSet. - - This function does not compare element by element, since for example in a Union order is not guaranteed. - - Elements are removed from the expected values list for the whole DataSet. - - :param iterator: Iterator for the corresponding DataSet partition. - :param collector: Collector for the result records. - """ - for value in iterator: - try: - self.expected.remove(value) - except Exception: - raise Exception(self.name + " failed! Actual value " + str(value) + " not contained in expected values: " + str(self.expected)) - #collector.collect(self.name + " successful!") - if self.expected: - raise Exception(self.name + " Test failed. Discrepancy in the number of elements between expected and actual values.") - +from utils import Verify, Verify2 if __name__ == "__main__": env = get_environment() diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/utils/utils.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/utils/utils.py new file mode 100644 index 0000000000000..0caea9505e005 --- /dev/null +++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/utils/utils.py @@ -0,0 +1,82 @@ +# ############################################################################### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +from flink.functions.MapFunction import MapFunction +from flink.functions.MapPartitionFunction import MapPartitionFunction + +#Utilities +class Id(MapFunction): + def map(self, value): + """ + Simple map function to forward test results. + + :param value: Input value. + :return: Forwarded value. + """ + return value + + +class Verify(MapPartitionFunction): + def __init__(self, expected, name): + super(Verify, self).__init__() + self.expected = expected + self.name = name + + def map_partition(self, iterator, collector): + """ + Compares elements in the expected values list against actual values in resulting DataSet. + + :param iterator: Iterator for the corresponding DataSet partition. + :param collector: Collector for the result records. + """ + index = 0 + for value in iterator: + try: + if value != self.expected[index]: + raise Exception(self.name + " Test failed. Expected: " + str(self.expected[index]) + " Actual: " + str(value)) + except IndexError: + raise Exception(self.name + " Test failed. Discrepancy in the number of elements between expected and actual values.") + index += 1 + if(len(self.expected) != index): + raise Exception(self.name + " Test failed. Discrepancy in the number of elements between expected and actual values.") + #collector.collect(self.name + " successful!") + + +class Verify2(MapPartitionFunction): + def __init__(self, expected, name): + super(Verify2, self).__init__() + self.expected = expected + self.name = name + + def map_partition(self, iterator, collector): + """ + Compares elements in the expected values list against actual values in resulting DataSet. + + This function does not compare element by element, since for example in a Union order is not guaranteed. + + Elements are removed from the expected values list for the whole DataSet. + + :param iterator: Iterator for the corresponding DataSet partition. + :param collector: Collector for the result records. + """ + for value in iterator: + try: + self.expected.remove(value) + except Exception: + raise Exception(self.name + " failed! Actual value " + str(value) + "not contained in expected values: "+str(self.expected)) + #collector.collect(self.name + " successful!") + From 62152328da7bb650b889d5c7de7dec99369e0760 Mon Sep 17 00:00:00 2001 From: omaralvarez Date: Tue, 7 Jun 2016 20:14:03 +0200 Subject: [PATCH 6/6] [FLINK-4002] [py] Remove comment that is not necessary anymore --- .../src/test/python/org/apache/flink/python/api/utils/utils.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/utils/utils.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/utils/utils.py index 0caea9505e005..a383fc7d9c209 100644 --- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/utils/utils.py +++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/utils/utils.py @@ -18,7 +18,7 @@ from flink.functions.MapFunction import MapFunction from flink.functions.MapPartitionFunction import MapPartitionFunction -#Utilities + class Id(MapFunction): def map(self, value): """ @@ -79,4 +79,3 @@ def map_partition(self, iterator, collector): except Exception: raise Exception(self.name + " failed! Actual value " + str(value) + "not contained in expected values: "+str(self.expected)) #collector.collect(self.name + " successful!") -