Skip to content

Commit

Permalink
add support for per tenant configuration for mapping otlp data to lok…
Browse files Browse the repository at this point in the history
…i format
  • Loading branch information
sandeepsukhani committed Nov 6, 2023
1 parent 94169a0 commit 3b0b374
Show file tree
Hide file tree
Showing 10 changed files with 782 additions and 73 deletions.
2 changes: 1 addition & 1 deletion clients/pkg/promtail/targets/lokipush/pushtarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (t *PushTarget) run() error {
func (t *PushTarget) handleLoki(w http.ResponseWriter, r *http.Request) {
logger := util_log.WithContext(r.Context(), util_log.Logger)
userID, _ := tenant.TenantID(r.Context())
req, err := push.ParseRequest(logger, userID, r, nil, push.ParseLokiRequest)
req, err := push.ParseRequest(logger, userID, r, nil, nil, push.ParseLokiRequest)
if err != nil {
level.Warn(t.logger).Log("msg", "failed to parse incoming push request", "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRe
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
req, err := push.ParseRequest(logger, tenantID, r, d.tenantsRetention, pushRequestParser)
req, err := push.ParseRequest(logger, tenantID, r, d.tenantsRetention, d.validator.Limits, pushRequestParser)
if err != nil {
if d.tenantConfigs.LogPushRequest(tenantID) {
level.Debug(logger).Log(
Expand Down
2 changes: 2 additions & 0 deletions pkg/distributor/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/grafana/loki/pkg/compactor/retention"
"github.com/grafana/loki/pkg/distributor/shardstreams"
"github.com/grafana/loki/pkg/loghttp/push"
)

// Limits is an interface for distributor limits/related configs
Expand All @@ -30,4 +31,5 @@ type Limits interface {
AllowStructuredMetadata(userID string) bool
MaxStructuredMetadataSize(userID string) int
MaxStructuredMetadataCount(userID string) int
OTLPConfig(userID string) push.OTLPConfig
}
142 changes: 82 additions & 60 deletions pkg/loghttp/push/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,28 +25,9 @@ import (
const (
pbContentType = "application/x-protobuf"
gzipContentEncoding = "gzip"
attrServiceName = "service.name"
)

var blessedAttributes = []string{
"service.name",
"service.namespace",
"service.instance.id",
"deployment.environment",
"cloud.region",
"cloud.availability_zone",
"k8s.cluster.name",
"k8s.namespace.name",
"k8s.pod.name",
"k8s.container.name",
"container.name",
"k8s.replicaset.name",
"k8s.deployment.name",
"k8s.statefulset.name",
"k8s.daemonset.name",
"k8s.cronjob.name",
"k8s.job.name",
}

var blessedAttributesNormalized = make([]string, len(blessedAttributes))

func init() {
Expand All @@ -62,14 +43,14 @@ func newPushStats() *Stats {
}
}

func ParseOTLPRequest(userID string, r *http.Request, tenantsRetention TenantsRetention) (*logproto.PushRequest, *Stats, error) {
func ParseOTLPRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits) (*logproto.PushRequest, *Stats, error) {
stats := newPushStats()
otlpLogs, err := extractLogs(r, stats)
if err != nil {
return nil, nil, err
}

req := otlpToLokiPushRequest(otlpLogs, userID, tenantsRetention, stats)
req := otlpToLokiPushRequest(otlpLogs, userID, tenantsRetention, limits.OTLPConfig(userID), stats)
return req, stats, nil
}

Expand Down Expand Up @@ -120,7 +101,7 @@ func extractLogs(r *http.Request, pushStats *Stats) (plog.Logs, error) {
return req.Logs(), nil
}

func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention TenantsRetention, stats *Stats) *logproto.PushRequest {
func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention TenantsRetention, otlpConfig OTLPConfig, stats *Stats) *logproto.PushRequest {
if ld.LogRecordCount() == 0 {
return &logproto.PushRequest{}
}
Expand All @@ -131,39 +112,38 @@ func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention Tenants
for i := 0; i < rls.Len(); i++ {
sls := rls.At(i).ScopeLogs()
res := rls.At(i).Resource()
resAttrs := res.Attributes()

flattenedResourceAttributes := labels.NewBuilder(logproto.FromLabelAdaptersToLabels(attributesToLabels(res.Attributes(), "")))
// service.name is a required Resource Attribute. If it is not present, we will set it to "unknown_service".
if flattenedResourceAttributes.Get("service_name") == "" {
flattenedResourceAttributes = flattenedResourceAttributes.Set("service_name", "unknown_service")
if v, _ := resAttrs.Get(attrServiceName); v.AsString() == "" {
resAttrs.PutStr(attrServiceName, "unknown_service")
}
resourceAttributesAsStructuredMetadata := make(push.LabelsAdapter, 0, resAttrs.Len())
streamLabels := make(model.LabelSet, len(blessedAttributesNormalized))

if dac := res.DroppedAttributesCount(); dac != 0 {
flattenedResourceAttributes = flattenedResourceAttributes.Set("resource_dropped_attributes_count", fmt.Sprintf("%d", dac))
}
resAttrs.Range(func(k string, v pcommon.Value) bool {
action := otlpConfig.ActionForResourceAttribute(k)
if action == Drop {
return true
}

// copy blessed attributes to stream labels
streamLabels := make(model.LabelSet, len(blessedAttributesNormalized))
for _, ba := range blessedAttributesNormalized {
v := flattenedResourceAttributes.Get(ba)
if v == "" {
continue
attributeAsLabels := attributeToLabels(k, v, "")
if action == IndexLabel {
for _, lbl := range attributeAsLabels {
streamLabels[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
}
} else if action == StructuredMetadata {
resourceAttributesAsStructuredMetadata = append(resourceAttributesAsStructuredMetadata, attributeAsLabels...)
}
streamLabels[model.LabelName(ba)] = model.LabelValue(v)

// remove the blessed attributes copied to stream labels
flattenedResourceAttributes.Del(ba)
}
return true
})

if err := streamLabels.Validate(); err != nil {
stats.errs = append(stats.errs, fmt.Errorf("invalid labels: %w", err))
continue
}
labelsStr := streamLabels.String()

// convert the remaining resource attributes to structured metadata
resourceAttributesAsStructuredMetadata := logproto.FromLabelsToLabelAdapters(flattenedResourceAttributes.Labels())

lbs := modelLabelsSetToLabelsList(streamLabels)
if _, ok := pushRequestsByStream[labelsStr]; !ok {
pushRequestsByStream[labelsStr] = logproto.Stream{
Expand All @@ -178,6 +158,7 @@ func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention Tenants
for j := 0; j < sls.Len(); j++ {
scope := sls.At(j).Scope()
logs := sls.At(j).LogRecords()
scopeAttrs := scope.Attributes()

// it would be rare to have multiple scopes so if the entries slice is empty, pre-allocate it for the number of log entries
if cap(pushRequestsByStream[labelsStr].Entries) == 0 {
Expand All @@ -187,7 +168,20 @@ func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention Tenants
}

// use fields and attributes from scope as structured metadata
scopeAttributesAsStructuredMetadata := attributesToLabels(scope.Attributes(), "")
scopeAttributesAsStructuredMetadata := make(push.LabelsAdapter, 0, scopeAttrs.Len()+3)
scopeAttrs.Range(func(k string, v pcommon.Value) bool {
action := otlpConfig.ActionForScopeAttribute(k)
if action == Drop {
return true
}

attributeAsLabels := attributeToLabels(k, v, "")
if action == StructuredMetadata {
scopeAttributesAsStructuredMetadata = append(scopeAttributesAsStructuredMetadata, attributeAsLabels...)
}

return true
})

if scopeName := scope.Name(); scopeName != "" {
scopeAttributesAsStructuredMetadata = append(scopeAttributesAsStructuredMetadata, push.LabelAdapter{
Expand All @@ -213,7 +207,7 @@ func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention Tenants
for k := 0; k < logs.Len(); k++ {
log := logs.At(k)

entry := otlpLogToPushEntry(log)
entry := otlpLogToPushEntry(log, otlpConfig)

// if entry.StructuredMetadata doesn't have capacity to add resource and scope attributes, make a new slice with enough capacity
attributesAsStructuredMetadataLen := len(resourceAttributesAsStructuredMetadata) + len(scopeAttributesAsStructuredMetadata)
Expand Down Expand Up @@ -251,9 +245,23 @@ func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention Tenants
}

// otlpLogToPushEntry converts an OTLP log record to a Loki push.Entry.
func otlpLogToPushEntry(log plog.LogRecord) push.Entry {
func otlpLogToPushEntry(log plog.LogRecord, otlpConfig OTLPConfig) push.Entry {
// copy log attributes and all the fields from log(except log.Body) to structured metadata
structuredMetadata := attributesToLabels(log.Attributes(), "")
logAttrs := log.Attributes()
structuredMetadata := make(push.LabelsAdapter, 0, logAttrs.Len()+7)
logAttrs.Range(func(k string, v pcommon.Value) bool {
action := otlpConfig.ActionForLogAttribute(k)
if action == Drop {
return true
}

attributeAsLabels := attributeToLabels(k, v, "")
if action == StructuredMetadata {
structuredMetadata = append(structuredMetadata, attributeAsLabels...)
}

return true
})

// if log.Timestamp() is 0, we would have already stored log.ObservedTimestamp as log timestamp so no need to store again in structured metadata
if log.Timestamp() != 0 && log.ObservedTimestamp() != 0 {
Expand Down Expand Up @@ -316,25 +324,39 @@ func attributesToLabels(attrs pcommon.Map, prefix string) push.LabelsAdapter {
}

attrs.Range(func(k string, v pcommon.Value) bool {
keyWithPrefix := k
if prefix != "" {
keyWithPrefix = prefix + "_" + k
}
keyWithPrefix = prometheustranslator.NormalizeLabel(keyWithPrefix)

typ := v.Type()
if typ == pcommon.ValueTypeMap {
labelsAdapter = append(labelsAdapter, attributesToLabels(v.Map(), keyWithPrefix)...)
} else {
labelsAdapter = append(labelsAdapter, push.LabelAdapter{Name: keyWithPrefix, Value: v.AsString()})
}

labelsAdapter = append(labelsAdapter, attributeToLabels(k, v, prefix)...)
return true
})

return labelsAdapter
}

func attributeToLabels(k string, v pcommon.Value, prefix string) push.LabelsAdapter {
var labelsAdapter push.LabelsAdapter

keyWithPrefix := k
if prefix != "" {
keyWithPrefix = prefix + "_" + k
}
keyWithPrefix = prometheustranslator.NormalizeLabel(keyWithPrefix)

typ := v.Type()
if typ == pcommon.ValueTypeMap {
mv := v.Map()
labelsAdapter = make(push.LabelsAdapter, 0, mv.Len())
mv.Range(func(k string, v pcommon.Value) bool {
labelsAdapter = append(labelsAdapter, attributeToLabels(k, v, keyWithPrefix)...)
return true
})
} else {
labelsAdapter = push.LabelsAdapter{
push.LabelAdapter{Name: keyWithPrefix, Value: v.AsString()},
}
}

return labelsAdapter
}

func timestampFromLogRecord(lr plog.LogRecord) time.Time {
if lr.Timestamp() != 0 {
return time.Unix(0, int64(lr.Timestamp()))
Expand Down

0 comments on commit 3b0b374

Please sign in to comment.