Skip to content

Commit

Permalink
[FLINK-2647] [streaming core] Distinguish between "close" (flushing b…
Browse files Browse the repository at this point in the history
…uffered data) and "dispose" (cleanup resources) in streaming operators.
  • Loading branch information
StephanEwen committed Sep 11, 2015
1 parent aec4b15 commit 407d74f
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 59 deletions.
Expand Up @@ -209,8 +209,7 @@ public void open(final Configuration parameters) throws Exception {
} }


@Override @Override
public void close() throws Exception { public void dispose() {
super.close();
this.bolt.cleanup(); this.bolt.cleanup();
} }


Expand Down
Expand Up @@ -34,6 +34,7 @@
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;

import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito; import org.powermock.api.mockito.PowerMockito;
Expand All @@ -43,9 +44,6 @@
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;


import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isNull;
import static org.mockito.Mockito.*; import static org.mockito.Mockito.*;


@RunWith(PowerMockRunner.class) @RunWith(PowerMockRunner.class)
Expand Down Expand Up @@ -236,6 +234,8 @@ public void testClose() throws Exception {
wrapper.setup(mock(Output.class), taskContext); wrapper.setup(mock(Output.class), taskContext);


wrapper.close(); wrapper.close();
wrapper.dispose();

verify(bolt).cleanup(); verify(bolt).cleanup();
} }


Expand Down
Expand Up @@ -32,32 +32,51 @@ public abstract class AbstractStreamOperator<OUT> implements StreamOperator<OUT>


private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;



protected transient StreamingRuntimeContext runtimeContext; protected transient StreamingRuntimeContext runtimeContext;


protected transient ExecutionConfig executionConfig; protected transient ExecutionConfig executionConfig;


public transient Output<StreamRecord<OUT>> output; protected transient Output<StreamRecord<OUT>> output;


protected boolean inputCopyDisabled = false; protected boolean inputCopyDisabled = false;


// A sane default for most operators // A sane default for most operators
protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD; protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;


// ------------------------------------------------------------------------
// Life Cycle
// ------------------------------------------------------------------------

@Override @Override
public void setup(Output<StreamRecord<OUT>> output, StreamingRuntimeContext runtimeContext) { public void setup(Output<StreamRecord<OUT>> output, StreamingRuntimeContext runtimeContext) {
this.output = output; this.output = output;
this.executionConfig = runtimeContext.getExecutionConfig(); this.executionConfig = runtimeContext.getExecutionConfig();
this.runtimeContext = runtimeContext; this.runtimeContext = runtimeContext;
} }


/**
* This default implementation of the interface method does nothing.
*/
@Override @Override
public void open(Configuration parameters) throws Exception { public void open(Configuration parameters) throws Exception {}
}


/**
* This default implementation of the interface method does nothing.
*/
@Override @Override
public void close() throws Exception { public void close() throws Exception {}
}


/**
* This default implementation of the interface method does nothing.
*/
@Override
public void dispose() {}

// ------------------------------------------------------------------------
// Context and chaining properties
// ------------------------------------------------------------------------

@Override @Override
public final void setChainingStrategy(ChainingStrategy strategy) { public final void setChainingStrategy(ChainingStrategy strategy) {
this.chainingStrategy = strategy; this.chainingStrategy = strategy;
Expand Down
Expand Up @@ -36,6 +36,8 @@
import org.apache.flink.streaming.api.state.StreamOperatorState; import org.apache.flink.streaming.api.state.StreamOperatorState;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/** /**
* This is used as the base class for operators that have a user-defined * This is used as the base class for operators that have a user-defined
Expand All @@ -46,11 +48,17 @@
* @param <F> * @param <F>
* The type of the user function * The type of the user function
*/ */
public abstract class AbstractUdfStreamOperator<OUT, F extends Function & Serializable> extends AbstractStreamOperator<OUT> implements StatefulStreamOperator<OUT> { public abstract class AbstractUdfStreamOperator<OUT, F extends Function & Serializable>
extends AbstractStreamOperator<OUT> implements StatefulStreamOperator<OUT> {


private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;

private static final Logger LOG = LoggerFactory.getLogger(AbstractUdfStreamOperator.class);



protected final F userFunction; protected final F userFunction;

private boolean functionsClosed = false;


public AbstractUdfStreamOperator(F userFunction) { public AbstractUdfStreamOperator(F userFunction) {
this.userFunction = userFunction; this.userFunction = userFunction;
Expand All @@ -72,9 +80,23 @@ public void open(Configuration parameters) throws Exception {
@Override @Override
public void close() throws Exception { public void close() throws Exception {
super.close(); super.close();
functionsClosed = true;
FunctionUtils.closeFunction(userFunction); FunctionUtils.closeFunction(userFunction);
} }


@Override
public void dispose() {
if (!functionsClosed) {
functionsClosed = true;
try {
FunctionUtils.closeFunction(userFunction);
}
catch (Throwable t) {
LOG.error("Exception while closing user function while failing or canceling task", t);
}
}
}

@Override @Override
@SuppressWarnings({ "unchecked", "rawtypes" }) @SuppressWarnings({ "unchecked", "rawtypes" })
public void restoreInitialState(Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>> snapshots) throws Exception { public void restoreInitialState(Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>> snapshots) throws Exception {
Expand Down
Expand Up @@ -27,40 +27,71 @@
* Basic interface for stream operators. Implementers would implement one of * Basic interface for stream operators. Implementers would implement one of
* {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator} or * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator} or
* {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator} to create operators * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator} to create operators
* that process elements. You can use * that process elements.
* {@link org.apache.flink.streaming.api.operators.AbstractStreamOperator} as a base class for *
* custom operators. * The class {@link org.apache.flink.streaming.api.operators.AbstractStreamOperator}
* offers default implementation for the lifecycle and properties methods.
* *
* @param <OUT> The output type of the operator * @param <OUT> The output type of the operator
*/ */
public interface StreamOperator<OUT> extends Serializable { public interface StreamOperator<OUT> extends Serializable {


// ------------------------------------------------------------------------
// Life Cycle
// ------------------------------------------------------------------------

/** /**
* Initializes the {@link StreamOperator} for input and output handling. * Initializes the operator. Sets access to the context and the output.
*/ */
public void setup(Output<StreamRecord<OUT>> output, StreamingRuntimeContext runtimeContext); void setup(Output<StreamRecord<OUT>> output, StreamingRuntimeContext runtimeContext);


/** /**
* This method is called before any elements are processed. * This method is called immediately before any elements are processed, it should contain the
* operator's initialization logic.
*
* @throws java.lang.Exception An exception in this method causes the operator to fail.
*/ */
public void open(Configuration config) throws Exception; void open(Configuration config) throws Exception;


/** /**
* This method is called after no more elements for can arrive for processing. * This method is called after all records have been added to the operators via the methods
* {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator#processElement(StreamRecord)}, or
* {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator#processElement1(StreamRecord)} and
* {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator#processElement2(StreamRecord)}.
* <p>
* The method is expected to flush all remaining buffered data. Exceptions during this flushing
* of buffered should be propagated, in order to cause the operation to be recognized asa failed,
* because the last data items are not processed properly.
*
* @throws java.lang.Exception An exception in this method causes the operator to fail.
*/ */
public void close() throws Exception; void close() throws Exception;

/**
* This method is called at the very end of the operator's life, both in the case of a successful
* completion of the operation, and in the case of a failure and canceling.
*
* This method is expected to make a thorough effort to release all resources
* that the operator has acquired.
*/
void dispose();


// ------------------------------------------------------------------------
// Context and chaining properties
// ------------------------------------------------------------------------


public StreamingRuntimeContext getRuntimeContext(); StreamingRuntimeContext getRuntimeContext();


/** /**
* An operator can return true here to disable copying of its input elements. This overrides * An operator can return true here to disable copying of its input elements. This overrides
* the object-reuse setting on the {@link org.apache.flink.api.common.ExecutionConfig} * the object-reuse setting on the {@link org.apache.flink.api.common.ExecutionConfig}
*/ */
public boolean isInputCopyingDisabled(); boolean isInputCopyingDisabled();


public void setChainingStrategy(ChainingStrategy strategy); void setChainingStrategy(ChainingStrategy strategy);


public ChainingStrategy getChainingStrategy(); ChainingStrategy getChainingStrategy();


/** /**
* Defines the chaining scheme for the operator. By default <b>ALWAYS</b> is used, * Defines the chaining scheme for the operator. By default <b>ALWAYS</b> is used,
Expand Down
Expand Up @@ -79,9 +79,7 @@ public void open(org.apache.flink.configuration.Configuration parameters) throws
} }


@Override @Override
public void close() throws Exception { public void dispose() {
super.close();

try { try {
centralCheck.running = false; centralCheck.running = false;
centralThread.interrupt(); centralThread.interrupt();
Expand Down
Expand Up @@ -185,6 +185,13 @@ public void close() throws Exception {
emitWindow(); emitWindow();
} }


@Override
public void dispose() {
if (activePolicyThread != null) {
activePolicyThread.interrupt();
}
}

/** /**
* This class allows the active trigger thread to call back and push fake * This class allows the active trigger thread to call back and push fake
* elements at any time. * elements at any time.
Expand Down
Expand Up @@ -162,60 +162,56 @@ public final void registerInputOutput() throws Exception {
public final void invoke() throws Exception { public final void invoke() throws Exception {
LOG.debug("Invoking {}", getName()); LOG.debug("Invoking {}", getName());


boolean operatorOpen = false; boolean disposed = false;
try { try {
openAllOperators(); openAllOperators();
operatorOpen = true;


// let the task do its work // let the task do its work
isRunning = true; isRunning = true;
run(); run();
isRunning = false;


if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Finished task {}", getName()); LOG.debug("Finished task {}", getName());
} }


// make sure no further checkpoint and notification actions happen // make sure no further checkpoint and notification actions happen.
// for that we set this task as not running and make sure no other thread is // we make sure that no other thread is currently in the locked scope before
// currently in the locked scope before we close the operators // we close the operators by trying to acquire the checkpoint scope lock
this.isRunning = false;
synchronized (checkpointLock) {} synchronized (checkpointLock) {}


// this is part of the main logic, so if this fails, the task is considered failed // this is part of the main logic, so if this fails, the task is considered failed
closeAllOperators(); closeAllOperators();
operatorOpen = false;


// make sure all data if flushed // make sure all data is flushed
outputHandler.flushOutputs(); outputHandler.flushOutputs();

// make an attempt to dispose the operators such that failures in the dispose call
// still let the computation fail
tryDisposeAllOperators();
disposed = true;
} }
finally { finally {
this.isRunning = false; isRunning = false;


// release the output resources. this method should never fail.
if (outputHandler != null) {
outputHandler.releaseOutputs();
}

// we must! perform this cleanup

try { try {
if (operatorOpen) { cleanup();
// we came here in a failure
closeAllOperators();
}
} }
catch (Throwable t) { catch (Throwable t) {
LOG.error("Error closing stream operators after an exception.", t); // catch and log the exception to not replace the original exception

LOG.error("Error during cleanup of stream task.");
} }
finally {
// we must! perform this cleanup // if the operators were not disposed before, do a hard dispose

if (!disposed) {
// release the output resources disposeAllOperators();
if (outputHandler != null) {
outputHandler.releaseOutputs();
}

// release this operator's resources
try {
cleanup();
}
catch (Throwable t) {
LOG.error("Error during cleanup of stream task.");
}
} }
} }
} }
Expand All @@ -237,14 +233,35 @@ private void openAllOperators() throws Exception {
private void closeAllOperators() throws Exception { private void closeAllOperators() throws Exception {
// We need to close them first to last, since upstream operators in the chain might emit // We need to close them first to last, since upstream operators in the chain might emit
// elements in their close methods. // elements in their close methods.
for (int i = outputHandler.getChainedOperators().size()-1; i >= 0; i--) { for (int i = outputHandler.getChainedOperators().size() - 1; i >= 0; i--) {
StreamOperator<?> operator = outputHandler.getChainedOperators().get(i); StreamOperator<?> operator = outputHandler.getChainedOperators().get(i);
if (operator != null) { if (operator != null) {
operator.close(); operator.close();
} }
} }
} }


private void tryDisposeAllOperators() throws Exception {
for (StreamOperator<?> operator : outputHandler.getChainedOperators()) {
if (operator != null) {
operator.dispose();
}
}
}

private void disposeAllOperators() {
for (StreamOperator<?> operator : outputHandler.getChainedOperators()) {
if (operator != null) {
try {
operator.dispose();
}
catch (Throwable t) {
LOG.error("Error during disposal of stream operator.", t);
}
}
}
}

// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
// Access to properties and utilities // Access to properties and utilities
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
Expand Down

0 comments on commit 407d74f

Please sign in to comment.