Skip to content
This repository was archived by the owner on Nov 24, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .dependency_license
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ CHANGELOG$, Docs
\.cfg$, Apache-2.0
\.json$, Apache-2.0
\.webmanifest, Apache-2.0
\.csv$, Apache-2.0
\.conf$, Apache-2.0
\.config(\.example)?$, Apache-2.0
/\.bowerrc$, Apache-2.0
Expand Down
6 changes: 6 additions & 0 deletions docs/source/admin/traffic_monitor.rst
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ Note that this means the stat buffer interval acts as "bufferbloat," increasing

It is not recommended to set either flush interval to 0, regardless of the stat buffer interval. This will cause new results to be immediately processed, with little to no processing of multiple results concurrently. Result processing does not scale linearly. For example, processing 100 results at once does not cost significantly more CPU usage or time than processing 10 results at once. Thus, a flush interval which is too low will cause increased CPU usage, and potentially increased overall poll times, with little or no benefit. The default value of 200 milliseconds is recommended as a starting point for configuration tuning.

HTTP Accept Header Configuration
--------------------------------
The Accept header sent to caches for stat retrieval can be modified with the ``http_polling_format`` option. This is a string that will be inserted in to the Accept header of any requests. The default value is ``text/json`` which is the default value used by the astats plugin currently.

However newer versions of astats also support CSV output, which can have some CPU savings. To enable that format using ``http_polling_format: "text/csv"`` in :file:`traffic_monitor.cfg` will set the Accept header properly.

Troubleshooting and Log Files
=============================
Traffic Monitor log files are in :file:`/opt/traffic_monitor/var/log/`.
Expand Down
527 changes: 527 additions & 0 deletions traffic_monitor/cache/astats.csv

Large diffs are not rendered by default.

81 changes: 46 additions & 35 deletions traffic_monitor/cache/astats.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (

"github.com/apache/trafficcontrol/lib/go-log"
"github.com/apache/trafficcontrol/traffic_monitor/dsdata"
"github.com/apache/trafficcontrol/traffic_monitor/poller"
"github.com/apache/trafficcontrol/traffic_monitor/todata"
jsoniter "github.com/json-iterator/go"
)
Expand Down Expand Up @@ -68,50 +69,60 @@ type Astats struct {
System AstatsSystem `json:"system"`
}

func astatsParse(cacheName string, rdr io.Reader) (Statistics, map[string]interface{}, error) {
func astatsParse(cacheName string, rdr io.Reader, pollCTX interface{}) (Statistics, map[string]interface{}, error) {
var stats Statistics
if rdr == nil {
log.Warnf("%s handle reader nil", cacheName)
return stats, nil, errors.New("handler got nil reader")
}

var astats Astats
json := jsoniter.ConfigFastest
if err := json.NewDecoder(rdr).Decode(&astats); err != nil {
return stats, nil, err
}
ctx := pollCTX.(*poller.HTTPPollCtx)

if err := stats.AddInterfaceFromRawLine(astats.System.ProcNetDev); err != nil {
return stats, nil, fmt.Errorf("Failed to parse interface line for cache '%s': %v", cacheName, err)
}
if inf, ok := stats.Interfaces[astats.System.InfName]; !ok {
return stats, nil, errors.New("/proc/net/dev line didn't match reported interface line")
} else {
inf.Speed = int64(astats.System.InfSpeed)
stats.Interfaces[astats.System.InfName] = inf
}
ctype := ctx.HTTPHeader.Get("Content-Type")

if load, err := LoadavgFromRawLine(astats.System.ProcLoadavg); err != nil {
return stats, nil, fmt.Errorf("Failed to parse loadavg line for cache '%s': %v", cacheName, err)
} else {
stats.Loadavg = load
}
if ctype == "text/json" || ctype == "text/javascript" || ctype == "" {
var astats Astats
json := jsoniter.ConfigFastest
if err := json.NewDecoder(rdr).Decode(&astats); err != nil {
return stats, nil, err
}

stats.NotAvailable = astats.System.NotAvailable
if err := stats.AddInterfaceFromRawLine(astats.System.ProcNetDev); err != nil {
return stats, nil, fmt.Errorf("failed to parse interface line for cache '%s': %v", cacheName, err)
}
if inf, ok := stats.Interfaces[astats.System.InfName]; !ok {
return stats, nil, errors.New("/proc/net/dev line didn't match reported interface line")
} else {
inf.Speed = int64(astats.System.InfSpeed)
stats.Interfaces[astats.System.InfName] = inf
}

if load, err := LoadavgFromRawLine(astats.System.ProcLoadavg); err != nil {
return stats, nil, fmt.Errorf("failed to parse loadavg line for cache '%s': %v", cacheName, err)
} else {
stats.Loadavg = load
}

// TODO: what's using these?? Can we get rid of them?
astats.Ats["system.astatsLoad"] = float64(astats.System.AstatsLoad)
astats.Ats["system.configReloadRequests"] = float64(astats.System.ConfigLoadRequest)
astats.Ats["system.configReloads"] = float64(astats.System.ConfigReloads)
astats.Ats["system.inf.name"] = astats.System.InfName
astats.Ats["system.inf.speed"] = float64(astats.System.InfSpeed)
astats.Ats["system.lastReload"] = float64(astats.System.LastReload)
astats.Ats["system.lastReloadRequest"] = float64(astats.System.LastReloadRequest)
astats.Ats["system.notAvailable"] = stats.NotAvailable
astats.Ats["system.proc.loadavg"] = astats.System.ProcLoadavg
astats.Ats["system.proc.net.dev"] = astats.System.ProcNetDev
stats.NotAvailable = astats.System.NotAvailable

return stats, astats.Ats, nil
// TODO: what's using these?? Can we get rid of them?
astats.Ats["system.astatsLoad"] = float64(astats.System.AstatsLoad)
astats.Ats["system.configReloadRequests"] = float64(astats.System.ConfigLoadRequest)
astats.Ats["system.configReloads"] = float64(astats.System.ConfigReloads)
astats.Ats["system.inf.name"] = astats.System.InfName
astats.Ats["system.inf.speed"] = float64(astats.System.InfSpeed)
astats.Ats["system.lastReload"] = float64(astats.System.LastReload)
astats.Ats["system.lastReloadRequest"] = float64(astats.System.LastReloadRequest)
astats.Ats["system.notAvailable"] = stats.NotAvailable
astats.Ats["system.proc.loadavg"] = astats.System.ProcLoadavg
astats.Ats["system.proc.net.dev"] = astats.System.ProcNetDev

return stats, astats.Ats, nil
} else if ctype == "text/csv" {
return astatsCsvParseCsv(cacheName, rdr)
} else {
return stats, nil, fmt.Errorf("stats Content-Type (%s) can not be parsed by astats", ctype)
}
}

func astatsPrecompute(cacheName string, data todata.TOData, stats Statistics, miscStats map[string]interface{}) PrecomputedData {
Expand Down Expand Up @@ -190,10 +201,10 @@ func astatsProcessStatPluginRemapStats(server string, stats map[string]*DSStat,

ds, ok := toData.DeliveryServiceRegexes.DeliveryService(domain, subdomain, subsubdomain)
if !ok {
return stats, fmt.Errorf("No Delivery Service match for '%s.%s.%s' stat '%v'", subsubdomain, subdomain, domain, strings.Join(statParts, "."))
return stats, fmt.Errorf("no Delivery Service match for '%s.%s.%s' stat '%v'", subsubdomain, subdomain, domain, strings.Join(statParts, "."))
}
if ds == "" {
return stats, fmt.Errorf("Empty Delivery Service fqdn '%s.%s.%s' stat %v", subsubdomain, subdomain, domain, strings.Join(statParts, "."))
return stats, fmt.Errorf("empty Delivery Service fqdn '%s.%s.%s' stat %v", subsubdomain, subdomain, domain, strings.Join(statParts, "."))
}

dsName := string(ds)
Expand Down
123 changes: 123 additions & 0 deletions traffic_monitor/cache/astats_csv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package cache

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import (
"bufio"
"errors"
"fmt"
"io"
"strconv"
"strings"

"github.com/apache/trafficcontrol/lib/go-log"
)

type astatsDataCsv struct {
Ats map[string]interface{}
}

func astatsCsvParseCsv(cacheName string, data io.Reader) (Statistics, map[string]interface{}, error) {
var stats Statistics
var err error
if data == nil {
log.Warnf("Cannot read stats data for cache '%s' - nil data reader", cacheName)
return stats, nil, errors.New("handler got nil reader")
}

var atsData astatsDataCsv
var allData []string
scanner := bufio.NewScanner(data)
for scanner.Scan() {
allData = append(allData, scanner.Text())
}

atsData.Ats = make(map[string]interface{}, len(allData))

for _, line := range allData {
delim := strings.IndexByte(line, ',')

// No delimiter found, skip this line as invalid
if delim < 0 {
continue
}
// Special cases where we just want the string value
if strings.Contains(line[0:delim], "proc.") || strings.Contains(line[0:delim], "inf.name") {
atsData.Ats[line[0:delim]] = line[delim+1:]
} else {
value, err := strconv.ParseFloat(line[delim+1:], 64)

// Skip values that dont parse
if err != nil {
continue
}
atsData.Ats[line[0:delim]] = value
}
}

if len(atsData.Ats) < 1 {
return stats, nil, errors.New("no 'global' data object found in stats_over_http payload")
}

statMap := atsData.Ats

// Handle system specific values and remove them from the map for precomputing to not have issues
if stats.Loadavg, err = LoadavgFromRawLine(statMap["proc.loadavg"].(string)); err != nil {
return stats, nil, fmt.Errorf("parsing loadavg for cache '%s': %v", cacheName, err)
} else {
delete(statMap, "proc.loadavg")
}

if err := stats.AddInterfaceFromRawLine(statMap["proc.net.dev"].(string)); err != nil {
return stats, nil, fmt.Errorf("failed to parse interface line for cache '%s': %v", cacheName, err)
} else {
delete(statMap, "proc.net.dev")
}

if inf, ok := stats.Interfaces[statMap["inf.name"].(string)]; !ok {
return stats, nil, errors.New("/proc/net/dev line didn't match reported interface line")
} else {
inf.Speed = int64(statMap["inf.speed"].(float64)) //strconv.ParseInt(statMap["inf.speed"].(string), 10, 64)
stats.Interfaces[statMap["inf.name"].(string)] = inf
delete(statMap, "inf.speed")
delete(statMap, "inf.name")

}

// Clean up other non-stats entries
nonStats := []string{
"astatsLoad",
"lastReloadRequest",
"version",
"something",
"lastReload",
"configReloadRequests",
"configReloads",
}
for _, nonStat := range nonStats {
delete(statMap, nonStat)
}

if len(stats.Interfaces) < 1 {
return stats, nil, fmt.Errorf("cache '%s' had no interfaces", cacheName)
}

return stats, statMap, nil
}
81 changes: 73 additions & 8 deletions traffic_monitor/cache/astats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,93 @@ package cache
*/

import (
"bytes"
"io/ioutil"
"math/rand"
"net/http"
"os"
"testing"

"github.com/apache/trafficcontrol/lib/go-tc"
"github.com/apache/trafficcontrol/traffic_monitor/poller"
"github.com/apache/trafficcontrol/traffic_monitor/todata"

"github.com/json-iterator/go"
)

func TestAstats(t *testing.T) {
text, err := ioutil.ReadFile("astats.json")
func TestAstatsJson(t *testing.T) {
file, err := os.Open("astats.json")
if err != nil {
t.Fatal(err)
}
aStats := Astats{}
json := jsoniter.ConfigFastest
err = json.Unmarshal(text, &aStats)

pl := &poller.HTTPPollCtx{HTTPHeader: http.Header{}}
ctx := interface{}(pl)
ctx.(*poller.HTTPPollCtx).HTTPHeader.Set("Content-Type", "text/json")
_, _, err = astatsParse("testCache", file, ctx)

if err != nil {
t.Error(err)
}
t.Logf("Found %v key/val pairs in ats\n", len(aStats.Ats))
}

func TestAstatsCSV(t *testing.T) {
file, err := os.Open("astats.csv")
if err != nil {
t.Fatal(err)
}

pl := &poller.HTTPPollCtx{HTTPHeader: http.Header{}}
ctx := interface{}(pl)
ctx.(*poller.HTTPPollCtx).HTTPHeader.Set("Content-Type", "text/csv")
_, _, err = astatsParse("testCache", file, ctx)

if err != nil {
t.Error(err)
}
}

func BenchmarkAstatsJson(b *testing.B) {
file, err := ioutil.ReadFile("astats.json")
if err != nil {
b.Fatal(err)
}

pl := &poller.HTTPPollCtx{HTTPHeader: http.Header{}}
ctx := interface{}(pl)
ctx.(*poller.HTTPPollCtx).HTTPHeader.Set("Content-Type", "text/json")
// Reset benchmark timer to not include reading the file
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, _, err := astatsParse("testCache", bytes.NewReader(file), ctx)

if err != nil {
b.Error(err)
}
}
}

func BenchmarkAstatsCSV(b *testing.B) {
file, err := ioutil.ReadFile("astats.csv")
if err != nil {
b.Fatal(err)
}

// Reset benchmark timer to not include reading the file
b.ResetTimer()
pl := &poller.HTTPPollCtx{HTTPHeader: http.Header{}}
ctx := interface{}(pl)
ctx.(*poller.HTTPPollCtx).HTTPHeader.Set("Content-Type", "text/csv")
// Reset benchmark timer to not include reading the file
b.ReportAllocs()

b.ResetTimer()
for i := 0; i < b.N; i++ {
_, _, err := astatsParse("testCache", bytes.NewReader(file), ctx)

if err != nil {
b.Error(err)
}
}
}

func getMockTODataDSNameDirectMatches() map[tc.DeliveryServiceName]string {
Expand Down
4 changes: 2 additions & 2 deletions traffic_monitor/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func ComputedStats() map[string]StatComputeFunc {
}

// Handle handles results fetched from a cache, parsing the raw Reader data and passing it along to a chan for further processing.
func (handler Handler) Handle(id string, rdr io.Reader, format string, reqTime time.Duration, reqEnd time.Time, reqErr error, pollID uint64, usingIPv4 bool, pollFinished chan<- uint64) {
func (handler Handler) Handle(id string, rdr io.Reader, format string, reqTime time.Duration, reqEnd time.Time, reqErr error, pollID uint64, usingIPv4 bool, pollCtx interface{}, pollFinished chan<- uint64) {
log.Debugf("poll %v %v (format '%v') handle start\n", pollID, time.Now(), format)
result := Result{
ID: id,
Expand All @@ -304,7 +304,7 @@ func (handler Handler) Handle(id string, rdr io.Reader, format string, reqTime t
return
}

stats, miscStats, err := decoder.Parse(result.ID, rdr)
stats, miscStats, err := decoder.Parse(result.ID, rdr, pollCtx)
if err != nil {
log.Warnf("%s decode error '%v'", id, err)
result.Error = err
Expand Down
2 changes: 1 addition & 1 deletion traffic_monitor/cache/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func init() {
registerDecoder("noop", noOpParse, noopPrecompute)
}

func noOpParse(string, io.Reader) (Statistics, map[string]interface{}, error) {
func noOpParse(string, io.Reader, interface{}) (Statistics, map[string]interface{}, error) {
stats := Statistics{
Loadavg: Loadavg{
One: 0.1,
Expand Down
Loading