Skip to content
Permalink
Browse files
[FLINK-27545][python][examples] Update the example in PyFlink shell
This closes #19673.
  • Loading branch information
dianfu committed May 9, 2022
1 parent 1fc2619 commit da0e2587a92d60098bc76f55bbc95d5de55e5a40
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 31 deletions.
@@ -80,7 +80,7 @@ $ pyflink-shell.sh local
... .option("field-delimiter", ",")
... .build())
... .build())
>>> t.select("a + 1, b, c")\
>>> t.select(col('a') + 1, col('b'), col('c'))\
... .execute_insert("stream_sink").wait()
>>> # 如果作业运行在local模式, 你可以执行以下代码查看结果:
>>> with open(os.path.join(sink_path, os.listdir(sink_path)[0]), 'r') as f:
@@ -111,7 +111,7 @@ $ pyflink-shell.sh local
... .option("field-delimiter", ",")
... .build())
... .build())
>>> t.select("a + 1, b, c")\
>>> t.select(col('a') + 1, col('b'), col('c'))\
... .execute_insert("batch_sink").wait()
>>> # 如果作业运行在local模式, 你可以执行以下代码查看结果:
>>> with open(os.path.join(sink_path, os.listdir(sink_path)[0]), 'r') as f:
@@ -74,34 +74,41 @@
Streaming - Use 's_env' and 'st_env' variables
*
* import tempfile
* import os
* import shutil
* sink_path = tempfile.gettempdir() + '/streaming.csv'
* if os.path.exists(sink_path):
* if os.path.isfile(sink_path):
* os.remove(sink_path)
* else:
* shutil.rmtree(sink_path)
* s_env.set_parallelism(1)
* t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
*
* st_env.create_temporary_table("stream_sink", TableDescriptor.for_connector("filesystem")
* .schema(Schema.new_builder()
* .column("a", DataTypes.BIGINT())
* .column("b", DataTypes.STRING())
* .column("c", DataTypes.STRING())
* .build())
* .option("path", sink_path)
* .format(FormatDescriptor.for_format("csv")
* .option("field-delimiter", ",")
* .build())
* .build())
*
* t.select("a + 1, b, c").insert_into("stream_sink")
*
* st_env.execute("stream_job")
```
import os
import shutil
import tempfile
sink_path = tempfile.gettempdir() + '/streaming.csv'
if os.path.exists(sink_path):
if os.path.isfile(sink_path):
os.remove(sink_path)
else:
shutil.rmtree(sink_path)
s_env.set_parallelism(1)
t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
st_env.create_temporary_table("stream_sink", TableDescriptor.for_connector("filesystem")
.schema(Schema.new_builder()
.column("a", DataTypes.BIGINT())
.column("b", DataTypes.STRING())
.column("c", DataTypes.STRING())
.build())
.option("path", sink_path)
.format(FormatDescriptor.for_format("csv")
.option("field-delimiter", ",")
.build())
.build())
t.select(col('a') + 1, col('b'), col('c')).insert_into("stream_sink")
st_env.execute("stream_job")
# show the results
with open(os.path.join(sink_path, os.listdir(sink_path)[0]), 'r') as f:
print(f.read())
```
'''
utf8_out.write(welcome_msg)

@@ -53,7 +53,8 @@ def test_stream_case(self):
.build())
.build())

t.select(t.a + 1, t.b, t.c).execute_insert("stream_sink").wait()
from pyflink.table.expressions import col
t.select(col('a') + 1, col('b'), col('c')).execute_insert("stream_sink").wait()

# verify code, do not copy these code to shell.py
with open(os.path.join(sink_path, os.listdir(sink_path)[0]), 'r') as f:

0 comments on commit da0e258

Please sign in to comment.