Parsing pub sub messages #239

Open
jimmyff opened this issue May 26, 2021 · 22 comments
Labels
kind/enhancement New feature or request

Comments

@jimmyff

jimmyff commented May 26, 2021

Hi,
I'm moving some of my Pub/Sub endpoints over to functions_framework (from firebase_functions_interop). I've just started with the package, so I'm still finding my way around. I've seen support for CloudEvent (which is new to me), and I'm wondering if I can use it to parse Pub/Sub messages?

I see a mention of parsing the pub sub message data structure as a CloudEvent here:
#1 (comment)

There is a CloudEvent class included in this package. Do I need to make a new, similar class for PubSubMessage? I think doing that might complicate things, since I may be able to use the existing CloudEvent class (perhaps with a schema for Pub/Sub messages).

If this is feasible, this might be a really cool thing to include an example for?

@jimmyff
Author

jimmyff commented May 28, 2021

Just seen that @grant has already created the necessary classes:
https://github.com/grant/google-cloudevents-dart/blob/main/cloud/pubsub/v1/MessagePublishedData.dart

I'm unclear how to use FF to parse my Pub/Sub payload using this class. I can see how I could do it without using CloudEvent by creating a Request & Response class, but I think I might be missing a trick by not using CloudEvent.

@grant
Contributor

grant commented May 28, 2021

Hey @jimmyff,

I'm not super familiar with parsing JSON in Dart, but I think I updated that experimental type library to make it work. Basically, you need to parse cloudevent.data, which is declared as Object?, into that type.

I re-generated with some methods that I think help with Message.fromJson. Can you try it out – just copy / paste the code.

https://github.com/grant/google-cloudevents-dart/blob/main/cloud/pubsub/v1/MessagePublishedData.dart

If that library is useful, we could perhaps make it more official and publish it.

@jimmyff
Author

jimmyff commented Jun 17, 2021

I'm working my way around to testing this stuff, but I just found a Pub/Sub endpoint example/test that's worth trying too:
https://github.com/GoogleCloudPlatform/functions-framework-dart/blob/main/test/hello/lib/src/json_handlers.dart#L20

@jimmyff
Author

jimmyff commented Jun 18, 2021

I've tried a couple of things, but I'm getting errors when Pub/Sub pushes to a Functions Framework endpoint.

With the following code, taken from the example here:

@CloudFunction()
void function(CloudEvent event, RequestContext context) {
  context.logger
      .info('[CloudEvent] source: ${event.source}, subject: ${event.subject}');

  final pubSub = PubSub.fromJson(event.data as Map<String, dynamic>);
  context.logger.info('[PubSub] subscription: ${pubSub.subscription}');
}

I get the following error:

Bad request. Could not decode the request as a structured-mode message.
package:json_annotation/src/checked_helpers.dart 35                   $checkedNew
package:functions_framework/src/cloud_event.g.dart 12                 _$CloudEventFromJson
package:functions_framework/src/cloud_event.dart 57                   new CloudEvent.fromJson
package:functions_framework/src/targets/cloud_event_targets.dart 108  _decodeValidCloudEvent
package:functions_framework/src/targets/cloud_event_targets.dart 82   _decodeStructured
===== asynchronous gap ===========================
package:functions_framework/src/targets/cloud_event_targets.dart 53   CloudEventWithContextFunctionTarget.handler
===== asynchronous gap ===========================
package:functions_framework/src/logging.dart 171                      cloudLoggingMiddleware.hostedLoggingMiddleware

If I try this code, taken from example here:

@CloudFunction()
void function(PubSub pubSub, RequestContext context) {
  context.logger.info('[PubSub] subscription: ${pubSub.subscription}');
  context.responseHeaders['subscription'] = pubSub.subscription;
}

I get the following error:

Bad request. There was an error parsing the provided JSON data.
package:test_pubsub/pub_sub_types.g.dart 28                    _$PubSubMessageFromJson
package:test_pubsub/pub_sub_types.dart 32                      new PubSubMessage.fromJson
package:test_pubsub/pub_sub_types.g.dart 13                    _$PubSubFromJson
package:test_pubsub/pub_sub_types.dart 15                      new PubSub.fromJson
app/os_cloud_functions/bin/server.dart 31                      _nameToFunctionTarget.<fn>
package:functions_framework/src/targets/json_targets.dart 34   _JsonFunctionTargetBase._toRequestType
===== asynchronous gap ===========================
package:functions_framework/src/targets/json_targets.dart 148  _VoidJsonWithContextFunctionTarget.handler
===== asynchronous gap ===========================
package:functions_framework/src/logging.dart 171               cloudLoggingMiddleware.hostedLoggingMiddleware

Am I doing something obviously wrong here?

Any help would be greatly appreciated. Thanks

@nickmeinhold
Contributor

Hey @jimmyff 🙂

Here is a Dart Functions Framework function that triggers when a Pub/Sub message arrives from a Firebase auth trigger I set up at the other end (here in case you want to see it) and parses the Pub/Sub message. Is that the kind of thing you were looking for?

Hope that helps!

@jimmyff
Author

jimmyff commented Jun 18, 2021

Thanks for the assistance @nickmeinhold. I'm trying to wrap my head around what I'm doing differently from you. I've tried trimming mine down to the same as yours, but I still get the same error as above (Bad request. Could not decode the request as a structured-mode message.):

@CloudFunction()
void function(CloudEvent event, RequestContext context) {
  context.logger
      .info('[CloudEvent] source: ${event.source}, subject: ${event.subject}');
}

My pub sub subscription is pretty much just default settings:
[image: screenshot of the Pub/Sub subscription settings]

Stack trace:

package:json_annotation/src/checked_helpers.dart 35                   $checkedNew
package:functions_framework/src/cloud_event.g.dart 12                 _$CloudEventFromJson
package:functions_framework/src/cloud_event.dart 57                   new CloudEvent.fromJson
package:functions_framework/src/targets/cloud_event_targets.dart 108  _decodeValidCloudEvent
package:functions_framework/src/targets/cloud_event_targets.dart 82   _decodeStructured
===== asynchronous gap ===========================
package:functions_framework/src/targets/cloud_event_targets.dart 53   CloudEventWithContextFunctionTarget.handler
===== asynchronous gap ===========================
package:functions_framework/src/logging.dart 171                      cloudLoggingMiddleware.hostedLoggingMiddleware.<fn>.<fn> 

@nickmeinhold
Contributor

nickmeinhold commented Jun 19, 2021

Sorry to hear it's not working for you. I had another look at the repo I linked to and I was able to run the cloud function via the server launch config in VS Code then hit it with:

curl --data-binary @test/data/base64_cloud_event_data.json -H 'content-type: application/json' -w '%{http_code}\n' localhost:8080

It did fail for me but when I set a breakpoint and stepped through the failure was a permissions issue (due to being run locally) that only came after the parsing was completed successfully.

I did have to correct my call to curl from what I had in my README - could that be the case for you also? Accidentally sending some incorrect data from where you create/send the PubSub perhaps?

EDIT: I seem to remember that it was digging through the cloudevents spec where I found a missing piece that I was stuck on for a while. Maybe that might help? Also (there's probably a better way to do this) but I seem to remember I got the cloud event data that was causing exceptions from the GCP logs, saved it as a file and then used it locally (eg. via curl) while debugging the cloud function via VS Code to try to track down exactly what was going wrong and where. You may well be all over that stuff, just trying to add anything I can think of that might help... that's about all I've got I think... good luck!

@jimmyff
Author

jimmyff commented Jul 2, 2021

Hey @nickmeinhold thanks so much for the assistance, I really appreciate it as I was totally stuck. I took your sample data and verified that does work locally, so the problem must be with where I was publishing to the topic.

Just to be sure it wasn't something to do with the base64-encoded message, I took your exact message data and published it using the code below:

  final ps = PubsubApi(gapi);
  final encodedMessage =  'eyJkYXRhIjp7I..   clipped   ...=';

  final response = await ps.projects.topics.publish(
    PublishRequest()
      ..messages = [
        PubsubMessage()..data = encodedMessage,
      ],
    'projects/myproject/topics/test',
  );
  return Response.ok('Published to topic messageIds: ${response.messageIds}');

However when pubsub attempts to deliver the message I get the following:

Bad request. Could not decode the request as a structured-mode message.
package:json_annotation/src/checked_helpers.dart 35                   $checkedNew
package:functions_framework/src/cloud_event.g.dart 12                 _$CloudEventFromJson
package:functions_framework/src/cloud_event.dart 57                   new CloudEvent.fromJson
package:functions_framework/src/targets/cloud_event_targets.dart 108  _decodeValidCloudEvent
package:functions_framework/src/targets/cloud_event_targets.dart 82   _decodeStructured
===== asynchronous gap ===========================
package:functions_framework/src/targets/cloud_event_targets.dart 53   CloudEventWithContextFunctionTarget.handler
===== asynchronous gap ===========================
package:functions_framework/src/logging.dart 171                      cloudLoggingMiddleware.hostedLoggingMiddleware.<fn>.<fn> 

There must be something different about the structure of the data pubsub is delivering and that sample that you captured. You mentioned you managed to capture the problematic data that was causing you issues, I've searched in GCP Log Explorer and don't seem to be able to find the request body. Any tips how I could capture that?

Thanks again, I really appreciate your help.

@nickmeinhold
Contributor

Hey @jimmyff, happy to try and help!

> I've searched in GCP Log Explorer and don't seem to be able to find the request body. Any tips how I could capture that?

Yeah, good question. I don't remember off the top of my head, but I'm happy to have a look and see what I can find. Is your code public? If so, I could try to reproduce the problem and see if I can suggest a fix, and I can check whether the request body shows up in the logs while I'm doing that. Maybe I changed to an HTTP-type function just to log the body... I can't really remember, sorry.

@jimmyff
Author

jimmyff commented Jul 5, 2021

Ah thanks, that's a really good idea. I switched it to an HTTPS function and just logged the request body, so I'm receiving the following:

{
  "message": {
    "data": "eyJkYXRhIjp7I..   clipped   ...=",
    "messageId": "2660839537843579",
    "message_id": "2660839537843579",
    "publishTime": "2021-07-05T10:06:08.741Z",
    "publish_time": "2021-07-05T10:06:08.741Z"
  },
  "subscription": "projects/project/subscriptions/test"
}

vs yours

{
  "id": "2386393786561816",
  "source": "//pubsub.googleapis.com/projects/project/topics/eventarc-us-central1-trigger-pubsub-846",
  "specversion": "1.0",
  "type": "google.cloud.pubsub.topic.v1.messagePublished",
  "datacontenttype": "application/json",
  "data": {
    "message": {
      "data": "eyJkYXRhIjp7I..   clipped   ...=",
      "messageId": "2386393786561816",
      "message_id": "2386393786561816",
      "publishTime": "2021-05-11T13:17:12.396Z",
      "publish_time": "2021-05-11T13:17:12.396Z"
    },
    "subscription": "projects/project/subscriptions/eventarc-us-central1-trigger-pubsub-sub-846"
  },
  "time": "2021-05-11T13:17:12.396Z"
}

So I'm missing all the boilerplate wrapping that makes it a CloudEvent. Is there a checkbox somewhere I'm missing? I just have it set up as a standard Pub/Sub push subscription. I've checked through the subscription settings and there doesn't appear to be any way to configure it to follow the CloudEvents spec. I see mentions of yours using Eventarc; could that be the difference?
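
For anyone comparing the two payloads above, here is a minimal sketch, assuming only `dart:convert`, of telling a raw push body apart from a structured-mode CloudEvent envelope. The helper name `extractPushBody` and the `specversion` check are illustrative assumptions, not part of functions_framework. The sample values are shortened from the two bodies above, with the clipped `data` field left out.

```dart
import 'dart:convert';

/// Hypothetical helper: returns the `{message, subscription}` map whether the
/// request body is a raw Pub/Sub push or a structured-mode CloudEvent.
Map<String, dynamic> extractPushBody(String requestBody) {
  final json = jsonDecode(requestBody) as Map<String, dynamic>;
  // Assumption: a structured-mode CloudEvent carries `specversion`,
  // while a raw push body does not.
  if (json.containsKey('specversion')) {
    return json['data'] as Map<String, dynamic>;
  }
  return json;
}

// Shortened copy of the raw push body logged above.
const rawPush = '''
{
  "message": {"messageId": "2660839537843579"},
  "subscription": "projects/project/subscriptions/test"
}''';

// Shortened copy of the CloudEvent envelope delivered via Eventarc.
const cloudEvent = '''
{
  "id": "2386393786561816",
  "specversion": "1.0",
  "type": "google.cloud.pubsub.topic.v1.messagePublished",
  "data": {
    "message": {"messageId": "2386393786561816"},
    "subscription": "projects/project/subscriptions/eventarc-us-central1-trigger-pubsub-sub-846"
  }
}''';

void main() {
  for (final body in [rawPush, cloudEvent]) {
    final push = extractPushBody(body);
    final message = push['message'] as Map<String, dynamic>;
    print('${message['messageId']} via ${push['subscription']}');
  }
}
```

Both inputs end up as the same inner shape, which is why a plain JSON handler can parse the raw push while the CloudEvent handler rejects it.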

@jimmyff
Author

jimmyff commented Jul 5, 2021

The other option I have is to give up on trying to get CloudEvents working and just add a @JsonSerializable PubSub class and parse what I'm receiving.

@jimmyff
Author

jimmyff commented Jul 5, 2021

I'm going to move on without CloudEvents working, as I have an alternative. Thanks so much for the help @nickmeinhold. If anyone knows why I'm not getting CloudEvent-compliant data through when others seem to be, please let me know and I'll switch over!

I'm currently parsing the pub sub with the following:

import 'dart:convert';
import 'dart:typed_data';
import 'package:json_annotation/json_annotation.dart';

part 'pub_sub_types.g.dart';

@JsonSerializable()
class PubSub {
  final PubSubMessage message;
  final String subscription;
  PubSub(this.message, this.subscription);
  factory PubSub.fromJson(Map<String, dynamic> json) => _$PubSubFromJson(json);
  Map<String, dynamic> toJson() => _$PubSubToJson(this);
}

@JsonSerializable()
class PubSubMessage {
  final String? data;
  final Map<String, String>? attributes;
  final String messageId;
  final DateTime publishTime;

  bool hasData() => data != null;
  String dataDecoded() => utf8.decode(dataBytes());
  Uint8List dataBytes() =>
      data != null ? base64Decode(data!) : throw Exception('Data expected');

  PubSubMessage(this.data, this.messageId, this.publishTime, this.attributes);

  factory PubSubMessage.fromJson(Map<String, dynamic> json) =>
      _$PubSubMessageFromJson(json);
  Map<String, dynamic> toJson() => _$PubSubMessageToJson(this);
}
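
To make the mapping from the push body to these fields concrete, here is a minimal sketch of the same two classes with hand-written `fromJson` factories, so it runs without the generated `pub_sub_types.g.dart` part. It approximates what json_serializable would produce and is not the generated code itself. The sample body reuses the IDs from the logged request earlier in the thread; the `data` value is the base64 string posted further down this thread, substituted for the clipped one.

```dart
import 'dart:convert';
import 'dart:typed_data';

class PubSub {
  final PubSubMessage message;
  final String subscription;
  PubSub(this.message, this.subscription);

  // Hand-written stand-in for the generated _$PubSubFromJson.
  factory PubSub.fromJson(Map<String, dynamic> json) => PubSub(
        PubSubMessage.fromJson(json['message'] as Map<String, dynamic>),
        json['subscription'] as String,
      );
}

class PubSubMessage {
  final String? data;
  final Map<String, String>? attributes;
  final String messageId;
  final DateTime publishTime;

  PubSubMessage(this.data, this.messageId, this.publishTime, this.attributes);

  // Hand-written stand-in for the generated _$PubSubMessageFromJson.
  factory PubSubMessage.fromJson(Map<String, dynamic> json) => PubSubMessage(
        json['data'] as String?,
        json['messageId'] as String,
        DateTime.parse(json['publishTime'] as String),
        (json['attributes'] as Map<String, dynamic>?)
            ?.map((key, value) => MapEntry(key, value as String)),
      );

  Uint8List dataBytes() =>
      data != null ? base64Decode(data!) : throw Exception('Data expected');
  String dataDecoded() => utf8.decode(dataBytes());
}

const samplePushBody = '''
{
  "message": {
    "data": "WW8geW8geW8gZnJvbSBteSBwdWJzdWIgbWVzc2FnZQ==",
    "messageId": "2660839537843579",
    "publishTime": "2021-07-05T10:06:08.741Z"
  },
  "subscription": "projects/project/subscriptions/test"
}''';

void main() {
  final pubSub =
      PubSub.fromJson(jsonDecode(samplePushBody) as Map<String, dynamic>);
  print(pubSub.subscription);
  print(pubSub.message.messageId);
  print(pubSub.message.dataDecoded());
}
```

The snake_case duplicates (`message_id`, `publish_time`) in the real body are simply ignored here, since only the camelCase keys are read.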

@nickmeinhold
Contributor

Hey @jimmyff, great to hear you're able to move forward - I was going to collect the notes I made while I was working on this stuff into something (hopefully) comprehensible to see if that helps, but I haven't found the time just yet. I will get that done at some stage and pass it on - if you're up for providing feedback that'd be awesome, hopefully we can turn our efforts into a useful resource 🙂

Am I right in thinking the problem has been tracked down to the Pub/Sub message you're creating not being a valid CloudEvent (i.e. missing the fields that the parsing in the Dart cloud function expects)? If so (assuming you want to keep discussing it), could you describe how you are creating the Pub/Sub message?

@kevmoo kevmoo added the kind/enhancement New feature or request label Dec 29, 2021
@atreeon

atreeon commented May 12, 2022

This is exactly the same request body I get (quoted below from @jimmyff's earlier comment). I also get the same error, "Bad request. Could not decode the request as a structured-mode message", when I try to use Pub/Sub.

> Ah thanks, that's a really good idea. I switched it to an https and just logged the request body, so I'm receiving the following :

{
  "message": {
    "data": "eyJkYXRhIjp7I..   clipped   ...=",
    "messageId": "2660839537843579",
    "message_id": "2660839537843579",
    "publishTime": "2021-07-05T10:06:08.741Z",
    "publish_time": "2021-07-05T10:06:08.741Z"
  },
  "subscription": "projects/project/subscriptions/test"
}

@kevmoo
Collaborator

kevmoo commented May 12, 2022

@atreeon – are you subscribing to a google provided event? Are you running on Google Cloud? Have you looked at your logs? I'd love to see the stack trace

@atreeon

atreeon commented May 12, 2022

So, I used the functions framework cloudevent creation tool. That was giving me the "Bad request. Could not decode the request as a structured-mode message." error when I linked it up to a Pub/Sub subscription on GCP and tried to send a message to it. The HelloWorld service runs OK on Cloud Run when I enter its URL in a browser address bar (or use curl). When I run a curl command against my cloudevent Dart service and pass sample/data.json to it, it gives me a 200 response.

curl --data-binary @sample/data.json -H 'content-type: application/json' -w '%{http_code}\n' https://functions-framework-cloudevent-nmvgzbgoyq-ew.a.run.app - this returns a 200 response

I couldn't seem to output the request body using the cloudevent service, as it was saying 'request can't be run more than once' (or something along those lines).

To get the request object (json) that is passed by pubsub, I created another service that uses Alfred web server, created a post route and printed the request body.

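class Server {
  final app = Alfred();

  start() async {
    app.post('/stuffme', (req, res) async {
      // Decode the raw request body so the Pub/Sub payload can be logged.
      var body = await utf8.decodeStream(req);

      print('body: ' + body);
      // ... (rest of the handler and server start-up omitted in the original post)
    });
  }
}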

That gave me the output shown above.

caveat: I am super new to GCP, docker and cloud run so I may be doing something obviously wrong!

@kevmoo
Collaborator

kevmoo commented May 12, 2022

@atreeon – I'm digging in here. Something DOES seem to be weird...

I think it might be that Pub/Sub does not use CloudEvents by default. You need to wire up Eventarc first.

I'll let you know...

@kevmoo
Collaborator

kevmoo commented May 12, 2022

That's it!

So if you want to handle Pub/Sub directly, just use a JSON endpoint and not a CloudEvent one.

If you want to use CloudEvent, you have to use Eventarc.

@atreeon

atreeon commented May 12, 2022

ah, thanks so much @kevmoo! I'll try it out tomorrow and let you know how I get on.

@atreeon

atreeon commented May 13, 2022

So, creating an Eventarc trigger to listen to a Pub/Sub message works, that's great. However, it seems to scramble the Pub/Sub message. I send:

gcloud pubsub topics publish eventarc-test --message "Yo yo yo from my pubsub message"

but in the json output, in the cloud run log, I get

"data": "WW8geW8geW8gZnJvbSBteSBwdWJzdWIgbWVzc2FnZQ==",

(I was expecting to receive "Yo yo yo from my pubsub message")

How do I unencrypt the message?

@nickmeinhold
Contributor

nickmeinhold commented May 13, 2022

@atreeon I found I had to base64 decode then utf8 decode, ie:

var decodedMessage = utf8.decode(base64.decode(encodedMessageData));
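
As a runnable check of that one-liner, here is a small sketch using the exact string from the Cloud Run log above. The payload is base64-encoded rather than encrypted, so no key is involved. The function name `decodePubSubData` is made up for this example.

```dart
import 'dart:convert';

/// Hypothetical helper: decodes the base64 `data` field of a Pub/Sub message
/// and interprets the resulting bytes as UTF-8 text.
String decodePubSubData(String encodedMessageData) =>
    utf8.decode(base64.decode(encodedMessageData));

void main() {
  // The value logged for the message published with gcloud above.
  const encoded = 'WW8geW8geW8gZnJvbSBteSBwdWJzdWIgbWVzc2FnZQ==';
  print(decodePubSubData(encoded)); // Yo yo yo from my pubsub message
}
```

The `utf8.decode` step only makes sense when the publisher sent text; for binary payloads it is probably safer to stop at `base64.decode` and work with the bytes.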

@kevmoo
Collaborator

kevmoo commented May 13, 2022 via email
