Skip to content
This repository has been archived by the owner on Sep 27, 2021. It is now read-only.

Commit

Permalink
Updated signature on deprecated methods
Browse files Browse the repository at this point in the history
  • Loading branch information
umbreak committed Feb 4, 2019
1 parent 8fb9c70 commit 771d2d5
Showing 1 changed file with 5 additions and 3 deletions.
Expand Up @@ -4,7 +4,7 @@ import akka.actor.{ActorRef, ActorSystem}
import akka.cluster.Cluster
import akka.cluster.ddata.LWWRegister.Clock
import akka.cluster.ddata.Replicator._
import akka.cluster.ddata.{DistributedData, LWWMap, LWWMapKey}
import akka.cluster.ddata.{DistributedData, LWWMap, LWWMapKey, SelfUniqueAddress}
import akka.pattern.ask
import akka.util.Timeout
import cats.effect.{Async, IO, Timer}
Expand Down Expand Up @@ -176,6 +176,7 @@ object KeyValueStore {
extends KeyValueStore[F, K, V] {

private implicit val node: Cluster = Cluster(as)
private val uniqueAddr: SelfUniqueAddress = SelfUniqueAddress(node.selfUniqueAddress)
private implicit val registerClock: Clock[V] = (currentTimestamp: Long, value: V) => clock(currentTimestamp, value)
private implicit val timeout: Timeout = Timeout(askTimeout)

Expand All @@ -197,7 +198,8 @@ object KeyValueStore {
}

override def put(key: K, value: V): F[Unit] = {
val msg = Update(mapKey, LWWMap.empty[K, V], WriteAll(consistencyTimeout))(_.put(key, value))
val msg =
Update(mapKey, LWWMap.empty[K, V], WriteAll(consistencyTimeout))(_.put(uniqueAddr, key, value, registerClock))
val future = IO(replicator ? msg)
val fa = IO.fromFuture(future).to[F]
fa.flatMap[Unit] {
Expand All @@ -211,7 +213,7 @@ object KeyValueStore {
}

override def remove(key: K) = {
val msg = Update(mapKey, LWWMap.empty[K, V], WriteAll(consistencyTimeout))(_.remove(node, key))
val msg = Update(mapKey, LWWMap.empty[K, V], WriteAll(consistencyTimeout))(_.remove(uniqueAddr, key))
val future = IO(replicator ? msg)
val fa = IO.fromFuture(future).to[F]
fa.flatMap[Unit] {
Expand Down

0 comments on commit 771d2d5

Please sign in to comment.