Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MINOR][YARN] Make memLimitExceededLogMessage more clean #23030

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -20,7 +20,6 @@ package org.apache.spark.deploy.yarn
import java.util.Collections
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicInteger
import java.util.regex.Pattern

import scala.collection.JavaConverters._
import scala.collection.mutable
Expand Down Expand Up @@ -598,13 +597,20 @@ private[yarn] class YarnAllocator(
(false, s"Container ${containerId}${onHostStr} was preempted.")
// Should probably still count memory exceeded exit codes towards task failures
case VMEM_EXCEEDED_EXIT_CODE =>
(true, memLimitExceededLogMessage(
completedContainer.getDiagnostics,
VMEM_EXCEEDED_PATTERN))
val vmemExceededPattern = raw"$MEM_REGEX of $MEM_REGEX virtual memory used".r
val diag = vmemExceededPattern.findFirstIn(completedContainer.getDiagnostics)
.map(_.concat(".")).getOrElse("")
val message = "Container killed by YARN for exceeding virtual memory limits. " +
s"$diag Consider boosting ${EXECUTOR_MEMORY_OVERHEAD.key} or disabling " +
s"${YarnConfiguration.NM_VMEM_CHECK_ENABLED} because of YARN-4714."
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now you removed the other config option I asked you to add... :-/

(true, message)
case PMEM_EXCEEDED_EXIT_CODE =>
(true, memLimitExceededLogMessage(
completedContainer.getDiagnostics,
PMEM_EXCEEDED_PATTERN))
val pmemExceededPattern = raw"$MEM_REGEX of $MEM_REGEX physical memory used".r
val diag = pmemExceededPattern.findFirstIn(completedContainer.getDiagnostics)
.map(_.concat(".")).getOrElse("")
val message = "Container killed by YARN for exceeding physical memory limits. " +
s"$diag Consider boosting ${EXECUTOR_MEMORY_OVERHEAD.key}."
(true, message)
case _ =>
// all the failures which are not covered above, like:
// disk failure, killed by app master or resource manager, ...
Expand Down Expand Up @@ -735,18 +741,6 @@ private[yarn] class YarnAllocator(

/**
 * Constants and helpers used to recognise containers that YARN killed for exceeding a
 * memory limit, and to turn their diagnostics into a readable log message.
 */
private object YarnAllocator {
  // Matches one memory quantity as printed in the diagnostics, e.g. "2.1 MB" or "4.2 GB".
  val MEM_REGEX = "[0-9.]+ [KMG]B"
  // Matches the "<used> of <limit> physical memory used" fragment of the diagnostics.
  val PMEM_EXCEEDED_PATTERN =
    Pattern.compile(s"$MEM_REGEX of $MEM_REGEX physical memory used")
  // Matches the "<used> of <limit> virtual memory used" fragment of the diagnostics.
  val VMEM_EXCEEDED_PATTERN =
    Pattern.compile(s"$MEM_REGEX of $MEM_REGEX virtual memory used")
  // Container exit status handled as "virtual memory limit exceeded".
  val VMEM_EXCEEDED_EXIT_CODE = -103
  // Container exit status handled as "physical memory limit exceeded".
  val PMEM_EXCEEDED_EXIT_CODE = -104

  /**
   * Builds the message logged when a container is killed for exceeding a memory limit.
   *
   * @param diagnostics diagnostics text reported for the completed container; a null value
   *                    is treated as empty text
   * @param pattern pattern locating the memory-usage fragment to quote in the message
   * @return the message, including the first fragment matched by `pattern` (followed by a
   *         period) when one is found, and no fragment otherwise
   */
  def memLimitExceededLogMessage(diagnostics: String, pattern: Pattern): String = {
    // Guard against a missing diagnostics string: Pattern.matcher(null) would throw.
    val matcher = pattern.matcher(Option(diagnostics).getOrElse(""))
    // The fragment carries its own trailing separator so that the message is single-spaced
    // whether or not a usage fragment was found.
    val diag = if (matcher.find()) matcher.group() + ". " else ""
    s"Container killed by YARN for exceeding memory limits. $diag" +
      "Consider boosting spark.yarn.executor.memoryOverhead or " +
      "disabling yarn.nodemanager.vmem-check-enabled because of YARN-4714."
  }
}
Expand Up @@ -29,7 +29,6 @@ import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfterEach, Matchers}

import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.deploy.yarn.YarnAllocator._
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.rpc.RpcEndpointRef
Expand Down Expand Up @@ -377,17 +376,6 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
verify(mockAmClient).updateBlacklist(Seq[String]().asJava, Seq("hostA", "hostB").asJava)
}

test("memory exceeded diagnostic regexes") {
  // Sample diagnostics text that reports both physical and virtual memory usage.
  val containerDiagnostics =
    "Container [pid=12465,containerID=container_1412887393566_0003_01_000002] is running " +
    "beyond physical memory limits. Current usage: 2.1 MB of 2 GB physical memory used; " +
    "5.8 GB of 4.2 GB virtual memory used. Killing container."

  // Pair each pattern with the usage fragment its message is expected to contain.
  val expectedFragments = Seq(
    VMEM_EXCEEDED_PATTERN -> "5.8 GB of 4.2 GB virtual memory used.",
    PMEM_EXCEEDED_PATTERN -> "2.1 MB of 2 GB physical memory used.")

  expectedFragments.foreach { case (pattern, fragment) =>
    assert(memLimitExceededLogMessage(containerDiagnostics, pattern).contains(fragment))
  }
}

test("window based failure executor counting") {
sparkConf.set("spark.yarn.executor.failuresValidityInterval", "100s")
val handler = createAllocator(4)
Expand Down