Add OpenTelemetry tracing instrumentation #1199

Merged
merged 10 commits into from
Apr 12, 2023
2 changes: 1 addition & 1 deletion .golangci.yml
@@ -74,7 +74,7 @@ linters:
- goimports
# -golint # Deprecated
- gomnd
- gomoddirectives
#- gomoddirectives
- gomodguard
- goprintffuncname
- gosec
59 changes: 54 additions & 5 deletions DEVELOPMENT.md
@@ -21,6 +21,55 @@ correct development environment tools installed:
Finally, if you ran all the above commands and `git status` does not report any changes to the repository,
then you are ready to start.

### Development environment

Certain features in Drand depend on external services.
These are PostgreSQL support as a database backend, and observability features such as metrics and tracing.

To keep your environment clean from any external tools required to interact with such features, you can use the
`docker-compose.yaml` file under `devenv/docker-compose.yaml`.

#### Using the devenv tools

To launch the tools, run
```shell
cd devenv
docker compose up -d
```

If you wish to stop the stack, run:
```shell
docker compose down
```

To clean up and remove all data, run:
```shell
cd devenv
docker compose down --volumes --remove-orphans
```

#### PostgreSQL backend

To use the database instance provided with the devenv, use `127.0.0.1:5432` as the destination for PostgreSQL.

For more details, see the [testing section below](#testing-with-postgresql-as-database-backend).

#### Observability features

Drand can produce traces compatible with the OpenTelemetry specification. To turn on this feature, set the `DRAND_TRACES`
environment variable to the desired destination, e.g.
```shell
export DRAND_TRACES=127.0.0.1:4317
export DRAND_TRACES_PROBABILITY=1 # This will sample all traces to the destination server
```

After that, in the same terminal, use any of the drand features, such as `make test-unit-memdb`, to start producing traces.

To explore the trace details, launch a new browser tab/window at the [Grafana instance](http://127.0.0.1:3000/explore?orgId=1),
which will allow you to explore in detail the inner workings of Drand.

For more details on how to use Grafana, you can [read the manual here](https://grafana.com/docs/grafana/v9.4/explore/trace-integration/).
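
The two environment variables described in this section could be read along the following lines. This is only a sketch: the `traceConfig` type, the `traceConfigFromEnv` helper, and the fallback to a probability of 0 are assumptions made for illustration, and the way Drand itself parses these variables is not shown in this document.

```go
package main

import (
	"fmt"
	"math"
	"os"
	"strconv"
)

// traceConfig is a hypothetical holder for the two settings described above.
type traceConfig struct {
	Endpoint    string  // value of DRAND_TRACES, e.g. "127.0.0.1:4317"
	Probability float64 // value of DRAND_TRACES_PROBABILITY, clamped to [0, 1]
}

// traceConfigFromEnv reads the settings through getenv so that it can be
// exercised without touching the real environment. The second result is
// false when DRAND_TRACES is unset, meaning tracing stays off.
func traceConfigFromEnv(getenv func(string) string) (traceConfig, bool) {
	endpoint := getenv("DRAND_TRACES")
	if endpoint == "" {
		return traceConfig{}, false
	}

	// Assumption: an unset or unparsable probability falls back to 0.
	p, err := strconv.ParseFloat(getenv("DRAND_TRACES_PROBABILITY"), 64)
	if err != nil || math.IsNaN(p) {
		p = 0
	}
	if p < 0 {
		p = 0
	}
	if p > 1 {
		p = 1
	}

	return traceConfig{Endpoint: endpoint, Probability: p}, true
}

func main() {
	cfg, ok := traceConfigFromEnv(os.Getenv)
	fmt.Printf("tracing enabled: %v, config: %+v\n", ok, cfg)
}
```

Passing `getenv` as a parameter keeps the helper independent of process state, which makes it easy to check both the enabled and the disabled case.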

### Development flow

After editing all files required by your change:
@@ -54,23 +103,23 @@ To check your code against it, run `make test-unit-postgres`.
You can also run the `make demo-postgres` command to launch the scripted demo using
PostgreSQL as a backend.

If you want to run an isolated version of Postgres, you can use the `test/docker-compose.yaml` file
If you want to run an isolated version of Postgres, you can use the `devenv/docker-compose.yaml` file
from the root of this repository to do so.

To start the database, use:
```shell
cd test/
docker-compose up -d
cd devenv/
docker compose up -d
```

To stop the database, use:
```shell
docker-compose down
docker compose down
```

If you wish to remove the database volume too, use this command instead to stop:
```shell
docker-compose down --volumes --remove-orphans
docker compose down --volumes --remove-orphans
```

## Regression testing
6 changes: 4 additions & 2 deletions chain/beacon/callbacks_test.go
@@ -29,15 +29,17 @@ func TestStoreCallback(t *testing.T) {
doneCh <- true
})

cb.Put(ctx, &chain.Beacon{
err = cb.Put(ctx, &chain.Beacon{
Round: 1,
})
require.NoError(t, err)
require.True(t, checkOne(doneCh))

cb.AddCallback(id1, func(*chain.Beacon, bool) {})
cb.Put(ctx, &chain.Beacon{
err = cb.Put(ctx, &chain.Beacon{
Round: 1,
})
require.NoError(t, err)
require.False(t, checkOne(doneCh))

cb.RemoveCallback(id1)
83 changes: 71 additions & 12 deletions chain/beacon/chainstore.go
@@ -5,10 +5,14 @@ import (
"errors"
"fmt"

"go.opentelemetry.io/otel/attribute"
oteltrace "go.opentelemetry.io/otel/trace"

"github.com/drand/drand/chain"
"github.com/drand/drand/crypto/vault"
"github.com/drand/drand/key"
"github.com/drand/drand/log"
"github.com/drand/drand/metrics"
"github.com/drand/drand/net"
"github.com/drand/drand/protobuf/drand"
)
@@ -40,27 +44,33 @@ type chainStore struct {
beaconStoredAgg chan *chain.Beacon
}

func newChainStore(l log.Logger, cf *Config, cl net.ProtocolClient, v *vault.Vault, store chain.Store, t *ticker) (*chainStore, error) {
//nolint:lll // The names are long but clear
func newChainStore(ctx context.Context, l log.Logger, cf *Config, cl net.ProtocolClient, v *vault.Vault, store chain.Store, t *ticker) (*chainStore, error) {
ctx, span := metrics.NewSpan(ctx, "newChainStore")
defer span.End()

// we write some stats about the timing when new beacon is saved
ds := newDiscrepancyStore(store, l, v.GetGroup(), cf.Clock)

// we add a store to run some checks depending on scheme-related config
ss, err := NewSchemeStore(ds, cf.Group.Scheme)
ss, err := NewSchemeStore(ctx, ds, cf.Group.Scheme)
if err != nil {
span.RecordError(err)
return nil, err
}

// we make sure the chain is increasing monotonically
as, err := newAppendStore(ss)
as, err := newAppendStore(ctx, ss)
if err != nil {
span.RecordError(err)
return nil, err
}

// we can register callbacks on it
cbs := NewCallbackStore(l, as)

// we give the final append store to the sync manager
syncm, err := NewSyncManager(&SyncConfig{
syncm, err := NewSyncManager(ctx, &SyncConfig{
Log: l,
Store: cbs,
BoltdbStore: store,
@@ -70,6 +80,7 @@ func newChainStore(l log.Logger, cf *Config, cl net.ProtocolClient, v *vault.Vau
NodeAddr: cf.Public.Address(),
})
if err != nil {
span.RecordError(err)
return nil, err
}
go syncm.Run()
@@ -103,8 +114,11 @@ func newChainStore(l log.Logger, cf *Config, cl net.ProtocolClient, v *vault.Vau
return cs, nil
}

func (c *chainStore) NewValidPartial(addr string, p *drand.PartialBeaconPacket) {
func (c *chainStore) NewValidPartial(ctx context.Context, addr string, p *drand.PartialBeaconPacket) {
spanCtx := oteltrace.SpanContextFromContext(ctx)
c.newPartials <- partialInfo{
spanContext: spanCtx,

addr: addr,
p: p,
}
@@ -114,7 +128,7 @@ func (c *chainStore) Stop() {
c.ctxCancel()
c.syncm.Stop()
c.RemoveCallback("chainstore")
c.CallbackStore.Close(context.Background())
c.CallbackStore.Close()
}

// we store partials that are up to this amount of rounds more than the last
@@ -143,18 +157,29 @@ func (c *chainStore) runAggregator() {
case lastBeacon = <-c.beaconStoredAgg:
cache.FlushRounds(lastBeacon.Round)
case partial := <-c.newPartials:
ctx, span := metrics.NewSpanFromSpanContext(c.ctx, partial.spanContext, "c.runAggregator")

span.SetAttributes(
attribute.Int64("round", int64(partial.p.Round)),
attribute.String("addr", partial.addr),
)

var err error
if lastBeacon == nil {
lastBeacon, err = c.Last(c.ctx)
lastBeacon, err = c.Last(ctx)
if err != nil {
span.RecordError(err)
if errors.Is(err, context.Canceled) {
c.l.Errorw("stopping chain_aggregator", "loading", "last_beacon", "err", err)
span.End()
return
}
if err.Error() == "sql: database is closed" {
c.l.Errorw("stopping chain_aggregator", "loading", "last_beacon", "err", err)
span.End()
return
}
span.End()
c.l.Fatalw("stopping chain_aggregator", "loading", "last_beacon", "err", err)
}
}
@@ -167,7 +192,9 @@ func (c *chainStore) runAggregator() {
shouldStore := isNotInPast && isNotTooFar
// check if we can reconstruct
if !shouldStore {
span.AddEvent("ignoring_partial")
c.l.Debugw("", "ignoring_partial", partial.p.GetRound(), "last_beacon_stored", lastBeacon.Round)
span.End()
break
}
// NOTE: This line means we can only verify partial signatures of
@@ -179,7 +206,9 @@ func (c *chainStore) runAggregator() {
n := c.crypto.GetGroup().Len()

select {
case <-c.ctx.Done():
case <-ctx.Done():
span.AddEvent("ctx.Done")
span.End()
return
default:
}
@@ -188,12 +217,16 @@ func (c *chainStore) runAggregator() {
roundCache := cache.GetRoundCache(partial.p.GetRound(), partial.p.GetPreviousSignature())
if roundCache == nil {
c.l.Errorw("", "store_partial", partial.addr, "no_round_cache", partial.p.GetRound())
span.RecordError(errors.New("no round cache"))
span.End()
break
}

c.l.Debugw("", "store_partial", partial.addr,
"round", roundCache.round, "len_partials", fmt.Sprintf("%d/%d", roundCache.Len(), thr))
if roundCache.Len() < thr {
span.AddEvent("roundCache.Len < thr")
span.End()
break
}

@@ -202,13 +235,19 @@ func (c *chainStore) runAggregator() {
finalSig, err := c.crypto.Scheme.ThresholdScheme.Recover(c.crypto.GetPub(), msg, roundCache.Partials(), thr, n)
if err != nil {
c.l.Errorw("invalid_recovery", "error", err, "round", pRound, "got", fmt.Sprintf("%d/%d", roundCache.Len(), n))
span.RecordError(errors.New("invalid recovery"))
span.End()
break
}
if err := c.crypto.Scheme.ThresholdScheme.VerifyRecovered(c.crypto.GetPub().Commit(), msg, finalSig); err != nil {
c.l.Errorw("invalid_sig", "error", err, "round", pRound)
span.RecordError(errors.New("invalid signature"))
span.End()
break
}

span.AddEvent("cache.FlushRounds")
cache.FlushRounds(partial.p.GetRound())
span.AddEvent("cache.FlushRounds - done")

newBeacon := &chain.Beacon{
Round: roundCache.round,
@@ -217,13 +256,17 @@ func (c *chainStore) runAggregator() {
}

c.l.Infow("", "aggregated_beacon", newBeacon.Round)
if c.tryAppend(c.ctx, lastBeacon, newBeacon) {
span.AddEvent("calling tryAppend")
if c.tryAppend(ctx, lastBeacon, newBeacon) {
lastBeacon = newBeacon
span.End()
break
}

select {
case <-c.ctx.Done():
span.AddEvent("ctx.Done")
span.End()
return
default:
}
@@ -232,13 +275,17 @@ func (c *chainStore) runAggregator() {
c.l.Debugw("", "new_aggregated", "not_appendable", "last", lastBeacon.String(), "new", newBeacon.String())
if c.shouldSync(lastBeacon, newBeacon) {
peers := toPeers(c.crypto.GetGroup().Nodes)
c.syncm.SendSyncRequest(newBeacon.Round, peers)
c.syncm.SendSyncRequest(span.SpanContext(), newBeacon.Round, peers)
}
span.End()
}
}
}
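
The aggregator loop above ends its span by hand before each `break` or `return`, which makes it easy to miss one exit path. One possible alternative, which is not what this change does, is to move the body of each iteration into a helper so that a single deferred `End` covers every path. The sketch below uses a stand-in `span` type that only counts calls to `End`, not the OpenTelemetry API; `handleOne` and `processAll` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// span is a stand-in for a tracing span; it only counts how often it is ended.
type span struct {
	name  string
	ended int
}

// End marks the span as finished.
func (s *span) End() { s.ended++ }

// handleOne processes a single item under its own span. Because End is
// deferred, every return path below closes the span exactly once.
func handleOne(item int, s *span) error {
	defer s.End()

	if item < 0 {
		// Early exit on error: the deferred End still runs.
		return errors.New("negative item")
	}
	if item == 0 {
		// Nothing to do: the deferred End still runs.
		return nil
	}

	// Normal processing would happen here.
	return nil
}

// processAll runs one helper call per item and returns the spans it created
// so that callers can inspect them.
func processAll(items []int) []*span {
	spans := make([]*span, 0, len(items))
	for _, it := range items {
		s := &span{name: "handleOne"}
		spans = append(spans, s)
		_ = handleOne(it, s) // errors are ignored in this sketch
	}
	return spans
}

func main() {
	for i, s := range processAll([]int{-1, 0, 7}) {
		fmt.Printf("span %d ended %d time(s)\n", i, s.ended)
	}
}
```

The trade-off is that loop-local state such as the last stored beacon would have to be passed into and returned from the helper, which may be why the loop was left inline here.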

func (c *chainStore) tryAppend(ctx context.Context, last, newB *chain.Beacon) bool {
ctx, span := metrics.NewSpan(ctx, "chainStore.tryAppend")
defer span.End()

select {
case <-ctx.Done():
return false
@@ -251,6 +298,7 @@ func (c *chainStore) tryAppend(ctx context.Context, last, newB *chain.Beacon) bo
}

if err := c.CallbackStore.Put(ctx, newB); err != nil {
span.RecordError(err)
// if round is ok but bytes are different, error will be raised
if errors.Is(err, ErrBeaconAlreadyStored) {
c.l.Debugw("Put: race with SyncManager", "err", err)
@@ -289,16 +337,22 @@ func (c *chainStore) shouldSync(last *chain.Beacon, newB likeBeacon) bool {
// It will start from the latest stored beacon. If upTo is equal to 0, then it
// will follow the chain indefinitely. If peers is nil, it uses the peers of
// the current group.
func (c *chainStore) RunSync(upTo uint64, peers []net.Peer) {
func (c *chainStore) RunSync(ctx context.Context, upTo uint64, peers []net.Peer) {
_, span := metrics.NewSpan(ctx, "c.RunSync")
defer span.End()

if len(peers) == 0 {
peers = toPeers(c.crypto.GetGroup().Nodes)
}

c.syncm.SendSyncRequest(upTo, peers)
c.syncm.SendSyncRequest(span.SpanContext(), upTo, peers)
}

// RunReSync will sync up with other nodes to repair the invalid beacons in the store.
func (c *chainStore) RunReSync(ctx context.Context, faultyBeacons []uint64, peers []net.Peer, cb func(r, u uint64)) error {
ctx, span := metrics.NewSpan(ctx, "c.RunReSync")
defer span.End()

// we do this check here because the SyncManager doesn't have the notion of group
if len(peers) == 0 {
peers = toPeers(c.crypto.GetGroup().Nodes)
@@ -311,6 +365,9 @@ func (c *chainStore) RunReSync(ctx context.Context, faultyBeacons []uint64, peer
// and it returns the list of round numbers for which the beacons were corrupted / invalid / not found in the store.
// Note: it does not attempt to correct or fetch these faulty beacons.
func (c *chainStore) ValidateChain(ctx context.Context, upTo uint64, cb func(r, u uint64)) ([]uint64, error) {
ctx, span := metrics.NewSpan(ctx, "c.ValidateChain")
defer span.End()

return c.syncm.CheckPastBeacons(ctx, upTo, cb)
}

@@ -319,6 +376,8 @@ func (c *chainStore) AppendedBeaconNoSync() chan *chain.Beacon {
}

type partialInfo struct {
spanContext oteltrace.SpanContext

addr string
p *drand.PartialBeaconPacket
}