Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions java/com/engflow/notificationqueue/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ java_binary(
":engflowapis_java_proto",
":eventstore_java_grpc",
":notification_queue_java_grpc",
"//java/com/engflow/notificationqueue/demoserver:server_java_grpc_proto",
"//java/com/engflow/notificationqueue/demoserver:server_java_proto",
"//java/com/engflow/notificationqueue/demoserver:server_proto",
"@com_google_protobuf//:any_proto",
"@com_google_protobuf//java/core",
"@io_grpc_grpc_java//api",
Expand Down
76 changes: 70 additions & 6 deletions java/com/engflow/notificationqueue/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@
import com.engflow.notification.v1.PullNotificationRequest;
import com.engflow.notification.v1.PullNotificationResponse;
import com.engflow.notification.v1.QueueId;
import com.engflow.notificationqueue.demoserver.EngFlowRequest;
import com.engflow.notificationqueue.demoserver.EngFlowResponse;
import com.engflow.notificationqueue.demoserver.ForwardingGrpc;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.StatusRuntimeException;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
Expand All @@ -38,7 +42,9 @@ public static void main(String[] args) throws Exception {
System.err.println(e);
System.err.println(" --notification_queue_endpoint");
System.err.println(" --queue_name");
System.err.println("Please provide also authentication credentials");
System.err.println(
"Please provide also authentication credentials "
+ "and if you want to forward then add another external server endpoint");
return;
}

Expand All @@ -62,17 +68,31 @@ public static void main(String[] args) throws Exception {
throw new IllegalStateException(e);
}

ManagedChannel forwardChannel = null;
if (!Strings.isNullOrEmpty(clientOptions.getOption("forward"))) {
try {
forwardChannel = createChannel(clientOptions.getOption("forward"), null, null);
} catch (IllegalArgumentException e) {
System.err.println("Could not open forwarding channel");
} catch (IllegalStateException e) {
System.err.println("Could not open forwarding channel");
} catch (IOException e) {
System.err.println("Could not open forwarding channel");
}
}
try {
final Metadata header = new Metadata();
Metadata.Key<String> userKey =
Metadata.Key.of("Authorization", Metadata.ASCII_STRING_MARSHALLER);
header.put(userKey, "Bearer " + clientOptions.getOption("token"));
pull(channel, clientOptions.getOption("queue_name"), header);
pull(channel, clientOptions.getOption("queue_name"), header, forwardChannel);
} finally {
if (channel != null) {
channel.shutdownNow();
forwardChannel.shutdownNow();
try {
channel.awaitTermination(10, TimeUnit.SECONDS);
forwardChannel.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
System.out.println("Could not shut down channel within timeout");
}
Expand All @@ -89,18 +109,30 @@ public static void main(String[] args) throws Exception {
* @param header metadata for token authentication (if needed)
* @throws InterruptedException
*/
private static void pull(ManagedChannel channel, String queueName, Metadata header)
private static void pull(
ManagedChannel channel, String queueName, Metadata header, ManagedChannel forwardChannel)
throws InterruptedException {

NotificationQueueGrpc.NotificationQueueStub asyncStub = NotificationQueueGrpc.newStub(channel);
asyncStub = MetadataUtils.attachHeaders(asyncStub, header);
final CountDownLatch finishLatch = new CountDownLatch(1);
System.out.println("Listening for build events...");
StreamObserver<PullNotificationRequest> requestObserver =
asyncStub.pull(
new StreamObserver<PullNotificationResponse>() {
@Override
public void onNext(PullNotificationResponse response) {
Notification streamedNotification = response.getNotification().getNotification();
System.out.println("Notification: " + streamedNotification);
System.out.println("Notification: " + streamedNotification.toString());
try {
/** Forward notification data to external server */
forwardToBESStub(
forwardChannel,
streamedNotification.getId().toString(),
streamedNotification.getPayload().toString());
} catch (Exception e) {
System.err.println("Could not forward notification to external sever...");
}
Any notificationContent = streamedNotification.getPayload();
try {
BuildLifecycleEventNotification lifeCycleEvent =
Expand All @@ -116,7 +148,7 @@ public void onNext(PullNotificationResponse response) {
* Fetch the invocation using the grpc {@link EventStoreGrpc} stub using the
* acquired invocation id
*/
getInvocations(channel, invocation, header);
getInvocations(channel, invocation, header, forwardChannel);
} catch (InterruptedException e) {
System.err.println("Could not get invocation with uuid " + invocation);
}
Expand Down Expand Up @@ -162,7 +194,8 @@ public void onCompleted() {
* @param header metadata for token authentication (if needed)
* @throws InterruptedException
*/
public static void getInvocations(ManagedChannel channel, String invocationId, Metadata header)
private static void getInvocations(
ManagedChannel channel, String invocationId, Metadata header, ManagedChannel forwardChannel)
throws InterruptedException {
EventStoreGrpc.EventStoreStub asyncStub = EventStoreGrpc.newStub(channel);
asyncStub = MetadataUtils.attachHeaders(asyncStub, header);
Expand All @@ -172,6 +205,13 @@ public static void getInvocations(ManagedChannel channel, String invocationId, M
@Override
public void onNext(StreamedBuildEvent response) {
System.out.println("Invocation: " + response.toString());
String buildEvent = response.getEvent().toString();
try {
/** Forward invocation data to external server */
forwardToBESStub(forwardChannel, invocationId, buildEvent);
} catch (Exception e) {
System.err.println("Could not forward invocation to external sever...");
}
}

@Override
Expand All @@ -186,6 +226,30 @@ public void onCompleted() {
});
}

/**
* Forwards data to an external grpc stub.
*
* @param channel a grpc channel for connection
* @param id the id of the data to send
* @param payload the payload
*/
private static void forwardToBESStub(ManagedChannel channel, String id, String payload) {
if (channel == null) {
return;
}
final ForwardingGrpc.ForwardingBlockingStub blockingStub =
ForwardingGrpc.newBlockingStub(channel);
EngFlowRequest request = EngFlowRequest.newBuilder().setId(id).setPayload(payload).build();
EngFlowResponse response;
try {
response = blockingStub.forwardStream(request);
System.out.println("Forwarding: " + response.getMessage());
} catch (StatusRuntimeException e) {
System.out.println("Could not forward data to external server.");
return;
}
}

private static ManagedChannel createChannel(
String endpoint, @Nullable String clientCertificate, @Nullable String clientKey)
throws IOException {
Expand Down
11 changes: 11 additions & 0 deletions java/com/engflow/notificationqueue/NotificationOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,5 +91,16 @@ private void instantiateOptions() {
.desc("Token for open access clusters.")
.build();
options.addOption(remoteHeader);

Option forwardSever =
Option.builder()
.longOpt("forward")
.argName("property=value")
.hasArgs()
.valueSeparator()
.numberOfArgs(2)
.desc("The external service endpoint in grpc://host:port format.")
.build();
options.addOption(forwardSever);
}
}
54 changes: 41 additions & 13 deletions java/com/engflow/notificationqueue/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ dependency (see [WORKSPACE](../../../../WORKSPACE)).
The first argument you must have to execute the client is the cluster `grpc` endpoint
you want to listen to.
The second argument is the queue name. As in this example we are interested in
getting lifecycle events, we pull from the queue called `lifecycle-events`.
getting lifecycle events from the cluster, we pull from the queue called `lifecycle-events`.

1. `--notification_queue_endpoint=CLUSTER_URL` the URL of the cluster gRPC
server. Must start with `grpc://` or `grpcs://`
Expand All @@ -26,7 +26,7 @@ but are required by the cluster, the connection is rejected.
### Using certificates to run the client

In the first case you should have valid credentials in `.crt` and `.key` files. Add
the full path of your certificate files into following arguments
the full path of your certificate files into the following options

1. `--tls_certificate=certificate file containing your public key`
holds the path of the crt file to access the cluster.
Expand All @@ -37,8 +37,8 @@ the full path of your certificate files into following arguments

Run the client using

```
bazel run //java/com/engflow/notificationqueue/client -- \
```bash
bazel run //java/com/engflow/notificationqueue:client -- \
'--notification_queue_endpoint=grpcs://example.cluster.engflow.com' '--queue_name=eventstore/lifecycle-events' \
'--tls_certificate=example_client.crt' '--tls_key=example_client.key'
```
Expand All @@ -59,36 +59,61 @@ the argument

Run the client using

```
bazel run //java/com/engflow/notificationqueue/client -- \
```bash
bazel run //java/com/engflow/notificationqueue:client -- \
'--notification_queue_endpoint=grpcs://envoy.cluster.engflow.com' '--queue_name=eventstore/lifecycle-events' \
'--token=ghs_vHu2hAHwhg2EjBXrs4koOxk5PfSKVb2lzAUM
'--token=ghs_vHu2hAHwhg2EjBXrs4koOxk5PfSKVb2lzAUM'
```

At this point, the client should be listening to the lifecycle events of the cluster. It then remains
to build something and to get its notifications and invocations.
Note: The token provided in the example is not valid. You should count with a
valid envoy-mobile token to use envoy-cluster. Change your target cluster and
acquire a valid token for executing the client.

### Forwarding data to external server

One useful use case for the **notification** and **event store APIs** is forwarding the
received data to external servers, for instance another _build event store_ or _result store_ sever.
The current client example implements communication
with a demo grpc stub. To do so, the client uses `Request` and `Response`
abstractions provided by a [server proto definition].

To execute a client that forwards information to an external server you add the flag
`--forward`. Let us first execute a demonstration server that will receive the data from
the client. To execute the server run

```bash
bazel run //java/com/engflow/notificationqueue/demoserver:server
```

This will start the demo server listening at `localhost:50051`. Now you can start the client with a given
authentication method and the forwarding endpoint

```bash
bazel run //java/com/engflow/notificationqueue:client -- \
'--notification_queue_endpoint=grpcs://example.cluster.engflow.com' '--queue_name=eventstore/lifecycle-events' \
'--tls_certificate=example_client.crt' '--tls_key=example_client.key' '--forward=grpc://localhost:50051'
```

# Executing a foo application

You may now build any targets on the cluster. Open your favorite foo project and build it using
the runtime flags `remote_cache` and `bes_backend` arguments like this

```
bazel build //... --remote_cache=CLUSTER_URL --bes_backend=CLUSTER_URL
```bash
bazel build //... '--remote_cache=grpcs://example.cluster.engflow.com' '--bes_backend=grpcs://example.cluster.engflow.com'
```

You should see a series of notifications like this one

```
```json
type_url: "type.googleapis.com/engflow.eventstore.v1.BuildLifecycleEventNotification"
value: "\022$e03d2afe-1a78-4f14-a0f7-85ae65e7e856\"%user_keyword=engflow:StreamSource=BES\"/user_keyword=engflow:StreamType=ClientBEPStream\272\006&\n$1e4f34ee-4669-4ce0-a3fe-5e115ad4772e"
```

The value, with some garbage characters, contains the uuid for one invocation.
Using this uuid we may get an invocation like this one

```
```json
StreamedBuildEvent: continuation_token: "CiQyMWFjMDlkNC0zZWIzLTQ2MzQtODI0MS0yMzk0Y2JhN2UwMGEQARjSCiAB"
event {
stream_id {
Expand All @@ -107,3 +132,6 @@ event {
}
}
```


[server proto definition]: demoserver/server.proto
39 changes: 39 additions & 0 deletions java/com/engflow/notificationqueue/demoserver/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package(default_visibility = ["//visibility:public"])

load("@io_grpc_grpc_java//:java_grpc_library.bzl", "java_grpc_library")

proto_library(
name = "server_proto",
srcs = ["server.proto"],
)

java_proto_library(
name = "server_java_proto",
deps = [":server_proto"],
)

java_grpc_library(
name = "server_java_grpc_proto",
srcs = [
":server_proto",
],
deps = [
":server_java_proto",
],
)

java_binary(
name = "server",
srcs = [
"DemoServer.java",
],
main_class = "com.engflow.notificationqueue.demoserver.DemoServer",
deps = [
":server_java_grpc_proto",
":server_java_proto",
":server_proto",
"@io_grpc_grpc_java//api",
"@io_grpc_grpc_java//netty",
"@io_grpc_grpc_java//stub",
],
)
Loading