Skip to content
Permalink
Browse files

Proper awop factory

  • Loading branch information...
Arvid Heise
Arvid Heise committed Aug 26, 2019
1 parent 029ee39 commit 7815bc0047f41fd2d5dc3dcbe4adbd7be514e0da
@@ -23,6 +23,7 @@
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory;

import java.util.concurrent.TimeUnit;

@@ -78,7 +79,7 @@
true);

// create transform
AsyncWaitOperator<IN, OUT> operator = new AsyncWaitOperator<>(
AsyncWaitOperatorFactory<IN, OUT> operator = new AsyncWaitOperatorFactory<>(
in.getExecutionEnvironment().clean(func),
timeout,
bufSize,
@@ -60,6 +60,7 @@
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.api.operators.StreamFilter;
import org.apache.flink.streaming.api.operators.StreamFlatMap;
@@ -1172,8 +1173,10 @@ public ExecutionConfig getExecutionConfig() {
* @param <R>
* type of the return stream
* @return the data stream constructed
* @see #transform(String, TypeInformation, OneInputStreamOperatorFactory)
*/
@PublicEvolving
@Deprecated
public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {

// read the output type of the input Transform to coax out errors about MissingTypeInfo
@@ -1194,6 +1197,37 @@ public ExecutionConfig getExecutionConfig() {
return returnStream;
}

/**
* Method for passing user defined operators along with the type information that will transform the DataStream.
*
* @param operatorName name of the operator, for logging purposes
* @param outTypeInfo the output type of the operator
* @param operatorFactory the factory for the operator.
* @param <R> type of the return stream
* @return the data stream constructed
*/
@PublicEvolving
public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo,
OneInputStreamOperatorFactory<T, R> operatorFactory) {

// read the output type of the input Transform to coax out errors about MissingTypeInfo
transformation.getOutputType();

OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
this.transformation,
operatorName,
operatorFactory,
outTypeInfo,
environment.getParallelism());

@SuppressWarnings({"unchecked", "rawtypes"})
SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);

getExecutionEnvironment().addOperator(resultTransform);

return returnStream;
}

/**
* Internal function for setting the partitioner for the DataStream.
*
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.operators;

/**
* A factory to create {@link OneInputStreamOperator}.
*
* @param <IN> The input type of the operator.
* @param <OUT> The output type of the operator.
*/
public interface OneInputStreamOperatorFactory<IN, OUT> extends StreamOperatorFactory<OUT> {
}
@@ -24,8 +24,6 @@
import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;

@@ -54,8 +52,6 @@
} else if (operator instanceof StreamSink &&
((StreamSink) operator).getUserFunction() instanceof OutputFormatSinkFunction) {
return new SimpleOutputFormatOperatorFactory<>((StreamSink) operator);
} else if (operator instanceof AsyncWaitOperator) {
return new AsyncWaitOperatorFactory<>((AsyncWaitOperator) operator);
} else if (operator instanceof AbstractUdfStreamOperator) {
return new SimpleUdfStreamOperatorFactory<OUT>((AbstractUdfStreamOperator) operator);
} else {
@@ -50,6 +50,8 @@
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nonnull;

import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -115,13 +117,14 @@
private transient Thread emitterThread;

/** Mailbox executor used to yield while waiting for buffers to empty. */
private transient MailboxExecutor mailboxExecutor;
private final transient MailboxExecutor mailboxExecutor;

public AsyncWaitOperator(
AsyncFunction<IN, OUT> asyncFunction,
@Nonnull AsyncFunction<IN, OUT> asyncFunction,
long timeout,
int capacity,
AsyncDataStream.OutputMode outputMode) {
@Nonnull AsyncDataStream.OutputMode outputMode,
@Nonnull MailboxExecutor mailboxExecutor) {
super(asyncFunction);

// TODO this is a temporary fix for the problems described under FLINK-13063 at the cost of breaking chains for
@@ -134,6 +137,8 @@ public AsyncWaitOperator(
this.outputMode = Preconditions.checkNotNull(outputMode, "outputMode");

this.timeout = timeout;

this.mailboxExecutor = mailboxExecutor;
}

@Override
@@ -166,10 +171,6 @@ public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<S
}
}

void setMailboxExecutor(MailboxExecutor mailboxExecutor) {
this.mailboxExecutor = mailboxExecutor;
}

@Override
public void open() throws Exception {
super.open();
@@ -17,22 +17,68 @@

package org.apache.flink.streaming.api.operators.async;

import org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.YieldingOperatorFactory;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxExecutor;

/**
* The factory of {@link AsyncWaitOperator}.
*
* @param <OUT> The output type of the operator
*/
public class AsyncWaitOperatorFactory<OUT> extends SimpleUdfStreamOperatorFactory<OUT> implements YieldingOperatorFactory {
public AsyncWaitOperatorFactory(AsyncWaitOperator<?, OUT> operator) {
super(operator);
public class AsyncWaitOperatorFactory<IN, OUT> implements OneInputStreamOperatorFactory<IN, OUT>, YieldingOperatorFactory {
private final AsyncFunction<IN, OUT> asyncFunction;
private final long timeout;
private final int capacity;
private final AsyncDataStream.OutputMode outputMode;
private MailboxExecutor mailboxExecutor;
private ChainingStrategy strategy = ChainingStrategy.HEAD;

public AsyncWaitOperatorFactory(AsyncFunction<IN, OUT> asyncFunction,
long timeout,
int capacity,
AsyncDataStream.OutputMode outputMode) {
this.asyncFunction = asyncFunction;
this.timeout = timeout;
this.capacity = capacity;
this.outputMode = outputMode;
}

@Override public void setMailboxExecutor(MailboxExecutor mailboxExecutor) {
AsyncWaitOperator<?, OUT> operator = (AsyncWaitOperator<?, OUT>) getOperator();
operator.setMailboxExecutor(mailboxExecutor);
this.mailboxExecutor = mailboxExecutor;
}

@Override public StreamOperator createStreamOperator(StreamTask containingTask, StreamConfig config,
Output output) {
AsyncWaitOperator asyncWaitOperator = new AsyncWaitOperator(asyncFunction,
timeout,
capacity,
outputMode,
mailboxExecutor);
asyncWaitOperator.setup(containingTask, config, output);
return asyncWaitOperator;
}

@Override public void setChainingStrategy(ChainingStrategy strategy) {
this.strategy = strategy;
}

@Override public ChainingStrategy getChainingStrategy() {
return strategy;
}

@Override public boolean isOperatorSelectiveReading(ClassLoader classLoader) {
return false;
}

@Override public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
return AsyncWaitOperator.class;
}
}

0 comments on commit 7815bc0

Please sign in to comment.
You can’t perform that action at this time.