From 142184a2ded597ff491fdca041f5cddb737fbb79 Mon Sep 17 00:00:00 2001 From: Igor Novgorodov Date: Mon, 30 Sep 2019 00:08:10 +0200 Subject: [PATCH] use fasthttp, begin flatbuffer --- flatm.fbs | 23 +++++++ flatm/Attr.go | 56 +++++++++++++++++ flatm/Batch.go | 60 ++++++++++++++++++ flatm/Metric.go | 157 +++++++++++++++++++++++++++++++++++++++++++++++ outputTgt.go | 10 ++- outputTgtConn.go | 75 ++++++++++++++++------ 6 files changed, 360 insertions(+), 21 deletions(-) create mode 100644 flatm.fbs create mode 100644 flatm/Attr.go create mode 100644 flatm/Batch.go create mode 100644 flatm/Metric.go diff --git a/flatm.fbs b/flatm.fbs new file mode 100644 index 0000000..b16f882 --- /dev/null +++ b/flatm.fbs @@ -0,0 +1,23 @@ +namespace flatm; + +table Attr { + name: string; + value: string; +} + +table Metric { + host: string; + service: string; + state: string; + description: string; + time: int64; + value: float64; + tags: [string]; + attrs: [Attr]; +} + +table Batch { + metrics: [Metric]; +} + +root_type Batch; diff --git a/flatm/Attr.go b/flatm/Attr.go new file mode 100644 index 0000000..f68a048 --- /dev/null +++ b/flatm/Attr.go @@ -0,0 +1,56 @@ +// Code generated by the FlatBuffers compiler. DO NOT EDIT. + +package flatm + +import ( + flatbuffers "github.com/google/flatbuffers/go" +) + +type Attr struct { + _tab flatbuffers.Table +} + +func GetRootAsAttr(buf []byte, offset flatbuffers.UOffsetT) *Attr { + n := flatbuffers.GetUOffsetT(buf[offset:]) + x := &Attr{} + x.Init(buf, n+offset) + return x +} + +func (rcv *Attr) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *Attr) Table() flatbuffers.Table { + return rcv._tab +} + +func (rcv *Attr) Name() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *Attr) Value() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func AttrStart(builder *flatbuffers.Builder) { + builder.StartObject(2) +} +func AttrAddName(builder *flatbuffers.Builder, name flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(name), 0) +} +func AttrAddValue(builder *flatbuffers.Builder, value flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(value), 0) +} +func AttrEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + return builder.EndObject() +} diff --git a/flatm/Batch.go b/flatm/Batch.go new file mode 100644 index 0000000..df2a072 --- /dev/null +++ b/flatm/Batch.go @@ -0,0 +1,60 @@ +// Code generated by the FlatBuffers compiler. DO NOT EDIT. + +package flatm + +import ( + flatbuffers "github.com/google/flatbuffers/go" +) + +type Batch struct { + _tab flatbuffers.Table +} + +func GetRootAsBatch(buf []byte, offset flatbuffers.UOffsetT) *Batch { + n := flatbuffers.GetUOffsetT(buf[offset:]) + x := &Batch{} + x.Init(buf, n+offset) + return x +} + +func (rcv *Batch) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *Batch) Table() flatbuffers.Table { + return rcv._tab +} + +func (rcv *Batch) Metrics(obj *Metric, j int) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + x := rcv._tab.Vector(o) + x += flatbuffers.UOffsetT(j) * 4 + x = rcv._tab.Indirect(x) + obj.Init(rcv._tab.Bytes, x) + return true + } + return false +} + +func (rcv *Batch) MetricsLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func BatchStart(builder *flatbuffers.Builder) { + builder.StartObject(1) +} +func BatchAddMetrics(builder *flatbuffers.Builder, metrics flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(metrics), 0) +} +func BatchStartMetricsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(4, numElems, 4) +} +func BatchEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + return builder.EndObject() +} diff --git a/flatm/Metric.go b/flatm/Metric.go new file mode 100644 index 0000000..6fdf695 --- /dev/null +++ b/flatm/Metric.go @@ -0,0 +1,157 @@ +// Code generated by the FlatBuffers compiler. DO NOT EDIT. + +package flatm + +import ( + flatbuffers "github.com/google/flatbuffers/go" +) + +type Metric struct { + _tab flatbuffers.Table +} + +func GetRootAsMetric(buf []byte, offset flatbuffers.UOffsetT) *Metric { + n := flatbuffers.GetUOffsetT(buf[offset:]) + x := &Metric{} + x.Init(buf, n+offset) + return x +} + +func (rcv *Metric) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *Metric) Table() flatbuffers.Table { + return rcv._tab +} + +func (rcv *Metric) Host() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *Metric) Service() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *Metric) State() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *Metric) Description() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *Metric) Time() int64 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(12)) + if o != 0 { + return rcv._tab.GetInt64(o + rcv._tab.Pos) + } + return 0 +} + +func (rcv *Metric) MutateTime(n int64) bool { + return rcv._tab.MutateInt64Slot(12, n) +} + +func (rcv *Metric) Value() float64 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(14)) + if o != 0 { + return rcv._tab.GetFloat64(o + rcv._tab.Pos) + } + return 0.0 +} + +func (rcv *Metric) MutateValue(n float64) bool { + return rcv._tab.MutateFloat64Slot(14, n) +} + +func (rcv *Metric) Tags(j int) []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(16)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.ByteVector(a + flatbuffers.UOffsetT(j*4)) + } + return nil +} + +func (rcv *Metric) TagsLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(16)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func (rcv *Metric) Attrs(obj *Attr, j int) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(18)) + if o != 0 { + x := rcv._tab.Vector(o) + x += flatbuffers.UOffsetT(j) * 4 + x = rcv._tab.Indirect(x) + obj.Init(rcv._tab.Bytes, x) + return true + } + return false +} + +func (rcv *Metric) AttrsLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(18)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func MetricStart(builder *flatbuffers.Builder) { + builder.StartObject(8) +} +func MetricAddHost(builder *flatbuffers.Builder, host flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(host), 0) +} +func MetricAddService(builder *flatbuffers.Builder, service flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(service), 0) +} +func MetricAddState(builder *flatbuffers.Builder, state flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(state), 0) +} +func MetricAddDescription(builder *flatbuffers.Builder, description flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(3, flatbuffers.UOffsetT(description), 0) +} +func MetricAddTime(builder *flatbuffers.Builder, time int64) { + builder.PrependInt64Slot(4, time, 0) +} +func MetricAddValue(builder *flatbuffers.Builder, value float64) { + builder.PrependFloat64Slot(5, value, 0.0) +} +func MetricAddTags(builder *flatbuffers.Builder, tags flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(6, flatbuffers.UOffsetT(tags), 0) +} +func MetricStartTagsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(4, numElems, 4) +} +func MetricAddAttrs(builder *flatbuffers.Builder, attrs flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(7, flatbuffers.UOffsetT(attrs), 0) +} +func MetricStartAttrsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(4, numElems, 4) +} +func MetricEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + return builder.EndObject() +} diff --git a/outputTgt.go b/outputTgt.go index 9909645..950e217 100644 --- a/outputTgt.go +++ b/outputTgt.go @@ -4,7 +4,7 @@ import ( "context" fmt "fmt" "net" - "net/http" + fh "github.com/valyala/fasthttp" "net/url" "sync" "sync/atomic" @@ -89,8 +89,12 @@ func newOutputTgt(h string, cf *outputCfg, o *output) (*target, error) { c.alive = true c.writeBatch = c.writeBatchClickhouse - c.httpCli = &http.Client{ - Timeout: c.timeoutWrite, + // c.httpCli = &http.Client{ + // Timeout: c.timeoutWrite, + // } + + c.httpCli = &fh.Client{ + WriteTimeout: c.timeoutWrite, } u, err := url.Parse(h) diff --git a/outputTgtConn.go b/outputTgtConn.go index abb60f8..7554e9b 100644 --- a/outputTgtConn.go +++ b/outputTgtConn.go @@ -6,15 +6,19 @@ import ( "encoding/binary" "fmt" "io" - "io/ioutil" "net" - "net/http" "strconv" "sync" "sync/atomic" "time" pb "github.com/golang/protobuf/proto" + fh "github.com/valyala/fasthttp" +) + +const ( + maxBatchSize = 1000 + maxAttrs = 20 ) type tConn struct { @@ -23,7 +27,8 @@ type tConn struct { id int conn net.Conn - httpCli *http.Client + //httpCli *http.Client + httpCli *fh.Client alive bool @@ -399,10 +404,41 @@ func (c *tConn) writeBatchRiemann(batch []*Event) (err error) { return } -func (c *tConn) writeBatchClickhouse(batch []*Event) (err error) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() +// func (c *tConn) writeBatchClickhouse(batch []*Event) (err error) { +// ctx, cancel := context.WithCancel(context.Background()) +// defer cancel() + +// rr, wr := io.Pipe() +// go func() { +// defer wr.Close() +// for _, e := range batch { +// if err := eventWriteClickhouseBinary(wr, e, c.t.o.riemannFields, c.t.o.riemannValue); err != nil { +// return +// } +// } +// }() + +// req, err := http.NewRequestWithContext(ctx, "POST", c.url, rr) +// if err != nil { +// return +// } + +// resp, err := c.httpCli.Do(req) +// if err != nil { +// return fmt.Errorf("HTTP request failed: %s", err) +// } +// defer resp.Body.Close() +// body, _ := ioutil.ReadAll(resp.Body) + +// if resp.StatusCode != http.StatusOK { +// return fmt.Errorf("HTTP code is not 200: %d (%s)", resp.StatusCode, string(body)) +// } + +// return +// } + +func (c *tConn) writeBatchClickhouse(batch []*Event) (err error) { rr, wr := io.Pipe() go func() { defer wr.Close() @@ -413,21 +449,24 @@ func (c *tConn) writeBatchClickhouse(batch []*Event) (err error) { } }() - req, err := http.NewRequestWithContext(ctx, "POST", c.url, rr) - if err != nil { - return - } + req := fh.AcquireRequest() + resp := fh.AcquireResponse() + + defer func() { + fh.ReleaseRequest(req) + fh.ReleaseResponse(resp) + }() - resp, err := c.httpCli.Do(req) - if err != nil { - return fmt.Errorf("HTTP request failed: %s", err) - } - defer resp.Body.Close() + req.SetBodyStream(rr, -1) + req.Header.SetMethod("POST") + req.SetRequestURI(c.url) - body, _ := ioutil.ReadAll(resp.Body) + if err = c.httpCli.Do(req, resp); err != nil { + return fmt.Errorf("HTTP request failed: %s", err) + } - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("HTTP code is not 200: %d (%s)", resp.StatusCode, string(body)) + if resp.Header.StatusCode() != 200 { + return fmt.Errorf("HTTP code is not 200: %d (%s)", resp.Header.StatusCode(), string(resp.Body())) } return