Skip to content

Commit

Permalink
lp metrics: collect datasources and console options (#2870)
Browse files Browse the repository at this point in the history
  • Loading branch information
mmetc committed Mar 5, 2024
1 parent e7ecea7 commit d8877a7
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 30 deletions.
31 changes: 18 additions & 13 deletions cmd/crowdsec/crowdsec.go
Expand Up @@ -23,39 +23,42 @@ import (
"github.com/crowdsecurity/crowdsec/pkg/types"
)

func initCrowdsec(cConfig *csconfig.Config, hub *cwhub.Hub) (*parser.Parsers, error) {
// initCrowdsec prepares the log processor service
func initCrowdsec(cConfig *csconfig.Config, hub *cwhub.Hub) (*parser.Parsers, []acquisition.DataSource, error) {
var err error

if err = alertcontext.LoadConsoleContext(cConfig, hub); err != nil {
return nil, fmt.Errorf("while loading context: %w", err)
return nil, nil, fmt.Errorf("while loading context: %w", err)
}

// Start loading configs
csParsers := parser.NewParsers(hub)
if csParsers, err = parser.LoadParsers(cConfig, csParsers); err != nil {
return nil, fmt.Errorf("while loading parsers: %w", err)
return nil, nil, fmt.Errorf("while loading parsers: %w", err)
}

if err := LoadBuckets(cConfig, hub); err != nil {
return nil, fmt.Errorf("while loading scenarios: %w", err)
return nil, nil, fmt.Errorf("while loading scenarios: %w", err)
}

if err := appsec.LoadAppsecRules(hub); err != nil {
return nil, fmt.Errorf("while loading appsec rules: %w", err)
return nil, nil, fmt.Errorf("while loading appsec rules: %w", err)
}

if err := LoadAcquisition(cConfig); err != nil {
return nil, fmt.Errorf("while loading acquisition config: %w", err)
datasources, err := LoadAcquisition(cConfig)
if err != nil {
return nil, nil, fmt.Errorf("while loading acquisition config: %w", err)
}

return csParsers, nil
return csParsers, datasources, nil
}

func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.Hub) error {
// runCrowdsec starts the log processor service
func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.Hub, datasources []acquisition.DataSource) error {
inputEventChan = make(chan types.Event)
inputLineChan = make(chan types.Event)

//start go-routines for parsing, buckets pour and outputs.
// start go-routines for parsing, buckets pour and outputs.
parserWg := &sync.WaitGroup{}

parsersTomb.Go(func() error {
Expand All @@ -65,7 +68,8 @@ func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.H
parsersTomb.Go(func() error {
defer trace.CatchPanic("crowdsec/runParse")

if err := runParse(inputLineChan, inputEventChan, *parsers.Ctx, parsers.Nodes); err != nil { //this error will never happen as parser.Parse is not able to return errors
if err := runParse(inputLineChan, inputEventChan, *parsers.Ctx, parsers.Nodes); err != nil {
// this error will never happen as parser.Parse is not able to return errors
log.Fatalf("starting parse error : %s", err)
return err
}
Expand Down Expand Up @@ -161,7 +165,8 @@ func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.H
return nil
}

func serveCrowdsec(parsers *parser.Parsers, cConfig *csconfig.Config, hub *cwhub.Hub, agentReady chan bool) {
// serveCrowdsec wraps the log processor service
func serveCrowdsec(parsers *parser.Parsers, cConfig *csconfig.Config, hub *cwhub.Hub, datasources []acquisition.DataSource, agentReady chan bool) {
crowdsecTomb.Go(func() error {
defer trace.CatchPanic("crowdsec/serveCrowdsec")

Expand All @@ -171,7 +176,7 @@ func serveCrowdsec(parsers *parser.Parsers, cConfig *csconfig.Config, hub *cwhub
log.Debugf("running agent after %s ms", time.Since(crowdsecT0))
agentReady <- true

if err := runCrowdsec(cConfig, parsers, hub); err != nil {
if err := runCrowdsec(cConfig, parsers, hub, datasources); err != nil {
log.Fatalf("unable to start crowdsec routines: %s", err)
}
}()
Expand Down
19 changes: 11 additions & 8 deletions cmd/crowdsec/main.go
@@ -1,6 +1,7 @@
package main

import (
"errors"
"flag"
"fmt"
_ "net/http/pprof"
Expand All @@ -10,7 +11,6 @@ import (
"strings"
"time"

"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"gopkg.in/tomb.v2"

Expand Down Expand Up @@ -95,7 +95,7 @@ func LoadBuckets(cConfig *csconfig.Config, hub *cwhub.Hub) error {
holders, outputEventChan, err = leakybucket.LoadBuckets(cConfig.Crowdsec, hub, files, &bucketsTomb, buckets, flags.OrderEvent)

if err != nil {
return fmt.Errorf("scenario loading failed: %v", err)
return fmt.Errorf("scenario loading failed: %w", err)
}

if cConfig.Prometheus != nil && cConfig.Prometheus.Enabled {
Expand All @@ -107,7 +107,7 @@ func LoadBuckets(cConfig *csconfig.Config, hub *cwhub.Hub) error {
return nil
}

func LoadAcquisition(cConfig *csconfig.Config) error {
func LoadAcquisition(cConfig *csconfig.Config) ([]acquisition.DataSource, error) {
var err error

if flags.SingleFileType != "" && flags.OneShotDSN != "" {
Expand All @@ -116,20 +116,20 @@ func LoadAcquisition(cConfig *csconfig.Config) error {

dataSources, err = acquisition.LoadAcquisitionFromDSN(flags.OneShotDSN, flags.Labels, flags.Transform)
if err != nil {
return errors.Wrapf(err, "failed to configure datasource for %s", flags.OneShotDSN)
return nil, fmt.Errorf("failed to configure datasource for %s: %w", flags.OneShotDSN, err)
}
} else {
dataSources, err = acquisition.LoadAcquisitionFromFile(cConfig.Crowdsec)
if err != nil {
return err
return nil, err
}
}

if len(dataSources) == 0 {
return fmt.Errorf("no datasource enabled")
return nil, errors.New("no datasource enabled")
}

return nil
return dataSources, nil
}

var (
Expand Down Expand Up @@ -272,7 +272,7 @@ func LoadConfig(configFile string, disableAgent bool, disableAPI bool, quiet boo
}

if cConfig.DisableAPI && cConfig.DisableAgent {
return nil, errors.New("You must run at least the API Server or crowdsec")
return nil, errors.New("you must run at least the API Server or crowdsec")
}

if flags.OneShotDSN != "" && flags.SingleFileType == "" {
Expand Down Expand Up @@ -360,11 +360,14 @@ func main() {
if err != nil {
log.Fatalf("could not create CPU profile: %s", err)
}

log.Infof("CPU profile will be written to %s", flags.CpuProfile)

if err := pprof.StartCPUProfile(f); err != nil {
f.Close()
log.Fatalf("could not start CPU profile: %s", err)
}

defer f.Close()
defer pprof.StopCPUProfile()
}
Expand Down
14 changes: 7 additions & 7 deletions cmd/crowdsec/serve.go
Expand Up @@ -86,7 +86,7 @@ func reloadHandler(sig os.Signal) (*csconfig.Config, error) {
return nil, fmt.Errorf("while loading hub index: %w", err)
}

csParsers, err := initCrowdsec(cConfig, hub)
csParsers, datasources, err := initCrowdsec(cConfig, hub)
if err != nil {
return nil, fmt.Errorf("unable to init crowdsec: %w", err)
}
Expand All @@ -103,7 +103,7 @@ func reloadHandler(sig os.Signal) (*csconfig.Config, error) {
}

agentReady := make(chan bool, 1)
serveCrowdsec(csParsers, cConfig, hub, agentReady)
serveCrowdsec(csParsers, cConfig, hub, datasources, agentReady)
}

log.Printf("Reload is finished")
Expand Down Expand Up @@ -230,7 +230,7 @@ func drainChan(c chan types.Event) {
for {
select {
case _, ok := <-c:
if !ok { //closed
if !ok { // closed
return
}
default:
Expand All @@ -256,8 +256,8 @@ func HandleSignals(cConfig *csconfig.Config) error {

exitChan := make(chan error)

//Always try to stop CPU profiling to avoid passing flags around
//It's a noop if profiling is not enabled
// Always try to stop CPU profiling to avoid passing flags around
// It's a noop if profiling is not enabled
defer pprof.StopCPUProfile()

go func() {
Expand Down Expand Up @@ -369,14 +369,14 @@ func Serve(cConfig *csconfig.Config, agentReady chan bool) error {
return fmt.Errorf("while loading hub index: %w", err)
}

csParsers, err := initCrowdsec(cConfig, hub)
csParsers, datasources, err := initCrowdsec(cConfig, hub)
if err != nil {
return fmt.Errorf("crowdsec init: %w", err)
}

// if it's just linting, we're done
if !flags.TestMode {
serveCrowdsec(csParsers, cConfig, hub, agentReady)
serveCrowdsec(csParsers, cConfig, hub, datasources, agentReady)
} else {
agentReady <- true
}
Expand Down
29 changes: 29 additions & 0 deletions pkg/csconfig/console.go
Expand Up @@ -37,6 +37,35 @@ type ConsoleConfig struct {
ShareContext *bool `yaml:"share_context"`
}

func (c *ConsoleConfig) EnabledOptions() []string {
ret := []string{}
if c == nil {
return ret
}

if c.ShareCustomScenarios != nil && *c.ShareCustomScenarios {
ret = append(ret, SEND_CUSTOM_SCENARIOS)
}

if c.ShareTaintedScenarios != nil && *c.ShareTaintedScenarios {
ret = append(ret, SEND_TAINTED_SCENARIOS)
}

if c.ShareManualDecisions != nil && *c.ShareManualDecisions {
ret = append(ret, SEND_MANUAL_SCENARIOS)
}

if c.ConsoleManagement != nil && *c.ConsoleManagement {
ret = append(ret, CONSOLE_MANAGEMENT)
}

if c.ShareContext != nil && *c.ShareContext {
ret = append(ret, SEND_CONTEXT)
}

return ret
}

func (c *ConsoleConfig) IsPAPIEnabled() bool {
if c == nil || c.ConsoleManagement == nil {
return false
Expand Down
2 changes: 1 addition & 1 deletion test/bats/01_crowdsec.bats
Expand Up @@ -38,7 +38,7 @@ teardown() {

@test "crowdsec (no api and no agent)" {
rune -0 wait-for \
--err "You must run at least the API Server or crowdsec" \
--err "you must run at least the API Server or crowdsec" \
"${CROWDSEC}" -no-api -no-cs
}

Expand Down
2 changes: 1 addition & 1 deletion test/bats/01_crowdsec_lapi.bats
Expand Up @@ -28,7 +28,7 @@ teardown() {
@test "lapi (.api.server.enable=false)" {
rune -0 config_set '.api.server.enable=false'
rune -1 "${CROWDSEC}" -no-cs
assert_stderr --partial "You must run at least the API Server or crowdsec"
assert_stderr --partial "you must run at least the API Server or crowdsec"
}

@test "lapi (no .api.server.listen_uri)" {
Expand Down

0 comments on commit d8877a7

Please sign in to comment.