Skip to content

Commit

Permalink
monitor: Refactor listener registration logic
Browse files Browse the repository at this point in the history
- Modify registerNewListener() to take MonitorListener as a parameter
  to allow arbitrary listener to be registered instead of assuming the
  type of listener is always listenerv1_2.
- Add Close() method to MonitorListener so that the Monitor can close
  listeners without knowing implementation details.
- Explicitly call close() on listenerv1_2.queue so that drainQueue gets
  unblocked during unit test.

Ref #9925

Signed-off-by: Michi Mutsuzaki <michi@isovalent.com>
  • Loading branch information
michi-covalent committed Feb 12, 2020
1 parent f5c42e7 commit 79cc0f3
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 8 deletions.
3 changes: 3 additions & 0 deletions pkg/monitor/agent/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ type MonitorListener interface {

// Version returns the API version of this listener
Version() Version

// Close closes the listener.
Close()
}

// IsDisconnected is a convenience function that wraps the absurdly long set of
Expand Down
13 changes: 12 additions & 1 deletion pkg/monitor/agent/listener1_2.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package agent
import (
"encoding/gob"
"net"
"sync"

"github.com/cilium/cilium/pkg/monitor/agent/listener"
"github.com/cilium/cilium/pkg/monitor/payload"
Expand All @@ -29,6 +30,8 @@ type listenerv1_2 struct {
conn net.Conn
queue chan *payload.Payload
cleanupFn func(listener.MonitorListener)
// Used to prevent queue from getting closed multiple times.
once sync.Once
}

func newListenerv1_2(c net.Conn, queueSize int, cleanupFn func(listener.MonitorListener)) *listenerv1_2 {
Expand All @@ -55,7 +58,7 @@ func (ml *listenerv1_2) Enqueue(pl *payload.Payload) {
// intended to be a goroutine.
func (ml *listenerv1_2) drainQueue() {
defer func() {
ml.conn.Close()
ml.Close()
ml.cleanupFn(ml)
}()

Expand All @@ -78,3 +81,11 @@ func (ml *listenerv1_2) drainQueue() {
func (ml *listenerv1_2) Version() listener.Version {
return listener.Version1_2
}

// Close closes the underlying socket and payload queue.
func (ml *listenerv1_2) Close() {
ml.once.Do(func() {
ml.conn.Close()
close(ml.queue)
})
}
48 changes: 48 additions & 0 deletions pkg/monitor/agent/listener1_2_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2018-2020 Authors of Cilium
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// +build !privileged_tests

package agent

import (
"net"
"testing"

. "gopkg.in/check.v1"

"github.com/cilium/cilium/pkg/monitor/agent/listener"
)

func Test(t *testing.T) { TestingT(t) }

type ListenerSuite struct{}

var _ = Suite(&ListenerSuite{})

func (m *ListenerSuite) TestListenerv1_2(c *C) {
closed := make(chan bool)
server, client := net.Pipe()
l := newListenerv1_2(client, 10, func(listener listener.MonitorListener) {
closed <- true
})
// Verify the listener version.
c.Assert(l.Version(), Equals, listener.Version1_2)
// Calling Close() multiple times shouldn't cause panic.
l.Close()
l.Close()
// Make sure the cleanup function gets called.
<-closed
server.Close()
}
14 changes: 7 additions & 7 deletions pkg/monitor/agent/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func NewMonitor(ctx context.Context, nPages int, server1_2 net.Listener) (m *Mon
// cancelable context to this goroutine and the cancelFunc is assigned to
// perfReaderCancel. Note that cancelling parentCtx (e.g. on program shutdown)
// will also cancel the derived context.
func (m *Monitor) registerNewListener(parentCtx context.Context, conn net.Conn, version listener.Version) {
func (m *Monitor) registerNewListener(parentCtx context.Context, newListener listener.MonitorListener) {
m.Lock()
defer m.Unlock()

Expand All @@ -123,15 +123,14 @@ func (m *Monitor) registerNewListener(parentCtx context.Context, conn net.Conn,
m.perfReaderCancel = cancel
go m.handleEvents(perfEventReaderCtx)
}

switch version {
version := newListener.Version()
switch newListener.Version() {
case listener.Version1_2:
newListener := newListenerv1_2(conn, option.Config.MonitorQueueSize, m.removeListener)
m.listeners[newListener] = struct{}{}

default:
conn.Close()
log.WithField("version", version).Error("Closing new connection from unsupported monitor client version")
newListener.Close()
log.WithField("version", version).Error("Closing listener from unsupported monitor client version")
}

log.WithFields(logrus.Fields{
Expand Down Expand Up @@ -256,7 +255,8 @@ func (m *Monitor) connectionHandler1_2(parentCtx context.Context, server net.Lis
continue
}

m.registerNewListener(parentCtx, conn, listener.Version1_2)
newListener := newListenerv1_2(conn, option.Config.MonitorQueueSize, m.removeListener)
m.registerNewListener(parentCtx, newListener)
}
}

Expand Down

0 comments on commit 79cc0f3

Please sign in to comment.