Skip to content

Commit

Permalink
ipc stability enhancements and cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
kcq committed Feb 4, 2018
1 parent 4167f6a commit 2f09e6a
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 51 deletions.
15 changes: 10 additions & 5 deletions internal/app/master/inspectors/container/container_inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/docker-slim/docker-slim/internal/app/master/inspectors/image"
"github.com/docker-slim/docker-slim/internal/app/master/security/apparmor"
"github.com/docker-slim/docker-slim/internal/app/master/security/seccomp"
"github.com/docker-slim/docker-slim/pkg/messages"
"github.com/docker-slim/docker-slim/pkg/ipc/command"
"github.com/docker-slim/docker-slim/pkg/report"
"github.com/docker-slim/docker-slim/pkg/utils/errutils"
"github.com/docker-slim/docker-slim/pkg/utils/fsutils"
Expand Down Expand Up @@ -177,7 +177,7 @@ func (i *Inspector) RunContainer() error {
return err
}

cmd := &messages.StartMonitor{
cmd := &command.StartMonitor{
AppName: i.FatContainerCmd[0],
}

Expand Down Expand Up @@ -259,16 +259,17 @@ func (i *Inspector) ShutdownContainer() error {

// FinishMonitoring ends the target container monitoring activities
func (i *Inspector) FinishMonitoring() {
cmdResponse, err := ipc.SendContainerCmd(&messages.StopMonitor{})
cmdResponse, err := ipc.SendContainerCmd(&command.StopMonitor{})
errutils.WarnOn(err)
_ = cmdResponse
//_ = cmdResponse
log.Debugf("'stop' monitor response => '%v'", cmdResponse)

log.Debugf("'stop' response => '%v'", cmdResponse)
log.Info("docker-slim: waiting for the container to finish its work...")

//for now there's only one event ("done")
//getEvt() should timeout in two minutes (todo: pick a good timeout)
evt, err := ipc.GetContainerEvt()
log.Debugf("sensor event => '%v'", evt)

//don't want to expose mangos here... mangos.ErrRecvTimeout = errors.New("receive time out")
if err != nil && err.Error() == IpcErrRecvTimeoutStr {
Expand All @@ -279,6 +280,10 @@ func (i *Inspector) FinishMonitoring() {
errutils.WarnOn(err)
_ = evt
log.Debugf("docker-slim: sensor event => '%v'", evt)

cmdResponse, err = ipc.SendContainerCmd(&command.ShutdownSensor{})
errutils.WarnOn(err)
log.Debugf("'shutdown' sensor response => '%v'", cmdResponse)
}

func (i *Inspector) initContainerChannels() error {
Expand Down
22 changes: 13 additions & 9 deletions internal/app/master/inspectors/container/ipc/ipc.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import (
//"github.com/go-mangos/mangos/transport/ipc"
"github.com/go-mangos/mangos/transport/tcp"

"github.com/docker-slim/docker-slim/pkg/messages"
"github.com/docker-slim/docker-slim/pkg/ipc/channel"
"github.com/docker-slim/docker-slim/pkg/ipc/command"
"github.com/docker-slim/docker-slim/pkg/ipc/event"
)

// InitContainerChannels initializes the communication channels with the target container
Expand All @@ -37,12 +39,12 @@ func InitContainerChannels(dockerHostIP, cmdChannelPort, evtChannelPort string)
}

// SendContainerCmd sends the given command to the target container
func SendContainerCmd(cmd messages.Message) (string, error) {
func SendContainerCmd(cmd command.Message) (string, error) {
return sendCmd(cmdChannel, cmd)
}

// GetContainerEvt returns the current event generated by the target container
func GetContainerEvt() (string, error) {
func GetContainerEvt() (event.Name, error) {
return getEvt(evtChannel)
}

Expand All @@ -53,7 +55,8 @@ func ShutdownContainerChannels() {
}

//var cmdChannelAddr = "ipc:///tmp/docker-slim-sensor.cmds.ipc"
var cmdChannelAddr = "tcp://127.0.0.1:65501"
var cmdChannelAddr = fmt.Sprintf("tcp://127.0.0.1:%d", channel.CmdPort)

var cmdChannel mangos.Socket

func newCmdClient(addr string) (mangos.Socket, error) {
Expand Down Expand Up @@ -89,13 +92,13 @@ func shutdownCmdChannel() {
}
}

func sendCmd(channel mangos.Socket, cmd messages.Message) (string, error) {
func sendCmd(channel mangos.Socket, cmd command.Message) (string, error) {
sendTimeouts := 0
recvTimeouts := 0

log.Debugf("sendCmd(%s)", cmd)
for {
sendData, err := messages.Encode(cmd)
sendData, err := command.Encode(cmd)
if err != nil {
log.Info("sendCmd(): malformed cmd - ", err)
return "", err
Expand Down Expand Up @@ -132,7 +135,7 @@ func sendCmd(channel mangos.Socket, cmd messages.Message) (string, error) {
}
}

var evtChannelAddr = "tcp://127.0.0.1:65502"
var evtChannelAddr = fmt.Sprintf("tcp://127.0.0.1:%d", channel.EvtPort)

//var evtChannelAddr = "ipc:///tmp/docker-slim-sensor.events.ipc"
var evtChannel mangos.Socket
Expand Down Expand Up @@ -170,12 +173,13 @@ func shutdownEvtChannel() {
}
}

func getEvt(channel mangos.Socket) (string, error) {
func getEvt(channel mangos.Socket) (event.Name, error) {
log.Debug("getEvt()")
evt, err := channel.Recv()
log.Debug("getEvt(): channel.Recv() - done")
if err != nil {
return "", err
}

return string(evt), nil
return event.Name(evt), nil
}
31 changes: 19 additions & 12 deletions internal/app/sensor/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import (
"github.com/docker-slim/docker-slim/internal/app/sensor/monitors/fanotify"
"github.com/docker-slim/docker-slim/internal/app/sensor/monitors/pevent"
"github.com/docker-slim/docker-slim/internal/app/sensor/monitors/ptrace"
"github.com/docker-slim/docker-slim/pkg/messages"
"github.com/docker-slim/docker-slim/pkg/ipc/command"
"github.com/docker-slim/docker-slim/pkg/ipc/event"
"github.com/docker-slim/docker-slim/pkg/report"
"github.com/docker-slim/docker-slim/pkg/utils/errutils"

Expand All @@ -25,7 +26,7 @@ func monitor(stopWork chan bool,
stopWorkAck chan bool,
pids chan []int,
ptmonStartChan chan int,
cmd *messages.StartMonitor,
cmd *command.StartMonitor,
dirName string) {
log.Info("sensor: monitor starting...")
mountPoint := "/"
Expand Down Expand Up @@ -119,22 +120,32 @@ doneRunning:
case cmd := <-cmdChan:
log.Debug("\nsensor: command => ", cmd)
switch data := cmd.(type) {
case *messages.StartMonitor:
case *command.StartMonitor:
if data == nil {
log.Info("sensor: 'start' command - no data...")
log.Info("sensor: 'start' monitor command - no data...")
break
}

log.Debugf("sensor: 'start' command (%#v) - starting monitor...", data)
log.Debugf("sensor: 'start' monitor command (%#v)", data)
monitor(monDoneChan, monDoneAckChan, pidsChan, ptmonStartChan, data, dirName)

//target app started by ptmon... (long story :-))
//TODO: need to get the target app pid to pemon, so it can filter process events
log.Debugf("sensor: target app started => %v %#v", data.AppName, data.AppArgs)
time.Sleep(3 * time.Second)

case *messages.StopMonitor:
log.Debug("sensor: 'stop' command - stopping monitor...")
case *command.StopMonitor:
log.Debug("sensor: 'stop' monitor command")

monDoneChan <- true
log.Info("sensor: waiting for monitor to finish...")
<-monDoneAckChan
log.Info("sensor: monitor stopped...")

ipc.TryPublishEvt(3, event.StopMonitorDoneName)

case *command.ShutdownSensor:
log.Debug("sensor: 'shutdown' sensor command")
break doneRunning
default:
log.Debug("sensor: ignoring unknown command => ", cmd)
Expand All @@ -145,11 +156,7 @@ doneRunning:
}
}

monDoneChan <- true
log.Info("sensor: waiting for monitor to finish...")
<-monDoneAckChan

ipc.TryPublishEvt(3, "monitor.finish.completed")
ipc.TryPublishEvt(3, event.ShutdownSensorDoneName)

log.Info("sensor: done!")
}
8 changes: 4 additions & 4 deletions internal/app/sensor/artifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"strings"
"syscall"

"github.com/docker-slim/docker-slim/pkg/messages"
"github.com/docker-slim/docker-slim/pkg/ipc/command"
"github.com/docker-slim/docker-slim/pkg/report"
"github.com/docker-slim/docker-slim/pkg/utils/errutils"
"github.com/docker-slim/docker-slim/pkg/utils/fsutils"
Expand Down Expand Up @@ -54,7 +54,7 @@ func saveResults(fanMonReport *report.FanMonitorReport,
fileNames map[string]*report.ArtifactProps,
ptMonReport *report.PtMonitorReport,
peReport *report.PeMonitorReport,
cmd *messages.StartMonitor) {
cmd *command.StartMonitor) {
artifactDirName := defaultArtifactDirName

artifactStore := newArtifactStore(artifactDirName, fanMonReport, fileNames, ptMonReport, peReport, cmd)
Expand All @@ -73,15 +73,15 @@ type artifactStore struct {
resolve map[string]struct{}
linkMap map[string]*report.ArtifactProps
fileMap map[string]*report.ArtifactProps
cmd *messages.StartMonitor
cmd *command.StartMonitor
}

func newArtifactStore(storeLocation string,
fanMonReport *report.FanMonitorReport,
rawNames map[string]*report.ArtifactProps,
ptMonReport *report.PtMonitorReport,
peMonReport *report.PeMonitorReport,
cmd *messages.StartMonitor) *artifactStore {
cmd *command.StartMonitor) *artifactStore {
store := &artifactStore{
storeLocation: storeLocation,
fanMonReport: fanMonReport,
Expand Down
4 changes: 2 additions & 2 deletions internal/app/sensor/data_porcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"syscall"

"github.com/docker-slim/docker-slim/internal/app/sensor/monitors/fanotify"
"github.com/docker-slim/docker-slim/pkg/messages"
"github.com/docker-slim/docker-slim/pkg/ipc/command"
"github.com/docker-slim/docker-slim/pkg/report"

log "github.com/Sirupsen/logrus"
Expand All @@ -20,7 +20,7 @@ func processReports(mountPoint string,
fanReport *report.FanMonitorReport,
ptReport *report.PtMonitorReport,
peReport *report.PeMonitorReport,
cmd *messages.StartMonitor) {
cmd *command.StartMonitor) {

fileCount := 0
for _, processFileMap := range fanReport.ProcessFiles {
Expand Down
36 changes: 21 additions & 15 deletions internal/app/sensor/ipc/ipc.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ipc

import (
"fmt"
"time"

log "github.com/Sirupsen/logrus"
Expand All @@ -10,7 +11,9 @@ import (
//"github.com/go-mangos/mangos/transport/ipc"
"github.com/go-mangos/mangos/transport/tcp"

"github.com/docker-slim/docker-slim/pkg/messages"
"github.com/docker-slim/docker-slim/pkg/ipc/channel"
"github.com/docker-slim/docker-slim/pkg/ipc/command"
"github.com/docker-slim/docker-slim/pkg/ipc/event"
)

// InitChannels initializes the communication channels with the master
Expand All @@ -36,11 +39,11 @@ func ShutdownChannels() {
}

// RunCmdServer starts the command server
func RunCmdServer(done <-chan struct{}) (<-chan messages.Message, error) {
func RunCmdServer(done <-chan struct{}) (<-chan command.Message, error) {
return runCmdServer(cmdChannel, done)
}

var cmdChannelAddr = "tcp://0.0.0.0:65501"
var cmdChannelAddr = fmt.Sprintf("tcp://0.0.0.0:%d", channel.CmdPort)

//var cmdChannelAddr = "ipc:///tmp/docker-slim-sensor.cmds.ipc"
//var cmdChannelAddr = "ipc:///opt/dockerslim/ipc/docker-slim-sensor.cmds.ipc"
Expand Down Expand Up @@ -68,8 +71,8 @@ func newCmdServer(addr string) (mangos.Socket, error) {
return socket, nil
}

func runCmdServer(channel mangos.Socket, done <-chan struct{}) (<-chan messages.Message, error) {
cmdChan := make(chan messages.Message)
func runCmdServer(channel mangos.Socket, done <-chan struct{}) (<-chan command.Message, error) {
cmdChan := make(chan command.Message)
go func() {
for {
// Could also use sock.RecvMsg to get header
Expand All @@ -89,7 +92,7 @@ func runCmdServer(channel mangos.Socket, done <-chan struct{}) (<-chan messages.
} else {
log.Debug("sensor: cmd server - got a command => ", string(rawCmd))

if cmd, err := messages.Decode(rawCmd); err != nil {
if cmd, err := command.Decode(rawCmd); err != nil {
log.Println(err)
} else {
cmdChan <- cmd
Expand All @@ -99,10 +102,10 @@ func runCmdServer(channel mangos.Socket, done <-chan struct{}) (<-chan messages.
//NOTE:
//must reply before receiving the next message
//otherwise nanomsg/mangos will be confused :-)
monitorFinishReply := "ok"
err = channel.Send([]byte(monitorFinishReply))
cmdStatusReply := "ok"
err = channel.Send([]byte(cmdStatusReply))
if err != nil {
log.Warnln("sensor: cmd server - fail to send monitor.finish reply =>", err)
log.Warnln("sensor: cmd server - fail to send command status reply =>", err)
}
}
}
Expand All @@ -119,7 +122,7 @@ func shutdownCmdChannel() {
}
}

var evtChannelAddr = "tcp://0.0.0.0:65502"
var evtChannelAddr = fmt.Sprintf("tcp://0.0.0.0:%d", channel.EvtPort)

//var evtChannelAddr = "ipc:///tmp/docker-slim-sensor.events.ipc"
//var evtChannelAddr = "ipc:///opt/dockerslim/ipc/docker-slim-sensor.events.ipc"
Expand Down Expand Up @@ -147,20 +150,23 @@ func newEvtPublisher(addr string) (mangos.Socket, error) {
return socket, nil
}

func publishEvt(channel mangos.Socket, evt string) error {
if err := channel.Send([]byte(evt)); err != nil {
log.Debugf("fail to publish '%v' event:%v", evt, err)
func publishEvt(channel mangos.Socket, event event.Name) error {
log.Debugf("publishEvt(%v)", event)
if err := channel.Send([]byte(event)); err != nil {
log.Debugf("fail to publish '%v' event:%v", event, err)
return err
}

return nil
}

// TryPublishEvt attempts to publish an event to the master
func TryPublishEvt(ptry uint, event string) {
func TryPublishEvt(ptry uint, event event.Name) {
log.Debugf("TryPublishEvt(%v,%v)", ptry, event)

for ptry := 0; ptry < 3; ptry++ {
log.Debugf("sensor: trying to publish '%v' event (attempt %v)", event, ptry+1)
err := publishEvt(evtChannel, "monitor.finish.completed")
err := publishEvt(evtChannel, event)
if err == nil {
log.Infof("sensor: published '%v'", event)
break
Expand Down
7 changes: 7 additions & 0 deletions pkg/ipc/channel/channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package channel

// Supported events
const (
CmdPort = 65501
EvtPort = 65502
)
Loading

0 comments on commit 2f09e6a

Please sign in to comment.