-
Notifications
You must be signed in to change notification settings - Fork 242
Add Operations Manager, wrap all operations, and add retry functionality #517
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Codecov Report
@@ Coverage Diff @@
## main #517 +/- ##
==========================================
Coverage 100.00% 100.00%
==========================================
Files 295 303 +8
Lines 17000 17386 +386
==========================================
+ Hits 17000 17386 +386
Continue to review full report at Codecov.
|
7b66d2b to
4ff5b01
Compare
Signed-off-by: Andrew Richardson <andrew.richardson@kaleido.io>
Other managers can register to handle particular operation types. This keeps the logic for each type concentrated in the owning Manager instead of giving too much specialized knowledge to the Operations Manager. Also introduce a serializable PreparedOperation type for wrapping operations before they are sent off to plugins - makes for a neater split between parsing and running operations, and may also be useful for tests/debugging later. Signed-off-by: Andrew Richardson <andrew.richardson@kaleido.io>
Signed-off-by: Andrew Richardson <andrew.richardson@kaleido.io>
Signed-off-by: Andrew Richardson <andrew.richardson@kaleido.io>
Signed-off-by: Andrew Richardson <andrew.richardson@kaleido.io>
Signed-off-by: Andrew Richardson <andrew.richardson@kaleido.io>
Signed-off-by: Andrew Richardson <andrew.richardson@kaleido.io>
Retrying an operation that has already been retried will cause it to look up the newest copy of the operation, and retry that one. In this way, retries will always form a single chain, and attempting to re-run any of them will always add a new one to the end of the chain. Signed-off-by: Andrew Richardson <andrew.richardson@kaleido.io>
|
I've continued working this so it now encompasses all operations and actually adds the |
peterbroadhurst
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Really think this worked out well. A very clear template for what an operation means in the codebase, that's easy to extend 👍
Some minor comments to consider @awrichar
| @@ -0,0 +1,3 @@ | |||
| BEGIN; | |||
| ALTER TABLE operations DROP COLUMN retry_id; | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a note that you'll need to get a different slot (think currently 70 is next after PRs in the pipe)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
➡️ ➡️ ➡️ ➡️ ➡️
|
|
||
| func (om *operationsManager) writeOperationSuccess(ctx context.Context, opID *fftypes.UUID) { | ||
| if err := om.database.ResolveOperation(ctx, opID, fftypes.OpStatusSucceeded, "", nil); err != nil { | ||
| log.L(ctx).Errorf("Failed to update operation %s: %s", opID, err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Think it would be good to write the full data of the operation here - particularly the outputs - as they would have been lost.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't have any path that writes operation outputs here. I'm not sure there's any more data to capture beyond what is in the log. Need to think further to be sure there's no time we should be capturing outputs here...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok - I understand now the ResolveOperation call is only modifying one field on the operation, it's not the call that does the full update with the outputs etc.
|
|
||
| func (om *operationsManager) writeOperationFailure(ctx context.Context, opID *fftypes.UUID, err error) { | ||
| if err := om.database.ResolveOperation(ctx, opID, fftypes.OpStatusFailed, err.Error(), nil); err != nil { | ||
| log.L(ctx).Errorf("Failed to update operation %s: %s", opID, err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As above: Think it would be good to write the full data of the operation here - particularly the outputs - as they would have been lost.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above - the only "outputs" from a synchronous failure are in the form of a Go error which is logged here. I'm not sure there's anything else useful to log... unless it's a sign that we're not returning enough info from some other layer.
| InsertOperation(ctx context.Context, operation *fftypes.Operation) (err error) | ||
|
|
||
| // ResolveOperation - Resolve operation upon completion | ||
| ResolveOperation(ctx context.Context, id *fftypes.UUID, status fftypes.OpStatus, errorMsg string, output fftypes.JSONObject) (err error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now we have a generic UpdateOperation - I wonder if it would be more consistent to remove the ResolveOperation from the DB layer as that's just a thin wrapper:
firefly/internal/database/sqlcommon/operation_sql.go
Lines 178 to 186 in 60e00ff
| func (s *SQLCommon) ResolveOperation(ctx context.Context, id *fftypes.UUID, status fftypes.OpStatus, errorMsg string, output fftypes.JSONObject) (err error) { | |
| update := database.OperationQueryFactory.NewUpdate(ctx). | |
| Set("status", status). | |
| Set("error", errorMsg) | |
| if output != nil { | |
| update.Set("output", output) | |
| } | |
| return s.updateOperation(ctx, id, update) | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea, was thinking the same thing. I can do this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After looking at this again, there are a lot of places where we invoke ResolveOperation, and this change would take all of those from 1 line to 5 lines. So it would be more consistent, but more verbose as well. I slightly prefer keeping the concise helper.
Could also move the helper somewhere other than the database layer. For instance, it could be on the Operations Manager (but would then need to add a dependency from Event Manager on Operations Manager).
Includes support for token approvals in Operations Manager.
Signed-off-by: Andrew Richardson <andrew.richardson@kaleido.io>
Signed-off-by: Andrew Richardson <andrew.richardson@kaleido.io>
peterbroadhurst
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've marked approval @awrichar as I think you were proposing this goes in at this point, and the comments are taken as input for future work in other changes.
| Plugin: op.Plugin, | ||
| Input: op.Input, | ||
| } | ||
| key, err := json.Marshal(opCopy) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This feels like extra evidence to the point that we should treat the inputs as references, rather than full data wherever possible. As this is going to be a large string cache key that we are marshaling.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
... as an aside I still agree serialized JSON is a more efficient cache key than a SHA etc. - so the choice looks right here to me
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cool, that was my inkling as well but wanted to have someone else back me up. But agreed it's yet another reason to minimize the size of "Input" as much as is practical.
peterbroadhurst
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for adding the caching. Like that implementation, and think it's great we'll have de-dup on retry within the batch processor.
First step toward hyperledger/firefly-fir#10