NIFI-12115 Add ListenOTLP to collect OpenTelemetry#7830

Closed
exceptionfactory wants to merge 4 commits into apache:main from exceptionfactory:NIFI-12115

Conversation

@exceptionfactory
Contributor

Summary

NIFI-12115 Adds a ListenOTLP Processor to collect OpenTelemetry information, supporting OTLP Specification 1.0.0.

OTLP defines HTTP as the basic communication protocol and outlines the following transport encoding options:

  • Protobuf over gRPC
  • Protobuf over HTTP
  • JSON over HTTP

OTLP also includes support for gzip compression over gRPC or HTTP.
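
As an illustrative sketch of the decompression step (the class and method names here are hypothetical, not the processor's actual implementation), a gzip-encoded request body can be inflated with the JDK's GZIPInputStream before parsing:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper illustrating gzip handling for OTLP request bodies
public class GzipBody {

    // Inflate a gzip-compressed request body into the raw encoded payload
    public static byte[] decompress(byte[] compressed) throws IOException {
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed));
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            byte[] buffer = new byte[8192];
            int read;
            while ((read = gzip.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
            return out.toByteArray();
        }
    }

    // Compress a payload, as an OTLP exporter with gzip enabled would
    public static byte[] compress(byte[] raw) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(raw);
        }
        return out.toByteArray();
    }
}
```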

The ListenOTLP Processor supports the standard transport encoding options and uses content negotiation based on the HTTP Content-Type header to select the appropriate parsing implementation.
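
A minimal sketch of that negotiation step might look like the following; the class, enum, and exact media-type handling are illustrative assumptions rather than the processor's actual code:

```java
import java.util.Locale;

// Hypothetical sketch of Content-Type negotiation for OTLP transport encodings
public class TelemetryContentType {

    public enum RequestEncoding { GRPC_PROTOBUF, HTTP_PROTOBUF, HTTP_JSON, UNSUPPORTED }

    public static RequestEncoding negotiate(String contentTypeHeader) {
        if (contentTypeHeader == null) {
            return RequestEncoding.UNSUPPORTED;
        }
        // Ignore parameters such as charset when matching the media type
        String mediaType = contentTypeHeader.split(";", 2)[0].trim().toLowerCase(Locale.ROOT);
        switch (mediaType) {
            case "application/grpc":
            case "application/grpc+proto":
                return RequestEncoding.GRPC_PROTOBUF;
            case "application/x-protobuf":
                return RequestEncoding.HTTP_PROTOBUF;
            case "application/json":
                return RequestEncoding.HTTP_JSON;
            default:
                return RequestEncoding.UNSUPPORTED;
        }
    }
}
```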

The Processor converts incoming requests to standard OpenTelemetry objects and writes objects using the standard JSON encoding, which includes special handling for hexadecimal-encoded identifiers. Using JSON as the standard output format allows subsequent Processors to select or filter as necessary. Output FlowFiles can contain a batch of one or more resource elements, depending on data volumes and batch size configuration. ListenOTLP adds the client.socket.address and client.socket.port attributes to the Resource definition of received requests to provide an additional level of source tracking.
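
The OTLP JSON encoding represents traceId and spanId values as lowercase hexadecimal strings rather than the base64 strings the default Protobuf JSON mapping would produce. A minimal sketch of that conversion, with a hypothetical class name:

```java
// Hypothetical helper showing the hexadecimal identifier encoding that
// OTLP JSON requires for traceId and spanId fields
public class TelemetryIdEncoder {

    public static String toHex(byte[] id) {
        StringBuilder builder = new StringBuilder(id.length * 2);
        for (byte b : id) {
            // Mask to an unsigned int so negative bytes render as two hex digits
            builder.append(String.format("%02x", b & 0xff));
        }
        return builder.toString();
    }
}
```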

The Processor requires an SSL Context Service property to enforce communication over HTTPS. The gRPC protocol requires HTTP/2, which uses Application Protocol Negotiation in the TLS handshake to select the HTTP version. With the potentially sensitive nature of telemetry information, the initial version of ListenOTLP requires HTTPS, and can be configured for mTLS with Client Authentication required.

Additional changes include updates to the nifi-event-transport library supporting configurable TLS Cipher Suites through optional SSLParameters settings. HTTP/2 requires a restricted set of TLS Cipher Suites, so this adjustment supports that approach.
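
As a rough sketch of the JDK API involved (this illustrates SSLParameters usage in general, not the nifi-event-transport changes themselves), cipher suites can be restricted and HTTP/2 advertised through ALPN on an SSLEngine:

```java
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;

// Illustrative sketch: restrict TLS cipher suites and advertise HTTP/2,
// since HTTP/2 prohibits many older cipher suites
public class CipherSuiteConfig {

    public static SSLEngine configureEngine(SSLContext context, String[] cipherSuites) {
        SSLEngine engine = context.createSSLEngine();
        SSLParameters parameters = engine.getSSLParameters();
        parameters.setCipherSuites(cipherSuites);
        // Advertise HTTP/2 ahead of HTTP/1.1 during ALPN
        parameters.setApplicationProtocols(new String[]{"h2", "http/1.1"});
        engine.setSSLParameters(parameters);
        return engine;
    }
}
```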

The opentelemetry-proto and jackson-datatype-protobuf libraries are both licensed under Apache Software License Version 2 with no additional notice files.

OTLP 1.0.0 JSON examples are available in the opentelemetry-proto examples directory.

For testing with an instrumented system, the OpenTelemetry Java Agent can be configured through Java System properties in bootstrap.conf, enabling basic instrumentation of JVM memory and Jetty calls with the following properties:

java.arg.otelAgent=-javaagent:./opentelemetry-javaagent.jar
java.arg.otelCertificate=-Dotel.exporter.otlp.certificate=./conf/trusted.pem
java.arg.otelEndpoint=-Dotel.exporter.otlp.endpoint=https://localhost:4317

The trusted.pem file must contain the issuer of the server certificate configured for ListenOTLP. The OpenTelemetry Java Agent defaults to using gRPC for transport encoding without compression, but additional options can be configured based on the OTLP exporter properties.

This pull request targets the main branch, with the intent of backporting the addition following minor adjustments.

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull Request commit message starts with Apache NiFi Jira issue number, such as NIFI-00000

Pull Request Formatting

  • Pull Request based on current revision of the main branch
  • Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

  • Build completed using mvn clean install -P contrib-check
    • JDK 21

Licensing

  • New dependencies are compatible with the Apache License 2.0 according to the License Policy
  • New dependencies are documented in applicable LICENSE and NOTICE files

Documentation

  • Documentation formatting appears as expected in rendered files

- Added ListenOTLP Processor supporting OpenTelemetry OTLP 1.0.0 Specification with gRPC and HTTP
- Updated nifi-event-transport to support configurable Cipher Suites through optional SSLParameters
@joewitt
Contributor

joewitt commented Oct 3, 2023

It is alive and working - very cool. Need to learn more about what this java agent exporter really does, but the json output is interesting. I did note that after running for a minute or two I received a couple of these outputs. I was making minor edits to the flow when it happened, but not to this processor, so I don't have the underlying data it choked on.

2023-10-02 22:32:53,888 WARN [ListenOTLP[f3e6ea42-018a-1000-d8d4-3a6b40e25474]-1-1] o.a.n.p.opentelemetry.ListenOTLP ListenOTLP[id=f3e6ea42-018a-1000-d8d4-3a6b40e25474] Client Address [/127.0.0.1:55410] Content-Type [APPLICATION_GRPC] processing failed
java.io.UncheckedIOException: Request parsing failed
at org.apache.nifi.processors.opentelemetry.encoding.ProtobufServiceRequestReader.read(ProtobufServiceRequestReader.java:64)
at org.apache.nifi.processors.opentelemetry.io.StandardRequestContentListener.readMessages(StandardRequestContentListener.java:165)
at org.apache.nifi.processors.opentelemetry.io.StandardRequestContentListener.onSupportedRequest(StandardRequestContentListener.java:145)
at org.apache.nifi.processors.opentelemetry.io.StandardRequestContentListener.onRequest(StandardRequestContentListener.java:109)
at org.apache.nifi.processors.opentelemetry.server.Http2RequestFrameListener.onDataRead(Http2RequestFrameListener.java:177)
at io.netty.handler.codec.http2.Http2FrameListenerDecorator.onDataRead(Http2FrameListenerDecorator.java:36)
at io.netty.handler.codec.http2.Http2EmptyDataFrameListener.onDataRead(Http2EmptyDataFrameListener.java:49)
at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onDataRead(DefaultHttp2ConnectionDecoder.java:307)
at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readDataFrame(DefaultHttp2FrameReader.java:415)
at io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:250)
at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:159)
at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:173)
at io.netty.handler.codec.http2.DecoratingHttp2ConnectionDecoder.decodeFrame(DecoratingHttp2ConnectionDecoder.java:63)
at io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:393)
at io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:453)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1471)
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1334)
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1383)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: com.google.protobuf.InvalidProtocolBufferException: Protocol message had invalid UTF-8.
at com.google.protobuf.InvalidProtocolBufferException.invalidUtf8(InvalidProtocolBufferException.java:172)
at com.google.protobuf.Utf8$UnsafeProcessor.decodeUtf8Direct(Utf8.java:1434)
at com.google.protobuf.Utf8$Processor.decodeUtf8(Utf8.java:631)
at com.google.protobuf.Utf8.decodeUtf8(Utf8.java:331)
at com.google.protobuf.CodedInputStream$UnsafeDirectNioDecoder.readStringRequireUtf8(CodedInputStream.java:1528)
at io.opentelemetry.proto.metrics.v1.Metric$Builder.mergeFrom(Metric.java:1061)
at io.opentelemetry.proto.metrics.v1.Metric$1.parsePartialFrom(Metric.java:2167)
at io.opentelemetry.proto.metrics.v1.Metric$1.parsePartialFrom(Metric.java:2159)
at com.google.protobuf.CodedInputStream$UnsafeDirectNioDecoder.readMessage(CodedInputStream.java:1600)
at io.opentelemetry.proto.metrics.v1.ScopeMetrics$Builder.mergeFrom(ScopeMetrics.java:604)
at io.opentelemetry.proto.metrics.v1.ScopeMetrics$1.parsePartialFrom(ScopeMetrics.java:1248)
at io.opentelemetry.proto.metrics.v1.ScopeMetrics$1.parsePartialFrom(ScopeMetrics.java:1240)
at com.google.protobuf.CodedInputStream$UnsafeDirectNioDecoder.readMessage(CodedInputStream.java:1600)
at io.opentelemetry.proto.metrics.v1.ResourceMetrics$Builder.mergeFrom(ResourceMetrics.java:603)
at io.opentelemetry.proto.metrics.v1.ResourceMetrics$1.parsePartialFrom(ResourceMetrics.java:1243)
at io.opentelemetry.proto.metrics.v1.ResourceMetrics$1.parsePartialFrom(ResourceMetrics.java:1235)
at com.google.protobuf.CodedInputStream$UnsafeDirectNioDecoder.readMessage(CodedInputStream.java:1600)
at io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest$Builder.mergeFrom(ExportMetricsServiceRequest.java:469)
at io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest$1.parsePartialFrom(ExportMetricsServiceRequest.java:915)
at io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest$1.parsePartialFrom(ExportMetricsServiceRequest.java:907)
at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:134)
at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:149)
at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:48)
at org.apache.nifi.processors.opentelemetry.encoding.ProtobufServiceRequestReader.read(ProtobufServiceRequestReader.java:56)
... 42 common frames omitted

@joewitt
Contributor

joewitt commented Oct 3, 2023

@exceptionfactory I should add I am not particularly convicted on the byte[]/buffer stuff. It just seems like an opportunity to avoid intermediate copies. My mental model is that this data could in some cases arrive at a fairly high rate, so we want to be as efficient as we can. I didn't comment on the decompression path, but it too seemed open to some improvement. I looked at what we're doing in other comparable areas: we do use byte[], but we are less often jumping from byte buffers to new byte[] and so on. Ultimately it probably isn't worth looking into now, but if we see high utilization and profiling suggests we can do better, we can deal with it then.

The error I noted and stack trace above I could not get to happen again during testing last night. It suggests some funky state can be entered but I'm not sure of the nature of it. Might well be a problem with the agent itself.

Contributor Author

@exceptionfactory exceptionfactory left a comment

Thanks for the helpful feedback @joewitt! I pushed several changes to optimize deserialization, avoiding byte array creation in several places. The input processing went through a few iterations, so it was a good opportunity to avoid unnecessary memory consumption.

I have not observed the Protobuf processing exceptions you noted in testing, but if you find a way to reproduce the issue, that would be helpful.

As one additional test, setting -Dotel.exporter.otlp.protocol=http/protobuf exercises the Protobuf over HTTP protocol versus the default gRPC protocol. Those are different exporters in the agent, so it might be interesting to see if they behave differently.

@joewitt
Contributor

joewitt commented Oct 3, 2023

thanks - running now at rate with the latest commits and getting a series of buffer size failures and connection resets.

2023-10-03 11:45:16,798 WARN [ListenOTLP[f3e6ea42-018a-1000-d8d4-3a6b40e25474]-1-2] o.a.n.p.opentelemetry.ListenOTLP ListenOTLP[id=f3e6ea42-018a-1000-d8d4-3a6b40e25474] Communication Failed with Remote Address [/127.0.0.1:60346]
java.lang.OutOfMemoryError: Cannot reserve 4194304 bytes of direct buffer memory (allocated: 1072513273, limit: 1073741824)
at java.base/java.nio.Bits.reserveMemory(Bits.java:178)
at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:127)
at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:360)
at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:721)
at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:696)
at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:215)
at io.netty.buffer.PoolArena.tcacheAllocateNormal(PoolArena.java:197)
at io.netty.buffer.PoolArena.allocate(PoolArena.java:139)
at io.netty.buffer.PoolArena.allocate(PoolArena.java:129)
at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:396)
at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:179)
at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:140)
at io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:120)

- Set maximum input content length to 10 MB
- Set default Queue Capacity to 1000
- Set default Batch Size to 100
@exceptionfactory
Contributor Author

Thanks for the testing and feedback @joewitt! After reviewing the request content buffering and the available Netty Handlers, I consolidated the HTTP processing to work with the Netty FullHttpRequest for both HTTP/1.1 and HTTP/2. This streamlined the implementation in HttpRequestHandler, avoiding the previous buffer handling issue that resulted in the OutOfMemoryError.

Following these changes, running a continuous stream of JSON files through InvokeHTTP did not produce any errors. Heap and non-heap memory usage also followed a consistent pattern.

As part of streamlining the handler pipeline, I added a 10 MB maximum content length limitation. Various service providers set request limits for OpenTelemetry payloads to 4 MB or 5 MB. This could be changed to a configurable property, but larger sizes also introduce potential memory usage concerns. Along similar lines, the default Queue Capacity is now 1000 and the default Batch Size is now 100, also limiting potential memory issues in the default configuration.
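
The queue-and-batch behavior described above can be sketched with a bounded BlockingQueue; the class below is a simplified illustration with hypothetical names, not the processor's implementation, though the defaults mirror the new Queue Capacity of 1000 and Batch Size of 100:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of a bounded queue drained in batches, limiting
// memory usage when telemetry arrives faster than it is written out
public class TelemetryBatchQueue {

    private final BlockingQueue<String> queue;
    private final int batchSize;

    public TelemetryBatchQueue(int capacity, int batchSize) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.batchSize = batchSize;
    }

    // Returns false when the queue is full so the server can reject the request
    public boolean offer(String message) {
        return queue.offer(message);
    }

    // Drain up to one batch of queued messages for a single output FlowFile
    public List<String> drainBatch() {
        List<String> batch = new ArrayList<>(batchSize);
        queue.drainTo(batch, batchSize);
        return batch;
    }
}
```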

@joewitt
Contributor

joewitt commented Oct 4, 2023

All looks good. And cruising around 9500 events/sec on default settings. No more errors of any kind and smooth sailing. Thanks! I'll merge to main. Can you please adjust and merge to the support branch as desired. +1 on that

asfgit closed this in 6394912 on Oct 4, 2023
exceptionfactory added the hacktoberfest-accepted label on Oct 4, 2023
exceptionfactory added a commit that referenced this pull request Oct 4, 2023
- Added ListenOTLP Processor supporting OpenTelemetry OTLP 1.0.0 Specification with gRPC and HTTP
- Updated nifi-event-transport to support configurable Cipher Suites through optional SSLParameters

This closes #7830

Signed-off-by: Joseph Witt <joewitt@apache.org>
(cherry picked from commit 6394912)