Skip to content

Commit

Permalink
fall back to protobuf2 because of Rieamnn issues
Browse files Browse the repository at this point in the history
  • Loading branch information
Novgorodov Igor, PMK-TV-OP authored and Novgorodov Igor, PMK-TV-OP committed Jun 12, 2019
1 parent 237ab5b commit 95f3f4b
Show file tree
Hide file tree
Showing 13 changed files with 194 additions and 160 deletions.
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
1.4.3
1.4.4
22 changes: 12 additions & 10 deletions event.go
Expand Up @@ -3,6 +3,8 @@ package main
import (
"encoding/json"
"time"

pb "github.com/golang/protobuf/proto"
)

type attributeJSON struct {
Expand All @@ -29,25 +31,25 @@ func eventFromJSON(msg []byte) (ev *Event, err error) {
}

ev = &Event{
Host: evJS.Host,
Service: evJS.Service,
State: evJS.State,
Description: evJS.Description,
MetricD: evJS.Metric,
Host: pb.String(evJS.Host),
Service: pb.String(evJS.Service),
State: pb.String(evJS.State),
Description: pb.String(evJS.Description),
MetricD: pb.Float64(evJS.Metric),
Tags: evJS.Tags,
Ttl: evJS.TTL,
Ttl: pb.Float32(evJS.TTL),
}

if !evJS.Time.IsZero() {
ev.TimeMicros = evJS.Time.UnixNano() / 1000
ev.TimeMicros = pb.Int64(evJS.Time.UnixNano() / 1000)
} else {
ev.TimeMicros = time.Now().UnixNano() / 1000
ev.TimeMicros = pb.Int64(time.Now().UnixNano() / 1000)
}

for _, attr := range evJS.Attributes {
ev.Attributes = append(ev.Attributes, &Attribute{
Key: attr.Key,
Value: attr.Value,
Key: pb.String(attr.Key),
Value: pb.String(attr.Value),
})
}

Expand Down
18 changes: 10 additions & 8 deletions event_test.go
Expand Up @@ -3,6 +3,7 @@ package main
import (
"testing"

pb "github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
)

Expand All @@ -28,19 +29,20 @@ const (

func Test_eventFromJSON(t *testing.T) {
ev := &Event{
Host: "blah",
Service: "foo",
Description: "baz",
State: "ok",
Host: pb.String("blah"),
Service: pb.String("foo"),
Description: pb.String("baz"),
State: pb.String("ok"),
Tags: []string{"tag1", "tag2"},
Attributes: []*Attribute{
{
Key: "key1",
Value: "val1",
Key: pb.String("key1"),
Value: pb.String("val1"),
},
},
TimeMicros: 1523367364787000,
MetricD: 123,
Ttl: pb.Float32(0),
TimeMicros: pb.Int64(1523367364787000),
MetricD: pb.Float64(123),
}

ev2, err := eventFromJSON([]byte(jsTest))
Expand Down
34 changes: 17 additions & 17 deletions helper.go
Expand Up @@ -169,20 +169,20 @@ func eventCompileFields(e *Event, hf []riemannFieldName, sep string) []byte {
for _, f := range hf {
switch f.f {
case riemannFieldState:
b.WriteString(sep + e.State)
b.WriteString(sep + e.GetState())
case riemannFieldService:
b.WriteString(sep + e.Service)
b.WriteString(sep + e.GetService())
case riemannFieldHost:
b.WriteString(sep + e.Host)
b.WriteString(sep + e.GetHost())
case riemannFieldDescription:
b.WriteString(sep + e.Description)
b.WriteString(sep + e.GetDescription())
case riemannFieldTag:
if eventHasTag(e, f.name) {
b.WriteString(sep + f.name)
}
case riemannFieldAttr:
if attr = eventGetAttr(e, f.name); attr != nil {
b.WriteString(sep + attr.Value)
b.WriteString(sep + attr.GetValue())
}
case riemannFieldCustom:
b.WriteString(sep + f.name)
Expand All @@ -200,18 +200,18 @@ func eventCompileFields(e *Event, hf []riemannFieldName, sep string) []byte {
func eventGetValue(e *Event, v riemannValue) (o float64) {
switch v {
case riemannValueInt:
o = float64(e.MetricSint64)
o = float64(e.GetMetricSint64())
case riemannValueFloat:
o = float64(e.MetricF)
o = float64(e.GetMetricF())
case riemannValueDouble:
o = e.MetricD
o = e.GetMetricD()
case riemannValueAny:
if e.MetricD > 0 {
o = e.MetricD
} else if e.MetricSint64 > 0 {
o = float64(e.MetricSint64)
if e.GetMetricD() > 0 {
o = e.GetMetricD()
} else if e.GetMetricSint64() > 0 {
o = float64(e.GetMetricSint64())
} else {
o = float64(e.MetricF)
o = float64(e.GetMetricF())
}
}

Expand All @@ -220,7 +220,7 @@ func eventGetValue(e *Event, v riemannValue) (o float64) {

func eventGetAttr(e *Event, name string) *Attribute {
for _, a := range e.Attributes {
if a.Key == name {
if a.GetKey() == name {
return a
}
}
Expand Down Expand Up @@ -267,10 +267,10 @@ func eventToCarbon(e *Event, cf []riemannFieldName, cv riemannValue) []byte {
b.WriteByte(' ')

var t int64
if e.TimeMicros > 0 {
t = e.TimeMicros / 1000000
if e.GetTimeMicros() > 0 {
t = e.GetTimeMicros() / 1000000
} else {
t = e.Time
t = e.GetTime()
}

b.WriteString(strconv.FormatInt(t, 10))
Expand Down
21 changes: 11 additions & 10 deletions helper_test.go
Expand Up @@ -4,23 +4,24 @@ import (
"bytes"
"testing"

pb "github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
)

var (
testEvent = &Event{
State: "foo",
Service: "bar",
Host: "baz",
Description: "fooz",
State: pb.String("foo"),
Service: pb.String("bar"),
Host: pb.String("baz"),
Description: pb.String("fooz"),
Tags: []string{"a", "b", "c"},
Attributes: []*Attribute{
{Key: "key1", Value: "val1"},
{Key: "key2", Value: "val2"},
{Key: pb.String("key1"), Value: pb.String("val1")},
{Key: pb.String("key2"), Value: pb.String("val2")},
},
Time: 1234567,
TimeMicros: 1234567000000,
MetricSint64: 9876,
Time: pb.Int64(1234567),
TimeMicros: pb.Int64(1234567000000),
MetricSint64: pb.Int64(9876),
}

testRfn = []riemannFieldName{
Expand Down Expand Up @@ -107,7 +108,7 @@ func Benchmark_eventCompileFields(b *testing.B) {
}

func Test_eventGetAttr(t *testing.T) {
assert.Equal(t, "val1", eventGetAttr(testEvent, "key1").Value)
assert.Equal(t, "val1", eventGetAttr(testEvent, "key1").GetValue())
assert.Nil(t, eventGetAttr(testEvent, "foo"))
}

Expand Down
8 changes: 4 additions & 4 deletions input.go
Expand Up @@ -222,8 +222,8 @@ func (i *input) handleTCPConnection(c net.Conn) {

func (i *input) sendReply(ok bool, reason string, c net.Conn) error {
msg := &Msg{
Ok: ok,
Error: reason,
Ok: pb.Bool(ok),
Error: pb.String(reason),
}

buf, err := pb.Marshal(msg)
Expand Down Expand Up @@ -362,8 +362,8 @@ func (i *input) readTCPMessage(c net.Conn) (err error) {

func (i *input) sendEvents(events []*Event) {
for _, ev := range events {
if ev.TimeMicros == 0 && ev.Time == 0 {
ev.TimeMicros = time.Now().UnixNano() / 1000
if ev.GetTimeMicros() == 0 && ev.GetTime() == 0 {
ev.TimeMicros = pb.Int64(time.Now().UnixNano() / 1000)
}
}

Expand Down
7 changes: 4 additions & 3 deletions input_test.go
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

pb "github.com/golang/protobuf/proto"
riemanngo "github.com/riemann/riemann-go-client"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -64,9 +65,9 @@ func Test_Input(t *testing.T) {
i.Close()

evT := &Event{
Service: "foo",
Host: "bar",
Description: "baz",
Service: pb.String("foo"),
Host: pb.String("bar"),
Description: pb.String("baz"),
}

batch, ok := <-ch
Expand Down
2 changes: 1 addition & 1 deletion outputTgt.go
Expand Up @@ -349,7 +349,7 @@ func (t *target) writeBatchRiemann(batch []*Event) (err error) {
return fmt.Errorf("Unable to unmarshal Protobuf reply: %s", err)
}

if !msg.Ok {
if !msg.GetOk() {
return fmt.Errorf("Non-OK reply from Riemann")
}

Expand Down
17 changes: 9 additions & 8 deletions outputTgt_test.go
Expand Up @@ -9,6 +9,7 @@ import (
"testing"
"time"

pb "github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -41,9 +42,9 @@ func Test_outputRiemann(t *testing.T) {
assert.Nil(t, err)

evT := &Event{
Service: "foo",
Host: "bar",
Description: "baz",
Service: pb.String("foo"),
Host: pb.String("bar"),
Description: pb.String("baz"),
}

time.Sleep(100 * time.Millisecond)
Expand Down Expand Up @@ -105,11 +106,11 @@ func Test_outputCarbon(t *testing.T) {
assert.Nil(t, err)

evT := &Event{
Service: "foo",
Host: "bar",
Description: "baz",
MetricSint64: 123,
Time: 12345,
Service: pb.String("foo"),
Host: pb.String("bar"),
Description: pb.String("baz"),
MetricSint64: pb.Int64(123),
Time: pb.Int64(12345),
}

time.Sleep(100 * time.Millisecond)
Expand Down
32 changes: 16 additions & 16 deletions riemann-relay.dev.conf
Expand Up @@ -3,26 +3,26 @@ stats_interval = "5s"

[input.input1]
listen = "127.0.0.1:1234"
listenWS = "127.0.0.1:5556"
listenWS = "127.0.0.1:15556"
timeout = "5s"
outputs = [ "carbon1" ]
outputs = [ "riemann1" ]

[output.carbon1]
type = "carbon"
algo = "hash"
algo_failover = true
hash_fields = [ "attr:prefix", "host", "service", "description", "tag:foobar" ]
carbon_fields = [ "attr:prefix", "host", "service" ]
carbon_value = "any"
buffer_size = 300000
#[output.carbon1]
#type = "carbon"
#algo = "hash"
#algo_failover = true
#hash_fields = [ "attr:prefix", "host", "service", "description", "tag:foobar" ]
#carbon_fields = [ "attr:prefix", "host", "service" ]
#carbon_value = "any"
#buffer_size = 300000
#targets = [ "127.0.0.1:1235", "127.0.0.1:1236", "127.0.0.1:1237", "127.0.0.1:1238" ]
targets = [ "127.0.0.1:1235" ]
#targets = [ "127.0.0.1:1235" ]

# batch_timeout = "1s"
batch_size = 200

# [output.riemann1]
# type = "riemann"
# algo = "hash"
# hash_fields = [ "attr:prefix", "host", "service" ]
# targets = [ "127.0.0.1:5555" ]
[output.riemann1]
type = "riemann"
algo = "hash"
hash_fields = [ "attr:prefix", "host", "service" ]
targets = [ "127.0.0.1:5555" ]

0 comments on commit 95f3f4b

Please sign in to comment.