Skip to content

Commit

Permalink
add task api endpoint in task create options
Browse files Browse the repository at this point in the history
Signed-off-by: Abel Feng <fshb1988@gmail.com>
  • Loading branch information
abel-von committed Mar 27, 2024
1 parent 5b07004 commit d2e942b
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 24 deletions.
18 changes: 18 additions & 0 deletions client/task_opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,24 @@ func WithRuntimePath(absRuntimePath string) NewTaskOpts {
}
}

// WithTaskAPIEndpoint allow task service to manage a task through a given endpoint,
// usually it is served inside a sandbox, and we can get it from sandbox status.
func WithTaskAPIEndpoint(address, protocol string, version uint32) NewTaskOpts {
return func(ctx context.Context, client *Client, info *TaskInfo) error {
if info.Options == nil {
info.Options = &options.Options{}
}
opts, ok := info.Options.(*options.Options)
if !ok {
return errors.New("invalid runtime v2 options format")
}
opts.TaskApiAddress = address
opts.TaskApiProtocol = protocol
opts.TaskApiVersion = version
return nil
}
}

// WithTaskCheckpoint allows a task to be created with live runtime and memory data from a
// previous checkpoint. Additional software such as CRIU may be required to
// restore a task from a checkpoint
Expand Down
6 changes: 6 additions & 0 deletions core/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ type CreateOpts struct {
Runtime string
// SandboxID is an optional ID of sandbox this container belongs to
SandboxID string
// Address is an optional Address for Task API server
Address string
// Protocol is an optional Protocol for Task API connection
Protocol string
// Version is an optional Version of the Task API
Version uint32
}

// Exit information for a process
Expand Down
36 changes: 28 additions & 8 deletions core/runtime/v2/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,21 +199,29 @@ func (m *ShimManager) Start(ctx context.Context, id string, opts runtime.CreateO

// This container belongs to sandbox which supposed to be already started via sandbox API.
if opts.SandboxID != "" {
process, err := m.Get(ctx, opts.SandboxID)
if err != nil {
return nil, fmt.Errorf("can't find sandbox %s", opts.SandboxID)
var params shimbinary.BootstrapParams
if opts.Address != "" && opts.Protocol != "" {
params = shimbinary.BootstrapParams{
Version: int(opts.Version),
Address: opts.Address,
Protocol: opts.Protocol,
}
} else {
// For those sandbox we can not get endpoint,
// fallback to legacy implementation
p, restoreErr := m.restoreBootstrapParams(ctx, opts.SandboxID)
if restoreErr != nil {
return nil, fmt.Errorf("failed to get bootstrap "+
"params of sandbox %s, %v, legacy restore error %v", opts.SandboxID, err, restoreErr)
}
params = p
}

// Write sandbox ID this task belongs to.
if err := os.WriteFile(filepath.Join(bundle.Path, "sandbox"), []byte(opts.SandboxID), 0600); err != nil {
return nil, err
}

params, err := restoreBootstrapParams(filepath.Join(m.state, process.Namespace(), opts.SandboxID))
if err != nil {
return nil, err
}

if err := writeBootstrapParams(filepath.Join(bundle.Path, "bootstrap.json"), params); err != nil {
return nil, fmt.Errorf("failed to write bootstrap.json for bundle %s: %w", bundle.Path, err)
}
Expand Down Expand Up @@ -287,6 +295,18 @@ func (m *ShimManager) startShim(ctx context.Context, bundle *Bundle, id string,
return shim, nil
}

func (m *ShimManager) restoreBootstrapParams(ctx context.Context, sandboxID string) (shimbinary.BootstrapParams, error) {
process, err := m.Get(ctx, sandboxID)
if err != nil {
return shimbinary.BootstrapParams{}, fmt.Errorf("can't find sandbox %s", sandboxID)
}
params, err := restoreBootstrapParams(filepath.Join(m.state, process.Namespace(), sandboxID))
if err != nil {
return shimbinary.BootstrapParams{}, err
}
return params, nil
}

// restoreBootstrapParams reads bootstrap.json to restore shim configuration.
// If its an old shim, this will perform migration - read address file and write default bootstrap
// configuration (version = 2, protocol = ttrpc, and address).
Expand Down
2 changes: 1 addition & 1 deletion internal/cri/server/container_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain
endpoint := sandbox.Endpoint
if endpoint.Protocol != "" && endpoint.Address != "" {
taskOpts = append(taskOpts,
containerd.WithTaskApiEndpoint(endpoint.Address, endpoint.Protocol, endpoint.Version))
containerd.WithTaskAPIEndpoint(endpoint.Address, endpoint.Protocol, endpoint.Version))
}

task, err := container.NewTask(ctx, ioCreation, taskOpts...)
Expand Down
40 changes: 25 additions & 15 deletions plugins/services/tasks/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,25 @@ func (l *local) Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc.
if err != nil {
return nil, errdefs.ToGRPC(err)
}
checkpointPath, err := getRestorePath(container.Runtime.Name, r.Options)
if err != nil {
return nil, err

var (
checkpointPath string
taskAPIAddress string
taskAPIProtocol string
taskAPIVersion uint32
)

if r.Options != nil {
taskOptions, err := formatOptions(container.Runtime.Name, r.Options)
if err != nil {
return nil, err
}
checkpointPath = taskOptions.CriuImagePath
taskAPIAddress = taskOptions.TaskApiAddress
taskAPIProtocol = taskOptions.TaskApiProtocol
taskAPIVersion = taskOptions.TaskApiVersion
}

// jump get checkpointPath from checkpoint image
if checkpointPath == "" && r.Checkpoint != nil {
checkpointPath, err = os.MkdirTemp(os.Getenv("XDG_RUNTIME_DIR"), "ctrd-checkpoint")
Expand Down Expand Up @@ -196,6 +211,9 @@ func (l *local) Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc.
RuntimeOptions: container.Runtime.Options,
TaskOptions: r.Options,
SandboxID: container.SandboxID,
Address: taskAPIAddress,
Protocol: taskAPIProtocol,
Version: taskAPIVersion,
}
if r.RuntimePath != "" {
opts.Runtime = r.RuntimePath
Expand Down Expand Up @@ -723,22 +741,14 @@ func getCheckpointPath(runtime string, option *ptypes.Any) (string, error) {
return checkpointPath, nil
}

// getRestorePath only suitable for runc runtime now
func getRestorePath(runtime string, option *ptypes.Any) (string, error) {
if option == nil {
return "", nil
}

var restorePath string
func formatOptions(runtime string, option *ptypes.Any) (*options.Options, error) {
v, err := typeurl.UnmarshalAny(option)
if err != nil {
return "", err
return nil, err
}
opts, ok := v.(*options.Options)
if !ok {
return "", fmt.Errorf("invalid task create option for %s", runtime)
return nil, fmt.Errorf("invalid task create option for %s", runtime)
}
restorePath = opts.CriuImagePath

return restorePath, nil
return opts, nil
}

0 comments on commit d2e942b

Please sign in to comment.