Skip to content

Commit

Permalink
The first step to allow non-lifecycle build stream to specify notific…
Browse files Browse the repository at this point in the history
…ation keywords.

Change-Id: I8421e9d2d8a5e6720d7b8d6de4417bee71c0fa68
PiperOrigin-RevId: 162966141
  • Loading branch information
Kai Xu authored and buchgr committed Jul 24, 2017
1 parent 849213b commit 9e62187
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 29 deletions.
Expand Up @@ -28,6 +28,7 @@
import com.google.devtools.build.v1.BuildStatus;
import com.google.devtools.build.v1.BuildStatus.Result;
import com.google.devtools.build.v1.OrderedBuildEvent;
import com.google.devtools.build.v1.PublishBuildToolEventStreamRequest;
import com.google.devtools.build.v1.PublishLifecycleEventRequest;
import com.google.devtools.build.v1.StreamId;
import com.google.devtools.build.v1.StreamId.BuildComponent;
Expand Down Expand Up @@ -91,35 +92,41 @@ public PublishLifecycleEventRequest invocationFinished(Result result) {
.build();
}

/** Utility method used to create a OrderedBuildEvent that delimits the end of the stream. */
public OrderedBuildEvent streamFinished() {
/**
* Utility method used to create a PublishBuildToolEventStreamRequest that delimits the end of the
* stream.
*/
public PublishBuildToolEventStreamRequest streamFinished() {
return streamFinished(streamSequenceNumber.getAndIncrement());
}

/** Utility method used to create a OrderedBuildEvent from an packed bazel event */
public OrderedBuildEvent bazelEvent(Any packedEvent) {
/**
* Utility method used to create a PublishBuildToolEventStreamRequest from an packed bazel event
*/
public PublishBuildToolEventStreamRequest bazelEvent(Any packedEvent) {
return bazelEvent(streamSequenceNumber.getAndIncrement(), packedEvent);
}

@VisibleForTesting
public OrderedBuildEvent bazelEvent(int sequenceNumber, Any packedEvent) {
return orderedBuildEvent(
public PublishBuildToolEventStreamRequest bazelEvent(int sequenceNumber, Any packedEvent) {
return publishBuildToolEventStreamRequest(
sequenceNumber,
com.google.devtools.build.v1.BuildEvent.newBuilder().setBazelEvent(packedEvent));
}

@VisibleForTesting
public OrderedBuildEvent streamFinished(int sequenceNumber) {
return orderedBuildEvent(
public PublishBuildToolEventStreamRequest streamFinished(int sequenceNumber) {
return publishBuildToolEventStreamRequest(
sequenceNumber,
BuildEvent.newBuilder()
.setComponentStreamFinished(
BuildComponentStreamFinished.newBuilder().setType(FINISHED)));
}

@VisibleForTesting
public OrderedBuildEvent orderedBuildEvent(int sequenceNumber, BuildEvent.Builder besEvent) {
return OrderedBuildEvent.newBuilder()
public PublishBuildToolEventStreamRequest publishBuildToolEventStreamRequest(
int sequenceNumber, BuildEvent.Builder besEvent) {
return PublishBuildToolEventStreamRequest.newBuilder()
.setSequenceNumber(sequenceNumber)
.setEvent(besEvent.setEventTime(Timestamps.fromMillis(clock.currentTimeMillis())))
.setStreamId(streamId(besEvent.getEventCase()))
Expand Down
Expand Up @@ -49,7 +49,7 @@
import com.google.devtools.build.lib.util.Clock;
import com.google.devtools.build.lib.util.ExitCode;
import com.google.devtools.build.v1.BuildStatus.Result;
import com.google.devtools.build.v1.OrderedBuildEvent;
import com.google.devtools.build.v1.PublishBuildToolEventStreamRequest;
import com.google.devtools.build.v1.PublishBuildToolEventStreamResponse;
import com.google.devtools.build.v1.PublishLifecycleEventRequest;
import com.google.protobuf.Any;
Expand Down Expand Up @@ -93,9 +93,9 @@ public class BuildEventServiceTransport implements BuildEventTransport {

private final PathConverter pathConverter;
/** Contains all pendingAck events that might be retried in case of failures. */
private ConcurrentLinkedDeque<OrderedBuildEvent> pendingAck;
private ConcurrentLinkedDeque<PublishBuildToolEventStreamRequest> pendingAck;
/** Contains all events should be sent ordered by sequence number. */
private final BlockingDeque<OrderedBuildEvent> pendingSend;
private final BlockingDeque<PublishBuildToolEventStreamRequest> pendingSend;
/** Holds the result status of the BuildEventStreamProtos BuildFinished event. */
private Result invocationResult;
/** Used to block until all events have been uploaded. */
Expand Down Expand Up @@ -304,7 +304,8 @@ private void maybeReportUploadError() {
}
}

private synchronized void sendOrderedBuildEvent(OrderedBuildEvent serialisedEvent) {
private synchronized void sendOrderedBuildEvent(
PublishBuildToolEventStreamRequest serialisedEvent) {
if (uploadComplete != null && uploadComplete.isDone()) {
maybeReportUploadError();
return;
Expand Down Expand Up @@ -399,7 +400,7 @@ private Status publishLifecycleEvent(PublishLifecycleEventRequest request) throw
*/
private Status publishEventStream() throws Exception {
// Reschedule unacked messages if required, keeping its original order.
OrderedBuildEvent unacked;
PublishBuildToolEventStreamRequest unacked;
while ((unacked = pendingAck.pollLast()) != null) {
pendingSend.addFirst(unacked);
}
Expand All @@ -411,11 +412,11 @@ private Status publishEventStream() throws Exception {

/** Method responsible for a single Streaming RPC. */
private static ListenableFuture<Status> publishEventStream(
final ConcurrentLinkedDeque<OrderedBuildEvent> pendingAck,
final BlockingDeque<OrderedBuildEvent> pendingSend,
final ConcurrentLinkedDeque<PublishBuildToolEventStreamRequest> pendingAck,
final BlockingDeque<PublishBuildToolEventStreamRequest> pendingSend,
final BuildEventServiceClient besClient)
throws Exception {
OrderedBuildEvent event;
PublishBuildToolEventStreamRequest event;
ListenableFuture<Status> streamDone = besClient.openStream(ackCallback(pendingAck, besClient));
try {
do {
Expand All @@ -432,12 +433,13 @@ private static ListenableFuture<Status> publishEventStream(
return streamDone;
}

private static boolean isLastEvent(OrderedBuildEvent event) {
private static boolean isLastEvent(PublishBuildToolEventStreamRequest event) {
return event != null && event.getEvent().getEventCase() == COMPONENT_STREAM_FINISHED;
}

private static Function<PublishBuildToolEventStreamResponse, Void> ackCallback(
final Deque<OrderedBuildEvent> pendingAck, final BuildEventServiceClient besClient) {
final Deque<PublishBuildToolEventStreamRequest> pendingAck,
final BuildEventServiceClient besClient) {
return ack -> {
long pendingSeq = pendingAck.isEmpty() ? -1 : pendingAck.peekFirst().getSequenceNumber();
long ackSeq = ack.getSequenceNumber();
Expand Down
Expand Up @@ -16,7 +16,7 @@

import com.google.common.base.Function;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.v1.OrderedBuildEvent;
import com.google.devtools.build.v1.PublishBuildToolEventStreamRequest;
import com.google.devtools.build.v1.PublishBuildToolEventStreamResponse;
import com.google.devtools.build.v1.PublishLifecycleEventRequest;
import io.grpc.Status;
Expand All @@ -38,7 +38,7 @@ public interface BuildEventServiceClient {
* guarantee that all callback calls have been received.
*
* @param ackCallback Consumer called every time a ack message is received.
* @return Listenable future that blocks until the the onDone callback is called.
* @return Listenable future that blocks until the onDone callback is called.
* @throws Exception
*/
ListenableFuture<Status> openStream(
Expand All @@ -50,7 +50,7 @@ ListenableFuture<Status> openStream(
* @param buildEvent Event that should be sent.
* @throws Exception
*/
void sendOverStream(OrderedBuildEvent buildEvent) throws Exception;
void sendOverStream(PublishBuildToolEventStreamRequest buildEvent) throws Exception;

/**
* Closes the currently opened opened stream. This method does not block. Callers should block on
Expand Down
Expand Up @@ -27,10 +27,10 @@
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.v1.OrderedBuildEvent;
import com.google.devtools.build.v1.PublishBuildEventGrpc;
import com.google.devtools.build.v1.PublishBuildEventGrpc.PublishBuildEventBlockingStub;
import com.google.devtools.build.v1.PublishBuildEventGrpc.PublishBuildEventStub;
import com.google.devtools.build.v1.PublishBuildToolEventStreamRequest;
import com.google.devtools.build.v1.PublishBuildToolEventStreamResponse;
import com.google.devtools.build.v1.PublishLifecycleEventRequest;
import io.grpc.CallCredentials;
Expand Down Expand Up @@ -71,7 +71,7 @@ public class BuildEventServiceGrpcClient implements BuildEventServiceClient {
private final PublishBuildEventStub besAsync;
private final PublishBuildEventBlockingStub besBlocking;
private final ManagedChannel channel;
private final AtomicReference<StreamObserver<OrderedBuildEvent>> streamReference;
private final AtomicReference<StreamObserver<PublishBuildToolEventStreamRequest>> streamReference;

public BuildEventServiceGrpcClient(String serverSpec, boolean tlsEnabled,
@Nullable String tlsCertificateFile, @Nullable String tlsAuthorityOverride,
Expand Down Expand Up @@ -116,7 +116,7 @@ public ListenableFuture<Status> openStream(
return streamFinished;
}

private StreamObserver<OrderedBuildEvent> createStream(
private StreamObserver<PublishBuildToolEventStreamRequest> createStream(
final Function<PublishBuildToolEventStreamResponse, Void> ack,
final SettableFuture<Status> streamFinished) {
return besAsync.publishBuildToolEventStream(
Expand All @@ -141,22 +141,22 @@ public void onCompleted() {
}

@Override
public void sendOverStream(OrderedBuildEvent buildEvent) throws Exception {
public void sendOverStream(PublishBuildToolEventStreamRequest buildEvent) throws Exception {
checkNotNull(streamReference.get(), "Attempting to send over a closed or unopened stream")
.onNext(buildEvent);
}

@Override
public void closeStream() {
StreamObserver<OrderedBuildEvent> stream;
StreamObserver<PublishBuildToolEventStreamRequest> stream;
if ((stream = streamReference.getAndSet(null)) != null) {
stream.onCompleted();
}
}

@Override
public void abortStream(Status status) {
StreamObserver<OrderedBuildEvent> stream;
StreamObserver<PublishBuildToolEventStreamRequest> stream;
if ((stream = streamReference.getAndSet(null)) != null) {
stream.onError(status.asException());
}
Expand Down

0 comments on commit 9e62187

Please sign in to comment.