diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/log.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/log.md index cb3d9263a76..2d9fa253463 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/log.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/log.md @@ -18,6 +18,8 @@ Log elements flowing through the stream as well as completion and erroring. By d completion signals are logged on debug level, and errors are logged on Error level. This can be changed by calling @scala[`Attributes.logLevels(...)`] @java[`Attributes.createLogLevels(...)`] on the given Flow. +See also @ref:[logWithMarker](logWithMarker.md). + ## Example Scala diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/logWithMarker.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/logWithMarker.md new file mode 100644 index 00000000000..6f7d5a80ce6 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/logWithMarker.md @@ -0,0 +1,42 @@ +# logWithMarker + +Log elements flowing through the stream as well as completion and erroring. + +@ref[Simple operators](../index.md#simple-operators) + +## Signature + + +Scala +: @@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #logWithMarker } + +Java +: @@snip [FlowLogWithMarkerTest.java](/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowLogWithMarkerTest.java) { #signature } + +## Description + +Log elements flowing through the stream as well as completion and erroring. By default element and +completion signals are logged on debug level, and errors are logged on Error level. +This can be changed by calling @scala[`Attributes.logLevels(...)`] @java[`Attributes.createLogLevels(...)`] on the given Flow. + +See also @ref:[log](log.md). + +## Example + +Scala +: @@snip [SourceOrFlow.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/LogWithMarker.scala) { #logWithMarker } + +Java +: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #logWithMarker } + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when upstream emits + +**backpressures** when downstream backpressures + +**completes** when upstream completes + +@@@ diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index 784d2ad61d1..046452d28c4 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -164,6 +164,7 @@ depending on being backpressured by downstream or not. |Source/Flow|@ref[limit](Source-or-Flow/limit.md)|Limit number of element from upstream to given `max` number.| |Source/Flow|@ref[limitWeighted](Source-or-Flow/limitWeighted.md)|Limit the total weight of incoming elements| |Source/Flow|@ref[log](Source-or-Flow/log.md)|Log elements flowing through the stream as well as completion and erroring.| +|Source/Flow|@ref[logWithMarker](Source-or-Flow/logWithMarker.md)|Log elements flowing through the stream as well as completion and erroring.| |Source/Flow|@ref[map](Source-or-Flow/map.md)|Transform each element in the stream by calling a mapping function with it and passing the returned value downstream.| |Source/Flow|@ref[mapConcat](Source-or-Flow/mapConcat.md)|Transform each element into zero or more elements that are individually passed downstream.| |Source/Flow|@ref[mapError](Source-or-Flow/mapError.md)|While similar to `recover` this operators can be used to transform an error signal to a different one *without* logging it as an error in the process.| @@ -467,6 +468,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [monitor](Source-or-Flow/monitor.md) * [initialDelay](Source-or-Flow/initialDelay.md) * [log](Source-or-Flow/log.md) +* [logWithMarker](Source-or-Flow/logWithMarker.md) * [asFlowWithContext](Flow/asFlowWithContext.md) * [fromSinkAndSource](Flow/fromSinkAndSource.md) * [fromSinkAndSourceCoupled](Flow/fromSinkAndSourceCoupled.md) diff --git a/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java b/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java index eb796409b94..df3711f4690 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java @@ -35,12 +35,14 @@ // #zip // #log +import akka.event.LogMarker; import akka.stream.Attributes; // #log import java.time.Duration; import java.util.Arrays; +import java.util.Collections; import java.util.Comparator; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -61,6 +63,20 @@ void logExample() { ; } + void logWithMarkerExample() { + Flow.of(String.class) + // #logWithMarker + .logWithMarker( + "myStream", (e) -> LogMarker.create("myMarker", Collections.singletonMap("element", e))) + .addAttributes( + Attributes.createLogLevels( + Attributes.logLevelOff(), // onElement + Attributes.logLevelInfo(), // onFinish + Attributes.logLevelError())) // onFailure + // #logWithMarker + ; + } + void zipWithIndexExample() { // #zip-with-index Source.from(Arrays.asList("apple", "orange", "banana")) diff --git a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/LogWithMarker.scala b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/LogWithMarker.scala new file mode 100644 index 00000000000..c087f819e7f --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/LogWithMarker.scala @@ -0,0 +1,26 @@ +/* + * Copyright (C) 2019-2020 Lightbend Inc. + */ + +package docs.stream.operators.sourceorflow + +import akka.stream.scaladsl.Flow +//#logWithMarker +import akka.event.LogMarker +import akka.stream.Attributes + +//#logWithMarker + +object LogWithMarker { + def logWithMarkerExample(): Unit = { + Flow[String] + //#logWithMarker + .logWithMarker(name = "myStream", e => LogMarker(name = "myMarker", properties = Map("element" -> e))) + .addAttributes( + Attributes.logLevels( + onElement = Attributes.LogLevels.Off, + onFinish = Attributes.LogLevels.Info, + onFailure = Attributes.LogLevels.Error)) + //#logWithMarker + } +} diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowLogWithMarkerTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowLogWithMarkerTest.java new file mode 100644 index 00000000000..fb8e10af5b5 --- /dev/null +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowLogWithMarkerTest.java @@ -0,0 +1,24 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.stream.javadsl; + +import akka.NotUsed; +import akka.event.LogMarker; +import akka.event.MarkerLoggingAdapter; +import akka.japi.function.Function; + +public class FlowLogWithMarkerTest { + + public static // #signature + Flow logWithMarker( + String name, + Function marker, + Function extract, + MarkerLoggingAdapter log) + // #signature + { + return Flow.create().logWithMarker(name, marker, extract, log); + } +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowLogWithMarkerSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowLogWithMarkerSpec.scala new file mode 100644 index 00000000000..2218ec7495a --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowLogWithMarkerSpec.scala @@ -0,0 +1,197 @@ +/* + * Copyright (C) 2014-2020 Lightbend Inc. + */ + +package akka.stream.scaladsl + +import akka.NotUsed +import akka.event.{ DummyClassForStringSources, LogMarker, Logging } +import akka.stream.ActorAttributes._ +import akka.stream.Attributes.LogLevels +import akka.stream.Supervision._ +import akka.stream._ +import akka.stream.testkit.{ ScriptedTest, StreamSpec } +import akka.testkit.TestProbe + +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.util.control.NoStackTrace + +class FlowLogWithMarkerSpec extends StreamSpec(""" + akka.loglevel = DEBUG # test verifies logging + """) with ScriptedTest { + + val logProbe = { + val p = TestProbe() + system.eventStream.subscribe(p.ref, classOf[Logging.LogEvent]) + p + } + + "A LogWithMarker" must { + + val supervisorPath = SystemMaterializer(system).materializer.supervisor.path + val LogSrc = s"akka.stream.LogWithMarker($supervisorPath)" + val LogClazz = classOf[Materializer] + val mdc = Logging.emptyMDC + + "on Flow" must { + + "debug each element" in { + val debugging = Flow[Int].logWithMarker("my-debug", _ => LogMarker("my-marker")) + Source(1 to 2).via(debugging).runWith(Sink.ignore) + + logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-debug] Element: 1", mdc, LogMarker("my-marker"))) + logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-debug] Element: 2", mdc, LogMarker("my-marker"))) + logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-debug] Upstream finished.")) + } + + "allow disabling element logging" in { + val disableElementLogging = + Attributes.logLevels(onElement = LogLevels.Off, onFinish = Logging.DebugLevel, onFailure = Logging.DebugLevel) + + val debugging = Flow[Int].logWithMarker("my-debug", _ => LogMarker("my-marker")) + Source(1 to 2).via(debugging).withAttributes(disableElementLogging).runWith(Sink.ignore) + + logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-debug] Upstream finished.")) + } + + } + + "on javadsl.Flow" must { + "debug each element" in { + val log = Logging.withMarker(system, "com.example.ImportantLogger") + + val debugging: javadsl.Flow[Integer, Integer, NotUsed] = javadsl.Flow + .of(classOf[Integer]) + .logWithMarker("log-1", _ => LogMarker("marker-1")) + .logWithMarker("log-2", _ => LogMarker("marker-2"), new akka.japi.function.Function[Integer, Integer] { + def apply(i: Integer) = i + }) + .logWithMarker("log-3", _ => LogMarker("marker-3"), new akka.japi.function.Function[Integer, Integer] { + def apply(i: Integer) = i + }, log) + .logWithMarker("log-4", _ => LogMarker("marker-4"), log) + + javadsl.Source.single[Integer](1).via(debugging).runWith(javadsl.Sink.ignore[Integer](), system) + + var counter = 0 + var finishCounter = 0 + import scala.concurrent.duration._ + logProbe.fishForMessage(3.seconds) { + case Logging.Debug(_, _, msg: String) if msg contains "Element: 1" => + counter += 1 + counter == 4 && finishCounter == 4 + + case Logging.Debug(_, _, msg: String) if msg contains "Upstream finished" => + finishCounter += 1 + counter == 4 && finishCounter == 4 + } + } + } + + "on Source" must { + "debug each element" in { + Source(1 to 2).logWithMarker("flow-s2", _ => LogMarker("marker-s2")).runWith(Sink.ignore) + + logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[flow-s2] Element: 1")) + logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[flow-s2] Element: 2")) + logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[flow-s2] Upstream finished.")) + } + + "allow extracting value to be logged" in { + case class Complex(a: Int, b: String) + Source.single(Complex(1, "42")).logWithMarker("flow-s3", _ => LogMarker("marker-s3"), _.b).runWith(Sink.ignore) + + logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[flow-s3] Element: 42")) + logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[flow-s3] Upstream finished.")) + } + + "log upstream failure" in { + val cause = new TestException + Source.failed(cause).logWithMarker("flow-4", (_: Any) => LogMarker("marker-4")).runWith(Sink.ignore) + logProbe.expectMsg(Logging.Error(cause, LogSrc, LogClazz, "[flow-4] Upstream failed.")) + } + + "allow passing in custom LoggingAdapter" in { + val log = Logging.withMarker(system, "com.example.ImportantLogger") + val marker = LogMarker("marker-5") + + Source.single(42).logWithMarker("flow-5", _ => marker)(log).runWith(Sink.ignore) + + val src = "com.example.ImportantLogger(akka://FlowLogWithMarkerSpec)" + val clazz = classOf[DummyClassForStringSources] + logProbe.expectMsg(Logging.Debug(src, clazz, "[flow-5] Element: 42", mdc, marker)) + logProbe.expectMsg(Logging.Debug(src, clazz, "[flow-5] Upstream finished.")) + } + + "allow configuring log levels via Attributes" in { + val logAttrs = Attributes.logLevels( + onElement = Logging.WarningLevel, + onFinish = Logging.InfoLevel, + onFailure = Logging.DebugLevel) + + Source + .single(42) + .logWithMarker("flow-6", _ => LogMarker("marker-6")) + .withAttributes(Attributes + .logLevels(onElement = Logging.WarningLevel, onFinish = Logging.InfoLevel, onFailure = Logging.DebugLevel)) + .runWith(Sink.ignore) + + logProbe.expectMsg(Logging.Warning(LogSrc, LogClazz, "[flow-6] Element: 42", mdc, LogMarker("marker-6"))) + logProbe.expectMsg(Logging.Info(LogSrc, LogClazz, "[flow-6] Upstream finished.")) + + val cause = new TestException + Source + .failed(cause) + .logWithMarker("flow-6e", (_: Any) => LogMarker("marker-6e")) + .withAttributes(logAttrs) + .runWith(Sink.ignore) + logProbe.expectMsg( + Logging + .Debug(LogSrc, LogClazz, "[flow-6e] Upstream failed, cause: FlowLogWithMarkerSpec$TestException: Boom!")) + } + + "follow supervision strategy when exception thrown" in { + val ex = new RuntimeException() with NoStackTrace + val future = Source(1 to 5) + .logWithMarker("hi", _ => LogMarker("marker-hi"), _ => throw ex) + .withAttributes(supervisionStrategy(resumingDecider)) + .runWith(Sink.fold(0)(_ + _)) + Await.result(future, 500.millis) shouldEqual 0 + } + } + + "on javadsl.Source" must { + "debug each element" in { + val log = Logging.withMarker(system, "com.example.ImportantLogger") + + javadsl.Source + .single[Integer](1) + .logWithMarker("log-1", _ => LogMarker("marker-1")) + .logWithMarker("log-2", _ => LogMarker("marker-2"), new akka.japi.function.Function[Integer, Integer] { + def apply(i: Integer) = i + }) + .logWithMarker("log-3", _ => LogMarker("marker-3"), new akka.japi.function.Function[Integer, Integer] { + def apply(i: Integer) = i + }, log) + .logWithMarker("log-4", _ => LogMarker("marker-4"), log) + .runWith(javadsl.Sink.ignore[Integer](), system) + + var counter = 1 + import scala.concurrent.duration._ + logProbe.fishForMessage(3.seconds) { + case Logging.Debug(_, _, msg: String) if msg contains "Element: 1" => + counter += 1 + counter == 4 + + case Logging.Debug(_, _, msg: String) if msg contains "Upstream finished" => + false + } + } + } + + } + + final class TestException extends RuntimeException("Boom!") with NoStackTrace + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 5dff353598b..ebb3d3dc8dc 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -10,7 +10,7 @@ import akka.actor.{ ActorRef, Terminated } import akka.annotation.{ DoNotInherit, InternalApi } import akka.dispatch.ExecutionContexts import akka.event.Logging.LogLevel -import akka.event.{ LogSource, Logging, LoggingAdapter } +import akka.event.{ LogMarker, LogSource, Logging, LoggingAdapter, MarkerLoggingAdapter } import akka.stream.Attributes.{ InputBuffer, LogLevels } import akka.stream.OverflowStrategies._ import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage @@ -1573,6 +1573,126 @@ private[stream] object Collect { LogLevels(onElement = Logging.DebugLevel, onFinish = Logging.DebugLevel, onFailure = Logging.ErrorLevel) } +/** + * INTERNAL API + */ +@InternalApi private[akka] final case class LogWithMarker[T]( + name: String, + marker: T => LogMarker, + extract: T => Any, + logAdapter: Option[MarkerLoggingAdapter]) + extends SimpleLinearGraphStage[T] { + + override def toString = "LogWithMarker" + + // TODO more optimisations can be done here - prepare logOnPush function etc + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with OutHandler with InHandler { + + import LogWithMarker._ + + private var logLevels: LogLevels = _ + private var log: MarkerLoggingAdapter = _ + + def decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider + + override def preStart(): Unit = { + logLevels = inheritedAttributes.get[LogLevels](DefaultLogLevels) + log = logAdapter match { + case Some(l) => l + case _ => + Logging.withMarker(materializer.system, materializer)(fromMaterializer) + } + } + + override def onPush(): Unit = { + try { + val elem = grab(in) + if (isEnabled(logLevels.onElement)) + log.log(marker(elem), logLevels.onElement, log.format("[{}] Element: {}", name, extract(elem))) + + push(out, elem) + } catch { + case NonFatal(ex) => + decider(ex) match { + case Supervision.Stop => failStage(ex) + case _ => pull(in) + } + } + } + + override def onPull(): Unit = pull(in) + + override def onUpstreamFailure(cause: Throwable): Unit = { + if (isEnabled(logLevels.onFailure)) + logLevels.onFailure match { + case Logging.ErrorLevel => log.error(cause, "[{}] Upstream failed.", name) + case level => + log.log( + level, + "[{}] Upstream failed, cause: {}: {}", + name, + Logging.simpleName(cause.getClass), + cause.getMessage) + } + + super.onUpstreamFailure(cause) + } + + override def onUpstreamFinish(): Unit = { + if (isEnabled(logLevels.onFinish)) + log.log(logLevels.onFinish, "[{}] Upstream finished.", name) + + super.onUpstreamFinish() + } + + override def onDownstreamFinish(cause: Throwable): Unit = { + if (isEnabled(logLevels.onFinish)) + log.log( + logLevels.onFinish, + "[{}] Downstream finished, cause: {}: {}", + name, + Logging.simpleName(cause.getClass), + cause.getMessage) + + super.onDownstreamFinish(cause: Throwable) + } + + private def isEnabled(l: LogLevel): Boolean = l.asInt != OffInt + + setHandlers(in, out, this) + } +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] object LogWithMarker { + + /** + * Must be located here to be visible for implicit resolution, when [[Materializer]] is passed to [[Logging]] + * More specific LogSource than `fromString`, which would add the ActorSystem name in addition to the supervision to the log source. + */ + final val fromMaterializer = new LogSource[Materializer] { + + // do not expose private context classes (of OneBoundedInterpreter) + override def getClazz(t: Materializer): Class[_] = classOf[Materializer] + + override def genString(t: Materializer): String = { + try s"$DefaultLoggerName(${t.supervisor.path})" + catch { + case _: Exception => LogSource.fromString.genString(DefaultLoggerName) + } + } + + } + + private final val DefaultLoggerName = "akka.stream.LogWithMarker" + private final val OffInt = LogLevels.Off.asInt + private final val DefaultLogLevels = + LogLevels(onElement = Logging.DebugLevel, onFinish = Logging.DebugLevel, onFailure = Logging.ErrorLevel) +} + /** * INTERNAL API */ diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 3a651cbd494..ec89ba2925d 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -14,7 +14,7 @@ import java.util.concurrent.CompletableFuture import akka.actor.ActorRef import akka.actor.ClassicActorSystemProvider import akka.dispatch.ExecutionContexts -import akka.event.LoggingAdapter +import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter } import akka.japi.Pair import akka.japi.Util import akka.japi.function @@ -3707,6 +3707,100 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr def log(name: String): javadsl.Flow[In, Out, Mat] = this.log(name, ConstantFun.javaIdentityFunction[Out], null) + /** + * Logs elements flowing through the stream as well as completion and erroring. + * + * By default element and completion signals are logged on debug level, and errors are logged on Error level. + * This can be adjusted according to your needs by providing a custom [[Attributes.LogLevels]] attribute on the given Flow: + * + * The `extract` function will be applied to each element before logging, so it is possible to log only those fields + * of a complex object flowing through this element. + * + * Uses the given [[MarkerLoggingAdapter]] for logging. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the mapping function returns an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def logWithMarker( + name: String, + marker: function.Function[Out, LogMarker], + extract: function.Function[Out, Any], + log: MarkerLoggingAdapter): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.logWithMarker(name, e => marker.apply(e), e => extract.apply(e))(log)) + + /** + * Logs elements flowing through the stream as well as completion and erroring. + * + * By default element and completion signals are logged on debug level, and errors are logged on Error level. + * This can be adjusted according to your needs by providing a custom [[Attributes.LogLevels]] attribute on the given Flow: + * + * The `extract` function will be applied to each element before logging, so it is possible to log only those fields + * of a complex object flowing through this element. + * + * Uses an internally created [[MarkerLoggingAdapter]] which uses `akka.stream.Log` as it's source (use this class to configure slf4j loggers). + * + * '''Emits when''' the mapping function returns an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def logWithMarker( + name: String, + marker: function.Function[Out, LogMarker], + extract: function.Function[Out, Any]): javadsl.Flow[In, Out, Mat] = + this.logWithMarker(name, marker, extract, null) + + /** + * Logs elements flowing through the stream as well as completion and erroring. + * + * By default element and completion signals are logged on debug level, and errors are logged on Error level. + * This can be adjusted according to your needs by providing a custom [[Attributes.LogLevels]] attribute on the given Flow: + * + * Uses the given [[MarkerLoggingAdapter]] for logging. + * + * '''Emits when''' the mapping function returns an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def logWithMarker( + name: String, + marker: function.Function[Out, LogMarker], + log: MarkerLoggingAdapter): javadsl.Flow[In, Out, Mat] = + this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], log) + + /** + * Logs elements flowing through the stream as well as completion and erroring. + * + * By default element and completion signals are logged on debug level, and errors are logged on Error level. + * This can be adjusted according to your needs by providing a custom [[Attributes.LogLevels]] attribute on the given Flow. + * + * Uses an internally created [[MarkerLoggingAdapter]] which uses `akka.stream.Log` as it's source (use this class to configure slf4j loggers). + * + * '''Emits when''' the mapping function returns an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def logWithMarker(name: String, marker: function.Function[Out, LogMarker]): javadsl.Flow[In, Out, Mat] = + this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], null) + /** * Converts this Flow to a [[RunnableGraph]] that materializes to a Reactive Streams [[org.reactivestreams.Processor]] * which implements the operations encapsulated by this Flow. Every materialization results in a new Processor diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala index 7ec4d555d58..22e331f989b 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala @@ -6,7 +6,7 @@ package akka.stream.javadsl import akka.japi.{ function, Pair, Util } import akka.stream._ -import akka.event.LoggingAdapter +import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter } import akka.util.ConstantFun import scala.annotation.unchecked.uncheckedVariance @@ -238,6 +238,50 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat]( def log(name: String): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = this.log(name, ConstantFun.javaIdentityFunction[Out], null) + /** + * Context-preserving variant of [[akka.stream.javadsl.Flow.logWithMarker]]. + * + * @see [[akka.stream.javadsl.Flow.logWithMarker]] + */ + def logWithMarker( + name: String, + marker: function.Function2[Out, CtxOut, LogMarker], + extract: function.Function[Out, Any], + log: MarkerLoggingAdapter): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = + viaScala(_.logWithMarker(name, (e, c) => marker.apply(e, c), e => extract.apply(e))(log)) + + /** + * Context-preserving variant of [[akka.stream.javadsl.Flow.logWithMarker]]. + * + * @see [[akka.stream.javadsl.Flow.logWithMarker]] + */ + def logWithMarker( + name: String, + marker: function.Function2[Out, CtxOut, LogMarker], + extract: function.Function[Out, Any]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = + this.logWithMarker(name, marker, extract, null) + + /** + * Context-preserving variant of [[akka.stream.javadsl.Flow.logWithMarker]]. + * + * @see [[akka.stream.javadsl.Flow.logWithMarker]] + */ + def logWithMarker( + name: String, + marker: function.Function2[Out, CtxOut, LogMarker], + log: MarkerLoggingAdapter): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = + this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], log) + + /** + * Context-preserving variant of [[akka.stream.javadsl.Flow.logWithMarker]]. + * + * @see [[akka.stream.javadsl.Flow.logWithMarker]] + */ + def logWithMarker( + name: String, + marker: function.Function2[Out, CtxOut, LogMarker]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = + this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], null) + def asScala: scaladsl.FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = scaladsl.FlowWithContext.fromTuples( scaladsl diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 0f37b0df41b..6adafd87686 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -11,7 +11,7 @@ import java.util.function.{ BiFunction, Supplier } import akka.actor.{ ActorRef, Cancellable, ClassicActorSystemProvider } import akka.dispatch.ExecutionContexts -import akka.event.LoggingAdapter +import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter } import akka.japi.function.Creator import akka.japi.{ function, JavaPartialFunction, Pair, Util } import akka.stream._ @@ -4160,6 +4160,100 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ def log(name: String): javadsl.Source[Out, Mat] = this.log(name, ConstantFun.javaIdentityFunction[Out], null) + /** + * Logs elements flowing through the stream as well as completion and erroring. + * + * By default element and completion signals are logged on debug level, and errors are logged on Error level. + * This can be adjusted according to your needs by providing a custom [[Attributes.LogLevels]] attribute on the given Flow: + * + * The `extract` function will be applied to each element before logging, so it is possible to log only those fields + * of a complex object flowing through this element. + * + * Uses the given [[MarkerLoggingAdapter]] for logging. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the mapping function returns an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def logWithMarker( + name: String, + marker: function.Function[Out, LogMarker], + extract: function.Function[Out, Any], + log: MarkerLoggingAdapter): javadsl.Source[Out, Mat] = + new Source(delegate.logWithMarker(name, e => marker.apply(e), e => extract.apply(e))(log)) + + /** + * Logs elements flowing through the stream as well as completion and erroring. + * + * By default element and completion signals are logged on debug level, and errors are logged on Error level. + * This can be adjusted according to your needs by providing a custom [[Attributes.LogLevels]] attribute on the given Flow: + * + * The `extract` function will be applied to each element before logging, so it is possible to log only those fields + * of a complex object flowing through this element. + * + * Uses an internally created [[MarkerLoggingAdapter]] which uses `akka.stream.Log` as it's source (use this class to configure slf4j loggers). + * + * '''Emits when''' the mapping function returns an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def logWithMarker( + name: String, + marker: function.Function[Out, LogMarker], + extract: function.Function[Out, Any]): javadsl.Source[Out, Mat] = + this.logWithMarker(name, marker, extract, null) + + /** + * Logs elements flowing through the stream as well as completion and erroring. + * + * By default element and completion signals are logged on debug level, and errors are logged on Error level. + * This can be adjusted according to your needs by providing a custom [[Attributes.LogLevels]] attribute on the given Flow: + * + * Uses the given [[MarkerLoggingAdapter]] for logging. + * + * '''Emits when''' the mapping function returns an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def logWithMarker( + name: String, + marker: function.Function[Out, LogMarker], + log: MarkerLoggingAdapter): javadsl.Source[Out, Mat] = + this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], log) + + /** + * Logs elements flowing through the stream as well as completion and erroring. + * + * By default element and completion signals are logged on debug level, and errors are logged on Error level. + * This can be adjusted according to your needs by providing a custom [[Attributes.LogLevels]] attribute on the given Flow: + * + * Uses an internally created [[MarkerLoggingAdapter]] which uses `akka.stream.Log` as it's source (use this class to configure slf4j loggers). + * + * '''Emits when''' the mapping function returns an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def logWithMarker(name: String, marker: function.Function[Out, LogMarker]): javadsl.Source[Out, Mat] = + this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], null) + def asSourceWithContext[Ctx](extractContext: function.Function[Out, Ctx]): SourceWithContext[Out, Ctx, Mat] = new scaladsl.SourceWithContext(this.asScala.map(x => (x, extractContext.apply(x)))).asJava } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala index fc85a0ddfdd..58a434a7d2e 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala @@ -7,7 +7,7 @@ package akka.stream.javadsl import java.util.concurrent.CompletionStage import akka.actor.ClassicActorSystemProvider -import akka.event.LoggingAdapter +import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter } import akka.japi.Pair import akka.japi.Util import akka.japi.function @@ -220,6 +220,48 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon def log(name: String): SourceWithContext[Out, Ctx, Mat] = this.log(name, ConstantFun.javaIdentityFunction[Out], null) + /** + * Context-preserving variant of [[akka.stream.javadsl.Source.logWithMarker]]. + * + * @see [[akka.stream.javadsl.Source.logWithMarker]] + */ + def logWithMarker( + name: String, + marker: function.Function2[Out, Ctx, LogMarker], + extract: function.Function[Out, Any], + log: MarkerLoggingAdapter): SourceWithContext[Out, Ctx, Mat] = + viaScala(_.logWithMarker(name, (e, c) => marker.apply(e, c), e => extract.apply(e))(log)) + + /** + * Context-preserving variant of [[akka.stream.javadsl.Flow.logWithMarker]]., + * + * @see [[akka.stream.javadsl.Flow.logWithMarker]] + */ + def logWithMarker( + name: String, + marker: function.Function2[Out, Ctx, LogMarker], + extract: function.Function[Out, Any]): SourceWithContext[Out, Ctx, Mat] = + this.logWithMarker(name, marker, extract, null) + + /** + * Context-preserving variant of [[akka.stream.javadsl.Flow.logWithMarker]]. + * + * @see [[akka.stream.javadsl.Flow.logWithMarker]] + */ + def logWithMarker( + name: String, + marker: function.Function2[Out, Ctx, LogMarker], + log: MarkerLoggingAdapter): SourceWithContext[Out, Ctx, Mat] = + this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], log) + + /** + * Context-preserving variant of [[akka.stream.javadsl.Flow.logWithMarker]]. + * + * @see [[akka.stream.javadsl.Flow.logWithMarker]] + */ + def logWithMarker(name: String, marker: function.Function2[Out, Ctx, LogMarker]): SourceWithContext[Out, Ctx, Mat] = + this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], null) + /** * Connect this [[akka.stream.javadsl.SourceWithContext]] to a [[akka.stream.javadsl.Sink]], * concatenating the processing steps of both. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index 08c4a11ddff..d2848cbc77d 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -5,7 +5,7 @@ package akka.stream.javadsl import akka.NotUsed -import akka.event.LoggingAdapter +import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter } import akka.japi.{ function, Pair, Util } import akka.stream._ import akka.util.ConstantFun @@ -2421,4 +2421,98 @@ class SubFlow[In, Out, Mat]( def log(name: String): SubFlow[In, Out, Mat] = this.log(name, ConstantFun.javaIdentityFunction[Out], null) + /** + * Logs elements flowing through the stream as well as completion and erroring. + * + * By default element and completion signals are logged on debug level, and errors are logged on Error level. + * This can be adjusted according to your needs by providing a custom [[Attributes.LogLevels]] attribute on the given Flow: + * + * The `extract` function will be applied to each element before logging, so it is possible to log only those fields + * of a complex object flowing through this element. + * + * Uses the given [[MarkerLoggingAdapter]] for logging. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the mapping function returns an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def logWithMarker( + name: String, + marker: function.Function[Out, LogMarker], + extract: function.Function[Out, Any], + log: MarkerLoggingAdapter): SubFlow[In, Out, Mat] = + new SubFlow(delegate.logWithMarker(name, e => marker.apply(e), e => extract.apply(e))(log)) + + /** + * Logs elements flowing through the stream as well as completion and erroring. + * + * By default element and completion signals are logged on debug level, and errors are logged on Error level. + * This can be adjusted according to your needs by providing a custom [[Attributes.LogLevels]] attribute on the given Flow: + * + * The `extract` function will be applied to each element before logging, so it is possible to log only those fields + * of a complex object flowing through this element. + * + * Uses an internally created [[MarkerLoggingAdapter]] which uses `akka.stream.Log` as it's source (use this class to configure slf4j loggers). + * + * '''Emits when''' the mapping function returns an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def logWithMarker( + name: String, + marker: function.Function[Out, LogMarker], + extract: function.Function[Out, Any]): SubFlow[In, Out, Mat] = + this.logWithMarker(name, marker, extract, null) + + /** + * Logs elements flowing through the stream as well as completion and erroring. + * + * By default element and completion signals are logged on debug level, and errors are logged on Error level. + * This can be adjusted according to your needs by providing a custom [[Attributes.LogLevels]] attribute on the given Flow: + * + * Uses the given [[MarkerLoggingAdapter]] for logging. + * + * '''Emits when''' the mapping function returns an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def logWithMarker( + name: String, + marker: function.Function[Out, LogMarker], + log: MarkerLoggingAdapter): SubFlow[In, Out, Mat] = + this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], log) + + /** + * Logs elements flowing through the stream as well as completion and erroring. + * + * By default element and completion signals are logged on debug level, and errors are logged on Error level. + * This can be adjusted according to your needs by providing a custom [[Attributes.LogLevels]] attribute on the given Flow. + * + * Uses an internally created [[MarkerLoggingAdapter]] which uses `akka.stream.Log` as it's source (use this class to configure slf4j loggers). + * + * '''Emits when''' the mapping function returns an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def logWithMarker(name: String, marker: function.Function[Out, LogMarker]): SubFlow[In, Out, Mat] = + this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], null) + } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index 4bac9e7366a..217c1bea0f6 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -5,7 +5,7 @@ package akka.stream.javadsl import akka.NotUsed -import akka.event.LoggingAdapter +import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter } import akka.japi.{ function, Pair, Util } import akka.stream._ import akka.util.ConstantFun @@ -2395,4 +2395,98 @@ class SubSource[Out, Mat]( def log(name: String): SubSource[Out, Mat] = this.log(name, ConstantFun.javaIdentityFunction[Out], null) + /** + * Logs elements flowing through the stream as well as completion and erroring. + * + * By default element and completion signals are logged on debug level, and errors are logged on Error level. + * This can be adjusted according to your needs by providing a custom [[Attributes.LogLevels]] attribute on the given Flow: + * + * The `extract` function will be applied to each element before logging, so it is possible to log only those fields + * of a complex object flowing through this element. + * + * Uses the given [[MarkerLoggingAdapter]] for logging. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the mapping function returns an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def logWithMarker( + name: String, + marker: function.Function[Out, LogMarker], + extract: function.Function[Out, Any], + log: MarkerLoggingAdapter): SubSource[Out, Mat] = + new SubSource(delegate.logWithMarker(name, e => marker.apply(e), e => extract.apply(e))(log)) + + /** + * Logs elements flowing through the stream as well as completion and erroring. + * + * By default element and completion signals are logged on debug level, and errors are logged on Error level. + * This can be adjusted according to your needs by providing a custom [[Attributes.LogLevels]] attribute on the given Flow: + * + * The `extract` function will be applied to each element before logging, so it is possible to log only those fields + * of a complex object flowing through this element. + * + * Uses an internally created [[MarkerLoggingAdapter]] which uses `akka.stream.Log` as it's source (use this class to configure slf4j loggers). + * + * '''Emits when''' the mapping function returns an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def logWithMarker( + name: String, + marker: function.Function[Out, LogMarker], + extract: function.Function[Out, Any]): SubSource[Out, Mat] = + this.logWithMarker(name, marker, extract, null) + + /** + * Logs elements flowing through the stream as well as completion and erroring. + * + * By default element and completion signals are logged on debug level, and errors are logged on Error level. + * This can be adjusted according to your needs by providing a custom [[Attributes.LogLevels]] attribute on the given Flow: + * + * Uses the given [[MarkerLoggingAdapter]] for logging. + * + * '''Emits when''' the mapping function returns an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def logWithMarker( + name: String, + marker: function.Function[Out, LogMarker], + log: MarkerLoggingAdapter): SubSource[Out, Mat] = + this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], log) + + /** + * Logs elements flowing through the stream as well as completion and erroring. + * + * By default element and completion signals are logged on debug level, and errors are logged on Error level. + * This can be adjusted according to your needs by providing a custom [[Attributes.LogLevels]] attribute on the given Flow. + * + * Uses an internally created [[MarkerLoggingAdapter]] which uses `akka.stream.Log` as it's source (use this class to configure slf4j loggers). + * + * '''Emits when''' the mapping function returns an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def logWithMarker(name: String, marker: function.Function[Out, LogMarker]): SubSource[Out, Mat] = + this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], null) + } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index db651d5c17c..382d4a72e2c 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -4,7 +4,7 @@ package akka.stream.scaladsl -import akka.event.LoggingAdapter +import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter } import akka.stream._ import akka.Done import akka.stream.impl.{ @@ -2528,6 +2528,29 @@ trait FlowOps[+Out, +Mat] { implicit log: LoggingAdapter = null): Repr[Out] = via(Log(name, extract.asInstanceOf[Any => Any], Option(log))) + /** + * Logs elements flowing through the stream as well as completion and erroring. + * + * By default element and completion signals are logged on debug level, and errors are logged on Error level. + * This can be adjusted according to your needs by providing a custom [[Attributes.LogLevels]] attribute on the given Flow: + * + * Uses implicit [[MarkerLoggingAdapter]] if available, otherwise uses an internally created one, + * which uses `akka.stream.Log` as it's source (use this class to configure slf4j loggers). + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the mapping function returns an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def logWithMarker(name: String, marker: Out => LogMarker, extract: Out => Any = ConstantFun.scalaIdentityFunction)( + implicit log: MarkerLoggingAdapter = null): Repr[Out] = + via(LogWithMarker(name, marker, extract.asInstanceOf[Any => Any], Option(log))) + /** * Combine the elements of current flow and the given [[Source]] into a stream of tuples. * diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala index 6bafa843aeb..109f42a4d2a 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala @@ -11,7 +11,7 @@ import akka.NotUsed import akka.dispatch.ExecutionContexts import akka.stream._ import akka.util.ConstantFun -import akka.event.LoggingAdapter +import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter } /** * Shared stream operations for [[FlowWithContext]] and [[SourceWithContext]] that automatically propagate a context @@ -184,5 +184,19 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] { via(flow.log(name, extractWithContext)(log)) } + /** + * Context-preserving variant of [[akka.stream.scaladsl.FlowOps.logWithMarker]]. + * + * @see [[akka.stream.scaladsl.FlowOps.logWithMarker]] + */ + def logWithMarker( + name: String, + marker: (Out, Ctx) => LogMarker, + extract: Out => Any = ConstantFun.scalaIdentityFunction)( + implicit log: MarkerLoggingAdapter = null): Repr[Out, Ctx] = { + val extractWithContext: ((Out, Ctx)) => Any = { case (e, _) => extract(e) } + via(flow.logWithMarker(name, marker.tupled, extractWithContext)(log)) + } + private[akka] def flow[T, C]: Flow[(T, C), (T, C), NotUsed] = Flow[(T, C)] }