[FLINK-27297][python] Add get_execution_environment method with configuration argument.

This closes #19685.
deadwind4 authored and dianfu committed May 12, 2022
1 parent 74f28a5 commit ff25dff56a0808a369569058c43111ea7b383384
Showing 5 changed files with 38 additions and 14 deletions.
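In short, this commit adds an optional `configuration` parameter to `StreamExecutionEnvironment.get_execution_environment`, so options can be supplied when the environment is created instead of being patched onto the wrapped Java-side configuration afterwards. A minimal before/after sketch of the usage (the option key is taken from the docs change below):

```python
from pyflink.common import Configuration
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.util.java_utils import get_j_env_configuration

# Before this change: create the environment first, then mutate the
# Configuration wrapped around its Java-side counterpart.
env = StreamExecutionEnvironment.get_execution_environment()
old_style = Configuration(
    j_configuration=get_j_env_configuration(env._j_stream_execution_environment))
old_style.set_integer("python.fn-execution.bundle.size", 1000)

# After this change: build the Configuration up front and pass it in.
new_style = Configuration()
new_style.set_integer("python.fn-execution.bundle.size", 1000)
env = StreamExecutionEnvironment.get_execution_environment(new_style)
```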
@@ -32,11 +32,10 @@ For Python DataStream API program, the config options could be set as following:
```python
from pyflink.common import Configuration
from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.util.java_utils import get_j_env_configuration
-env = StreamExecutionEnvironment.get_execution_environment()
-config = Configuration(j_configuration=get_j_env_configuration(env._j_stream_execution_environment))
+config = Configuration()
config.set_integer("python.fn-execution.bundle.size", 1000)
+env = StreamExecutionEnvironment.get_execution_environment(config)
```

For Python Table API program, all the config options available for Java/Scala Table API
@@ -32,11 +32,10 @@ For Python DataStream API program, the config options could be set as following:
```python
from pyflink.common import Configuration
from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.util.java_utils import get_j_env_configuration
-env = StreamExecutionEnvironment.get_execution_environment()
-config = Configuration(j_configuration=get_j_env_configuration(env._j_stream_execution_environment))
+config = Configuration()
config.set_integer("python.fn-execution.bundle.size", 1000)
+env = StreamExecutionEnvironment.get_execution_environment(config)
```

For Python Table API program, all the config options available for Java/Scala Table API
@@ -16,9 +16,10 @@
# limitations under the License.
################################################################################
from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.common import (ExecutionConfig, RestartStrategies, ExecutionMode)
+from pyflink.common import (ExecutionConfig, RestartStrategies, ExecutionMode, Configuration)
from pyflink.java_gateway import get_gateway
from pyflink.testing.test_case_utils import PyFlinkTestCase
+from pyflink.util.java_utils import get_j_env_configuration


class ExecutionConfigTests(PyFlinkTestCase):
@@ -273,3 +274,15 @@ def test_equals_and_hash(self):
        self.assertEqual(config1, config2)

        self.assertEqual(hash(config1), hash(config2))
+
+    def test_get_execution_environment_with_config(self):
+        configuration = Configuration()
+        configuration.set_integer('parallelism.default', 12)
+        configuration.set_string('pipeline.name', 'haha')
+        env = StreamExecutionEnvironment.get_execution_environment(configuration)
+        execution_config = env.get_config()
+
+        self.assertEqual(execution_config.get_parallelism(), 12)
+        config = Configuration(
+            j_configuration=get_j_env_configuration(env._j_stream_execution_environment))
+        self.assertEqual(config.get_string('pipeline.name', ''), 'haha')
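The new test reads the two options back through different paths: `parallelism.default` is reflected in the environment's `ExecutionConfig`, while a free-form option such as `pipeline.name` is only visible on the underlying Java configuration, hence the `get_j_env_configuration` wrapper. A minimal sketch of that read-back pattern (the option value is illustrative):

```python
from pyflink.common import Configuration
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.util.java_utils import get_j_env_configuration

conf = Configuration()
conf.set_string("pipeline.name", "example-pipeline")  # illustrative value
env = StreamExecutionEnvironment.get_execution_environment(conf)

# ExecutionConfig does not expose arbitrary string options, so wrap the
# environment's Java-side configuration to read them back.
j_conf = Configuration(
    j_configuration=get_j_env_configuration(env._j_stream_execution_environment))
print(j_conf.get_string("pipeline.name", ""))  # -> example-pipeline
```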
@@ -807,17 +807,30 @@ def register_cached_file(self, file_path: str, name: str, executable: bool = Fal
        self._j_stream_execution_environment.registerCachedFile(file_path, name, executable)

    @staticmethod
-    def get_execution_environment() -> 'StreamExecutionEnvironment':
+    def get_execution_environment(configuration: Configuration = None) \
+            -> 'StreamExecutionEnvironment':
        """
        Creates an execution environment that represents the context in which the
        program is currently executed. If the program is invoked standalone, this
        method returns a local execution environment.
+
+        When executed from the command line the given configuration is stacked on top of the
+        global configuration which comes from the flink-conf.yaml, potentially overriding
+        duplicated options.
+
+        :param configuration: The configuration to instantiate the environment with.
        :return: The execution environment of the context in which the program is executed.
        """
        gateway = get_gateway()
-        j_stream_exection_environment = gateway.jvm.org.apache.flink.streaming.api.environment\
-            .StreamExecutionEnvironment.getExecutionEnvironment()
+        JStreamExecutionEnvironment = gateway.jvm.org.apache.flink.streaming.api.environment \
+            .StreamExecutionEnvironment
+
+        if configuration:
+            j_stream_exection_environment = JStreamExecutionEnvironment.getExecutionEnvironment(
+                configuration._j_configuration)
+        else:
+            j_stream_exection_environment = JStreamExecutionEnvironment.getExecutionEnvironment()
+
        return StreamExecutionEnvironment(j_stream_exection_environment)

    def add_source(self, source_func: SourceFunction, source_name: str = 'Custom Source',
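As the new docstring notes, a configuration passed here is stacked on top of the global options from flink-conf.yaml, overriding duplicated keys. A hedged sketch of that override behaviour, assuming the global default parallelism differs from the value set below:

```python
from pyflink.common import Configuration
from pyflink.datastream import StreamExecutionEnvironment

# 'parallelism.default' normally comes from flink-conf.yaml; passing it here
# layers the value on top of the global configuration for this environment.
conf = Configuration()
conf.set_integer("parallelism.default", 12)

env = StreamExecutionEnvironment.get_execution_environment(conf)
assert env.get_config().get_parallelism() == 12
```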
@@ -23,7 +23,7 @@
from py4j.protocol import Py4JJavaError
from typing import Iterable

-from pyflink.common import RowKind, WatermarkStrategy
+from pyflink.common import RowKind, WatermarkStrategy, Configuration
from pyflink.common.serializer import TypeSerializer
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import TimestampAssigner
@@ -237,12 +237,12 @@ def setUp(self) -> None:
        from pyflink.datastream import StreamExecutionEnvironment

        super(DataStreamConversionTestCases, self).setUp()
-        self.env = StreamExecutionEnvironment.get_execution_environment()
+        config = Configuration()
+        config.set_string("akka.ask.timeout", "20 s")
+        self.env = StreamExecutionEnvironment.get_execution_environment(config)
        self.t_env = StreamTableEnvironment.create(self.env)

        self.env.set_parallelism(2)
-        config = get_j_env_configuration(self.env._j_stream_execution_environment)
-        config.setString("akka.ask.timeout", "20 s")
        self.t_env.get_config().set(
            "python.fn-execution.bundle.size", "1")
        self.test_sink = DataStreamTestSinkFunction()
