allow creation of multiple engines on the same protocol and port (panjf2000#419)

Co-authored-by: Jeffrey Damick <jdamick@amazon.com>
jdamick and Jeffrey Damick committed Nov 4, 2022
1 parent cfcafcd commit a7eb974
Showing 2 changed files with 111 additions and 1 deletion.
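
What the change enables: with WithReuseAddr(true) and WithReusePort(true), gnet.Run can now be invoked more than once for the same protocol and IP:Port, each call producing its own Engine, and each instance is shut down through the new Engine.Stop rather than the global gnet.Stop. Below is a minimal usage sketch, not part of the diff — the module path github.com/panjf2000/gnet/v2 and the echoServer handler are assumptions for illustration:

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/panjf2000/gnet/v2"
)

// echoServer is a hypothetical handler; each Run call gets its own instance
// so that every engine can be stopped independently via its Engine handle.
type echoServer struct {
	gnet.BuiltinEventEngine
	eng gnet.Engine
}

func (s *echoServer) OnBoot(eng gnet.Engine) gnet.Action {
	s.eng = eng // keep the handle for Engine.Stop later
	return gnet.None
}

func (s *echoServer) OnTraffic(c gnet.Conn) gnet.Action {
	buf, _ := c.Next(-1)
	_, _ = c.Write(buf)
	return gnet.None
}

func main() {
	s1, s2 := new(echoServer), new(echoServer)
	for _, s := range []*echoServer{s1, s2} {
		go func(s *echoServer) {
			// Both engines bind tcp://:9000; this only works with the
			// reuse options enabled and the per-Engine Stop added here.
			err := gnet.Run(s, "tcp://:9000",
				gnet.WithReuseAddr(true), gnet.WithReusePort(true))
			log.Printf("engine exited: %v", err)
		}(s)
	}

	time.Sleep(time.Second) // crude wait for both OnBoot calls (demo only)

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	_ = s1.eng.Stop(ctx) // shuts down only the first engine
	_ = s2.eng.Stop(ctx) // the second engine is stopped through its own handle
}
```

The deprecation note added to the global Stop below spells out why the per-Engine handle matters: engines registered under the same protoAddr key overwrite each other, so the global Stop can only reach the last one registered and earlier engines would leak.
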
24 changes: 24 additions & 0 deletions gnet.go
@@ -70,6 +70,29 @@ func (s Engine) Dup() (dupFD int, err error) {
return
}

// Stop gracefully shuts down this Engine without interrupting any active event-loops;
// it waits indefinitely for connections and event-loops to be closed and then shuts down.
func (s Engine) Stop(ctx context.Context) error {
if s.eng.isInShutdown() {
return errors.ErrEngineInShutdown
}

s.eng.signalShutdown()

ticker := time.NewTicker(shutdownPollInterval)
defer ticker.Stop()
for {
if s.eng.isInShutdown() {
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
}
}
}

// Reader is an interface that consists of a number of methods for reading that Conn must implement.
type Reader interface {
// ================================== Non-concurrency-safe API's ==================================
@@ -401,6 +424,7 @@

// Stop gracefully shuts down the engine without interrupting any active event-loops;
// it waits indefinitely for connections and event-loops to be closed and then shuts down.
// Deprecated: The global Stop only shuts down the last registered Engine for a given protocol and IP:Port, which can leak earlier Engines if you invoke gnet.Run multiple times on the same protocol and IP:Port with WithReuseAddr(true) and WithReusePort(true) enabled. Use Engine.Stop instead.
func Stop(ctx context.Context, protoAddr string) error {
var eng *engine
if s, ok := allEngines.Load(protoAddr); ok {
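
Engine.Stop, added above, signals shutdown and then polls on a ticker (shutdownPollInterval) until the engine reports it is in shutdown, returning ctx.Err() if the caller's context expires first and errors.ErrEngineInShutdown if the engine was already stopped. A sketch of how a caller might wrap it — the helper name shutdownEngine and the errors package path github.com/panjf2000/gnet/v2/pkg/errors are assumptions, not part of this diff:

```go
package shutdown

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/panjf2000/gnet/v2"
	gerr "github.com/panjf2000/gnet/v2/pkg/errors"
)

// shutdownEngine bounds Engine.Stop's otherwise indefinite wait with a
// context deadline and normalizes the two expected error cases.
func shutdownEngine(eng gnet.Engine, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	switch err := eng.Stop(ctx); {
	case err == nil:
		return nil // all connections and event-loops were closed
	case errors.Is(err, gerr.ErrEngineInShutdown):
		return nil // the engine was already stopped; treat as done
	case errors.Is(err, context.DeadlineExceeded):
		return fmt.Errorf("engine did not shut down within %v: %w", timeout, err)
	default:
		return err
	}
}
```

The test below uses the same pattern inline: each testStopEngine keeps the Engine from OnBoot and calls eng.Stop with a 3-second context, and the final assertion checks that stopping an already-stopped engine yields ErrEngineInShutdown.
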
88 changes: 87 additions & 1 deletion gnet_test.go
@@ -928,11 +928,97 @@ func testStop(t *testing.T, network, addr string) {
assert.NoError(t, err)
}

func TestEngineStop(t *testing.T) {
testEngineStop(t, "tcp", ":9998")
}

type testStopEngine struct {
*BuiltinEventEngine
tester *testing.T
network, addr, protoAddr string
eng Engine
stopIter int64
name string
exchngCount int64
}

func (t *testStopEngine) OnBoot(eng Engine) (action Action) {
t.eng = eng
return
}

func (t *testStopEngine) OnClose(c Conn, err error) (action Action) {
logging.Debugf("closing connection...")
return
}

func (t *testStopEngine) OnTraffic(c Conn) (action Action) {
buf, _ := c.Peek(-1)
_, _ = c.Write(buf)
_, _ = c.Discard(-1)
atomic.AddInt64(&t.exchngCount, 1)
return
}

func (t *testStopEngine) OnTick() (delay time.Duration, action Action) {
delay = time.Millisecond * 100
go func() {
conn, err := net.Dial(t.network, t.addr)
require.NoError(t.tester, err)
defer conn.Close()
data := []byte("Hello World! " + t.name)
_, _ = conn.Write(data)
_, err = conn.Read(data)
require.NoError(t.tester, err)

iter := atomic.LoadInt64(&t.stopIter)
if iter <= 0 {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
logging.Debugf("stop engine...", t.eng.Stop(ctx))
// waiting the engine shutdown.
_, err = conn.Read(data)
require.Error(t.tester, err)
}
atomic.AddInt64(&t.stopIter, -1)
}()
return
}

func testEngineStop(t *testing.T, network, addr string) {
events1 := &testStopEngine{tester: t, network: network, addr: addr, protoAddr: network + "://" + addr, name: "1", stopIter: 2}
events2 := &testStopEngine{tester: t, network: network, addr: addr, protoAddr: network + "://" + addr, name: "2", stopIter: 5}

result1 := make(chan error)
go func() {
err := Run(events1, events1.protoAddr, WithTicker(true), WithReuseAddr(true), WithReusePort(true))
result1 <- err
}()
// ensure the first handler gets a tick in before starting the next engine, since the delay per tick is 100ms
time.Sleep(150 * time.Millisecond)
result2 := make(chan error)
go func() {
err := Run(events2, events2.protoAddr, WithTicker(true), WithReuseAddr(true), WithReusePort(true))
result2 <- err
}()

err := <-result1
assert.NoError(t, err)
err = <-result2
assert.NoError(t, err)
// make sure that each handler processed at least one exchange
require.Greater(t, events1.exchngCount, int64(0))
require.Greater(t, events2.exchngCount, int64(0))
require.Equal(t, int64(2+1+5+1), events1.exchngCount+events2.exchngCount)
// stopping an already stopped engine should report ErrEngineInShutdown
require.Equal(t, gerr.ErrEngineInShutdown, events1.eng.Stop(context.Background()))
}

// Test should not panic when we wake-up server_closed conn.
func TestClosedWakeUp(t *testing.T) {
events := &testClosedWakeUpServer{
tester: t,
BuiltinEventEngine: &BuiltinEventEngine{}, network: "tcp", addr: ":9998", protoAddr: "tcp://:9998",
BuiltinEventEngine: &BuiltinEventEngine{}, network: "tcp", addr: ":9999", protoAddr: "tcp://:9999",
clientClosed: make(chan struct{}),
serverClosed: make(chan struct{}),
wakeup: make(chan struct{}),
