-
Notifications
You must be signed in to change notification settings - Fork 80
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
k8s-tester/clusterloader: initial commit
Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
- Loading branch information
Showing
21 changed files
with
2,496 additions
and
9 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,280 @@ | ||
package clusterloader | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"encoding/json" | ||
"fmt" | ||
"os" | ||
"path/filepath" | ||
"runtime" | ||
"strings" | ||
"text/template" | ||
"time" | ||
|
||
"github.com/aws/aws-k8s-tester/utils/file" | ||
utils_http "github.com/aws/aws-k8s-tester/utils/http" | ||
"go.uber.org/zap" | ||
"k8s.io/utils/exec" | ||
) | ||
|
||
var ( | ||
defaultClusterloaderVersion = "v1.6.0" | ||
defaultClusterloaderPath = fmt.Sprintf("/tmp/clusterloader2-%s", defaultClusterloaderVersion) | ||
// ref. https://github.com/aws/aws-k8s-tester/releases | ||
// e.g. https://github.com/aws/aws-k8s-tester/releases/download/v1.6.0/clusterloader2-darwin-arm64 | ||
defaultClusterloaderDownloadURL = fmt.Sprintf( | ||
"https://github.com/aws/aws-k8s-tester/releases/download/%s/clusterloader2-%s-%s", | ||
defaultClusterloaderVersion, | ||
runtime.GOOS, | ||
runtime.GOARCH, | ||
) | ||
) | ||
|
||
const DefaultProvider = "eks" | ||
|
||
func DefaultClusterloaderPath() string { | ||
return defaultClusterloaderPath | ||
} | ||
|
||
func DefaultClusterloaderDownloadURL() string { | ||
return defaultClusterloaderDownloadURL | ||
} | ||
|
||
func installClusterloader(lg *zap.Logger, clusterloaderPath string, clusterloaderDownloadURL string) (err error) { | ||
lg.Info("mkdir", zap.String("clusterloader-path-dir", filepath.Dir(clusterloaderPath))) | ||
if err = os.MkdirAll(filepath.Dir(clusterloaderPath), 0700); err != nil { | ||
lg.Warn("could not create", zap.String("dir", filepath.Dir(clusterloaderPath)), zap.Error(err)) | ||
return err | ||
} | ||
if !file.Exist(clusterloaderPath) { | ||
if clusterloaderDownloadURL == "" { | ||
lg.Warn("clusterloader path does not exist, clusterloader download URL empty", zap.String("clusterloader-path", clusterloaderPath)) | ||
return fmt.Errorf("clusterloader path %q does not exist and empty clusterloader download URL", clusterloaderPath) | ||
} | ||
clusterloaderPath, _ = filepath.Abs(clusterloaderPath) | ||
lg.Info("downloading clusterloader", zap.String("clusterloader-path", clusterloaderPath)) | ||
if err := utils_http.Download(lg, os.Stderr, clusterloaderDownloadURL, clusterloaderPath); err != nil { | ||
lg.Warn("failed to download clusterloader", zap.Error(err)) | ||
return err | ||
} | ||
} else { | ||
lg.Info("skipping clusterloader download; already exist", zap.String("clusterloader-path", clusterloaderPath)) | ||
} | ||
if err = file.EnsureExecutable(clusterloaderPath); err != nil { | ||
// file may be already executable while the process does not own the file/directory | ||
// ref. https://github.com/aws/aws-k8s-tester/issues/66 | ||
lg.Warn("failed to ensure executable", zap.Error(err)) | ||
err = nil | ||
} | ||
|
||
var output []byte | ||
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) | ||
output, err = exec.New().CommandContext(ctx, clusterloaderPath, "--help").CombinedOutput() | ||
cancel() | ||
out := strings.TrimSpace(string(output)) | ||
if err != nil { | ||
lg.Warn("clusterloader2 --help failed", zap.Error(err)) | ||
} | ||
if !strings.Contains(out, "--alsologtostderr") { | ||
if err == nil { | ||
err = fmt.Errorf("%s --help failed", clusterloaderPath) | ||
} else { | ||
err = fmt.Errorf("%v; %s --help failed", err, clusterloaderPath) | ||
} | ||
} else { | ||
err = nil | ||
} | ||
|
||
fmt.Fprintf(os.Stderr, "\n'%s --help' output:\n\n%s\n\n", clusterloaderPath, out) | ||
return err | ||
} | ||
|
||
type TestOverride struct { | ||
// Path is the test override YAML file path. | ||
Path string `json:"path" read-only:"true"` | ||
|
||
NodesPerNamespace int `json:"nodes_per_namespace"` | ||
PodsPerNode int `json:"pods_per_node"` | ||
|
||
BigGroupSize int `json:"big_group_size"` | ||
MediumGroupSize int `json:"medium_group_size"` | ||
SmallGroupSize int `json:"small_group_size"` | ||
|
||
SmallStatefulSetsPerNamespace int `json:"small_stateful_sets_per_namespace"` | ||
MediumStatefulSetsPerNamespace int `json:"medium_stateful_sets_per_namespace"` | ||
|
||
CL2UseHostNetworkPods bool `json:"cl2_use_host_network_pods"` | ||
CL2LoadTestThroughput int `json:"cl2_load_test_throughput"` | ||
CL2EnablePVS bool `json:"cl2_enable_pvs"` | ||
CL2SchedulerThroughputThreshold int `json:"cl2_scheduler_throughput_threshold"` | ||
PrometheusScrapeKubeProxy bool `json:"prometheus_scrape_kube_proxy"` | ||
EnableSystemPodMetrics bool `json:"enable_system_pod_metrics"` | ||
} | ||
|
||
const ( | ||
DefaultNodesPerNamespace = 10 | ||
DefaultPodsPerNode = 10 | ||
|
||
DefaultBigGroupSize = 25 | ||
DefaultMediumGroupSize = 10 | ||
DefaultSmallGroupSize = 5 | ||
|
||
DefaultSmallStatefulSetsPerNamespace = 0 | ||
DefaultMediumStatefulSetsPerNamespace = 0 | ||
|
||
DefaultCL2UseHostNetworkPods = false | ||
DefaultCL2LoadTestThroughput = 20 | ||
DefaultCL2EnablePVS = false | ||
DefaultCL2SchedulerThroughputThreshold = 100 | ||
DefaultPrometheusScrapeKubeProxy = false | ||
DefaultEnableSystemPodMetrics = false | ||
) | ||
|
||
func newDefaultTestOverride() *TestOverride { | ||
return &TestOverride{ | ||
Path: DefaultTestOverridePath(), | ||
|
||
NodesPerNamespace: DefaultNodesPerNamespace, | ||
PodsPerNode: DefaultPodsPerNode, | ||
|
||
BigGroupSize: DefaultBigGroupSize, | ||
MediumGroupSize: DefaultMediumGroupSize, | ||
SmallGroupSize: DefaultSmallGroupSize, | ||
|
||
SmallStatefulSetsPerNamespace: DefaultSmallStatefulSetsPerNamespace, | ||
MediumStatefulSetsPerNamespace: DefaultMediumStatefulSetsPerNamespace, | ||
|
||
CL2UseHostNetworkPods: DefaultCL2UseHostNetworkPods, | ||
CL2LoadTestThroughput: DefaultCL2LoadTestThroughput, | ||
CL2EnablePVS: DefaultCL2EnablePVS, | ||
CL2SchedulerThroughputThreshold: DefaultCL2SchedulerThroughputThreshold, | ||
PrometheusScrapeKubeProxy: DefaultPrometheusScrapeKubeProxy, | ||
EnableSystemPodMetrics: DefaultEnableSystemPodMetrics, | ||
} | ||
} | ||
|
||
func (to *TestOverride) Sync() error { | ||
if to.Path == "" { | ||
to.Path = DefaultTestOverridePath() | ||
} | ||
|
||
buf := bytes.NewBuffer(nil) | ||
tpl := template.Must(template.New("templateTestOverrides").Parse(templateTestOverrides)) | ||
if err := tpl.Execute(buf, to); err != nil { | ||
return err | ||
} | ||
|
||
os.RemoveAll(to.Path) | ||
f, err := os.Create(to.Path) | ||
if err != nil { | ||
return err | ||
} | ||
_, err = f.Write(buf.Bytes()) | ||
f.Close() | ||
return err | ||
} | ||
|
||
// ref. https://github.com/kubernetes/perf-tests/tree/master/clusterloader2/testing/load | ||
// ref. https://github.com/kubernetes/perf-tests/tree/master/clusterloader2/testing/overrides | ||
// ref. https://github.com/kubernetes/perf-tests/pull/1345 | ||
const templateTestOverrides = `NODES_PER_NAMESPACE: {{ .NodesPerNamespace }} | ||
PODS_PER_NODE: {{ .PodsPerNode }} | ||
BIG_GROUP_SIZE: {{ .BigGroupSize }} | ||
MEDIUM_GROUP_SIZE: {{ .MediumGroupSize }} | ||
SMALL_GROUP_SIZE: {{ .SmallGroupSize }} | ||
SMALL_STATEFUL_SETS_PER_NAMESPACE: {{ .SmallStatefulSetsPerNamespace }} | ||
MEDIUM_STATEFUL_SETS_PER_NAMESPACE: {{ .MediumStatefulSetsPerNamespace }} | ||
CL2_USE_HOST_NETWORK_PODS: {{ .CL2UseHostNetworkPods }} | ||
CL2_LOAD_TEST_THROUGHPUT: {{ .CL2LoadTestThroughput }} | ||
CL2_ENABLE_PVS: {{ .CL2EnablePVS }} | ||
CL2_SCHEDULER_THROUGHPUT_THRESHOLD: {{ .CL2SchedulerThroughputThreshold }} | ||
PROMETHEUS_SCRAPE_KUBE_PROXY: {{ .PrometheusScrapeKubeProxy }} | ||
ENABLE_SYSTEM_POD_METRICS: {{ .EnableSystemPodMetrics }} | ||
` | ||
|
||
func parsePodStartupLatency(fpath string) (perfData PerfData, err error) { | ||
rf, err := os.OpenFile(fpath, os.O_RDONLY, 0444) | ||
if err != nil { | ||
return PerfData{}, fmt.Errorf("failed to open %q (%v)", fpath, err) | ||
} | ||
defer rf.Close() | ||
err = json.NewDecoder(rf).Decode(&perfData) | ||
return perfData, err | ||
} | ||
|
||
func mergePodStartupLatency(datas ...PerfData) (perfData PerfData) { | ||
if len(datas) == 0 { | ||
return perfData | ||
} | ||
if len(datas) == 1 { | ||
return datas[0] | ||
} | ||
|
||
perfData.Labels = make(map[string]string) | ||
labelToUnit := make(map[string]string) | ||
labelToData := make(map[string]map[string]float64) | ||
|
||
for _, d := range datas { | ||
perfData.Version = d.Version | ||
for k, v := range d.Labels { | ||
perfData.Labels[k] = v | ||
} | ||
for _, cur := range d.DataItems { | ||
b, err := json.Marshal(cur.Labels) | ||
if err != nil { | ||
panic(err) | ||
} | ||
key := string(b) | ||
|
||
labelToUnit[key] = cur.Unit | ||
prev, ok := labelToData[key] | ||
if ok { | ||
for k, v := range prev { | ||
// average | ||
cur.Data[k] += v | ||
cur.Data[k] /= 2.0 | ||
} | ||
} | ||
labelToData[key] = cur.Data | ||
} | ||
} | ||
|
||
for key, data := range labelToData { | ||
unit := labelToUnit[key] | ||
var labels map[string]string | ||
if err := json.Unmarshal([]byte(key), &labels); err != nil { | ||
panic(err) | ||
} | ||
perfData.DataItems = append(perfData.DataItems, DataItem{ | ||
Data: data, | ||
Labels: labels, | ||
Unit: unit, | ||
}) | ||
} | ||
return perfData | ||
} | ||
|
||
// Copy from: | ||
// https://pkg.go.dev/k8s.io/perf-tests/clusterloader2/pkg/measurement/util#PerfData | ||
type PerfData struct { | ||
// Version is the version of the metrics. The metrics consumer could use the version | ||
// to detect metrics version change and decide what version to support. | ||
Version string `json:"version"` | ||
DataItems []DataItem `json:"dataItems"` | ||
// Labels is the labels of the dataset. | ||
Labels map[string]string `json:"labels,omitempty"` | ||
} | ||
|
||
// Copy from: | ||
// https://pkg.go.dev/k8s.io/perf-tests/clusterloader2/pkg/measurement/util#DataItem | ||
type DataItem struct { | ||
// Data is a map from bucket to real data point (e.g. "Perc90" -> 23.5). Notice | ||
// that all data items with the same label combination should have the same buckets. | ||
Data map[string]float64 `json:"data"` | ||
// Unit is the data unit. Notice that all data items with the same label combination | ||
// should have the same unit. | ||
Unit string `json:"unit"` | ||
// Labels is the labels of the data item. | ||
Labels map[string]string `json:"labels,omitempty"` | ||
} |
Oops, something went wrong.