hubble/recorder: Extend the API to allow stopping a recording automatically #16473

Merged: 3 commits, Aug 10, 2021
385 changes: 251 additions & 134 deletions api/v1/recorder/recorder.pb.go

Large diffs are not rendered by default.

18 changes: 18 additions & 0 deletions api/v1/recorder/recorder.pb.json.go

Some generated files are not rendered by default.

21 changes: 21 additions & 0 deletions api/v1/recorder/recorder.proto
@@ -15,6 +15,7 @@
syntax = "proto3";

import "google/protobuf/timestamp.proto";
import "google/protobuf/duration.proto";

package recorder;

@@ -49,6 +50,26 @@ message StartRecording {
    // max_capture_length specifies the maximum packet length.
    // Full packet length will be captured if absent/zero.
    uint32 max_capture_length = 3;

    // stop_condition defines conditions which will cause the recording to
    // stop early after any of the stop conditions has been hit
    StopCondition stop_condition = 4;
}

// StopCondition defines one or more conditions which cause the recording to
// stop after they have been hit. Stop conditions are ignored if they are
// absent or zero-valued. If multiple conditions are defined, the recording
// stops after the first one is hit.
message StopCondition {
    // bytes_captured_count stops the recording after at least this many bytes
    // have been captured. Note: The resulting file might be slightly larger due
    // to added pcap headers.
    uint64 bytes_captured_count = 1;
    // packets_captured_count stops the recording after at least this many packets have
    // been captured.
    uint64 packets_captured_count = 2;
    // time_elapsed stops the recording after this duration has elapsed.
    google.protobuf.Duration time_elapsed = 3;
}

// FileSinkConfiguration configures the file output. Possible future additions
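As an illustration of the new API surface, here is a minimal client-side sketch, assuming the usual protoc-gen-go field names for the generated types in api/v1/recorder; the request wiring around StartRecording is omitted. Note that bytes_captured_count counts captured packet bytes, so with the standard 16-byte pcap record header per packet the resulting file ends up slightly larger than the configured limit, as the field comment says.

// Sketch only: generated field names assume standard protoc-gen-go output.
package main

import (
	"fmt"
	"time"

	recorderpb "github.com/cilium/cilium/api/v1/recorder"
	"google.golang.org/protobuf/types/known/durationpb"
)

func newStartRecording() *recorderpb.StartRecording {
	return &recorderpb.StartRecording{
		// Zero means "capture the full packet length".
		MaxCaptureLength: 0,
		// Stop after 1 GiB, one million packets, or 5 minutes,
		// whichever is hit first; zero-valued conditions are ignored.
		StopCondition: &recorderpb.StopCondition{
			BytesCapturedCount:   1 << 30,
			PacketsCapturedCount: 1_000_000,
			TimeElapsed:          durationpb.New(5 * time.Minute),
		},
	}
}

func main() {
	req := newStartRecording()
	fmt.Println("recording stops after at most", req.StopCondition.TimeElapsed.AsDuration())
}

If the stop_condition field is left unset entirely, the recording behaves as before and only stops on an explicit stop request.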
21 changes: 16 additions & 5 deletions pkg/hubble/recorder/service.go
@@ -342,20 +342,26 @@ func (s *Service) startRecording(
	})
	scopedLog.Debug("starting new recording")

	stop := req.GetStopCondition()
	config := sink.PcapSink{
		RuleID: ruleID,
		Header: pcap.Header{
			SnapshotLength: capLen,
			Datalink:       pcap.Ethernet,
		},
		Writer: pcap.NewWriter(f),
		StopCondition: sink.StopConditions{
			PacketsCaptured: stop.GetPacketsCapturedCount(),
			BytesCaptured:   stop.GetBytesCapturedCount(),
			DurationElapsed: stop.GetTimeElapsed().AsDuration(),
		},
	}

	handle, err = s.dispatch.StartSink(ctx, config)
	if err != nil {
		return nil, "", err
	}

	// Upserting a new recorder can take up to a few seconds due to datapath
	// regeneration. To avoid having the stop condition timer on the sink
	// already running while the recorder is still being upserted, we install
	// the recorder before the sink. This is safe, as sink.Dispatch silently
	// ignores recordings for unknown sinks.
	recInfo := &recorder.RecInfo{
		ID:     recorder.ID(ruleID),
		CapLen: uint16(capLen),
@@ -366,6 +372,11 @@
		return nil, "", err
	}

	handle, err = s.dispatch.StartSink(ctx, config)
	if err != nil {
		return nil, "", err
	}

	// Ensure to delete the above recorder when the sink has stopped
	go func() {
		<-handle.Done
16 changes: 13 additions & 3 deletions pkg/hubble/recorder/sink/dispatch.go
@@ -78,11 +78,20 @@ type Statistics struct {
	BytesLost      uint64
}

// StopConditions defines a set of values which cause the sink to stop
// recording if any of them are hit. Zero-valued conditions are ignored.
type StopConditions struct {
	PacketsCaptured uint64
	BytesCaptured   uint64
	DurationElapsed time.Duration
}

// PcapSink defines the parameters of a sink which writes to a pcap.RecordWriter
type PcapSink struct {
	RuleID uint16
	Header pcap.Header
	Writer pcap.RecordWriter
	RuleID        uint16
	Header        pcap.Header
	Writer        pcap.RecordWriter
	StopCondition StopConditions
}

// Dispatch implements consumer.MonitorConsumer and dispatches incoming
@@ -121,6 +130,7 @@ func NewDispatch(sinkQueueSize int) (*Dispatch, error) {
// The sink is unregistered automatically when it stops. A sink is stopped for
// one of the following four reasons. In all cases, Handle.Done will be closed.
// - Explicitly via Handle.Stop (Handle.Err() == nil)
// - When one of the conditions in p.StopCondition is hit (Handle.Err() == nil)
// - When the context ctx is cancelled (Handle.Err() != nil)
// - When an error occurred (Handle.Err() != nil)
func (d *Dispatch) StartSink(ctx context.Context, p PcapSink) (*Handle, error) {
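To connect the pieces above, here is a minimal sketch (not part of this change) of how a caller could drive the sink package with the new StopConditions, using only the surface visible in this diff: NewDispatch, StartSink, PcapSink, and a Handle whose Done channel is closed once any stop condition fires. RuleID, the queue size, and the limits are arbitrary example values.

// Sketch only: values are illustrative, error handling is minimal.
package main

import (
	"context"
	"os"
	"time"

	"github.com/cilium/cilium/pkg/hubble/recorder/pcap"
	"github.com/cilium/cilium/pkg/hubble/recorder/sink"
)

func record(ctx context.Context, d *sink.Dispatch, f *os.File) error {
	handle, err := d.StartSink(ctx, sink.PcapSink{
		RuleID: 1,
		Header: pcap.Header{
			SnapshotLength: 65535,
			Datalink:       pcap.Ethernet,
		},
		Writer: pcap.NewWriter(f),
		StopCondition: sink.StopConditions{
			PacketsCaptured: 10_000,           // stop after 10k packets,
			BytesCaptured:   64 << 20,         // or 64 MiB,
			DurationElapsed: 30 * time.Second, // or 30s, whichever is hit first
		},
	})
	if err != nil {
		return err
	}
	// Done is closed for any of the four stop reasons documented on StartSink;
	// Err is non-nil only for context cancellation or a write error.
	<-handle.Done
	return handle.Err()
}

func main() {
	d, err := sink.NewDispatch(1024)
	if err != nil {
		panic(err)
	}
	f, err := os.Create("example.pcap")
	if err != nil {
		panic(err)
	}
	defer f.Close()
	if err := record(context.Background(), d, f); err != nil {
		panic(err)
	}
}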
28 changes: 26 additions & 2 deletions pkg/hubble/recorder/sink/sink.go
@@ -16,6 +16,7 @@ package sink

import (
	"context"
	"time"

	"github.com/cilium/cilium/pkg/hubble/recorder/pcap"
	"github.com/cilium/cilium/pkg/lock"
@@ -39,6 +40,7 @@ type sink struct {
// startSink creates a queue and go routine for the sink. The spawned go
// routine will run until one of the following happens:
// - sink.stop is called
// - a p.StopCondition is reached
// - ctx is cancelled
// - an error occurred
func startSink(ctx context.Context, p PcapSink, queueSize int) *sink {
@@ -69,6 +71,16 @@ func startSink(ctx context.Context, p PcapSink, queueSize int) *sink {
			close(s.done)
		}()

		stop := p.StopCondition
		var stopAfter <-chan time.Time
		if stop.DurationElapsed != 0 {
			stopTimer := time.NewTimer(stop.DurationElapsed)
			defer func() {
				stopTimer.Stop()
			}()
			stopAfter = stopTimer.C
		}

		if err = p.Writer.WriteHeader(p.Header); err != nil {
			return
		}
@@ -87,12 +99,19 @@ func startSink(ctx context.Context, p PcapSink, queueSize int) *sink {
					return
				}

				s.addToStatistics(Statistics{
				stats := s.addToStatistics(Statistics{
					PacketsWritten: 1,
					BytesWritten:   uint64(rec.inclLen),
				})
				if (stop.PacketsCaptured > 0 && stats.PacketsWritten >= stop.PacketsCaptured) ||
					(stop.BytesCaptured > 0 && stats.BytesWritten >= stop.BytesCaptured) {
					return
				}
			case <-s.shutdown:
				return
			case <-stopAfter:
				// duration of stop condition has been reached
				return
			case <-ctx.Done():
				err = ctx.Err()
				return
@@ -108,19 +127,24 @@ func (s *sink) stop() {
	close(s.shutdown)
}

func (s *sink) addToStatistics(add Statistics) {
// addToStatistics adds add to the current statistics and returns the resulting
// value.
func (s *sink) addToStatistics(add Statistics) (result Statistics) {
	s.mutex.Lock()
	s.stats.BytesWritten += add.BytesWritten
	s.stats.PacketsWritten += add.PacketsWritten
	s.stats.BytesLost += add.BytesLost
	s.stats.PacketsLost += add.PacketsLost
	result = s.stats
	s.mutex.Unlock()

	// non-blocking send
	select {
	case s.trigger <- struct{}{}:
	default:
	}

	return result
}

// enqueue submits a new record to this sink. If the sink is not keeping up,
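A small Go detail in the sink loop above is worth spelling out: when DurationElapsed is zero, stopAfter is left as a nil channel, and receiving from a nil channel blocks forever, so the <-stopAfter case can never be selected and the timer-based stop condition is effectively disabled. A standalone sketch of that pattern follows; the names are made up for illustration.

// Sketch only: demonstrates the optional-timer select pattern used in startSink.
package main

import (
	"fmt"
	"time"
)

// runUntil blocks until either work arrives or the optional deadline fires.
// A zero limit means "no deadline": the timer channel stays nil, and its
// select case can never be chosen.
func runUntil(work <-chan struct{}, limit time.Duration) {
	var deadline <-chan time.Time
	if limit != 0 {
		t := time.NewTimer(limit)
		defer t.Stop()
		deadline = t.C
	}
	select {
	case <-work:
		fmt.Println("got work")
	case <-deadline:
		fmt.Println("deadline reached")
	}
}

func main() {
	w := make(chan struct{}, 1)
	w <- struct{}{}
	runUntil(w, 0) // no deadline configured; returns as soon as work arrives
}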