diff --git a/cmd/mt-kafka-mdm-sniff-out-of-order/main.go b/cmd/mt-kafka-mdm-sniff-out-of-order/main.go index 8a4fa36071..c5da588fd2 100644 --- a/cmd/mt-kafka-mdm-sniff-out-of-order/main.go +++ b/cmd/mt-kafka-mdm-sniff-out-of-order/main.go @@ -18,6 +18,7 @@ import ( "github.com/raintank/worldping-api/pkg/log" "github.com/rakyll/globalconf" "gopkg.in/raintank/schema.v1" + "gopkg.in/raintank/schema.v1/msg" ) var ( @@ -111,7 +112,7 @@ func (ip *inputOOOFinder) ProcessMetricData(metric *schema.MetricData, partition ip.lock.Unlock() } -func (ip *inputOOOFinder) ProcessMetricPoint(mp schema.MetricPoint, partition int32) { +func (ip *inputOOOFinder) ProcessMetricPoint(mp schema.MetricPoint, format msg.Format, partition int32) { now := Msg{ Part: partition, Seen: time.Now(), diff --git a/cmd/mt-kafka-mdm-sniff/main.go b/cmd/mt-kafka-mdm-sniff/main.go index dea4134ca1..a74046259c 100644 --- a/cmd/mt-kafka-mdm-sniff/main.go +++ b/cmd/mt-kafka-mdm-sniff/main.go @@ -17,6 +17,7 @@ import ( "github.com/raintank/worldping-api/pkg/log" "github.com/rakyll/globalconf" "gopkg.in/raintank/schema.v1" + "gopkg.in/raintank/schema.v1/msg" ) var ( @@ -72,7 +73,7 @@ func (ip inputPrinter) ProcessMetricData(metric *schema.MetricData, partition in } } -func (ip inputPrinter) ProcessMetricPoint(point schema.MetricPoint, partition int32) { +func (ip inputPrinter) ProcessMetricPoint(point schema.MetricPoint, format msg.Format, partition int32) { stdoutLock.Lock() err := ip.tplP.Execute(os.Stdout, DataP{ partition, diff --git a/dashboard.json b/dashboard.json index 719b7928b8..5333469f82 100644 --- a/dashboard.json +++ b/dashboard.json @@ -98,16 +98,25 @@ "yaxis": 2 }, { - "alias": "/received/", + "alias": "/metricdata.received/", "lines": true, "points": false, - "color": "#3f6833" + "color": "#2a4422", + "stack": "A" }, { "alias": "/metricpoint.received/", "lines": true, "points": false, - "color": "#7eb26d" + "color": "#3f6833", + "stack": "A" + }, + { + "alias": "/metricpoint_no_org.received/", + "lines": true, + "points": false, + "color": "#7eb26d", + "stack": "A" }, { "alias": "/invalid/", @@ -180,7 +189,7 @@ "msResolution": false, "shared": true, "sort": 0, - "value_type": "cumulative" + "value_type": "individual" }, "type": "graph", "xaxis": { diff --git a/input/input.go b/input/input.go index e544845858..61c0b371ce 100644 --- a/input/input.go +++ b/input/input.go @@ -6,7 +6,8 @@ import ( "fmt" "time" - schema "gopkg.in/raintank/schema.v1" + "gopkg.in/raintank/schema.v1" + "gopkg.in/raintank/schema.v1/msg" "github.com/grafana/metrictank/idx" "github.com/grafana/metrictank/mdata" @@ -16,7 +17,7 @@ import ( type Handler interface { ProcessMetricData(md *schema.MetricData, partition int32) - ProcessMetricPoint(point schema.MetricPoint, partition int32) + ProcessMetricPoint(point schema.MetricPoint, format msg.Format, partition int32) } // TODO: clever way to document all metrics for all different inputs @@ -25,6 +26,7 @@ type Handler interface { type DefaultHandler struct { receivedMD *stats.Counter32 receivedMP *stats.Counter32 + receivedMPNO *stats.Counter32 invalidMD *stats.Counter32 invalidMP *stats.Counter32 unknownMP *stats.Counter32 @@ -39,6 +41,7 @@ func NewDefaultHandler(metrics mdata.Metrics, metricIndex idx.MetricIndex, input return DefaultHandler{ receivedMD: stats.NewCounter32(fmt.Sprintf("input.%s.metricdata.received", input)), receivedMP: stats.NewCounter32(fmt.Sprintf("input.%s.metricpoint.received", input)), + receivedMPNO: stats.NewCounter32(fmt.Sprintf("input.%s.metricpoint_no_org.received", input)), invalidMD: stats.NewCounter32(fmt.Sprintf("input.%s.metricdata.invalid", input)), invalidMP: stats.NewCounter32(fmt.Sprintf("input.%s.metricpoint.invalid", input)), unknownMP: stats.NewCounter32(fmt.Sprintf("input.%s.metricpoint.unknown", input)), @@ -52,8 +55,12 @@ func NewDefaultHandler(metrics mdata.Metrics, metricIndex idx.MetricIndex, input // ProcessMetricPoint updates the index if possible, and stores the data if we have an index entry // concurrency-safe. -func (in DefaultHandler) ProcessMetricPoint(point schema.MetricPoint, partition int32) { - in.receivedMP.Inc() +func (in DefaultHandler) ProcessMetricPoint(point schema.MetricPoint, format msg.Format, partition int32) { + if format == msg.FormatMetricPoint { + in.receivedMP.Inc() + } else { + in.receivedMPNO.Inc() + } if !point.Valid() { in.invalidMP.Inc() log.Debug("in: Invalid metric %v", point) diff --git a/input/kafkamdm/kafkamdm.go b/input/kafkamdm/kafkamdm.go index 35b821744e..dd69cb3993 100644 --- a/input/kafkamdm/kafkamdm.go +++ b/input/kafkamdm/kafkamdm.go @@ -353,14 +353,15 @@ func (k *KafkaMdm) consumePartition(topic string, partition int32, currentOffset } func (k *KafkaMdm) handleMsg(data []byte, partition int32) { - if msg.IsPointMsg(data) { + format, isPointMsg := msg.IsPointMsg(data) + if isPointMsg { _, point, err := msg.ReadPointMsg(data, uint32(orgId)) if err != nil { metricsDecodeErr.Inc() log.Error(3, "kafka-mdm decode error, skipping message. %s", err) return } - k.Handler.ProcessMetricPoint(point, partition) + k.Handler.ProcessMetricPoint(point, format, partition) return } diff --git a/vendor/gopkg.in/raintank/schema.v1/msg/msg.go b/vendor/gopkg.in/raintank/schema.v1/msg/msg.go index 32efbffb66..e2d57acb5c 100644 --- a/vendor/gopkg.in/raintank/schema.v1/msg/msg.go +++ b/vendor/gopkg.in/raintank/schema.v1/msg/msg.go @@ -111,13 +111,19 @@ func WritePointMsg(point schema.MetricPoint, buf []byte, version Format) (o []by return nil, fmt.Errorf(errFmtUnsupportedFormat, version) } -func IsPointMsg(data []byte) bool { +func IsPointMsg(data []byte) (Format, bool) { l := len(data) if l == 0 { - return false + return 0, false } version := Format(data[0]) - return (l == 29 && version == FormatMetricPointWithoutOrg) || (l == 33 && version == FormatMetricPoint) + if l == 29 && version == FormatMetricPointWithoutOrg { + return FormatMetricPointWithoutOrg, true + } + if l == 33 && version == FormatMetricPoint { + return FormatMetricPoint, true + } + return 0, false } func ReadPointMsg(data []byte, defaultOrg uint32) ([]byte, schema.MetricPoint, error) {