Skip to content

Commit

Permalink
[FLINK-4505] [cluster mngt] Separate TaskManager service configuratio…
Browse files Browse the repository at this point in the history
…n from TaskManagerConfiguration; Implement TaskManagerRunner

Refactors the startup logic so that it is easier to reuse.

This closes #2461.
  • Loading branch information
tillrohrmann committed Sep 28, 2016
1 parent df25f0a commit 5513fe6
Show file tree
Hide file tree
Showing 19 changed files with 1,195 additions and 942 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.highavailability;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;

/**
 * Utility class for instantiating {@link HighAvailabilityServices} based on the
 * recovery mode found in a {@link Configuration}.
 */
public class HighAvailabilityServicesUtils {

    /**
     * Creates the high availability services that match the recovery mode of the given configuration.
     *
     * <p>The mode is looked up via {@link LeaderRetrievalUtils#getRecoveryMode(Configuration)}.
     * Only {@code NONE} currently yields a service instance; every other mode results in an exception.
     *
     * @param configuration The configuration from which the recovery mode is read.
     * @return The high availability services for the configured recovery mode.
     * @throws UnsupportedOperationException Thrown, if the recovery mode is {@code ZOOKEEPER}.
     * @throws Exception Thrown, if the recovery mode is neither {@code NONE} nor {@code ZOOKEEPER}.
     */
    public static HighAvailabilityServices createHighAvailabilityServices(Configuration configuration) throws Exception {
        final HighAvailabilityMode mode = LeaderRetrievalUtils.getRecoveryMode(configuration);

        switch (mode) {
            case NONE: {
                // NOTE(review): no resource manager address is known at this point, so null is
                // handed over - confirm that NonHaServices tolerates a null address.
                final String noResourceManagerAddress = null;
                return new NonHaServices(noResourceManagerAddress);
            }
            case ZOOKEEPER: {
                final String message = "ZooKeeper high availability services have not been implemented yet.";
                throw new UnsupportedOperationException(message);
            }
            default: {
                final String message = "Recovery mode " + mode + " is not supported.";
                throw new Exception(message);
            }
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rpc;

import akka.actor.ActorSystem;
import com.typesafe.config.Config;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.util.NetUtils;
import org.jboss.netty.channel.ChannelException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * Utility methods for creating {@link RpcService} instances.
 */
public class RpcServiceUtils {
    private static final Logger LOG = LoggerFactory.getLogger(RpcServiceUtils.class);

    /**
     * Utility method to create an RPC service from a configuration and a hostname/port pair.
     *
     * <p>An akka actor system is created from the akka config derived from the given
     * configuration, hostname and port. It is then wrapped in an {@link AkkaRpcService} whose
     * timeout is taken from {@code AkkaUtils.getTimeout(configuration)}.
     *
     * @param hostname The hostname/address the actor system is bound to.
     * @param port The port the actor system is bound to.
     * @param configuration The configuration from which the akka config and the RPC timeout are derived.
     * @return The rpc service backed by the newly created actor system.
     * @throws IOException Thrown, if the actor system can not bind to the address
     * @throws Exception Thrown, if some other error occurs while creating the akka actor system
     */
    public static RpcService createRpcService(String hostname, int port, Configuration configuration) throws Exception {
        // Computed once; used for the startup log line and for the bind failure message.
        final String address = NetUtils.hostAndPortToUrlString(hostname, port);

        LOG.info("Starting AkkaRpcService at {}.", address);

        final ActorSystem actorSystem;

        try {
            Config akkaConfig = AkkaUtils.getAkkaConfig(configuration, hostname, port);

            LOG.debug("Using akka configuration \n {}.", akkaConfig);

            actorSystem = AkkaUtils.createActorSystem(akkaConfig);
        } catch (Throwable t) {
            if (t instanceof ChannelException) {
                final Throwable cause = t.getCause();
                // instanceof evaluates to false for null, so no separate null check is required.
                if (cause instanceof java.net.BindException) {
                    // Report bind failures as IOException so that callers can tell them apart
                    // from other startup errors.
                    throw new IOException("Unable to bind AkkaRpcService actor system to address " +
                        address + " - " + cause.getMessage(), t);
                }
            }
            throw new Exception("Could not create TaskManager actor system", t);
        }

        final Time timeout = Time.milliseconds(AkkaUtils.getTimeout(configuration).toMillis());
        return new AkkaRpcService(actorSystem, timeout);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ public AkkaRpcService(final ActorSystem actorSystem, final Time timeout) {

Address actorSystemAddress = AkkaUtils.getAddress(actorSystem);



if (actorSystemAddress.host().isDefined()) {
address = actorSystemAddress.host().get();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,14 @@

package org.apache.flink.runtime.taskexecutor;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.jboss.netty.channel.ChannelException;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
Expand All @@ -39,7 +37,7 @@

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.BindException;

import java.util.UUID;

import static org.apache.flink.util.Preconditions.checkArgument;
Expand All @@ -60,7 +58,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
private final HighAvailabilityServices haServices;

/** The task manager configuration */
private final TaskExecutorConfiguration taskExecutorConfig;
private final TaskManagerConfiguration taskManagerConfiguration;

/** The I/O manager component in the task manager */
private final IOManager ioManager;
Expand All @@ -71,36 +69,46 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
/** The network component in the task manager */
private final NetworkEnvironment networkEnvironment;

/** The metric registry in the task manager */
private final MetricRegistry metricRegistry;

/** The number of slots in the task manager, should be 1 for YARN */
private final int numberOfSlots;

/** The fatal error handler to use in case of a fatal error */
private final FatalErrorHandler fatalErrorHandler;

// --------- resource manager --------

private TaskExecutorToResourceManagerConnection resourceManagerConnection;

// ------------------------------------------------------------------------

public TaskExecutor(
TaskExecutorConfiguration taskExecutorConfig,
TaskManagerConfiguration taskManagerConfiguration,
TaskManagerLocation taskManagerLocation,
RpcService rpcService,
MemoryManager memoryManager,
IOManager ioManager,
NetworkEnvironment networkEnvironment,
HighAvailabilityServices haServices) {
HighAvailabilityServices haServices,
MetricRegistry metricRegistry,
FatalErrorHandler fatalErrorHandler) {

super(rpcService);

checkArgument(taskExecutorConfig.getNumberOfSlots() > 0, "The number of slots has to be larger than 0.");
checkArgument(taskManagerConfiguration.getNumberSlots() > 0, "The number of slots has to be larger than 0.");

this.taskExecutorConfig = checkNotNull(taskExecutorConfig);
this.taskManagerConfiguration = checkNotNull(taskManagerConfiguration);
this.taskManagerLocation = checkNotNull(taskManagerLocation);
this.memoryManager = checkNotNull(memoryManager);
this.ioManager = checkNotNull(ioManager);
this.networkEnvironment = checkNotNull(networkEnvironment);
this.haServices = checkNotNull(haServices);
this.metricRegistry = checkNotNull(metricRegistry);
this.fatalErrorHandler = checkNotNull(fatalErrorHandler);

this.numberOfSlots = taskExecutorConfig.getNumberOfSlots();
this.numberOfSlots = taskManagerConfiguration.getNumberSlots();
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -158,6 +166,7 @@ public void notifyOfNewResourceManagerLeader(String newLeaderAddress, UUID newLe
}
}

/**
* Requests a slot from the TaskManager
*
* @param allocationID id for the request
Expand All @@ -169,22 +178,6 @@ public SlotRequestReply requestSlot(AllocationID allocationID, UUID resourceMana
return new SlotRequestRegistered(allocationID);
}

/**
public LeaderRetrievalService getJobMasterLeaderRetriever(JobID jobID) throws Exception {
return null;
}
@Override
return null;
}
@Override
public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception {
return null;
}
@Override
public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
// ------------------------------------------------------------------------
// Properties
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -222,7 +215,7 @@ public void run() {
void onFatalError(Throwable t) {
// to be determined, probably delegate to a fatal error handler that
// would either log (mini cluster) or kill the process (yarn, mesos, ...)
log.error("FATAL ERROR", t);
fatalErrorHandler.onFatalError(t);
}

// ------------------------------------------------------------------------
Expand Down

This file was deleted.

0 comments on commit 5513fe6

Please sign in to comment.