From ee480cd08a5451bc3f0b83a2b037ba131e38d4b9 Mon Sep 17 00:00:00 2001 From: Spencer Gilbert Date: Wed, 14 Jun 2023 11:00:28 -0400 Subject: [PATCH] chore: Dropped error field from StreamClosed Error (#17693) Field has been unused, so I've removed it and cleaned up the usages and conditionals where it was included previously. `send_batch()` appears to only return `ClosedError` making some additional matching pointless. Signed-off-by: Spencer Gilbert --- src/internal_events/common.rs | 1 - src/sources/amqp.rs | 8 ++++---- src/sources/apache_metrics/mod.rs | 4 ++-- src/sources/aws_ecs_metrics/mod.rs | 4 ++-- src/sources/aws_kinesis_firehose/handlers.rs | 5 +---- src/sources/aws_s3/sqs.rs | 4 ++-- src/sources/aws_sqs/source.rs | 2 +- src/sources/datadog_agent/mod.rs | 4 ++-- src/sources/demo_logs.rs | 4 ++-- src/sources/docker_logs/mod.rs | 11 ++++------- src/sources/eventstoredb_metrics/mod.rs | 4 ++-- src/sources/exec/mod.rs | 4 ++-- src/sources/file.rs | 4 ++-- src/sources/file_descriptors/mod.rs | 4 ++-- src/sources/gcp_pubsub.rs | 2 +- src/sources/host_metrics/mod.rs | 4 ++-- src/sources/internal_logs.rs | 4 ++-- src/sources/internal_metrics.rs | 4 ++-- src/sources/journald.rs | 4 ++-- src/sources/kafka.rs | 8 ++++---- src/sources/kubernetes_logs/mod.rs | 3 +-- src/sources/mongodb_metrics/mod.rs | 4 ++-- src/sources/nats.rs | 4 ++-- src/sources/nginx_metrics/mod.rs | 4 ++-- src/sources/opentelemetry/grpc.rs | 2 +- src/sources/opentelemetry/http.rs | 10 ++++------ src/sources/postgresql_metrics.rs | 4 ++-- src/sources/redis/mod.rs | 4 ++-- src/sources/socket/udp.rs | 4 ++-- src/sources/statsd/mod.rs | 4 ++-- src/sources/syslog.rs | 4 ++-- src/sources/util/http/prelude.rs | 4 ++-- src/sources/util/http_client.rs | 4 ++-- src/sources/util/net/tcp/mod.rs | 4 ++-- src/sources/util/unix_datagram.rs | 4 ++-- src/sources/util/unix_stream.rs | 4 ++-- src/sources/vector/mod.rs | 2 +- 37 files changed, 74 insertions(+), 84 deletions(-) diff --git a/src/internal_events/common.rs b/src/internal_events/common.rs index fafa280df099e..0bf94f54ae25a 100644 --- a/src/internal_events/common.rs +++ b/src/internal_events/common.rs @@ -83,7 +83,6 @@ const STREAM_CLOSED: &str = "stream_closed"; #[derive(Debug)] pub struct StreamClosedError { - pub error: crate::source_sender::ClosedError, pub count: usize, } diff --git a/src/sources/amqp.rs b/src/sources/amqp.rs index 94fc250806cf5..f5cf22f493a61 100644 --- a/src/sources/amqp.rs +++ b/src/sources/amqp.rs @@ -392,8 +392,8 @@ async fn finalize_event_stream( let mut stream = stream.map(|event| event.with_batch_notifier(&batch)); match out.send_event_stream(&mut stream).await { - Err(error) => { - emit!(StreamClosedError { error, count: 1 }); + Err(_) => { + emit!(StreamClosedError { count: 1 }); } Ok(_) => { finalizer.add(msg.into(), receiver); @@ -401,8 +401,8 @@ async fn finalize_event_stream( } } None => match out.send_event_stream(&mut stream).await { - Err(error) => { - emit!(StreamClosedError { error, count: 1 }); + Err(_) => { + emit!(StreamClosedError { count: 1 }); } Ok(_) => { let ack_options = lapin::options::BasicAckOptions::default(); diff --git a/src/sources/apache_metrics/mod.rs b/src/sources/apache_metrics/mod.rs index 7a0fec5404e0e..ebcabce4525e1 100644 --- a/src/sources/apache_metrics/mod.rs +++ b/src/sources/apache_metrics/mod.rs @@ -273,9 +273,9 @@ fn apache_metrics( debug!("Finished sending."); Ok(()) } - Err(error) => { + Err(_) => { let (count, _) = stream.size_hint(); - emit!(StreamClosedError { error, count }); + emit!(StreamClosedError { count }); Err(()) } } diff --git a/src/sources/aws_ecs_metrics/mod.rs b/src/sources/aws_ecs_metrics/mod.rs index fe652964cf80b..5829d4e004116 100644 --- a/src/sources/aws_ecs_metrics/mod.rs +++ b/src/sources/aws_ecs_metrics/mod.rs @@ -207,8 +207,8 @@ async fn aws_ecs_metrics( endpoint: uri.path(), }); - if let Err(error) = out.send_batch(metrics).await { - emit!(StreamClosedError { error, count }); + if (out.send_batch(metrics).await).is_err() { + emit!(StreamClosedError { count }); return Err(()); } } diff --git a/src/sources/aws_kinesis_firehose/handlers.rs b/src/sources/aws_kinesis_firehose/handlers.rs index 054f91af58a72..11bc526b0648c 100644 --- a/src/sources/aws_kinesis_firehose/handlers.rs +++ b/src/sources/aws_kinesis_firehose/handlers.rs @@ -145,10 +145,7 @@ pub(super) async fn firehose( let count = events.len(); if let Err(error) = context.out.send_batch(events).await { - emit!(StreamClosedError { - error: error.clone(), - count, - }); + emit!(StreamClosedError { count }); let error = RequestError::ShuttingDown { request_id: request_id.clone(), source: error, diff --git a/src/sources/aws_s3/sqs.rs b/src/sources/aws_s3/sqs.rs index b206e212a3465..3074a8f383d6e 100644 --- a/src/sources/aws_s3/sqs.rs +++ b/src/sources/aws_s3/sqs.rs @@ -574,9 +574,9 @@ impl IngestorProcess { let send_error = match self.out.send_event_stream(&mut stream).await { Ok(_) => None, - Err(error) => { + Err(_) => { let (count, _) = stream.size_hint(); - emit!(StreamClosedError { error, count }); + emit!(StreamClosedError { count }); Some(crate::source_sender::ClosedError) } }; diff --git a/src/sources/aws_sqs/source.rs b/src/sources/aws_sqs/source.rs index 3a17e8c801fcb..2d01647516e54 100644 --- a/src/sources/aws_sqs/source.rs +++ b/src/sources/aws_sqs/source.rs @@ -180,7 +180,7 @@ impl SqsSource { } } } - Err(error) => emit!(StreamClosedError { error, count }), + Err(_) => emit!(StreamClosedError { count }), } } } diff --git a/src/sources/datadog_agent/mod.rs b/src/sources/datadog_agent/mod.rs index 32afa640ff05e..2e4dbf6656597 100644 --- a/src/sources/datadog_agent/mod.rs +++ b/src/sources/datadog_agent/mod.rs @@ -443,8 +443,8 @@ pub(crate) async fn handle_request( } else { out.send_batch(events).await } - .map_err(move |error: crate::source_sender::ClosedError| { - emit!(StreamClosedError { error, count }); + .map_err(|_| { + emit!(StreamClosedError { count }); warp::reject::custom(ApiError::ServerShutdown) })?; match receiver { diff --git a/src/sources/demo_logs.rs b/src/sources/demo_logs.rs index 6a31744047501..fa92cd3ac0681 100644 --- a/src/sources/demo_logs.rs +++ b/src/sources/demo_logs.rs @@ -264,8 +264,8 @@ async fn demo_logs_source( event }); - out.send_batch(events).await.map_err(|error| { - emit!(StreamClosedError { error, count }); + out.send_batch(events).await.map_err(|_| { + emit!(StreamClosedError { count }); })?; } Err(error) => { diff --git a/src/sources/docker_logs/mod.rs b/src/sources/docker_logs/mod.rs index 4c93b20abe12a..9dc7ffeeedd09 100644 --- a/src/sources/docker_logs/mod.rs +++ b/src/sources/docker_logs/mod.rs @@ -814,13 +814,10 @@ impl EventStreamBuilder { let result = { let mut stream = events_stream .map(move |event| add_hostname(event, &host_key, &hostname, self.log_namespace)); - self.out - .send_event_stream(&mut stream) - .await - .map_err(|error| { - let (count, _) = stream.size_hint(); - emit!(StreamClosedError { error, count }); - }) + self.out.send_event_stream(&mut stream).await.map_err(|_| { + let (count, _) = stream.size_hint(); + emit!(StreamClosedError { count }); + }) }; // End of stream diff --git a/src/sources/eventstoredb_metrics/mod.rs b/src/sources/eventstoredb_metrics/mod.rs index 1d205ae6799e2..7b15edff19035 100644 --- a/src/sources/eventstoredb_metrics/mod.rs +++ b/src/sources/eventstoredb_metrics/mod.rs @@ -137,8 +137,8 @@ fn eventstoredb( events_received.emit(CountByteSize(count, byte_size)); - if let Err(error) = cx.out.send_batch(metrics).await { - emit!(StreamClosedError { count, error }); + if (cx.out.send_batch(metrics).await).is_err() { + emit!(StreamClosedError { count }); break; } } diff --git a/src/sources/exec/mod.rs b/src/sources/exec/mod.rs index 5b163db025772..8af71919a3d76 100644 --- a/src/sources/exec/mod.rs +++ b/src/sources/exec/mod.rs @@ -498,8 +498,8 @@ async fn run_command( for event in &mut events { handle_event(&config, &hostname, &Some(stream.to_string()), pid, event, log_namespace); } - if let Err(error) = out.send_batch(events).await { - emit!(StreamClosedError { count, error }); + if (out.send_batch(events).await).is_err() { + emit!(StreamClosedError { count }); break; } }, diff --git a/src/sources/file.rs b/src/sources/file.rs index 5e72bb7676917..1c48a2cefe298 100644 --- a/src/sources/file.rs +++ b/src/sources/file.rs @@ -659,9 +659,9 @@ pub fn file_source( Ok(()) => { debug!("Finished sending."); } - Err(error) => { + Err(_) => { let (count, _) = messages.size_hint(); - emit!(StreamClosedError { error, count }); + emit!(StreamClosedError { count }); } } }); diff --git a/src/sources/file_descriptors/mod.rs b/src/sources/file_descriptors/mod.rs index 06710adec9c37..04189d33e871c 100644 --- a/src/sources/file_descriptors/mod.rs +++ b/src/sources/file_descriptors/mod.rs @@ -195,9 +195,9 @@ async fn process_stream( debug!("Finished sending."); Ok(()) } - Err(error) => { + Err(_) => { let (count, _) = stream.size_hint(); - emit!(StreamClosedError { error, count }); + emit!(StreamClosedError { count }); Err(()) } } diff --git a/src/sources/gcp_pubsub.rs b/src/sources/gcp_pubsub.rs index 262693551eb45..d38688592d4fd 100644 --- a/src/sources/gcp_pubsub.rs +++ b/src/sources/gcp_pubsub.rs @@ -618,7 +618,7 @@ impl PubsubSource { let count = events.len(); match self.out.send_batch(events).await { - Err(error) => emit!(StreamClosedError { error, count }), + Err(_) => emit!(StreamClosedError { count }), Ok(()) => match notifier { None => ack_ids .send(ids) diff --git a/src/sources/host_metrics/mod.rs b/src/sources/host_metrics/mod.rs index 9f577b3b85a1a..18fb45cb91fc8 100644 --- a/src/sources/host_metrics/mod.rs +++ b/src/sources/host_metrics/mod.rs @@ -296,8 +296,8 @@ impl HostMetricsConfig { bytes_received.emit(ByteSize(0)); let metrics = generator.capture_metrics().await; let count = metrics.len(); - if let Err(error) = out.send_batch(metrics).await { - emit!(StreamClosedError { count, error }); + if (out.send_batch(metrics).await).is_err() { + emit!(StreamClosedError { count }); return Err(()); } } diff --git a/src/sources/internal_logs.rs b/src/sources/internal_logs.rs index 0d22b3b310208..d74a97788a3b5 100644 --- a/src/sources/internal_logs.rs +++ b/src/sources/internal_logs.rs @@ -191,9 +191,9 @@ async fn run( Utc::now(), ); - if let Err(error) = out.send_event(Event::from(log)).await { + if (out.send_event(Event::from(log)).await).is_err() { // this wont trigger any infinite loop considering it stops the component - emit!(StreamClosedError { error, count: 1 }); + emit!(StreamClosedError { count: 1 }); return Err(()); } } diff --git a/src/sources/internal_metrics.rs b/src/sources/internal_metrics.rs index 908daab49e1f7..2b876aeaa8397 100644 --- a/src/sources/internal_metrics.rs +++ b/src/sources/internal_metrics.rs @@ -190,8 +190,8 @@ impl<'a> InternalMetrics<'a> { metric }); - if let Err(error) = self.out.send_batch(batch).await { - emit!(StreamClosedError { error, count }); + if (self.out.send_batch(batch).await).is_err() { + emit!(StreamClosedError { count }); return Err(()); } } diff --git a/src/sources/journald.rs b/src/sources/journald.rs index f6426d69e4cdd..3ec938bda0b47 100644 --- a/src/sources/journald.rs +++ b/src/sources/journald.rs @@ -606,8 +606,8 @@ impl<'a> Batch<'a> { finalizer.finalize(cursor, self.receiver).await; } } - Err(error) => { - emit!(StreamClosedError { error, count }); + Err(_) => { + emit!(StreamClosedError { count }); // `out` channel is closed, don't restart journalctl. self.exiting = Some(false); } diff --git a/src/sources/kafka.rs b/src/sources/kafka.rs index 39f16074f5342..ca904de314a62 100644 --- a/src/sources/kafka.rs +++ b/src/sources/kafka.rs @@ -449,8 +449,8 @@ async fn parse_message( let (batch, receiver) = BatchNotifier::new_with_receiver(); let mut stream = stream.map(|event| event.with_batch_notifier(&batch)); match out.send_event_stream(&mut stream).await { - Err(error) => { - emit!(StreamClosedError { error, count }); + Err(_) => { + emit!(StreamClosedError { count }); } Ok(_) => { // Drop stream to avoid borrowing `msg`: "[...] borrow might be used @@ -461,8 +461,8 @@ async fn parse_message( } } None => match out.send_event_stream(&mut stream).await { - Err(error) => { - emit!(StreamClosedError { error, count }); + Err(_) => { + emit!(StreamClosedError { count }); } Ok(_) => { if let Err(error) = diff --git a/src/sources/kubernetes_logs/mod.rs b/src/sources/kubernetes_logs/mod.rs index de609ff055758..17878ec54c28b 100644 --- a/src/sources/kubernetes_logs/mod.rs +++ b/src/sources/kubernetes_logs/mod.rs @@ -871,8 +871,7 @@ impl Source { .map(|result| { match result { Ok(Ok(())) => info!(message = "Event processing loop completed gracefully."), - Ok(Err(error)) => emit!(StreamClosedError { - error, + Ok(Err(_)) => emit!(StreamClosedError { count: events_count }), Err(error) => emit!(KubernetesLifecycleError { diff --git a/src/sources/mongodb_metrics/mod.rs b/src/sources/mongodb_metrics/mod.rs index a144b5f09c2a9..16dc2a79a23d3 100644 --- a/src/sources/mongodb_metrics/mod.rs +++ b/src/sources/mongodb_metrics/mod.rs @@ -147,8 +147,8 @@ impl SourceConfig for MongoDbMetricsConfig { let metrics = metrics.into_iter().flatten(); - if let Err(error) = cx.out.send_batch(metrics).await { - emit!(StreamClosedError { error, count }); + if (cx.out.send_batch(metrics).await).is_err() { + emit!(StreamClosedError { count }); return Err(()); } } diff --git a/src/sources/nats.rs b/src/sources/nats.rs index e5409836803b3..6600bffc1a256 100644 --- a/src/sources/nats.rs +++ b/src/sources/nats.rs @@ -238,8 +238,8 @@ async fn nats_source( event }); - out.send_batch(events).await.map_err(|error| { - emit!(StreamClosedError { error, count }); + out.send_batch(events).await.map_err(|_| { + emit!(StreamClosedError { count }); })?; } Err(error) => { diff --git a/src/sources/nginx_metrics/mod.rs b/src/sources/nginx_metrics/mod.rs index 0e6bb83049a12..6a849ddfac51d 100644 --- a/src/sources/nginx_metrics/mod.rs +++ b/src/sources/nginx_metrics/mod.rs @@ -135,8 +135,8 @@ impl SourceConfig for NginxMetricsConfig { let metrics = metrics.into_iter().flatten(); - if let Err(error) = cx.out.send_batch(metrics).await { - emit!(StreamClosedError { error, count }); + if (cx.out.send_batch(metrics).await).is_err() { + emit!(StreamClosedError { count }); return Err(()); } } diff --git a/src/sources/opentelemetry/grpc.rs b/src/sources/opentelemetry/grpc.rs index a7036d0175861..9916043520a3a 100644 --- a/src/sources/opentelemetry/grpc.rs +++ b/src/sources/opentelemetry/grpc.rs @@ -48,7 +48,7 @@ impl LogsService for Service { .send_batch_named(LOGS, events) .map_err(|error| { let message = error.to_string(); - emit!(StreamClosedError { error, count }); + emit!(StreamClosedError { count }); Status::unavailable(message) }) .and_then(|_| handle_batch_status(receiver)) diff --git a/src/sources/opentelemetry/http.rs b/src/sources/opentelemetry/http.rs index e2d5dede3edfd..be7ebef83ef7a 100644 --- a/src/sources/opentelemetry/http.rs +++ b/src/sources/opentelemetry/http.rs @@ -122,12 +122,10 @@ async fn handle_request( let receiver = BatchNotifier::maybe_apply_to(acknowledgements, &mut events); let count = events.len(); - out.send_batch_named(output, events) - .await - .map_err(move |error| { - emit!(StreamClosedError { error, count }); - warp::reject::custom(ApiError::ServerShutdown) - })?; + out.send_batch_named(output, events).await.map_err(|_| { + emit!(StreamClosedError { count }); + warp::reject::custom(ApiError::ServerShutdown) + })?; match receiver { None => Ok(protobuf(ExportLogsServiceResponse {}).into_response()), diff --git a/src/sources/postgresql_metrics.rs b/src/sources/postgresql_metrics.rs index 6c6d8505c6b9d..bca7a56c13317 100644 --- a/src/sources/postgresql_metrics.rs +++ b/src/sources/postgresql_metrics.rs @@ -227,8 +227,8 @@ impl SourceConfig for PostgresqlMetricsConfig { }); let metrics = metrics.into_iter().flatten(); - if let Err(error) = cx.out.send_batch(metrics).await { - emit!(StreamClosedError { error, count }); + if (cx.out.send_batch(metrics).await).is_err() { + emit!(StreamClosedError { count }); return Err(()); } } diff --git a/src/sources/redis/mod.rs b/src/sources/redis/mod.rs index c8ab0ed6de71e..03f4da5fe5dd8 100644 --- a/src/sources/redis/mod.rs +++ b/src/sources/redis/mod.rs @@ -279,8 +279,8 @@ impl InputHandler { event }); - if let Err(error) = self.cx.out.send_batch(events).await { - emit!(StreamClosedError { error, count }); + if (self.cx.out.send_batch(events).await).is_err() { + emit!(StreamClosedError { count }); return Err(()); } } diff --git a/src/sources/socket/udp.rs b/src/sources/socket/udp.rs index 1671be995a299..1951cc9ff01ff 100644 --- a/src/sources/socket/udp.rs +++ b/src/sources/socket/udp.rs @@ -269,8 +269,8 @@ pub(super) fn udp( tokio::select!{ result = out.send_batch(events) => { - if let Err(error) = result { - emit!(StreamClosedError { error, count }); + if result.is_err() { + emit!(StreamClosedError { count }); return Ok(()) } } diff --git a/src/sources/statsd/mod.rs b/src/sources/statsd/mod.rs index 4f18e1d14be1b..467dae41fec30 100644 --- a/src/sources/statsd/mod.rs +++ b/src/sources/statsd/mod.rs @@ -295,8 +295,8 @@ async fn statsd_udp( match frame { Ok(((events, _byte_size), _sock)) => { let count = events.len(); - if let Err(error) = out.send_batch(events).await { - emit!(StreamClosedError { error, count }); + if (out.send_batch(events).await).is_err() { + emit!(StreamClosedError { count }); } } Err(error) => { diff --git a/src/sources/syslog.rs b/src/sources/syslog.rs index 06c8a44ebd65d..728cda24a6c46 100644 --- a/src/sources/syslog.rs +++ b/src/sources/syslog.rs @@ -364,9 +364,9 @@ pub fn udp( debug!("Finished sending."); Ok(()) } - Err(error) => { + Err(_) => { let (count, _) = stream.size_hint(); - emit!(StreamClosedError { error, count }); + emit!(StreamClosedError { count }); Err(()) } } diff --git a/src/sources/util/http/prelude.rs b/src/sources/util/http/prelude.rs index 26f073f7bb7d1..1087a65b81f40 100644 --- a/src/sources/util/http/prelude.rs +++ b/src/sources/util/http/prelude.rs @@ -213,10 +213,10 @@ async fn handle_request( let count = events.len(); out.send_batch(events) - .map_err(move |error: crate::source_sender::ClosedError| { + .map_err(|_| { // can only fail if receiving end disconnected, so we are shutting down, // probably not gracefully. - emit!(StreamClosedError { error, count }); + emit!(StreamClosedError { count }); warp::reject::custom(RejectShuttingDown) }) .and_then(|_| handle_batch_status(receiver)) diff --git a/src/sources/util/http_client.rs b/src/sources/util/http_client.rs index aa5cdb1b8db7e..08f14b878608f 100644 --- a/src/sources/util/http_client.rs +++ b/src/sources/util/http_client.rs @@ -233,9 +233,9 @@ pub(crate) async fn call< debug!("Finished sending."); Ok(()) } - Err(error) => { + Err(_) => { let (count, _) = stream.size_hint(); - emit!(StreamClosedError { error, count }); + emit!(StreamClosedError { count }); Err(()) } } diff --git a/src/sources/util/net/tcp/mod.rs b/src/sources/util/net/tcp/mod.rs index e3b68e539d8aa..a31ba249a172d 100644 --- a/src/sources/util/net/tcp/mod.rs +++ b/src/sources/util/net/tcp/mod.rs @@ -405,8 +405,8 @@ async fn handle_stream( break; } } - Err(error) => { - emit!(StreamClosedError { error, count }); + Err(_) => { + emit!(StreamClosedError { count }); break; } } diff --git a/src/sources/util/unix_datagram.rs b/src/sources/util/unix_datagram.rs index 7a0f4b0e55eab..aa5c957cad060 100644 --- a/src/sources/util/unix_datagram.rs +++ b/src/sources/util/unix_datagram.rs @@ -114,8 +114,8 @@ async fn listen( handle_events(&mut events, received_from.clone()); let count = events.len(); - if let Err(error) = out.send_batch(events).await { - emit!(StreamClosedError { error, count }); + if (out.send_batch(events).await).is_err() { + emit!(StreamClosedError { count }); } }, Some(Err(error)) => { diff --git a/src/sources/util/unix_stream.rs b/src/sources/util/unix_stream.rs index f19c888362c30..3748ae005be16 100644 --- a/src/sources/util/unix_stream.rs +++ b/src/sources/util/unix_stream.rs @@ -118,8 +118,8 @@ pub fn build_unix_stream_source( handle_events(&mut events, Some(received_from.clone())); let count = events.len(); - if let Err(error) = out.send_batch(events).await { - emit!(StreamClosedError { error, count }); + if (out.send_batch(events).await).is_err() { + emit!(StreamClosedError { count }); } } Err(error) => { diff --git a/src/sources/vector/mod.rs b/src/sources/vector/mod.rs index 66aef5f7b4b19..a6f5ad5b049f3 100644 --- a/src/sources/vector/mod.rs +++ b/src/sources/vector/mod.rs @@ -78,7 +78,7 @@ impl proto::Service for Service { .send_batch(events) .map_err(|error| { let message = error.to_string(); - emit!(StreamClosedError { error, count }); + emit!(StreamClosedError { count }); Status::unavailable(message) }) .and_then(|_| handle_batch_status(receiver))