Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions flink-python/pyflink/table/explain_detail.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,6 @@ class ExplainDetail(object):
# The changelog mode produced by a physical rel node.
# e.g. GroupAggregate(..., changelogMode=[I,UA,D])
CHANGELOG_MODE = 1

# The execution plan in json format of the program.
JSON_EXECUTION_PLAN = 2
9 changes: 9 additions & 0 deletions flink-python/pyflink/table/tests/test_explain.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import json

from pyflink.testing.test_case_utils import PyFlinkBlinkStreamTableTestCase
from pyflink.table.explain_detail import ExplainDetail
Expand All @@ -29,6 +30,14 @@ def test_explain(self):

assert isinstance(result, str)

result = t.group_by("c").select(t.a.sum, t.c.alias('b')).explain(
ExplainDetail.JSON_EXECUTION_PLAN)
assert isinstance(result, str)
try:
json.loads(result.split('== Physical Execution Plan ==')[1])
except:
self.fail('The execution plan of explain detail is not in json format.')


if __name__ == '__main__':
import unittest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ def test_explain_with_extended(self):
t = t_env.from_elements([], schema)
result = t.select(t.a + 1, t.b, t.c)

actual = result.explain(ExplainDetail.ESTIMATED_COST, ExplainDetail.CHANGELOG_MODE)
actual = result.explain(ExplainDetail.ESTIMATED_COST, ExplainDetail.CHANGELOG_MODE,
ExplainDetail.JSON_EXECUTION_PLAN)

assert isinstance(actual, str)

Expand Down Expand Up @@ -347,7 +348,8 @@ def test_explain_with_multi_sinks(self):
stmt_set.add_insert_sql("insert into sink1 select * from %s where a > 100" % source)
stmt_set.add_insert_sql("insert into sink2 select * from %s where a < 100" % source)

actual = stmt_set.explain(ExplainDetail.ESTIMATED_COST, ExplainDetail.CHANGELOG_MODE)
actual = stmt_set.explain(ExplainDetail.ESTIMATED_COST, ExplainDetail.CHANGELOG_MODE,
ExplainDetail.JSON_EXECUTION_PLAN)
self.assertIsInstance(actual, str)

def test_register_java_function(self):
Expand Down
4 changes: 3 additions & 1 deletion flink-python/pyflink/util/java_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,9 @@ def to_j_explain_detail_arr(p_extra_details):
gateway = get_gateway()

def to_j_explain_detail(p_extra_detail):
if p_extra_detail == ExplainDetail.CHANGELOG_MODE:
if p_extra_detail == ExplainDetail.JSON_EXECUTION_PLAN:
return gateway.jvm.org.apache.flink.table.api.ExplainDetail.JSON_EXECUTION_PLAN
elif p_extra_detail == ExplainDetail.CHANGELOG_MODE:
return gateway.jvm.org.apache.flink.table.api.ExplainDetail.CHANGELOG_MODE
else:
return gateway.jvm.org.apache.flink.table.api.ExplainDetail.ESTIMATED_COST
Expand Down