From a489762d46110d393adb30388f4665f2f7b3a566 Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Tue, 24 Jun 2014 16:06:35 +0200 Subject: [PATCH 1/3] Changed default number of task slots per machine to one. Add logging for task slots. --- .../eu/stratosphere/nephele/taskmanager/TaskManager.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java index 789955a803b32..d866c6475c530 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java @@ -46,7 +46,6 @@ import eu.stratosphere.runtime.io.network.LocalConnectionManager; import eu.stratosphere.runtime.io.network.NetworkConnectionManager; import eu.stratosphere.runtime.io.network.netty.NettyConnectionManager; -import eu.stratosphere.nephele.instance.Hardware; import eu.stratosphere.nephele.taskmanager.transferenvelope.RegisterTaskManagerResult; import eu.stratosphere.nephele.types.IntegerRecord; @@ -357,10 +356,13 @@ public TaskManager(ExecutionMode executionMode) throws Exception { HardwareDescription resources = HardwareDescriptionFactory.extractFromSystem(); int slots = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, -1); - if (slots == -1) { - slots = Hardware.getNumberCPUCores(); + if (slots == -1) { + slots = 1; + LOG.info("Number of task slots not configured. Creating one task slot."); } else if (slots <= 0) { throw new Exception("Illegal value for the number of task slots: " + slots); + } else { + LOG.info("Creating " + slots + " task slot(s)."); } this.numberOfSlots = slots; From d85ccd3023d1e590d81f365fd0118b121943acc5 Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Tue, 24 Jun 2014 17:52:26 +0200 Subject: [PATCH 2/3] LocalExecutor automatically picks up the maximum DOP for plans as its number of slots. --- .../eu/stratosphere/client/LocalExecutor.java | 11 ++++- .../java/eu/stratosphere/api/common/Plan.java | 49 +++++++++---------- .../scheduler/DefaultScheduler.java | 2 +- 3 files changed, 33 insertions(+), 29 deletions(-) diff --git a/stratosphere-clients/src/main/java/eu/stratosphere/client/LocalExecutor.java b/stratosphere-clients/src/main/java/eu/stratosphere/client/LocalExecutor.java index b017220244afc..d73f89343d93b 100644 --- a/stratosphere-clients/src/main/java/eu/stratosphere/client/LocalExecutor.java +++ b/stratosphere-clients/src/main/java/eu/stratosphere/client/LocalExecutor.java @@ -42,7 +42,7 @@ public class LocalExecutor extends PlanExecutor { private static boolean DEFAULT_OVERWRITE = false; - private static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = 1; + private static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = -1; private final Object lock = new Object(); // we lock to ensure singleton execution @@ -214,6 +214,15 @@ public JobExecutionResult executePlan(Plan plan) throws Exception { if (this.nephele == null) { // we start a session just for us now shutDownAtEnd = true; + + // configure the number of local slots equal to the parallelism of the local plan + if (this.taskManagerNumSlots == DEFAULT_TASK_MANAGER_NUM_SLOTS) { + int maxParallelism = plan.getMaximumParallelism(); + if (maxParallelism > 0) { + this.taskManagerNumSlots = maxParallelism; + } + } + start(); } else { // we use the existing session diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/Plan.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/Plan.java index b6f5385ef0282..88241079429a6 100644 --- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/Plan.java +++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/Plan.java @@ -62,11 +62,6 @@ public class Plan implements Visitable> { * The default parallelism to use for nodes that have no explicitly specified parallelism. */ protected int defaultParallelism = DEFAULT_PARALELLISM; - - /** - * The maximal number of machines to use in the job. - */ - protected int maxNumberMachines; /** * Hash map for files in the distributed cache: registered name to cache entry. @@ -234,28 +229,6 @@ public void setJobName(String jobName) { checkNotNull(jobName, "The job name must not be null."); this.jobName = jobName; } - - /** - * Gets the maximum number of machines to be used for this job. - * - * @return The maximum number of machines to be used for this job. - */ - public int getMaxNumberMachines() { - return this.maxNumberMachines; - } - - /** - * Sets the maximum number of machines to be used for this job. - * - * @param maxNumberMachines The the maximum number to set. - */ - public void setMaxNumberMachines(int maxNumberMachines) { - if (maxNumberMachines == 0 || maxNumberMachines < -1) { - throw new IllegalArgumentException("The maximum number of machines must be positive, or -1 if no limit is imposed."); - } - - this.maxNumberMachines = maxNumberMachines; - } /** * Gets the default degree of parallelism for this job. That degree is always used when an operator @@ -338,4 +311,26 @@ public void registerCachedFile(String name, DistributedCacheEntry entry) throws public Set> getCachedFiles() { return this.cacheFile.entrySet(); } + + public int getMaximumParallelism() { + MaxDopVisitor visitor = new MaxDopVisitor(); + accept(visitor); + return Math.max(visitor.maxDop, this.defaultParallelism); + } + + // -------------------------------------------------------------------------------------------- + + private static final class MaxDopVisitor implements Visitor> { + + private int maxDop = -1; + + @Override + public boolean preVisit(Operator visitable) { + this.maxDop = Math.max(this.maxDop, visitable.getDegreeOfParallelism()); + return true; + } + + @Override + public void postVisit(Operator visitable) {} + } } diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/DefaultScheduler.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/DefaultScheduler.java index 745b19941989a..48c9143076008 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/DefaultScheduler.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/DefaultScheduler.java @@ -269,7 +269,7 @@ protected void requestInstances(final ExecutionStage executionStage) throws Inst final int requiredSlots = executionStage.getRequiredSlots(); - LOG.info("Requesting " + requiredSlots + " for job " + executionGraph.getJobID()); + LOG.info("Requesting " + requiredSlots + " slots for job " + executionGraph.getJobID()); this.instanceManager.requestInstance(executionGraph.getJobID(), executionGraph.getJobConfiguration(), requiredSlots); From 130e6861cec91400b1cce00bfa65ff973cf99b90 Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Tue, 24 Jun 2014 18:32:25 +0200 Subject: [PATCH 3/3] [FLINK-973] [FLINK-969] Unify, clean up, and extend all environment logging at JobManager/TaskManager startup --- .../nephele/jobmanager/JobManager.java | 19 +-- .../nephele/jobmanager/JobManagerUtils.java | 48 +----- .../jobmanager/web/JobmanagerInfoServlet.java | 6 +- .../nephele/taskmanager/TaskManager.java | 25 +-- .../runtime/util/EnvironmentInformation.java | 160 ++++++++++++++++++ 5 files changed, 173 insertions(+), 85 deletions(-) create mode 100644 stratosphere-runtime/src/main/java/eu/stratosphere/runtime/util/EnvironmentInformation.java diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java index 640140751d062..877288c367881 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java @@ -79,7 +79,6 @@ import eu.stratosphere.nephele.jobgraph.AbstractJobVertex; import eu.stratosphere.nephele.jobgraph.JobGraph; import eu.stratosphere.nephele.jobgraph.JobID; -import eu.stratosphere.nephele.jobmanager.JobManagerUtils.RevisionInformation; import eu.stratosphere.nephele.jobmanager.accumulators.AccumulatorManager; import eu.stratosphere.nephele.jobmanager.archive.ArchiveListener; import eu.stratosphere.nephele.jobmanager.archive.MemoryArchivist; @@ -103,6 +102,7 @@ import eu.stratosphere.nephele.taskmanager.TaskSubmissionResult; import eu.stratosphere.runtime.io.network.ConnectionInfoLookupResponse; import eu.stratosphere.runtime.io.network.RemoteReceiver; +import eu.stratosphere.runtime.util.EnvironmentInformation; import eu.stratosphere.nephele.taskmanager.ExecutorThreadFactory; import eu.stratosphere.nephele.taskmanager.transferenvelope.RegisterTaskManagerResult; import eu.stratosphere.nephele.topology.NetworkTopology; @@ -287,17 +287,6 @@ public void shutdown() { this.isShutDown = true; LOG.debug("Shutdown of job manager completed"); } - - /** - * Log Stratosphere version information. - */ - private static void logVersionInformation() { - RevisionInformation rev = JobManagerUtils.getRevisionInformation(); - LOG.info("Starting Stratosphere JobManager " - + "(Version: " + JobManagerUtils.getVersion() + ", " - + "Rev:" + rev.commitId + ", " - + "Date:" + rev.commitDate + ")"); - } /** * Entry point for the program @@ -340,9 +329,6 @@ public static void main(String[] args) { @SuppressWarnings("static-access") public static JobManager initialize(String[] args) throws Exception { - // output the version and revision information to the log - logVersionInformation(); - final Option configDirOpt = OptionBuilder.withArgName("config directory").hasArg() .withDescription("Specify configuration directory.").create("configDir"); @@ -375,6 +361,9 @@ public static JobManager initialize(String[] args) throws Exception { System.exit(FAILURE_RETURN_CODE); } + // print some startup environment info, like user, code revision, etc + EnvironmentInformation.logEnvironmentInfo(LOG, "JobManager"); + // First, try to load global configuration GlobalConfiguration.loadConfiguration(configDir); diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManagerUtils.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManagerUtils.java index f2e1d334377c2..31879b81913e7 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManagerUtils.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManagerUtils.java @@ -13,18 +13,15 @@ package eu.stratosphere.nephele.jobmanager; -import java.io.IOException; -import java.io.InputStream; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; -import java.util.Properties; -import eu.stratosphere.nephele.ExecutionMode; -import eu.stratosphere.nephele.instance.InstanceManager; -import eu.stratosphere.nephele.jobmanager.scheduler.DefaultScheduler; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import eu.stratosphere.nephele.ExecutionMode; +import eu.stratosphere.nephele.instance.InstanceManager; +import eu.stratosphere.nephele.jobmanager.scheduler.DefaultScheduler; import eu.stratosphere.util.StringUtils; /** @@ -167,43 +164,4 @@ static String getInstanceManagerClassName(ExecutionMode executionMode) { throw new RuntimeException("Unrecognized Execution Mode."); } } - - /** - * Returns the version of Stratosphere as String. - * If version == null, then the JobManager runs from inside the IDE (or somehow not from the maven build jar) - * @return String - */ - public static String getVersion() { - String version = JobManagerUtils.class.getPackage().getImplementationVersion(); - return version; - } - - /** - * Returns the revision of Stratosphere as String. - * @return String - */ - public static RevisionInformation getRevisionInformation() { - RevisionInformation info = new RevisionInformation(); - String revision = ""; - String commitDate = ""; - try { - Properties properties = new Properties(); - InputStream propFile = JobManagerUtils.class.getClassLoader().getResourceAsStream(".version.properties"); - if (propFile != null) { - properties.load(propFile); - revision = properties.getProperty("git.commit.id.abbrev"); - commitDate = properties.getProperty("git.commit.time"); - } - } catch (IOException e) { - LOG.info("Cannot determine code revision. Unable ro read version property file."); - } - info.commitId = revision; - info.commitDate = commitDate; - return info; - } - - public static class RevisionInformation { - public String commitId; - public String commitDate; - } } diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/web/JobmanagerInfoServlet.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/web/JobmanagerInfoServlet.java index 8ffbd57fd15e8..4b23399ce5ed3 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/web/JobmanagerInfoServlet.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/web/JobmanagerInfoServlet.java @@ -40,13 +40,13 @@ import eu.stratosphere.nephele.jobgraph.JobID; import eu.stratosphere.nephele.jobgraph.JobStatus; import eu.stratosphere.nephele.jobmanager.JobManager; -import eu.stratosphere.nephele.jobmanager.JobManagerUtils; import eu.stratosphere.nephele.managementgraph.ManagementGraph; import eu.stratosphere.nephele.managementgraph.ManagementGraphIterator; import eu.stratosphere.nephele.managementgraph.ManagementGroupVertex; import eu.stratosphere.nephele.managementgraph.ManagementGroupVertexID; import eu.stratosphere.nephele.managementgraph.ManagementVertex; import eu.stratosphere.nephele.services.accumulators.AccumulatorEvent; +import eu.stratosphere.runtime.util.EnvironmentInformation; import eu.stratosphere.util.StringUtils; @@ -516,8 +516,8 @@ private void writeJsonForArchivedJobGroupvertex(PrintWriter wrt, RecentJobEvent */ private void writeJsonForVersion(PrintWriter wrt) { wrt.write("{"); - wrt.write("\"version\": \"" + JobManagerUtils.getVersion() + "\","); - wrt.write("\"revision\": \"" + JobManagerUtils.getRevisionInformation().commitId + "\""); + wrt.write("\"version\": \"" + EnvironmentInformation.getVersion() + "\","); + wrt.write("\"revision\": \"" + EnvironmentInformation.getRevisionInformation().commitId + "\""); wrt.write("}"); } } diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java index d866c6475c530..bedafaf4e6b83 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java @@ -58,7 +58,6 @@ import org.apache.commons.cli.ParseException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.security.UserGroupInformation; import eu.stratosphere.api.common.cache.DistributedCache; import eu.stratosphere.api.common.cache.DistributedCache.DistributedCacheEntry; @@ -80,8 +79,6 @@ import eu.stratosphere.nephele.ipc.RPC; import eu.stratosphere.nephele.ipc.Server; import eu.stratosphere.nephele.jobgraph.JobID; -import eu.stratosphere.nephele.jobmanager.JobManagerUtils; -import eu.stratosphere.nephele.jobmanager.JobManagerUtils.RevisionInformation; import eu.stratosphere.nephele.net.NetUtils; import eu.stratosphere.nephele.profiling.ProfilingUtils; import eu.stratosphere.nephele.profiling.TaskManagerProfiler; @@ -98,6 +95,7 @@ import eu.stratosphere.runtime.io.channels.ChannelID; import eu.stratosphere.runtime.io.network.ChannelManager; import eu.stratosphere.runtime.io.network.InsufficientResourcesException; +import eu.stratosphere.runtime.util.EnvironmentInformation; import eu.stratosphere.util.StringUtils; /** @@ -173,18 +171,6 @@ public TaskManager(ExecutionMode executionMode) throws Exception { if (executionMode == null) { throw new NullPointerException("Execution mode must not be null."); } - - RevisionInformation rev = JobManagerUtils.getRevisionInformation(); - LOG.info("Starting Stratosphere TaskManager " - + "(Version: " + JobManagerUtils.getVersion() + ", " - + "Rev:" + rev.commitId + ", " - + "Date:" + rev.commitDate + ")"); - - try { - LOG.info("TaskManager started as user " + UserGroupInformation.getCurrentUser().getShortUserName()); - } catch (Throwable t) { - LOG.error("Cannot determine user group information.", t); - } LOG.info("Execution mode: " + executionMode); @@ -513,14 +499,9 @@ public static void main(String[] args) throws IOException { LOG.info("Setting temporary directory to "+tempDirVal); GlobalConfiguration.includeConfiguration(c); } - System.err.println("Configuration "+GlobalConfiguration.getConfiguration()); - LOG.info("Current user "+UserGroupInformation.getCurrentUser().getShortUserName()); - { - // log the available JVM memory - long maxMemoryMiBytes = Runtime.getRuntime().maxMemory() >>> 20; - LOG.info("Starting TaskManager in a JVM with " + maxMemoryMiBytes + " MiBytes maximum heap size."); - } + // print some startup environment info, like user, code revision, etc + EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager"); // Create a new task manager object try { diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/util/EnvironmentInformation.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/util/EnvironmentInformation.java new file mode 100644 index 0000000000000..314deb8d0e886 --- /dev/null +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/util/EnvironmentInformation.java @@ -0,0 +1,160 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.runtime.util; + +import java.io.InputStream; +import java.lang.management.ManagementFactory; +import java.lang.management.RuntimeMXBean; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.security.UserGroupInformation; + +public class EnvironmentInformation { + + private static final Log LOG = LogFactory.getLog(EnvironmentInformation.class); + + private static final String UNKNOWN = ""; + + private static final String LOG_FILE_OPTION = "-Dlog.file"; + + private static final String LOG_CONFIGURAION_OPTION = "-Dlog4j.configuration"; + + + /** + * Returns the version of the code as String. If version == null, then the JobManager does not run from a + * maven build. An example is a source code checkout, compile, and run from inside an IDE. + * + * @return The version string. + */ + public static String getVersion() { + return EnvironmentInformation.class.getPackage().getImplementationVersion(); + } + + /** + * Returns the code revision (commit and commit date) of Stratosphere. + * + * @return The code revision. + */ + public static RevisionInformation getRevisionInformation() { + RevisionInformation info = new RevisionInformation(); + String revision = UNKNOWN; + String commitDate = UNKNOWN; + try { + Properties properties = new Properties(); + InputStream propFile = EnvironmentInformation.class.getClassLoader().getResourceAsStream(".version.properties"); + if (propFile != null) { + properties.load(propFile); + revision = properties.getProperty("git.commit.id.abbrev"); + commitDate = properties.getProperty("git.commit.time"); + } + } catch (Throwable t) { + if (LOG.isDebugEnabled()) { + LOG.debug("Cannot determine code revision: Unable ro read version property file.", t); + } else { + LOG.info("Cannot determine code revision: Unable ro read version property file."); + } + } + info.commitId = revision; + info.commitDate = commitDate; + return info; + } + + public static class RevisionInformation { + public String commitId; + public String commitDate; + } + + public static String getUserRunning() { + try { + return UserGroupInformation.getCurrentUser().getShortUserName(); + } catch (Throwable t) { + if (LOG.isDebugEnabled()) { + LOG.debug("Cannot determine user/group information for the current user.", t); + } else { + LOG.info("Cannot determine user/group information for the current user."); + } + return UNKNOWN; + } + } + + public static long getMaxJvmMemory() { + return Runtime.getRuntime().maxMemory() >>> 20; + } + + + public static String getJvmVersion() { + try { + final RuntimeMXBean bean = ManagementFactory.getRuntimeMXBean(); + return bean.getVmName() + " - " + bean.getVmVendor() + " - " + bean.getSpecVersion() + '/' + bean.getVmVersion(); + } + catch (Throwable t) { + return UNKNOWN; + } + } + + public static String getJvmStartupOptions() { + try { + final RuntimeMXBean bean = ManagementFactory.getRuntimeMXBean(); + final StringBuilder bld = new StringBuilder(); + for (String s : bean.getInputArguments()) { + if (!s.startsWith(LOG_FILE_OPTION) && !s.startsWith(LOG_CONFIGURAION_OPTION)) { + bld.append(s).append(' '); + } + } + return bld.toString(); + } + catch (Throwable t) { + return UNKNOWN; + } + } + + + public static void logEnvironmentInfo(Log log, String componentName) { + if (log.isInfoEnabled()) { + RevisionInformation rev = getRevisionInformation(); + String version = getVersion(); + + String user = getUserRunning(); + + String jvmVersion = getJvmVersion(); + String options = getJvmStartupOptions(); + + String javaHome = System.getenv("JAVA_HOME"); + + long memory = getMaxJvmMemory(); + + log.info("-------------------------------------------------------"); + log.info(" Starting " + componentName + " (Version: " + version + ", " + + "Rev:" + rev.commitId + ", " + "Date:" + rev.commitDate + ")"); + log.info(" Current user: " + user); + log.info(" JVM: " + jvmVersion); + log.info(" Startup Options: " + options); + log.info(" Maximum heap size: " + memory + " MiBytes"); + log.info(" JAVA_HOME: " + (javaHome == null ? "not set" : javaHome)); + log.info("-------------------------------------------------------"); + } + } + + // -------------------------------------------------------------------------------------------- + + private EnvironmentInformation() {} + + public static void main(String[] args) { + logEnvironmentInfo(LOG, "Test"); + } +}