Skip to content

Commit b493027

Browse files
authored
OTLP: rest and transport action (#133907)
Indexes documents created from grouped data points and handles the http response codes according to the spec.
1 parent 98fa3b7 commit b493027

File tree

4 files changed

+548
-17
lines changed

4 files changed

+548
-17
lines changed

x-pack/plugin/otel-data/src/javaRestTest/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsIndexingRestIT.java

Lines changed: 213 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,20 @@
88
package org.elasticsearch.xpack.oteldata.otlp;
99

1010
import io.opentelemetry.api.common.Attributes;
11-
import io.opentelemetry.exporter.internal.FailedExportException;
11+
import io.opentelemetry.api.metrics.Meter;
1212
import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter;
1313
import io.opentelemetry.sdk.common.Clock;
1414
import io.opentelemetry.sdk.common.CompletableResultCode;
1515
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
1616
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
17+
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
1718
import io.opentelemetry.sdk.metrics.data.MetricData;
1819
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
1920
import io.opentelemetry.sdk.metrics.internal.data.ImmutableDoublePointData;
2021
import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData;
22+
import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData;
2123
import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData;
24+
import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData;
2225
import io.opentelemetry.sdk.resources.Resource;
2326

2427
import org.elasticsearch.client.Request;
@@ -29,17 +32,29 @@
2932
import org.elasticsearch.test.cluster.ElasticsearchCluster;
3033
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
3134
import org.elasticsearch.test.rest.ESRestTestCase;
35+
import org.elasticsearch.test.rest.ObjectPath;
3236
import org.junit.Before;
3337
import org.junit.ClassRule;
3438

39+
import java.io.IOException;
3540
import java.time.Duration;
41+
import java.time.Instant;
3642
import java.util.List;
43+
import java.util.Map;
3744
import java.util.concurrent.Executors;
3845
import java.util.concurrent.TimeUnit;
3946

4047
import static io.opentelemetry.api.common.AttributeKey.stringKey;
48+
import static io.opentelemetry.sdk.metrics.data.AggregationTemporality.CUMULATIVE;
49+
import static io.opentelemetry.sdk.metrics.data.AggregationTemporality.DELTA;
50+
import static org.elasticsearch.xpack.oteldata.otlp.OTLPMetricsIndexingRestIT.Monotonicity.MONOTONIC;
51+
import static org.elasticsearch.xpack.oteldata.otlp.OTLPMetricsIndexingRestIT.Monotonicity.NON_MONOTONIC;
52+
import static org.hamcrest.Matchers.aMapWithSize;
53+
import static org.hamcrest.Matchers.anEmptyMap;
4154
import static org.hamcrest.Matchers.equalTo;
4255
import static org.hamcrest.Matchers.is;
56+
import static org.hamcrest.Matchers.isA;
57+
import static org.hamcrest.Matchers.not;
4358

4459
public class OTLPMetricsIndexingRestIT extends ESRestTestCase {
4560

@@ -56,6 +71,8 @@ public class OTLPMetricsIndexingRestIT extends ESRestTestCase {
5671
.user(USER, PASS, "superuser", false)
5772
.setting("xpack.security.autoconfiguration.enabled", "false")
5873
.setting("xpack.license.self_generated.type", "trial")
74+
.setting("xpack.ml.enabled", "false")
75+
.setting("xpack.watcher.enabled", "false")
5976
.build();
6077

6178
@Override
@@ -98,21 +115,138 @@ public void tearDown() throws Exception {
98115
super.tearDown();
99116
}
100117

118+
public void testIngestMetricViaMeterProvider() throws Exception {
119+
Meter sampleMeter = meterProvider.get("io.opentelemetry.example.metrics");
120+
long totalMemory = 42;
121+
122+
sampleMeter.gaugeBuilder("jvm.memory.total")
123+
.setDescription("Reports JVM memory usage.")
124+
.setUnit("By")
125+
.buildWithCallback(result -> result.record(totalMemory, Attributes.empty()));
126+
127+
var result = meterProvider.shutdown();
128+
assertThat(result.isSuccess(), is(true));
129+
130+
refreshMetricsIndices();
131+
132+
ObjectPath search = search("metrics-generic.otel-default");
133+
assertThat(search.toString(), search.evaluate("hits.total.value"), equalTo(1));
134+
var source = search.evaluate("hits.hits.0._source");
135+
assertThat(ObjectPath.evaluate(source, "@timestamp"), isA(String.class));
136+
assertThat(ObjectPath.evaluate(source, "start_timestamp"), isA(String.class));
137+
assertThat(ObjectPath.evaluate(source, "_metric_names_hash"), isA(String.class));
138+
assertThat(ObjectPath.<Number>evaluate(source, "metrics.jvm\\.memory\\.total").longValue(), equalTo(totalMemory));
139+
assertThat(ObjectPath.evaluate(source, "unit"), equalTo("By"));
140+
assertThat(ObjectPath.evaluate(source, "scope.name"), equalTo("io.opentelemetry.example.metrics"));
141+
}
142+
101143
public void testIngestMetricDataViaMetricExporter() throws Exception {
102-
MetricData jvmMemoryMetricData = createDoubleGauge(
103-
TEST_RESOURCE,
104-
Attributes.empty(),
105-
"jvm.memory.total",
106-
Runtime.getRuntime().totalMemory(),
107-
"By",
108-
Clock.getDefault().now()
144+
long now = Clock.getDefault().now();
145+
long totalMemory = 42;
146+
MetricData jvmMemoryMetricData = createLongGauge(TEST_RESOURCE, Attributes.empty(), "jvm.memory.total", totalMemory, "By", now);
147+
148+
export(List.of(jvmMemoryMetricData));
149+
ObjectPath search = search("metrics-generic.otel-default");
150+
assertThat(search.toString(), search.evaluate("hits.total.value"), equalTo(1));
151+
var source = search.evaluate("hits.hits.0._source");
152+
assertThat(ObjectPath.evaluate(source, "@timestamp"), equalTo(timestampAsString(now)));
153+
assertThat(ObjectPath.evaluate(source, "start_timestamp"), equalTo(timestampAsString(now)));
154+
assertThat(ObjectPath.evaluate(source, "_metric_names_hash"), isA(String.class));
155+
assertThat(ObjectPath.<Number>evaluate(source, "metrics.jvm\\.memory\\.total").longValue(), equalTo(totalMemory));
156+
assertThat(ObjectPath.evaluate(source, "unit"), equalTo("By"));
157+
assertThat(ObjectPath.evaluate(source, "resource.attributes.service\\.name"), equalTo("elasticsearch"));
158+
assertThat(ObjectPath.evaluate(source, "scope.name"), equalTo("io.opentelemetry.example.metrics"));
159+
}
160+
161+
public void testGroupingSameGroup() throws Exception {
162+
long now = Clock.getDefault().now();
163+
MetricData metric1 = createDoubleGauge(TEST_RESOURCE, Attributes.empty(), "metric1", 42, "By", now);
164+
// uses an equal but not the same resource to test grouping across resourceMetrics
165+
MetricData metric2 = createDoubleGauge(TEST_RESOURCE.toBuilder().build(), Attributes.empty(), "metric2", 42, "By", now);
166+
167+
export(List.of(metric1, metric2));
168+
169+
ObjectPath path = ObjectPath.createFromResponse(
170+
client().performRequest(new Request("GET", "metrics-generic.otel-default/_search"))
171+
);
172+
assertThat(path.toString(), path.evaluate("hits.total.value"), equalTo(1));
173+
assertThat(path.evaluate("hits.hits.0._source.metrics"), equalTo(Map.of("metric1", 42.0, "metric2", 42.0)));
174+
assertThat(path.evaluate("hits.hits.0._source.resource"), equalTo(Map.of("attributes", Map.of("service.name", "elasticsearch"))));
175+
}
176+
177+
public void testGroupingDifferentGroup() throws Exception {
178+
long now = Clock.getDefault().now();
179+
export(
180+
List.of(
181+
createDoubleGauge(TEST_RESOURCE, Attributes.empty(), "metric1", 42, "By", now),
182+
createDoubleGauge(TEST_RESOURCE, Attributes.empty(), "metric1", 42, "By", now + TimeUnit.MILLISECONDS.toNanos(1)),
183+
createDoubleGauge(TEST_RESOURCE, Attributes.empty(), "metric1", 42, "", now),
184+
createDoubleGauge(TEST_RESOURCE, Attributes.of(stringKey("foo"), "bar"), "metric1", 42, "By", now)
185+
)
186+
);
187+
ObjectPath path = search("metrics-generic.otel-default");
188+
assertThat(path.toString(), path.evaluate("hits.total.value"), equalTo(4));
189+
}
190+
191+
public void testGauge() throws Exception {
192+
long now = Clock.getDefault().now();
193+
export(
194+
List.of(
195+
createDoubleGauge(TEST_RESOURCE, Attributes.empty(), "double_gauge", 42.0, "By", now),
196+
createLongGauge(TEST_RESOURCE, Attributes.empty(), "long_gauge", 42, "By", now)
197+
)
109198
);
199+
Map<String, Object> metrics = ObjectPath.evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties");
200+
assertThat(ObjectPath.evaluate(metrics, "double_gauge.type"), equalTo("double"));
201+
assertThat(ObjectPath.evaluate(metrics, "double_gauge.time_series_metric"), equalTo("gauge"));
202+
assertThat(ObjectPath.evaluate(metrics, "long_gauge.type"), equalTo("long"));
203+
assertThat(ObjectPath.evaluate(metrics, "long_gauge.time_series_metric"), equalTo("gauge"));
204+
}
205+
206+
public void testCounterTemporality() throws Exception {
207+
long now = Clock.getDefault().now();
208+
export(
209+
List.of(
210+
createCounter(TEST_RESOURCE, Attributes.empty(), "cumulative_counter", 42, "By", now, CUMULATIVE, MONOTONIC),
211+
createCounter(TEST_RESOURCE, Attributes.empty(), "delta_counter", 42, "By", now, DELTA, MONOTONIC)
212+
)
213+
);
214+
215+
Map<String, Object> metrics = ObjectPath.evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties");
216+
assertThat(ObjectPath.evaluate(metrics, "cumulative_counter.type"), equalTo("long"));
217+
assertThat(ObjectPath.evaluate(metrics, "cumulative_counter.time_series_metric"), equalTo("counter"));
218+
assertThat(ObjectPath.evaluate(metrics, "delta_counter.type"), equalTo("long"));
219+
assertThat(ObjectPath.evaluate(metrics, "delta_counter.time_series_metric"), equalTo("gauge"));
220+
}
221+
222+
public void testCounterMonotonicity() throws Exception {
223+
long now = Clock.getDefault().now();
224+
export(
225+
List.of(
226+
createCounter(TEST_RESOURCE, Attributes.empty(), "up_down_counter", 42, "By", now, CUMULATIVE, NON_MONOTONIC),
227+
createCounter(TEST_RESOURCE, Attributes.empty(), "up_down_counter_delta", 42, "By", now, DELTA, NON_MONOTONIC)
110228

111-
FailedExportException.HttpExportException exception = assertThrows(
112-
FailedExportException.HttpExportException.class,
113-
() -> export(List.of(jvmMemoryMetricData))
229+
)
114230
);
115-
assertThat(exception.getResponse().statusCode(), equalTo(RestStatus.NOT_IMPLEMENTED.getStatus()));
231+
232+
Map<String, Object> metrics = ObjectPath.evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties");
233+
assertThat(ObjectPath.evaluate(metrics, "up_down_counter.type"), equalTo("long"));
234+
assertThat(ObjectPath.evaluate(metrics, "up_down_counter.time_series_metric"), equalTo("gauge"));
235+
assertThat(ObjectPath.evaluate(metrics, "up_down_counter_delta.type"), equalTo("long"));
236+
assertThat(ObjectPath.evaluate(metrics, "up_down_counter_delta.time_series_metric"), equalTo("gauge"));
237+
}
238+
239+
private static Map<String, Object> getMapping(String target) throws IOException {
240+
Map<String, Object> mappings = ObjectPath.createFromResponse(client().performRequest(new Request("GET", target + "/_mapping")))
241+
.evaluate("");
242+
assertThat(mappings, aMapWithSize(1));
243+
Map<String, Object> mapping = ObjectPath.evaluate(mappings.values().iterator().next(), "mappings");
244+
assertThat(mapping, not(anEmptyMap()));
245+
return mapping;
246+
}
247+
248+
private static String timestampAsString(long now) {
249+
return Instant.ofEpochMilli(TimeUnit.NANOSECONDS.toMillis(now)).toString();
116250
}
117251

118252
private void export(List<MetricData> metrics) throws Exception {
@@ -124,7 +258,15 @@ private void export(List<MetricData> metrics) throws Exception {
124258
throw new RuntimeException("Failed to export metrics", failure);
125259
}
126260
assertThat(result.isSuccess(), is(true));
127-
assertOK(client().performRequest(new Request("GET", "_refresh/metrics-*")));
261+
refreshMetricsIndices();
262+
}
263+
264+
private ObjectPath search(String target) throws IOException {
265+
return ObjectPath.createFromResponse(client().performRequest(new Request("GET", target + "/_search")));
266+
}
267+
268+
private static void refreshMetricsIndices() throws IOException {
269+
assertOK(client().performRequest(new Request("GET", "metrics-*/_refresh")));
128270
}
129271

130272
private static MetricData createDoubleGauge(
@@ -144,4 +286,62 @@ private static MetricData createDoubleGauge(
144286
ImmutableGaugeData.create(List.of(ImmutableDoublePointData.create(timeEpochNanos, timeEpochNanos, attributes, value)))
145287
);
146288
}
289+
290+
private static MetricData createLongGauge(
291+
Resource resource,
292+
Attributes attributes,
293+
String name,
294+
long value,
295+
String unit,
296+
long timeEpochNanos
297+
) {
298+
return ImmutableMetricData.createLongGauge(
299+
resource,
300+
TEST_SCOPE,
301+
name,
302+
"Your description could be here.",
303+
unit,
304+
ImmutableGaugeData.create(List.of(ImmutableLongPointData.create(timeEpochNanos, timeEpochNanos, attributes, value)))
305+
);
306+
}
307+
308+
private static MetricData createCounter(
309+
Resource resource,
310+
Attributes attributes,
311+
String name,
312+
long value,
313+
String unit,
314+
long timeEpochNanos,
315+
AggregationTemporality temporality,
316+
Monotonicity monotonicity
317+
) {
318+
return ImmutableMetricData.createLongSum(
319+
resource,
320+
TEST_SCOPE,
321+
name,
322+
"Your description could be here.",
323+
unit,
324+
ImmutableSumData.create(
325+
monotonicity.isMonotonic(),
326+
temporality,
327+
List.of(ImmutableLongPointData.create(timeEpochNanos, timeEpochNanos, attributes, value))
328+
)
329+
);
330+
}
331+
332+
// this is just to enhance readability of the createCounter calls (avoid boolean parameter)
333+
enum Monotonicity {
334+
MONOTONIC(true),
335+
NON_MONOTONIC(false);
336+
337+
private final boolean monotonic;
338+
339+
Monotonicity(boolean monotonic) {
340+
this.monotonic = monotonic;
341+
}
342+
343+
public boolean isMonotonic() {
344+
return monotonic;
345+
}
346+
}
147347
}

x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsRestAction.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
package org.elasticsearch.xpack.oteldata.otlp;
99

10+
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse;
11+
1012
import org.elasticsearch.action.ActionListener;
1113
import org.elasticsearch.client.internal.node.NodeClient;
1214
import org.elasticsearch.common.bytes.BytesArray;
@@ -48,8 +50,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
4850
ActionListener.releaseBefore(request.content(), new RestResponseListener<>(channel) {
4951
@Override
5052
public RestResponse buildResponse(OTLPMetricsTransportAction.MetricsResponse r) {
51-
RestStatus restStatus = r.getStatus();
52-
return new RestResponse(restStatus, "application/x-protobuf", r.getResponse());
53+
return new RestResponse(r.getStatus(), "application/x-protobuf", r.getResponse());
5354
}
5455
})
5556
);
@@ -59,7 +60,13 @@ public RestResponse buildResponse(OTLPMetricsTransportAction.MetricsResponse r)
5960
// (a request that does not carry any telemetry data)
6061
// the server SHOULD respond with success.
6162
// https://opentelemetry.io/docs/specs/otlp/#full-success-1
62-
return channel -> channel.sendResponse(new RestResponse(RestStatus.OK, "application/x-protobuf", new BytesArray(new byte[0])));
63+
return channel -> channel.sendResponse(
64+
new RestResponse(
65+
RestStatus.OK,
66+
"application/x-protobuf",
67+
new BytesArray(ExportMetricsServiceResponse.newBuilder().build().toByteArray())
68+
)
69+
);
6370
}
6471

6572
}

0 commit comments

Comments
 (0)