/
middleware.go
312 lines (274 loc) · 10.5 KB
/
middleware.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
package api
import (
"context"
"database/sql"
"errors"
"fmt"
"strings"
"time"
"github.com/gogo/status"
"github.com/golang/protobuf/ptypes/empty"
tclient "go.temporal.io/sdk/client"
"google.golang.org/grpc/codes"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/apoxy-dev/proximal/core/log"
serverdb "github.com/apoxy-dev/proximal/server/db"
sqlc "github.com/apoxy-dev/proximal/server/db/sql"
"github.com/apoxy-dev/proximal/server/watcher"
middlewarev1 "github.com/apoxy-dev/proximal/api/middleware/v1"
)
// MiddlewareService implements the MiddlewareServiceServer gRPC service.
// It bundles the handles the RPC handlers in this file operate on.
type MiddlewareService struct {
// db is the database handle. Handlers call db.Queries() to run queries and
// db.Begin() to open a transaction.
db *serverdb.DB
// tc is the Temporal SDK client. None of the handlers visible in this part
// of the file use it directly; presumably startBuildWorkflow does — confirm.
tc tclient.Client
// watcher is consulted by Delete, which calls watcher.Remove(slug) for
// middlewares whose ingest type is DIRECT.
watcher *watcher.Watcher
}
// NewMiddlewareService builds a MiddlewareService around the given database
// handle, Temporal client and watcher. The arguments are stored as-is; no
// validation or copying is performed.
func NewMiddlewareService(db *serverdb.DB, tc tclient.Client, w *watcher.Watcher) *MiddlewareService {
	svc := new(MiddlewareService)
	svc.db = db
	svc.tc = tc
	svc.watcher = w
	return svc
}
// List returns one page of middlewares.
//
// Paging is keyed on a timestamp. The first page is queried with the current
// time; later pages are queried with the timestamp carried in req.PageToken,
// which decodes to an RFC 3339 (nanosecond) string. NextPageToken is set only
// when the page came back full, and encodes the CreatedAt of the last
// middleware on the page.
//
// A PageSize of zero or less selects the default of 100 entries, so a negative
// value is never forwarded to the query as its limit.
//
// Errors:
//   - InvalidArgument when the page token cannot be decoded or parsed.
//   - Internal when the query or the row conversion fails. Details are logged
//     rather than returned, matching the other handlers of this service.
func (s *MiddlewareService) List(ctx context.Context, req *middlewarev1.ListRequest) (*middlewarev1.ListResponse, error) {
	const defaultPageSize = 100

	log.Infof("list middlewares: %s", protojson.Format(req))

	ts := time.Now()
	if req.PageToken != "" {
		dTs, err := decodeNextPageToken(req.PageToken)
		if err != nil {
			log.Errorf("failed to decode page token: %v", err)
			return nil, status.Error(codes.InvalidArgument, "invalid page token")
		}
		log.Debugf("decoded page token: %s", string(dTs))
		ts, err = time.Parse(time.RFC3339Nano, string(dTs))
		if err != nil {
			log.Errorf("failed to parse page token: %v", err)
			return nil, status.Error(codes.InvalidArgument, "invalid page token")
		}
	}

	pageSize := req.PageSize
	if pageSize <= 0 {
		pageSize = defaultPageSize
	}

	ms, err := s.db.Queries().ListMiddlewares(ctx, sqlc.ListMiddlewaresParams{
		Datetime: ts,
		Limit:    int64(pageSize),
	})
	if err != nil {
		log.Errorf("failed to list middleware: %v", err)
		return nil, status.Error(codes.Internal, "failed to list middleware")
	}

	mpbs := make([]*middlewarev1.Middleware, 0, len(ms))
	for i := range ms {
		mw, err := middlewareFromRow(&ms[i])
		if err != nil {
			log.Errorf("failed to convert middleware: %v", err)
			return nil, status.Error(codes.Internal, "failed to convert middleware")
		}
		mpbs = append(mpbs, mw)
	}

	// A short page means there is nothing further to fetch, so the token is
	// only produced when the page is exactly full.
	var nextPageToken string
	if len(mpbs) == int(pageSize) {
		last := mpbs[len(mpbs)-1]
		nextPageToken = encodeNextPageToken(last.CreatedAt.AsTime().Format(time.RFC3339Nano))
	}

	return &middlewarev1.ListResponse{
		Middlewares:   mpbs,
		NextPageToken: nextPageToken,
	}, nil
}
// middlewareFromRow converts a database row into its API representation.
//
// The row's ingest and runtime parameters are decoded from protobuf JSON, and
// the textual status is upper-cased and looked up in the generated
// Middleware_MiddlewareStatus_value table.
//
// Only Slug, IngestParams, RuntimeParams, Status, LiveBuildSha, CreatedAt and
// UpdatedAt are populated on the result; every other field keeps its zero
// value.
//
// It returns an error when either JSON payload cannot be decoded (the cause is
// wrapped, so errors.Is / errors.As work on it) or when the status string is
// not a known enum name.
func middlewareFromRow(m *sqlc.Middleware) (*middlewarev1.Middleware, error) {
	var ingestParams middlewarev1.MiddlewareIngestParams
	if err := protojson.Unmarshal(m.IngestParamsJson, &ingestParams); err != nil {
		return nil, fmt.Errorf("unable to unmarshal ingest params: %w", err)
	}

	var runtimeParams middlewarev1.MiddlewareRuntimeParams
	if err := protojson.Unmarshal(m.RuntimeParamsJson, &runtimeParams); err != nil {
		return nil, fmt.Errorf("unable to unmarshal runtime params: %w", err)
	}

	// Deliberately not named "status": that would shadow the imported gRPC
	// status package for the rest of the function.
	statusNum, ok := middlewarev1.Middleware_MiddlewareStatus_value[strings.ToUpper(m.Status)]
	if !ok {
		return nil, fmt.Errorf("unknown middleware status: %v", m.Status)
	}

	return &middlewarev1.Middleware{
		Slug:          m.Slug,
		IngestParams:  &ingestParams,
		RuntimeParams: &runtimeParams,
		Status:        middlewarev1.Middleware_MiddlewareStatus(statusNum),
		LiveBuildSha:  m.LiveBuildSha.String,
		CreatedAt:     timestamppb.New(m.CreatedAt.Time),
		UpdatedAt:     timestamppb.New(m.UpdatedAt.Time),
	}, nil
}
// Get looks up a single middleware by req.Slug and returns its API form.
//
// Errors:
//   - NotFound when no row exists for the slug.
//   - Internal when the lookup fails for another reason or the row cannot be
//     converted; the underlying error is logged, not returned.
func (s *MiddlewareService) Get(ctx context.Context, req *middlewarev1.GetRequest) (*middlewarev1.Middleware, error) {
	row, lookupErr := s.db.Queries().GetMiddlewareBySlug(ctx, req.Slug)
	switch {
	case errors.Is(lookupErr, sql.ErrNoRows):
		return nil, status.Error(codes.NotFound, "middleware not found")
	case lookupErr != nil:
		log.Errorf("unable to get middleware: %v", lookupErr)
		return nil, status.Error(codes.Internal, "unable to get middleware")
	}

	result, convErr := middlewareFromRow(&row)
	if convErr != nil {
		log.Errorf("unable to create middleware from row: %v", convErr)
		return nil, status.Error(codes.Internal, "unable to create middleware from row")
	}

	return result, nil
}
// Create registers a new middleware and starts its build workflow.
//
// Steps, in order:
//  1. Marshal the request's ingest and runtime params to protobuf JSON.
//  2. Reject the request if a middleware with the same slug already exists.
//  3. Inside a transaction, insert the row with status "pending".
//  4. Start the build workflow, then commit.
//
// The workflow is started before the commit, so if starting it fails the
// function returns early and the deferred Rollback discards the insert.
// NOTE(review): if Commit itself fails, the already-started workflow is not
// cancelled here — confirm the workflow tolerates a missing row.
// NOTE(review): the existence check runs outside the transaction, so two
// concurrent Creates for one slug can both pass it; presumably a uniqueness
// constraint on the slug column catches that — verify.
//
// Errors:
//   - InvalidArgument when the request carries no middleware.
//   - AlreadyExists when the slug is taken.
//   - Internal for marshalling, database and workflow failures (details are
//     logged, not returned).
func (s *MiddlewareService) Create(ctx context.Context, req *middlewarev1.CreateRequest) (*middlewarev1.Middleware, error) {
	// Without this guard a request with an unset middleware field would
	// dereference a nil pointer below and panic the handler.
	if req == nil || req.Middleware == nil {
		return nil, status.Error(codes.InvalidArgument, "middleware is required")
	}
	mw := req.Middleware

	paramsJson, err := protojson.Marshal(mw.IngestParams)
	if err != nil {
		log.Errorf("unable to marshal params: %v", err)
		return nil, status.Error(codes.Internal, "unable to marshal params")
	}
	runParamsJson, err := protojson.Marshal(mw.RuntimeParams)
	if err != nil {
		log.Errorf("unable to marshal runtime params: %v", err)
		return nil, status.Error(codes.Internal, "unable to marshal runtime params")
	}

	// A successful lookup means the slug is taken; ErrNoRows is the only
	// outcome that lets creation proceed.
	_, err = s.db.Queries().GetMiddlewareBySlug(ctx, mw.Slug)
	switch {
	case err == nil:
		return nil, status.Error(codes.AlreadyExists, "middleware already exists")
	case !errors.Is(err, sql.ErrNoRows):
		log.Errorf("unable to get middleware by slug: %v", err)
		return nil, status.Error(codes.Internal, "unable to get middleware by slug")
	}

	tx, err := s.db.Begin()
	if err != nil {
		log.Errorf("failed to begin transaction: %v", err)
		return nil, status.Error(codes.Internal, "failed to start ingest")
	}
	// Undoes the insert on every early return below. Its result is ignored
	// because after a successful Commit there is nothing left to roll back.
	defer tx.Rollback()

	qtx := s.db.Queries().WithTx(tx)
	m, err := qtx.CreateMiddleware(ctx, sqlc.CreateMiddlewareParams{
		Slug:              mw.Slug,
		SourceType:        serverdb.SourceTypeGitHub,
		IngestParamsJson:  paramsJson,
		RuntimeParamsJson: runParamsJson,
		Status:            serverdb.MiddlewareStatusPending,
		StatusDetail: sql.NullString{
			String: "pending",
			Valid:  true,
		},
	})
	if err != nil {
		log.Errorf("unable to create middleware: %v", err)
		return nil, status.Error(codes.Internal, "unable to create middleware")
	}

	if err := s.startBuildWorkflow(ctx, mw.Slug, mw.IngestParams); err != nil {
		log.Errorf("unable to start build workflow: %v", err)
		return nil, status.Error(codes.Internal, "unable to start build workflow")
	}

	if err := tx.Commit(); err != nil {
		log.Errorf("unable to commit transaction: %v", err)
		return nil, status.Error(codes.Internal, "unable to commit transaction")
	}

	pb, err := middlewareFromRow(&m)
	if err != nil {
		log.Errorf("unable to create middleware from row: %v", err)
		return nil, status.Error(codes.Internal, "unable to create middleware from row")
	}
	return pb, nil
}
// Update replaces the stored ingest and runtime parameters of an existing
// middleware, identified by req.Middleware.Slug.
//
// The middleware type, the ingest type and the ingest source (GitHub repo or
// watch dir, depending on the ingest type) are immutable; a request that
// changes any of them is rejected. When the request omits IngestParams the
// stored ingest params are kept unchanged: marshalling the missing message
// would produce an empty JSON object and wipe the very fields the checks
// above protect.
//
// NOTE(review): a nil RuntimeParams is still marshalled and written, which
// presumably stores an empty object and so clears the runtime params —
// confirm that clearing-by-omission is intended.
//
// Errors:
//   - InvalidArgument when the request carries no middleware or attempts to
//     change an immutable property.
//   - NotFound when no middleware exists for the slug.
//   - Internal for database, conversion and marshalling failures (details are
//     logged, not returned).
func (s *MiddlewareService) Update(ctx context.Context, req *middlewarev1.UpdateRequest) (*middlewarev1.Middleware, error) {
	// Without this guard a request with an unset middleware field would
	// dereference a nil pointer below and panic the handler.
	if req == nil || req.Middleware == nil {
		return nil, status.Error(codes.InvalidArgument, "middleware is required")
	}
	in := req.Middleware

	m, err := s.db.Queries().GetMiddlewareBySlug(ctx, in.Slug)
	if err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			return nil, status.Error(codes.NotFound, "middleware not found")
		}
		log.Errorf("unable to get middleware: %v", err)
		return nil, status.Error(codes.Internal, "unable to get middleware")
	}
	mpb, err := middlewareFromRow(&m)
	if err != nil {
		log.Errorf("unable to create middleware from row: %v", err)
		return nil, status.Error(codes.Internal, "unable to create middleware from row")
	}

	// NOTE(review): middlewareFromRow, as written in this file, does not set
	// Type, so mpb.Type is the zero value here and any request with a non-zero
	// Type is rejected — verify whether the row should carry the type.
	if mpb.Type != in.Type {
		return nil, status.Error(codes.InvalidArgument, "cannot change middleware type")
	}

	// Default to what is already stored; only a request that actually carries
	// ingest params gets validated and re-marshalled.
	ingestJSON := m.IngestParamsJson
	if ip := in.IngestParams; ip != nil {
		if ip.Type != mpb.IngestParams.Type {
			return nil, status.Error(codes.InvalidArgument, "cannot change ingest type")
		}
		switch ip.Type {
		case middlewarev1.MiddlewareIngestParams_GITHUB:
			if ip.GetGithubRepo() != mpb.IngestParams.GetGithubRepo() {
				return nil, status.Error(codes.InvalidArgument, "cannot change github repo")
			}
		case middlewarev1.MiddlewareIngestParams_DIRECT:
			if ip.GetWatchDir() != mpb.IngestParams.GetWatchDir() {
				return nil, status.Error(codes.InvalidArgument, "cannot change watch dir")
			}
		}

		ingestJSON, err = protojson.Marshal(ip)
		if err != nil {
			log.Errorf("unable to marshal params: %v", err)
			return nil, status.Error(codes.Internal, "unable to marshal params")
		}
	}

	runParamsJson, err := protojson.Marshal(in.RuntimeParams)
	if err != nil {
		log.Errorf("unable to marshal runtime params: %v", err)
		return nil, status.Error(codes.Internal, "unable to marshal runtime params")
	}

	upd, err := s.db.Queries().UpdateMiddleware(ctx, sqlc.UpdateMiddlewareParams{
		Slug:              in.Slug,
		IngestParamsJson:  ingestJSON,
		RuntimeParamsJson: runParamsJson,
	})
	if err != nil {
		log.Errorf("unable to update middleware: %v", err)
		return nil, status.Error(codes.Internal, "unable to update middleware")
	}

	upb, err := middlewareFromRow(&upd)
	if err != nil {
		log.Errorf("unable to create middleware from row: %v", err)
		return nil, status.Error(codes.Internal, "unable to create middleware from row")
	}
	return upb, nil
}
// Delete removes the middleware identified by req.Slug.
//
// The row is loaded first so its ingest type can be inspected: for DIRECT
// ingest the slug is also removed from the watcher. A watcher failure is only
// logged as a warning and does not stop the database delete.
//
// Errors:
//   - NotFound when no middleware exists for the slug.
//   - Internal when the lookup, the row conversion or the delete fails
//     (details are logged, not returned).
func (s *MiddlewareService) Delete(ctx context.Context, req *middlewarev1.DeleteRequest) (*emptypb.Empty, error) {
	log.Infof("deleting middleware %v", req.Slug)

	row, lookupErr := s.db.Queries().GetMiddlewareBySlug(ctx, req.Slug)
	switch {
	case errors.Is(lookupErr, sql.ErrNoRows):
		return nil, status.Error(codes.NotFound, "middleware not found")
	case lookupErr != nil:
		log.Errorf("unable to get middleware: %v", lookupErr)
		return nil, status.Error(codes.Internal, "unable to get middleware")
	}

	current, convErr := middlewareFromRow(&row)
	if convErr != nil {
		log.Errorf("unable to create middleware from row: %v", convErr)
		return nil, status.Error(codes.Internal, "failed to delete middleware")
	}

	// Best effort: a stale watch entry must not block deleting the record.
	watched := current.IngestParams.Type == middlewarev1.MiddlewareIngestParams_DIRECT
	if watched {
		if rmErr := s.watcher.Remove(row.Slug); rmErr != nil {
			log.Warnf("unable to remove watch dir: %v", rmErr)
		}
	}

	delErr := s.db.Queries().DeleteMiddleware(ctx, req.Slug)
	if delErr != nil {
		log.Errorf("unable to delete middleware: %v", delErr)
		return nil, status.Error(codes.Internal, "unable to delete middleware")
	}

	return new(emptypb.Empty), nil
}
// InternalList returns every middleware in a single, unpaginated response.
// The request message carries no parameters and is ignored.
//
// Any query or row-conversion failure is logged and reported as Internal; no
// partial list is returned.
func (s *MiddlewareService) InternalList(ctx context.Context, _ *empty.Empty) (*middlewarev1.ListResponse, error) {
	rows, queryErr := s.db.Queries().ListMiddlewaresAll(ctx)
	if queryErr != nil {
		log.Errorf("unable to list middleware: %v", queryErr)
		return nil, status.Error(codes.Internal, "unable to list middleware")
	}

	out := &middlewarev1.ListResponse{}
	for i := range rows {
		item, convErr := middlewareFromRow(&rows[i])
		if convErr != nil {
			log.Errorf("unable to create middleware from row: %v", convErr)
			return nil, status.Error(codes.Internal, "unable to create middleware from row")
		}
		out.Middlewares = append(out.Middlewares, item)
	}

	return out, nil
}