Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Behaviors.logMessages implementation #26238

Merged
merged 1 commit into from Jan 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -4,6 +4,7 @@

package akka.actor.typed.javadsl;

import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.event.Logging;
import akka.japi.pf.PFBuilder;
Expand Down Expand Up @@ -72,4 +73,22 @@ public void loggingProvidesMDC() {
testKit.spawn(behavior);
eventFilter.awaitDone(FiniteDuration.create(3, TimeUnit.SECONDS));
}

@Test
johanandren marked this conversation as resolved.
Show resolved Hide resolved
public void logMessagesBehavior() {
Behavior<String> behavior = Behaviors.logMessages(Behaviors.empty());

CustomEventFilter eventFilter =
new CustomEventFilter(
new PFBuilder<Logging.LogEvent, Object>()
.match(
Logging.LogEvent.class,
(event) -> event.message().equals("received message Hello"))
.build(),
1);

ActorRef<String> ref = testKit.spawn(behavior);
ref.tell("Hello");
eventFilter.awaitDone(FiniteDuration.create(3, TimeUnit.SECONDS));
}
}
@@ -0,0 +1,92 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.actor.typed

import akka.actor
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.event.Logging
import akka.testkit.EventFilter
import org.scalatest.WordSpecLike

class LogMessagesSpec extends ScalaTestWithActorTestKit("""
akka.loglevel = DEBUG # test verifies debug
akka.loggers = ["akka.testkit.TestEventListener"]
""") with WordSpecLike {

implicit val untyped: actor.ActorSystem = system.toUntyped

"The log messages behavior" should {
"log signals" in {
val behavior: Behavior[Signal] = Behaviors.logMessages(Behaviors.empty)

val ref: ActorRef[Signal] = spawn(behavior)

EventFilter.debug("received signal PostStop", source = ref.path.toString, occurrences = 1).intercept {
testKit.stop(ref)
}
}

"log messages" in {
val behavior: Behavior[String] = Behaviors.logMessages(Behaviors.empty)

val ref: ActorRef[String] = spawn(behavior)

EventFilter.debug("received message Hello", source = ref.path.toString, occurrences = 1).intercept {
ref ! "Hello"
}
}

"log messages with provided log level" in {
val opts = LogOptions().withLevel(Logging.InfoLevel)
val behavior: Behavior[String] = Behaviors.logMessages(opts, Behaviors.empty)

val ref: ActorRef[String] = spawn(behavior)

EventFilter.info("received message Hello", source = ref.path.toString, occurrences = 1).intercept {
ref ! "Hello"
}
}

"log messages with provided logger" in {
val logger = system.log
seglo marked this conversation as resolved.
Show resolved Hide resolved
val opts = LogOptions().withLogger(logger)
val behavior: Behavior[String] = Behaviors.logMessages(opts, Behaviors.empty)

val ref: ActorRef[String] = spawn(behavior)

EventFilter.debug("received message Hello", source = "LogMessagesSpec", occurrences = 1).intercept {
ref ! "Hello"
}
}

"not log messages when not enabled" in {
val opts = LogOptions().withEnabled(false)
val behavior: Behavior[String] = Behaviors.logMessages(opts, Behaviors.empty)

val ref: ActorRef[String] = spawn(behavior)

EventFilter.debug("received message Hello", source = ref.path.toString, occurrences = 0).intercept {
ref ! "Hello"
}
}
seglo marked this conversation as resolved.
Show resolved Hide resolved

"log messages with decorated MDC values" in {
val behavior = Behaviors.withMdc[String](Map("mdc" -> true))(Behaviors.logMessages(Behaviors.empty))

val ref = spawn(behavior)
EventFilter.custom({
case logEvent if logEvent.level == Logging.DebugLevel ⇒
logEvent.message should ===("received message Hello")
logEvent.mdc should ===(Map("mdc" -> true))
true
case other ⇒ system.log.error(s"Unexpected log event: {}", other); false
}, occurrences = 1).intercept {
ref ! "Hello"
}
}
}
}
72 changes: 72 additions & 0 deletions akka-actor-typed/src/main/scala/akka/actor/typed/Logger.scala
Expand Up @@ -4,7 +4,10 @@

package akka.actor.typed

import java.util.Optional

import akka.annotation.{ DoNotInherit, InternalApi }
import akka.event.Logging
import akka.event.Logging._

/**
Expand Down Expand Up @@ -41,6 +44,75 @@ object LogMarker {

}

/**
* Logging options when using `Behaviors.logMessages`.
*/
@DoNotInherit
abstract sealed class LogOptions {
/**
* User control whether messages are logged or not. This is useful when you want to have an application configuration
* to control when to log messages.
*/
def withEnabled(enabled: Boolean): LogOptions

/**
* The [[akka.event.Logging.LogLevel]] to use when logging messages.
*/
def withLevel(level: LogLevel): LogOptions

/**
* A [[akka.actor.typed.Logger]] to use when logging messages.
*/
def withLogger(logger: Logger): LogOptions

def enabled: Boolean
def level: LogLevel
def logger: Option[Logger]
/** Java API */
def getLogger: Optional[Logger]
}

/**
* Factories for log options
*/
object LogOptions {
/**
* INTERNAL API
*/
@InternalApi
private[akka] final case class LogOptionsImpl(enabled: Boolean, level: LogLevel, logger: Option[Logger])
extends LogOptions {
/**
* User control whether messages are logged or not. This is useful when you want to have an application configuration
* to control when to log messages.
*/
override def withEnabled(enabled: Boolean): LogOptions = this.copy(enabled = enabled)

/**
* The [[akka.event.Logging.LogLevel]] to use when logging messages.
*/
override def withLevel(level: LogLevel): LogOptions = this.copy(level = level)

/**
* A [[akka.actor.typed.Logger]] to use when logging messages.
*/
override def withLogger(logger: Logger): LogOptions = this.copy(logger = Option(logger))

/** Java API */
override def getLogger: Optional[Logger] = Optional.ofNullable(logger.orNull)
}

/**
* Scala API: Create a new log options with defaults.
*/
def apply(): LogOptions = LogOptionsImpl(enabled = true, Logging.DebugLevel, None)

/**
* Java API: Create a new log options.
*/
def create(): LogOptions = apply()
}

/**
* Logging API provided inside of actors through the actor context.
*
Expand Down
Expand Up @@ -6,8 +6,9 @@ package akka.actor.typed.internal

import akka.actor.typed
import akka.actor.typed.Behavior.{ SameBehavior, UnhandledBehavior }
import akka.actor.typed.LogOptions.LogOptionsImpl
import akka.actor.typed.internal.TimerSchedulerImpl.TimerMsg
import akka.actor.typed.{ ActorRef, Behavior, BehaviorInterceptor, ExtensibleBehavior, PreRestart, Signal, TypedActorContext }
import akka.actor.typed.{ LogOptions, _ }
import akka.annotation.InternalApi
import akka.util.LineNumbers

Expand Down Expand Up @@ -126,6 +127,35 @@ private[akka] final case class MonitorInterceptor[T](actorRef: ActorRef[T]) exte

}

/**
* Log all messages for this decorated ReceiveTarget[T] to logger before receiving it ourselves.
*
* INTERNAL API
*/
@InternalApi
private[akka] final case class LogMessagesInterceptor[T](opts: LogOptions) extends BehaviorInterceptor[T, T] {

import BehaviorInterceptor._

override def aroundReceive(ctx: TypedActorContext[T], msg: T, target: ReceiveTarget[T]): Behavior[T] = {
if (opts.enabled)
opts.logger.getOrElse(ctx.asScala.log).log(opts.level, "received message {}", msg)
target(ctx, msg)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The untyped one actually invokes receive and looks at the result to be able to say if it was handled/unhandled in the log message, should we perhaps do that here as well? We could actually even log what the new behavior/state is. WDYT?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed that LoggingReceive uses isDefinedAt to see if a message is handled an encodes that into the message. I wasn't sure how to do that with the Behavior because I don't know how to access the underlying PartialFunction.

We could actually even log what the new behavior/state is.

I'm not sure what you mean by this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More of an idea than entirely sure we should do this but,
instead of logging before (or in addition to logging before perhaps) we could do something like this:

val newBehavior = target(ctx, msg)
val handled = if (newBehavior == Behavior.UnhandledBehavior) "unhandled" else "handled"
log.debug("received message {}, was {}")
// or even the full state, perhaps behind a separate config flag
if (opts.logNewState)
  log.debug("handled message {}, new behavior {}", msg, newBehavior)
newBehavior

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could be useful. I can include it if you like.

I have a question regarding the following line

val handled = if (newBehavior == Behavior.UnhandledBehavior) "unhandled" else "handled"

This is checking if the newBehavior is a Behavior.UnhandledBehavior, but in LoggingReceive the logic is checking whether a message is actually handled by a behavior, which can be done using PartialFunction.isDefinedAt. Is there a way to access the underlying PF of the behavior to do a similar check?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the typed case we don't know if it is a PF at all, so no, the only way to know that a message was unhandled is to call the behavior and look at the return value. Maybe we could skip that now and look into what would be useful in a follow up PR?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good to me 👍

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created #26271 to track that


override def aroundSignal(ctx: TypedActorContext[T], signal: Signal, target: SignalTarget[T]): Behavior[T] = {
if (opts.enabled)
opts.logger.getOrElse(ctx.asScala.log).log(opts.level, "received signal {}", signal)
target(ctx, signal)
seglo marked this conversation as resolved.
Show resolved Hide resolved
}

// only once in the same behavior stack
override def isSame(other: BehaviorInterceptor[Any, Any]): Boolean = other match {
case LogMessagesInterceptor(`opts`) ⇒ true
case _ ⇒ false
}
}

/**
* INTERNAL API
*/
Expand Down
Expand Up @@ -7,9 +7,8 @@ package akka.actor.typed.javadsl
import java.util.Collections
import java.util.function.{ Function ⇒ JFunction }

import akka.actor.typed.{ ActorRef, Behavior, BehaviorInterceptor, Signal, SupervisorStrategy }
import akka.actor.typed._
import akka.actor.typed.internal.{ BehaviorImpl, Supervisor, TimerSchedulerImpl, WithMdcBehaviorInterceptor }
import akka.actor.typed.scaladsl
import akka.annotation.ApiMayChange
import akka.japi.function.{ Function2 ⇒ JapiFunction2 }
import akka.japi.pf.PFBuilder
Expand Down Expand Up @@ -185,6 +184,22 @@ object Behaviors {
def monitor[T](monitor: ActorRef[T], behavior: Behavior[T]): Behavior[T] =
scaladsl.Behaviors.monitor(monitor, behavior)

/**
* Behavior decorator that logs all messages to the [[akka.actor.typed.Behavior]] using the provided
* [[akka.actor.typed.LogOptions]] default configuration before invoking the wrapped behavior.
* To include an MDC context then first wrap `logMessages` with `withMDC`.
*/
def logMessages[T](behavior: Behavior[T]): Behavior[T] =
scaladsl.Behaviors.logMessages(behavior)

/**
* Behavior decorator that logs all messages to the [[akka.actor.typed.Behavior]] using the provided
* [[akka.actor.typed.LogOptions]] configuration before invoking the wrapped behavior.
* To include an MDC context then first wrap `logMessages` with `withMDC`.
*/
def logMessages[T](logOptions: LogOptions, behavior: Behavior[T]): Behavior[T] =
scaladsl.Behaviors.logMessages(logOptions, behavior)

/**
* Wrap the given behavior such that it is restarted (i.e. reset to its
* initial state) whenever it throws an exception of the given class or a
Expand Down
Expand Up @@ -157,6 +157,22 @@ object Behaviors {
def monitor[T](monitor: ActorRef[T], behavior: Behavior[T]): Behavior[T] =
BehaviorImpl.intercept(new MonitorInterceptor[T](monitor))(behavior)

/**
* Behavior decorator that logs all messages to the [[akka.actor.typed.Behavior]] using the provided
* [[akka.actor.typed.LogOptions]] default configuration before invoking the wrapped behavior.
* To include an MDC context then first wrap `logMessages` with `withMDC`.
*/
def logMessages[T](behavior: Behavior[T]): Behavior[T] =
BehaviorImpl.intercept(new LogMessagesInterceptor[T](LogOptions()))(behavior)

/**
* Behavior decorator that logs all messages to the [[akka.actor.typed.Behavior]] using the provided
* [[akka.actor.typed.LogOptions]] configuration before invoking the wrapped behavior.
* To include an MDC context then first wrap `logMessages` with `withMDC`.
*/
def logMessages[T](logOptions: LogOptions, behavior: Behavior[T]): Behavior[T] =
BehaviorImpl.intercept(new LogMessagesInterceptor[T](logOptions))(behavior)

/**
* Wrap the given behavior with the given [[SupervisorStrategy]] for
* the given exception.
Expand Down