Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Connector proxy airbyte #425

Merged
merged 9 commits into from
Apr 14, 2022
Merged

Connector proxy airbyte #425

merged 9 commits into from
Apr 14, 2022

Conversation

mdibaiee
Copy link
Member

@mdibaiee mdibaiee commented Mar 29, 2022

Description:

(Describe the high level scope of new or changed features)

Workflow steps:

(How does one use this feature, and how has it changed)

Documentation links affected:

(list any documentation links that you created, or existing ones that you've identified as needing updates, along with a brief description)

Notes for reviewers:

(anything that might help someone review this PR)


This change is Reviewable


if let Some(repo_tags) = &self.repo_tags {
for tag in repo_tags {
if tag.starts_with("ghcr.io/estuary/materialize-") {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This condition works for now for published materializations, but I personally use local docker images for testing and also I can imagine other people writing materializations for Flow at one point. Should we also use a Docker label for this and only fallback to this check for backward compatibility?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes! The intention is that we move to docker labels for all of our published connectors, where those labels carry metadata like whether it's a capture or materialization, and its specific protocol.

We haven't done this yet, though, and this is a short-term hack to keep things working.

Just in case it's useful: you can ask docker to build an image, or locally re-tag an image, to ghcr.io/estuary/materialize-my-thing, even if you have no intention of pushing that up as an image.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jgraettinger I put a TODO here for us to revisit this 👍🏽

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to surface it, here's the other place where we're using this hack, as part of the flowctl api spec subcommand:

https://github.com/estuary/flow/blob/master/go/flowctl-go/cmd-api-spec.go#L44-L49

@mdibaiee mdibaiee marked this pull request as ready for review March 31, 2022 07:33
@mdibaiee
Copy link
Member Author

mdibaiee commented Mar 31, 2022

Okay, I got to test using airbyte/source-hubspot with this branch and it worked:

  1. No error on second run of discovery (Airbyte Connectors with OAuth Failing #357)
  2. Could actually get data moving from Hubspot to postgres materialize

I just had to make some minor adjustments to the Airbyte specification (some optional fields were required by our code, etc.) and a fix for argument ordering when invoking the connectors.

Put in a bunch of TODOs for things to improve. I'm still a bit afraid of the potential blast radius of this PR, a few questions:

  1. If we merge this, does this affect our existing workflows for our customers? Wouldn't want to break their flows once this is merged
  2. Can we test this for an actual use case (perhaps what @dyaffe mentioned about Facebook source for one of our customers) after merging and then once we are happy, roll it out for other use cases
  3. Or rather keep the branch as is and try using it for that use case and if it's successful we merge?

I will endeavour to write tests for this piece so that we can iterate on it with more confidence. I will do that before refactorings to make sure I don't break things as I refactor some parts of the code.

@dyaffe
Copy link
Member

dyaffe commented Mar 31, 2022

@mdibaiee and I connected and are in the process of testing this for Facebook with a customer use case

@mdibaiee
Copy link
Member Author

mdibaiee commented Apr 1, 2022

For the record, the Airbyte facebook connector is using a deprecated API and as such is currently not working:
airbytehq/airbyte#10629

@mdibaiee mdibaiee force-pushed the connector-proxy-airbyte branch 8 times, most recently from f9ca1a0 to fb6f62e Compare April 12, 2022 11:52
@mdibaiee
Copy link
Member Author

There was a bug in implementation of stream_airbyte_responses (one of the most important functions here), fixed it and wrote some tests to make sure we don't break that again.

@mdibaiee mdibaiee force-pushed the connector-proxy-airbyte branch 3 times, most recently from 34f680b to b0ee86e Compare April 12, 2022 16:12
Copy link
Member

@psFried psFried left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is getting pretty close. All but one of my inline comments are things that can be deferred until later.

One other thing I noticed, which I couldn't comment on inline: the examples/examples.db* files should not be checked in. Looks like they were added by accident.

Apart from checking the usage of BytesMut in the response stream, I think this is ok to merge.

crates/connector_proxy/src/connector_runner.rs Outdated Show resolved Hide resolved
crates/connector_proxy/src/libs/json.rs Show resolved Hide resolved
crates/connector_proxy/src/libs/stream.rs Outdated Show resolved Hide resolved
crates/connector_proxy/src/main.rs Outdated Show resolved Hide resolved
crates/network-tunnel/src/main.rs Outdated Show resolved Hide resolved
Jixiang Jiang and others added 6 commits April 13, 2022 14:59
stop and cont

airbyte discover

spec / validate / apply capture requests

airbyte pull

airbyte discover

various improvements

check runtime protocol

check runtime protocol

ready signal from stderr

makefile more deps

try_stream! for error processing

update Cargo.lock

Makefile change

improve airbyte source logic

ldconfig

XXtesting

move test to common stage

NsMerge

tweaks

update Cargo.lock

use libc instead of nix

skip null field in specresult

update control plane snapshots

delayed process stop itself

setup go in stage 2

tweaks

using /var/tmp

send SpecRequest

synthetic chekcpoint

convert_ to adapt_

debug - replacing sigcont with stdin

simplify delay waiting logic

remove a TODO
// We ignore JSONs that are not Airbyte Messages according
// to the specification:
// https://docs.airbyte.com/understanding-airbyte/airbyte-specification#the-airbyte-protocol
Err(_) => return Ok(None),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Upon a closer read, this isn't right, either. The wording of the spec in ambiguous, and makes it seem like you could expect that each line on stdout is JSON, but that is not so. Here's some completely reasonable assumptions about airbyte connectors that turned out to be wrong:

  • stdout will only have JSON: Nope, it'll also contain lines of plain text mixed in
  • all logs are emitted as airbyte Messages: Nope, see above. The log Messages seem to be more for things that are meant to be surfaced directly to the end user, as opposed to debug logs.
  • debug logs are written to stderr: Nope, only some of their connectors do that. Many log only to stdout, or to both.

The behavior we're using currently is sort of the opposite of what's here. Lines are allowed to not parse as JSON, but if they do parse as JSON, then they're required to deserialize sucessfully as airbyte messages. Based on yet another read of the spec, we might also need to permit that second case, but it hasn't caused an issue so far.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I read the protocol specification once again and my reading still is that each line must be JSON, although it can be a non-AirbyteMessage, in which case it's ignored. https://docs.airbyte.com/understanding-airbyte/airbyte-specification/#the-airbyte-protocol

There may be cases where debug logs are printed to stdout as plaintext (as in this example which I opened a PR for) but I'm pretty sure it's a bug. However, I think Airbyte is being forgiving about this in their own product which leads to them not finding these bugs out. When I asked one of their engineers about this plaintext print statement, he was very surprised and asked me "I wonder how it's working at all".

Now with that said, we might say we do not trust connectors to conform to the specification properly and we would rather be forgiving to avoid issues on our side, and that's reasonable.

Copy link
Member Author

@mdibaiee mdibaiee Apr 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I updated the code so that we now operate on lines, rather than on chunks of length-capped bytes directly. This has simplified the message handling function a lot, and now allows us to handle plaintext lines. The serde StreamDeserializer could not handle this (i.e. it would never progress if it sees an input such as: I am plaintext\n{"x": 2}) because it does not operate on lines.

Copy link
Member Author

@mdibaiee mdibaiee Apr 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if there is any consequence to switching to operating over lines. For one thing, I know that each line can be large, does that cause a problem to have a single large Bytes rather than multiple smaller ones? We used to cap our Bytes at 4096 bytes. I still don't expect us to go over three times that number in each line, but it's always a possibility.

Copy link
Member Author

@mdibaiee mdibaiee Apr 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see any way at the moment to be able to deserialize lines into JSON while handling plaintext without having to somehow run the deserialize on a line, so in the end we will have to feed "one line" to the deserializer.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so in the end we will have to feed "one line" to the deserializer.

Yeah, I think that's right.

There may be cases where debug logs are printed to stdout as plaintext (airbytehq/airbyte#11977 which I opened a PR for) but I'm pretty sure it's a bug.

Yeah, I'd agree with that, as well as your suggestion that we should also tolerate it, since that's what airbyte does. Basically, I think we have to treat the "spec" as aspirational, and consider the "real spec" to be whatever airbyte accepts. The rationale is that users will not be very understanding if they can't use an airbyte connector in Flow because of a "bug", when that connector seems to work just fine in Airbyte. I do think that pushing for conformance to the published spec is also important, though, and so ultimately I think we need to take it case by case. The stdout logging issue is just one that we'd seen in (anecdotally) many connectors, but I don't really have a good sense of how common it is. Sorry for the ramble, but I think my conclusion here is just that I'd trust your judgement on what to do here now that you're informed on the issue.

@mdibaiee mdibaiee force-pushed the connector-proxy-airbyte branch 4 times, most recently from a5741a2 to bf26153 Compare April 14, 2022 14:56
@mdibaiee mdibaiee force-pushed the connector-proxy-airbyte branch 2 times, most recently from 5993e51 to e7bf994 Compare April 14, 2022 17:48
* wip moving away from try_stream!

* refactor simple streams into stream::once

* more stream refactoring

* more refactorings

* simplify stream usage with helper functions

* more comments

* two more small streams
@mdibaiee mdibaiee force-pushed the connector-proxy-airbyte branch 2 times, most recently from 69bccea to 26c418c Compare April 14, 2022 17:55
Copy link
Member

@psFried psFried left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That last round of changes looks great. Merge on! 🚀

@mdibaiee mdibaiee merged commit 68911c3 into master Apr 14, 2022
@mdibaiee mdibaiee deleted the connector-proxy-airbyte branch April 15, 2022 10:49
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants