Skip to content

Commit

Permalink
[hotfix] StreamTask and OperatorChain properly clean up partially ini…
Browse files Browse the repository at this point in the history
…tialized resources upon failures during initialization
  • Loading branch information
StephanEwen committed Oct 16, 2015
1 parent f2d5038 commit d5a016c
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 70 deletions.
Expand Up @@ -74,29 +74,47 @@ public OperatorChain(StreamTask<OUT, ?> containingTask,
Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap = new HashMap<>(outEdgesInOrder.size());
this.streamOutputs = new RecordWriterOutput<?>[outEdgesInOrder.size()];

for (int i = 0; i < outEdgesInOrder.size(); i++) {
StreamEdge outEdge = outEdgesInOrder.get(i);
// from here on, we need to make sure that the output writers are shut down again on failure
boolean success = false;
try {
for (int i = 0; i < outEdgesInOrder.size(); i++) {
StreamEdge outEdge = outEdgesInOrder.get(i);

RecordWriterOutput<?> streamOutput = createStreamOutput(
outEdge, chainedConfigs.get(outEdge.getSourceId()), i,
containingTask.getEnvironment(), enableTimestamps, reporter, containingTask.getName());

this.streamOutputs[i] = streamOutput;
streamOutputMap.put(outEdge, streamOutput);
}

// we create the chain of operators and grab the collector that leads into the chain
List<StreamOperator<?>> allOps = new ArrayList<>(chainedConfigs.size());
this.chainEntryPoint = createOutputCollector(containingTask, configuration,
chainedConfigs, userCodeClassloader, streamOutputMap, allOps);

RecordWriterOutput<?> streamOutput = createStreamOutput(
outEdge, chainedConfigs.get(outEdge.getSourceId()), i,
containingTask.getEnvironment(), enableTimestamps, reporter, containingTask.getName());

streamOutputMap.put(outEdge, streamOutput);
this.streamOutputs[i] = streamOutput;
this.allOperators = allOps.toArray(new StreamOperator<?>[allOps.size() + 1]);

// add the head operator to the end of the list
this.allOperators[this.allOperators.length - 1] = headOperator;

success = true;
}
finally {
// make sure we clean up after ourselves in case of a failure after acquiring
// the first resources
if (!success) {
for (RecordWriterOutput<?> output : this.streamOutputs) {
if (output != null) {
output.close();
output.clearBuffers();
}
}
}
}

// we create the chain of operators and grab the collector that leads into the chain
List<StreamOperator<?>> allOps = new ArrayList<>(chainedConfigs.size());
this.chainEntryPoint = createOutputCollector(containingTask, configuration,
chainedConfigs, userCodeClassloader, streamOutputMap, allOps);

this.allOperators = allOps.toArray(new StreamOperator<?>[allOps.size() + 1]);

// add the head operator to the end of the list
this.allOperators[this.allOperators.length - 1] = headOperator;
}

//


public void broadcastCheckpointBarrier(long id, long timestamp) throws IOException, InterruptedException {
CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp);
Expand Down
Expand Up @@ -127,6 +127,9 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>

/** The map of user-defined accumulators of this task */
private Map<String, Accumulator<?, ?>> accumulatorMap;

/** The state to be restored once the initialization is done */
private StreamTaskStateList lazyRestoreState;

/** This field is used to forward an exception that is caught in the timer thread. Subclasses
* must ensure that exceptions stored here get thrown on the actual execution Thread. */
Expand Down Expand Up @@ -155,31 +158,44 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>

@Override
public final void registerInputOutput() throws Exception {
LOG.debug("Begin initialization for {}", getName());
LOG.debug("registerInputOutput for {}", getName());

AccumulatorRegistry accumulatorRegistry = getEnvironment().getAccumulatorRegistry();

userClassLoader = getUserCodeClassLoader();
configuration = new StreamConfig(getTaskConfiguration());
accumulatorMap = accumulatorRegistry.getUserMap();

stateBackend = createStateBackend();
stateBackend.initializeForJob(getEnvironment().getJobID());

headOperator = configuration.getStreamOperator(userClassLoader);
operatorChain = new OperatorChain<>(this, headOperator, accumulatorRegistry.getReadWriteReporter());

if (headOperator != null) {
headOperator.setup(this, configuration, operatorChain.getChainEntryPoint());
}
boolean initializationCompleted = false;
try {
AccumulatorRegistry accumulatorRegistry = getEnvironment().getAccumulatorRegistry();

timerService = Executors.newSingleThreadScheduledExecutor(
new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName()));
userClassLoader = getUserCodeClassLoader();
configuration = new StreamConfig(getTaskConfiguration());
accumulatorMap = accumulatorRegistry.getUserMap();

// task specific initialization
init();

LOG.debug("Finish initialization for {}", getName());
stateBackend = createStateBackend();
stateBackend.initializeForJob(getEnvironment().getJobID());

headOperator = configuration.getStreamOperator(userClassLoader);
operatorChain = new OperatorChain<>(this, headOperator, accumulatorRegistry.getReadWriteReporter());

if (headOperator != null) {
headOperator.setup(this, configuration, operatorChain.getChainEntryPoint());
}

timerService = Executors.newSingleThreadScheduledExecutor(
new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName()));

// task specific initialization
init();

initializationCompleted = true;
}
finally {
if (!initializationCompleted) {
if (timerService != null) {
timerService.shutdownNow();
}
if (operatorChain != null) {
operatorChain.releaseOutputs();
}
}
}
}

@Override
Expand All @@ -188,6 +204,9 @@ public final void invoke() throws Exception {

boolean disposed = false;
try {
// first order of business is to ive operators back their state
restoreStateLazy();

// we need to make sure that any triggers scheduled in open() cannot be
// executed before all operators are opened
synchronized (lock) {
Expand Down Expand Up @@ -223,7 +242,7 @@ public final void invoke() throws Exception {
finally {
isRunning = false;

timerService.shutdown();
timerService.shutdownNow();

// release the output resources. this method should never fail.
if (operatorChain != null) {
Expand Down Expand Up @@ -263,7 +282,9 @@ public final void cancel() throws Exception {

private void openAllOperators() throws Exception {
for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
operator.open();
if (operator != null) {
operator.open();
}
}
}

Expand All @@ -272,20 +293,27 @@ private void closeAllOperators() throws Exception {
// elements in their close methods.
StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
for (int i = allOperators.length - 1; i >= 0; i--) {
allOperators[i].close();
StreamOperator<?> operator = allOperators[i];
if (operator != null) {
operator.close();
}
}
}

private void tryDisposeAllOperators() throws Exception {
for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
operator.dispose();
if (operator != null) {
operator.dispose();
}
}
}

private void disposeAllOperators() {
for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
try {
operator.dispose();
if (operator != null) {
operator.dispose();
}
}
catch (Throwable t) {
LOG.error("Error during disposal of stream operator.", t);
Expand Down Expand Up @@ -354,22 +382,36 @@ public RecordWriterOutput<?>[] getStreamOutputs() {
// ------------------------------------------------------------------------

@Override
public void setInitialState(StreamTaskStateList initialState) throws Exception {
LOG.info("Restoring checkpointed state to task {}", getName());

final StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
final StreamTaskState[] states = initialState.getState(userClassLoader);

for (int i = 0; i < states.length; i++) {
StreamTaskState state = states[i];
StreamOperator<?> operator = allOperators[i];
public void setInitialState(StreamTaskStateList initialState) {
lazyRestoreState = initialState;
}

public void restoreStateLazy() throws Exception {
if (lazyRestoreState != null) {
LOG.info("Restoring checkpointed state to task {}", getName());

if (state != null && operator != null) {
LOG.debug("Task {} in chain ({}) has checkpointed state", i, getName());
operator.restoreState(state);
try {
final StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
final StreamTaskState[] states = lazyRestoreState.getState(userClassLoader);

// be GC friendly
lazyRestoreState = null;

for (int i = 0; i < states.length; i++) {
StreamTaskState state = states[i];
StreamOperator<?> operator = allOperators[i];

if (state != null && operator != null) {
LOG.debug("Task {} in chain ({}) has checkpointed state", i, getName());
operator.restoreState(state);
}
else if (operator != null) {
LOG.debug("Task {} in chain ({}) does not have checkpointed state", i, getName());
}
}
}
else if (operator != null) {
LOG.debug("Task {} in chain ({}) does not have checkpointed state", i, getName());
catch (Exception e) {
throw new Exception("Could not restore checkpointed state to operators and functions", e);
}
}
}
Expand All @@ -380,24 +422,27 @@ public void triggerCheckpoint(long checkpointId, long timestamp) throws Exceptio

synchronized (lock) {
if (isRunning) {

// since both state checkpointing and downstream barrier emission occurs in this
// lock scope, they are an atomic operation regardless of the order in which they occur
// we immediately emit the checkpoint barriers, so the downstream operators can start
// their checkpoint work as soon as possible
operatorChain.broadcastCheckpointBarrier(checkpointId, timestamp);

// now draw the state snapshot
try {
final StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
final StreamTaskState[] states = new StreamTaskState[allOperators.length];

for (int i = 0; i < states.length; i++) {
StreamTaskState state = allOperators[i].snapshotOperatorState(checkpointId, timestamp);
states[i] = state.isEmpty() ? null : state;
StreamOperator<?> operator = allOperators[i];
if (operator != null) {
StreamTaskState state = operator.snapshotOperatorState(checkpointId, timestamp);
states[i] = state.isEmpty() ? null : state;
}
}

StreamTaskStateList allStates = new StreamTaskStateList(states);

// since both state checkpointing and downstream barrier emission occurs in this
// lock scope, they are an atomic operation regardless of the order in which they occur
// we immediately emit the checkpoint barriers, so the downstream operators can start
// their checkpoint work as soon as possible
operatorChain.broadcastCheckpointBarrier(checkpointId, timestamp);

if (allStates.isEmpty()) {
getEnvironment().acknowledgeCheckpoint(checkpointId);
} else {
Expand All @@ -420,7 +465,9 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
LOG.debug("Notification of complete checkpoint for task {}", getName());

for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
operator.notifyOfCompletedCheckpoint(checkpointId);
if (operator != null) {
operator.notifyOfCompletedCheckpoint(checkpointId);
}
}
}
else {
Expand Down

0 comments on commit d5a016c

Please sign in to comment.