Skip to content

Commit

Permalink
add support for carbon pickle protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
DanCech committed Nov 18, 2016
1 parent 78c140e commit 67a381f
Show file tree
Hide file tree
Showing 2 changed files with 193 additions and 54 deletions.
246 changes: 192 additions & 54 deletions carbon-relay-ng.go
Expand Up @@ -23,15 +23,19 @@ import (
m20 "github.com/metrics20/go-metrics20/carbon20"
logging "github.com/op/go-logging"
"github.com/rcrowley/goagain"
ogorek "github.com/kisielk/og-rek"
//"runtime"
"strconv"
"strings"
"syscall"
"time"
"encoding/binary"
"bytes"
)

type Config struct {
Listen_addr string
Pickle_addr string
Admin_addr string
Http_addr string
Spool_dir string
Expand All @@ -54,6 +58,8 @@ type instrumentation struct {
Graphite_interval int
}

type Handler func(net.Conn, Config)

var (
instance string
service = "carbon-relay-ng"
Expand Down Expand Up @@ -82,14 +88,14 @@ func init() {

}

func accept(l *net.TCPListener, config Config) {
func accept(l *net.TCPListener, config Config, handler Handler) {
for {
c, err := l.AcceptTCP()
if nil != err {
log.Error(err.Error())
break
}
go handle(c, config)
go handler(c, config)
}
}

Expand Down Expand Up @@ -136,6 +142,113 @@ func handle(c net.Conn, config Config) {
}
}

func handlePickle(c net.Conn, config Config) {
defer c.Close()
// TODO c.SetTimeout(60e9)
r := bufio.NewReaderSize(c, 4096)
ReadLoop:
for {

// Note that everything in this loop should proceed as fast as it can
// so we're not blocked and can keep processing
// so the validation, the pipeline initiated via table.Dispatch(), etc
// must never block.

var length uint32
err := binary.Read(r, binary.BigEndian, &length)
if nil != err {
log.Error("couldn't read payload length: " + err.Error())
break
}

payload := new(bytes.Buffer)
lengthTotal := int(length)
lengthRead := 0
for {
tmpPayload := make([]byte, lengthTotal - lengthRead)
tmpLengthRead, err := r.Read(tmpPayload)
if nil != err {
log.Error("couldn't read payload: " + err.Error())
break ReadLoop
}
payload.Write(tmpPayload)
lengthRead += tmpLengthRead
if lengthRead == lengthTotal {
break
}
if lengthRead >= lengthTotal {
log.Error(fmt.Sprintf("expected to read %d bytes, but read %d", length, lengthRead))
break ReadLoop
}
}

decoder := ogorek.NewDecoder(payload)
decoded, err := decoder.Decode()
if nil != err {
if io.EOF != err {
log.Error("error reading pickled data " + err.Error())
}
break
}

ItemLoop:
for _, item := range decoded.([]interface{}) {
numIn.Inc(1)

metric := item.([]interface{})[0].(string)
data := item.([]interface{})[1].([]interface{})

var value string
switch v := data[1].(type) {
case string:
value = data[1].(string)
case uint8, uint16, uint32, uint64, int8, int16, int32, int64:
value = fmt.Sprintf("%d", data[1])
case float32, float64:
value = fmt.Sprintf("%f", data[1])
default:
log.Error(fmt.Sprintf("Unrecognized type %s for value", v))
numInvalid.Inc(1)
continue ItemLoop
}

var timestamp string
switch v := data[0].(type) {
case string:
timestamp = data[0].(string)
case uint8, uint16, uint32, uint64, int8, int16, int32, int64:
timestamp = fmt.Sprintf("%d", data[0])
case float32, float64:
timestamp = fmt.Sprintf("%.0f", data[0])
default:
log.Error(fmt.Sprintf("Unrecognized type %s for timestamp", v))
numInvalid.Inc(1)
continue ItemLoop
}

buf := []byte(metric + " " + value + " " + timestamp)

key, _, ts, err := m20.ValidatePacket(buf, config.Validation_level_legacy.Level, config.Validation_level_m20.Level)
if err != nil {
badMetrics.Add(key, buf, err)
numInvalid.Inc(1)
continue
}

if config.Validate_order {
err = validate.Ordered(key, ts)
if err != nil {
badMetrics.Add(key, buf, err)
numOutOfOrder.Inc(1)
continue
}
}

table.Dispatch(buf)
}
}
}

func usage() {
fmt.Fprintln(
os.Stderr,
Expand All @@ -144,6 +257,49 @@ func usage() {
flag.PrintDefaults()
}

func listen(config Config, addr string, handler Handler) (net.Listener, error) {
// Follow the goagain protocol, <https://github.com/rcrowley/goagain>.
l, ppid, err := goagain.GetEnvs()
if nil != err {
laddr, err := net.ResolveTCPAddr("tcp", addr)
if nil != err {
return nil, err
}
l, err = net.ListenTCP("tcp", laddr)
if nil != err {
return nil, err
}
log.Notice("listening on %v/tcp", laddr)
go accept(l.(*net.TCPListener), config, handler)
} else {
log.Notice("resuming listening on %v/tcp", l.Addr())
go accept(l.(*net.TCPListener), config, handler)
if err := goagain.KillParent(ppid); nil != err {
return nil, err
}
for {
err := syscall.Kill(ppid, 0)
if err != nil {
break
}
time.Sleep(10 * time.Millisecond)
}
}

udp_addr, err := net.ResolveUDPAddr("udp", addr)
if nil != err {
return nil, err
}
udp_conn, err := net.ListenUDP("udp", udp_addr)
if nil != err {
return nil, err
}
log.Notice("listening on %v/udp", udp_addr)
go handler(udp_conn, config)

return l, nil
}

func main() {

flag.Usage = usage
Expand Down Expand Up @@ -204,6 +360,20 @@ func main() {

log.Notice("===== carbon-relay-ng instance '%s' starting. =====\n", instance)

if config.Pid_file != "" {
f, err := os.Create(config.Pid_file)
if err != nil {
fmt.Println("error creating pidfile:", err.Error())
os.Exit(1)
}
_, err = f.Write([]byte(strconv.Itoa(os.Getpid())))
if err != nil {
fmt.Println("error writing to pidfile:", err.Error())
os.Exit(1)
}
f.Close()
}

numIn = Counter("unit=Metric.direction=in")
numInvalid = Counter("unit=Err.type=invalid")
numOutOfOrder = Counter("unit=Err.type=out_of_order")
Expand Down Expand Up @@ -242,63 +412,22 @@ func main() {
log.Notice(line)
}

// Follow the goagain protocol, <https://github.com/rcrowley/goagain>.
l, ppid, err := goagain.GetEnvs()
if nil != err {
laddr, err := net.ResolveTCPAddr("tcp", config.Listen_addr)
var l net.Listener
if config.Listen_addr != "" {
l, err = listen(config, config.Listen_addr, handle)
if nil != err {
log.Error(err.Error())
os.Exit(1)
}
l, err = net.ListenTCP("tcp", laddr)
if nil != err {
log.Error(err.Error())

os.Exit(1)
}
log.Notice("listening on %v/tcp", laddr)
go accept(l.(*net.TCPListener), config)
} else {
log.Notice("resuming listening on %v/tcp", l.Addr())
go accept(l.(*net.TCPListener), config)
if err := goagain.KillParent(ppid); nil != err {
log.Error(err.Error())
os.Exit(1)
}
for {
err := syscall.Kill(ppid, 0)
if err != nil {
break
}
time.Sleep(10 * time.Millisecond)
}
}

udp_addr, err := net.ResolveUDPAddr("udp", config.Listen_addr)
if nil != err {
log.Error(err.Error())
os.Exit(1)
}
udp_conn, err := net.ListenUDP("udp", udp_addr)
if nil != err {
log.Error(err.Error())
os.Exit(1)
}
log.Notice("listening on %v/udp", udp_addr)
go handle(udp_conn, config)

if config.Pid_file != "" {
f, err := os.Create(config.Pid_file)
if err != nil {
fmt.Println("error creating pidfile:", err.Error())
os.Exit(1)
}
_, err = f.Write([]byte(strconv.Itoa(os.Getpid())))
if err != nil {
fmt.Println("error writing to pidfile:", err.Error())
var lp net.Listener
if config.Pickle_addr != "" {
lp, err = listen(config, config.Pickle_addr, handlePickle)
if nil != err {
log.Error(err.Error())
os.Exit(1)
}
f.Close()
}

if config.Admin_addr != "" {
Expand All @@ -315,8 +444,17 @@ func main() {
go HttpListener(config.Http_addr, table)
}

if err := goagain.AwaitSignals(l); nil != err {
log.Error(err.Error())
os.Exit(1)
if nil != l {
if err := goagain.AwaitSignals(l); nil != err {
log.Error(err.Error())
os.Exit(1)
}
}

if nil != lp {
if err := goagain.AwaitSignals(lp); nil != err {
log.Error(err.Error())
os.Exit(1)
}
}
}
1 change: 1 addition & 0 deletions examples/carbon-relay-ng.ini
Expand Up @@ -3,6 +3,7 @@ instance = "default"
max_procs = 2

listen_addr = "0.0.0.0:2003"
pickle_addr = "0.0.0.0:2013"
admin_addr = "0.0.0.0:2004"
http_addr = "0.0.0.0:8081"
#spool_dir = "/var/spool/carbon-relay-ng"
Expand Down

0 comments on commit 67a381f

Please sign in to comment.