Skip to content

Commit

Permalink
Limit number of open sessions (#165)
Browse files Browse the repository at this point in the history
* Hack:  use a semaphore channel to limit the number of open sessions.
This is a hack because we're relying on session timeout behavior.
Recommend ref-counting API that releases refs when block-height changes.

* Fill up the semaphore channel, else deadlock.

* Use Config struct to define MaxConcurrentSessions.

* Add unit test for max concurrent sessions.

* Test that semaphore is released when sessions expire.
  • Loading branch information
lthibault authored Jan 19, 2024
1 parent 1cd057c commit 669b321
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 18 deletions.
12 changes: 6 additions & 6 deletions suave/builder/api/api_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,25 @@ import (
"github.com/ethereum/go-ethereum/core/types"
)

// sessionManager is the backend that manages the session state of the builder API.
type sessionManager interface {
NewSession() (string, error)
// SessionManager is the backend that manages the session state of the builder API.
type SessionManager interface {
NewSession(context.Context) (string, error)
AddTransaction(sessionId string, tx *types.Transaction) (*types.SimulateTransactionResult, error)
}

func NewServer(s sessionManager) *Server {
func NewServer(s SessionManager) *Server {
api := &Server{
sessionMngr: s,
}
return api
}

type Server struct {
sessionMngr sessionManager
sessionMngr SessionManager
}

func (s *Server) NewSession(ctx context.Context) (string, error) {
return s.sessionMngr.NewSession()
return s.sessionMngr.NewSession(ctx)
}

func (s *Server) AddTransaction(ctx context.Context, sessionId string, tx *types.Transaction) (*types.SimulateTransactionResult, error) {
Expand Down
6 changes: 3 additions & 3 deletions suave/builder/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ func TestAPI(t *testing.T) {

type nullSessionManager struct{}

func (n *nullSessionManager) NewSession() (string, error) {
return "1", nil
func (nullSessionManager) NewSession(ctx context.Context) (string, error) {
return "1", ctx.Err()
}

func (n *nullSessionManager) AddTransaction(sessionId string, tx *types.Transaction) (*types.SimulateTransactionResult, error) {
func (nullSessionManager) AddTransaction(sessionId string, tx *types.Transaction) (*types.SimulateTransactionResult, error) {
return &types.SimulateTransactionResult{Logs: []*types.SimulatedLog{}}, nil
}
37 changes: 31 additions & 6 deletions suave/builder/session_manager.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package builder

import (
"context"
"fmt"
"math/big"
"sync"
Expand Down Expand Up @@ -31,11 +32,13 @@ type blockchain interface {
}

type Config struct {
GasCeil uint64
SessionIdleTimeout time.Duration
GasCeil uint64
SessionIdleTimeout time.Duration
MaxConcurrentSessions int
}

type SessionManager struct {
sem chan struct{}
sessions map[string]*builder
sessionTimers map[string]*time.Timer
sessionsLock sync.RWMutex
Expand All @@ -50,8 +53,17 @@ func NewSessionManager(blockchain blockchain, config *Config) *SessionManager {
if config.SessionIdleTimeout == 0 {
config.SessionIdleTimeout = 5 * time.Second
}
if config.MaxConcurrentSessions <= 0 {
config.MaxConcurrentSessions = 16 // chosen arbitrarily
}

sem := make(chan struct{}, config.MaxConcurrentSessions)
for len(sem) < cap(sem) {
sem <- struct{}{} // fill 'er up
}

s := &SessionManager{
sem: sem,
sessions: make(map[string]*builder),
sessionTimers: make(map[string]*time.Timer),
blockchain: blockchain,
Expand All @@ -61,12 +73,17 @@ func NewSessionManager(blockchain blockchain, config *Config) *SessionManager {
}

// NewSession creates a new builder session and returns the session id
func (s *SessionManager) NewSession() (string, error) {
s.sessionsLock.Lock()
defer s.sessionsLock.Unlock()
func (s *SessionManager) NewSession(ctx context.Context) (string, error) {
// Wait for session to become available
select {
case <-s.sem:
s.sessionsLock.Lock()
defer s.sessionsLock.Unlock()
case <-ctx.Done():
return "", ctx.Err()
}

parent := s.blockchain.CurrentHeader()

chainConfig := s.blockchain.Config()

header := &types.Header{
Expand Down Expand Up @@ -109,6 +126,14 @@ func (s *SessionManager) NewSession() (string, error) {

delete(s.sessions, id)
delete(s.sessionTimers, id)

// Technically, we are certain that there is an open slot in the semaphore
// channel, but let's be defensive and panic if the invariant is violated.
select {
case s.sem <- struct{}{}:
default:
panic("released more sessions than are open") // unreachable
}
})

return id, nil
Expand Down
42 changes: 39 additions & 3 deletions suave/builder/session_manager_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package builder

import (
"context"
"crypto/ecdsa"
"math/big"
"testing"
Expand All @@ -21,7 +22,7 @@ func TestSessionManager_SessionTimeout(t *testing.T) {
SessionIdleTimeout: 500 * time.Millisecond,
})

id, err := mngr.NewSession()
id, err := mngr.NewSession(context.TODO())
require.NoError(t, err)

time.Sleep(1 * time.Second)
Expand All @@ -30,12 +31,47 @@ func TestSessionManager_SessionTimeout(t *testing.T) {
require.Error(t, err)
}

func TestSessionManager_MaxConcurrentSessions(t *testing.T) {
t.Parallel()

const d = time.Millisecond * 10

mngr, _ := newSessionManager(t, &Config{
MaxConcurrentSessions: 1,
SessionIdleTimeout: d,
})

t.Run("SessionAvailable", func(t *testing.T) {
sess, err := mngr.NewSession(context.TODO())
require.NoError(t, err)
require.NotZero(t, sess)
})

t.Run("ContextExpired", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()

sess, err := mngr.NewSession(ctx)
require.Zero(t, sess)
require.ErrorIs(t, err, context.Canceled)
})

t.Run("SessionExpired", func(t *testing.T) {
time.Sleep(d) // Wait for the session to expire.

// We should be able to open a session again.
sess, err := mngr.NewSession(context.TODO())
require.NoError(t, err)
require.NotZero(t, sess)
})
}

func TestSessionManager_SessionRefresh(t *testing.T) {
mngr, _ := newSessionManager(t, &Config{
SessionIdleTimeout: 500 * time.Millisecond,
})

id, err := mngr.NewSession()
id, err := mngr.NewSession(context.TODO())
require.NoError(t, err)

// if we query the session under the idle timeout,
Expand All @@ -60,7 +96,7 @@ func TestSessionManager_StartSession(t *testing.T) {
// test that the session starts and it can simulate transactions
mngr, bMock := newSessionManager(t, &Config{})

id, err := mngr.NewSession()
id, err := mngr.NewSession(context.TODO())
require.NoError(t, err)

txn := bMock.state.newTransfer(t, common.Address{}, big.NewInt(1))
Expand Down

0 comments on commit 669b321

Please sign in to comment.