Skip to content

Commit

Permalink
chore: Dropped error field from StreamClosed Error (vectordotdev#17693)
Browse files Browse the repository at this point in the history
Field has been unused, so I've removed it and cleaned up the usages and
conditionals where it was included previously. `send_batch()` appears to
only return `ClosedError` making some additional matching pointless.

Signed-off-by: Spencer Gilbert <spencer.gilbert@datadoghq.com>
  • Loading branch information
spencergilbert committed Jun 14, 2023
1 parent 59e2cbf commit ee480cd
Show file tree
Hide file tree
Showing 37 changed files with 74 additions and 84 deletions.
1 change: 0 additions & 1 deletion src/internal_events/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ const STREAM_CLOSED: &str = "stream_closed";

#[derive(Debug)]
pub struct StreamClosedError {
pub error: crate::source_sender::ClosedError,
pub count: usize,
}

Expand Down
8 changes: 4 additions & 4 deletions src/sources/amqp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,17 +392,17 @@ async fn finalize_event_stream(
let mut stream = stream.map(|event| event.with_batch_notifier(&batch));

match out.send_event_stream(&mut stream).await {
Err(error) => {
emit!(StreamClosedError { error, count: 1 });
Err(_) => {
emit!(StreamClosedError { count: 1 });
}
Ok(_) => {
finalizer.add(msg.into(), receiver);
}
}
}
None => match out.send_event_stream(&mut stream).await {
Err(error) => {
emit!(StreamClosedError { error, count: 1 });
Err(_) => {
emit!(StreamClosedError { count: 1 });
}
Ok(_) => {
let ack_options = lapin::options::BasicAckOptions::default();
Expand Down
4 changes: 2 additions & 2 deletions src/sources/apache_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,9 @@ fn apache_metrics(
debug!("Finished sending.");
Ok(())
}
Err(error) => {
Err(_) => {
let (count, _) = stream.size_hint();
emit!(StreamClosedError { error, count });
emit!(StreamClosedError { count });
Err(())
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/sources/aws_ecs_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,8 @@ async fn aws_ecs_metrics(
endpoint: uri.path(),
});

if let Err(error) = out.send_batch(metrics).await {
emit!(StreamClosedError { error, count });
if (out.send_batch(metrics).await).is_err() {
emit!(StreamClosedError { count });
return Err(());
}
}
Expand Down
5 changes: 1 addition & 4 deletions src/sources/aws_kinesis_firehose/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,7 @@ pub(super) async fn firehose(

let count = events.len();
if let Err(error) = context.out.send_batch(events).await {
emit!(StreamClosedError {
error: error.clone(),
count,
});
emit!(StreamClosedError { count });
let error = RequestError::ShuttingDown {
request_id: request_id.clone(),
source: error,
Expand Down
4 changes: 2 additions & 2 deletions src/sources/aws_s3/sqs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -574,9 +574,9 @@ impl IngestorProcess {

let send_error = match self.out.send_event_stream(&mut stream).await {
Ok(_) => None,
Err(error) => {
Err(_) => {
let (count, _) = stream.size_hint();
emit!(StreamClosedError { error, count });
emit!(StreamClosedError { count });
Some(crate::source_sender::ClosedError)
}
};
Expand Down
2 changes: 1 addition & 1 deletion src/sources/aws_sqs/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ impl SqsSource {
}
}
}
Err(error) => emit!(StreamClosedError { error, count }),
Err(_) => emit!(StreamClosedError { count }),
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/sources/datadog_agent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,8 +443,8 @@ pub(crate) async fn handle_request(
} else {
out.send_batch(events).await
}
.map_err(move |error: crate::source_sender::ClosedError| {
emit!(StreamClosedError { error, count });
.map_err(|_| {
emit!(StreamClosedError { count });
warp::reject::custom(ApiError::ServerShutdown)
})?;
match receiver {
Expand Down
4 changes: 2 additions & 2 deletions src/sources/demo_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,8 @@ async fn demo_logs_source(

event
});
out.send_batch(events).await.map_err(|error| {
emit!(StreamClosedError { error, count });
out.send_batch(events).await.map_err(|_| {
emit!(StreamClosedError { count });
})?;
}
Err(error) => {
Expand Down
11 changes: 4 additions & 7 deletions src/sources/docker_logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -814,13 +814,10 @@ impl EventStreamBuilder {
let result = {
let mut stream = events_stream
.map(move |event| add_hostname(event, &host_key, &hostname, self.log_namespace));
self.out
.send_event_stream(&mut stream)
.await
.map_err(|error| {
let (count, _) = stream.size_hint();
emit!(StreamClosedError { error, count });
})
self.out.send_event_stream(&mut stream).await.map_err(|_| {
let (count, _) = stream.size_hint();
emit!(StreamClosedError { count });
})
};

// End of stream
Expand Down
4 changes: 2 additions & 2 deletions src/sources/eventstoredb_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ fn eventstoredb(

events_received.emit(CountByteSize(count, byte_size));

if let Err(error) = cx.out.send_batch(metrics).await {
emit!(StreamClosedError { count, error });
if (cx.out.send_batch(metrics).await).is_err() {
emit!(StreamClosedError { count });
break;
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/sources/exec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,8 +498,8 @@ async fn run_command(
for event in &mut events {
handle_event(&config, &hostname, &Some(stream.to_string()), pid, event, log_namespace);
}
if let Err(error) = out.send_batch(events).await {
emit!(StreamClosedError { count, error });
if (out.send_batch(events).await).is_err() {
emit!(StreamClosedError { count });
break;
}
},
Expand Down
4 changes: 2 additions & 2 deletions src/sources/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -659,9 +659,9 @@ pub fn file_source(
Ok(()) => {
debug!("Finished sending.");
}
Err(error) => {
Err(_) => {
let (count, _) = messages.size_hint();
emit!(StreamClosedError { error, count });
emit!(StreamClosedError { count });
}
}
});
Expand Down
4 changes: 2 additions & 2 deletions src/sources/file_descriptors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,9 @@ async fn process_stream(
debug!("Finished sending.");
Ok(())
}
Err(error) => {
Err(_) => {
let (count, _) = stream.size_hint();
emit!(StreamClosedError { error, count });
emit!(StreamClosedError { count });
Err(())
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/sources/gcp_pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@ impl PubsubSource {

let count = events.len();
match self.out.send_batch(events).await {
Err(error) => emit!(StreamClosedError { error, count }),
Err(_) => emit!(StreamClosedError { count }),
Ok(()) => match notifier {
None => ack_ids
.send(ids)
Expand Down
4 changes: 2 additions & 2 deletions src/sources/host_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,8 @@ impl HostMetricsConfig {
bytes_received.emit(ByteSize(0));
let metrics = generator.capture_metrics().await;
let count = metrics.len();
if let Err(error) = out.send_batch(metrics).await {
emit!(StreamClosedError { count, error });
if (out.send_batch(metrics).await).is_err() {
emit!(StreamClosedError { count });
return Err(());
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/sources/internal_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,9 @@ async fn run(
Utc::now(),
);

if let Err(error) = out.send_event(Event::from(log)).await {
if (out.send_event(Event::from(log)).await).is_err() {
// this wont trigger any infinite loop considering it stops the component
emit!(StreamClosedError { error, count: 1 });
emit!(StreamClosedError { count: 1 });
return Err(());
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/sources/internal_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ impl<'a> InternalMetrics<'a> {
metric
});

if let Err(error) = self.out.send_batch(batch).await {
emit!(StreamClosedError { error, count });
if (self.out.send_batch(batch).await).is_err() {
emit!(StreamClosedError { count });
return Err(());
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/sources/journald.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,8 +606,8 @@ impl<'a> Batch<'a> {
finalizer.finalize(cursor, self.receiver).await;
}
}
Err(error) => {
emit!(StreamClosedError { error, count });
Err(_) => {
emit!(StreamClosedError { count });
// `out` channel is closed, don't restart journalctl.
self.exiting = Some(false);
}
Expand Down
8 changes: 4 additions & 4 deletions src/sources/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,8 +449,8 @@ async fn parse_message(
let (batch, receiver) = BatchNotifier::new_with_receiver();
let mut stream = stream.map(|event| event.with_batch_notifier(&batch));
match out.send_event_stream(&mut stream).await {
Err(error) => {
emit!(StreamClosedError { error, count });
Err(_) => {
emit!(StreamClosedError { count });
}
Ok(_) => {
// Drop stream to avoid borrowing `msg`: "[...] borrow might be used
Expand All @@ -461,8 +461,8 @@ async fn parse_message(
}
}
None => match out.send_event_stream(&mut stream).await {
Err(error) => {
emit!(StreamClosedError { error, count });
Err(_) => {
emit!(StreamClosedError { count });
}
Ok(_) => {
if let Err(error) =
Expand Down
3 changes: 1 addition & 2 deletions src/sources/kubernetes_logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -871,8 +871,7 @@ impl Source {
.map(|result| {
match result {
Ok(Ok(())) => info!(message = "Event processing loop completed gracefully."),
Ok(Err(error)) => emit!(StreamClosedError {
error,
Ok(Err(_)) => emit!(StreamClosedError {
count: events_count
}),
Err(error) => emit!(KubernetesLifecycleError {
Expand Down
4 changes: 2 additions & 2 deletions src/sources/mongodb_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,8 @@ impl SourceConfig for MongoDbMetricsConfig {

let metrics = metrics.into_iter().flatten();

if let Err(error) = cx.out.send_batch(metrics).await {
emit!(StreamClosedError { error, count });
if (cx.out.send_batch(metrics).await).is_err() {
emit!(StreamClosedError { count });
return Err(());
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/sources/nats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,8 @@ async fn nats_source(
event
});

out.send_batch(events).await.map_err(|error| {
emit!(StreamClosedError { error, count });
out.send_batch(events).await.map_err(|_| {
emit!(StreamClosedError { count });
})?;
}
Err(error) => {
Expand Down
4 changes: 2 additions & 2 deletions src/sources/nginx_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ impl SourceConfig for NginxMetricsConfig {

let metrics = metrics.into_iter().flatten();

if let Err(error) = cx.out.send_batch(metrics).await {
emit!(StreamClosedError { error, count });
if (cx.out.send_batch(metrics).await).is_err() {
emit!(StreamClosedError { count });
return Err(());
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/sources/opentelemetry/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl LogsService for Service {
.send_batch_named(LOGS, events)
.map_err(|error| {
let message = error.to_string();
emit!(StreamClosedError { error, count });
emit!(StreamClosedError { count });
Status::unavailable(message)
})
.and_then(|_| handle_batch_status(receiver))
Expand Down
10 changes: 4 additions & 6 deletions src/sources/opentelemetry/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,10 @@ async fn handle_request(
let receiver = BatchNotifier::maybe_apply_to(acknowledgements, &mut events);
let count = events.len();

out.send_batch_named(output, events)
.await
.map_err(move |error| {
emit!(StreamClosedError { error, count });
warp::reject::custom(ApiError::ServerShutdown)
})?;
out.send_batch_named(output, events).await.map_err(|_| {
emit!(StreamClosedError { count });
warp::reject::custom(ApiError::ServerShutdown)
})?;

match receiver {
None => Ok(protobuf(ExportLogsServiceResponse {}).into_response()),
Expand Down
4 changes: 2 additions & 2 deletions src/sources/postgresql_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,8 @@ impl SourceConfig for PostgresqlMetricsConfig {
});

let metrics = metrics.into_iter().flatten();
if let Err(error) = cx.out.send_batch(metrics).await {
emit!(StreamClosedError { error, count });
if (cx.out.send_batch(metrics).await).is_err() {
emit!(StreamClosedError { count });
return Err(());
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/sources/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,8 @@ impl InputHandler {
event
});

if let Err(error) = self.cx.out.send_batch(events).await {
emit!(StreamClosedError { error, count });
if (self.cx.out.send_batch(events).await).is_err() {
emit!(StreamClosedError { count });
return Err(());
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/sources/socket/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,8 @@ pub(super) fn udp(

tokio::select!{
result = out.send_batch(events) => {
if let Err(error) = result {
emit!(StreamClosedError { error, count });
if result.is_err() {
emit!(StreamClosedError { count });
return Ok(())
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/sources/statsd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,8 @@ async fn statsd_udp(
match frame {
Ok(((events, _byte_size), _sock)) => {
let count = events.len();
if let Err(error) = out.send_batch(events).await {
emit!(StreamClosedError { error, count });
if (out.send_batch(events).await).is_err() {
emit!(StreamClosedError { count });
}
}
Err(error) => {
Expand Down
4 changes: 2 additions & 2 deletions src/sources/syslog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,9 +364,9 @@ pub fn udp(
debug!("Finished sending.");
Ok(())
}
Err(error) => {
Err(_) => {
let (count, _) = stream.size_hint();
emit!(StreamClosedError { error, count });
emit!(StreamClosedError { count });
Err(())
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/sources/util/http/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,10 +213,10 @@ async fn handle_request(

let count = events.len();
out.send_batch(events)
.map_err(move |error: crate::source_sender::ClosedError| {
.map_err(|_| {
// can only fail if receiving end disconnected, so we are shutting down,
// probably not gracefully.
emit!(StreamClosedError { error, count });
emit!(StreamClosedError { count });
warp::reject::custom(RejectShuttingDown)
})
.and_then(|_| handle_batch_status(receiver))
Expand Down
4 changes: 2 additions & 2 deletions src/sources/util/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,9 @@ pub(crate) async fn call<
debug!("Finished sending.");
Ok(())
}
Err(error) => {
Err(_) => {
let (count, _) = stream.size_hint();
emit!(StreamClosedError { error, count });
emit!(StreamClosedError { count });
Err(())
}
}
Expand Down

0 comments on commit ee480cd

Please sign in to comment.