forked from raystack/optimus
/
scheduler.go
111 lines (92 loc) · 2.67 KB
/
scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package event
import (
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/goto/optimus/core/scheduler"
pbInt "github.com/goto/optimus/protos/gotocompany/optimus/integration/v1beta1"
)
// JobRunWaitUpstream is a job-run event that is serialized with the
// EVENT_TYPE_JOB_WAIT_UPSTREAM event type.
type JobRunWaitUpstream struct {
	// Event carries the base event metadata (ID and occurrence time).
	Event

	// JobRun is the run this event describes. It must be non-nil when Bytes
	// is called, because toOptimusChangeEvent dereferences it.
	JobRun *scheduler.JobRun
}

// Bytes converts the event into an OptimusChangeEvent message and returns its
// protobuf wire encoding, or the error reported by proto.Marshal.
func (j *JobRunWaitUpstream) Bytes() ([]byte, error) {
	const kind = pbInt.OptimusChangeEvent_EVENT_TYPE_JOB_WAIT_UPSTREAM

	msg := toOptimusChangeEvent(j.JobRun, j.Event, kind)
	return proto.Marshal(msg)
}
// JobRunInProgress is a job-run event that is serialized with the
// EVENT_TYPE_JOB_IN_PROGRESS event type.
type JobRunInProgress struct {
	// Event carries the base event metadata (ID and occurrence time).
	Event

	// JobRun is the run this event describes. It must be non-nil when Bytes
	// is called, because toOptimusChangeEvent dereferences it.
	JobRun *scheduler.JobRun
}

// Bytes converts the event into an OptimusChangeEvent message and returns its
// protobuf wire encoding, or the error reported by proto.Marshal.
func (j *JobRunInProgress) Bytes() ([]byte, error) {
	const kind = pbInt.OptimusChangeEvent_EVENT_TYPE_JOB_IN_PROGRESS

	msg := toOptimusChangeEvent(j.JobRun, j.Event, kind)
	return proto.Marshal(msg)
}
// JobRunSuccess is a job-run event that is serialized with the
// EVENT_TYPE_JOB_SUCCESS event type.
type JobRunSuccess struct {
	// Event carries the base event metadata (ID and occurrence time).
	Event

	// JobRun is the run this event describes. It must be non-nil when Bytes
	// is called, because toOptimusChangeEvent dereferences it.
	JobRun *scheduler.JobRun
}

// Bytes converts the event into an OptimusChangeEvent message and returns its
// protobuf wire encoding, or the error reported by proto.Marshal.
func (j *JobRunSuccess) Bytes() ([]byte, error) {
	const kind = pbInt.OptimusChangeEvent_EVENT_TYPE_JOB_SUCCESS

	msg := toOptimusChangeEvent(j.JobRun, j.Event, kind)
	return proto.Marshal(msg)
}
// JobRunFailed is a job-run event that is serialized with the
// EVENT_TYPE_JOB_FAILURE event type.
type JobRunFailed struct {
	// Event carries the base event metadata (ID and occurrence time).
	Event

	// JobRun is the run this event describes. It must be non-nil when Bytes
	// is called, because toOptimusChangeEvent dereferences it.
	JobRun *scheduler.JobRun
}

// Bytes converts the event into an OptimusChangeEvent message and returns its
// protobuf wire encoding, or the error reported by proto.Marshal.
func (j *JobRunFailed) Bytes() ([]byte, error) {
	const kind = pbInt.OptimusChangeEvent_EVENT_TYPE_JOB_FAILURE

	msg := toOptimusChangeEvent(j.JobRun, j.Event, kind)
	return proto.Marshal(msg)
}
// NewJobRunWaitUpstreamEvent builds a JobRunWaitUpstream event for jobRun on
// top of a freshly created base event.
//
// jobRun is stored as given; it is neither copied nor validated here. If
// NewBaseEvent fails, its error is returned unchanged together with a nil
// event.
func NewJobRunWaitUpstreamEvent(jobRun *scheduler.JobRun) (*JobRunWaitUpstream, error) {
	base, err := NewBaseEvent()
	if err != nil {
		return nil, err
	}

	evt := new(JobRunWaitUpstream)
	evt.Event = base
	evt.JobRun = jobRun
	return evt, nil
}
// NewJobRunInProgressEvent builds a JobRunInProgress event for jobRun on top
// of a freshly created base event.
//
// jobRun is stored as given; it is neither copied nor validated here. If
// NewBaseEvent fails, its error is returned unchanged together with a nil
// event.
func NewJobRunInProgressEvent(jobRun *scheduler.JobRun) (*JobRunInProgress, error) {
	base, err := NewBaseEvent()
	if err != nil {
		return nil, err
	}

	evt := new(JobRunInProgress)
	evt.Event = base
	evt.JobRun = jobRun
	return evt, nil
}
// NewJobRunSuccessEvent builds a JobRunSuccess event for jobRun on top of a
// freshly created base event.
//
// jobRun is stored as given; it is neither copied nor validated here. If
// NewBaseEvent fails, its error is returned unchanged together with a nil
// event.
func NewJobRunSuccessEvent(jobRun *scheduler.JobRun) (*JobRunSuccess, error) {
	base, err := NewBaseEvent()
	if err != nil {
		return nil, err
	}

	evt := new(JobRunSuccess)
	evt.Event = base
	evt.JobRun = jobRun
	return evt, nil
}
// NewJobRunFailedEvent builds a JobRunFailed event for jobRun on top of a
// freshly created base event.
//
// jobRun is stored as given; it is neither copied nor validated here. If
// NewBaseEvent fails, its error is returned unchanged together with a nil
// event.
func NewJobRunFailedEvent(jobRun *scheduler.JobRun) (*JobRunFailed, error) {
	base, err := NewBaseEvent()
	if err != nil {
		return nil, err
	}

	evt := new(JobRunFailed)
	evt.Event = base
	evt.JobRun = jobRun
	return evt, nil
}
// toOptimusChangeEvent assembles the protobuf change event for a job run.
//
// The envelope fields (event ID, occurrence time, project and namespace
// names, event type) come from e, j.Tenant and eventType. The payload is a
// JobRunPayload holding the job name, scheduled time, run ID and start time
// taken from j.
//
// j must be non-nil: its fields are read unconditionally. Time values are
// passed to timestamppb.New as-is, with no zero-value check.
func toOptimusChangeEvent(j *scheduler.JobRun, e Event, eventType pbInt.OptimusChangeEvent_EventType) *pbInt.OptimusChangeEvent {
	// Fill in the envelope first so fields are read in the same order as a
	// single composite literal would read them.
	changeEvent := &pbInt.OptimusChangeEvent{
		EventId:       e.ID.String(),
		OccurredAt:    timestamppb.New(e.OccurredAt),
		ProjectName:   j.Tenant.ProjectName().String(),
		NamespaceName: j.Tenant.NamespaceName().String(),
		EventType:     eventType,
	}

	runDetails := &pbInt.JobRunPayload{
		JobName:     j.JobName.String(),
		ScheduledAt: timestamppb.New(j.ScheduledAt),
		JobRunId:    j.ID.String(),
		StartTime:   timestamppb.New(j.StartTime),
	}
	changeEvent.Payload = &pbInt.OptimusChangeEvent_JobRun{JobRun: runDetails}

	return changeEvent
}