Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

internal/datastore: add observable proxy #952

Merged
merged 1 commit into from
Oct 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 0 additions & 17 deletions internal/datastore/crdb/caveat.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@ import (

sq "github.com/Masterminds/squirrel"
"github.com/jackc/pgx/v4"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/authzed/spicedb/internal/datastore/common"
"github.com/authzed/spicedb/pkg/datastore"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
)
Expand All @@ -37,9 +34,6 @@ const (
)

func (cr *crdbReader) ReadCaveatByName(ctx context.Context, name string) (*core.CaveatDefinition, datastore.Revision, error) {
ctx, span := tracer.Start(ctx, "ReadCaveatByName", trace.WithAttributes(attribute.String("name", name)))
defer span.End()

query := readCaveat.Where(sq.Eq{colCaveatName: name})
sql, args, err := query.ToSql()
if err != nil {
Expand Down Expand Up @@ -69,9 +63,6 @@ func (cr *crdbReader) ReadCaveatByName(ctx context.Context, name string) (*core.
}

func (cr *crdbReader) ListCaveats(ctx context.Context, caveatNames ...string) ([]*core.CaveatDefinition, error) {
ctx, span := tracer.Start(ctx, "ListCaveats", trace.WithAttributes(attribute.StringSlice("names", caveatNames)))
defer span.End()

caveatsWithNames := listCaveat
if len(caveatNames) > 0 {
caveatsWithNames = caveatsWithNames.Where(sq.Eq{colCaveatName: caveatNames})
Expand Down Expand Up @@ -118,9 +109,6 @@ func (cr *crdbReader) ListCaveats(ctx context.Context, caveatNames ...string) ([
}

func (rwt *crdbReadWriteTXN) WriteCaveats(ctx context.Context, caveats []*core.CaveatDefinition) error {
ctx, span := tracer.Start(datastore.SeparateContextWithTracing(ctx), "WriteCaveats")
defer span.End()

write := writeCaveat
writtenCaveatNames := make([]string, 0, len(caveats))
for _, caveat := range caveats {
Expand All @@ -132,7 +120,6 @@ func (rwt *crdbReadWriteTXN) WriteCaveats(ctx context.Context, caveats []*core.C
write = write.Values(valuesToWrite...)
writtenCaveatNames = append(writtenCaveatNames, caveat.Name)
}
span.SetAttributes(common.CaveatNameKey.StringSlice(writtenCaveatNames))

// store the new caveat
sql, args, err := write.ToSql()
Expand All @@ -152,10 +139,6 @@ func (rwt *crdbReadWriteTXN) WriteCaveats(ctx context.Context, caveats []*core.C
}

func (rwt *crdbReadWriteTXN) DeleteCaveats(ctx context.Context, names []string) error {
ctx, span := tracer.Start(datastore.SeparateContextWithTracing(ctx), "DeleteCaveats",
trace.WithAttributes(attribute.StringSlice("names", names)))
defer span.End()

deleteCaveatClause := deleteCaveat.Where(sq.Eq{colCaveatName: names})
sql, args, err := deleteCaveatClause.ToSql()
if err != nil {
Expand Down
3 changes: 0 additions & 3 deletions internal/datastore/crdb/crdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,9 +324,6 @@ func (cds *crdbDatastore) HeadRevision(ctx context.Context) (datastore.Revision,
}

func (cds *crdbDatastore) headRevisionInternal(ctx context.Context) (revision.Decimal, error) {
ctx, span := tracer.Start(datastore.SeparateContextWithTracing(ctx), "HeadRevision")
defer span.End()

var hlcNow revision.Decimal
err := cds.execute(ctx, func(ctx context.Context) error {
return cds.pool.BeginTxFunc(ctx, pgx.TxOptions{AccessMode: pgx.ReadOnly}, func(tx pgx.Tx) error {
Expand Down
15 changes: 0 additions & 15 deletions internal/datastore/crdb/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,8 @@ import (
v1 "github.com/authzed/authzed-go/proto/authzed/api/v1"
"github.com/jackc/pgx/v4"
"github.com/jzelinskie/stringz"
"go.opentelemetry.io/otel/attribute"
"google.golang.org/protobuf/proto"

"github.com/authzed/spicedb/internal/datastore/common"
pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common"
log "github.com/authzed/spicedb/internal/logging"
"github.com/authzed/spicedb/pkg/datastore"
Expand Down Expand Up @@ -88,9 +86,6 @@ var (
)

func (rwt *crdbReadWriteTXN) WriteRelationships(ctx context.Context, mutations []*core.RelationTupleUpdate) error {
ctx, span := tracer.Start(datastore.SeparateContextWithTracing(ctx), "WriteTuples")
defer span.End()

bulkWrite := queryWriteTuple
var bulkWriteCount int64

Expand Down Expand Up @@ -194,37 +189,27 @@ func exactRelationshipClause(r *core.RelationTuple) sq.Eq {
}

func (rwt *crdbReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter) error {
ctx, span := tracer.Start(datastore.SeparateContextWithTracing(ctx), "DeleteRelationships")
defer span.End()

// Add clauses for the ResourceFilter
query := queryDeleteTuples.Where(sq.Eq{colNamespace: filter.ResourceType})
tracerAttributes := []attribute.KeyValue{common.ObjNamespaceNameKey.String(filter.ResourceType)}
if filter.OptionalResourceId != "" {
query = query.Where(sq.Eq{colObjectID: filter.OptionalResourceId})
tracerAttributes = append(tracerAttributes, common.ObjIDKey.String(filter.OptionalResourceId))
}
if filter.OptionalRelation != "" {
query = query.Where(sq.Eq{colRelation: filter.OptionalRelation})
tracerAttributes = append(tracerAttributes, common.ObjRelationNameKey.String(filter.OptionalRelation))
}
rwt.addOverlapKey(filter.ResourceType)

// Add clauses for the SubjectFilter
if subjectFilter := filter.OptionalSubjectFilter; subjectFilter != nil {
query = query.Where(sq.Eq{colUsersetNamespace: subjectFilter.SubjectType})
tracerAttributes = append(tracerAttributes, common.SubNamespaceNameKey.String(subjectFilter.SubjectType))
if subjectFilter.OptionalSubjectId != "" {
query = query.Where(sq.Eq{colUsersetObjectID: subjectFilter.OptionalSubjectId})
tracerAttributes = append(tracerAttributes, common.SubObjectIDKey.String(subjectFilter.OptionalSubjectId))
}
if relationFilter := subjectFilter.OptionalRelation; relationFilter != nil {
query = query.Where(sq.Eq{colUsersetRelation: stringz.DefaultEmpty(relationFilter.Relation, datastore.Ellipsis)})
tracerAttributes = append(tracerAttributes, common.SubRelationNameKey.String(relationFilter.Relation))
}
rwt.addOverlapKey(subjectFilter.SubjectType)
}
span.SetAttributes(tracerAttributes...)
sql, args, err := query.ToSql()
if err != nil {
return fmt.Errorf(errUnableToDeleteRelationships, err)
Expand Down
7 changes: 0 additions & 7 deletions internal/datastore/mysql/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (

sq "github.com/Masterminds/squirrel"
"github.com/shopspring/decimal"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/authzed/spicedb/internal/datastore/common"
"github.com/authzed/spicedb/internal/datastore/options"
Expand Down Expand Up @@ -85,11 +83,6 @@ func (mr *mysqlReader) ReverseQueryRelationships(

func (mr *mysqlReader) ReadNamespace(ctx context.Context, nsName string) (*core.NamespaceDefinition, datastore.Revision, error) {
// TODO (@vroldanbet) dupe from postgres datastore - need to refactor
ctx, span := tracer.Start(ctx, "ReadNamespace", trace.WithAttributes(
attribute.String("name", nsName),
))
defer span.End()

tx, txCleanup, err := mr.txSource(ctx)
if err != nil {
return nil, datastore.NoRevision, fmt.Errorf(errUnableToReadConfig, err)
Expand Down
29 changes: 0 additions & 29 deletions internal/datastore/mysql/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ import (
v1 "github.com/authzed/authzed-go/proto/authzed/api/v1"
"github.com/go-sql-driver/mysql"
"github.com/jzelinskie/stringz"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"google.golang.org/protobuf/proto"

"github.com/authzed/spicedb/internal/datastore/common"
Expand Down Expand Up @@ -59,8 +57,6 @@ func (cc *caveatContextWrapper) Value() (driver.Value, error) {
func (rwt *mysqlReadWriteTXN) WriteRelationships(ctx context.Context, mutations []*core.RelationTupleUpdate) error {
// TODO (@vroldanbet) dupe from postgres datastore - need to refactor
// there are some fundamental changes introduced to prevent a deadlock in MySQL
ctx, span := tracer.Start(datastore.SeparateContextWithTracing(ctx), "WriteTuples")
defer span.End()

bulkWrite := rwt.WriteTupleQuery
bulkWriteHasValues := false
Expand Down Expand Up @@ -162,37 +158,26 @@ func (rwt *mysqlReadWriteTXN) WriteRelationships(ctx context.Context, mutations

func (rwt *mysqlReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter) error {
// TODO (@vroldanbet) dupe from postgres datastore - need to refactor
ctx, span := tracer.Start(datastore.SeparateContextWithTracing(ctx), "DeleteRelationships")
defer span.End()

// Add clauses for the ResourceFilter
query := rwt.DeleteTupleQuery.Where(sq.Eq{colNamespace: filter.ResourceType})
tracerAttributes := []attribute.KeyValue{common.ObjNamespaceNameKey.String(filter.ResourceType)}
if filter.OptionalResourceId != "" {
query = query.Where(sq.Eq{colObjectID: filter.OptionalResourceId})
tracerAttributes = append(tracerAttributes, common.ObjIDKey.String(filter.OptionalResourceId))
}
if filter.OptionalRelation != "" {
query = query.Where(sq.Eq{colRelation: filter.OptionalRelation})
tracerAttributes = append(tracerAttributes, common.ObjRelationNameKey.String(filter.OptionalRelation))
}

// Add clauses for the SubjectFilter
if subjectFilter := filter.OptionalSubjectFilter; subjectFilter != nil {
query = query.Where(sq.Eq{colUsersetNamespace: subjectFilter.SubjectType})
tracerAttributes = append(tracerAttributes, common.SubNamespaceNameKey.String(subjectFilter.SubjectType))
if subjectFilter.OptionalSubjectId != "" {
query = query.Where(sq.Eq{colUsersetObjectID: subjectFilter.OptionalSubjectId})
tracerAttributes = append(tracerAttributes, common.SubObjectIDKey.String(subjectFilter.OptionalSubjectId))
}
if relationFilter := subjectFilter.OptionalRelation; relationFilter != nil {
query = query.Where(sq.Eq{colUsersetRelation: stringz.DefaultEmpty(relationFilter.Relation, datastore.Ellipsis)})
tracerAttributes = append(tracerAttributes, common.SubRelationNameKey.String(relationFilter.Relation))
}
}

span.SetAttributes(tracerAttributes...)

query = query.Set(colDeletedTxn, rwt.newTxnID)

querySQL, args, err := query.ToSql()
Expand All @@ -209,15 +194,10 @@ func (rwt *mysqlReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v

func (rwt *mysqlReadWriteTXN) WriteNamespaces(ctx context.Context, newNamespaces ...*core.NamespaceDefinition) error {
// TODO (@vroldanbet) dupe from postgres datastore - need to refactor
ctx = datastore.SeparateContextWithTracing(ctx)

ctx, span := tracer.Start(ctx, "WriteNamespaces")
defer span.End()

deletedNamespaceClause := sq.Or{}
writeQuery := rwt.WriteNamespaceQuery

writtenNamespaceNames := make([]string, 0, len(newNamespaces))
for _, newNamespace := range newNamespaces {
serialized, err := proto.Marshal(newNamespace)
if err != nil {
Expand All @@ -226,11 +206,8 @@ func (rwt *mysqlReadWriteTXN) WriteNamespaces(ctx context.Context, newNamespaces

deletedNamespaceClause = append(deletedNamespaceClause, sq.Eq{colNamespace: newNamespace.Name})
writeQuery = writeQuery.Values(newNamespace.Name, serialized, rwt.newTxnID)
writtenNamespaceNames = append(writtenNamespaceNames, newNamespace.Name)
}

span.SetAttributes(common.ObjNamespaceNameKey.StringSlice(writtenNamespaceNames))

delSQL, delArgs, err := rwt.DeleteNamespaceQuery.
Set(colDeletedTxn, rwt.newTxnID).
Where(sq.And{sq.Eq{colDeletedTxn: liveDeletedTxnID}, deletedNamespaceClause}).
Expand All @@ -253,18 +230,12 @@ func (rwt *mysqlReadWriteTXN) WriteNamespaces(ctx context.Context, newNamespaces
if err != nil {
return fmt.Errorf(errUnableToWriteConfig, err)
}
span.AddEvent("Namespace config written")

return nil
}

func (rwt *mysqlReadWriteTXN) DeleteNamespace(ctx context.Context, nsName string) error {
// TODO (@vroldanbet) dupe from postgres datastore - need to refactor
ctx, span := tracer.Start(ctx, "DeleteNamespace", trace.WithAttributes(
attribute.String("name", nsName),
))
defer span.End()
ctx = datastore.SeparateContextWithTracing(ctx)

baseQuery := rwt.ReadNamespaceQuery.Where(sq.Eq{colDeletedTxn: liveDeletedTxnID})
_, createdAt, err := loadNamespace(ctx, nsName, rwt.tx, baseQuery)
Expand Down
5 changes: 0 additions & 5 deletions internal/datastore/mysql/revisions.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,6 @@ func (mds *Datastore) optimizedRevisionFunc(ctx context.Context) (datastore.Revi
func (mds *Datastore) HeadRevision(ctx context.Context) (datastore.Revision, error) {
// implementation deviates slightly from PSQL implementation in order to support
// database seeding in runtime, instead of through migrate command
ctx, span := tracer.Start(ctx, "HeadRevision")
defer span.End()

revision, err := mds.loadRevision(ctx)
if err != nil {
return datastore.NoRevision, err
Expand All @@ -94,8 +91,6 @@ func (mds *Datastore) CheckRevision(ctx context.Context, revisionRaw datastore.R
revision := revisionRaw.(revision.Decimal)

// TODO (@vroldanbet) dupe from postgres datastore - need to refactor
ctx, span := tracer.Start(ctx, "CheckRevision")
defer span.End()

revisionTx := transactionFromRevision(revision)

Expand Down
21 changes: 0 additions & 21 deletions internal/datastore/postgres/caveat.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,12 @@ import (
"errors"
"fmt"

"github.com/authzed/spicedb/internal/datastore/common"
"github.com/authzed/spicedb/pkg/datastore"
core "github.com/authzed/spicedb/pkg/proto/core/v1"

sq "github.com/Masterminds/squirrel"
"github.com/jackc/pgtype"
"github.com/jackc/pgx/v4"
"go.opentelemetry.io/otel/trace"
)

var (
Expand All @@ -33,11 +31,6 @@ const (
)

func (r *pgReader) ReadCaveatByName(ctx context.Context, name string) (*core.CaveatDefinition, datastore.Revision, error) {
ctx = datastore.SeparateContextWithTracing(ctx)
ctx, span := tracer.Start(ctx, "ReadCaveatByName", trace.WithAttributes(
common.CaveatNameKey.String(name)))
defer span.End()

statement := readCaveat
// TODO remove once the ID->XID migrations are all complete
if r.migrationPhase == writeBothReadOld {
Expand Down Expand Up @@ -87,11 +80,6 @@ func (r *pgReader) ReadCaveatByName(ctx context.Context, name string) (*core.Cav
}

func (r *pgReader) ListCaveats(ctx context.Context, caveatNames ...string) ([]*core.CaveatDefinition, error) {
ctx = datastore.SeparateContextWithTracing(ctx)
ctx, span := tracer.Start(ctx, "ListCaveats", trace.WithAttributes(
common.CaveatNameKey.StringSlice(caveatNames)))
defer span.End()

caveatsWithNames := listCaveat
if len(caveatNames) > 0 {
caveatsWithNames = caveatsWithNames.Where(sq.Eq{colCaveatName: caveatNames})
Expand Down Expand Up @@ -137,9 +125,6 @@ func (r *pgReader) ListCaveats(ctx context.Context, caveatNames ...string) ([]*c
}

func (rwt *pgReadWriteTXN) WriteCaveats(ctx context.Context, caveats []*core.CaveatDefinition) error {
ctx, span := tracer.Start(datastore.SeparateContextWithTracing(ctx), "WriteCaveats")
defer span.End()

write := writeCaveat
// TODO remove once the ID->XID migrations are all complete
if rwt.migrationPhase == writeBothReadNew || rwt.migrationPhase == writeBothReadOld {
Expand All @@ -159,7 +144,6 @@ func (rwt *pgReadWriteTXN) WriteCaveats(ctx context.Context, caveats []*core.Cav
write = write.Values(valuesToWrite...)
writtenCaveatNames = append(writtenCaveatNames, caveat.Name)
}
span.SetAttributes(common.CaveatNameKey.StringSlice(writtenCaveatNames))

// mark current caveats as deleted
err := rwt.deleteCaveatsFromNames(ctx, writtenCaveatNames)
Expand All @@ -179,11 +163,6 @@ func (rwt *pgReadWriteTXN) WriteCaveats(ctx context.Context, caveats []*core.Cav
}

func (rwt *pgReadWriteTXN) DeleteCaveats(ctx context.Context, names []string) error {
ctx = datastore.SeparateContextWithTracing(ctx)
ctx, span := tracer.Start(ctx, "DeleteCaveats", trace.WithAttributes(
common.CaveatNameKey.StringSlice(names)))
defer span.End()

// mark current caveats as deleted
return rwt.deleteCaveatsFromNames(ctx, names)
}
Expand Down
7 changes: 0 additions & 7 deletions internal/datastore/postgres/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
sq "github.com/Masterminds/squirrel"
"github.com/jackc/pgtype"
"github.com/jackc/pgx/v4"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/authzed/spicedb/internal/datastore/common"
"github.com/authzed/spicedb/internal/datastore/options"
Expand Down Expand Up @@ -91,11 +89,6 @@ func (r *pgReader) ReverseQueryRelationships(
}

func (r *pgReader) ReadNamespace(ctx context.Context, nsName string) (*core.NamespaceDefinition, datastore.Revision, error) {
ctx, span := tracer.Start(ctx, "ReadNamespace", trace.WithAttributes(
attribute.String("name", nsName),
))
defer span.End()

tx, txCleanup, err := r.txSource(ctx)
if err != nil {
return nil, datastore.NoRevision, fmt.Errorf(errUnableToReadConfig, err)
Expand Down