diff --git a/WORKSPACE b/WORKSPACE index 8a399002..c379ca98 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -1,5 +1,8 @@ load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_file") +load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive") +load("@bazel_tools//tools/build_defs/repo:git.bzl", "git_repository") +# Some file dependencies http_file( name = "emacs", sha256 = "1439bf7f24e5769f35601dbf332e74dfc07634da6b1e9500af67188a92340a28", @@ -17,3 +20,105 @@ http_file( "https://mirror.pit.teraswitch.com/ubuntu-releases/focal/ubuntu-20.04.4-live-server-amd64.iso", ], ) + +http_archive( + name = "rules_jvm_external", + sha256 = "c21ce8b8c4ccac87c809c317def87644cdc3a9dd650c74f41698d761c95175f3", + strip_prefix = "rules_jvm_external-1498ac6ccd3ea9cdb84afed65aa257c57abf3e0a", + url = "https://github.com/bazelbuild/rules_jvm_external/archive/1498ac6ccd3ea9cdb84afed65aa257c57abf3e0a.zip", +) + +http_archive( + name = "com_google_googleapis", + sha256 = "25bba87daac3f4f7b9f5cd4632ade645de0d41d9600feccfbe6cbdf0cc8f6ae6", + strip_prefix = "googleapis-4f46ddcc9349121b27331e5cb5d18c553696a6c3", + urls = [ + "https://storage.googleapis.com/engflow-tools-public/googleapis-4f46ddcc9349121b27331e5cb5d18c553696a6c3.tar.gz", + "https://github.com/googleapis/googleapis/archive/4f46ddcc9349121b27331e5cb5d18c553696a6c3.tar.gz", + ], +) + +http_archive( + name = "com_engflow_engflowapis", + sha256 = "a04a2d2a978355c85dff8b1018d12a8e0a1e6692add9de716fd4d1b7aa1e2a0d", + strip_prefix = "engflowapis-47aa858b498da13e7863356aaef9c6d05da0a7f2", + urls = [ + "https://storage.googleapis.com/engflow-tools-public/engflowapis-47aa858b498da13e7863356aaef9c6d05da0a7f2.zip", + "https://github.com/EngFlow/engflowapis/archive/47aa858b498da13e7863356aaef9c6d05da0a7f2.zip", + ], +) + +http_archive( + name = "io_grpc_grpc_java", + sha256 = "51bac553d269b97214dbd6aee4e65fc616d8ccd414fc12d708e85979ed4c19b4", + strip_prefix = "grpc-java-1.45.1", + urls = ["https://github.com/grpc/grpc-java/archive/v1.45.1.tar.gz"], +) + +http_archive( + name = "rules_proto", + sha256 = "e017528fd1c91c5a33f15493e3a398181a9e821a804eb7ff5acdd1d2d6c2b18d", + strip_prefix = "rules_proto-4.0.0-3.20.0", + urls = [ + "https://github.com/bazelbuild/rules_proto/archive/refs/tags/4.0.0-3.20.0.tar.gz", + ], +) + +# Loads rules required to compile proto files +http_archive( + name = "rules_proto_grpc", + sha256 = "28724736b7ff49a48cb4b2b8cfa373f89edfcb9e8e492a8d5ab60aa3459314c8", + strip_prefix = "rules_proto_grpc-4.0.1", + urls = ["https://github.com/rules-proto-grpc/rules_proto_grpc/archive/4.0.1.tar.gz"], +) + +load("@rules_proto_grpc//java:repositories.bzl", rules_proto_grpc_java_repos = "java_repos") + +rules_proto_grpc_java_repos() + +load("@rules_proto//proto:repositories.bzl", "rules_proto_dependencies", "rules_proto_toolchains") +load("@rules_jvm_external//:defs.bzl", "maven_install") + +rules_proto_dependencies() + +rules_proto_toolchains() + +http_archive( + name = "com_google_protobuf", + sha256 = "990e47a163b4057f98b899eca591981b5b735872b58f59b9ead9cecabbb21a2a", + strip_prefix = "protobuf-21.4", + urls = [ + "https://github.com/protocolbuffers/protobuf/archive/v21.4.tar.gz", + ], +) + +load("@com_google_protobuf//:protobuf_deps.bzl", "protobuf_deps") + +protobuf_deps() + +load("@io_grpc_grpc_java//:repositories.bzl", "IO_GRPC_GRPC_JAVA_ARTIFACTS", "IO_GRPC_GRPC_JAVA_OVERRIDE_TARGETS", "grpc_java_repositories") + +grpc_java_repositories() + +load("@com_google_googleapis//:repository_rules.bzl", "switched_rules_by_language") + +switched_rules_by_language( + name = "com_google_googleapis_imports", + java = True, +) + +maven_install( + artifacts = IO_GRPC_GRPC_JAVA_ARTIFACTS + [ + "commons-cli:commons-cli:1.3.1", + "com.google.oauth-client:google-oauth-client:1.34.1", + ], + generate_compat_repositories = True, + override_targets = IO_GRPC_GRPC_JAVA_OVERRIDE_TARGETS, + repositories = [ + "https://repo.maven.apache.org/maven2/", + ], +) + +load("@maven//:compat.bzl", "compat_repositories") + +compat_repositories() diff --git a/java/com/engflow/notificationqueue/BUILD b/java/com/engflow/notificationqueue/BUILD new file mode 100644 index 00000000..3702962b --- /dev/null +++ b/java/com/engflow/notificationqueue/BUILD @@ -0,0 +1,59 @@ +package(default_visibility = ["//visibility:public"]) + +load("@io_grpc_grpc_java//:java_grpc_library.bzl", "java_grpc_library") + +java_proto_library( + name = "engflowapis_java_proto", + visibility = ["//visibility:public"], + deps = [ + "@com_engflow_engflowapis//engflow/eventstore/v1:build_event_proto", + "@com_engflow_engflowapis//engflow/eventstore/v1:eventstore_proto", + "@com_engflow_engflowapis//engflow/eventstore/v1:notifications_proto", + "@com_engflow_engflowapis//engflow/notification/v1:notification_proto", + "@com_engflow_engflowapis//engflow/notification/v1:notification_queue_proto", + ], +) + +java_grpc_library( + name = "notification_queue_java_grpc", + srcs = [ + "@com_engflow_engflowapis//engflow/notification/v1:notification_queue_proto", + ], + deps = [ + ":engflowapis_java_proto", + ], +) + +java_grpc_library( + name = "eventstore_java_grpc", + srcs = [ + "@com_engflow_engflowapis//engflow/eventstore/v1:eventstore_proto", + ], + deps = [ + ":engflowapis_java_proto", + ], +) + +java_binary( + name = "client", + srcs = [ + "Client.java", + "NotificationOptions.java", + ], + main_class = "com.engflow.notificationqueue.Client", + deps = [ + ":engflowapis_java_proto", + ":eventstore_java_grpc", + ":notification_queue_java_grpc", + "@com_google_protobuf//:any_proto", + "@com_google_protobuf//java/core", + "@io_grpc_grpc_java//api", + "@io_grpc_grpc_java//context", + "@io_grpc_grpc_java//netty", + "@io_grpc_grpc_java//stub", + "@maven//:com_google_code_findbugs_jsr305", + "@maven//:com_google_guava_guava", + "@maven//:commons_cli_commons_cli", + "@maven//:io_netty_netty_handler", + ], +) diff --git a/java/com/engflow/notificationqueue/Client.java b/java/com/engflow/notificationqueue/Client.java new file mode 100644 index 00000000..ee792bf8 --- /dev/null +++ b/java/com/engflow/notificationqueue/Client.java @@ -0,0 +1,225 @@ +package com.engflow.notificationqueue; + +import com.engflow.eventstore.v1.BuildLifecycleEventNotification; +import com.engflow.eventstore.v1.EventStoreGrpc; +import com.engflow.eventstore.v1.GetInvocationRequest; +import com.engflow.eventstore.v1.StreamedBuildEvent; +import com.engflow.notification.v1.Notification; +import com.engflow.notification.v1.NotificationQueueGrpc; +import com.engflow.notification.v1.PullNotificationRequest; +import com.engflow.notification.v1.PullNotificationResponse; +import com.engflow.notification.v1.QueueId; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.protobuf.Any; +import com.google.protobuf.InvalidProtocolBufferException; +import io.grpc.ManagedChannel; +import io.grpc.Metadata; +import io.grpc.netty.GrpcSslContexts; +import io.grpc.netty.NegotiationType; +import io.grpc.netty.NettyChannelBuilder; +import io.grpc.stub.MetadataUtils; +import io.grpc.stub.StreamObserver; +import io.netty.handler.ssl.SslContextBuilder; +import java.io.File; +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import javax.net.ssl.SSLException; + +class Client { + + public static void main(String[] args) throws Exception { + NotificationOptions clientOptions; + try { + clientOptions = new NotificationOptions().parseOptions(args); + } catch (IllegalArgumentException e) { + System.err.println(e); + System.err.println(" --notification_queue_endpoint"); + System.err.println(" --queue_name"); + System.err.println("Please provide also authentication credentials"); + return; + } + + /** + * Channel used by the NotificationQueueGrpc and the EventStoreGrpc stub. It is also possible to + * use two separated channels. + */ + ManagedChannel channel = null; + try { + channel = + createChannel( + clientOptions.getOption("notification_queue_endpoint"), + clientOptions.getOption("tls_certificate"), + clientOptions.getOption("tls_key")); + + } catch (IllegalArgumentException e) { + System.err.println("Unable to open channel to " + args[0].split("=")[1]); + throw new IllegalArgumentException(e); + } catch (IllegalStateException e) { + System.err.println("Unable to open channel to " + args[0].split("=")[1]); + throw new IllegalStateException(e); + } catch (IOException e) { + throw new IOException(e); + } + try { + final Metadata header = new Metadata(); + Metadata.Key userKey = + Metadata.Key.of("Authorization", Metadata.ASCII_STRING_MARSHALLER); + header.put(userKey, "Bearer " + clientOptions.getOption("token")); + pull(channel, clientOptions.getOption("queue_name"), header); + } finally { + if (channel != null) { + channel.shutdownNow(); + try { + channel.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + System.out.println("Could not shut down channel within timeout"); + } + } + } + } + + /** + * Gets the notification streams from a {@link NotificationQueueGrpc} stub and then calls + * getInvocation for each received notification + * + * @param channel a grpc channel for cluster connection + * @param queueName name of the queue to listen to + * @param header metadata for token authentication (if needed) + * @throws InterruptedException + */ + private static void pull(ManagedChannel channel, String queueName, Metadata header) + throws InterruptedException { + NotificationQueueGrpc.NotificationQueueStub asyncStub = NotificationQueueGrpc.newStub(channel); + asyncStub = MetadataUtils.attachHeaders(asyncStub, header); + final CountDownLatch finishLatch = new CountDownLatch(1); + StreamObserver requestObserver = + asyncStub.pull( + new StreamObserver() { + @Override + public void onNext(PullNotificationResponse response) { + Notification streamedNotification = response.getNotification().getNotification(); + System.out.println("Notification: " + streamedNotification); + Any notificationContent = streamedNotification.getPayload(); + try { + BuildLifecycleEventNotification lifeCycleEvent = + notificationContent.unpack(BuildLifecycleEventNotification.class); + /** + * Check if this is an invocation started event. Options are INVOCATION_STARTED + * and INVOCATION_FINISHED + */ + if (lifeCycleEvent.getKindCase().name().equals("INVOCATION_STARTED")) { + String invocation = lifeCycleEvent.getInvocationStarted().getInvocationId(); + try { + /** + * Fetch the invocation using the grpc {@link EventStoreGrpc} stub using the + * acquired invocation id + */ + getInvocations(channel, invocation, header); + } catch (InterruptedException e) { + System.err.println("Could not get invocation with uuid " + invocation); + } + } + + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + } + + @Override + public void onError(Throwable t) { + System.err.println("Error on request: " + t.getMessage()); + finishLatch.countDown(); + } + + @Override + public void onCompleted() { + System.out.println("Finished pulling notifications"); + finishLatch.countDown(); + } + }); + + try { + requestObserver.onNext( + PullNotificationRequest.newBuilder() + .setQueue(QueueId.newBuilder().setName(queueName).build()) + .build()); + } catch (RuntimeException e) { + // Cancel RPC + requestObserver.onError(e); + throw e; + } + + finishLatch.await(); + } + + /** + * Gets an invocation from the {@link EventStoreGrpc} stub + * + * @param channel a grpc channel for cluster connection + * @param invocationId the id of the required notification + * @param header metadata for token authentication (if needed) + * @throws InterruptedException + */ + public static void getInvocations(ManagedChannel channel, String invocationId, Metadata header) + throws InterruptedException { + EventStoreGrpc.EventStoreStub asyncStub = EventStoreGrpc.newStub(channel); + asyncStub = MetadataUtils.attachHeaders(asyncStub, header); + asyncStub.getInvocation( + GetInvocationRequest.newBuilder().setInvocationId(invocationId).build(), + new StreamObserver() { + @Override + public void onNext(StreamedBuildEvent response) { + System.out.println("Invocation: " + response.toString()); + } + + @Override + public void onError(Throwable t) { + System.err.println("Error on request: " + t.getMessage()); + } + + @Override + public void onCompleted() { + System.out.println("Finished pulling invocation"); + } + }); + } + + private static ManagedChannel createChannel( + String endpoint, @Nullable String clientCertificate, @Nullable String clientKey) + throws IOException { + Preconditions.checkArgument( + !Strings.isNullOrEmpty(endpoint), + "Empty --notification_queue_endpoint, expected \"protocol://host[:port]\""); + + boolean tls = endpoint.startsWith("grpcs://"); + Preconditions.checkArgument( + tls || endpoint.startsWith("grpc://"), + "Bad --notification_queue_endpoint value \"" + + endpoint + + "\", expected \"grpc://\" or \"grpcs://\" protocol"); + endpoint = endpoint.substring(tls ? "grpcs://".length() : "grpc://".length()); + + NettyChannelBuilder builder = + NettyChannelBuilder.forTarget(endpoint) + .negotiationType(tls ? NegotiationType.TLS : NegotiationType.PLAINTEXT); + + if (tls) { + try { + SslContextBuilder contextBuilder = GrpcSslContexts.forClient(); + if (!Strings.isNullOrEmpty(clientCertificate) && !Strings.isNullOrEmpty(clientKey)) { + contextBuilder = + contextBuilder.keyManager(new File(clientCertificate), new File(clientKey)); + } + builder.sslContext(contextBuilder.build()); + } catch (SSLException e) { + throw new IllegalStateException(e); + } catch (IOException e) { + throw new IOException(e); + } + } + return builder.build(); + } +} diff --git a/java/com/engflow/notificationqueue/NotificationOptions.java b/java/com/engflow/notificationqueue/NotificationOptions.java new file mode 100644 index 00000000..0c030498 --- /dev/null +++ b/java/com/engflow/notificationqueue/NotificationOptions.java @@ -0,0 +1,95 @@ +package com.engflow.notificationqueue; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; + +public class NotificationOptions { + Options options = new Options(); + CommandLineParser parser = new DefaultParser(); + CommandLine cmd; + + public NotificationOptions() {} + + public NotificationOptions parseOptions(String[] args) { + instantiateOptions(); + try { + cmd = parser.parse(options, args); + } catch (ParseException e) { + throw new IllegalArgumentException(e); + } + + if (cmd.hasOption("queue_name") && cmd.hasOption("notification_queue_endpoint")) { + return this; + } else { + throw new IllegalArgumentException("Please provide the arguments..."); + } + } + + public String getOption(String value) { + return cmd.getOptionValue(value); + } + + private void instantiateOptions() { + Option queueName = + Option.builder() + .longOpt("queue_name") + .argName("property=value") + .hasArgs() + .valueSeparator() + .numberOfArgs(2) + .desc("The name of the queue that you want to receive notifications from.") + .build(); + options.addOption(queueName); + + Option endpoint = + Option.builder() + .longOpt("notification_queue_endpoint") + .argName("property=value") + .hasArgs() + .valueSeparator() + .numberOfArgs(2) + .desc( + "The service endpoint in protocol://host:port format. The protocol must be 'grpc'" + + " or 'grpcs' signifying plaintext or TLS-encrypted communication" + + " respectively.") + .build(); + options.addOption(endpoint); + + Option certificate = + Option.builder() + .longOpt("tls_certificate") + .argName("property=value") + .hasArgs() + .valueSeparator() + .numberOfArgs(2) + .desc("The path of the mTLS certificate to use as the client certificate.") + .build(); + options.addOption(certificate); + + Option key = + Option.builder() + .longOpt("tls_key") + .argName("property=value") + .hasArgs() + .valueSeparator() + .numberOfArgs(2) + .desc("Path to the `--tls_certificate`'s private key.") + .build(); + options.addOption(key); + + Option remoteHeader = + Option.builder() + .longOpt("token") + .argName("property=value") + .hasArgs() + .valueSeparator() + .numberOfArgs(2) + .desc("Token for open access clusters.") + .build(); + options.addOption(remoteHeader); + } +} diff --git a/java/com/engflow/notificationqueue/README.md b/java/com/engflow/notificationqueue/README.md new file mode 100644 index 00000000..e5637f4d --- /dev/null +++ b/java/com/engflow/notificationqueue/README.md @@ -0,0 +1,109 @@ +# Using the notification queue and event store APIs + +This document explains how to make use of the **notification queue API** to +obtain the notifications for a given build and use the notification data +to obtain invocation events from the **event store API**. To do so, +the [EngFlow API](https://github.com/EngFlow/engflowapis) is used as external +dependency (see [WORKSPACE](../../../../WORKSPACE)). + +# Running the client + +The first argument you must have to execute the client is the cluster `grpc` endpoint +you want to listen to. +The second argument is the queue name. As in this example we are interested in +getting lifecycle events, we pull from the queue called `lifecycle-events`. + +1. `--notification_queue_endpoint=CLUSTER_URL` the URL of the cluster gRPC + server. Must start with `grpc://` or `grpcs://` +2. `--queue_name=eventstore/lifecycle-events` holds the name of the queue to listen + +Next, you must provide authentication information so the client can establish +a connection to the engflow cluster, unless the cluster is totally open. +As for today, two authentication methods are available; certificates or +authentication tokens. These arguments are optional and if they are not given +but are required by the cluster, the connection is rejected. + +### Using certificates to run the client + +In the first case you should have valid credentials in `.crt` and `.key` files. Add +the full path of your certificate files into following arguments + +1. `--tls_certificate=certificate file containing your public key` + holds the path of the crt file to access the cluster. + Only needed for `grpcs://` connections +2. `--tls_key=/path/to/your/file containing your private key` + holds the path of the key file used to access the cluster. + Only needed for `grpcs://` connections + +Run the client using + +``` +bazel run //java/com/engflow/notificationqueue/client -- \ + '--notification_queue_endpoint=grpcs://example.cluster.engflow.com' '--queue_name=eventstore/lifecycle-events' \ + '--tls_certificate=example_client.crt' '--tls_key=example_client.key' +``` + + +### Using tokens to run the client + +In the second for case authentication you should have a token issued by a +valid authority. In this example we use cluster used by the +[open-source envoy-mobile](https://github.com/envoyproxy/envoy-mobile) project. +It uses [GitHub tokens](https://docs.github.com/en/actions/security-guides/automatic-token-authentication) for authentication method but your server may support +other token provider. To execute the client against the envoy-mobile cluster add +the argument + +1. `--token=token issued by valid authority` + holds the authentication token. + Needed for both `grpc://` and `grpcs://` connections + +Run the client using + +``` +bazel run //java/com/engflow/notificationqueue/client -- \ + '--notification_queue_endpoint=grpcs://envoy.cluster.engflow.com' '--queue_name=eventstore/lifecycle-events' \ + '--token=ghs_vHu2hAHwhg2EjBXrs4koOxk5PfSKVb2lzAUM +``` + +At this point, the client should be listening to the lifecycle events of the cluster. It then remains +to build something and to get its notifications and invocations. + + +# Executing a foo application + +You may now build any targets on the cluster. Open your favorite foo project and build it using +the runtime flags `remote_cache` and `bes_backend` arguments like this + +``` +bazel build //... --remote_cache=CLUSTER_URL --bes_backend=CLUSTER_URL +``` + +You should see a series of notifications like this one + +``` +type_url: "type.googleapis.com/engflow.eventstore.v1.BuildLifecycleEventNotification" +value: "\022$e03d2afe-1a78-4f14-a0f7-85ae65e7e856\"%user_keyword=engflow:StreamSource=BES\"/user_keyword=engflow:StreamType=ClientBEPStream\272\006&\n$1e4f34ee-4669-4ce0-a3fe-5e115ad4772e" +``` + +The value, with some garbage characters, contains the uuid for one invocation. +Using this uuid we may get an invocation like this one + +``` +StreamedBuildEvent: continuation_token: "CiQyMWFjMDlkNC0zZWIzLTQ2MzQtODI0MS0yMzk0Y2JhN2UwMGEQARjSCiAB" +event { + stream_id { + build_id: "c88d85cb-08c5-4227-9a24-8e6ea8f262d8" + component: TOOL + invocation_id: "21ac09d4-3eb3-4634-8241-2394cba7e00a" + } + build_event { + event_time { + seconds: 1658502561 + nanos: 364000000 + } + component_stream_finished { + type: FINISHED + } + } +} +```