Skip to content

Commit

Permalink
OpenTelemetry: Improve support for long-running observable measurements
Browse files Browse the repository at this point in the history
* improve stability of
  AsyncObservableLongMeasurementTest.testLimitParallelExecution
* support OpenTelemetry attributes
* separate generic code from OpenTelemetry specific one (do not rely on
  Java extension mechanism)
* support cluster node specific execution of measurements
  (e.g. by leader election)

see 59d779e, e09d340 and d539000
  • Loading branch information
brunokoeferli committed Jun 9, 2024
1 parent 2a55103 commit 28a15a2
Show file tree
Hide file tree
Showing 4 changed files with 206 additions and 150 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import org.eclipse.scout.rt.platform.BEANS;
Expand All @@ -31,16 +32,14 @@
import org.mockito.Mockito;
import org.slf4j.helpers.MessageFormatter;

import io.opentelemetry.api.metrics.ObservableLongMeasurement;

public class AsyncObservableLongMeasurementTest {
public class PeriodicAsyncMeasurementTest {

private IExecutionSemaphore m_asyncJobExecutionSemaphore;
private int m_origExecutionSemaphorePermits;

@Before
public void before() {
m_asyncJobExecutionSemaphore = AbstractAsyncObservableMeasurement.ASYNC_JOB_EXECUTION_SEMAPHORE.get();
m_asyncJobExecutionSemaphore = PeriodicAsyncMeasurement.ASYNC_JOB_EXECUTION_SEMAPHORE.get();
m_origExecutionSemaphorePermits = m_asyncJobExecutionSemaphore.getPermits();
m_asyncJobExecutionSemaphore.withPermits(1);
}
Expand All @@ -56,44 +55,37 @@ public void testSimple() throws Exception {

final long asyncObservationInterval = 100;
Callable<Long> callable = mockCallable();
Mockito.when(callable.call()).thenReturn(10L, 15L);
AtomicLong value = new AtomicLong(10);
Mockito.when(callable.call()).then(invocation -> {
Thread.sleep(3); // simulate "db access"
return value.getAndAdd(5);
});
AsyncObservableLongMeasurement asyncMeasurement = AsyncObservableLongMeasurement.create("testSimple", callable, () -> RunContexts.empty(), asyncObservationInterval);
ObservableLongMeasurement measurement = Mockito.mock(ObservableLongMeasurement.class);
try (PeriodicAsyncMeasurement<Long> asyncMeasurement = new PeriodicAsyncMeasurement<>("testSimple", callable, () -> RunContexts.empty(), () -> Boolean.TRUE, asyncObservationInterval)) {

// first observation --> no measurement, but trigger async job
asyncMeasurement.accept(measurement);
Mockito.verifyNoInteractions(measurement);
assertAsyncJobTrigger(callable, 1);
assertEquals(15, value.get());
// first observation --> no measurement, but trigger async job
assertNull(asyncMeasurement.getAndNext());
assertAsyncJobTrigger(callable, 1);
assertEquals(15, value.get());

// second observation --> first measurement, but no trigger async job
asyncMeasurement.accept(measurement);
Mockito.verify(measurement).record(10);
assertAsyncJobTrigger(callable, 1);
// second observation --> first measurement, but no trigger async job
assertEquals(Long.valueOf(10L), asyncMeasurement.getAndNext());
assertAsyncJobTrigger(callable, 1);

// third observation --> second measurement, still no trigger async job
asyncMeasurement.accept(measurement);
Mockito.verify(measurement, Mockito.times(2)).record(10);
assertAsyncJobTrigger(callable, 1);
// third observation --> second measurement, still no trigger async job
assertEquals(Long.valueOf(10L), asyncMeasurement.getAndNext());
assertAsyncJobTrigger(callable, 1);

awaitNextAsyncObservationInterval(asyncObservationInterval);
awaitNextAsyncObservationInterval(asyncObservationInterval);

// forth observation --> third measurement, trigger async job
asyncMeasurement.accept(measurement);
Mockito.verify(measurement, Mockito.times(3)).record(10);
assertAsyncJobTrigger(callable, 2);
assertEquals(20, value.get());

// fifth observation --> forth measurement, no trigger async job
asyncMeasurement.accept(measurement);
Mockito.verify(measurement, Mockito.times(3)).record(10);
Mockito.verify(measurement).record(15);
assertAsyncJobTrigger(callable, 2);
// forth observation --> third measurement, trigger async job
assertEquals(Long.valueOf(10L), asyncMeasurement.getAndNext());
assertAsyncJobTrigger(callable, 2);
assertEquals(20, value.get());

// fifth observation --> forth measurement, no trigger async job
assertEquals(Long.valueOf(15L), asyncMeasurement.getAndNext());
assertAsyncJobTrigger(callable, 2);
}
}

@Test
Expand All @@ -107,45 +99,40 @@ public void testLongRunningAsyncJob() throws Exception {
Thread.sleep(TimeUnit.SECONDS.toMillis(3));
return 2L;
});
AsyncObservableLongMeasurement asyncMeasurement = AsyncObservableLongMeasurement.create(asyncJobName, callable, () -> RunContexts.empty(), asyncObservationInterval);
ObservableLongMeasurement measurement = Mockito.mock(ObservableLongMeasurement.class);
try (PeriodicAsyncMeasurement<Long> asyncMeasurement = new PeriodicAsyncMeasurement<>(asyncJobName, callable, () -> RunContexts.empty(), () -> Boolean.TRUE, asyncObservationInterval)) {

// first observation --> no measurement, but trigger async job
asyncMeasurement.accept(measurement);
Mockito.verifyNoInteractions(measurement);
assertAsyncJobTrigger(callable, 1);
// first observation --> no measurement, but trigger async job
assertNull(asyncMeasurement.getAndNext());
assertAsyncJobTrigger(callable, 1);

// second observation --> no measurement, but no trigger async job
asyncMeasurement.accept(measurement);
Mockito.verifyNoInteractions(measurement);
assertAsyncJobTrigger(callable, 1);
// second observation --> no measurement, but no trigger async job
assertNull(asyncMeasurement.getAndNext());
assertAsyncJobTrigger(callable, 1);

// third observation --> no measurement, still no trigger async job
asyncMeasurement.accept(measurement);
Mockito.verifyNoInteractions(measurement);
assertAsyncJobTrigger(callable, 1);
// third observation --> no measurement, still no trigger async job
assertNull(asyncMeasurement.getAndNext());
assertAsyncJobTrigger(callable, 1);

awaitNextAsyncObservationInterval(asyncObservationInterval);
awaitNextAsyncObservationInterval(asyncObservationInterval);

Set<IFuture<?>> runningAsyncJobs = getRunningAsyncJobs();
assertEquals(1, runningAsyncJobs.size());
IFuture<?> firstAsyncJob = runningAsyncJobs.iterator().next();

// forth observation --> no measurement, trigger async job
assertNull(asyncMeasurement.getAndNext());
assertAsyncJobTrigger(callable, 2);
runningAsyncJobs = getRunningAsyncJobs();
assertEquals(1, runningAsyncJobs.size());
assertNotSame(firstAsyncJob, runningAsyncJobs.iterator().next());

// fifth observation --> no measurement, no trigger async job
assertNull(asyncMeasurement.getAndNext());
assertAsyncJobTrigger(callable, 2);

Set<IFuture<?>> runningAsyncJobs = getRunningAsyncJobs();
assertEquals(1, runningAsyncJobs.size());
IFuture<?> firstAsyncJob = runningAsyncJobs.iterator().next();

// forth observation --> no measurement, trigger async job
asyncMeasurement.accept(measurement);
Mockito.verifyNoInteractions(measurement);
assertAsyncJobTrigger(callable, 2);
runningAsyncJobs = getRunningAsyncJobs();
assertEquals(1, runningAsyncJobs.size());
assertNotSame(firstAsyncJob, runningAsyncJobs.iterator().next());

// fifth observation --> no measurement, no trigger async job
asyncMeasurement.accept(measurement);
Mockito.verifyNoInteractions(measurement);
assertAsyncJobTrigger(callable, 2);

runningAsyncJobs.iterator().next().cancel(true);
awaitAsyncJobFinished(asyncJobName);
runningAsyncJobs.iterator().next().cancel(true);
awaitAsyncJobFinished(asyncJobName);
}
}

@Test
Expand All @@ -155,30 +142,26 @@ public void testFailingAsyncJob() throws Exception {
final long asyncObservationInterval = 100;
Callable<Long> callable = mockCallable();
Mockito.when(callable.call()).thenThrow(RuntimeException.class);
AsyncObservableLongMeasurement asyncMeasurement = AsyncObservableLongMeasurement.create("testFailingAsyncJob", callable, () -> RunContexts.empty(), asyncObservationInterval);
ObservableLongMeasurement measurement = Mockito.mock(ObservableLongMeasurement.class);
try (PeriodicAsyncMeasurement<Long> asyncMeasurement = new PeriodicAsyncMeasurement<>("testFailingAsyncJob", callable, () -> RunContexts.empty(), () -> Boolean.TRUE, asyncObservationInterval)) {

// first observation --> no measurement, but trigger async job
asyncMeasurement.accept(measurement);
Mockito.verifyNoInteractions(measurement);
assertAsyncJobTrigger(callable, 1);
// first observation --> no measurement, but trigger async job
assertNull(asyncMeasurement.getAndNext());
assertAsyncJobTrigger(callable, 1);

// second observation --> no measurement, but no trigger async job
asyncMeasurement.accept(measurement);
Mockito.verifyNoInteractions(measurement);
assertAsyncJobTrigger(callable, 1);
// second observation --> no measurement, but no trigger async job
assertNull(asyncMeasurement.getAndNext());
assertAsyncJobTrigger(callable, 1);

// third observation --> no measurement, still no trigger async job
asyncMeasurement.accept(measurement);
Mockito.verifyNoInteractions(measurement);
assertAsyncJobTrigger(callable, 1);
// third observation --> no measurement, still no trigger async job
assertNull(asyncMeasurement.getAndNext());
assertAsyncJobTrigger(callable, 1);

awaitNextAsyncObservationInterval(asyncObservationInterval);
awaitNextAsyncObservationInterval(asyncObservationInterval);

// forth observation --> no measurement, trigger async job
asyncMeasurement.accept(measurement);
Mockito.verifyNoInteractions(measurement);
assertAsyncJobTrigger(callable, 2);
// forth observation --> no measurement, trigger async job
assertNull(asyncMeasurement.getAndNext());
assertAsyncJobTrigger(callable, 2);
}
}

@Test
Expand All @@ -191,10 +174,8 @@ public void testLimitParallelExecution() throws Exception {
BlockingAsyncObservableLongMeasurementMock blockingMock2 = new BlockingAsyncObservableLongMeasurementMock("testLimitParallelExecution2", 2, asyncObservationInterval);

// first observation --> no measurement, but trigger async jobs
blockingMock1.runObservation();
blockingMock2.runObservation();
Mockito.verifyNoInteractions(blockingMock1.m_measurement);
Mockito.verifyNoInteractions(blockingMock2.m_measurement);
assertNull(blockingMock1.runObservation());
assertNull(blockingMock2.runObservation());
assertEquals(2, getRunningAsyncJobs().size());
assertEquals(2, m_asyncJobExecutionSemaphore.getCompetitorCount());
assertAsyncJobTrigger(blockingMock1.m_callable, 1);
Expand All @@ -207,10 +188,8 @@ public void testLimitParallelExecution() throws Exception {
assertAsyncJobTrigger(blockingMock2.m_callable, 1);

// second observation --> first measurement1, but no trigger async jobs
blockingMock1.runObservation();
blockingMock2.runObservation();
Mockito.verify(blockingMock1.m_measurement).record(1);
Mockito.verifyNoInteractions(blockingMock2.m_measurement);
assertEquals(Long.valueOf(1L), blockingMock1.runObservation());
assertNull(blockingMock2.runObservation());
assertEquals(1, getRunningAsyncJobs().size());
assertEquals(1, m_asyncJobExecutionSemaphore.getCompetitorCount());
assertAsyncJobTrigger(blockingMock1.m_callable, 1);
Expand All @@ -225,10 +204,8 @@ public void testLimitParallelExecution() throws Exception {
awaitNextAsyncObservationInterval(asyncObservationInterval);

// third observation --> second measurement1, first measurement2, trigger async jobs
blockingMock1.runObservation();
blockingMock2.runObservation();
Mockito.verify(blockingMock1.m_measurement, Mockito.times(2)).record(1);
Mockito.verify(blockingMock2.m_measurement).record(2);
assertEquals(Long.valueOf(1L), blockingMock1.runObservation());
assertEquals(Long.valueOf(2L), blockingMock2.runObservation());
awaitAsyncJobFinished(blockingMock1.m_name);
awaitAsyncJobFinished(blockingMock2.m_name);
assertEquals(0, getRunningAsyncJobs().size());
Expand All @@ -237,19 +214,59 @@ public void testLimitParallelExecution() throws Exception {
assertAsyncJobTrigger(blockingMock2.m_callable, 2);
}

@Test
public void testActiveOnThisNode() throws Exception {
assertEquals(0, m_asyncJobExecutionSemaphore.getCompetitorCount());

final long asyncObservationInterval = 100;
Callable<Long> callable = mockCallable();
Mockito.when(callable.call()).then(invocation -> {
Thread.sleep(3); // simulate "db access"
return 111L;
});
AtomicBoolean activeOnThisNode = new AtomicBoolean(false);
try (PeriodicAsyncMeasurement<Long> asyncMeasurement = new PeriodicAsyncMeasurement<>("testActiveOnThisNode", callable, () -> RunContexts.empty(), () -> activeOnThisNode.get(), asyncObservationInterval)) {

// first observation --> no measurement and no trigger async job
assertNull(asyncMeasurement.getAndNext());
assertAsyncJobTrigger(callable, 0);

// second observation --> no measurement, but no trigger async job
assertNull(asyncMeasurement.getAndNext());
assertAsyncJobTrigger(callable, 0);

awaitNextAsyncObservationInterval(asyncObservationInterval);

// third observation --> no measurement and no trigger async job
assertNull(asyncMeasurement.getAndNext());
assertAsyncJobTrigger(callable, 0);

activeOnThisNode.set(Boolean.TRUE);

// forth observation --> no measurement, but trigger async job
assertNull(asyncMeasurement.getAndNext());
assertAsyncJobTrigger(callable, 1);

Thread.sleep(20);
// fifth observation --> first measurement, but no trigger async job
assertEquals(Long.valueOf(111L), asyncMeasurement.getAndNext());
assertAsyncJobTrigger(callable, 1);
}
}

private Set<IFuture<?>> getRunningAsyncJobs() {
return BEANS.get(IJobManager.class).getFutures(newAsyncJobFilter().toFilter());
}

private void awaitAsyncJobFinished(String name) {
BEANS.get(IJobManager.class).awaitFinished(newAsyncJobFilter()
.andMatchName(MessageFormatter.arrayFormat(AbstractAsyncObservableMeasurement.ASYNC_JOB_NAME_PATTERN, new String[]{name}).getMessage())
.andMatchName(MessageFormatter.arrayFormat(PeriodicAsyncMeasurement.ASYNC_JOB_NAME_PATTERN, new String[]{name}).getMessage())
.toFilter(), 1, TimeUnit.SECONDS);
}

private FutureFilterBuilder newAsyncJobFilter() {
return Jobs.newFutureFilterBuilder()
.andMatchExecutionHint(AbstractAsyncObservableMeasurement.ASYNC_JOB_EXECUTION_HINT);
.andMatchExecutionHint(PeriodicAsyncMeasurement.ASYNC_JOB_EXECUTION_HINT);
}

private void awaitNextAsyncObservationInterval(long asyncObservationIntervalMillis) throws InterruptedException {
Expand All @@ -272,8 +289,7 @@ private class BlockingAsyncObservableLongMeasurementMock {
private final String m_name;
private final CountDownLatch m_callbackEntry;
private final Callable<Long> m_callable;
private final AsyncObservableLongMeasurement m_asyncMeasurement;
private final ObservableLongMeasurement m_measurement;
private final PeriodicAsyncMeasurement<Long> m_asyncMeasurement;

public BlockingAsyncObservableLongMeasurementMock(String name, long value, long asyncObservationInterval) throws Exception {
m_name = name;
Expand All @@ -283,17 +299,17 @@ public BlockingAsyncObservableLongMeasurementMock(String name, long value, long
m_callbackEntry.await();
return value;
});
m_asyncMeasurement = AsyncObservableLongMeasurement.create(name, m_callable, () -> RunContexts.empty(), asyncObservationInterval);
m_measurement = Mockito.mock(ObservableLongMeasurement.class);
m_asyncMeasurement = new PeriodicAsyncMeasurement<>(name, m_callable, () -> RunContexts.empty(), () -> Boolean.TRUE, asyncObservationInterval);
}

public void runObservation() {
m_asyncMeasurement.accept(m_measurement);
public Long runObservation() {
return m_asyncMeasurement.getAndNext();
}

public void unblockAndAwaitFinished() {
public void unblockAndAwaitFinished() throws InterruptedException {
m_callbackEntry.countDown();
awaitAsyncJobFinished(m_name);
Thread.sleep(20);
}
}
}
Loading

0 comments on commit 28a15a2

Please sign in to comment.