Skip to content

Commit

Permalink
Make sure to cleanup goroutine generated by the namespace manager and…
Browse files Browse the repository at this point in the history
… the parser

Depends upon dgraph-io/ristretto#286

Signed-off-by: Joseph Schorr <josephschorr@users.noreply.github.com>
  • Loading branch information
josephschorr committed Oct 14, 2021
1 parent 424037a commit 968a8b7
Show file tree
Hide file tree
Showing 13 changed files with 101 additions and 31 deletions.
4 changes: 4 additions & 0 deletions cmd/spicedb/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,10 @@ func serveRun(cmd *cobra.Command, args []string) {
log.Fatal().Err(err).Msg("failed while shutting down metrics server")
}

if err := nsm.Close(); err != nil {
log.Fatal().Err(err).Msg("failed while shutting down namespace manager")
}

if err := ds.Close(); err != nil {
log.Fatal().Err(err).Msg("failed while shutting down datastore")
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ require (
go.opentelemetry.io/otel v1.0.0
go.opentelemetry.io/otel/exporters/jaeger v1.0.0 // indirect
go.opentelemetry.io/otel/trace v1.0.0
go.uber.org/goleak v1.1.12
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20210921065528-437939a70204 // indirect
golang.org/x/tools v0.1.6 // indirect
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,8 @@ go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
Expand Down Expand Up @@ -634,6 +636,7 @@ golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f/go.mod h1:5qLYkcX4OjUUV8bRu
golang.org/x/lint v0.0.0-20200130185559-910be7a94367/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 h1:VLliZ0d+/avPrXXH+OakdXhpJuEoBZuwh1m2j7U6Iug=
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE=
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
Expand Down
6 changes: 5 additions & 1 deletion internal/namespace/caching.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ func (nsc cachingManager) ReadNamespace(ctx context.Context, nsName string) (*v0

// Check the cache.
now := time.Now()

value, found := nsc.c.Get(nsName)
if found {
foundEntry := value.(cacheEntry)
Expand Down Expand Up @@ -126,3 +125,8 @@ func (nsc cachingManager) CheckNamespaceAndRelation(ctx context.Context, namespa

return NewRelationNotFoundErr(namespace, relation)
}

func (nsc cachingManager) Close() error {
nsc.c.Close()
return nil
}
5 changes: 5 additions & 0 deletions internal/namespace/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,9 @@ type Manager interface {

// ReadNamespaceAndTypes reads a namespace definition, version, and type system and returns it if found.
ReadNamespaceAndTypes(ctx context.Context, nsName string) (*v0.NamespaceDefinition, *NamespaceTypeSystem, decimal.Decimal, error)

// Closes the namespace manager, disposing of any resources.
//
// NOTE: Should *not* call Close on the datastore.
Close() error
}
2 changes: 2 additions & 0 deletions internal/services/v0/acl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -854,6 +854,8 @@ func newACLServicer(
require.NoError(err)

return v0.NewACLServiceClient(conn), func() {
ns.Close()
ds.Close()
s.Stop()
lis.Close()
}, revision, ds
Expand Down
52 changes: 32 additions & 20 deletions internal/services/v0/devcontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ import (

// DevContext holds the various helper types for running the developer calls.
type DevContext struct {
Datastore datastore.Datastore
Revision decimal.Decimal
Namespaces []*v0.NamespaceDefinition
Dispatcher dispatch.Dispatcher
RequestErrors []*v0.DeveloperError
Datastore datastore.Datastore
Revision decimal.Decimal
Namespaces []*v0.NamespaceDefinition
Dispatcher dispatch.Dispatcher
RequestErrors []*v0.DeveloperError
NamespaceManager namespace.Manager
}

// NewDevContext creates a new DevContext from the specified request context, parsing and populating
Expand All @@ -39,7 +40,12 @@ func NewDevContext(ctx context.Context, requestContext *v0.RequestContext) (*Dev

dctx, ok, err := newDevContext(ctx, requestContext, ds)
if !ok || err != nil {
err := ds.Close()
err := dctx.NamespaceManager.Close()
if err != nil {
return nil, false, err
}

err = ds.Close()
if err != nil {
return nil, false, err
}
Expand All @@ -57,58 +63,64 @@ func newDevContext(ctx context.Context, requestContext *v0.RequestContext, ds da

namespaces, devError, err := compile(requestContext.Schema)
if err != nil {
return nil, false, err
return &DevContext{NamespaceManager: nsm}, false, err
}

if devError != nil {
return &DevContext{RequestErrors: []*v0.DeveloperError{devError}}, false, nil
return &DevContext{NamespaceManager: nsm, RequestErrors: []*v0.DeveloperError{devError}}, false, nil
}

requestErrors, err := loadNamespaces(ctx, namespaces, nsm, ds)
if err != nil {
return &DevContext{}, false, err
return &DevContext{NamespaceManager: nsm}, false, err
}

if len(requestErrors) > 0 {
return &DevContext{RequestErrors: requestErrors}, false, nil
return &DevContext{NamespaceManager: nsm, RequestErrors: requestErrors}, false, nil
}

if len(requestContext.LegacyNsConfigs) > 0 {
requestErrors, err := loadNamespaces(ctx, requestContext.LegacyNsConfigs, nsm, ds)
if err != nil {
return &DevContext{}, false, err
return &DevContext{NamespaceManager: nsm}, false, err
}

if len(requestErrors) > 0 {
return &DevContext{RequestErrors: requestErrors}, false, nil
return &DevContext{NamespaceManager: nsm, RequestErrors: requestErrors}, false, nil
}
}

revision, requestErrors, err := loadTuples(ctx, requestContext.Relationships, nsm, ds)
if err != nil {
return &DevContext{Namespaces: namespaces}, false, err
return &DevContext{NamespaceManager: nsm, Namespaces: namespaces}, false, err
}

if len(requestErrors) == 0 {
err = requestContext.Validate()
if err != nil {
return nil, false, err
return &DevContext{NamespaceManager: nsm, Namespaces: namespaces}, false, err
}
}

return &DevContext{
Datastore: ds,
Namespaces: namespaces,
Revision: revision,
Dispatcher: dispatcher,
RequestErrors: requestErrors,
Datastore: ds,
Namespaces: namespaces,
Revision: revision,
Dispatcher: dispatcher,
RequestErrors: requestErrors,
NamespaceManager: nsm,
}, len(requestErrors) == 0, nil
}

func (dc *DevContext) dispose() {
datastore := dc.Datastore
if datastore != nil {
err := datastore.Close()
err := dc.NamespaceManager.Close()
if err != nil {
log.Err(err).Msg("error when disposing of namespace manager in devcontext")
}

err = datastore.Close()
if err != nil {
log.Err(err).Msg("error when disposing of datastore in devcontext")
}
Expand Down
23 changes: 18 additions & 5 deletions internal/services/v0/developer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ import (

v0 "github.com/authzed/authzed-go/proto/authzed/api/v0"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"

"github.com/authzed/spicedb/pkg/tuple"
)

func TestSharing(t *testing.T) {
func TestDeveloperSharing(t *testing.T) {
defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"), goleak.IgnoreCurrent())

require := require.New(t)

store := NewInMemoryShareStore("flavored")
Expand Down Expand Up @@ -41,7 +44,9 @@ func TestSharing(t *testing.T) {
require.Equal("s", lresp.Schema)
}

func TestSharingConverted(t *testing.T) {
func TestDeveloperSharingConverted(t *testing.T) {
defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"), goleak.IgnoreCurrent())

require := require.New(t)

store := NewInMemoryShareStore("flavored")
Expand Down Expand Up @@ -72,6 +77,8 @@ func TestSharingConverted(t *testing.T) {
}

func TestEditCheck(t *testing.T) {
defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"), goleak.IgnoreCurrent())

type testCase struct {
name string
schema string
Expand Down Expand Up @@ -201,7 +208,9 @@ func TestEditCheck(t *testing.T) {
}
}

func TestValidate(t *testing.T) {
func TestDeveloperValidate(t *testing.T) {
defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"), goleak.IgnoreCurrent())

type testCase struct {
name string
schema string
Expand Down Expand Up @@ -607,7 +616,9 @@ assertFalse:
}
}

func TestFormatSchema(t *testing.T) {
func TestDeveloperFormatSchema(t *testing.T) {
defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"), goleak.IgnoreCurrent())

require := require.New(t)

store := NewInMemoryShareStore("flavored")
Expand All @@ -621,7 +632,9 @@ func TestFormatSchema(t *testing.T) {
require.Equal("definition foos {}\n\ndefinition bars {}", lresp.FormattedSchema)
}

func TestValidateONR(t *testing.T) {
func TestDeveloperValidateONR(t *testing.T) {
defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"), goleak.IgnoreCurrent())

require := require.New(t)

store := NewInMemoryShareStore("flavored")
Expand Down
3 changes: 3 additions & 0 deletions internal/services/v0/namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func TestNamespace(t *testing.T) {

ds, err := memdb.NewMemdbDatastore(0, 0, memdb.DisableGC, 0)
require.NoError(err)
defer ds.Close()

srv := NewNamespaceServer(ds)

Expand Down Expand Up @@ -179,6 +180,7 @@ func TestNamespaceChanged(t *testing.T) {

ds, err := memdb.NewMemdbDatastore(0, 0, memdb.DisableGC, 0)
require.NoError(err)
defer ds.Close()

srv := NewNamespaceServer(ds)

Expand Down Expand Up @@ -279,6 +281,7 @@ func TestDeleteNamespace(t *testing.T) {

ds, err := memdb.NewMemdbDatastore(0, 0, memdb.DisableGC, 0)
require.NoError(err)
defer ds.Close()

srv := NewNamespaceServer(ds)

Expand Down
23 changes: 18 additions & 5 deletions pkg/schemadsl/lexer/lex.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ func createLexer(source input.InputSource, input string) *Lexer {
source: source,
input: input,
tokens: make(chan Lexeme),
closed: make(chan struct{}),
}
go l.run()
return l
Expand All @@ -32,6 +33,12 @@ func (l *Lexer) run() {
close(l.tokens)
}

// Close stops the lexer from running.
func (l *Lexer) Close() {
l.state = nil
close(l.closed)
}

// Lexeme represents a token returned from scanning the contents of a file.
type Lexeme struct {
Kind TokenType // The type of this lexeme.
Expand All @@ -55,6 +62,7 @@ type Lexer struct {
currentToken Lexeme // The current token if any
lastNonWhitespaceToken Lexeme // The last token returned that is non-whitespace
lastNonIgnoredToken Lexeme // The last token returned that is non-whitespace and non-comment
closed chan struct{} // Holds the closed channel
}

// nextToken returns the next token from the input.
Expand Down Expand Up @@ -105,9 +113,14 @@ func (l *Lexer) emit(t TokenType) {
l.lastNonIgnoredToken = currentToken
}

l.tokens <- currentToken
l.currentToken = currentToken
l.start = l.pos
select {
case l.tokens <- currentToken:
l.currentToken = currentToken
l.start = l.pos

case <-l.closed:
return
}
}

// errorf returns an error token and terminates the scan by passing
Expand Down Expand Up @@ -173,11 +186,11 @@ func buildLexUntil(findType TokenType, checker checkFn) stateFn {
return func(l *Lexer) stateFn {
for {
r := l.next()
is_valid, err := checker(r)
isValid, err := checker(r)
if err != nil {
return l.errorf("%v", err)
}
if !is_valid {
if !isValid {
l.backup()
break
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/schemadsl/lexer/peekable_lex.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ func NewPeekableLexer(lex *Lexer) *PeekableLexer {
}
}

// Close stops the lexer from running.
func (l *PeekableLexer) Close() {
l.lex.Close()
}

// NextToken returns the next token found in the lexer.
func (l *PeekableLexer) NextToken() Lexeme {
frontElement := l.readTokens.Front()
Expand Down
1 change: 1 addition & 0 deletions pkg/schemadsl/parser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
func Parse(builder NodeBuilder, source input.InputSource, input string) AstNode {
lx := lexer.Lex(source, input)
parser := buildParser(lx, builder, source, input)
defer parser.close()
return parser.consumeTopLevel()
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/schemadsl/parser/parser_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ func buildParser(lx *lexer.Lexer, builder NodeBuilder, source input.InputSource,
}
}

func (p *sourceParser) close() {
p.lex.Close()
}

// createNode creates a new AstNode and returns it.
func (p *sourceParser) createNode(kind dslshape.NodeType) AstNode {
return p.builder(p.source, kind)
Expand Down

0 comments on commit 968a8b7

Please sign in to comment.