Skip to content
This repository has been archived by the owner on May 14, 2022. It is now read-only.

Commit

Permalink
gateway: properly dispose internal subscriptions (#1187)
Browse files Browse the repository at this point in the history
Ensure keepAlive publishers (interval subscriptions) don't leak when
grpc connections (observeJobs) are terminated.
  • Loading branch information
fabiokung committed Nov 30, 2021
1 parent e848aef commit 366d0ec
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import com.google.common.annotations.VisibleForTesting;
import com.netflix.titus.common.runtime.TitusRuntime;
import com.netflix.titus.common.util.ExceptionExt;
import com.netflix.titus.common.util.FunctionExt;
import com.netflix.titus.grpc.protogen.JobChangeNotification;
import com.netflix.titus.grpc.protogen.JobManagementServiceGrpc.JobManagementServiceStub;
import com.netflix.titus.grpc.protogen.KeepAliveRequest;
Expand All @@ -30,6 +33,8 @@
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

Expand All @@ -38,6 +43,7 @@
* Its usage is limited now to communication between TitusGateway and TJC.
*/
public class RemoteJobManagementClientWithKeepAlive extends RemoteJobManagementClient {
private static final Logger logger = LoggerFactory.getLogger(RemoteJobManagementClientWithKeepAlive.class);

private final JobConnectorConfiguration configuration;

Expand All @@ -61,8 +67,12 @@ public RemoteJobManagementClientWithKeepAlive(String clientName,
this.titusRuntime = titusRuntime;
}

@Override
protected Flux<JobChangeNotification> connectObserveJobs(Map<String, String> filteringCriteria) {
/**
* Only used for unit testing to ensure internal subscriptions are not leaked.
* The <t>keepAliveCompleted</t> callback runs when the keep alive (interval) subscription is disposed
*/
@VisibleForTesting
Flux<JobChangeNotification> connectObserveJobs(Map<String, String> filteringCriteria, Runnable keepAliveCompleted) {
return Flux.create(sink -> {
AtomicReference<ClientCallStreamObserver> requestStreamRef = new AtomicReference<>();
StreamObserver<JobChangeNotification> grpcStreamObserver = new ClientResponseObserver<JobChangeNotification, JobChangeNotification>() {
Expand Down Expand Up @@ -94,31 +104,44 @@ public void onCompleted() {
);

// Now emit keep alive requests periodically
Disposable keepAliveSubscription = Flux.interval(Duration.ofMillis(configuration.getKeepAliveIntervalMs())).subscribe(
next -> {
try {
clientStreamObserver.onNext(ObserveJobsWithKeepAliveRequest.newBuilder()
.setKeepAliveRequest(KeepAliveRequest.newBuilder()
.setRequestId(keepAliveIdGen.getAndIncrement())
.setTimestamp(titusRuntime.getClock().wallTime())
Disposable keepAliveSubscription = Flux.interval(Duration.ofMillis(configuration.getKeepAliveIntervalMs()))
// doOnCancel is confusing: it's called when the subscription is disposed
// It should be named doOnDispose. See: https://github.com/reactor/reactor-core/issues/1240
.doOnCancel(
() -> ExceptionExt.doCatch(keepAliveCompleted)
.ifPresent(t -> logger.warn("Error running the keepAliveCompleted callback", t))
)
.subscribe(
next -> {
try {
clientStreamObserver.onNext(ObserveJobsWithKeepAliveRequest.newBuilder()
.setKeepAliveRequest(KeepAliveRequest.newBuilder()
.setRequestId(keepAliveIdGen.getAndIncrement())
.setTimestamp(titusRuntime.getClock().wallTime())
.build()
)
.build()
)
.build()
);
} catch (Exception error) {
clientStreamObserver.onError(error);
}
},
sink::error,
() -> sink.error(new IllegalArgumentException("Keep alive stream terminated. Closing the event stream"))
);

sink.onCancel(() -> {
);
} catch (Exception error) {
clientStreamObserver.onError(error);
}
},
sink::error,
() -> sink.error(new IllegalArgumentException("Keep alive stream terminated. Closing the event stream"))
);

sink.onDispose(() -> {
keepAliveSubscription.dispose();
if (requestStreamRef.get() != null) {
requestStreamRef.get().cancel("ObserveJobs stream cancelled by the client", null);
}
});
});

}

@Override
protected Flux<JobChangeNotification> connectObserveJobs(Map<String, String> filteringCriteria) {
return connectObserveJobs(filteringCriteria, FunctionExt.noop());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import com.netflix.titus.common.runtime.TitusRuntime;
import com.netflix.titus.common.runtime.TitusRuntimes;
import com.netflix.titus.common.util.Evaluators;
import com.netflix.titus.common.util.FunctionExt;
import com.netflix.titus.common.util.archaius2.Archaius2Ext;
import com.netflix.titus.grpc.protogen.Job;
import com.netflix.titus.grpc.protogen.JobChangeNotification;
Expand All @@ -47,6 +49,7 @@
import reactor.core.Disposable;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.mockito.Mockito.mock;

public class RemoteJobManagementClientWithKeepAliveTest {
Expand Down Expand Up @@ -126,30 +129,46 @@ public void testKeepAlive() throws InterruptedException {

@Test
public void testClientCancel() throws InterruptedException {
Disposable disposable = client.connectObserveJobs(Collections.emptyMap()).subscribe();
AtomicBoolean keepAliveCompleted = new AtomicBoolean();
Disposable disposable = client.connectObserveJobs(Collections.emptyMap(), () -> keepAliveCompleted.set(true)).subscribe();
// Read and discard the query message
receivedFromClient.poll(30, TimeUnit.SECONDS);
disposable.dispose();
Object value = receivedFromClient.poll(30, TimeUnit.SECONDS);
assertThat(value).isInstanceOf(StatusRuntimeException.class);
assertThat(((StatusRuntimeException) value).getStatus().getCode()).isEqualTo(Status.Code.CANCELLED);
assertThat(keepAliveCompleted).isTrue();
assertThat(clientCancelled).isTrue();
}

@Test
public void testServerError() throws InterruptedException {
Iterator<JobChangeNotification> it = newClientConnection();
AtomicBoolean keepAliveCompleted = new AtomicBoolean();
Iterator<JobChangeNotification> it = newClientConnection(() -> keepAliveCompleted.set(true));
// Read and discard the query message
receivedFromClient.poll(30, TimeUnit.SECONDS);

responseObserver.onError(new StatusRuntimeException(Status.ABORTED.augmentDescription("simulated error")));
try {
it.next();
fail("expected an exception");
} catch (StatusRuntimeException e) {
assertThat(e.getStatus().getCode()).isEqualTo(Status.Code.ABORTED);
assertThat(keepAliveCompleted).isTrue();
}
}

@Test
public void testServerCompleted() throws InterruptedException {
AtomicBoolean keepAliveCompleted = new AtomicBoolean();
Iterator<JobChangeNotification> it = newClientConnection(() -> keepAliveCompleted.set(true));
waitForClientKeepAliveRequest();

responseObserver.onCompleted();
assertThat(it.hasNext()).isFalse();
assertThat(keepAliveCompleted).isTrue();
}

private Server newServerConnection() {
try {
return InProcessServerBuilder.forName("test")
Expand All @@ -162,15 +181,19 @@ private Server newServerConnection() {
}
}

private Iterator<JobChangeNotification> newClientConnection() throws InterruptedException {
Iterator<JobChangeNotification> it = client.connectObserveJobs(Collections.emptyMap()).toIterable().iterator();
private Iterator<JobChangeNotification> newClientConnection(Runnable keepAliveCompleted) throws InterruptedException {
Iterator<JobChangeNotification> it = client.connectObserveJobs(Collections.emptyMap(), keepAliveCompleted).toIterable().iterator();

Object clientRequestEvent = receivedFromClient.poll(30, TimeUnit.SECONDS);
assertThat(clientRequestEvent).isNotNull().isInstanceOf(ObserveJobsWithKeepAliveRequest.class);
assertThat(((ObserveJobsWithKeepAliveRequest) clientRequestEvent).getKindCase()).isEqualTo(ObserveJobsWithKeepAliveRequest.KindCase.QUERY);
return it;
}

private Iterator<JobChangeNotification> newClientConnection() throws InterruptedException {
return newClientConnection(FunctionExt.noop());
}

private JobChangeNotification expectJobChangeNotification(Iterator<JobChangeNotification> it, JobChangeNotification.NotificationCase eventCase) {
JobChangeNotification jobEvent = it.next();
assertThat(jobEvent).isNotNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
public final class FunctionExt {
private static final Predicate TRUE_PREDICATE = ignored -> true;
private static final Predicate FALSE_PREDICATE = ignored -> false;
private static final Runnable NOOP = () -> {
};

public static <T> Predicate<T> alwaysTrue() {
return TRUE_PREDICATE;
Expand All @@ -34,6 +36,10 @@ public static <T> Predicate<T> alwaysFalse() {
return FALSE_PREDICATE;
}

public static Runnable noop() {
return NOOP;
}

public static <T> Optional<T> ifNotPresent(Optional<T> opt, Runnable what) {
if (!opt.isPresent()) {
what.run();
Expand Down

0 comments on commit 366d0ec

Please sign in to comment.