[FLINK-15913][python] Add Python TableFunction Runner and Operator in old planner #11020

Merged (3 commits, Feb 10, 2020)
10 changes: 7 additions & 3 deletions flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py
@@ -36,7 +36,7 @@
name='flink-fn-execution.proto',
package='org.apache.flink.fn_execution.v1',
syntax='proto3',
serialized_pb=_b('\n\x18\x66link-fn-execution.proto\x12 org.apache.flink.fn_execution.v1\"\xfc\x01\n\x13UserDefinedFunction\x12\x0f\n\x07payload\x18\x01 \x01(\x0c\x12K\n\x06inputs\x18\x02 \x03(\x0b\x32;.org.apache.flink.fn_execution.v1.UserDefinedFunction.Input\x1a\x86\x01\n\x05Input\x12\x44\n\x03udf\x18\x01 \x01(\x0b\x32\x35.org.apache.flink.fn_execution.v1.UserDefinedFunctionH\x00\x12\x15\n\x0binputOffset\x18\x02 \x01(\x05H\x00\x12\x17\n\rinputConstant\x18\x03 \x01(\x0cH\x00\x42\x07\n\x05input\"[\n\x14UserDefinedFunctions\x12\x43\n\x04udfs\x18\x01 \x03(\x0b\x32\x35.org.apache.flink.fn_execution.v1.UserDefinedFunction\"\x80\t\n\x06Schema\x12>\n\x06\x66ields\x18\x01 \x03(\x0b\x32..org.apache.flink.fn_execution.v1.Schema.Field\x1a\x97\x01\n\x07MapType\x12\x44\n\x08key_type\x18\x01 \x01(\x0b\x32\x32.org.apache.flink.fn_execution.v1.Schema.FieldType\x12\x46\n\nvalue_type\x18\x02 \x01(\x0b\x32\x32.org.apache.flink.fn_execution.v1.Schema.FieldType\x1a!\n\x0c\x44\x61teTimeType\x12\x11\n\tprecision\x18\x01 \x01(\x05\x1a/\n\x0b\x44\x65\x63imalType\x12\x11\n\tprecision\x18\x01 \x01(\x05\x12\r\n\x05scale\x18\x02 \x01(\x05\x1a\xec\x03\n\tFieldType\x12\x44\n\ttype_name\x18\x01 \x01(\x0e\x32\x31.org.apache.flink.fn_execution.v1.Schema.TypeName\x12\x10\n\x08nullable\x18\x02 \x01(\x08\x12U\n\x17\x63ollection_element_type\x18\x03 \x01(\x0b\x32\x32.org.apache.flink.fn_execution.v1.Schema.FieldTypeH\x00\x12\x44\n\x08map_type\x18\x04 \x01(\x0b\x32\x30.org.apache.flink.fn_execution.v1.Schema.MapTypeH\x00\x12>\n\nrow_schema\x18\x05 \x01(\x0b\x32(.org.apache.flink.fn_execution.v1.SchemaH\x00\x12O\n\x0e\x64\x61te_time_type\x18\x06 \x01(\x0b\x32\x35.org.apache.flink.fn_execution.v1.Schema.DateTimeTypeH\x00\x12L\n\x0c\x64\x65\x63imal_type\x18\x07 \x01(\x0b\x32\x34.org.apache.flink.fn_execution.v1.Schema.DecimalTypeH\x00\x42\x0b\n\ttype_info\x1al\n\x05\x46ield\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x13\n\x0b\x64\x65scription\x18\x02 \x01(\t\x12@\n\x04type\x18\x03 \x01(\x0b\x32\x32.org.apache.flink.fn_execution.v1.Schema.FieldType\"\xea\x01\n\x08TypeName\x12\x07\n\x03ROW\x10\x00\x12\x0b\n\x07TINYINT\x10\x01\x12\x0c\n\x08SMALLINT\x10\x02\x12\x07\n\x03INT\x10\x03\x12\n\n\x06\x42IGINT\x10\x04\x12\x0b\n\x07\x44\x45\x43IMAL\x10\x05\x12\t\n\x05\x46LOAT\x10\x06\x12\n\n\x06\x44OUBLE\x10\x07\x12\x08\n\x04\x44\x41TE\x10\x08\x12\x08\n\x04TIME\x10\t\x12\x0c\n\x08\x44\x41TETIME\x10\n\x12\x0b\n\x07\x42OOLEAN\x10\x0b\x12\n\n\x06\x42INARY\x10\x0c\x12\r\n\tVARBINARY\x10\r\x12\x08\n\x04\x43HAR\x10\x0e\x12\x0b\n\x07VARCHAR\x10\x0f\x12\t\n\x05\x41RRAY\x10\x10\x12\x07\n\x03MAP\x10\x11\x12\x0c\n\x08MULTISET\x10\x12\x42-\n\x1forg.apache.flink.fnexecution.v1B\nFlinkFnApib\x06proto3')
serialized_pb=_b('\n\x18\x66link-fn-execution.proto\x12 org.apache.flink.fn_execution.v1\"\xfc\x01\n\x13UserDefinedFunction\x12\x0f\n\x07payload\x18\x01 \x01(\x0c\x12K\n\x06inputs\x18\x02 \x03(\x0b\x32;.org.apache.flink.fn_execution.v1.UserDefinedFunction.Input\x1a\x86\x01\n\x05Input\x12\x44\n\x03udf\x18\x01 \x01(\x0b\x32\x35.org.apache.flink.fn_execution.v1.UserDefinedFunctionH\x00\x12\x15\n\x0binputOffset\x18\x02 \x01(\x05H\x00\x12\x17\n\rinputConstant\x18\x03 \x01(\x0cH\x00\x42\x07\n\x05input\"[\n\x14UserDefinedFunctions\x12\x43\n\x04udfs\x18\x01 \x03(\x0b\x32\x35.org.apache.flink.fn_execution.v1.UserDefinedFunction\"\x98\t\n\x06Schema\x12>\n\x06\x66ields\x18\x01 \x03(\x0b\x32..org.apache.flink.fn_execution.v1.Schema.Field\x1a\x97\x01\n\x07MapType\x12\x44\n\x08key_type\x18\x01 \x01(\x0b\x32\x32.org.apache.flink.fn_execution.v1.Schema.FieldType\x12\x46\n\nvalue_type\x18\x02 \x01(\x0b\x32\x32.org.apache.flink.fn_execution.v1.Schema.FieldType\x1a!\n\x0c\x44\x61teTimeType\x12\x11\n\tprecision\x18\x01 \x01(\x05\x1a/\n\x0b\x44\x65\x63imalType\x12\x11\n\tprecision\x18\x01 \x01(\x05\x12\r\n\x05scale\x18\x02 \x01(\x05\x1a\xec\x03\n\tFieldType\x12\x44\n\ttype_name\x18\x01 \x01(\x0e\x32\x31.org.apache.flink.fn_execution.v1.Schema.TypeName\x12\x10\n\x08nullable\x18\x02 \x01(\x08\x12U\n\x17\x63ollection_element_type\x18\x03 \x01(\x0b\x32\x32.org.apache.flink.fn_execution.v1.Schema.FieldTypeH\x00\x12\x44\n\x08map_type\x18\x04 \x01(\x0b\x32\x30.org.apache.flink.fn_execution.v1.Schema.MapTypeH\x00\x12>\n\nrow_schema\x18\x05 \x01(\x0b\x32(.org.apache.flink.fn_execution.v1.SchemaH\x00\x12O\n\x0e\x64\x61te_time_type\x18\x06 \x01(\x0b\x32\x35.org.apache.flink.fn_execution.v1.Schema.DateTimeTypeH\x00\x12L\n\x0c\x64\x65\x63imal_type\x18\x07 \x01(\x0b\x32\x34.org.apache.flink.fn_execution.v1.Schema.DecimalTypeH\x00\x42\x0b\n\ttype_info\x1al\n\x05\x46ield\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x13\n\x0b\x64\x65scription\x18\x02 \x01(\t\x12@\n\x04type\x18\x03 \x01(\x0b\x32\x32.org.apache.flink.fn_execution.v1.Schema.FieldType\"\x82\x02\n\x08TypeName\x12\x07\n\x03ROW\x10\x00\x12\x0b\n\x07TINYINT\x10\x01\x12\x0c\n\x08SMALLINT\x10\x02\x12\x07\n\x03INT\x10\x03\x12\n\n\x06\x42IGINT\x10\x04\x12\x0b\n\x07\x44\x45\x43IMAL\x10\x05\x12\t\n\x05\x46LOAT\x10\x06\x12\n\n\x06\x44OUBLE\x10\x07\x12\x08\n\x04\x44\x41TE\x10\x08\x12\x08\n\x04TIME\x10\t\x12\x0c\n\x08\x44\x41TETIME\x10\n\x12\x0b\n\x07\x42OOLEAN\x10\x0b\x12\n\n\x06\x42INARY\x10\x0c\x12\r\n\tVARBINARY\x10\r\x12\x08\n\x04\x43HAR\x10\x0e\x12\x0b\n\x07VARCHAR\x10\x0f\x12\t\n\x05\x41RRAY\x10\x10\x12\x07\n\x03MAP\x10\x11\x12\x0c\n\x08MULTISET\x10\x12\x12\x16\n\x12TABLE_FUNCTION_ROW\x10\x13\x42-\n\x1forg.apache.flink.fnexecution.v1B\nFlinkFnApib\x06proto3')
)


@@ -123,11 +123,15 @@
name='MULTISET', index=18, number=18,
options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='TABLE_FUNCTION_ROW', index=19, number=19,
options=None,
type=None),
],
containing_type=None,
options=None,
serialized_start=1329,
serialized_end=1563,
serialized_end=1587,
)
_sym_db.RegisterEnumDescriptor(_SCHEMA_TYPENAME)

@@ -499,7 +503,7 @@
oneofs=[
],
serialized_start=411,
serialized_end=1563,
serialized_end=1587,
)

_USERDEFINEDFUNCTION_INPUT.fields_by_name['udf'].message_type = _USERDEFINEDFUNCTION
1 change: 1 addition & 0 deletions flink-python/pyflink/proto/flink-fn-execution.proto
@@ -73,6 +73,7 @@ message Schema {
ARRAY = 16;
MAP = 17;
MULTISET = 18;
TABLE_FUNCTION_ROW = 19;
}

message MapType {
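The new TABLE_FUNCTION_ROW type name gives the wire format a way to mark the row type produced by a Python table function when the UDTF metadata is sent to the Python worker. As a hedged illustration (not code from this PR), the sketch below builds such a field type with the generated Java classes; the package org.apache.flink.fnexecution.v1 and the outer class FlinkFnApi follow from the descriptor options in the regenerated file above, while the helper class and method names are hypothetical.

import org.apache.flink.fnexecution.v1.FlinkFnApi;

/**
 * Hypothetical helper showing how a row schema could be tagged with the new
 * TABLE_FUNCTION_ROW enum value (= 19) when serializing UDTF metadata.
 */
public final class TableFunctionRowTypeSketch {

    private TableFunctionRowTypeSketch() {
    }

    /** Wraps the given row schema into a FieldType whose type name is TABLE_FUNCTION_ROW. */
    public static FlinkFnApi.Schema.FieldType tableFunctionRowType(FlinkFnApi.Schema rowSchema) {
        return FlinkFnApi.Schema.FieldType.newBuilder()
                .setTypeName(FlinkFnApi.Schema.TypeName.TABLE_FUNCTION_ROW)
                .setRowSchema(rowSchema)
                .setNullable(true)
                .build();
    }
}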
AbstractPythonScalarFunctionOperator.java
@@ -20,25 +20,12 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.python.PythonFunctionRunner;
import org.apache.flink.python.env.PythonEnvironmentManager;
import org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.python.PythonEnv;
import org.apache.flink.table.functions.python.PythonFunctionInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

import org.apache.beam.sdk.fn.data.FnDataReceiver;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;

/**
* Base class for all stream operators to execute Python {@link ScalarFunction}s. It executes the Python
* {@link ScalarFunction}s in separate Python execution environment.
@@ -67,7 +54,7 @@
*/
@Internal
public abstract class AbstractPythonScalarFunctionOperator<IN, OUT, UDFIN>
extends AbstractPythonFunctionOperator<IN, OUT> {
extends AbstractStatelessFunctionOperator<IN, OUT, UDFIN> {

private static final long serialVersionUID = 1L;

@@ -76,154 +63,33 @@ public abstract class AbstractPythonScalarFunctionOperator<IN, OUT, UDFIN>
*/
protected final PythonFunctionInfo[] scalarFunctions;

/**
* The input logical type.
*/
protected final RowType inputType;

/**
* The output logical type.
*/
protected final RowType outputType;

/**
* The offsets of udf inputs.
*/
protected final int[] udfInputOffsets;

/**
* The offset of the fields which should be forwarded.
*/
protected final int[] forwardedFields;

/**
* The udf input logical type.
*/
protected transient RowType udfInputType;

/**
* The udf output logical type.
*/
protected transient RowType udfOutputType;

/**
* The queue holding the input elements for which the execution results have not been received.
*/
protected transient LinkedBlockingQueue<IN> forwardedInputQueue;

/**
* The queue holding the user-defined function execution results. The execution results are in
* the same order as the input elements.
*/
protected transient LinkedBlockingQueue<byte[]> udfResultQueue;

/**
* Reusable InputStream used to holding the execution results to be deserialized.
*/
protected transient ByteArrayInputStreamWithPos bais;

/**
* InputStream Wrapper.
*/
protected transient DataInputViewStreamWrapper baisWrapper;

AbstractPythonScalarFunctionOperator(
Configuration config,
PythonFunctionInfo[] scalarFunctions,
RowType inputType,
RowType outputType,
int[] udfInputOffsets,
int[] forwardedFields) {
super(config);
super(config, inputType, outputType, udfInputOffsets);
this.scalarFunctions = Preconditions.checkNotNull(scalarFunctions);
this.inputType = Preconditions.checkNotNull(inputType);
this.outputType = Preconditions.checkNotNull(outputType);
this.udfInputOffsets = Preconditions.checkNotNull(udfInputOffsets);
this.forwardedFields = Preconditions.checkNotNull(forwardedFields);
}

@Override
public void open() throws Exception {
forwardedInputQueue = new LinkedBlockingQueue<>();
udfResultQueue = new LinkedBlockingQueue<>();
udfInputType = new RowType(
Arrays.stream(udfInputOffsets)
.mapToObj(i -> inputType.getFields().get(i))
.collect(Collectors.toList()));
udfOutputType = new RowType(outputType.getFields().subList(forwardedFields.length, outputType.getFieldCount()));
bais = new ByteArrayInputStreamWithPos();
baisWrapper = new DataInputViewStreamWrapper(bais);
userDefinedFunctionOutputType = new RowType(
outputType.getFields().subList(forwardedFields.length, outputType.getFieldCount()));
super.open();
}

@Override
public void processElement(StreamRecord<IN> element) throws Exception {
bufferInput(element.getValue());
super.processElement(element);
emitResults();
}

@Override
public PythonEnv getPythonEnv() {
return scalarFunctions[0].getPythonFunction().getPythonEnv();
}

@Override
public PythonFunctionRunner<IN> createPythonFunctionRunner() throws IOException {
final FnDataReceiver<byte[]> udfResultReceiver = input -> {
// handover to queue, do not block the result receiver thread
udfResultQueue.put(input);
};

return new ProjectUdfInputPythonScalarFunctionRunner(
createPythonFunctionRunner(
udfResultReceiver,
createPythonEnvironmentManager()));
}

/**
* Buffers the specified input, it will be used to construct
* the operator result together with the udf execution result.
*/
public abstract void bufferInput(IN input);

public abstract UDFIN getUdfInput(IN element);

public abstract PythonFunctionRunner<UDFIN> createPythonFunctionRunner(
FnDataReceiver<byte[]> resultReceiver,
PythonEnvironmentManager pythonEnvironmentManager);

private class ProjectUdfInputPythonScalarFunctionRunner implements PythonFunctionRunner<IN> {

private final PythonFunctionRunner<UDFIN> pythonFunctionRunner;

ProjectUdfInputPythonScalarFunctionRunner(PythonFunctionRunner<UDFIN> pythonFunctionRunner) {
this.pythonFunctionRunner = pythonFunctionRunner;
}

@Override
public void open() throws Exception {
pythonFunctionRunner.open();
}

@Override
public void close() throws Exception {
pythonFunctionRunner.close();
}

@Override
public void startBundle() throws Exception {
pythonFunctionRunner.startBundle();
}

@Override
public void finishBundle() throws Exception {
pythonFunctionRunner.finishBundle();
}

@Override
public void processElement(IN element) throws Exception {
pythonFunctionRunner.processElement(getUdfInput(element));
}
}
}
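The removals above hoist the plumbing shared by scalar and table function operators (input/output row types, the function input offsets, the buffering queues, and the reusable input stream) into the new AbstractStatelessFunctionOperator base class, whose source is not part of this excerpt. The following is only a sketch of the shape that base class appears to take, inferred from the fields deleted here and from the super(config, inputType, outputType, udfInputOffsets) constructor call; field names, parameter names, and visibility beyond what is visible in this diff are assumptions.

// Sketch only: inferred from this diff, not the committed AbstractStatelessFunctionOperator.
package org.apache.flink.table.runtime.operators.python;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

import java.util.concurrent.LinkedBlockingQueue;

public abstract class AbstractStatelessFunctionOperator<IN, OUT, UDFIN>
        extends AbstractPythonFunctionOperator<IN, OUT> {

    /** The input logical type (previously a field of the scalar operator). */
    protected final RowType inputType;

    /** The output logical type. */
    protected final RowType outputType;

    /** The offsets of the user-defined function inputs. */
    protected final int[] userDefinedFunctionInputOffsets;

    /** The user-defined function output type; subclasses assign it in open(). */
    protected transient RowType userDefinedFunctionOutputType;

    /** Input elements whose execution results have not been received yet. */
    protected transient LinkedBlockingQueue<IN> forwardedInputQueue;

    /** Raw execution results, in the same order as the input elements. */
    protected transient LinkedBlockingQueue<byte[]> userDefinedFunctionResultQueue;

    public AbstractStatelessFunctionOperator(
            Configuration config,
            RowType inputType,
            RowType outputType,
            int[] userDefinedFunctionInputOffsets) {
        super(config);
        this.inputType = Preconditions.checkNotNull(inputType);
        this.outputType = Preconditions.checkNotNull(outputType);
        this.userDefinedFunctionInputOffsets = Preconditions.checkNotNull(userDefinedFunctionInputOffsets);
    }
}

With this split, the scalar operator keeps only what is scalar-specific (the forwarded fields and the one-result-per-input contract), while the table function operator added below reuses the same buffering and type bookkeeping.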
AbstractPythonTableFunctionOperator.java (new file)
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.runtime.operators.python;

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.python.PythonEnv;
import org.apache.flink.table.functions.python.PythonFunctionInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

import java.util.ArrayList;
import java.util.List;

/**
* @param <IN> Type of the input elements.
* @param <OUT> Type of the output elements.
* @param <UDTFIN> Type of the UDTF input type.
*/
@Internal
public abstract class AbstractPythonTableFunctionOperator<IN, OUT, UDTFIN>
extends AbstractStatelessFunctionOperator<IN, OUT, UDTFIN> {

private static final long serialVersionUID = 1L;

/**
* The Python {@link TableFunction} to be executed.
*/
protected final PythonFunctionInfo tableFunction;

public AbstractPythonTableFunctionOperator(
Configuration config,
PythonFunctionInfo tableFunction,
RowType inputType,
RowType outputType,
int[] udtfInputOffsets) {
super(config, inputType, outputType, udtfInputOffsets);
this.tableFunction = Preconditions.checkNotNull(tableFunction);
}

@Override
public void open() throws Exception {
List<RowType.RowField> udtfOutputDataFields = new ArrayList<>(
outputType.getFields().subList(inputType.getFieldCount(), outputType.getFieldCount()));
userDefinedFunctionOutputType = new RowType(udtfOutputDataFields);
super.open();
}

@Override
public PythonEnv getPythonEnv() {
return tableFunction.getPythonFunction().getPythonEnv();
}

/**
* The received udtf execution result is a finish message when it is a byte with value 0x00.
*/
protected boolean isFinishResult(byte[] rawUdtfResult) {
return rawUdtfResult.length == 1 && rawUdtfResult[0] == 0x00;
}
}
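The 0x00 finish marker handled by isFinishResult is what lets a concrete subclass tell where the results for one input row end: a table function may emit zero, one, or many rows per input, so unlike the scalar case there is no fixed one-result-per-input contract. The standalone demo below is not Flink code; it only simulates the protocol with plain queues and strings standing in for serialized rows, to show how a drain loop can group results per input element.

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Standalone illustration of the finish-message protocol: each input element may
 * produce any number of serialized result rows, and a single 0x00 byte marks the
 * end of the results belonging to that input.
 */
public final class FinishMarkerDemo {

    private static final byte[] FINISH = new byte[] {0x00};

    private static boolean isFinishResult(byte[] rawResult) {
        return rawResult.length == 1 && rawResult[0] == 0x00;
    }

    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<byte[]> resultQueue = new LinkedBlockingQueue<>();

        // Pretend the Python worker produced two rows for the first input,
        // none for the second, and one for the third.
        resultQueue.put("row-1a".getBytes(StandardCharsets.UTF_8));
        resultQueue.put("row-1b".getBytes(StandardCharsets.UTF_8));
        resultQueue.put(FINISH);
        resultQueue.put(FINISH);
        resultQueue.put("row-3a".getBytes(StandardCharsets.UTF_8));
        resultQueue.put(FINISH);

        // Drain the queue, grouping results per input element.
        for (int input = 1; input <= 3; input++) {
            List<String> rowsForInput = new ArrayList<>();
            byte[] raw;
            while (!isFinishResult(raw = resultQueue.take())) {
                rowsForInput.add(new String(raw, StandardCharsets.UTF_8));
            }
            System.out.println("input " + input + " -> " + rowsForInput);
        }
    }
}

In the real operator the grouped rows would be deserialized with the userDefinedFunctionOutputType set in open() above and joined with the corresponding buffered input row before being emitted.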