diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index ac1d364981..038b0532d3 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -61,7 +61,6 @@ jobs:
      - run: make install-tools
      - run: go mod download
      - run: make rust-test
-      - run: make go-test-ci
      - run: make rust-binaries

      - name: Ensure that generated files are unchanged.
@@ -144,6 +143,19 @@ jobs:
          fetch-depth: 0
          submodules: true

+      - name: Install protobuf compiler (not included in the CI runner by default)
+        run: sudo apt install -y libprotobuf-dev protobuf-compiler
+
+      # We require a minimum Go version of 1.17.
+      - uses: actions/setup-go@v2
+        with:
+          go-version: "1.17.3"
+
+      - name: Install rust toolchain
+        run: rustup show
+      - run: make extra-ci-runner-setup
+      - run: make print-versions
+
      - name: Set up Google Cloud SDK
        uses: google-github-actions/setup-gcloud@v0
        with:
@@ -184,7 +196,20 @@ jobs:
            -o /home/runner/work/flow/flow/.build/package/bin/flowctl \
            -o /home/runner/work/flow/flow/.build/package/bin/flowctl-go \
            -o /home/runner/work/flow/flow/.build/package/bin/flow-connector-proxy \
-            -o /home/runner/work/flow/flow/.build/package/bin/flow-network-proxy \
+            -o /home/runner/work/flow/flow/.build/package/bin/flow-network-tunnel \
+            -o /home/runner/work/flow/flow/.build/package/bin/flow-parser \
+            -o /home/runner/work/flow/flow/.build/package/bin/flow-schemalate \
+            -o /home/runner/work/flow/flow/.build/package/bin/gazette \
+            -o /home/runner/work/flow/flow/.build/package/bin/sops
+
+      - name: make go-test-ci
+        run: |
+          make go-test-ci \
+            -o /home/runner/work/flow/flow/.build/package/bin/etcd \
+            -o /home/runner/work/flow/flow/.build/package/bin/flowctl \
+            -o /home/runner/work/flow/flow/.build/package/bin/flowctl-go \
+            -o /home/runner/work/flow/flow/.build/package/bin/flow-connector-proxy \
+            -o /home/runner/work/flow/flow/.build/package/bin/flow-network-tunnel \
            -o /home/runner/work/flow/flow/.build/package/bin/flow-parser \
            -o /home/runner/work/flow/flow/.build/package/bin/flow-schemalate \
            -o /home/runner/work/flow/flow/.build/package/bin/gazette \
@@ -197,7 +222,7 @@ jobs:
            -o /home/runner/work/flow/flow/.build/package/bin/flowctl \
            -o /home/runner/work/flow/flow/.build/package/bin/flowctl-go \
            -o /home/runner/work/flow/flow/.build/package/bin/flow-connector-proxy \
-            -o /home/runner/work/flow/flow/.build/package/bin/flow-network-proxy \
+            -o /home/runner/work/flow/flow/.build/package/bin/flow-network-tunnel \
            -o /home/runner/work/flow/flow/.build/package/bin/flow-parser \
            -o /home/runner/work/flow/flow/.build/package/bin/flow-schemalate \
            -o /home/runner/work/flow/flow/.build/package/bin/gazette \
@@ -210,7 +235,7 @@ jobs:
            -o /home/runner/work/flow/flow/.build/package/bin/flowctl \
            -o /home/runner/work/flow/flow/.build/package/bin/flowctl-go \
            -o /home/runner/work/flow/flow/.build/package/bin/flow-connector-proxy \
-            -o /home/runner/work/flow/flow/.build/package/bin/flow-network-proxy \
+            -o /home/runner/work/flow/flow/.build/package/bin/flow-network-tunnel \
            -o /home/runner/work/flow/flow/.build/package/bin/flow-parser \
            -o /home/runner/work/flow/flow/.build/package/bin/flow-schemalate \
            -o /home/runner/work/flow/flow/.build/package/bin/gazette \
diff --git a/.vscode/settings.json b/.vscode/settings.json
index e871e49d00..7b3b3c8b84 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -21,6 +21,7 @@
    "files.trimTrailingWhitespace": true,
    "editor.formatOnSave": true,
    "cSpell.words": [
+        "airbyte",
        "Firebolt",
        "schemalate"
    ],
diff --git a/Cargo.lock b/Cargo.lock
index b7e50da9eb..d636f2adcb 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -450,6 +450,16 @@ version = "0.3.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "e3b5ca7a04898ad4bcd41c90c5285445ff5b791899bb1b0abdd2a2aa791211d7"

+[[package]]
+name = "bytelines"
+version = "2.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "784face321c535fcd9a1456632fa720aa53ea0640b57341d961c8c09de2da59f"
+dependencies = [
+ "futures",
+ "tokio",
+]
+
 [[package]]
 name = "byteorder"
 version = "1.4.3"
@@ -662,16 +672,19 @@ dependencies = [
 name = "connector_proxy"
 version = "0.0.0"
 dependencies = [
- "async-stream",
  "async-trait",
+ "bytelines",
  "byteorder",
  "bytes",
  "clap 3.1.8",
  "doc",
  "flow_cli_common",
+ "futures",
  "futures-core",
  "futures-util",
- "network-proxy",
+ "json-pointer",
+ "libc",
+ "network-tunnel",
  "prost",
  "protocol",
  "schemars",
@@ -680,10 +693,12 @@ dependencies = [
  "structopt",
  "strum 0.24.0",
  "strum_macros 0.24.0",
+ "tempfile",
  "thiserror",
  "tokio",
  "tokio-util",
  "tracing",
+ "validator",
 ]

 [[package]]
@@ -786,9 +801,9 @@ dependencies = [

 [[package]]
 name = "crossbeam-channel"
-version = "0.5.4"
+version = "0.5.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5aaa7bd5fb665c6864b5f963dd9097905c54125909c7aa94c9e18507cdbe6c53"
+checksum = "fdbfe11fe19ff083c48923cf179540e8cd0535903dc35e178a1fdeeb59aef51f"
 dependencies = [
  "cfg-if",
  "crossbeam-utils",
@@ -1691,6 +1706,15 @@ dependencies = [
  "treediff",
 ]

+[[package]]
+name = "json-pointer"
+version = "0.3.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5fe841b94e719a482213cee19dd04927cf412f26d8dc84c5a446c081e49c2997"
+dependencies = [
+ "serde_json",
+]
+
 [[package]]
 name = "labels"
 version = "0.0.0"
@@ -1709,9 +1733,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"

 [[package]]
 name = "libc"
-version = "0.2.121"
+version = "0.2.122"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "efaa7b300f3b5fe8eb6bf21ce3895e1751d9665086af2d64b42f19701015ff4f"
+checksum = "ec647867e2bf0772e28c8bcde4f0d19a9216916e890543b5a03ed8ef27b8f259"

 [[package]]
 name = "libflate"
@@ -1960,7 +1984,7 @@ dependencies = [
 ]

 [[package]]
-name = "network-proxy"
+name = "network-tunnel"
 version = "0.1.0"
 dependencies = [
  "async-trait",
@@ -2656,9 +2680,9 @@ dependencies = [

 [[package]]
 name = "quote"
-version = "1.0.17"
+version = "1.0.16"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "632d02bff7f874a36f33ea8bb416cd484b90cc66c1194b1a1110d067a7013f58"
+checksum = "b4af2ec4714533fcdf07e886f17025ace8b997b9ce51204ee69b6da831c3da57"
 dependencies = [
  "proc-macro2",
 ]
@@ -2813,9 +2837,9 @@ dependencies = [

 [[package]]
 name = "redox_users"
-version = "0.4.3"
+version = "0.4.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b033d837a7cf162d7993aded9304e30a83213c648b6e389db233191f891e5c2b"
+checksum = "7776223e2696f1aa4c6b0170e83212f47296a00424305117d013dfe86fb0fe55"
 dependencies = [
  "getrandom 0.2.6",
  "redox_syscall",
@@ -3367,9 +3391,9 @@ checksum = "ab16ced94dbd8a46c82fd81e3ed9a8727dac2977ea869d217bcc4ea1f122e81f"

 [[package]]
 name = "syn"
-version = "1.0.90"
+version = "1.0.89"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "704df27628939572cd88d33f171cd6f896f4eaca85252c6e0a72d8d8287ee86f"
+checksum = "ea297be220d52398dcc07ce15a209fce436d361735ac1db700cab3b6cdfb9f54"
 dependencies = [
  "proc-macro2",
  "quote",
diff --git a/Makefile b/Makefile
index d9925b3f52..0a15e432cb 100644
--- a/Makefile
+++ b/Makefile
@@ -145,9 +145,9 @@ ${RUST_MUSL_BIN}/flow-schemalate:
 ${RUST_MUSL_BIN}/flow-parser:
 	cargo build --target x86_64-unknown-linux-musl --release --locked -p parser

-.PHONY: ${RUST_MUSL_BIN}/flow-network-proxy
-${RUST_MUSL_BIN}/flow-network-proxy:
-	cargo build --target x86_64-unknown-linux-musl --release --locked -p network-proxy
+.PHONY: ${RUST_MUSL_BIN}/flow-network-tunnel
+${RUST_MUSL_BIN}/flow-network-tunnel:
+	cargo build --target x86_64-unknown-linux-musl --release --locked -p network-tunnel

 .PHONY: ${RUST_MUSL_BIN}/flow-connector-proxy
 ${RUST_MUSL_BIN}/flow-connector-proxy:
@@ -167,7 +167,7 @@ RUST_TARGETS = \
 MUSL_TARGETS = \
 	${PKGDIR}/bin/flow-parser \
 	${PKGDIR}/bin/flow-schemalate \
-	${PKGDIR}/bin/flow-network-proxy \
+	${PKGDIR}/bin/flow-network-tunnel \
 	${PKGDIR}/bin/flow-connector-proxy \

 .PHONY: rust-binaries
@@ -193,12 +193,11 @@ ${PKGDIR}/bin/flow-schemalate: ${RUST_MUSL_BIN}/flow-schemalate | ${PKGDIR}
 	cp ${RUST_MUSL_BIN}/flow-schemalate $@
 ${PKGDIR}/bin/flow-parser: ${RUST_MUSL_BIN}/flow-parser | ${PKGDIR}
 	cp ${RUST_MUSL_BIN}/flow-parser $@
-${PKGDIR}/bin/flow-network-proxy: ${RUST_MUSL_BIN}/flow-network-proxy | ${PKGDIR}
-	cp ${RUST_MUSL_BIN}/flow-network-proxy $@
+${PKGDIR}/bin/flow-network-tunnel: ${RUST_MUSL_BIN}/flow-network-tunnel | ${PKGDIR}
+	cp ${RUST_MUSL_BIN}/flow-network-tunnel $@
 ${PKGDIR}/bin/flow-connector-proxy: ${RUST_MUSL_BIN}/flow-connector-proxy | ${PKGDIR}
 	cp ${RUST_MUSL_BIN}/flow-connector-proxy $@
-
 ##########################################################################
 # Make targets used by CI:

@@ -229,11 +228,11 @@ install-tools: ${PKGDIR}/bin/etcd ${PKGDIR}/bin/sops

 .PHONY: rust-test
 rust-test:
-	cargo test --release --locked --workspace --exclude parser --exclude network-proxy --exclude schemalate --exclude connector_proxy
+	cargo test --release --locked --workspace --exclude parser --exclude network-tunnel --exclude schemalate --exclude connector_proxy

 .PHONY: musl-test
 musl-test:
-	cargo test --release --locked --target x86_64-unknown-linux-musl --package parser --package network-proxy --package schemalate --package connector_proxy
+	cargo test --release --locked --target x86_64-unknown-linux-musl --package parser --package network-tunnel --package schemalate --package connector_proxy

 # `go` test targets must have PATH-based access to tools (etcd & sops),
 # because the `go` tool compiles tests as binaries within a temp directory,
@@ -245,7 +244,7 @@ go-test-fast: $(GO_BUILD_DEPS) | ${PKGDIR}/bin/etcd ${PKGDIR}/bin/sops
 	./go.sh test -p ${NPROC} --tags "${GO_BUILD_TAGS}" ./go/...

 .PHONY: go-test-ci
-go-test-ci: $(GO_BUILD_DEPS) | ${PKGDIR}/bin/etcd ${PKGDIR}/bin/sops
+go-test-ci: $(GO_BUILD_DEPS) | ${PKGDIR}/bin/etcd ${PKGDIR}/bin/sops ${PKGDIR}/bin/flow-connector-proxy ${PKGDIR}/bin/flowctl ${PKGDIR}/bin/flowctl-go
 	PATH=${PKGDIR}/bin:$$PATH ;\
 	GORACE="halt_on_error=1" ;\
 	./go.sh test -p ${NPROC} --tags "${GO_BUILD_TAGS}" --race --count=15 --failfast ./go/...
@@ -282,4 +281,4 @@ docker-push:
 # It is invoked only for builds on the master branch.
 .PHONY: docker-push-dev
 docker-push-dev:
-	docker push ghcr.io/estuary/flow:dev
+	docker push ghcr.io/estuary/flow:dev
\ No newline at end of file
diff --git a/crates/connector_proxy/Cargo.toml b/crates/connector_proxy/Cargo.toml
index 1765af109f..9cf16674ad 100644
--- a/crates/connector_proxy/Cargo.toml
+++ b/crates/connector_proxy/Cargo.toml
@@ -10,16 +10,18 @@ path = "src/main.rs"
 [dependencies]
 doc = { path = "../doc", version = "0.0.0" }
 flow_cli_common = { path = "../flow_cli_common" }
-network-proxy = { path = "../network-proxy", version = "0.1.0" }
+network-tunnel = { path = "../network-tunnel", version = "0.1.0" }
 protocol = { path = "../protocol", version = "0.0.0" }

 async-trait="*"
-async-stream="*"
 bytes = "*"
 byteorder="*"
 clap = { version = "^3", features = ["derive"] }
 futures-core = "*"
 futures-util="*"
+futures="*"
+json-pointer="*"
+libc="*"
 prost = "*"
 schemars = "*"
 serde = { version = "*", features = ["derive"]}
@@ -27,7 +29,10 @@ serde_json = { version = "*", features = ["raw_value"]}
 structopt = "*"
 strum = "*"
 strum_macros = "*"
+tempfile="*"
 thiserror = "*"
 tokio = { version = "1.15.0", features = ["full"] }
 tokio-util = { version = "*", features = ["io"] }
+bytelines = "*"
 tracing="*"
+validator = { version = "*", features = ["derive"] }
\ No newline at end of file
diff --git a/crates/connector_proxy/src/apis.rs b/crates/connector_proxy/src/apis.rs
index d3bf09b06c..7c6c781bbd 100644
--- a/crates/connector_proxy/src/apis.rs
+++ b/crates/connector_proxy/src/apis.rs
@@ -1,9 +1,20 @@
-use crate::errors::Error;
 use bytes::Bytes;
 use clap::ArgEnum;
-use futures_core::stream::Stream;
+use futures::TryStream;
 use std::pin::Pin;

+// The protocol used by FlowRuntime to speak with connector-proxy.
+// There are two ways to infer the protocol:
+// 1. From the proxy command passed in from FlowRuntime to the connector proxy.
+// 2. From the connector image labels and tags.
+// The proxy raises an error if the two are inconsistent.
+#[derive(Debug, strum_macros::Display, ArgEnum, PartialEq, Clone)]
+#[strum(serialize_all = "snake_case")]
+pub enum FlowRuntimeProtocol {
+    Capture,
+    Materialize,
+}
+
 // Flow Capture operations defined in
 // https://github.com/estuary/flow/blob/master/go/protocols/capture/capture.proto
 #[derive(Debug, strum_macros::Display, ArgEnum, Clone)]
@@ -29,39 +40,12 @@ pub enum FlowMaterializeOperation {
     Transactions,
 }

-// To be used as a trait bound for interceptors.
-pub trait FlowOperation {}
-impl FlowOperation for FlowCaptureOperation {}
-impl FlowOperation for FlowMaterializeOperation {}
-
 // An interceptor modifies the request/response streams between Flow runtime and the connector.
 // InterceptorStream defines the type of input and output streams handled by interceptors.
-pub type InterceptorStream = Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send + Sync>>;
-
-// The generic param "T" below is bounded by FlowOperation.
-// A converter is a function that contains the specific stream-handling logic of an interceptor.
-type ConverterFn<T> = Box<dyn Fn(&T, InterceptorStream) -> Result<InterceptorStream, Error>>;
-// An intercept is characterized by a pair of converters, corresponding to the handling logic of request and response streams, respectively.
-pub type RequestResponseConverterPair<T> = (ConverterFn<T>, ConverterFn<T>);
-pub trait Interceptor<T: FlowOperation> {
-    fn get_converters() -> RequestResponseConverterPair<T> {
-        (
-            Box::new(|_op, stream| Ok(stream)),
-            Box::new(|_op, stream| Ok(stream)),
-        )
-    }
-}
-
-// Two converter pairs can be composed together to form a new converter pair.
-pub fn compose<T: FlowOperation>(
-    a: RequestResponseConverterPair<T>,
-    b: RequestResponseConverterPair<T>,
-) -> RequestResponseConverterPair<T> {
-    let (req_a, resp_a) = a;
-    let (req_b, resp_b) = b;
-    (
-        Box::new(move |op, stream| (req_b)(op, (req_a)(op, stream)?)),
-        // Response conversions are applied in the reverse order of the request conversions.
-        Box::new(move |op, stream| (resp_a)(op, (resp_b)(op, stream)?)),
-    )
-}
+pub type InterceptorStream = Pin<
+    Box<
+        dyn TryStream<Item = std::io::Result<Bytes>, Ok = Bytes, Error = std::io::Error>
+            + Send
+            + Sync,
+    >,
+>;
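The `InterceptorStream` alias above is the seam every interceptor in this change works against: a pinned, boxed `TryStream` of `Bytes`. A minimal sketch of the decode-rewrite-reemit pattern the interceptor files below all follow — `DiscoverRequest` and the rewrite step are placeholders for illustration, while `decode_message`/`encode_message` are this crate's length-prefixed protobuf helpers:

```rust
use futures::{future, stream, StreamExt, TryStreamExt};
use protocol::capture::DiscoverRequest;
use tokio_util::io::{ReaderStream, StreamReader};

use crate::apis::InterceptorStream;
use crate::libs::protobuf::{decode_message, encode_message};

// Sketch: rewrite the first message on the stream, pass everything after it through.
fn adapt_example(in_stream: InterceptorStream) -> InterceptorStream {
    Box::pin(
        stream::once(async {
            let mut reader = StreamReader::new(in_stream);
            let mut request = decode_message::<DiscoverRequest, _>(&mut reader)
                .await?
                .expect("expected a DiscoverRequest");
            // ...rewrite fields of `request` here, e.g. request.endpoint_spec_json...
            let first = stream::once(future::ready(encode_message(&request)));
            let rest = ReaderStream::new(reader); // the remaining raw bytes
            Ok::<_, std::io::Error>(first.chain(rest))
        })
        .try_flatten(),
    )
}
```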
diff --git a/crates/connector_proxy/src/connector_runner.rs b/crates/connector_proxy/src/connector_runner.rs
index fbe1a9d61c..2b349d6c11 100644
--- a/crates/connector_proxy/src/connector_runner.rs
+++ b/crates/connector_proxy/src/connector_runner.rs
@@ -1,48 +1,144 @@
-use crate::apis::RequestResponseConverterPair;
+use crate::apis::{FlowCaptureOperation, FlowMaterializeOperation, InterceptorStream};
 use crate::errors::Error;
-use crate::libs::command::{check_exit_status, invoke_connector};
+use crate::interceptors::{
+    airbyte_source_interceptor::AirbyteSourceInterceptor,
+    network_tunnel_capture_interceptor::NetworkTunnelCaptureInterceptor,
+    network_tunnel_materialize_interceptor::NetworkTunnelMaterializeInterceptor,
+};
+use crate::libs::command::{
+    check_exit_status, invoke_connector_delayed, invoke_connector_direct, parse_child,
+};
 use tokio::io::copy;
+use tokio::process::{ChildStderr, ChildStdin, ChildStdout};
 use tokio_util::io::{ReaderStream, StreamReader};

-pub async fn run_connector<T: FlowOperation + ToString>(
-    operation: T,
+pub async fn run_flow_capture_connector(
+    op: &FlowCaptureOperation,
     entrypoint: Vec<String>,
-    converter_pair: RequestResponseConverterPair<T>,
 ) -> Result<(), Error> {
+    let (entrypoint, mut args) = parse_entrypoint(&entrypoint)?;
+    args.push(op.to_string());
+
+    let (mut child, child_stdin, child_stdout, child_stderr) =
+        parse_child(invoke_connector_direct(entrypoint, args)?)?;
+
+    let adapted_request_stream =
+        NetworkTunnelCaptureInterceptor::adapt_request_stream(op, request_stream())?;
+
+    let adapted_response_stream =
+        NetworkTunnelCaptureInterceptor::adapt_response_stream(op, response_stream(child_stdout))?;
+
+    streaming_all(
+        child_stdin,
+        child_stderr,
+        adapted_request_stream,
+        adapted_response_stream,
+    )
+    .await?;
+
+    check_exit_status("flow capture connector:", child.wait().await)
+}
+
+pub async fn run_flow_materialize_connector(
+    op: &FlowMaterializeOperation,
+    entrypoint: Vec<String>,
+) -> Result<(), Error> {
+    let (entrypoint, mut args) = parse_entrypoint(&entrypoint)?;
+    args.push(op.to_string());
+
+    let (mut child, child_stdin, child_stdout, child_stderr) =
+        parse_child(invoke_connector_direct(entrypoint, args)?)?;
+
+    let adapted_request_stream =
+        NetworkTunnelMaterializeInterceptor::adapt_request_stream(op, request_stream())?;
+
+    let adapted_response_stream = NetworkTunnelMaterializeInterceptor::adapt_response_stream(
+        op,
+        response_stream(child_stdout),
+    )?;
+
+    streaming_all(
+        child_stdin,
+        child_stderr,
+        adapted_request_stream,
+        adapted_response_stream,
+    )
+    .await?;
+
+    check_exit_status("flow materialize connector:", child.wait().await)
+}
+
+pub async fn run_airbyte_source_connector(
+    op: &FlowCaptureOperation,
+    entrypoint: Vec<String>,
+) -> Result<(), Error> {
+    let mut airbyte_interceptor = AirbyteSourceInterceptor::new();
+
+    let (entrypoint, args) = parse_entrypoint(&entrypoint)?;
+    let args = airbyte_interceptor.adapt_command_args(op, args)?;
+
+    let (mut child, child_stdin, child_stdout, child_stderr) =
+        parse_child(invoke_connector_delayed(entrypoint, args).await?)?;
+
+    let adapted_request_stream = airbyte_interceptor.adapt_request_stream(
+        op,
+        NetworkTunnelCaptureInterceptor::adapt_request_stream(op, request_stream())?,
+    )?;
+
+    let adapted_response_stream = NetworkTunnelCaptureInterceptor::adapt_response_stream(
+        op,
+        airbyte_interceptor.adapt_response_stream(op, response_stream(child_stdout))?,
+    )?;
+
+    streaming_all(
+        child_stdin,
+        child_stderr,
+        adapted_request_stream,
+        adapted_response_stream,
+    )
+    .await?;
+
+    check_exit_status("airbyte source connector:", child.wait().await)
+}
+
+fn parse_entrypoint(entrypoint: &Vec<String>) -> Result<(String, Vec<String>), Error> {
     if entrypoint.len() == 0 {
         return Err(Error::EmptyEntrypointError);
     }

-    let mut args = Vec::new();
-    args.extend_from_slice(&entrypoint[1..]);
-    args.push(operation.to_string());
-
-    let entrypoint = entrypoint[0].clone();
-
-    // invoke the connector and converts the request/response streams.
-    let mut child = invoke_connector(entrypoint, &args)?;
-
-    let (request_converter, response_converter) = converter_pair;
-    // Perform conversions on requests and responses and starts bi-directional copying.
-    let mut request_source = StreamReader::new((request_converter)(
-        &operation,
-        Box::pin(ReaderStream::new(tokio::io::stdin())),
-    )?);
-    let mut request_destination = child.stdin.take().ok_or(Error::MissingIOPipe)?;
-
-    let response_stream_out = child.stdout.take().ok_or(Error::MissingIOPipe)?;
-    let mut response_source = StreamReader::new((response_converter)(
-        &operation,
-        Box::pin(ReaderStream::new(response_stream_out)),
-    )?);
-    let mut response_destination = tokio::io::stdout();
-
-    let (a, b) = tokio::join!(
-        copy(&mut request_source, &mut request_destination),
-        copy(&mut response_source, &mut response_destination)
-    );
-    a?;
-    b?;
-    check_exit_status(child.wait().await)
+    return Ok((entrypoint[0].clone(), entrypoint[1..].to_vec()));
+}
+
+fn request_stream() -> InterceptorStream {
+    Box::pin(ReaderStream::new(tokio::io::stdin()))
+}
+
+fn response_stream(child_stdout: ChildStdout) -> InterceptorStream {
+    Box::pin(ReaderStream::new(child_stdout))
+}
+
+async fn streaming_all(
+    mut request_stream_writer: ChildStdin,
+    mut error_reader: ChildStderr,
+    request_stream: InterceptorStream,
+    response_stream: InterceptorStream,
+) -> Result<(), Error> {
+    let mut request_stream_reader = StreamReader::new(request_stream);
+    let mut response_stream_reader = StreamReader::new(response_stream);
+    let mut response_stream_writer = tokio::io::stdout();
+    let mut error_writer = tokio::io::stderr();
+
+    let (a, b, c) = tokio::try_join!(
+        copy(&mut request_stream_reader, &mut request_stream_writer),
+        copy(&mut response_stream_reader, &mut response_stream_writer),
+        copy(&mut error_reader, &mut error_writer),
+    )?;
+
+    tracing::info!(
+        req_stream = a,
+        resp_stream = b,
+        stderr = c,
+        "Done streaming"
+    );
+    Ok(())
 }
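`parse_child` is called throughout `connector_runner.rs` but defined in `libs/command.rs`, outside this hunk. A minimal sketch of what its call sites imply — the real definition may differ:

```rust
use tokio::process::{Child, ChildStderr, ChildStdin, ChildStdout};

use crate::errors::Error;

// Sketch: take ownership of the child's stdio pipes so each can be streamed independently.
fn parse_child(mut child: Child) -> Result<(Child, ChildStdin, ChildStdout, ChildStderr), Error> {
    let stdin = child.stdin.take().ok_or(Error::MissingIOPipe)?;
    let stdout = child.stdout.take().ok_or(Error::MissingIOPipe)?;
    let stderr = child.stderr.take().ok_or(Error::MissingIOPipe)?;
    Ok((child, stdin, stdout, stderr))
}
```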
diff --git a/crates/connector_proxy/src/errors.rs b/crates/connector_proxy/src/errors.rs
index 47eda729a9..0316bcf12a 100644
--- a/crates/connector_proxy/src/errors.rs
+++ b/crates/connector_proxy/src/errors.rs
@@ -1,5 +1,8 @@
 #[derive(thiserror::Error, Debug)]
 pub enum Error {
+    #[error("failed to start the bouncer process.")]
+    BouncerProcessStartError,
+
     #[error("channel timeout in receiving messages after 5 seconds.")]
     ChannelTimeoutError,

@@ -18,6 +21,12 @@ pub enum Error {
     #[error("missing process io pipes.")]
     MissingIOPipe,

+    #[error("mismatching runtime protocol")]
+    MismatchingRuntimeProtocol,
+
+    #[error("no ready signal was received. {0}")]
+    NotReady(&'static str),
+
     #[error("invalid endpoint json config.")]
     InvalidEndpointConfig,

@@ -36,13 +45,34 @@ pub enum Error {
     #[error(transparent)]
     MessageEncodeError(#[from] prost::EncodeError),

+    #[error("Missing required image inspect file. Specify it via --image-inspect-json-path on the command line.")]
+    MissingImageInspectFile,
+
+    #[error(transparent)]
+    NetworkTunnelError(#[from] network_tunnel::errors::Error),
+
     #[error(transparent)]
-    NetworkProxyError(#[from] network_proxy::errors::Error),
+    TempfilePersistError(#[from] tempfile::PersistError),

     #[error("Tokio task execution error.")]
     TokioTaskExecutionError(#[from] tokio::task::JoinError),
+
+    #[error("The operation of '{0}' is not expected for the given protocol.")]
+    UnexpectedOperation(String),
+}
+
+pub fn raise_err<T>(message: &str) -> Result<T, std::io::Error> {
+    Err(create_custom_error(message))
+}
+
+pub fn create_custom_error(message: &str) -> std::io::Error {
+    std::io::Error::new(std::io::ErrorKind::Other, message)
 }

+// TODO: refactor to remove or_bail usages
 pub trait Must<T> {
     fn or_bail(self) -> T;
 }
diff --git a/crates/connector_proxy/src/interceptors/airbyte_capture_interceptor.rs b/crates/connector_proxy/src/interceptors/airbyte_capture_interceptor.rs
deleted file mode 100644
index 296d7953e2..0000000000
--- a/crates/connector_proxy/src/interceptors/airbyte_capture_interceptor.rs
+++ /dev/null
@@ -1,32 +0,0 @@
-use crate::apis::{
-    FlowCaptureOperation, Interceptor, InterceptorStream, RequestResponseConverterPair,
-};
-use crate::errors::Error;
-
-// A placeholder for real logic of airbyte connectors. Details might change during real-implementations.
-pub struct AirbyteCaptureInterceptor {}
-
-impl AirbyteCaptureInterceptor {
-    fn convert_request(
-        _operation: &FlowCaptureOperation,
-        _in_stream: InterceptorStream,
-    ) -> Result<InterceptorStream, Error> {
-        panic!("TBD AirbyteCaptureInterceptor")
-    }
-
-    fn convert_response(
-        _operation: &FlowCaptureOperation,
-        _in_stream: InterceptorStream,
-    ) -> Result<InterceptorStream, Error> {
-        panic!("TBD AirbyteCaptureInterceptor")
-    }
-}
-
-impl Interceptor<FlowCaptureOperation> for AirbyteCaptureInterceptor {
-    fn get_converters() -> RequestResponseConverterPair<FlowCaptureOperation> {
-        (
-            Box::new(Self::convert_request),
-            Box::new(Self::convert_response),
-        )
-    }
-}
diff --git a/crates/connector_proxy/src/interceptors/airbyte_source_interceptor.rs b/crates/connector_proxy/src/interceptors/airbyte_source_interceptor.rs
new file mode 100644
index 0000000000..283380fc6d
--- /dev/null
+++ b/crates/connector_proxy/src/interceptors/airbyte_source_interceptor.rs
@@ -0,0 +1,431 @@
+use crate::apis::{FlowCaptureOperation, InterceptorStream};
+
+use crate::errors::{create_custom_error, raise_err, Error};
+use crate::libs::airbyte_catalog::{
+    self, ConfiguredCatalog, ConfiguredStream, DestinationSyncMode, Range, ResourceSpec, Status,
+    SyncMode,
+};
+use crate::libs::command::READY;
+use crate::libs::json::{create_root_schema, tokenize_jsonpointer};
+use crate::libs::protobuf::encode_message;
+use crate::libs::stream::{get_airbyte_response, get_decoded_message, stream_airbyte_responses};
+
+use bytes::Bytes;
+use protocol::capture::{
+    discover_response, validate_response, DiscoverRequest, DiscoverResponse, Documents,
+    PullRequest, PullResponse, SpecRequest, SpecResponse, ValidateRequest, ValidateResponse,
+};
+use protocol::flow::{DriverCheckpoint, Slice};
+use std::collections::HashMap;
+use std::sync::Arc;
+use tokio::sync::Mutex;
+use validator::Validate;
+
+use futures::{stream, StreamExt, TryStreamExt};
+use json_pointer::JsonPointer;
+use serde_json::value::RawValue;
+use std::fs::File;
+use std::io::Write;
+use tempfile::{Builder, TempDir};
+
+const CONFIG_FILE_NAME: &str = "config.json";
+const CATALOG_FILE_NAME: &str = "catalog.json";
+const STATE_FILE_NAME: &str = "state.json";
+
+pub struct AirbyteSourceInterceptor {
+    validate_request: Arc<Mutex<Option<ValidateRequest>>>,
+    stream_to_binding: Arc<Mutex<HashMap<String, usize>>>,
+    tmp_dir: TempDir,
+}
+
+impl AirbyteSourceInterceptor {
+    pub fn new() -> Self {
+        AirbyteSourceInterceptor {
+            validate_request: Arc::new(Mutex::new(None)),
+            stream_to_binding: Arc::new(Mutex::new(HashMap::new())),
+            tmp_dir: Builder::new()
+                .prefix("airbyte-source-")
+                .tempdir_in("/var/tmp")
+                .expect("failed to create temp dir."),
+        }
+    }
+
+    fn adapt_spec_request_stream(&mut self, in_stream: InterceptorStream) -> InterceptorStream {
+        Box::pin(stream::once(async {
+            get_decoded_message::<SpecRequest>(in_stream).await?;
+            Ok(Bytes::from(READY))
+        }))
+    }
+
+    fn adapt_spec_response_stream(&mut self, in_stream: InterceptorStream) -> InterceptorStream {
+        Box::pin(stream::once(async {
+            let message = get_airbyte_response(in_stream, |m| m.spec.is_some()).await?;
+            let spec = message.spec.unwrap();
+
+            let mut resp = SpecResponse::default();
+            resp.endpoint_spec_schema_json = spec.connection_specification.to_string();
+            resp.resource_spec_schema_json =
+                serde_json::to_string_pretty(&create_root_schema::<ResourceSpec>())?;
+            if let Some(url) = spec.documentation_url {
+                resp.documentation_url = url;
+            }
+            encode_message(&resp)
+        }))
+    }
+
+    fn adapt_discover_request(
+        &mut self,
+        config_file_path: String,
+        in_stream: InterceptorStream,
+    ) -> InterceptorStream {
+        Box::pin(stream::once(async {
+            let request = get_decoded_message::<DiscoverRequest>(in_stream).await?;
+
+            File::create(config_file_path)?.write_all(request.endpoint_spec_json.as_bytes())?;
+
+            Ok(Bytes::from(READY))
+        }))
+    }
+
+    fn adapt_discover_response_stream(
+        &mut self,
+        in_stream: InterceptorStream,
+    ) -> InterceptorStream {
+        Box::pin(stream::once(async {
+            let message = get_airbyte_response(in_stream, |m| m.catalog.is_some()).await?;
+            let catalog = message.catalog.unwrap();
+
+            let mut resp = DiscoverResponse::default();
+            for stream in catalog.streams {
+                let has_incremental = stream
+                    .supported_sync_modes
+                    .map(|modes| modes.contains(&SyncMode::Incremental))
+                    .unwrap_or(false);
+                let mode = if has_incremental {
+                    SyncMode::Incremental
+                } else {
+                    SyncMode::FullRefresh
+                };
+                let resource_spec = ResourceSpec {
+                    stream: stream.name.clone(),
+                    namespace: stream.namespace,
+                    sync_mode: mode,
+                };
+
+                let key_ptrs = match stream.source_defined_primary_key {
+                    None => Vec::new(),
+                    // TODO: use doc::Pointer, and if necessary implement creation of new json pointers
+                    // in that module. What about the existing tokenize_jsonpointer function?
+                    Some(keys) => keys
+                        .iter()
+                        .map(|k| JsonPointer::new(k).to_string())
+                        .collect(),
+                };
+                resp.bindings.push(discover_response::Binding {
+                    recommended_name: stream.name.clone(),
+                    resource_spec_json: serde_json::to_string(&resource_spec)?,
+                    key_ptrs: key_ptrs,
+                    document_schema_json: stream.json_schema.to_string(),
+                })
+            }
+
+            encode_message(&resp)
+        }))
+    }
+
+    fn adapt_validate_request_stream(
+        &mut self,
+        config_file_path: String,
+        validate_request: Arc<Mutex<Option<ValidateRequest>>>,
+        in_stream: InterceptorStream,
+    ) -> InterceptorStream {
+        Box::pin(stream::once(async move {
+            let request = get_decoded_message::<ValidateRequest>(in_stream).await?;
+            *validate_request.lock().await = Some(request.clone());
+
+            File::create(config_file_path)?.write_all(request.endpoint_spec_json.as_bytes())?;
+
+            Ok(Bytes::from(READY))
+        }))
+    }
+
+    fn adapt_validate_response_stream(
+        &mut self,
+        validate_request: Arc<Mutex<Option<ValidateRequest>>>,
+        in_stream: InterceptorStream,
+    ) -> InterceptorStream {
+        Box::pin(stream::once(async move {
+            let message =
+                get_airbyte_response(in_stream, |m| m.connection_status.is_some()).await?;
+
+            let connection_status = message.connection_status.unwrap();
+
+            if connection_status.status != Status::Succeeded {
+                return raise_err(&format!("validation failed {:?}", connection_status));
+            }
+
+            let req = validate_request.lock().await;
+            let req = req
+                .as_ref()
+                .ok_or(create_custom_error("missing validate request."))?;
+            let mut resp = ValidateResponse::default();
+            for binding in &req.bindings {
+                let resource: ResourceSpec = serde_json::from_str(&binding.resource_spec_json)?;
+                resp.bindings.push(validate_response::Binding {
+                    resource_path: vec![resource.stream],
+                });
+            }
+
+            encode_message(&resp)
+        }))
+    }
+
+    fn adapt_pull_request_stream(
+        &mut self,
+        config_file_path: String,
+        catalog_file_path: String,
+        state_file_path: String,
+        stream_to_binding: Arc<Mutex<HashMap<String, usize>>>,
+        in_stream: InterceptorStream,
+    ) -> InterceptorStream {
+        Box::pin(
+            stream::once(async move {
+                let mut request = get_decoded_message::<PullRequest>(in_stream).await?;
+                if let Some(ref mut o) = request.open {
+                    File::create(state_file_path)?.write_all(&o.driver_checkpoint_json)?;
+
+                    if let Some(ref mut c) = o.capture {
+                        File::create(config_file_path)?
+                            .write_all(&c.endpoint_spec_json.as_bytes())?;
+
+                        let mut catalog = ConfiguredCatalog {
+                            streams: Vec::new(),
+                            tail: o.tail,
+                            range: Range {
+                                begin: o.key_begin,
+                                end: o.key_end,
+                            },
+                        };
+
+                        let mut stream_to_binding = stream_to_binding.lock().await;
+
+                        for (i, binding) in c.bindings.iter().enumerate() {
+                            let resource: ResourceSpec =
+                                serde_json::from_str(&binding.resource_spec_json)?;
+                            stream_to_binding.insert(resource.stream.clone(), i);
+
+                            let mut projections = HashMap::new();
+                            if let Some(ref collection) = binding.collection {
+                                for p in &collection.projections {
+                                    projections.insert(p.field.clone(), p.ptr.clone());
+                                }
+
+                                let primary_key: Vec<Vec<String>> = collection
+                                    .key_ptrs
+                                    .iter()
+                                    .map(|ptr| tokenize_jsonpointer(ptr))
+                                    .collect();
+                                catalog.streams.push(ConfiguredStream {
+                                    sync_mode: resource.sync_mode.clone(),
+                                    destination_sync_mode: DestinationSyncMode::Append,
+                                    cursor_field: None,
+                                    primary_key: Some(primary_key),
+                                    stream: airbyte_catalog::Stream {
+                                        name: resource.stream,
+                                        namespace: resource.namespace,
+                                        json_schema: RawValue::from_string(
+                                            collection.schema_json.clone(),
+                                        )?,
+                                        supported_sync_modes: Some(vec![resource
+                                            .sync_mode
+                                            .clone()]),
+                                        default_cursor_field: None,
+                                        source_defined_cursor: None,
+                                        source_defined_primary_key: None,
+                                    },
+                                    projections: projections,
+                                });
+                            }
+                        }
+
+                        if let Err(e) = catalog.validate() {
+                            raise_err(&format!("invalid config_catalog: {:?}", e))?
+                        }
+
+                        serde_json::to_writer(File::create(catalog_file_path)?, &catalog)?
+                    }
+
+                    // release the lock.
+                    drop(stream_to_binding);
+
+                    Ok(Some(Bytes::from(READY)))
+                } else {
+                    Ok(None)
+                }
+            })
+            .try_filter_map(|item| futures::future::ready(Ok(item))),
+        )
+    }
+
+    fn adapt_pull_response_stream(
+        &mut self,
+        stream_to_binding: Arc<Mutex<HashMap<String, usize>>>,
+        in_stream: InterceptorStream,
+    ) -> InterceptorStream {
+        let airbyte_message_stream = Box::pin(stream_airbyte_responses(in_stream));
+
+        Box::pin(stream::try_unfold(
+            (false, stream_to_binding, airbyte_message_stream),
+            |(transaction_pending, stb, mut stream)| async move {
+                let message = match stream.next().await {
+                    Some(m) => m?,
+                    None => {
+                        // transaction_pending is true if the connector writes output messages and exits _without_ writing
+                        // a final state checkpoint.
+                        if transaction_pending {
+                            // We generate a synthetic commit now, and the empty checkpoint means the assumed behavior
+                            // of the next invocation will be "full refresh".
+                            let mut resp = PullResponse::default();
+                            resp.checkpoint = Some(DriverCheckpoint {
+                                driver_checkpoint_json: Vec::new(),
+                                rfc7396_merge_patch: false,
+                            });
+                            return Ok(Some((encode_message(&resp)?, (false, stb, stream))));
+                        } else {
+                            return Ok(None);
+                        }
+                    }
+                };
+
+                let mut resp = PullResponse::default();
+                if let Some(state) = message.state {
+                    resp.checkpoint = Some(DriverCheckpoint {
+                        driver_checkpoint_json: state.data.get().as_bytes().to_vec(),
+                        rfc7396_merge_patch: match state.merge {
+                            Some(m) => m,
+                            None => false,
+                        },
+                    });
+
+                    Ok(Some((encode_message(&resp)?, (false, stb, stream))))
+                } else if let Some(record) = message.record {
+                    let stream_to_binding = stb.lock().await;
+                    let binding =
+                        stream_to_binding
+                            .get(&record.stream)
+                            .ok_or(create_custom_error(&format!(
+                                "connector record with unknown stream {}",
+                                record.stream
+                            )))?;
+                    let arena = record.data.get().as_bytes().to_vec();
+                    let arena_len: u32 = arena.len() as u32;
+                    resp.documents = Some(Documents {
+                        binding: *binding as u32,
+                        arena: arena,
+                        docs_json: vec![Slice {
+                            begin: 0,
+                            end: arena_len,
+                        }],
+                    });
+                    drop(stream_to_binding);
+                    Ok(Some((encode_message(&resp)?, (true, stb, stream))))
+                } else {
+                    raise_err("unexpected pull response.")
+                }
+            },
+        ))
+    }
+
+    fn input_file_path(&mut self, file_name: &str) -> String {
+        self.tmp_dir
+            .path()
+            .join(file_name)
+            .to_str()
+            .expect("failed to construct config file name.")
+            .into()
+    }
+}
+
+impl AirbyteSourceInterceptor {
+    pub fn adapt_command_args(
+        &mut self,
+        op: &FlowCaptureOperation,
+        args: Vec<String>,
+    ) -> Result<Vec<String>, Error> {
+        let config_file_path = self.input_file_path(CONFIG_FILE_NAME);
+        let catalog_file_path = self.input_file_path(CATALOG_FILE_NAME);
+        let state_file_path = self.input_file_path(STATE_FILE_NAME);
+
+        let airbyte_args = match op {
+            FlowCaptureOperation::Spec => vec!["spec"],
+            FlowCaptureOperation::Discover => vec!["discover", "--config", &config_file_path],
+            FlowCaptureOperation::Validate => vec!["check", "--config", &config_file_path],
+            FlowCaptureOperation::Pull => {
+                vec![
+                    "read",
+                    "--config",
+                    &config_file_path,
+                    "--catalog",
+                    &catalog_file_path,
+                    "--state",
+                    &state_file_path,
+                ]
+            }
+
+            _ => return Err(Error::UnexpectedOperation(op.to_string())),
+        };
+
+        let airbyte_args: Vec<String> = airbyte_args.into_iter().map(Into::into).collect();
+        Ok([args, airbyte_args].concat())
+    }
+
+    pub fn adapt_request_stream(
+        &mut self,
+        op: &FlowCaptureOperation,
+        in_stream: InterceptorStream,
+    ) -> Result<InterceptorStream, Error> {
+        let config_file_path = self.input_file_path(CONFIG_FILE_NAME);
+        let catalog_file_path = self.input_file_path(CATALOG_FILE_NAME);
+        let state_file_path = self.input_file_path(STATE_FILE_NAME);
+
+        match op {
+            FlowCaptureOperation::Spec => Ok(self.adapt_spec_request_stream(in_stream)),
+            FlowCaptureOperation::Discover => {
+                Ok(self.adapt_discover_request(config_file_path, in_stream))
+            }
+            FlowCaptureOperation::Validate => Ok(self.adapt_validate_request_stream(
+                config_file_path,
+                Arc::clone(&self.validate_request),
+                in_stream,
+            )),
+            FlowCaptureOperation::Pull => Ok(self.adapt_pull_request_stream(
+                config_file_path,
+                catalog_file_path,
+                state_file_path,
+                Arc::clone(&self.stream_to_binding),
+                in_stream,
+            )),
+
+            _ => Err(Error::UnexpectedOperation(op.to_string())),
+        }
+    }
+
+    pub fn adapt_response_stream(
+        &mut self,
+        op: &FlowCaptureOperation,
+        in_stream: InterceptorStream,
+    ) -> Result<InterceptorStream, Error> {
+        match op {
+            FlowCaptureOperation::Spec => Ok(self.adapt_spec_response_stream(in_stream)),
+            FlowCaptureOperation::Discover => Ok(self.adapt_discover_response_stream(in_stream)),
+            FlowCaptureOperation::Validate => {
+                Ok(self
+                    .adapt_validate_response_stream(Arc::clone(&self.validate_request), in_stream))
+            }
+            FlowCaptureOperation::Pull => {
+                Ok(self.adapt_pull_response_stream(Arc::clone(&self.stream_to_binding), in_stream))
+            }
+            _ => Err(Error::UnexpectedOperation(op.to_string())),
+        }
+    }
+}
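`adapt_pull_request_stream` above tokenizes collection key pointers with `tokenize_jsonpointer` from `libs/json.rs`, which this diff doesn't include. A plausible RFC 6901 tokenizer, shown for orientation only — the real helper may differ:

```rust
// Sketch of an RFC 6901 tokenizer: "/a/b~1c" -> ["a", "b/c"], "" -> [].
pub fn tokenize_jsonpointer(ptr: &str) -> Vec<String> {
    ptr.split('/')
        .skip(1) // a non-empty pointer starts with '/', so the first segment is empty
        .map(|token| token.replace("~1", "/").replace("~0", "~"))
        .collect()
}
```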
diff --git a/crates/connector_proxy/src/interceptors/default_interceptors.rs b/crates/connector_proxy/src/interceptors/default_interceptors.rs
deleted file mode 100644
index 979c4dc64c..0000000000
--- a/crates/connector_proxy/src/interceptors/default_interceptors.rs
+++ /dev/null
@@ -1,6 +0,0 @@
-use crate::apis::{FlowCaptureOperation, FlowMaterializeOperation, Interceptor};
-pub struct DefaultFlowCaptureInterceptor {}
-impl Interceptor<FlowCaptureOperation> for DefaultFlowCaptureInterceptor {}
-
-pub struct DefaultFlowMaterializeInterceptor {}
-impl Interceptor<FlowMaterializeOperation> for DefaultFlowMaterializeInterceptor {}
diff --git a/crates/connector_proxy/src/interceptors/mod.rs b/crates/connector_proxy/src/interceptors/mod.rs
index 6323473bad..e8edba8231 100644
--- a/crates/connector_proxy/src/interceptors/mod.rs
+++ b/crates/connector_proxy/src/interceptors/mod.rs
@@ -1,4 +1,3 @@
-pub mod airbyte_capture_interceptor;
-pub mod default_interceptors;
-pub mod network_proxy_capture_interceptor;
-pub mod network_proxy_materialize_interceptor;
+pub mod airbyte_source_interceptor;
+pub mod network_tunnel_capture_interceptor;
+pub mod network_tunnel_materialize_interceptor;
diff --git a/crates/connector_proxy/src/interceptors/network_proxy_capture_interceptor.rs b/crates/connector_proxy/src/interceptors/network_proxy_capture_interceptor.rs
deleted file mode 100644
index 8792435bd8..0000000000
--- a/crates/connector_proxy/src/interceptors/network_proxy_capture_interceptor.rs
+++ /dev/null
@@ -1,118 +0,0 @@
-use crate::apis::{
-    FlowCaptureOperation, Interceptor, InterceptorStream, RequestResponseConverterPair,
-};
-use crate::errors::{Error, Must};
-use crate::libs::network_proxy::NetworkProxy;
-use crate::libs::protobuf::{decode_message, encode_message};
-use crate::libs::stream::stream_all_bytes;
-use protocol::capture::{
-    ApplyRequest, DiscoverRequest, PullRequest, SpecResponse, ValidateRequest,
-};
-
-use async_stream::stream;
-use futures_util::pin_mut;
-use futures_util::StreamExt;
-use serde_json::value::RawValue;
-use tokio_util::io::StreamReader;
-
-pub struct NetworkProxyCaptureInterceptor {}
-
-impl NetworkProxyCaptureInterceptor {
-    fn convert_discover_request(in_stream: InterceptorStream) -> InterceptorStream {
-        Box::pin(stream! {
-            let mut reader = StreamReader::new(in_stream);
-            let mut request = decode_message::<DiscoverRequest, _>(&mut reader).await.or_bail().expect("expected request is not received.");
-            request.endpoint_spec_json = NetworkProxy::consume_network_proxy_config(
-                RawValue::from_string(request.endpoint_spec_json)?,
-            ).await.or_bail().to_string();
-            yield encode_message(&request);
-        })
-    }
-
-    fn convert_validate_request(in_stream: InterceptorStream) -> InterceptorStream {
-        Box::pin(stream! {
-            let mut reader = StreamReader::new(in_stream);
-            let mut request = decode_message::<ValidateRequest, _>(&mut reader).await.or_bail().expect("expected request is not received.");
-            request.endpoint_spec_json = NetworkProxy::consume_network_proxy_config(
-                RawValue::from_string(request.endpoint_spec_json)?,
-            ).await.or_bail().to_string();
-            yield encode_message(&request);
-        })
-    }
-
-    fn convert_apply_request(in_stream: InterceptorStream) -> InterceptorStream {
-        Box::pin(stream! {
-            let mut reader = StreamReader::new(in_stream);
-            let mut request = decode_message::<ApplyRequest, _>(&mut reader).await.or_bail().expect("expected request is not received.");
-            if let Some(ref mut c) = request.capture {
-                c.endpoint_spec_json =
-                    NetworkProxy::consume_network_proxy_config(
-                        RawValue::from_string(c.endpoint_spec_json.clone())?,
-                    ).await.or_bail().to_string();
-            }
-            yield encode_message(&request);
-        })
-    }
-
-    fn convert_pull_request(in_stream: InterceptorStream) -> InterceptorStream {
-        Box::pin(stream! {
-            let mut reader = StreamReader::new(in_stream);
-            let mut request = decode_message::<PullRequest, _>(&mut reader).await.or_bail().expect("expected request is not received.");
-            if let Some(ref mut o) = request.open {
-                if let Some(ref mut c) = o.capture {
-                    c.endpoint_spec_json = NetworkProxy::consume_network_proxy_config(
-                        RawValue::from_string(c.endpoint_spec_json.clone())?,
-                    ).await.or_bail().to_string();
-                }
-            }
-            yield encode_message(&request);
-            // deliver the rest messages in the stream.
-            let s = stream_all_bytes(reader);
-            pin_mut!(s);
-            while let Some(value) = s.next().await {
-                yield value;
-            }
-        })
-    }
-
-    fn convert_request(
-        operation: &FlowCaptureOperation,
-        in_stream: InterceptorStream,
-    ) -> Result<InterceptorStream, Error> {
-        Ok(match operation {
-            FlowCaptureOperation::Discover => Self::convert_discover_request(in_stream),
-            FlowCaptureOperation::Validate => Self::convert_validate_request(in_stream),
-            FlowCaptureOperation::ApplyUpsert | FlowCaptureOperation::ApplyDelete => {
-                Self::convert_apply_request(in_stream)
-            }
-            FlowCaptureOperation::Pull => Self::convert_pull_request(in_stream),
-            _ => in_stream,
-        })
-    }
-
-    fn convert_response(
-        operation: &FlowCaptureOperation,
-        in_stream: InterceptorStream,
-    ) -> Result<InterceptorStream, Error> {
-        Ok(match operation {
-            FlowCaptureOperation::Spec => Box::pin(stream! {
-                let mut reader = StreamReader::new(in_stream);
-                let mut response = decode_message::<SpecResponse, _>(&mut reader).await.or_bail().expect("No expected response received.");
-                response.endpoint_spec_schema_json = NetworkProxy::extend_endpoint_schema(
-                    RawValue::from_string(response.endpoint_spec_schema_json)?,
-                ).or_bail().to_string();
-                yield encode_message(&response);
-            }),
-            _ => in_stream,
-        })
-    }
-}
-
-impl Interceptor<FlowCaptureOperation> for NetworkProxyCaptureInterceptor {
-    fn get_converters() -> RequestResponseConverterPair<FlowCaptureOperation> {
-        (
-            Box::new(Self::convert_request),
-            Box::new(Self::convert_response),
-        )
-    }
-}
diff --git a/crates/connector_proxy/src/interceptors/network_proxy_materialize_interceptor.rs b/crates/connector_proxy/src/interceptors/network_proxy_materialize_interceptor.rs
deleted file mode 100644
index c916dc4409..0000000000
--- a/crates/connector_proxy/src/interceptors/network_proxy_materialize_interceptor.rs
+++ /dev/null
@@ -1,107 +0,0 @@
-use crate::apis::{
-    FlowMaterializeOperation, Interceptor, InterceptorStream, RequestResponseConverterPair,
-};
-use crate::errors::{Error, Must};
-use crate::libs::network_proxy::NetworkProxy;
-use crate::libs::protobuf::{decode_message, encode_message};
-use crate::libs::stream::stream_all_bytes;
-
-use protocol::materialize::{ApplyRequest, SpecResponse, TransactionRequest, ValidateRequest};
-
-use async_stream::stream;
-use futures_util::pin_mut;
-use futures_util::StreamExt;
-use serde_json::value::RawValue;
-use tokio_util::io::StreamReader;
-
-pub struct NetworkProxyMaterializeInterceptor {}
-
-impl NetworkProxyMaterializeInterceptor {
-    fn convert_spec_request(in_stream: InterceptorStream) -> InterceptorStream {
-        Box::pin(stream! {
-            let mut reader = StreamReader::new(in_stream);
-            let mut request = decode_message::<ValidateRequest, _>(&mut reader).await.or_bail().expect("expected request is not received.");
-            request.endpoint_spec_json =
-                NetworkProxy::consume_network_proxy_config(RawValue::from_string(request.endpoint_spec_json)?)
-                    .await
-                    .expect("failed to start network proxy")
-                    .to_string();
-            yield encode_message(&request);
-        })
-    }
-
-    fn convert_apply_request(in_stream: InterceptorStream) -> InterceptorStream {
-        Box::pin(stream! {
-            let mut reader = StreamReader::new(in_stream);
-            let mut request = decode_message::<ApplyRequest, _>(&mut reader).await.or_bail().expect("expected request is not received.");
-            if let Some(ref mut m) = request.materialization {
-                m.endpoint_spec_json =
-                    NetworkProxy::consume_network_proxy_config(
-                        RawValue::from_string(m.endpoint_spec_json.clone())?,
-                    ).await.or_bail().to_string();
-            }
-            yield encode_message(&request);
-        })
-    }
-
-    fn convert_transactions_request(in_stream: InterceptorStream) -> InterceptorStream {
-        Box::pin(stream! {
-            let mut reader = StreamReader::new(in_stream);
-            let mut request = decode_message::<TransactionRequest, _>(&mut reader).await.or_bail().expect("expected request is not received.");
-            if let Some(ref mut o) = request.open {
-                if let Some(ref mut m) = o.materialization {
-                    m.endpoint_spec_json = NetworkProxy::consume_network_proxy_config(
-                        RawValue::from_string(m.endpoint_spec_json.clone())?,
-                    ).await.or_bail().to_string();
-                }
-            }
-            yield encode_message(&request);
-            // deliver the remaining messages in the stream.
-            let s = stream_all_bytes(reader);
-            pin_mut!(s);
-            while let Some(bytes) = s.next().await {
-                yield bytes;
-            }
-        })
-    }
-
-    fn convert_request(
-        operation: &FlowMaterializeOperation,
-        in_stream: InterceptorStream,
-    ) -> Result<InterceptorStream, Error> {
-        Ok(match operation {
-            FlowMaterializeOperation::Validate => Self::convert_spec_request(in_stream),
-            FlowMaterializeOperation::ApplyUpsert | FlowMaterializeOperation::ApplyDelete => {
-                Self::convert_apply_request(in_stream)
-            }
-            FlowMaterializeOperation::Transactions => Self::convert_transactions_request(in_stream),
-            _ => in_stream,
-        })
-    }
-
-    fn convert_response(
-        operation: &FlowMaterializeOperation,
-        in_stream: InterceptorStream,
-    ) -> Result<InterceptorStream, Error> {
-        Ok(match operation {
-            FlowMaterializeOperation::Spec => Box::pin(stream! {
-                let mut reader = StreamReader::new(in_stream);
-                let mut response = decode_message::<SpecResponse, _>(&mut reader).await.or_bail().expect("expected response is not received.");
-                response.endpoint_spec_schema_json = NetworkProxy::extend_endpoint_schema(
-                    RawValue::from_string(response.endpoint_spec_schema_json)?,
-                ).or_bail().to_string();
-                yield encode_message(&response);
-            }),
-            _ => in_stream,
-        })
-    }
-}
-
-impl Interceptor<FlowMaterializeOperation> for NetworkProxyMaterializeInterceptor {
-    fn get_converters() -> RequestResponseConverterPair<FlowMaterializeOperation> {
-        (
-            Box::new(Self::convert_request),
-            Box::new(Self::convert_response),
-        )
-    }
-}
diff --git a/crates/connector_proxy/src/interceptors/network_tunnel_capture_interceptor.rs b/crates/connector_proxy/src/interceptors/network_tunnel_capture_interceptor.rs
new file mode 100644
index 0000000000..2d05e61b72
--- /dev/null
+++ b/crates/connector_proxy/src/interceptors/network_tunnel_capture_interceptor.rs
@@ -0,0 +1,127 @@
+use crate::apis::{FlowCaptureOperation, InterceptorStream};
+use crate::errors::{Error, Must};
+use crate::libs::network_tunnel::NetworkTunnel;
+use crate::libs::protobuf::{decode_message, encode_message};
+use crate::libs::stream::get_decoded_message;
+use futures::{future, stream, StreamExt, TryStreamExt};
+use protocol::capture::{
+    ApplyRequest, DiscoverRequest, PullRequest, SpecResponse, ValidateRequest,
+};
+
+use serde_json::value::RawValue;
+use tokio_util::io::{ReaderStream, StreamReader};
+
+pub struct NetworkTunnelCaptureInterceptor {}
+
+impl NetworkTunnelCaptureInterceptor {
+    fn adapt_discover_request_stream(in_stream: InterceptorStream) -> InterceptorStream {
+        Box::pin(stream::once(async {
+            let mut request = get_decoded_message::<DiscoverRequest>(in_stream).await?;
+
+            request.endpoint_spec_json = NetworkTunnel::consume_network_tunnel_config(
+                RawValue::from_string(request.endpoint_spec_json)?,
+            )
+            .await
+            .or_bail()
+            .to_string();
+
+            encode_message(&request)
+        }))
+    }
+
+    fn adapt_validate_request_stream(in_stream: InterceptorStream) -> InterceptorStream {
+        Box::pin(stream::once(async {
+            let mut request = get_decoded_message::<ValidateRequest>(in_stream).await?;
+
+            request.endpoint_spec_json = NetworkTunnel::consume_network_tunnel_config(
+                RawValue::from_string(request.endpoint_spec_json)?,
+            )
+            .await
+            .or_bail()
+            .to_string();
+
+            encode_message(&request)
+        }))
+    }
+
+    fn adapt_apply_request(in_stream: InterceptorStream) -> InterceptorStream {
+        Box::pin(stream::once(async {
+            let mut request = get_decoded_message::<ApplyRequest>(in_stream).await?;
+
+            if let Some(ref mut c) = request.capture {
+                c.endpoint_spec_json = NetworkTunnel::consume_network_tunnel_config(
+                    RawValue::from_string(c.endpoint_spec_json.clone())?,
+                )
+                .await
+                .or_bail()
+                .to_string();
+            }
+
+            encode_message(&request)
+        }))
+    }
+
+    fn adapt_pull_request_stream(in_stream: InterceptorStream) -> InterceptorStream {
+        Box::pin(
+            stream::once(async {
+                let mut reader = StreamReader::new(in_stream);
+                let mut request = decode_message::<PullRequest, _>(&mut reader)
+                    .await
+                    .or_bail()
+                    .expect("expected request is not received.");
+                if let Some(ref mut o) = request.open {
+                    if let Some(ref mut c) = o.capture {
+                        c.endpoint_spec_json = NetworkTunnel::consume_network_tunnel_config(
+                            RawValue::from_string(c.endpoint_spec_json.clone())?,
+                        )
+                        .await
+                        .or_bail()
+                        .to_string();
+                    }
+                }
+
+                let first = stream::once(future::ready(encode_message(&request)));
+                let rest = ReaderStream::new(reader);
+
+                // We need to set explicit error type, see https://github.com/rust-lang/rust/issues/63502
+                Ok::<_, std::io::Error>(first.chain(rest))
+            })
+            .try_flatten(),
+        )
+    }
+}
+
+impl NetworkTunnelCaptureInterceptor {
+    pub fn adapt_request_stream(
+        op: &FlowCaptureOperation,
+        in_stream: InterceptorStream,
+    ) -> Result<InterceptorStream, Error> {
+        Ok(match op {
+            FlowCaptureOperation::Discover => Self::adapt_discover_request_stream(in_stream),
+            FlowCaptureOperation::Validate => Self::adapt_validate_request_stream(in_stream),
+            FlowCaptureOperation::ApplyUpsert | FlowCaptureOperation::ApplyDelete => {
+                Self::adapt_apply_request(in_stream)
+            }
+            FlowCaptureOperation::Pull => Self::adapt_pull_request_stream(in_stream),
+            _ => in_stream,
+        })
+    }
+
+    pub fn adapt_response_stream(
+        op: &FlowCaptureOperation,
+        in_stream: InterceptorStream,
+    ) -> Result<InterceptorStream, Error> {
+        Ok(match op {
+            FlowCaptureOperation::Spec => Box::pin(stream::once(async move {
+                let mut response = get_decoded_message::<SpecResponse>(in_stream).await?;
+                response.endpoint_spec_schema_json = NetworkTunnel::extend_endpoint_schema(
+                    RawValue::from_string(response.endpoint_spec_schema_json)?,
+                )
+                .or_bail()
+                .to_string();
+                encode_message(&response)
+            })),
+            _ => in_stream,
+        })
+    }
+}
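The `Ok::<_, std::io::Error>` turbofish above (and in the materialize interceptor below) pins down the error type that `try_flatten` cannot infer on its own, per the linked rust issue. The same head-then-tail shape in isolation, as a self-contained illustration:

```rust
use futures::{future, stream, StreamExt, TryStream, TryStreamExt};

// One eagerly-computed header item followed by a passthrough tail, flattened
// into a single fallible stream. Without the explicit error annotation on
// `Ok`, inference fails (rust-lang/rust#63502).
fn header_then_rest() -> impl TryStream<Ok = u8, Error = std::io::Error> {
    stream::once(async {
        let first = stream::once(future::ready(Ok(1u8)));
        let rest = stream::iter(vec![Ok(2u8), Ok(3u8)]);
        Ok::<_, std::io::Error>(first.chain(rest))
    })
    .try_flatten()
}
```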
diff --git a/crates/connector_proxy/src/interceptors/network_tunnel_materialize_interceptor.rs b/crates/connector_proxy/src/interceptors/network_tunnel_materialize_interceptor.rs
new file mode 100644
index 0000000000..13ba1f3272
--- /dev/null
+++ b/crates/connector_proxy/src/interceptors/network_tunnel_materialize_interceptor.rs
@@ -0,0 +1,109 @@
+use crate::apis::{FlowMaterializeOperation, InterceptorStream};
+use crate::errors::{Error, Must};
+use crate::libs::network_tunnel::NetworkTunnel;
+use crate::libs::protobuf::{decode_message, encode_message};
+use crate::libs::stream::get_decoded_message;
+
+use futures::{future, stream, StreamExt, TryStreamExt};
+use protocol::materialize::{ApplyRequest, SpecResponse, TransactionRequest, ValidateRequest};
+
+use serde_json::value::RawValue;
+use tokio_util::io::{ReaderStream, StreamReader};
+
+pub struct NetworkTunnelMaterializeInterceptor {}
+
+impl NetworkTunnelMaterializeInterceptor {
+    fn adapt_spec_request(in_stream: InterceptorStream) -> InterceptorStream {
+        Box::pin(stream::once(async {
+            let mut request = get_decoded_message::<ValidateRequest>(in_stream).await?;
+
+            request.endpoint_spec_json = NetworkTunnel::consume_network_tunnel_config(
+                RawValue::from_string(request.endpoint_spec_json)?,
+            )
+            .await
+            .expect("failed to start network tunnel")
+            .to_string();
+            encode_message(&request)
+        }))
+    }
+
+    fn adapt_apply_request(in_stream: InterceptorStream) -> InterceptorStream {
+        Box::pin(stream::once(async {
+            let mut request = get_decoded_message::<ApplyRequest>(in_stream).await?;
+
+            if let Some(ref mut m) = request.materialization {
+                m.endpoint_spec_json = NetworkTunnel::consume_network_tunnel_config(
+                    RawValue::from_string(m.endpoint_spec_json.clone())?,
+                )
+                .await
+                .or_bail()
+                .to_string();
+            }
+
+            encode_message(&request)
+        }))
+    }
+
+    fn adapt_transactions_request(in_stream: InterceptorStream) -> InterceptorStream {
+        Box::pin(
+            stream::once(async {
+                let mut reader = StreamReader::new(in_stream);
+                let mut request = decode_message::<TransactionRequest, _>(&mut reader)
+                    .await
+                    .or_bail()
+                    .expect("expected request is not received.");
+                if let Some(ref mut o) = request.open {
+                    if let Some(ref mut m) = o.materialization {
+                        m.endpoint_spec_json = NetworkTunnel::consume_network_tunnel_config(
+                            RawValue::from_string(m.endpoint_spec_json.clone())?,
+                        )
+                        .await
+                        .or_bail()
+                        .to_string();
+                    }
+                }
+                let first = stream::once(future::ready(encode_message(&request)));
+                let rest = ReaderStream::new(reader);
+
+                // We need to set explicit error type, see https://github.com/rust-lang/rust/issues/63502
+                Ok::<_, std::io::Error>(first.chain(rest))
+            })
+            .try_flatten(),
+        )
+    }
+}
+
+impl NetworkTunnelMaterializeInterceptor {
+    pub fn adapt_request_stream(
+        op: &FlowMaterializeOperation,
+        in_stream: InterceptorStream,
+    ) -> Result<InterceptorStream, Error> {
+        Ok(match op {
+            FlowMaterializeOperation::Validate => Self::adapt_spec_request(in_stream),
+            FlowMaterializeOperation::ApplyUpsert | FlowMaterializeOperation::ApplyDelete => {
+                Self::adapt_apply_request(in_stream)
+            }
+            FlowMaterializeOperation::Transactions => Self::adapt_transactions_request(in_stream),
+            _ => in_stream,
+        })
+    }
+
+    pub fn adapt_response_stream(
+        op: &FlowMaterializeOperation,
+        in_stream: InterceptorStream,
+    ) -> Result<InterceptorStream, Error> {
+        Ok(match op {
+            FlowMaterializeOperation::Spec => Box::pin(stream::once(async {
+                let mut response = get_decoded_message::<SpecResponse>(in_stream).await?;
+
+                response.endpoint_spec_schema_json = NetworkTunnel::extend_endpoint_schema(
+                    RawValue::from_string(response.endpoint_spec_schema_json)?,
+                )
+                .or_bail()
+                .to_string();
+                encode_message(&response)
+            })),
+            _ => in_stream,
+        })
+    }
+}
diff --git a/crates/connector_proxy/src/libs/airbyte_catalog.rs b/crates/connector_proxy/src/libs/airbyte_catalog.rs
new file mode 100644
index 0000000000..657cf7c044
--- /dev/null
+++ b/crates/connector_proxy/src/libs/airbyte_catalog.rs
@@ -0,0 +1,247 @@
+use std::collections::HashMap;
+
+use schemars::JsonSchema;
+use serde::ser::{SerializeStruct, Serializer};
+use serde::{Deserialize, Serialize};
+use serde_json::value::RawValue;
+use validator::{Validate, ValidationError};
+
+#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, JsonSchema)]
+#[serde(rename_all = "snake_case")]
+pub enum SyncMode {
+    Incremental,
+    FullRefresh,
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone, Validate)]
+#[serde(rename_all = "snake_case")]
+pub struct Stream {
+    pub name: String,
+    pub json_schema: Box<RawValue>,
+    // supported_sync_modes is planned to be made required soon
+    // see https://is.gd/RqAhTO
+    #[validate(length(min = 1))]
+    pub supported_sync_modes: Option<Vec<SyncMode>>,
+    pub source_defined_cursor: Option<bool>,
+    pub default_cursor_field: Option<Vec<String>>,
+    pub source_defined_primary_key: Option<Vec<Vec<String>>>,
+    pub namespace: Option<String>,
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+#[serde(rename_all = "snake_case")]
+pub enum DestinationSyncMode {
+    Append,
+    Overwrite,
+    AppendDedup,
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone, Validate)]
+#[serde(rename_all = "snake_case")]
+#[validate(schema(function = "Self::validate_configured_stream"))]
+pub struct ConfiguredStream {
+    #[validate]
+    pub stream: Stream,
+    pub sync_mode: SyncMode,
+    pub destination_sync_mode: DestinationSyncMode,
+    pub cursor_field: Option<Vec<String>>,
+    pub primary_key: Option<Vec<Vec<String>>>,
+
+    #[serde(alias = "estuary.dev/projections")]
+    pub projections: HashMap<String, String>,
+}
+impl ConfiguredStream {
+    fn validate_configured_stream(&self) -> Result<(), ValidationError> {
+        if self
+            .stream
+            .supported_sync_modes
+            .as_ref()
+            .map(|modes| modes.contains(&self.sync_mode))
+            .unwrap_or(false)
+        {
+            Ok(())
+        } else {
+            Err(ValidationError::new(
+                "sync_mode is not in the supported list.",
+            ))
+        }
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone, Validate)]
+pub struct Catalog {
+    #[serde(rename = "streams")]
+    #[validate]
+    pub streams: Vec<Stream>,
+}
+
+#[derive(Debug, Deserialize, Clone, Validate, PartialEq)]
+#[validate(schema(function = "Self::validate_range"))]
+pub struct Range {
+    pub begin: u32,
+    pub end: u32,
+}
+
+impl Range {
+    fn validate_range(&self) -> Result<(), ValidationError> {
+        if self.begin <= self.end {
+            Ok(())
+        } else {
+            Err(ValidationError::new("expected Begin <= End"))
+        }
+    }
+}
+
+impl Serialize for Range {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: Serializer,
+    {
+        let mut state = serializer.serialize_struct("Range", 2)?;
+        state.serialize_field("begin", &format!("{:x}", self.begin))?;
+        state.serialize_field("end", &format!("{:x}", self.end))?;
+        state.end()
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone, Validate)]
+#[serde(rename_all = "snake_case")]
+pub struct ConfiguredCatalog {
+    #[serde(rename = "streams")]
+    #[validate(length(min = 1))]
+    #[validate]
+    pub streams: Vec<ConfiguredStream>,
+
+    #[serde(alias = "estuary.dev/tail")]
+    pub tail: bool,
+
+    #[serde(alias = "estuary.dev/range")]
+    #[validate]
+    pub range: Range,
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
+#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
+pub enum Status {
+    Succeeded,
+    Failed,
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
+#[serde(rename_all = "snake_case")]
+pub struct ConnectionStatus {
+    pub status: Status,
+    pub message: Option<String>,
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+#[serde(rename_all = "snake_case")]
+pub struct Record {
+    pub stream: String,
+    pub data: Box<RawValue>,
+    pub emitted_at: i64,
+    pub namespace: Option<String>,
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
+#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
+pub enum LogLevel {
+    Trace,
+    Debug,
+    Info,
+    Warn,
+    Error,
+    Fatal,
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
+#[serde(rename_all = "camelCase")]
+pub struct Log {
+    pub level: LogLevel,
+    pub message: String,
+}
+impl Log {
+    pub fn log(&self) {
+        match self.level {
+            LogLevel::Trace => tracing::trace!(?self.message),
+            LogLevel::Debug => tracing::debug!(?self.message),
+            LogLevel::Info => tracing::info!(?self.message),
+            LogLevel::Warn => tracing::warn!(?self.message),
+            LogLevel::Error | LogLevel::Fatal => tracing::error!(?self.message),
+        }
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+#[serde(rename_all = "snake_case")]
+pub struct State {
+    // Data is the actual state associated with the ingestion. This must be a JSON _Object_ in order
+    // to comply with the airbyte specification.
+    pub data: Box<RawValue>,
+
+    // Merge indicates that Data is an RFC 7396 JSON Merge Patch, and should
+    // be reduced into the previous state accordingly.
+    #[serde(alias = "estuary.dev/merge")]
+    pub merge: Option<bool>,
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct Spec {
+    pub documentation_url: Option<String>,
+    pub changelog_url: Option<String>,
+    pub connection_specification: Box<RawValue>,
+    pub supports_incremental: Option<bool>,
+
+    // SupportedDestinationSyncModes is ignored by Flow
+    pub supported_destination_sync_modes: Option<Vec<DestinationSyncMode>>,
+    // SupportsNormalization is not currently used or supported by Flow or estuary-developed
+    // connectors
+    pub supports_normalization: Option<bool>,
+    // SupportsDBT is not currently used or supported by Flow or estuary-developed connectors
+    #[serde(rename = "supportsDBT")]
+    pub supports_dbt: Option<bool>,
+
+    // AuthSpecification is not currently used or supported by Flow or estuary-developed
+    // connectors, and it is deprecated in the airbyte spec.
+    pub auth_specification: Option<Box<RawValue>>,
+    // AdvancedAuth is not currently used or supported by Flow or estuary-developed
+    // connectors.
+    pub advanced_auth: Option<Box<RawValue>>,
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
+#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
+pub enum MessageType {
+    Record,
+    State,
+    Log,
+    Spec,
+    ConnectionStatus,
+    Catalog,
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone, Validate)]
+#[serde(rename_all = "camelCase")]
+pub struct Message {
+    #[serde(rename = "type")]
+    pub message_type: MessageType,
+
+    pub log: Option<Log>,
+    pub state: Option<State>,
+    pub record: Option<Record>,
+    pub connection_status: Option<ConnectionStatus>,
+    pub spec: Option<Spec>,
+    #[validate]
+    pub catalog: Option<Catalog>,
+}
+
+#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
+#[serde(rename_all = "camelCase")]
+// ResourceSpec is the configuration for Airbyte source streams.
+pub struct ResourceSpec {
+    pub stream: String,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub namespace: Option<String>,
+    pub sync_mode: SyncMode,
+}
https://github.com/rust-lang/rust/issues/84908
+pub fn check_exit_status(message: &str, result: std::io::Result<ExitStatus>) -> Result<(), Error> {
     match result {
         Ok(status) => {
             if status.success() {
@@ -23,15 +30,110 @@ pub fn check_exit_status(result: std::io::Result<ExitStatus>) -> Result<(), Erro
             } else {
                 match status.code() {
                     Some(code) => Err(Error::CommandExecutionError(format!(
-                        "failed with code {}.",
-                        code
+                        "{} failed with code {}.",
+                        message, code
+                    ))),
+                    None => Err(Error::CommandExecutionError(format!(
+                        "{} process terminated by signal",
+                        message
                     ))),
-                    None => Err(Error::CommandExecutionError(
-                        "process terminated by signal".to_string(),
-                    )),
                 }
             }
         }
         Err(e) => Err(e.into()),
     }
 }
+
+// For storing the entrypoint and args to start a delayed connector.
+#[derive(Debug, Serialize, Deserialize, Clone)]
+#[serde(rename_all = "snake_case")]
+pub struct CommandConfig {
+    pub entrypoint: String,
+    pub args: Vec<String>,
+}
+// Instead of starting the connector directly, `invoke_connector_delayed` starts a bouncer process
+// first, which starts the real connector only after reading a "READY" string from stdin. The caller
+// of `invoke_connector_delayed` is responsible for sending "READY" to the stdin of the returned
+// Child process before sending anything else.
+pub async fn invoke_connector_delayed(
+    entrypoint: String,
+    args: Vec<String>,
+) -> Result<Child, Error> {
+    tracing::info!("invoke delayed connector {}, {:?}", entrypoint, args);
+
+    // Saves the configs to start the connector.
+    let command_config = CommandConfig { entrypoint, args };
+    let config_file = NamedTempFile::new()?;
+    serde_json::to_writer(&config_file, &command_config)?;
+    let (_, config_file_path) = config_file.keep()?;
+    let config_file_path = config_file_path
+        .to_str()
+        .expect("config file path conversion failed.");
+
+    // Prepares and starts the bouncer process.
+    let bouncer_process_entrypoint = std::env::current_exe()?;
+    let bouncer_process_entrypoint = bouncer_process_entrypoint
+        .to_str()
+        .expect("unexpected binary path");
+
+    invoke_connector(
+        Stdio::piped(),
+        Stdio::piped(),
+        Stdio::piped(),
+        bouncer_process_entrypoint,
+        &vec!["delayed-execute".to_string(), config_file_path.to_string()],
+    )
+}
+
+pub async fn read_ready<R: AsyncRead + std::marker::Unpin>(reader: &mut R) -> Result<(), Error> {
+    let mut ready_buf: Vec<u8> = vec![0; READY.len()];
+    match timeout(
+        std::time::Duration::from_secs(1),
+        reader.read_exact(&mut ready_buf),
+    )
+    .await
+    {
+        Ok(_) => {
+            if &ready_buf == READY {
+                Ok(())
+            } else {
+                Err(Error::NotReady("received unexpected bytes."))
+            }
+        }
+        Err(_) => Err(Error::NotReady(
+            "timeout: reading from delayed-connector process wrapper.",
+        )),
+    }
+}
+
+// A more flexible API for starting the connector.
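+//
+// A caller-side sketch (the entrypoint path and args here are illustrative, not
+// part of this module): pipe all three standard streams, then split them apart
+// with `parse_child` below:
+//
+//   let child = invoke_connector(
+//       Stdio::piped(), Stdio::piped(), Stdio::piped(),
+//       "/connectors/entrypoint", &["spec".to_string()],
+//   )?;
+//   let (child, stdin, stdout, stderr) = parse_child(child)?;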
+pub fn invoke_connector( + stdin: Stdio, + stdout: Stdio, + stderr: Stdio, + entrypoint: &str, + args: &[String], +) -> Result { + tracing::info!("invoke connector {}, {:?}", entrypoint, args); + + Command::new(entrypoint) + .stdin(stdin) + .stdout(stdout) + .stderr(stderr) + .args(args) + .spawn() + .map_err(|e| e.into()) +} + +pub fn parse_child( + mut child: Child, +) -> Result<(Child, ChildStdin, ChildStdout, ChildStderr), Error> { + let stdout = child.stdout.take().ok_or(Error::MissingIOPipe)?; + let stdin = child.stdin.take().ok_or(Error::MissingIOPipe)?; + let stderr = child.stderr.take().ok_or(Error::MissingIOPipe)?; + + Ok((child, stdin, stdout, stderr)) +} diff --git a/crates/connector_proxy/src/libs/image_config.rs b/crates/connector_proxy/src/libs/image_config.rs deleted file mode 100644 index 6d6ea26de9..0000000000 --- a/crates/connector_proxy/src/libs/image_config.rs +++ /dev/null @@ -1,59 +0,0 @@ -use crate::errors::{Error, Must}; -use clap::ArgEnum; -use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use std::fs::File; -use std::io::BufReader; - -// The key of the docker image label that indicates the connector protocol. -const CONNECTOR_PROTOCOL_KEY: &str = "CONNECTOR_PROTOCOL"; - -#[derive(Debug, Serialize, Deserialize, Clone)] -#[serde(rename_all = "PascalCase")] -pub struct ImageConfig { - pub entrypoint: Vec, - pub labels: Option>, -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -#[serde(rename_all = "PascalCase")] -pub struct ImageInspect { - pub config: ImageConfig, -} - -impl ImageConfig { - pub fn parse_from_json_file(path: String) -> Result { - let reader = BufReader::new(File::open(path)?); - let image_inspects: Vec = serde_json::from_reader(reader)?; - match image_inspects.len() { - 1 => Ok(image_inspects[0].config.clone()), - _ => Err(Error::InvalidImageInspectFile), - } - } - - pub fn get_entrypoint(&self, default: Vec) -> Vec { - match self.entrypoint.len() { - 0 => { - tracing::warn!( - "No entry point is specified in the image, using default: {:?}", - default - ); - default - } - _ => self.entrypoint.clone(), - } - } - - pub fn get_connector_protocol(&self, default: T) -> T { - if let Some(ref labels) = self.labels { - if let Some(value) = labels.get(CONNECTOR_PROTOCOL_KEY) { - return T::from_str(&value, false).or_bail(); - } - } - tracing::warn!( - "No connector protocol is specified in the image, using default: {:?}", - default - ); - default - } -} diff --git a/crates/connector_proxy/src/libs/image_inspect.rs b/crates/connector_proxy/src/libs/image_inspect.rs new file mode 100644 index 0000000000..0004dbbad2 --- /dev/null +++ b/crates/connector_proxy/src/libs/image_inspect.rs @@ -0,0 +1,90 @@ +use crate::apis::FlowRuntimeProtocol; +use crate::errors::{Error, Must}; +use clap::ArgEnum; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::fs::File; +use std::io::BufReader; + +// The key of the docker image label that indicates the connector protocol. 
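+// Connector images can declare these with Dockerfile labels, e.g. (illustrative
+// values, parsed via clap's ArgEnum):
+//
+//   LABEL FLOW_RUNTIME_PROTOCOL=materialize
+//   LABEL CONNECTOR_PROTOCOL=airbyte
+//
+// They then surface under Config.Labels in `docker inspect` output, which is
+// the JSON consumed by `parse_from_json_file` below.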
+const FLOW_RUNTIME_PROTOCOL_KEY: &str = "FLOW_RUNTIME_PROTOCOL";
+const CONNECTOR_PROTOCOL_KEY: &str = "CONNECTOR_PROTOCOL";
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+#[serde(rename_all = "PascalCase")]
+pub struct ImageConfig {
+    pub entrypoint: Vec<String>,
+    pub labels: Option<HashMap<String, String>>,
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+#[serde(rename_all = "PascalCase")]
+pub struct ImageInspect {
+    pub config: ImageConfig,
+    pub repo_tags: Option<Vec<String>>,
+}
+
+impl ImageInspect {
+    pub fn parse_from_json_file(path: Option<String>) -> Result<Self, Error> {
+        match path {
+            None => {
+                return Err(Error::MissingImageInspectFile);
+            }
+            Some(p) => {
+                let reader = BufReader::new(File::open(p)?);
+                let image_inspects: Vec<ImageInspect> = serde_json::from_reader(reader)?;
+                match image_inspects.len() {
+                    1 => Ok(image_inspects[0].clone()),
+                    _ => Err(Error::InvalidImageInspectFile),
+                }
+            }
+        }
+    }
+
+    pub fn get_entrypoint(&self, default: Vec<String>) -> Vec<String> {
+        match self.config.entrypoint.len() {
+            0 => {
+                tracing::warn!(
+                    "No entry point is specified in the image, using default: {:?}",
+                    default
+                );
+                default
+            }
+            _ => self.config.entrypoint.clone(),
+        }
+    }
+
+    pub fn infer_runtime_protocol(&self) -> FlowRuntimeProtocol {
+        if let Some(ref labels) = self.config.labels {
+            if let Some(value) = labels.get(FLOW_RUNTIME_PROTOCOL_KEY) {
+                return FlowRuntimeProtocol::from_str(&value, false).or_bail();
+            }
+        }
+
+        // TODO: change this to allow arbitrary docker images to be recognized
+        // as a materialization
+        if let Some(repo_tags) = &self.repo_tags {
+            for tag in repo_tags {
+                if tag.starts_with("ghcr.io/estuary/materialize-") {
+                    return FlowRuntimeProtocol::Materialize;
+                }
+            }
+        }
+
+        return FlowRuntimeProtocol::Capture;
+    }
+
+    pub fn get_connector_protocol<T: ArgEnum + std::fmt::Debug>(&self, default: T) -> T {
+        if let Some(ref labels) = self.config.labels {
+            if let Some(value) = labels.get(CONNECTOR_PROTOCOL_KEY) {
+                return T::from_str(&value, false).or_bail();
+            }
+        }
+        tracing::warn!(
+            "No connector protocol is specified in the image, using default: {:?}",
+            default
+        );
+        default
+    }
+}
diff --git a/crates/connector_proxy/src/libs/json.rs b/crates/connector_proxy/src/libs/json.rs
index e09064459b..dbe8f42f6d 100644
--- a/crates/connector_proxy/src/libs/json.rs
+++ b/crates/connector_proxy/src/libs/json.rs
@@ -1,3 +1,4 @@
+use doc::ptr::{Pointer, Token};
 use schemars::{schema::RootSchema, JsonSchema};
 use serde_json::Value;
@@ -19,3 +20,30 @@ pub fn remove_subobject(mut v: Value, key: &str) -> (Option<Value>, Value) {
     (sub_object, v)
 }
+
+pub fn tokenize_jsonpointer(ptr: &str) -> Vec<String> {
+    Pointer::from_str(&ptr)
+        .iter()
+        .map(|t| match t {
+            // Keep the index and next index for now. Could adjust based on use cases.
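+            // e.g. "/p1/p2/56/p3/-" tokenizes to ["p1", "p2", "56", "p3", "-"]
+            // (see the test below): numeric indices and the RFC 6901 next-index
+            // token "-" are both kept as literal strings.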
+ Token::Index(ind) => ind.to_string(), + Token::Property(prop) => prop.to_string(), + Token::NextIndex => "-".to_string(), + }) + .collect() +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_tokenize_jsonpointer() { + let expected: Vec = vec!["p1", "p2", "56", "p3", "-"] + .iter() + .map(|s| s.to_string()) + .collect(); + + assert!(expected == tokenize_jsonpointer("/p1/p2/56/p3/-")); + } +} diff --git a/crates/connector_proxy/src/libs/mod.rs b/crates/connector_proxy/src/libs/mod.rs index 80519d2488..870add588a 100644 --- a/crates/connector_proxy/src/libs/mod.rs +++ b/crates/connector_proxy/src/libs/mod.rs @@ -1,6 +1,7 @@ +pub mod airbyte_catalog; pub mod command; -pub mod image_config; +pub mod image_inspect; pub mod json; -pub mod network_proxy; +pub mod network_tunnel; pub mod protobuf; pub mod stream; diff --git a/crates/connector_proxy/src/libs/network_proxy.rs b/crates/connector_proxy/src/libs/network_tunnel.rs similarity index 56% rename from crates/connector_proxy/src/libs/network_proxy.rs rename to crates/connector_proxy/src/libs/network_tunnel.rs index bdbc6d1050..f0ddab9de2 100644 --- a/crates/connector_proxy/src/libs/network_proxy.rs +++ b/crates/connector_proxy/src/libs/network_tunnel.rs @@ -1,29 +1,29 @@ use crate::errors::Error; use crate::libs::json::{create_root_schema, remove_subobject}; -use network_proxy::interface::NetworkProxyConfig; +use network_tunnel::interface::NetworkTunnelConfig; use schemars::schema::{RootSchema, Schema}; use serde_json::value::RawValue; use tokio::sync::oneshot::{self, Receiver}; use tokio::time::timeout; -pub struct NetworkProxy {} -pub const NETWORK_PROXY_KEY: &str = "networkProxy"; +pub struct NetworkTunnel {} +pub const NETWORK_TUNNEL_KEY: &str = "networkTunnel"; -impl NetworkProxy { +impl NetworkTunnel { pub fn extend_endpoint_schema( endpoint_spec_schema: Box, ) -> Result, Error> { - let network_proxy_schema = create_root_schema::(); + let network_tunnel_schema = create_root_schema::(); let mut modified_schema: RootSchema = serde_json::from_str(endpoint_spec_schema.get())?; if let Some(ref mut o) = &mut modified_schema.schema.object { - if o.as_ref().properties.contains_key(NETWORK_PROXY_KEY) { - return Err(Error::DuplicatedKeyError(NETWORK_PROXY_KEY)); + if o.as_ref().properties.contains_key(NETWORK_TUNNEL_KEY) { + return Err(Error::DuplicatedKeyError(NETWORK_TUNNEL_KEY)); } o.as_mut().properties.insert( - NETWORK_PROXY_KEY.to_string(), - Schema::Object(network_proxy_schema.schema), + NETWORK_TUNNEL_KEY.to_string(), + Schema::Object(network_tunnel_schema.schema), ); } @@ -31,24 +31,24 @@ impl NetworkProxy { RawValue::from_string(json).map_err(Into::into) } - // Start the network proxy. The receiver rx will be dropped to indicate the network proxy + // Start the network tunnel. The receiver rx will be dropped to indicate the network tunnel // is ready to accept requests. 
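+    // A sketch of the handshake, mirroring consume_network_tunnel_config below:
+    // the caller keeps `tx`, hands `rx` to this task, and awaits `tx.closed()`;
+    // dropping `rx` after a successful prepare() is the readiness signal:
+    //
+    //   let (mut tx, rx) = oneshot::channel();
+    //   tokio::spawn(Self::start_network_tunnel(config, rx));
+    //   timeout(std::time::Duration::from_secs(5), tx.closed()).await;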
-    async fn start_network_proxy(
-        config: NetworkProxyConfig,
+    async fn start_network_tunnel(
+        config: NetworkTunnelConfig,
         rx: Receiver<()>,
     ) -> Result<(), Error> {
-        let mut network_proxy = config.new_proxy();
+        let mut network_tunnel = config.new_tunnel();
         tokio::task::spawn(async move {
-            let result: Result<(), Error> = match network_proxy.prepare().await {
+            let result: Result<(), Error> = match network_tunnel.prepare().await {
                 Ok(()) => {
                     drop(rx);
-                    network_proxy.start_serve().await.map_err(Into::into)
+                    network_tunnel.start_serve().await.map_err(Into::into)
                 }
                 Err(e) => Err(e.into()),
             };
             if let Err(ref err) = result {
-                tracing::error!(error=?err, "failed starting network proxy.");
+                tracing::error!(error=?err, "failed starting network tunnel.");
                 std::process::exit(1);
             }
         })
@@ -57,7 +57,7 @@ impl NetworkProxy {
         Ok(())
     }
-    pub async fn consume_network_proxy_config(
+    pub async fn consume_network_tunnel_config(
         endpoint_spec_json: Box<RawValue>,
     ) -> Result<Box<RawValue>, Error> {
         if endpoint_spec_json.get().is_empty() {
@@ -65,25 +65,25 @@ impl NetworkProxy {
         }
         let endpoint_spec = serde_json::from_str(endpoint_spec_json.get())?;
-        let (network_proxy_config, endpoint_spec) =
-            remove_subobject(endpoint_spec, NETWORK_PROXY_KEY);
+        let (network_tunnel_config, endpoint_spec) =
+            remove_subobject(endpoint_spec, NETWORK_TUNNEL_KEY);
-        let network_proxy_config: NetworkProxyConfig = match network_proxy_config {
+        let network_tunnel_config: NetworkTunnelConfig = match network_tunnel_config {
             None => return Ok(endpoint_spec_json),
             Some(c) => serde_json::from_value(c)?,
         };
         let (mut tx, rx) = oneshot::channel();
-        tokio::spawn(Self::start_network_proxy(network_proxy_config, rx));
+        tokio::spawn(Self::start_network_tunnel(network_tunnel_config, rx));
-        // TODO(jixiang): Refact the network-proxy and remove the timeout logic here after all connectors are converted to work with connector-proxy.
+        // TODO: Refactor the network-tunnel and remove the timeout logic here after all connectors are converted to work with connector-proxy.
-        // Block for at most 5 seconds for network proxy to be prepared.
+        // Block for at most 5 seconds for network tunnel to be prepared.
         if let Err(_) = timeout(std::time::Duration::from_secs(5), tx.closed()).await {
             return Err(Error::ChannelTimeoutError);
         };
-        tracing::info!("network proxy started.");
+        tracing::info!("network tunnel started.");
         let json = serde_json::to_string_pretty(&endpoint_spec)?;
         RawValue::from_string(json).map_err(Into::into)
diff --git a/crates/connector_proxy/src/libs/protobuf.rs b/crates/connector_proxy/src/libs/protobuf.rs
index e93c0cfcfd..1f246d3aaf 100644
--- a/crates/connector_proxy/src/libs/protobuf.rs
+++ b/crates/connector_proxy/src/libs/protobuf.rs
@@ -1,5 +1,3 @@
-use crate::errors::Error;
-
 use byteorder::{ByteOrder, LittleEndian};
 use bytes::{BufMut, Bytes, BytesMut};
 use prost::Message;
@@ -10,15 +8,14 @@ pub async fn decode_message<
     R: AsyncRead + std::marker::Unpin,
 >(
     reader: &mut R,
-) -> Result<Option<T>, Error> {
-    // Deserialize the proto message.
+) -> Result<Option<T>, std::io::Error> {
     let mut length_buf: [u8; 4] = [0; 4];
     match reader.read_exact(&mut length_buf).await {
         Err(e) => match e.kind() {
             // By the current communication protocol, UnexpectedEof indicates the ending of the stream.
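+            // Each frame is a 4-byte little-endian length prefix followed by
+            // that many bytes of encoded proto message, so EOF while reading
+            // the prefix is a clean end-of-stream rather than an error.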
std::io::ErrorKind::UnexpectedEof => return Ok(None), - _ => return Err(e.into()), + _ => return Err(e), }, Ok(_) => {} } diff --git a/crates/connector_proxy/src/libs/stream.rs b/crates/connector_proxy/src/libs/stream.rs index c1e31748d8..4649376b51 100644 --- a/crates/connector_proxy/src/libs/stream.rs +++ b/crates/connector_proxy/src/libs/stream.rs @@ -1,25 +1,269 @@ -use async_stream::stream; -use bytes::{Bytes, BytesMut}; -use futures_core::Stream; -use tokio::io::{AsyncRead, AsyncReadExt}; - -pub fn stream_all_bytes( - mut reader: R, -) -> impl Stream> { - stream! { - loop { - // consistent with the default capacity of ReaderStream. - // https://github.com/tokio-rs/tokio/blob/master/tokio-util/src/io/reader_stream.rs#L8 - let mut buf = BytesMut::with_capacity(4096); - match reader.read_buf(&mut buf).await { - Ok(0) => break, - Ok(_) => { - yield Ok(buf.into()); - } - Err(e) => { - panic!("error during streaming {:?}.", e); +use crate::libs::airbyte_catalog::Message; +use crate::{apis::InterceptorStream, errors::create_custom_error}; + +use crate::errors::raise_err; +use bytelines::AsyncByteLines; +use bytes::Bytes; +use futures::{StreamExt, TryStream, TryStreamExt}; +use tokio_util::io::StreamReader; +use validator::Validate; + +use super::airbyte_catalog::{Log, LogLevel, MessageType}; +use super::protobuf::decode_message; + +// Creates a stream of bytes of lines from the given stream +// This allows our other methods such as stream_airbyte_responses to operate +// on lines, simplifying their logic +pub fn stream_lines( + in_stream: InterceptorStream, +) -> impl TryStream, Error = std::io::Error, Ok = bytes::Bytes> { + AsyncByteLines::new(StreamReader::new(in_stream)) + .into_stream() + .map_ok(Bytes::from) +} + +/// Given a stream of lines, try to deserialize them into Airbyte Messages. +/// This can be used when reading responses from the Airbyte connector, and will +/// handle validation of messages as well as handling of AirbyteLogMessages. +/// Will ignore* lines that cannot be parsed to an AirbyteMessage. +/// * See https://docs.airbyte.com/understanding-airbyte/airbyte-specification#the-airbyte-protocol +pub fn stream_airbyte_responses( + in_stream: InterceptorStream, +) -> impl TryStream, Ok = Message, Error = std::io::Error> { + stream_lines(in_stream).try_filter_map(|line| async move { + let message: Message = match serde_json::from_slice(&line) { + Ok(m) => m, + Err(e) => { + // It is currently ambiguous for us whether Airbyte protocol specification + // mandates that there must be no plaintext or not, as such we handle all + // errors in parsing of stdout lines by logging the issue, but not failing + Message { + message_type: MessageType::Log, + connection_status: None, + state: None, + record: None, + spec: None, + catalog: None, + log: Some(Log { + level: LogLevel::Debug, + message: format!("Encountered error while trying to parse Airbyte Message: {:?} in line {:?}", e, line) + }) } } + }; + + message + .validate() + .map_err(|e| create_custom_error(&format!("error in validating message {:?}", e)))?; + + Ok(Some(message)) + }) + .try_filter_map(|message| async { + // For AirbyteLogMessages, log them and then filter them out + // so that we don't have to handle them elsewhere + if let Some(log) = message.log { + log.log(); + Ok(None) + } else { + Ok(Some(message)) + } + }) +} + +/// Read the given stream and try to find an Airbyte message that matches the predicate +/// ignoring* other message kinds. This can be used to work with Airbyte connector responses. 
+/// * See https://docs.airbyte.com/understanding-airbyte/airbyte-specification#the-airbyte-protocol +pub fn get_airbyte_response( + in_stream: InterceptorStream, + predicate: F, +) -> impl futures::Future> +where + F: Fn(&Message) -> bool, +{ + async move { + let stream_head = Box::pin(stream_airbyte_responses(in_stream)).next().await; + + let message = match stream_head { + Some(m) => m, + None => return raise_err("Could not find message in stream"), + }?; + + if predicate(&message) { + Ok(message) + } else { + raise_err("Could not find message matching condition") } } } + +/// Read the given stream of bytes and try to decode it to type +pub fn get_decoded_message( + in_stream: InterceptorStream, +) -> impl futures::Future> +where + T: prost::Message + std::default::Default, +{ + async move { + let mut reader = StreamReader::new(in_stream); + decode_message::(&mut reader) + .await? + .ok_or(create_custom_error("missing request")) + } +} + +#[cfg(test)] +mod test { + use std::{collections::HashMap, pin::Pin}; + + use bytes::BytesMut; + use futures::stream; + use protocol::{ + flow::EndpointType, + materialize::{validate_request, ValidateRequest}, + }; + use tokio_util::io::ReaderStream; + + use crate::libs::{ + airbyte_catalog::{ConnectionStatus, MessageType, Status}, + protobuf::encode_message, + }; + + use super::*; + + fn create_stream( + input: Vec, + ) -> Pin, Ok = T, Error = std::io::Error>>> { + Box::pin(stream::iter(input.into_iter().map(Ok::))) + } + + #[tokio::test] + async fn test_stream_lines() { + let line_0 = "{\"test\": \"hello\"}".as_bytes(); + let line_1 = "other".as_bytes(); + let line_2 = "{\"object\": {}}".as_bytes(); + let newline = "\n".as_bytes(); + let mut input = BytesMut::new(); + input.extend_from_slice(line_0); + input.extend_from_slice(newline); + input.extend_from_slice(line_1); + input.extend_from_slice(newline); + input.extend_from_slice(line_2); + let stream = create_stream(vec![Bytes::from(input)]); + let all_bytes = Box::pin(stream_lines(stream)); + + let result: Vec = all_bytes.try_collect::>().await.unwrap(); + assert_eq!(result, vec![line_0, line_1, line_2]); + } + + #[tokio::test] + async fn test_stream_airbyte_responses_eof_split_json() { + let input_message = Message { + message_type: MessageType::ConnectionStatus, + log: None, + state: None, + record: None, + spec: None, + catalog: None, + connection_status: Some(ConnectionStatus { + status: Status::Succeeded, + message: Some("test".to_string()), + }), + }; + let input = vec![ + Bytes::from("{\"type\": \"CONNECTION_STATUS\", \"connectionStatus\": {"), + Bytes::from("\"status\": \"SUCCEEDED\",\"message\":\"test\"}}"), + ]; + let stream = create_stream(input); + + let mut messages = Box::pin(stream_airbyte_responses(stream)); + + let result = messages.next().await.unwrap().unwrap(); + assert_eq!( + result.connection_status.unwrap(), + input_message.connection_status.unwrap() + ); + } + + #[tokio::test] + async fn test_stream_airbyte_responses_eof_split_json_partial() { + let input_message = Message { + message_type: MessageType::ConnectionStatus, + log: None, + state: None, + record: None, + spec: None, + catalog: None, + connection_status: Some(ConnectionStatus { + status: Status::Succeeded, + message: Some("test".to_string()), + }), + }; + let input = vec![ + Bytes::from("{}\n{\"type\": \"CONNECTION_STATUS\", \"connectionStatus\": {"), + Bytes::from("\"status\": \"SUCCEEDED\",\"message\":\"test\"}}"), + ]; + let stream = create_stream(input); + + let mut messages = 
Box::pin(stream_airbyte_responses(stream)); + + let result = messages.next().await.unwrap().unwrap(); + assert_eq!( + result.connection_status.unwrap(), + input_message.connection_status.unwrap() + ); + } + + #[tokio::test] + async fn test_stream_airbyte_responses_plaintext_mixed() { + let input_message = Message { + message_type: MessageType::ConnectionStatus, + log: None, + state: None, + record: None, + spec: None, + catalog: None, + connection_status: Some(ConnectionStatus { + status: Status::Succeeded, + message: Some("test".to_string()), + }), + }; + let input = vec![ + Bytes::from( + "I am plaintext!\n{\"type\": \"CONNECTION_STATUS\", \"connectionStatus\": {", + ), + Bytes::from("\"status\": \"SUCCEEDED\",\"message\":\"test\"}}"), + ]; + let stream = create_stream(input); + + let mut messages = Box::pin(stream_airbyte_responses(stream)); + + let result = messages.next().await.unwrap().unwrap(); + assert_eq!( + result.connection_status.unwrap(), + input_message.connection_status.unwrap() + ); + } + + #[tokio::test] + async fn test_get_decoded_message() { + let msg = ValidateRequest { + materialization: "materialization".to_string(), + endpoint_type: EndpointType::AirbyteSource.into(), + endpoint_spec_json: "{}".to_string(), + bindings: vec![validate_request::Binding { + resource_spec_json: "{}".to_string(), + collection: None, + field_config_json: HashMap::new(), + }], + }; + + let msg_buf = encode_message(&msg).unwrap(); + + let stream = Box::pin(ReaderStream::new(std::io::Cursor::new(msg_buf))); + let result = get_decoded_message::(stream) + .await + .unwrap(); + + assert_eq!(result, msg); + } +} diff --git a/crates/connector_proxy/src/main.rs b/crates/connector_proxy/src/main.rs index 7defe1507c..169d388f8b 100644 --- a/crates/connector_proxy/src/main.rs +++ b/crates/connector_proxy/src/main.rs @@ -3,24 +3,28 @@ pub mod connector_runner; pub mod errors; pub mod interceptors; pub mod libs; +use std::fs::File; +use std::io::BufReader; use clap::{ArgEnum, Parser, Subcommand}; -use tokio::signal::unix::{signal, SignalKind}; +use tokio::{ + io::AsyncReadExt, + signal::unix::{signal, SignalKind}, +}; -use apis::{compose, FlowCaptureOperation, FlowMaterializeOperation, Interceptor}; +use apis::{FlowCaptureOperation, FlowMaterializeOperation, FlowRuntimeProtocol}; use flow_cli_common::{init_logging, LogArgs}; -use connector_runner::run_connector; +use connector_runner::{ + run_airbyte_source_connector, run_flow_capture_connector, run_flow_materialize_connector, +}; use errors::Error; -use libs::image_config::ImageConfig; - -use interceptors::{ - airbyte_capture_interceptor::AirbyteCaptureInterceptor, - default_interceptors::{DefaultFlowCaptureInterceptor, DefaultFlowMaterializeInterceptor}, - network_proxy_capture_interceptor::NetworkProxyCaptureInterceptor, - network_proxy_materialize_interceptor::NetworkProxyMaterializeInterceptor, +use libs::{ + command::{check_exit_status, invoke_connector, read_ready, CommandConfig}, + image_inspect::ImageInspect, }; +use std::process::Stdio; #[derive(Debug, ArgEnum, Clone)] pub enum CaptureConnectorProtocol { @@ -47,12 +51,19 @@ struct ProxyFlowMaterialize { operation: FlowMaterializeOperation, } +#[derive(Debug, clap::Parser)] +struct DelayedExecutionConfig { + config_file_path: String, +} + #[derive(Debug, Subcommand)] enum ProxyCommand { /// proxies the Flow runtime Capture Protocol to the connector. ProxyFlowCapture(ProxyFlowCapture), /// proxies the Flow runtime Materialize Protocol to the connector. 
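+    /// (Hypothetical invocation shape, for orientation: `flow-connector-proxy
+    /// --image-inspect-json-path <path> proxy-flow-materialize <operation>`.)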
ProxyFlowMaterialize(ProxyFlowMaterialize), + /// internal command used by the connector proxy itself to delay execution until signaled. + DelayedExecute(DelayedExecutionConfig), } #[derive(Parser, Debug)] @@ -61,7 +72,7 @@ pub struct Args { /// The path (in the container) to the JSON file that contains the inspection results from the connector image. /// Normally produced via command "docker inspect ". #[clap(short, long)] - image_inspect_json_path: String, + image_inspect_json_path: Option, /// The type of proxy service to provide. #[clap(subcommand)] @@ -91,9 +102,6 @@ async fn main() -> std::io::Result<()> { } = Args::parse(); init_logging(&log_args); - // respond to os signals. - tokio::task::spawn(async move { signal_handler().await }); - let result = async_main(image_inspect_json_path, proxy_command).await; if let Err(err) = result.as_ref() { tracing::error!(error = ?err, "connector proxy execution failed."); @@ -102,7 +110,7 @@ async fn main() -> std::io::Result<()> { Ok(()) } -async fn signal_handler() { +async fn sigterm_handler() { let mut signal_stream = signal(SignalKind::terminate()).expect("failed creating signal."); signal_stream @@ -114,54 +122,94 @@ async fn signal_handler() { } async fn async_main( - image_inspect_json_path: String, + image_inspect_json_path: Option, proxy_command: ProxyCommand, ) -> Result<(), Error> { - let image_config = ImageConfig::parse_from_json_file(image_inspect_json_path)?; - - // TODO(jixiang): add a check to make sure the proxy_command passed in from commandline is consistent with the protocol inferred from image. match proxy_command { - ProxyCommand::ProxyFlowCapture(c) => proxy_flow_capture(c, image_config).await, - ProxyCommand::ProxyFlowMaterialize(m) => proxy_flow_materialize(m, image_config).await, + ProxyCommand::ProxyFlowCapture(c) => proxy_flow_capture(c, image_inspect_json_path).await, + ProxyCommand::ProxyFlowMaterialize(m) => { + proxy_flow_materialize(m, image_inspect_json_path).await + } + ProxyCommand::DelayedExecute(ba) => delayed_execute(ba.config_file_path).await, } } -async fn proxy_flow_capture(c: ProxyFlowCapture, image_config: ImageConfig) -> Result<(), Error> { - let mut converter_pair = match image_config +async fn proxy_flow_capture( + c: ProxyFlowCapture, + image_inspect_json_path: Option, +) -> Result<(), Error> { + let image_inspect = ImageInspect::parse_from_json_file(image_inspect_json_path)?; + if image_inspect.infer_runtime_protocol() != FlowRuntimeProtocol::Capture { + return Err(Error::MismatchingRuntimeProtocol); + } + + let entrypoint = image_inspect.get_entrypoint(vec![DEFAULT_CONNECTOR_ENTRYPOINT.to_string()]); + + match image_inspect .get_connector_protocol::(CaptureConnectorProtocol::Airbyte) { - CaptureConnectorProtocol::FlowCapture => DefaultFlowCaptureInterceptor::get_converters(), - CaptureConnectorProtocol::Airbyte => AirbyteCaptureInterceptor::get_converters(), - }; - - converter_pair = compose( - converter_pair, - NetworkProxyCaptureInterceptor::get_converters(), - ); - - run_connector::( - c.operation, - image_config.get_entrypoint(vec![DEFAULT_CONNECTOR_ENTRYPOINT.to_string()]), - converter_pair, - ) - .await + CaptureConnectorProtocol::FlowCapture => { + run_flow_capture_connector(&c.operation, entrypoint).await + } + CaptureConnectorProtocol::Airbyte => { + run_airbyte_source_connector(&c.operation, entrypoint).await + } + } } async fn proxy_flow_materialize( m: ProxyFlowMaterialize, - image_config: ImageConfig, + image_inspect_json_path: Option, ) -> Result<(), Error> { - // There is only 
one type of connector protocol for flow materialize.
-    let mut converter_pair = DefaultFlowMaterializeInterceptor::get_converters();
-    converter_pair = compose(
-        converter_pair,
-        NetworkProxyMaterializeInterceptor::get_converters(),
-    );
-
-    run_connector::<FlowMaterializeOperation>(
-        m.operation,
-        image_config.get_entrypoint(vec![DEFAULT_CONNECTOR_ENTRYPOINT.to_string()]),
-        converter_pair,
+    // Respond to OS sigterm signal.
+    tokio::task::spawn(async move { sigterm_handler().await });
+
+    let image_inspect = ImageInspect::parse_from_json_file(image_inspect_json_path)?;
+    if image_inspect.infer_runtime_protocol() != FlowRuntimeProtocol::Materialize {
+        return Err(Error::MismatchingRuntimeProtocol);
+    }
+
+    run_flow_materialize_connector(
+        &m.operation,
+        image_inspect.get_entrypoint(vec![DEFAULT_CONNECTOR_ENTRYPOINT.to_string()]),
     )
     .await
 }
+
+async fn delayed_execute(command_config_path: String) -> Result<(), Error> {
+    // Wait for the "READY" signal from the parent process before starting the connector.
+    read_ready(&mut tokio::io::stdin()).await?;
+
+    tracing::info!("delayed process execution continues...");
+
+    let reader = BufReader::new(File::open(command_config_path)?);
+    let command_config: CommandConfig = serde_json::from_reader(reader)?;
+
+    let mut child = invoke_connector(
+        Stdio::inherit(),
+        Stdio::inherit(),
+        Stdio::piped(),
+        &command_config.entrypoint,
+        &command_config.args,
+    )?;
+
+    match check_exit_status("delayed process", child.wait().await) {
+        Err(e) => {
+            let mut buf = Vec::new();
+            child
+                .stderr
+                .take()
+                .ok_or(Error::MissingIOPipe)?
+                .read_to_end(&mut buf)
+                .await?;
+
+            tracing::error!(
+                "connector failed. command_config: {:?}. stderr from connector: {}",
+                &command_config,
+                std::str::from_utf8(&buf).expect("error when decoding stderr")
+            );
+            Err(e)
+        }
+        _ => Ok(()),
+    }
+}
diff --git a/crates/network-proxy/src/main.rs b/crates/network-proxy/src/main.rs
deleted file mode 100644
index 1451000708..0000000000
--- a/crates/network-proxy/src/main.rs
+++ /dev/null
@@ -1,40 +0,0 @@
-pub mod interface;
-pub mod sshforwarding;
-pub mod errors;
-pub mod networkproxy;
-
-use errors::Error;
-use flow_cli_common::{init_logging, LogArgs, LogFormat};
-use std::io::{self, Write};
-
-use interface::NetworkProxyConfig;
-
-#[tokio::main]
-async fn main() -> io::Result<()> {
-    init_logging(&LogArgs{level: "info".to_string(), format: Some(LogFormat::Json)});
-    if let Err(err) = run().await.as_ref() {
-        tracing::error!(error = ?err, "network proxy failed.");
-        std::process::exit(1);
-    }
-    Ok(())
-}
-
-async fn run() -> Result<(), Error> {
-    let proxy_config: NetworkProxyConfig = serde_json::from_reader(io::stdin())?;
-    let mut proxy = proxy_config.new_proxy();
-
-    proxy.prepare().await?;
-
-    // Write "READY" to stdio to unblock Go logic.
-    // The current workflow assumes that
-    // 1. After proxy.prepare() is called, the network proxy is able to accept requests from clients without sending errors back to clients.
-    // 2. The network proxy is able to process client requests immediately after `proxy.start_serve` is called.
-    // If either of the assumptions is invalid for any new proxy type, the READY-logic need to be moved to a separate task, which
-    // sends out the "READY" signal after making sure the network proxy is started and working properly.
- println!("READY"); - io::stdout().flush()?; - - proxy.start_serve().await?; - - Ok(()) -} \ No newline at end of file diff --git a/crates/network-proxy/.gitignore b/crates/network-tunnel/.gitignore similarity index 100% rename from crates/network-proxy/.gitignore rename to crates/network-tunnel/.gitignore diff --git a/crates/network-proxy/Cargo.toml b/crates/network-tunnel/Cargo.toml similarity index 91% rename from crates/network-proxy/Cargo.toml rename to crates/network-tunnel/Cargo.toml index 2db5c5e5b8..9040d117c3 100644 --- a/crates/network-proxy/Cargo.toml +++ b/crates/network-tunnel/Cargo.toml @@ -1,12 +1,12 @@ [package] -name = "network-proxy" +name = "network-tunnel" version = "0.1.0" edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [[bin]] -name = "flow-network-proxy" +name = "flow-network-tunnel" path = "src/main.rs" [dependencies] diff --git a/crates/network-proxy/src/errors.rs b/crates/network-tunnel/src/errors.rs similarity index 100% rename from crates/network-proxy/src/errors.rs rename to crates/network-tunnel/src/errors.rs diff --git a/crates/network-proxy/src/interface.rs b/crates/network-tunnel/src/interface.rs similarity index 56% rename from crates/network-proxy/src/interface.rs rename to crates/network-tunnel/src/interface.rs index a474c48f06..038f6a6cce 100644 --- a/crates/network-proxy/src/interface.rs +++ b/crates/network-tunnel/src/interface.rs @@ -1,4 +1,4 @@ -use super::networkproxy::NetworkProxy; +use super::networktunnel::NetworkTunnel; use super::sshforwarding::{SshForwarding, SshForwardingConfig}; use schemars::JsonSchema; @@ -7,14 +7,14 @@ use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize, JsonSchema)] #[serde(rename_all = "camelCase")] #[serde(deny_unknown_fields)] -pub enum NetworkProxyConfig { +pub enum NetworkTunnelConfig { SshForwarding(SshForwardingConfig), } -impl NetworkProxyConfig { - pub fn new_proxy(self) -> Box { +impl NetworkTunnelConfig { + pub fn new_tunnel(self) -> Box { match self { - NetworkProxyConfig::SshForwarding(config) => Box::new(SshForwarding::new(config)), + NetworkTunnelConfig::SshForwarding(config) => Box::new(SshForwarding::new(config)), } } } diff --git a/crates/network-proxy/src/lib.rs b/crates/network-tunnel/src/lib.rs similarity index 71% rename from crates/network-proxy/src/lib.rs rename to crates/network-tunnel/src/lib.rs index aea73e18cd..d2dc8c8ae8 100644 --- a/crates/network-proxy/src/lib.rs +++ b/crates/network-tunnel/src/lib.rs @@ -1,4 +1,4 @@ pub mod errors; pub mod interface; -pub mod networkproxy; +pub mod networktunnel; pub mod sshforwarding; diff --git a/crates/network-tunnel/src/main.rs b/crates/network-tunnel/src/main.rs new file mode 100644 index 0000000000..9584ec1ea6 --- /dev/null +++ b/crates/network-tunnel/src/main.rs @@ -0,0 +1,39 @@ +pub mod interface; +pub mod sshforwarding; +pub mod errors; +pub mod networktunnel; + +use errors::Error; +use flow_cli_common::{init_logging, LogArgs, LogFormat}; +use std::io::{self, Write}; + +use interface::NetworkTunnelConfig; + +#[tokio::main] +async fn main() -> io::Result<()> { + init_logging(&LogArgs{level: "info".to_string(), format: Some(LogFormat::Json)}); + if let Err(err) = run().await.as_ref() { + tracing::error!(error = ?err, "network tunnel failed."); + std::process::exit(1); + } + Ok(()) +} + +async fn run() -> Result<(), Error> { + let tunnel_config: NetworkTunnelConfig = serde_json::from_reader(io::stdin())?; + let mut tunnel = tunnel_config.new_tunnel(); + + 
tunnel.prepare().await?; + + // Write "READY" to stdio to unblock Go logic. + // The current workflow assumes that + // 1. After tunnel.prepare() is called, the network tunnel is able to accept requests from clients without sending errors back to clients. + // 2. The network tunnel is able to process client requests immediately after `tunnel.start_serve` is called. + // If either of the assumptions is invalid for any new tunnel type, the READY-logic need to be moved to a separate task, which + // sends out the "READY" signal after making sure the network tunnel is started and working properly. + println!("READY"); + + tunnel.start_serve().await?; + + Ok(()) +} \ No newline at end of file diff --git a/crates/network-proxy/src/networkproxy.rs b/crates/network-tunnel/src/networktunnel.rs similarity index 92% rename from crates/network-proxy/src/networkproxy.rs rename to crates/network-tunnel/src/networktunnel.rs index b929a3b9ef..c2dd95a067 100644 --- a/crates/network-proxy/src/networkproxy.rs +++ b/crates/network-tunnel/src/networktunnel.rs @@ -3,7 +3,7 @@ use super::errors::Error; use async_trait::async_trait; #[async_trait] -pub trait NetworkProxy: Send { +pub trait NetworkTunnel: Send { // Setup the network proxy server. Network proxy should be able to listen and accept requests after `prepare` is performed. async fn prepare(&mut self) -> Result<(), Error>; // Start a long-running task that serves and processes all proxy requests from clients. diff --git a/crates/network-proxy/src/sshforwarding.rs b/crates/network-tunnel/src/sshforwarding.rs similarity index 97% rename from crates/network-proxy/src/sshforwarding.rs rename to crates/network-tunnel/src/sshforwarding.rs index 07db484f70..d3d5d89e59 100644 --- a/crates/network-proxy/src/sshforwarding.rs +++ b/crates/network-tunnel/src/sshforwarding.rs @@ -1,5 +1,5 @@ use super::errors::Error; -use super::networkproxy::NetworkProxy; +use super::networktunnel::NetworkTunnel; use async_trait::async_trait; use futures::pin_mut; @@ -76,6 +76,8 @@ impl SshForwarding { } pub async fn authenticate(&mut self) -> Result<(), Error> { + // TODO: this breaks on the new OpenSSH keys, see: + // https://stackoverflow.com/questions/54994641/openssh-private-key-to-rsa-private-key let key_pair = Arc::new(key::KeyPair::RSA { key: openssl::rsa::Rsa::private_key_from_pem(&self.config.private_key.as_bytes())?, hash: key::SignatureHash::SHA2_256, @@ -97,7 +99,7 @@ impl SshForwarding { } #[async_trait] -impl NetworkProxy for SshForwarding { +impl NetworkTunnel for SshForwarding { async fn prepare(&mut self) -> Result<(), Error> { self.prepare_ssh_client().await?; self.prepare_local_listener().await?; diff --git a/go/capture/driver/airbyte/driver.go b/go/capture/driver/airbyte/driver.go index cd7b005480..dbac669b1d 100644 --- a/go/capture/driver/airbyte/driver.go +++ b/go/capture/driver/airbyte/driver.go @@ -2,18 +2,19 @@ package airbyte import ( "context" + "encoding/binary" "encoding/json" "fmt" "io" "strings" - "github.com/alecthomas/jsonschema" "github.com/estuary/flow/go/connector" "github.com/estuary/flow/go/flow/ops" "github.com/estuary/flow/go/protocols/airbyte" pc "github.com/estuary/flow/go/protocols/capture" pf "github.com/estuary/flow/go/protocols/flow" - "github.com/go-openapi/jsonpointer" + protoio "github.com/gogo/protobuf/io" + "github.com/gogo/protobuf/proto" "github.com/sirupsen/logrus" ) @@ -92,50 +93,39 @@ func (d driver) Spec(ctx context.Context, req *pc.SpecRequest) (*pc.SpecResponse "operation": "spec", }) - var spec *airbyte.Spec - var err = 
connector.Run(ctx, source.Image, connector.Capture, d.networkName, + var decrypted, err = connector.DecryptConfig(ctx, source.Config) + if err != nil { + return nil, err + } + defer connector.ZeroBytes(decrypted) // connector.Run will also ZeroBytes(). + req.EndpointSpecJson = decrypted + + var resp *pc.SpecResponse + err = connector.Run(ctx, source.Image, connector.Capture, d.networkName, []string{"spec"}, // No configuration is passed to the connector. nil, // No stdin is sent to the connector. - func(w io.Writer) error { return nil }, + func(w io.Writer) error { + defer connector.ZeroBytes(decrypted) + return protoio.NewUint32DelimitedWriter(w, binary.LittleEndian). + WriteMsg(req) + }, // Expect to decode Airbyte messages, and a ConnectorSpecification specifically. - connector.NewJSONOutput( - func() interface{} { return new(airbyte.Message) }, - func(i interface{}) error { - if rec := i.(*airbyte.Message); rec.Log != nil { - logger.Log(airbyteToLogrusLevel(rec.Log.Level), nil, rec.Log.Message) - } else if rec.Spec != nil { - spec = rec.Spec - } else { - return fmt.Errorf("unexpected connector message: %v", rec) + connector.NewProtoOutput( + func() proto.Message { return new(pc.SpecResponse) }, + func(m proto.Message) error { + if resp != nil { + return fmt.Errorf("read more than one SpecResponse") } + resp = m.(*pc.SpecResponse) return nil }, - onStdoutDecodeError(logger), ), logger, ) + return resp, err - // Expect connector spit out a successful ConnectorSpecification. - if err == nil && spec == nil { - err = fmt.Errorf("connector didn't produce a Specification") - } - if err != nil { - return nil, err - } - - var reflector = jsonschema.Reflector{ExpandedStruct: true} - resourceSchema, err := reflector.Reflect(new(ResourceSpec)).MarshalJSON() - if err != nil { - return nil, fmt.Errorf("generating resource schema: %w", err) - } - - return &pc.SpecResponse{ - EndpointSpecSchemaJson: spec.ConnectionSpecification, - ResourceSpecSchemaJson: json.RawMessage(resourceSchema), - DocumentationUrl: spec.DocumentationURL, - }, nil } // Discover delegates to the `discover` command of the identified Airbyte image. @@ -156,80 +146,39 @@ func (d driver) Discover(ctx context.Context, req *pc.DiscoverRequest) (*pc.Disc return nil, err } defer connector.ZeroBytes(decrypted) // connector.Run will also ZeroBytes(). + req.EndpointSpecJson = decrypted - var catalog *airbyte.Catalog + var resp *pc.DiscoverResponse err = connector.Run(ctx, source.Image, connector.Capture, d.networkName, []string{ "discover", - "--config", - "/tmp/config.json", }, - // Write configuration JSON to connector input. - map[string]json.RawMessage{"config.json": decrypted}, - // No stdin is sent to the connector. - func(w io.Writer) error { return nil }, - // Expect to decode Airbyte messages, and a ConnectionStatus specifically. - connector.NewJSONOutput( - func() interface{} { return new(airbyte.Message) }, - func(i interface{}) error { - if rec := i.(*airbyte.Message); rec.Log != nil { - logger.Log(airbyteToLogrusLevel(rec.Log.Level), nil, rec.Log.Message) - } else if rec.Catalog != nil { - catalog = rec.Catalog - } else { - return fmt.Errorf("unexpected connector message: %v", rec) + nil, + func(w io.Writer) error { + defer connector.ZeroBytes(decrypted) + return protoio.NewUint32DelimitedWriter(w, binary.LittleEndian). 
+ WriteMsg(req) + }, + connector.NewProtoOutput( + func() proto.Message { return new(pc.DiscoverResponse) }, + func(m proto.Message) error { + if resp != nil { + return fmt.Errorf("read more than one DiscoverResponse") } + resp = m.(*pc.DiscoverResponse) return nil }, - onStdoutDecodeError(logger), ), logger, ) // Expect connector spit out a successful ConnectionStatus. - if err == nil && catalog == nil { + if err == nil && resp == nil { err = fmt.Errorf("connector didn't produce a Catalog") - } - if err != nil { + } else if err != nil { return nil, err } - var resp = new(pc.DiscoverResponse) - for _, stream := range catalog.Streams { - // Use incremental mode if available. - var mode = airbyte.SyncModeFullRefresh - for _, m := range stream.SupportedSyncModes { - if m == airbyte.SyncModeIncremental { - mode = m - } - } - - var resourceSpec, err = json.Marshal(ResourceSpec{ - Stream: stream.Name, - Namespace: stream.Namespace, - SyncMode: mode, - }) - if err != nil { - return nil, fmt.Errorf("encoding resource spec: %w", err) - } - - // Encode array of hierarchical properties as a JSON-pointer. - var keyPtrs []string - for _, tokens := range stream.SourceDefinedPrimaryKey { - for i := range tokens { - tokens[i] = jsonpointer.Escape(tokens[i]) - } - keyPtrs = append(keyPtrs, "/"+strings.Join(tokens, "/")) - } - - resp.Bindings = append(resp.Bindings, &pc.DiscoverResponse_Binding{ - RecommendedName: stream.Name, - ResourceSpecJson: json.RawMessage(resourceSpec), - DocumentSchemaJson: stream.JSONSchema, - KeyPtrs: keyPtrs, - }) - } - return resp, nil } @@ -252,57 +201,39 @@ func (d driver) Validate(ctx context.Context, req *pc.ValidateRequest) (*pc.Vali ops.LogSourceField: source.Image, "operation": "validate", }) + req.EndpointSpecJson = decrypted - var status *airbyte.ConnectionStatus + var resp *pc.ValidateResponse err = connector.Run(ctx, source.Image, connector.Capture, d.networkName, []string{ - "check", - "--config", - "/tmp/config.json", + "validate", }, - // Write configuration JSON to connector input. - map[string]json.RawMessage{"config.json": decrypted}, - // No stdin is sent to the connector. - func(w io.Writer) error { return nil }, - // Expect to decode Airbyte messages, and a ConnectionStatus specifically. - connector.NewJSONOutput( - func() interface{} { return new(airbyte.Message) }, - func(i interface{}) error { - if rec := i.(*airbyte.Message); rec.Log != nil { - logger.Log(airbyteToLogrusLevel(rec.Log.Level), nil, rec.Log.Message) - } else if rec.ConnectionStatus != nil { - status = rec.ConnectionStatus - } else { - return fmt.Errorf("unexpected connector message: %v", rec) + nil, + func(w io.Writer) error { + defer connector.ZeroBytes(decrypted) + return protoio.NewUint32DelimitedWriter(w, binary.LittleEndian). + WriteMsg(req) + }, + connector.NewProtoOutput( + func() proto.Message { return new(pc.ValidateResponse) }, + func(m proto.Message) error { + if resp != nil { + return fmt.Errorf("read more than one ValidateResponse") } + resp = m.(*pc.ValidateResponse) return nil }, - onStdoutDecodeError(logger), ), logger, ) - // Expect connector spit out a successful ConnectionStatus. 
- if err == nil && status == nil { - err = fmt.Errorf("connector didn't produce a ConnectionStatus") - } else if err == nil && status.Status != airbyte.StatusSucceeded { - err = fmt.Errorf("%s: %s", status.Status, status.Message) + if err == nil && resp == nil { + err = fmt.Errorf("connector didn't produce a response") } if err != nil { return nil, err } - // Parse stream bindings and send back their resource paths. - var resp = new(pc.ValidateResponse) - for _, binding := range req.Bindings { - var stream = new(ResourceSpec) - if err := pf.UnmarshalStrict(binding.ResourceSpecJson, stream); err != nil { - return nil, fmt.Errorf("parsing stream configuration: %w", err) - } - resp.Bindings = append(resp.Bindings, &pc.ValidateResponse_Binding{ - ResourcePath: []string{stream.Stream}, - }) - } return resp, nil } @@ -332,102 +263,36 @@ func (d driver) Pull(stream pc.Driver_PullServer) error { return fmt.Errorf("parsing connector configuration: %w", err) } - var open = req.Open - var streamToBinding = make(map[string]int) var logger = ops.NewLoggerWithFields(d.logger, logrus.Fields{ ops.LogSourceField: source.Image, "operation": "read", }) - // Build configured Airbyte catalog. - var catalog = airbyte.ConfiguredCatalog{ - Streams: nil, - Tail: open.Tail, - Range: airbyte.Range{ - Begin: open.KeyBegin, - End: open.KeyEnd, - }, - } - for i, binding := range open.Capture.Bindings { - var resource = new(ResourceSpec) - if err := pf.UnmarshalStrict(binding.ResourceSpecJson, resource); err != nil { - return fmt.Errorf("parsing stream configuration: %w", err) - } - - var projections = make(map[string]string) - for _, p := range binding.Collection.Projections { - projections[p.Field] = p.Ptr - } - - var primaryKey = make([][]string, 0, len(binding.Collection.KeyPtrs)) - for _, key := range binding.Collection.KeyPtrs { - if ptr, err := jsonpointer.New(key); err != nil { - return fmt.Errorf("parsing json pointer: %w", err) - } else { - primaryKey = append(primaryKey, ptr.DecodedTokens()) - } - } - - catalog.Streams = append(catalog.Streams, - airbyte.ConfiguredStream{ - SyncMode: resource.SyncMode, - DestinationSyncMode: airbyte.DestinationSyncModeAppend, - PrimaryKey: primaryKey, - Stream: airbyte.Stream{ - Name: resource.Stream, - Namespace: resource.Namespace, - JSONSchema: binding.Collection.SchemaJson, - SupportedSyncModes: []airbyte.SyncMode{resource.SyncMode}, - }, - Projections: projections, - }) - streamToBinding[resource.Stream] = i - } - - catalogJSON, err := json.Marshal(&catalog) - if err != nil { - return fmt.Errorf("encoding catalog: %w", err) - } - logger.Log(logrus.DebugLevel, logrus.Fields{ - "catalog": &catalog, - }, "using configured catalog") - decrypted, err := connector.DecryptConfig(stream.Context(), source.Config) if err != nil { return err } defer connector.ZeroBytes(decrypted) // RunConnector will also ZeroBytes(). - var invokeArgs = []string{ - "read", - "--config", - "/tmp/config.json", - "--catalog", - "/tmp/catalog.json", - } - var invokeFiles = map[string]json.RawMessage{ - "config.json": decrypted, - "catalog.json": catalogJSON, - } - - if len(open.DriverCheckpointJson) != 0 { - invokeArgs = append(invokeArgs, "--state", "/tmp/state.json") - // Copy because RunConnector will ZeroBytes() once sent and, - // as noted in driver{}, we don't own this memory. - invokeFiles["state.json"] = append([]byte(nil), open.DriverCheckpointJson...) 
- } + req.Open.Capture.EndpointSpecJson = decrypted if err := stream.Send(&pc.PullResponse{Opened: &pc.PullResponse_Opened{}}); err != nil { return fmt.Errorf("sending Opened: %w", err) } - var resp *pc.PullResponse - // Invoke the connector for reading. - if err := connector.Run(stream.Context(), source.Image, connector.Capture, d.networkName, - invokeArgs, - invokeFiles, + return connector.Run(stream.Context(), source.Image, connector.Capture, d.networkName, + []string{"pull"}, + nil, func(w io.Writer) error { + defer connector.ZeroBytes(decrypted) + var enc = protoio.NewUint32DelimitedWriter(w, binary.LittleEndian) + var err = enc.WriteMsg(req) + + if err != nil { + return fmt.Errorf("proxying Open: %w", err) + } + for { var req, err = stream.Recv() if err == io.EOF { @@ -443,49 +308,14 @@ func (d driver) Pull(stream pc.Driver_PullServer) error { } } }, - // Expect to decode Airbyte messages. - connector.NewJSONOutput( - func() interface{} { return new(airbyte.Message) }, - func(i interface{}) error { - if rec := i.(*airbyte.Message); rec.Log != nil { - logger.Log(airbyteToLogrusLevel(rec.Log.Level), nil, rec.Log.Message) - } else if rec.State != nil { - return pc.WritePullCheckpoint(stream, &resp, - &pf.DriverCheckpoint{ - DriverCheckpointJson: rec.State.Data, - Rfc7396MergePatch: rec.State.Merge, - }) - } else if rec.Record != nil { - if b, ok := streamToBinding[rec.Record.Stream]; ok { - return pc.StagePullDocuments(stream, &resp, b, rec.Record.Data) - } - return fmt.Errorf("connector record with unknown stream %q", rec.Record.Stream) - } else { - return fmt.Errorf("unexpected connector message: %v", rec) - } - return nil + connector.NewProtoOutput( + func() proto.Message { return new(pc.PullResponse) }, + func(m proto.Message) error { + return stream.Send(m.(*pc.PullResponse)) }, - onStdoutDecodeError(logger), ), logger, - ); err != nil { - return err - } - - if resp == nil { - return nil // Connector flushed prior to exiting. All done. - } - - // Write a final commit, followed by EOF. - // This happens only when a connector writes output and exits _without_ - // writing a final state checkpoint. We generate a synthetic commit now, - // and the nil checkpoint means the assumed behavior of the next invocation - // will be "full refresh". - return pc.WritePullCheckpoint(stream, &resp, - &pf.DriverCheckpoint{ - DriverCheckpointJson: nil, - Rfc7396MergePatch: false, - }) + ) } // onStdoutDecodeError returns a function that is invoked whenever there's an error parsing a line diff --git a/go/connector/run.go b/go/connector/run.go index a783176207..6b5926a1a2 100644 --- a/go/connector/run.go +++ b/go/connector/run.go @@ -105,41 +105,39 @@ func Run( } defer os.RemoveAll(tempdir) - if protocol == Materialize { - if connectorProxyPath, err := prepareFlowConnectorProxyBinary(tempdir); err != nil { - return fmt.Errorf("prepare flow connector proxy binary: %w", err) - } else { - imageArgs = append(imageArgs, - "--entrypoint", connectorProxyPath, - "--mount", fmt.Sprintf("type=bind,source=%[1]s,target=%[1]s", connectorProxyPath), - ) - } + if connectorProxyPath, err := prepareFlowConnectorProxyBinary(tempdir); err != nil { + return fmt.Errorf("prepare flow connector proxy binary: %w", err) + } else { + imageArgs = append(imageArgs, + "--entrypoint", connectorProxyPath, + "--mount", fmt.Sprintf("type=bind,source=%[1]s,target=%[1]s", connectorProxyPath), + ) + } - if err := pullRemoteImage(ctx, image, logger); err != nil { - // This might be a local image. Log an error and keep going. 
- // If the image does not exist locally, the inspectImage will return an error and terminate the workflow. - logger.Log(logrus.InfoLevel, logrus.Fields{ - "error": err, - }, "pull remote image does not succeed.") - } + if err := pullRemoteImage(ctx, image, logger); err != nil { + // This might be a local image. Log an error and keep going. + // If the image does not exist locally, the inspectImage will return an error and terminate the workflow. + logger.Log(logrus.InfoLevel, logrus.Fields{ + "error": err, + }, "pull remote image does not succeed.") + } - if inspectOutput, err := inspectImage(ctx, image); err != nil { - return fmt.Errorf("inspect image: %w", err) - } else { - if jsonFiles == nil { - jsonFiles = map[string]json.RawMessage{imageInspectJsonFileName: inspectOutput} + if inspectOutput, err := inspectImage(ctx, image); err != nil { + return fmt.Errorf("inspect image: %w", err) + } else { + if jsonFiles == nil { + jsonFiles = map[string]json.RawMessage{imageInspectJsonFileName: inspectOutput} - } else { - jsonFiles[imageInspectJsonFileName] = inspectOutput - } + } else { + jsonFiles[imageInspectJsonFileName] = inspectOutput } - - args = append([]string{ - fmt.Sprintf("--image-inspect-json-path=/tmp/%s", imageInspectJsonFileName), - protocol.proxyCommand(), - }, args...) } + args = append([]string{ + fmt.Sprintf("--image-inspect-json-path=/tmp/%s", imageInspectJsonFileName), + protocol.proxyCommand(), + }, args...) + for name, data := range jsonFiles { var hostPath = filepath.Join(tempdir, name) var containerPath = filepath.Join("/tmp", name) diff --git a/go/network-proxy/networkproxy.go b/go/network-tunnel/networktunnel.go similarity index 66% rename from go/network-proxy/networkproxy.go rename to go/network-tunnel/networktunnel.go index f76e85e85a..9ad95333c9 100644 --- a/go/network-proxy/networkproxy.go +++ b/go/network-tunnel/networktunnel.go @@ -1,4 +1,4 @@ -package networkproxy +package networktunnel import ( "bytes" @@ -11,64 +11,64 @@ import ( "syscall" "time" - sf "github.com/estuary/flow/go/network-proxy/sshforwarding" + sf "github.com/estuary/flow/go/network-tunnel/sshforwarding" ) -const ProgramName = "network-proxy-service" +const ProgramName = "network-tunnel-service" -func SupportedNetworkProxyTypes() []string { +func SupportedNetworkTunnelTypes() []string { return []string{"sshForwarding"} } -type NetworkProxyConfig struct { - ProxyType string `json:"proxyType"` +type NetworkTunnelConfig struct { + TunnelType string `json:"tunnelType"` SshForwardingConfig sf.SshForwardingConfig `json:"sshForwarding"` } // GetFieldDocString implements the jsonschema.customSchemaGetFieldDocString interface. -func (NetworkProxyConfig) GetFieldDocString(fieldName string) string { +func (NetworkTunnelConfig) GetFieldDocString(fieldName string) string { switch fieldName { - case "ProxyType": - return fmt.Sprintf("The type of the network proxy. Supported types are: ( %s )", strings.Join(SupportedNetworkProxyTypes(), ", ")) + case "TunnelType": + return fmt.Sprintf("The type of the network tunnel. 
Supported types are: ( %s )", strings.Join(SupportedNetworkTunnelTypes(), ", ")) case "SshForwardingConfig": - return "Config for proxy of type sshForwarding" + return "Config for tunnel of type sshForwarding" default: return "" } } -func (npc *NetworkProxyConfig) Validate() error { +func (npc *NetworkTunnelConfig) Validate() error { if npc == nil { return nil } var supported = false - for _, t := range SupportedNetworkProxyTypes() { - if t == npc.ProxyType { + for _, t := range SupportedNetworkTunnelTypes() { + if t == npc.TunnelType { supported = true break } } if !supported { - return fmt.Errorf("Unsupported proxy type: %s. Valid values are: ( %s ).", npc.ProxyType, strings.Join(SupportedNetworkProxyTypes(), ", ")) + return fmt.Errorf("Unsupported tunnel type: %s. Valid values are: ( %s ).", npc.TunnelType, strings.Join(SupportedNetworkTunnelTypes(), ", ")) } - switch npc.ProxyType { + switch npc.TunnelType { case "sshForwarding": return npc.SshForwardingConfig.Validate() default: - panic(fmt.Sprintf("Implementation of validating %s is not ready.", npc.ProxyType)) + panic(fmt.Sprintf("Validation of tunnel type %s is not implemented.", npc.TunnelType)) } } -func (npc *NetworkProxyConfig) MarshalJSON() ([]byte, error) { +func (npc *NetworkTunnelConfig) MarshalJSON() ([]byte, error) { var m = make(map[string]interface{}) - switch npc.ProxyType { + switch npc.TunnelType { case "sshForwarding": - m[npc.ProxyType] = npc.SshForwardingConfig + m[npc.TunnelType] = npc.SshForwardingConfig default: - panic(fmt.Sprintf("Implementation of MarshalJSON for %s is missing.", npc.ProxyType)) + panic(fmt.Sprintf("MarshalJSON for tunnel type %s is not implemented.", npc.TunnelType)) } return json.Marshal(m) @@ -76,13 +76,13 @@ func (npc *NetworkProxyConfig) MarshalJSON() ([]byte, error) { const defaultTimeoutSecs = 5 -func (npc *NetworkProxyConfig) Start() error { +func (npc *NetworkTunnelConfig) Start() error { return npc.startInternal(defaultTimeoutSecs, os.Stderr) } -func (npc *NetworkProxyConfig) startInternal(timeoutSecs uint16, stderr io.Writer) error { +func (npc *NetworkTunnelConfig) startInternal(timeoutSecs uint16, stderr io.Writer) error { if npc == nil { - // NetworkProxyConfig is not set. + // NetworkTunnelConfig is not set. 
return nil } @@ -117,7 +117,7 @@ func (npc *NetworkProxyConfig) startInternal(timeoutSecs uint16, stderr io.Write } } -func (npc *NetworkProxyConfig) sendInput(cmd *exec.Cmd) error { +func (npc *NetworkTunnelConfig) sendInput(cmd *exec.Cmd) error { stdin, err := cmd.StdinPipe() if err != nil { return fmt.Errorf("getting stdin pipe: %w", err) @@ -131,7 +131,7 @@ func (npc *NetworkProxyConfig) sendInput(cmd *exec.Cmd) error { go func() { if _, err := stdin.Write(input); err != nil { - panic("Failed to send input to network-proxy-service binary.") + panic("Failed to send input to network-tunnel-service binary.") } stdin.Close() }() diff --git a/go/network-proxy/networkproxy_test.go b/go/network-tunnel/networktunnel_test.go similarity index 98% rename from go/network-proxy/networkproxy_test.go rename to go/network-tunnel/networktunnel_test.go index 16c9db37ad..93ea2ae8a4 100644 --- a/go/network-proxy/networkproxy_test.go +++ b/go/network-tunnel/networktunnel_test.go @@ -1,4 +1,4 @@ -package networkproxy +package networktunnel /* import ( diff --git a/go/network-proxy/sshforwarding/sshforwarding.go b/go/network-tunnel/sshforwarding/sshforwarding.go similarity index 100% rename from go/network-proxy/sshforwarding/sshforwarding.go rename to go/network-tunnel/sshforwarding/sshforwarding.go diff --git a/go/network-proxy/sshforwarding/sshforwarding_test.go b/go/network-tunnel/sshforwarding/sshforwarding_test.go similarity index 100% rename from go/network-proxy/sshforwarding/sshforwarding_test.go rename to go/network-tunnel/sshforwarding/sshforwarding_test.go diff --git a/go/network-proxy/testutil.go b/go/network-tunnel/testutil.go similarity index 75% rename from go/network-proxy/testutil.go rename to go/network-tunnel/testutil.go index 655616907f..4bed78074e 100644 --- a/go/network-proxy/testutil.go +++ b/go/network-tunnel/testutil.go @@ -1,20 +1,20 @@ -package networkproxy +package networktunnel import ( "encoding/base64" "os" - sf "github.com/estuary/flow/go/network-proxy/sshforwarding" + sf "github.com/estuary/flow/go/network-tunnel/sshforwarding" ) // Configuration set based on sshforwarding/test_sshd_configs/docker-compose.yaml. -func CreateSshForwardingTestConfig(keyFilePath string, remotePort uint16) (*NetworkProxyConfig, error) { +func CreateSshForwardingTestConfig(keyFilePath string, remotePort uint16) (*NetworkTunnelConfig, error) { var b, err = os.ReadFile(keyFilePath) if err != nil { return nil, err } - return &NetworkProxyConfig{ - ProxyType: "sshForwarding", + return &NetworkTunnelConfig{ + TunnelType: "sshForwarding", SshForwardingConfig: sf.SshForwardingConfig{ SshEndpoint: "ssh://127.0.0.1:2222", SshPrivateKeyBase64: base64.RawStdEncoding.EncodeToString(b), diff --git a/site/docs/concepts/connectors.md b/site/docs/concepts/connectors.md index c7f2f0d703..848b950934 100644 --- a/site/docs/concepts/connectors.md +++ b/site/docs/concepts/connectors.md @@ -567,7 +567,7 @@ materializations: database: flow user: flow_user password: secret - networkProxy: + networkTunnel: sshForwarding: # Port on the local machine from which you'll connect to the SSH server. # If a port is specified elsewhere in the connector configuration, it must match. 
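Editor's note: the rename above means user-facing configs switch from a `networkProxy` key to `networkTunnel`, backed by the renamed Go package. As an illustrative sketch only (not part of this diff), here is how a caller might build, validate, and start the renamed config using just the identifiers shown in networktunnel.go; the endpoint and key values are placeholders:

```go
package main

import (
	"log"

	networktunnel "github.com/estuary/flow/go/network-tunnel"
	sf "github.com/estuary/flow/go/network-tunnel/sshforwarding"
)

func main() {
	// Hypothetical values; TunnelType must be one of SupportedNetworkTunnelTypes().
	var cfg = &networktunnel.NetworkTunnelConfig{
		TunnelType: "sshForwarding",
		SshForwardingConfig: sf.SshForwardingConfig{
			SshEndpoint:         "ssh://127.0.0.1:2222",
			SshPrivateKeyBase64: "PLACEHOLDER-BASE64-KEY",
		},
	}
	// Validate rejects unknown tunnel types; both Validate and Start are
	// nil-receiver-safe, so a connector with no tunnel configured is a no-op.
	if err := cfg.Validate(); err != nil {
		log.Fatalf("validating tunnel config: %v", err)
	}
	// Start launches the tunnel-service process and feeds it the config on
	// stdin (see startInternal and sendInput above).
	if err := cfg.Start(); err != nil {
		log.Fatalf("starting tunnel: %v", err)
	}
}
```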
diff --git a/tests/sshforwarding/materialize-postgres.ssh.config.yaml b/tests/sshforwarding/materialize-postgres.ssh.config.yaml index a5a9e8ebcb..c12e23dd4c 100644 --- a/tests/sshforwarding/materialize-postgres.ssh.config.yaml +++ b/tests/sshforwarding/materialize-postgres.ssh.config.yaml @@ -3,7 +3,7 @@ port: 16666 user: flow password: flow database: flow -networkProxy: +networkTunnel: sshForwarding: localPort: 16666 forwardHost: localhost @@ -49,4 +49,4 @@ networkProxy: KySvYOfiD8waRu2Gf7IqCHdgKBi7AkE45w72GhC+GOoDNMFgnlUgoDeRzxS7idf4 MIVS3sQzezB78ZAuZx0IkH8PxgqRI/D4CK9QBC0b2IT1xmqe5LCGhsMHSvScPLV3 Uu2cs5FkJUnkRpwup7KEfJfZG80DHP81GTsioAt40igx6gVAkIo= - -----END RSA PRIVATE KEY----- \ No newline at end of file + -----END RSA PRIVATE KEY-----
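Editor's note: to close the loop on the rename in testutil.go, a hypothetical test sketch exercising `CreateSshForwardingTestConfig`. The key path is invented and would need to point at a real PEM private key, such as the fixture used by the sshd docker-compose setup in sshforwarding/test_sshd_configs:

```go
package networktunnel_test

import (
	"testing"

	networktunnel "github.com/estuary/flow/go/network-tunnel"
)

func TestSshForwardingTestConfig(t *testing.T) {
	// "testdata/test_key" is a made-up path; the helper reads the given key
	// file and base64-encodes it into the returned config.
	cfg, err := networktunnel.CreateSshForwardingTestConfig("testdata/test_key", 5432)
	if err != nil {
		t.Fatalf("building test config: %v", err)
	}
	if cfg.TunnelType != "sshForwarding" {
		t.Errorf("unexpected tunnel type: %q", cfg.TunnelType)
	}
	if err := cfg.Validate(); err != nil {
		t.Errorf("config should validate: %v", err)
	}
}
```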