Skip to content

Commit

Permalink
[kubelet-to-gcm] Split requests that are too big & use standard flag lib
Browse files Browse the repository at this point in the history
  • Loading branch information
x13n committed Aug 29, 2018
1 parent 5d76d2e commit 921b037
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 24 deletions.
2 changes: 1 addition & 1 deletion kubelet-to-gcm/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
OUT_DIR = build
PACKAGE = github.com/GoogleCloudPlatform/k8s-stackdriver/kubelet-to-gcm
PREFIX = staging-k8s.gcr.io
TAG = 1.2.6
TAG = 1.2.7

# Rules for building the real image for deployment to gcr.io

Expand Down
23 changes: 10 additions & 13 deletions kubelet-to-gcm/monitor/main/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"time"

log "github.com/golang/glog"
"github.com/spf13/pflag"
"golang.org/x/net/context"
"golang.org/x/oauth2/google"

Expand All @@ -41,25 +40,23 @@ const (

var (
// Flags to identify the Kubelet.
zone = pflag.String("zone", "use-gce", "The zone where this kubelet lives.")
project = pflag.String("project", "use-gce", "The project where this kubelet's host lives.")
cluster = pflag.String("cluster", "use-gce", "The cluster where this kubelet holds membership.")
kubeletInstance = pflag.String("kubelet-instance", "use-gce", "The instance name the kubelet resides on.")
kubeletHost = pflag.String("kubelet-host", "use-gce", "The kubelet's host name.")
kubeletPort = pflag.Uint("kubelet-port", 10255, "The kubelet's port.")
ctrlPort = pflag.Uint("controller-manager-port", 10252, "The kube-controller's port.")
zone = flag.String("zone", "use-gce", "The zone where this kubelet lives.")
project = flag.String("project", "use-gce", "The project where this kubelet's host lives.")
cluster = flag.String("cluster", "use-gce", "The cluster where this kubelet holds membership.")
kubeletInstance = flag.String("kubelet-instance", "use-gce", "The instance name the kubelet resides on.")
kubeletHost = flag.String("kubelet-host", "use-gce", "The kubelet's host name.")
kubeletPort = flag.Uint("kubelet-port", 10255, "The kubelet's port.")
ctrlPort = flag.Uint("controller-manager-port", 10252, "The kube-controller's port.")
// Flags to control runtime behavior.
res = pflag.Uint("resolution", 10, "The time, in seconds, to poll the Kubelet.")
gcmEndpoint = pflag.String("gcm-endpoint", "", "The GCM endpoint to hit. Defaults to the default endpoint.")
res = flag.Uint("resolution", 10, "The time, in seconds, to poll the Kubelet.")
gcmEndpoint = flag.String("gcm-endpoint", "", "The GCM endpoint to hit. Defaults to the default endpoint.")
)

func main() {
// First log our starting config, and then set up.
flag.CommandLine.Parse([]string{})
flag.Set("logtostderr", "true") // This spoofs glog into teeing logs to stderr.

defer log.Flush()
pflag.Parse()
flag.Parse()
log.Infof("Invoked by %v", os.Args)

resolution := time.Second * time.Duration(*res)
Expand Down
45 changes: 35 additions & 10 deletions kubelet-to-gcm/monitor/poll.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
v3 "google.golang.org/api/monitoring/v3"
)

const maxTimeSeriesPerRequest = 200

// SourceConfig is the set of data required to configure a kubernetes
// data source (e.g., kubelet or kube-controller).
type SourceConfig struct {
Expand All @@ -47,18 +49,41 @@ func Once(src MetricsSource, gcm *v3.Service) {
return
}

// Push that data to GCM's v3 API.
createCall := gcm.Projects.TimeSeries.Create(src.ProjectPath(), req)
if empty, err := createCall.Do(); err != nil {
log.Warningf("Failed to write time series data, empty: %v, err: %v", empty, err)
for _, subReq := range subRequests(req) {
// Push that data to GCM's v3 API.
createCall := gcm.Projects.TimeSeries.Create(src.ProjectPath(), subReq)
if empty, err := createCall.Do(); err != nil {
log.Warningf("Failed to write time series data, empty: %v, err: %v", empty, err)

jsonReq, err := req.MarshalJSON()
if err != nil {
log.Warningf("Failed to marshal time series as JSON")
jsonReq, err := subReq.MarshalJSON()
if err != nil {
log.Warningf("Failed to marshal time series as JSON")
return
}
log.Warningf("JSON GCM: %s", string(jsonReq[:]))
return
}
log.Warningf("JSON GCM: %s", string(jsonReq[:]))
return
log.V(4).Infof("Successfully wrote TimeSeries data for %s to GCM v3 API.", src.Name())
}
}

func subRequests(req *v3.CreateTimeSeriesRequest) []*v3.CreateTimeSeriesRequest {
tsCount := len(req.TimeSeries)
if tsCount <= maxTimeSeriesPerRequest {
return []*v3.CreateTimeSeriesRequest{req}
}
subRequestsCount := (tsCount-1)/maxTimeSeriesPerRequest + 1
log.V(2).Infof("Splitting CreateTimeSeriesRequest into %v requests", subRequestsCount)
subReqs := make([]*v3.CreateTimeSeriesRequest, subRequestsCount)
for i := 0; i < subRequestsCount; i++ {
startIdx := i * maxTimeSeriesPerRequest
endIdx := (i + 1) * maxTimeSeriesPerRequest
if endIdx > tsCount {
endIdx = tsCount
}
subReqs[i] = &v3.CreateTimeSeriesRequest{
TimeSeries: req.TimeSeries[startIdx:endIdx],
}
}
log.V(4).Infof("Successfully wrote TimeSeries data for %s to GCM v3 API.", src.Name())
return subReqs
}

0 comments on commit 921b037

Please sign in to comment.