Skip to content

Commit

Permalink
merge from master to profiler branch
Browse files Browse the repository at this point in the history
  • Loading branch information
cha87de committed Nov 6, 2018
2 parents 1002071 + dd8d5d3 commit 4e74f75
Show file tree
Hide file tree
Showing 10 changed files with 157 additions and 102 deletions.
95 changes: 66 additions & 29 deletions cmd/kvmtop/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,51 +38,88 @@ func initializeFlags() {
}

// Set collectors from flags
hasCollector := false
if config.Options.EnableCPU {
collector := cpucollector.CreateCollector()
models.Collection.Collectors.Store("cpu", &collector)
enableCPU()
hasCollector = true
}
if config.Options.EnableMEM {
collector := memcollector.CreateCollector()
models.Collection.Collectors.Store("mem", &collector)
enableMEM()
hasCollector = true
}
if config.Options.EnableDISK {
collector := diskcollector.CreateCollector()
models.Collection.Collectors.Store("disk", &collector)
enableDISK()
hasCollector = true
}
if config.Options.EnableNET {
collector := netcollector.CreateCollector()
models.Collection.Collectors.Store("net", &collector)
enableNET()
hasCollector = true
}
if config.Options.EnableIO {
collector := iocollector.CreateCollector()
models.Collection.Collectors.Store("io", &collector)
enableIO()
hasCollector = true
}
if config.Options.EnableHost {
collector := hostcollector.CreateCollector()
models.Collection.Collectors.Store("host", &collector)
enableHOST()
hasCollector = true
}

if !hasCollector {
// no collector selected; fall back to the default collectors (cpu and mem)
enableCPU()
enableMEM()
}

// select printer, ncurse as default.
if config.Options.PrintBatch { // DEPRECATED remove PrintBatch in future
switch config.Options.Printer {
case "ncurses":
printer := printers.CreateNcurses()
models.Collection.Printer = &printer
case "text":
printer := printers.CreateText()
models.Collection.Printer = &printer
} else {
switch config.Options.Printer {
case "ncurses":
printer := printers.CreateNcurses()
models.Collection.Printer = &printer
case "text":
printer := printers.CreateText()
models.Collection.Printer = &printer
case "json":
printer := printers.CreateJSON()
models.Collection.Printer = &printer
default:
fmt.Println("unknown printer")
os.Exit(1)
}

case "json":
printer := printers.CreateJSON()
models.Collection.Printer = &printer
default:
fmt.Println("unknown printer")
os.Exit(1)
}

}

// enableCPU registers the cpu collector in the global collection.
func enableCPU() {
	c := cpucollector.CreateCollector()
	models.Collection.Collectors.Store("cpu", &c)
}

// enableMEM registers the mem collector in the global collection.
func enableMEM() {
	c := memcollector.CreateCollector()
	models.Collection.Collectors.Store("mem", &c)
}

// enableDISK registers the disk collector in the global collection.
func enableDISK() {
	c := diskcollector.CreateCollector()
	models.Collection.Collectors.Store("disk", &c)
}

// enableNET registers the net collector in the global collection.
func enableNET() {
	c := netcollector.CreateCollector()
	models.Collection.Collectors.Store("net", &c)
}

// enableIO registers the io collector in the global collection.
func enableIO() {
	c := iocollector.CreateCollector()
	models.Collection.Collectors.Store("io", &c)
}

// enableHOST registers the host collector in the global collection.
func enableHOST() {
	c := hostcollector.CreateCollector()
	models.Collection.Collectors.Store("host", &c)
}
107 changes: 54 additions & 53 deletions collectors/cpucollector/worker.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package cpucollector

import (
"bytes"
"encoding/gob"
"path"
"path/filepath"
"regexp"
Expand All @@ -27,6 +25,11 @@ func cpuLookup(domain *models.Domain, libvirtDomain libvirt.Domain) {
newMeasurementCores := models.CreateMeasurement(uint64(cores))
domain.AddMetricMeasurement("cpu_cores", newMeasurementCores)

// cache old thread IDs for cleanup
var oldThreadIds []int
oldThreadIds = append(oldThreadIds, domain.GetMetricIntArray("cpu_threadIDs")...)
oldThreadIds = append(oldThreadIds, domain.GetMetricIntArray("cpu_otherThreadIDs")...)

// get core thread IDs
vCPUThreads, err := libvirtDomain.QemuMonitorCommand("info cpus", libvirt.DOMAIN_QEMU_MONITOR_COMMAND_HMP)
if err != nil {
Expand All @@ -36,7 +39,9 @@ func cpuLookup(domain *models.Domain, libvirtDomain libvirt.Domain) {
threadIDsRaw := regThreadID.FindAllStringSubmatch(vCPUThreads, -1)
coreThreadIDs := make([]int, len(threadIDsRaw))
for i, thread := range threadIDsRaw {
coreThreadIDs[i], _ = strconv.Atoi(thread[1])
threadID, _ := strconv.Atoi(thread[1])
coreThreadIDs[i] = threadID
oldThreadIds = removeFromArray(oldThreadIds, threadID)
}
newMeasurementThreads := models.CreateMeasurement(coreThreadIDs)
domain.AddMetricMeasurement("cpu_threadIDs", newMeasurementThreads)
Expand All @@ -47,7 +52,7 @@ func cpuLookup(domain *models.Domain, libvirtDomain libvirt.Domain) {
if err != nil {
return
}
otherThreadIDs := make([]int, len(files)-cores)
otherThreadIDs := make([]int, 0)
i := 0
for _, f := range files {
taskID, _ := strconv.Atoi(path.Base(f))
Expand All @@ -64,10 +69,19 @@ func cpuLookup(domain *models.Domain, libvirtDomain libvirt.Domain) {
continue
}
// taskID is not for a vCPU core
otherThreadIDs[i] = taskID
otherThreadIDs = append(otherThreadIDs, taskID)
oldThreadIds = removeFromArray(oldThreadIds, taskID)
i++
}
domain.AddMetricMeasurement("cpu_otherThreadIDs", models.CreateMeasurement(otherThreadIDs))

// remove cached but not existent thread IDs
for _, id := range oldThreadIds {
domain.DelMetricMeasurement(fmt.Sprint("cpu_times_", id))
domain.DelMetricMeasurement(fmt.Sprint("cpu_runqueues_", id))
domain.DelMetricMeasurement(fmt.Sprint("cpu_other_times_", id))
domain.DelMetricMeasurement(fmt.Sprint("cpu_other_runqueues_", id))
}
}

func cpuCollect(domain *models.Domain) {
Expand All @@ -82,27 +96,23 @@ func cpuCollect(domain *models.Domain) {

func cpuCollectMeasurements(domain *models.Domain, metricName string, measurementPrefix string) {
threadIDs := domain.GetMetricIntArray(metricName)
var cputimes []uint64
var runqueues []uint64
for _, threadID := range threadIDs {
schedstat := util.GetProcSchedStat(threadID)
cputimes = append(cputimes, schedstat.Cputime)
runqueues = append(runqueues, schedstat.Runqueue)
domain.AddMetricMeasurement(fmt.Sprint(measurementPrefix, "times_", threadID), models.CreateMeasurement(schedstat.Cputime))
domain.AddMetricMeasurement(fmt.Sprint(measurementPrefix, "runqueues_", threadID), models.CreateMeasurement(schedstat.Runqueue))
}
domain.AddMetricMeasurement(fmt.Sprint(measurementPrefix, "times"), models.CreateMeasurement(cputimes))
domain.AddMetricMeasurement(fmt.Sprint(measurementPrefix, "runqueues"), models.CreateMeasurement(runqueues))
}

func cpuPrint(domain *models.Domain) []string {
cores := collectors.GetMetricUint64(domain.Measurable, "cpu_cores", 0)

// cpu util for vcores
cputimeAllCores := CpuPrintThreadMetric(domain, "cpu_times")
queuetimeAllCores := CpuPrintThreadMetric(domain, "cpu_runqueues")
cputimeAllCores := CpuPrintThreadMetric(domain, "cpu_threadIDs", "cpu_times")
queuetimeAllCores := CpuPrintThreadMetric(domain, "cpu_threadIDs", "cpu_runqueues")

// cpu util for other threads (i/o or emulation)
otherCputimeAllCores := CpuPrintThreadMetric(domain, "cpu_other_times")
otherQueuetimeAllCores := CpuPrintThreadMetric(domain, "cpu_other_runqueues")
otherCputimeAllCores := CpuPrintThreadMetric(domain, "cpu_otherThreadIDs", "cpu_other_times")
otherQueuetimeAllCores := CpuPrintThreadMetric(domain, "cpu_otherThreadIDs", "cpu_other_runqueues")

// put results together
result := append([]string{cores}, cputimeAllCores, queuetimeAllCores)
Expand All @@ -112,44 +122,35 @@ func cpuPrint(domain *models.Domain) []string {
return result
}

func CpuPrintThreadMetric(domain *models.Domain, metric string) string {
var times []string
var timeAllCores string
if metric, ok := domain.GetMetric(metric); ok {
if len(metric.Values) > 1 {
byteValue1 := metric.Values[0].Value
reader1 := bytes.NewReader(byteValue1)
dec1 := gob.NewDecoder(reader1)

byteValue2 := metric.Values[1].Value
reader2 := bytes.NewReader(byteValue2)
dec2 := gob.NewDecoder(reader2)

var timesRaw1 []uint64
var timesRaw2 []uint64
dec1.Decode(&timesRaw1)
dec2.Decode(&timesRaw2)

timeDiff := metric.Values[0].Timestamp.Sub(metric.Values[1].Timestamp).Seconds()
timeConversionFactor := 1000000000 / timeDiff

// for each thread ...
var timeSum float64
for i, time1 := range timesRaw1 {
if len(timesRaw2) <= i {
continue
}
time2 := timesRaw2[i]
if time1 < time2 {
// unexpected case, since dealing with counters
time2 = time1
}
time := float64(time1-time2) / timeConversionFactor
timeSum = timeSum + time
times = append(times, fmt.Sprintf("%.0f", time*100))
}
timeAllCores = fmt.Sprintf("%.0f", timeSum/float64(len(timesRaw1))*100)
// CpuPrintThreadMetric averages the per-thread counter diffs stored under
// "<metric>_<threadID>" for each thread ID listed in the lookupMetric
// array, and returns the average as a whole-number percentage string.
//
// The per-thread counters are nanosecond values (see cpuCollectMeasurements),
// so the diff divided by 1e9 yields a utilization fraction — presumably
// GetMetricDiffUint64 returns a per-second rate; confirm against collectors.
func CpuPrintThreadMetric(domain *models.Domain, lookupMetric string, metric string) string {
	threadIDs := domain.GetMetricIntArray(lookupMetric)
	var measurementSum float64
	var measurementCount int
	for _, threadID := range threadIDs {
		metricName := fmt.Sprint(metric, "_", threadID)
		measurementStr := collectors.GetMetricDiffUint64(domain.Measurable, metricName, true)
		if measurementStr == "" {
			// no measurement collected (yet) for this thread
			continue
		}
		measurement, err := strconv.ParseUint(measurementStr, 10, 64)
		if err != nil {
			// skip unparseable values instead of aborting the whole average
			continue
		}
		measurementSum += float64(measurement) / 1000000000 // counters are nanoseconds
		measurementCount++
	}

	if measurementCount == 0 {
		// Guard the 0/0 case: without it the result is "NaN", which callers
		// (e.g. pickupCPU via strconv.Atoi) cannot parse.
		return "0"
	}
	percent := measurementSum / float64(measurementCount) * 100
	return fmt.Sprintf("%.0f", percent)
}

// removeFromArray deletes the first occurrence of r from s in place and
// returns the (possibly shortened) slice. If r is not present, s is
// returned unchanged. The backing array of s is reused and mutated.
func removeFromArray(s []int, r int) []int {
	for i := range s {
		if s[i] != r {
			continue
		}
		copy(s[i:], s[i+1:])
		return s[:len(s)-1]
	}
	return s
}
8 changes: 5 additions & 3 deletions collectors/diskcollector/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,11 @@ func diskLookup(domain *models.Domain, libvirtDomain libvirt.Domain) {
}

// sizes
sums.Capacity += sizeStats.Capacity
sums.Allocation += sizeStats.Allocation
sums.Physical += sizeStats.Physical
if sizeStats != nil {
sums.Capacity += sizeStats.Capacity
sums.Allocation += sizeStats.Allocation
sums.Physical += sizeStats.Physical
}

// find source path
if disk.Source != nil && disk.Source.File != nil {
Expand Down
3 changes: 1 addition & 2 deletions config/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ var Options struct {
EnableIO bool `long:"io" description:"enable io metrics (requires root)"`
EnableHost bool `long:"host" description:"enable host metrics"`

PrintBatch bool `short:"b" long:"batch" description:"[DEPRECATED: use --printer=text instead] use simple output e.g. for scripts"`
Printer string `short:"p" long:"printer" description:"the output printer to use (valid printers: ncurses, text, json)" default:"ncurses"`
Printer string `short:"p" long:"printer" description:"the output printer to use (valid printers: ncurses, text, json)" default:"ncurses"`

Output string `short:"o" long:"output" description:"the output channel to send printer output (valid output: stdout, file, tcp)" default:"stdout"`
OutputTarget string `long:"target" description:"for output 'file' the location, for 'tcp' the url to the tcp server"`
Expand Down
5 changes: 5 additions & 0 deletions models/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ func (measurable *Measurable) AddMetricMeasurement(metricName string, measuremen
//domain.Metrics.Store(metricName, metrics)
}

// DelMetricMeasurement removes the metric stored under metricName from the
// measurable's metric map, discarding all of its cached measurements.
// NOTE(review): Metrics appears to be a sync.Map (Load/Store used elsewhere),
// so deleting an absent key is a no-op — confirm.
func (measurable *Measurable) DelMetricMeasurement(metricName string) {
	measurable.Metrics.Delete(metricName)
}

// GetMetric reads and returns the metric values by metric name
func (measurable *Measurable) GetMetric(metricName string) (*Metric, bool) {
rawmetric, exists := measurable.Metrics.Load(metricName)
Expand Down
4 changes: 2 additions & 2 deletions profiler/pickup.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
)

func pickupCPU(domain models.Domain) (int, int) {
cputimeAllCores, _ := strconv.Atoi(cpucollector.CpuPrintThreadMetric(&domain, "cpu_times"))
queuetimeAllCores, _ := strconv.Atoi(cpucollector.CpuPrintThreadMetric(&domain, "cpu_runqueues"))
cputimeAllCores, _ := strconv.Atoi(cpucollector.CpuPrintThreadMetric(&domain, "cpu_threadIDs", "cpu_times"))
queuetimeAllCores, _ := strconv.Atoi(cpucollector.CpuPrintThreadMetric(&domain, "cpu_threadIDs", "cpu_runqueues"))
cpuUtil := cputimeAllCores + queuetimeAllCores
cpuMax := 100
return cpuUtil, cpuMax
Expand Down
14 changes: 7 additions & 7 deletions runners/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@ package runners

import (
"sync"
"time"

"github.com/cha87de/kvmtop/config"
"github.com/cha87de/kvmtop/models"
)

// InitializeCollect starts the periodic collect calls
func InitializeCollect(wg *sync.WaitGroup) {
for n := -1; config.Options.Runs == -1 || n < config.Options.Runs; n++ {
start := time.Now()
for {
// wait with execution for lookup routine
_, ok := <-lookupDone
if !ok {
wg.Done()
return
}
Collect()
nextRun := start.Add(time.Duration(config.Options.Frequency) * time.Second)
time.Sleep(nextRun.Sub(time.Now()))
}
wg.Done()
}

// Collect runs one collect cycle to measure frequently changing metrics
Expand Down
12 changes: 11 additions & 1 deletion runners/lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,25 @@ import (
)

var processes []int
var lookupDone chan bool

// InitializeLookup starts the periodic lookup calls
func InitializeLookup(wg *sync.WaitGroup) {
lookupDone = make(chan bool)
for n := -1; config.Options.Runs == -1 || n < config.Options.Runs; n++ {
// execution, then sleep
start := time.Now()
Lookup()
nextRun := start.Add(time.Duration(config.Options.Frequency) * time.Second)
lookupDone <- true
freq := float32(config.Options.Frequency)
if n <= 1 {
// first run, half frequency only
freq = freq / 2
}
nextRun := start.Add(time.Duration(freq) * time.Second)
time.Sleep(nextRun.Sub(time.Now()))
}
close(lookupDone)
wg.Done()
}

Expand Down
Loading

0 comments on commit 4e74f75

Please sign in to comment.