/
bridge.go
178 lines (149 loc) · 4.98 KB
/
bridge.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
package adapters
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
uuid "github.com/satori/go.uuid"
"github.com/SeerLink/seerlink/core/store"
"github.com/SeerLink/seerlink/core/store/models"
"github.com/SeerLink/seerlink/core/utils"
"github.com/pkg/errors"
"github.com/tidwall/gjson"
)
// Bridge adapter is responsible for connecting the task pipeline to external
// adapters, allowing for custom computations to be executed and included in runs.
type Bridge struct {
models.BridgeType
Params models.JSON
}
// TaskType returns the bridges defined type.
func (ba *Bridge) TaskType() models.TaskType {
return ba.Name
}
// Perform sends a POST request containing the JSON of the input to the
// external adapter specified in the BridgeType.
//
// It records the RunResult returned to it, and optionally marks the RunResult pending.
//
// If the Perform is resumed with a pending RunResult, the RunResult is marked
// not pending and the RunResult is returned.
func (ba *Bridge) Perform(input models.RunInput, store *store.Store) models.RunOutput {
if input.Status().Completed() {
return models.NewRunOutputComplete(input.Data())
} else if input.Status().PendingBridge() {
return models.NewRunOutputInProgress(input.Data())
}
meta := getMeta(store, input.JobRunID())
return ba.handleNewRun(input, meta, store)
}
func getMeta(store *store.Store, jobRunID uuid.UUID) *models.JSON {
jobRun, err := store.ORM.FindJobRun(jobRunID)
if err != nil {
return nil
} else if jobRun.RunRequest.TxHash == nil || jobRun.RunRequest.BlockHash == nil {
return nil
}
meta := fmt.Sprintf(`
{
"initiator": {
"transactionHash": "%s",
"blockHash": "%s"
}
}`,
jobRun.RunRequest.TxHash.Hex(),
jobRun.RunRequest.BlockHash.Hex(),
)
return &models.JSON{Result: gjson.Parse(meta)}
}
func (ba *Bridge) handleNewRun(input models.RunInput, meta *models.JSON, store *store.Store) models.RunOutput {
data, err := models.Merge(input.Data(), ba.Params)
if err != nil {
return models.NewRunOutputError(baRunResultError("handling data param", err))
}
responseURL := store.Config.BridgeResponseURL()
if *responseURL != *zeroURL {
responseURL.Path += fmt.Sprintf("/v2/runs/%s", input.JobRunID().String())
}
httpConfig := defaultHTTPConfig(store.Config)
// URL is "safe" because it comes from the node's own database
// Some node operators may run external adapters on their own hardware
httpConfig.AllowUnrestrictedNetworkAccess = true
body, err := ba.postToExternalAdapter(input, meta, responseURL, httpConfig)
if err != nil {
return models.NewRunOutputError(baRunResultError("post to external adapter", err))
}
input = input.CloneWithData(data)
return ba.responseToRunResult(body, input)
}
func (ba *Bridge) responseToRunResult(body []byte, input models.RunInput) models.RunOutput {
var brr models.BridgeRunResult
err := json.Unmarshal(body, &brr)
if err != nil {
return models.NewRunOutputError(baRunResultError("unmarshaling JSON", err))
}
if brr.HasError() {
return models.NewRunOutputError(brr.GetError())
}
if brr.ExternalPending {
return models.NewRunOutputPendingBridge()
}
if brr.Data.IsObject() {
data, err := models.Merge(ba.Params, brr.Data)
if err != nil {
return models.NewRunOutputError(baRunResultError("handling data param", err))
}
return models.NewRunOutputComplete(data)
}
return models.NewRunOutputCompleteWithResult(brr.Data.String(), input.ResultCollection())
}
func (ba *Bridge) postToExternalAdapter(
input models.RunInput,
meta *models.JSON,
bridgeResponseURL *url.URL,
config utils.HTTPRequestConfig,
) ([]byte, error) {
data, err := models.Merge(input.Data(), ba.Params)
if err != nil {
return nil, errors.Wrap(err, "error merging bridge params with input params")
}
outgoing := bridgeOutgoing{JobRunID: input.JobRunID().String(), Data: data, Meta: meta}
if bridgeResponseURL != nil {
outgoing.ResponseURL = bridgeResponseURL.String()
}
in, err := json.Marshal(&outgoing)
if err != nil {
return nil, fmt.Errorf("marshaling request body: %v", err)
}
request, err := http.NewRequest("POST", ba.URL.String(), bytes.NewBuffer(in))
if err != nil {
return nil, fmt.Errorf("building outgoing bridge http post: %v", err)
}
request.Header.Set("Authorization", "Bearer "+ba.BridgeType.OutgoingToken)
request.Header.Set("Content-Type", "application/json")
httpRequest := utils.HTTPRequest{
Request: request,
Config: config,
}
bytes, statusCode, err := httpRequest.SendRequest(context.TODO())
if err != nil {
return nil, err
}
if statusCode >= 400 {
err = fmt.Errorf("%v %v", statusCode, string(bytes))
return nil, fmt.Errorf("POST request: %v", err)
}
return bytes, nil
}
func baRunResultError(str string, err error) error {
return fmt.Errorf("ExternalBridge %v: %v", str, err)
}
type bridgeOutgoing struct {
JobRunID string `json:"id"`
Data models.JSON `json:"data"`
Meta *models.JSON `json:"meta,omitempty"`
ResponseURL string `json:"responseURL,omitempty"`
}
var zeroURL = new(url.URL)