forked from kubernetes-retired/heapster
-
Notifications
You must be signed in to change notification settings - Fork 0
/
driver.go
286 lines (238 loc) · 8.89 KB
/
driver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
// Copyright 2015 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package gcl
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"time"
"github.com/GoogleCloudPlatform/gcloud-golang/compute/metadata"
"github.com/GoogleCloudPlatform/heapster/extpoints"
"github.com/GoogleCloudPlatform/heapster/util/gce"
"github.com/golang/glog"
sink_api "github.com/GoogleCloudPlatform/heapster/sinks/api"
kube_api "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
const (
GCLAuthScope = "https://www.googleapis.com/auth/logging.write"
eventLoggingSeverity = "NOTICE"
eventLoggingServiceName = "custom.googleapis.com"
eventLoggingLogName = "kubernetes.io/events"
logEntriesWriteURLScheme = "https"
logEntriesWriteURLHost = "logging.googleapis.com"
logEntriesWriteURLFormat = "/v1beta3/projects/%s/logs/%s/entries:write"
)
func (sink *gclSink) DebugInfo() string {
return fmt.Sprintf("Sink Type: Google Cloud Logging (GCL). Http Error Count: %v\r\n", sink.httpErrorCount)
}
func (sink *gclSink) Register(metrics []sink_api.MetricDescriptor) error {
// No-op
return nil
}
// Stores metrics into the backend
func (sink *gclSink) StoreTimeseries(input []sink_api.Timeseries) error {
// No-op, Google Cloud Logging (GCL) doesn't store metrics
return nil
}
// Stores events into the backend.
func (sink *gclSink) StoreEvents(events []kube_api.Event) error {
if events == nil || len(events) <= 0 {
return nil
}
glog.V(3).Infof("storing events into GCL sink")
request := sink.createLogsEntriesRequest(events)
err := sink.sendLogsEntriesRequest(request)
glog.V(3).Infof("stored events into GCL sink - %v", err)
return err
}
type httpClient interface {
Do(req *http.Request) (*http.Response, error)
}
type gclSink struct {
// Token to use for authentication.
token string
// When the token expires.
tokenExpiration time.Time
// GCE Project ID
projectId string
// HTTP Client
httpClient httpClient
// Number of times an Http Error was encountered (for debugging)
httpErrorCount uint
}
func (sink *gclSink) refreshToken() error {
if time.Now().After(sink.tokenExpiration) {
token, err := gce.GetAuthToken()
if err != nil {
return err
}
// Expire the token a bit early.
const earlyRefreshSeconds = 60
if token.ExpiresIn > earlyRefreshSeconds {
token.ExpiresIn -= earlyRefreshSeconds
}
sink.token = token.AccessToken
sink.tokenExpiration = time.Now().Add(time.Duration(token.ExpiresIn) * time.Second)
}
return nil
}
type LogsEntriesWriteRequest struct {
// Metadata labels that apply to all log entries in this request, so they don't have to be
// repeated in each log entry's metadata.labels field. If any of the log entries contain
// a (key, value) with the same key that is in commonLabels, then the entry's (key, value)
// overrides the one in commonLabels.
CommonLabels map[string]string `json:"commonLabels,omitempty"`
// Entries: Log entries to insert.
Entries []*LogEntry `json:"entries,omitempty"`
}
type LogEntry struct {
// Information about the log entry.
Metadata *LogEntryMetadata `json:"metadata,omitempty"`
// A unique ID for the log entry. If this field is provided and is identical to a previously
// created entry, then the previous instance of this entry is replaced with this one.
InsertId string `json:"insertId,omitempty"`
// The log to which this entry belongs. When a log entry is ingested, the value of this field
// is set by the logging system.
Log string `json:"log,omitempty"`
// The log entry payload, represented by "JSON-like" structured data, in our case, the event.
Payload kube_api.Event `json:"structPayload,omitempty"`
}
type LogEntryMetadata struct {
// The time the event described by the log entry occurred. Timestamps must be later than
// January 1, 1970. Timestamp must be a string following RFC 3339, it must be Z-normalized and
// use 3, 6, or 9 fractional digits depending on required precision.
Timestamp string `json:"timestamp,omitempty"`
// The severity of the log entry. Acceptable values:
// DEFAULT - The log entry has no assigned severity level.
// DEBUG - Debug or trace information.
// INFO - Routine information, such as ongoing status or performance.
// NOTICE - Normal but significant events, such as start up, shut down, or configuration.
// WARNING - Warning events might cause problems.
// ERROR - Error events are likely to cause problems.
// CRITICAL - Critical events cause more severe problems or brief outages.
// ALERT - A person must take an action immediately.
// EMERGENCY - One or more systems are unusable.
Severity string `json:"severity,omitempty"`
// The project ID of the Google Cloud Platform service that created the log entry.
ProjectId string `json:"projectId,omitempty"`
// The API name of the Google Cloud Platform service that created the log entry.
// For example, "compute.googleapis.com" or "custom.googleapis.com".
ServiceName string `json:"serviceName,omitempty"`
// The region name of the Google Cloud Platform service that created the log entry.
// For example, `"us-central1"`.
Region string `json:"region,omitempty"`
// The zone of the Google Cloud Platform service that created the log entry.
// For example, `"us-central1-a"`.
Zone string `json:"zone,omitempty"`
// The fully-qualified email address of the authenticated user that performed or requested
// the action represented by the log entry. If the log entry does not apply to an action
// taken by an authenticated user, then the field should be empty.
UserId string `json:"userId,omitempty"`
// A set of (key, value) data that provides additional information about the log entry.
Labels map[string]string `json:"labels,omitempty"`
}
func (sink *gclSink) createLogsEntriesRequest(events []kube_api.Event) LogsEntriesWriteRequest {
logEntries := make([]*LogEntry, len(events))
for i, event := range events {
logEntries[i] = &LogEntry{
Metadata: &LogEntryMetadata{
Timestamp: event.LastTimestamp.Time.UTC().Format(time.RFC3339),
Severity: eventLoggingSeverity,
ProjectId: sink.projectId,
ServiceName: eventLoggingServiceName,
},
InsertId: string(event.UID),
Payload: event,
}
}
return LogsEntriesWriteRequest{Entries: logEntries}
}
// TODO: Move this to a common lib and share it with GCM implementation.
func (sink *gclSink) sendLogsEntriesRequest(request LogsEntriesWriteRequest) error {
requestBody, err := json.Marshal(request)
if err != nil {
return err
}
url := &url.URL{
Scheme: logEntriesWriteURLScheme,
Host: logEntriesWriteURLHost,
Opaque: fmt.Sprintf(logEntriesWriteURLFormat, sink.projectId, url.QueryEscape(eventLoggingLogName)),
}
req, err := http.NewRequest("POST", url.String(), bytes.NewReader(requestBody))
if err != nil {
return err
}
req.URL = url
req.Header.Add("Content-Type", "application/json")
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", sink.token))
resp, err := sink.httpClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
out, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
sink.httpErrorCount++
return fmt.Errorf("request to %q failed with status %q and response: %q", url, resp.Status, string(out))
} else {
}
return nil
}
func (self *gclSink) Name() string {
return "Google Cloud Logging Sink"
}
// Returns an implementation of a Google Cloud Logging (GCL) sink.
func new() (sink_api.ExternalSink, error) {
// TODO: Retry OnGCE call for ~15 seconds before declaring failure.
time.Sleep(3 * time.Second)
// Only support GCE for now.
if !metadata.OnGCE() {
return nil, fmt.Errorf("The Google Cloud Logging (GCL) sink failed to start: this process must be running on Google Compute Engine (GCE)")
}
// Detect project ID
projectId, err := metadata.ProjectID()
if err != nil {
return nil, err
}
glog.Infof("Project ID for GCL sink is: %q\r\n", projectId)
// Check for required auth scopes
err = gce.VerifyAuthScope(GCLAuthScope)
if err != nil {
return nil, err
}
impl := &gclSink{
projectId: projectId,
httpClient: &http.Client{},
}
// Get an initial token.
err = impl.refreshToken()
if err != nil {
return nil, err
}
return impl, nil
}
func init() {
extpoints.SinkFactories.Register(CreateGCLSink, "gcl")
}
func CreateGCLSink(_ string, _ map[string][]string) ([]sink_api.ExternalSink, error) {
sink, err := new()
glog.Infof("creating GCL sink")
return []sink_api.ExternalSink{sink}, err
}