Skip to content

Commit

Permalink
Merge pull request #798 from jmaitrehenry/master
Browse files Browse the repository at this point in the history
Update statsd storage - issue #724
  • Loading branch information
rjnagal committed Jul 28, 2015
2 parents ef41402 + 1e35331 commit 843fc13
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 25 deletions.
41 changes: 16 additions & 25 deletions storage/statsd/client.go → storage/statsd/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package statsd
package client

import (
"fmt"
Expand All @@ -22,8 +22,9 @@ import (
)

type Client struct {
HostPort string
conn net.Conn
HostPort string
Namespace string
conn net.Conn
}

func (self *Client) Open() error {
Expand All @@ -36,38 +37,28 @@ func (self *Client) Open() error {
return nil
}

func (self *Client) Close() {
func (self *Client) Close() error {
self.conn.Close()
}

func (self *Client) UpdateGauge(name, value string) error {
stats := make(map[string]string)
val := fmt.Sprintf("%s|g", value)
stats[name] = val
if err := self.send(stats); err != nil {
return err
}
self.conn = nil
return nil
}

// Simple send to statsd daemon without sampling.
func (self *Client) send(data map[string]string) error {
for k, v := range data {
formatted := fmt.Sprintf("%s:%s", k, v)
_, err := fmt.Fprintf(self.conn, formatted)
if err != nil {
glog.V(3).Infof("failed to send data %q: %v", formatted, err)
// return on first error.
return err
}
func (self *Client) Send(namespace, containerName, key string, value uint64) error {
// only send counter value
formatted := fmt.Sprintf("%s.%s.%s:%d|g", namespace, containerName, key, value)
_, err := fmt.Fprintf(self.conn, formatted)
if err != nil {
glog.V(3).Infof("failed to send data %q: %v", formatted, err)
return err
}
return nil
}

func New(hostPort string) (*Client, error) {
client := Client{HostPort: hostPort}
if err := client.Open(); err != nil {
Client := Client{HostPort: hostPort}
if err := Client.Open(); err != nil {
return nil, err
}
return &client, nil
return &Client, nil
}
127 changes: 127 additions & 0 deletions storage/statsd/statsd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Copyright 2015 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package statsd

import (
info "github.com/google/cadvisor/info/v1"
client "github.com/google/cadvisor/storage/statsd/client"
)

type statsdStorage struct {
client *client.Client
Namespace string
}

const (
colCpuCumulativeUsage string = "cpu_cumulative_usage"
// Memory Usage
colMemoryUsage string = "memory_usage"
// Working set size
colMemoryWorkingSet string = "memory_working_set"
// Cumulative count of bytes received.
colRxBytes string = "rx_bytes"
// Cumulative count of receive errors encountered.
colRxErrors string = "rx_errors"
// Cumulative count of bytes transmitted.
colTxBytes string = "tx_bytes"
// Cumulative count of transmit errors encountered.
colTxErrors string = "tx_errors"
// Filesystem summary
colFsSummary = "fs_summary"
// Filesystem limit.
colFsLimit = "fs_limit"
// Filesystem usage.
colFsUsage = "fs_usage"
)

func (self *statsdStorage) containerStatsToValues(
stats *info.ContainerStats,
) (series map[string]uint64) {
series = make(map[string]uint64)

// Cumulative Cpu Usage
series[colCpuCumulativeUsage] = stats.Cpu.Usage.Total

// Memory Usage
series[colMemoryUsage] = stats.Memory.Usage

// Working set size
series[colMemoryWorkingSet] = stats.Memory.WorkingSet

// Network stats.
series[colRxBytes] = stats.Network.RxBytes
series[colRxErrors] = stats.Network.RxErrors
series[colTxBytes] = stats.Network.TxBytes
series[colTxErrors] = stats.Network.TxErrors

return series
}

func (self *statsdStorage) containerFsStatsToValues(
series *map[string]uint64,
stats *info.ContainerStats,
) {
for _, fsStat := range stats.Filesystem {
// Summary stats.
(*series)[colFsSummary+"."+colFsLimit] += fsStat.Limit
(*series)[colFsSummary+"."+colFsUsage] += fsStat.Usage

// Per device stats.
(*series)[fsStat.Device+"."+colFsLimit] = fsStat.Limit
(*series)[fsStat.Device+"."+colFsUsage] = fsStat.Usage
}
}

//Push the data into redis
func (self *statsdStorage) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error {
if stats == nil {
return nil
}

var containerName string
if len(ref.Aliases) > 0 {
containerName = ref.Aliases[0]
} else {
containerName = ref.Name
}

series := self.containerStatsToValues(stats)
self.containerFsStatsToValues(&series, stats)
for key, value := range series {
err := self.client.Send(self.Namespace, containerName, key, value)
if err != nil {
return err
}
}
return nil
}

func (self *statsdStorage) Close() error {
self.client.Close()
self.client = nil
return nil
}

func New(namespace, hostPort string) (*statsdStorage, error) {
statsdClient, err := client.New(hostPort)
if err != nil {
return nil, err
}
statsdStorage := &statsdStorage{
client: statsdClient,
Namespace: namespace,
}
return statsdStorage, nil
}
6 changes: 6 additions & 0 deletions storagedriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/google/cadvisor/storage/bigquery"
"github.com/google/cadvisor/storage/influxdb"
"github.com/google/cadvisor/storage/redis"
"github.com/google/cadvisor/storage/statsd"
)

var argDbUsername = flag.String("storage_driver_user", "root", "database username")
Expand Down Expand Up @@ -88,6 +89,11 @@ func NewMemoryStorage(backendStorageName string) (*memory.InMemoryCache, error)
*argDbHost,
*argDbBufferDuration,
)
case "statsd":
backendStorage, err = statsd.New(
*argDbName,
*argDbHost,
)
default:
err = fmt.Errorf("unknown backend storage driver: %v", *argDbDriver)
}
Expand Down

0 comments on commit 843fc13

Please sign in to comment.