Skip to content

Commit

Permalink
Add LogWithMarker to Akka Stream #28450
Browse files Browse the repository at this point in the history
  • Loading branch information
johanandren committed Mar 5, 2020
1 parent f1dbb79 commit c46861e
Show file tree
Hide file tree
Showing 16 changed files with 937 additions and 9 deletions.
Expand Up @@ -18,6 +18,8 @@ Log elements flowing through the stream as well as completion and erroring. By d
completion signals are logged on debug level, and errors are logged on Error level.
This can be changed by calling @scala[`Attributes.logLevels(...)`] @java[`Attributes.createLogLevels(...)`] on the given Flow.

See also @ref:[logWithMarker](logWithMarker.md).

## Example

Scala
Expand Down
@@ -0,0 +1,42 @@
# logWithMarker

Log elements flowing through the stream as well as completion and erroring.

@ref[Simple operators](../index.md#simple-operators)

## Signature


Scala
: @@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #logWithMarker }

Java
: @@snip [FlowLogWithMarkerTest.java](/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowLogWithMarkerTest.java) { #signature }

## Description

Log elements flowing through the stream as well as completion and erroring. By default element and
completion signals are logged on debug level, and errors are logged on Error level.
This can be changed by calling @scala[`Attributes.logLevels(...)`] @java[`Attributes.createLogLevels(...)`] on the given Flow.

See also @ref:[log](log.md).

## Example

Scala
: @@snip [SourceOrFlow.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/LogWithMarker.scala) { #logWithMarker }

Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #logWithMarker }

## Reactive Streams semantics

@@@div { .callout }

**emits** when upstream emits

**backpressures** when downstream backpressures

**completes** when upstream completes

@@@
2 changes: 2 additions & 0 deletions akka-docs/src/main/paradox/stream/operators/index.md
Expand Up @@ -164,6 +164,7 @@ depending on being backpressured by downstream or not.
|Source/Flow|<a name="limit"></a>@ref[limit](Source-or-Flow/limit.md)|Limit number of element from upstream to given `max` number.|
|Source/Flow|<a name="limitweighted"></a>@ref[limitWeighted](Source-or-Flow/limitWeighted.md)|Limit the total weight of incoming elements|
|Source/Flow|<a name="log"></a>@ref[log](Source-or-Flow/log.md)|Log elements flowing through the stream as well as completion and erroring.|
|Source/Flow|<a name="logwithmarker"></a>@ref[logWithMarker](Source-or-Flow/logWithMarker.md)|Log elements flowing through the stream as well as completion and erroring.|
|Source/Flow|<a name="map"></a>@ref[map](Source-or-Flow/map.md)|Transform each element in the stream by calling a mapping function with it and passing the returned value downstream.|
|Source/Flow|<a name="mapconcat"></a>@ref[mapConcat](Source-or-Flow/mapConcat.md)|Transform each element into zero or more elements that are individually passed downstream.|
|Source/Flow|<a name="maperror"></a>@ref[mapError](Source-or-Flow/mapError.md)|While similar to `recover` this operators can be used to transform an error signal to a different one *without* logging it as an error in the process.|
Expand Down Expand Up @@ -467,6 +468,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [monitor](Source-or-Flow/monitor.md)
* [initialDelay](Source-or-Flow/initialDelay.md)
* [log](Source-or-Flow/log.md)
* [logWithMarker](Source-or-Flow/logWithMarker.md)
* [asFlowWithContext](Flow/asFlowWithContext.md)
* [fromSinkAndSource](Flow/fromSinkAndSource.md)
* [fromSinkAndSourceCoupled](Flow/fromSinkAndSourceCoupled.md)
Expand Down
16 changes: 16 additions & 0 deletions akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java
Expand Up @@ -35,12 +35,14 @@
// #zip

// #log
import akka.event.LogMarker;
import akka.stream.Attributes;

// #log

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
Expand All @@ -61,6 +63,20 @@ void logExample() {
;
}

void logWithMarkerExample() {
Flow.of(String.class)
// #logWithMarker
.logWithMarker(
"myStream", (e) -> LogMarker.create("myMarker", Collections.singletonMap("element", e)))
.addAttributes(
Attributes.createLogLevels(
Attributes.logLevelOff(), // onElement
Attributes.logLevelInfo(), // onFinish
Attributes.logLevelError())) // onFailure
// #logWithMarker
;
}

void zipWithIndexExample() {
// #zip-with-index
Source.from(Arrays.asList("apple", "orange", "banana"))
Expand Down
@@ -0,0 +1,26 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/

package docs.stream.operators.sourceorflow

import akka.stream.scaladsl.Flow
//#logWithMarker
import akka.event.LogMarker
import akka.stream.Attributes

//#logWithMarker

object LogWithMarker {
def logWithMarkerExample(): Unit = {
Flow[String]
//#logWithMarker
.logWithMarker(name = "myStream", e => LogMarker(name = "myMarker", properties = Map("element" -> e)))
.addAttributes(
Attributes.logLevels(
onElement = Attributes.LogLevels.Off,
onFinish = Attributes.LogLevels.Info,
onFailure = Attributes.LogLevels.Error))
//#logWithMarker
}
}
@@ -0,0 +1,24 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.javadsl;

import akka.NotUsed;
import akka.event.LogMarker;
import akka.event.MarkerLoggingAdapter;
import akka.japi.function.Function;

public class FlowLogWithMarkerTest {

public static // #signature
<In> Flow<In, In, NotUsed> logWithMarker(
String name,
Function<In, LogMarker> marker,
Function<In, Object> extract,
MarkerLoggingAdapter log)
// #signature
{
return Flow.<In>create().logWithMarker(name, marker, extract, log);
}
}
@@ -0,0 +1,197 @@
/*
* Copyright (C) 2014-2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.scaladsl

import akka.NotUsed
import akka.event.{ DummyClassForStringSources, LogMarker, Logging }
import akka.stream.ActorAttributes._
import akka.stream.Attributes.LogLevels
import akka.stream.Supervision._
import akka.stream._
import akka.stream.testkit.{ ScriptedTest, StreamSpec }
import akka.testkit.TestProbe

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.control.NoStackTrace

class FlowLogWithMarkerSpec extends StreamSpec("""
akka.loglevel = DEBUG # test verifies logging
""") with ScriptedTest {

val logProbe = {
val p = TestProbe()
system.eventStream.subscribe(p.ref, classOf[Logging.LogEvent])
p
}

"A LogWithMarker" must {

val supervisorPath = SystemMaterializer(system).materializer.supervisor.path
val LogSrc = s"akka.stream.LogWithMarker($supervisorPath)"
val LogClazz = classOf[Materializer]
val mdc = Logging.emptyMDC

"on Flow" must {

"debug each element" in {
val debugging = Flow[Int].logWithMarker("my-debug", _ => LogMarker("my-marker"))
Source(1 to 2).via(debugging).runWith(Sink.ignore)

logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-debug] Element: 1", mdc, LogMarker("my-marker")))
logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-debug] Element: 2", mdc, LogMarker("my-marker")))
logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-debug] Upstream finished."))
}

"allow disabling element logging" in {
val disableElementLogging =
Attributes.logLevels(onElement = LogLevels.Off, onFinish = Logging.DebugLevel, onFailure = Logging.DebugLevel)

val debugging = Flow[Int].logWithMarker("my-debug", _ => LogMarker("my-marker"))
Source(1 to 2).via(debugging).withAttributes(disableElementLogging).runWith(Sink.ignore)

logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-debug] Upstream finished."))
}

}

"on javadsl.Flow" must {
"debug each element" in {
val log = Logging.withMarker(system, "com.example.ImportantLogger")

val debugging: javadsl.Flow[Integer, Integer, NotUsed] = javadsl.Flow
.of(classOf[Integer])
.logWithMarker("log-1", _ => LogMarker("marker-1"))
.logWithMarker("log-2", _ => LogMarker("marker-2"), new akka.japi.function.Function[Integer, Integer] {
def apply(i: Integer) = i
})
.logWithMarker("log-3", _ => LogMarker("marker-3"), new akka.japi.function.Function[Integer, Integer] {
def apply(i: Integer) = i
}, log)
.logWithMarker("log-4", _ => LogMarker("marker-4"), log)

javadsl.Source.single[Integer](1).via(debugging).runWith(javadsl.Sink.ignore[Integer](), system)

var counter = 0
var finishCounter = 0
import scala.concurrent.duration._
logProbe.fishForMessage(3.seconds) {
case Logging.Debug(_, _, msg: String) if msg contains "Element: 1" =>
counter += 1
counter == 4 && finishCounter == 4

case Logging.Debug(_, _, msg: String) if msg contains "Upstream finished" =>
finishCounter += 1
counter == 4 && finishCounter == 4
}
}
}

"on Source" must {
"debug each element" in {
Source(1 to 2).logWithMarker("flow-s2", _ => LogMarker("marker-s2")).runWith(Sink.ignore)

logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[flow-s2] Element: 1"))
logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[flow-s2] Element: 2"))
logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[flow-s2] Upstream finished."))
}

"allow extracting value to be logged" in {
case class Complex(a: Int, b: String)
Source.single(Complex(1, "42")).logWithMarker("flow-s3", _ => LogMarker("marker-s3"), _.b).runWith(Sink.ignore)

logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[flow-s3] Element: 42"))
logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[flow-s3] Upstream finished."))
}

"log upstream failure" in {
val cause = new TestException
Source.failed(cause).logWithMarker("flow-4", (_: Any) => LogMarker("marker-4")).runWith(Sink.ignore)
logProbe.expectMsg(Logging.Error(cause, LogSrc, LogClazz, "[flow-4] Upstream failed."))
}

"allow passing in custom LoggingAdapter" in {
val log = Logging.withMarker(system, "com.example.ImportantLogger")
val marker = LogMarker("marker-5")

Source.single(42).logWithMarker("flow-5", _ => marker)(log).runWith(Sink.ignore)

val src = "com.example.ImportantLogger(akka://FlowLogWithMarkerSpec)"
val clazz = classOf[DummyClassForStringSources]
logProbe.expectMsg(Logging.Debug(src, clazz, "[flow-5] Element: 42", mdc, marker))
logProbe.expectMsg(Logging.Debug(src, clazz, "[flow-5] Upstream finished."))
}

"allow configuring log levels via Attributes" in {
val logAttrs = Attributes.logLevels(
onElement = Logging.WarningLevel,
onFinish = Logging.InfoLevel,
onFailure = Logging.DebugLevel)

Source
.single(42)
.logWithMarker("flow-6", _ => LogMarker("marker-6"))
.withAttributes(Attributes
.logLevels(onElement = Logging.WarningLevel, onFinish = Logging.InfoLevel, onFailure = Logging.DebugLevel))
.runWith(Sink.ignore)

logProbe.expectMsg(Logging.Warning(LogSrc, LogClazz, "[flow-6] Element: 42", mdc, LogMarker("marker-6")))
logProbe.expectMsg(Logging.Info(LogSrc, LogClazz, "[flow-6] Upstream finished."))

val cause = new TestException
Source
.failed(cause)
.logWithMarker("flow-6e", (_: Any) => LogMarker("marker-6e"))
.withAttributes(logAttrs)
.runWith(Sink.ignore)
logProbe.expectMsg(
Logging
.Debug(LogSrc, LogClazz, "[flow-6e] Upstream failed, cause: FlowLogWithMarkerSpec$TestException: Boom!"))
}

"follow supervision strategy when exception thrown" in {
val ex = new RuntimeException() with NoStackTrace
val future = Source(1 to 5)
.logWithMarker("hi", _ => LogMarker("marker-hi"), _ => throw ex)
.withAttributes(supervisionStrategy(resumingDecider))
.runWith(Sink.fold(0)(_ + _))
Await.result(future, 500.millis) shouldEqual 0
}
}

"on javadsl.Source" must {
"debug each element" in {
val log = Logging.withMarker(system, "com.example.ImportantLogger")

javadsl.Source
.single[Integer](1)
.logWithMarker("log-1", _ => LogMarker("marker-1"))
.logWithMarker("log-2", _ => LogMarker("marker-2"), new akka.japi.function.Function[Integer, Integer] {
def apply(i: Integer) = i
})
.logWithMarker("log-3", _ => LogMarker("marker-3"), new akka.japi.function.Function[Integer, Integer] {
def apply(i: Integer) = i
}, log)
.logWithMarker("log-4", _ => LogMarker("marker-4"), log)
.runWith(javadsl.Sink.ignore[Integer](), system)

var counter = 1
import scala.concurrent.duration._
logProbe.fishForMessage(3.seconds) {
case Logging.Debug(_, _, msg: String) if msg contains "Element: 1" =>
counter += 1
counter == 4

case Logging.Debug(_, _, msg: String) if msg contains "Upstream finished" =>
false
}
}
}

}

final class TestException extends RuntimeException("Boom!") with NoStackTrace

}

0 comments on commit c46861e

Please sign in to comment.