Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lwcevents: cleanup groups with no data #1669

Merged
merged 1 commit into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ private[events] trait DatapointConverter {

/** Flush the data for a given timestamp. */
def flush(timestamp: Long): Unit

/** Returns true if the converter has no recent data. */
def hasNoData: Boolean
}

private[events] object DatapointConverter {
Expand Down Expand Up @@ -115,6 +118,22 @@ private[events] object DatapointConverter {
}
}

private[events] def addNaN(value: AtomicDouble, amount: Double): Unit = {
if (amount.isNaN)
return

var set = false
while (!set) {
val v = value.get()
if (v.isNaN) {
set = value.compareAndSet(v, amount)
} else {
value.addAndGet(amount)
set = true
}
}
}

case class Params(
id: String,
tags: Map[String, String],
Expand All @@ -127,15 +146,15 @@ private[events] object DatapointConverter {
/** Compute sum for a counter as a rate per second. */
case class Sum(params: Params) extends DatapointConverter {

private val buffer = new StepDouble(0.0, params.clock, params.step)
private val buffer = new StepDouble(Double.NaN, params.clock, params.step)

override def update(event: LwcEvent): Unit = {
update(params.valueMapper(event))
}

override def update(value: Double): Unit = {
if (value.isFinite && value >= 0.0) {
buffer.getCurrent.addAndGet(value)
addNaN(buffer.getCurrent, value)
}
}

Expand All @@ -147,19 +166,24 @@ private[events] object DatapointConverter {
params.consumer(params.id, event)
}
}

override def hasNoData: Boolean = {
val now = params.clock.wallTime()
buffer.getCurrent(now).get().isNaN && buffer.poll(now).isNaN
}
}

/** Compute count of contributing events. */
case class Count(params: Params) extends DatapointConverter {

private val buffer = new StepDouble(0.0, params.clock, params.step)
private val buffer = new StepDouble(Double.NaN, params.clock, params.step)

override def update(event: LwcEvent): Unit = {
buffer.getCurrent.addAndGet(1.0)
update(1.0)
}

override def update(value: Double): Unit = {
buffer.getCurrent.addAndGet(1.0)
addNaN(buffer.getCurrent, 1.0)
}

override def flush(timestamp: Long): Unit = {
Expand All @@ -170,6 +194,11 @@ private[events] object DatapointConverter {
params.consumer(params.id, event)
}
}

override def hasNoData: Boolean = {
val now = params.clock.wallTime()
buffer.getCurrent(now).get().isNaN && buffer.poll(now).isNaN
}
}

/** Compute max value from contributing events. */
Expand All @@ -195,6 +224,11 @@ private[events] object DatapointConverter {
params.consumer(params.id, event)
}
}

override def hasNoData: Boolean = {
val now = params.clock.wallTime()
buffer.getCurrent(now).get().isNaN && buffer.poll(now).isNaN
}
}

/** Compute min value from contributing events. */
Expand Down Expand Up @@ -233,6 +267,11 @@ private[events] object DatapointConverter {
params.consumer(params.id, event)
}
}

override def hasNoData: Boolean = {
val now = params.clock.wallTime()
buffer.getCurrent(now).get().isNaN && buffer.poll(now).isNaN
}
}

/** Compute set of data points, one for each distinct group. */
Expand Down Expand Up @@ -312,7 +351,15 @@ private[events] object DatapointConverter {
}

override def flush(timestamp: Long): Unit = {
groups.values().forEach(_.flush(timestamp))
val it = groups.values().iterator()
while (it.hasNext) {
val converter = it.next()
converter.flush(timestamp)
if (converter.hasNoData)
it.remove()
}
}

override def hasNoData: Boolean = groups.isEmpty
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class LwcEventClientSuite extends FunSuite {
clock.setWallTime(step)
client.process(LwcEvent.HeartbeatLwcEvent(step))
val vs = output.result()
assertEquals(vs.size, 2)
assertEquals(vs.size, 1)
}

test("analytics, basic aggregate extract value") {
Expand Down Expand Up @@ -139,7 +139,7 @@ class LwcEventClientSuite extends FunSuite {
client.process(sampleLwcEvent)
clock.setWallTime(step)
client.process(LwcEvent.HeartbeatLwcEvent(step))
assertEquals(output.result().size, 2)
assertEquals(output.result().size, 1)

// Sync expressions, same set
(2 until 10).foreach { i =>
Expand All @@ -148,7 +148,7 @@ class LwcEventClientSuite extends FunSuite {
client.process(sampleLwcEvent)
clock.setWallTime(step * i)
client.process(LwcEvent.HeartbeatLwcEvent(step * i))
assertEquals(output.result().size, 2)
assertEquals(output.result().size, 1)
}

// Sync expressions, subset
Expand Down
Loading