diff --git a/invoker/core/src/main/java/com/google/cloud/functions/invoker/Event.java b/invoker/core/src/main/java/com/google/cloud/functions/invoker/Event.java index 6f52f20b..ed12efe2 100644 --- a/invoker/core/src/main/java/com/google/cloud/functions/invoker/Event.java +++ b/invoker/core/src/main/java/com/google/cloud/functions/invoker/Event.java @@ -21,6 +21,8 @@ import com.google.gson.JsonObject; import com.google.gson.JsonParseException; import java.lang.reflect.Type; +import java.time.OffsetDateTime; +import java.time.format.DateTimeFormatter; /** * Represents an event that should be handled by a background function. This is an internal format @@ -53,6 +55,34 @@ public Event deserialize( context = jsonDeserializationContext.deserialize( adjustContextResource(contextCopy), CloudFunctionsContext.class); + } else if (isPubSubEmulatorPayload(root)) { + JsonObject message = root.getAsJsonObject("message"); + + String timestampString = + message.has("publishTime") + ? message.get("publishTime").getAsString() + : DateTimeFormatter.ISO_INSTANT.format(OffsetDateTime.now()); + + context = + CloudFunctionsContext.builder() + .setEventType("google.pubsub.topic.publish") + .setTimestamp(timestampString) + .setEventId(message.get("messageId").getAsString()) + .setResource( + "{" + + "\"name\":null," + + "\"service\":\"pubsub.googleapis.com\"," + + "\"type\":\"type.googleapis.com/google.pubsub.v1.PubsubMessage\"" + + "}") + .build(); + + JsonObject marshalledData = new JsonObject(); + marshalledData.addProperty("@type", "type.googleapis.com/google.pubsub.v1.PubsubMessage"); + marshalledData.add("data", message.get("data")); + if (message.has("attributes")) { + marshalledData.add("attributes", message.get("attributes")); + } + data = marshalledData; } else { JsonObject rootCopy = root.deepCopy(); rootCopy.remove("data"); @@ -63,6 +93,14 @@ public Event deserialize( return Event.of(data, context); } + private boolean isPubSubEmulatorPayload(JsonObject root) { + if (root.has("subscription") && root.has("message") && root.get("message").isJsonObject()) { + JsonObject message = root.getAsJsonObject("message"); + return message.has("data") && message.has("messageId"); + } + return false; + } + /** * Replaces 'resource' member from context JSON with its string equivalent. The original * 'resource' member can be a JSON object itself while {@link CloudFunctionsContext} requires it diff --git a/invoker/core/src/test/java/com/google/cloud/functions/invoker/BackgroundFunctionExecutorTest.java b/invoker/core/src/test/java/com/google/cloud/functions/invoker/BackgroundFunctionExecutorTest.java index 02c5d630..d00b0b4f 100644 --- a/invoker/core/src/test/java/com/google/cloud/functions/invoker/BackgroundFunctionExecutorTest.java +++ b/invoker/core/src/test/java/com/google/cloud/functions/invoker/BackgroundFunctionExecutorTest.java @@ -1,10 +1,15 @@ package com.google.cloud.functions.invoker; import static com.google.cloud.functions.invoker.BackgroundFunctionExecutor.backgroundFunctionTypeArgument; +import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth8.assertThat; import com.google.cloud.functions.BackgroundFunction; import com.google.cloud.functions.Context; +import com.google.gson.JsonObject; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; import java.util.Map; import org.junit.Test; import org.junit.runner.RunWith; @@ -20,7 +25,8 @@ private static class PubSubMessage { } private static class PubSubFunction implements BackgroundFunction { - @Override public void accept(PubSubMessage payload, Context context) {} + @Override + public void accept(PubSubMessage payload, Context context) {} } @Test @@ -31,7 +37,8 @@ public void backgroundFunctionTypeArgument_simple() { private abstract static class Parent implements BackgroundFunction {} private static class Child extends Parent { - @Override public void accept(PubSubMessage payload, Context context) {} + @Override + public void accept(PubSubMessage payload, Context context) {} } @Test @@ -42,7 +49,8 @@ public void backgroundFunctionTypeArgument_superclass() { private interface GenericParent extends BackgroundFunction {} private static class GenericChild implements GenericParent { - @Override public void accept(PubSubMessage payload, Context context) {} + @Override + public void accept(PubSubMessage payload, Context context) {} } @Test @@ -52,7 +60,8 @@ public void backgroundFunctionTypeArgument_genericInterface() { @SuppressWarnings("rawtypes") private static class ForgotTypeParameter implements BackgroundFunction { - @Override public void accept(Object payload, Context context) {} + @Override + public void accept(Object payload, Context context) {} } @Test @@ -62,4 +71,41 @@ public void backgroundFunctionTypeArgument_raw() { (Class>) (Class) ForgotTypeParameter.class; assertThat(backgroundFunctionTypeArgument(c)).isEmpty(); } + + @Test + public void parseLegacyEventPubSub() throws IOException { + try (Reader reader = + new InputStreamReader(getClass().getResourceAsStream("/pubsub_background.json"))) { + Event event = BackgroundFunctionExecutor.parseLegacyEvent(reader); + + Context context = event.getContext(); + assertThat(context.eventType()).isEqualTo("google.pubsub.topic.publish"); + assertThat(context.eventId()).isEqualTo("1"); + assertThat(context.timestamp()).isEqualTo("2021-06-28T05:46:32.390Z"); + + JsonObject data = event.getData().getAsJsonObject(); + assertThat(data.get("data").getAsString()).isEqualTo("eyJmb28iOiJiYXIifQ=="); + String attr = data.get("attributes").getAsJsonObject().get("test").getAsString(); + assertThat(attr).isEqualTo("123"); + } + } + + @Test + public void parseLegacyEventPubSubEmulator() throws IOException { + try (Reader reader = + new InputStreamReader(getClass().getResourceAsStream("/pubsub_emulator.json"))) { + Event event = BackgroundFunctionExecutor.parseLegacyEvent(reader); + + Context context = event.getContext(); + assertThat(context.eventType()).isEqualTo("google.pubsub.topic.publish"); + assertThat(context.eventId()).isEqualTo("1"); + assertThat(context.timestamp()).isNotNull(); + ; + + JsonObject data = event.getData().getAsJsonObject(); + assertThat(data.get("data").getAsString()).isEqualTo("eyJmb28iOiJiYXIifQ=="); + String attr = data.get("attributes").getAsJsonObject().get("test").getAsString(); + assertThat(attr).isEqualTo("123"); + } + } } diff --git a/invoker/core/src/test/resources/pubsub_background.json b/invoker/core/src/test/resources/pubsub_background.json new file mode 100644 index 00000000..5f9927cb --- /dev/null +++ b/invoker/core/src/test/resources/pubsub_background.json @@ -0,0 +1,19 @@ +{ + "data": { + "@type": "type.googleapis.com/google.pubsub.v1.PubsubMessage", + "data": "eyJmb28iOiJiYXIifQ==", + "attributes": { + "test": "123" + } + }, + "context": { + "eventId": "1", + "eventType": "google.pubsub.topic.publish", + "resource": { + "name": "projects/FOO/topics/BAR_TOPIC", + "service": "pubsub.googleapis.com", + "type": "type.googleapis.com/google.pubsub.v1.PubsubMessage" + }, + "timestamp": "2021-06-28T05:46:32.390Z" + } +} \ No newline at end of file diff --git a/invoker/core/src/test/resources/pubsub_emulator.json b/invoker/core/src/test/resources/pubsub_emulator.json new file mode 100644 index 00000000..cdfe340a --- /dev/null +++ b/invoker/core/src/test/resources/pubsub_emulator.json @@ -0,0 +1,10 @@ +{ + "subscription": "projects/FOO/subscriptions/BAR_SUB", + "message": { + "data": "eyJmb28iOiJiYXIifQ==", + "messageId": "1", + "attributes": { + "test": "123" + } + } +} \ No newline at end of file