Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add logsagentpipeline component for OTLP ingest paths #25032

Merged
merged 21 commits into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions cmd/otel-agent/subcommands/run/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@ import (
"github.com/DataDog/datadog-agent/comp/forwarder"
"github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder"
"github.com/DataDog/datadog-agent/comp/forwarder/orchestrator/orchestratorinterface"
"github.com/DataDog/datadog-agent/comp/logs"
logsAgent "github.com/DataDog/datadog-agent/comp/logs/agent"
"github.com/DataDog/datadog-agent/comp/metadata/inventoryagent/inventoryagentimpl"
"github.com/DataDog/datadog-agent/comp/otelcol"
"github.com/DataDog/datadog-agent/comp/otelcol/collector"
"github.com/DataDog/datadog-agent/comp/otelcol/otlp/components/logsagentpipeline"
"github.com/DataDog/datadog-agent/comp/serializer/compression"
"github.com/DataDog/datadog-agent/comp/serializer/compression/compressionimpl/strategy"
"github.com/DataDog/datadog-agent/pkg/serializer"
Expand Down Expand Up @@ -94,7 +93,7 @@ func runOTelAgentCommand(_ context.Context, params *subcommands.GlobalParams) er
// TODO configure the log level from collector config
return corelogimpl.ForOneShot(params.LoggerName, "debug", true)
}),
logs.Bundle(),
logsagentpipeline.Module(),
// We create strategy.ZlibStrategy directly to avoid build tags
fx.Provide(strategy.NewZlibStrategy),
fx.Provide(func(s *strategy.ZlibStrategy) compression.Component {
Expand All @@ -115,7 +114,7 @@ func runOTelAgentCommand(_ context.Context, params *subcommands.GlobalParams) er
return defaultforwarder.Forwarder(c), nil
}),
fx.Provide(newOrchestratorinterfaceimpl),
fx.Invoke(func(_ collector.Component, _ defaultforwarder.Forwarder, _ optional.Option[logsAgent.Component]) {
fx.Invoke(func(_ collector.Component, _ defaultforwarder.Forwarder, _ optional.Option[logsagentpipeline.Component]) {
}),
)
if err != nil {
Expand Down
192 changes: 192 additions & 0 deletions comp/otelcol/otlp/components/logsagentpipeline/agent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2021-present Datadog, Inc.

package logsagentpipeline

Check failure on line 6 in comp/otelcol/otlp/components/logsagentpipeline/agent.go

View workflow job for this annotation

GitHub Actions / windows-lint

package-comments: should have a package comment (revive)

import (
"context"
"errors"
"fmt"
"time"

configComponent "github.com/DataDog/datadog-agent/comp/core/config"
"github.com/DataDog/datadog-agent/comp/core/hostname/hostnameinterface"
"github.com/DataDog/datadog-agent/comp/core/log"
logComponent "github.com/DataDog/datadog-agent/comp/core/log"

Check failure on line 17 in comp/otelcol/otlp/components/logsagentpipeline/agent.go

View workflow job for this annotation

GitHub Actions / windows-lint

duplicated-imports: Package "github.com/DataDog/datadog-agent/comp/core/log" already imported (revive)
"github.com/DataDog/datadog-agent/comp/logs/agent/config"
pkgconfigmodel "github.com/DataDog/datadog-agent/pkg/config/model"
"github.com/DataDog/datadog-agent/pkg/logs/auditor"
"github.com/DataDog/datadog-agent/pkg/logs/client"
"github.com/DataDog/datadog-agent/pkg/logs/pipeline"
"github.com/DataDog/datadog-agent/pkg/status/health"
"github.com/DataDog/datadog-agent/pkg/util/optional"
"github.com/DataDog/datadog-agent/pkg/util/startstop"

"go.uber.org/fx"
"go.uber.org/zap"
)

const (
// intakeTrackType is the track type handed to the endpoint builders in
// buildEndpoints.
intakeTrackType = "logs"

// Log messages

// multiLineWarning is logged by setupAgent when the global processing rules
// contain a multi_line rule.
multiLineWarning = "multi_line processing rules are not supported as global processing rules."
)

// dependencies lists the components injected by fx into NewLogsAgent.
type dependencies struct {
fx.In

// Lc is the fx lifecycle on which the agent's Start and Stop hooks are
// registered; NewLogsAgent skips the registration when it is nil.
Lc fx.Lifecycle
// Log is the logger stored on the Agent.
Log logComponent.Component
// Config is read for the "logs_enabled" / "log_enabled" switches and is
// stored on the Agent.
Config configComponent.Component
// Hostname is stored on the Agent and later passed to the pipeline provider.
Hostname hostnameinterface.Component
}

// provides is the fx result struct returned by NewLogsAgent.
type provides struct {
fx.Out

// Comp holds the Agent when logs collection is enabled in the config, and
// is a none option otherwise.
Comp optional.Option[Component]
}

// Agent represents the data pipeline that collects, decodes, processes and sends logs to the backend.
type Agent struct {
// log, config and hostname are injected by NewLogsAgent.
log log.Component
config pkgconfigmodel.Reader
hostname hostnameinterface.Component

// endpoints is assigned by Start from buildEndpoints.
endpoints *config.Endpoints
// auditor, destinationsCtx, pipelineProvider and health are assigned by
// SetupPipeline; they are unset until Start has run.
auditor auditor.Auditor
destinationsCtx *client.DestinationsContext
pipelineProvider pipeline.Provider
health *health.Handle
}

func NewLogsAgent(deps dependencies) provides {

Check failure on line 66 in comp/otelcol/otlp/components/logsagentpipeline/agent.go

View workflow job for this annotation

GitHub Actions / windows-lint

exported: exported function NewLogsAgent should have comment or be unexported (revive)
if deps.Config.GetBool("logs_enabled") || deps.Config.GetBool("log_enabled") {
if deps.Config.GetBool("log_enabled") {
deps.Log.Warn(`"log_enabled" is deprecated, use "logs_enabled" instead`)
}

logsAgent := &Agent{
log: deps.Log,
config: deps.Config,
hostname: deps.Hostname,
}
if deps.Lc != nil {
deps.Lc.Append(fx.Hook{
OnStart: logsAgent.Start,
OnStop: logsAgent.Stop,
})
}

return provides{
Comp: optional.NewOption[Component](logsAgent),
}
}

deps.Log.Debug("logs-agent disabled")
return provides{
liustanley marked this conversation as resolved.
Show resolved Hide resolved
Comp: optional.NewNoneOption[Component](),
}
}

func (a *Agent) Start(context.Context) error {

Check failure on line 95 in comp/otelcol/otlp/components/logsagentpipeline/agent.go

View workflow job for this annotation

GitHub Actions / windows-lint

exported: exported method Agent.Start should have comment or be unexported (revive)
a.log.Debug("Starting logs-agent...")

// setup the server config
endpoints, err := buildEndpoints(a.config)

if err != nil {
message := fmt.Sprintf("Invalid endpoints: %v", err)
return errors.New(message)
}

a.endpoints = endpoints

err = a.setupAgent()

if err != nil {
a.log.Error("Could not start logs-agent: ", zap.Error(err))
return err
}

a.startPipeline()
a.log.Debug("logs-agent started")

return nil
}

// setupAgent loads the global processing rules from the agent config and
// passes them to SetupPipeline. It returns an error when the rules are
// invalid. A multi_line rule only triggers a warning; the rules are still
// handed to the pipeline unchanged.
func (a *Agent) setupAgent() error {
	// setup global processing rules
	rules, rulesErr := config.GlobalProcessingRules(a.config)
	if rulesErr != nil {
		return errors.New(fmt.Sprintf("Invalid processing rules: %v", rulesErr))
	}

	if hasMultiLine := config.HasMultiLineRule(rules); hasMultiLine {
		a.log.Warn(multiLineWarning)
	}

	a.SetupPipeline(rules)
	return nil
}

// startPipeline starts all the elements of the data pipeline: the
// destinations context, the auditor and the pipeline provider are handed to
// a startstop starter in the order intended to prevent data loss.
func (a *Agent) startPipeline() {
	startstop.NewStarter(
		a.destinationsCtx,
		a.auditor,
		a.pipelineProvider,
	).Start()
}

func (a *Agent) Stop(context.Context) error {

Check failure on line 148 in comp/otelcol/otlp/components/logsagentpipeline/agent.go

View workflow job for this annotation

GitHub Actions / windows-lint

exported: exported method Agent.Stop should have comment or be unexported (revive)
a.log.Debug("Stopping logs-agent")

stopper := startstop.NewSerialStopper(
a.pipelineProvider,
a.auditor,
a.destinationsCtx,
)

// This will try to stop everything in order, including the potentially blocking
// parts like the sender. After StopTimeout it will just stop the last part of the
// pipeline, disconnecting it from the auditor, to make sure that the pipeline is
// flushed before stopping.
// TODO: Add this feature in the stopper.
c := make(chan struct{})
go func() {
stopper.Stop()
close(c)
}()
timeout := time.Duration(a.config.GetInt("logs_config.stop_grace_period")) * time.Second
select {
case <-c:
case <-time.After(timeout):
a.log.Debug("Timed out when stopping logs-agent, forcing it to stop now")
// We force all destinations to read/flush all the messages they get without
// trying to write to the network.
a.destinationsCtx.Stop()
// Wait again for the stopper to complete.
// In some situation, the stopper unfortunately never succeed to complete,
// we've already reached the grace period, give it some more seconds and
// then force quit.
timeout := time.NewTimer(5 * time.Second)
select {
case <-c:
case <-timeout.C:
a.log.Warn("Force close of the Logs Agent.")
}
}
a.log.Debug("logs-agent stopped")
return nil
}

func (a *Agent) GetPipelineProvider() pipeline.Provider {

Check failure on line 190 in comp/otelcol/otlp/components/logsagentpipeline/agent.go

View workflow job for this annotation

GitHub Actions / windows-lint

exported: exported method Agent.GetPipelineProvider should have comment or be unexported (revive)
return a.pipelineProvider
}
52 changes: 52 additions & 0 deletions comp/otelcol/otlp/components/logsagentpipeline/agent_core_init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2021-present Datadog, Inc.

package logsagentpipeline
liustanley marked this conversation as resolved.
Show resolved Hide resolved

import (
"fmt"
"time"

"github.com/DataDog/datadog-agent/comp/logs/agent/config"
pkgconfigmodel "github.com/DataDog/datadog-agent/pkg/config/model"
"github.com/DataDog/datadog-agent/pkg/logs/auditor"
"github.com/DataDog/datadog-agent/pkg/logs/client"
"github.com/DataDog/datadog-agent/pkg/logs/client/http"
"github.com/DataDog/datadog-agent/pkg/logs/diagnostic"
"github.com/DataDog/datadog-agent/pkg/logs/pipeline"
"github.com/DataDog/datadog-agent/pkg/status/health"
)

// SetupPipeline initializes the logs agent pipeline and its dependencies
func (a *Agent) SetupPipeline(
processingRules []*config.ProcessingRule,
) {
health := health.RegisterLiveness("logs-agent")
liustanley marked this conversation as resolved.
Show resolved Hide resolved

// setup the auditor
// We pass the health handle to the auditor because it's the end of the pipeline and the most
// critical part. Arguably it could also be plugged to the destination.
auditorTTL := time.Duration(a.config.GetInt("logs_config.auditor_ttl")) * time.Hour
fmt.Println(a.config.GetString("logs_config.run_path"))
auditor := auditor.New(a.config.GetString("logs_config.run_path"), auditor.DefaultRegistryFilename, auditorTTL, health)
destinationsCtx := client.NewDestinationsContext()

// setup the pipeline provider that provides pairs of processor and sender
pipelineProvider := pipeline.NewProvider(config.NumberOfPipelines, auditor, &diagnostic.NoopMessageReceiver{}, processingRules, a.endpoints, destinationsCtx, NewStatusProvider(), a.hostname, a.config)

a.auditor = auditor
a.destinationsCtx = destinationsCtx
a.pipelineProvider = pipelineProvider
a.health = health
}

// buildEndpoints builds the endpoints for the logs agent from coreConfig.
//
// It first tries to build the HTTP endpoints; when that succeeds, the
// connectivity of the main HTTP endpoint is checked, otherwise HTTP
// connectivity is reported as failed. The resulting connectivity status is
// then passed to config.BuildEndpoints, whose result is returned as is.
func buildEndpoints(coreConfig pkgconfigmodel.Reader) (*config.Endpoints, error) {
	connectivity := config.HTTPConnectivityFailure

	httpEndpoints, httpErr := config.BuildHTTPEndpoints(
		coreConfig,
		intakeTrackType,
		config.AgentJSONIntakeProtocol,
		config.DefaultIntakeOrigin,
	)
	if httpErr == nil {
		connectivity = http.CheckConnectivity(httpEndpoints.Main, coreConfig)
	}

	return config.BuildEndpoints(
		coreConfig,
		connectivity,
		intakeTrackType,
		config.AgentJSONIntakeProtocol,
		config.DefaultIntakeOrigin,
	)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package logsagentpipeline

import (
"testing"

"github.com/DataDog/datadog-agent/comp/core/config"
"github.com/DataDog/datadog-agent/pkg/util/fxutil"

"github.com/stretchr/testify/assert"
"go.uber.org/fx"
)

// TestBuildEndpoints checks that buildEndpoints, given the mock config
// module, yields a main endpoint pointing at the expected logs intake host.
func TestBuildEndpoints(t *testing.T) {
	cfg := fxutil.Test[config.Component](t, fx.Options(
		config.MockModule(),
	))

	endpoints, err := buildEndpoints(cfg)
	// Bail out on error: endpoints is not usable in that case and
	// dereferencing it would panic instead of reporting a clean failure.
	if !assert.NoError(t, err) {
		return
	}
	assert.Equal(t, "agent-intake.logs.datadoghq.com", endpoints.Main.Host)
}
Loading
Loading