Skip to content

Commit

Permalink
Instance management: Dispose of instance after 5s (#765)
Browse files Browse the repository at this point in the history
Wait disposing an instance until after 5 seconds in case the instance is still in use.

Fixes #753
  • Loading branch information
marefr committed Oct 18, 2023
1 parent b26994a commit 9be45ea
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 16 deletions.
8 changes: 7 additions & 1 deletion backend/instancemgmt/instance_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"reflect"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand All @@ -17,6 +18,7 @@ var (
Name: "active_instances",
Help: "The number of active plugin instances",
})
disposeTTL = 5 * time.Second
)

// Instance is a marker interface for an instance.
Expand Down Expand Up @@ -119,7 +121,11 @@ func (im *instanceManager) Get(ctx context.Context, pluginContext backend.Plugin
}

if disposer, valid := ci.instance.(InstanceDisposer); valid {
disposer.Dispose()
time.AfterFunc(disposeTTL, func() {
disposer.Dispose()
activeInstances.Dec()
})
} else {
activeInstances.Dec()
}
}
Expand Down
40 changes: 30 additions & 10 deletions backend/instancemgmt/instance_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ func TestInstanceManager(t *testing.T) {
Updated: time.Now(),
},
}
origDisposeTTL := disposeTTL
disposeTTL = time.Millisecond
t.Cleanup(func() {
disposeTTL = origDisposeTTL
})
newInstance, err := im.Get(ctx, pCtxUpdated)

t.Run("New instance should be created", func(t *testing.T) {
Expand All @@ -57,7 +62,9 @@ func TestInstanceManager(t *testing.T) {
})

t.Run("Old instance should be disposed", func(t *testing.T) {
require.True(t, instance.(*testInstance).disposed)
instance.(*testInstance).wg.Wait()
require.True(t, instance.(*testInstance).disposed.Load())
require.Equal(t, int64(1), instance.(*testInstance).disposedTimes.Load())
})
})
})
Expand Down Expand Up @@ -95,14 +102,20 @@ func TestInstanceManagerConcurrency(t *testing.T) {
t.Run("All created instances should be either disposed or exist in cache for later disposing", func(t *testing.T) {
cachedInstance, _ := im.Get(ctx, pCtx)
for _, instance := range createdInstances {
if cachedInstance.(*testInstance) != instance && instance.disposedTimes < 1 {
if cachedInstance.(*testInstance) != instance && instance.disposedTimes.Load() < 1 {
require.FailNow(t, "Found lost reference to un-disposed instance")
}
}
})
})

t.Run("Check possible race condition issues when re-creating instance on settings update", func(t *testing.T) {
origDisposeTTL := disposeTTL
disposeTTL = time.Millisecond
t.Cleanup(func() {
disposeTTL = origDisposeTTL
})

ctx := context.Background()
initialPCtx := backend.PluginContext{
OrgID: 1,
Expand Down Expand Up @@ -141,12 +154,13 @@ func TestInstanceManagerConcurrency(t *testing.T) {
wg.Wait()

t.Run("Initial instance should be disposed only once", func(t *testing.T) {
require.Equal(t, int64(1), instanceToDispose.(*testInstance).disposedTimes, "Instance should be disposed only once")
instanceToDispose.(*testInstance).wg.Wait()
require.Equal(t, int64(1), instanceToDispose.(*testInstance).disposedTimes.Load(), "Instance should be disposed only once")
})
t.Run("All created instances should be either disposed or exist in cache for later disposing", func(t *testing.T) {
cachedInstance, _ := im.Get(ctx, updatedPCtx)
for _, instance := range createdInstances {
if cachedInstance.(*testInstance) != instance && instance.disposedTimes < 1 {
if cachedInstance.(*testInstance) != instance && instance.disposedTimes.Load() < 1 {
require.FailNow(t, "Found lost reference to un-disposed instance")
}
}
Expand Down Expand Up @@ -204,13 +218,15 @@ func TestInstanceManagerConcurrency(t *testing.T) {
type testInstance struct {
orgID int64
updated time.Time
disposed bool
disposedTimes int64
disposed atomic.Bool
disposedTimes atomic.Int64
wg sync.WaitGroup
}

func (ti *testInstance) Dispose() {
ti.disposed = true
atomic.AddInt64(&ti.disposedTimes, 1)
ti.disposed.Store(true)
ti.disposedTimes.Add(1)
ti.wg.Done()
}

type testInstanceProvider struct {
Expand All @@ -231,8 +247,12 @@ func (tip *testInstanceProvider) NewInstance(_ context.Context, pluginContext ba
if tip.delay > 0 {
time.Sleep(tip.delay)
}
return &testInstance{

ti := &testInstance{
orgID: pluginContext.OrgID,
updated: pluginContext.AppInstanceSettings.Updated,
}, nil
}
ti.wg.Add(1)

return ti, nil
}
15 changes: 10 additions & 5 deletions experimental/e2e/storage/har_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,16 @@ func TestHARStorage(t *testing.T) {
require.NoError(t, e)
c <- true
}()
req, res := exampleRequest()
defer res.Body.Close()
req.URL.Path = "/two"
err = two.Add(req, res)
require.NoError(t, err)
go func() {
req, res := exampleRequest()
defer res.Body.Close()
req.URL.Path = "/two"
err = two.Add(req, res)
require.NoError(t, err)
c <- true
}()

<-c
<-c
require.Len(t, one.Entries(), 2)
require.Len(t, two.Entries(), 2)
Expand Down

0 comments on commit 9be45ea

Please sign in to comment.