forked from influxdata/influxdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
graphite.go
144 lines (118 loc) · 3.73 KB
/
graphite.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package graphite
import (
"errors"
"fmt"
"io"
"strconv"
"strings"
"time"
"github.com/influxdb/influxdb"
)
// Default settings for the Graphite input.
const (
// DefaultGraphitePort represents the default Graphite (Carbon) plaintext port.
DefaultGraphitePort = 2003
// DefaultGraphiteNameSeparator represents the default separator between the
// segments of a Graphite metric name. NewParser installs it as the Separator
// of every Parser it creates.
DefaultGraphiteNameSeparator = "."
)
// Sentinel errors exported by the graphite package.
var (
// ErrBindAddressRequired is returned when starting the Server
// without a TCP or UDP listening address.
ErrBindAddressRequired = errors.New("bind address required")
// ErrServerClosed is returned when closing an already closed graphite server.
ErrServerClosed = errors.New("server already closed")
// ErrServerNotSpecified is returned when Server is not specified.
ErrServerNotSpecified = errors.New("server not present")
)
// SeriesWriter defines the interface for the destination of the data.
type SeriesWriter interface {
// WriteSeries receives a slice of points to be written to the destination.
// NOTE(review): the two string parameters are unnamed here; presumably they
// identify the target database and retention policy, and the uint64 result
// is presumably a write index — confirm against the implementation.
WriteSeries(string, string, []influxdb.Point) (uint64, error)
}
// Server defines the interface all Graphite servers support.
// NewServer returns a Server for the "tcp" and "udp" protocols.
type Server interface {
// SetLogOutput takes the writer intended for the server's log output.
// NOTE(review): inferred from the method name — confirm against the
// TCP and UDP implementations.
SetLogOutput(w io.Writer)
// ListenAndServe starts the server; iface is presumably the address to
// bind to — TODO confirm against the TCP and UDP implementations.
ListenAndServe(iface string) error
}
// NewServer returns a Graphite server for the given protocol, wired to the
// given parser, series writer, and database.
//
// The protocol name is matched case-insensitively: "tcp" yields the result of
// NewTCPServer and "udp" the result of NewUDPServer. Any other value produces
// a nil Server and an error naming the rejected protocol.
func NewServer(protocol string, p *Parser, s SeriesWriter, db string) (Server, error) {
	switch strings.ToLower(protocol) {
	case "tcp":
		return NewTCPServer(p, s, db), nil
	case "udp":
		return NewUDPServer(p, s, db), nil
	}
	return nil, fmt.Errorf("unrecognized Graphite Server protocol %s", protocol)
}
// Parser encapsulates a Graphite Parser.
type Parser struct {
// Separator is the string DecodeNameAndTags splits a metric name on.
Separator string
// LastEnabled selects which segment of the split metric name is the point
// name: the last segment when true, the first segment when false. The
// remaining segments are read as tag key/value pairs.
LastEnabled bool
}
// NewParser returns a new Parser whose Separator is
// DefaultGraphiteNameSeparator; LastEnabled is left at its zero value (false).
func NewParser() *Parser {
	p := new(Parser)
	p.Separator = DefaultGraphiteNameSeparator
	return p
}
// Parse performs Graphite parsing of a single line.
//
// A line must contain exactly three whitespace-separated fields:
//
//	<name> <value> <timestamp>
//
// The name is decoded by DecodeNameAndTags into a point name and tags. The
// value is parsed as a 64-bit float and stored in the point's Fields under
// the decoded name. The timestamp is parsed as a base-10 integer holding whole
// seconds since the Unix epoch, as defined by the Graphite plaintext protocol.
//
// If the field count is wrong, or the name, value or timestamp cannot be
// parsed, a zero Point and a non-nil error are returned.
func (p *Parser) Parse(line string) (influxdb.Point, error) {
	// Break into 3 fields (name, value, timestamp).
	fields := strings.Fields(line)
	if len(fields) != 3 {
		return influxdb.Point{}, fmt.Errorf("received %q which doesn't have three fields", line)
	}

	// Decode the name and tags.
	name, tags, err := p.DecodeNameAndTags(fields[0])
	if err != nil {
		return influxdb.Point{}, err
	}

	// Parse value.
	v, err := strconv.ParseFloat(fields[1], 64)
	if err != nil {
		return influxdb.Point{}, err
	}

	// Parse timestamp.
	unixTime, err := strconv.ParseInt(fields[2], 10, 64)
	if err != nil {
		return influxdb.Point{}, err
	}
	// Graphite sends whole seconds since the epoch. Scaling the value as
	// milliseconds placed present-day samples in January 1970 and could
	// overflow int64 for large inputs.
	timestamp := time.Unix(unixTime, 0)

	return influxdb.Point{
		Name:      name,
		Tags:      tags,
		Fields:    map[string]interface{}{name: v},
		Timestamp: timestamp,
	}, nil
}
// DecodeNameAndTags splits the name field of a Graphite datum on p.Separator
// and returns the point name together with its tags.
//
// The split must yield an odd number of segments: one for the point name and
// the rest forming tag key/value pairs. When p.LastEnabled is true the last
// segment is the name, e.g.
//
//	region.us-west.hostname.server01.cpu
//	  -> name "cpu", tags {region: us-west, hostname: server01}
//
// otherwise the first segment is the name. An error is returned, with an empty
// name and an empty tag map, when the segment count is even or the name
// segment is empty.
func (p *Parser) DecodeNameAndTags(field string) (string, map[string]string, error) {
	tags := make(map[string]string)

	parts := strings.Split(field, p.Separator)
	if len(parts)%2 == 0 {
		// There should always be an odd number of segments: pairs plus one name.
		return "", tags, fmt.Errorf("received %q which doesn't conform to format of key.value.key.value.name or name", field)
	}

	// Peel the name off the configured end; what remains is key/value pairs.
	var (
		name  string
		pairs []string
	)
	if last := len(parts) - 1; p.LastEnabled {
		name, pairs = parts[last], parts[:last]
	} else {
		name, pairs = parts[0], parts[1:]
	}
	if name == "" {
		return "", tags, fmt.Errorf("no name specified for metric. %q", field)
	}

	// Consume the remaining segments two at a time as key, value.
	for len(pairs) > 0 {
		tags[pairs[0]] = pairs[1]
		pairs = pairs[2:]
	}
	return name, tags, nil
}