New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ingestion: native otlp ingestion support #10727
ingestion: native otlp ingestion support #10727
Conversation
pkg/loghttp/push/otlp.go
Outdated
|
||
// copy blessed attributes to stream labels | ||
for _, ba := range blessedAttributes { | ||
normalizedBlessedAttribute := prometheustranslator.NormalizeLabel(ba) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we do not need to normalize it at runtime, we can generate normalizedBlessedAttributes on startup
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great! minor comments
integration/client/client.go
Outdated
|
||
buf, err := io.ReadAll(res.Body) | ||
if err != nil { | ||
return fmt.Errorf("reading request failed with status code %v: %w", res.StatusCode, err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return fmt.Errorf("reading request failed with status code %v: %w", res.StatusCode, err) | |
return fmt.Errorf("reading response failed for status code %v: %w", res.StatusCode, err) |
integration/client/client.go
Outdated
return fmt.Errorf("reading request failed with status code %v: %w", res.StatusCode, err) | ||
} | ||
|
||
return fmt.Errorf("request failed with status code %v: %w", res.StatusCode, errors.New(string(buf))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is the difference between these two formats? I believe we can just skip creating error from the string and use the string directly in the error message
return fmt.Errorf("request failed with status code %v: %w", res.StatusCode, errors.New(string(buf))) | |
return fmt.Errorf("request failed with status code %v: %s", res.StatusCode, string(buf)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
%w
is used for wrapping the errors, which the consumer can then unwrap. I just copied the code from the other push function. Wrapping the error here doesn't make sense, so I will change it.
|
||
// ingest logs to the current period | ||
require.NoError(t, cliDistributor.PushLogLineWithStructuredMetadata("lineC", map[string]string{"traceID": "789"}, map[string]string{"job": "fake"})) | ||
require.NoError(t, cliDistributor.PushLogLineWithStructuredMetadata("lineD", map[string]string{"traceID": "123"}, map[string]string{"job": "fake"})) | ||
require.NoError(t, cliDistributor.PushLogLine("lineC", now, nil, map[string]string{"job": "fake"})) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why did we remove structured metadata from these log lines?
map[string]string{"traceID": "123"}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch, it was a mistake. Fixed it.
for i, stream := range resp.Data.Stream { | ||
switch i { | ||
case 0: | ||
require.Len(t, stream.Values, 2) | ||
require.Equal(t, "lineD", stream.Values[0][1]) | ||
require.Equal(t, "lineB", stream.Values[1][1]) | ||
require.Equal(t, map[string]string{ | ||
"service_name": "varlog", | ||
}, stream.Stream) | ||
case 1: | ||
require.Len(t, stream.Values, 1) | ||
require.Equal(t, "lineA", stream.Values[0][1]) | ||
require.Equal(t, map[string]string{ | ||
"service_name": "varlog", | ||
"trace_id": "1", | ||
"user_id": "2", | ||
}, stream.Stream) | ||
case 2: | ||
require.Len(t, stream.Values, 1) | ||
require.Equal(t, "lineC", stream.Values[0][1]) | ||
require.Equal(t, map[string]string{ | ||
"service_name": "varlog", | ||
"order_ids": "[5,6]", | ||
}, stream.Stream) | ||
default: | ||
t.Errorf("unexpected case %d", i) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
these assertions do not make sure that there are exactly 3 log lines in the response. If the response has only 1 line it will assert the first line and will mark the test as passed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Awesome work Sandeep! I left some comments.
pkg/loghttp/push/push.go
Outdated
return req, nil | ||
} | ||
|
||
func ParseHTTPRequest(userID string, r *http.Request, tenantsRetention TenantsRetention) (*logproto.PushRequest, *Stats, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Since OTEL can send HTTP requests as well, I'd rename this to ParseLokiRequest
pkg/loghttp/push/otlp.go
Outdated
} | ||
} | ||
|
||
type Stats struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: since these structs are also used in push.go, I think moving them there makes more sense. Alternatively, we may create a new stats.go file
|
||
// copy blessed attributes to stream labels | ||
streamLabels := make(model.LabelSet, len(blessedAttributesNormalized)) | ||
for _, ba := range blessedAttributesNormalized { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think here we should take the tenant's max_label_names_per_series
into account and put the ones not fitting into structured metadata. Otherwise, we may add more labels than we allow and discard them later on.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The blessed attributes should be an ordered list by preference.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are not going to enforce max_label_names_per_series
limit here. The limits would be enforced later at the common code path which processed the parsed push requests. It would be the user's responsibility to set appropriate blessed attributes(we are going to make it configurable) to avoid rejecting data when the label count goes beyond the limits.
} | ||
|
||
// use fields and attributes from scope as structured metadata | ||
scopeAttributesAsStructuredMetadata := attributesToLabels(scope.Attributes(), "") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will allocate a push.LabelsAdapter of size scope.Attributes().Len() and then we will likely append to it up to 3 more items. I think we can an argument to attributesToLabels with the capacity we want to allocate for the returned labels adapter. Same applies to how we use it in otlpLogToPushEntry
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there is no way for the caller to know how big the slice has to be since the flattening of attributes could result in more elements when there are nested attributes. Also, passing that extra argument complicates the code. I would love to keep it simple and optimize it later if it happens to cause any with real usage.
log := logs.At(k) | ||
|
||
entry := otlpLogToPushEntry(log) | ||
entry.StructuredMetadata = append(entry.StructuredMetadata, resourceAttributesAsStructuredMetadata...) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here we'll potentially make two more allocations. I think we should do sth like:
entry := otlpLogToPushEntry(log)
structuredMetadata := make(push.LabelsAdapter, 0, len(entry.StructuredMetadata)+len(scopeAttributesAsStructuredMetadata)+len(resourceAttributesAsStructuredMetadata))
structuredMetadata = append(structuredMetadata, entry.StructuredMetadata...)
structuredMetadata = append(structuredMetadata, resourceAttributesAsStructuredMetadata...)
structuredMetadata = append(structuredMetadata, scopeAttributesAsStructuredMetadata...)
entry.StructuredMetadata = structuredMetadata
}, | ||
{ | ||
Name: "trace_id", | ||
Value: "12345678123456781234567812345678", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a bit difficult to see how the traceID set in the payload maps to this output. Maybe we can have the traceID as a variable and call traceID.String() here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure if that would be an ideal test. I actually copied the test case from otel repo.
if traceID := log.TraceID(); !traceID.IsEmpty() { | ||
structuredMetadata = append(structuredMetadata, push.LabelAdapter{ | ||
Name: "trace_id", | ||
Value: hex.EncodeToString(traceID[:]), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can call traceID.String()
here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is not ideal to call String()
as recommended by otel lib.
if spanID := log.SpanID(); !spanID.IsEmpty() { | ||
structuredMetadata = append(structuredMetadata, push.LabelAdapter{ | ||
Name: "span_id", | ||
Value: hex.EncodeToString(spanID[:]), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can call spanID.String()
here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here
pkg/loki/modules.go
Outdated
@@ -321,19 +321,25 @@ func (t *Loki) initDistributor() (services.Service, error) { | |||
tenant.WithDefaultResolver(tenant.NewMultiResolver()) | |||
} | |||
|
|||
pushHandler := middleware.Merge( | |||
httpPushHandler := middleware.Merge( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same comment wrt naming. I'd change it for lokiPushHandler
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In fact provided they will likely always have the same middlewares I'd do:
httpPushHandlerMiddleware := middleware.Merge(
serverutil.RecoveryHTTPMiddleware,
t.HTTPAuthMiddleware,
)
lokiPushHandler := httpPushHandlerMiddleware.Wrap(http.HandlerFunc(t.distributor.PushHandler))
otlpPushHandler := httpPushHandlerMiddleware.Wrap(http.HandlerFunc(t.distributor.OTLPPushHandler))
…loki (#11026) **What this PR does / why we need it**: In PR #10727, we added support for ingesting logs to loki in OTLP format. This PR adds the documentation on how to configure otel collector and how the data is mapped from OTLP format to Loki format. **Checklist** - [x] Documentation added --------- Co-authored-by: J Stickler <julie.stickler@grafana.com>
**What this PR does / why we need it**: Add support for natively supporting logs ingestion in OTLP format. `/otlp/v1/logs` is the new endpoint where users can push logs in OTLP format. It accepts logs serialized in JSON or proto format. Since OTEL format is very different than what Loki storage model, here is how data in OTEL format will be mapped to Loki data model: * Index labels: The Resource Attributes map quite well to Index labels in Loki since both usually identify the source of the logs. The problem however is that Resource attributes in OTLP can have an unbounded number of values while Loki has a default limit of having up to 30 labels. Since Index labels in Loki can largely drive the kind of querying experience the users are going to have, we have chosen select attributes which would be picked as Index Labels. The ones that are not picked up as Index labels would be stored as Structured Metadata with each log entry. * Timestamp: LogRecord.TimeUnixNano * LogLine: LogRecord.Body holds the body of the log. However, since Loki only supports Log body in string format, we will stringify non-string values using [AsString method from OTEL collector lib](https://github.com/open-telemetry/opentelemetry-collector/blob/ab3d6c5b64701e690aaa340b0a63f443ff22c1f0/pdata/pcommon/value.go#L353). * Structured Metadata: Anything which can’t be stored in Index labels and LogLine. Here is a non-exhaustive list of what will be stored in Structured Metadata to give a sense of what it will hold: * Resource Attributes not stored as Index labels is replicated and stored with each log entry. * Everything under InstrumentationScope is replicated and stored with each log entry. * Everything under LogRecord except LogRecord.Body, LogRecord.TimeUnixNano and sometimes LogRecord.ObservedTimestamp. *NOTES*: * Since Loki does not support `.` or any other special characters other than `_` in label names, we replace all non-supported characters with `_`. * Since Loki only supports string in values of Index Labels and Structured Metadata, all the complex types are converted as follows: * Map would be flattened into label keys using `_` as separator, same as how we do it in [json parser in LogQL](https://grafana.com/docs/loki/latest/query/log_queries/#json). * Everything else is stringified using [AsString method from OTEL collector lib](https://github.com/open-telemetry/opentelemetry-collector/blob/ab3d6c5b64701e690aaa340b0a63f443ff22c1f0/pdata/pcommon/value.go#L353) **Special notes for your reviewer**: I will open follow-up PRs for: * Documentation * Make blessed attributes list configurable per tenant. **Checklist** - [x] Tests updated - [x] `CHANGELOG.md` updated - [ ] If the change is worth mentioning in the release notes, add `add-to-release-notes` label
…loki (grafana#11026) **What this PR does / why we need it**: In PR grafana#10727, we added support for ingesting logs to loki in OTLP format. This PR adds the documentation on how to configure otel collector and how the data is mapped from OTLP format to Loki format. **Checklist** - [x] Documentation added --------- Co-authored-by: J Stickler <julie.stickler@grafana.com>
What this PR does / why we need it:
Add support for natively supporting logs ingestion in OTLP format.
/otlp/v1/logs
is the new endpoint where users can push logs in OTLP format. It accepts logs serialized in JSON or proto format. Since OTEL format is very different than what Loki storage model, here is how data in OTEL format will be mapped to Loki data model:NOTES:
.
or any other special characters other than_
in label names, we replace all non-supported characters with_
._
as separator, same as how we do it in json parser in LogQL.Special notes for your reviewer:
I will open follow-up PRs for:
Checklist
CHANGELOG.md
updatedadd-to-release-notes
label