Skip to content

Commit

Permalink
feat: rename task to job (#544)
Browse files Browse the repository at this point in the history
* feat: rename task to job

Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi committed Aug 12, 2021
1 parent 2dd969c commit 5286be0
Show file tree
Hide file tree
Showing 17 changed files with 148 additions and 148 deletions.
6 changes: 3 additions & 3 deletions internal/tasks/constants.go → internal/job/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package tasks
package job

// Queue Name
const (
Expand All @@ -23,7 +23,7 @@ const (
CDNsQueue = Queue("cdns")
)

// Task Name
// Job Name
const (
PreheatTask = "preheat"
PreheatJob = "preheat"
)
44 changes: 22 additions & 22 deletions internal/tasks/tasks.go → internal/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package tasks
package job

import (
"encoding/json"
Expand All @@ -41,13 +41,13 @@ type Config struct {
BackendDB int
}

type Tasks struct {
type Job struct {
Server *machinery.Server
Worker *machinery.Worker
Queue Queue
}

func New(cfg *Config, queue Queue) (*Tasks, error) {
func New(cfg *Config, queue Queue) (*Job, error) {
broker := fmt.Sprintf("redis://%s@%s:%d/%d", cfg.Password, cfg.Host, cfg.Port, cfg.BrokerDB)
backend := fmt.Sprintf("redis://%s@%s:%d/%d", cfg.Password, cfg.Host, cfg.Port, cfg.BackendDB)

Expand All @@ -63,61 +63,61 @@ func New(cfg *Config, queue Queue) (*Tasks, error) {
return nil, err
}

return &Tasks{
return &Job{
Server: server,
Queue: queue,
}, nil
}

func (t *Tasks) RegisterTasks(namedTaskFuncs map[string]interface{}) error {
return t.Server.RegisterTasks(namedTaskFuncs)
func (t *Job) RegisterJob(namedJobFuncs map[string]interface{}) error {
return t.Server.RegisterTasks(namedJobFuncs)
}

func (t *Tasks) LaunchWorker(consumerTag string, concurrency int) error {
func (t *Job) LaunchWorker(consumerTag string, concurrency int) error {
t.Worker = t.Server.NewWorker(consumerTag, concurrency)
return t.Worker.Launch()
}

type GroupTaskState struct {
type GroupJobState struct {
GroupUUID string
State string
CreatedAt time.Time
}

func (t *Tasks) GetGroupTaskState(groupUUID string) (*GroupTaskState, error) {
taskStates, err := t.Server.GetBackend().GroupTaskStates(groupUUID, 0)
func (t *Job) GetGroupJobState(groupUUID string) (*GroupJobState, error) {
jobStates, err := t.Server.GetBackend().GroupTaskStates(groupUUID, 0)
if err != nil {
return nil, err
}

if len(taskStates) == 0 {
return nil, errors.New("empty group task")
if len(jobStates) == 0 {
return nil, errors.New("empty group job")
}

for _, taskState := range taskStates {
if taskState.IsFailure() {
return &GroupTaskState{
for _, jobState := range jobStates {
if jobState.IsFailure() {
return &GroupJobState{
GroupUUID: groupUUID,
State: machineryv1tasks.StateFailure,
CreatedAt: taskState.CreatedAt,
CreatedAt: jobState.CreatedAt,
}, nil
}
}

for _, taskState := range taskStates {
if !taskState.IsSuccess() {
return &GroupTaskState{
for _, jobState := range jobStates {
if !jobState.IsSuccess() {
return &GroupJobState{
GroupUUID: groupUUID,
State: machineryv1tasks.StatePending,
CreatedAt: taskState.CreatedAt,
CreatedAt: jobState.CreatedAt,
}, nil
}
}

return &GroupTaskState{
return &GroupJobState{
GroupUUID: groupUUID,
State: machineryv1tasks.StateSuccess,
CreatedAt: taskStates[0].CreatedAt,
CreatedAt: jobStates[0].CreatedAt,
}, nil
}

Expand Down
6 changes: 3 additions & 3 deletions internal/tasks/tasks_test.go → internal/job/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package tasks
package job

import (
"reflect"
Expand All @@ -25,7 +25,7 @@ import (
machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks"
)

func TestTaskMarshal(t *testing.T) {
func TestJobMarshal(t *testing.T) {
tests := []struct {
name string
value interface{}
Expand Down Expand Up @@ -111,7 +111,7 @@ func TestTaskMarshal(t *testing.T) {
}
}

func TestTaskUnmarshal(t *testing.T) {
func TestJobUnmarshal(t *testing.T) {
tests := []struct {
name string
data []reflect.Value
Expand Down
2 changes: 1 addition & 1 deletion internal/tasks/queue.go → internal/job/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package tasks
package job

import (
"fmt"
Expand Down
6 changes: 3 additions & 3 deletions internal/tasks/queue_test.go → internal/job/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@
* limitations under the License.
*/

package tasks
package job

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestTaskGetSchedulerQueue(t *testing.T) {
func TestJobGetSchedulerQueue(t *testing.T) {
tests := []struct {
name string
hostname string
Expand Down Expand Up @@ -54,7 +54,7 @@ func TestTaskGetSchedulerQueue(t *testing.T) {
}
}

func TestTaskGetCDNQueue(t *testing.T) {
func TestJobGetCDNQueue(t *testing.T) {
tests := []struct {
name string
hostname string
Expand Down
2 changes: 1 addition & 1 deletion internal/tasks/types.go → internal/job/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package tasks
package job

type PreheatRequest struct {
URL string `json:"url" validate:"required,url"`
Expand Down
32 changes: 16 additions & 16 deletions manager/tasks/tasks.go → manager/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,32 +14,32 @@
* limitations under the License.
*/

package tasks
package job

import (
internaltasks "d7y.io/dragonfly/v2/internal/tasks"
internaljob "d7y.io/dragonfly/v2/internal/job"
"d7y.io/dragonfly/v2/manager/config"
"d7y.io/dragonfly/v2/manager/types"
)

type Task interface {
type Job interface {
CreatePreheat([]string, types.CreatePreheatRequest) (*types.Preheat, error)
GetPreheat(string) (*types.Preheat, error)
}

type task struct {
*internaltasks.Tasks
type job struct {
*internaljob.Job
Preheat Preheat
}

func New(cfg *config.Config) (Task, error) {
t, err := internaltasks.New(&internaltasks.Config{
func New(cfg *config.Config) (Job, error) {
t, err := internaljob.New(&internaljob.Config{
Host: cfg.Database.Redis.Host,
Port: cfg.Database.Redis.Port,
Password: cfg.Database.Redis.Password,
BrokerDB: cfg.Database.Redis.BrokerDB,
BackendDB: cfg.Database.Redis.BackendDB,
}, internaltasks.GlobalQueue)
}, internaljob.GlobalQueue)
if err != nil {
return nil, err
}
Expand All @@ -49,25 +49,25 @@ func New(cfg *config.Config) (Task, error) {
return nil, err
}

return &task{
Tasks: t,
return &job{
Job: t,
Preheat: p,
}, nil
}

func (t *task) CreatePreheat(hostnames []string, json types.CreatePreheatRequest) (*types.Preheat, error) {
func (t *job) CreatePreheat(hostnames []string, json types.CreatePreheatRequest) (*types.Preheat, error) {
return t.Preheat.CreatePreheat(hostnames, json)
}

func (t *task) GetPreheat(id string) (*types.Preheat, error) {
groupTaskState, err := t.GetGroupTaskState(id)
func (t *job) GetPreheat(id string) (*types.Preheat, error) {
groupJobState, err := t.GetGroupJobState(id)
if err != nil {
return nil, err
}

return &types.Preheat{
ID: groupTaskState.GroupUUID,
Status: groupTaskState.State,
CreatedAt: groupTaskState.CreatedAt,
ID: groupJobState.GroupUUID,
Status: groupJobState.State,
CreatedAt: groupJobState.CreatedAt,
}, nil
}

0 comments on commit 5286be0

Please sign in to comment.