connector proxy - source connectors #409

Closed
wants to merge 33 commits into from

@ghost ghost commented Mar 10, 2022

Description:

This is the second half of the connector proxy work. It enables the proxy to work with capture connectors that currently speak the Airbyte source protocol.

airbyte_source_interceptor was implemented to translate Airbyte messages into the Flow capture protocol.

Workflow steps:

(How does one use this feature, and how has it changed)

Documentation links affected:
Once this change is merged, the network proxy configurations that are available to materialize connectors will, in theory, be available to all capture connectors.

(list any documentation links that you created, or existing ones that you've identified as needing updates, along with a brief description)

Notes for reviewers:

(anything that might help someone review this PR)



@ghost ghost force-pushed the jj-connector-proxy-source branch from 89007ba to c00b6f0 Compare March 10, 2022 18:05
pub trait Interceptor<T: FlowOperation> {
fn get_converters() -> RequestResponseConverterPair<T> {

This comment was marked as outdated.

ProxyCommand::ProxyFlowMaterialize(m) => {
    proxy_flow_materialize(m, image_inspect_json_path).await
}
ProxyCommand::DelayedExecute(ba) => delayed_execute(ba.config_file_path).await,

This comment was marked as outdated.


async fn delayed_execute(command_config_path: String) -> Result<(), Error> {
    // Sleep for some time to allow parent process to stop the current process.
    std::thread::sleep(std::time::Duration::from_millis(100));

This comment was marked as outdated.

@ghost ghost requested review from jgraettinger and psFried March 10, 2022 23:29
Author

ghost commented Mar 10, 2022

@jgraettinger @psFried
This is a draft PR with lots of TODOs. I am not in a hurry, but it would be great if you could take a quick look and give some initial feedback, just to make sure this is on the right track. Thank you!

Member

@jgraettinger jgraettinger left a comment

Thanks @jixij, I'm not able to review this yet but did at least address your comments below.

Reviewed 1 of 10 files at r1, all commit messages.
Reviewable status: 1 of 16 files reviewed, 5 unresolved discussions (waiting on @jixij and @psFried)


crates/connector_proxy/src/apis.rs, line 49 at r6 (raw file):

Previously, jixij (Jixiang Jiang) wrote…

Pass pid into convert_request, so that the connector process can be started after the interceptors.

The reason is that the arguments used to start the connector process are constructed by the interceptor, so it is easier if the interceptor does not depend on the connector process having already started.

As we've discussed, I think this can all be simplified by removing the Interceptor trait and ComposedInterceptor, and building the concrete stacks as needed, as composed literals. Interceptor is a leaky abstraction (since pid is only needed for airbyte) that doesn't buy much.

Instead have a top-level switch over the protocol cases that constructs an impl InterceptorStream by composing its component pieces:

let request_stack = airbyte_request_adapter(
   pid,
   other_arg_specific_to_airbyte,
   some_wrapped_adapter(
      some,
      stuff,
      convert_my_stdin_to_stream(stdin())));

let response_stack = airbyte_response_adapter( ... )
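
A minimal sketch of that composition style, assuming plain iterators in place of async streams so that it needs only the standard library. Every name here (`Request`, `airbyte_request_adapter`, `drop_empty_configs`, `build_airbyte_stack`) is hypothetical and not taken from connector_proxy:

```rust
// Hypothetical sketch: each adapter is a plain function that wraps an inner
// stream and returns a new one, so no shared Interceptor trait is needed.
// Iterators stand in for async streams to keep this std-only.

#[derive(Debug, PartialEq)]
enum Request {
    Spec,
    Open { config: String },
}

/// Airbyte-specific adapter. It takes `pid` as an ordinary argument, so the
/// other protocol stacks never have to know that a pid exists.
fn airbyte_request_adapter(
    pid: u32,
    inner: impl Iterator<Item = Request>,
) -> impl Iterator<Item = String> {
    inner.map(move |req| match req {
        Request::Spec => format!("pid={pid} spec"),
        Request::Open { config } => format!("pid={pid} read --config {config}"),
    })
}

/// A protocol-agnostic adapter that could be reused by any stack.
/// It drops Open requests whose config is empty.
fn drop_empty_configs(inner: impl Iterator<Item = Request>) -> impl Iterator<Item = Request> {
    inner.filter(|req| !matches!(req, Request::Open { config } if config.is_empty()))
}

/// The top-level switch would call one builder like this per protocol.
/// The stack is written out as a composed literal.
fn build_airbyte_stack(pid: u32, input: Vec<Request>) -> Vec<String> {
    airbyte_request_adapter(pid, drop_empty_configs(input.into_iter())).collect()
}
```

The point of the shape is that arguments needed by only one protocol stay local to that protocol's builder instead of leaking into a shared trait signature.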

crates/connector_proxy/src/main.rs, line 140 at r6 (raw file):

Previously, jixij (Jixiang Jiang) wrote…

I'm not using a bash script to start the proxy process, because we cannot assume shell scripts work in all Docker images (e.g. distroless). So I structured it as an additional sub-command of the connector-proxy: basically a separate process that starts the real connector after it receives a signal from the connector-proxy that launched it.

The specific "call ourselves" approach is reasonable, but I do want to raise a latent concern that we need to take a bigger step back and reconsider the engineering constraints we're setting for ourselves ("it must run in distroless", "we can't assume there's an ssh binary we can shell to", etc).

We don't need to hit these constraints to have meaningful functionality that works in most contexts, and as we raise the bar for how self-sufficient we're trying to be, it adds exponentially to development time and maintenance.


crates/connector_proxy/src/main.rs, line 194 at r6 (raw file):

Previously, jixij (Jixiang Jiang) wrote…

This might look a bit hacky, and here is the reason.

The approach I tried first was to let the delayed_execute process pause itself and wait for a SIGCONT signal from its parent (the connector proxy) to continue. The issue was that the SIGCONT from the parent might arrive before delayed_execute had stopped and was ready to receive it. If delayed_execute stops after missing the SIGCONT, it is stuck forever.
The main issue is that splitting the stop/resume actions across two processes is fragile, because they are difficult to coordinate.

So in this implementation, both the stop and continue signals come from the parent process, so that they can be ordered. I also added a sleep in the delayed process to give the parent enough time to stop it (it could get messy if the real connector were started before the process was stopped).
I'm not sure if this is OK. Alternatives I was considering:

  • Using a lock file. The parent creates a lock file before starting the child process, and the child process blocks and polls the lock file until it is deleted.
  • Building a binary in another language and injecting it into the Docker image together with connector-proxy (or calling into the other language from Rust). The process API in Rust does not allow us to do much; hopefully the other language would.

A generalized pattern to do this is to have the child write to stderr when it's ready (has installed signal handler in this case).

See flow/js_worker.go for an example.
The child writes "READY\n" to stderr when it's started. The parent reads this, and then passes through the remainder of child stderr to its own stderr.
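
A minimal sketch of the parent side of that handshake, assuming the marker is exactly the line "READY\n" and that the child's stderr is available as a buffered reader. The function names `await_ready` and `forward_rest` are hypothetical:

```rust
use std::io::{self, BufRead, Write};

/// Blocks until the child's first stderr line arrives.
/// Returns Ok(true) only if that line is exactly the READY marker.
/// Returns Ok(false) if the child wrote something else or closed stderr.
fn await_ready<R: BufRead>(child_stderr: &mut R) -> io::Result<bool> {
    let mut first_line = String::new();
    child_stderr.read_line(&mut first_line)?;
    Ok(first_line == "READY\n")
}

/// Copies whatever remains of the child's stderr into the parent's own
/// stderr (or any other writer). This blocks until the child closes its
/// stderr, so a real caller would likely run it on a separate thread.
fn forward_rest<R: BufRead, W: Write>(child_stderr: &mut R, out: &mut W) -> io::Result<u64> {
    io::copy(child_stderr, out)
}
```

Between the two calls the parent knows the child has installed its handler, so that is the point at which it would be safe to send the signal.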


crates/connector_proxy/src/interceptors/airbyte_capture_interceptor.rs, line 44 at r6 (raw file):

Previously, jixij (Jixiang Jiang) wrote…

I just came across tmpfs, which stores files in host memory, as another way of storing secure information in Docker. Shall we look more into that?

No. KISS. The connector-proxy also shouldn't assume anything about the filesystem it's running on (from its perspective, it's not even "aware" it's running within a container).


go/capture/driver/airbyte/driver.go, line 304 at r6 (raw file):

Previously, jixij (Jixiang Jiang) wrote…

The translation from Airbyte messages to proto messages is done on the Rust side. However, staging / combining the messages is done on the Go side. Is this OK? Or should the boundary between Rust and Go be shifted?

Let's not worry about it for now. connector-proxy should eventually do coalescing but that's an optimization we can leave for later.

This can be simplified to pass-through |pullResp| without inspecting it -- the runtime already has error cases over unexpected message types.

Author

ghost commented Mar 11, 2022

Sounds great. Thanks a lot @jgraettinger for the confirmations, it is more clear to me now.

@ghost ghost force-pushed the jj-connector-proxy-source branch 6 times, most recently from ffef2ac to c49b3c0 Compare March 17, 2022 18:58
Author

ghost commented Mar 17, 2022


crates/connector_proxy/src/apis.rs, line 49 at r6 (raw file):

Previously, jgraettinger (Johnny Graettinger) wrote…

As we've discussed, I think this can all be simplified by removing the Interceptor trait and ComposedInterceptor, and building the concrete stacks as needed, as composed literals. Interceptor is a leaky abstraction (since pid is only needed for airbyte) that doesn't buy much.

Instead have a top-level switch over the protocol cases that constructs an impl InterceptorStream by composing its component pieces:

let request_stack = airbyte_request_adapter(
   pid,
   other_arg_specific_to_airbyte,
   some_wrapped_adapter(
      some,
      stuff,
      convert_my_stdin_to_stream(stdin())));

let response_stack = airbyte_response_adapter( ... )

Yes, I've removed these traits and implemented three runner functions for the different types of connector:
run_flow_capture_connector and run_flow_materialize_connector for connectors that speak the Flow protocols.
run_airbyte_source_connector for connectors that speak the Airbyte protocol.

They construct their own interceptors (or req/resp stream adaptors), and common logic shared by them is extracted into separate functions.

Author

ghost commented Mar 17, 2022


crates/connector_proxy/src/main.rs, line 140 at r6 (raw file):

Previously, jgraettinger (Johnny Graettinger) wrote…

The specific "call ourselves" approach is reasonable, but I do want to raise a latent concern that we need to take a bigger step back and reconsider the engineering constraints we're setting for ourselves ("it must run in distroless", "we can't assume there's an ssh binary we can shell to", etc).

We don't need to hit these constraints to have meaningful functionality that works in most contexts, and as we raise the bar for how self-sufficient we're trying to be, it adds exponentially to development time and maintenance.

Got it.. Thanks!

Author

ghost commented Mar 17, 2022


crates/connector_proxy/src/main.rs, line 194 at r6 (raw file):

Previously, jgraettinger (Johnny Graettinger) wrote…

A generalized pattern to do this is to have the child write to stderr when it's ready (has installed signal handler in this case).

See flow/js_worker.go for an example.
The child writes "READY\n" to stderr when it's started. The parent reads this, and then passes through the remainder of child stderr to its own stderr.

Cool. This functionality was implemented in invoke_delayed_connector, which sends READY back over stderr and then waits for the SIGCONT signal.

@ghost ghost force-pushed the jj-connector-proxy-source branch from 07e6cde to 141f8e0 Compare March 17, 2022 20:03

//if resp == nil {
// return nil // Connector flushed prior to exiting. All done.
//}

// Write a final commit, followed by EOF.
// This happens only when a connector writes output and exits _without_
Author

This is something that I am not very sure about.
It looks to me like the Go logic appends an EOF to the stream if there are pending messages that have not been checkpointed. This logic is missing in Rust; should it be added back?

Member

It should be in Rust, because there are Airbyte connectors that write only records with no checkpoint at all.
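
A minimal sketch of that behavior, assuming the connector's output has already been decoded into a simple message enum. The `Output` type and `with_final_checkpoint` are hypothetical and only illustrate the rule "if the last thing written was a record, append a closing checkpoint":

```rust
/// Hypothetical, simplified view of what a connector emits.
#[derive(Debug, PartialEq, Clone)]
enum Output {
    Record(String),
    Checkpoint,
}

/// Appends a final checkpoint when the connector exited with records that
/// were never followed by a checkpoint. Output that already ends in a
/// checkpoint, or that is empty, is returned unchanged.
fn with_final_checkpoint(messages: Vec<Output>) -> Vec<Output> {
    let has_pending_records = matches!(messages.last(), Some(Output::Record(_)));
    let mut out = messages;
    if has_pending_records {
        out.push(Output::Checkpoint);
    }
    out
}
```

A streaming implementation would track a "pending" flag while forwarding messages rather than buffering them, but the decision at end of stream is the same.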

pub cursor_field: Option<Vec<String>>,
pub primary_key: Option<Vec<Vec<String>>>,

// TODO: might be broken if both 'projection' and its alias is present in the JSON data.
Author

I see the pattern here:
A JSON object could specify two fields, field and estuary.dev/field, and their values are merged.

In serde, alias is used here to specify the fields. However, in this implementation, deserialization will break if a JSON object has both fields present.
I am not very sure about the use cases of these fields. If there are use cases that need both fields to co-exist in the same object, I'll switch to a custom deserializer.

Member

this is fine, they never co-exist. We're the only ones who use this and afaik it will always come in as estuary.dev/projections

stream_to_binding: Arc::new(Mutex::new(HashMap::new())),
tmp_dir: Builder::new()
    .prefix("airbyte-source-")
    .tempdir_in("/connector-tmpfs")
Author

I spent some time on the file location. The struggle here is that it is difficult to find a path that the connector can write to.

  • The tmp dir is mounted to the local host, so it is not safe.
  • The proxy is not running as the root user, so it does not have permission to create a directory in most places in the file system. The available locations could also be image-dependent and complex.

On the other hand, while researching, I found that tmpfs is pretty easy to set up, with just a few lines of code change. So I added this as a placeholder. If storing data as files in host machine memory does not work well for other considerations, especially in a k8s environment, I'll look into this more.

Member

How about /var/tmp for now?

Later we can also add docker inspect info to the *Request / Open messages, which would be the one remaining file passed in I believe, and altogether remove the current /tmp binding we do to pass through files.

Author

Yes, /var/tmp works fine. Thank you! We can plan the next steps after this. Removing the /tmp binding would be ideal.

@ghost ghost force-pushed the jj-connector-proxy-source branch 5 times, most recently from 57e5ef0 to 26b3e95 Compare March 18, 2022 04:00
@ghost ghost force-pushed the jj-connector-proxy-source branch from 98f499e to 681956f Compare March 24, 2022 21:55
@ghost ghost force-pushed the jj-connector-proxy-source branch from 9ccd79b to f302e28 Compare March 25, 2022 13:13
Author

ghost commented Mar 25, 2022


crates/connector_proxy/Cargo.toml, line 23 at r10 (raw file):

Previously, jgraettinger (Johnny Graettinger) wrote…

use doc::Pointer

Yes, I was using doc::Pointer for parsing the JSON pointers (see tokenize_jsonpointer in crates/connector_proxy/src/libs/json.rs).

The json-pointer dependency is used for another purpose: to escape and concatenate a list of strings to construct a JSON pointer. Basically it matches this Go logic (https://github.com/estuary/flow/blob/master/go/capture/driver/airbyte/driver.go#L219-L223), which doesn't appear to have direct support in doc::Pointer. So I was relying on json-pointer. But I can also extend doc::Pointer or wrap it to avoid this dependency, since this logic is very lightweight.
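
To show how lightweight that logic is, here is a minimal sketch of escaping and joining tokens per RFC 6901, using only the standard library. The function names `escape_token` and `build_json_pointer` are hypothetical, and this is not claimed to match the linked Go code line for line:

```rust
/// Escapes one reference token per RFC 6901: `~` becomes `~0` and `/`
/// becomes `~1`. The `~` replacement must run first, otherwise the `~`
/// introduced by `~1` would itself be escaped again.
fn escape_token(token: &str) -> String {
    token.replace('~', "~0").replace('/', "~1")
}

/// Builds a JSON pointer by prefixing each escaped token with `/`.
/// An empty token list yields "", which points at the whole document.
fn build_json_pointer<S: AsRef<str>>(tokens: &[S]) -> String {
    tokens
        .iter()
        .map(|token| format!("/{}", escape_token(token.as_ref())))
        .collect()
}
```

For example, the tokens `a/b` and `c~d` produce the pointer `/a~1b/c~0d`.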

Author

ghost commented Mar 25, 2022


go/capture/driver/airbyte/driver.go, line 101 at r10 (raw file):

Previously, jgraettinger (Johnny Graettinger) wrote…

don't you need to send in the SpecRequest ?

Good point. I missed the differences between the FlowCapture and AirbyteSource.

@ghost ghost force-pushed the jj-connector-proxy-source branch from f302e28 to 85915af Compare March 25, 2022 14:55
Author

ghost commented Mar 25, 2022

Thank you again Johnny for the review. I've made changes accordingly. PTAL!

FYI, yes, the timeout issue in CI is real and, as always, it is most likely caused by a race condition that causes the bouncer process to miss the SIGCONT after it stops itself. It is now more difficult to debug because it only occurs in CI.

As I read about inter-process communication, I felt a pipe-based mechanism would be more robust, and I have switched to that: the delayed process waits for a "READY" message from the parent process on stdin before it starts the connector process. It seems to be working fine in the CI test runs so far. If there are any other thoughts, please let me know.
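
A minimal sketch of the waiting side of that pipe-based handshake, assuming the message is the line "READY" terminated by a newline. The function name `wait_for_parent_ready` is hypothetical. It reads one byte at a time so that nothing past the newline is consumed, on the assumption that later bytes on the same pipe may be meant for the connector:

```rust
use std::io::{self, Read};

/// Blocks until a line equal to "READY" has been read from `input`.
/// Lines with any other content are skipped. If the parent closes the pipe
/// first, `read_exact` returns an UnexpectedEof error, which is passed on.
fn wait_for_parent_ready<R: Read>(mut input: R) -> io::Result<()> {
    let mut line: Vec<u8> = Vec::new();
    let mut byte = [0u8; 1];
    loop {
        input.read_exact(&mut byte)?;
        if byte[0] == b'\n' {
            if line.as_slice() == b"READY" {
                return Ok(());
            }
            line.clear();
        } else {
            line.push(byte[0]);
        }
    }
}
```

Unlike the signal-based version, a blocking read cannot miss the message: if the parent writes before the child starts reading, the bytes simply wait in the pipe.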

Author

ghost commented Mar 25, 2022


crates/connector_proxy/src/libs/airbyte_catalog.rs, line 48 at r9 (raw file):

Previously, jgraettinger (Johnny Graettinger) wrote…

this is fine, they never co-exist. We're the only ones who use this and afaik it will always come in as estuary.dev/projections

great!

@ghost ghost closed this Mar 25, 2022
@mdibaiee
Member

will continue this work here: #425

This pull request was closed.