[FLINK-15913][python] Add Python Table Function Runner And Operator In Legacy Planner-fix-1
HuangXingBo committed Feb 7, 2020
1 parent b950cfa commit e2ea604
Showing 21 changed files with 454 additions and 787 deletions.
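For context, this commit wires up the runner and operator that execute Python table functions in the legacy planner. The sketch below is illustrative only and not part of the diff; it assumes the pyflink.table.udf.udtf decorator of the PyFlink Table API and merely shows the kind of function the new runner is meant to execute. On the Java side, the operators project the UDF inputs out of each input record, ship them to the Python worker, and combine the results delivered through the Beam FnDataReceiver with the buffered forwarded fields (see the Java diff further down).

# Illustrative sketch only (not part of this commit): a Python table function
# of the kind the new runner/operator executes. The pyflink.table.udf UDTF API
# used here is an assumption for illustration, not something this diff adds.
from pyflink.table import DataTypes
from pyflink.table.udf import udtf


@udtf(input_types=[DataTypes.STRING()],
      result_types=[DataTypes.STRING(), DataTypes.INT()])
def split_words(line):
    # Emit one (word, length) row per whitespace-separated token.
    for word in line.split():
        yield word, len(word)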
8 changes: 4 additions & 4 deletions flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py
@@ -36,7 +36,7 @@
name='flink-fn-execution.proto',
package='org.apache.flink.fn_execution.v1',
syntax='proto3',
serialized_pb=_b('\n\x18\x66link-fn-execution.proto\x12 org.apache.flink.fn_execution.v1\"\xfc\x01\n\x13UserDefinedFunction\x12\x0f\n\x07payload\x18\x01 \x01(\x0c\x12K\n\x06inputs\x18\x02 \x03(\x0b\x32;.org.apache.flink.fn_execution.v1.UserDefinedFunction.Input\x1a\x86\x01\n\x05Input\x12\x44\n\x03udf\x18\x01 \x01(\x0b\x32\x35.org.apache.flink.fn_execution.v1.UserDefinedFunctionH\x00\x12\x15\n\x0binputOffset\x18\x02 \x01(\x05H\x00\x12\x17\n\rinputConstant\x18\x03 \x01(\x0cH\x00\x42\x07\n\x05input\"[\n\x14UserDefinedFunctions\x12\x43\n\x04udfs\x18\x01 \x03(\x0b\x32\x35.org.apache.flink.fn_execution.v1.UserDefinedFunction\"\x8b\t\n\x06Schema\x12>\n\x06\x66ields\x18\x01 \x03(\x0b\x32..org.apache.flink.fn_execution.v1.Schema.Field\x1a\x97\x01\n\x07MapType\x12\x44\n\x08key_type\x18\x01 \x01(\x0b\x32\x32.org.apache.flink.fn_execution.v1.Schema.FieldType\x12\x46\n\nvalue_type\x18\x02 \x01(\x0b\x32\x32.org.apache.flink.fn_execution.v1.Schema.FieldType\x1a!\n\x0c\x44\x61teTimeType\x12\x11\n\tprecision\x18\x01 \x01(\x05\x1a/\n\x0b\x44\x65\x63imalType\x12\x11\n\tprecision\x18\x01 \x01(\x05\x12\r\n\x05scale\x18\x02 \x01(\x05\x1a\xec\x03\n\tFieldType\x12\x44\n\ttype_name\x18\x01 \x01(\x0e\x32\x31.org.apache.flink.fn_execution.v1.Schema.TypeName\x12\x10\n\x08nullable\x18\x02 \x01(\x08\x12U\n\x17\x63ollection_element_type\x18\x03 \x01(\x0b\x32\x32.org.apache.flink.fn_execution.v1.Schema.FieldTypeH\x00\x12\x44\n\x08map_type\x18\x04 \x01(\x0b\x32\x30.org.apache.flink.fn_execution.v1.Schema.MapTypeH\x00\x12>\n\nrow_schema\x18\x05 \x01(\x0b\x32(.org.apache.flink.fn_execution.v1.SchemaH\x00\x12O\n\x0e\x64\x61te_time_type\x18\x06 \x01(\x0b\x32\x35.org.apache.flink.fn_execution.v1.Schema.DateTimeTypeH\x00\x12L\n\x0c\x64\x65\x63imal_type\x18\x07 \x01(\x0b\x32\x34.org.apache.flink.fn_execution.v1.Schema.DecimalTypeH\x00\x42\x0b\n\ttype_info\x1al\n\x05\x46ield\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x13\n\x0b\x64\x65scription\x18\x02 \x01(\t\x12@\n\x04type\x18\x03 \x01(\x0b\x32\x32.org.apache.flink.fn_execution.v1.Schema.FieldType\"\xf5\x01\n\x08TypeName\x12\x07\n\x03ROW\x10\x00\x12\x0b\n\x07TINYINT\x10\x01\x12\x0c\n\x08SMALLINT\x10\x02\x12\x07\n\x03INT\x10\x03\x12\n\n\x06\x42IGINT\x10\x04\x12\x0b\n\x07\x44\x45\x43IMAL\x10\x05\x12\t\n\x05\x46LOAT\x10\x06\x12\n\n\x06\x44OUBLE\x10\x07\x12\x08\n\x04\x44\x41TE\x10\x08\x12\x08\n\x04TIME\x10\t\x12\x0c\n\x08\x44\x41TETIME\x10\n\x12\x0b\n\x07\x42OOLEAN\x10\x0b\x12\n\n\x06\x42INARY\x10\x0c\x12\r\n\tVARBINARY\x10\r\x12\x08\n\x04\x43HAR\x10\x0e\x12\x0b\n\x07VARCHAR\x10\x0f\x12\t\n\x05\x41RRAY\x10\x10\x12\x07\n\x03MAP\x10\x11\x12\x0c\n\x08MULTISET\x10\x12\x12\t\n\x05TABLE\x10\x13\x42-\n\x1forg.apache.flink.fnexecution.v1B\nFlinkFnApib\x06proto3')
serialized_pb=_b('\n\x18\x66link-fn-execution.proto\x12 org.apache.flink.fn_execution.v1\"\xfc\x01\n\x13UserDefinedFunction\x12\x0f\n\x07payload\x18\x01 \x01(\x0c\x12K\n\x06inputs\x18\x02 \x03(\x0b\x32;.org.apache.flink.fn_execution.v1.UserDefinedFunction.Input\x1a\x86\x01\n\x05Input\x12\x44\n\x03udf\x18\x01 \x01(\x0b\x32\x35.org.apache.flink.fn_execution.v1.UserDefinedFunctionH\x00\x12\x15\n\x0binputOffset\x18\x02 \x01(\x05H\x00\x12\x17\n\rinputConstant\x18\x03 \x01(\x0cH\x00\x42\x07\n\x05input\"[\n\x14UserDefinedFunctions\x12\x43\n\x04udfs\x18\x01 \x03(\x0b\x32\x35.org.apache.flink.fn_execution.v1.UserDefinedFunction\"\x96\t\n\x06Schema\x12>\n\x06\x66ields\x18\x01 \x03(\x0b\x32..org.apache.flink.fn_execution.v1.Schema.Field\x1a\x97\x01\n\x07MapType\x12\x44\n\x08key_type\x18\x01 \x01(\x0b\x32\x32.org.apache.flink.fn_execution.v1.Schema.FieldType\x12\x46\n\nvalue_type\x18\x02 \x01(\x0b\x32\x32.org.apache.flink.fn_execution.v1.Schema.FieldType\x1a!\n\x0c\x44\x61teTimeType\x12\x11\n\tprecision\x18\x01 \x01(\x05\x1a/\n\x0b\x44\x65\x63imalType\x12\x11\n\tprecision\x18\x01 \x01(\x05\x12\r\n\x05scale\x18\x02 \x01(\x05\x1a\xec\x03\n\tFieldType\x12\x44\n\ttype_name\x18\x01 \x01(\x0e\x32\x31.org.apache.flink.fn_execution.v1.Schema.TypeName\x12\x10\n\x08nullable\x18\x02 \x01(\x08\x12U\n\x17\x63ollection_element_type\x18\x03 \x01(\x0b\x32\x32.org.apache.flink.fn_execution.v1.Schema.FieldTypeH\x00\x12\x44\n\x08map_type\x18\x04 \x01(\x0b\x32\x30.org.apache.flink.fn_execution.v1.Schema.MapTypeH\x00\x12>\n\nrow_schema\x18\x05 \x01(\x0b\x32(.org.apache.flink.fn_execution.v1.SchemaH\x00\x12O\n\x0e\x64\x61te_time_type\x18\x06 \x01(\x0b\x32\x35.org.apache.flink.fn_execution.v1.Schema.DateTimeTypeH\x00\x12L\n\x0c\x64\x65\x63imal_type\x18\x07 \x01(\x0b\x32\x34.org.apache.flink.fn_execution.v1.Schema.DecimalTypeH\x00\x42\x0b\n\ttype_info\x1al\n\x05\x46ield\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x13\n\x0b\x64\x65scription\x18\x02 \x01(\t\x12@\n\x04type\x18\x03 \x01(\x0b\x32\x32.org.apache.flink.fn_execution.v1.Schema.FieldType\"\x80\x02\n\x08TypeName\x12\x07\n\x03ROW\x10\x00\x12\x0b\n\x07TINYINT\x10\x01\x12\x0c\n\x08SMALLINT\x10\x02\x12\x07\n\x03INT\x10\x03\x12\n\n\x06\x42IGINT\x10\x04\x12\x0b\n\x07\x44\x45\x43IMAL\x10\x05\x12\t\n\x05\x46LOAT\x10\x06\x12\n\n\x06\x44OUBLE\x10\x07\x12\x08\n\x04\x44\x41TE\x10\x08\x12\x08\n\x04TIME\x10\t\x12\x0c\n\x08\x44\x41TETIME\x10\n\x12\x0b\n\x07\x42OOLEAN\x10\x0b\x12\n\n\x06\x42INARY\x10\x0c\x12\r\n\tVARBINARY\x10\r\x12\x08\n\x04\x43HAR\x10\x0e\x12\x0b\n\x07VARCHAR\x10\x0f\x12\t\n\x05\x41RRAY\x10\x10\x12\x07\n\x03MAP\x10\x11\x12\x0c\n\x08MULTISET\x10\x12\x12\x14\n\x10TABLEFUNCTIONROW\x10\x13\x42-\n\x1forg.apache.flink.fnexecution.v1B\nFlinkFnApib\x06proto3')
)


@@ -124,14 +124,14 @@
options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='TABLE', index=19, number=19,
name='TABLEFUNCTIONROW', index=19, number=19,
options=None,
type=None),
],
containing_type=None,
options=None,
serialized_start=1329,
serialized_end=1574,
serialized_end=1585,
)
_sym_db.RegisterEnumDescriptor(_SCHEMA_TYPENAME)

@@ -503,7 +503,7 @@
oneofs=[
],
serialized_start=411,
serialized_end=1574,
serialized_end=1585,
)

_USERDEFINEDFUNCTION_INPUT.fields_by_name['udf'].message_type = _USERDEFINEDFUNCTION
2 changes: 1 addition & 1 deletion flink-python/pyflink/proto/flink-fn-execution.proto
@@ -73,7 +73,7 @@ message Schema {
ARRAY = 16;
MAP = 17;
MULTISET = 18;
TABLE = 19;
TABLEFUNCTIONROW = 19;
}

message MapType {
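The rename above (TABLE to TABLEFUNCTIONROW) lengthens the serialized file descriptor by 11 bytes, which is why the serialized_end offsets in the generated flink_fn_execution_pb2.py shift from 1574 to 1585 in the earlier hunks. A minimal sketch of how the renamed value is referenced from the generated module follows; it is illustrative only and assumes the generated module is importable from pyflink.fn_execution.

# Illustrative only: referencing the renamed enum value through the generated
# protobuf module shown above. Assumes flink_fn_execution_pb2 is importable.
from pyflink.fn_execution import flink_fn_execution_pb2

field_type = flink_fn_execution_pb2.Schema.FieldType()
# Values of an enum nested in a message are exposed as attributes of the
# containing message class in generated protobuf code.
field_type.type_name = flink_fn_execution_pb2.Schema.TABLEFUNCTIONROW
field_type.nullable = True
print(field_type.type_name)  # 19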
@@ -20,25 +20,12 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.python.PythonFunctionRunner;
import org.apache.flink.python.env.PythonEnvironmentManager;
import org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.python.PythonEnv;
import org.apache.flink.table.functions.python.PythonFunctionInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

import org.apache.beam.sdk.fn.data.FnDataReceiver;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;

/**
* Base class for all stream operators to execute Python {@link ScalarFunction}s. It executes the Python
* {@link ScalarFunction}s in separate Python execution environment.
@@ -67,7 +54,7 @@
*/
@Internal
public abstract class AbstractPythonScalarFunctionOperator<IN, OUT, UDFIN>
extends AbstractPythonFunctionOperator<IN, OUT> {
extends AbstractStatelessFunctionOperator<IN, OUT, UDFIN> {

private static final long serialVersionUID = 1L;

@@ -76,154 +63,32 @@ public abstract class AbstractPythonScalarFunctionOperator<IN, OUT, UDFIN>
*/
protected final PythonFunctionInfo[] scalarFunctions;

/**
* The input logical type.
*/
protected final RowType inputType;

/**
* The output logical type.
*/
protected final RowType outputType;

/**
* The offsets of udf inputs.
*/
protected final int[] udfInputOffsets;

/**
* The offset of the fields which should be forwarded.
*/
protected final int[] forwardedFields;

/**
* The udf input logical type.
*/
protected transient RowType udfInputType;

/**
* The udf output logical type.
*/
protected transient RowType udfOutputType;

/**
* The queue holding the input elements for which the execution results have not been received.
*/
protected transient LinkedBlockingQueue<IN> forwardedInputQueue;

/**
* The queue holding the user-defined function execution results. The execution results are in
* the same order as the input elements.
*/
protected transient LinkedBlockingQueue<byte[]> udfResultQueue;

/**
* Reusable InputStream used to holding the execution results to be deserialized.
*/
protected transient ByteArrayInputStreamWithPos bais;

/**
* InputStream Wrapper.
*/
protected transient DataInputViewStreamWrapper baisWrapper;

AbstractPythonScalarFunctionOperator(
Configuration config,
PythonFunctionInfo[] scalarFunctions,
RowType inputType,
RowType outputType,
int[] udfInputOffsets,
int[] forwardedFields) {
super(config);
super(config, inputType, outputType, udfInputOffsets);
this.scalarFunctions = Preconditions.checkNotNull(scalarFunctions);
this.inputType = Preconditions.checkNotNull(inputType);
this.outputType = Preconditions.checkNotNull(outputType);
this.udfInputOffsets = Preconditions.checkNotNull(udfInputOffsets);
this.forwardedFields = Preconditions.checkNotNull(forwardedFields);
}

@Override
public void open() throws Exception {
forwardedInputQueue = new LinkedBlockingQueue<>();
udfResultQueue = new LinkedBlockingQueue<>();
udfInputType = new RowType(
Arrays.stream(udfInputOffsets)
.mapToObj(i -> inputType.getFields().get(i))
.collect(Collectors.toList()));
udfOutputType = new RowType(outputType.getFields().subList(forwardedFields.length, outputType.getFieldCount()));
bais = new ByteArrayInputStreamWithPos();
baisWrapper = new DataInputViewStreamWrapper(bais);
super.open();
}

@Override
public void processElement(StreamRecord<IN> element) throws Exception {
bufferInput(element.getValue());
super.processElement(element);
emitResults();
}

@Override
public PythonEnv getPythonEnv() {
return scalarFunctions[0].getPythonFunction().getPythonEnv();
}

@Override
public PythonFunctionRunner<IN> createPythonFunctionRunner() throws IOException {
final FnDataReceiver<byte[]> udfResultReceiver = input -> {
// handover to queue, do not block the result receiver thread
udfResultQueue.put(input);
};

return new ProjectUdfInputPythonScalarFunctionRunner(
createPythonFunctionRunner(
udfResultReceiver,
createPythonEnvironmentManager()));
}

/**
* Buffers the specified input, it will be used to construct
* the operator result together with the udf execution result.
*/
public abstract void bufferInput(IN input);

public abstract UDFIN getUdfInput(IN element);

public abstract PythonFunctionRunner<UDFIN> createPythonFunctionRunner(
FnDataReceiver<byte[]> resultReceiver,
PythonEnvironmentManager pythonEnvironmentManager);

private class ProjectUdfInputPythonScalarFunctionRunner implements PythonFunctionRunner<IN> {

private final PythonFunctionRunner<UDFIN> pythonFunctionRunner;

ProjectUdfInputPythonScalarFunctionRunner(PythonFunctionRunner<UDFIN> pythonFunctionRunner) {
this.pythonFunctionRunner = pythonFunctionRunner;
}

@Override
public void open() throws Exception {
pythonFunctionRunner.open();
}

@Override
public void close() throws Exception {
pythonFunctionRunner.close();
}

@Override
public void startBundle() throws Exception {
pythonFunctionRunner.startBundle();
}

@Override
public void finishBundle() throws Exception {
pythonFunctionRunner.finishBundle();
}

@Override
public void processElement(IN element) throws Exception {
pythonFunctionRunner.processElement(getUdfInput(element));
}
}
}
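The refactoring above leaves only the scalar-function-specific pieces in AbstractPythonScalarFunctionOperator; the input/output types, buffering queues, and Python runner plumbing now come from the shared AbstractStatelessFunctionOperator base, which the new table-function operator can reuse. For orientation, a scalar function of the kind this operator executes looks roughly like the sketch below (illustrative only, assuming the pyflink.table.udf.udf decorator; not part of this diff).

# Illustrative only (not part of this diff): a Python scalar function of the
# kind AbstractPythonScalarFunctionOperator executes. The pyflink.table.udf
# decorator used here is an assumption for illustration.
from pyflink.table import DataTypes
from pyflink.table.udf import udf


@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
     result_type=DataTypes.BIGINT())
def add(i, j):
    # Executed inside the separate Python execution environment that the
    # operator manages.
    return i + j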
