/
metascheduler.go
152 lines (136 loc) · 4.45 KB
/
metascheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
// Copyright (C) 2023 DeepSquare Association
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
package metascheduler
import (
"context"
metaschedulerabi "github.com/deepsquare-io/grid/supervisor/generated/abi/metascheduler"
"github.com/deepsquare-io/grid/supervisor/logger"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/event"
"go.uber.org/zap"
)
// Job is this package's representation of a meta-scheduler job. It carries
// the same fields as metaschedulerabi.Job (see FromStructToJob), with the raw
// status value converted to JobStatus.
type Job struct {
// JobID is the 32-byte job identifier.
JobID [32]byte
// Status is the job status, converted from the ABI status value.
Status JobStatus
// CustomerAddr is the customer address copied from the ABI job.
CustomerAddr common.Address
// ProviderAddr is the provider address copied from the ABI job.
ProviderAddr common.Address
// Definition is the ABI job definition, copied as-is.
Definition metaschedulerabi.JobDefinition
// Cost is the ABI job cost, copied as-is.
Cost metaschedulerabi.JobCost
// Time is the ABI job time, copied as-is.
Time metaschedulerabi.JobTime
// JobName is the 32-byte job name.
JobName [32]byte
// HasCancelRequest is the cancel-request flag copied from the ABI job.
HasCancelRequest bool
// LastError is the last error string copied from the ABI job.
LastError string
}
// FromStructToJob converts an ABI job into a newly allocated Job.
//
// Every field is copied one-to-one; the only transformation is the conversion
// of the ABI status value to JobStatus. The returned pointer is never nil.
func FromStructToJob(s metaschedulerabi.Job) *Job {
	job := new(Job)

	// Identity.
	job.JobID = s.JobId
	job.JobName = s.JobName

	// Parties.
	job.CustomerAddr = s.CustomerAddr
	job.ProviderAddr = s.ProviderAddr

	// State.
	job.Status = JobStatus(s.Status)
	job.HasCancelRequest = s.HasCancelRequest
	job.LastError = s.LastError

	// Payload copied verbatim from the ABI structure.
	job.Definition = s.Definition
	job.Cost = s.Cost
	job.Time = s.Time

	return job
}
// ProviderJobIterator walks ClaimJob events through the embedded
// MetaSchedulerClaimJobEventIterator and exposes, via Job, the job of the
// latest event that matched providerAddress (see Next).
type ProviderJobIterator struct {
// Job is set by Next each time it returns true.
Job *Job
// Embedded ABI iterator; supplies the underlying Next and the current Event.
*metaschedulerabi.MetaSchedulerClaimJobEventIterator
// client is used by Next to fetch the job referenced by each event.
client MetaScheduler
// providerAddress is the address that events and jobs are filtered on.
providerAddress common.Address
}
// Next advances the embedded event iterator to the next ClaimJob event that
// belongs to the provider and loads the corresponding job into it.Job.
//
// An event is accepted only when both the event's ProviderAddr and the
// ProviderAddr of the job fetched through client.GetJob equal providerAddress;
// other events are skipped.
//
// It returns true when it.Job has been set. It returns false when the
// underlying iterator has no more events, or when GetJob fails (the error is
// logged, not returned).
func (it *ProviderJobIterator) Next(ctx context.Context) bool {
	for {
		if !it.MetaSchedulerClaimJobEventIterator.Next() {
			// Underlying iterator is done.
			return false
		}

		// Cheap filter on the event itself before fetching the job.
		if it.Event.ProviderAddr != it.providerAddress {
			continue
		}

		fetched, err := it.client.GetJob(ctx, it.Event.JobId)
		if err != nil {
			logger.I.Error("GetJob failed", zap.Error(err))
			return false
		}

		// Re-check against the fetched job's provider address.
		if fetched.ProviderAddr == it.providerAddress {
			it.Job = fetched
			return true
		}
	}
}
// setJobStatusOptions collects the optional values that SetJobStatusOption
// functions can set.
type setJobStatusOptions struct {
	// err is the error set by SetJobStatusWithError; nil when not supplied.
	err error
	// exitCode is the value set by SetJobStatusWithExitCode; 0 when not supplied.
	exitCode int64
}

// applySetJobStatusOptions builds a fresh setJobStatusOptions and applies opts
// to it in order, so a later option overrides an earlier one that sets the
// same field. The returned pointer is never nil.
//
// Nil entries in opts are skipped, because calling a nil function value
// would panic.
func applySetJobStatusOptions(opts []SetJobStatusOption) *setJobStatusOptions {
	o := &setJobStatusOptions{}
	for _, opt := range opts {
		if opt == nil {
			continue
		}
		opt(o)
	}
	return o
}

// SetJobStatusOption is a functional option that mutates a setJobStatusOptions.
// It is the element type of the variadic opts accepted by
// MetaScheduler.SetJobStatus.
type SetJobStatusOption func(*setJobStatusOptions)

// SetJobStatusWithError returns an option that records err in the options.
func SetJobStatusWithError(err error) SetJobStatusOption {
	return func(sjso *setJobStatusOptions) {
		sjso.err = err
	}
}

// SetJobStatusWithExitCode returns an option that records exitCode in the
// options.
func SetJobStatusWithExitCode(exitCode int64) SetJobStatusOption {
	return func(sjso *setJobStatusOptions) {
		sjso.exitCode = exitCode
	}
}
// MetaScheduler groups the meta-scheduler operations used by this package:
// claiming and refusing jobs, watching claim events, reading and updating job
// state, and registering the provider.
type MetaScheduler interface {
// IsRequestNewJobEnabled returns a boolean flag.
// NOTE(review): presumably whether requesting new jobs is currently enabled
// on the meta-scheduler — confirm against the implementation.
IsRequestNewJobEnabled(ctx context.Context) (bool, error)
// Claim claims a job for scheduling.
Claim(ctx context.Context) error
// ClaimCancelling claims cancelling calls.
ClaimCancelling(ctx context.Context) error
// ClaimTopUp claims top up calls.
ClaimTopUp(ctx context.Context) error
// RefuseJob refuses a job for metascheduling.
RefuseJob(ctx context.Context, jobID [32]byte) error
// WatchEvents observes the incoming ClaimNextTopUpJobEvent, ClaimNextCancellingJobEvent and ClaimJobEvent.
//
// Each event kind is delivered on its own send-only channel; the returned
// event.Subscription represents the watch.
WatchEvents(
ctx context.Context,
claimNextTopUpJobEvents chan<- *metaschedulerabi.MetaSchedulerClaimNextTopUpJobEvent,
claimNextCancellingJobEvents chan<- *metaschedulerabi.MetaSchedulerClaimNextCancellingJobEvent,
claimJobEvents chan<- *metaschedulerabi.MetaSchedulerClaimJobEvent,
) (event.Subscription, error)
// GetProviderAddress fetches the provider public address.
GetProviderAddress() common.Address
// GetOldInfo fetches the last registration provider information.
GetOldInfo(ctx context.Context) (*metaschedulerabi.Provider, error)
// GetJobStatus fetches the job status.
GetJobStatus(ctx context.Context, jobID [32]byte) (JobStatus, error)
// SetJobStatus takes the job identified by jobID, a status and
// jobDurationMinute. opts can additionally carry an error
// (SetJobStatusWithError) and an exit code (SetJobStatusWithExitCode).
// NOTE(review): presumably updates the job's status on the meta-scheduler;
// how jobDurationMinute and the options are used is implementation-defined —
// confirm.
SetJobStatus(
ctx context.Context,
jobID [32]byte,
status JobStatus,
jobDurationMinute uint64,
opts ...SetJobStatusOption,
) error
// Register takes the provider's hardware, prices and labels.
// NOTE(review): presumably registers the provider with the meta-scheduler —
// confirm against the implementation.
Register(
ctx context.Context,
hardware metaschedulerabi.ProviderHardware,
prices metaschedulerabi.ProviderPrices,
labels []metaschedulerabi.Label,
) error
// GetJob fetches a job.
GetJob(ctx context.Context, jobID [32]byte) (*Job, error)
// GetJobs fetches the jobs of the provider.
//
// The result is consumed with ProviderJobIterator.Next.
GetJobs(ctx context.Context) (*ProviderJobIterator, error)
}