Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.beam.runners.dataflow.worker.util;

import java.time.Clock;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
Expand Down Expand Up @@ -49,24 +48,6 @@ public BoundedQueueExecutor(
int maximumElementsOutstanding,
long maximumBytesOutstanding,
ThreadFactory threadFactory) {
this(
maximumPoolSize,
keepAliveTime,
unit,
maximumElementsOutstanding,
maximumBytesOutstanding,
threadFactory,
Clock.systemUTC());
}

public BoundedQueueExecutor(
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
int maximumElementsOutstanding,
long maximumBytesOutstanding,
ThreadFactory threadFactory,
Clock clock) {
executor =
new ThreadPoolExecutor(
maximumPoolSize,
Expand All @@ -80,7 +61,7 @@ protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
synchronized (this) {
if (activeCount.getAndIncrement() >= maximumPoolSize - 1) {
startTimeMaxActiveThreadsUsed = clock.millis();
startTimeMaxActiveThreadsUsed = System.currentTimeMillis();
}
}
}
Expand All @@ -90,7 +71,8 @@ protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
synchronized (this) {
if (activeCount.getAndDecrement() == maximumPoolSize) {
totalTimeMaxActiveThreadsUsed += (clock.millis() - startTimeMaxActiveThreadsUsed);
totalTimeMaxActiveThreadsUsed +=
(System.currentTimeMillis() - startTimeMaxActiveThreadsUsed);
startTimeMaxActiveThreadsUsed = 0;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.api.services.dataflow.model.CounterUpdate;
import com.google.api.services.dataflow.model.InstructionInput;
Expand All @@ -52,7 +56,6 @@
import com.google.api.services.dataflow.model.WriteInstruction;
import java.io.IOException;
import java.io.InputStream;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -167,6 +170,7 @@
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ErrorCollector;
Expand Down Expand Up @@ -2851,24 +2855,10 @@ public void testActiveWorkForShardedKeys() throws Exception {
}

@Test
@Ignore // Test is flaky on Jenkins (#27555)
public void testMaxThreadMetric() throws Exception {
int maxThreads = 2;
int threadExpiration = 60;

Clock mockClock = Mockito.mock(Clock.class);
CountDownLatch latch = new CountDownLatch(2);
doAnswer(
invocation -> {
latch.countDown();
// Return 0 until we are called once (reach max thread count).
if (latch.getCount() == 1) {
return 0L;
}
return 1000L;
})
.when(mockClock)
.millis();

// setting up actual implementation of executor instead of mocking to keep track of
// active thread count.
BoundedQueueExecutor executor =
Expand All @@ -2881,8 +2871,7 @@ public void testMaxThreadMetric() throws Exception {
new ThreadFactoryBuilder()
.setNameFormat("DataflowWorkUnits-%d")
.setDaemon(true)
.build(),
mockClock);
.build());

StreamingDataflowWorker.ComputationState computationState =
new StreamingDataflowWorker.ComputationState(
Expand All @@ -2894,17 +2883,15 @@ public void testMaxThreadMetric() throws Exception {

ShardedKey key1Shard1 = ShardedKey.create(ByteString.copyFromUtf8("key1"), 1);

// overriding definition of MockWork to add sleep, which will help us keep track of how
// long each work item takes to process and therefore let us manipulate how long the time
// at which we're at max threads is.
MockWork m2 =
new MockWork(2) {
@Override
public void run() {
try {
// Make sure we don't finish before both MockWork are executed, thus afterExecute must
// be called after
// beforeExecute.
while (latch.getCount() > 1) {
Thread.sleep(50);
}
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
Expand All @@ -2916,9 +2903,7 @@ public void run() {
@Override
public void run() {
try {
while (latch.getCount() > 1) {
Thread.sleep(50);
}
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
Expand All @@ -2928,11 +2913,13 @@ public void run() {
assertTrue(computationState.activateWork(key1Shard1, m2));
assertTrue(computationState.activateWork(key1Shard1, m3));
executor.execute(m2, m2.getWorkItem().getSerializedSize());

executor.execute(m3, m3.getWorkItem().getSerializedSize());
// Wait until the afterExecute is called.
latch.await();

assertEquals(1000L, executor.allThreadsActiveTime());
// Will get close to 1000ms that both work items are processing (sleeping, really)
// give or take a few ms.
long i = 990L;
assertTrue(executor.allThreadsActiveTime() >= i);
executor.shutdown();
}

Expand Down