Skip to content
This repository has been archived by the owner on Sep 27, 2021. It is now read-only.

Changed batch-chunk to batch #39

Merged
merged 2 commits into from Feb 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions build.sbt
Expand Up @@ -24,15 +24,15 @@ scalafmt: {
}
*/

val akkaVersion = "2.5.19"
val akkaVersion = "2.5.20"
val akkaHttpVersion = "10.1.5"
val akkaHttpCirceVersion = "1.24.3"
val akkaPersistenceInMemVersion = "2.5.1.1"
val akkaPersistenceCassandraVersion = "0.92"
val catsVersion = "1.4.0"
val catsEffectVersion = "1.2.0"
val circeVersion = "0.11.1"
val commonsVersion = "0.10.41"
val commonsVersion = "0.10.42"
val journalVersion = "3.0.19"
val monixVersion = "3.0.0-RC2"
val pureconfigVersion = "0.10.1"
Expand Down
2 changes: 1 addition & 1 deletion modules/indexing/src/main/resources/reference.conf
Expand Up @@ -10,7 +10,7 @@ indexing {
# time to wait before the next batch is consumed
batch-timeout = 40 millis
# number of events to wait before the next batch is consumed
batch-chunk = 10
batch = 10
# The number of times an index function is retried
retry {
# the retry strategy to use; possible values are: "never", "once" and "exponential"
Expand Down
Expand Up @@ -4,7 +4,7 @@ import akka.actor.{ActorRef, ActorSystem}
import akka.cluster.Cluster
import akka.cluster.ddata.LWWRegister.Clock
import akka.cluster.ddata.Replicator._
import akka.cluster.ddata.{DistributedData, LWWMap, LWWMapKey}
import akka.cluster.ddata.{DistributedData, LWWMap, LWWMapKey, SelfUniqueAddress}
import akka.pattern.ask
import akka.util.Timeout
import cats.effect.{Async, IO, Timer}
Expand Down Expand Up @@ -176,6 +176,7 @@ object KeyValueStore {
extends KeyValueStore[F, K, V] {

private implicit val node: Cluster = Cluster(as)
private val uniqueAddr: SelfUniqueAddress = SelfUniqueAddress(node.selfUniqueAddress)
private implicit val registerClock: Clock[V] = (currentTimestamp: Long, value: V) => clock(currentTimestamp, value)
private implicit val timeout: Timeout = Timeout(askTimeout)

Expand All @@ -197,7 +198,8 @@ object KeyValueStore {
}

override def put(key: K, value: V): F[Unit] = {
val msg = Update(mapKey, LWWMap.empty[K, V], WriteAll(consistencyTimeout))(_.put(key, value))
val msg =
Update(mapKey, LWWMap.empty[K, V], WriteAll(consistencyTimeout))(_.put(uniqueAddr, key, value, registerClock))
val future = IO(replicator ? msg)
val fa = IO.fromFuture(future).to[F]
fa.flatMap[Unit] {
Expand All @@ -211,7 +213,7 @@ object KeyValueStore {
}

override def remove(key: K) = {
val msg = Update(mapKey, LWWMap.empty[K, V], WriteAll(consistencyTimeout))(_.remove(node, key))
val msg = Update(mapKey, LWWMap.empty[K, V], WriteAll(consistencyTimeout))(_.remove(uniqueAddr, key))
val future = IO(replicator ? msg)
val fa = IO.fromFuture(future).to[F]
fa.flatMap[Unit] {
Expand Down
Expand Up @@ -141,7 +141,7 @@ object IndexerConfig {
final def fromConfig(implicit as: ActorSystem): IndexConfigBuilder[NotUsed, _, _, _, _, Throwable, Persist] = {
val config = as.settings.config.getConfig("indexing")
val timeout = FiniteDuration(config.getDuration("batch-timeout", MILLISECONDS), MILLISECONDS)
val chunk = config.getInt("batch-chunk")
val chunk = config.getInt("batch")
val retryConfig: RetryStrategyConfig = loadConfigOrThrow[RetryStrategyConfig](config, "retry")
builder.retry(retryConfig.retryStrategy).batch(chunk, timeout)
}
Expand Down
2 changes: 1 addition & 1 deletion modules/indexing/src/test/resources/reference.conf
@@ -1,6 +1,6 @@
indexing {
batch-timeout = 40 millis
batch-chunk = 10
batch = 10
retry {
# the retry strategy to use; possible values are: "never", "once" and "exponential"
strategy = "exponential"
Expand Down