diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index f5c552458e..cbf016c022 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -77,7 +77,6 @@ jobs: - run: make install-tools - run: go mod download - run: make rust-test - - run: make go-test-ci - run: make rust-binaries - name: Upload Binaries @@ -193,6 +192,11 @@ jobs: - name: Install protobuf compiler (it's not already included in CI runner) run: sudo apt install -y libprotobuf-dev protobuf-compiler + # We require a minimal Go version of 1.17. + - uses: actions/setup-go@v2 + with: + go-version: "1.17.3" + - name: Install rust toolchain run: rustup show - run: make extra-ci-runner-setup @@ -280,6 +284,19 @@ jobs: -o /home/runner/work/flow/flow/.build/package/bin/gazette \ -o /home/runner/work/flow/flow/.build/package/bin/sops + - name: make go-test-ci + run: | + make go-test-ci \ + -o /home/runner/work/flow/flow/.build/package/bin/etcd \ + -o /home/runner/work/flow/flow/.build/package/bin/flowctl \ + -o /home/runner/work/flow/flow/.build/package/bin/flowctl-go \ + -o /home/runner/work/flow/flow/.build/package/bin/flow-connector-proxy \ + -o /home/runner/work/flow/flow/.build/package/bin/flow-network-proxy \ + -o /home/runner/work/flow/flow/.build/package/bin/flow-parser \ + -o /home/runner/work/flow/flow/.build/package/bin/flow-schemalate \ + -o /home/runner/work/flow/flow/.build/package/bin/gazette \ + -o /home/runner/work/flow/flow/.build/package/bin/sops + - name: make end-to-end-test run: | make end-to-end-test \ diff --git a/Cargo.lock b/Cargo.lock index 0f8b4b72ad..2e74701c37 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -93,9 +93,9 @@ dependencies = [ [[package]] name = "async-stream" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "171374e7e3b2504e0e5236e3b59260560f9fe94bfe9ac39ba5e4e929c5590625" +checksum = "dad5c83079eae9969be7fadefe640a1c566901f05ff91ab221de4b6f68d9507e" dependencies = [ "async-stream-impl", "futures-core", @@ -103,9 +103,9 @@ dependencies = [ [[package]] name = "async-stream-impl" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "648ed8c8d2ce5409ccd57453d9d1b214b342a0d69376a6feda1fd6cae3299308" +checksum = "10f203db73a71dfa2fb6dd22763990fa26f3d2625a6da2da900d23b87d26be27" dependencies = [ "proc-macro2", "quote", @@ -238,9 +238,9 @@ checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" [[package]] name = "base64ct" -version = "1.3.3" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "874f8444adcb4952a8bc51305c8be95c8ec8237bb0d2e78d2e039f771f8828a0" +checksum = "71acf5509fc522cce1b100ac0121c635129bfd4d91cdf036bcc9b9935f97ccf5" [[package]] name = "bcrypt-pbkdf" @@ -540,7 +540,7 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" dependencies = [ - "nom 7.1.0", + "nom 7.1.1", ] [[package]] @@ -668,6 +668,8 @@ dependencies = [ "flow_cli_common", "futures-core", "futures-util", + "json-pointer", + "libc", "network-proxy", "prost", "protocol", @@ -677,10 +679,12 @@ dependencies = [ "structopt", "strum 0.24.0", "strum_macros 0.24.0", + "tempfile", "thiserror", "tokio", "tokio-util 0.7.0", "tracing", + "validator", ] [[package]] @@ -833,9 +837,9 @@ dependencies = [ [[package]] name = "crossbeam-channel" -version = "0.5.2" +version = "0.5.3" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "e54ea8bc3fb1ee042f5aace6e3c6e025d3874866da222930f70ce62aceba0bfa" +checksum = "fdbfe11fe19ff083c48923cf179540e8cd0535903dc35e178a1fdeeb59aef51f" dependencies = [ "cfg-if", "crossbeam-utils", @@ -854,10 +858,11 @@ dependencies = [ [[package]] name = "crossbeam-epoch" -version = "0.9.7" +version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c00d6d2ea26e8b151d99093005cb442fb9a37aeaca582a03ec70946f49ab5ed9" +checksum = "1145cf131a2c6ba0615079ab6a638f7e1973ac9c2634fcbeaaad6114246efe8c" dependencies = [ + "autocfg", "cfg-if", "crossbeam-utils", "lazy_static", @@ -867,9 +872,9 @@ dependencies = [ [[package]] name = "crossbeam-queue" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4dd435b205a4842da59efd07628f921c096bc1cc0a156835b4fa0bcb9a19bcce" +checksum = "1f25d8400f4a7a5778f0e4e52384a48cbd9b5c495d110786187fc750075277a2" dependencies = [ "cfg-if", "crossbeam-utils", @@ -877,9 +882,9 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.7" +version = "0.8.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5e5bed1f1c269533fa816a0a5492b3545209a205ca1a54842be180eb63a16a6" +checksum = "0bf124c720b7686e3c2663cf54062ab0f68a88af2fb6a030e87e30bf721fcb38" dependencies = [ "cfg-if", "lazy_static", @@ -939,9 +944,9 @@ dependencies = [ [[package]] name = "ctor" -version = "0.1.21" +version = "0.1.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ccc0a48a9b826acdf4028595adc9db92caea352f7af011a3034acd172a52a0aa" +checksum = "f877be4f7c9f246b183111634f75baa039715e3f46ce860677d3b19a69fb229c" dependencies = [ "quote", "syn", @@ -1082,9 +1087,9 @@ dependencies = [ [[package]] name = "dirs-sys" -version = "0.3.6" +version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03d86534ed367a67548dc68113a0f5db55432fdfbb6e6f9d77704397d95d5780" +checksum = "1b1d1d91c932ef41c0f2663aa8b0ca0342d444d842c06914aa0a7e352d0bada6" dependencies = [ "libc", "redox_users", @@ -1114,7 +1119,7 @@ dependencies = [ [[package]] name = "doc" version = "0.0.0" -source = "git+https://github.com/estuary/flow#d8c3be35fc8ac0657e0c4aa2e5e7ef4b3eb72905" +source = "git+https://github.com/estuary/flow#4f2ca48fda98b608dd2dc2d920dc40ecc60150f5" dependencies = [ "fancy-regex", "itertools", @@ -1143,9 +1148,9 @@ checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f" [[package]] name = "dyn-clone" -version = "1.0.4" +version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee2626afccd7561a06cf1367e2950c4718ea04565e20fb5029b6c7d8ad09abcf" +checksum = "21e50f3adc76d6a43f5ed73b698a87d0760ca74617f60f7c3b879003536fdd28" [[package]] name = "either" @@ -1645,9 +1650,9 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" [[package]] name = "humantime-serde" -version = "1.0.1" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac34a56cfd4acddb469cc7fff187ed5ac36f498ba085caf8bbc725e3ff474058" +checksum = "57a3db5ea5923d99402c94e9feb261dc5ee9b4efa158b0315f788cf549cc200c" dependencies = [ "humantime", "serde 1.0.136", @@ -1882,7 +1887,7 @@ dependencies = [ [[package]] name = "json" version = "0.0.0" -source = "git+https://github.com/estuary/flow#d8c3be35fc8ac0657e0c4aa2e5e7ef4b3eb72905" +source = 
"git+https://github.com/estuary/flow#4f2ca48fda98b608dd2dc2d920dc40ecc60150f5" dependencies = [ "bitvec", "fancy-regex", @@ -1908,6 +1913,15 @@ dependencies = [ "treediff", ] +[[package]] +name = "json-pointer" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fe841b94e719a482213cee19dd04927cf412f26d8dc84c5a446c081e49c2997" +dependencies = [ + "serde_json", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -1935,9 +1949,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.119" +version = "0.2.120" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1bf2e165bb3457c8e098ea76f3e3bc9db55f87aa90d52d0e6be741470916aaa4" +checksum = "ad5c14e80759d0939d013e6ca49930e59fc53dd8e5009132f76240c179380c09" [[package]] name = "libflate" @@ -2126,14 +2140,15 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.0" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba272f85fa0b41fc91872be579b3bbe0f56b792aa361a380eb669469f68dafb2" +checksum = "52da4364ffb0e4fe33a9841a98a3f3014fb964045ce4f7a45a398243c8d6b0c9" dependencies = [ "libc", "log", "miow", "ntapi", + "wasi 0.11.0+wasi-snapshot-preview1", "winapi", ] @@ -2236,13 +2251,12 @@ dependencies = [ [[package]] name = "nom" -version = "7.1.0" +version = "7.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b1d11e1ef389c76fe5b81bcaf2ea32cf88b62bc494e19f493d0b30e7a930109" +checksum = "a8903e5a29a317527874d0402f867152a3d21c908bb0b933e416c65e301d4c36" dependencies = [ "memchr", "minimal-lexical", - "version_check", ] [[package]] @@ -2316,9 +2330,9 @@ dependencies = [ [[package]] name = "num_threads" -version = "0.1.3" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97ba99ba6393e2c3734791401b66902d981cb03bf190af674ca69949b6d5fb15" +checksum = "aba1801fb138d8e85e11d0fc70baf4fe1cdfffda7c6cd34a854905df588e5ed0" dependencies = [ "libc", ] @@ -2369,9 +2383,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "openssl-src" -version = "111.17.0+1.1.1m" +version = "111.18.0+1.1.1n" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05d6a336abd10814198f66e2a91ccd7336611f30334119ca8ce300536666fcf4" +checksum = "7897a926e1e8d00219127dc020130eca4292e5ca666dd592480d72c3eca2ff6c" dependencies = [ "cc", ] @@ -2526,9 +2540,9 @@ checksum = "8835116a5c179084a830efb3adc117ab007512b535bc1a21c991d3b32a6b44dd" [[package]] name = "pathfinding" -version = "3.0.10" +version = "3.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae0f81e2934f5e80cf0fa7a3ee971b9b37df69ac111f20772e4dd33ff379d740" +checksum = "18474651993237fdc6197431f5fa7ed719c3d0a5050287f7dff9df2ffc011f75" dependencies = [ "fixedbitset", "indexmap", @@ -2851,9 +2865,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.15" +version = "1.0.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "864d3e96a899863136fc6e99f3d7cae289dafe43bf2c5ac19b70df7210c0a145" +checksum = "b4af2ec4714533fcdf07e886f17025ace8b997b9ce51204ee69b6da831c3da57" dependencies = [ "proc-macro2", ] @@ -3008,12 +3022,13 @@ dependencies = [ [[package]] name = "redox_users" -version = "0.4.0" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "528532f3d801c87aec9def2add9ca802fe569e44a544afe633765267840abe64" +checksum = 
"7776223e2696f1aa4c6b0170e83212f47296a00424305117d013dfe86fb0fe55" dependencies = [ "getrandom 0.2.5", "redox_syscall", + "thiserror", ] [[package]] @@ -3053,9 +3068,9 @@ dependencies = [ [[package]] name = "reqwest" -version = "0.11.9" +version = "0.11.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87f242f1488a539a79bac6dbe7c8609ae43b7914b7736210f239a37cccb32525" +checksum = "46a1f7aa4f35e5e8b4160449f51afc758f0ce6454315a9fa7d0d113e958c41eb" dependencies = [ "base64", "bytes", @@ -3191,9 +3206,9 @@ dependencies = [ [[package]] name = "rustls-pemfile" -version = "0.2.1" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5eebeaeb360c87bfb72e84abdb3447159c0eaececf1bef2aecd65a8be949d1c9" +checksum = "1ee86d63972a7c661d1536fefe8c3c8407321c3df668891286de28abcd087360" dependencies = [ "base64", ] @@ -3618,7 +3633,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4b7922be017ee70900be125523f38bdd644f4f06a1b16e8fa5a8ee8c34bffd4" dependencies = [ "itertools", - "nom 7.1.0", + "nom 7.1.1", "unicode_categories", ] @@ -3828,9 +3843,9 @@ checksum = "ab16ced94dbd8a46c82fd81e3ed9a8727dac2977ea869d217bcc4ea1f122e81f" [[package]] name = "syn" -version = "1.0.86" +version = "1.0.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a65b3f4ffa0092e9887669db0eae07941f023991ab58ea44da8fe8e2d511c6b" +checksum = "ea297be220d52398dcc07ce15a209fce436d361735ac1db700cab3b6cdfb9f54" dependencies = [ "proc-macro2", "quote", @@ -4274,9 +4289,9 @@ dependencies = [ [[package]] name = "tower-http" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90c125fdea84614a4368fd35786b51d0682ab8d42705e061e92f0b955dea40fb" +checksum = "aba3f3efabf7fb41fae8534fc20a817013dd1c12cb45441efb6c82e6556b4cd8" dependencies = [ "bitflags", "bytes", @@ -4670,6 +4685,12 @@ version = "0.10.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + [[package]] name = "wasm-bindgen" version = "0.2.79" @@ -4881,9 +4902,9 @@ checksum = "504a2476202769977a040c6364301a3f65d0cc9e3fb08600b2bda150a0488316" [[package]] name = "winreg" -version = "0.7.0" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0120db82e8a1e0b9fb3345a539c478767c0048d842860994d96113d5b667bd69" +checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d" dependencies = [ "winapi", ] diff --git a/Makefile b/Makefile index db856a2dde..c6ced2233c 100644 --- a/Makefile +++ b/Makefile @@ -198,7 +198,6 @@ ${PKGDIR}/bin/flow-network-proxy: ${RUST_MUSL_BIN}/flow-network-proxy | ${PKGDIR ${PKGDIR}/bin/flow-connector-proxy: ${RUST_MUSL_BIN}/flow-connector-proxy | ${PKGDIR} cp ${RUST_MUSL_BIN}/flow-connector-proxy $@ - ########################################################################## # Make targets used by CI: @@ -241,7 +240,7 @@ go-test-fast: $(GO_BUILD_DEPS) | ${PKGDIR}/bin/etcd ${PKGDIR}/bin/sops ./go.sh test -p ${NPROC} --tags "${GO_BUILD_TAGS}" ./go/... 
 .PHONY: go-test-ci
-go-test-ci: $(GO_BUILD_DEPS) | ${PKGDIR}/bin/etcd ${PKGDIR}/bin/sops
+go-test-ci: $(GO_BUILD_DEPS) | ${PKGDIR}/bin/etcd ${PKGDIR}/bin/sops ${PKGDIR}/bin/flow-connector-proxy ${PKGDIR}/bin/flowctl ${PKGDIR}/bin/flowctl-go
 	PATH=${PKGDIR}/bin:$$PATH ;\
 	GORACE="halt_on_error=1" ;\
 	./go.sh test -p ${NPROC} --tags "${GO_BUILD_TAGS}" --race --count=15 --failfast ./go/...
diff --git a/crates/connector_proxy/Cargo.toml b/crates/connector_proxy/Cargo.toml
index 1765af109f..8f93abdf0f 100644
--- a/crates/connector_proxy/Cargo.toml
+++ b/crates/connector_proxy/Cargo.toml
@@ -20,6 +20,8 @@ byteorder="*"
 clap = { version = "^3", features = ["derive"] }
 futures-core = "*"
 futures-util="*"
+json-pointer="*"
+libc="*"
 prost = "*"
 schemars = "*"
 serde = { version = "*", features = ["derive"]}
@@ -27,7 +29,9 @@ serde_json = { version = "*", features = ["raw_value"]}
 structopt = "*"
 strum = "*"
 strum_macros = "*"
+tempfile="*"
 thiserror = "*"
 tokio = { version = "1.15.0", features = ["full"] }
 tokio-util = { version = "*", features = ["io"] }
 tracing="*"
+validator = { version = "*", features = ["derive"] }
\ No newline at end of file
diff --git a/crates/connector_proxy/src/apis.rs b/crates/connector_proxy/src/apis.rs
index d3bf09b06c..08cc36cb81 100644
--- a/crates/connector_proxy/src/apis.rs
+++ b/crates/connector_proxy/src/apis.rs
@@ -1,9 +1,20 @@
-use crate::errors::Error;
 use bytes::Bytes;
 use clap::ArgEnum;
 use futures_core::stream::Stream;
 use std::pin::Pin;
 
+// The protocol used by FlowRuntime to speak with connector-proxy.
+// There are two ways to infer the protocol:
+// 1. From the proxy command passed in from FlowRuntime to the connector proxy.
+// 2. From the connector image labels and tags.
+// The proxy raises an error if the two are inconsistent.
+#[derive(Debug, strum_macros::Display, ArgEnum, PartialEq, Clone)]
+#[strum(serialize_all = "snake_case")]
+pub enum FlowRuntimeProtocol {
+    Capture,
+    Materialize,
+}
+
 // Flow Capture operations defined in
 // https://github.com/estuary/flow/blob/master/go/protocols/capture/capture.proto
 #[derive(Debug, strum_macros::Display, ArgEnum, Clone)]
@@ -29,39 +40,6 @@ pub enum FlowMaterializeOperation {
     Transactions,
 }
 
-// To be used as a trait bound for interceptors.
-pub trait FlowOperation {}
-impl FlowOperation for FlowCaptureOperation {}
-impl FlowOperation for FlowMaterializeOperation {}
-
 // An interceptor modifies the request/response streams between Flow runtime and the connector.
 // InterceptorStream defines the type of input and output streams handled by interceptors.
 pub type InterceptorStream = Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send + Sync>>;
-
-// The generic param "T" below is bounded by FlowOperation.
-// A converter is a function that contains the specific stream-handling logic of an interceptor.
-type ConverterFn<T> = Box<dyn Fn(&T, InterceptorStream) -> Result<InterceptorStream, Error>>;
-// An intercept is characterized by a pair of converters, corresponding to the handling logic of request and response streams, respectively.
-pub type RequestResponseConverterPair<T> = (ConverterFn<T>, ConverterFn<T>);
-pub trait Interceptor<T: FlowOperation> {
-    fn get_converters() -> RequestResponseConverterPair<T> {
-        (
-            Box::new(|_op, stream| Ok(stream)),
-            Box::new(|_op, stream| Ok(stream)),
-        )
-    }
-}
-
-// Two converter pairs can be composed together to form a new converter pair.
-pub fn compose( - a: RequestResponseConverterPair, - b: RequestResponseConverterPair, -) -> RequestResponseConverterPair { - let (req_a, resp_a) = a; - let (req_b, resp_b) = b; - ( - Box::new(move |op, stream| (req_b)(op, (req_a)(op, stream)?)), - // Response conversions are applied in the reverse order of the request conversions. - Box::new(move |op, stream| (resp_a)(op, (resp_b)(op, stream)?)), - ) -} diff --git a/crates/connector_proxy/src/connector_runner.rs b/crates/connector_proxy/src/connector_runner.rs index fbe1a9d61c..28dc1b4255 100644 --- a/crates/connector_proxy/src/connector_runner.rs +++ b/crates/connector_proxy/src/connector_runner.rs @@ -1,48 +1,139 @@ -use crate::apis::RequestResponseConverterPair; +use crate::apis::{FlowCaptureOperation, FlowMaterializeOperation, InterceptorStream}; use crate::errors::Error; -use crate::libs::command::{check_exit_status, invoke_connector}; +use crate::interceptors::{ + airbyte_source_interceptor::AirbyteSourceInterceptor, + network_proxy_capture_interceptor::NetworkProxyCaptureInterceptor, + network_proxy_materialize_interceptor::NetworkProxyMaterializeInterceptor, +}; +use crate::libs::command::{ + check_exit_status, invoke_connector_delayed, invoke_connector_direct, parse_child, +}; use tokio::io::copy; +use tokio::process::{ChildStderr, ChildStdin, ChildStdout}; use tokio_util::io::{ReaderStream, StreamReader}; -pub async fn run_connector( - operation: T, +pub async fn run_flow_capture_connector( + op: &FlowCaptureOperation, entrypoint: Vec, - converter_pair: RequestResponseConverterPair, ) -> Result<(), Error> { - // prepare entrypoint and args. + let (entrypoint, mut args) = parse_entrypoint(&entrypoint)?; + args.push(op.to_string()); + + let (mut child, child_stdin, child_stdout, child_stderr) = + parse_child(invoke_connector_direct(entrypoint, args)?)?; + + let adapted_request_stream = + NetworkProxyCaptureInterceptor::adapt_request_stream(op, request_stream())?; + + let adapted_response_stream = + NetworkProxyCaptureInterceptor::adapt_response_stream(op, response_stream(child_stdout))?; + + streaming_all( + child_stdin, + child_stderr, + adapted_request_stream, + adapted_response_stream, + ) + .await?; + + check_exit_status("flow capture connector:", child.wait().await) +} + +pub async fn run_flow_materialize_connector( + op: &FlowMaterializeOperation, + entrypoint: Vec, +) -> Result<(), Error> { + let (entrypoint, mut args) = parse_entrypoint(&entrypoint)?; + args.push(op.to_string()); + + let (mut child, child_stdin, child_stdout, child_stderr) = + parse_child(invoke_connector_direct(entrypoint, args)?)?; + + let adapted_request_stream = + NetworkProxyMaterializeInterceptor::adapt_request_stream(op, request_stream())?; + + let adapted_response_stream = NetworkProxyMaterializeInterceptor::adapt_response_stream( + op, + response_stream(child_stdout), + )?; + + streaming_all( + child_stdin, + child_stderr, + adapted_request_stream, + adapted_response_stream, + ) + .await?; + + check_exit_status("flow materialize connector:", child.wait().await) +} + +pub async fn run_airbyte_source_connector( + op: &FlowCaptureOperation, + entrypoint: Vec, +) -> Result<(), Error> { + let mut airbyte_interceptor = AirbyteSourceInterceptor::new(); + + let (entrypoint, args) = parse_entrypoint(&entrypoint)?; + let args = airbyte_interceptor.adapt_command_args(op, args)?; + + let (mut child, child_stdin, child_stdout, child_stderr) = + parse_child(invoke_connector_delayed(entrypoint, args).await?)?; + + let adapted_request_stream = 
        airbyte_interceptor.adapt_request_stream(
+        op,
+        NetworkProxyCaptureInterceptor::adapt_request_stream(op, request_stream())?,
+    )?;
+
+    let adapted_response_stream = NetworkProxyCaptureInterceptor::adapt_response_stream(
+        op,
+        airbyte_interceptor.adapt_response_stream(op, response_stream(child_stdout))?,
+    )?;
+
+    streaming_all(
+        child_stdin,
+        child_stderr,
+        adapted_request_stream,
+        adapted_response_stream,
+    )
+    .await?;
+
+    check_exit_status("airbyte source connector:", child.wait().await)
+}
+
+fn parse_entrypoint(entrypoint: &Vec<String>) -> Result<(String, Vec<String>), Error> {
     if entrypoint.len() == 0 {
         return Err(Error::EmptyEntrypointError);
     }
-    let mut args = Vec::new();
-    args.extend_from_slice(&entrypoint[1..]);
-    args.push(operation.to_string());
-
-    let entrypoint = entrypoint[0].clone();
-
-    // invoke the connector and converts the request/response streams.
-    let mut child = invoke_connector(entrypoint, &args)?;
-
-    let (request_converter, response_converter) = converter_pair;
-    // Perform conversions on requests and responses and starts bi-directional copying.
-    let mut request_source = StreamReader::new((request_converter)(
-        &operation,
-        Box::pin(ReaderStream::new(tokio::io::stdin())),
-    )?);
-    let mut request_destination = child.stdin.take().ok_or(Error::MissingIOPipe)?;
-
-    let response_stream_out = child.stdout.take().ok_or(Error::MissingIOPipe)?;
-    let mut response_source = StreamReader::new((response_converter)(
-        &operation,
-        Box::pin(ReaderStream::new(response_stream_out)),
-    )?);
-    let mut response_destination = tokio::io::stdout();
-
-    let (a, b) = tokio::join!(
-        copy(&mut request_source, &mut request_destination),
-        copy(&mut response_source, &mut response_destination)
+
+    return Ok((entrypoint[0].clone(), entrypoint[1..].to_vec()));
+}
+
+fn request_stream() -> InterceptorStream {
+    Box::pin(ReaderStream::new(tokio::io::stdin()))
+}
+
+fn response_stream(child_stdout: ChildStdout) -> InterceptorStream {
+    Box::pin(ReaderStream::new(child_stdout))
+}
+
+async fn streaming_all(
+    mut request_stream_writer: ChildStdin,
+    mut error_reader: ChildStderr,
+    request_stream: InterceptorStream,
+    response_stream: InterceptorStream,
+) -> Result<(), Error> {
+    let mut request_stream_reader = StreamReader::new(request_stream);
+    let mut response_stream_reader = StreamReader::new(response_stream);
+    let mut response_stream_writer = tokio::io::stdout();
+    let mut error_writer = tokio::io::stderr();
+
+    let (a, b, c) = tokio::join!(
+        copy(&mut request_stream_reader, &mut request_stream_writer),
+        copy(&mut response_stream_reader, &mut response_stream_writer),
+        copy(&mut error_reader, &mut error_writer),
     );
-    a?;
-    b?;
-    check_exit_status(child.wait().await)
+    tracing::info!("Done streaming, transferred bytes: {} {} {}", a?, b?, c?);
+    Ok(())
 }
diff --git a/crates/connector_proxy/src/errors.rs b/crates/connector_proxy/src/errors.rs
index 47eda729a9..35e5d938a3 100644
--- a/crates/connector_proxy/src/errors.rs
+++ b/crates/connector_proxy/src/errors.rs
@@ -1,5 +1,8 @@
 #[derive(thiserror::Error, Debug)]
 pub enum Error {
+    #[error("failed to start the bouncer process.")]
+    BouncerProcessStartError,
+
     #[error("channel timeout in receiving messages after 5 seconds.")]
     ChannelTimeoutError,
 
@@ -18,6 +21,12 @@ pub enum Error {
     #[error("missing process io pipes.")]
     MissingIOPipe,
 
+    #[error("mismatching runtime protocol")]
+    MismatchingRuntimeProtocol,
+
+    #[error("no ready signal received: {0}")]
+    NotReady(&'static str),
+
     #[error("invalid endpoint json config.")]
     InvalidEndpointConfig,
 
@@ -36,11 +45,28 @@ pub enum Error {
     #[error(transparent)]
     MessageEncodeError(#[from] prost::EncodeError),
 
+    #[error("missing required image inspect file; specify it via --image-inspect-json-path on the command line.")]
+    MissingImageInspectFile,
+
     #[error(transparent)]
     NetworkProxyError(#[from] network_proxy::errors::Error),
 
+    #[error(transparent)]
+    TempfilePersistError(#[from] tempfile::PersistError),
+
     #[error("Tokio task execution error.")]
     TokioTaskExecutionError(#[from] tokio::task::JoinError),
+
+    #[error("the operation '{0}' is not expected for the given protocol.")]
+    UnexpectedOperation(String),
+}
+
+pub fn raise_custom_error(message: &str) -> Result<(), std::io::Error> {
+    Err(create_custom_error(message))
+}
+
+pub fn create_custom_error(message: &str) -> std::io::Error {
+    std::io::Error::new(std::io::ErrorKind::Other, message)
 }
 
 pub trait Must<T> {
diff --git a/crates/connector_proxy/src/interceptors/airbyte_capture_interceptor.rs b/crates/connector_proxy/src/interceptors/airbyte_capture_interceptor.rs
deleted file mode 100644
index 296d7953e2..0000000000
--- a/crates/connector_proxy/src/interceptors/airbyte_capture_interceptor.rs
+++ /dev/null
@@ -1,32 +0,0 @@
-use crate::apis::{
-    FlowCaptureOperation, Interceptor, InterceptorStream, RequestResponseConverterPair,
-};
-use crate::errors::Error;
-
-// A placeholder for real logic of airbyte connectors. Details might change during real-implementations.
-pub struct AirbyteCaptureInterceptor {}
-
-impl AirbyteCaptureInterceptor {
-    fn convert_request(
-        _operation: &FlowCaptureOperation,
-        _in_stream: InterceptorStream,
-    ) -> Result<InterceptorStream, Error> {
-        panic!("TBD AirbyteCaptureInterceptor")
-    }
-
-    fn convert_response(
-        _operation: &FlowCaptureOperation,
-        _in_stream: InterceptorStream,
-    ) -> Result<InterceptorStream, Error> {
-        panic!("TBD AirbyteCaptureInterceptor")
-    }
-}
-
-impl Interceptor<FlowCaptureOperation> for AirbyteCaptureInterceptor {
-    fn get_converters() -> RequestResponseConverterPair<FlowCaptureOperation> {
-        (
-            Box::new(Self::convert_request),
-            Box::new(Self::convert_response),
-        )
-    }
-}
diff --git a/crates/connector_proxy/src/interceptors/airbyte_source_interceptor.rs b/crates/connector_proxy/src/interceptors/airbyte_source_interceptor.rs
new file mode 100644
index 0000000000..ba61bd3509
--- /dev/null
+++ b/crates/connector_proxy/src/interceptors/airbyte_source_interceptor.rs
@@ -0,0 +1,431 @@
+use crate::apis::{FlowCaptureOperation, InterceptorStream};
+
+use crate::errors::{create_custom_error, raise_custom_error, Error};
+use crate::libs::airbyte_catalog::{
+    self, ConfiguredCatalog, ConfiguredStream, DestinationSyncMode, Range, ResourceSpec, Status,
+    SyncMode,
+};
+use crate::libs::command::READY;
+use crate::libs::json::{create_root_schema, tokenize_jsonpointer};
+use crate::libs::protobuf::{decode_message, encode_message};
+use crate::libs::stream::stream_all_airbyte_messages;
+
+use async_stream::try_stream;
+use bytes::Bytes;
+use protocol::capture::{
+    discover_response, validate_response, DiscoverRequest, DiscoverResponse, Documents,
+    PullRequest, PullResponse, SpecRequest, SpecResponse, ValidateRequest, ValidateResponse,
+};
+use protocol::flow::{DriverCheckpoint, Slice};
+use std::collections::HashMap;
+use std::sync::Arc;
+use tokio::sync::Mutex;
+use validator::Validate;
+
+use futures_util::StreamExt;
+use json_pointer::JsonPointer;
+use serde_json::value::RawValue;
+use std::fs::File;
+use std::io::Write;
+use tempfile::{Builder, TempDir};
+use tokio_util::io::StreamReader;
+ +const CONFIG_FILE_NAME: &str = "config.json"; +const CATALOG_FILE_NAME: &str = "catalog.json"; +const STATE_FILE_NAME: &str = "state.json"; + +pub struct AirbyteSourceInterceptor { + validate_request: Arc>>, + stream_to_binding: Arc>>, + tmp_dir: TempDir, +} + +impl AirbyteSourceInterceptor { + pub fn new() -> Self { + AirbyteSourceInterceptor { + validate_request: Arc::new(Mutex::new(None)), + stream_to_binding: Arc::new(Mutex::new(HashMap::new())), + tmp_dir: Builder::new() + .prefix("airbyte-source-") + .tempdir_in("/var/tmp") + .expect("failed to create temp dir."), + } + } + + fn adapt_spec_request_stream(&mut self, in_stream: InterceptorStream) -> InterceptorStream { + Box::pin(try_stream! { + let mut reader = StreamReader::new(in_stream); + decode_message::(&mut reader).await?.ok_or(create_custom_error("missing spec request."))?; + + yield Bytes::from(READY); + }) + } + + fn adapt_spec_response_stream(&mut self, in_stream: InterceptorStream) -> InterceptorStream { + Box::pin(try_stream! { + let mut airbyte_message_stream = Box::pin(stream_all_airbyte_messages(in_stream)); + loop { + let message = match airbyte_message_stream.next().await { + None => break, + Some(message) => message? + }; + if let Some(spec) = message.spec { + let mut resp = SpecResponse::default(); + resp.endpoint_spec_schema_json = spec.connection_specification.to_string(); + resp.resource_spec_schema_json = serde_json::to_string_pretty(&create_root_schema::())?; + if let Some(url) = spec.documentation_url { + resp.documentation_url = url; + } + yield encode_message(&resp)?; + } else if let Some(mlog) = message.log { + mlog.log(); + } else { + raise_custom_error("unexpected spec response.")?; + } + } + }) + } + + fn adapt_discover_request( + &mut self, + config_file_path: String, + in_stream: InterceptorStream, + ) -> InterceptorStream { + Box::pin(try_stream! { + let mut reader = StreamReader::new(in_stream); + let request = decode_message::(&mut reader).await?.ok_or(create_custom_error("missing discover request."))?; + + File::create(config_file_path)?.write_all(request.endpoint_spec_json.as_bytes())?; + + yield Bytes::from(READY); + }) + } + + fn adapt_discover_response_stream( + &mut self, + in_stream: InterceptorStream, + ) -> InterceptorStream { + Box::pin(try_stream! { + let mut airbyte_message_stream = Box::pin(stream_all_airbyte_messages(in_stream)); + loop { + let message = match airbyte_message_stream.next().await { + None => break, + Some(message) => message? 
+ }; + + if let Some(catalog) = message.catalog { + let mut resp = DiscoverResponse::default(); + for stream in catalog.streams { + let mode = if stream.supported_sync_modes.contains(&SyncMode::Incremental) {SyncMode::Incremental} else {SyncMode::FullRefresh}; + let resource_spec = ResourceSpec { + stream: stream.name.clone(), + namespace: stream.namespace, + sync_mode: mode + }; + + let key_ptrs = match stream.source_defined_primary_key { + None => Vec::new(), + Some(keys) => keys.iter().map(|k| JsonPointer::new(k).to_string()).collect() + }; + resp.bindings.push(discover_response::Binding{ + recommended_name: stream.name.clone(), + resource_spec_json: serde_json::to_string(&resource_spec)?, + key_ptrs: key_ptrs, + document_schema_json: stream.json_schema.to_string(), + }) + } + + yield encode_message(&resp)?; + } else if let Some(mlog) = message.log { + mlog.log(); + } else { + raise_custom_error("unexpected discover response.")?; + } + } + }) + } + + fn adapt_validate_request_stream( + &mut self, + config_file_path: String, + validate_request: Arc>>, + in_stream: InterceptorStream, + ) -> InterceptorStream { + Box::pin(try_stream! { + let mut reader = StreamReader::new(in_stream); + let request = decode_message::(&mut reader).await?.ok_or(create_custom_error("missing validate request"))?; + *validate_request.lock().await = Some(request.clone()); + + File::create(config_file_path)?.write_all(request.endpoint_spec_json.as_bytes())?; + + yield Bytes::from(READY); + }) + } + + fn adapt_validate_response_stream( + &mut self, + validate_request: Arc>>, + in_stream: InterceptorStream, + ) -> InterceptorStream { + Box::pin(try_stream! { + let mut airbyte_message_stream = Box::pin(stream_all_airbyte_messages(in_stream)); + loop { + let message = match airbyte_message_stream.next().await { + None => break, + Some(message) => message? + }; + + if let Some(connection_status) = message.connection_status { + if connection_status.status != Status::Succeeded { + raise_custom_error(&format!("validation failed {:?}", connection_status))?; + } + + let req = validate_request.lock().await; + let req = req.as_ref().ok_or(create_custom_error("missing validate request."))?; + let mut resp = ValidateResponse::default(); + for binding in &req.bindings { + let resource: ResourceSpec = serde_json::from_str(&binding.resource_spec_json)?; + resp.bindings.push(validate_response::Binding {resource_path: vec![resource.stream]}); + } + drop(req); + yield encode_message(&resp)?; + } else if let Some(mlog) = message.log { + mlog.log(); + } else { + raise_custom_error("unexpected validate response.")?; + } + } + }) + } + + fn adapt_pull_request_stream( + &mut self, + config_file_path: String, + catalog_file_path: String, + state_file_path: String, + stream_to_binding: Arc>>, + in_stream: InterceptorStream, + ) -> InterceptorStream { + Box::pin(try_stream! 
{ + let mut reader = StreamReader::new(in_stream); + let mut request = decode_message::(&mut reader).await?.ok_or(create_custom_error("missing pull request"))?; + if let Some(ref mut o) = request.open { + File::create(state_file_path)?.write_all(&o.driver_checkpoint_json)?; + + if let Some(ref mut c) = o.capture { + File::create(config_file_path)?.write_all(&c.endpoint_spec_json.as_bytes())?; + + let mut catalog = ConfiguredCatalog { + streams: Vec::new(), + tail: o.tail, + range: Range { begin: o.key_begin, end: o.key_end } + }; + + let mut stream_to_binding = stream_to_binding.lock().await; + + for (i, binding) in c.bindings.iter().enumerate() { + let resource: ResourceSpec = serde_json::from_str(&binding.resource_spec_json)?; + stream_to_binding.insert(resource.stream.clone(), i); + + let mut projections = HashMap::new(); + if let Some(ref collection) = binding.collection { + for p in &collection.projections { + projections.insert(p.field.clone(), p.ptr.clone()); + } + + let primary_key: Vec> = collection.key_ptrs.iter().map(|ptr| tokenize_jsonpointer(ptr)).collect(); + catalog.streams.push(ConfiguredStream{ + sync_mode: resource.sync_mode.clone(), + destination_sync_mode: DestinationSyncMode::Append, + cursor_field: None, + primary_key: Some(primary_key), + stream: airbyte_catalog::Stream{ + name: resource.stream, + namespace: resource.namespace, + json_schema: RawValue::from_string(collection.schema_json.clone())?, + supported_sync_modes: vec![resource.sync_mode.clone()], + default_cursor_field: None, + source_defined_cursor: None, + source_defined_primary_key: None, + }, + projections: projections, + }); + } + } + + if let Err(e) = catalog.validate() { + raise_custom_error(&format!("invalid config_catalog: {:?}", e))? + } + + serde_json::to_writer(File::create(catalog_file_path)?, &catalog)? + } + + // release the lock. + drop(stream_to_binding); + + yield Bytes::from(READY); + } + }) + } + + fn adapt_pull_response_stream( + &mut self, + stream_to_binding: Arc>>, + in_stream: InterceptorStream, + ) -> InterceptorStream { + Box::pin(try_stream! { + let mut airbyte_message_stream = Box::pin(stream_all_airbyte_messages(in_stream)); + // transaction_pending is true if the connector writes output messages and exits _without_ writing + // a final state checkpoint. + let mut transaction_pending = false; + + loop { + let message = match airbyte_message_stream.next().await { + None => break, + Some(message) => message? 
+ }; + + let mut resp = PullResponse::default(); + if let Some(state) = message.state { + resp.checkpoint = Some(DriverCheckpoint{ + driver_checkpoint_json: state.data.get().as_bytes().to_vec(), + rfc7396_merge_patch: match state.merge { + Some(m) => m, + None => false, + }, + }); + + yield encode_message(&resp)?; + transaction_pending = false; + } else if let Some(record) = message.record { + let stream_to_binding = stream_to_binding.lock().await; + match stream_to_binding.get(&record.stream) { + None => { + raise_custom_error(&format!("connector record with unknown stream {}", record.stream))?; + } + Some(binding) => { + let arena = record.data.get().as_bytes().to_vec(); + let arena_len: u32 = arena.len() as u32; + resp.documents = Some(Documents { + binding: *binding as u32, + arena: arena, + docs_json: vec![Slice{begin: 0, end: arena_len}] + }) + } + } + drop(stream_to_binding); + yield encode_message(&resp)?; + transaction_pending = true; + } else if let Some(mlog) = message.log { + mlog.log(); + } else { + raise_custom_error("unexpected pull response.")?; + } + } + + if transaction_pending { + // We generate a synthetic commit now, and the empty checkpoint means the assumed behavior + // of the next invocation will be "full refresh". + let mut resp = PullResponse::default(); + resp.checkpoint = Some(DriverCheckpoint{ + driver_checkpoint_json: Vec::new(), + rfc7396_merge_patch: false + }); + yield encode_message(&resp)?; + } + }) + } + + fn input_file_path(&mut self, file_name: &str) -> String { + self.tmp_dir + .path() + .join(file_name) + .to_str() + .expect("failed construct config file name.") + .into() + } +} + +impl AirbyteSourceInterceptor { + pub fn adapt_command_args( + &mut self, + op: &FlowCaptureOperation, + args: Vec, + ) -> Result, Error> { + let config_file_path = self.input_file_path(CONFIG_FILE_NAME); + let catalog_file_path = self.input_file_path(CATALOG_FILE_NAME); + let state_file_path = self.input_file_path(STATE_FILE_NAME); + + let airbyte_args = match op { + FlowCaptureOperation::Spec => vec!["spec"], + FlowCaptureOperation::Discover => vec!["discover", "--config", &config_file_path], + FlowCaptureOperation::Validate => vec!["check", "--config", &config_file_path], + FlowCaptureOperation::Pull => { + vec![ + "read", + "--config", + &config_file_path, + "--catalog", + &catalog_file_path, + "--state", + &state_file_path, + ] + } + + _ => return Err(Error::UnexpectedOperation(op.to_string())), + }; + + let airbyte_args: Vec = airbyte_args.into_iter().map(Into::into).collect(); + Ok([airbyte_args, args].concat()) + } + + pub fn adapt_request_stream( + &mut self, + op: &FlowCaptureOperation, + in_stream: InterceptorStream, + ) -> Result { + let config_file_path = self.input_file_path(CONFIG_FILE_NAME); + let catalog_file_path = self.input_file_path(CATALOG_FILE_NAME); + let state_file_path = self.input_file_path(STATE_FILE_NAME); + + match op { + FlowCaptureOperation::Spec => Ok(self.adapt_spec_request_stream(in_stream)), + FlowCaptureOperation::Discover => { + Ok(self.adapt_discover_request(config_file_path, in_stream)) + } + FlowCaptureOperation::Validate => Ok(self.adapt_validate_request_stream( + config_file_path, + Arc::clone(&self.validate_request), + in_stream, + )), + FlowCaptureOperation::Pull => Ok(self.adapt_pull_request_stream( + config_file_path, + catalog_file_path, + state_file_path, + Arc::clone(&self.stream_to_binding), + in_stream, + )), + + _ => Err(Error::UnexpectedOperation(op.to_string())), + } + } + + pub fn adapt_response_stream( + &mut self, 
+ op: &FlowCaptureOperation, + in_stream: InterceptorStream, + ) -> Result { + match op { + FlowCaptureOperation::Spec => Ok(self.adapt_spec_response_stream(in_stream)), + FlowCaptureOperation::Discover => Ok(self.adapt_discover_response_stream(in_stream)), + FlowCaptureOperation::Validate => { + Ok(self + .adapt_validate_response_stream(Arc::clone(&self.validate_request), in_stream)) + } + FlowCaptureOperation::Pull => { + Ok(self.adapt_pull_response_stream(Arc::clone(&self.stream_to_binding), in_stream)) + } + _ => Err(Error::UnexpectedOperation(op.to_string())), + } + } +} diff --git a/crates/connector_proxy/src/interceptors/default_interceptors.rs b/crates/connector_proxy/src/interceptors/default_interceptors.rs deleted file mode 100644 index 979c4dc64c..0000000000 --- a/crates/connector_proxy/src/interceptors/default_interceptors.rs +++ /dev/null @@ -1,6 +0,0 @@ -use crate::apis::{FlowCaptureOperation, FlowMaterializeOperation, Interceptor}; -pub struct DefaultFlowCaptureInterceptor {} -impl Interceptor for DefaultFlowCaptureInterceptor {} - -pub struct DefaultFlowMaterializeInterceptor {} -impl Interceptor for DefaultFlowMaterializeInterceptor {} diff --git a/crates/connector_proxy/src/interceptors/mod.rs b/crates/connector_proxy/src/interceptors/mod.rs index 6323473bad..37b44c07f0 100644 --- a/crates/connector_proxy/src/interceptors/mod.rs +++ b/crates/connector_proxy/src/interceptors/mod.rs @@ -1,4 +1,3 @@ -pub mod airbyte_capture_interceptor; -pub mod default_interceptors; +pub mod airbyte_source_interceptor; pub mod network_proxy_capture_interceptor; pub mod network_proxy_materialize_interceptor; diff --git a/crates/connector_proxy/src/interceptors/network_proxy_capture_interceptor.rs b/crates/connector_proxy/src/interceptors/network_proxy_capture_interceptor.rs index 8792435bd8..6b98cf6d07 100644 --- a/crates/connector_proxy/src/interceptors/network_proxy_capture_interceptor.rs +++ b/crates/connector_proxy/src/interceptors/network_proxy_capture_interceptor.rs @@ -1,6 +1,4 @@ -use crate::apis::{ - FlowCaptureOperation, Interceptor, InterceptorStream, RequestResponseConverterPair, -}; +use crate::apis::{FlowCaptureOperation, InterceptorStream}; use crate::errors::{Error, Must}; use crate::libs::network_proxy::NetworkProxy; use crate::libs::protobuf::{decode_message, encode_message}; @@ -18,7 +16,7 @@ use tokio_util::io::StreamReader; pub struct NetworkProxyCaptureInterceptor {} impl NetworkProxyCaptureInterceptor { - fn convert_discover_request(in_stream: InterceptorStream) -> InterceptorStream { + fn adapt_discover_request_stream(in_stream: InterceptorStream) -> InterceptorStream { Box::pin(stream! { let mut reader = StreamReader::new(in_stream); let mut request = decode_message::(&mut reader).await.or_bail().expect("expected request is not received."); @@ -29,7 +27,7 @@ impl NetworkProxyCaptureInterceptor { }) } - fn convert_validate_request(in_stream: InterceptorStream) -> InterceptorStream { + fn adapt_validate_request_stream(in_stream: InterceptorStream) -> InterceptorStream { Box::pin(stream! { let mut reader = StreamReader::new(in_stream); let mut request = decode_message::(&mut reader).await.or_bail().expect("expected request is not received."); @@ -40,7 +38,7 @@ impl NetworkProxyCaptureInterceptor { }) } - fn convert_apply_request(in_stream: InterceptorStream) -> InterceptorStream { + fn adapt_apply_request(in_stream: InterceptorStream) -> InterceptorStream { Box::pin(stream! 
{ let mut reader = StreamReader::new(in_stream); let mut request = decode_message::(&mut reader).await.or_bail().expect("expected request is not received."); @@ -54,7 +52,7 @@ impl NetworkProxyCaptureInterceptor { }) } - fn convert_pull_request(in_stream: InterceptorStream) -> InterceptorStream { + fn adapt_pull_request_stream(in_stream: InterceptorStream) -> InterceptorStream { Box::pin(stream! { let mut reader = StreamReader::new(in_stream); let mut request = decode_message::(&mut reader).await.or_bail().expect("expected request is not received."); @@ -74,27 +72,29 @@ impl NetworkProxyCaptureInterceptor { } }) } +} - fn convert_request( - operation: &FlowCaptureOperation, +impl NetworkProxyCaptureInterceptor { + pub fn adapt_request_stream( + op: &FlowCaptureOperation, in_stream: InterceptorStream, ) -> Result { - Ok(match operation { - FlowCaptureOperation::Discover => Self::convert_discover_request(in_stream), - FlowCaptureOperation::Validate => Self::convert_validate_request(in_stream), + Ok(match op { + FlowCaptureOperation::Discover => Self::adapt_discover_request_stream(in_stream), + FlowCaptureOperation::Validate => Self::adapt_validate_request_stream(in_stream), FlowCaptureOperation::ApplyUpsert | FlowCaptureOperation::ApplyDelete => { - Self::convert_apply_request(in_stream) + Self::adapt_apply_request(in_stream) } - FlowCaptureOperation::Pull => Self::convert_pull_request(in_stream), + FlowCaptureOperation::Pull => Self::adapt_pull_request_stream(in_stream), _ => in_stream, }) } - fn convert_response( - operation: &FlowCaptureOperation, + pub fn adapt_response_stream( + op: &FlowCaptureOperation, in_stream: InterceptorStream, ) -> Result { - Ok(match operation { + Ok(match op { FlowCaptureOperation::Spec => Box::pin(stream! { let mut reader = StreamReader::new(in_stream); let mut response = decode_message::(&mut reader).await.or_bail().expect("No expected response received."); @@ -107,12 +107,3 @@ impl NetworkProxyCaptureInterceptor { }) } } - -impl Interceptor for NetworkProxyCaptureInterceptor { - fn get_converters() -> RequestResponseConverterPair { - ( - Box::new(Self::convert_request), - Box::new(Self::convert_response), - ) - } -} diff --git a/crates/connector_proxy/src/interceptors/network_proxy_materialize_interceptor.rs b/crates/connector_proxy/src/interceptors/network_proxy_materialize_interceptor.rs index c916dc4409..d85c05ecbc 100644 --- a/crates/connector_proxy/src/interceptors/network_proxy_materialize_interceptor.rs +++ b/crates/connector_proxy/src/interceptors/network_proxy_materialize_interceptor.rs @@ -1,6 +1,4 @@ -use crate::apis::{ - FlowMaterializeOperation, Interceptor, InterceptorStream, RequestResponseConverterPair, -}; +use crate::apis::{FlowMaterializeOperation, InterceptorStream}; use crate::errors::{Error, Must}; use crate::libs::network_proxy::NetworkProxy; use crate::libs::protobuf::{decode_message, encode_message}; @@ -17,7 +15,7 @@ use tokio_util::io::StreamReader; pub struct NetworkProxyMaterializeInterceptor {} impl NetworkProxyMaterializeInterceptor { - fn convert_spec_request(in_stream: InterceptorStream) -> InterceptorStream { + fn adapt_spec_request(in_stream: InterceptorStream) -> InterceptorStream { Box::pin(stream! 
{ let mut reader = StreamReader::new(in_stream); let mut request = decode_message::(&mut reader).await.or_bail().expect("expected request is not received."); @@ -30,7 +28,7 @@ impl NetworkProxyMaterializeInterceptor { }) } - fn convert_apply_request(in_stream: InterceptorStream) -> InterceptorStream { + fn adapt_apply_request(in_stream: InterceptorStream) -> InterceptorStream { Box::pin(stream! { let mut reader = StreamReader::new(in_stream); let mut request = decode_message::(&mut reader).await.or_bail().expect("expected request is not received."); @@ -44,7 +42,7 @@ impl NetworkProxyMaterializeInterceptor { }) } - fn convert_transactions_request(in_stream: InterceptorStream) -> InterceptorStream { + fn adapt_transactions_request(in_stream: InterceptorStream) -> InterceptorStream { Box::pin(stream! { let mut reader = StreamReader::new(in_stream); let mut request = decode_message::(&mut reader).await.or_bail().expect("expected request is not received."); @@ -64,26 +62,28 @@ impl NetworkProxyMaterializeInterceptor { } }) } +} - fn convert_request( - operation: &FlowMaterializeOperation, +impl NetworkProxyMaterializeInterceptor { + pub fn adapt_request_stream( + op: &FlowMaterializeOperation, in_stream: InterceptorStream, ) -> Result { - Ok(match operation { - FlowMaterializeOperation::Validate => Self::convert_spec_request(in_stream), + Ok(match op { + FlowMaterializeOperation::Validate => Self::adapt_spec_request(in_stream), FlowMaterializeOperation::ApplyUpsert | FlowMaterializeOperation::ApplyDelete => { - Self::convert_apply_request(in_stream) + Self::adapt_apply_request(in_stream) } - FlowMaterializeOperation::Transactions => Self::convert_transactions_request(in_stream), + FlowMaterializeOperation::Transactions => Self::adapt_transactions_request(in_stream), _ => in_stream, }) } - fn convert_response( - operation: &FlowMaterializeOperation, + pub fn adapt_response_stream( + op: &FlowMaterializeOperation, in_stream: InterceptorStream, ) -> Result { - Ok(match operation { + Ok(match op { FlowMaterializeOperation::Spec => Box::pin(stream! 
{ let mut reader = StreamReader::new(in_stream); let mut response = decode_message::(&mut reader).await.or_bail().expect("expected response is not received."); @@ -96,12 +96,3 @@ impl NetworkProxyMaterializeInterceptor { }) } } - -impl Interceptor for NetworkProxyMaterializeInterceptor { - fn get_converters() -> RequestResponseConverterPair { - ( - Box::new(Self::convert_request), - Box::new(Self::convert_response), - ) - } -} diff --git a/crates/connector_proxy/src/libs/airbyte_catalog.rs b/crates/connector_proxy/src/libs/airbyte_catalog.rs new file mode 100644 index 0000000000..dc54c110ca --- /dev/null +++ b/crates/connector_proxy/src/libs/airbyte_catalog.rs @@ -0,0 +1,239 @@ +use std::collections::HashMap; + +use schemars::JsonSchema; +use serde::ser::{SerializeStruct, Serializer}; +use serde::{Deserialize, Serialize}; +use serde_json::value::RawValue; +use validator::{Validate, ValidationError}; + +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, JsonSchema)] +#[serde(rename_all = "snake_case")] +pub enum SyncMode { + Incremental, + FullRefresh, +} + +#[derive(Debug, Serialize, Deserialize, Clone, Validate)] +#[serde(rename_all = "snake_case")] +pub struct Stream { + pub name: String, + pub json_schema: Box, + #[validate(length(min = 1))] + pub supported_sync_modes: Vec, + pub source_defined_cursor: Option, + pub default_cursor_field: Option>, + pub source_defined_primary_key: Option>>, + pub namespace: Option, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(rename_all = "snake_case")] +pub enum DestinationSyncMode { + Append, + Overwrite, + AppendDedup, +} + +#[derive(Debug, Serialize, Deserialize, Clone, Validate)] +#[serde(rename_all = "snake_case")] +#[validate(schema(function = "Self::validate_configured_stream"))] +pub struct ConfiguredStream { + #[validate] + pub stream: Stream, + pub sync_mode: SyncMode, + pub destination_sync_mode: DestinationSyncMode, + pub cursor_field: Option>, + pub primary_key: Option>>, + + #[serde(alias = "estuary.dev/projections")] + pub projections: HashMap, +} +impl ConfiguredStream { + fn validate_configured_stream(&self) -> Result<(), ValidationError> { + if self.stream.supported_sync_modes.contains(&self.sync_mode) { + Ok(()) + } else { + Err(ValidationError::new( + "sync_mode is not in the supported list.", + )) + } + } +} + +#[derive(Debug, Serialize, Deserialize, Clone, Validate)] +pub struct Catalog { + #[serde(rename = "streams")] + #[validate] + pub streams: Vec, +} + +#[derive(Debug, Deserialize, Clone, Validate)] +#[validate(schema(function = "Self::validate_range"))] +pub struct Range { + pub begin: u32, + pub end: u32, +} + +impl Range { + fn validate_range(&self) -> Result<(), ValidationError> { + if self.begin <= self.end { + Ok(()) + } else { + Err(ValidationError::new("expected Begin <= End")) + } + } +} + +impl Serialize for Range { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let mut state = serializer.serialize_struct("Range", 2)?; + state.serialize_field("begin", &format!("{:x}", self.begin))?; + state.serialize_field("end", &format!("{:x}", self.end))?; + state.end() + } +} + +#[derive(Debug, Serialize, Deserialize, Clone, Validate)] +#[serde(rename_all = "snake_case")] +pub struct ConfiguredCatalog { + #[serde(rename = "streams")] + #[validate(length(min = 1))] + #[validate] + pub streams: Vec, + + #[serde(alias = "estuary.dev/tail")] + pub tail: bool, + + #[serde(alias = "estuary.dev/range")] + #[validate] + pub range: Range, +} + +#[derive(Debug, Serialize, Deserialize, 
Clone, PartialEq)]
+#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
+pub enum Status {
+    Succeeded,
+    Failed,
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+#[serde(rename_all = "snake_case")]
+pub struct ConnectionStatus {
+    pub status: Status,
+    pub message: String,
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+#[serde(rename_all = "snake_case")]
+pub struct Record {
+    pub stream: String,
+    pub data: Box<RawValue>,
+    pub emitted_at: Option<i64>,
+    pub namespace: Option<String>,
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
+pub enum LogLevel {
+    Trace,
+    Debug,
+    Info,
+    Warn,
+    Error,
+    Fatal,
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct Log {
+    pub level: LogLevel,
+    pub message: String,
+}
+impl Log {
+    pub fn log(&self) {
+        match self.level {
+            LogLevel::Trace => tracing::trace!(?self.message),
+            LogLevel::Debug => tracing::debug!(?self.message),
+            LogLevel::Info => tracing::info!(?self.message),
+            LogLevel::Warn => tracing::warn!(?self.message),
+            LogLevel::Error | LogLevel::Fatal => tracing::error!(?self.message),
+        }
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+#[serde(rename_all = "snake_case")]
+pub struct State {
+    // Data is the actual state associated with the ingestion. This must be a JSON _Object_ in order
+    // to comply with the airbyte specification.
+    pub data: Box<RawValue>,
+
+    // Merge indicates that Data is an RFC 7396 JSON Merge Patch, and should
+    // be reduced into the previous state accordingly.
+    #[serde(alias = "estuary.dev/merge")]
+    pub merge: Option<bool>,
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct Spec {
+    pub documentation_url: Option<String>,
+    pub changelog_url: Option<String>,
+    pub connection_specification: Box<RawValue>,
+    pub supports_incremental: bool,
+
+    // SupportedDestinationSyncModes is ignored by Flow
+    pub supported_destination_sync_modes: Option<Vec<DestinationSyncMode>>,
+    // SupportsNormalization is not currently used or supported by Flow or estuary-developed
+    // connectors
+    pub supports_normalization: Option<bool>,
+    // SupportsDBT is not currently used or supported by Flow or estuary-developed connectors
+    #[serde(rename = "supportsDBT")]
+    pub supports_dbt: Option<bool>,
+
+    // AuthSpecification is not currently used or supported by Flow or estuary-developed
+    // connectors, and it is deprecated in the airbyte spec.
+    pub auth_specification: Option<Box<RawValue>>,
+    // AdvancedAuth is not currently used or supported by Flow or estuary-developed
+    // connectors.
+    pub advanced_auth: Option<Box<RawValue>>,
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
+pub enum MessageType {
+    Record,
+    State,
+    Log,
+    Spec,
+    ConnectionStatus,
+    Catalog,
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone, Validate)]
+#[serde(rename_all = "camelCase")]
+pub struct Message {
+    #[serde(rename = "type")]
+    pub message_type: MessageType,
+
+    pub log: Option<Log>,
+    pub state: Option<State>,
+    pub record: Option<Record>,
+    pub connection_status: Option<ConnectionStatus>,
+    pub spec: Option<Spec>,
+    #[validate]
+    pub catalog: Option<Catalog>,
+}
+
+#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
+#[serde(rename_all = "camelCase")]
+// ResourceSpec is the configuration for Airbyte source streams.
+pub struct ResourceSpec {
+    pub stream: String,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub namespace: Option<String>,
+    pub sync_mode: SyncMode,
+}
diff --git a/crates/connector_proxy/src/libs/command.rs b/crates/connector_proxy/src/libs/command.rs
index 60844e10df..7258f7b85d 100644
--- a/crates/connector_proxy/src/libs/command.rs
+++ b/crates/connector_proxy/src/libs/command.rs
@@ -1,21 +1,28 @@
 use crate::errors::Error;
+use serde::{Deserialize, Serialize};
 use std::process::{ExitStatus, Stdio};
-use tokio::process::{Child, Command};
+use tempfile::NamedTempFile;
+use tokio::io::{AsyncRead, AsyncReadExt};
+use tokio::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
+use tokio::time::timeout;
 
-// Start the proxied connector as a process.
-pub fn invoke_connector(entrypoint: String, args: &[String]) -> Result<Child, Error> {
-    Command::new(entrypoint)
-        .stdin(Stdio::piped())
-        .stdout(Stdio::piped())
-        .stderr(Stdio::inherit())
-        .args(args)
-        .spawn()
-        .map_err(|e| e.into())
+pub const READY: &[u8] = "READY".as_bytes();
+
+// Start the connector directly.
+pub fn invoke_connector_direct(entrypoint: String, args: Vec<String>) -> Result<Child, Error> {
+    invoke_connector(
+        Stdio::piped(),
+        Stdio::piped(),
+        Stdio::piped(),
+        &entrypoint,
+        &args,
+    )
 }
 
-// Replace this function after `exit_status_error` is stable. https://github.com/rust-lang/rust/issues/84908
-pub fn check_exit_status(result: std::io::Result<ExitStatus>) -> Result<(), Error> {
+// Check the connector execution exit status.
+// TODO: replace this function after `exit_status_error` is stable. https://github.com/rust-lang/rust/issues/84908
+pub fn check_exit_status(message: &str, result: std::io::Result<ExitStatus>) -> Result<(), Error> {
     match result {
         Ok(status) => {
             if status.success() {
@@ -23,15 +30,110 @@
             } else {
                 match status.code() {
                     Some(code) => Err(Error::CommandExecutionError(format!(
-                        "failed with code {}.",
-                        code
+                        "{} failed with code {}.",
+                        message, code
+                    ))),
+                    None => Err(Error::CommandExecutionError(format!(
+                        "{} process terminated by signal",
+                        message
                     ))),
-                    None => Err(Error::CommandExecutionError(
-                        "process terminated by signal".to_string(),
-                    )),
                 }
             }
         }
         Err(e) => Err(e.into()),
     }
 }
+
+// For storing the entrypoint and args to start a delayed connector.
+#[derive(Debug, Serialize, Deserialize, Clone)]
+#[serde(rename_all = "snake_case")]
+pub struct CommandConfig {
+    pub entrypoint: String,
+    pub args: Vec<String>,
+}
+// Instead of starting the connector directly, `invoke_connector_delayed` starts a bouncer process
+// first, which will start the real connector after reading a "READY" string from stdin. The caller
+// of `invoke_connector_delayed` is responsible for sending "READY" to the stdin of the returned
+// child process before sending anything else.
+pub async fn invoke_connector_delayed(
+    entrypoint: String,
+    args: Vec<String>,
+) -> Result<Child, Error> {
+    tracing::info!("invoke delayed connector {}, {:?}", entrypoint, args);
+
+    // Saves the configs to start the connector.
+    let command_config = CommandConfig { entrypoint, args };
+    let config_file = NamedTempFile::new()?;
+    serde_json::to_writer(&config_file, &command_config)?;
+    let (_, config_file_path) = config_file.keep()?;
+    let config_file_path = config_file_path
+        .to_str()
+        .expect("config file path conversion failed.");
+
+    // Prepares and starts the bouncer process.
+// Instead of starting the connector directly, `invoke_connector_delayed` starts a bouncer process first,
+// which starts the real connector only after reading a "READY" string from its stdin. The caller of
+// `invoke_connector_delayed` is responsible for sending "READY" to the stdin of the returned child
+// process before sending anything else.
+pub async fn invoke_connector_delayed(
+    entrypoint: String,
+    args: Vec<String>,
+) -> Result<Child, Error> {
+    tracing::info!("invoke delayed connector {}, {:?}", entrypoint, args);
+
+    // Saves the configs to start the connector.
+    let command_config = CommandConfig { entrypoint, args };
+    let config_file = NamedTempFile::new()?;
+    serde_json::to_writer(&config_file, &command_config)?;
+    let (_, config_file_path) = config_file.keep()?;
+    let config_file_path = config_file_path
+        .to_str()
+        .expect("config file path conversion failed.");
+
+    // Prepares and starts the bouncer process.
+    let bouncer_process_entrypoint = std::env::current_exe()?;
+    let bouncer_process_entrypoint = bouncer_process_entrypoint
+        .to_str()
+        .expect("unexpected binary path");
+
+    invoke_connector(
+        Stdio::piped(),
+        Stdio::piped(),
+        Stdio::piped(),
+        bouncer_process_entrypoint,
+        &vec!["delayed-execute".to_string(), config_file_path.to_string()],
+    )
+}
+
+pub async fn read_ready<R: AsyncRead + std::marker::Unpin>(reader: &mut R) -> Result<(), Error> {
+    let mut ready_buf: Vec<u8> = vec![0; READY.len()];
+    match timeout(
+        std::time::Duration::from_secs(1),
+        reader.read_exact(&mut ready_buf),
+    )
+    .await
+    {
+        Ok(_) => {
+            if &ready_buf == READY {
+                Ok(())
+            } else {
+                Err(Error::NotReady("received unexpected bytes."))
+            }
+        }
+        Err(_) => Err(Error::NotReady(
+            "timeout: reading from delayed-connector process wrapper.",
+        )),
+    }
+}
+
+// A more flexible API for starting the connector.
+pub fn invoke_connector(
+    stdin: Stdio,
+    stdout: Stdio,
+    stderr: Stdio,
+    entrypoint: &str,
+    args: &[String],
+) -> Result<Child, Error> {
+    tracing::info!("invoke connector {}, {:?}", entrypoint, args);
+
+    Command::new(entrypoint)
+        .stdin(stdin)
+        .stdout(stdout)
+        .stderr(stderr)
+        .args(args)
+        .spawn()
+        .map_err(|e| e.into())
+}
+
+pub fn parse_child(
+    mut child: Child,
+) -> Result<(Child, ChildStdin, ChildStdout, ChildStderr), Error> {
+    let stdout = child.stdout.take().ok_or(Error::MissingIOPipe)?;
+    let stdin = child.stdin.take().ok_or(Error::MissingIOPipe)?;
+    let stderr = child.stderr.take().ok_or(Error::MissingIOPipe)?;
+
+    Ok((child, stdin, stdout, stderr))
+}
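
// ---------------------------------------------------------------------------
// Example (editorial sketch; not part of the patch): the caller's side of the
// delayed-start handshake. Spawn the bouncer, split its pipes, then unblock it
// by writing READY. The entrypoint/args are hypothetical; assumes a tokio
// runtime and that `Error` converts from `std::io::Error`, as the `?` uses in
// this file suggest.
async fn handshake_example() -> Result<(), Error> {
    use tokio::io::AsyncWriteExt;

    let child =
        invoke_connector_delayed("/connector".to_string(), vec!["spec".to_string()]).await?;
    let (mut child, mut stdin, _stdout, _stderr) = parse_child(child)?;

    // The bouncer blocks in `read_ready` until it sees exactly these bytes.
    stdin.write_all(READY).await?;

    // ... proxy the protocol over stdin/stdout, then reap the process.
    check_exit_status("connector", child.wait().await)
}
// ---------------------------------------------------------------------------
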
diff --git a/crates/connector_proxy/src/libs/image_config.rs b/crates/connector_proxy/src/libs/image_config.rs
deleted file mode 100644
index 6d6ea26de9..0000000000
--- a/crates/connector_proxy/src/libs/image_config.rs
+++ /dev/null
@@ -1,59 +0,0 @@
-use crate::errors::{Error, Must};
-use clap::ArgEnum;
-use serde::{Deserialize, Serialize};
-use std::collections::HashMap;
-use std::fs::File;
-use std::io::BufReader;
-
-// The key of the docker image label that indicates the connector protocol.
-const CONNECTOR_PROTOCOL_KEY: &str = "CONNECTOR_PROTOCOL";
-
-#[derive(Debug, Serialize, Deserialize, Clone)]
-#[serde(rename_all = "PascalCase")]
-pub struct ImageConfig {
-    pub entrypoint: Vec<String>,
-    pub labels: Option<HashMap<String, String>>,
-}
-
-#[derive(Debug, Serialize, Deserialize, Clone)]
-#[serde(rename_all = "PascalCase")]
-pub struct ImageInspect {
-    pub config: ImageConfig,
-}
-
-impl ImageConfig {
-    pub fn parse_from_json_file(path: String) -> Result<Self, Error> {
-        let reader = BufReader::new(File::open(path)?);
-        let image_inspects: Vec<ImageInspect> = serde_json::from_reader(reader)?;
-        match image_inspects.len() {
-            1 => Ok(image_inspects[0].config.clone()),
-            _ => Err(Error::InvalidImageInspectFile),
-        }
-    }
-
-    pub fn get_entrypoint(&self, default: Vec<String>) -> Vec<String> {
-        match self.entrypoint.len() {
-            0 => {
-                tracing::warn!(
-                    "No entry point is specified in the image, using default: {:?}",
-                    default
-                );
-                default
-            }
-            _ => self.entrypoint.clone(),
-        }
-    }
-
-    pub fn get_connector_protocol<T: ArgEnum + std::fmt::Debug>(&self, default: T) -> T {
-        if let Some(ref labels) = self.labels {
-            if let Some(value) = labels.get(CONNECTOR_PROTOCOL_KEY) {
-                return T::from_str(&value, false).or_bail();
-            }
-        }
-        tracing::warn!(
-            "No connector protocol is specified in the image, using default: {:?}",
-            default
-        );
-        default
-    }
-}
diff --git a/crates/connector_proxy/src/libs/image_inspect.rs b/crates/connector_proxy/src/libs/image_inspect.rs
new file mode 100644
index 0000000000..aa2d9642ee
--- /dev/null
+++ b/crates/connector_proxy/src/libs/image_inspect.rs
@@ -0,0 +1,88 @@
+use crate::apis::FlowRuntimeProtocol;
+use crate::errors::{Error, Must};
+use clap::ArgEnum;
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+use std::fs::File;
+use std::io::BufReader;
+
+// The keys of the docker image labels that indicate the connector protocols.
+const FLOW_RUNTIME_PROTOCOL_KEY: &str = "FLOW_RUNTIME_PROTOCOL";
+const CONNECTOR_PROTOCOL_KEY: &str = "CONNECTOR_PROTOCOL";
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+#[serde(rename_all = "PascalCase")]
+pub struct ImageConfig {
+    pub entrypoint: Vec<String>,
+    pub labels: Option<HashMap<String, String>>,
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+#[serde(rename_all = "PascalCase")]
+pub struct ImageInspect {
+    pub config: ImageConfig,
+    pub repo_tags: Option<Vec<String>>,
+}
+
+impl ImageInspect {
+    pub fn parse_from_json_file(path: Option<String>) -> Result<Self, Error> {
+        match path {
+            None => {
+                return Err(Error::MissingImageInspectFile);
+            }
+            Some(p) => {
+                let reader = BufReader::new(File::open(p)?);
+                let image_inspects: Vec<ImageInspect> = serde_json::from_reader(reader)?;
+                match image_inspects.len() {
+                    1 => Ok(image_inspects[0].clone()),
+                    _ => Err(Error::InvalidImageInspectFile),
+                }
+            }
+        }
+    }
+
+    pub fn get_entrypoint(&self, default: Vec<String>) -> Vec<String> {
+        match self.config.entrypoint.len() {
+            0 => {
+                tracing::warn!(
+                    "No entry point is specified in the image, using default: {:?}",
+                    default
+                );
+                default
+            }
+            _ => self.config.entrypoint.clone(),
+        }
+    }
+
+    pub fn infer_runtime_protocol(&self) -> FlowRuntimeProtocol {
+        if let Some(ref labels) = self.config.labels {
+            if let Some(value) = labels.get(FLOW_RUNTIME_PROTOCOL_KEY) {
+                return FlowRuntimeProtocol::from_str(&value, false).or_bail();
+            }
+        }
+
+        if let Some(repo_tags) = &self.repo_tags {
+            for tag in repo_tags {
+                if tag.starts_with("ghcr.io/estuary/materialize-") {
+                    return FlowRuntimeProtocol::Materialize;
+                }
+            }
+        }
+
+        return FlowRuntimeProtocol::Capture;
+    }
+
+    pub fn get_connector_protocol<T: ArgEnum + std::fmt::Debug>(&self, default: T) -> T {
+        if let Some(ref labels) = self.config.labels {
+            if let Some(value) = labels.get(CONNECTOR_PROTOCOL_KEY) {
+                return T::from_str(&value, false).or_bail();
+            }
+        }
+        tracing::warn!(
+            "No connector protocol is specified in the image, using default: {:?}",
+            default
+        );
+        default
+    }
+}
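
// ---------------------------------------------------------------------------
// Example (editorial sketch; not part of the patch): feeding `docker inspect
// <image>` output (a one-element JSON array) through ImageInspect. The file
// path and fallback entrypoint are hypothetical, and this assumes Capture and
// Materialize are the only runtime protocols.
fn inspect_example() -> Result<(), Error> {
    let inspect =
        ImageInspect::parse_from_json_file(Some("/tmp/image-inspect.json".to_string()))?;

    let entrypoint = inspect.get_entrypoint(vec!["/connector".to_string()]);

    // Prefers the FLOW_RUNTIME_PROTOCOL label, then falls back to repo tags,
    // then defaults to Capture.
    match inspect.infer_runtime_protocol() {
        FlowRuntimeProtocol::Capture => tracing::info!("capture image: {:?}", entrypoint),
        FlowRuntimeProtocol::Materialize => tracing::info!("materialize image: {:?}", entrypoint),
    }
    Ok(())
}
// ---------------------------------------------------------------------------
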
diff --git a/crates/connector_proxy/src/libs/json.rs b/crates/connector_proxy/src/libs/json.rs
index e09064459b..dbe8f42f6d 100644
--- a/crates/connector_proxy/src/libs/json.rs
+++ b/crates/connector_proxy/src/libs/json.rs
@@ -1,3 +1,4 @@
+use doc::ptr::{Pointer, Token};
 use schemars::{schema::RootSchema, JsonSchema};
 use serde_json::Value;
 
@@ -19,3 +20,30 @@ pub fn remove_subobject(mut v: Value, key: &str) -> (Option<Value>, Value) {
 
     (sub_object, v)
 }
+
+pub fn tokenize_jsonpointer(ptr: &str) -> Vec<String> {
+    Pointer::from_str(&ptr)
+        .iter()
+        .map(|t| match t {
+            // Keep the index and next index for now. Could adjust based on use cases.
+            Token::Index(ind) => ind.to_string(),
+            Token::Property(prop) => prop.to_string(),
+            Token::NextIndex => "-".to_string(),
+        })
+        .collect()
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    #[test]
+    fn test_tokenize_jsonpointer() {
+        let expected: Vec<String> = vec!["p1", "p2", "56", "p3", "-"]
+            .iter()
+            .map(|s| s.to_string())
+            .collect();
+
+        assert!(expected == tokenize_jsonpointer("/p1/p2/56/p3/-"));
+    }
+}
diff --git a/crates/connector_proxy/src/libs/mod.rs b/crates/connector_proxy/src/libs/mod.rs
index 80519d2488..c7ff1dedb9 100644
--- a/crates/connector_proxy/src/libs/mod.rs
+++ b/crates/connector_proxy/src/libs/mod.rs
@@ -1,5 +1,6 @@
+pub mod airbyte_catalog;
 pub mod command;
-pub mod image_config;
+pub mod image_inspect;
 pub mod json;
 pub mod network_proxy;
 pub mod protobuf;
diff --git a/crates/connector_proxy/src/libs/protobuf.rs b/crates/connector_proxy/src/libs/protobuf.rs
index e93c0cfcfd..1f246d3aaf 100644
--- a/crates/connector_proxy/src/libs/protobuf.rs
+++ b/crates/connector_proxy/src/libs/protobuf.rs
@@ -1,5 +1,3 @@
-use crate::errors::Error;
-
 use byteorder::{ByteOrder, LittleEndian};
 use bytes::{BufMut, Bytes, BytesMut};
 use prost::Message;
@@ -10,15 +8,14 @@ pub async fn decode_message<
     R: AsyncRead + std::marker::Unpin,
 >(
     reader: &mut R,
-) -> Result<Option<T>, Error> {
-    // Deserialize the proto message.
+) -> Result<Option<T>, std::io::Error> {
     let mut length_buf: [u8; 4] = [0; 4];
 
     match reader.read_exact(&mut length_buf).await {
         Err(e) => match e.kind() {
             // By the current communication protocol, UnexpectedEof indicates the ending of the stream.
             std::io::ErrorKind::UnexpectedEof => return Ok(None),
-            _ => return Err(e.into()),
+            _ => return Err(e),
         },
         Ok(_) => {}
     }
diff --git a/crates/connector_proxy/src/libs/stream.rs b/crates/connector_proxy/src/libs/stream.rs
index c1e31748d8..33d1e441e4 100644
--- a/crates/connector_proxy/src/libs/stream.rs
+++ b/crates/connector_proxy/src/libs/stream.rs
@@ -1,12 +1,19 @@
-use async_stream::stream;
-use bytes::{Bytes, BytesMut};
+use crate::apis::InterceptorStream;
+use crate::libs::airbyte_catalog::Message;
+
+use crate::errors::raise_custom_error;
+use async_stream::try_stream;
+use bytes::{Buf, Bytes, BytesMut};
 use futures_core::Stream;
+use futures_util::StreamExt;
+use serde_json::{Deserializer, Value};
 use tokio::io::{AsyncRead, AsyncReadExt};
+use validator::Validate;
 
 pub fn stream_all_bytes<R: AsyncRead + std::marker::Unpin>(
     mut reader: R,
 ) -> impl Stream<Item = std::io::Result<Bytes>> {
-    stream! {
+    try_stream! {
         loop {
             // consistent with the default capacity of ReaderStream.
             // https://github.com/tokio-rs/tokio/blob/master/tokio-util/src/io/reader_stream.rs#L8
@@ -14,12 +21,69 @@ pub fn stream_all_bytes<R: AsyncRead + std::marker::Unpin>(
             match reader.read_buf(&mut buf).await {
                 Ok(0) => break,
                 Ok(_) => {
-                    yield Ok(buf.into());
+                    yield buf.into();
                 }
                 Err(e) => {
-                    panic!("error during streaming {:?}.", e);
+                    raise_custom_error(&format!("error during streaming {:?}.", e))?;
                 }
             }
        }
    }
 }
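
// ---------------------------------------------------------------------------
// Example (editorial sketch; not part of the patch): draining
// `stream_all_bytes` with StreamExt. The stream must be pinned before
// iteration; assumes a tokio runtime and the item type reconstructed above.
async fn drain_example<R: AsyncRead + std::marker::Unpin>(reader: R) -> std::io::Result<usize> {
    let stream = stream_all_bytes(reader);
    futures_util::pin_mut!(stream);

    let mut total = 0;
    while let Some(bytes) = stream.next().await {
        total += bytes?.len(); // each item is assumed to be an std::io::Result<Bytes>
    }
    Ok(total)
}
// ---------------------------------------------------------------------------
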
+
+pub fn stream_all_airbyte_messages(
+    mut in_stream: InterceptorStream,
+) -> impl Stream<Item = std::io::Result<Message>> {
+    try_stream! {
+        let mut buf = BytesMut::new();
+
+        while let Some(bytes) = in_stream.next().await {
+            match bytes {
+                Ok(b) => {
+                    buf.extend_from_slice(b.chunk());
+                }
+                Err(e) => {
+                    raise_custom_error(&format!("error in reading next in_stream: {:?}", e))?;
+                }
+            }
+
+            let chunk = buf.chunk();
+            let deserializer = Deserializer::from_slice(&chunk);
+
+            // Deserialize to Value first, instead of Message, to avoid losing the `is_eof`
+            // signal when an incomplete message sits at the end of the buffer.
+            let mut value_stream = deserializer.into_iter::<Value>();
+            while let Some(value) = value_stream.next() {
+                match value {
+                    Ok(v) => {
+                        let message: Message = serde_json::from_value(v).unwrap();
+                        if let Err(e) = message.validate() {
+                            raise_custom_error(&format!(
+                                "error in validating message: {:?}, {:?}",
+                                e, std::str::from_utf8(&chunk[value_stream.byte_offset()..])))?;
+                        }
+                        tracing::debug!("read message: {:?}", &message);
+                        yield message;
+                    }
+                    Err(e) => {
+                        if e.is_eof() {
+                            break;
+                        }
+
+                        raise_custom_error(&format!(
+                            "error in decoding message: {:?}, {:?}",
+                            e, std::str::from_utf8(&chunk[value_stream.byte_offset()..])))?;
+                    }
+                }
+            }
+
+            let byte_offset = value_stream.byte_offset();
+            drop(buf.split_to(byte_offset));
+        }
+
+        if buf.len() > 0 {
+            raise_custom_error("found unconsumed content in the stream.")?;
+        }
+
+        tracing::info!("done reading all in_stream.");
+    }
+}
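
// ---------------------------------------------------------------------------
// Example (editorial sketch; not part of the patch): collecting the decoded
// Airbyte messages from an InterceptorStream (the boxed byte stream assumed to
// be defined in apis.rs, not shown here); assumes a tokio runtime and the
// io::Result item type reconstructed above.
async fn collect_messages(in_stream: InterceptorStream) -> std::io::Result<Vec<Message>> {
    let messages = stream_all_airbyte_messages(in_stream);
    futures_util::pin_mut!(messages);

    let mut out = Vec::new();
    while let Some(message) = messages.next().await {
        out.push(message?);
    }
    Ok(out)
}
// ---------------------------------------------------------------------------
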
diff --git a/crates/connector_proxy/src/main.rs b/crates/connector_proxy/src/main.rs
index 7defe1507c..d157168b2e 100644
--- a/crates/connector_proxy/src/main.rs
+++ b/crates/connector_proxy/src/main.rs
@@ -3,24 +3,28 @@ pub mod connector_runner;
 pub mod errors;
 pub mod interceptors;
 pub mod libs;
+use std::fs::File;
+use std::io::BufReader;
 
 use clap::{ArgEnum, Parser, Subcommand};
-use tokio::signal::unix::{signal, SignalKind};
+use tokio::{
+    io::AsyncReadExt,
+    signal::unix::{signal, SignalKind},
+};
 
-use apis::{compose, FlowCaptureOperation, FlowMaterializeOperation, Interceptor};
+use apis::{FlowCaptureOperation, FlowMaterializeOperation, FlowRuntimeProtocol};
 use flow_cli_common::{init_logging, LogArgs};
 
-use connector_runner::run_connector;
+use connector_runner::{
+    run_airbyte_source_connector, run_flow_capture_connector, run_flow_materialize_connector,
+};
 use errors::Error;
-use libs::image_config::ImageConfig;
-
-use interceptors::{
-    airbyte_capture_interceptor::AirbyteCaptureInterceptor,
-    default_interceptors::{DefaultFlowCaptureInterceptor, DefaultFlowMaterializeInterceptor},
-    network_proxy_capture_interceptor::NetworkProxyCaptureInterceptor,
-    network_proxy_materialize_interceptor::NetworkProxyMaterializeInterceptor,
+use libs::{
+    command::{check_exit_status, invoke_connector, read_ready, CommandConfig},
+    image_inspect::ImageInspect,
 };
+use std::process::Stdio;
 
 #[derive(Debug, ArgEnum, Clone)]
 pub enum CaptureConnectorProtocol {
@@ -47,12 +51,19 @@ struct ProxyFlowMaterialize {
     operation: FlowMaterializeOperation,
 }
 
+#[derive(Debug, clap::Parser)]
+struct DelayedExecutionConfig {
+    config_file_path: String,
+}
+
 #[derive(Debug, Subcommand)]
 enum ProxyCommand {
     /// proxies the Flow runtime Capture Protocol to the connector.
     ProxyFlowCapture(ProxyFlowCapture),
     /// proxies the Flow runtime Materialize Protocol to the connector.
     ProxyFlowMaterialize(ProxyFlowMaterialize),
+    /// internal command used by the connector proxy itself to delay execution until signaled.
+    DelayedExecute(DelayedExecutionConfig),
 }
 
 #[derive(Parser, Debug)]
@@ -61,7 +72,7 @@ pub struct Args {
     /// The path (in the container) to the JSON file that contains the inspection results from the connector image.
     /// Normally produced via command "docker inspect <image-name>".
     #[clap(short, long)]
-    image_inspect_json_path: String,
+    image_inspect_json_path: Option<String>,
 
     /// The type of proxy service to provide.
     #[clap(subcommand)]
@@ -91,18 +102,15 @@
     } = Args::parse();
     init_logging(&log_args);
 
-    // respond to os signals.
-    tokio::task::spawn(async move { signal_handler().await });
-
     let result = async_main(image_inspect_json_path, proxy_command).await;
     if let Err(err) = result.as_ref() {
-        tracing::error!(error = ?err, "connector proxy execution failed.");
+        tracing::error!("connector proxy execution failed. {:?}", err);
         std::process::exit(1);
     }
     Ok(())
 }
 
-async fn signal_handler() {
+async fn sigterm_handler() {
     let mut signal_stream = signal(SignalKind::terminate()).expect("failed creating signal.");
 
     signal_stream
         .recv()
@@ -114,54 +122,94 @@ async fn async_main(
-    image_inspect_json_path: String,
+    image_inspect_json_path: Option<String>,
     proxy_command: ProxyCommand,
 ) -> Result<(), Error> {
-    let image_config = ImageConfig::parse_from_json_file(image_inspect_json_path)?;
-
-    // TODO(jixiang): add a check to make sure the proxy_command passed in from commandline is consistent with the protocol inferred from image.
     match proxy_command {
-        ProxyCommand::ProxyFlowCapture(c) => proxy_flow_capture(c, image_config).await,
-        ProxyCommand::ProxyFlowMaterialize(m) => proxy_flow_materialize(m, image_config).await,
+        ProxyCommand::ProxyFlowCapture(c) => proxy_flow_capture(c, image_inspect_json_path).await,
+        ProxyCommand::ProxyFlowMaterialize(m) => {
+            proxy_flow_materialize(m, image_inspect_json_path).await
+        }
+        ProxyCommand::DelayedExecute(ba) => delayed_execute(ba.config_file_path).await,
     }
 }
 
-async fn proxy_flow_capture(c: ProxyFlowCapture, image_config: ImageConfig) -> Result<(), Error> {
-    let mut converter_pair = match image_config
+async fn proxy_flow_capture(
+    c: ProxyFlowCapture,
+    image_inspect_json_path: Option<String>,
+) -> Result<(), Error> {
+    let image_inspect = ImageInspect::parse_from_json_file(image_inspect_json_path)?;
+    if image_inspect.infer_runtime_protocol() != FlowRuntimeProtocol::Capture {
+        return Err(Error::MismatchingRuntimeProtocol);
+    }
+
+    let entrypoint = image_inspect.get_entrypoint(vec![DEFAULT_CONNECTOR_ENTRYPOINT.to_string()]);
+
+    match image_inspect
         .get_connector_protocol::<CaptureConnectorProtocol>(CaptureConnectorProtocol::Airbyte)
     {
-        CaptureConnectorProtocol::FlowCapture => DefaultFlowCaptureInterceptor::get_converters(),
-        CaptureConnectorProtocol::Airbyte => AirbyteCaptureInterceptor::get_converters(),
-    };
-
-    converter_pair = compose(
-        converter_pair,
-        NetworkProxyCaptureInterceptor::get_converters(),
-    );
-
-    run_connector::<FlowCaptureOperation>(
-        c.operation,
-        image_config.get_entrypoint(vec![DEFAULT_CONNECTOR_ENTRYPOINT.to_string()]),
-        converter_pair,
-    )
-    .await
+        CaptureConnectorProtocol::FlowCapture => {
+            run_flow_capture_connector(&c.operation, entrypoint).await
+        }
+        CaptureConnectorProtocol::Airbyte => {
+            run_airbyte_source_connector(&c.operation, entrypoint).await
+        }
+    }
 }
 
 async fn proxy_flow_materialize(
     m: ProxyFlowMaterialize,
-    image_config: ImageConfig,
+    image_inspect_json_path: Option<String>,
 ) -> Result<(), Error> {
-    // There is only one type of connector protocol for flow materialize.
-    let mut converter_pair = DefaultFlowMaterializeInterceptor::get_converters();
-    converter_pair = compose(
-        converter_pair,
-        NetworkProxyMaterializeInterceptor::get_converters(),
-    );
-
-    run_connector::<FlowMaterializeOperation>(
-        m.operation,
-        image_config.get_entrypoint(vec![DEFAULT_CONNECTOR_ENTRYPOINT.to_string()]),
-        converter_pair,
+    // Respond to OS sigterm signal.
+    tokio::task::spawn(async move { sigterm_handler().await });
+
+    let image_inspect = ImageInspect::parse_from_json_file(image_inspect_json_path)?;
+    if image_inspect.infer_runtime_protocol() != FlowRuntimeProtocol::Materialize {
+        return Err(Error::MismatchingRuntimeProtocol);
+    }
+
+    run_flow_materialize_connector(
+        &m.operation,
+        image_inspect.get_entrypoint(vec![DEFAULT_CONNECTOR_ENTRYPOINT.to_string()]),
     )
     .await
 }
+
+async fn delayed_execute(command_config_path: String) -> Result<(), Error> {
+    // Wait for the "READY" signal from the parent process before starting the connector.
+    read_ready(&mut tokio::io::stdin()).await?;
+
+    tracing::info!("delayed process execution continuing...");
+
+    let reader = BufReader::new(File::open(command_config_path)?);
+    let command_config: CommandConfig = serde_json::from_reader(reader)?;
+
+    let mut child = invoke_connector(
+        Stdio::inherit(),
+        Stdio::inherit(),
+        Stdio::piped(),
+        &command_config.entrypoint,
+        &command_config.args,
+    )?;
+
+    match check_exit_status("delayed process", child.wait().await) {
+        Err(e) => {
+            let mut buf = Vec::new();
+            child
+                .stderr
+                .take()
+                .ok_or(Error::MissingIOPipe)?
+                .read_to_end(&mut buf)
+                .await?;
+
+            tracing::error!(
+                "connector failed. command_config: {:?}. stderr from connector: {}",
+                &command_config,
+                std::str::from_utf8(&buf).expect("error when decoding stderr")
+            );
+            Err(e)
+        }
+        _ => Ok(()),
+    }
+}
diff --git a/crates/control/tests/it/snapshots/it__connector_images__spec_test.snap b/crates/control/tests/it/snapshots/it__connector_images__spec_test.snap
index 229ef3e727..821c8eb737 100644
--- a/crates/control/tests/it/snapshots/it__connector_images__spec_test.snap
+++ b/crates/control/tests/it/snapshots/it__connector_images__spec_test.snap
@@ -1,8 +1,6 @@
 ---
 source: crates/control/tests/it/connector_images.rs
-assertion_line: 211
-expression: redactor.response_json(response).await.unwrap()
-
+expression: redactor.response_json(&mut response).await.unwrap()
 ---
 {
   "data": {
@@ -16,6 +14,62 @@ expression: redactor.response_json(response).await.unwrap()
         "description": "Number of greeting documents to produce when running in non-tailing mode",
         "title": "Number of Greetings",
         "type": "integer"
+      },
+      "networkProxy": {
+        "oneOf": [
+          {
+            "additionalProperties": false,
+            "properties": {
+              "sshForwarding": {
+                "additionalProperties": false,
+                "properties": {
+                  "forwardHost": {
+                    "description": "Host name to connect from the remote SSH server to the remote destination (e.g. DB) via internal network.",
+                    "type": "string"
+                  },
+                  "forwardPort": {
+                    "description": "Port of the remote destination.",
+                    "format": "uint16",
+                    "minimum": 0.0,
+                    "type": "integer"
+                  },
+                  "localPort": {
+                    "description": "Local port to start the SSH tunnel.",
+                    "format": "uint16",
+                    "minimum": 0.0,
+                    "type": "integer"
+                  },
+                  "privateKey": {
+                    "description": "Private key to connect to the remote SSH server.",
+                    "type": "string"
+                  },
+                  "sshEndpoint": {
+                    "description": "Endpoint of the remote SSH server that supports tunneling, in the form of ssh://hostname[:port]",
+                    "type": "string"
+                  },
+                  "user": {
+                    "description": "User name to connect to the remote SSH server.",
+                    "type": "string"
+                  }
+                },
+                "required": [
+                  "forwardHost",
+                  "forwardPort",
+                  "localPort",
+                  "privateKey",
+                  "sshEndpoint",
+                  "user"
+                ],
+                "type": "object"
+              }
+            },
+            "required": [
+              "sshForwarding"
+            ],
+            "type": "object"
+          }
+        ],
+        "title": "NetworkProxyConfig"
+      }
     },
     "required": [
@@ -25,16 +79,22 @@ expression: redactor.response_json(response).await.unwrap()
       "type": "object"
     },
     "resourceSpecSchema": {
-      "$schema": "http://json-schema.org/draft-04/schema#",
-      "additionalProperties": false,
+      "$schema": "http://json-schema.org/draft-07/schema#",
       "properties": {
         "namespace": {
-          "type": "string"
+          "type": [
+            "string",
+            "null"
+          ]
        },
        "stream": {
          "type": "string"
        },
        "syncMode": {
+          "enum": [
+            "incremental",
+            "full_refresh"
+          ],
          "type": "string"
        }
      },
@@ -42,6 +102,7 @@ expression: redactor.response_json(response).await.unwrap()
         "stream",
         "syncMode"
       ],
+      "title": "ResourceSpec",
       "type": "object"
     },
     "type": "capture"
diff --git a/go/capture/driver/airbyte/driver.go b/go/capture/driver/airbyte/driver.go
index cd7b005480..dbac669b1d 100644
--- a/go/capture/driver/airbyte/driver.go
+++ b/go/capture/driver/airbyte/driver.go
@@ -2,18 +2,19 @@ package airbyte
 
 import (
 	"context"
+	"encoding/binary"
 	"encoding/json"
 	"fmt"
 	"io"
 	"strings"
 
-	"github.com/alecthomas/jsonschema"
 	"github.com/estuary/flow/go/connector"
 	"github.com/estuary/flow/go/flow/ops"
 	"github.com/estuary/flow/go/protocols/airbyte"
 	pc "github.com/estuary/flow/go/protocols/capture"
 	pf "github.com/estuary/flow/go/protocols/flow"
-	"github.com/go-openapi/jsonpointer"
+	protoio "github.com/gogo/protobuf/io"
+	"github.com/gogo/protobuf/proto"
 	"github.com/sirupsen/logrus"
 )
 
@@ -92,50 +93,39 @@ func (d driver) Spec(ctx context.Context, req *pc.SpecRequest) (*pc.SpecResponse
 		"operation": "spec",
 	})
 
-	var spec *airbyte.Spec
-	var err = connector.Run(ctx, source.Image, connector.Capture, d.networkName,
+	var decrypted, err = connector.DecryptConfig(ctx, source.Config)
+	if err != nil {
+		return nil, err
+	}
+	defer connector.ZeroBytes(decrypted) // connector.Run will also ZeroBytes().
+	req.EndpointSpecJson = decrypted
+
+	var resp *pc.SpecResponse
+	err = connector.Run(ctx, source.Image, connector.Capture, d.networkName,
 		[]string{"spec"},
 		// No configuration is passed to the connector.
 		nil,
 		// No stdin is sent to the connector.
-		func(w io.Writer) error { return nil },
+		func(w io.Writer) error {
+			defer connector.ZeroBytes(decrypted)
+			return protoio.NewUint32DelimitedWriter(w, binary.LittleEndian).
+				WriteMsg(req)
+		},
 		// Expect to decode Airbyte messages, and a ConnectorSpecification specifically.
-		connector.NewJSONOutput(
-			func() interface{} { return new(airbyte.Message) },
-			func(i interface{}) error {
-				if rec := i.(*airbyte.Message); rec.Log != nil {
-					logger.Log(airbyteToLogrusLevel(rec.Log.Level), nil, rec.Log.Message)
-				} else if rec.Spec != nil {
-					spec = rec.Spec
-				} else {
-					return fmt.Errorf("unexpected connector message: %v", rec)
+		connector.NewProtoOutput(
+			func() proto.Message { return new(pc.SpecResponse) },
+			func(m proto.Message) error {
+				if resp != nil {
+					return fmt.Errorf("read more than one SpecResponse")
 				}
+				resp = m.(*pc.SpecResponse)
 				return nil
 			},
-			onStdoutDecodeError(logger),
 		),
 		logger,
 	)
+	return resp, err
 
-	// Expect connector spit out a successful ConnectorSpecification.
-	if err == nil && spec == nil {
-		err = fmt.Errorf("connector didn't produce a Specification")
-	}
-	if err != nil {
-		return nil, err
-	}
-
-	var reflector = jsonschema.Reflector{ExpandedStruct: true}
-	resourceSchema, err := reflector.Reflect(new(ResourceSpec)).MarshalJSON()
-	if err != nil {
-		return nil, fmt.Errorf("generating resource schema: %w", err)
-	}
-
-	return &pc.SpecResponse{
-		EndpointSpecSchemaJson: spec.ConnectionSpecification,
-		ResourceSpecSchemaJson: json.RawMessage(resourceSchema),
-		DocumentationUrl:       spec.DocumentationURL,
-	}, nil
 }
 
 // Discover delegates to the `discover` command of the identified Airbyte image.
@@ -156,80 +146,39 @@ func (d driver) Discover(ctx context.Context, req *pc.DiscoverRequest) (*pc.Disc
 		return nil, err
 	}
 	defer connector.ZeroBytes(decrypted) // connector.Run will also ZeroBytes().
+	req.EndpointSpecJson = decrypted
 
-	var catalog *airbyte.Catalog
+	var resp *pc.DiscoverResponse
 	err = connector.Run(ctx, source.Image, connector.Capture, d.networkName,
 		[]string{
 			"discover",
-			"--config",
-			"/tmp/config.json",
 		},
-		// Write configuration JSON to connector input.
-		map[string]json.RawMessage{"config.json": decrypted},
-		// No stdin is sent to the connector.
-		func(w io.Writer) error { return nil },
-		// Expect to decode Airbyte messages, and a ConnectionStatus specifically.
-		connector.NewJSONOutput(
-			func() interface{} { return new(airbyte.Message) },
-			func(i interface{}) error {
-				if rec := i.(*airbyte.Message); rec.Log != nil {
-					logger.Log(airbyteToLogrusLevel(rec.Log.Level), nil, rec.Log.Message)
-				} else if rec.Catalog != nil {
-					catalog = rec.Catalog
-				} else {
-					return fmt.Errorf("unexpected connector message: %v", rec)
+		nil,
+		func(w io.Writer) error {
+			defer connector.ZeroBytes(decrypted)
+			return protoio.NewUint32DelimitedWriter(w, binary.LittleEndian).
+				WriteMsg(req)
+		},
+		connector.NewProtoOutput(
+			func() proto.Message { return new(pc.DiscoverResponse) },
+			func(m proto.Message) error {
+				if resp != nil {
+					return fmt.Errorf("read more than one DiscoverResponse")
 				}
+				resp = m.(*pc.DiscoverResponse)
 				return nil
 			},
-			onStdoutDecodeError(logger),
 		),
 		logger,
 	)
 
 	// Expect connector spit out a successful ConnectionStatus.
-	if err == nil && catalog == nil {
+	if err == nil && resp == nil {
 		err = fmt.Errorf("connector didn't produce a Catalog")
-	}
-	if err != nil {
+	} else if err != nil {
 		return nil, err
 	}
 
-	var resp = new(pc.DiscoverResponse)
-	for _, stream := range catalog.Streams {
-		// Use incremental mode if available.
-		var mode = airbyte.SyncModeFullRefresh
-		for _, m := range stream.SupportedSyncModes {
-			if m == airbyte.SyncModeIncremental {
-				mode = m
-			}
-		}
-
-		var resourceSpec, err = json.Marshal(ResourceSpec{
-			Stream:    stream.Name,
-			Namespace: stream.Namespace,
-			SyncMode:  mode,
-		})
-		if err != nil {
-			return nil, fmt.Errorf("encoding resource spec: %w", err)
-		}
-
-		// Encode array of hierarchical properties as a JSON-pointer.
-		var keyPtrs []string
-		for _, tokens := range stream.SourceDefinedPrimaryKey {
-			for i := range tokens {
-				tokens[i] = jsonpointer.Escape(tokens[i])
-			}
-			keyPtrs = append(keyPtrs, "/"+strings.Join(tokens, "/"))
-		}
-
-		resp.Bindings = append(resp.Bindings, &pc.DiscoverResponse_Binding{
-			RecommendedName:    stream.Name,
-			ResourceSpecJson:   json.RawMessage(resourceSpec),
-			DocumentSchemaJson: stream.JSONSchema,
-			KeyPtrs:            keyPtrs,
-		})
-	}
-
 	return resp, nil
 }
 
@@ -252,57 +201,39 @@ func (d driver) Validate(ctx context.Context, req *pc.ValidateRequest) (*pc.Vali
 		ops.LogSourceField: source.Image,
 		"operation":        "validate",
 	})
+	req.EndpointSpecJson = decrypted
 
-	var status *airbyte.ConnectionStatus
+	var resp *pc.ValidateResponse
 	err = connector.Run(ctx, source.Image, connector.Capture, d.networkName,
 		[]string{
-			"check",
-			"--config",
-			"/tmp/config.json",
+			"validate",
 		},
-		// Write configuration JSON to connector input.
-		map[string]json.RawMessage{"config.json": decrypted},
-		// No stdin is sent to the connector.
-		func(w io.Writer) error { return nil },
-		// Expect to decode Airbyte messages, and a ConnectionStatus specifically.
-		connector.NewJSONOutput(
-			func() interface{} { return new(airbyte.Message) },
-			func(i interface{}) error {
-				if rec := i.(*airbyte.Message); rec.Log != nil {
-					logger.Log(airbyteToLogrusLevel(rec.Log.Level), nil, rec.Log.Message)
-				} else if rec.ConnectionStatus != nil {
-					status = rec.ConnectionStatus
-				} else {
-					return fmt.Errorf("unexpected connector message: %v", rec)
+		nil,
+		func(w io.Writer) error {
+			defer connector.ZeroBytes(decrypted)
+			return protoio.NewUint32DelimitedWriter(w, binary.LittleEndian).
+				WriteMsg(req)
+		},
+		connector.NewProtoOutput(
+			func() proto.Message { return new(pc.ValidateResponse) },
+			func(m proto.Message) error {
+				if resp != nil {
+					return fmt.Errorf("read more than one ValidateResponse")
 				}
+				resp = m.(*pc.ValidateResponse)
 				return nil
 			},
-			onStdoutDecodeError(logger),
 		),
 		logger,
 	)
 
-	// Expect connector spit out a successful ConnectionStatus.
-	if err == nil && status == nil {
-		err = fmt.Errorf("connector didn't produce a ConnectionStatus")
-	} else if err == nil && status.Status != airbyte.StatusSucceeded {
-		err = fmt.Errorf("%s: %s", status.Status, status.Message)
+	if err == nil && resp == nil {
+		err = fmt.Errorf("connector didn't produce a response")
 	}
 	if err != nil {
 		return nil, err
 	}
 
-	// Parse stream bindings and send back their resource paths.
-	var resp = new(pc.ValidateResponse)
-	for _, binding := range req.Bindings {
-		var stream = new(ResourceSpec)
-		if err := pf.UnmarshalStrict(binding.ResourceSpecJson, stream); err != nil {
-			return nil, fmt.Errorf("parsing stream configuration: %w", err)
-		}
-		resp.Bindings = append(resp.Bindings, &pc.ValidateResponse_Binding{
-			ResourcePath: []string{stream.Stream},
-		})
-	}
 	return resp, nil
 }
 
@@ -332,102 +263,36 @@ func (d driver) Pull(stream pc.Driver_PullServer) error {
 		return fmt.Errorf("parsing connector configuration: %w", err)
 	}
 
-	var open = req.Open
-	var streamToBinding = make(map[string]int)
 	var logger = ops.NewLoggerWithFields(d.logger, logrus.Fields{
 		ops.LogSourceField: source.Image,
 		"operation":        "read",
 	})
 
-	// Build configured Airbyte catalog.
-	var catalog = airbyte.ConfiguredCatalog{
-		Streams: nil,
-		Tail:    open.Tail,
-		Range: airbyte.Range{
-			Begin: open.KeyBegin,
-			End:   open.KeyEnd,
-		},
-	}
-	for i, binding := range open.Capture.Bindings {
-		var resource = new(ResourceSpec)
-		if err := pf.UnmarshalStrict(binding.ResourceSpecJson, resource); err != nil {
-			return fmt.Errorf("parsing stream configuration: %w", err)
-		}
-
-		var projections = make(map[string]string)
-		for _, p := range binding.Collection.Projections {
-			projections[p.Field] = p.Ptr
-		}
-
-		var primaryKey = make([][]string, 0, len(binding.Collection.KeyPtrs))
-		for _, key := range binding.Collection.KeyPtrs {
-			if ptr, err := jsonpointer.New(key); err != nil {
-				return fmt.Errorf("parsing json pointer: %w", err)
-			} else {
-				primaryKey = append(primaryKey, ptr.DecodedTokens())
-			}
-		}
-
-		catalog.Streams = append(catalog.Streams,
-			airbyte.ConfiguredStream{
-				SyncMode:            resource.SyncMode,
-				DestinationSyncMode: airbyte.DestinationSyncModeAppend,
-				PrimaryKey:          primaryKey,
-				Stream: airbyte.Stream{
-					Name:               resource.Stream,
-					Namespace:          resource.Namespace,
-					JSONSchema:         binding.Collection.SchemaJson,
-					SupportedSyncModes: []airbyte.SyncMode{resource.SyncMode},
-				},
-				Projections: projections,
-			})
-		streamToBinding[resource.Stream] = i
-	}
-
-	catalogJSON, err := json.Marshal(&catalog)
-	if err != nil {
-		return fmt.Errorf("encoding catalog: %w", err)
-	}
-	logger.Log(logrus.DebugLevel, logrus.Fields{
-		"catalog": &catalog,
-	}, "using configured catalog")
-
 	decrypted, err := connector.DecryptConfig(stream.Context(), source.Config)
 	if err != nil {
 		return err
 	}
 	defer connector.ZeroBytes(decrypted) // RunConnector will also ZeroBytes().
 
-	var invokeArgs = []string{
-		"read",
-		"--config",
-		"/tmp/config.json",
-		"--catalog",
-		"/tmp/catalog.json",
-	}
-	var invokeFiles = map[string]json.RawMessage{
-		"config.json":  decrypted,
-		"catalog.json": catalogJSON,
-	}
-
-	if len(open.DriverCheckpointJson) != 0 {
-		invokeArgs = append(invokeArgs, "--state", "/tmp/state.json")
-		// Copy because RunConnector will ZeroBytes() once sent and,
-		// as noted in driver{}, we don't own this memory.
-		invokeFiles["state.json"] = append([]byte(nil), open.DriverCheckpointJson...)
-	}
+	req.Open.Capture.EndpointSpecJson = decrypted
 
 	if err := stream.Send(&pc.PullResponse{Opened: &pc.PullResponse_Opened{}}); err != nil {
 		return fmt.Errorf("sending Opened: %w", err)
 	}
 
-	var resp *pc.PullResponse
-
-	// Invoke the connector for reading.
-	if err := connector.Run(stream.Context(), source.Image, connector.Capture, d.networkName,
-		invokeArgs,
-		invokeFiles,
+	return connector.Run(stream.Context(), source.Image, connector.Capture, d.networkName,
+		[]string{"pull"},
+		nil,
 		func(w io.Writer) error {
+			defer connector.ZeroBytes(decrypted)
+			var enc = protoio.NewUint32DelimitedWriter(w, binary.LittleEndian)
+			var err = enc.WriteMsg(req)
+
+			if err != nil {
+				return fmt.Errorf("proxying Open: %w", err)
+			}
+
 			for {
 				var req, err = stream.Recv()
 				if err == io.EOF {
@@ -443,49 +308,14 @@ func (d driver) Pull(stream pc.Driver_PullServer) error {
 				}
 			}
 		},
-		// Expect to decode Airbyte messages.
-		connector.NewJSONOutput(
-			func() interface{} { return new(airbyte.Message) },
-			func(i interface{}) error {
-				if rec := i.(*airbyte.Message); rec.Log != nil {
-					logger.Log(airbyteToLogrusLevel(rec.Log.Level), nil, rec.Log.Message)
-				} else if rec.State != nil {
-					return pc.WritePullCheckpoint(stream, &resp,
-						&pf.DriverCheckpoint{
-							DriverCheckpointJson: rec.State.Data,
-							Rfc7396MergePatch:    rec.State.Merge,
-						})
-				} else if rec.Record != nil {
-					if b, ok := streamToBinding[rec.Record.Stream]; ok {
-						return pc.StagePullDocuments(stream, &resp, b, rec.Record.Data)
-					}
-					return fmt.Errorf("connector record with unknown stream %q", rec.Record.Stream)
-				} else {
-					return fmt.Errorf("unexpected connector message: %v", rec)
-				}
-				return nil
+		connector.NewProtoOutput(
+			func() proto.Message { return new(pc.PullResponse) },
+			func(m proto.Message) error {
+				return stream.Send(m.(*pc.PullResponse))
 			},
-			onStdoutDecodeError(logger),
 		),
 		logger,
-	); err != nil {
-		return err
-	}
-
-	if resp == nil {
-		return nil // Connector flushed prior to exiting. All done.
-	}
-
-	// Write a final commit, followed by EOF.
-	// This happens only when a connector writes output and exits _without_
-	// writing a final state checkpoint. We generate a synthetic commit now,
-	// and the nil checkpoint means the assumed behavior of the next invocation
-	// will be "full refresh".
-	return pc.WritePullCheckpoint(stream, &resp,
-		&pf.DriverCheckpoint{
-			DriverCheckpointJson: nil,
-			Rfc7396MergePatch:    false,
-		})
+	)
 }
 
 // onStdoutDecodeError returns a function that is invoked whenever there's an error parsing a line
diff --git a/go/connector/run.go b/go/connector/run.go
index a783176207..6b5926a1a2 100644
--- a/go/connector/run.go
+++ b/go/connector/run.go
@@ -105,41 +105,39 @@ func Run(
 	}
 	defer os.RemoveAll(tempdir)
 
-	if protocol == Materialize {
-		if connectorProxyPath, err := prepareFlowConnectorProxyBinary(tempdir); err != nil {
-			return fmt.Errorf("prepare flow connector proxy binary: %w", err)
-		} else {
-			imageArgs = append(imageArgs,
-				"--entrypoint", connectorProxyPath,
-				"--mount", fmt.Sprintf("type=bind,source=%[1]s,target=%[1]s", connectorProxyPath),
-			)
-		}
+	if connectorProxyPath, err := prepareFlowConnectorProxyBinary(tempdir); err != nil {
+		return fmt.Errorf("prepare flow connector proxy binary: %w", err)
+	} else {
+		imageArgs = append(imageArgs,
+			"--entrypoint", connectorProxyPath,
+			"--mount", fmt.Sprintf("type=bind,source=%[1]s,target=%[1]s", connectorProxyPath),
+		)
+	}
 
-		if err := pullRemoteImage(ctx, image, logger); err != nil {
-			// This might be a local image. Log an error and keep going.
-			// If the image does not exist locally, the inspectImage will return an error and terminate the workflow.
-			logger.Log(logrus.InfoLevel, logrus.Fields{
-				"error": err,
-			}, "pull remote image does not succeed.")
-		}
+	if err := pullRemoteImage(ctx, image, logger); err != nil {
+		// This might be a local image. Log an error and keep going.
+		// If the image does not exist locally, the inspectImage will return an error and terminate the workflow.
+		logger.Log(logrus.InfoLevel, logrus.Fields{
+			"error": err,
+		}, "pull remote image does not succeed.")
+	}
 
-	if inspectOutput, err := inspectImage(ctx, image); err != nil {
-		return fmt.Errorf("inspect image: %w", err)
-	} else {
-		if jsonFiles == nil {
-			jsonFiles = map[string]json.RawMessage{imageInspectJsonFileName: inspectOutput}
+	if inspectOutput, err := inspectImage(ctx, image); err != nil {
+		return fmt.Errorf("inspect image: %w", err)
+	} else {
+		if jsonFiles == nil {
+			jsonFiles = map[string]json.RawMessage{imageInspectJsonFileName: inspectOutput}
 
-		} else {
-			jsonFiles[imageInspectJsonFileName] = inspectOutput
-		}
+		} else {
+			jsonFiles[imageInspectJsonFileName] = inspectOutput
 		}
-
-		args = append([]string{
-			fmt.Sprintf("--image-inspect-json-path=/tmp/%s", imageInspectJsonFileName),
-			protocol.proxyCommand(),
-		}, args...)
 	}
 
+	args = append([]string{
+		fmt.Sprintf("--image-inspect-json-path=/tmp/%s", imageInspectJsonFileName),
+		protocol.proxyCommand(),
+	}, args...)
+
 	for name, data := range jsonFiles {
 		var hostPath = filepath.Join(tempdir, name)
 		var containerPath = filepath.Join("/tmp", name)