-
Notifications
You must be signed in to change notification settings - Fork 0
/
job.go
171 lines (122 loc) · 4.1 KB
/
job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package sm
import (
"errors"
"fmt"
"time"
)
type JobParam struct {
DataType ParamType
Value interface{}
// can be linked with an output param of the previous step
LinkedOutputName *string
}
type JobStep struct {
ID int64
TaskID int64
// The `key` of the map is the name of the parameter
Input map[string]JobParam
Output map[string]JobParam
}
type Job struct {
ID int64
IsCompleted bool
IsCanceled bool
CreationDate time.Time
CompletionDate *time.Time
PublicationDate time.Time
ExpirationDate time.Time
Owner ContentOwner
Status string
CurrentStep int
Steps []*JobStep
}
type JobRepository interface {
TaskHasActiveJobs(task_id int64) (bool, error)
GetJobsStepsForModule(
module_id,
limit,
before_job_id int64) ([]map[string]interface{}, error)
GetJobStepForModule(
step_id, module_id int64) (map[string]interface{}, error)
GetJobsForContentOwner(
owner_id, limit, before_job_id int64) ([]*Job, error)
GetJobByStepID(step_id int64) (*Job, error)
GetJobByID(job_id int64) (*Job, error)
GetJobSteps(job_id int64, steps *[]*JobStep) error
GetParamsForStep(step *JobStep) error
SaveStatus(job *Job) error
CreateJob(job *Job) error
SaveStepProgress(job *Job) error
SaveFinishedState(job *Job) error
GetJobsExceedingPublicationDate(
timestamp time.Time,
limit, offset_id int64) ([]*Job, error)
CancelJobsWithExceedingPublicatinDate(timestamp time.Time) error
GetJobsExceedingExpirationDate(
timestamp time.Time,
limit, offset_id int64) ([]*Job, error)
CancelJobsWithExceedingExpirationDate(timestamp time.Time) error
GetNewExpiredJobsAt(
timestamp time.Time,
limit int64,
offset_id int64) ([]*Job, error)
ExpireJobsBefore(timestamp time.Time) error
DeleteParamsForJob(job_id int64) error
}
type JobService interface {
SetJobStatusForStep(step_id int64, status string) error
CreateJob(user_id, publication_date, expiration_date int64,
tasks []map[string]interface{}) (*Job, error)
CancelJobsWithExceedingPublicationDate() error
CancelJobsWithExceedingExpirationDate() error
CancelJobAsModule(module *Module, step_id int64) error
CancelJobAsOwner(owner_id, job_id int64) error
SendCancelRequest(job *Job, task *Task, module *Module) error
// The job is passed by value in order to make the function
// safe to call as a goroutine
PerformNextStepOfJob(job Job)
FinishJobStep(step_id int64, module *Module, output map[string]interface{}) error
}
// errors
var ErrJobIsCanceled = errors.New("Job is canceled")
var ErrJobIsCompleted = errors.New("Job is completed")
var ErrJobStatusNotUpdatable = errors.New(
"Job status can't be updated because it is copmleted")
var ErrInvalidPublicationDate = errors.New("Publication date should be in the future")
var ErrInvalidExpirationDate = errors.New("Expiration date should be after publication date")
var ErrEmptyTasks = errors.New("There should be at least on task for a job")
var ErrMissingTaskID = errors.New("The task_id is missing in the input")
type ErrTaskNotFound struct{ TaskID int64 }
func (e *ErrTaskNotFound) Error() string {
return fmt.Sprintf("A task with id(%d) does not exist", e.TaskID)
}
type ErrTaskIsDisabled struct{ TaskID int64 }
func (e *ErrTaskIsDisabled) Error() string {
return fmt.Sprintf("Task %v is disabled", e.TaskID)
}
type ErrModuleIsDisabled struct{ ModuleID int64 }
func (e *ErrModuleIsDisabled) Error() string {
return fmt.Sprintf("Module %v is disabled", e.ModuleID)
}
type ErrInvalidTaskInput struct{ Message string }
func (e *ErrInvalidTaskInput) Error() string {
return e.Message
}
type ErrInvalidTaskOutput struct{ Message string }
func (e *ErrInvalidTaskOutput) Error() string {
return e.Message
}
type ErrTaskOutputNotFound struct{ Name string }
func (e *ErrTaskOutputNotFound) Error() string {
return fmt.Sprintf(
"Previous task doesn't have an output named %s", e.Name)
}
type ErrLinkedParameterNotTheSameType struct {
InputName string
OutputName string
}
func (e *ErrLinkedParameterNotTheSameType) Error() string {
return fmt.Sprintf(
"\"%s\" and \"%s\" are not of the same type",
e.InputName, e.OutputName)
}