243 changes: 121 additions & 122 deletions flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -48,7 +48,7 @@ def test_add_group_with_variable(self):
self.assertEqual(MetricTests.print_metric_group_path(new_group), 'root.key.value')

def test_metric_not_enabled(self):
fc = FunctionContext(None)
fc = FunctionContext(None, None)
with self.assertRaises(RuntimeError):
fc.get_metric_group()

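A companion test along these lines (hypothetical; the test name and values are not part of this PR) would exercise the two-argument constructor together with the get_job_parameter method added to FunctionContext later in this change:

    def test_get_job_parameter(self):
        fc = FunctionContext(None, {'k1': 'v1'})
        # existing keys resolve to their configured values
        self.assertEqual(fc.get_job_parameter('k1', 'default'), 'v1')
        # unknown keys fall back to the supplied default
        self.assertEqual(fc.get_job_parameter('unknown', 'default'), 'default')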
6 changes: 4 additions & 2 deletions flink-python/pyflink/fn_execution/table/operations.py
Original file line number Diff line number Diff line change
@@ -83,6 +83,7 @@ def __init__(self, serialized_fn):
else:
self.base_metric_group = None
self.func, self.user_defined_funcs = self.generate_func(serialized_fn)
self.job_parameters = {p.key: p.value for p in serialized_fn.job_parameters}

def finish(self):
self._update_gauge(self.base_metric_group)
@@ -102,7 +103,7 @@ def process_element(self, value):
def open(self):
for user_defined_func in self.user_defined_funcs:
if hasattr(user_defined_func, 'open'):
user_defined_func.open(FunctionContext(self.base_metric_group))
user_defined_func.open(FunctionContext(self.base_metric_group, self.job_parameters))

def close(self):
for user_defined_func in self.user_defined_funcs:
@@ -323,11 +324,12 @@ def __init__(self, serialized_fn, keyed_state_backend):
self.state_cache_size = serialized_fn.state_cache_size
self.state_cleaning_enabled = serialized_fn.state_cleaning_enabled
self.data_view_specs = extract_data_view_specs(serialized_fn.udfs)
self.job_parameters = {p.key: p.value for p in serialized_fn.job_parameters}
super(AbstractStreamGroupAggregateOperation, self).__init__(
serialized_fn, keyed_state_backend)

def open(self):
self.group_agg_function.open(FunctionContext(self.base_metric_group))
self.group_agg_function.open(FunctionContext(self.base_metric_group, self.job_parameters))

def close(self):
self.group_agg_function.close()
12 changes: 7 additions & 5 deletions flink-python/pyflink/proto/flink-fn-execution.proto
Original file line number Diff line number Diff line change
@@ -25,6 +25,11 @@ package org.apache.flink.fn_execution.v1;
option java_package = "org.apache.flink.fnexecution.v1";
option java_outer_classname = "FlinkFnApi";

message JobParameter {
string key = 1;
string value = 2;
}

// ------------------------------------------------------------------------
// Table API & SQL
// ------------------------------------------------------------------------
@@ -65,6 +70,7 @@ message UserDefinedFunctions {
bool metric_enabled = 2;
repeated OverWindow windows = 3;
bool profile_enabled = 4;
repeated JobParameter job_parameters = 5;
}

// Used to describe the info of over window in pandas batch over window aggregation
@@ -182,6 +188,7 @@ message UserDefinedAggregateFunctions {
GroupWindow group_window = 12;

bool profile_enabled = 13;
repeated JobParameter job_parameters = 14;
}

// A representation of the data schema.
@@ -362,11 +369,6 @@ message UserDefinedDataStreamFunction {
REVISE_OUTPUT = 100;
}

message JobParameter {
string key = 1;
string value = 2;
}

message RuntimeContext {
string task_name = 1;
string task_name_with_subtasks = 2;
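With JobParameter promoted to a top-level message, the same type is now shared by UserDefinedFunctions, UserDefinedAggregateFunctions and UserDefinedDataStreamFunction. A minimal sketch of filling the new repeated field through the regenerated Python bindings (illustrative values only):

    from pyflink.fn_execution import flink_fn_execution_pb2

    # build a UserDefinedFunctions proto and attach two global job parameters
    udfs = flink_fn_execution_pb2.UserDefinedFunctions()
    udfs.metric_enabled = True
    param = udfs.job_parameters.add()  # repeated JobParameter job_parameters = 5
    param.key = 'pipeline.name'
    param.value = 'demo'
    udfs.job_parameters.add(key='python.exec', value='python3')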
15 changes: 14 additions & 1 deletion flink-python/pyflink/table/udf.py
Original file line number Diff line number Diff line change
@@ -37,8 +37,9 @@ class FunctionContext(object):
and global job parameters, etc.
"""

def __init__(self, base_metric_group):
def __init__(self, base_metric_group, job_parameters):
self._base_metric_group = base_metric_group
self._job_parameters = job_parameters

def get_metric_group(self) -> MetricGroup:
"""
@@ -51,6 +52,18 @@ def get_metric_group(self) -> MetricGroup:
"metric with the 'python.metric.enabled' configuration.")
return self._base_metric_group

def get_job_parameter(self, key: str, default_value: str) -> str:
"""
Gets the global job parameter value associated with the given key as a string.

:param key: The key pointing to the associated value.
:param default_value: The default value returned if the global job parameters are not set
or there is no value associated with the given key.

.. versionadded:: 1.17.0
"""
return self._job_parameters.get(key, default_value) if self._job_parameters else default_value


class UserDefinedFunction(abc.ABC):
"""
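On the user side, the new method is intended to be read from a function's open(); a minimal usage sketch (the UDF and parameter names are hypothetical, and the values are assumed to have been registered as global job parameters on the job's ExecutionConfig):

    from pyflink.table import DataTypes
    from pyflink.table.udf import ScalarFunction, udf

    class TaggedUpper(ScalarFunction):

        def open(self, function_context):
            # falls back to 'unknown' if 'tag' is not among the global job parameters
            self._tag = function_context.get_job_parameter('tag', 'unknown')

        def eval(self, s):
            return '%s:%s' % (self._tag, s.upper())

    tagged_upper = udf(TaggedUpper(), result_type=DataTypes.STRING())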
Original file line number Diff line number Diff line change
@@ -134,6 +134,7 @@ public static FlinkFnApi.CoderInfoDescriptor createOverWindowArrowTypeCoderInfoD
// function utilities

public static FlinkFnApi.UserDefinedFunctions createUserDefinedFunctionsProto(
RuntimeContext runtimeContext,
PythonFunctionInfo[] userDefinedFunctions,
boolean isMetricEnabled,
boolean isProfileEnabled) {
@@ -144,6 +145,16 @@ public static FlinkFnApi.UserDefinedFunctions createUserDefinedFunctionsProto(
}
builder.setMetricEnabled(isMetricEnabled);
builder.setProfileEnabled(isProfileEnabled);
builder.addAllJobParameters(
runtimeContext.getExecutionConfig().getGlobalJobParameters().toMap().entrySet()
.stream()
.map(
entry ->
FlinkFnApi.JobParameter.newBuilder()
.setKey(entry.getKey())
.setValue(entry.getValue())
.build())
.collect(Collectors.toList()));
return builder.build();
}

@@ -259,8 +270,7 @@ public static FlinkFnApi.UserDefinedDataStreamFunction createUserDefinedDataStre
.entrySet().stream()
.map(
entry ->
FlinkFnApi.UserDefinedDataStreamFunction
.JobParameter.newBuilder()
FlinkFnApi.JobParameter.newBuilder()
.setKey(entry.getKey())
.setValue(entry.getValue())
.build())
@@ -269,8 +279,7 @@ public static FlinkFnApi.UserDefinedDataStreamFunction createUserDefinedDataStre
internalParameters.entrySet().stream()
.map(
entry ->
FlinkFnApi.UserDefinedDataStreamFunction
.JobParameter.newBuilder()
FlinkFnApi.JobParameter.newBuilder()
.setKey(entry.getKey())
.setValue(entry.getValue())
.build())
Original file line number Diff line number Diff line change
@@ -253,6 +253,16 @@ protected FlinkFnApi.UserDefinedAggregateFunctions getUserDefinedFunctionsProto(
ProtoUtils.createUserDefinedAggregateFunctionProto(
aggregateFunctions[i], specs));
}
builder.addAllJobParameters(
getRuntimeContext().getExecutionConfig().getGlobalJobParameters().toMap().entrySet()
.stream()
.map(
entry ->
FlinkFnApi.JobParameter.newBuilder()
.setKey(entry.getKey())
.setValue(entry.getValue())
.build())
.collect(Collectors.toList()));
return builder.build();
}

Original file line number Diff line number Diff line change
@@ -150,6 +150,7 @@ public RowData getFunctionInput(RowData element) {
@Override
public FlinkFnApi.UserDefinedFunctions createUserDefinedFunctionsProto() {
return ProtoUtils.createUserDefinedFunctionsProto(
getRuntimeContext(),
pandasAggFunctions,
config.get(PYTHON_METRIC_ENABLED),
config.get(PYTHON_PROFILE_ENABLED));
Original file line number Diff line number Diff line change
@@ -36,6 +36,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
import java.util.stream.Collectors;

import static org.apache.flink.python.PythonOptions.PYTHON_METRIC_ENABLED;
import static org.apache.flink.python.PythonOptions.PYTHON_PROFILE_ENABLED;
@@ -263,6 +264,16 @@ public FlinkFnApi.UserDefinedFunctions createUserDefinedFunctionsProto() {
}
builder.setMetricEnabled(config.get(PYTHON_METRIC_ENABLED));
builder.setProfileEnabled(config.get(PYTHON_PROFILE_ENABLED));
builder.addAllJobParameters(
getRuntimeContext().getExecutionConfig().getGlobalJobParameters().toMap().entrySet()
.stream()
.map(
entry ->
FlinkFnApi.JobParameter.newBuilder()
.setKey(entry.getKey())
.setValue(entry.getValue())
.build())
.collect(Collectors.toList()));
// add windows
for (int i = 0; i < lowerBoundary.length; i++) {
FlinkFnApi.OverWindow.Builder windowBuilder = FlinkFnApi.OverWindow.newBuilder();
Original file line number Diff line number Diff line change
@@ -119,6 +119,7 @@ public PythonEnv getPythonEnv() {
@Override
public FlinkFnApi.UserDefinedFunctions createUserDefinedFunctionsProto() {
return ProtoUtils.createUserDefinedFunctionsProto(
getRuntimeContext(),
scalarFunctions,
config.get(PYTHON_METRIC_ENABLED),
config.get(PYTHON_PROFILE_ENABLED));
Original file line number Diff line number Diff line change
@@ -119,6 +119,7 @@ public void openPythonInterpreter() {
interpreter.set(
"proto",
ProtoUtils.createUserDefinedFunctionsProto(
getRuntimeContext(),
scalarFunctions,
config.get(PYTHON_METRIC_ENABLED),
config.get(PYTHON_PROFILE_ENABLED))
Original file line number Diff line number Diff line change
@@ -111,6 +111,7 @@ public void openPythonInterpreter() {
interpreter.set(
"proto",
ProtoUtils.createUserDefinedFunctionsProto(
getRuntimeContext(),
new PythonFunctionInfo[] {tableFunction},
config.get(PYTHON_METRIC_ENABLED),
config.get(PYTHON_PROFILE_ENABLED))
Original file line number Diff line number Diff line change
@@ -157,6 +157,7 @@ public FlinkFnApi.CoderInfoDescriptor createOutputCoderInfoDescriptor(RowType ru
@Override
public FlinkFnApi.UserDefinedFunctions createUserDefinedFunctionsProto() {
return ProtoUtils.createUserDefinedFunctionsProto(
getRuntimeContext(),
new PythonFunctionInfo[] {tableFunction},
config.get(PYTHON_METRIC_ENABLED),
config.get(PYTHON_PROFILE_ENABLED));