forked from kubernetes/kubernetes
-
Notifications
You must be signed in to change notification settings - Fork 0
/
redis.go
119 lines (110 loc) · 3.46 KB
/
redis.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
// Copyright 2015 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package redis
import (
"encoding/json"
redis "github.com/garyburd/redigo/redis"
info "github.com/google/cadvisor/info/v1"
storage "github.com/google/cadvisor/storage"
"sync"
"time"
)
type redisStorage struct {
conn redis.Conn
machineName string
redisKey string
bufferDuration time.Duration
lastWrite time.Time
lock sync.Mutex
readyToFlush func() bool
}
type detailSpec struct {
Timestamp int64 `json:"timestamp"`
MachineName string `json:"machine_name,omitempty"`
ContainerName string `json:"container_Name,omitempty"`
ContainerStats *info.ContainerStats `json:"container_stats,omitempty"`
}
func (self *redisStorage) defaultReadyToFlush() bool {
return time.Since(self.lastWrite) >= self.bufferDuration
}
//We must add some defaut params (for example: MachineName,ContainerName...)because containerStats do not include them
func (self *redisStorage) containerStatsAndDefautValues(ref info.ContainerReference, stats *info.ContainerStats) *detailSpec {
timestamp := stats.Timestamp.UnixNano() / 1E3
var containerName string
if len(ref.Aliases) > 0 {
containerName = ref.Aliases[0]
} else {
containerName = ref.Name
}
detail := &detailSpec{
Timestamp: timestamp,
MachineName: self.machineName,
ContainerName: containerName,
ContainerStats: stats,
}
return detail
}
//Push the data into redis
func (self *redisStorage) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error {
if stats == nil {
return nil
}
var seriesToFlush []byte
func() {
// AddStats will be invoked simultaneously from multiple threads and only one of them will perform a write.
self.lock.Lock()
defer self.lock.Unlock()
// Add some defaut params based on containerStats
detail := self.containerStatsAndDefautValues(ref, stats)
//To json
b, _ := json.Marshal(detail)
if self.readyToFlush() {
seriesToFlush = b
b = nil
self.lastWrite = time.Now()
}
}()
if len(seriesToFlush) > 0 {
//We use redis's "LPUSH" to push the data to the redis
self.conn.Send("LPUSH", self.redisKey, seriesToFlush)
}
return nil
}
func (self *redisStorage) Close() error {
return self.conn.Close()
}
// Create a new redis storage driver.
// machineName: A unique identifier to identify the host that runs the current cAdvisor
// instance is running on.
// redisHost: The host which runs redis.
// redisKey: The key for the Data that stored in the redis
func New(machineName,
redisKey,
redisHost string,
bufferDuration time.Duration,
) (storage.StorageDriver, error) {
conn, err := redis.Dial("tcp", redisHost)
if err != nil {
return nil, err
}
ret := &redisStorage{
conn: conn,
machineName: machineName,
redisKey: redisKey,
bufferDuration: bufferDuration,
lastWrite: time.Now(),
}
ret.readyToFlush = ret.defaultReadyToFlush
return ret, nil
}