Skip to content

Commit

Permalink
otel: add support for per tenant configuration for mapping otlp data …
Browse files Browse the repository at this point in the history
…to loki format (grafana#11143)

**What this PR does / why we need it**:
In OTEL, we pick select Resource Attributes to identify the streams and
store all the other attributes as Structured Metadata, as explained
here. The problem however is that the list of Resource Attributes that
are picked as Stream labels are hardcoded, and there is no way to drop
unwanted data. This PR adds support for configuring how data is mapped
from OTEL to Loki format per tenant. It also adds support for dropping
unwanted data. We decided to make the config look similar to
Prometheus's relabling config to make it familiar.

**Special notes for your reviewer**:
Opening a draft PR to get some initial feedback. I will add
documentation in a separate PR.

**Checklist**
- [x] Tests updated
- [x] `CHANGELOG.md` updated
  • Loading branch information
sandeepsukhani authored and Gordejj committed Jan 29, 2024
1 parent b50cea5 commit 1abe640
Show file tree
Hide file tree
Showing 13 changed files with 813 additions and 73 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
* [11654](https://github.com/grafana/loki/pull/11654) **dannykopping** Cache: atomically check background cache size limit correctly.
* [11682](https://github.com/grafana/loki/pull/11682) **ashwanthgoli** Metadata cache: Adds `frontend.max-metadata-cache-freshness` to configure the time window for which metadata results are not cached. This helps avoid returning inaccurate results by not caching recent results.
* [11679](https://github.com/grafana/loki/pull/11679) **dannykopping** Cache: extending #11535 to align custom ingester query split with cache keys for correct caching of results.
* [11143](https://github.com/grafana/loki/pull/11143) **sandeepsukhani** otel: Add support for per tenant configuration for mapping otlp data to loki format
* [11499](https://github.com/grafana/loki/pull/11284) **jmichalek132** Config: Adds `frontend.log-query-request-headers` to enable logging of request headers in query logs.

##### Fixes
Expand Down
2 changes: 1 addition & 1 deletion clients/pkg/promtail/targets/lokipush/pushtarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (t *PushTarget) run() error {
func (t *PushTarget) handleLoki(w http.ResponseWriter, r *http.Request) {
logger := util_log.WithContext(r.Context(), util_log.Logger)
userID, _ := tenant.TenantID(r.Context())
req, err := push.ParseRequest(logger, userID, r, nil, push.ParseLokiRequest)
req, err := push.ParseRequest(logger, userID, r, nil, nil, push.ParseLokiRequest)
if err != nil {
level.Warn(t.logger).Log("msg", "failed to parse incoming push request", "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
Expand Down
11 changes: 11 additions & 0 deletions docs/sources/configure/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -3140,6 +3140,17 @@ shard_streams:
# Maximum number of structured metadata entries per log line.
# CLI flag: -limits.max-structured-metadata-entries-count
[max_structured_metadata_entries_count: <int> | default = 128]

# OTLP log ingestion configurations
otlp_config:
resource_attributes:
[ignore_defaults: <boolean>]

[attributes: <list of AttributesConfigs>]

[scope_attributes: <list of AttributesConfigs>]

[log_attributes: <list of AttributesConfigs>]
```

### frontend_worker
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRe
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
req, err := push.ParseRequest(logger, tenantID, r, d.tenantsRetention, pushRequestParser)
req, err := push.ParseRequest(logger, tenantID, r, d.tenantsRetention, d.validator.Limits, pushRequestParser)
if err != nil {
if d.tenantConfigs.LogPushRequest(tenantID) {
level.Debug(logger).Log(
Expand Down
2 changes: 2 additions & 0 deletions pkg/distributor/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/grafana/loki/pkg/compactor/retention"
"github.com/grafana/loki/pkg/distributor/shardstreams"
"github.com/grafana/loki/pkg/loghttp/push"
)

// Limits is an interface for distributor limits/related configs
Expand All @@ -29,4 +30,5 @@ type Limits interface {
AllowStructuredMetadata(userID string) bool
MaxStructuredMetadataSize(userID string) int
MaxStructuredMetadataCount(userID string) int
OTLPConfig(userID string) push.OTLPConfig
}
142 changes: 82 additions & 60 deletions pkg/loghttp/push/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,28 +25,9 @@ import (
const (
pbContentType = "application/x-protobuf"
gzipContentEncoding = "gzip"
attrServiceName = "service.name"
)

var blessedAttributes = []string{
"service.name",
"service.namespace",
"service.instance.id",
"deployment.environment",
"cloud.region",
"cloud.availability_zone",
"k8s.cluster.name",
"k8s.namespace.name",
"k8s.pod.name",
"k8s.container.name",
"container.name",
"k8s.replicaset.name",
"k8s.deployment.name",
"k8s.statefulset.name",
"k8s.daemonset.name",
"k8s.cronjob.name",
"k8s.job.name",
}

var blessedAttributesNormalized = make([]string, len(blessedAttributes))

func init() {
Expand All @@ -62,14 +43,14 @@ func newPushStats() *Stats {
}
}

func ParseOTLPRequest(userID string, r *http.Request, tenantsRetention TenantsRetention) (*logproto.PushRequest, *Stats, error) {
func ParseOTLPRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits) (*logproto.PushRequest, *Stats, error) {
stats := newPushStats()
otlpLogs, err := extractLogs(r, stats)
if err != nil {
return nil, nil, err
}

req := otlpToLokiPushRequest(otlpLogs, userID, tenantsRetention, stats)
req := otlpToLokiPushRequest(otlpLogs, userID, tenantsRetention, limits.OTLPConfig(userID), stats)
return req, stats, nil
}

Expand Down Expand Up @@ -120,7 +101,7 @@ func extractLogs(r *http.Request, pushStats *Stats) (plog.Logs, error) {
return req.Logs(), nil
}

func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention TenantsRetention, stats *Stats) *logproto.PushRequest {
func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention TenantsRetention, otlpConfig OTLPConfig, stats *Stats) *logproto.PushRequest {
if ld.LogRecordCount() == 0 {
return &logproto.PushRequest{}
}
Expand All @@ -131,39 +112,38 @@ func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention Tenants
for i := 0; i < rls.Len(); i++ {
sls := rls.At(i).ScopeLogs()
res := rls.At(i).Resource()
resAttrs := res.Attributes()

flattenedResourceAttributes := labels.NewBuilder(logproto.FromLabelAdaptersToLabels(attributesToLabels(res.Attributes(), "")))
// service.name is a required Resource Attribute. If it is not present, we will set it to "unknown_service".
if flattenedResourceAttributes.Get("service_name") == "" {
flattenedResourceAttributes = flattenedResourceAttributes.Set("service_name", "unknown_service")
if v, _ := resAttrs.Get(attrServiceName); v.AsString() == "" {
resAttrs.PutStr(attrServiceName, "unknown_service")
}
resourceAttributesAsStructuredMetadata := make(push.LabelsAdapter, 0, resAttrs.Len())
streamLabels := make(model.LabelSet, len(blessedAttributesNormalized))

if dac := res.DroppedAttributesCount(); dac != 0 {
flattenedResourceAttributes = flattenedResourceAttributes.Set("resource_dropped_attributes_count", fmt.Sprintf("%d", dac))
}
resAttrs.Range(func(k string, v pcommon.Value) bool {
action := otlpConfig.ActionForResourceAttribute(k)
if action == Drop {
return true
}

// copy blessed attributes to stream labels
streamLabels := make(model.LabelSet, len(blessedAttributesNormalized))
for _, ba := range blessedAttributesNormalized {
v := flattenedResourceAttributes.Get(ba)
if v == "" {
continue
attributeAsLabels := attributeToLabels(k, v, "")
if action == IndexLabel {
for _, lbl := range attributeAsLabels {
streamLabels[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
}
} else if action == StructuredMetadata {
resourceAttributesAsStructuredMetadata = append(resourceAttributesAsStructuredMetadata, attributeAsLabels...)
}
streamLabels[model.LabelName(ba)] = model.LabelValue(v)

// remove the blessed attributes copied to stream labels
flattenedResourceAttributes.Del(ba)
}
return true
})

if err := streamLabels.Validate(); err != nil {
stats.errs = append(stats.errs, fmt.Errorf("invalid labels: %w", err))
continue
}
labelsStr := streamLabels.String()

// convert the remaining resource attributes to structured metadata
resourceAttributesAsStructuredMetadata := logproto.FromLabelsToLabelAdapters(flattenedResourceAttributes.Labels())

lbs := modelLabelsSetToLabelsList(streamLabels)
if _, ok := pushRequestsByStream[labelsStr]; !ok {
pushRequestsByStream[labelsStr] = logproto.Stream{
Expand All @@ -178,6 +158,7 @@ func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention Tenants
for j := 0; j < sls.Len(); j++ {
scope := sls.At(j).Scope()
logs := sls.At(j).LogRecords()
scopeAttrs := scope.Attributes()

// it would be rare to have multiple scopes so if the entries slice is empty, pre-allocate it for the number of log entries
if cap(pushRequestsByStream[labelsStr].Entries) == 0 {
Expand All @@ -187,7 +168,20 @@ func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention Tenants
}

// use fields and attributes from scope as structured metadata
scopeAttributesAsStructuredMetadata := attributesToLabels(scope.Attributes(), "")
scopeAttributesAsStructuredMetadata := make(push.LabelsAdapter, 0, scopeAttrs.Len()+3)
scopeAttrs.Range(func(k string, v pcommon.Value) bool {
action := otlpConfig.ActionForScopeAttribute(k)
if action == Drop {
return true
}

attributeAsLabels := attributeToLabels(k, v, "")
if action == StructuredMetadata {
scopeAttributesAsStructuredMetadata = append(scopeAttributesAsStructuredMetadata, attributeAsLabels...)
}

return true
})

if scopeName := scope.Name(); scopeName != "" {
scopeAttributesAsStructuredMetadata = append(scopeAttributesAsStructuredMetadata, push.LabelAdapter{
Expand All @@ -213,7 +207,7 @@ func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention Tenants
for k := 0; k < logs.Len(); k++ {
log := logs.At(k)

entry := otlpLogToPushEntry(log)
entry := otlpLogToPushEntry(log, otlpConfig)

// if entry.StructuredMetadata doesn't have capacity to add resource and scope attributes, make a new slice with enough capacity
attributesAsStructuredMetadataLen := len(resourceAttributesAsStructuredMetadata) + len(scopeAttributesAsStructuredMetadata)
Expand Down Expand Up @@ -251,9 +245,23 @@ func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention Tenants
}

// otlpLogToPushEntry converts an OTLP log record to a Loki push.Entry.
func otlpLogToPushEntry(log plog.LogRecord) push.Entry {
func otlpLogToPushEntry(log plog.LogRecord, otlpConfig OTLPConfig) push.Entry {
// copy log attributes and all the fields from log(except log.Body) to structured metadata
structuredMetadata := attributesToLabels(log.Attributes(), "")
logAttrs := log.Attributes()
structuredMetadata := make(push.LabelsAdapter, 0, logAttrs.Len()+7)
logAttrs.Range(func(k string, v pcommon.Value) bool {
action := otlpConfig.ActionForLogAttribute(k)
if action == Drop {
return true
}

attributeAsLabels := attributeToLabels(k, v, "")
if action == StructuredMetadata {
structuredMetadata = append(structuredMetadata, attributeAsLabels...)
}

return true
})

// if log.Timestamp() is 0, we would have already stored log.ObservedTimestamp as log timestamp so no need to store again in structured metadata
if log.Timestamp() != 0 && log.ObservedTimestamp() != 0 {
Expand Down Expand Up @@ -316,25 +324,39 @@ func attributesToLabels(attrs pcommon.Map, prefix string) push.LabelsAdapter {
}

attrs.Range(func(k string, v pcommon.Value) bool {
keyWithPrefix := k
if prefix != "" {
keyWithPrefix = prefix + "_" + k
}
keyWithPrefix = prometheustranslator.NormalizeLabel(keyWithPrefix)

typ := v.Type()
if typ == pcommon.ValueTypeMap {
labelsAdapter = append(labelsAdapter, attributesToLabels(v.Map(), keyWithPrefix)...)
} else {
labelsAdapter = append(labelsAdapter, push.LabelAdapter{Name: keyWithPrefix, Value: v.AsString()})
}

labelsAdapter = append(labelsAdapter, attributeToLabels(k, v, prefix)...)
return true
})

return labelsAdapter
}

func attributeToLabels(k string, v pcommon.Value, prefix string) push.LabelsAdapter {
var labelsAdapter push.LabelsAdapter

keyWithPrefix := k
if prefix != "" {
keyWithPrefix = prefix + "_" + k
}
keyWithPrefix = prometheustranslator.NormalizeLabel(keyWithPrefix)

typ := v.Type()
if typ == pcommon.ValueTypeMap {
mv := v.Map()
labelsAdapter = make(push.LabelsAdapter, 0, mv.Len())
mv.Range(func(k string, v pcommon.Value) bool {
labelsAdapter = append(labelsAdapter, attributeToLabels(k, v, keyWithPrefix)...)
return true
})
} else {
labelsAdapter = push.LabelsAdapter{
push.LabelAdapter{Name: keyWithPrefix, Value: v.AsString()},
}
}

return labelsAdapter
}

func timestampFromLogRecord(lr plog.LogRecord) time.Time {
if lr.Timestamp() != 0 {
return time.Unix(0, int64(lr.Timestamp()))
Expand Down

0 comments on commit 1abe640

Please sign in to comment.