/
flows.go
136 lines (121 loc) · 4.23 KB
/
flows.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
/*************************************************************************
* Copyright 2021 Gravwell, Inc. All rights reserved.
* Contact: <legal@gravwell.io>
*
* This software may be modified and distributed under the terms of the
* BSD 2-clause license. See the LICENSE file for details.
**************************************************************************/
package client
import (
"errors"
"net/http"
"github.com/gravwell/gravwell/v3/client/types"
)
// GetFlowhList returns flows the user has access to.
func (c *Client) GetFlowList() ([]types.ScheduledSearch, error) {
var searches []types.ScheduledSearch
if err := c.getStaticURL(flowUrl(), &searches); err != nil {
return nil, err
}
return searches, nil
}
// CreateFlow makes a new flow and returns the ID. The parameters are:
//
// - name: the flow name.
//
// - description: the flow description.
//
// - schedule: a cron-format schedule on which to execute the flow.
//
// - flow: a valid JSON flow definition.
//
// - groups: an optional array of groups which should be able to access this object.
func (c *Client) CreateFlow(name, description, schedule, flow string, groups []int32) (int32, error) {
ss := types.ScheduledSearch{
Groups: groups,
Name: name,
Description: description,
Schedule: schedule,
ScheduledType: types.ScheduledTypeFlow,
Flow: flow,
}
var resp int32
if err := c.postStaticURL(flowUrl(), ss, &resp); err != nil {
return 0, err
}
return resp, nil
}
// UpdateFlowResults is used to update the flow after it has been
// run. It only updates the LastRun, LastRunDuration, LastSearchIDs,
// and LastError fields.
func (c *Client) UpdateFlowResults(ss types.ScheduledSearch) error {
return c.putStaticURL(flowResultsIdUrl(ss.ID), ss)
}
// UpdateFlow is used to modify an existing flow.
func (c *Client) UpdateFlow(ss types.ScheduledSearch) error {
return c.putStaticURL(flowIdUrl(ss.ID), ss)
}
// DeleteFlow removes the specified flow.
func (c *Client) DeleteFlow(id int32) error {
return c.deleteStaticURL(flowIdUrl(id), nil)
}
// GetFlow returns the flow with the given ID. The ID is an interface{}
// to allow the user to specify either the flow's int32 "ID" or its
// UUID "GUID" field.
func (c *Client) GetFlow(id interface{}) (types.ScheduledSearch, error) {
var search types.ScheduledSearch
err := c.getStaticURL(flowIdUrl(id), &search)
return search, err
}
// ClearFlowError clears the error field on the specified scheduled search.
func (c *Client) ClearFlowError(id int32) error {
return c.deleteStaticURL(flowErrorIdUrl(id), nil)
}
// ClearFlowState clears state variables on the specified scheduled search.
func (c *Client) ClearFlowState(id int32) error {
return c.deleteStaticURL(flowStateIdUrl(id), nil)
}
// ParseFlow asks the API to check a flow.
// If there is no error, outputPayloads will be a map containing the outputs
// of each node, keyed by the node ID.
func (c *Client) ParseFlow(flow string) (outputPayloads map[int]map[string]interface{}, err error) {
var resp types.FlowParseResponse
req := types.FlowParseRequest{
Flow: flow,
}
if err = c.methodStaticPushURL(http.MethodPut, flowParseUrl(), req, &resp); err != nil {
return
}
//if the parse failed but we don't have an error, set something
if !resp.OK {
if len(resp.Error) == 0 {
resp.Error = `Unknown parse error`
}
err = errors.New(resp.Error)
}
outputPayloads = resp.OutputPayloads
return
}
// ParseReactiveFlow asks the API to check a flow as if triggered by an alert.
// The event parameter will be injected into the initial payload under the name `event`.
// If there is no error, outputPayloads will be a map containing the outputs
// of each node, keyed by the node ID.
func (c *Client) ParseReactiveFlow(flow string, event types.Event) (outputPayloads map[int]map[string]interface{}, err error) {
var resp types.FlowParseResponse
req := types.FlowParseRequest{
DebugEvent: &event,
Flow: flow,
}
if err = c.methodStaticPushURL(http.MethodPut, flowParseUrl(), req, &resp); err != nil {
return
}
//if the parse failed but we don't have an error, set something
if !resp.OK {
if len(resp.Error) == 0 {
resp.Error = `Unknown parse error`
}
err = errors.New(resp.Error)
}
outputPayloads = resp.OutputPayloads
return
}