generated from TBD54566975/tbd-project-template
-
Notifications
You must be signed in to change notification settings - Fork 7
/
manager.go
77 lines (66 loc) · 2.44 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package pubsub
import (
"context"
"time"
"github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/scheduledtask"
"github.com/TBD54566975/ftl/internal/log"
"github.com/jpillora/backoff"
)
// eventConsumptionDelay is how long an event must have existed before it is
// eligible to be consumed.
//
// Events can be inserted concurrently, so their created_at values may land out
// of order. Waiting this long before reading ensures that no event with an
// earlier created_at can still appear behind the ones already read.
const eventConsumptionDelay = 200 * time.Millisecond
// DAL is the data-access surface that Manager depends on.
type DAL interface {
// ProgressSubscriptions is called by Manager.progressSubscriptions with the
// package-level eventConsumptionDelay. The caller treats a returned count
// greater than zero as meaning that an async call was added.
ProgressSubscriptions(ctx context.Context, eventConsumptionDelay time.Duration) (count int, err error)
// CompleteEventForSubscription is called by Manager.OnCallCompletion with the
// module and name of the subscription that originated the async call.
CompleteEventForSubscription(ctx context.Context, module, name string) error
}
// Scheduler is the job-scheduling surface that Manager depends on. Each method
// takes a retry backoff configuration and the job to schedule.
type Scheduler interface {
// Singleton is not called anywhere in this file.
// NOTE(review): presumably runs the job on a single instance at a time; confirm
// against the scheduledtask package.
Singleton(retry backoff.Backoff, job scheduledtask.Job)
// Parallel is used by New to register Manager.progressSubscriptions.
// NOTE(review): presumably allows the job to run on several instances
// concurrently; confirm against the scheduledtask package.
Parallel(retry backoff.Backoff, job scheduledtask.Job)
}
// AsyncCallListener is notified by Manager when progressing subscriptions
// reports that async calls were added.
type AsyncCallListener interface {
// AsyncCallWasAdded is invoked by Manager.progressSubscriptions when the DAL
// reports a count greater than zero.
AsyncCallWasAdded(ctx context.Context)
}
// Manager progresses pubsub subscriptions: it registers a job with the
// scheduler (see New) and reacts to async call completion callbacks.
type Manager struct {
// dal is used to progress subscriptions and to complete subscription events.
dal DAL
// scheduler is the scheduler that New registers progressSubscriptions with.
scheduler Scheduler
// asyncCallListener is notified when progressing subscriptions adds async calls.
asyncCallListener AsyncCallListener
}
// New builds a Manager from the given data layer, scheduler and async call
// listener, and registers the manager's progressSubscriptions job with the
// scheduler via Parallel, using a jittered backoff between 1s and 5s with a
// growth factor of 1.5.
//
// ctx is accepted but not used by the constructor.
func New(ctx context.Context, dal *dal.DAL, scheduler Scheduler, asyncCallListener AsyncCallListener) *Manager {
	retry := backoff.Backoff{
		Min:    1 * time.Second,
		Max:    5 * time.Second,
		Factor: 1.5,
		Jitter: true,
	}

	m := &Manager{dal: dal, scheduler: scheduler, asyncCallListener: asyncCallListener}
	m.scheduler.Parallel(retry, m.progressSubscriptions)
	return m
}
// progressSubscriptions asks the DAL to progress subscriptions, passing
// eventConsumptionDelay, and notifies the async call listener when the DAL
// reports a positive count.
//
// On success it returns one second and a nil error; on failure it returns a
// zero duration together with the DAL error.
// NOTE(review): the duration is presumably the delay before the scheduler runs
// the job again; confirm against scheduledtask.Job.
func (m *Manager) progressSubscriptions(ctx context.Context) (time.Duration, error) {
	progressed, err := m.dal.ProgressSubscriptions(ctx, eventConsumptionDelay)
	switch {
	case err != nil:
		return 0, err
	case progressed > 0:
		// Let the controller know that an async call was added.
		m.asyncCallListener.AsyncCallWasAdded(ctx)
	}
	return time.Second, nil
}
// OnCallCompletion is invoked inside a transaction once an async call has
// completed, so that the subscription state can be updated. It calls
// CompleteEventForSubscription on the DAL with the module and name of the
// subscription recorded in origin, and returns that call's error.
//
// NOTE(review): tx and failed are currently unused, and the update goes through
// m.dal rather than tx; confirm that this is intended.
func (m *Manager) OnCallCompletion(ctx context.Context, tx *dal.Tx, origin dal.AsyncOriginPubSub, failed bool) error {
	sub := origin.Subscription
	return m.dal.CompleteEventForSubscription(ctx, sub.Module, sub.Name)
}
// AsyncCallDidCommit is called after a subscription's async call has been
// completed and committed to the database. It immediately tries to progress
// subscriptions; a failure is logged and not returned. origin is not used.
func (m *Manager) AsyncCallDidCommit(ctx context.Context, origin dal.AsyncOriginPubSub) {
	_, err := m.progressSubscriptions(ctx)
	if err == nil {
		return
	}
	log.FromContext(ctx).Errorf(err, "failed to progress subscriptions")
}