Skip to content

Commit

Permalink
internal/datastore: add observable proxy
Browse files Browse the repository at this point in the history
This pulls out all tracing into one proxy to deduplicate some
boilerplate around tracing the datastore.
  • Loading branch information
jzelinskie committed Oct 27, 2022
1 parent a01491a commit 30bcdb9
Show file tree
Hide file tree
Showing 16 changed files with 296 additions and 178 deletions.
17 changes: 0 additions & 17 deletions internal/datastore/crdb/caveat.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@ import (

sq "github.com/Masterminds/squirrel"
"github.com/jackc/pgx/v4"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/authzed/spicedb/internal/datastore/common"
"github.com/authzed/spicedb/pkg/datastore"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
)
Expand All @@ -37,9 +34,6 @@ const (
)

func (cr *crdbReader) ReadCaveatByName(ctx context.Context, name string) (*core.CaveatDefinition, datastore.Revision, error) {
ctx, span := tracer.Start(ctx, "ReadCaveatByName", trace.WithAttributes(attribute.String("name", name)))
defer span.End()

query := readCaveat.Where(sq.Eq{colCaveatName: name})
sql, args, err := query.ToSql()
if err != nil {
Expand Down Expand Up @@ -69,9 +63,6 @@ func (cr *crdbReader) ReadCaveatByName(ctx context.Context, name string) (*core.
}

func (cr *crdbReader) ListCaveats(ctx context.Context, caveatNames ...string) ([]*core.CaveatDefinition, error) {
ctx, span := tracer.Start(ctx, "ListCaveats", trace.WithAttributes(attribute.StringSlice("names", caveatNames)))
defer span.End()

caveatsWithNames := listCaveat
if len(caveatNames) > 0 {
caveatsWithNames = caveatsWithNames.Where(sq.Eq{colCaveatName: caveatNames})
Expand Down Expand Up @@ -118,9 +109,6 @@ func (cr *crdbReader) ListCaveats(ctx context.Context, caveatNames ...string) ([
}

func (rwt *crdbReadWriteTXN) WriteCaveats(ctx context.Context, caveats []*core.CaveatDefinition) error {
ctx, span := tracer.Start(datastore.SeparateContextWithTracing(ctx), "WriteCaveats")
defer span.End()

write := writeCaveat
writtenCaveatNames := make([]string, 0, len(caveats))
for _, caveat := range caveats {
Expand All @@ -132,7 +120,6 @@ func (rwt *crdbReadWriteTXN) WriteCaveats(ctx context.Context, caveats []*core.C
write = write.Values(valuesToWrite...)
writtenCaveatNames = append(writtenCaveatNames, caveat.Name)
}
span.SetAttributes(common.CaveatNameKey.StringSlice(writtenCaveatNames))

// store the new caveat
sql, args, err := write.ToSql()
Expand All @@ -152,10 +139,6 @@ func (rwt *crdbReadWriteTXN) WriteCaveats(ctx context.Context, caveats []*core.C
}

func (rwt *crdbReadWriteTXN) DeleteCaveats(ctx context.Context, names []string) error {
ctx, span := tracer.Start(datastore.SeparateContextWithTracing(ctx), "DeleteCaveats",
trace.WithAttributes(attribute.StringSlice("names", names)))
defer span.End()

deleteCaveatClause := deleteCaveat.Where(sq.Eq{colCaveatName: names})
sql, args, err := deleteCaveatClause.ToSql()
if err != nil {
Expand Down
3 changes: 0 additions & 3 deletions internal/datastore/crdb/crdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,9 +324,6 @@ func (cds *crdbDatastore) HeadRevision(ctx context.Context) (datastore.Revision,
}

func (cds *crdbDatastore) headRevisionInternal(ctx context.Context) (revision.Decimal, error) {
ctx, span := tracer.Start(datastore.SeparateContextWithTracing(ctx), "HeadRevision")
defer span.End()

var hlcNow revision.Decimal
err := cds.execute(ctx, func(ctx context.Context) error {
return cds.pool.BeginTxFunc(ctx, pgx.TxOptions{AccessMode: pgx.ReadOnly}, func(tx pgx.Tx) error {
Expand Down
15 changes: 0 additions & 15 deletions internal/datastore/crdb/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,8 @@ import (
v1 "github.com/authzed/authzed-go/proto/authzed/api/v1"
"github.com/jackc/pgx/v4"
"github.com/jzelinskie/stringz"
"go.opentelemetry.io/otel/attribute"
"google.golang.org/protobuf/proto"

"github.com/authzed/spicedb/internal/datastore/common"
pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common"
log "github.com/authzed/spicedb/internal/logging"
"github.com/authzed/spicedb/pkg/datastore"
Expand Down Expand Up @@ -88,9 +86,6 @@ var (
)

func (rwt *crdbReadWriteTXN) WriteRelationships(ctx context.Context, mutations []*core.RelationTupleUpdate) error {
ctx, span := tracer.Start(datastore.SeparateContextWithTracing(ctx), "WriteTuples")
defer span.End()

bulkWrite := queryWriteTuple
var bulkWriteCount int64

Expand Down Expand Up @@ -194,37 +189,27 @@ func exactRelationshipClause(r *core.RelationTuple) sq.Eq {
}

func (rwt *crdbReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter) error {
ctx, span := tracer.Start(datastore.SeparateContextWithTracing(ctx), "DeleteRelationships")
defer span.End()

// Add clauses for the ResourceFilter
query := queryDeleteTuples.Where(sq.Eq{colNamespace: filter.ResourceType})
tracerAttributes := []attribute.KeyValue{common.ObjNamespaceNameKey.String(filter.ResourceType)}
if filter.OptionalResourceId != "" {
query = query.Where(sq.Eq{colObjectID: filter.OptionalResourceId})
tracerAttributes = append(tracerAttributes, common.ObjIDKey.String(filter.OptionalResourceId))
}
if filter.OptionalRelation != "" {
query = query.Where(sq.Eq{colRelation: filter.OptionalRelation})
tracerAttributes = append(tracerAttributes, common.ObjRelationNameKey.String(filter.OptionalRelation))
}
rwt.addOverlapKey(filter.ResourceType)

// Add clauses for the SubjectFilter
if subjectFilter := filter.OptionalSubjectFilter; subjectFilter != nil {
query = query.Where(sq.Eq{colUsersetNamespace: subjectFilter.SubjectType})
tracerAttributes = append(tracerAttributes, common.SubNamespaceNameKey.String(subjectFilter.SubjectType))
if subjectFilter.OptionalSubjectId != "" {
query = query.Where(sq.Eq{colUsersetObjectID: subjectFilter.OptionalSubjectId})
tracerAttributes = append(tracerAttributes, common.SubObjectIDKey.String(subjectFilter.OptionalSubjectId))
}
if relationFilter := subjectFilter.OptionalRelation; relationFilter != nil {
query = query.Where(sq.Eq{colUsersetRelation: stringz.DefaultEmpty(relationFilter.Relation, datastore.Ellipsis)})
tracerAttributes = append(tracerAttributes, common.SubRelationNameKey.String(relationFilter.Relation))
}
rwt.addOverlapKey(subjectFilter.SubjectType)
}
span.SetAttributes(tracerAttributes...)
sql, args, err := query.ToSql()
if err != nil {
return fmt.Errorf(errUnableToDeleteRelationships, err)
Expand Down
7 changes: 0 additions & 7 deletions internal/datastore/mysql/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (

sq "github.com/Masterminds/squirrel"
"github.com/shopspring/decimal"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/authzed/spicedb/internal/datastore/common"
"github.com/authzed/spicedb/internal/datastore/options"
Expand Down Expand Up @@ -85,11 +83,6 @@ func (mr *mysqlReader) ReverseQueryRelationships(

func (mr *mysqlReader) ReadNamespace(ctx context.Context, nsName string) (*core.NamespaceDefinition, datastore.Revision, error) {
// TODO (@vroldanbet) dupe from postgres datastore - need to refactor
ctx, span := tracer.Start(ctx, "ReadNamespace", trace.WithAttributes(
attribute.String("name", nsName),
))
defer span.End()

tx, txCleanup, err := mr.txSource(ctx)
if err != nil {
return nil, datastore.NoRevision, fmt.Errorf(errUnableToReadConfig, err)
Expand Down
29 changes: 0 additions & 29 deletions internal/datastore/mysql/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ import (
v1 "github.com/authzed/authzed-go/proto/authzed/api/v1"
"github.com/go-sql-driver/mysql"
"github.com/jzelinskie/stringz"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"google.golang.org/protobuf/proto"

"github.com/authzed/spicedb/internal/datastore/common"
Expand Down Expand Up @@ -59,8 +57,6 @@ func (cc *caveatContextWrapper) Value() (driver.Value, error) {
func (rwt *mysqlReadWriteTXN) WriteRelationships(ctx context.Context, mutations []*core.RelationTupleUpdate) error {
// TODO (@vroldanbet) dupe from postgres datastore - need to refactor
// there are some fundamental changes introduced to prevent a deadlock in MySQL
ctx, span := tracer.Start(datastore.SeparateContextWithTracing(ctx), "WriteTuples")
defer span.End()

bulkWrite := rwt.WriteTupleQuery
bulkWriteHasValues := false
Expand Down Expand Up @@ -162,37 +158,26 @@ func (rwt *mysqlReadWriteTXN) WriteRelationships(ctx context.Context, mutations

func (rwt *mysqlReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter) error {
// TODO (@vroldanbet) dupe from postgres datastore - need to refactor
ctx, span := tracer.Start(datastore.SeparateContextWithTracing(ctx), "DeleteRelationships")
defer span.End()

// Add clauses for the ResourceFilter
query := rwt.DeleteTupleQuery.Where(sq.Eq{colNamespace: filter.ResourceType})
tracerAttributes := []attribute.KeyValue{common.ObjNamespaceNameKey.String(filter.ResourceType)}
if filter.OptionalResourceId != "" {
query = query.Where(sq.Eq{colObjectID: filter.OptionalResourceId})
tracerAttributes = append(tracerAttributes, common.ObjIDKey.String(filter.OptionalResourceId))
}
if filter.OptionalRelation != "" {
query = query.Where(sq.Eq{colRelation: filter.OptionalRelation})
tracerAttributes = append(tracerAttributes, common.ObjRelationNameKey.String(filter.OptionalRelation))
}

// Add clauses for the SubjectFilter
if subjectFilter := filter.OptionalSubjectFilter; subjectFilter != nil {
query = query.Where(sq.Eq{colUsersetNamespace: subjectFilter.SubjectType})
tracerAttributes = append(tracerAttributes, common.SubNamespaceNameKey.String(subjectFilter.SubjectType))
if subjectFilter.OptionalSubjectId != "" {
query = query.Where(sq.Eq{colUsersetObjectID: subjectFilter.OptionalSubjectId})
tracerAttributes = append(tracerAttributes, common.SubObjectIDKey.String(subjectFilter.OptionalSubjectId))
}
if relationFilter := subjectFilter.OptionalRelation; relationFilter != nil {
query = query.Where(sq.Eq{colUsersetRelation: stringz.DefaultEmpty(relationFilter.Relation, datastore.Ellipsis)})
tracerAttributes = append(tracerAttributes, common.SubRelationNameKey.String(relationFilter.Relation))
}
}

span.SetAttributes(tracerAttributes...)

query = query.Set(colDeletedTxn, rwt.newTxnID)

querySQL, args, err := query.ToSql()
Expand All @@ -209,15 +194,10 @@ func (rwt *mysqlReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v

func (rwt *mysqlReadWriteTXN) WriteNamespaces(ctx context.Context, newNamespaces ...*core.NamespaceDefinition) error {
// TODO (@vroldanbet) dupe from postgres datastore - need to refactor
ctx = datastore.SeparateContextWithTracing(ctx)

ctx, span := tracer.Start(ctx, "WriteNamespaces")
defer span.End()

deletedNamespaceClause := sq.Or{}
writeQuery := rwt.WriteNamespaceQuery

writtenNamespaceNames := make([]string, 0, len(newNamespaces))
for _, newNamespace := range newNamespaces {
serialized, err := proto.Marshal(newNamespace)
if err != nil {
Expand All @@ -226,11 +206,8 @@ func (rwt *mysqlReadWriteTXN) WriteNamespaces(ctx context.Context, newNamespaces

deletedNamespaceClause = append(deletedNamespaceClause, sq.Eq{colNamespace: newNamespace.Name})
writeQuery = writeQuery.Values(newNamespace.Name, serialized, rwt.newTxnID)
writtenNamespaceNames = append(writtenNamespaceNames, newNamespace.Name)
}

span.SetAttributes(common.ObjNamespaceNameKey.StringSlice(writtenNamespaceNames))

delSQL, delArgs, err := rwt.DeleteNamespaceQuery.
Set(colDeletedTxn, rwt.newTxnID).
Where(sq.And{sq.Eq{colDeletedTxn: liveDeletedTxnID}, deletedNamespaceClause}).
Expand All @@ -253,18 +230,12 @@ func (rwt *mysqlReadWriteTXN) WriteNamespaces(ctx context.Context, newNamespaces
if err != nil {
return fmt.Errorf(errUnableToWriteConfig, err)
}
span.AddEvent("Namespace config written")

return nil
}

func (rwt *mysqlReadWriteTXN) DeleteNamespace(ctx context.Context, nsName string) error {
// TODO (@vroldanbet) dupe from postgres datastore - need to refactor
ctx, span := tracer.Start(ctx, "DeleteNamespace", trace.WithAttributes(
attribute.String("name", nsName),
))
defer span.End()
ctx = datastore.SeparateContextWithTracing(ctx)

baseQuery := rwt.ReadNamespaceQuery.Where(sq.Eq{colDeletedTxn: liveDeletedTxnID})
_, createdAt, err := loadNamespace(ctx, nsName, rwt.tx, baseQuery)
Expand Down
5 changes: 0 additions & 5 deletions internal/datastore/mysql/revisions.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,6 @@ func (mds *Datastore) optimizedRevisionFunc(ctx context.Context) (datastore.Revi
func (mds *Datastore) HeadRevision(ctx context.Context) (datastore.Revision, error) {
// implementation deviates slightly from PSQL implementation in order to support
// database seeding in runtime, instead of through migrate command
ctx, span := tracer.Start(ctx, "HeadRevision")
defer span.End()

revision, err := mds.loadRevision(ctx)
if err != nil {
return datastore.NoRevision, err
Expand All @@ -94,8 +91,6 @@ func (mds *Datastore) CheckRevision(ctx context.Context, revisionRaw datastore.R
revision := revisionRaw.(revision.Decimal)

// TODO (@vroldanbet) dupe from postgres datastore - need to refactor
ctx, span := tracer.Start(ctx, "CheckRevision")
defer span.End()

revisionTx := transactionFromRevision(revision)

Expand Down
21 changes: 0 additions & 21 deletions internal/datastore/postgres/caveat.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,12 @@ import (
"errors"
"fmt"

"github.com/authzed/spicedb/internal/datastore/common"
"github.com/authzed/spicedb/pkg/datastore"
core "github.com/authzed/spicedb/pkg/proto/core/v1"

sq "github.com/Masterminds/squirrel"
"github.com/jackc/pgtype"
"github.com/jackc/pgx/v4"
"go.opentelemetry.io/otel/trace"
)

var (
Expand All @@ -33,11 +31,6 @@ const (
)

func (r *pgReader) ReadCaveatByName(ctx context.Context, name string) (*core.CaveatDefinition, datastore.Revision, error) {
ctx = datastore.SeparateContextWithTracing(ctx)
ctx, span := tracer.Start(ctx, "ReadCaveatByName", trace.WithAttributes(
common.CaveatNameKey.String(name)))
defer span.End()

statement := readCaveat
// TODO remove once the ID->XID migrations are all complete
if r.migrationPhase == writeBothReadOld {
Expand Down Expand Up @@ -87,11 +80,6 @@ func (r *pgReader) ReadCaveatByName(ctx context.Context, name string) (*core.Cav
}

func (r *pgReader) ListCaveats(ctx context.Context, caveatNames ...string) ([]*core.CaveatDefinition, error) {
ctx = datastore.SeparateContextWithTracing(ctx)
ctx, span := tracer.Start(ctx, "ListCaveats", trace.WithAttributes(
common.CaveatNameKey.StringSlice(caveatNames)))
defer span.End()

caveatsWithNames := listCaveat
if len(caveatNames) > 0 {
caveatsWithNames = caveatsWithNames.Where(sq.Eq{colCaveatName: caveatNames})
Expand Down Expand Up @@ -137,9 +125,6 @@ func (r *pgReader) ListCaveats(ctx context.Context, caveatNames ...string) ([]*c
}

func (rwt *pgReadWriteTXN) WriteCaveats(ctx context.Context, caveats []*core.CaveatDefinition) error {
ctx, span := tracer.Start(datastore.SeparateContextWithTracing(ctx), "WriteCaveats")
defer span.End()

write := writeCaveat
// TODO remove once the ID->XID migrations are all complete
if rwt.migrationPhase == writeBothReadNew || rwt.migrationPhase == writeBothReadOld {
Expand All @@ -159,7 +144,6 @@ func (rwt *pgReadWriteTXN) WriteCaveats(ctx context.Context, caveats []*core.Cav
write = write.Values(valuesToWrite...)
writtenCaveatNames = append(writtenCaveatNames, caveat.Name)
}
span.SetAttributes(common.CaveatNameKey.StringSlice(writtenCaveatNames))

// mark current caveats as deleted
err := rwt.deleteCaveatsFromNames(ctx, writtenCaveatNames)
Expand All @@ -179,11 +163,6 @@ func (rwt *pgReadWriteTXN) WriteCaveats(ctx context.Context, caveats []*core.Cav
}

func (rwt *pgReadWriteTXN) DeleteCaveats(ctx context.Context, names []string) error {
ctx = datastore.SeparateContextWithTracing(ctx)
ctx, span := tracer.Start(ctx, "DeleteCaveats", trace.WithAttributes(
common.CaveatNameKey.StringSlice(names)))
defer span.End()

// mark current caveats as deleted
return rwt.deleteCaveatsFromNames(ctx, names)
}
Expand Down
7 changes: 0 additions & 7 deletions internal/datastore/postgres/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
sq "github.com/Masterminds/squirrel"
"github.com/jackc/pgtype"
"github.com/jackc/pgx/v4"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/authzed/spicedb/internal/datastore/common"
"github.com/authzed/spicedb/internal/datastore/options"
Expand Down Expand Up @@ -91,11 +89,6 @@ func (r *pgReader) ReverseQueryRelationships(
}

func (r *pgReader) ReadNamespace(ctx context.Context, nsName string) (*core.NamespaceDefinition, datastore.Revision, error) {
ctx, span := tracer.Start(ctx, "ReadNamespace", trace.WithAttributes(
attribute.String("name", nsName),
))
defer span.End()

tx, txCleanup, err := r.txSource(ctx)
if err != nil {
return nil, datastore.NoRevision, fmt.Errorf(errUnableToReadConfig, err)
Expand Down

0 comments on commit 30bcdb9

Please sign in to comment.