diff --git a/collectors/collectors.go b/collectors/collectors.go index 1f13158e..78d9584f 100644 --- a/collectors/collectors.go +++ b/collectors/collectors.go @@ -94,6 +94,11 @@ func NewCollector( res.collectors = append(res.collectors, collector) } + if filter.Enabled(filters.Tasks) { + collector := NewTasksCollector(namespace, environment, deployment) + res.collectors = append(res.collectors, collector) + } + if filter.Enabled(filters.Events) { collector := NewEventsCollector(namespace, environment, deployment) res.collectors = append(res.collectors, collector) diff --git a/collectors/tasks.go b/collectors/tasks.go new file mode 100644 index 00000000..95e55539 --- /dev/null +++ b/collectors/tasks.go @@ -0,0 +1,263 @@ +package collectors + +import ( + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/bosh-prometheus/cf_exporter/models" +) + +type TasksCollector struct { + namespace string + environment string + deployment string + taskInfoMetric *prometheus.GaugeVec + tasksCountMetric *prometheus.GaugeVec + tasksMemoryMbSumMetric *prometheus.GaugeVec + tasksDiskQuotaMbSumMetric *prometheus.GaugeVec + tasksOldestCreatedAtMetric *prometheus.GaugeVec + tasksScrapesTotalMetric prometheus.Counter + tasksScrapeErrorsTotalMetric prometheus.Counter + lastTasksScrapeErrorMetric prometheus.Gauge + lastTasksScrapeTimestampMetric prometheus.Gauge + lastTasksScrapeDurationSecondsMetric prometheus.Gauge +} + +func NewTasksCollector( + namespace string, + environment string, + deployment string, +) *TasksCollector { + taskInfoMetric := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: "task", + Name: "info", + Help: "Labeled Cloud Foundry Task information with a constant '1' value.", + ConstLabels: prometheus.Labels{"environment": environment, "deployment": deployment}, + }, + []string{"application_id", "state"}, + ) + + tasksCountMetric := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: "task", + Name: "count", + Help: "Number of Cloud Foundry Tasks.", + ConstLabels: prometheus.Labels{"environment": environment, "deployment": deployment}, + }, + []string{"application_id", "state"}, + ) + + tasksMemoryMbSumMetric := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: "task", + Name: "memory_mb_sum", + Help: "Sum of Cloud Foundry Tasks Memory (Mb).", + ConstLabels: prometheus.Labels{"environment": environment, "deployment": deployment}, + }, + []string{"application_id", "state"}, + ) + + tasksDiskQuotaMbSumMetric := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: "task", + Name: "disk_quota_mb_sum", + Help: "Sum of Cloud Foundry Tasks Disk Quota (Mb).", + ConstLabels: prometheus.Labels{"environment": environment, "deployment": deployment}, + }, + []string{"application_id", "state"}, + ) + + tasksOldestCreatedAtMetric := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: "task", + Name: "oldest_created_at", + Help: "Number of seconds since 1970 of creation time of oldest Cloud Foundry task.", + ConstLabels: prometheus.Labels{"environment": environment, "deployment": deployment}, + }, + []string{"application_id", "state"}, + ) + + tasksScrapesTotalMetric := prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: "tasks_scrapes", + Name: "total", + Help: "Total number of scrapes for Cloud Foundry Tasks.", + ConstLabels: prometheus.Labels{"environment": environment, "deployment": deployment}, + }, + ) + + tasksScrapeErrorsTotalMetric := prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: "tasks_scrape_errors", + Name: "total", + Help: "Total number of scrape error of Cloud Foundry Tasks.", + ConstLabels: prometheus.Labels{"environment": environment, "deployment": deployment}, + }, + ) + + lastTasksScrapeErrorMetric := prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: "", + Name: "last_tasks_scrape_error", + Help: "Whether the last scrape of Tasks metrics from Cloud Foundry resulted in an error (1 for error, 0 for success).", + ConstLabels: prometheus.Labels{"environment": environment, "deployment": deployment}, + }, + ) + + lastTasksScrapeTimestampMetric := prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: "", + Name: "last_tasks_scrape_timestamp", + Help: "Number of seconds since 1970 since last scrape of Tasks metrics from Cloud Foundry.", + ConstLabels: prometheus.Labels{"environment": environment, "deployment": deployment}, + }, + ) + + lastTasksScrapeDurationSecondsMetric := prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: "", + Name: "last_tasks_scrape_duration_seconds", + Help: "Duration of the last scrape of Tasks metrics from Cloud Foundry.", + ConstLabels: prometheus.Labels{"environment": environment, "deployment": deployment}, + }, + ) + + return &TasksCollector{ + namespace: namespace, + environment: environment, + deployment: deployment, + taskInfoMetric: taskInfoMetric, + tasksCountMetric: tasksCountMetric, + tasksMemoryMbSumMetric: tasksMemoryMbSumMetric, + tasksDiskQuotaMbSumMetric: tasksDiskQuotaMbSumMetric, + tasksOldestCreatedAtMetric: tasksOldestCreatedAtMetric, + tasksScrapesTotalMetric: tasksScrapesTotalMetric, + tasksScrapeErrorsTotalMetric: tasksScrapeErrorsTotalMetric, + lastTasksScrapeErrorMetric: lastTasksScrapeErrorMetric, + lastTasksScrapeTimestampMetric: lastTasksScrapeTimestampMetric, + lastTasksScrapeDurationSecondsMetric: lastTasksScrapeDurationSecondsMetric, + } +} + +func (c TasksCollector) Collect(objs *models.CFObjects, ch chan<- prometheus.Metric) { + errorMetric := float64(0) + if objs.Error != nil { + errorMetric = float64(1) + c.tasksScrapeErrorsTotalMetric.Inc() + } else { + c.reportTasksMetrics(objs, ch) + } + + c.tasksScrapeErrorsTotalMetric.Collect(ch) + c.tasksScrapesTotalMetric.Inc() + c.tasksScrapesTotalMetric.Collect(ch) + + c.lastTasksScrapeErrorMetric.Set(errorMetric) + c.lastTasksScrapeErrorMetric.Collect(ch) + + c.lastTasksScrapeTimestampMetric.Set(float64(time.Now().Unix())) + c.lastTasksScrapeTimestampMetric.Collect(ch) + c.lastTasksScrapeDurationSecondsMetric.Set(objs.Took) + + c.lastTasksScrapeDurationSecondsMetric.Collect(ch) +} + +func (c TasksCollector) Describe(ch chan<- *prometheus.Desc) { + c.taskInfoMetric.Describe(ch) + c.tasksCountMetric.Describe(ch) + c.tasksMemoryMbSumMetric.Describe(ch) + c.tasksDiskQuotaMbSumMetric.Describe(ch) + c.tasksOldestCreatedAtMetric.Describe(ch) + c.tasksScrapesTotalMetric.Describe(ch) + c.tasksScrapeErrorsTotalMetric.Describe(ch) + c.lastTasksScrapeErrorMetric.Describe(ch) + c.lastTasksScrapeTimestampMetric.Describe(ch) + c.lastTasksScrapeDurationSecondsMetric.Describe(ch) +} + +func (c TasksCollector) reportTasksMetrics(objs *models.CFObjects, ch chan<- prometheus.Metric) error { + c.taskInfoMetric.Reset() + c.tasksCountMetric.Reset() + c.tasksMemoryMbSumMetric.Reset() + c.tasksDiskQuotaMbSumMetric.Reset() + c.tasksOldestCreatedAtMetric.Reset() + + type keyType struct { + application_id string + state string + } + groupedTasks := map[keyType][]*models.Task{} + for _, task := range objs.Tasks { + application_id := "unavailable" + if app, ok := task.Relationships["app"]; ok && app.GUID != "" { + application_id = app.GUID + } + key := keyType{application_id, string(task.State)} + + existingValue, ok := groupedTasks[key] + if !ok { + existingValue = []*models.Task{} + } + groupedTasks[key] = append(existingValue, &task) + } + + for key, tasks := range groupedTasks { + c.taskInfoMetric.WithLabelValues( + key.application_id, + key.state, + ).Set(float64(1)) + + c.tasksCountMetric.WithLabelValues( + key.application_id, + key.state, + ).Set(float64(len(tasks))) + + memorySum := int64(0) + for _, task := range tasks { + memorySum += int64(task.MemoryInMb) + } + c.tasksMemoryMbSumMetric.WithLabelValues( + key.application_id, + key.state, + ).Set(float64(memorySum)) + + diskSum := int64(0) + for _, task := range tasks { + diskSum += int64(task.DiskInMb) + } + c.tasksDiskQuotaMbSumMetric.WithLabelValues( + key.application_id, + key.state, + ).Set(float64(diskSum)) + + createdAtOldest := time.Now() + for _, task := range tasks { + if task.CreatedAt.Before(createdAtOldest) { + createdAtOldest = task.CreatedAt + } + } + c.tasksOldestCreatedAtMetric.WithLabelValues( + key.application_id, + key.state, + ).Set(float64(createdAtOldest.Unix())) + } + + c.taskInfoMetric.Collect(ch) + c.tasksCountMetric.Collect(ch) + c.tasksMemoryMbSumMetric.Collect(ch) + c.tasksDiskQuotaMbSumMetric.Collect(ch) + c.tasksOldestCreatedAtMetric.Collect(ch) + + return nil +} diff --git a/fetcher/fetcher.go b/fetcher/fetcher.go index a82aac38..300298fa 100644 --- a/fetcher/fetcher.go +++ b/fetcher/fetcher.go @@ -19,6 +19,10 @@ var ( Key: ccv3.OrderBy, Values: []string{"-created_at"}, } + TaskActiveStates = ccv3.Query{ + Key: ccv3.StatesFilter, + Values: []string{"PENDING", "RUNNING", "CANCELING"}, + } ) type CFConfig struct { @@ -67,6 +71,7 @@ func (c *Fetcher) workInit() { c.worker.PushIf("security_groups", c.fetchSecurityGroups, filters.SecurityGroups) c.worker.PushIf("stacks", c.fetchStacks, filters.Stacks) c.worker.PushIf("buildpacks", c.fetchBuildpacks, filters.Buildpacks) + c.worker.PushIf("tasks", c.fetchTasks, filters.Tasks) c.worker.PushIf("service_brokers", c.fetchServiceBrokers, filters.Services) c.worker.PushIf("service_offerings", c.fetchServiceOfferings, filters.Services) c.worker.PushIf("service_instances", c.fetchServiceInstances, filters.ServiceInstances) diff --git a/fetcher/fetcher_handlers.go b/fetcher/fetcher_handlers.go index b4123a8f..a5175bdc 100644 --- a/fetcher/fetcher_handlers.go +++ b/fetcher/fetcher_handlers.go @@ -147,6 +147,14 @@ func (c *Fetcher) fetchBuildpacks(session *SessionExt, entry *models.CFObjects) return err } +func (c *Fetcher) fetchTasks(session *SessionExt, entry *models.CFObjects) error { + tasks, err := session.GetTasks() + if err == nil { + loadIndex(entry.Tasks, tasks, func(r models.Task) string { return r.GUID }) + } + return err +} + func (c *Fetcher) fetchServiceBrokers(session *SessionExt, entry *models.CFObjects) error { servicebrokers, _, err := session.V3().GetServiceBrokers(LargeQuery) if err == nil { diff --git a/fetcher/fetcher_test.go b/fetcher/fetcher_test.go index 44b0c242..57d8ddb8 100644 --- a/fetcher/fetcher_test.go +++ b/fetcher/fetcher_test.go @@ -75,6 +75,7 @@ var _ = Describe("Fetcher", func() { "security_groups", "stacks", "buildpacks", + "tasks", "service_brokers", "service_offerings", "service_instances", @@ -120,6 +121,16 @@ var _ = Describe("Fetcher", func() { }) }) + When("tasks filter is set", func() { + BeforeEach(func() { + active = []string{ filters.Tasks } + expected = []string{ "info", "tasks" } + }) + It("plans only specific jobs", func() { + Ω(jobs).Should(ConsistOf(expected)) + }) + }) + When("isolationsegments filter is set", func() { BeforeEach(func() { active = []string{filters.IsolationSegments} diff --git a/fetcher/sessionext.go b/fetcher/sessionext.go index ba2ab7ce..16b837c5 100644 --- a/fetcher/sessionext.go +++ b/fetcher/sessionext.go @@ -73,6 +73,20 @@ func (s SessionExt) GetApplications() ([]models.Application, error) { return res, err } +func (s SessionExt) GetTasks() ([]models.Task, error) { + res := []models.Task{} + _, _, err := s.V3().MakeListRequest(ccv3.RequestParams{ + RequestName: "GetTasks", + Query: []ccv3.Query{LargeQuery, TaskActiveStates}, + ResponseBody: models.Task{}, + AppendToList: func(item interface{}) error { + res = append(res, item.(models.Task)) + return nil + }, + }) + return res, err +} + func (s SessionExt) GetOrganizationQuotas() ([]models.Quota, error) { res := []models.Quota{} _, _, err := s.V3().MakeListRequest(ccv3.RequestParams{ diff --git a/fetcher/sessionext_test.go b/fetcher/sessionext_test.go index 730a5678..4dbf43d7 100644 --- a/fetcher/sessionext_test.go +++ b/fetcher/sessionext_test.go @@ -114,6 +114,33 @@ var _ = Describe("Extensions", func() { }) }) + Context("fetching tasks", func() { + It("no error occurs", func() { + server.AppendHandlers( + ghttp.CombineHandlers( + ghttp.VerifyRequest("GET", "/v3/tasks", "per_page=5000&states=PENDING,RUNNING,CANCELING"), + ghttp.RespondWith(http.StatusOK, serializeList( + models.Task{ + GUID: "guid1", + State: constant.TaskPending, + }, + models.Task{ + GUID: "guid2", + State: constant.TaskCanceling, + }, + )), + ), + ) + objs, err := target.GetTasks() + Ω(err).ShouldNot(HaveOccurred()) + Ω(objs).Should(HaveLen(2)) + Ω(objs[0].GUID).Should(Equal("guid1")) + Ω(objs[0].State).Should(Equal(constant.TaskPending)) + Ω(objs[1].GUID).Should(Equal("guid2")) + Ω(objs[1].State).Should(Equal(constant.TaskCanceling)) + }) + }) + Context("fetching org quotas", func() { It("no error occurs", func() { server.AppendHandlers( diff --git a/filters/filters.go b/filters/filters.go index 8fa99310..e15a0e57 100644 --- a/filters/filters.go +++ b/filters/filters.go @@ -19,6 +19,7 @@ const ( Services = "services" Spaces = "spaces" Stacks = "stacks" + Tasks = "tasks" ) var ( @@ -36,6 +37,7 @@ var ( Services, Spaces, Stacks, + Tasks, } ) @@ -58,6 +60,7 @@ func NewFilter(active ...string) (*Filter, error) { Services: true, Spaces: true, Stacks: true, + Tasks: false, Events: false, }, } @@ -86,6 +89,7 @@ func (f *Filter) setActive(active []string) error { Services: false, Spaces: false, Stacks: false, + Tasks: false, Events: false, } diff --git a/filters/filters_test.go b/filters/filters_test.go index 68ee474c..db0ded93 100644 --- a/filters/filters_test.go +++ b/filters/filters_test.go @@ -34,6 +34,7 @@ var _ = Describe("Filters", func() { Expect(f.Enabled(filters.Services)).To(BeTrue()) Expect(f.Enabled(filters.Spaces)).To(BeTrue()) Expect(f.Enabled(filters.Stacks)).To(BeTrue()) + Expect(f.Enabled(filters.Tasks)).To(BeFalse()) Expect(f.Enabled(filters.Events)).To(BeFalse()) }) }) @@ -57,6 +58,7 @@ var _ = Describe("Filters", func() { Expect(f.Enabled(filters.Services)).To(BeFalse()) Expect(f.Enabled(filters.Spaces)).To(BeFalse()) Expect(f.Enabled(filters.Stacks)).To(BeTrue()) + Expect(f.Enabled(filters.Tasks)).To(BeFalse()) Expect(f.Enabled(filters.Events)).To(BeFalse()) }) diff --git a/main.go b/main.go index ff5d0012..a2b1c771 100644 --- a/main.go +++ b/main.go @@ -42,7 +42,7 @@ var ( ).Envar("CF_EXPORTER_CF_DEPLOYMENT_NAME").Required().String() filterCollectors = kingpin.Flag( - "filter.collectors", "Comma separated collectors to filter (Applications,Buildpacks,Events,IsolationSegments,Organizations,Routes,SecurityGroups,ServiceBindings,ServiceInstances,ServicePlans,Services,Spaces,Stacks). If not set, all collectors except Events is enabled ($CF_EXPORTER_FILTER_COLLECTORS)", + "filter.collectors", "Comma separated collectors to filter (Applications,Buildpacks,Events,IsolationSegments,Organizations,Routes,SecurityGroups,ServiceBindings,ServiceInstances,ServicePlans,Services,Spaces,Stacks,Tasks). If not set, all collectors except Events and Tasks are enabled ($CF_EXPORTER_FILTER_COLLECTORS)", ).Envar("CF_EXPORTER_FILTER_COLLECTORS").Default("").String() metricsNamespace = kingpin.Flag( diff --git a/models/model.go b/models/model.go index e0c66063..c59c5bc2 100644 --- a/models/model.go +++ b/models/model.go @@ -16,6 +16,7 @@ type CFObjects struct { SpaceQuotas map[string]Quota `json:"space_quotas"` Apps map[string]Application `json:"apps"` Processes map[string]resources.Process `json:"process"` + Tasks map[string]Task `json:"tasks"` Routes map[string]resources.Route `json:"routes"` RoutesBindings map[string]resources.RouteBinding `json:"route_bindings"` Segments map[string]resources.IsolationSegment `json:"segments"` @@ -91,6 +92,15 @@ type Application struct { UpdatedAt string `json:"updated_at,omitempty"` } +type Task struct { + GUID string `json:"guid,omitempty"` + State constant.TaskState `json:"state,omitempty"` + Relationships resources.Relationships `json:"relationships,omitempty"` + CreatedAt time.Time `json:"created_at,omitempty"` + MemoryInMb int64 `json:"memory_in_mb,omitempty"` + DiskInMb int64 `json:"disk_in_mb,omitempty"` +} + type SpaceSummary struct { GUID string `json:"guid,omitempty"` Name string `json:"name,omitempty"` @@ -146,6 +156,7 @@ func NewCFObjects() *CFObjects { SpaceQuotas: map[string]Quota{}, Apps: map[string]Application{}, Processes: map[string]resources.Process{}, + Tasks: map[string]Task{}, Routes: map[string]resources.Route{}, RoutesBindings: map[string]resources.RouteBinding{}, Segments: map[string]resources.IsolationSegment{},