Skip to content

Commit

Permalink
[FLINK-11718] Add onStart method to TaskExecutor
Browse files Browse the repository at this point in the history
  • Loading branch information
tillrohrmann committed Feb 27, 2019
1 parent 6b9b242 commit 1023694
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 24 deletions.
Expand Up @@ -271,25 +271,43 @@ public TaskExecutor(
// ------------------------------------------------------------------------

@Override
public void start() throws Exception {
super.start();

// start by connecting to the ResourceManager
public void onStart() throws Exception {
try {
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
startTaskExecutorServices();
} catch (Exception e) {
onFatalError(e);
final TaskManagerException exception = new TaskManagerException(String.format("Could not start the TaskExecutor %s", getAddress()), e);
onFatalError(exception);
throw exception;
}

// tell the task slot table who's responsible for the task slot actions
taskSlotTable.start(new SlotActionsImpl());
startRegistrationTimeout();
}

private void startTaskExecutorServices() throws Exception {
try {
// start by connecting to the ResourceManager
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());

// start the job leader service
jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());
// tell the task slot table who's responsible for the task slot actions
taskSlotTable.start(new SlotActionsImpl());

fileCache = new FileCache(taskManagerConfiguration.getTmpDirectories(), blobCacheService.getPermanentBlobService());
// start the job leader service
jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());

startRegistrationTimeout();
fileCache = new FileCache(taskManagerConfiguration.getTmpDirectories(), blobCacheService.getPermanentBlobService());
} catch (Exception e) {
handleStartTaskExecutorServicesException(e);
}
}

/**
 * Cleans up after a failed start of the task executor services and rethrows the failure.
 *
 * <p>Invokes {@code stopTaskExecutorServices()}. If that call throws as well, its exception
 * is attached to {@code e} as a suppressed exception instead of replacing it.
 *
 * @param e the exception that aborted the service start-up
 * @throws Exception always; the given {@code e} is rethrown
 */
private void handleStartTaskExecutorServicesException(Exception e) throws Exception {
	try {
		stopTaskExecutorServices();
	} catch (Exception stopFailure) {
		// keep the start-up failure as the primary exception
		e.addSuppressed(stopFailure);
	}

	throw e;
}

/**
Expand Down Expand Up @@ -318,27 +336,52 @@ public CompletableFuture<Void> onStop() {
resourceManagerHeartbeatManager.stop();

try {
resourceManagerLeaderRetriever.stop();
stopTaskExecutorServices();
} catch (Exception e) {
throwable = ExceptionUtils.firstOrSuppressed(e, throwable);
}

if (throwable != null) {
return FutureUtils.completedExceptionally(new FlinkException("Error while shutting the TaskExecutor down.", throwable));
} else {
log.info("Stopped TaskExecutor {}.", getAddress());
return CompletableFuture.completedFuture(null);
}
}

private void stopTaskExecutorServices() throws Exception {
Exception exception = null;

try {
jobLeaderService.stop();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}

taskSlotTable.stop();

try {
resourceManagerLeaderRetriever.stop();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}

try {
taskExecutorServices.shutDown();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}

try {
fileCache.shutdown();
} catch (Throwable t) {
throwable = ExceptionUtils.firstOrSuppressed(t, throwable);
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}

// it will call close() recursively from the parent to children
taskManagerMetricGroup.close();

if (throwable != null) {
return FutureUtils.completedExceptionally(new FlinkException("Error while shutting the TaskExecutor down.", throwable));
} else {
log.info("Stopped TaskExecutor {}.", getAddress());
return CompletableFuture.completedFuture(null);
}
ExceptionUtils.tryRethrowException(exception);
}

// ======================================================================
Expand Down
Expand Up @@ -344,7 +344,7 @@ public void testHeartbeatTimeoutWithJobManager() throws Exception {
.setTaskStateManager(localStateStoresManager)
.build();

final TaskExecutor taskManager = new TaskExecutor(
final TestingTaskExecutor taskManager = new TestingTaskExecutor(
rpc,
taskManagerConfiguration,
haServices,
Expand All @@ -357,6 +357,7 @@ public void testHeartbeatTimeoutWithJobManager() throws Exception {

try {
taskManager.start();
taskManager.waitUntilStarted();

rpc.registerGateway(jobMasterAddress, jobMasterGateway);

Expand Down Expand Up @@ -1047,7 +1048,7 @@ public void testSubmitTaskBeforeAcceptSlot() throws Exception {
.setTaskStateManager(localStateStoresManager)
.build();

final TaskExecutor taskManager = new TaskExecutor(
final TestingTaskExecutor taskManager = new TestingTaskExecutor(
rpc,
taskManagerConfiguration,
haServices,
Expand All @@ -1060,6 +1061,7 @@ public void testSubmitTaskBeforeAcceptSlot() throws Exception {

try {
taskManager.start();
taskManager.waitUntilStarted();

final TaskExecutorGateway tmGateway = taskManager.getSelfGateway(TaskExecutorGateway.class);

Expand Down Expand Up @@ -1273,7 +1275,7 @@ public void testRemoveJobFromJobLeaderService() throws Exception {
.setTaskStateManager(localStateStoresManager)
.build();

final TaskExecutor taskExecutor = new TaskExecutor(
final TestingTaskExecutor taskExecutor = new TestingTaskExecutor(
rpc,
taskManagerConfiguration,
haServices,
Expand Down Expand Up @@ -1305,6 +1307,7 @@ public void testRemoveJobFromJobLeaderService() throws Exception {
haServices.setJobMasterLeaderRetriever(jobId, jobMasterLeaderRetriever);

taskExecutor.start();
taskExecutor.waitUntilStarted();

final TaskExecutorGateway taskExecutorGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class);

Expand Down
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.taskexecutor;

import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;

import javax.annotation.Nullable;

import java.util.concurrent.CompletableFuture;

/**
 * {@link TaskExecutor} extension for testing purposes.
 *
 * <p>Records the outcome of {@link #onStart()} in a future so that a test can call
 * {@link #waitUntilStarted()} to block until the start-up logic has run.
 */
class TestingTaskExecutor extends TaskExecutor {
	/** Completed normally once {@code onStart} has returned, exceptionally if it threw. */
	private final CompletableFuture<Void> startFuture = new CompletableFuture<>();

	/**
	 * Creates the testing task executor; all arguments are forwarded unchanged to the
	 * {@link TaskExecutor} constructor.
	 */
	public TestingTaskExecutor(
			RpcService rpcService,
			TaskManagerConfiguration taskManagerConfiguration,
			HighAvailabilityServices haServices,
			TaskManagerServices taskExecutorServices,
			HeartbeatServices heartbeatServices,
			TaskManagerMetricGroup taskManagerMetricGroup,
			@Nullable String metricQueryServicePath,
			BlobCacheService blobCacheService,
			FatalErrorHandler fatalErrorHandler) {
		super(
			rpcService,
			taskManagerConfiguration,
			haServices,
			taskExecutorServices,
			heartbeatServices,
			taskManagerMetricGroup,
			metricQueryServicePath,
			blobCacheService,
			fatalErrorHandler);
	}

	/**
	 * Runs the regular start-up logic and publishes its outcome to the start future.
	 *
	 * @throws Exception if the parent's {@code onStart} fails; the failure is rethrown
	 *     unchanged after the start future has been completed exceptionally with it
	 */
	@Override
	public void onStart() throws Exception {
		try {
			super.onStart();
		} catch (Throwable t) {
			// Catch Throwable rather than Exception: if an Error escaped without completing
			// the future, waitUntilStarted() would block forever. Precise rethrow keeps the
			// declared "throws Exception" sufficient.
			startFuture.completeExceptionally(t);
			throw t;
		}

		startFuture.complete(null);
	}

	/**
	 * Blocks until {@link #onStart()} has finished.
	 *
	 * @throws java.util.concurrent.CompletionException if {@code onStart} failed; the cause
	 *     is the original failure
	 */
	void waitUntilStarted() {
		startFuture.join();
	}
}

0 comments on commit 1023694

Please sign in to comment.