Skip to content

Commit

Permalink
hubble: Add GetNamespaces to hubble observer API
Browse files Browse the repository at this point in the history
Signed-off-by: Chance Zibolski <chance.zibolski@gmail.com>
  • Loading branch information
chancez committed Jun 9, 2023
1 parent dbc2502 commit 2e8059e
Show file tree
Hide file tree
Showing 16 changed files with 1,022 additions and 140 deletions.
12 changes: 7 additions & 5 deletions Documentation/internals/hubble.rst
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,13 @@ as well as being exposed as a Kubernetes Service when enabled via TCP.
The Observer service
^^^^^^^^^^^^^^^^^^^^

The Observer service is the principal service. It provides three RPC endpoints:
``GetFlows``, ``GetNodes`` and ``ServerStatus``. While ``ServerStatus`` and
``GetNodes`` endpoints are pretty straightforward (they provides metrics and
other information related to the running instance(s)), ``GetFlows`` is far more
sophisticated and the more important one.
The Observer service is the principal service. It provides four RPC endpoints:
``GetFlows``, ``GetNodes``, ``GetNamespaces`` and ``ServerStatus``.

* ``GetNodes`` returns a list of metrics and other information related to each Hubble instance
* ``ServerStatus`` returns a summary the information in ``GetNodes``
* ``GetNamespaces`` returns a list of namespaces that had network flows within the last one hour
* ``GetFlows`` returns a stream of flow related events

Using ``GetFlows``, callers get a stream of payloads. Request parameters allow
callers to specify filters in the form of allow lists and deny lists to allow
Expand Down
45 changes: 45 additions & 0 deletions api/v1/observer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
- [GetFlowsRequest](#observer-GetFlowsRequest)
- [GetFlowsRequest.Experimental](#observer-GetFlowsRequest-Experimental)
- [GetFlowsResponse](#observer-GetFlowsResponse)
- [GetNamespacesRequest](#observer-GetNamespacesRequest)
- [GetNamespacesResponse](#observer-GetNamespacesResponse)
- [GetNodesRequest](#observer-GetNodesRequest)
- [GetNodesResponse](#observer-GetNodesResponse)
- [Namespace](#observer-Namespace)
- [Node](#observer-Node)
- [ServerStatusRequest](#observer-ServerStatusRequest)
- [ServerStatusResponse](#observer-ServerStatusResponse)
Expand Down Expand Up @@ -183,6 +186,31 @@ GetFlowsResponse contains either a flow or a protocol message.



<a name="observer-GetNamespacesRequest"></a>

### GetNamespacesRequest







<a name="observer-GetNamespacesResponse"></a>

### GetNamespacesResponse
GetNamespacesResponse contains the list of namespaces.


| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| namespaces | [Namespace](#observer-Namespace) | repeated | Namespaces is a list of namespaces with flows |






<a name="observer-GetNodesRequest"></a>

### GetNodesRequest
Expand All @@ -208,6 +236,22 @@ GetNodesResponse contains the list of nodes.



<a name="observer-Namespace"></a>

### Namespace



| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| cluster | [string](#string) | | |
| namespace | [string](#string) | | |






<a name="observer-Node"></a>

### Node
Expand Down Expand Up @@ -297,6 +341,7 @@ to observe.
| GetAgentEvents | [GetAgentEventsRequest](#observer-GetAgentEventsRequest) | [GetAgentEventsResponse](#observer-GetAgentEventsResponse) stream | GetAgentEvents returns Cilium agent events. |
| GetDebugEvents | [GetDebugEventsRequest](#observer-GetDebugEventsRequest) | [GetDebugEventsResponse](#observer-GetDebugEventsResponse) stream | GetDebugEvents returns Cilium datapath debug events. |
| GetNodes | [GetNodesRequest](#observer-GetNodesRequest) | [GetNodesResponse](#observer-GetNodesResponse) | GetNodes returns information about nodes in a cluster. |
| GetNamespaces | [GetNamespacesRequest](#observer-GetNamespacesRequest) | [GetNamespacesResponse](#observer-GetNamespacesResponse) | GetNamespaces returns information about namespaces in a cluster. The namespaces returned are namespaces which have had network flows in the last hour. The namespaces are returned sorted by cluster name and namespace in ascending order. |
| ServerStatus | [ServerStatusRequest](#observer-ServerStatusRequest) | [ServerStatusResponse](#observer-ServerStatusResponse) | ServerStatus returns some details about the running hubble server. |


Expand Down
430 changes: 315 additions & 115 deletions api/v1/observer/observer.pb.go

Large diffs are not rendered by default.

48 changes: 48 additions & 0 deletions api/v1/observer/observer.pb.json.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions api/v1/observer/observer.proto
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ service Observer {
// GetNodes returns information about nodes in a cluster.
rpc GetNodes(GetNodesRequest) returns (GetNodesResponse) {}

// GetNamespaces returns information about namespaces in a cluster.
// The namespaces returned are namespaces which have had network flows in
// the last hour. The namespaces are returned sorted by cluster name and
// namespace in ascending order.
rpc GetNamespaces(GetNamespacesRequest) returns (GetNamespacesResponse) {}

// ServerStatus returns some details about the running hubble server.
rpc ServerStatus(ServerStatusRequest) returns (ServerStatusResponse) {}
}
Expand Down Expand Up @@ -247,6 +253,19 @@ message TLS {
string server_name = 2;
}

message GetNamespacesRequest {}

// GetNamespacesResponse contains the list of namespaces.
message GetNamespacesResponse {
// Namespaces is a list of namespaces with flows
repeated Namespace namespaces = 1;
}

message Namespace {
string cluster = 1;
string namespace = 2;
}

// ExportEvent contains an event to be exported. Not to be used outside of the
// exporter feature.
message ExportEvent {
Expand Down
45 changes: 45 additions & 0 deletions api/v1/observer/observer_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion daemon/cmd/hubble.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,13 @@ func (d *Daemon) launchHubble() {
observerOpts = append(observerOpts, opt)
}
}
namespaceManager := observer.NewNamespaceManager()
go namespaceManager.Run(d.ctx)

d.hubbleObserver, err = observer.NewLocalServer(payloadParser, logger,
d.hubbleObserver, err = observer.NewLocalServer(
payloadParser,
namespaceManager,
logger,
observerOpts...,
)
if err != nil {
Expand Down
43 changes: 36 additions & 7 deletions pkg/hubble/observer/local_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,14 @@ type LocalObserverServer struct {

// numObservedFlows counts how many flows have been observed
numObservedFlows uint64

namespaceManager NamespaceManager
}

// NewLocalServer returns a new local observer server.
func NewLocalServer(
payloadParser *parser.Parser,
namespaceManager NamespaceManager,
logger logrus.FieldLogger,
options ...observeroption.Option,
) (*LocalObserverServer, error) {
Expand All @@ -83,13 +86,14 @@ func NewLocalServer(
}).Info("Configuring Hubble server")

s := &LocalObserverServer{
log: logger,
ring: container.NewRing(opts.MaxFlows),
events: make(chan *observerTypes.MonitorEvent, opts.MonitorBuffer),
stopped: make(chan struct{}),
payloadParser: payloadParser,
startTime: time.Now(),
opts: opts,
log: logger,
ring: container.NewRing(opts.MaxFlows),
events: make(chan *observerTypes.MonitorEvent, opts.MonitorBuffer),
stopped: make(chan struct{}),
payloadParser: payloadParser,
startTime: time.Now(),
namespaceManager: namespaceManager,
opts: opts,
}

for _, f := range s.opts.OnServerInit {
Expand Down Expand Up @@ -142,6 +146,8 @@ nextEvent:
}

if flow, ok := ev.Event.(*flowpb.Flow); ok {
// track namespaces seen.
s.trackNamespaces(flow)
for _, f := range s.opts.OnDecodedFlow {
stop, err := f.OnDecodedFlow(ctx, flow)
if err != nil {
Expand Down Expand Up @@ -217,6 +223,11 @@ func (s *LocalObserverServer) GetNodes(ctx context.Context, req *observerpb.GetN
return nil, status.Errorf(codes.Unimplemented, "GetNodes not implemented")
}

// GetNamespaces implements observerpb.ObserverClient.GetNamespaces.
func (s *LocalObserverServer) GetNamespaces(ctx context.Context, req *observerpb.GetNamespacesRequest) (*observerpb.GetNamespacesResponse, error) {
return &observerpb.GetNamespacesResponse{Namespaces: s.namespaceManager.GetNamespaces()}, nil
}

// GetFlows implements the proto method for client requests.
func (s *LocalObserverServer) GetFlows(
req *observerpb.GetFlowsRequest,
Expand Down Expand Up @@ -610,6 +621,24 @@ func (r *eventsReader) Next(ctx context.Context) (*v1.Event, error) {
}
}

func (s *LocalObserverServer) trackNamespaces(flow *flowpb.Flow) {
// track namespaces seen.
var namespaces []*observerpb.Namespace
if srcNs := flow.GetSource().GetNamespace(); srcNs != "" {
namespaces = append(namespaces, &observerpb.Namespace{
Namespace: srcNs,
Cluster: nodeTypes.GetClusterName(),
})
}
if dstNs := flow.GetDestination().GetNamespace(); dstNs != "" {
namespaces = append(namespaces, &observerpb.Namespace{
Namespace: dstNs,
Cluster: nodeTypes.GetClusterName(),
})
}
s.namespaceManager.AddNamespace(namespaces...)
}

func validateRequest(req genericRequest) error {
if req.GetFirst() && req.GetFollow() {
return status.Errorf(codes.InvalidArgument, "first cannot be specified with follow")
Expand Down

0 comments on commit 2e8059e

Please sign in to comment.