Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ init: tools update-generated $(BIN) vendor
.PHONY: tools
tools: update-vendor
@echo ">> Fetching golangci-lint linter"
@GOBIN=$(GOPATH)/bin go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.40.0
@go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.42.1
@echo ">> Fetching goimports"
@GOBIN=$(GOPATH)/bin go get golang.org/x/tools/cmd/goimports@0bb7e5c47b1a31f85d4f173edc878a8e049764a5
@echo ">> Fetching license check"
Expand Down
17 changes: 17 additions & 0 deletions exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
package main

import (
"context"
"io/ioutil"
"os"
"os/signal"
"syscall"
"time"

"github.com/arangodb/kube-arangodb/pkg/exporter"
"github.com/arangodb/kube-arangodb/pkg/util"

"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -100,6 +102,21 @@ func cmdExporterCheckE() error {
return err
}

mon := exporter.NewMonitor(exporterInput.endpoint, func() (string, error) {
if exporterInput.jwtFile == "" {
return "", nil
}

data, err := ioutil.ReadFile(exporterInput.jwtFile)
if err != nil {
return "", err
}

return string(data), nil
}, false, 15*time.Second)

go mon.UpdateMonitorStatus(util.CreateSignalContext(context.Background()))

exporter := exporter.NewExporter(exporterInput.listenAddress, "/metrics", p)
if exporterInput.keyfile != "" {
if e, err := exporter.WithKeyfile(exporterInput.keyfile); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/deployment/v1/license_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (s LicenseSpec) Validate() error {
}

// SetDefaultsFrom fills all values not set in s with values from other
func (s LicenseSpec) SetDefaultsFrom(other LicenseSpec) {
func (s *LicenseSpec) SetDefaultsFrom(other LicenseSpec) {
if !s.HasSecretName() {
s.SecretName = util.NewStringOrNil(other.SecretName)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/deployment/v2alpha1/license_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (s LicenseSpec) Validate() error {
}

// SetDefaultsFrom fills all values not set in s with values from other
func (s LicenseSpec) SetDefaultsFrom(other LicenseSpec) {
func (s *LicenseSpec) SetDefaultsFrom(other LicenseSpec) {
if !s.HasSecretName() {
s.SecretName = util.NewStringOrNil(other.SecretName)
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/deployment/resources/secrets.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,9 +431,11 @@ func AppendKeyfileToKeyfolder(ctx context.Context, cachedStatus inspectorInterfa

var (
exporterTokenClaims = jg.MapClaims{
"iss": "arangodb",
"server_id": "exporter",
"allowed_paths": []interface{}{"/_admin/statistics", "/_admin/statistics-description", k8sutil.ArangoExporterInternalEndpoint, k8sutil.ArangoExporterInternalEndpointV2},
"iss": "arangodb",
"server_id": "exporter",
"allowed_paths": []interface{}{"/_admin/statistics", "/_admin/statistics-description",
k8sutil.ArangoExporterInternalEndpoint, k8sutil.ArangoExporterInternalEndpointV2,
k8sutil.ArangoExporterStatusEndpoint, k8sutil.ArangoExporterClusterHealthEndpoint},
}
)

Expand Down
165 changes: 165 additions & 0 deletions pkg/exporter/monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
//
// DISCLAIMER
//
// Copyright 2016-2021 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Jakub Wierzbowski
//

package exporter

import (
"context"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/url"
"os"
"path"
"strings"
"sync/atomic"
"time"

"github.com/arangodb/go-driver"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"

"github.com/rs/zerolog/log"
)

const (
monitorMetricTemplate = "arangodb_member_health{role=\"%s\",id=\"%s\"} %d \n"
successRefreshInterval = time.Second * 120
failRefreshInterval = time.Second * 15
)

var currentMembersStatus atomic.Value

func NewMonitor(arangodbEndpoint string, auth Authentication, sslVerify bool, timeout time.Duration) *monitor {
uri, err := setPath(arangodbEndpoint, k8sutil.ArangoExporterClusterHealthEndpoint)
if err != nil {
log.Error().Err(err).Msgf("Fatal")
os.Exit(1)
}

return &monitor{
factory: newHttpClientFactory(arangodbEndpoint, auth, sslVerify, timeout),
healthURI: uri,
}
}

type monitor struct {
factory httpClientFactory
healthURI *url.URL
}

// UpdateMonitorStatus load monitor metrics for current cluster into currentMembersStatus
func (m monitor) UpdateMonitorStatus(ctx context.Context) {
for {
sleep := successRefreshInterval

health, err := m.GetClusterHealth()
if err != nil {
log.Error().Err(err).Msg("GetClusterHealth error")
sleep = failRefreshInterval
} else {
var output strings.Builder
for key, value := range health.Health {
entry, err := m.GetMemberStatus(key, value)
if err != nil {
log.Error().Err(err).Msg("GetMemberStatus error")
sleep = failRefreshInterval
}
output.WriteString(entry)
}
currentMembersStatus.Store(output.String())
}

select {
case <-ctx.Done():
return
case <-time.After(sleep):
continue
}
}
}

// GetClusterHealth returns current ArangoDeployment cluster health status
func (m monitor) GetClusterHealth() (*driver.ClusterHealth, error) {
c, req, err := m.factory()
if err != nil {
return nil, err
}
req.URL = m.healthURI
resp, err := c.Do(req)
if err != nil {
return nil, err
}

defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}

var result driver.ClusterHealth
if err := json.Unmarshal(body, &result); err != nil {
return nil, err
}

return &result, err
}

// GetMemberStatus returns Prometheus monitor metric for specific member
func (m monitor) GetMemberStatus(id driver.ServerID, member driver.ServerHealth) (string, error) {
result := fmt.Sprintf(monitorMetricTemplate, member.Role, id, 0)

c, req, err := m.factory()
if err != nil {
return result, err
}

req.URL, err = setPath(member.Endpoint, k8sutil.ArangoExporterStatusEndpoint)
if err != nil {
return result, err
}

resp, err := c.Do(req)
if err != nil {
return result, err
}

if resp.StatusCode != 200 {
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return result, err
}
return result, errors.New(string(body))
}
return fmt.Sprintf(monitorMetricTemplate, member.Role, id, 1), nil
}

func setPath(uri, uriPath string) (*url.URL, error) {
u, err := url.Parse(uri)
if err != nil {
return u, err
}
u.Path = path.Join(uriPath)
u.Scheme = "https"
return u, nil
}
6 changes: 6 additions & 0 deletions pkg/exporter/passthru.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,12 @@ func (p passthru) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
// Fix Header response
responseStr = strings.ReplaceAll(responseStr, "guage", "gauge")

// Attach monitor data
monitorData := currentMembersStatus.Load()
if monitorData != nil {
responseStr = responseStr + monitorData.(string)
}

resp.WriteHeader(data.StatusCode)
_, err = resp.Write([]byte(responseStr))
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/provisioner/service/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,10 @@ func (p *Provisioner) GetInfo(ctx context.Context, localPath string) (provisione
}

// Available is blocks available * fragment size
available := int64(statfs.Bavail) * statfs.Bsize
available := int64(statfs.Bavail) * statfs.Bsize // nolint:typecheck

// Capacity is total block count * fragment size
capacity := int64(statfs.Blocks) * statfs.Bsize
capacity := int64(statfs.Blocks) * statfs.Bsize // nolint:typecheck

log.Debug().
Str("node-name", p.NodeName).
Expand Down
8 changes: 5 additions & 3 deletions pkg/util/k8sutil/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ const (
ArangoSyncWorkerPort = 8729
ArangoExporterPort = 9101

ArangoExporterInternalEndpoint = "/_admin/metrics"
ArangoExporterInternalEndpointV2 = "/_admin/metrics/v2"
ArangoExporterDefaultEndpoint = "/metrics"
ArangoExporterStatusEndpoint = "/_api/version"
ArangoExporterClusterHealthEndpoint = "/_admin/cluster/health"
ArangoExporterInternalEndpoint = "/_admin/metrics"
ArangoExporterInternalEndpointV2 = "/_admin/metrics/v2"
ArangoExporterDefaultEndpoint = "/metrics"

// K8s constants
ClusterIPNone = "None"
Expand Down
56 changes: 56 additions & 0 deletions pkg/util/signal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
//
// DISCLAIMER
//
// Copyright 2016-2021 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Jakub Wierzbowski
//

package util

import (
"context"
"os"
"os/signal"
"syscall"
)

// CreateSignalContext creates and returns the context which is closed when one of the provided signal occurs.
// If the provided list of signals is empty, then SIGINT and SIGTERM is used by default.
func CreateSignalContext(ctx context.Context, signals ...os.Signal) context.Context {
if ctx == nil {
ctx = context.Background()
}
ctxSignal, cancelSignal := context.WithCancel(ctx)
sigChannel := make(chan os.Signal, 2)

if len(signals) > 0 {
signal.Notify(sigChannel, signals...)
} else {
signal.Notify(sigChannel, os.Interrupt, syscall.SIGTERM)
}

go func() {
// Wait until signal occurs.
<-sigChannel
close(sigChannel)
// Close the context which is used by the caller.
cancelSignal()
}()

return ctxSignal
}