-
Notifications
You must be signed in to change notification settings - Fork 3
/
runc.go
145 lines (124 loc) · 4.31 KB
/
runc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package api
// Implements the task service functions for runc
import (
"context"
"fmt"
"time"
"github.com/cedana/cedana/api/runc"
"github.com/cedana/cedana/api/services/task"
container "github.com/cedana/cedana/container"
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// RuncDump checkpoints the runc container identified by args.ContainerID.
//
// It resolves the container's PID with runc.GetPidByContainerId, builds the
// job state through s.generateState, marks it as running and invokes
// s.runcDump with CRIU options copied from args.CriuOpts (LeaveRunning is
// always true and MntnsCompatMode always false). For task.CRType_REMOTE the
// checkpoint at state.CheckpointPath is additionally uploaded through
// s.uploadCheckpoint and the returned IDs are recorded both in the state and
// in the response. The state is persisted with s.updateState before returning.
//
// All failures are reported as gRPC status errors: codes.InvalidArgument when
// args.CriuOpts is unset, codes.Internal for everything else.
func (s *service) RuncDump(ctx context.Context, args *task.RuncDumpArgs) (*task.RuncDumpResp, error) {
	// CriuOpts is dereferenced below; reject an unset message instead of panicking.
	if args.CriuOpts == nil {
		return nil, status.Error(codes.InvalidArgument, "criu options cannot be empty")
	}

	// TODO BS: This will be done at controller level, just doing it here for now...
	pid, err := runc.GetPidByContainerId(args.ContainerID, args.Root)
	if err != nil {
		return nil, status.Error(codes.Internal, err.Error())
	}

	state, err := s.generateState(pid)
	if err != nil {
		return nil, status.Error(codes.Internal, err.Error())
	}
	state.JobState = task.JobState_JOB_RUNNING

	criuOpts := &container.CriuOpts{
		ImagesDirectory: args.CriuOpts.ImagesDirectory,
		WorkDirectory:   args.CriuOpts.WorkDirectory,
		LeaveRunning:    true,
		TcpEstablished:  args.CriuOpts.TcpEstablished,
		MntnsCompatMode: false,
	}

	if err := s.runcDump(ctx, args.Root, args.ContainerID, criuOpts, state); err != nil {
		st := status.New(codes.Internal, "Runc dump failed")
		// WithDetails returns a new Status instead of mutating the receiver,
		// so its result must be kept or the failure reason is silently lost.
		// If attaching the details fails, fall back to the plain status.
		if detailed, detailsErr := st.WithDetails(&errdetails.ErrorInfo{
			Reason: err.Error(),
		}); detailsErr == nil {
			st = detailed
		}
		return nil, st.Err()
	}

	// Built behind a pointer so the generated message is never copied by value.
	resp := &task.RuncDumpResp{}

	switch args.Type {
	case task.CRType_LOCAL:
		resp.Message = fmt.Sprintf("Dumped runc process %d", pid)
	case task.CRType_REMOTE:
		checkpointID, uploadID, err := s.uploadCheckpoint(ctx, state.CheckpointPath)
		if err != nil {
			st := status.New(codes.Internal, fmt.Sprintf("failed to upload checkpoint with error: %s", err.Error()))
			return nil, st.Err()
		}
		remoteState := &task.RemoteState{CheckpointID: checkpointID, UploadID: uploadID, Timestamp: time.Now().Unix()}
		state.RemoteState = append(state.RemoteState, remoteState)
		resp.Message = fmt.Sprintf("Dumped runc process %d, multipart checkpoint id: %s", pid, uploadID)
		resp.CheckpointID = checkpointID
		resp.UploadID = uploadID
	}

	if err := s.updateState(state.JID, state); err != nil {
		return nil, status.Error(codes.Internal, err.Error())
	}

	return resp, nil
}
// RuncRestore restores the runc container args.ContainerID from a checkpoint.
//
// The runc options are copied from args.Opts. For task.CRType_LOCAL the
// checkpoint is read from args.ImagePath; for task.CRType_REMOTE it is first
// fetched from s.store using args.CheckpointID, which must be non-empty. Both
// paths hand the image to s.runcRestore.
//
// Returns a gRPC status error with codes.InvalidArgument when args.Opts is
// unset, the checkpoint ID is empty, args.Type is not a known checkpoint
// type, or the local restore fails; codes.Internal when a remote restore
// fails. Errors from s.store.GetCheckpoint are returned unchanged.
func (s *service) RuncRestore(ctx context.Context, args *task.RuncRestoreArgs) (*task.RuncRestoreResp, error) {
	// Opts is dereferenced below; reject an unset message instead of panicking.
	if args.Opts == nil {
		return nil, status.Error(codes.InvalidArgument, "runc options cannot be empty")
	}

	opts := &container.RuncOpts{
		Root:          args.Opts.Root,
		Bundle:        args.Opts.Bundle,
		ConsoleSocket: args.Opts.ConsoleSocket,
		Detatch:       args.Opts.Detatch,
		NetPid:        int(args.Opts.NetPid),
	}

	switch args.Type {
	case task.CRType_LOCAL:
		if err := s.runcRestore(ctx, args.ImagePath, args.ContainerID, args.IsK3S, []string{}, opts); err != nil {
			// The status code is kept for existing callers, but the cause is now
			// included so the failure can actually be diagnosed.
			// NOTE(review): codes.Internal (as in the remote branch) may be the
			// more accurate code here; confirm before changing it.
			return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("invalid argument: %v", err))
		}
	case task.CRType_REMOTE:
		if args.CheckpointID == "" {
			return nil, status.Error(codes.InvalidArgument, "checkpoint id cannot be empty")
		}

		zipFile, err := s.store.GetCheckpoint(ctx, args.CheckpointID)
		if err != nil {
			return nil, err
		}
		// Guard the dereference below against a nil result without an error.
		if zipFile == nil {
			return nil, status.Error(codes.Internal, "checkpoint store returned no file")
		}

		if err := s.runcRestore(ctx, *zipFile, args.ContainerID, args.IsK3S, []string{}, opts); err != nil {
			return nil, status.Error(codes.Internal, fmt.Sprintf("failed to restore process: %v", err))
		}
	default:
		// Without this, an unknown type would skip the restore and still report success.
		return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("unsupported checkpoint type: %v", args.Type))
	}

	// TODO: Update state to add or use a job that exists for this container
	return &task.RuncRestoreResp{Message: fmt.Sprintf("Restored %v, succesfully", args.ContainerID)}, nil
}
// RuncQuery looks up every name in args.ContainerNames under args.Root via
// runc.GetContainerIdByName and returns the resulting container IDs and
// bundle paths in request order. The first failed lookup aborts the query
// with a codes.NotFound status error.
func (s *service) RuncQuery(ctx context.Context, args *task.RuncQueryArgs) (*task.RuncQueryResp, error) {
	var found []*task.RuncContainer

	for _, containerName := range args.ContainerNames {
		id, bundlePath, lookupErr := runc.GetContainerIdByName(containerName, args.Root)
		if lookupErr != nil {
			return nil, status.Errorf(codes.NotFound, "Container \"%s\" not found", containerName)
		}

		entry := &task.RuncContainer{
			ID:         id,
			BundlePath: bundlePath,
		}
		found = append(found, entry)
	}

	resp := &task.RuncQueryResp{Containers: found}
	return resp, nil
}
// RuncGetPausePid returns the PID that runc.GetPausePid reports for the
// bundle at args.BundlePath, widened to int64. A lookup failure is returned
// as a codes.NotFound status error that includes the underlying error.
func (s *service) RuncGetPausePid(ctx context.Context, args *task.RuncGetPausePidArgs) (*task.RuncGetPausePidResp, error) {
	pausePid, lookupErr := runc.GetPausePid(args.BundlePath)
	if lookupErr != nil {
		return nil, status.Errorf(codes.NotFound, "pause pid not found: %v", lookupErr)
	}

	return &task.RuncGetPausePidResp{PausePid: int64(pausePid)}, nil
}