Skip to content

Commit

Permalink
Add support for concurrent exports
Browse files Browse the repository at this point in the history
Applications generating significant span volume can end up dropping data
due to the synchronous export step. According to the opentelemetry spec,

    This function will never be called concurrently for the same exporter
    instance. It can be called again only after the current call returns.

However, it does not place a restriction on concurrent I/O or anything
of that nature. There is an [ongoing discussion] about tweaking the
language to make this more clear.

With that in mind, this commit makes the exporters return a future that
can be spawned concurrently. Unfortunately, this means that the
`export()` method can no longer be async while taking &mut self. The
latter is desirable to enforce the no concurrent calls line of the spec,
so the choice is made here to return a future instead with the lifetime
decoupled from self. This resulted in a bit of additional verbosity, but
for the most part the async code can still be shoved into an async fn
for the ergonomics.

The main exception to this is the `jaeger` exporter which internally
requires a bunch of mutable references. I plan to discuss with the
opentelemetry team the overall goal of this PR and get buy-in before
making more invasive changes to support this in the jaeger exporter.

[ongoing discussion]: open-telemetry/opentelemetry-specification#2434
  • Loading branch information
jwilm committed Apr 20, 2022
1 parent b3fa553 commit 06d8b4c
Show file tree
Hide file tree
Showing 16 changed files with 251 additions and 138 deletions.
18 changes: 9 additions & 9 deletions Cargo.toml
Expand Up @@ -4,31 +4,31 @@ members = [
"opentelemetry-contrib",
"opentelemetry-datadog",
"opentelemetry-http",
"opentelemetry-jaeger",
# "opentelemetry-jaeger",
"opentelemetry-otlp",
"opentelemetry-prometheus",
"opentelemetry-aws",
"opentelemetry-semantic-conventions",
"opentelemetry-stackdriver",
"opentelemetry-zipkin",
"opentelemetry-zpages",
"examples/actix-http",
"examples/actix-http-tracing",
"examples/actix-udp",
"examples/async",
# "examples/actix-http",
# "examples/actix-http-tracing",
# "examples/actix-udp",
# "examples/async",
"examples/aws-xray",
"examples/basic",
# "examples/basic",
"examples/basic-otlp",
"examples/basic-otlp-with-selector",
"examples/basic-otlp-http",
"examples/datadog",
"examples/external-otlp-tonic-tokio",
"examples/grpc",
# "examples/grpc",
"examples/http",
"examples/hyper-prometheus",
"examples/tracing-grpc",
# "examples/tracing-grpc",
"examples/zipkin",
"examples/multiple-span-processors",
# "examples/multiple-span-processors",
"examples/zpages"
]
exclude = ["examples/external-otlp-grpcio-async-std"]
1 change: 1 addition & 0 deletions opentelemetry-datadog/Cargo.toml
Expand Up @@ -36,6 +36,7 @@ thiserror = "1.0"
itertools = "0.10"
http = "0.2"
lazy_static = "1.4"
futures = "0.3"

[dev-dependencies]
base64 = "0.13"
Expand Down
59 changes: 39 additions & 20 deletions opentelemetry-datadog/src/exporter/mod.rs
Expand Up @@ -5,6 +5,7 @@ pub use model::ApiVersion;
pub use model::Error;

use async_trait::async_trait;
use futures::future::BoxFuture;
use http::{Method, Request, Uri};
use itertools::Itertools;
use opentelemetry::sdk::export::trace;
Expand All @@ -29,7 +30,7 @@ const DATADOG_TRACE_COUNT_HEADER: &str = "X-Datadog-Trace-Count";
/// Datadog span exporter
#[derive(Debug)]
pub struct DatadogExporter {
client: Box<dyn HttpClient>,
client: Arc<dyn HttpClient>,
request_url: Uri,
service_name: String,
version: ApiVersion,
Expand All @@ -40,7 +41,7 @@ impl DatadogExporter {
service_name: String,
request_url: Uri,
version: ApiVersion,
client: Box<dyn HttpClient>,
client: Arc<dyn HttpClient>,
) -> Self {
DatadogExporter {
client,
Expand All @@ -49,6 +50,21 @@ impl DatadogExporter {
version,
}
}

fn build_request(&self, batch: Vec<SpanData>) -> Result<http::Request<Vec<u8>>, TraceError> {
let traces: Vec<Vec<SpanData>> = group_into_traces(batch);
let trace_count = traces.len();
let data = self.version.encode(&self.service_name, traces)?;
let req = Request::builder()
.method(Method::POST)
.uri(self.request_url.clone())
.header(http::header::CONTENT_TYPE, self.version.content_type())
.header(DATADOG_TRACE_COUNT_HEADER, trace_count)
.body(data)
.map_err::<Error, _>(Into::into)?;

Ok(req)
}
}

/// Create a new Datadog exporter pipeline builder.
Expand All @@ -63,7 +79,7 @@ pub struct DatadogPipelineBuilder {
agent_endpoint: String,
trace_config: Option<sdk::trace::Config>,
version: ApiVersion,
client: Option<Box<dyn HttpClient>>,
client: Option<Arc<dyn HttpClient>>,
}

impl Default for DatadogPipelineBuilder {
Expand All @@ -84,15 +100,15 @@ impl Default for DatadogPipelineBuilder {
not(feature = "reqwest-blocking-client"),
feature = "surf-client"
))]
client: Some(Box::new(surf::Client::new())),
client: Some(Arc::new(surf::Client::new())),
#[cfg(all(
not(feature = "surf-client"),
not(feature = "reqwest-blocking-client"),
feature = "reqwest-client"
))]
client: Some(Box::new(reqwest::Client::new())),
client: Some(Arc::new(reqwest::Client::new())),
#[cfg(feature = "reqwest-blocking-client")]
client: Some(Box::new(reqwest::blocking::Client::new())),
client: Some(Arc::new(reqwest::blocking::Client::new())),
}
}
}
Expand Down Expand Up @@ -206,7 +222,7 @@ impl DatadogPipelineBuilder {
/// Choose the http client used by uploader
pub fn with_http_client<T: HttpClient + 'static>(
mut self,
client: Box<dyn HttpClient>,
client: Arc<dyn HttpClient>,
) -> Self {
self.client = Some(client);
self
Expand Down Expand Up @@ -234,22 +250,25 @@ fn group_into_traces(spans: Vec<SpanData>) -> Vec<Vec<SpanData>> {
.collect()
}

async fn send_request(
client: Arc<dyn HttpClient>,
request: http::Request<Vec<u8>>,
) -> trace::ExportResult {
let _ = client.send(request).await?.error_for_status()?;
Ok(())
}

#[async_trait]
impl trace::SpanExporter for DatadogExporter {
/// Export spans to datadog-agent
async fn export(&mut self, batch: Vec<SpanData>) -> trace::ExportResult {
let traces: Vec<Vec<SpanData>> = group_into_traces(batch);
let trace_count = traces.len();
let data = self.version.encode(&self.service_name, traces)?;
let req = Request::builder()
.method(Method::POST)
.uri(self.request_url.clone())
.header(http::header::CONTENT_TYPE, self.version.content_type())
.header(DATADOG_TRACE_COUNT_HEADER, trace_count)
.body(data)
.map_err::<Error, _>(Into::into)?;
let _ = self.client.send(req).await?.error_for_status()?;
Ok(())
fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, trace::ExportResult> {
let request = match self.build_request(batch) {
Ok(req) => req,
Err(err) => return Box::pin(std::future::ready(Err(err))),
};

let client = self.client.clone();
Box::pin(send_request(client, request))
}
}

Expand Down
11 changes: 6 additions & 5 deletions opentelemetry-otlp/src/exporter/http.rs
@@ -1,6 +1,7 @@
use crate::{ExportConfig, Protocol};
use opentelemetry_http::HttpClient;
use std::collections::HashMap;
use std::sync::Arc;

/// Configuration of the http transport
#[cfg(feature = "http-proto")]
Expand All @@ -15,7 +16,7 @@ use std::collections::HashMap;
)]
pub struct HttpConfig {
/// Select the HTTP client
pub client: Option<Box<dyn HttpClient>>,
pub client: Option<Arc<dyn HttpClient>>,

/// Additional headers to send to the collector.
pub headers: Option<HashMap<String, String>>,
Expand All @@ -30,19 +31,19 @@ impl Default for HttpConfig {
fn default() -> Self {
HttpConfig {
#[cfg(feature = "reqwest-blocking-client")]
client: Some(Box::new(reqwest::blocking::Client::new())),
client: Some(Arc::new(reqwest::blocking::Client::new())),
#[cfg(all(
not(feature = "reqwest-blocking-client"),
not(feature = "surf-client"),
feature = "reqwest-client"
))]
client: Some(Box::new(reqwest::Client::new())),
client: Some(Arc::new(reqwest::Client::new())),
#[cfg(all(
not(feature = "reqwest-client"),
not(feature = "reqwest-blocking-client"),
feature = "surf-client"
))]
client: Some(Box::new(surf::Client::new())),
client: Some(Arc::new(surf::Client::new())),
#[cfg(all(
not(feature = "reqwest-client"),
not(feature = "surf-client"),
Expand Down Expand Up @@ -78,7 +79,7 @@ impl Default for HttpExporterBuilder {
impl HttpExporterBuilder {
/// Assign client implementation
pub fn with_http_client<T: HttpClient + 'static>(mut self, client: T) -> Self {
self.http_config.client = Some(Box::new(client));
self.http_config.client = Some(Arc::new(client));
self
}

Expand Down
127 changes: 83 additions & 44 deletions opentelemetry-otlp/src/span.rs
Expand Up @@ -67,7 +67,7 @@ use opentelemetry::trace::{TraceError, TracerProvider};

use async_trait::async_trait;

#[cfg(feature = "grpc-sys")]
#[cfg(any(feature = "grpc-sys", feature = "http-proto"))]
use std::sync::Arc;

use std::time::Duration;
Expand Down Expand Up @@ -280,7 +280,7 @@ pub enum SpanExporter {
/// The Collector URL
collector_endpoint: Uri,
/// The HTTP trace exporter
trace_exporter: Option<Box<dyn HttpClient>>,
trace_exporter: Option<Arc<dyn HttpClient>>,
},
}

Expand Down Expand Up @@ -408,9 +408,74 @@ impl SpanExporter {
}
}

#[cfg(feature = "grpc-sys")]
async fn grpcio_send_request(
trace_exporter: GrpcioTraceServiceClient,
request: GrpcRequest,
call_options: CallOption,
) -> ExportResult {
let receiver = trace_exporter
.export_async_opt(&request, call_options)
.map_err::<crate::Error, _>(Into::into)?;
receiver.await.map_err::<crate::Error, _>(Into::into)?;
Ok(())
}

#[cfg(feature = "tonic")]
async fn tonic_send_request(
trace_exporter: TonicTraceServiceClient<TonicChannel>,
request: Request<TonicRequest>,
) -> ExportResult {
trace_exporter
.to_owned()
.export(request)
.await
.map_err::<crate::Error, _>(Into::into)?;

Ok(())
}

#[cfg(feature = "http-proto")]
async fn http_send_request(
batch: Vec<SpanData>,
client: std::sync::Arc<dyn HttpClient>,
headers: Option<HashMap<String, String>>,
collector_endpoint: Uri,
) -> ExportResult {
let req = ProstRequest {
resource_spans: batch.into_iter().map(Into::into).collect(),
};

let mut buf = vec![];
req.encode(&mut buf)
.map_err::<crate::Error, _>(Into::into)?;

let mut request = http::Request::builder()
.method(Method::POST)
.uri(collector_endpoint)
.header(CONTENT_TYPE, "application/x-protobuf")
.body(buf)
.map_err::<crate::Error, _>(Into::into)?;

if let Some(headers) = headers {
for (k, val) in headers {
let value =
HeaderValue::from_str(val.as_ref()).map_err::<crate::Error, _>(Into::into)?;
let key = HeaderName::try_from(&k).map_err::<crate::Error, _>(Into::into)?;
request.headers_mut().insert(key, value);
}
}

client.send(request).await?;
Ok(())
}

#[async_trait]
impl opentelemetry::sdk::export::trace::SpanExporter for SpanExporter {
async fn export(&mut self, batch: Vec<SpanData>) -> ExportResult {
fn export(
&mut self,
batch: Vec<SpanData>,
) -> futures::future::BoxFuture<'static, ExportResult> {
match self {
#[cfg(feature = "grpc-sys")]
SpanExporter::Grpcio {
Expand Down Expand Up @@ -438,11 +503,11 @@ impl opentelemetry::sdk::export::trace::SpanExporter for SpanExporter {
call_options = call_options.headers(metadata_builder.build());
}

let receiver = trace_exporter
.export_async_opt(&request, call_options)
.map_err::<crate::Error, _>(Into::into)?;
receiver.await.map_err::<crate::Error, _>(Into::into)?;
Ok(())
Box::pin(grpcio_send_request(
trace_exporter.clone(),
request,
call_options,
))
}

#[cfg(feature = "tonic")]
Expand All @@ -468,13 +533,7 @@ impl opentelemetry::sdk::export::trace::SpanExporter for SpanExporter {
}
}

trace_exporter
.to_owned()
.export(request)
.await
.map_err::<crate::Error, _>(Into::into)?;

Ok(())
Box::pin(tonic_send_request(trace_exporter.to_owned(), request))
}

#[cfg(feature = "http-proto")]
Expand All @@ -484,36 +543,16 @@ impl opentelemetry::sdk::export::trace::SpanExporter for SpanExporter {
headers,
..
} => {
let req = ProstRequest {
resource_spans: batch.into_iter().map(Into::into).collect(),
};

let mut buf = vec![];
req.encode(&mut buf)
.map_err::<crate::Error, _>(Into::into)?;

let mut request = http::Request::builder()
.method(Method::POST)
.uri(collector_endpoint.clone())
.header(CONTENT_TYPE, "application/x-protobuf")
.body(buf)
.map_err::<crate::Error, _>(Into::into)?;

if let Some(headers) = headers.clone() {
for (k, val) in headers {
let value = HeaderValue::from_str(val.as_ref())
.map_err::<crate::Error, _>(Into::into)?;
let key =
HeaderName::try_from(&k).map_err::<crate::Error, _>(Into::into)?;
request.headers_mut().insert(key, value);
}
}

if let Some(client) = trace_exporter {
client.send(request).await?;
Ok(())
if let Some(ref client) = trace_exporter {
let client = Arc::clone(client);
Box::pin(http_send_request(
batch,
client,
headers.clone(),
collector_endpoint.clone(),
))
} else {
Err(crate::Error::NoHttpClient.into())
Box::pin(std::future::ready(Err(crate::Error::NoHttpClient.into())))
}
}
}
Expand Down

0 comments on commit 06d8b4c

Please sign in to comment.