Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

airbyte-lib base implementation #33409

Merged
merged 34 commits into from
Jan 3, 2024
Merged

airbyte-lib base implementation #33409

merged 34 commits into from
Jan 3, 2024

Conversation

flash1293
Copy link
Contributor

@flash1293 flash1293 commented Dec 13, 2023

What

Introduces the airbyte-lib package to run Python connectors within Python:

import airbyte_lib as ab

# configure source connector and cache
source = ab.get_connector("source-spacex-api", config={"id": "605b4b6aaa5433645e37d03f"})
cache = ab.get_in_memory_cache()

# run connection check
source.check()

# stream records as an iterable
print(list(source.read_stream("capsules")))

# configure streams to fully sync
source.set_streams(["launches", "rockets", "capsules"])

# run the sync
result = source.read_all(cache)

# access the data via the sync result object (will give you a SQL interface for the SQL cache)
for record in result["rockets"]:
    print(f"Stream {name}: {len(records)} records")
---
title: airbyte_lib
---
classDiagram
    VenvExecutor <|-- Executor
    PathExecutor <|-- Executor
    note for PathExecutor "Not implemented in first version"
    note for VenvExecutor "Manages its own venv where the connector package is installed"
    class Executor{
        +execute(args: List[str]) IO[str]
        +ensure_installation()
        +ConnectorMetadata connector
    }
    Executor --> ConnectorMetadata
    class ConnectorMetadata{
        +name
        +version
    }
    Source --> Executor
    class Source{
        +set_streams(streams: List[str])
        +get_available_streams() List[str]
        +set_config(config)
        +read(stream)
        +read_all(cache)
        +check()
    }
    class Cache{
        +write(messages: Iterable[AirbyteRecordMessage])
    }
    Cache <|-- InMemoryCache
    class InMemoryCache{
        +streams: Dict[str, List[Dict]]
    }
    class SqlCache{
        +tables: Dict[str, sqlalchemy.Table]
        +engine: sqlalchemy.Engine
    }
    class ParquetCache{
        +file_paths: Dict[str, str]
    }
    note for SqlCache "Not implemented here"
    Cache <|-- SqlCache
    note for ParquetCache "Not implemented here"
    Cache <|-- ParquetCache
    SyncResult --> Cache
    SyncResult --> Dataset
    class SyncResult{
        +__getitem__() Dataset
    }
    class Dataset{
        +__iter__() Iterator
        +to_padas() Dataframe
    }

Very first iteration of airbyte-lib.

Supported parts:

  • in-memory cache
  • install local sources
  • run a sync into the in-memory cache
  • select streams
  • peek into a stream

Missing:

  • Implementation of installation
  • Cleanup code structure
  • Basic tests
  • CI integration

Copy link

vercel bot commented Dec 13, 2023

The latest updates on your projects. Learn more about Vercel for Git ↗︎

1 Ignored Deployment
Name Status Preview Comments Updated (UTC)
airbyte-docs ⬜️ Ignored (Inspect) Visit Preview Dec 20, 2023 4:06pm

Copy link
Contributor

Before Merging a Connector Pull Request

Wow! What a great pull request you have here! 🎉

To merge this PR, ensure the following has been done/considered for each connector added or updated:

  • PR name follows PR naming conventions
  • Breaking changes are considered. If a Breaking Change is being introduced, ensure an Airbyte engineer has created a Breaking Change Plan.
  • Connector version has been incremented in the Dockerfile and metadata.yaml according to our Semantic Versioning for Connectors guidelines
  • You've updated the connector's metadata.yaml file any other relevant changes, including a breakingChanges entry for major version bumps. See metadata.yaml docs
  • Secrets in the connector's spec are annotated with airbyte_secret
  • All documentation files are up to date. (README.md, bootstrap.md, docs.md, etc...)
  • Changelog updated in docs/integrations/<source or destination>/<name>.md with an entry for the new version. See changelog example
  • Migration guide updated in docs/integrations/<source or destination>/<name>-migrations.md with an entry for the new version, if the version is a breaking change. See migration guide example
  • If set, you've ensured the icon is present in the platform-internal repo. (Docs)

If the checklist is complete, but the CI check is failing,

  1. Check for hidden checklists in your PR description

  2. Toggle the github label checklist-action-run on/off to re-run the checklist CI.

@flash1293
Copy link
Contributor Author

@alafanechere To hook this into CI, I had to extend the global airbyte python gradle plugin a bit to deal with pyproject.toml files. As that's not my strongest area, could you take a look whether it makes sense the way I did it?

@flash1293
Copy link
Contributor Author

flash1293 commented Dec 14, 2023

One thing I'm not sure about when invoking connectors:

Right now it works like this:
The process is kicked off in a context manager

  • On exiting the context, a SIGTERM signal is sent to the process
  • Then, it has 10 seconds to terminate before it's killed

As we are dealing with reasonably well behaved python scripts here, they respect the sigterm and exit pretty quickly. However that means that they might now have the time to clean up after themselves.

We could just wait for the process to terminate on its own - that would take a bit longer, but work for most cases, except for the "peek" method, because it isn't interested in all records but just the first few.

So I can see some options here:

  • Keep the current setup (it's definitely the fastest we can do) - if the process really needs to clean something up, it should do that on sigterm as well (I don't think we actually do this)
  • Adjust slightly by giving the process a few seconds to exit on its own before sending a SIGTERM - this should result in a clean exit for all cases except "peek"
  • Always wait for a clean exit and just do not do the "peek" for now (it would require a protocol change to make it work with this)

I tend towards the first one - I cancel python processes via ctrl+c in my daily workflows all the time, and I don't know of any "real" problems this would cause. If I'm missing something here, I would say we should go with the last option as otherwise it would be a weird in-between.

@alafanechere
Copy link
Contributor

alafanechere commented Dec 14, 2023

@alafanechere To hook this into CI, I had to extend the global airbyte python gradle plugin a bit to deal with pyproject.toml files. As that's not my strongest area, could you take a look whether it makes sense the way I did it?

@flash1293 could you please share your CI needs?
I'd prefer new projects to not use the airbyte python gradle plugin and rely on airbyte-ci.
As this is a poetry project and if your need is "just" to run the test suite it can easily be achieved by running:
airbyte-ci test airbyte-lib

We automate this already in the ci for airbyte-ci test themselves and a couple of other packages. I'll try to push a small commit to show you how it works in practice.

In any case I suggest you nest your existing integration_tests folder inside a tests folder, it'll be easier to add test suites without changing the CI.

Edit: commit pushed

@flash1293
Copy link
Contributor Author

Sounds good @alafanechere - you are right, I just want to run the tests for now. I will use your approach 👍

@octavia-squidington-iii octavia-squidington-iii removed the area/connectors Connector related issues label Dec 14, 2023
@flash1293 flash1293 marked this pull request as ready for review December 14, 2023 17:07
@flash1293
Copy link
Contributor Author

@alafanechere I want to make sure the types are sound - what's the best way of running mypy on this project as part of the pipeline? Locally, this can be done via poetry run mypy .. I don't see a similar case for another project, the CDK is using a special case here with a shell script that checks only changed files, I don't want to copy that, it should just check everything.

@flash1293
Copy link
Contributor Author

@aaronsteers Refactored as discussed:

  • Checks mypy via pytest
  • Uses the read_stream / read_all interface now
  • Added SyncResult/Dataset classes (pandas/sql methods are throwing currently)

@aaronsteers aaronsteers added the airbyte-lib Related to AirbyteLib label Jan 3, 2024
Copy link
Collaborator

@aaronsteers aaronsteers left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is ready to go!

I did another pass and (just for context) linked a few places where subsequent PRs may slightly tweak existing behavior.

@@ -0,0 +1 @@
.venv*
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not that we have to move it, but I'd also be fine with adding to the root .gitignore:

.venv
venv

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed, I think we might refactor this. But no reason to block at this early stage.

Comment on lines +97 to +99
# TODO this is a temporary install path that will be replaced with a proper package name once they are published. At this point we are also using the version
package_to_install = f"../airbyte-integrations/connectors/{self.metadata.name}"
self._run_subprocess_and_raise_on_failure([pip_path, "install", "-e", package_to_install])
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have an updated version of this in a new PR:

Comment on lines +124 to +134
installed_version = self._get_installed_version()
if installed_version != self.target_version:
# If the version doesn't match, reinstall
self.install()

# Check the version again
version_after_install = self._get_installed_version()
if version_after_install != self.target_version:
raise Exception(
f"Failed to install connector {self.metadata.name} version {self.target_version}. Installed version is {version_after_install}"
)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In a follow-on PR, I suggest a flag to make version verification an opt-in behavior, where version would not be checked by default, but could be checked/auto-fixed as needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will get less relevant once we publish properly to pypi, but I can see how the current state might be too restrictive or take too much control from the user.

What some tools are doing is to log a warning if the version doesn't match, but don't fail right away. Seems suitable here, but nothing we can't adjust before the first release.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What some tools are doing is to log a warning if the version doesn't match, but don't fail right away. Seems suitable here, but nothing we can't adjust before the first release.

Totally! Yeah, I think a warning is a nice balance - gives the user information but doesn't necessary cause direct failure. In that case, we could always check version, and the "opt in" part would be whether to hard-fail, rather than warn. I can imagine a hard-failure might be preferred in specific instances where we think we are explicitly requesting a particular version number.

@flash1293
Copy link
Contributor Author

Thanks for the review, merging for now and we'll address these points on subsequent PRs

@flash1293 flash1293 merged commit d58f262 into master Jan 3, 2024
24 checks passed
@flash1293 flash1293 deleted the flash1293/airbyte-lib branch January 3, 2024 09:28
gisripa pushed a commit that referenced this pull request Jan 3, 2024
Co-authored-by: alafanechere <augustin.lafanechere@gmail.com>
Co-authored-by: Aaron ("AJ") Steers <aj@airbyte.io>
jatinyadav-cc pushed a commit to ollionorg/datapipes-airbyte that referenced this pull request Feb 26, 2024
Co-authored-by: alafanechere <augustin.lafanechere@gmail.com>
Co-authored-by: Aaron ("AJ") Steers <aj@airbyte.io>
jatinyadav-cc pushed a commit to ollionorg/datapipes-airbyte that referenced this pull request Feb 26, 2024
Co-authored-by: alafanechere <augustin.lafanechere@gmail.com>
Co-authored-by: Aaron ("AJ") Steers <aj@airbyte.io>
jatinyadav-cc pushed a commit to ollionorg/datapipes-airbyte that referenced this pull request Feb 26, 2024
Co-authored-by: alafanechere <augustin.lafanechere@gmail.com>
Co-authored-by: Aaron ("AJ") Steers <aj@airbyte.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
airbyte-lib Related to AirbyteLib
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants