Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

hubble/relay: improve peer connections handling #12556

Merged
merged 24 commits into from
Jul 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
510b2e7
hubble/relay: decouple peer syncing logic from the server struct
rolinh Jun 25, 2020
49194dd
hubble/relay: move reconnection logic to peer syncer
rolinh Jun 26, 2020
ec924ef
hubble/relay: add a pool manager to handle peers connections
rolinh Jul 9, 2020
215bc08
hubble/peer: define a Peer service Client interface and Builder
rolinh Jul 10, 2020
0bb2c86
hubble/peer: implement ClientBuilder for a local connection
rolinh Jul 10, 2020
68b45a1
hubble/relay: use peer.ClientBuilder to build Peer service clients
rolinh Jul 10, 2020
a1c87a7
hubble/relay: define and use a gRPC client connection builder
rolinh Jul 10, 2020
7bf60d3
hubble/relay: implement backoff for peer connection attempts
rolinh Jul 14, 2020
cfe2615
hubble/relay: make the connection check interval configurable
rolinh Jul 14, 2020
1db1402
hubble/relay: report offline peers to the pool manager
rolinh Jul 14, 2020
0427a13
hubble/relay: ignore backoff on direct connection requests
rolinh Jul 14, 2020
c3107e0
hubble/relay: do not call disconnect before connection attempt
rolinh Jul 14, 2020
6e113c0
hubble: move FakePeerNotifyServer to testutils
rolinh Jul 15, 2020
ed450b3
hubble: move peer interface definitions to hubble/peer/types
rolinh Jul 15, 2020
c9a5df5
hubble/testutils: implement fakers for peer/types/{Client,ClientBuilder}
rolinh Jul 15, 2020
431382f
hubble/testutils: add FakeGRPCClientStream
rolinh Jul 15, 2020
c34a041
hubble/testutils: implement FakePeerNotifyClient
rolinh Jul 15, 2020
a9b8845
hubble/relay: implement unit tests for the peer pool manager
rolinh Jul 16, 2020
d395dd2
hubble/[peer|relay]: remove Target() from the ClientBuilder interface
rolinh Jul 20, 2020
58d9524
hubble/relay: add pool option to set the logger and remove debug option
rolinh Jul 20, 2020
334035b
hubble/relay: wait on goroutine to finish when pool.Stop() is called
rolinh Jul 20, 2020
7c4f4e5
hubble/relay: assert logger messages in pool manager tests
rolinh Jul 20, 2020
864f3f0
hubble/relay: use a RWLock to protect the peers map in the pool Manager
rolinh Jul 21, 2020
f95e76a
hubble/relay: rename struct member from pool to pm to avoid confusion
rolinh Jul 21, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 2 additions & 14 deletions pkg/hubble/peer/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ func TestService_Notify(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
var got []*peerpb.ChangeNotification
var wg sync.WaitGroup
fakeServer := &FakePeerNotifyServer{
fakeServer := &testutils.FakePeerNotifyServer{
OnSend: func(resp *peerpb.ChangeNotification) error {
got = append(got, resp)
wg.Done()
Expand Down Expand Up @@ -418,7 +418,7 @@ func TestService_Notify(t *testing.T) {
}

func TestService_NotifyWithBlockedSend(t *testing.T) {
fakeServer := &FakePeerNotifyServer{
fakeServer := &testutils.FakePeerNotifyServer{
OnSend: func(resp *peerpb.ChangeNotification) error {
<-time.After(100 * time.Millisecond)
return nil
Expand Down Expand Up @@ -470,18 +470,6 @@ func TestService_NotifyWithBlockedSend(t *testing.T) {
<-done
}

type FakePeerNotifyServer struct {
OnSend func(response *peerpb.ChangeNotification) error
*testutils.FakeGRPCServerStream
}

func (s *FakePeerNotifyServer) Send(response *peerpb.ChangeNotification) error {
if s.OnSend != nil {
return s.OnSend(response)
}
panic("OnSend not set")
}

type notifier struct {
nodes []types.Node
subscribers map[datapath.NodeHandler]struct{}
Expand Down
67 changes: 67 additions & 0 deletions pkg/hubble/peer/types/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2020 Authors of Cilium
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package types

import (
"context"
"io"
"time"

peerpb "github.com/cilium/cilium/api/v1/peer"

"google.golang.org/grpc"
)

// Client defines an interface that Peer service client should implement.
type Client interface {
peerpb.PeerClient
io.Closer
}

// ClientBuilder creates a new Client.
type ClientBuilder interface {
// Client builds a new Client that connects to the given target.
Client(target string) (Client, error)
}

type client struct {
conn *grpc.ClientConn
peerpb.PeerClient
}

func (c *client) Close() error {
if c.conn == nil {
return nil
}
return c.conn.Close()
}

// LocalClientBuilder is a ClientBuilder that is suitable when the gRPC
// connection to the Peer service is local (typically a Unix Domain Socket).
type LocalClientBuilder struct {
DialTimeout time.Duration
}

// Client implements ClientBuilder.Client.
func (b LocalClientBuilder) Client(target string) (Client, error) {
ctx, cancel := context.WithTimeout(context.Background(), b.DialTimeout)
defer cancel()
// the connection is local so we assume WithInsecure() is safe in this context
conn, err := grpc.DialContext(ctx, target, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
return nil, err
}
return &client{conn, peerpb.NewPeerClient(conn)}, nil
}
32 changes: 22 additions & 10 deletions pkg/hubble/relay/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

observerpb "github.com/cilium/cilium/api/v1/observer"
relaypb "github.com/cilium/cilium/api/v1/relay"
"github.com/cilium/cilium/pkg/hubble/relay/pool"
"github.com/cilium/cilium/pkg/hubble/relay/queue"
nodeTypes "github.com/cilium/cilium/pkg/node/types"

Expand All @@ -32,16 +33,27 @@ import (
"google.golang.org/grpc/status"
)

// TODO: remove this shim once "google.golang.org/grpc" is bumped to v1.28+
aanm marked this conversation as resolved.
Show resolved Hide resolved
// which should convert generated code
// `observerpb.NewObserverClient(cc *grpc.ClientConn)` to
// `observerpb.NewObserverClient(cc grpc.ClientConnInterface)`.
func newObserverClient(cc pool.ClientConn) observerpb.ObserverClient {
if conn, ok := cc.(*grpc.ClientConn); ok {
return observerpb.NewObserverClient(conn)
}
return nil
}

// ensure that Server implements the observer.ObserverServer interface.
var _ observerpb.ObserverServer = (*Server)(nil)

func retrieveFlowsFromPeer(
ctx context.Context,
conn *grpc.ClientConn,
conn pool.ClientConn,
req *observerpb.GetFlowsRequest,
flows chan<- *observerpb.GetFlowsResponse,
) error {
client := observerpb.NewObserverClient(conn)
client := newObserverClient(conn)
c, err := client.GetFlows(ctx, req)
if err != nil {
return err
Expand Down Expand Up @@ -236,7 +248,7 @@ func (s *Server) GetFlows(req *observerpb.GetFlowsRequest, stream observerpb.Obs
}
}()

peers := s.peerList()
peers := s.pm.List()
qlen := s.opts.SortBufferMaxLen // we don't want to buffer too many flows
if nqlen := req.GetNumber() * uint64(len(peers)); nqlen > 0 && nqlen < uint64(qlen) {
// don't make the queue bigger than necessary as it would be a problem
Expand All @@ -250,20 +262,20 @@ func (s *Server) GetFlows(req *observerpb.GetFlowsRequest, stream observerpb.Obs

for _, p := range peers {
p := p
rolinh marked this conversation as resolved.
Show resolved Hide resolved
if p.conn == nil || p.connErr != nil {
if p.Conn == nil {
s.log.WithField("address", p.Address.String()).Infof(
"No connection to peer %s, skipping", p.Name,
)
s.pm.ReportOffline(p.Name)
unavailableNodes = append(unavailableNodes, p.Name)
go s.connectPeer(p.Name, p.Address.String())
continue
}
connectedNodes = append(connectedNodes, p.Name)
g.Go(func() error {
// retrieveFlowsFromPeer returns blocks until the peer finishes
// the request by closing the connection, an error occurs,
// or gctx expires.
err := retrieveFlowsFromPeer(gctx, p.conn, req, flows)
err := retrieveFlowsFromPeer(gctx, p.Conn, req, flows)
if err != nil {
s.log.WithFields(logrus.Fields{
"error": err,
Expand Down Expand Up @@ -330,19 +342,19 @@ func (s *Server) ServerStatus(ctx context.Context, req *observerpb.ServerStatusR
}
}()

peers := s.peerList()
peers := s.pm.List()
statuses := make(chan *observerpb.ServerStatusResponse, len(peers))
for _, p := range peers {
p := p
if p.conn == nil || p.connErr != nil {
if p.Conn == nil {
s.log.WithField("address", p.Address.String()).Infof(
"No connection to peer %s, skipping", p.Name,
)
go s.connectPeer(p.Name, p.Address.String())
s.pm.ReportOffline(p.Name)
continue
}
g.Go(func() error {
client := observerpb.NewObserverClient(p.conn)
client := newObserverClient(p.Conn)
status, err := client.ServerStatus(ctx, req)
if err != nil {
s.log.WithFields(logrus.Fields{
Expand Down