Skip to content

Commit

Permalink
chore(api): Refactor top and tap for library use (vectordotdev#18129)
Browse files Browse the repository at this point in the history
* Expose tap EventFormatter and run

* Shorten comment

* Add default_graphql_url

* Move healthcheck error outside

* Refactor tap pattern creation

* Expose RECONNECT_DELAY

* Refactor tap to one exportable function

* Add url() method to tap Opts

* Refactor core top logic into function

* Refactor web socket URL creation

* Adjust error message

* Publicize tap/top

* Use cmd::tap

* Use cmd::top

* Allow customizing dashboard title

* Apply PR suggestion
  • Loading branch information
001wwang committed Aug 2, 2023
1 parent 8022464 commit 600f819
Show file tree
Hide file tree
Showing 8 changed files with 215 additions and 154 deletions.
33 changes: 5 additions & 28 deletions lib/vector-api-client/src/client.rs
@@ -1,8 +1,9 @@
use anyhow::Context;
use graphql_client::GraphQLQuery;
use indoc::indoc;
use url::Url;

use crate::gql::HealthQueryExt;

/// Wrapped `Result` type, that returns deserialized GraphQL response data.
pub type QueryResult<T> =
anyhow::Result<graphql_client::Response<<T as GraphQLQuery>::ResponseData>>;
Expand All @@ -19,33 +20,9 @@ impl Client {
Self { url }
}

pub async fn new_with_healthcheck(url: Url) -> Option<Self> {
#![allow(clippy::print_stderr)]

use crate::gql::HealthQueryExt;

// Create a new API client for connecting to the local/remote Vector instance.
let client = Self::new(url.clone());

// Check that the GraphQL server is reachable
match client.health_query().await {
Ok(_) => Some(client),
_ => {
eprintln!(
indoc! {"
Vector API server isn't reachable ({}).
Have you enabled the API?
To enable the API, add the following to your `vector.toml` config file:
[api]
enabled = true"},
url
);
None
}
}
/// Send a health query
pub async fn healthcheck(&self) -> Result<(), ()> {
self.health_query().await.map(|_| ()).map_err(|_| ())
}

/// Issue a GraphQL query using Reqwest, serializing the response to the associated
Expand Down
8 changes: 8 additions & 0 deletions src/config/api.rs
@@ -1,5 +1,6 @@
use std::net::{Ipv4Addr, SocketAddr};

use url::Url;
use vector_config::configurable_component;

/// API options.
Expand Down Expand Up @@ -41,6 +42,13 @@ pub fn default_address() -> Option<SocketAddr> {
Some(SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 8686))
}

/// Default GraphQL API address
pub fn default_graphql_url() -> Url {
let addr = default_address().unwrap();
Url::parse(&format!("http://{}/graphql", addr))
.expect("Couldn't parse default API URL. Please report this.")
}

const fn default_playground() -> bool {
true
}
Expand Down
4 changes: 2 additions & 2 deletions src/lib.rs
Expand Up @@ -101,12 +101,12 @@ pub mod sources;
pub mod stats;
#[cfg(feature = "api-client")]
#[allow(unreachable_pub)]
mod tap;
pub mod tap;
pub mod template;
pub mod test_util;
#[cfg(feature = "api-client")]
#[allow(unreachable_pub)]
pub(crate) mod top;
pub mod top;
#[allow(unreachable_pub)]
pub mod topology;
pub mod trace;
Expand Down
63 changes: 26 additions & 37 deletions src/tap/cmd.rs
Expand Up @@ -12,60 +12,49 @@ use vector_api_client::{
Client,
};

use crate::{
config,
signal::{SignalRx, SignalTo},
};
use crate::signal::{SignalRx, SignalTo};

/// Delay (in milliseconds) before attempting to reconnect to the Vector API
const RECONNECT_DELAY: u64 = 5000;

/// CLI command func for issuing 'tap' queries, and communicating with a local/remote
/// Vector API server via HTTP/WebSockets.
pub(crate) async fn cmd(opts: &super::Opts, mut signal_rx: SignalRx) -> exitcode::ExitCode {
// Use the provided URL as the Vector GraphQL API server, or default to the local port
// provided by the API config. This will work despite `api` and `api-client` being distinct
// features; the config is available even if `api` is disabled.
let mut url = opts.url.clone().unwrap_or_else(|| {
let addr = config::api::default_address().unwrap();
Url::parse(&format!("http://{}/graphql", addr))
.expect("Couldn't parse default API URL. Please report this.")
});

pub(crate) async fn cmd(opts: &super::Opts, signal_rx: SignalRx) -> exitcode::ExitCode {
let url = opts.url();
// Return early with instructions for enabling the API if the endpoint isn't reachable
// via a healthcheck.
if Client::new_with_healthcheck(url.clone()).await.is_none() {
let client = Client::new(url.clone());
#[allow(clippy::print_stderr)]
if client.healthcheck().await.is_err() {
eprintln!(
indoc::indoc! {"
Vector API server isn't reachable ({}).
Have you enabled the API?
To enable the API, add the following to your `vector.toml` config file:
[api]
enabled = true"},
url
);
return exitcode::UNAVAILABLE;
}

// Change the HTTP schema to WebSockets.
url.set_scheme(match url.scheme() {
"https" => "wss",
_ => "ws",
})
.expect("Couldn't build WebSocket URL. Please report.");

// If no patterns are provided, tap all components' outputs
let outputs_patterns = if opts.component_id_patterns.is_empty()
&& opts.outputs_of.is_empty()
&& opts.inputs_of.is_empty()
{
vec!["*".to_string()]
} else {
opts.outputs_of
.iter()
.cloned()
.chain(opts.component_id_patterns.iter().cloned())
.collect()
};
tap(opts, signal_rx).await
}

/// Observe event flow from specified components
pub async fn tap(opts: &super::Opts, mut signal_rx: SignalRx) -> exitcode::ExitCode {
let subscription_url = opts.web_socket_url();
let formatter = EventFormatter::new(opts.meta, opts.format);
let outputs_patterns = opts.outputs_patterns();

loop {
tokio::select! {
biased;
Ok(SignalTo::Shutdown | SignalTo::Quit) = signal_rx.recv() => break,
status = run(url.clone(), opts, outputs_patterns.clone(), formatter.clone()) => {
status = run(subscription_url.clone(), opts, outputs_patterns.clone(), formatter.clone()) => {
if status == exitcode::UNAVAILABLE || status == exitcode::TEMPFAIL && !opts.no_reconnect {
#[allow(clippy::print_stderr)]
{
Expand Down Expand Up @@ -93,7 +82,7 @@ async fn run(
Err(e) => {
#[allow(clippy::print_stderr)]
{
eprintln!("[tap] Couldn't connect to Vector API via WebSockets: {}", e);
eprintln!("[tap] Couldn't connect to API via WebSockets: {}", e);
}
return exitcode::UNAVAILABLE;
}
Expand Down
47 changes: 45 additions & 2 deletions src/tap/mod.rs
@@ -1,18 +1,23 @@
//! Tap subcommand
mod cmd;

use clap::Parser;
pub(crate) use cmd::cmd;
pub use cmd::tap;
use url::Url;
use vector_api_client::gql::TapEncodingFormat;

use crate::config::api::default_graphql_url;

/// Tap options
#[derive(Parser, Debug, Clone)]
#[command(rename_all = "kebab-case")]
pub struct Opts {
/// Interval to sample logs at, in milliseconds
#[arg(default_value = "500", short = 'i', long)]
interval: u32,

/// Vector GraphQL API server endpoint
/// GraphQL API server endpoint
#[arg(short, long)]
url: Option<Url>,

Expand Down Expand Up @@ -44,7 +49,45 @@ pub struct Opts {
#[arg(short, long)]
meta: bool,

/// Whether to reconnect if the underlying Vector API connection drops. By default, tap will attempt to reconnect if the connection drops.
/// Whether to reconnect if the underlying API connection drops. By default, tap will attempt to reconnect if the connection drops.
#[arg(short, long)]
no_reconnect: bool,
}

impl Opts {
/// Component ID patterns to tap
///
/// If no patterns are provided, tap all components' outputs
pub fn outputs_patterns(&self) -> Vec<String> {
if self.component_id_patterns.is_empty()
&& self.outputs_of.is_empty()
&& self.inputs_of.is_empty()
{
vec!["*".to_string()]
} else {
self.outputs_of
.iter()
.cloned()
.chain(self.component_id_patterns.iter().cloned())
.collect()
}
}

/// Use the provided URL as the Vector GraphQL API server, or default to the local port
/// provided by the API config.
pub fn url(&self) -> Url {
self.url.clone().unwrap_or_else(default_graphql_url)
}

/// URL with scheme set to WebSockets
pub fn web_socket_url(&self) -> Url {
let mut url = self.url();
url.set_scheme(match url.scheme() {
"https" => "wss",
_ => "ws",
})
.expect("Couldn't build WebSocket URL. Please report.");

url
}
}

0 comments on commit 600f819

Please sign in to comment.