-
Notifications
You must be signed in to change notification settings - Fork 132
/
executor_repository.go
104 lines (94 loc) · 3.47 KB
/
executor_repository.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package database
import (
"time"
"github.com/gogo/protobuf/proto"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/pkg/errors"
"github.com/armadaproject/armada/internal/common/armadacontext"
"github.com/armadaproject/armada/internal/common/compress"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
)
// ExecutorRepository is an interface to be implemented by structs which provide executor information.
// It covers three operations: listing every stored executor, reading the last heartbeat time
// of each executor, and persisting the latest state of a single executor.
type ExecutorRepository interface {
// GetExecutors returns all known executors, regardless of their last heartbeat time.
GetExecutors(ctx *armadacontext.Context) ([]*schedulerobjects.Executor, error)
// GetLastUpdateTimes returns a map of executor name -> last heartbeat time.
GetLastUpdateTimes(ctx *armadacontext.Context) (map[string]time.Time, error)
// StoreExecutor persists the latest executor state.
StoreExecutor(ctx *armadacontext.Context, executor *schedulerobjects.Executor) error
}
// PostgresExecutorRepository is an implementation of ExecutorRepository that stores its state in postgres.
// Executors are written as proto-marshalled, compressed bytes (see StoreExecutor) and are
// decompressed and unmarshalled again when read back (see GetExecutors).
type PostgresExecutorRepository struct {
// pool of database connections
db *pgxpool.Pool
// proto objects are stored compressed; compressor is applied to the marshalled bytes before they are written
compressor compress.Compressor
// decompressor is applied to the stored bytes before they are unmarshalled
decompressor compress.Decompressor
}
// NewPostgresExecutorRepository builds a PostgresExecutorRepository that runs its queries
// against the supplied connection pool. The repository is given a zlib compressor
// (constructed with an argument of 1024) and a matching zlib decompressor, both obtained
// from the compress package's ThreadSafe* constructors.
func NewPostgresExecutorRepository(db *pgxpool.Pool) *PostgresExecutorRepository {
	repo := new(PostgresExecutorRepository)
	repo.db = db
	repo.compressor = compress.NewThreadSafeZlibCompressor(1024)
	repo.decompressor = compress.NewThreadSafeZlibDecompressor()
	return repo
}
// GetExecutors loads every executor row from the database and decodes each row's
// stored request into a schedulerobjects.Executor. No filtering on heartbeat time is
// applied. If the query fails, or any single row cannot be decoded, nil and the error
// are returned; no partial result is handed back.
func (r *PostgresExecutorRepository) GetExecutors(ctx *armadacontext.Context) ([]*schedulerobjects.Executor, error) {
	rows, err := New(r.db).SelectAllExecutors(ctx)
	if err != nil {
		return nil, errors.WithStack(err)
	}

	// One output element per row, in the order the query returned them.
	result := make([]*schedulerobjects.Executor, 0, len(rows))
	for _, row := range rows {
		decoded := new(schedulerobjects.Executor)
		if err := decompressAndMarshall(row.LastRequest, r.decompressor, decoded); err != nil {
			return nil, err
		}
		result = append(result, decoded)
	}
	return result, nil
}
// GetLastUpdateTimes queries the stored update time of every executor and returns them
// as a map keyed by executor ID. All times in the returned map are expressed in UTC.
// A query failure is returned with a stack trace attached.
func (r *PostgresExecutorRepository) GetLastUpdateTimes(ctx *armadacontext.Context) (map[string]time.Time, error) {
	updateRows, err := New(r.db).SelectExecutorUpdateTimes(ctx)
	if err != nil {
		return nil, errors.WithStack(err)
	}

	timesByExecutor := make(map[string]time.Time, len(updateRows))
	for _, entry := range updateRows {
		// pgx hands timestamps back in local time by default, so normalise to UTC
		timesByExecutor[entry.ExecutorID] = entry.LastUpdated.UTC()
	}
	return timesByExecutor, nil
}
// StoreExecutor writes the given executor to the database. The executor is
// proto-marshalled, compressed with the repository's compressor, and then upserted
// together with its Id and LastUpdateTime. A failure in any of the three steps
// (marshal, compress, upsert) is returned with a stack trace attached.
func (r *PostgresExecutorRepository) StoreExecutor(ctx *armadacontext.Context, executor *schedulerobjects.Executor) error {
	q := New(r.db)

	serialised, err := proto.Marshal(executor)
	if err != nil {
		return errors.WithStack(err)
	}

	payload, err := r.compressor.Compress(serialised)
	if err != nil {
		return errors.WithStack(err)
	}

	params := UpsertExecutorParams{
		ExecutorID:  executor.Id,
		LastRequest: payload,
		UpdateTime:  executor.LastUpdateTime,
	}
	if err := q.UpsertExecutor(ctx, params); err != nil {
		return errors.WithStack(err)
	}
	return nil
}
// decompressAndMarshall decompresses b with the supplied decompressor and then
// unmarshals the decompressed bytes into msg. Note that, despite its name, this
// function unmarshals (decodes) rather than marshals.
//
// An error from either step is returned with a stack trace attached, in line with
// the error handling used by the rest of this file; the error's message is unchanged.
// msg should not be relied upon if a non-nil error is returned.
func decompressAndMarshall(b []byte, decompressor compress.Decompressor, msg proto.Message) error {
	decompressed, err := decompressor.Decompress(b)
	if err != nil {
		return errors.WithStack(err)
	}
	if err := proto.Unmarshal(decompressed, msg); err != nil {
		return errors.WithStack(err)
	}
	return nil
}