-
Notifications
You must be signed in to change notification settings - Fork 6
/
job.go
132 lines (115 loc) · 3.46 KB
/
job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
/*
Copyright 2021 Adevinta
*/
package api
import (
"context"
"database/sql/driver"
"encoding/json"
"errors"
"fmt"
"time"
)
const (
// JobStatusPending defines the status of a pending Job.
JobStatusPending JobStatus = "PENDING"
// JobStatusRunning defines the status of a running Job.
JobStatusRunning JobStatus = "RUNNING"
// JobStatusDone defines the status of a done Job.
JobStatusDone JobStatus = "DONE"
)
type JobStatus string
// Job contains the status information of an asynchronous operation.
//
// In case of non-global operations it also contains the team ID associated to
// the operation.
type Job struct {
ID string `gorm:"primary_key:true"`
TeamID string `gorm:"Column:team_id"`
Operation string `validate:"required"`
// Status possible values are:
// - PENDING
// - RUNNING
// - DONE
Status JobStatus `validate:"required"`
Result *JobResult `gorm:"Column:result"`
CreatedAt time.Time
UpdatedAt time.Time
}
// JobResult represents the result of a job. Data and Error fields are
// unstructured JSON fields which content may vary per each operation.
type JobResult struct {
Data json.RawMessage `json:"data"`
Error string `json:"error"`
}
// Scan scans value into Jsonb, implements sql.Scanner interface.
// This method is necessary for GORM to known how to receive/save it into the database.
// Reference: https://gorm.io/docs/data_types.html
func (j *JobResult) Scan(value interface{}) error {
bytes, ok := value.([]byte)
if !ok {
return fmt.Errorf("failed to unmarshal JSONB value: %v", value)
}
return json.Unmarshal(bytes, j)
}
// Value returns json value, implements driver.Valuer interface.
// This method is necessary for GORM to known how to receive/save it into the database.
// Reference: https://gorm.io/docs/data_types.html
func (j *JobResult) Value() (driver.Value, error) {
return json.Marshal(j)
}
func (r *JobResult) toJobResultResponse() JobResultResponse {
return JobResultResponse{
Data: string(r.Data),
Error: r.Error,
}
}
func (j Job) Validate() error {
switch j.Status {
case JobStatusPending:
case JobStatusRunning:
case JobStatusDone:
default:
return errors.New("valid status are PENDING, RUNNING or DONE")
}
if !json.Valid(j.Result.Data) {
return errors.New("invalid result data JSON")
}
return nil
}
func (j Job) ToResponse() *JobResponse {
res := &JobResponse{
ID: j.ID,
TeamID: j.TeamID,
Operation: j.Operation,
Status: j.Status,
}
if j.Result != nil {
res.Result = j.Result.toJobResultResponse()
}
return res
}
// JobResponse represents the data for a Job that is
// returned as a response to Job queries through the API.
type JobResponse struct {
ID string `json:"id"`
TeamID string `json:"team_id,omitempty"`
Operation string `json:"operation"`
Status JobStatus `json:"status"`
Result JobResultResponse `json:"result"`
}
type JobResultResponse struct {
Data string `json:"data"`
Error string `json:"error"`
}
// JobsRunner is a dependency used by the CDC parser to execute async API jobs,
// providing a limited access to the API service layer.
type JobsRunner struct {
Client JobsClient
}
// JobsClient defines the API service layer methods exposd by the JobsRunner.
type JobsClient interface {
MergeDiscoveredAssets(ctx context.Context, teamID string, assets []Asset, groupName string) error
FindJob(ctx context.Context, jobID string) (*Job, error)
UpdateJob(ctx context.Context, job Job) (*Job, error)
}