Skip to content

Commit

Permalink
bce-iot-9393: collect the cpu and memery stats of the instances of th…
Browse files Browse the repository at this point in the history
…e services (#266)
  • Loading branch information
ludanfeng committed Jun 17, 2019
1 parent 416ec57 commit 2fd3788
Show file tree
Hide file tree
Showing 13 changed files with 161 additions and 46 deletions.
39 changes: 39 additions & 0 deletions master/engine/docker/container.go
Expand Up @@ -2,11 +2,14 @@ package docker

import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"time"

"github.com/baidu/openedge/master/engine"
"github.com/baidu/openedge/utils"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/filters"
Expand Down Expand Up @@ -163,3 +166,39 @@ func (e *dockerEngine) removeContainerByName(name string) error {
}
return err
}

func (e *dockerEngine) statsContainer(cid string) engine.PartialStats {
t := time.Now().UTC()
ctx := context.Background()
sresp, err := e.cli.ContainerStats(ctx, cid, false)
if err != nil {
e.log.WithError(err).Warnf("failed to stats container (%s)", cid[:12])
return engine.PartialStats{"error": err.Error()}
}
defer sresp.Body.Close()
// TODO: to use json.NewEncoder(sresp.Body).Encode(&types.StatsJSON{})
data, err := ioutil.ReadAll(sresp.Body)
if err != nil {
e.log.WithError(err).Warnf("failed to read stats response of container (%s)", cid[:12])
return engine.PartialStats{"error": err.Error()}
}
var tstats types.Stats
err = json.Unmarshal(data, &tstats)
if err != nil {
e.log.WithError(err).Warnf("failed to unmarshal stats response of container (%s)", cid[:12])
return engine.PartialStats{"error": err.Error()}
}

return engine.PartialStats{
"cpu_stats": utils.CPUInfo{
Time: t,
UsedPercent: float64(tstats.CPUStats.CPUUsage.TotalUsage) / float64(tstats.CPUStats.SystemUsage),
},
"mem_stats": utils.MemInfo{
Time: t,
Total: tstats.MemoryStats.Limit,
Used: tstats.MemoryStats.Usage,
UsedPercent: float64(tstats.MemoryStats.Usage) / float64(tstats.MemoryStats.Limit),
},
}
}
4 changes: 4 additions & 0 deletions master/engine/docker/instance.go
Expand Up @@ -76,6 +76,10 @@ func (i *dockerInstance) Info() engine.PartialStats {
return attr.toPartialStats()
}

func (i *dockerInstance) Stats() engine.PartialStats {
return i.service.engine.statsContainer(i.id)
}

func (i *dockerInstance) Wait(s chan<- error) {
defer i.log.Infof("instance stopped")
err := i.service.engine.waitContainer(i.id)
Expand Down
9 changes: 8 additions & 1 deletion master/engine/docker/service.go
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/baidu/openedge/logger"
"github.com/baidu/openedge/master/engine"
openedge "github.com/baidu/openedge/sdk/openedge-go"
"github.com/orcaman/concurrent-map"
cmap "github.com/orcaman/concurrent-map"
)

const (
Expand Down Expand Up @@ -65,6 +65,13 @@ func (s *dockerService) Stop() {
wg.Wait()
}

func (s *dockerService) Stats() {
for _, item := range s.instances.Items() {
instance := item.(*dockerInstance)
s.engine.SetInstanceStats(s.Name(), instance.Name(), instance.Stats(), false)
}
}

func (s *dockerService) StartInstance(instanceName string, dynamicConfig map[string]string) error {
return s.startInstance(instanceName, dynamicConfig)
}
Expand Down
4 changes: 2 additions & 2 deletions master/engine/engine.go
Expand Up @@ -27,8 +27,8 @@ type Engine interface {
io.Closer
Name() string
Prepare([]openedge.ServiceInfo)
AddInstanceStats(serviceName, instanceName string, partialStats PartialStats)
DelInstanceStats(serviceName, instanceName string)
SetInstanceStats(serviceName, instanceName string, partialStats PartialStats, persist bool)
DelInstanceStats(serviceName, instanceName string, persist bool)
Run(openedge.ServiceInfo, map[string]openedge.VolumeInfo) (Service, error)
}

Expand Down
4 changes: 2 additions & 2 deletions master/engine/infostats.go
Expand Up @@ -19,6 +19,6 @@ func NewPartialStatsByStatus(status string) PartialStats {
// InfoStats interfaces of the storage of info and stats
type InfoStats interface {
LoadStats(sss interface{}) bool
AddInstanceStats(serviceName, instanceName string, partialStats PartialStats)
DelInstanceStats(serviceName, instanceName string)
SetInstanceStats(serviceName, instanceName string, partialStats PartialStats, persist bool)
DelInstanceStats(serviceName, instanceName string, persist bool)
}
1 change: 1 addition & 0 deletions master/engine/instance.go
Expand Up @@ -31,6 +31,7 @@ type Instance interface {
Service() Service
Name() string
Info() PartialStats
Stats() PartialStats
Wait(w chan<- error)
Dying() <-chan struct{}
Restart() error
Expand Down
4 changes: 4 additions & 0 deletions master/engine/native/instance.go
Expand Up @@ -90,6 +90,10 @@ func (i *nativeInstance) Info() engine.PartialStats {
return attr.toPartialStats()
}

func (i *nativeInstance) Stats() engine.PartialStats {
return i.service.engine.statsProcess(i.proc)
}

func (i *nativeInstance) Wait(s chan<- error) {
defer i.log.Infof("instance stopped")
err := i.service.engine.waitProcess(i.proc)
Expand Down
34 changes: 34 additions & 0 deletions master/engine/native/process.go
Expand Up @@ -5,6 +5,10 @@ import (
"os"
"syscall"
"time"

"github.com/baidu/openedge/master/engine"
"github.com/baidu/openedge/utils"
"github.com/shirou/gopsutil/process"
)

type processConfigs struct {
Expand Down Expand Up @@ -79,3 +83,33 @@ func (e *nativeEngine) stopProcess(p *os.Process) error {
return nil
}
}

func (e *nativeEngine) statsProcess(p *os.Process) engine.PartialStats {
proc, err := process.NewProcess(int32(p.Pid))
if err != nil {
return engine.PartialStats{"error": err.Error()}
}
cpu := utils.CPUInfo{Time: time.Now().UTC()}
cpu.UsedPercent, err = proc.CPUPercent()
if err != nil {
cpu.Error = err.Error()
}
mem := utils.MemInfo{Time: time.Now().UTC()}
meminfo, err := proc.MemoryInfo()
if err != nil {
mem.Error = err.Error()
} else {
mem.Used = meminfo.RSS
mem.SwapUsed = meminfo.Swap
mup, err := proc.MemoryPercent()
if err != nil {
mem.Error = err.Error()
} else {
mem.UsedPercent = float64(mup)
}
}
return engine.PartialStats{
"cpu_stats": cpu,
"mem_stats": mem,
}
}
9 changes: 8 additions & 1 deletion master/engine/native/service.go
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/baidu/openedge/logger"
"github.com/baidu/openedge/master/engine"
openedge "github.com/baidu/openedge/sdk/openedge-go"
"github.com/orcaman/concurrent-map"
cmap "github.com/orcaman/concurrent-map"
)

const packageConfigPath = "package.yml"
Expand Down Expand Up @@ -69,6 +69,13 @@ func (s *nativeService) Stop() {
wg.Wait()
}

func (s *nativeService) Stats() {
for _, item := range s.instances.Items() {
instance := item.(*nativeInstance)
s.engine.SetInstanceStats(s.Name(), instance.Name(), instance.Stats(), false)
}
}

func (s *nativeService) StartInstance(instanceName string, dynamicConfig map[string]string) error {
return s.startInstance(instanceName, dynamicConfig)
}
Expand Down
1 change: 1 addition & 0 deletions master/engine/service.go
Expand Up @@ -9,6 +9,7 @@ type Service interface {
RestartPolicy() openedge.RestartPolicyInfo
Start() error
Stop()
Stats()
StartInstance(instanceName string, dynamicConfig map[string]string) error
StopInstance(instanceName string) error
}
8 changes: 4 additions & 4 deletions master/engine/supervisor.go
Expand Up @@ -15,7 +15,7 @@ func Supervising(instance Instance) error {
_engine := service.Engine()
serviceName := service.Name()
instanceName := instance.Name()
defer _engine.DelInstanceStats(serviceName, instanceName)
defer _engine.DelInstanceStats(serviceName, instanceName, true)
defer instance.Stop()

c := 0
Expand All @@ -31,7 +31,7 @@ func Supervising(instance Instance) error {
instanceInfo := instance.Info()
instanceInfo[KeyStatus]=Running
instanceInfo[KeyStartTime]=time.Now().UTC()
_engine.AddInstanceStats(serviceName, instanceName, instanceInfo)
_engine.SetInstanceStats(serviceName, instanceName, instanceInfo, true)
go instance.Wait(s)
select {
case <-instance.Dying():
Expand All @@ -43,10 +43,10 @@ func Supervising(instance Instance) error {
if err == nil {
return nil
}
_engine.AddInstanceStats(serviceName, instanceName, NewPartialStatsByStatus(Restarting))
_engine.SetInstanceStats(serviceName, instanceName, NewPartialStatsByStatus(Restarting), true)
goto RESTART
case openedge.RestartAlways:
_engine.AddInstanceStats(serviceName, instanceName, NewPartialStatsByStatus(Restarting))
_engine.SetInstanceStats(serviceName, instanceName, NewPartialStatsByStatus(Restarting), true)
goto RESTART
case openedge.RestartNo:
// TODO: to test
Expand Down

0 comments on commit 2fd3788

Please sign in to comment.