Skip to content

Commit

Permalink
Merge pull request #17490 from akka/wip-17010-logger-mailbox-patriknw
Browse files Browse the repository at this point in the history
=act #17010 Drain log messages on system shutdown
  • Loading branch information
patriknw committed Jun 17, 2015
2 parents 1eaebce + 156e2a0 commit d19f694
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 4 deletions.
40 changes: 40 additions & 0 deletions akka-actor-tests/src/test/scala/akka/event/LoggerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ object LoggerSpec {
}
""").withFallback(AkkaSpec.testConf)

val slowConfig = ConfigFactory.parseString("""
akka {
stdout-loglevel = "ERROR"
loglevel = "ERROR"
loggers = ["akka.event.LoggerSpec$SlowLogger"]
}
""").withFallback(AkkaSpec.testConf)

val noLoggingConfig = ConfigFactory.parseString("""
akka {
stdout-loglevel = "OFF"
Expand Down Expand Up @@ -91,6 +99,19 @@ object LoggerSpec {
}
}

class SlowLogger extends Logging.DefaultLogger {
override def aroundReceive(r: Receive, msg: Any): Unit = {
msg match {
case event: LogEvent
if (event.message.toString.startsWith("msg1"))
Thread.sleep(500) // slow
super.aroundReceive(r, msg)
case _ super.aroundReceive(r, msg)
}

}
}

class ActorWithMDC extends Actor with DiagnosticActorLogging {
var reqId = 0

Expand Down Expand Up @@ -148,6 +169,25 @@ class LoggerSpec extends WordSpec with Matchers {
val out = createSystemAndLogToBuffer("defaultLogger", defaultConfig, true)
out.size should be > (0)
}

"drain logger queue on system shutdown" in {
val out = new java.io.ByteArrayOutputStream()
Console.withOut(out) {
val sys = ActorSystem("defaultLogger", slowConfig)
sys.log.error("msg1")
sys.log.error("msg2")
sys.log.error("msg3")
TestKit.shutdownActorSystem(sys, verifySystemShutdown = true)
out.flush()
out.close()
}

val logMessages = new String(out.toByteArray).split("\n")
logMessages.head should include("msg1")
logMessages.last should include("msg3")
logMessages.size should ===(3)
}

}

"An actor system configured with the logging turned off" must {
Expand Down
9 changes: 9 additions & 0 deletions akka-actor/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,8 @@ akka {
akka.actor.mailbox.unbounded-control-aware-queue-based
"akka.dispatch.BoundedControlAwareMessageQueueSemantics" =
akka.actor.mailbox.bounded-control-aware-queue-based
"akka.event.LoggerMessageQueueSemantics" =
akka.actor.mailbox.logger-queue
}

unbounded-queue-based {
Expand Down Expand Up @@ -452,6 +454,13 @@ akka {
# com.typesafe.config.Config) parameters.
mailbox-type = "akka.dispatch.BoundedControlAwareMailbox"
}

# The LoggerMailbox will drain all messages in the mailbox
# when the system is shutdown and deliver them to the StandardOutLogger.
# Do not change this unless you know what you are doing.
logger-queue {
mailbox-type = "akka.event.LoggerMailboxType"
}
}

debug {
Expand Down
48 changes: 48 additions & 0 deletions akka-actor/src/main/scala/akka/event/LoggerMailbox.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.event

import akka.dispatch.MessageQueue
import akka.dispatch.MailboxType
import akka.dispatch.UnboundedMailbox
import com.typesafe.config.Config
import akka.actor.ActorSystem
import akka.actor.ActorRef
import akka.dispatch.ProducesMessageQueue

trait LoggerMessageQueueSemantics

/**
* INTERNAL API
*/
private[akka] class LoggerMailboxType(settings: ActorSystem.Settings, config: Config) extends MailboxType
with ProducesMessageQueue[LoggerMailbox] {

override def create(owner: Option[ActorRef], system: Option[ActorSystem]) = (owner, system) match {
case (Some(o), Some(s)) new LoggerMailbox(o, s)
case _ throw new IllegalArgumentException("no mailbox owner or system given")
}
}

/**
* INTERNAL API
*/
private[akka] class LoggerMailbox(owner: ActorRef, system: ActorSystem)
extends UnboundedMailbox.MessageQueue with LoggerMessageQueueSemantics {

override def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = {
if (hasMessages) {
var envelope = dequeue
// Drain all remaining messages to the StandardOutLogger.
// cleanUp is called after switching out the mailbox, which is why
// this kind of look works without a limit.
while (envelope ne null) {
// Logging.StandardOutLogger is a MinimalActorRef, i.e. not a "real" actor
Logging.StandardOutLogger.tell(envelope.message, envelope.sender)
envelope = dequeue
}
}
super.cleanUp(owner, deadLetters)
}
}
3 changes: 2 additions & 1 deletion akka-actor/src/main/scala/akka/event/Logging.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import scala.concurrent.Await
import scala.util.control.NoStackTrace
import scala.util.control.NonFatal
import java.util.Locale
import akka.dispatch.RequiresMessageQueue

/**
* This trait brings log level handling to the EventStream: it reads the log
Expand Down Expand Up @@ -824,7 +825,7 @@ object Logging {
* <code>akka.loggers</code> is not set, it defaults to just this
* logger.
*/
class DefaultLogger extends Actor with StdOutLogger {
class DefaultLogger extends Actor with StdOutLogger with RequiresMessageQueue[LoggerMessageQueueSemantics] {
override def receive: Receive = {
case InitializeLogger(_) sender() ! LoggerInitialized
case event: LogEvent print(event)
Expand Down
6 changes: 4 additions & 2 deletions akka-contrib/src/main/scala/akka/contrib/jul/JavaLogger.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import akka.actor._
import akka.event.LoggingAdapter
import java.util.logging
import scala.concurrent.{ ExecutionContext, Future }
import akka.dispatch.RequiresMessageQueue
import akka.event.LoggerMessageQueueSemantics

/**
* Makes the Akka `Logging` API available as the `log`
Expand All @@ -30,7 +32,7 @@ trait JavaLogging {
/**
* `java.util.logging` logger.
*/
class JavaLogger extends Actor {
class JavaLogger extends Actor with RequiresMessageQueue[LoggerMessageQueueSemantics] {

def receive = {
case event @ Error(cause, _, _, _) log(logging.Level.SEVERE, cause, event)
Expand Down Expand Up @@ -124,4 +126,4 @@ trait JavaLoggingAdapter extends LoggingAdapter {
}
}

}
}
4 changes: 3 additions & 1 deletion akka-slf4j/src/main/scala/akka/event/slf4j/Slf4jLogger.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import akka.event.DummyClassForStringSources
import akka.util.Helpers
import akka.event.LoggingFilter
import akka.event.EventStream
import akka.dispatch.RequiresMessageQueue
import akka.event.LoggerMessageQueueSemantics

/**
* Base trait for all classes that wants to be able use the SLF4J logging infrastructure.
Expand Down Expand Up @@ -53,7 +55,7 @@ object Logger {
* The thread in which the logging was performed is captured in
* Mapped Diagnostic Context (MDC) with attribute name "sourceThread".
*/
class Slf4jLogger extends Actor with SLF4JLogging {
class Slf4jLogger extends Actor with SLF4JLogging with RequiresMessageQueue[LoggerMessageQueueSemantics] {

val mdcThreadAttributeName = "sourceThread"
val mdcActorSystemAttributeName = "sourceActorSystem"
Expand Down

0 comments on commit d19f694

Please sign in to comment.