Skip to content

Commit

Permalink
DistributedData extension
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Sep 20, 2017
1 parent 8b2bde1 commit 6d78e30
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,16 @@ class ReplicatorSpec extends TypedSpec(ReplicatorSpec.config) with Eventually {
}
}

def `have an extension`(): Unit = {
val replicator = DistributedData(system).replicator
val c = start(client(replicator))

val probe = TestProbe[Int]
c ! Increment
c ! GetValue(probe.ref)
probe.expectMsg(1)
}

}

object `A ReplicatorBehavior (real, adapted)` extends RealTests with AdaptedSystem
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,16 @@ import akka.typed.Terminated
val localAskTimeout = 60.seconds // ReadLocal, WriteLocal shouldn't timeout
val additionalAskTimeout = 1.second

def behavior(settings: dd.ReplicatorSettings): Behavior[SReplicator.Command] = {
val untypedReplicatorProps = dd.Replicator.props(settings)
def behavior(settings: dd.ReplicatorSettings, underlyingReplicator: Option[akka.actor.ActorRef]): Behavior[SReplicator.Command] = {

Actor.deferred { ctx
// FIXME perhaps add supervisor for restarting
val untypedReplicator = ctx.actorOf(untypedReplicatorProps, name = "underlying")
val untypedReplicator = underlyingReplicator match {
case Some(ref) ref
case None
// FIXME perhaps add supervisor for restarting
val untypedReplicatorProps = dd.Replicator.props(settings)
ctx.actorOf(untypedReplicatorProps, name = "underlying")
}

def withState(
subscribeAdapters: Map[ActorRef[JReplicator.Changed[ReplicatedData]], ActorRef[dd.Replicator.Changed[ReplicatedData]]]): Behavior[SReplicator.Command] = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.cluster.ddata.javadsl

import akka.typed.ActorSystem
import akka.typed.Extension
import akka.typed.ExtensionId
import akka.typed.ActorRef
import akka.typed.cluster.ddata.scaladsl

object DistributedData extends ExtensionId[DistributedData] {
def get(system: ActorSystem[_]): DistributedData = apply(system)

override def createExtension(system: ActorSystem[_]): DistributedData =
new DistributedData(system)
}

/**
* Akka extension for convenient configuration and use of the
* [[Replicator]]. Configuration settings are defined in the
* `akka.cluster.ddata` section, see `reference.conf`.
*
* This is using the same underlying `Replicator` instance as
* [[akka.akka.cluster.ddata.DistributedData]] and that means that typed
* and untyped actors can share the same data.
*/
class DistributedData(system: ActorSystem[_]) extends Extension {

/**
* `ActorRef` of the [[Replicator]] .
*/
val replicator: ActorRef[Replicator.Command] =
scaladsl.DistributedData(system).replicator.narrow[Replicator.Command]

}

Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,18 @@ import scala.util.control.NoStackTrace
object Replicator {
import dd.Replicator.DefaultMajorityMinCap

/**
* The `Behavior` for the `Replicator` actor.
*/
def behavior(settings: dd.ReplicatorSettings): Behavior[Command] =
ReplicatorBehavior.behavior(settings).narrow[Command]
ReplicatorBehavior.behavior(settings, underlyingReplicator = None).narrow[Command]

/**
* The `Behavior` for the `Replicator` actor.
* It will use the given underlying [[akka.cluster.ddata.Replicator]]
*/
def behavior(settings: dd.ReplicatorSettings, underlyingReplicator: akka.actor.ActorRef): Behavior[Command] =
ReplicatorBehavior.behavior(settings, Some(underlyingReplicator)).narrow[Command]

@DoNotInherit trait Command extends scaladsl.Replicator.Command

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.cluster.ddata.scaladsl

import akka.typed.ActorSystem
import akka.typed.Extension
import akka.typed.ExtensionId
import akka.typed.ActorRef
import akka.actor.ExtendedActorSystem

object DistributedData extends ExtensionId[DistributedData] {
def get(system: ActorSystem[_]): DistributedData = apply(system)

override def createExtension(system: ActorSystem[_]): DistributedData =
new DistributedData(system)
}

/**
* Akka extension for convenient configuration and use of the
* [[Replicator]]. Configuration settings are defined in the
* `akka.cluster.ddata` section, see `reference.conf`.
*
* This is using the same underlying `Replicator` instance as
* [[akka.akka.cluster.ddata.DistributedData]] and that means that typed
* and untyped actors can share the same data.
*/
class DistributedData(system: ActorSystem[_]) extends Extension {
import akka.typed.scaladsl.adapter._

private val untypedSystem = system.toUntyped.asInstanceOf[ExtendedActorSystem]
private val config = system.settings.config.getConfig("akka.cluster.distributed-data")
private val settings = ReplicatorSettings(config)

/**
* `ActorRef` of the [[Replicator]] .
*/
val replicator: ActorRef[Replicator.Command] = {
val configuredName = config.getString("name")
val name = "typed" + configuredName.take(1).toUpperCase + configuredName.drop(1)

val underlyingReplicator = akka.cluster.ddata.DistributedData(untypedSystem).replicator
val replicatorBehavior = Replicator.behavior(settings, underlyingReplicator)

untypedSystem.systemActorOf(PropsAdapter(replicatorBehavior), name)
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,14 @@ object Replicator {
* The `Behavior` for the `Replicator` actor.
*/
def behavior(settings: ReplicatorSettings): Behavior[Command] =
ReplicatorBehavior.behavior(settings)
ReplicatorBehavior.behavior(settings, underlyingReplicator = None)

/**
* The `Behavior` for the `Replicator` actor.
* It will use the given underlying [[akka.cluster.ddata.Replicator]]
*/
def behavior(settings: ReplicatorSettings, underlyingReplicator: akka.actor.ActorRef): Behavior[Command] =
ReplicatorBehavior.behavior(settings, Some(underlyingReplicator))

type ReadConsistency = dd.Replicator.ReadConsistency
val ReadLocal = dd.Replicator.ReadLocal
Expand Down

0 comments on commit 6d78e30

Please sign in to comment.