Skip to content

Commit

Permalink
chore(clickhouse sink): refactor to new style (vectordotdev#17723)
Browse files Browse the repository at this point in the history
Closes: vectordotdev#17094

Updates the clickhouse docker image to support aarch64.
  • Loading branch information
dsmith3197 committed Jun 30, 2023
1 parent 47c3da1 commit 77ac63c
Show file tree
Hide file tree
Showing 9 changed files with 474 additions and 299 deletions.
2 changes: 1 addition & 1 deletion scripts/integration/clickhouse/compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ version: '3'

services:
clickhouse:
image: docker.io/yandex/clickhouse-server:${CONFIG_VERSION}
image: docker.io/clickhouse/clickhouse-server:${CONFIG_VERSION}
2 changes: 1 addition & 1 deletion scripts/integration/clickhouse/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ env:
CLICKHOUSE_ADDRESS: http://clickhouse:8123

matrix:
version: ['19']
version: ['23']

# changes to these files/paths will invoke the integration test in CI
# expressions are evaluated using https://github.com/micromatch/picomatch
Expand Down
86 changes: 71 additions & 15 deletions src/sinks/clickhouse/config.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,18 @@
use vector_config::configurable_component;
use http::{Request, StatusCode, Uri};
use hyper::Body;

use super::{
service::{ClickhouseRetryLogic, ClickhouseService},
sink::ClickhouseSink,
};
use crate::{
codecs::Transformer,
config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext},
http::Auth,
http::{get_http_scheme_from_uri, Auth, HttpClient, MaybeAuth},
sinks::{
util::{
BatchConfig, Compression, RealtimeSizeBasedDefaultBatchSettings, TowerRequestConfig,
UriSerde,
},
Healthcheck, VectorSink,
prelude::*,
util::{RealtimeSizeBasedDefaultBatchSettings, UriSerde},
},
tls::TlsConfig,
};

use super::http_sink::build_http_sink;

/// Configuration for the `clickhouse` sink.
#[configurable_component(sink("clickhouse", "Deliver log data to a ClickHouse database."))]
#[derive(Clone, Debug, Default)]
Expand Down Expand Up @@ -82,9 +79,41 @@ impl_generate_config_from_default!(ClickhouseConfig);
#[typetag::serde(name = "clickhouse")]
impl SinkConfig for ClickhouseConfig {
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
// later we can build different sink(http, native) here
// according to the clickhouseConfig
build_http_sink(self, cx).await
let endpoint = self.endpoint.with_default_parts().uri;
let protocol = get_http_scheme_from_uri(&endpoint);

let auth = self.auth.choose_one(&self.endpoint.auth)?;

let tls_settings = TlsSettings::from_options(&self.tls)?;
let client = HttpClient::new(tls_settings, &cx.proxy)?;

let service = ClickhouseService::new(
client.clone(),
auth.clone(),
&endpoint,
self.database.as_deref(),
self.table.as_str(),
self.skip_unknown_fields,
self.date_time_best_effort,
)?;

let request_limits = self.request.unwrap_with(&Default::default());
let service = ServiceBuilder::new()
.settings(request_limits, ClickhouseRetryLogic::default())
.service(service);

let batch_settings = self.batch.into_batcher_settings()?;
let sink = ClickhouseSink::new(
batch_settings,
self.compression,
self.encoding.clone(),
service,
protocol,
);

let healthcheck = Box::pin(healthcheck(client, endpoint, auth));

Ok((VectorSink::from_event_streamsink(sink), healthcheck))
}

fn input(&self) -> Input {
Expand All @@ -95,3 +124,30 @@ impl SinkConfig for ClickhouseConfig {
&self.acknowledgements
}
}

async fn healthcheck(client: HttpClient, endpoint: Uri, auth: Option<Auth>) -> crate::Result<()> {
// TODO: check if table exists?
let uri = format!("{}/?query=SELECT%201", endpoint);
let mut request = Request::get(uri).body(Body::empty()).unwrap();

if let Some(auth) = auth {
auth.apply(&mut request);
}

let response = client.send(request).await?;

match response.status() {
StatusCode::OK => Ok(()),
status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn generate_config() {
crate::test_util::test_generate_config::<ClickhouseConfig>();
}
}
250 changes: 0 additions & 250 deletions src/sinks/clickhouse/http_sink.rs

This file was deleted.

Loading

0 comments on commit 77ac63c

Please sign in to comment.