Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve locks #116

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
1 change: 0 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
Expand Down
29 changes: 20 additions & 9 deletions rules/prometheus.go → metrics/prometheus.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package rules
package metrics

import (
"strconv"
Expand Down Expand Up @@ -72,34 +72,45 @@ func init() {
prometheus.MustRegister(rulesEngineWatcherErrors)
}

func incLockMetric(methodName string, pattern string, lockSucceeded bool) {
// IncLockMetric increments the lock count.
func IncLockMetric(methodName string, pattern string, lockSucceeded bool) {
rulesEngineLockCount.WithLabelValues(methodName, pattern, strconv.FormatBool(lockSucceeded)).Inc()
}

func incSatisfiedThenNot(methodName string, pattern string, phaseName string) {
// IncSatisfiedThenNot increments the count of a rule having initially been
// satisfied and then not satisfied, either after the initial evaluation
// or after the lock was obtained.
func IncSatisfiedThenNot(methodName string, pattern string, phaseName string) {
rulesEngineSatisfiedThenNot.WithLabelValues(methodName, pattern, phaseName).Inc()
}

func timesEvaluated(methodName string, ruleID string, count int) {
// TimesEvaluated sets the number of times a rule has been evaluated.
func TimesEvaluated(methodName string, ruleID string, count int) {
rulesEngineEvaluations.WithLabelValues(methodName, ruleID).Set(float64(count))
}

func workerQueueWaitTime(methodName string, startTime time.Time) {
// WorkerQueueWaitTime tracks the amount of time a work item has been sitting in
// a worker queue.
func WorkerQueueWaitTime(methodName string, startTime time.Time) {
rulesEngineWorkerQueueWait.WithLabelValues(methodName).Observe(float64(time.Since(startTime).Nanoseconds() / 1e6))
}

func workBufferWaitTime(methodName, pattern string, startTime time.Time) {
// WorkBufferWaitTime tracks the amount of time a work item was in the work buffer.
func WorkBufferWaitTime(methodName, pattern string, startTime time.Time) {
rulesEngineWorkBufferWaitTime.WithLabelValues(methodName, pattern).Observe(float64(time.Since(startTime).Nanoseconds() / 1e6))
}

func callbackWaitTime(pattern string, startTime time.Time) {
// CallbackWaitTime tracks how much time elapsed between when the rule was evaluated and the callback called.
func CallbackWaitTime(pattern string, startTime time.Time) {
rulesEngineCallbackWaitTime.WithLabelValues(pattern).Observe(float64(time.Since(startTime).Nanoseconds() / 1e6))
}

func keyProcessBufferCap(count int) {
// KeyProcessBufferCap tracks the capacity of the key processor buffer.
func KeyProcessBufferCap(count int) {
rulesEngineKeyProcessBufferCap.Set(float64(count))
}

func incWatcherErrMetric(err, prefix string) {
// IncWatcherErrMetric increments the watcher error count.
func IncWatcherErrMetric(err, prefix string) {
rulesEngineWatcherErrors.WithLabelValues(err, prefix).Inc()
}
20 changes: 10 additions & 10 deletions rules/prometheus_test.go → metrics/prometheus_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package rules
package metrics

import (
"net/http"
Expand Down Expand Up @@ -41,44 +41,44 @@ func checkMetrics(t *testing.T, expectedOutput string) {
}

func TestIncLockMetric(t *testing.T) {
incLockMetric("getKey", "/key/pattern", true)
incLockMetric("getKey", "/second/pattern", false)
IncLockMetric("getKey", "/key/pattern", true)
IncLockMetric("getKey", "/second/pattern", false)

checkMetrics(t, `rules_etcd_lock_count{method="getKey",pattern="/key/pattern",success="true"} 1`)
checkMetrics(t, `rules_etcd_lock_count{method="getKey",pattern="/second/pattern",success="false"} 1`)
}

func TestIncSatisfiedThenNot(t *testing.T) {
incSatisfiedThenNot("getKey", "/key/pattern", "phaseName")
IncSatisfiedThenNot("getKey", "/key/pattern", "phaseName")
checkMetrics(t, `rules_etcd_rule_satisfied_then_not{method="getKey",pattern="/key/pattern",phase="phaseName"} 1`)
}

func TestTimesEvaluated(t *testing.T) {
timesEvaluated("getKey", "rule1234", 5)
TimesEvaluated("getKey", "rule1234", 5)
checkMetrics(t, `rules_etcd_evaluations{method="getKey",rule="rule1234"} 5`)
}

func TestWokerQueueWaitTime(t *testing.T) {
workerQueueWaitTime("getKey", time.Now())
WorkerQueueWaitTime("getKey", time.Now())
checkMetrics(t, `rules_etcd_worker_queue_wait_ms_count{method="getKey"} 1`)
}

func TestWorkBufferWaitTime(t *testing.T) {
workBufferWaitTime("getKey", "/desired/key/pattern", time.Now())
WorkBufferWaitTime("getKey", "/desired/key/pattern", time.Now())
checkMetrics(t, `rules_etcd_work_buffer_wait_ms_count{method="getKey",pattern="/desired/key/pattern"} 1`)
}

func TestCallbackWaitTime(t *testing.T) {
callbackWaitTime("/desired/key/pattern", time.Now())
CallbackWaitTime("/desired/key/pattern", time.Now())
checkMetrics(t, `rules_etcd_callback_wait_ms_count{pattern="/desired/key/pattern"} 1`)
}

func Test_keyProcessBufferCap(t *testing.T) {
keyProcessBufferCap(100)
KeyProcessBufferCap(100)
checkMetrics(t, `rules_etcd_key_process_buffer_cap 100`)
}

func Test_incWatcherErrMetric(t *testing.T) {
incWatcherErrMetric("err", "/desired/key/prefix")
IncWatcherErrMetric("err", "/desired/key/prefix")
checkMetrics(t, `rules_etcd_watcher_errors{error="err",prefix="/desired/key/prefix"} 1`)
}
91 changes: 91 additions & 0 deletions prunelocks/pruner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package prunelocks

import (
"context"
"strings"
"time"

"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
)

type lockKey struct {
createRevision int64
firstSeen time.Time
}

type Pruner struct {
keys map[string]lockKey
timeout time.Duration
lockPrefixes []string
kv clientv3.KV
lease clientv3.Lease
logger *zap.Logger
}

func (p Pruner) checkLocks() {
ctx := context.Background()
for _, lockPrefix := range p.lockPrefixes {
p.checkLockPrefix(ctx, lockPrefix, p.logger)
}
}

func (p Pruner) checkLockPrefix(ctx context.Context, lockPrefix string, prefixLogger *zap.Logger) {
keysRetrieved := make(map[string]bool)
resp, _ := p.kv.Get(ctx, lockPrefix, clientv3.WithPrefix())
for _, kv := range resp.Kvs {
// There are three possibilities:
// 1. This lock was not seen before
// 2. This lock was seen but has a different create revision
// 3. This lock was seen and has the same create revision
keyString := string(kv.Key)
keysRetrieved[keyString] = true
keyLogger := prefixLogger.With(zap.String("key", keyString), zap.Int64("create_revision", kv.CreateRevision), zap.Int64("lease", kv.Lease))
keyLogger.Info("Found lock")
var key lockKey
var found bool
// Key not seen before or seen before with different create revision
key, found = p.keys[keyString]
keyLogger = keyLogger.With(zap.Bool("found", found))
if found {
keyLogger = keyLogger.With(zap.String("first_seen", key.firstSeen.Format(time.RFC3339)), zap.Int64("existing_create_revision", key.createRevision))
}
if !found || kv.CreateRevision != key.createRevision {
keyLogger.Info("creating new key entry")
key = lockKey{
createRevision: kv.CreateRevision,
firstSeen: time.Now(),
}
p.keys[keyString] = key
}
// Key seen before with same create revision
now := time.Now()

if now.Sub(key.firstSeen) < p.timeout {
keyLogger.Info("Lock not expired")
} else {
keyLogger.Info("Lock expired; deleting key")
resp, err := p.kv.Txn(ctx).If(clientv3.Compare(clientv3.CreateRevision(keyString), "=", key.createRevision)).Then(clientv3.OpDelete(keyString)).Commit()
if err != nil {
keyLogger.Error("error deleting key", zap.Error(err))
} else {
keyLogger.Info("deleted key", zap.Bool("succeeded", resp.Succeeded))
if resp.Succeeded && kv.Lease != 0 {
keyLogger.Error("revoking lease")
_, err := p.lease.Revoke(ctx, clientv3.LeaseID(kv.Lease))
if err != nil {
keyLogger.Error("error revoking lease", zap.Error(err))
} else {
keyLogger.Info("revoked lease")
}
}
}
}
}
for keyString := range p.keys {
if strings.HasPrefix(keyString, lockPrefix) && !keysRetrieved[keyString] {
prefixLogger.Info("removing key from map", zap.String("key", keyString))
delete(p.keys, keyString)
}
}
}
40 changes: 40 additions & 0 deletions prunelocks/pruner_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package prunelocks

import (
"testing"
"time"

"go.etcd.io/etcd/clientv3"
"go.uber.org/zap/zaptest"
)

func check(err error) {
if err != nil {
panic(err.Error())
}
}

func Test_Blah(t *testing.T) {
// ctx := context.Background()
cfg := clientv3.Config{Endpoints: []string{"http://127.0.0.1:2379"}}
cl, err := clientv3.New(cfg)
check(err)
kv := clientv3.NewKV(cl)
// resp, err := kv.Get(ctx, "/locks", clientv3.WithPrefix())
// check(err)
// for _, kv := range resp.Kvs {
// fmt.Printf("%v\n", kv)
// }
p := Pruner{
keys: make(map[string]lockKey),
timeout: time.Minute,
kv: kv,
lease: clientv3.NewLease(cl),
logger: zaptest.NewLogger(t),
lockPrefixes: []string{"/locks/hello"},
}
for i := 0; i < 10; i++ {
p.checkLocks()
time.Sleep(10 * time.Second)
}
}
17 changes: 17 additions & 0 deletions rules/concurrency/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package concurrency implements concurrency operations on top of
// etcd such as distributed locks, barriers, and elections.
package concurrency
65 changes: 65 additions & 0 deletions rules/concurrency/key.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package concurrency

import (
"context"
"fmt"

v3 "go.etcd.io/etcd/clientv3"
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/mvcc/mvccpb"
)

func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error {
cctx, cancel := context.WithCancel(ctx)
defer cancel()

var wr v3.WatchResponse
wch := client.Watch(cctx, key, v3.WithRev(rev))
for wr = range wch {
for _, ev := range wr.Events {
if ev.Type == mvccpb.DELETE {
return nil
}
}
}
if err := wr.Err(); err != nil {
return err
}
if err := ctx.Err(); err != nil {
return err
}
return fmt.Errorf("lost watcher waiting for delete")
}

// waitDeletes efficiently waits until all keys matching the prefix and no greater
// than the create revision.
func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {
getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))
for {
resp, err := client.Get(ctx, pfx, getOpts...)
if err != nil {
return nil, err
}
if len(resp.Kvs) == 0 {
return resp.Header, nil
}
lastKey := string(resp.Kvs[0].Key)
if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {
return nil, err
}
}
}
Loading