Skip to content

Commit

Permalink
[FLINK-15913][python] Add Python Table Function Runner And Operator I…
Browse files Browse the repository at this point in the history
…n Legacy Planner
  • Loading branch information
HuangXingBo committed Feb 5, 2020
1 parent 0905647 commit 024d27c
Show file tree
Hide file tree
Showing 17 changed files with 1,537 additions and 141 deletions.
1 change: 1 addition & 0 deletions flink-python/pyflink/proto/flink-fn-execution.proto
Expand Up @@ -73,6 +73,7 @@ message Schema {
ARRAY = 16;
MAP = 17;
MULTISET = 18;
TABLE = 19;
}

message MapType {
Expand Down
@@ -0,0 +1,189 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.runtime.operators.python;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.python.PythonFunctionRunner;
import org.apache.flink.python.env.PythonEnvironmentManager;
import org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.python.PythonEnv;
import org.apache.flink.table.functions.python.PythonFunctionInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

import org.apache.beam.sdk.fn.data.FnDataReceiver;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;

/**
* @param <IN> Type of the input elements.
* @param <OUT> Type of the output elements.
* @param <UDTFIN> Type of the UDF input type.
* @param <UDTFOUT> Type of the UDF input type.
*/
public abstract class AbstractPythonTableFunctionOperator<IN, OUT, UDTFIN, UDTFOUT>
extends AbstractPythonFunctionOperator<IN, OUT> {

private static final long serialVersionUID = 1L;

/**
* The Python {@link TableFunction} to be executed.
*/
protected final PythonFunctionInfo tableFunction;

/**
* The input logical type.
*/
protected final RowType inputType;

/**
* The output logical type.
*/
protected final RowType outputType;

/**
* The offsets of udtf inputs.
*/
protected final int[] udtfInputOffsets;

/**
* The udtf input logical type.
*/
protected transient RowType udtfInputType;

/**
* The udtf output logical type.
*/
protected transient RowType udtfOutputType;

/**
* The queue holding the input elements for which the execution results have not been received.
*/
protected transient LinkedBlockingQueue<IN> forwardedInputQueue;

/**
* The queue holding the user-defined table function execution results. The execution results
* are in the same order as the input elements.
*/
protected transient LinkedBlockingQueue<UDTFOUT> udtfResultQueue;

public AbstractPythonTableFunctionOperator(
Configuration config,
PythonFunctionInfo tableFunction,
RowType inputType,
RowType outputType,
int[] udtfInputOffsets) {
super(config);
this.tableFunction = Preconditions.checkNotNull(tableFunction);
this.inputType = Preconditions.checkNotNull(inputType);
this.outputType = Preconditions.checkNotNull(outputType);
this.udtfInputOffsets = Preconditions.checkNotNull(udtfInputOffsets);
}

@Override
public void open() throws Exception {
forwardedInputQueue = new LinkedBlockingQueue<>();
udtfResultQueue = new LinkedBlockingQueue<>();
udtfInputType = new RowType(
Arrays.stream(udtfInputOffsets)
.mapToObj(i -> inputType.getFields().get(i))
.collect(Collectors.toList()));
List<RowType.RowField> udtfOutputDataFields = new ArrayList<>(
outputType.getFields().subList(inputType.getFieldCount(), outputType.getFieldCount()));
udtfOutputType = new RowType(udtfOutputDataFields);
super.open();
}

@Override
public void processElement(StreamRecord<IN> element) throws Exception {
bufferInput(element.getValue());
super.processElement(element);
emitResults();
}

@Override
public PythonFunctionRunner<IN> createPythonFunctionRunner() throws Exception {
final FnDataReceiver<UDTFOUT> udtfResultReceiver = input -> {
// handover to queue, do not block the result receiver thread
udtfResultQueue.put(input);
};

return new ProjectUdfInputPythonTableFunctionRunner(
createPythonFunctionRunner(
udtfResultReceiver,
createPythonEnvironmentManager()));
}

@Override
public PythonEnv getPythonEnv() {
return tableFunction.getPythonFunction().getPythonEnv();
}

/**
* Buffers the specified input, it will be used to construct
* the operator result together with the udtf execution result.
*/
public abstract void bufferInput(IN input);

public abstract UDTFIN getUdtfInput(IN element);

public abstract PythonFunctionRunner<UDTFIN> createPythonFunctionRunner(
FnDataReceiver<UDTFOUT> resultReceiver,
PythonEnvironmentManager pythonEnvironmentManager);

private class ProjectUdfInputPythonTableFunctionRunner implements PythonFunctionRunner<IN> {

private final PythonFunctionRunner<UDTFIN> pythonFunctionRunner;

ProjectUdfInputPythonTableFunctionRunner(PythonFunctionRunner<UDTFIN> pythonFunctionRunner) {
this.pythonFunctionRunner = pythonFunctionRunner;
}

@Override
public void open() throws Exception {
pythonFunctionRunner.open();
}

@Override
public void close() throws Exception {
pythonFunctionRunner.close();
}

@Override
public void startBundle() throws Exception {
pythonFunctionRunner.startBundle();
}

@Override
public void finishBundle() throws Exception {
pythonFunctionRunner.finishBundle();
}

@Override
public void processElement(IN element) throws Exception {
pythonFunctionRunner.processElement(getUdtfInput(element));
}
}
}
@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.runtime.operators.python;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.python.PythonFunctionRunner;
import org.apache.flink.python.env.PythonEnvironmentManager;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.python.PythonFunctionInfo;
import org.apache.flink.table.runtime.runners.python.PythonTableFunctionRunner;
import org.apache.flink.table.runtime.types.CRow;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import org.apache.beam.sdk.fn.data.FnDataReceiver;

/**
* The Python {@link TableFunction} operator for the legacy planner.
*/
public class PythonTableFunctionOperator extends AbstractPythonTableFunctionOperator<CRow, CRow, Row, Row> {

private static final long serialVersionUID = 1L;

/**
* The collector used to collect records.
*/
private transient StreamRecordCRowWrappingCollector cRowWrapper;

public PythonTableFunctionOperator(
Configuration config,
PythonFunctionInfo tableFunction,
RowType inputType,
RowType outputType,
int[] udtfInputOffsets) {
super(config, tableFunction, inputType, outputType, udtfInputOffsets);
}

@Override
public void open() throws Exception {
super.open();
this.cRowWrapper = new StreamRecordCRowWrappingCollector(output);
}

private boolean isFinishResult(Row result) {
return result.getArity() == 0;
}

@Override
public void emitResults() {
Row udtfResult;
CRow input = null;
while ((udtfResult = udtfResultQueue.poll()) != null) {
if (input == null) {
input = forwardedInputQueue.poll();
}
if (isFinishResult(udtfResult)) {
input = forwardedInputQueue.poll();
}
if (input != null && !isFinishResult(udtfResult)) {
cRowWrapper.setChange(input.change());
cRowWrapper.collect(Row.join(input.row(), udtfResult));
}
}
}

@Override
public void bufferInput(CRow input) {
forwardedInputQueue.add(input);
}

@Override
public Row getUdtfInput(CRow element) {
return Row.project(element.row(), udtfInputOffsets);
}

@Override
public PythonFunctionRunner<Row> createPythonFunctionRunner(
FnDataReceiver<Row> resultReceiver,
PythonEnvironmentManager pythonEnvironmentManager) {
return new PythonTableFunctionRunner(
getRuntimeContext().getTaskName(),
resultReceiver,
tableFunction,
pythonEnvironmentManager,
udtfInputType,
udtfOutputType);
}

/**
* The collector is used to convert a {@link Row} to a {@link CRow}.
*/
private static class StreamRecordCRowWrappingCollector implements Collector<Row> {

private final Collector<StreamRecord<CRow>> out;
private final CRow reuseCRow = new CRow();

/**
* For Table API & SQL jobs, the timestamp field is not used.
*/
private final StreamRecord<CRow> reuseStreamRecord = new StreamRecord<>(reuseCRow);

StreamRecordCRowWrappingCollector(Collector<StreamRecord<CRow>> out) {
this.out = out;
}

public void setChange(boolean change) {
this.reuseCRow.change_$eq(change);
}

@Override
public void collect(Row record) {
reuseCRow.row_$eq(record);
out.collect(reuseStreamRecord);
}

@Override
public void close() {
out.close();
}
}
}

0 comments on commit 024d27c

Please sign in to comment.