Skip to content

Commit

Permalink
[FLINK-2425] [runtime] Cleanup code for forwarding config and hostnam…
Browse files Browse the repository at this point in the history
…e into TaskManager's RuntimeEnvironment
  • Loading branch information
StephanEwen committed Aug 3, 2015
1 parent 5bf2197 commit c3ef61d
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 46 deletions.
Expand Up @@ -75,9 +75,9 @@ public class RuntimeEnvironment implements Environment {

private final AccumulatorRegistry accumulatorRegistry;

private Configuration taskManagerConfiguration;
private final Configuration taskManagerConfiguration;

private String hostname;
private final String hostname;

// ------------------------------------------------------------------------

Expand All @@ -95,13 +95,13 @@ public RuntimeEnvironment(
MemoryManager memManager,
IOManager ioManager,
BroadcastVariableManager bcVarManager,
AccumulatorRegistry accumulatorRegistry,
AccumulatorRegistry accumulatorRegistry,
InputSplitProvider splitProvider,
Map<String, Future<Path>> distCacheEntries,
ResultPartitionWriter[] writers,
InputGate[] inputGates,
ActorGateway jobManager,
RuntimeConfiguration taskManagerConfig) {
TaskManagerRuntimeInfo taskManagerInfo) {

checkArgument(parallelism > 0 && subtaskIndex >= 0 && subtaskIndex < parallelism);

Expand All @@ -124,8 +124,8 @@ public RuntimeEnvironment(
this.writers = checkNotNull(writers);
this.inputGates = checkNotNull(inputGates);
this.jobManager = checkNotNull(jobManager);
this.taskManagerConfiguration = checkNotNull(taskManagerConfig).configuration();
this.hostname = taskManagerConfig.hostname();
this.taskManagerConfiguration = checkNotNull(taskManagerInfo).getConfiguration();
this.hostname = taskManagerInfo.getHostname();
}

// ------------------------------------------------------------------------
Expand Down
Expand Up @@ -145,6 +145,9 @@ public class Task implements Runnable {
/** The name of the class that holds the invokable code */
private final String nameOfInvokableClass;

/** Access to task manager configuration and host names*/
private final TaskManagerRuntimeInfo taskManagerConfig;

/** The memory manager to be used by this task */
private final MemoryManager memoryManager;

Expand Down Expand Up @@ -214,9 +217,6 @@ public class Task implements Runnable {
* initialization, to be memory friendly */
private volatile SerializedValue<StateHandle<?>> operatorState;

/** Access to task manager configuration and host names*/
private RuntimeConfiguration taskManagerConfig;

/**
* <p><b>IMPORTANT:</b> This constructor may not start any work that would need to
* be undone in the case of a failing task deployment.</p>
Expand All @@ -231,7 +231,7 @@ public Task(TaskDeploymentDescriptor tdd,
FiniteDuration actorAskTimeout,
LibraryCacheManager libraryCache,
FileCache fileCache,
RuntimeConfiguration taskManagerConfig)
TaskManagerRuntimeInfo taskManagerConfig)
{
checkArgument(tdd.getNumberOfSubtasks() > 0);
checkArgument(tdd.getIndexInSubtaskGroup() >= 0);
Expand Down
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.taskmanager;

import org.apache.flink.configuration.Configuration;

/**
* Encapsulation of TaskManager runtime information, like hostname and configuration.
*/
public class TaskManagerRuntimeInfo implements java.io.Serializable {

private static final long serialVersionUID = 5598219619760274072L;

/** host name of the interface that the TaskManager uses to communicate */
private final String hostname;

/** configuration that the TaskManager was started with */
private final Configuration configuration;

/**
* Creates a runtime info.
* @param hostname The host name of the interface that the TaskManager uses to communicate.
* @param configuration The configuration that the TaskManager was started with.
*/
public TaskManagerRuntimeInfo(String hostname, Configuration configuration) {
this.hostname = hostname;
this.configuration = configuration;
}

/**
* Gets host name of the interface that the TaskManager uses to communicate.
* @return The host name of the interface that the TaskManager uses to communicate.
*/
public String getHostname() {
return hostname;
}

/**
* Gets the configuration that the TaskManager was started with.
* @return The configuration that the TaskManager was started with.
*/
public Configuration getConfiguration() {
return configuration;
}
}

This file was deleted.

Expand Up @@ -173,6 +173,10 @@ class TaskManager(

private val currentRegistrationSessionID: UUID = UUID.randomUUID()

private val runtimeInfo = new TaskManagerRuntimeInfo(
connectionInfo.getHostname(),
new UnmodifiableConfiguration(config.configuration))

// --------------------------------------------------------------------------
// Actor messages and life cycle
// --------------------------------------------------------------------------
Expand Down Expand Up @@ -893,9 +897,7 @@ class TaskManager(
config.timeout,
libCache,
fileCache,
new RuntimeConfiguration(
self.path.toSerializationFormat,
new UnmodifiableConfiguration(config.configuration)))
runtimeInfo)

log.info(s"Received task ${task.getTaskNameWithSubtasks}")

Expand Down
Expand Up @@ -20,7 +20,6 @@

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
Expand Down Expand Up @@ -129,7 +128,6 @@ public void testMixedAsyncCallsInOrder() {
}

private static Task createTask() {

LibraryCacheManager libCache = mock(LibraryCacheManager.class);
when(libCache.getClassLoader(any(JobID.class))).thenReturn(ClassLoader.getSystemClassLoader());

Expand Down Expand Up @@ -161,9 +159,7 @@ private static Task createTask() {
new FiniteDuration(60, TimeUnit.SECONDS),
libCache,
mock(FileCache.class),
new RuntimeConfiguration(
taskManagerGateway.path(),
new UnmodifiableConfiguration(new Configuration())));
new TaskManagerRuntimeInfo("localhost", new Configuration()));
}

public static class CheckpointsInOrderInvokable extends AbstractInvokable
Expand Down
Expand Up @@ -19,9 +19,8 @@
package org.apache.flink.runtime.taskmanager;

import com.google.common.collect.Maps;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
Expand Down Expand Up @@ -49,6 +48,7 @@
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import scala.concurrent.duration.FiniteDuration;

import java.lang.reflect.Field;
Expand Down Expand Up @@ -727,9 +727,7 @@ private Task createTask(Class<? extends AbstractInvokable> invokable,
new FiniteDuration(60, TimeUnit.SECONDS),
libCache,
mock(FileCache.class),
new RuntimeConfiguration(
taskManagerGateway.path(),
new UnmodifiableConfiguration(new Configuration())));
new TaskManagerRuntimeInfo("localhost", new Configuration()));
}

private TaskDeploymentDescriptor createTaskDeploymentDescriptor(Class<? extends AbstractInvokable> invokable) {
Expand Down

0 comments on commit c3ef61d

Please sign in to comment.