/
webhook.go
301 lines (252 loc) · 8.92 KB
/
webhook.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
package webhooks
import (
"fmt"
"github.com/uptrace/bun"
"github.com/determined-ai/determined/master/pkg/model"
"github.com/determined-ai/determined/master/pkg/protoutils"
"github.com/determined-ai/determined/master/pkg/schemas/expconf"
"github.com/determined-ai/determined/proto/pkg/webhookv1"
"github.com/google/uuid"
)
// Webhooks is a slice of Webhook values. It exists mainly to carry the Proto
// method, which converts the whole slice to its protobuf representation.
type Webhooks []Webhook
// Proto returns the protobuf representation of every webhook in ws, in the
// same order. The returned slice is non-nil and has exactly len(ws) entries.
func (ws Webhooks) Proto() []*webhookv1.Webhook {
	protos := make([]*webhookv1.Webhook, 0, len(ws))
	for idx := range ws {
		protos = append(protos, ws[idx].Proto())
	}
	return protos
}
// Webhook corresponds to a row in the "webhooks" DB table.
type Webhook struct {
bun.BaseModel `bun:"table:webhooks"`
// ID is the auto-incrementing primary key.
ID WebhookID `bun:"id,pk,autoincrement"`
// WebhookType is stored in the non-null "webhook_type" column.
WebhookType WebhookType `bun:"webhook_type,notnull"`
// URL is stored in the non-null "url" column.
URL string `bun:"url,notnull"`
// Triggers is a has-many relation joining this row's id to the related
// rows' webhook_id.
Triggers Triggers `bun:"rel:has-many,join:id=webhook_id"`
}
// WebhookFromProto builds a model Webhook from its protobuf definition.
// Only the URL, triggers and webhook type are taken from w; the ID is left at
// its zero value. w must be non-nil, and its type must be one that
// WebhookTypeFromProto can map.
func WebhookFromProto(w *webhookv1.Webhook) Webhook {
	var hook Webhook
	hook.URL = w.Url
	hook.Triggers = TriggersFromProto(w.Triggers)
	hook.WebhookType = WebhookTypeFromProto(w.WebhookType)
	return hook
}
// Proto returns the protobuf representation of w, including its triggers.
// The ID is narrowed to int32 for the wire format.
func (w *Webhook) Proto() *webhookv1.Webhook {
	pb := new(webhookv1.Webhook)
	pb.Id = int32(w.ID)
	pb.Url = w.URL
	pb.Triggers = w.Triggers.Proto()
	pb.WebhookType = w.WebhookType.Proto()
	return pb
}
// WebhookID is the integer type used for Webhook primary keys.
type WebhookID int
// WebhookType is the string-backed type for the webhook type enum; see the
// WebhookType* constants for the known values.
type WebhookType string
// Triggers is a slice of Trigger pointers, primarily useful for its methods.
type Triggers []*Trigger
// TriggersFromProto converts each protobuf trigger in ts to a model Trigger,
// preserving order. The result is non-nil and has exactly len(ts) entries.
func TriggersFromProto(ts []*webhookv1.Trigger) Triggers {
	converted := make(Triggers, 0, len(ts))
	for _, pb := range ts {
		converted = append(converted, TriggerFromProto(pb))
	}
	return converted
}
// Proto returns the protobuf representation of every trigger in ts, in the
// same order. The returned slice is non-nil and has exactly len(ts) entries.
func (ts Triggers) Proto() []*webhookv1.Trigger {
	protos := make([]*webhookv1.Trigger, 0, len(ts))
	for _, trigger := range ts {
		protos = append(protos, trigger.Proto())
	}
	return protos
}
// Trigger corresponds to a row in the "webhook_triggers" DB table.
type Trigger struct {
bun.BaseModel `bun:"table:webhook_triggers"`
// ID is the auto-incrementing primary key.
ID TriggerID `bun:"id,pk,autoincrement"`
// TriggerType is stored in the non-null "trigger_type" column.
TriggerType TriggerType `bun:"trigger_type,notnull"`
// Condition is a free-form key/value map stored in the non-null
// "condition" column.
Condition map[string]interface{} `bun:"condition,notnull"`
// WebhookID is the id of the owning webhook row.
WebhookID WebhookID `bun:"webhook_id,notnull"`
// Webhook is the belongs-to relation resolved through webhook_id.
Webhook *Webhook `bun:"rel:belongs-to,join:webhook_id=id"`
}
// webhookTaskLogTrigger corresponds to a row in the
// "webhook_task_log_triggers" DB table, pairing a task with a trigger.
// Used for deduping webhook events.
type webhookTaskLogTrigger struct {
bun.BaseModel `bun:"table:webhook_task_log_triggers"`
// TaskID is stored in the "task_id" column.
TaskID model.TaskID `bun:"task_id"`
// TriggerID is stored in the "trigger_id" column.
TriggerID TriggerID `bun:"trigger_id"`
}
// TriggerFromProto builds a model Trigger from its protobuf definition.
// Only the trigger type and condition are taken from t; the IDs are left at
// their zero values. t must be non-nil, and its type must be one that
// TriggerTypeFromProto can map.
func TriggerFromProto(t *webhookv1.Trigger) *Trigger {
	trigger := new(Trigger)
	trigger.TriggerType = TriggerTypeFromProto(t.TriggerType)
	trigger.Condition = t.Condition.AsMap()
	return trigger
}
// Proto returns the protobuf representation of t. The trigger ID and the
// owning webhook ID are narrowed to int32 for the wire format.
func (t *Trigger) Proto() *webhookv1.Trigger {
	pb := new(webhookv1.Trigger)
	pb.Id = int32(t.ID)
	pb.TriggerType = t.TriggerType.Proto()
	pb.Condition = protoutils.ToStruct(t.Condition)
	pb.WebhookId = int32(t.WebhookID)
	return pb
}
// TriggerID is the integer type used for Trigger primary keys.
type TriggerID int
// TriggerType is the string-backed type for the trigger type enum; see the
// TriggerType* constants for the known values.
type TriggerType string
// Known TriggerType values.
const (
// TriggerTypeStateChange represents a change in experiment state.
TriggerTypeStateChange TriggerType = "EXPERIMENT_STATE_CHANGE"
// TriggerTypeMetricThresholdExceeded represents a threshold for a training metric value.
TriggerTypeMetricThresholdExceeded TriggerType = "METRIC_THRESHOLD_EXCEEDED"
// TriggerTypeTaskLog represents a trigger for a task log.
TriggerTypeTaskLog TriggerType = "TASK_LOG"
)
// Known WebhookType values.
const (
// WebhookTypeDefault represents a default webhook.
WebhookTypeDefault WebhookType = "DEFAULT"
// WebhookTypeSlack represents a Slack webhook.
WebhookTypeSlack WebhookType = "SLACK"
)
// WebhookTypeFromProto maps a protobuf webhook type to its model WebhookType.
// It panics for any value without a mapping, which includes the unspecified
// type.
func WebhookTypeFromProto(w webhookv1.WebhookType) WebhookType {
	if w == webhookv1.WebhookType_WEBHOOK_TYPE_DEFAULT {
		return WebhookTypeDefault
	}
	if w == webhookv1.WebhookType_WEBHOOK_TYPE_SLACK {
		return WebhookTypeSlack
	}
	// TODO(???): prob don't panic
	panic(fmt.Errorf("missing mapping for webhook type %s to SQL", w))
}
// TriggerTypeFromProto maps a protobuf trigger type to its model TriggerType.
// It panics for any value without a mapping, which includes the unspecified
// type.
func TriggerTypeFromProto(t webhookv1.TriggerType) TriggerType {
	if t == webhookv1.TriggerType_TRIGGER_TYPE_METRIC_THRESHOLD_EXCEEDED {
		return TriggerTypeMetricThresholdExceeded
	}
	if t == webhookv1.TriggerType_TRIGGER_TYPE_EXPERIMENT_STATE_CHANGE {
		return TriggerTypeStateChange
	}
	if t == webhookv1.TriggerType_TRIGGER_TYPE_TASK_LOG {
		return TriggerTypeTaskLog
	}
	// TODO(???): prob don't panic
	panic(fmt.Errorf("missing mapping for trigger %s to SQL", t))
}
// Proto maps w to its protobuf enum value. Values without a mapping yield
// WEBHOOK_TYPE_UNSPECIFIED rather than an error.
func (w WebhookType) Proto() webhookv1.WebhookType {
	if w == WebhookTypeDefault {
		return webhookv1.WebhookType_WEBHOOK_TYPE_DEFAULT
	}
	if w == WebhookTypeSlack {
		return webhookv1.WebhookType_WEBHOOK_TYPE_SLACK
	}
	return webhookv1.WebhookType_WEBHOOK_TYPE_UNSPECIFIED
}
// Proto maps t to its protobuf enum value. Values without a mapping yield
// TRIGGER_TYPE_UNSPECIFIED rather than an error.
func (t TriggerType) Proto() webhookv1.TriggerType {
	if t == TriggerTypeStateChange {
		return webhookv1.TriggerType_TRIGGER_TYPE_EXPERIMENT_STATE_CHANGE
	}
	if t == TriggerTypeMetricThresholdExceeded {
		return webhookv1.TriggerType_TRIGGER_TYPE_METRIC_THRESHOLD_EXCEEDED
	}
	if t == TriggerTypeTaskLog {
		return webhookv1.TriggerType_TRIGGER_TYPE_TASK_LOG
	}
	return webhookv1.TriggerType_TRIGGER_TYPE_UNSPECIFIED
}
// experimentToWebhookPayload builds the webhook payload describing experiment
// e. The ID and state come from e; the name, resource pool, slots per trial,
// workspace and project come from activeConfig. Duration is the whole number
// of seconds between e.StartTime and e.EndTime, or 0 when e.EndTime is nil.
func experimentToWebhookPayload(
	e model.Experiment, activeConfig expconf.ExperimentConfig,
) *ExperimentPayload {
	elapsed := 0
	if end := e.EndTime; end != nil {
		elapsed = int(end.Sub(e.StartTime).Seconds())
	}

	// The config getters are invoked in the same order as the payload fields.
	payload := new(ExperimentPayload)
	payload.ID = e.ID
	payload.State = e.State
	payload.Name = activeConfig.Name()
	payload.Duration = elapsed
	payload.ResourcePool = activeConfig.Resources().ResourcePool()
	payload.SlotsPerTrial = activeConfig.Resources().SlotsPerTrial()
	payload.WorkspaceName = activeConfig.Workspace()
	payload.ProjectName = activeConfig.Project()
	return payload
}
// WebhookEventID is the integer type used for webhook Event primary keys.
type WebhookEventID int
// Event corresponds to a row in the "webhook_events_queue" DB table.
type Event struct {
bun.BaseModel `bun:"table:webhook_events_queue"`
// ID is the auto-incrementing primary key.
ID WebhookEventID `bun:"id,pk,autoincrement"`
// URL is stored in the non-null "url" column.
URL string `bun:"url,notnull"`
// Payload holds the raw bytes stored in the non-null "payload" column.
Payload []byte `bun:"payload,notnull"`
}
// SlackMessageBody corresponds to an entire Slack message, made of top-level
// blocks and optional attachments. Both fields are omitted from the JSON when
// empty or nil.
type SlackMessageBody struct {
Blocks []SlackBlock `json:"blocks,omitempty"`
Attachments *[]SlackAttachment `json:"attachments,omitempty"`
}
// SlackAttachment corresponds to an Attachment Slack Block element: a color
// plus a list of nested blocks. Both fields are omitted from the JSON when
// empty.
type SlackAttachment struct {
Color string `json:"color,omitempty"`
Blocks []SlackBlock `json:"blocks,omitempty"`
}
// SlackBlock corresponds to a Slack Block element.
type SlackBlock struct {
Type string `json:"type,omitempty"`
// NOTE(review): encoding/json's omitempty has no effect on struct-valued
// fields, so "text" is always emitted even when Text is the zero value.
// Making it a pointer would change the type callers construct — confirm
// whether the always-present "text" key is intended.
Text SlackField `json:"text,omitempty"`
// Fields is omitted from the JSON when nil.
Fields *[]SlackField `json:"fields,omitempty"`
}
// SlackField corresponds to a field in a Slack Block element: a type paired
// with its text. Both keys are always present in the JSON output.
type SlackField struct {
Type string `json:"type"`
Text string `json:"text"`
}
// EventPayload represents a webhook event as serialized to JSON.
type EventPayload struct {
// ID is serialized as "event_id".
ID uuid.UUID `json:"event_id"`
// Type is the trigger type, serialized as "event_type".
Type TriggerType `json:"event_type"`
// Timestamp is serialized as "timestamp".
// NOTE(review): the unit (seconds vs. milliseconds) is not visible here —
// confirm against the code that populates it.
Timestamp int64 `json:"timestamp"`
// Condition is the trigger condition, serialized as "condition".
Condition Condition `json:"condition"`
// Data carries the event-specific details, serialized as "event_data".
Data EventData `json:"event_data"`
}
// regexConditionKey is the literal key "regex".
// NOTE(review): presumably the key under which a trigger's Condition map
// stores its regular expression — confirm against the callers.
const regexConditionKey = "regex"
// Condition represents a trigger condition as serialized in an event payload.
// Both fields are tagged omitempty, so an unset field is left out of the JSON.
type Condition struct {
State model.State `json:"state,omitempty"`
Regex string `json:"regex,omitempty"`
}
// EventData represents the event_data for a webhook event. Every member is a
// pointer tagged omitempty, so nil members are omitted from the JSON.
type EventData struct {
// TestData is serialized as "data".
TestData *string `json:"data,omitempty"`
// Experiment is serialized as "experiment".
Experiment *ExperimentPayload `json:"experiment,omitempty"`
// TaskLog is serialized as "task_log".
TaskLog *TaskLogPayload `json:"task_log,omitempty"`
}
// ExperimentPayload is the webhook request representation of an experiment.
type ExperimentPayload struct {
ID int `json:"id"`
State model.State `json:"state"`
Name expconf.Name `json:"name"`
// Duration is filled by experimentToWebhookPayload with whole seconds
// between the experiment's start and end, or 0 if it has no end time.
Duration int `json:"duration"`
ResourcePool string `json:"resource_pool"`
SlotsPerTrial int `json:"slots_per_trial"`
// WorkspaceName is serialized as "workspace".
WorkspaceName string `json:"workspace"`
// ProjectName is serialized as "project".
ProjectName string `json:"project"`
}
// TaskLogPayload is the webhook request representation of a task log trigger
// firing.
type TaskLogPayload struct {
// TaskID is serialized as "task_id".
TaskID model.TaskID `json:"task_id"`
// NodeName is serialized as "node_name".
NodeName string `json:"node_name"`
// TriggeringLog is serialized as "triggering_log".
// NOTE(review): presumably the log line that matched the trigger — confirm
// against the code that populates it.
TriggeringLog string `json:"triggering_log"`
}