Skip to content

Commit

Permalink
AtomicGetOrElseUpdateOnTrieMap::atomicGetOrElseUpdate -> TrieMap::get…
Browse files Browse the repository at this point in the history
…OrElseUpdate (kamon-io#591)

* remove AtomicGetOrElseUpdateOnTrieMap::atomicGetOrElseUpdate method in favor of TrieMap::getOrElseUpdate

* fix rebase bugs

---------

Co-authored-by: Hugh Simpson <hsimpson@rzsoftware.com>
  • Loading branch information
dpsoft and hughsimpson committed Nov 7, 2023
1 parent 147988b commit dad8773
Show file tree
Hide file tree
Showing 10 changed files with 18 additions and 22 deletions.
2 changes: 1 addition & 1 deletion core/kamon-core/src/main/scala/kamon/Utilities.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ trait Utilities { self: Configuration =>
* empty.
*/
def filter(configKey: String): Filter =
_filters.atomicGetOrElseUpdate(configKey, Filter.from(configKey))
_filters.getOrElseUpdate(configKey, Filter.from(configKey))

/**
* Kamon's Clock implementation.
Expand Down
10 changes: 5 additions & 5 deletions core/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class MetricRegistry(config: Config, clock: Clock) {
Metric.Counter = {

val metric = validateInstrumentType[Metric.Counter] {
_metrics.atomicGetOrElseUpdate(name, _factory.counter(name, description, unit, autoUpdateInterval))
_metrics.getOrElseUpdate(name, _factory.counter(name, description, unit, autoUpdateInterval))
} (name, Instrument.Type.Counter)

checkDescription(metric.name, metric.description, description)
Expand All @@ -66,7 +66,7 @@ class MetricRegistry(config: Config, clock: Clock) {
Metric.Gauge = {

val metric = validateInstrumentType[Metric.Gauge] {
_metrics.atomicGetOrElseUpdate(name, _factory.gauge(name, description, unit, autoUpdateInterval))
_metrics.getOrElseUpdate(name, _factory.gauge(name, description, unit, autoUpdateInterval))
} (name, Instrument.Type.Gauge)

checkDescription(metric.name, metric.description, description)
Expand All @@ -82,7 +82,7 @@ class MetricRegistry(config: Config, clock: Clock) {
autoUpdateInterval: Option[Duration]): Metric.Histogram = {

val metric = validateInstrumentType[Metric.Histogram] {
_metrics.atomicGetOrElseUpdate(name, _factory.histogram(name, description, unit, dynamicRange, autoUpdateInterval))
_metrics.getOrElseUpdate(name, _factory.histogram(name, description, unit, dynamicRange, autoUpdateInterval))
} (name, Instrument.Type.Histogram)

checkDescription(metric.name, metric.description, description)
Expand All @@ -98,7 +98,7 @@ class MetricRegistry(config: Config, clock: Clock) {
def timer(name: String, description: Option[String], dynamicRange: Option[DynamicRange], autoUpdateInterval: Option[Duration]): Metric.Timer = {

val metric = validateInstrumentType[Metric.Timer] {
_metrics.atomicGetOrElseUpdate(name, _factory.timer(name, description, Some(MeasurementUnit.time.nanoseconds),
_metrics.getOrElseUpdate(name, _factory.timer(name, description, Some(MeasurementUnit.time.nanoseconds),
dynamicRange, autoUpdateInterval))
} (name, Instrument.Type.Timer)

Expand All @@ -115,7 +115,7 @@ class MetricRegistry(config: Config, clock: Clock) {
autoUpdateInterval: Option[Duration]): Metric.RangeSampler = {

val metric = validateInstrumentType[Metric.RangeSampler] {
_metrics.atomicGetOrElseUpdate(name, _factory.rangeSampler(name, description, unit, dynamicRange, autoUpdateInterval))
_metrics.getOrElseUpdate(name, _factory.rangeSampler(name, description, unit, dynamicRange, autoUpdateInterval))
} (name, Instrument.Type.RangeSampler)

checkDescription(metric.name, metric.description, description)
Expand Down
6 changes: 1 addition & 5 deletions core/kamon-core/src/main/scala/kamon/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,10 @@ package object kamon {


/**
* Workaround to the non thread-safe [scala.collection.concurrent.TrieMap#getOrElseUpdate()] method. More details on
* why this is necessary can be found at [[https://issues.scala-lang.org/browse/SI-7943]].
* Atomic variant of [scala.collection.concurrent.TrieMap#getOrElseUpdate()] method with cleanup and init functions.
*/
implicit class AtomicGetOrElseUpdateOnTrieMap[K, V](val trieMap: TrieMap[K, V]) extends AnyVal {

def atomicGetOrElseUpdate(key: K, op: => V): V =
atomicGetOrElseUpdate(key, op, { _: V => () }, { _: V => () })

def atomicGetOrElseUpdate(key: K, op: => V, cleanup: V => Unit, init: V => Unit): V =
trieMap.get(key) match {
case Some(v) => v
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ class AdaptiveSampler extends Sampler {

override def decide(operation: Sampler.Operation): SamplingDecision = {
val operationName = operation.operationName()
val operationSampler = _samplers.get(operationName).getOrElse {
val operationSampler = _samplers.getOrElse(operationName, {
// It might happen that the first time we see an operation under high concurrent throughput we will reach this
// block more than once, but worse case effect is that we will rebalance the operation samplers more than once.
val sampler = _samplers.atomicGetOrElseUpdate(operationName, buildOperationSampler(operationName))
val sampler = _samplers.getOrElseUpdate(operationName, buildOperationSampler(operationName))
rebalance()
sampler
}
})

val decision = operationSampler.decide()
if(decision == SamplingDecision.Sample)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ object AkkaClusterShardingMetrics {
private val _shardTelemetry = ShardingInstruments.shardTelemetry(system, typeName, shardHostedEntities, shardProcessedMessages)

def hostedEntitiesPerShardCounter(shardID: String): AtomicLong =
_shardTelemetry.entitiesPerShard.atomicGetOrElseUpdate(shardID, new AtomicLong())
_shardTelemetry.entitiesPerShard.getOrElseUpdate(shardID, new AtomicLong())

def processedMessagesPerShardCounter(shardID: String): AtomicLong =
_shardTelemetry.messagesPerShard.atomicGetOrElseUpdate(shardID, new AtomicLong())
_shardTelemetry.messagesPerShard.getOrElseUpdate(shardID, new AtomicLong())

// We should only remove when the ShardRegion actor is terminated.
override def remove(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ object AkkaMetrics {
)

def forSystem(name: String): ActorSystemInstruments =
_systemInstrumentsCache.atomicGetOrElseUpdate(name, new ActorSystemInstruments(TagSet.of("system", name)))
_systemInstrumentsCache.getOrElseUpdate(name, new ActorSystemInstruments(TagSet.of("system", name)))

class ActorSystemInstruments(tags: TagSet) extends InstrumentGroup(tags) {
val deadLetters = register(SystemDeadLetters)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,5 @@ object AkkaRemoteMetrics {
}

def serializationInstruments(system: String): SerializationInstruments =
_serializationInstrumentsCache.atomicGetOrElseUpdate(system, new SerializationInstruments(system))
_serializationInstrumentsCache.getOrElseUpdate(system, new SerializationInstruments(system))
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ object PekkoClusterShardingMetrics {
private val _shardTelemetry = ShardingInstruments.shardTelemetry(system, typeName, shardHostedEntities, shardProcessedMessages)

def hostedEntitiesPerShardCounter(shardID: String): AtomicLong =
_shardTelemetry.entitiesPerShard.atomicGetOrElseUpdate(shardID, new AtomicLong())
_shardTelemetry.entitiesPerShard.getOrElseUpdate(shardID, new AtomicLong())

def processedMessagesPerShardCounter(shardID: String): AtomicLong =
_shardTelemetry.messagesPerShard.atomicGetOrElseUpdate(shardID, new AtomicLong())
_shardTelemetry.messagesPerShard.getOrElseUpdate(shardID, new AtomicLong())

// We should only remove when the ShardRegion actor is terminated.
override def remove(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ object PekkoMetrics {
)

def forSystem(name: String): ActorSystemInstruments =
_systemInstrumentsCache.atomicGetOrElseUpdate(name, new ActorSystemInstruments(TagSet.of("system", name)))
_systemInstrumentsCache.getOrElseUpdate(name, new ActorSystemInstruments(TagSet.of("system", name)))

class ActorSystemInstruments(tags: TagSet) extends InstrumentGroup(tags) {
val deadLetters = register(SystemDeadLetters)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,5 @@ object PekkoRemoteMetrics {
}

def serializationInstruments(system: String): SerializationInstruments =
_serializationInstrumentsCache.atomicGetOrElseUpdate(system, new SerializationInstruments(system))
_serializationInstrumentsCache.getOrElseUpdate(system, new SerializationInstruments(system))
}

0 comments on commit dad8773

Please sign in to comment.