diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index ff196ae4b2..038b0532d3 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -196,7 +196,7 @@ jobs: -o /home/runner/work/flow/flow/.build/package/bin/flowctl \ -o /home/runner/work/flow/flow/.build/package/bin/flowctl-go \ -o /home/runner/work/flow/flow/.build/package/bin/flow-connector-proxy \ - -o /home/runner/work/flow/flow/.build/package/bin/flow-network-proxy \ + -o /home/runner/work/flow/flow/.build/package/bin/flow-network-tunnel \ -o /home/runner/work/flow/flow/.build/package/bin/flow-parser \ -o /home/runner/work/flow/flow/.build/package/bin/flow-schemalate \ -o /home/runner/work/flow/flow/.build/package/bin/gazette \ @@ -209,7 +209,7 @@ jobs: -o /home/runner/work/flow/flow/.build/package/bin/flowctl \ -o /home/runner/work/flow/flow/.build/package/bin/flowctl-go \ -o /home/runner/work/flow/flow/.build/package/bin/flow-connector-proxy \ - -o /home/runner/work/flow/flow/.build/package/bin/flow-network-proxy \ + -o /home/runner/work/flow/flow/.build/package/bin/flow-network-tunnel \ -o /home/runner/work/flow/flow/.build/package/bin/flow-parser \ -o /home/runner/work/flow/flow/.build/package/bin/flow-schemalate \ -o /home/runner/work/flow/flow/.build/package/bin/gazette \ @@ -222,7 +222,7 @@ jobs: -o /home/runner/work/flow/flow/.build/package/bin/flowctl \ -o /home/runner/work/flow/flow/.build/package/bin/flowctl-go \ -o /home/runner/work/flow/flow/.build/package/bin/flow-connector-proxy \ - -o /home/runner/work/flow/flow/.build/package/bin/flow-network-proxy \ + -o /home/runner/work/flow/flow/.build/package/bin/flow-network-tunnel \ -o /home/runner/work/flow/flow/.build/package/bin/flow-parser \ -o /home/runner/work/flow/flow/.build/package/bin/flow-schemalate \ -o /home/runner/work/flow/flow/.build/package/bin/gazette \ @@ -235,7 +235,7 @@ jobs: -o /home/runner/work/flow/flow/.build/package/bin/flowctl \ -o /home/runner/work/flow/flow/.build/package/bin/flowctl-go \ -o /home/runner/work/flow/flow/.build/package/bin/flow-connector-proxy \ - -o /home/runner/work/flow/flow/.build/package/bin/flow-network-proxy \ + -o /home/runner/work/flow/flow/.build/package/bin/flow-network-tunnel \ -o /home/runner/work/flow/flow/.build/package/bin/flow-parser \ -o /home/runner/work/flow/flow/.build/package/bin/flow-schemalate \ -o /home/runner/work/flow/flow/.build/package/bin/gazette \ diff --git a/Cargo.lock b/Cargo.lock index 316a29598b..243e70208a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -673,7 +673,7 @@ dependencies = [ "futures-util", "json-pointer", "libc", - "network-proxy", + "network-tunnel", "prost", "protocol", "schemars", @@ -1973,7 +1973,7 @@ dependencies = [ ] [[package]] -name = "network-proxy" +name = "network-tunnel" version = "0.1.0" dependencies = [ "async-trait", diff --git a/Makefile b/Makefile index a2390714a2..5e272c995f 100644 --- a/Makefile +++ b/Makefile @@ -145,9 +145,9 @@ ${RUST_MUSL_BIN}/flow-schemalate: ${RUST_MUSL_BIN}/flow-parser: cargo build --target x86_64-unknown-linux-musl --release --locked -p parser -.PHONY: ${RUST_MUSL_BIN}/flow-network-proxy -${RUST_MUSL_BIN}/flow-network-proxy: - cargo build --target x86_64-unknown-linux-musl --release --locked -p network-proxy +.PHONY: ${RUST_MUSL_BIN}/flow-network-tunnel +${RUST_MUSL_BIN}/flow-network-tunnel: + cargo build --target x86_64-unknown-linux-musl --release --locked -p network-tunnel .PHONY: ${RUST_MUSL_BIN}/flow-connector-proxy ${RUST_MUSL_BIN}/flow-connector-proxy: @@ -167,7 +167,7 @@ RUST_TARGETS = \ MUSL_TARGETS = \ ${PKGDIR}/bin/flow-parser \ ${PKGDIR}/bin/flow-schemalate \ - ${PKGDIR}/bin/flow-network-proxy \ + ${PKGDIR}/bin/flow-network-tunnel \ ${PKGDIR}/bin/flow-connector-proxy \ .PHONY: rust-binaries @@ -193,8 +193,8 @@ ${PKGDIR}/bin/flow-schemalate: ${RUST_MUSL_BIN}/flow-schemalate | ${PKGDIR} cp ${RUST_MUSL_BIN}/flow-schemalate $@ ${PKGDIR}/bin/flow-parser: ${RUST_MUSL_BIN}/flow-parser | ${PKGDIR} cp ${RUST_MUSL_BIN}/flow-parser $@ -${PKGDIR}/bin/flow-network-proxy: ${RUST_MUSL_BIN}/flow-network-proxy | ${PKGDIR} - cp ${RUST_MUSL_BIN}/flow-network-proxy $@ +${PKGDIR}/bin/flow-network-tunnel: ${RUST_MUSL_BIN}/flow-network-tunnel | ${PKGDIR} + cp ${RUST_MUSL_BIN}/flow-network-tunnel $@ ${PKGDIR}/bin/flow-connector-proxy: ${RUST_MUSL_BIN}/flow-connector-proxy | ${PKGDIR} cp ${RUST_MUSL_BIN}/flow-connector-proxy $@ @@ -228,11 +228,11 @@ install-tools: ${PKGDIR}/bin/etcd ${PKGDIR}/bin/sops .PHONY: rust-test rust-test: - cargo test --release --locked --workspace --exclude parser --exclude network-proxy --exclude schemalate --exclude connector_proxy + cargo test --release --locked --workspace --exclude parser --exclude network-tunnel --exclude schemalate --exclude connector_proxy .PHONY: musl-test musl-test: - cargo test --release --locked --target x86_64-unknown-linux-musl --package parser --package network-proxy --package schemalate --package connector_proxy + cargo test --release --locked --target x86_64-unknown-linux-musl --package parser --package network-tunnel --package schemalate --package connector_proxy # `go` test targets must have PATH-based access to tools (etcd & sops), # because the `go` tool compiles tests as binaries within a temp directory, diff --git a/crates/connector_proxy/Cargo.toml b/crates/connector_proxy/Cargo.toml index 72fab3ce59..ed5fe19d9d 100644 --- a/crates/connector_proxy/Cargo.toml +++ b/crates/connector_proxy/Cargo.toml @@ -10,7 +10,7 @@ path = "src/main.rs" [dependencies] doc = { path = "../doc", version = "0.0.0" } flow_cli_common = { path = "../flow_cli_common" } -network-proxy = { path = "../network-proxy", version = "0.1.0" } +network-tunnel = { path = "../network-tunnel", version = "0.1.0" } protocol = { path = "../protocol", version = "0.0.0" } async-trait="*" diff --git a/crates/connector_proxy/src/connector_runner.rs b/crates/connector_proxy/src/connector_runner.rs index 4a2d3fc824..4175161299 100644 --- a/crates/connector_proxy/src/connector_runner.rs +++ b/crates/connector_proxy/src/connector_runner.rs @@ -2,8 +2,8 @@ use crate::apis::{FlowCaptureOperation, FlowMaterializeOperation, InterceptorStr use crate::errors::Error; use crate::interceptors::{ airbyte_source_interceptor::AirbyteSourceInterceptor, - network_proxy_capture_interceptor::NetworkProxyCaptureInterceptor, - network_proxy_materialize_interceptor::NetworkProxyMaterializeInterceptor, + network_tunnel_capture_interceptor::NetworkTunnelCaptureInterceptor, + network_tunnel_materialize_interceptor::NetworkTunnelMaterializeInterceptor, }; use crate::libs::command::{ check_exit_status, invoke_connector_delayed, invoke_connector_direct, parse_child, @@ -23,10 +23,10 @@ pub async fn run_flow_capture_connector( parse_child(invoke_connector_direct(entrypoint, args)?)?; let adapted_request_stream = - NetworkProxyCaptureInterceptor::adapt_request_stream(op, request_stream())?; + NetworkTunnelCaptureInterceptor::adapt_request_stream(op, request_stream())?; let adapted_response_stream = - NetworkProxyCaptureInterceptor::adapt_response_stream(op, response_stream(child_stdout))?; + NetworkTunnelCaptureInterceptor::adapt_response_stream(op, response_stream(child_stdout))?; streaming_all( child_stdin, @@ -50,9 +50,9 @@ pub async fn run_flow_materialize_connector( parse_child(invoke_connector_direct(entrypoint, args)?)?; let adapted_request_stream = - NetworkProxyMaterializeInterceptor::adapt_request_stream(op, request_stream())?; + NetworkTunnelMaterializeInterceptor::adapt_request_stream(op, request_stream())?; - let adapted_response_stream = NetworkProxyMaterializeInterceptor::adapt_response_stream( + let adapted_response_stream = NetworkTunnelMaterializeInterceptor::adapt_response_stream( op, response_stream(child_stdout), )?; @@ -82,10 +82,10 @@ pub async fn run_airbyte_source_connector( let adapted_request_stream = airbyte_interceptor.adapt_request_stream( op, - NetworkProxyCaptureInterceptor::adapt_request_stream(op, request_stream())?, + NetworkTunnelCaptureInterceptor::adapt_request_stream(op, request_stream())?, )?; - let adapted_response_stream = NetworkProxyCaptureInterceptor::adapt_response_stream( + let adapted_response_stream = NetworkTunnelCaptureInterceptor::adapt_response_stream( op, airbyte_interceptor.adapt_response_stream(op, response_stream(child_stdout))?, )?; diff --git a/crates/connector_proxy/src/errors.rs b/crates/connector_proxy/src/errors.rs index f246d6fe46..9f4ce3a38a 100644 --- a/crates/connector_proxy/src/errors.rs +++ b/crates/connector_proxy/src/errors.rs @@ -49,7 +49,7 @@ pub enum Error { MissingImageInspectFile, #[error(transparent)] - NetworkProxyError(#[from] network_proxy::errors::Error), + NetworkTunnelError(#[from] network_tunnel::errors::Error), #[error(transparent)] TempfilePersistError(#[from] tempfile::PersistError), diff --git a/crates/connector_proxy/src/interceptors/mod.rs b/crates/connector_proxy/src/interceptors/mod.rs index 37b44c07f0..e8edba8231 100644 --- a/crates/connector_proxy/src/interceptors/mod.rs +++ b/crates/connector_proxy/src/interceptors/mod.rs @@ -1,3 +1,3 @@ pub mod airbyte_source_interceptor; -pub mod network_proxy_capture_interceptor; -pub mod network_proxy_materialize_interceptor; +pub mod network_tunnel_capture_interceptor; +pub mod network_tunnel_materialize_interceptor; diff --git a/crates/connector_proxy/src/interceptors/network_proxy_capture_interceptor.rs b/crates/connector_proxy/src/interceptors/network_tunnel_capture_interceptor.rs similarity index 87% rename from crates/connector_proxy/src/interceptors/network_proxy_capture_interceptor.rs rename to crates/connector_proxy/src/interceptors/network_tunnel_capture_interceptor.rs index 74a533ecf1..ece41962c0 100644 --- a/crates/connector_proxy/src/interceptors/network_proxy_capture_interceptor.rs +++ b/crates/connector_proxy/src/interceptors/network_tunnel_capture_interceptor.rs @@ -1,6 +1,6 @@ use crate::apis::{FlowCaptureOperation, InterceptorStream}; use crate::errors::{Error, Must}; -use crate::libs::network_proxy::NetworkProxy; +use crate::libs::network_tunnel::NetworkTunnel; use crate::libs::protobuf::{decode_message, encode_message}; use crate::libs::stream::{get_decoded_message, stream_all_bytes}; use futures::{future, stream, StreamExt, TryStreamExt}; @@ -11,14 +11,14 @@ use protocol::capture::{ use serde_json::value::RawValue; use tokio_util::io::StreamReader; -pub struct NetworkProxyCaptureInterceptor {} +pub struct NetworkTunnelCaptureInterceptor {} -impl NetworkProxyCaptureInterceptor { +impl NetworkTunnelCaptureInterceptor { fn adapt_discover_request_stream(in_stream: InterceptorStream) -> InterceptorStream { Box::pin(stream::once(async { let mut request = get_decoded_message::(in_stream).await?; - request.endpoint_spec_json = NetworkProxy::consume_network_proxy_config( + request.endpoint_spec_json = NetworkTunnel::consume_network_tunnel_config( RawValue::from_string(request.endpoint_spec_json)?, ) .await @@ -33,7 +33,7 @@ impl NetworkProxyCaptureInterceptor { Box::pin(stream::once(async { let mut request = get_decoded_message::(in_stream).await?; - request.endpoint_spec_json = NetworkProxy::consume_network_proxy_config( + request.endpoint_spec_json = NetworkTunnel::consume_network_tunnel_config( RawValue::from_string(request.endpoint_spec_json)?, ) .await @@ -49,7 +49,7 @@ impl NetworkProxyCaptureInterceptor { let mut request = get_decoded_message::(in_stream).await?; if let Some(ref mut c) = request.capture { - c.endpoint_spec_json = NetworkProxy::consume_network_proxy_config( + c.endpoint_spec_json = NetworkTunnel::consume_network_tunnel_config( RawValue::from_string(c.endpoint_spec_json.clone())?, ) .await @@ -71,7 +71,7 @@ impl NetworkProxyCaptureInterceptor { .expect("expected request is not received."); if let Some(ref mut o) = request.open { if let Some(ref mut c) = o.capture { - c.endpoint_spec_json = NetworkProxy::consume_network_proxy_config( + c.endpoint_spec_json = NetworkTunnel::consume_network_tunnel_config( RawValue::from_string(c.endpoint_spec_json.clone())?, ) .await @@ -91,7 +91,7 @@ impl NetworkProxyCaptureInterceptor { } } -impl NetworkProxyCaptureInterceptor { +impl NetworkTunnelCaptureInterceptor { pub fn adapt_request_stream( op: &FlowCaptureOperation, in_stream: InterceptorStream, @@ -114,7 +114,7 @@ impl NetworkProxyCaptureInterceptor { Ok(match op { FlowCaptureOperation::Spec => Box::pin(stream::once(async move { let mut response = get_decoded_message::(in_stream).await?; - response.endpoint_spec_schema_json = NetworkProxy::extend_endpoint_schema( + response.endpoint_spec_schema_json = NetworkTunnel::extend_endpoint_schema( RawValue::from_string(response.endpoint_spec_schema_json)?, ) .or_bail() diff --git a/crates/connector_proxy/src/interceptors/network_proxy_materialize_interceptor.rs b/crates/connector_proxy/src/interceptors/network_tunnel_materialize_interceptor.rs similarity index 86% rename from crates/connector_proxy/src/interceptors/network_proxy_materialize_interceptor.rs rename to crates/connector_proxy/src/interceptors/network_tunnel_materialize_interceptor.rs index 9bcb484576..fe1ead6840 100644 --- a/crates/connector_proxy/src/interceptors/network_proxy_materialize_interceptor.rs +++ b/crates/connector_proxy/src/interceptors/network_tunnel_materialize_interceptor.rs @@ -1,6 +1,6 @@ use crate::apis::{FlowMaterializeOperation, InterceptorStream}; use crate::errors::{Error, Must}; -use crate::libs::network_proxy::NetworkProxy; +use crate::libs::network_tunnel::NetworkTunnel; use crate::libs::protobuf::{decode_message, encode_message}; use crate::libs::stream::{get_decoded_message, stream_all_bytes}; @@ -10,18 +10,18 @@ use protocol::materialize::{ApplyRequest, SpecResponse, TransactionRequest, Vali use serde_json::value::RawValue; use tokio_util::io::StreamReader; -pub struct NetworkProxyMaterializeInterceptor {} +pub struct NetworkTunnelMaterializeInterceptor {} -impl NetworkProxyMaterializeInterceptor { +impl NetworkTunnelMaterializeInterceptor { fn adapt_spec_request(in_stream: InterceptorStream) -> InterceptorStream { Box::pin(stream::once(async { let mut request = get_decoded_message::(in_stream).await?; - request.endpoint_spec_json = NetworkProxy::consume_network_proxy_config( + request.endpoint_spec_json = NetworkTunnel::consume_network_tunnel_config( RawValue::from_string(request.endpoint_spec_json)?, ) .await - .expect("failed to start network proxy") + .expect("failed to start network tunnel") .to_string(); encode_message(&request) })) @@ -32,7 +32,7 @@ impl NetworkProxyMaterializeInterceptor { let mut request = get_decoded_message::(in_stream).await?; if let Some(ref mut m) = request.materialization { - m.endpoint_spec_json = NetworkProxy::consume_network_proxy_config( + m.endpoint_spec_json = NetworkTunnel::consume_network_tunnel_config( RawValue::from_string(m.endpoint_spec_json.clone())?, ) .await @@ -54,7 +54,7 @@ impl NetworkProxyMaterializeInterceptor { .expect("expected request is not received."); if let Some(ref mut o) = request.open { if let Some(ref mut m) = o.materialization { - m.endpoint_spec_json = NetworkProxy::consume_network_proxy_config( + m.endpoint_spec_json = NetworkTunnel::consume_network_tunnel_config( RawValue::from_string(m.endpoint_spec_json.clone())?, ) .await @@ -73,7 +73,7 @@ impl NetworkProxyMaterializeInterceptor { } } -impl NetworkProxyMaterializeInterceptor { +impl NetworkTunnelMaterializeInterceptor { pub fn adapt_request_stream( op: &FlowMaterializeOperation, in_stream: InterceptorStream, @@ -96,7 +96,7 @@ impl NetworkProxyMaterializeInterceptor { FlowMaterializeOperation::Spec => Box::pin(stream::once(async { let mut response = get_decoded_message::(in_stream).await?; - response.endpoint_spec_schema_json = NetworkProxy::extend_endpoint_schema( + response.endpoint_spec_schema_json = NetworkTunnel::extend_endpoint_schema( RawValue::from_string(response.endpoint_spec_schema_json)?, ) .or_bail() diff --git a/crates/connector_proxy/src/libs/mod.rs b/crates/connector_proxy/src/libs/mod.rs index c7ff1dedb9..870add588a 100644 --- a/crates/connector_proxy/src/libs/mod.rs +++ b/crates/connector_proxy/src/libs/mod.rs @@ -2,6 +2,6 @@ pub mod airbyte_catalog; pub mod command; pub mod image_inspect; pub mod json; -pub mod network_proxy; +pub mod network_tunnel; pub mod protobuf; pub mod stream; diff --git a/crates/connector_proxy/src/libs/network_proxy.rs b/crates/connector_proxy/src/libs/network_tunnel.rs similarity index 56% rename from crates/connector_proxy/src/libs/network_proxy.rs rename to crates/connector_proxy/src/libs/network_tunnel.rs index 4be21cfb74..f0ddab9de2 100644 --- a/crates/connector_proxy/src/libs/network_proxy.rs +++ b/crates/connector_proxy/src/libs/network_tunnel.rs @@ -1,29 +1,29 @@ use crate::errors::Error; use crate::libs::json::{create_root_schema, remove_subobject}; -use network_proxy::interface::NetworkProxyConfig; +use network_tunnel::interface::NetworkTunnelConfig; use schemars::schema::{RootSchema, Schema}; use serde_json::value::RawValue; use tokio::sync::oneshot::{self, Receiver}; use tokio::time::timeout; -pub struct NetworkProxy {} -pub const NETWORK_PROXY_KEY: &str = "networkProxy"; +pub struct NetworkTunnel {} +pub const NETWORK_TUNNEL_KEY: &str = "networkTunnel"; -impl NetworkProxy { +impl NetworkTunnel { pub fn extend_endpoint_schema( endpoint_spec_schema: Box, ) -> Result, Error> { - let network_proxy_schema = create_root_schema::(); + let network_tunnel_schema = create_root_schema::(); let mut modified_schema: RootSchema = serde_json::from_str(endpoint_spec_schema.get())?; if let Some(ref mut o) = &mut modified_schema.schema.object { - if o.as_ref().properties.contains_key(NETWORK_PROXY_KEY) { - return Err(Error::DuplicatedKeyError(NETWORK_PROXY_KEY)); + if o.as_ref().properties.contains_key(NETWORK_TUNNEL_KEY) { + return Err(Error::DuplicatedKeyError(NETWORK_TUNNEL_KEY)); } o.as_mut().properties.insert( - NETWORK_PROXY_KEY.to_string(), - Schema::Object(network_proxy_schema.schema), + NETWORK_TUNNEL_KEY.to_string(), + Schema::Object(network_tunnel_schema.schema), ); } @@ -31,24 +31,24 @@ impl NetworkProxy { RawValue::from_string(json).map_err(Into::into) } - // Start the network proxy. The receiver rx will be dropped to indicate the network proxy + // Start the network tunnel. The receiver rx will be dropped to indicate the network tunnel // is ready to accept requests. - async fn start_network_proxy( - config: NetworkProxyConfig, + async fn start_network_tunnel( + config: NetworkTunnelConfig, rx: Receiver<()>, ) -> Result<(), Error> { - let mut network_proxy = config.new_proxy(); + let mut network_tunnel = config.new_tunnel(); tokio::task::spawn(async move { - let result: Result<(), Error> = match network_proxy.prepare().await { + let result: Result<(), Error> = match network_tunnel.prepare().await { Ok(()) => { drop(rx); - network_proxy.start_serve().await.map_err(Into::into) + network_tunnel.start_serve().await.map_err(Into::into) } Err(e) => Err(e.into()), }; if let Err(ref err) = result { - tracing::error!(error=?err, "failed starting network proxy."); + tracing::error!(error=?err, "failed starting network tunnel."); std::process::exit(1); } }) @@ -57,7 +57,7 @@ impl NetworkProxy { Ok(()) } - pub async fn consume_network_proxy_config( + pub async fn consume_network_tunnel_config( endpoint_spec_json: Box, ) -> Result, Error> { if endpoint_spec_json.get().is_empty() { @@ -65,25 +65,25 @@ impl NetworkProxy { } let endpoint_spec = serde_json::from_str(endpoint_spec_json.get())?; - let (network_proxy_config, endpoint_spec) = - remove_subobject(endpoint_spec, NETWORK_PROXY_KEY); + let (network_tunnel_config, endpoint_spec) = + remove_subobject(endpoint_spec, NETWORK_TUNNEL_KEY); - let network_proxy_config: NetworkProxyConfig = match network_proxy_config { + let network_tunnel_config: NetworkTunnelConfig = match network_tunnel_config { None => return Ok(endpoint_spec_json), Some(c) => serde_json::from_value(c)?, }; let (mut tx, rx) = oneshot::channel(); - tokio::spawn(Self::start_network_proxy(network_proxy_config, rx)); + tokio::spawn(Self::start_network_tunnel(network_tunnel_config, rx)); - // TODO: Refact the network-proxy and remove the timeout logic here after all connectors are converted to work with connector-proxy. + // TODO: Refact the network-tunnel and remove the timeout logic here after all connectors are converted to work with connector-proxy. - // Block for at most 5 seconds for network proxy to be prepared. + // Block for at most 5 seconds for network tunnel to be prepared. if let Err(_) = timeout(std::time::Duration::from_secs(5), tx.closed()).await { return Err(Error::ChannelTimeoutError); }; - tracing::info!("network proxy started."); + tracing::info!("network tunnel started."); let json = serde_json::to_string_pretty(&endpoint_spec)?; RawValue::from_string(json).map_err(Into::into) diff --git a/crates/network-proxy/src/main.rs b/crates/network-proxy/src/main.rs deleted file mode 100644 index 1451000708..0000000000 --- a/crates/network-proxy/src/main.rs +++ /dev/null @@ -1,40 +0,0 @@ -pub mod interface; -pub mod sshforwarding; -pub mod errors; -pub mod networkproxy; - -use errors::Error; -use flow_cli_common::{init_logging, LogArgs, LogFormat}; -use std::io::{self, Write}; - -use interface::NetworkProxyConfig; - -#[tokio::main] -async fn main() -> io::Result<()> { - init_logging(&LogArgs{level: "info".to_string(), format: Some(LogFormat::Json)}); - if let Err(err) = run().await.as_ref() { - tracing::error!(error = ?err, "network proxy failed."); - std::process::exit(1); - } - Ok(()) -} - -async fn run() -> Result<(), Error> { - let proxy_config: NetworkProxyConfig = serde_json::from_reader(io::stdin())?; - let mut proxy = proxy_config.new_proxy(); - - proxy.prepare().await?; - - // Write "READY" to stdio to unblock Go logic. - // The current workflow assumes that - // 1. After proxy.prepare() is called, the network proxy is able to accept requests from clients without sending errors back to clients. - // 2. The network proxy is able to process client requests immediately after `proxy.start_serve` is called. - // If either of the assumptions is invalid for any new proxy type, the READY-logic need to be moved to a separate task, which - // sends out the "READY" signal after making sure the network proxy is started and working properly. - println!("READY"); - io::stdout().flush()?; - - proxy.start_serve().await?; - - Ok(()) -} \ No newline at end of file diff --git a/crates/network-proxy/.gitignore b/crates/network-tunnel/.gitignore similarity index 100% rename from crates/network-proxy/.gitignore rename to crates/network-tunnel/.gitignore diff --git a/crates/network-proxy/Cargo.toml b/crates/network-tunnel/Cargo.toml similarity index 91% rename from crates/network-proxy/Cargo.toml rename to crates/network-tunnel/Cargo.toml index 2db5c5e5b8..9040d117c3 100644 --- a/crates/network-proxy/Cargo.toml +++ b/crates/network-tunnel/Cargo.toml @@ -1,12 +1,12 @@ [package] -name = "network-proxy" +name = "network-tunnel" version = "0.1.0" edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [[bin]] -name = "flow-network-proxy" +name = "flow-network-tunnel" path = "src/main.rs" [dependencies] diff --git a/crates/network-proxy/src/errors.rs b/crates/network-tunnel/src/errors.rs similarity index 100% rename from crates/network-proxy/src/errors.rs rename to crates/network-tunnel/src/errors.rs diff --git a/crates/network-proxy/src/interface.rs b/crates/network-tunnel/src/interface.rs similarity index 56% rename from crates/network-proxy/src/interface.rs rename to crates/network-tunnel/src/interface.rs index a474c48f06..038f6a6cce 100644 --- a/crates/network-proxy/src/interface.rs +++ b/crates/network-tunnel/src/interface.rs @@ -1,4 +1,4 @@ -use super::networkproxy::NetworkProxy; +use super::networktunnel::NetworkTunnel; use super::sshforwarding::{SshForwarding, SshForwardingConfig}; use schemars::JsonSchema; @@ -7,14 +7,14 @@ use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize, JsonSchema)] #[serde(rename_all = "camelCase")] #[serde(deny_unknown_fields)] -pub enum NetworkProxyConfig { +pub enum NetworkTunnelConfig { SshForwarding(SshForwardingConfig), } -impl NetworkProxyConfig { - pub fn new_proxy(self) -> Box { +impl NetworkTunnelConfig { + pub fn new_tunnel(self) -> Box { match self { - NetworkProxyConfig::SshForwarding(config) => Box::new(SshForwarding::new(config)), + NetworkTunnelConfig::SshForwarding(config) => Box::new(SshForwarding::new(config)), } } } diff --git a/crates/network-proxy/src/lib.rs b/crates/network-tunnel/src/lib.rs similarity index 71% rename from crates/network-proxy/src/lib.rs rename to crates/network-tunnel/src/lib.rs index aea73e18cd..d2dc8c8ae8 100644 --- a/crates/network-proxy/src/lib.rs +++ b/crates/network-tunnel/src/lib.rs @@ -1,4 +1,4 @@ pub mod errors; pub mod interface; -pub mod networkproxy; +pub mod networktunnel; pub mod sshforwarding; diff --git a/crates/network-tunnel/src/main.rs b/crates/network-tunnel/src/main.rs new file mode 100644 index 0000000000..d004c00c97 --- /dev/null +++ b/crates/network-tunnel/src/main.rs @@ -0,0 +1,40 @@ +pub mod interface; +pub mod sshforwarding; +pub mod errors; +pub mod networktunnel; + +use errors::Error; +use flow_cli_common::{init_logging, LogArgs, LogFormat}; +use std::io::{self, Write}; + +use interface::NetworkTunnelConfig; + +#[tokio::main] +async fn main() -> io::Result<()> { + init_logging(&LogArgs{level: "info".to_string(), format: Some(LogFormat::Json)}); + if let Err(err) = run().await.as_ref() { + tracing::error!(error = ?err, "network tunnel failed."); + std::process::exit(1); + } + Ok(()) +} + +async fn run() -> Result<(), Error> { + let tunnel_config: NetworkTunnelConfig = serde_json::from_reader(io::stdin())?; + let mut tunnel = tunnel_config.new_tunnel(); + + tunnel.prepare().await?; + + // Write "READY" to stdio to unblock Go logic. + // The current workflow assumes that + // 1. After tunnel.prepare() is called, the network tunnel is able to accept requests from clients without sending errors back to clients. + // 2. The network tunnel is able to process client requests immediately after `tunnel.start_serve` is called. + // If either of the assumptions is invalid for any new tunnel type, the READY-logic need to be moved to a separate task, which + // sends out the "READY" signal after making sure the network tunnel is started and working properly. + println!("READY"); + io::stdout().flush()?; + + tunnel.start_serve().await?; + + Ok(()) +} \ No newline at end of file diff --git a/crates/network-proxy/src/networkproxy.rs b/crates/network-tunnel/src/networktunnel.rs similarity index 92% rename from crates/network-proxy/src/networkproxy.rs rename to crates/network-tunnel/src/networktunnel.rs index b929a3b9ef..c2dd95a067 100644 --- a/crates/network-proxy/src/networkproxy.rs +++ b/crates/network-tunnel/src/networktunnel.rs @@ -3,7 +3,7 @@ use super::errors::Error; use async_trait::async_trait; #[async_trait] -pub trait NetworkProxy: Send { +pub trait NetworkTunnel: Send { // Setup the network proxy server. Network proxy should be able to listen and accept requests after `prepare` is performed. async fn prepare(&mut self) -> Result<(), Error>; // Start a long-running task that serves and processes all proxy requests from clients. diff --git a/crates/network-proxy/src/sshforwarding.rs b/crates/network-tunnel/src/sshforwarding.rs similarity index 98% rename from crates/network-proxy/src/sshforwarding.rs rename to crates/network-tunnel/src/sshforwarding.rs index e0336f2830..d3d5d89e59 100644 --- a/crates/network-proxy/src/sshforwarding.rs +++ b/crates/network-tunnel/src/sshforwarding.rs @@ -1,5 +1,5 @@ use super::errors::Error; -use super::networkproxy::NetworkProxy; +use super::networktunnel::NetworkTunnel; use async_trait::async_trait; use futures::pin_mut; @@ -99,7 +99,7 @@ impl SshForwarding { } #[async_trait] -impl NetworkProxy for SshForwarding { +impl NetworkTunnel for SshForwarding { async fn prepare(&mut self) -> Result<(), Error> { self.prepare_ssh_client().await?; self.prepare_local_listener().await?; diff --git a/examples/examples.db b/examples/examples.db new file mode 100644 index 0000000000..3d1e885c11 Binary files /dev/null and b/examples/examples.db differ diff --git a/examples/examples.db-shm b/examples/examples.db-shm new file mode 100644 index 0000000000..fe9ac2845e Binary files /dev/null and b/examples/examples.db-shm differ diff --git a/examples/examples.db-wal b/examples/examples.db-wal new file mode 100644 index 0000000000..e69de29bb2 diff --git a/go/network-proxy/networkproxy.go b/go/network-tunnel/networktunnel.go similarity index 66% rename from go/network-proxy/networkproxy.go rename to go/network-tunnel/networktunnel.go index f76e85e85a..9ad95333c9 100644 --- a/go/network-proxy/networkproxy.go +++ b/go/network-tunnel/networktunnel.go @@ -1,4 +1,4 @@ -package networkproxy +package networktunnel import ( "bytes" @@ -11,64 +11,64 @@ import ( "syscall" "time" - sf "github.com/estuary/flow/go/network-proxy/sshforwarding" + sf "github.com/estuary/flow/go/network-tunnel/sshforwarding" ) -const ProgramName = "network-proxy-service" +const ProgramName = "network-tunnel-service" -func SupportedNetworkProxyTypes() []string { +func SupportedNetworkTunnelTypes() []string { return []string{"sshForwarding"} } -type NetworkProxyConfig struct { - ProxyType string `json:"proxyType"` +type NetworkTunnelConfig struct { + TunnelType string `json:"tunnelType"` SshForwardingConfig sf.SshForwardingConfig `json:"sshForwarding"` } // GetFieldDocString implements the jsonschema.customSchemaGetFieldDocString interface. -func (NetworkProxyConfig) GetFieldDocString(fieldName string) string { +func (NetworkTunnelConfig) GetFieldDocString(fieldName string) string { switch fieldName { - case "ProxyType": - return fmt.Sprintf("The type of the network proxy. Supported types are: ( %s )", strings.Join(SupportedNetworkProxyTypes(), ", ")) + case "TunnelType": + return fmt.Sprintf("The type of the network tunnel. Supported types are: ( %s )", strings.Join(SupportedNetworkTunnelTypes(), ", ")) case "SshForwardingConfig": - return "Config for proxy of type sshForwarding" + return "Config for tunnel of type sshForwarding" default: return "" } } -func (npc *NetworkProxyConfig) Validate() error { +func (npc *NetworkTunnelConfig) Validate() error { if npc == nil { return nil } var supported = false - for _, t := range SupportedNetworkProxyTypes() { - if t == npc.ProxyType { + for _, t := range SupportedNetworkTunnelTypes() { + if t == npc.TunnelType { supported = true break } } if !supported { - return fmt.Errorf("Unsupported proxy type: %s. Valid values are: ( %s ).", npc.ProxyType, strings.Join(SupportedNetworkProxyTypes(), ", ")) + return fmt.Errorf("Unsupported proxy type: %s. Valid values are: ( %s ).", npc.TunnelType, strings.Join(SupportedNetworkTunnelTypes(), ", ")) } - switch npc.ProxyType { + switch npc.TunnelType { case "sshForwarding": return npc.SshForwardingConfig.Validate() default: - panic(fmt.Sprintf("Implementation of validating %s is not ready.", npc.ProxyType)) + panic(fmt.Sprintf("Implementation of validating %s is not ready.", npc.TunnelType)) } } -func (npc *NetworkProxyConfig) MarshalJSON() ([]byte, error) { +func (npc *NetworkTunnelConfig) MarshalJSON() ([]byte, error) { var m = make(map[string]interface{}) - switch npc.ProxyType { + switch npc.TunnelType { case "sshForwarding": - m[npc.ProxyType] = npc.SshForwardingConfig + m[npc.TunnelType] = npc.SshForwardingConfig default: - panic(fmt.Sprintf("Implementation of MarshalJSON for %s is missing.", npc.ProxyType)) + panic(fmt.Sprintf("Implementation of MarshalJSON for %s is missing.", npc.TunnelType)) } return json.Marshal(m) @@ -76,13 +76,13 @@ func (npc *NetworkProxyConfig) MarshalJSON() ([]byte, error) { const defaultTimeoutSecs = 5 -func (npc *NetworkProxyConfig) Start() error { +func (npc *NetworkTunnelConfig) Start() error { return npc.startInternal(defaultTimeoutSecs, os.Stderr) } -func (npc *NetworkProxyConfig) startInternal(timeoutSecs uint16, stderr io.Writer) error { +func (npc *NetworkTunnelConfig) startInternal(timeoutSecs uint16, stderr io.Writer) error { if npc == nil { - // NetworkProxyConfig is not set. + // NetworkTunnelConfig is not set. return nil } @@ -117,7 +117,7 @@ func (npc *NetworkProxyConfig) startInternal(timeoutSecs uint16, stderr io.Write } } -func (npc *NetworkProxyConfig) sendInput(cmd *exec.Cmd) error { +func (npc *NetworkTunnelConfig) sendInput(cmd *exec.Cmd) error { stdin, err := cmd.StdinPipe() if err != nil { return fmt.Errorf("getting stdin pipe: %w", err) @@ -131,7 +131,7 @@ func (npc *NetworkProxyConfig) sendInput(cmd *exec.Cmd) error { go func() { if _, err := stdin.Write(input); err != nil { - panic("Failed to send input to network-proxy-service binary.") + panic("Failed to send input to network-tunnel-service binary.") } stdin.Close() }() diff --git a/go/network-proxy/networkproxy_test.go b/go/network-tunnel/networktunnel_test.go similarity index 98% rename from go/network-proxy/networkproxy_test.go rename to go/network-tunnel/networktunnel_test.go index 16c9db37ad..93ea2ae8a4 100644 --- a/go/network-proxy/networkproxy_test.go +++ b/go/network-tunnel/networktunnel_test.go @@ -1,4 +1,4 @@ -package networkproxy +package networktunnel /* import ( diff --git a/go/network-proxy/sshforwarding/sshforwarding.go b/go/network-tunnel/sshforwarding/sshforwarding.go similarity index 100% rename from go/network-proxy/sshforwarding/sshforwarding.go rename to go/network-tunnel/sshforwarding/sshforwarding.go diff --git a/go/network-proxy/sshforwarding/sshforwarding_test.go b/go/network-tunnel/sshforwarding/sshforwarding_test.go similarity index 100% rename from go/network-proxy/sshforwarding/sshforwarding_test.go rename to go/network-tunnel/sshforwarding/sshforwarding_test.go diff --git a/go/network-proxy/testutil.go b/go/network-tunnel/testutil.go similarity index 75% rename from go/network-proxy/testutil.go rename to go/network-tunnel/testutil.go index 655616907f..4bed78074e 100644 --- a/go/network-proxy/testutil.go +++ b/go/network-tunnel/testutil.go @@ -1,20 +1,20 @@ -package networkproxy +package networktunnel import ( "encoding/base64" "os" - sf "github.com/estuary/flow/go/network-proxy/sshforwarding" + sf "github.com/estuary/flow/go/network-tunnel/sshforwarding" ) // Configuration set based on sshforwarding/test_sshd_configs/docker-compose.yaml. -func CreateSshForwardingTestConfig(keyFilePath string, remotePort uint16) (*NetworkProxyConfig, error) { +func CreateSshForwardingTestConfig(keyFilePath string, remotePort uint16) (*NetworkTunnelConfig, error) { var b, err = os.ReadFile(keyFilePath) if err != nil { return nil, err } - return &NetworkProxyConfig{ - ProxyType: "sshForwarding", + return &NetworkTunnelConfig{ + TunnelType: "sshForwarding", SshForwardingConfig: sf.SshForwardingConfig{ SshEndpoint: "ssh://127.0.0.1:2222", SshPrivateKeyBase64: base64.RawStdEncoding.EncodeToString(b), diff --git a/site/docs/concepts/connectors.md b/site/docs/concepts/connectors.md index c7f2f0d703..848b950934 100644 --- a/site/docs/concepts/connectors.md +++ b/site/docs/concepts/connectors.md @@ -567,7 +567,7 @@ materializations: database: flow user: flow_user password: secret - networkProxy: + networkTunnel: sshForwarding: # Port on the local machine from which you'll connect to the SSH server. # If a port is specified elsewhere in the connector configuration, it must match. diff --git a/tests/sshforwarding/materialize-postgres.ssh.config.yaml b/tests/sshforwarding/materialize-postgres.ssh.config.yaml index a5a9e8ebcb..c12e23dd4c 100644 --- a/tests/sshforwarding/materialize-postgres.ssh.config.yaml +++ b/tests/sshforwarding/materialize-postgres.ssh.config.yaml @@ -3,7 +3,7 @@ port: 16666 user: flow password: flow database: flow -networkProxy: +networkTunnel: sshForwarding: localPort: 16666 forwardHost: localhost @@ -49,4 +49,4 @@ networkProxy: KySvYOfiD8waRu2Gf7IqCHdgKBi7AkE45w72GhC+GOoDNMFgnlUgoDeRzxS7idf4 MIVS3sQzezB78ZAuZx0IkH8PxgqRI/D4CK9QBC0b2IT1xmqe5LCGhsMHSvScPLV3 Uu2cs5FkJUnkRpwup7KEfJfZG80DHP81GTsioAt40igx6gVAkIo= - -----END RSA PRIVATE KEY----- \ No newline at end of file + -----END RSA PRIVATE KEY-----