
Feature: Promtail, scrape logs from Object store #2270

Closed · wants to merge 20 commits
1 change: 0 additions & 1 deletion clients/pkg/promtail/positions/positions.go
@@ -174,7 +174,6 @@ func (p *positions) save() {
positions[k] = v
}
p.mtx.Unlock()

if err := writePositionFile(p.cfg.PositionsFile, positions); err != nil {
level.Error(p.logger).Log("msg", "error writing positions file", "error", err)
}
42 changes: 42 additions & 0 deletions clients/pkg/promtail/scrapeconfig/scrapeconfig.go
@@ -29,6 +29,8 @@ import (

"github.com/grafana/loki/clients/pkg/logentry/stages"
"github.com/grafana/loki/clients/pkg/promtail/discovery/consulagent"

loki_aws "github.com/grafana/loki/pkg/storage/chunk/aws"
)

// Config describes a job to scrape.
@@ -43,6 +45,7 @@ type Config struct {
KafkaConfig *KafkaTargetConfig `yaml:"kafka,omitempty"`
GelfConfig *GelfTargetConfig `yaml:"gelf,omitempty"`
CloudflareConfig *CloudflareConfig `yaml:"cloudflare,omitempty"`
S3Config *S3TargetConfig `yaml:"aws_s3,omitempty"`
RelabelConfigs []*relabel.Config `yaml:"relabel_configs,omitempty"`
// List of Docker service discovery configurations.
DockerSDConfigs []*moby.DockerSDConfig `yaml:"docker_sd_configs,omitempty"`
@@ -362,6 +365,45 @@ type PushTargetConfig struct {
KeepTimestamp bool `yaml:"use_incoming_timestamp"`
}

// S3TargetConfig describes S3 targets to scrape.
type S3TargetConfig struct {
// Set to true to force the request to use path-style addressing.
S3ForcePathStyle bool `yaml:"s3_forcepath_style"`

// Bucket name to look for objects in.
BucketName string `yaml:"bucketname"`

// AWS S3 Endpoint to connect to.
Endpoint string `yaml:"endpoint"`

// AWS Region to use.
Region string `yaml:"region"`

// AWS Access Key ID.
AccessKeyID string `yaml:"access_key_id"`

// AWS Secret Access Key.
SecretAccessKey string `yaml:"secret_access_key"`

// Disable https on s3 connection.
Insecure bool `yaml:"insecure"`

// HTTP config
HTTPConfig loki_aws.HTTPConfig

// Labels optionally holds labels to associate with each record read from S3 objects.
Labels model.LabelSet `yaml:"labels"`

// SQS queue name to receive S3 events from.
SQSQueue string `yaml:"sqs_queue"`

// SQS queue wait timeout value
Timeout int64 `yaml:"sqs_queue_timeout"`

// Whether to re-read from the beginning files that have already been read.
ResetCursor bool `yaml:"reset_cursor"`
}
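
// For reference, a scrape_configs stanza exercising these fields might look
// like the following sketch (bucket, queue, and label values are
// illustrative; when access_key_id and secret_access_key are omitted, the
// client falls back to the default AWS credential chain):
//
//	scrape_configs:
//	  - job_name: s3
//	    aws_s3:
//	      bucketname: my-log-bucket
//	      region: us-east-1
//	      sqs_queue: my-log-queue
//	      sqs_queue_timeout: 20
//	      labels:
//	        job: s3logs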

// DefaultScrapeConfig is the default Config.
var DefaultScrapeConfig = Config{
PipelineStages: stages.PipelineStages{},
31 changes: 24 additions & 7 deletions clients/pkg/promtail/targets/manager.go
@@ -19,6 +19,7 @@ import (
"github.com/grafana/loki/clients/pkg/promtail/targets/journal"
"github.com/grafana/loki/clients/pkg/promtail/targets/kafka"
"github.com/grafana/loki/clients/pkg/promtail/targets/lokipush"
"github.com/grafana/loki/clients/pkg/promtail/targets/objectstore"
"github.com/grafana/loki/clients/pkg/promtail/targets/stdin"
"github.com/grafana/loki/clients/pkg/promtail/targets/syslog"
"github.com/grafana/loki/clients/pkg/promtail/targets/target"
@@ -37,6 +38,7 @@ const (
CloudflareConfigs = "cloudflareConfigs"
DockerConfigs = "dockerConfigs"
DockerSDConfigs = "dockerSDConfigs"
S3Configs = "s3Configs"
)

type targetManager interface {
@@ -96,6 +98,8 @@ func NewTargetManagers(
targetScrapeConfigs[CloudflareConfigs] = append(targetScrapeConfigs[CloudflareConfigs], cfg)
case cfg.DockerSDConfigs != nil:
targetScrapeConfigs[DockerSDConfigs] = append(targetScrapeConfigs[DockerSDConfigs], cfg)
case cfg.S3Config != nil:
targetScrapeConfigs[S3Configs] = append(targetScrapeConfigs[S3Configs], cfg)
default:
return nil, fmt.Errorf("no valid target scrape config defined for %q", cfg.JobName)
}
@@ -116,12 +120,13 @@
}

var (
fileMetrics *file.Metrics
syslogMetrics *syslog.Metrics
gcplogMetrics *gcplog.Metrics
gelfMetrics *gelf.Metrics
cloudflareMetrics *cloudflare.Metrics
dockerMetrics *docker.Metrics
fileMetrics *file.Metrics
syslogMetrics *syslog.Metrics
gcplogMetrics *gcplog.Metrics
gelfMetrics *gelf.Metrics
cloudflareMetrics *cloudflare.Metrics
dockerMetrics *docker.Metrics
objectStoreMetrics *objectstore.Metrics
)
if len(targetScrapeConfigs[FileScrapeConfigs]) > 0 {
fileMetrics = file.NewMetrics(reg)
@@ -141,7 +146,9 @@ func NewTargetManagers(
if len(targetScrapeConfigs[DockerConfigs]) > 0 || len(targetScrapeConfigs[DockerSDConfigs]) > 0 {
dockerMetrics = docker.NewMetrics(reg)
}

if len(targetScrapeConfigs[S3Configs]) > 0 {
objectStoreMetrics = objectstore.NewMetrics(reg)
}
for target, scrapeConfigs := range targetScrapeConfigs {
switch target {
case FileScrapeConfigs:
@@ -258,6 +265,16 @@ func NewTargetManagers(
return nil, errors.Wrap(err, "failed to make Docker service discovery target manager")
}
targetManagers = append(targetManagers, cfTargetManager)
case S3Configs:
pos, err := getPositionFile()
if err != nil {
return nil, err
}
objectStoreTargetManager, err := objectstore.NewTargetManager(objectStoreMetrics, logger, pos, client, scrapeConfigs)
Contributor: If we want to support more cloud providers, then we should name this NewS3TargetManager. If not, then we should rename the package to s3.

Contributor (author): The check for which type of config it is happens here: https://github.com/adityacs/loki/blob/object_storage_scrape/clients/pkg/promtail/targets/objectstore/targetmanager.go#L58. So it should be okay to have just NewTargetManager.

if err != nil {
return nil, errors.Wrap(err, "failed to make s3 object store manager")
}
targetManagers = append(targetManagers, objectStoreTargetManager)
default:
return nil, errors.New("unknown scrape config")
}
219 changes: 219 additions & 0 deletions clients/pkg/promtail/targets/objectstore/aws.go
@@ -0,0 +1,219 @@
package objectstore

import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"net"
"net/http"
"os"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
"github.com/pkg/errors"

"github.com/grafana/loki/pkg/storage/chunk"
loki_aws "github.com/grafana/loki/pkg/storage/chunk/aws"
)

type s3Client struct {
queueURL *string
svc sqsiface.SQSAPI
}

type s3TestEvent struct {
Event string
}
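
// For reference, the one-off message S3 sends when queue notifications are
// first configured looks roughly like the following (only the Event field is
// inspected here; the other fields shown are illustrative):
//
//	{"Service": "Amazon S3", "Event": "s3:TestEvent", "Bucket": "my-bucket"}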

type s3Results struct {
Records []s3Record
}

type s3Record struct {
S3 s3
EventTime string
EventName string
}

type s3 struct {
Object object
}

type object struct {
Key string
Size int64
}
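
// For reference, these record types decode the AWS S3 event notification
// JSON delivered through SQS, which looks roughly like the following
// (values illustrative):
//
//	{"Records": [{
//	  "eventTime": "2022-01-01T00:00:00.000Z",
//	  "eventName": "ObjectCreated:Put",
//	  "s3": {"object": {"key": "logs/app.gz", "size": 1024}}
//	}]}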

var (
defaultMaxRetries = 5
defaultMaxNumberOfMessages = int64(10)
defaultVisibilityTimeout = int64(20)
s3TestEventName = "s3:TestEvent"
)

func newS3Client(cfg loki_aws.S3Config, queue *string) (Client, error) {
sess, err := buildSQSClient(cfg)
if err != nil {
return nil, err
}
svc := sqs.New(sess)
urlResult, err := svc.GetQueueUrl(&sqs.GetQueueUrlInput{
QueueName: queue,
})
if err != nil {
return nil, err
}
queueURL := urlResult.QueueUrl
return &s3Client{
queueURL: queueURL,
svc: svc,
}, nil
}

func buildSQSClient(cfg loki_aws.S3Config) (*session.Session, error) {
sqsConfig := &aws.Config{}

sqsConfig = sqsConfig.WithMaxRetries(defaultMaxRetries)

if cfg.Region != "" {
sqsConfig = sqsConfig.WithRegion(cfg.Region)
}

if cfg.AccessKeyID != "" && cfg.SecretAccessKey == "" ||
cfg.AccessKeyID == "" && cfg.SecretAccessKey != "" {
return nil, errors.New("must supply both an Access Key ID and Secret Access Key or neither")
}

if cfg.AccessKeyID != "" && cfg.SecretAccessKey != "" {
creds := credentials.NewStaticCredentials(cfg.AccessKeyID, cfg.SecretAccessKey, "")
sqsConfig = sqsConfig.WithCredentials(creds)
}

tlsConfig := &tls.Config{
InsecureSkipVerify: cfg.HTTPConfig.InsecureSkipVerify,
}

if cfg.HTTPConfig.CAFile != "" {
tlsConfig.RootCAs = x509.NewCertPool()
data, err := os.ReadFile(cfg.HTTPConfig.CAFile)
if err != nil {
return nil, err
}
tlsConfig.RootCAs.AppendCertsFromPEM(data)
}

transport := http.RoundTripper(&http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
DualStack: true,
}).DialContext,
MaxIdleConns: 100,
IdleConnTimeout: cfg.HTTPConfig.IdleConnTimeout,
MaxIdleConnsPerHost: 100,
TLSHandshakeTimeout: 3 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
ResponseHeaderTimeout: cfg.HTTPConfig.ResponseHeaderTimeout,
TLSClientConfig: tlsConfig,
})

httpClient := &http.Client{
Transport: transport,
}

sqsConfig = sqsConfig.WithHTTPClient(httpClient)

sess, err := session.NewSession(sqsConfig)
if err != nil {
return nil, errors.Wrap(err, "failed to create new s3 session")
}
return sess, nil
}

// ReceiveMessage implements Client
func (s3Client *s3Client) ReceiveMessage(timeout int64) ([]messageObject, error) {
msgResult, err := s3Client.svc.ReceiveMessage(&sqs.ReceiveMessageInput{
AttributeNames: []*string{
aws.String(sqs.MessageSystemAttributeNameSentTimestamp),
},
MessageAttributeNames: []*string{
aws.String(sqs.QueueAttributeNameAll),
},
QueueUrl: s3Client.queueURL,
MaxNumberOfMessages: aws.Int64(defaultMaxNumberOfMessages), // Hard-coded; we might take this as user input in future versions.
WaitTimeSeconds: aws.Int64(timeout),
VisibilityTimeout: aws.Int64(defaultVisibilityTimeout), // Hard-coded; we might take this as user input in future versions.
Contributor: If a large file takes more than 20 seconds to process and ack, does that mean the message will be delivered again? That implies you have to run a single Promtail, right? If you run more than one, the positions file is not shared/synced and they will both consume the same message.

Contributor (author): Yeah, to scale up file processing, running multiple Promtails against the same SQS queue would be tricky. One solution would be to create multiple SQS queues based on some key pattern, so that bucket/a/file.gz goes to queue A and bucket/b/file.gz goes to queue B; multiple Promtails can then be configured with different queues (A, B, etc.). See the sketch below.
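
A sketch of that layout, assuming the prefix-to-queue routing is set up via prefix filters on the bucket's event notifications (all names illustrative):

```yaml
# promtail-a.yaml — first instance, consuming events routed to queue-a
scrape_configs:
  - job_name: s3_prefix_a
    aws_s3:
      bucketname: my-bucket
      region: us-east-1
      sqs_queue: queue-a

# promtail-b.yaml — second instance, consuming events routed to queue-b
scrape_configs:
  - job_name: s3_prefix_b
    aws_s3:
      bucketname: my-bucket
      region: us-east-1
      sqs_queue: queue-b
```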

})
if err != nil {
return nil, err
}
return s3Client.processMessage(msgResult)
}

func (s3Client *s3Client) processMessage(messages interface{}) ([]messageObject, error) {
var messageObjects []messageObject
m := messages.(*sqs.ReceiveMessageOutput)
for _, message := range m.Messages {
content := *message.Body
var sqsResult s3Results
err := json.Unmarshal([]byte(content), &sqsResult)
if err != nil {
return nil, err
}
fmt.Printf("Message content %s", content)
if len(sqsResult.Records) < 1 {
var testEvent s3TestEvent
err := json.Unmarshal([]byte(content), &testEvent)
if err != nil {
return nil, err
}

// S3 sends a test event when notifications are first configured on the SQS queue.
// Unfortunately, it's not automatically deleted
// and we have to handle it.
if isTestMessage(testEvent) {
if err := s3Client.acknowledgeMessage(message)(); err != nil {
return nil, errors.Wrap(err, "error in deleting s3 test event message")
}
}
continue
}
for _, record := range sqsResult.Records {
t, err := time.Parse(time.RFC3339, record.EventTime)
if err != nil {
return nil, err
}

messageObjects = append(messageObjects, messageObject{
Object: chunk.StorageObject{
Key: record.S3.Object.Key,
ModifiedAt: t,
},
Acknowledge: s3Client.acknowledgeMessage(message),
})
}

}

return messageObjects, nil
}

func (s3Client *s3Client) acknowledgeMessage(message *sqs.Message) ackMessage {
return func() error {
_, err := s3Client.svc.DeleteMessage(&sqs.DeleteMessageInput{QueueUrl: s3Client.queueURL, ReceiptHandle: message.ReceiptHandle})
if err != nil {
return err
}
return nil
}
}

func isTestMessage(testEvent s3TestEvent) bool {
return testEvent.Event == s3TestEventName
}
16 changes: 16 additions & 0 deletions clients/pkg/promtail/targets/objectstore/client.go
@@ -0,0 +1,16 @@
package objectstore

import (
"github.com/grafana/loki/pkg/storage/chunk"
)

// Client receives messages that reference newly written objects to scrape.
type Client interface {
ReceiveMessage(timeout int64) ([]messageObject, error)
}

// ackMessage acknowledges (deletes) a consumed message from the queue.
type ackMessage func() error

// messageObject pairs the storage object referenced by a message with the
// callback that acknowledges the message once the object has been processed.
type messageObject struct {
Object chunk.StorageObject
Acknowledge ackMessage
}
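
For illustration, a minimal sketch (not part of this PR) of how a consumer loop might drive the Client interface: poll once, process each referenced object, and acknowledge only after processing succeeds, so that unacknowledged messages become visible again after the SQS visibility timeout. The consume helper and its process callback are hypothetical names.

```go
package objectstore

// consume polls the queue once and hands each object key to process,
// which is supplied by the caller and reads the object from the bucket.
func consume(c Client, timeout int64, process func(key string) error) error {
	msgs, err := c.ReceiveMessage(timeout)
	if err != nil {
		return err
	}
	for _, m := range msgs {
		if err := process(m.Object.Key); err != nil {
			// Skip the ack; SQS redelivers the message once the
			// visibility timeout expires.
			continue
		}
		// Acknowledge deletes the message from the queue.
		if err := m.Acknowledge(); err != nil {
			return err
		}
	}
	return nil
}
```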