From e64c3c04f6867c3c0347bb6cb74bda4c0f4684aa Mon Sep 17 00:00:00 2001 From: SteNicholas Date: Wed, 7 Jul 2021 13:10:08 +0800 Subject: [PATCH] [FLINK-23280][python] Python ExplainDetails does not have JSON_EXECUTION_PLAN option --- flink-python/pyflink/table/explain_detail.py | 3 +++ flink-python/pyflink/table/tests/test_explain.py | 9 +++++++++ .../pyflink/table/tests/test_table_environment_api.py | 6 ++++-- flink-python/pyflink/util/java_utils.py | 4 +++- 4 files changed, 19 insertions(+), 3 deletions(-) diff --git a/flink-python/pyflink/table/explain_detail.py b/flink-python/pyflink/table/explain_detail.py index b3b549fbe3a7b..12cbcdfce7402 100644 --- a/flink-python/pyflink/table/explain_detail.py +++ b/flink-python/pyflink/table/explain_detail.py @@ -34,3 +34,6 @@ class ExplainDetail(object): # The changelog mode produced by a physical rel node. # e.g. GroupAggregate(..., changelogMode=[I,UA,D]) CHANGELOG_MODE = 1 + + # The execution plan in json format of the program. + JSON_EXECUTION_PLAN = 2 diff --git a/flink-python/pyflink/table/tests/test_explain.py b/flink-python/pyflink/table/tests/test_explain.py index 06a7ce2ea612e..8fad6c10c317e 100644 --- a/flink-python/pyflink/table/tests/test_explain.py +++ b/flink-python/pyflink/table/tests/test_explain.py @@ -15,6 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ +import json from pyflink.testing.test_case_utils import PyFlinkBlinkStreamTableTestCase from pyflink.table.explain_detail import ExplainDetail @@ -29,6 +30,14 @@ def test_explain(self): assert isinstance(result, str) + result = t.group_by("c").select(t.a.sum, t.c.alias('b')).explain( + ExplainDetail.JSON_EXECUTION_PLAN) + assert isinstance(result, str) + try: + json.loads(result.split('== Physical Execution Plan ==')[1]) + except: + self.fail('The execution plan of explain detail is not in json format.') + if __name__ == '__main__': import unittest diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py index 7ec3bbc488bfc..897af0222b54f 100644 --- a/flink-python/pyflink/table/tests/test_table_environment_api.py +++ b/flink-python/pyflink/table/tests/test_table_environment_api.py @@ -69,7 +69,8 @@ def test_explain_with_extended(self): t = t_env.from_elements([], schema) result = t.select(t.a + 1, t.b, t.c) - actual = result.explain(ExplainDetail.ESTIMATED_COST, ExplainDetail.CHANGELOG_MODE) + actual = result.explain(ExplainDetail.ESTIMATED_COST, ExplainDetail.CHANGELOG_MODE, + ExplainDetail.JSON_EXECUTION_PLAN) assert isinstance(actual, str) @@ -347,7 +348,8 @@ def test_explain_with_multi_sinks(self): stmt_set.add_insert_sql("insert into sink1 select * from %s where a > 100" % source) stmt_set.add_insert_sql("insert into sink2 select * from %s where a < 100" % source) - actual = stmt_set.explain(ExplainDetail.ESTIMATED_COST, ExplainDetail.CHANGELOG_MODE) + actual = stmt_set.explain(ExplainDetail.ESTIMATED_COST, ExplainDetail.CHANGELOG_MODE, + ExplainDetail.JSON_EXECUTION_PLAN) self.assertIsInstance(actual, str) def test_register_java_function(self): diff --git a/flink-python/pyflink/util/java_utils.py b/flink-python/pyflink/util/java_utils.py index 8ea8d9b8695fc..df9c2c9bc5817 100644 --- a/flink-python/pyflink/util/java_utils.py +++ b/flink-python/pyflink/util/java_utils.py @@ -143,7 +143,9 @@ def to_j_explain_detail_arr(p_extra_details): gateway = get_gateway() def to_j_explain_detail(p_extra_detail): - if p_extra_detail == ExplainDetail.CHANGELOG_MODE: + if p_extra_detail == ExplainDetail.JSON_EXECUTION_PLAN: + return gateway.jvm.org.apache.flink.table.api.ExplainDetail.JSON_EXECUTION_PLAN + elif p_extra_detail == ExplainDetail.CHANGELOG_MODE: return gateway.jvm.org.apache.flink.table.api.ExplainDetail.CHANGELOG_MODE else: return gateway.jvm.org.apache.flink.table.api.ExplainDetail.ESTIMATED_COST