Skip to content

Commit

Permalink
a lot of optimizations
Browse files Browse the repository at this point in the history
  • Loading branch information
Novgorodov Igor, PMK-TV-OP authored and Novgorodov Igor, PMK-TV-OP committed Sep 13, 2019
1 parent 18fa193 commit 28694a0
Show file tree
Hide file tree
Showing 15 changed files with 1,459 additions and 363 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.5.9
1.5.13
34 changes: 21 additions & 13 deletions event.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"fmt"
"strings"
"time"

pb "github.com/golang/protobuf/proto"
)

type attributeJSON struct {
Expand Down Expand Up @@ -47,14 +45,24 @@ func eventFromJSON(msg []byte) (ev *Event, err error) {
return
}

// ev = &Event{
// Host: pb.String(evJS.Host),
// Service: pb.String(evJS.Service),
// State: pb.String(evJS.State),
// Description: pb.String(evJS.Description),
// MetricD: pb.Float64(evJS.Metric),
// Tags: evJS.Tags,
// Ttl: pb.Float32(evJS.TTL),
// }

ev = &Event{
Host: pb.String(evJS.Host),
Service: pb.String(evJS.Service),
State: pb.String(evJS.State),
Description: pb.String(evJS.Description),
MetricD: pb.Float64(evJS.Metric),
Host: evJS.Host,
Service: evJS.Service,
State: evJS.State,
Description: evJS.Description,
MetricD: evJS.Metric,
Tags: evJS.Tags,
Ttl: pb.Float32(evJS.TTL),
Ttl: evJS.TTL,
}

var tm time.Time
Expand All @@ -64,7 +72,7 @@ func eventFromJSON(msg []byte) (ev *Event, err error) {
tm = time.Now()
}

ev.TimeMicros = pb.Int64(tm.UnixNano() / 1000)
ev.TimeMicros = tm.UnixNano() / 1000

// Unmarshal again to a map
m := map[string]interface{}{}
Expand All @@ -78,16 +86,16 @@ func eventFromJSON(msg []byte) (ev *Event, err error) {

if !eventJSONFields[klc] {
ev.Attributes = append(ev.Attributes, &Attribute{
Key: pb.String(klc),
Value: pb.String(fmt.Sprintf("%v", v)),
Key: klc,
Value: fmt.Sprintf("%v", v),
})
}
}

for _, attr := range evJS.Attributes {
ev.Attributes = append(ev.Attributes, &Attribute{
Key: pb.String(attr.Key),
Value: pb.String(attr.Value),
Key: attr.Key,
Value: attr.Value,
})
}

Expand Down
19 changes: 9 additions & 10 deletions event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"testing"

pb "github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
)

Expand All @@ -29,20 +28,20 @@ const (

func Test_eventFromJSON(t *testing.T) {
ev := &Event{
Host: pb.String("blah"),
Service: pb.String("foo"),
Description: pb.String("baz"),
State: pb.String("ok"),
Host: "blah",
Service: "foo",
Description: "baz",
State: "ok",
Tags: []string{"tag1", "tag2"},
Attributes: []*Attribute{
{
Key: pb.String("key1"),
Value: pb.String("val1"),
Key: "key1",
Value: "val1",
},
},
Ttl: pb.Float32(0),
TimeMicros: pb.Int64(1523367364787000),
MetricD: pb.Float64(123),
Ttl: 0,
TimeMicros: 1523367364787000,
MetricD: 123,
}

ev2, err := eventFromJSON([]byte(jsTest))
Expand Down
155 changes: 95 additions & 60 deletions helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ import (
"io"
math "math"
"net"
"strconv"
"strings"
"sync"
"unsafe"
)

type outputAlgo uint8
Expand Down Expand Up @@ -115,6 +116,18 @@ var (
outputAlgoMapRev = map[outputAlgo]string{}
riemannFieldMapRev = map[riemannField]string{}
riemannValueMapRev = map[riemannValue]string{}

bufferPool = sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(make([]byte, 0, 131072))
},
}

bufferPoolSmall = sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(make([]byte, 0, 128))
},
}
)

func init() {
Expand All @@ -135,6 +148,10 @@ func init() {
}
}

func unsafeString(b []byte) string {
return *(*string)(unsafe.Pointer(&b))
}

func parseRiemannFields(rfs []string, onlyStrings bool) (rfns []riemannFieldName, err error) {
rhMap := map[string]bool{}
for _, rh := range rfs {
Expand Down Expand Up @@ -172,76 +189,99 @@ func parseRiemannFields(rfs []string, onlyStrings bool) (rfns []riemannFieldName
return
}

func eventGetField(e *Event, f riemannFieldName, v riemannValue) (i interface{}) {
func eventWriteField(w io.Writer, e *Event, f riemannFieldName, v riemannValue) (int, error) {
switch f.f {
case riemannFieldState:
i = e.GetState()
return w.Write([]byte(e.State))
case riemannFieldService:
i = e.GetService()
return w.Write([]byte(e.Service))
case riemannFieldHost:
i = e.GetHost()
return w.Write([]byte(e.Host))
case riemannFieldDescription:
i = e.GetDescription()
return w.Write([]byte(e.Description))
case riemannFieldTag:
if eventHasTag(e, f.name) {
i = f.name
return w.Write([]byte(f.name))
}
case riemannFieldAttr:
if attr := eventGetAttr(e, f.name); attr != nil {
i = attr.GetValue()
return w.Write([]byte(attr.Value))
}
case riemannFieldCustom:
i = f.name
return w.Write([]byte(f.name))
case riemannFieldTimestamp:
i = uint32(eventGetTime(e))
var t uint32
if e.TimeMicros > 0 {
t = uint32(e.TimeMicros / 1000000)
} else {
t = uint32(e.Time)
}

return 4, binary.Write(w, binary.LittleEndian, t)
case riemannFieldValue:
i = math.Float64bits(eventGetValue(e, v))
t := math.Float64bits(eventGetValue(e, v))
return 8, binary.Write(w, binary.LittleEndian, t)
}

return
return 0, nil
}

func eventCompileFields(e *Event, hf []riemannFieldName, sep string) []byte {
var b bytes.Buffer
func eventFieldLen(e *Event, f riemannFieldName) int {
switch f.f {
case riemannFieldState:
return len(e.State)
case riemannFieldService:
return len(e.Service)
case riemannFieldHost:
return len(e.Host)
case riemannFieldDescription:
return len(e.Description)
case riemannFieldTag:
if eventHasTag(e, f.name) {
return len(f.name)
}

for _, f := range hf {
if i := eventGetField(e, f, riemannValueAny); i != nil {
b.WriteString(sep + i.(string))
return 0
case riemannFieldAttr:
if attr := eventGetAttr(e, f.name); attr != nil {
return len(attr.Value)
}
}

if b.Len() == 0 {
return []byte{}
return 0
case riemannFieldCustom:
return len(f.name)
}

// Skip first dot
return b.Bytes()[1:]
return 0
}

func eventWriteClickhouseBinary(e *Event, hf []riemannFieldName, v riemannValue, w io.Writer) (err error) {
for _, f := range hf {
i := eventGetField(e, f, v)
func eventWriteCompileFields(b *bytes.Buffer, e *Event, hf []riemannFieldName, sep byte) {
for i, f := range hf {
if i != 0 {
b.WriteByte(sep)
}

switch j := i.(type) {
case string:
eventWriteField(b, e, f, riemannValueAny)
}
}

func eventWriteClickhouseBinary(w io.Writer, e *Event, hf []riemannFieldName, v riemannValue) (err error) {
for _, f := range hf {
switch f.f {
default:
b := make([]byte, 8)
n := binary.PutUvarint(b, uint64(len(j)))
l := eventFieldLen(e, f)
n := binary.PutUvarint(b, uint64(l))

if _, err = w.Write(b[:n]); err != nil {
return
}

if _, err = w.Write([]byte(j)); err != nil {
return
}

case uint32, uint64:
if err = binary.Write(w, binary.LittleEndian, j); err != nil {
return
}
case riemannFieldValue, riemannFieldTimestamp:
}

default:
return fmt.Errorf("Unknown field value type: %T", i)
if _, err = eventWriteField(w, e, f, v); err != nil {
return
}
}

Expand All @@ -251,18 +291,18 @@ func eventWriteClickhouseBinary(e *Event, hf []riemannFieldName, v riemannValue,
func eventGetValue(e *Event, v riemannValue) (o float64) {
switch v {
case riemannValueInt:
o = float64(e.GetMetricSint64())
o = float64(e.MetricSint64)
case riemannValueFloat:
o = float64(e.GetMetricF())
o = float64(e.MetricF)
case riemannValueDouble:
o = e.GetMetricD()
o = e.MetricD
case riemannValueAny:
if e.GetMetricD() > 0 {
o = e.GetMetricD()
} else if e.GetMetricSint64() > 0 {
o = float64(e.GetMetricSint64())
if e.MetricD > 0 {
o = e.MetricD
} else if e.MetricSint64 > 0 {
o = float64(e.MetricSint64)
} else {
o = float64(e.GetMetricF())
o = float64(e.MetricF)
}
}

Expand All @@ -271,7 +311,7 @@ func eventGetValue(e *Event, v riemannValue) (o float64) {

func eventGetAttr(e *Event, name string) *Attribute {
for _, a := range e.Attributes {
if a.GetKey() == name {
if a.Key == name {
return a
}
}
Expand Down Expand Up @@ -308,25 +348,20 @@ func readPacket(r io.Reader, p []byte) error {
}

func eventGetTime(e *Event) int64 {
if e.GetTimeMicros() > 0 {
return e.GetTimeMicros() / 1000000
if e.TimeMicros > 0 {
return e.TimeMicros / 1000000
}

return e.GetTime()
return e.Time
}

func eventToCarbon(e *Event, cf []riemannFieldName, cv riemannValue) []byte {
var b bytes.Buffer
// func eventToCarbon(w io.Writer, e *Event, cf []riemannFieldName, cv riemannValue) []byte {

b.Write(eventCompileFields(e, cf, "."))
b.WriteByte(' ')
// b.Reset()
// bufferPool.Put(b)

val := strconv.FormatFloat(eventGetValue(e, cv), 'f', -1, 64)
b.WriteString(val)
b.WriteByte(' ')
b.WriteString(strconv.FormatInt(eventGetTime(e), 10))
return b.Bytes()
}
// return b.Bytes()
// }

func guessProto(addr string) (proto string) {
if _, err := net.ResolveTCPAddr("tcp", addr); err == nil {
Expand Down
Loading

0 comments on commit 28694a0

Please sign in to comment.