-
-
Notifications
You must be signed in to change notification settings - Fork 3.3k
/
job.go
117 lines (106 loc) · 2.6 KB
/
job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package task
import (
model "github.com/HFO4/cloudreve/models"
"github.com/HFO4/cloudreve/pkg/util"
)
// 任务类型
const (
// CompressTaskType 压缩任务
CompressTaskType = iota
// DecompressTaskType 解压缩任务
DecompressTaskType
// TransferTaskType 中转任务
TransferTaskType
// ImportTaskType 导入任务
ImportTaskType
)
// 任务状态
const (
// Queued 排队中
Queued = iota
// Processing 处理中
Processing
// Error 失败
Error
// Canceled 取消
Canceled
// Complete 完成
Complete
)
// 任务进度
const (
// PendingProgress 等待中
PendingProgress = iota
// Compressing 压缩中
CompressingProgress
// Decompressing 解压缩中
DecompressingProgress
// Downloading 下载中
DownloadingProgress
// Transferring 转存中
TransferringProgress
// ListingProgress 索引中
ListingProgress
// InsertingProgress 插入中
InsertingProgress
)
// Job 任务接口
type Job interface {
Type() int // 返回任务类型
Creator() uint // 返回创建者ID
Props() string // 返回序列化后的任务属性
Model() *model.Task // 返回对应的数据库模型
SetStatus(int) // 设定任务状态
Do() // 开始执行任务
SetError(*JobError) // 设定任务失败信息
GetError() *JobError // 获取任务执行结果,返回nil表示成功完成执行
}
// JobError 任务失败信息
type JobError struct {
Msg string `json:"msg,omitempty"`
Error string `json:"error,omitempty"`
}
// Record 将任务记录到数据库中
func Record(job Job) (*model.Task, error) {
record := model.Task{
Status: Queued,
Type: job.Type(),
UserID: job.Creator(),
Progress: 0,
Error: "",
Props: job.Props(),
}
_, err := record.Create()
return &record, err
}
// Resume 从数据库中恢复未完成任务
func Resume() {
tasks := model.GetTasksByStatus(Queued, Processing)
if len(tasks) == 0 {
return
}
util.Log().Info("从数据库中恢复 %d 个未完成任务", len(tasks))
for i := 0; i < len(tasks); i++ {
job, err := GetJobFromModel(&tasks[i])
if err != nil {
util.Log().Warning("无法恢复任务,%s", err)
continue
}
TaskPoll.Submit(job)
}
}
// GetJobFromModel 从数据库给定模型获取任务
func GetJobFromModel(task *model.Task) (Job, error) {
switch task.Type {
case CompressTaskType:
return NewCompressTaskFromModel(task)
case DecompressTaskType:
return NewDecompressTaskFromModel(task)
case TransferTaskType:
return NewTransferTaskFromModel(task)
case ImportTaskType:
return NewImportTaskFromModel(task)
default:
return nil, ErrUnknownTaskType
}
}