This repository has been archived by the owner on Sep 12, 2022. It is now read-only.
/
future.go
73 lines (62 loc) · 1.64 KB
/
future.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package scheduler
import (
"context"
"time"
"github.com/palantir/stacktrace"
"github.com/rs/xid"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"github.com/Raphy42/weekend/core/errors"
"github.com/Raphy42/weekend/core/scheduler/async"
)
// Future is the receiving side of the channel pair created by NewFuture.
// It embeds a scheduler *Context and exposes the pending outcome through
// Poll, TryPoll and Error.
type Future struct {
	*Context

	// ID is the identifier generated for this future by NewFuture.
	ID xid.ID
	// ManifestID is the ID of the async.Manifest handed to NewFuture.
	ManifestID xid.ID

	// result is the receive-only end on which the produced value arrives.
	result <-chan any
	// error is the receive-only end on which a failure arrives.
	error <-chan error
}
// NewFuture builds a Future for the given manifest and returns it together
// with the send-only ends of its result and error channels.
//
// Both channels are unbuffered, so a send on either returned channel blocks
// until something receives from the Future (Poll, TryPoll or a reader of
// Error()).
//
// If ctx is a scheduler Context value, its Parent replaces the parent
// argument before the Future's own Context is created.
func NewFuture(ctx context.Context, parent xid.ID, manifest async.Manifest) (*Future, chan<- any, chan<- error) {
	// NOTE(review): only the value type Context is matched here; a *Context
	// passed as ctx would keep the caller-supplied parent — confirm intended.
	if schedCtx, ok := ctx.(Context); ok {
		parent = schedCtx.Parent
	}

	resultCh := make(chan any)
	errCh := make(chan error)

	future := &Future{
		Context:    NewContext(ctx, parent),
		ID:         xid.New(),
		ManifestID: manifest.ID,
		result:     resultCh,
		error:      errCh,
	}
	return future, resultCh, errCh
}
// Poll blocks until the future yields a value, yields an error, or ctx is
// done, whichever is observed first.
//
// The wait is wrapped in a "poll" span on the "wk.core.scheduler" tracer,
// tagged with the future, parent and manifest identifiers.
//
// Returns:
//   - (value, nil) when a value is received on the result channel;
//   - (nil, err) when an error is received on the error channel;
//   - (nil, err) with code errors.EInvalidContext when ctx is done first.
func (h Future) Poll(ctx context.Context) (any, error) {
	tracer := otel.Tracer("wk.core.scheduler")
	spanCtx, span := tracer.Start(ctx, "poll")
	defer span.End()

	span.SetAttributes(
		attribute.Stringer("wk.handle.id", h.ID),
		attribute.Stringer("wk.parent.id", h.Parent),
		attribute.Stringer("wk.manifest.id", h.ManifestID),
	)

	// Case order is irrelevant: when several channels are ready, select
	// picks one of them at random.
	select {
	case value := <-h.result:
		return value, nil
	case failure := <-h.error:
		return nil, failure
	case <-spanCtx.Done():
		return nil, stacktrace.PropagateWithCode(spanCtx.Err(), errors.EInvalidContext, "invalid context")
	}
}
// TryPoll waits up to duration for the future to complete.
//
// Returns:
//   - (value, true, nil) when the future produced a value in time;
//   - (nil, true, err) when the future produced an error in time;
//   - (nil, false, nil) when the local timeout elapsed first, so the caller
//     may simply try again;
//   - (nil, false, err) when the caller's own ctx was cancelled or hit its
//     deadline, so polling loops can stop instead of spinning.
func (h Future) TryPoll(ctx context.Context, duration time.Duration) (any, bool, error) {
	timeoutCtx, cancel := context.WithTimeout(ctx, duration)
	defer cancel()

	result, err := h.Poll(timeoutCtx)
	if !errors.HasCode(err, errors.EInvalidContext) {
		return result, true, err
	}

	// Poll reports every context failure with EInvalidContext, and the
	// future's own error may carry the same code, so work out which of the
	// three situations actually happened.
	switch {
	case ctx.Err() != nil:
		// The caller's context is done: this is not a mere timeout.
		return nil, false, err
	case timeoutCtx.Err() == nil:
		// No context has ended, so the error came from the future itself.
		return nil, true, err
	default:
		// Only the local timeout elapsed: the future is just not ready yet.
		return nil, false, nil
	}
}
// Error exposes the receive-only channel on which the future's failure is
// delivered. It is the same channel Poll selects on, so an error received
// here will not also be seen by Poll.
func (h Future) Error() <-chan error {
	return h.error
}