This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 63
/
task.go
86 lines (81 loc) · 2.54 KB
/
task.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
// Package transformers handles translating gRPC request and response objects to and from repository model objects.
package transformers
import (
"github.com/flyteorg/flyteadmin/pkg/errors"
"github.com/flyteorg/flyteadmin/pkg/repositories/models"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"google.golang.org/grpc/codes"
)
// CreateTaskModel transforms a TaskCreateRequest into a task repository model.
//
// The task closure is serialized with proto.Marshal and stored in the model's
// Closure field, digest is stored unchanged, and the model's Type is taken
// from the compiled task template when the closure carries one (otherwise it
// is left empty). The model key (project, domain, name, version) is copied
// from request.Id.
//
// An InvalidArgument error is returned when request.Id is nil, and an Internal
// error is returned when the closure cannot be serialized.
func CreateTaskModel(
	request admin.TaskCreateRequest,
	taskClosure admin.TaskClosure,
	digest []byte) (models.Task, error) {
	// Building the task key below reads through request.Id; reject a missing
	// identifier up front instead of panicking on a nil pointer dereference.
	if request.Id == nil {
		return models.Task{}, errors.NewFlyteAdminError(codes.InvalidArgument, "missing task id")
	}

	closureBytes, err := proto.Marshal(&taskClosure)
	if err != nil {
		// Keep the underlying cause in the message so the failure is diagnosable.
		return models.Task{}, errors.NewFlyteAdminErrorf(
			codes.Internal, "Failed to serialize task closure: %v", err)
	}

	// The task type is only known once the task has been compiled into a template.
	var taskType string
	if taskClosure.CompiledTask != nil && taskClosure.CompiledTask.Template != nil {
		taskType = taskClosure.CompiledTask.Template.Type
	}

	return models.Task{
		TaskKey: models.TaskKey{
			Project: request.Id.Project,
			Domain:  request.Id.Domain,
			Name:    request.Id.Name,
			Version: request.Id.Version,
		},
		Closure: closureBytes,
		Digest:  digest,
		Type:    taskType,
	}, nil
}
// FromTaskModel converts a task repository model into its admin.Task
// representation.
//
// The stored closure bytes are unmarshaled into an admin.TaskClosure, whose
// CreatedAt field is then overwritten with the model's CreatedAt time. The
// returned task's identifier is built from the model's project, domain, name
// and version, with the TASK resource type.
//
// An Internal error is returned when the closure cannot be unmarshaled or when
// the creation time cannot be converted to a protobuf timestamp.
func FromTaskModel(taskModel models.Task) (admin.Task, error) {
	taskClosure := &admin.TaskClosure{}
	if err := proto.Unmarshal(taskModel.Closure, taskClosure); err != nil {
		return admin.Task{}, errors.NewFlyteAdminErrorf(
			codes.Internal, "failed to unmarshal closure: %v", err)
	}

	createdAt, err := ptypes.TimestampProto(taskModel.CreatedAt)
	if err != nil {
		return admin.Task{}, errors.NewFlyteAdminErrorf(
			codes.Internal, "failed to serialize created at: %v", err)
	}
	// The creation time comes from the model row rather than from the
	// serialized closure bytes.
	taskClosure.CreatedAt = createdAt

	return admin.Task{
		Id: &core.Identifier{
			ResourceType: core.ResourceType_TASK,
			Project:      taskModel.Project,
			Domain:       taskModel.Domain,
			Name:         taskModel.Name,
			Version:      taskModel.Version,
		},
		Closure: taskClosure,
	}, nil
}
// FromTaskModels converts each task model in taskModels to an admin.Task via
// FromTaskModel, preserving input order. The first conversion failure aborts
// the whole operation: its error is returned together with a nil slice.
func FromTaskModels(taskModels []models.Task) ([]*admin.Task, error) {
	converted := make([]*admin.Task, 0, len(taskModels))
	for i := range taskModels {
		// task is scoped to this iteration, so each stored pointer is distinct.
		task, err := FromTaskModel(taskModels[i])
		if err != nil {
			return nil, err
		}
		converted = append(converted, &task)
	}
	return converted, nil
}
// FromTaskModelsToIdentifiers builds one NamedEntityIdentifier per task model,
// in input order, copying only the project, domain and name of each model.
func FromTaskModelsToIdentifiers(taskModels []models.Task) []*admin.NamedEntityIdentifier {
	identifiers := make([]*admin.NamedEntityIdentifier, 0, len(taskModels))
	for idx := range taskModels {
		model := &taskModels[idx]
		identifier := &admin.NamedEntityIdentifier{
			Project: model.Project,
			Domain:  model.Domain,
			Name:    model.Name,
		}
		identifiers = append(identifiers, identifier)
	}
	return identifiers
}