Skip to content

Commit

Permalink
Merge pull request vectordotdev#374 from answerbook/dominic/LOG-18535…
Browse files Browse the repository at this point in the history
…-temp-revert

Revert "Merge pull request vectordotdev#370 from answerbook/dominic/LOG-18535"
  • Loading branch information
darinspivey committed Dec 19, 2023
2 parents 61e3f80 + 73a8b28 commit df87df7
Show file tree
Hide file tree
Showing 10 changed files with 8 additions and 1,742 deletions.
11 changes: 0 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -335,9 +335,6 @@ heim = { git = "https://github.com/vectordotdev/heim.git", branch = "update-nix"
# make sure to update the external docs when the Lua version changes
mlua = { version = "0.8.9", default-features = false, features = ["lua54", "send", "vendored"], optional = true }

# MEZMO: added dependency for s3-sink file consolidation
gethostname = "0.4.3"

[target.'cfg(windows)'.dependencies]
windows-service = "0.6.0"

Expand Down
61 changes: 1 addition & 60 deletions src/sinks/aws_s3/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,6 @@ use crate::{
tls::TlsConfig,
};

// MEZMO: added dependencies for s3-sink file consolidation
use crate::sinks::aws_s3::file_consolidator_async::{
FileConsolidationConfig, FileConsolidatorAsync,
};
use gethostname::gethostname;

/// Configuration for the `aws_s3` sink.
#[configurable_component(sink("aws_s3"))]
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -139,11 +133,6 @@ pub struct S3SinkConfig {
skip_serializing_if = "crate::serde::skip_serializing_if_default"
)]
pub acknowledgements: AcknowledgementsConfig,

// MEZMO: added configuration for s3-sink file consolidation
#[configurable(derived)]
#[serde(default)]
pub file_consolidation_config: FileConsolidationConfig,
}

pub(super) fn default_key_prefix() -> String {
Expand Down Expand Up @@ -171,7 +160,6 @@ impl GenerateConfig for S3SinkConfig {
tls: Some(TlsConfig::default()),
auth: AwsAuthentication::default(),
acknowledgements: Default::default(),
file_consolidation_config: Default::default(),
})
.unwrap()
}
Expand Down Expand Up @@ -240,15 +228,7 @@ impl S3SinkConfig {
compression: self.compression,
};

// MEZMO: added new file consolidation process for S3 sinks
let consolidation_process = self.build_consolidation_process(cx.proxy);
let sink = S3Sink::new(
service,
request_options,
partitioner,
batch_settings,
consolidation_process,
);
let sink = S3Sink::new(service, request_options, partitioner, batch_settings);

Ok(VectorSink::from_event_streamsink(sink))
}
Expand All @@ -264,45 +244,6 @@ impl S3SinkConfig {
pub async fn create_service(&self, proxy: &ProxyConfig) -> crate::Result<S3Service> {
s3_common::config::create_service(&self.region, &self.auth, proxy, &self.tls).await
}

// MEZMO: added process to define setup for s3-sink file consolidation
fn build_consolidation_process(&self, proxy: ProxyConfig) -> Option<FileConsolidatorAsync> {
// we can perform consolidation assuming that the process itself is requested via the configuration
// we only want to handle this process on the primary instance of the statefulset
// so we don't have to worry about contention between instances of sinks
let host_name = gethostname().into_string().unwrap();
if !host_name.ends_with("-0") || !self.file_consolidation_config.enabled {
info!(
message = "S3 sink file consolidation process disabled",
host_name,
config.enabled = self.file_consolidation_config.enabled,
);
return None;
} else {
info!(
message = "S3 sink file consolidation enabled",
host_name,
config.enabled = self.file_consolidation_config.enabled,
);
}

// build the S3 client and config so we can return a new FileConsolidator
let region_or_endpoint = &self.region;
let endpoint = region_or_endpoint.endpoint().unwrap_or_default();
let region = region_or_endpoint.region();

let consolidator = FileConsolidatorAsync::new(
self.auth.clone(),
region.clone(),
endpoint.clone(),
proxy.clone(),
self.tls.clone(),
self.file_consolidation_config,
self.bucket.clone(),
self.key_prefix.clone(),
);
Some(consolidator)
}
}

#[cfg(test)]
Expand Down
Loading

0 comments on commit df87df7

Please sign in to comment.