Added Google Cloud Pubsub support for data records#209
Conversation
Codecov Report
@@ Coverage Diff @@
## master #209 +/- ##
==========================================
- Coverage 86.73% 85.62% -1.12%
==========================================
Files 23 24 +1
Lines 1342 1384 +42
==========================================
+ Hits 1164 1185 +21
- Misses 127 144 +17
- Partials 51 55 +4
Continue to review full report at Codecov.
|
|
Awesome. We are also looking to use Pub/Sub as our data pipeline. |
| client, err := pubsub.NewClient( | ||
| context.Background(), | ||
| config.Config.RecorderPubsubProjectID, | ||
| option.WithServiceAccountFile(config.Config.RecorderPubsubKeyFile), |
There was a problem hiding this comment.
This method is deprecated, it is recommended to use WithCredentialsFile instead. Just starting to look into this to check their equivalency, but they seem to operate on the same idea.
As this is an option, does setting it to an empty string affect the use of Application Default Credentials? Would want to make sure that is not the case.
There was a problem hiding this comment.
I'm sorry, you are right! this option is deprecated I just used a chunk of code I am already using for the past 2 years in many go services here... I am pushing a fix now (thanks I'm also fixing my own services)
Regarding the empty string, yes it will work with default creds, matter effect this is exactly how we develop on our machines using our own accounts with proper IAM
| } | ||
| } | ||
|
|
||
| type pubsubEvalResult struct { |
There was a problem hiding this comment.
I totally agree, I looked at the code and tried to follow your style just to make sure everything will pass and be ok on your end... But sure I think there's a lot of place to improve some of the things.
I will take a look at those relevant issues and will update ASAP (this is night over here)
There was a problem hiding this comment.
thanks! I think you can use
type pubsubMessageFrame struct {
Payload string `json:"payload"`
Encrypted bool `json:"encrypted"`
}
Consolidating 2 structs is the same amout of work for 3. We can fix #203 later.
Also, make sure the final payload is a JSON struct looks like
{
"payload": "<json marshal of EvalResult>",
"encrypted": false
}
There was a problem hiding this comment.
and test coverage of this file :)
There was a problem hiding this comment.
@zhouzhuojie regarding the pubsubMessageFrame, I added it on my end but it looks weird to marshal into json string because pubsub accepts []byte anyways so the conversion happens for no reason... I'm pushing the code to share this with you, LMK what you think
There was a problem hiding this comment.
I knew the json string payload at the first look is weird. The reason I want to standardize the log format is that
- Extensibility. If people want to end-to-end encryption, compression, or other meta things related to the message frame, they can do it regardless of the choice of the recorder type. They can add more fields into the message frame struct. Payload as a string fits into this decision.
- Portability. The data analytics pipeline requires no changes if one wants to switch between providers.
|
Need to check further to see if this is a larger-scale thing, but it seems that if the pubsub connection fails then an EvaluationResult fails to be returned. @zhouzhuojie would this be expected behavior? It seems to me that we'd want to return a result anyways given that the data recording should be an async process. |
Why do you think "if the pubsub connection fails then an EvaluationResult fails to be returned"? The connection won't affect online evaluation. AsyncRecord function should just log errors if there's any failure, and it shouldn't panic or exit as well. |
|
Here is what I'm seeing:
Server logs: I'd expect the result to be returned immediately, but it's never returned at all. In the OpenAPI client for Go, this ends up setting the error value of the '''PostEvaluationResult''' method. It looks like it's hanging on the |
| option.WithCredentialsFile(config.Config.RecorderPubsubKeyFile), | ||
| ) | ||
| if err != nil { | ||
| logrus.WithField("pubsub_error", err).Error("error getting pubsub client") |
There was a problem hiding this comment.
let's Fatal here instead of Error
There was a problem hiding this comment.
I think the issue is caused by get method from pubsub which is a blocking operation, see pubsub doc here
we should pass context either with a deadline or timeout to get method instead of just background context.
There was a problem hiding this comment.
Can confirm this is the issue. When I disable verbose logging, everything works as expected.
| ctx := context.Background() | ||
| res := p.topic.Publish(ctx, &pubsub.Message{Data: payload}) | ||
| if config.Config.RecorderPubsubVerbose { | ||
| ctx, cancel := context.WithTimeout(ctx, 5*time.Second) |
There was a problem hiding this comment.
What do you think about combining this with running the Get within a new goroutine?
Tried this and it seems to work well:
if config.Config.RecorderPubsubVerbose {
go func() {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
id, err := res.Get(ctx)
if err != nil {
logrus.WithFields(logrus.Fields{"pubsub_error": err, "id": id}).Error("error pushing to pubsub")
}
}()
}There was a problem hiding this comment.
+1 for another goroutine. Also, 5s can be moved into env config.
There was a problem hiding this comment.
Agreed, I added both
|
I need some help regarding the tests, I added test coverage to pubsub but now the I can see that we use in all data recorders with their production functions without mocking, meaning connection should fail on all of them, unless I am missing something that runs as a mock server on CI |
|
Still figuring out the best way to handle this, but I know what is wrong currently: What is failing is the data_recorder_pubsub_test. The data_recorder_test.go file passes still and that is because we are stubbing out the When the following code is called: client, err := pubsub.NewClient(
context.Background(),
config.Config.RecorderPubsubProjectID,
option.WithCredentialsFile(config.Config.RecorderPubsubKeyFile),
)the library uses the following rules to create a new client:
On our machines, step 2 is happening, likely because we have run a NewClient signature is as follows: func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (c *Client, err error) {and the return value in case of an error is this: return nil, fmt.Errorf("pubsub: %v", err)so when the rest of the code executes: if err != nil {
// TODO: use Fatal again after fixing the test expecting to not panic.
// logrus.WithField("pubsub_error", err).Fatal("error getting pubsub client")
logrus.WithField("pubsub_error", err).Error("error getting pubsub client")
}
return &pubsubRecorder{
producer: client,
topic: client.Topic(config.Config.RecorderPubsubTopicName),
enabled: config.Config.RecorderEnabled,
}client is a null pointer, and you get a null pointer dereference. So that's the issue. Having the Fatal call will prevent us from dereferencing a nil pointer which is good. It is also the same way we handle error cases in the kafka client so it would follow the standards there. So to fix the issue in the test we need to either a) is not great because you would need to provide some sort of real credentials. |
| RecorderPubsubTopicName string `env:"FLAGR_RECORDER_PUBSUB_TOPIC_NAME" envDefault:"flagr-records"` | ||
| RecorderPubsubKeyFile string `env:"FLAGR_RECORDER_PUBSUB_KEYFILE" envDefault:""` | ||
| RecorderPubsubVerbose bool `env:"FLAGR_RECORDER_PUBSUB_VERBOSE" envDefault:"false"` | ||
| RecorderPubsubVerboseCancel time.Duration `env:"FLAGR_RECORDER_PUBSUB_VERBOSE_CANCEL" envDefault:"5s"` |
There was a problem hiding this comment.
how about RecorderPubsubVerboseCancelTimeout? Otherwise, it's not clear to me that cancel can be a time.Duration.
|
@vic3lord I've created a PR on your fork which fixes the above issues |
fix issues raised in checkr/flagr PR openflagr#209
|
This looks good to me pending my question about handling potential marshal errors. |
|
Cool. @zhouzhuojie how are you feeling about everything at this point? |
| "github.com/checkr/flagr/pkg/config" | ||
| "github.com/checkr/flagr/swagger_gen/models" | ||
| "google.golang.org/api/option" | ||
|
|
There was a problem hiding this comment.
nit, no need a new line here
| ) | ||
|
|
||
| type pubsubRecorder struct { | ||
| enabled bool |
There was a problem hiding this comment.
is this enabled necessary here? eval.go checks it here https://github.com/checkr/flagr/blob/master/pkg/handler/eval.go#L178
There was a problem hiding this comment.
oh yeah... this could probably be removed from the kafka and kinesis versions as they do the check as well
There was a problem hiding this comment.
didn't realize that it was also used in kafka and kinesis, we can probably clean them up in other PRs
|
Thank you so much for helping, I fixed the little nits. Where would it be best to document the pubsub addition and its auth docs? |
I think you can put it after the Kinesis section in https://github.com/checkr/flagr/blob/master/docs/flagr_env.md |
Description
Adding Google Cloud Pubsub support for data records, currently just a minimal implementation
Motivation and Context
We are using Google Cloud Pubsub and needed something native for data records
How Has This Been Tested?
running
makewith all tasks, passed all tests and started flagr serverTypes of changes
Checklist: