-
Notifications
You must be signed in to change notification settings - Fork 90
/
prometheusmetric.go
302 lines (261 loc) · 9.56 KB
/
prometheusmetric.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
package prestostore
import (
"bytes"
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/operator-framework/operator-metering/pkg/db"
"github.com/operator-framework/operator-metering/pkg/hive"
"github.com/operator-framework/operator-metering/pkg/operator/reportingutil"
"github.com/operator-framework/operator-metering/pkg/presto"
)
const (
	// defaultPrestoQueryCap is the default maximum payload size a single SQL
	// statement can contain before Presto will error due to the payload being
	// too large.
	defaultPrestoQueryCap = 1000000

	// Column names shared by the table schemas and row decoding below.
	amountColumnName        = "amount"
	timestampColumnName     = "timestamp"
	timePrecisionColumnName = "timePrecision"
	labelsColumnName        = "labels"
	// dtColumnName is the partition column; values are UTC date strings
	// produced by PrometheusMetricTimestampPartition.
	dtColumnName = "dt"
)
var (
	// defaultQueryBufferPool is used by NewPrometheusMetricsRepo when the
	// caller does not supply its own buffer pool.
	defaultQueryBufferPool = NewBufferPool(defaultPrestoQueryCap)

	// PrometheusMetricHiveTableColumns is the Hive schema for the
	// non-partition columns of a Prometheus metric table.
	PrometheusMetricHiveTableColumns = []hive.Column{
		{Name: amountColumnName, Type: "double"},
		{Name: timestampColumnName, Type: "timestamp"},
		{Name: timePrecisionColumnName, Type: "double"},
		{Name: labelsColumnName, Type: "map<string, string>"},
	}

	// PrometheusMetricHivePartitionColumns is the Hive schema for the
	// partition columns of a Prometheus metric table.
	PrometheusMetricHivePartitionColumns = []hive.Column{
		{Name: dtColumnName, Type: "string"},
	}

	// Initialized by init() from the Hive column definitions above.
	PrometheusMetricPrestoTableColumn, PrometheusMetricPrestoPartitionColumns, PrometheusMetricPrestoAllColumns []presto.Column
)
// init derives the Presto column definitions from the hard-coded Hive
// schemas above. A conversion error means the package-level schemas are
// invalid, so panicking at startup is deliberate.
func init() {
	var err error
	PrometheusMetricPrestoTableColumn, err = reportingutil.HiveColumnsToPrestoColumns(PrometheusMetricHiveTableColumns)
	if err != nil {
		panic(err)
	}
	PrometheusMetricPrestoPartitionColumns, err = reportingutil.HiveColumnsToPrestoColumns(PrometheusMetricHivePartitionColumns)
	if err != nil {
		panic(err)
	}
	// All columns = table columns followed by partition columns.
	PrometheusMetricPrestoAllColumns = append(PrometheusMetricPrestoTableColumn, PrometheusMetricPrestoPartitionColumns...)
}
func NewBufferPool(capacity int) sync.Pool {
return sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(make([]byte, 0, capacity))
},
}
}
// PrometheusMetricsStorer stores batches of Prometheus metrics into the
// named table.
type PrometheusMetricsStorer interface {
	StorePrometheusMetrics(ctx context.Context, tableName string, metrics []*PrometheusMetric) error
}

// PrometheusMetricsGetter retrieves Prometheus metrics from the named table,
// optionally bounded by a start and end time.
type PrometheusMetricsGetter interface {
	GetPrometheusMetrics(tableName string, start, end time.Time) ([]*PrometheusMetric, error)
}

// PrometheusMetricTimestampTracker reports the most recent metric timestamp
// stored in the named table.
type PrometheusMetricTimestampTracker interface {
	GetLastTimestampForTable(tableName string) (*time.Time, error)
}

// PrometheusMetricsRepo combines storing, fetching, and last-timestamp
// tracking of Prometheus metrics.
type PrometheusMetricsRepo interface {
	PrometheusMetricsGetter
	PrometheusMetricsStorer
	PrometheusMetricTimestampTracker
}
// prometheusMetricRepo implements PrometheusMetricsRepo using a database
// queryer and a pool of reusable query buffers for batching INSERTs.
type prometheusMetricRepo struct {
	queryer         db.Queryer
	queryBufferPool *sync.Pool
}
// NewPrometheusMetricsRepo returns a prometheusMetricRepo backed by queryer.
// A nil queryBufferPool selects the package default pool, which is sized to
// defaultPrestoQueryCap.
func NewPrometheusMetricsRepo(queryer db.Queryer, queryBufferPool *sync.Pool) *prometheusMetricRepo {
	pool := queryBufferPool
	if pool == nil {
		pool = &defaultQueryBufferPool
	}
	return &prometheusMetricRepo{queryer: queryer, queryBufferPool: pool}
}
// StorePrometheusMetrics inserts metrics into tableName, borrowing a query
// buffer from the repo's pool to batch the INSERT payloads.
func (r *prometheusMetricRepo) StorePrometheusMetrics(ctx context.Context, tableName string, metrics []*PrometheusMetric) error {
	queryBuf := r.queryBufferPool.Get().(*bytes.Buffer)
	// A pooled buffer may hold data from a prior use: clear it first, and
	// return it to the pool when done.
	queryBuf.Reset()
	defer r.queryBufferPool.Put(queryBuf)
	return StorePrometheusMetricsWithBuffer(queryBuf, ctx, r.queryer, tableName, metrics)
}
// GetPrometheusMetrics fetches the metrics stored in tableName between start
// and end using the repo's queryer.
func (r *prometheusMetricRepo) GetPrometheusMetrics(tableName string, start, end time.Time) ([]*PrometheusMetric, error) {
	return GetPrometheusMetrics(r.queryer, tableName, start, end)
}
// GetLastTimestampForTable returns the most recent "timestamp" value stored
// in tableName, or nil if the table contains no rows. An error is returned
// when the query fails (e.g. the table does not exist yet) or when the
// column decodes to an unexpected type.
func (r *prometheusMetricRepo) GetLastTimestampForTable(tableName string) (*time.Time, error) {
	// Get the most recent timestamp in the table for this query.
	getLastTimestampQuery := fmt.Sprintf(`
SELECT "timestamp"
FROM %s
ORDER BY "timestamp" DESC
LIMIT 1`, tableName)

	results, err := presto.ExecuteSelect(r.queryer, getLastTimestampQuery)
	if err != nil {
		return nil, fmt.Errorf("error getting last timestamp for table %s, maybe table doesn't exist yet? %v", tableName, err)
	}
	if len(results) != 0 {
		// Guard the assertion so an unexpected column type surfaces as an
		// error instead of a panic.
		ts, ok := results[0]["timestamp"].(time.Time)
		if !ok {
			return nil, fmt.Errorf("unexpected type %T for last timestamp of table %s", results[0]["timestamp"], tableName)
		}
		return &ts, nil
	}
	return nil, nil
}
// PrometheusMetric is a receipt of a usage determined by a query within a
// specific time range.
type PrometheusMetric struct {
	// Labels is the Prometheus label set, stored in the "labels" map column.
	Labels map[string]string `json:"labels"`
	// Amount is the sample value, stored in the "amount" column.
	Amount float64 `json:"amount"`
	// StepSize is stored in the "timePrecision" column as seconds.
	StepSize time.Duration `json:"stepSize"`
	// Timestamp is the sample time, stored in the "timestamp" column.
	Timestamp time.Time `json:"timestamp"`
	// Dt is the "dt" partition the metric belongs to (a UTC date string).
	Dt string `json:"dt"`
}
// StorePrometheusMetricsWithBuffer handles storing Prometheus metrics into
// the specified Presto table. Metrics are serialized into queryBuf and
// flushed as INSERT statements whenever adding another metric would exceed
// the buffer's capacity, so no single statement grows past Presto's payload
// limit. The buffer is expected to be empty on entry.
func StorePrometheusMetricsWithBuffer(queryBuf *bytes.Buffer, ctx context.Context, queryer db.Queryer, tableName string, metrics []*PrometheusMetric) error {
	bufferCapacity := queryBuf.Cap()
	insertStatementLength := len(presto.FormatInsertQuery(tableName, ""))
	// calculate the queryCap with the "INSERT INTO $table_name" portion
	// accounted for
	queryCap := bufferCapacity - insertStatementLength

	// account for "," and "VALUES " string length when writing to buffer
	commaStr := ","
	valuesStmtStr := "VALUES "

	metricsInBuffer := false
	numMetrics := len(metrics)
	for i, metric := range metrics {
		// Stop early if the caller cancelled the context.
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
			// continue processing if context isn't cancelled.
		}

		metricSQLStr := generatePrometheusMetricSQLValues(metric)

		// lastMetric means we need to insert after writing the metric to the
		// buffer
		lastMetric := i == (numMetrics - 1)

		if metricsInBuffer {
			// if writing the current metricSQLStr to the buffer would exceed the
			// bufferCapacity, perform the insert query, and reset the buffer
			// to flush it
			bytesToWrite := len(commaStr + metricSQLStr)
			if (bytesToWrite + queryBuf.Len()) > queryCap {
				err := presto.InsertInto(queryer, tableName, queryBuf.String())
				if err != nil {
					return fmt.Errorf("failed to store metrics into presto: %v", err)
				}
				queryBuf.Reset()
				// we just inserted the contents of the buffer, so reset
				// metricsInBuffer and prepend VALUES
				metricsInBuffer = false
			}
		}

		var toWrite string
		if !metricsInBuffer {
			// no metrics in buffer means we need to prepend "VALUES " before
			// we write metricSQL
			toWrite = valuesStmtStr + metricSQLStr
		} else {
			// existing metrics in buffer means we need to prepend "," before
			// we write metricSQL since that separates each record
			toWrite = commaStr + metricSQLStr
		}

		// A single metric that still doesn't fit after a flush can never be
		// stored; report it as a buffer-sizing error.
		bytesToWrite := len(toWrite)
		if (bytesToWrite + queryBuf.Len()) > queryCap {
			return fmt.Errorf("writing %q would exceed buffer size, please adjust buffer size: bufferCapacityBytes: %d, queryCapacityBytes: %d, currentBufferSize: %d bytesToWrite: %d", toWrite, bufferCapacity, queryCap, queryBuf.Len(), bytesToWrite)
		}
		_, err := queryBuf.WriteString(toWrite)
		if err != nil {
			return fmt.Errorf(`error writing %q string to buffer: %v`, toWrite, err)
		}
		metricsInBuffer = true

		// this is the last metric in the loop, insert the contents of the
		// buffer
		if lastMetric {
			err := presto.InsertInto(queryer, tableName, queryBuf.String())
			if err != nil {
				return fmt.Errorf("failed to store metrics into presto: %v", err)
			}
			queryBuf.Reset()
		}
	}
	return nil
}
// generatePrometheusMetricSQLValues turns a PrometheusMetric into a SQL literal
// suited for INSERT statements. To insert maps, we create an array of keys and
// values as recommended by Presto documentation.
//
// Single quotes inside label keys and values are escaped by doubling them,
// so labels containing quotes can no longer break (or inject into) the
// generated statement.
//
// The schema is as follows:
// column "amount" type: "double"
// column "timestamp" type: "timestamp"
// column "timePrecision" type: "double"
// column "labels" type: "map<string, string>"
// the following columns are partition columns:
// column "dt" type: "string"
func generatePrometheusMetricSQLValues(metric *PrometheusMetric) string {
	var keys []string
	var vals []string
	for k, v := range metric.Labels {
		// In SQL, a single quote inside a string literal is escaped by
		// doubling it: 'it''s'.
		keys = append(keys, "'"+strings.Replace(k, "'", "''", -1)+"'")
		vals = append(vals, "'"+strings.Replace(v, "'", "''", -1)+"'")
	}
	keyString := "ARRAY[" + strings.Join(keys, ",") + "]"
	valString := "ARRAY[" + strings.Join(vals, ",") + "]"
	dt := PrometheusMetricTimestampPartition(metric.Timestamp)
	return fmt.Sprintf("(%f,timestamp '%s',%f,map(%s,%s),'%s')",
		metric.Amount, metric.Timestamp.Format(presto.TimestampFormat), metric.StepSize.Seconds(), keyString, valString, dt,
	)
}
const PrometheusMetricTimestampPartitionFormat = "2006-01-02"
func PrometheusMetricTimestampPartition(t time.Time) string {
return t.UTC().Format(PrometheusMetricTimestampPartitionFormat)
}
// GetPrometheusMetrics fetches the Prometheus metrics stored in tableName,
// optionally restricted to the [start, end] time range; a zero start or end
// leaves that bound unconstrained. Rows whose columns decode to unexpected
// types are reported as errors rather than panicking.
func GetPrometheusMetrics(queryer db.Queryer, tableName string, start, end time.Time) ([]*PrometheusMetric, error) {
	whereClause := ""
	if !start.IsZero() {
		whereClause += fmt.Sprintf(`WHERE "timestamp" >= timestamp '%s' `, start.Format(presto.TimestampFormat))
	}
	if !end.IsZero() {
		if !start.IsZero() {
			whereClause += " AND "
		} else {
			whereClause += " WHERE "
		}
		whereClause += fmt.Sprintf(`"timestamp" <= timestamp '%s'`, end.Format(presto.TimestampFormat))
	}

	rows, err := presto.GetRowsWhere(queryer, tableName, PrometheusMetricPrestoAllColumns, whereClause)
	if err != nil {
		return nil, err
	}

	results := make([]*PrometheusMetric, len(rows))
	for i, row := range rows {
		// Guard every column assertion so a malformed row yields an error
		// instead of a runtime panic.
		rowLabels, ok := row[labelsColumnName].(map[string]interface{})
		if !ok {
			return nil, fmt.Errorf("invalid %s column, type: %T, value: %+v", labelsColumnName, row[labelsColumnName], row[labelsColumnName])
		}
		rowAmount, ok := row[amountColumnName].(float64)
		if !ok {
			return nil, fmt.Errorf("invalid %s column, type: %T, value: %+v", amountColumnName, row[amountColumnName], row[amountColumnName])
		}
		rowTimePrecision, ok := row[timePrecisionColumnName].(float64)
		if !ok {
			return nil, fmt.Errorf("invalid %s column, type: %T, value: %+v", timePrecisionColumnName, row[timePrecisionColumnName], row[timePrecisionColumnName])
		}
		rowTimestamp, ok := row[timestampColumnName].(time.Time)
		if !ok {
			return nil, fmt.Errorf("invalid %s column, type: %T, value: %+v", timestampColumnName, row[timestampColumnName], row[timestampColumnName])
		}
		dt, ok := row[dtColumnName].(string)
		if !ok {
			return nil, fmt.Errorf("invalid %s column, type: %T, value: %+v", dtColumnName, row[dtColumnName], row[dtColumnName])
		}

		labels := make(map[string]string, len(rowLabels))
		for key, value := range rowLabels {
			strVal, ok := value.(string)
			if !ok {
				return nil, fmt.Errorf("invalid label %s, valueType: %T, value: %+v", key, value, value)
			}
			labels[key] = strVal
		}

		results[i] = &PrometheusMetric{
			Labels:    labels,
			Amount:    rowAmount,
			StepSize:  time.Duration(rowTimePrecision) * time.Second,
			Timestamp: rowTimestamp,
			Dt:        dt,
		}
	}
	return results, nil
}