Skip to content

Commit

Permalink
Merge branch 'master' into wip-merge-to-master-patriknw
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Sep 29, 2016
2 parents 5558caa + d37fa8e commit 54f5b83
Show file tree
Hide file tree
Showing 91 changed files with 5,337 additions and 2,226 deletions.
Expand Up @@ -118,6 +118,23 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterAll {
}
}

"log with MDC" in {
new TestKit(appLogging) {
system.eventStream.subscribe(testActor, classOf[Logging.Debug])
val myMDC = Map("hello" "mdc")
val a = system.actorOf(Props(new Actor with DiagnosticActorLogging {
override def mdc(currentMessage: Any) = myMDC
def receive = LoggingReceive {
case "hello"
}
}))
a ! "hello"
expectMsgPF(hint = "Logging.Debug2") {
case m: Logging.Debug2 if m.mdc == myMDC ()
}
}
}

}

"An Actor" must {
Expand Down
10 changes: 6 additions & 4 deletions akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala
Expand Up @@ -5,11 +5,12 @@
package akka.io

import java.io.{ File, IOException }
import java.net.{ ServerSocket, URLClassLoader, InetSocketAddress }
import java.net.{ InetSocketAddress, ServerSocket, URLClassLoader }
import java.nio.ByteBuffer
import java.nio.channels._
import java.nio.channels.spi.SelectorProvider
import java.nio.channels.SelectionKey._

import com.typesafe.config.ConfigFactory

import scala.annotation.tailrec
Expand All @@ -21,8 +22,8 @@ import akka.io.Tcp._
import akka.io.SelectionHandler._
import akka.io.Inet.SocketOption
import akka.actor._
import akka.testkit.{ AkkaSpec, EventFilter, TestActorRef, TestProbe }
import akka.util.{ Helpers, ByteString }
import akka.testkit.{ AkkaSpec, EventFilter, SocketUtil, TestActorRef, TestProbe }
import akka.util.{ ByteString, Helpers }
import akka.testkit.SocketUtil._
import java.util.Random

Expand Down Expand Up @@ -826,7 +827,8 @@ class TcpConnectionSpec extends AkkaSpec("""
"report abort before handler is registered (reproducer from #15033)" in {
// This test needs the OP_CONNECT workaround on Windows, see original report #15033 and parent ticket #15766

val bindAddress = new InetSocketAddress(23402)
val port = SocketUtil.temporaryServerAddress().getPort
val bindAddress = new InetSocketAddress(port)
val serverSocket = new ServerSocket(bindAddress.getPort, 100, bindAddress.getAddress)
val connectionProbe = TestProbe()

Expand Down
16 changes: 16 additions & 0 deletions akka-actor-tests/src/test/scala/akka/util/ByteStringSpec.scala
Expand Up @@ -308,6 +308,19 @@ class ByteStringSpec extends WordSpec with Matchers with Checkers {
ByteString1.fromString("ab").drop(2) should ===(ByteString(""))
ByteString1.fromString("ab").drop(3) should ===(ByteString(""))
}
"take" in {
ByteString1.empty.take(-1) should ===(ByteString(""))
ByteString1.empty.take(0) should ===(ByteString(""))
ByteString1.empty.take(1) should ===(ByteString(""))
ByteString1.fromString("a").take(1) should ===(ByteString("a"))
ByteString1.fromString("ab").take(-1) should ===(ByteString(""))
ByteString1.fromString("ab").take(0) should ===(ByteString(""))
ByteString1.fromString("ab").take(1) should ===(ByteString("a"))
ByteString1.fromString("ab").take(2) should ===(ByteString("ab"))
ByteString1.fromString("ab").take(3) should ===(ByteString("ab"))
ByteString1.fromString("0123456789").take(3).drop(1) should ===(ByteString("12"))
ByteString1.fromString("0123456789").take(10).take(8).drop(3).take(5) should ===(ByteString("34567"))
}
}
"ByteString1C" must {
"drop(0)" in {
Expand Down Expand Up @@ -415,6 +428,9 @@ class ByteStringSpec extends WordSpec with Matchers with Checkers {
ByteStrings(ByteString1.fromString("a"), ByteString1.fromString("bc")).dropRight(3) should ===(ByteString(""))
}
"take" in {
ByteString.empty.take(-1) should ===(ByteString(""))
ByteString.empty.take(0) should ===(ByteString(""))
ByteString.empty.take(1) should ===(ByteString(""))
ByteStrings(ByteString1.fromString("a"), ByteString1.fromString("bc")).drop(1).take(0) should ===(ByteString(""))
ByteStrings(ByteString1.fromString("a"), ByteString1.fromString("bc")).drop(1).take(-1) should ===(ByteString(""))
ByteStrings(ByteString1.fromString("a"), ByteString1.fromString("bc")).drop(1).take(-2) should ===(ByteString(""))
Expand Down
2 changes: 2 additions & 0 deletions akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala
Expand Up @@ -15,6 +15,8 @@ private[akka] trait DeathWatch { this: ActorCell ⇒
private var watchedBy: Set[ActorRef] = ActorCell.emptyActorRefSet
private var terminatedQueued: Set[ActorRef] = ActorCell.emptyActorRefSet

def isWatching(ref: ActorRef): Boolean = watching contains ref

override final def watch(subject: ActorRef): ActorRef = subject match {
case a: InternalActorRef
if (a != self && !watchingContains(a)) {
Expand Down
16 changes: 11 additions & 5 deletions akka-actor/src/main/scala/akka/event/LoggingReceive.scala
Expand Up @@ -8,6 +8,7 @@ import language.existentials
import akka.actor.Actor.Receive
import akka.actor.ActorContext
import akka.actor.ActorCell
import akka.actor.DiagnosticActorLogging
import akka.event.Logging.Debug

object LoggingReceive {
Expand Down Expand Up @@ -52,13 +53,18 @@ class LoggingReceive(source: Option[AnyRef], r: Receive, label: Option[String])(
def isDefinedAt(o: Any): Boolean = {
val handled = r.isDefinedAt(o)
if (context.system.eventStream.logLevel >= Logging.DebugLevel) {
val (str, clazz) = LogSource.fromAnyRef(source getOrElse context.asInstanceOf[ActorCell].actor)
context.system.eventStream.publish(Debug(str, clazz, "received " + (if (handled) "handled" else "unhandled") + " message " + o
+ " from " + context.sender()
+ (label match {
val src = source getOrElse context.asInstanceOf[ActorCell].actor
val (str, clazz) = LogSource.fromAnyRef(src)
val message = "received " + (if (handled) "handled" else "unhandled") + " message " + o + " from " + context.sender() +
(label match {
case Some(l) " in state " + l
case _ ""
})))
})
val event = src match {
case a: DiagnosticActorLogging Debug(str, clazz, message, a.log.mdc)
case _ Debug(str, clazz, message)
}
context.system.eventStream.publish(event)
}
handled
}
Expand Down
34 changes: 21 additions & 13 deletions akka-actor/src/main/scala/akka/util/ByteString.scala
Expand Up @@ -253,8 +253,11 @@ object ByteString {
}

override def take(n: Int): ByteString =
if (n <= 0) ByteString.empty
else ByteString1(bytes, startIndex, Math.min(n, length))
if (n <= 0) ByteString.empty else take1(n)

private[akka] def take1(n: Int): ByteString1 =
if (n >= length) this
else ByteString1(bytes, startIndex, n)

override def slice(from: Int, until: Int): ByteString =
drop(from).take(until - Math.max(0, from))
Expand Down Expand Up @@ -436,18 +439,23 @@ object ByteString {
bytestrings.foreach(_.writeToOutputStream(os))
}

override def take(n: Int): ByteString = {
@tailrec def take0(n: Int, b: ByteStringBuilder, bs: Vector[ByteString1]): ByteString =
if (bs.isEmpty || n <= 0) b.result
else {
val head = bs.head
if (n <= head.length) b.append(head.take(n)).result
else take0(n - head.length, b.append(head), bs.tail)
}

override def take(n: Int): ByteString =
if (n <= 0) ByteString.empty
else if (n >= length) this
else take0(n, ByteString.newBuilder, bytestrings)
else take0(n)

private[akka] def take0(n: Int): ByteString = {
@tailrec def go(last: Int, restToTake: Int): (Int, Int) = {
val bs = bytestrings(last)
if (bs.length > restToTake) (last, restToTake)
else go(last + 1, restToTake - bs.length)
}

val (last, restToTake) = go(0, n)

if (last == 0) bytestrings(last).take(restToTake)
else if (restToTake == 0) new ByteStrings(bytestrings.take(last), n)
else new ByteStrings(bytestrings.take(last) :+ bytestrings(last).take1(restToTake), n)
}

override def dropRight(n: Int): ByteString =
Expand All @@ -473,7 +481,7 @@ object ByteString {

override def drop(n: Int): ByteString =
if (n <= 0) this
else if (n > length) ByteString.empty
else if (n >= length) ByteString.empty
else drop0(n)

private def drop0(n: Int): ByteString = {
Expand Down
Expand Up @@ -5,36 +5,36 @@ import akka.testkit.AkkaSpec

import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
import scala.language.postfixOps

class ConstantRateEntityRecoveryStrategySpec extends AkkaSpec {

import system.dispatcher

val strategy = EntityRecoveryStrategy.constantStrategy(system, 500 millis, 2)
val strategy = EntityRecoveryStrategy.constantStrategy(system, 1.second, 2)

"ConstantRateEntityRecoveryStrategy" must {
"recover entities" in {
val entities = Set[EntityId]("1", "2", "3", "4", "5")
val startTime = System.currentTimeMillis()
val startTime = System.nanoTime()
val resultWithTimes = strategy.recoverEntities(entities).map(
_.map(entityIds (entityIds, System.currentTimeMillis() - startTime))
)
_.map(entityIds (entityIds (System.nanoTime() - startTime).nanos)))

val result = Await.result(Future.sequence(resultWithTimes), 4 seconds).toList.sortWith(_._2 < _._2)
val result = Await.result(Future.sequence(resultWithTimes), 6.seconds)
.toVector.sortBy { case (_, duration) duration }
result.size should ===(3)

val scheduledEntities = result.map(_._1)
scheduledEntities.head.size should ===(2)
scheduledEntities(0).size should ===(2)
scheduledEntities(1).size should ===(2)
scheduledEntities(2).size should ===(1)
scheduledEntities.foldLeft(Set[EntityId]())(_ ++ _) should ===(entities)
scheduledEntities.flatten.toSet should ===(entities)

val times = result.map(_._2)
val timesMillis = result.map(_._2.toMillis)

times.head should ===(500L +- 30L)
times(1) should ===(1000L +- 30L)
times(2) should ===(1500L +- 30L)
// scheduling will not happen too early
timesMillis(0) should ===(1400L +- 500)
timesMillis(1) should ===(2400L +- 500L)
timesMillis(2) should ===(3400L +- 500L)
}

"not recover when no entities to recover" in {
Expand Down
1 change: 0 additions & 1 deletion akka-docs/rst/additional/index.rst
Expand Up @@ -7,5 +7,4 @@ Additional Information
../common/binary-compatibility-rules
faq
books
language-bindings
osgi
17 changes: 0 additions & 17 deletions akka-docs/rst/additional/language-bindings.rst

This file was deleted.

2 changes: 1 addition & 1 deletion akka-docs/rst/java/futures.rst
Expand Up @@ -187,7 +187,7 @@ Callbacks
---------

Sometimes you just want to listen to a ``Future`` being completed, and react to that not by creating a new Future, but by side-effecting.
For this Scala supports ``onComplete``, ``onSuccess`` and ``onFailure``, of which the latter two are specializations of the first.
For this Scala supports ``onComplete``, ``onSuccess`` and ``onFailure``, of which the last two are specializations of the first.

.. includecode:: code/docs/future/FutureDocTest.java
:include: onSuccess
Expand Down
33 changes: 33 additions & 0 deletions akka-docs/rst/java/persistence.rst
Expand Up @@ -454,6 +454,39 @@ mechanism when ``persist()`` is used. Notice the early stop behaviour that occur
.. includecode:: code/docs/persistence/PersistenceDocTest.java#safe-shutdown-example-bad
.. includecode:: code/docs/persistence/PersistenceDocTest.java#safe-shutdown-example-good


.. _replay-filter-java:

Replay Filter
-------------
There could be cases where event streams are corrupted and multiple writers (i.e. multiple persistent actor instances)
journaled different messages with the same sequence number.
In such a case, you can configure how you filter replayed messages from multiple writers, upon recovery.

In your configuration, under the ``akka.persistence.journal.xxx.replay-filter`` section (where ``xxx`` is your journal plugin id),
you can select the replay filter ``mode`` from one of the following values:

* repair-by-discard-old
* fail
* warn
* off

For example, if you configure the replay filter for leveldb plugin, it looks like this::

# The replay filter can detect a corrupt event stream by inspecting
# sequence numbers and writerUuid when replaying events.
akka.persistence.journal.leveldb.replay-filter {
# What the filter should do when detecting invalid events.
# Supported values:
# `repair-by-discard-old` : discard events from old writers,
# warning is logged
# `fail` : fail the replay, error is logged
# `warn` : log warning but emit events untouched
# `off` : disable this feature completely
mode = repair-by-discard-old
}


.. _persistent-views-java:

Persistent Views
Expand Down
10 changes: 10 additions & 0 deletions akka-docs/rst/java/stream/stages-overview.rst
Expand Up @@ -1230,6 +1230,16 @@ returned value downstream.

**completes** when any upstream completes

zipWithIndex
^^^^^^^
Zips elements of current flow with its indices.

**emits** upstream emits an element and is paired with their index

**backpressures** when downstream backpressures

**completes** when upstream completes

concat
^^^^^^
After completion of the original upstream the elements of the given source will be emitted.
Expand Down
2 changes: 1 addition & 1 deletion akka-docs/rst/scala/futures.rst
Expand Up @@ -227,7 +227,7 @@ Callbacks
---------

Sometimes you just want to listen to a ``Future`` being completed, and react to that not by creating a new ``Future``, but by side-effecting.
For this Scala supports ``onComplete``, ``onSuccess`` and ``onFailure``, of which the latter two are specializations of the first.
For this Scala supports ``onComplete``, ``onSuccess`` and ``onFailure``, of which the last two are specializations of the first.

.. includecode:: code/docs/future/FutureDocSpec.scala
:include: onSuccess
Expand Down
33 changes: 33 additions & 0 deletions akka-docs/rst/scala/persistence.rst
Expand Up @@ -123,6 +123,8 @@ It contains instructions on how to run the ``PersistentActorExample``.
Note that when using ``become`` from ``receiveRecover`` it will still only use the ``receiveRecover``
behavior when replaying the events. When replay is completed it will use the new behavior.

.. _persistence-id-scala:

Identifiers
-----------

Expand Down Expand Up @@ -440,6 +442,37 @@ mechanism when ``persist()`` is used. Notice the early stop behaviour that occur
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#safe-shutdown-example-bad
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#safe-shutdown-example-good

.. _replay-filter-scala:

Replay Filter
-------------
There could be cases where event streams are corrupted and multiple writers (i.e. multiple persistent actor instances)
journaled different messages with the same sequence number.
In such a case, you can configure how you filter replayed messages from multiple writers, upon recovery.

In your configuration, under the ``akka.persistence.journal.xxx.replay-filter`` section (where ``xxx`` is your journal plugin id),
you can select the replay filter ``mode`` from one of the following values:

* repair-by-discard-old
* fail
* warn
* off

For example, if you configure the replay filter for leveldb plugin, it looks like this::

# The replay filter can detect a corrupt event stream by inspecting
# sequence numbers and writerUuid when replaying events.
akka.persistence.journal.leveldb.replay-filter {
# What the filter should do when detecting invalid events.
# Supported values:
# `repair-by-discard-old` : discard events from old writers,
# warning is logged
# `fail` : fail the replay, error is logged
# `warn` : log warning but emit events untouched
# `off` : disable this feature completely
mode = repair-by-discard-old
}

.. _persistent-views:

Persistent Views
Expand Down
10 changes: 10 additions & 0 deletions akka-docs/rst/scala/stream/stages-overview.rst
Expand Up @@ -1222,6 +1222,16 @@ returned value downstream.

**completes** when any upstream completes

zipWithIndex
^^^^^^^
Zips elements of current flow with its indices.

**emits** upstream emits an element and is paired with their index

**backpressures** when downstream backpressures

**completes** when upstream completes

concat
^^^^^^
After completion of the original upstream the elements of the given source will be emitted.
Expand Down

0 comments on commit 54f5b83

Please sign in to comment.