Skip to content

Commit

Permalink
[HOTFIX] Change the sample based on current code tree
Browse files Browse the repository at this point in the history
This is a hotfix for pyflink shell mode.

The output sample has several problems:
1. Need to import the necessary py pkg for avoiding NotDefinitaion
   error.
2. Use the current function for avoiding NoSuchFunction error.
  • Loading branch information
bzhaoopenstack committed Aug 31, 2022
1 parent fc5730a commit 14881a0
Showing 1 changed file with 7 additions and 2 deletions.
9 changes: 7 additions & 2 deletions flink-python/pyflink/shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@
import shutil
import tempfile
from pyflink.table.expressions import *
from pyflink.table.schema import Schema
sink_path = tempfile.gettempdir() + '/streaming.csv'
if os.path.exists(sink_path):
if os.path.isfile(sink_path):
Expand All @@ -101,8 +104,10 @@
.build())
.build())
t.select(col('a') + 1, col('b'), col('c')).insert_into("stream_sink")
st_env.execute("stream_job")
t.select(col('a') + 1, col('b'), col('c')).execute_insert("stream_sink")
# Execute an sample job via StreamExecutionEnvironment
s_env.execute("stream_job")
# show the results
with open(os.path.join(sink_path, os.listdir(sink_path)[0]), 'r') as f:
Expand Down

0 comments on commit 14881a0

Please sign in to comment.