diff --git a/daemon/exec.go b/daemon/exec.go index 4fe4427e..22952ed2 100644 --- a/daemon/exec.go +++ b/daemon/exec.go @@ -46,3 +46,15 @@ func (daemon *Daemon) StartExec(stdin io.ReadCloser, stdout io.WriteCloser, cont glog.V(1).Infof("Start Exec for container %s", containerId) return p.StartExec(stdin, stdout, id, execId) } + +func (daemon *Daemon) KillExec(containerId string, execId string, signal int64) error { + p, _, ok := daemon.PodList.GetByContainerIdOrName(containerId) + if !ok { + err := fmt.Errorf("cannot find container %s", containerId) + glog.Error(err) + return err + } + + glog.V(1).Infof("Kill Exec for container %s", containerId) + return p.KillExec(execId, signal) +} diff --git a/daemon/pod/exec.go b/daemon/pod/exec.go index 0d740bb8..bd2b1c3f 100644 --- a/daemon/pod/exec.go +++ b/daemon/pod/exec.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "io" + "syscall" "time" "github.com/docker/docker/pkg/stdcopy" @@ -173,6 +174,25 @@ func (p *XPod) GetExecExitCode(containerId, execId string) (uint8, error) { return es.ExitCode, nil } +func (p *XPod) KillExec(execId string, sig int64) error { + p.statusLock.RLock() + es, ok := p.execs[execId] + p.statusLock.RUnlock() + + if !ok { + err := fmt.Errorf("no exec %s exists for pod %s", execId, p.Id) + p.Log(ERROR, err) + return err + } + + return p.protectedSandboxOperation( + func(sb *hypervisor.Vm) error { + return sb.SignalProcess(es.Container, es.Id, syscall.Signal(sig)) + }, + time.Second*5, + fmt.Sprintf("Kill process %s with %d", es.Id, sig)) +} + func (p *XPod) DeleteExec(containerId, execId string) { p.statusLock.Lock() delete(p.execs, execId) diff --git a/integration/client.go b/integration/client.go index 28f4207a..bc9ed8b4 100644 --- a/integration/client.go +++ b/integration/client.go @@ -351,6 +351,21 @@ func (c *HyperClient) RemovePod(podID string) error { return nil } +// ContainerExecSignal sends signal to specified exec of specified container +func (c *HyperClient) ContainerExecSignal(container, execID string, sig int64) error { + req := types.ExecSignalRequest{ + ContainerID: container, + ExecID: execID, + Signal: sig, + } + _, err := c.client.ExecSignal(c.ctx, &req) + if err != nil { + return err + } + + return nil +} + // ContainerExecCreate creates exec in a container func (c *HyperClient) ContainerExecCreate(container string, command []string, tty bool) (string, error) { req := types.ExecCreateRequest{ diff --git a/integration/hyper_test.go b/integration/hyper_test.go index e5010802..57171cbf 100644 --- a/integration/hyper_test.go +++ b/integration/hyper_test.go @@ -1,8 +1,10 @@ package integration import ( + "io" "testing" + "github.com/hyperhq/hyperd/lib/promise" "github.com/hyperhq/hyperd/types" . "gopkg.in/check.v1" ) @@ -529,3 +531,64 @@ func (s *TestSuite) TestSendContainerSignal(c *C) { c.Assert(err, IsNil) c.Assert(containerInfo.Status.Phase, Equals, "failed") } + +func (s *TestSuite) TestSendExecSignal(c *C) { + sigKill := int64(9) + cName := "test-exec-signal" + spec := types.UserPod{ + Id: "busybox", + Containers: []*types.UserContainer{ + { + Name: cName, + Image: "busybox", + }, + }, + } + podID, err := s.client.CreatePod(&spec) + c.Assert(err, IsNil) + + defer func() { + err = s.client.RemovePod(podID) + c.Assert(err, IsNil) + }() + + err = s.client.StartPod(podID) + c.Assert(err, IsNil) + + execId, err := s.client.ContainerExecCreate(cName, []string{"sh", "-c", "top"}, false) + c.Assert(err, IsNil) + + outReader, outWriter := io.Pipe() + errC := promise.Go(func() error { + return s.client.ContainerExecStart(cName, execId, nil, outWriter, nil, false) + }) + + // make sure process has been started. + readC := make(chan struct{}) + go func() { + buf := make([]byte, 32) + for { + n, err := outReader.Read(buf) + if err == nil && n > 0 { + readC <- struct{}{} + break + } else if err != nil && err != io.EOF { + errC <- err + break + } + } + }() + + select { + case err = <-errC: + c.Assert(err, IsNil) + case <-readC: + } + + err = s.client.ContainerExecSignal(cName, execId, sigKill) + c.Assert(err, IsNil) + + exitCode, err := s.client.Wait(cName, execId, false) + c.Assert(err, IsNil) + c.Assert(exitCode, Equals, int32(0)) +} diff --git a/serverrpc/exec.go b/serverrpc/exec.go index ef57edf7..5c5854e2 100644 --- a/serverrpc/exec.go +++ b/serverrpc/exec.go @@ -102,3 +102,16 @@ func (s *ServerRPC) Wait(c context.Context, req *types.WaitRequest) (*types.Wait ExitCode: int32(code), }, nil } + +// ExecSignal sends a singal to specified exec of specified container +func (s *ServerRPC) ExecSignal(ctx context.Context, req *types.ExecSignalRequest) (*types.ExecSignalResponse, error) { + glog.V(3).Infof("ExecSignal with request %v", req.String()) + + err := s.daemon.KillExec(req.ContainerID, req.ExecID, req.Signal) + if err != nil { + glog.Errorf("Kill Process %s of container %s with signal %d failed: %v", req.ExecID, req.ContainerID, req.Signal, err) + return nil, err + } + + return &types.ExecSignalResponse{}, nil +} diff --git a/types/types.pb.go b/types/types.pb.go index a2b36c54..3703cac5 100644 --- a/types/types.pb.go +++ b/types/types.pb.go @@ -86,6 +86,8 @@ It has these top-level messages: ExecCreateResponse ExecStartRequest ExecStartResponse + ExecSignalRequest + ExecSignalResponse PodStartRequest PodStartResponse WaitRequest @@ -1543,6 +1545,23 @@ func (m *ExecStartResponse) Reset() { *m = ExecStartResponse{} } func (m *ExecStartResponse) String() string { return proto.CompactTextString(m) } func (*ExecStartResponse) ProtoMessage() {} +type ExecSignalRequest struct { + ContainerID string `protobuf:"bytes,1,opt,name=containerID,proto3" json:"containerID,omitempty"` + ExecID string `protobuf:"bytes,2,opt,name=execID,proto3" json:"execID,omitempty"` + Signal int64 `protobuf:"varint,3,opt,name=signal,proto3" json:"signal,omitempty"` +} + +func (m *ExecSignalRequest) Reset() { *m = ExecSignalRequest{} } +func (m *ExecSignalRequest) String() string { return proto.CompactTextString(m) } +func (*ExecSignalRequest) ProtoMessage() {} + +type ExecSignalResponse struct { +} + +func (m *ExecSignalResponse) Reset() { *m = ExecSignalResponse{} } +func (m *ExecSignalResponse) String() string { return proto.CompactTextString(m) } +func (*ExecSignalResponse) ProtoMessage() {} + type PodStartRequest struct { PodID string `protobuf:"bytes,1,opt,name=podID,proto3" json:"podID,omitempty"` } @@ -2097,6 +2116,8 @@ func init() { proto.RegisterType((*ExecCreateResponse)(nil), "types.ExecCreateResponse") proto.RegisterType((*ExecStartRequest)(nil), "types.ExecStartRequest") proto.RegisterType((*ExecStartResponse)(nil), "types.ExecStartResponse") + proto.RegisterType((*ExecSignalRequest)(nil), "types.ExecSignalRequest") + proto.RegisterType((*ExecSignalResponse)(nil), "types.ExecSignalResponse") proto.RegisterType((*PodStartRequest)(nil), "types.PodStartRequest") proto.RegisterType((*PodStartResponse)(nil), "types.PodStartResponse") proto.RegisterType((*WaitRequest)(nil), "types.WaitRequest") @@ -2205,6 +2226,8 @@ type PublicAPIClient interface { ExecCreate(ctx context.Context, in *ExecCreateRequest, opts ...grpc.CallOption) (*ExecCreateResponse, error) // ExecStart starts exec ExecStart(ctx context.Context, opts ...grpc.CallOption) (PublicAPI_ExecStartClient, error) + // ExecSignal sends a signal to specified exec in specified container + ExecSignal(ctx context.Context, in *ExecSignalRequest, opts ...grpc.CallOption) (*ExecSignalResponse, error) // Attach attaches to the specified container Attach(ctx context.Context, opts ...grpc.CallOption) (PublicAPI_AttachClient, error) // Wait gets the exit code of the specified container @@ -2500,6 +2523,15 @@ func (x *publicAPIExecStartClient) Recv() (*ExecStartResponse, error) { return m, nil } +func (c *publicAPIClient) ExecSignal(ctx context.Context, in *ExecSignalRequest, opts ...grpc.CallOption) (*ExecSignalResponse, error) { + out := new(ExecSignalResponse) + err := grpc.Invoke(ctx, "/types.PublicAPI/ExecSignal", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *publicAPIClient) Attach(ctx context.Context, opts ...grpc.CallOption) (PublicAPI_AttachClient, error) { stream, err := grpc.NewClientStream(ctx, &_PublicAPI_serviceDesc.Streams[2], c.cc, "/types.PublicAPI/Attach", opts...) if err != nil { @@ -2729,6 +2761,8 @@ type PublicAPIServer interface { ExecCreate(context.Context, *ExecCreateRequest) (*ExecCreateResponse, error) // ExecStart starts exec ExecStart(PublicAPI_ExecStartServer) error + // ExecSignal sends a signal to specified exec in specified container + ExecSignal(context.Context, *ExecSignalRequest) (*ExecSignalResponse, error) // Attach attaches to the specified container Attach(PublicAPI_AttachServer) error // Wait gets the exit code of the specified container @@ -3070,6 +3104,18 @@ func (x *publicAPIExecStartServer) Recv() (*ExecStartRequest, error) { return m, nil } +func _PublicAPI_ExecSignal_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) { + in := new(ExecSignalRequest) + if err := dec(in); err != nil { + return nil, err + } + out, err := srv.(PublicAPIServer).ExecSignal(ctx, in) + if err != nil { + return nil, err + } + return out, nil +} + func _PublicAPI_Attach_Handler(srv interface{}, stream grpc.ServerStream) error { return srv.(PublicAPIServer).Attach(&publicAPIAttachServer{stream}) } @@ -3338,6 +3384,10 @@ var _PublicAPI_serviceDesc = grpc.ServiceDesc{ MethodName: "ExecCreate", Handler: _PublicAPI_ExecCreate_Handler, }, + { + MethodName: "ExecSignal", + Handler: _PublicAPI_ExecSignal_Handler, + }, { MethodName: "Wait", Handler: _PublicAPI_Wait_Handler, diff --git a/types/types.proto b/types/types.proto index 7393d3e5..6a01af7a 100644 --- a/types/types.proto +++ b/types/types.proto @@ -565,6 +565,14 @@ message ExecStartResponse{ bytes stdout = 1; } +message ExecSignalRequest{ + string containerID = 1; + string execID = 2; + int64 signal = 3; +} + +message ExecSignalResponse{} + message PodStartRequest { string podID = 1; } @@ -819,6 +827,8 @@ service PublicAPI { rpc ExecCreate(ExecCreateRequest) returns (ExecCreateResponse) {} // ExecStart starts exec rpc ExecStart(stream ExecStartRequest) returns (stream ExecStartResponse) {} + // ExecSignal sends a signal to specified exec in specified container + rpc ExecSignal(ExecSignalRequest) returns (ExecSignalResponse) {} // Attach attaches to the specified container rpc Attach(stream AttachMessage) returns (stream AttachMessage) {} // Wait gets the exit code of the specified container