Skip to content

Commit

Permalink
Further optimise multiple writers
Browse files Browse the repository at this point in the history
Writers can now atomically update the breakdown metrics
if there is already an entry for the transaction or span.
  • Loading branch information
axw committed Jun 19, 2019
1 parent 419c38b commit 5e1d0cc
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 66 deletions.
179 changes: 114 additions & 65 deletions breakdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,18 +90,10 @@ func (m spanTimingsMap) reset() {
}
}

// breakdownMetricsKey identifies a transaction group, and optionally a
// spanTimingsKey, for recording transaction and span breakdown metrics.
type breakdownMetricsKey struct {
transactionType string
transactionName string
spanTimingsKey
}

func newBreakdownMetrics() *breakdownMetrics {
return &breakdownMetrics{
active: make(map[breakdownMetricsKey]*breakdownTiming),
inactive: make(map[breakdownMetricsKey]*breakdownTiming),
active: make(breakdownMetricsMap),
inactive: make(breakdownMetricsMap),
}
}

Expand All @@ -117,9 +109,40 @@ type breakdownMetrics struct {

// mu serializes access to active/inactive, but not concurrent updates to active.
mu sync.RWMutex
// writeMu serializes updates to active.
writeMu sync.RWMutex
active, inactive map[breakdownMetricsKey]*breakdownTiming
// writeMu serializes updates to activeEntries and the active map.
writeMu sync.RWMutex
activeEntries int
active, inactive breakdownMetricsMap
activeSpace, inactiveSpace [breakdownMetricsLimit]breakdownMetricsMapEntry
}

// maps a key hash to a slice of entries.
type breakdownMetricsMap map[uint64][]*breakdownMetricsMapEntry

type breakdownMetricsMapEntry struct {
breakdownMetricsKey
breakdownTiming
}

// breakdownMetricsKey identifies a transaction group, and optionally a
// spanTimingsKey, for recording transaction and span breakdown metrics.
type breakdownMetricsKey struct {
transactionType string
transactionName string
spanTimingsKey
}

func (k breakdownMetricsKey) hash() uint64 {
h := newFnv1a()
h.add(k.transactionType)
h.add(k.transactionName)
if k.spanType != "" {
h.add(k.spanType)
}
if k.spanSubtype != "" {
h.add(k.spanSubtype)
}
return uint64(h)
}

// breakdownTiming holds breakdown metrics.
Expand All @@ -136,6 +159,14 @@ type breakdownTiming struct {
span spanTiming
}

func (lhs *breakdownTiming) accumulate(rhs breakdownTiming) {
atomic.AddUintptr(&lhs.transaction.count, rhs.transaction.count)
atomic.AddInt64(&lhs.transaction.duration, rhs.transaction.duration)
atomic.AddUintptr(&lhs.span.count, rhs.span.count)
atomic.AddInt64(&lhs.span.duration, rhs.span.duration)
atomic.AddUintptr(&lhs.breakdownCount, rhs.breakdownCount)
}

// recordTransaction records breakdown metrics for td into m.
//
// recordTransaction returns true if breakdown metrics were
Expand Down Expand Up @@ -181,84 +212,102 @@ func (m *breakdownMetrics) recordTransaction(td *TransactionData) bool {
return ok
}

// record records a single breakdown metric, identified by k.
func (m *breakdownMetrics) record(k breakdownMetricsKey, bt breakdownTiming) bool {
hash := k.hash()
m.writeMu.RLock()
timing, ok := m.active[k]
if !ok && len(m.active) >= breakdownMetricsLimit {
m.writeMu.RUnlock()
return false
}
entries, ok := m.active[hash]
m.writeMu.RUnlock()
var offset int
if ok {
for offset = range entries {
if entries[offset].breakdownMetricsKey == k {
// The append may reallocate the entries, but the
// entries are pointers into m.activeSpace. Therefore,
// entries' timings can safely be atomically incremented
// without holding the read lock.
entries[offset].breakdownTiming.accumulate(bt)
return true
}
}
offset++ // where to start searching with the write lock below
}

if !ok {
m.writeMu.Lock()
timing, ok = m.active[k]
if !ok {
if len(m.active) >= breakdownMetricsLimit {
m.writeMu.Lock()
entries, ok = m.active[hash]
if ok {
for i := range entries[offset:] {
if entries[offset+i].breakdownMetricsKey == k {
m.writeMu.Unlock()
return false
entries[offset+i].breakdownTiming.accumulate(bt)
return true
}
m.active[k] = &bt
m.writeMu.Unlock()
return true
}
} else if m.activeEntries >= breakdownMetricsLimit {
m.writeMu.Unlock()
return false
}

atomic.AddUintptr(&timing.transaction.count, bt.transaction.count)
atomic.AddInt64(&timing.transaction.duration, bt.transaction.duration)
atomic.AddUintptr(&timing.span.count, bt.span.count)
atomic.AddInt64(&timing.span.duration, bt.span.duration)
atomic.AddUintptr(&timing.breakdownCount, bt.breakdownCount)
entry := &m.activeSpace[m.activeEntries]
*entry = breakdownMetricsMapEntry{k, bt}
m.active[hash] = append(entries, entry)
m.activeEntries++
m.writeMu.Unlock()
return true
}

func (m *breakdownMetrics) swap() {
// m.inactive is empty, having been cleared by m.gather.
m.mu.Lock()
m.active, m.inactive = m.inactive, m.active
m.activeSpace, m.inactiveSpace = m.inactiveSpace, m.activeSpace
m.activeEntries = 0
m.mu.Unlock()
}

// gather is called by builtinMetricsGatherer to gather breakdown metrics.
func (m *breakdownMetrics) gather(out *Metrics) {
// It is not necessary to hold m.mu, since nothing concurrently
// accesses m.inactive while the gatherer is iterating over it.
for k, d := range m.inactive {
if d.transaction.count > 0 {
out.transactionGroupMetrics = append(out.transactionGroupMetrics, &model.Metrics{
TransactionType: k.transactionType,
TransactionName: k.transactionName,
Samples: map[string]model.Metric{
transactionDurationCountMetricName: {
Value: float64(d.transaction.count),
},
transactionDurationSumMetricName: {
Value: time.Duration(d.transaction.duration).Seconds(),
},
transactionBreakdownCountMetricName: {
Value: float64(d.breakdownCount),
//
// TODO(axw) report durations in milliseconds.
for hash, entries := range m.inactive {
for _, entry := range entries {
if entry.transaction.count > 0 {
out.transactionGroupMetrics = append(out.transactionGroupMetrics, &model.Metrics{
TransactionType: entry.transactionType,
TransactionName: entry.transactionName,
Samples: map[string]model.Metric{
transactionDurationCountMetricName: {
Value: float64(entry.transaction.count),
},
transactionDurationSumMetricName: {
Value: time.Duration(entry.transaction.duration).Seconds(),
},
transactionBreakdownCountMetricName: {
Value: float64(entry.breakdownCount),
},
},
},
})
}
if d.span.count > 0 {
out.transactionGroupMetrics = append(out.transactionGroupMetrics, &model.Metrics{
TransactionType: k.transactionType,
TransactionName: k.transactionName,
SpanType: k.spanType,
SpanSubtype: k.spanSubtype,
Samples: map[string]model.Metric{
spanSelfTimeCountMetricName: {
Value: float64(d.span.count),
},
spanSelfTimeSumMetricName: {
Value: time.Duration(d.span.duration).Seconds(),
})
}
if entry.span.count > 0 {
out.transactionGroupMetrics = append(out.transactionGroupMetrics, &model.Metrics{
TransactionType: entry.transactionType,
TransactionName: entry.transactionName,
SpanType: entry.spanType,
SpanSubtype: entry.spanSubtype,
Samples: map[string]model.Metric{
spanSelfTimeCountMetricName: {
Value: float64(entry.span.count),
},
spanSelfTimeSumMetricName: {
Value: time.Duration(entry.span.duration).Seconds(),
},
},
},
})
})
}
entry.breakdownMetricsKey = breakdownMetricsKey{} // release strings
}
delete(m.inactive, k)
delete(m.inactive, hash)
}
}

Expand Down
42 changes: 42 additions & 0 deletions fnv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// Based on Go's pkg/hash/fnv.
//
// Copyright 2011 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package apm

const (
offset64 = 14695981039346656037
prime64 = 1099511628211
)

type fnv1a uint64

func newFnv1a() fnv1a {
return offset64
}

func (f *fnv1a) add(s string) {
for i := 0; i < len(s); i++ {
*f ^= fnv1a(s[i])
*f *= prime64
}
}
2 changes: 1 addition & 1 deletion transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func BenchmarkTransaction(b *testing.B) {
defer tracer.Close()

names := []string{}
for i := 0; i < 100; i++ {
for i := 0; i < 1000; i++ {
names = append(names, fmt.Sprintf("/some/route/%d", i))
}

Expand Down

0 comments on commit 5e1d0cc

Please sign in to comment.