diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 34b2b23072f14..6d106cc3f50f7 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 0.36.3-alpha +current_version = 0.38.1-alpha commit = False tag = False parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(\-[a-z]+)? diff --git a/.env b/.env index 056bc9558a389..311f60ef10a9d 100644 --- a/.env +++ b/.env @@ -10,7 +10,7 @@ ### SHARED ### -VERSION=0.36.3-alpha +VERSION=0.38.1-alpha # When using the airbyte-db via default docker image CONFIG_ROOT=/data @@ -40,7 +40,7 @@ DATABASE_PASSWORD=docker DATABASE_HOST=db DATABASE_PORT=5432 DATABASE_DB=airbyte -# translate manually DATABASE_URL=jdbc:postgresql://${DATABASE_HOST}:${DATABASE_PORT/${DATABASE_DB} (do not include the username or password here) +# translate manually DATABASE_URL=jdbc:postgresql://${DATABASE_HOST}:${DATABASE_PORT}/${DATABASE_DB} (do not include the username or password here) DATABASE_URL=jdbc:postgresql://db:5432/airbyte JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION=0.29.15.001 @@ -93,3 +93,4 @@ MAX_DISCOVER_WORKERS=5 NEW_SCHEDULER=false AUTO_DISABLE_FAILING_CONNECTIONS=false EXPOSE_SECRETS_IN_EXPORT=false +FORCE_MIGRATE_SECRET_STORE=false diff --git a/.gitbook.yaml b/.gitbook.yaml deleted file mode 100644 index 82a85043a8317..0000000000000 --- a/.gitbook.yaml +++ /dev/null @@ -1,75 +0,0 @@ -root: ./docs/ - -structure: - readme: ../README.md - summary: SUMMARY.md - -redirects: - architecture/cdc: ./understanding-airbyte/cdc.md - architecture/catalog: ./understanding-airbyte/catalog.md - architecture/airbyte-specification: ./understanding-airbyte/airbyte-specification.md - architecture/basic-normalization: ./understanding-airbyte/basic-normalization.md - architecture/connections: ./understanding-airbyte/connections.md - architecture/connections/full-refresh-overwrite: ./understanding-airbyte/connections/full-refresh-overwrite.md - architecture/connections/full-refresh-append: ./understanding-airbyte/connections/full-refresh-append.md - architecture/connections/incremental-append: ./understanding-airbyte/connections/incremental-append.md - architecture/https://docs.airbyte.io/understanding-airbyte/connections/incremental-deduped-history: ./understanding-airbyte/https://docs.airbyte.io/understanding-airbyte/connections/incremental-deduped-history.md - architecture/high-level-view: ./understanding-airbyte/high-level-view.md - architecture/jobs: ./understanding-airbyte/jobs.md - architecture/tech-stack: ./understanding-airbyte/tech-stack.md - architecture/namespaces: ./understanding-airbyte/namespaces.md - architecture: ./understanding-airbyte.md - roadmap: ./project-overview/roadmap.md - changelog: ./project-overview/changelog.md - changelog/platform: ./project-overview/changelog/platform.md - changelog/connectors: ./project-overview/changelog/connectors.md - license: ./project-overview/licenses/README.md - tutorials/postgres-replication: ./examples/postgres-replication.md - tutorials/build-a-slack-activity-dashboard: ./examples/build-a-slack-activity-dashboard.md - tutorials/zoom-activity-dashboard: ./examples/zoom-activity-dashboard.md - tutorials/slack-history: ./examples/slack-history.md - tutorials/beginners-guide-to-catalog: ./tutorials/tutorials/beginners-guide-to-catalog.md - tutorials/toy-connector: ./tutorials/tutorials/build-a-connector-the-hard-way.md - tutorials/build-a-connector-the-hard-way: ./tutorials/tutorials/build-a-connector-the-hard-way.md - tutorials/adding-incremental-sync: ./tutorials/tutorials/adding-incremental-sync.md
- tutorials/building-a-python-source: ./tutorials/tutorials/building-a-python-source.md - tutorials/transformations-with-sql: ./tutorials/transformation-and-normalization/transformations-with-sql.md - tutorials/transformations-with-dbt: ./tutorials/transformation-and-normalization/transformations-with-dbt.md - contributing-to-airbyte/cdk-tutorial-alpha: ./contributing-to-airbyte/python/README.md - integrations/connector-health: ./integrations.md - tutorials: ./operator-guides.md - tutorials/browsing-output-logs.md: ./operator-guides/browsing-output-logs.md - tutorials/locating-files-local-destination.md: ./operator-guides/locating-files-local-destination.md - tutorials/using-the-airflow-airbyte-operator.md: ./operator-guides/using-the-airflow-airbyte-operator.md - tutorials/transformation-and-normalization: ./operator-guides/transformation-and-normalization.md - tutorials/transformation-and-normalization/transformations-with-sql: ./operator-guides/transformation-and-normalization/transformations-with-sql.md - tutorials/transformation-and-normalization/transformations-with-dbt: ./operator-guides/transformation-and-normalization/transformations-with-dbt.md - tutorials/tutorials: ./contributing-to-airbyte/building-new-connector/tutorials.md - tutorials/tutorials/beginners-guide-to-catalog: ./contributing-to-airbyte/building-new-connector/tutorials/beginners-guide-to-catalog.md - tutorials/tutorials/build-a-connector-the-hard-way: ./contributing-to-airbyte/building-new-connector/tutorials/build-a-connector-the-hard-way.md - tutorials/tutorials/adding-incremental-sync: ./contributing-to-airbyte/building-new-connector/tutorials/adding-incremental-sync.md - tutorials/tutorials/building-a-python-source: ./contributing-to-airbyte/building-new-connector/tutorials/building-a-python-source.md - upgrading-airbyte: ./operator-guides/upgrading-airbyte.md - tutorials/upgrading-airbyte: ./operator-guides/upgrading-airbyte.md - contributing-to-airbyte/python: ./connector-development/cdk-python.md - contributing-to-airbyte/python/concepts/basic-concepts: ./connector-development/cdk-python/basic-concepts.md - contributing-to-airbyte/python/concepts/schemas: ./connector-development/cdk-python/schemas.md - contributing-to-airbyte/python/concepts/full-refresh-stream: ./connector-development/cdk-python/full-refresh-stream.md - contributing-to-airbyte/python/concepts/incremental-stream: ./connector-development/cdk-python/incremental-stream.md - contributing-to-airbyte/python/concepts/http-streams: ./connector-development/cdk-python/http-streams.md - contributing-to-airbyte/python/concepts/python-concepts: ./connector-development/cdk-python/python-concepts.md - contributing-to-airbyte/python/concepts/stream_slices: ./connector-development/cdk-python/stream-slices.md - contributing-to-airbyte/python/tutorials: ./connector-development/tutorials.md - contributing-to-airbyte/python/tutorials/cdk-speedrun: ./connector-development/tutorials/cdk-speedrun.md - contributing-to-airbyte/python/tutorials/cdk-tutorial-python-http: ./connector-development/tutorials/cdk-tutorial-python-http.md - contributing-to-airbyte/building-new-connector: ./connector-development.md - contributing-to-airbyte/building-new-connector/best-practices: ./connector-development.md/best-practices.md - contributing-to-airbyte/building-new-connector/monorepo-python-development: ./contributing-to-airbyte/monorepo-python-development.md - contributing-to-airbyte/building-new-connector/testing-connectors: ./connector-development/testing-connectors.md - 
contributing-to-airbyte/building-new-connector/source-acceptance-tests: ./connector-development/testing-connectors/source-acceptance-tests-reference.md - contributing-to-airbyte/building-new-connector/tutorials: ./connector-development/tutorials.md - contributing-to-airbyte/building-new-connector/tutorials/beginners-guide-to-catalog: ./understanding-airbyte/beginners-guide-to-catalog.md - contributing-to-airbyte/building-new-connector/tutorials/building-a-python-source: ./connector-development/tutorials/building-a-python-source.md - contributing-to-airbyte/building-new-connector/tutorials/building-a-python-destination: ./connector-development/tutorials/building-a-python-destination.md - contributing-to-airbyte/building-new-connector/tutorials/building-a-java-destination: ./connector-development/tutorials/building-a-java-destination.md - project-overview/code-of-conduct: ./project-overview/slack-code-of-conduct diff --git a/.github/workflows/build-connector-command.yml b/.github/workflows/build-connector-command.yml new file mode 100644 index 0000000000000..ff39163501bed --- /dev/null +++ b/.github/workflows/build-connector-command.yml @@ -0,0 +1,260 @@ +name: Bump, Build, Test Connectors [EXPERIMENTAL] +on: + workflow_dispatch: + inputs: + repo: + description: "Repo to check out code from. Defaults to the main airbyte repo. Set this when building connectors from forked repos." + required: false + default: "airbytehq/airbyte" + gitref: + description: "The git ref to check out from the specified repository." + required: false + default: master + connector: + description: "Airbyte Connector" + required: true + bump-version: + description: "Set to major, minor, or patch to automatically bump connectors version in Dockerfile, definitions.yaml and generate seed spec. You can also do this manually" + required: false + default: "false" + run-tests: + description: "Should run tests" + required: false + default: "true" + comment-id: + description: "The comment-id of the slash command. Used to update the comment with the status." + required: false + +jobs: + find_valid_pat: + name: "Find a PAT with room for actions" + timeout-minutes: 10 + runs-on: ubuntu-latest + outputs: + pat: ${{ steps.variables.outputs.pat }} + steps: + - name: Checkout Airbyte + uses: actions/checkout@v2 + - name: Check PAT rate limits + id: variables + run: | + ./tools/bin/find_non_rate_limited_PAT \ + ${{ secrets.AIRBYTEIO_PAT }} \ + ${{ secrets.OSS_BUILD_RUNNER_GITHUB_PAT }} \ + ${{ secrets.SUPERTOPHER_PAT }} \ + ${{ secrets.DAVINCHIA_PAT }} + ## Gradle Build + # In case of self-hosted EC2 errors, remove this block. 
+ start-bump-build-test-connector-runner: + name: Start Build EC2 Runner + runs-on: ubuntu-latest + needs: find_valid_pat + outputs: + label: ${{ steps.start-ec2-runner.outputs.label }} + ec2-instance-id: ${{ steps.start-ec2-runner.outputs.ec2-instance-id }} + steps: + - name: Checkout Airbyte + uses: actions/checkout@v2 + with: + repository: ${{ github.event.inputs.repo }} + ref: ${{ github.event.inputs.gitref }} + - name: Start AWS Runner + id: start-ec2-runner + uses: ./.github/actions/start-aws-runner + with: + aws-access-key-id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }} + github-token: ${{ needs.find_valid_pat.outputs.pat }} + # 80 gb disk + ec2-image-id: ami-0d648081937c75a73 + bump-build-test-connector: + name: Bump, Build, Test Connector + needs: start-bump-build-test-connector-runner + runs-on: ${{ needs.start-bump-build-test-connector-runner.outputs.label }} + environment: more-secrets + steps: + ############################ + ## SET UP ## + ############################ + - name: Set up Cloud SDK + uses: google-github-actions/setup-gcloud@v0 + with: + service_account_key: ${{ secrets.SPEC_CACHE_SERVICE_ACCOUNT_KEY }} + export_default_credentials: true + - name: Search for valid connector name format + id: regex + uses: AsasInnab/regex-action@v1 + with: + regex_pattern: "^(connectors|bases)/[a-zA-Z0-9-_]+$" + regex_flags: "i" # required to be set for this plugin + search_string: ${{ github.event.inputs.connector }} + - name: Validate input workflow format + if: steps.regex.outputs.first_match != github.event.inputs.connector + run: echo "The connector provided has an invalid format!" && exit 1 + - name: Link comment to workflow run + if: github.event.inputs.comment-id + uses: peter-evans/create-or-update-comment@v1 + with: + comment-id: ${{ github.event.inputs.comment-id }} + body: | + > :clock2: ${{github.event.inputs.connector}} https://github.com/${{github.repository}}/actions/runs/${{github.run_id}} + - name: Checkout Airbyte + uses: actions/checkout@v2 + with: + repository: ${{ github.event.inputs.repo }} + ref: ${{ github.event.inputs.gitref }} + token: ${{ secrets.OCTAVIA_PAT }} + - name: Install Java + uses: actions/setup-java@v1 + with: + java-version: "17" + - name: Install Python + uses: actions/setup-python@v2 + with: + python-version: "3.9" + - name: Install Pyenv and Tox + run: | + python3 -m pip install --quiet virtualenv==16.7.9 --user + python3 -m virtualenv venv + source venv/bin/activate + pip install --quiet tox==3.24.4 + - name: Install yq + if: github.event.inputs.bump-version != 'false' && success() + run: | + sudo apt-key adv --keyserver keyserver.ubuntu.com --recv-keys CC86BB64 + sudo add-apt-repository ppa:rmescandon/yq + sudo apt update + sudo apt install yq -y + - name: Test and install CI scripts + # all CI python packages have the prefix "ci_" + run: | + source venv/bin/activate + tox -r -c ./tools/tox_ci.ini + pip install --quiet -e ./tools/ci_* + - name: Get Credentials for ${{ github.event.inputs.connector }} + run: | + source venv/bin/activate + ci_credentials ${{ github.event.inputs.connector }} + env: + GCP_GSM_CREDENTIALS: ${{ secrets.GCP_GSM_CREDENTIALS }} + # TODO: seems like this should run in post-merge workflow + # - name: Prepare Sentry + # if: startsWith(github.event.inputs.connector, 'connectors') + # run: | + # curl -sL https://sentry.io/get-cli/ | bash + # - name: Create Sentry Release + # if: startsWith(github.event.inputs.connector, 'connectors') + # run: | 
+ # sentry-cli releases set-commits "${{ env.IMAGE_NAME }}@${{ env.IMAGE_VERSION }}" --auto --ignore-missing + # env: + # SENTRY_AUTH_TOKEN: ${{ secrets.SENTRY_CONNECTOR_RELEASE_AUTH_TOKEN }} + # SENTRY_ORG: airbyte-5j + # SENTRY_PROJECT: airbyte-connectors + ############################ + ## BUMP ## + ############################ + - name: Bump Connector Version + if: github.event.inputs.bump-version != 'false' && success() + run: ./tools/integrations/manage.sh bump_version airbyte-integrations/${{ github.event.inputs.connector }} + - name: Commit and Push Version Bump + if: github.event.inputs.bump-version != 'false' && success() + run: | + git config user.name 'Octavia Squidington III' + git config user.email 'octavia-squidington-iii@users.noreply.github.com' + git add -u + git commit -m "bump-version ${{github.event.inputs.connector}}" + git push origin ${{ github.event.inputs.gitref }} + - name: Add Version Bump Success Comment + if: github.event.inputs.comment-id && github.event.inputs.bump-version != 'false' && success() + uses: peter-evans/create-or-update-comment@v1 + with: + comment-id: ${{ github.event.inputs.comment-id }} + body: | + > :rocket: Bumped version for ${{github.event.inputs.connector}} + - name: Add Version Bump Failure Comment + if: github.event.inputs.comment-id && github.event.inputs.bump-version != 'false' && !success() + uses: peter-evans/create-or-update-comment@v1 + with: + comment-id: ${{ github.event.inputs.comment-id }} + body: | + > :x: Couldn't bump version for ${{github.event.inputs.connector}} + ############################ + ## BUILD AND TEST ## + ############################ + - name: Build ${{ github.event.inputs.connector }} + run: ./tools/integrations/manage.sh build_experiment airbyte-integrations/${{ github.event.inputs.connector }} + id: build + env: + PR_NUMBER: ${{ github.event.number }} + DOCKER_PASSWORD: ${{ secrets.DOCKER_PASSWORD }} + # Oracle expects this variable to be set. Although usually present, this is not set by default on Github virtual runners. 
+ TZ: UTC + # - name: Test ${{ github.event.inputs.connector }} + # if: github.event.inputs.run-tests == 'true' + # run: ./tools/integrations/manage.sh test airbyte-integrations/${{ github.event.inputs.connector }} + # - name: Finalize Sentry release + # if: startsWith(github.event.inputs.connector, 'connectors') + # run: | + # sentry-cli releases finalize "${{ env.IMAGE_NAME }}@${{ env.IMAGE_VERSION }}" + # env: + # SENTRY_AUTH_TOKEN: ${{ secrets.SENTRY_CONNECTOR_RELEASE_AUTH_TOKEN }} + # SENTRY_ORG: airbyte-5j + # SENTRY_PROJECT: airbyte-connectors + # - name: Build and Test Success Comment + # if: github.event.inputs.comment-id && success() + # uses: peter-evans/create-or-update-comment@v1 + # with: + # comment-id: ${{ github.event.inputs.comment-id }} + # body: | + # > :rocket: Successfully built and tested ${{github.event.inputs.connector}} + # - name: Build and Test Failure Comment + # if: github.event.inputs.comment-id && !success() + # uses: peter-evans/create-or-update-comment@v1 + # with: + # comment-id: ${{ github.event.inputs.comment-id }} + # body: | + # > :x: Failed to build and test ${{github.event.inputs.connector}} + # - name: Slack Notification - Failure + # if: failure() + # uses: rtCamp/action-slack-notify@master + # env: + # SLACK_WEBHOOK: ${{ secrets.BUILD_SLACK_WEBHOOK }} + # SLACK_USERNAME: Buildozer + # SLACK_ICON: https://avatars.slack-edge.com/temp/2020-09-01/1342729352468_209b10acd6ff13a649a1.jpg + # SLACK_COLOR: DC143C + # SLACK_TITLE: "Failed to build and test connector ${{ github.event.inputs.connector }} from branch ${{ github.ref }}" + # SLACK_FOOTER: "" + # - name: Add Final Success Comment + # if: github.event.inputs.comment-id && success() + # uses: peter-evans/create-or-update-comment@v1 + # with: + # comment-id: ${{ github.event.inputs.comment-id }} + # body: | + # > :white_check_mark: ${{github.event.inputs.connector}} https://github.com/${{github.repository}}/actions/runs/${{github.run_id}} + # - name: Set publish label + # if: success() + # run: | + # echo "set some label on PR" + # In case of self-hosted EC2 errors, remove this block. + stop-bump-build-test-connector-runner: + name: Stop Build EC2 Runner + needs: + - start-bump-build-test-connector-runner # required to get output from the start-runner job + - bump-build-test-connector # required to wait when the main job is done + - find_valid_pat + runs-on: ubuntu-latest + if: ${{ always() }} # required to stop the runner even if the error happened in the previous jobs + steps: + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@v1 + with: + aws-access-key-id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }} + aws-region: us-east-2 + - name: Stop EC2 runner + uses: supertopher/ec2-github-runner@base64v1.0.10 + with: + mode: stop + github-token: ${{ needs.find_valid_pat.outputs.pat }} + label: ${{ needs.start-bump-build-test-connector-runner.outputs.label }} + ec2-instance-id: ${{ needs.start-bump-build-test-connector-runner.outputs.ec2-instance-id }} diff --git a/.github/workflows/build-report.yml b/.github/workflows/build-report.yml index 44ca93b45f6fb..8e181d2277f02 100644 --- a/.github/workflows/build-report.yml +++ b/.github/workflows/build-report.yml @@ -4,7 +4,7 @@ on: workflow_dispatch: schedule: # 1pm UTC is 6am PDT. 
- - cron: '0 13 * * *' + - cron: "0 13 * * *" jobs: build-report: @@ -19,7 +19,7 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - pip install requests slack_sdk + pip install requests slack_sdk pyyaml - name: create and send report run: python ./tools/bin/build_report.py env: @@ -32,6 +32,6 @@ jobs: SLACK_USERNAME: Build Report SLACK_ICON: https://avatars.slack-edge.com/temp/2020-09-01/1342729352468_209b10acd6ff13a649a1.jpg SLACK_COLOR: ${{ job.status }} - SLACK_TITLE: 'Failed to create build report' - SLACK_MESSAGE: 'https://github.com/${{ github.repository }}/actions/runs/${{ github.run_id }}' + SLACK_TITLE: "Failed to create build report" + SLACK_MESSAGE: "https://github.com/${{ github.repository }}/actions/runs/${{ github.run_id }}" MSG_MINIMAL: True diff --git a/.github/workflows/documentation.yml b/.github/workflows/documentation.yml deleted file mode 100644 index 793172b2911a4..0000000000000 --- a/.github/workflows/documentation.yml +++ /dev/null @@ -1,37 +0,0 @@ -name: compile-docusaurus-static-assets - -on: - push: - branches: [master] - - # Allows you to run this workflow manually from the Actions tab - workflow_dispatch: - -# A workflow run is made up of one or more jobs that can run sequentially or in parallel -jobs: - deploy-docusaurus-to-docs-airbyte-io: - runs-on: ubuntu-latest - steps: - # Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it - - name: Check out repo - # `uses` taps GH ORG/REPO@version. - # "actions" is a default org for some common GH actions - uses: actions/checkout@v3 - with: - fetch-depth: 0 - # Node is required for yarn - - name: Set up Yarn - uses: actions/setup-node@v2 - with: - node-version: '16.13.0' - cache: 'yarn' - cache-dependency-path: docusaurus - # # Build Docusaurus website - # - name: Check for docusaurus changes not committed - # run: ./tools/bin/check_docusaurus_build_changes - # # Install and build Docusaurus website - # - name: Deploy docs to production (it's weird) - # run: ./tools/bin/deploy_docusaurus - # env: - # GITHUB_TOKEN: ${{ secrets.OCTAVIA_PAT }} - diff --git a/.github/workflows/gradle.yml b/.github/workflows/gradle.yml index 93e141bbc8d15..034e8cf0a899a 100644 --- a/.github/workflows/gradle.yml +++ b/.github/workflows/gradle.yml @@ -51,6 +51,7 @@ jobs: - '.github/**' - 'buildSrc/**' - 'tools/**' + - '*.gradle' cli: - 'airbyte-api/**' - 'octavia-cli/**' diff --git a/.github/workflows/publish-command.yml b/.github/workflows/publish-command.yml index b7c0aaa175198..c4f2a863c5857 100644 --- a/.github/workflows/publish-command.yml +++ b/.github/workflows/publish-command.yml @@ -68,6 +68,7 @@ jobs: # 80 gb disk ec2-image-id: ami-0d648081937c75a73 publish-image: + timeout-minutes: 240 needs: start-publish-image-runner runs-on: ${{ needs.start-publish-image-runner.outputs.label }} environment: more-secrets @@ -228,6 +229,7 @@ jobs: run: | git add -u git commit -m "auto-bump connector version" + git pull origin ${{ github.event.inputs.gitref }} git push origin ${{ github.event.inputs.gitref }} - name: Add Version Bump Success Comment if: github.event.inputs.comment-id && github.event.inputs.auto-bump-version == 'true' && success() diff --git a/.github/workflows/publish-connector-command.yml b/.github/workflows/publish-connector-command.yml new file mode 100644 index 0000000000000..4aac116c8dbfb --- /dev/null +++ b/.github/workflows/publish-connector-command.yml @@ -0,0 +1,210 @@ +name: Publish Connector [EXPERIMENTAL] +on: + workflow_dispatch: + inputs: + repo: + description: 
"Repo to check out code from. Defaults to the main airbyte repo. Set this when building connectors from forked repos." + required: false + default: "airbytehq/airbyte" + gitref: + description: "The git ref to check out from the specified repository." + required: false + default: master + connector: + description: "Airbyte Connector" + required: true + bump-version: + description: "Set to major, minor, or patch to automatically bump connectors version in Dockerfile, definitions.yaml and generate seed spec. You can also do this manually" + required: false + default: "false" + run-tests: + description: "Should run tests" + required: false + default: "true" + comment-id: + description: "The comment-id of the slash command. Used to update the comment with the status." + required: false + +jobs: + find_valid_pat: + name: "Find a PAT with room for actions" + timeout-minutes: 10 + runs-on: ubuntu-latest + outputs: + pat: ${{ steps.variables.outputs.pat }} + steps: + - name: Checkout Airbyte + uses: actions/checkout@v2 + - name: Check PAT rate limits + id: variables + run: | + ./tools/bin/find_non_rate_limited_PAT \ + ${{ secrets.AIRBYTEIO_PAT }} \ + ${{ secrets.OSS_BUILD_RUNNER_GITHUB_PAT }} \ + ${{ secrets.SUPERTOPHER_PAT }} \ + ${{ secrets.DAVINCHIA_PAT }} + ## Gradle Build + # In case of self-hosted EC2 errors, remove this block. + +# start-bump-build-test-connector-runner: +# name: Start Build EC2 Runner +# runs-on: ubuntu-latest +# needs: find_valid_pat +# outputs: +# label: ${{ steps.start-ec2-runner.outputs.label }} +# ec2-instance-id: ${{ steps.start-ec2-runner.outputs.ec2-instance-id }} +# steps: +# - name: Checkout Airbyte +# uses: actions/checkout@v2 +# with: +# repository: ${{ github.event.inputs.repo }} +# ref: ${{ github.event.inputs.gitref }} +# - name: Start AWS Runner +# id: start-ec2-runner +# uses: ./.github/actions/start-aws-runner +# with: +# aws-access-key-id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }} +# aws-secret-access-key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }} +# github-token: ${{ needs.find_valid_pat.outputs.pat }} +# # 80 gb disk +# ec2-image-id: ami-0d648081937c75a73 +# bump-build-test-connector: +# needs: start-bump-build-test-connector-runner +# runs-on: ${{ needs.start-bump-build-test-connector-runner.outputs.label }} +# environment: more-secrets +# steps: +# ############################ +# ## SET UP ## +# ############################ +# - name: Set up Cloud SDK +# uses: google-github-actions/setup-gcloud@v0 +# with: +# service_account_key: ${{ secrets.SPEC_CACHE_SERVICE_ACCOUNT_KEY }} +# export_default_credentials: true +# - name: Search for valid connector name format +# id: regex +# uses: AsasInnab/regex-action@v1 +# with: +# regex_pattern: "^(connectors|bases)/[a-zA-Z0-9-_]+$" +# regex_flags: "i" # required to be set for this plugin +# search_string: ${{ github.event.inputs.connector }} +# - name: Validate input workflow format +# if: steps.regex.outputs.first_match != github.event.inputs.connector +# run: echo "The connector provided has an invalid format!" 
&& exit 1 +# - name: Link comment to workflow run +# if: github.event.inputs.comment-id +# uses: peter-evans/create-or-update-comment@v1 +# with: +# comment-id: ${{ github.event.inputs.comment-id }} +# body: | +# > :clock2: ${{github.event.inputs.connector}} https://github.com/${{github.repository}}/actions/runs/${{github.run_id}} +# - name: Checkout Airbyte +# uses: actions/checkout@v2 +# with: +# repository: ${{ github.event.inputs.repo }} +# ref: ${{ github.event.inputs.gitref }} +# token: ${{ secrets.OCTAVIA_PAT }} +# - name: Install Java +# uses: actions/setup-java@v1 +# with: +# java-version: "17" +# - name: Install Python +# uses: actions/setup-python@v2 +# with: +# python-version: "3.9" +# - name: Install Pyenv and Tox +# run: | +# python3 -m pip install --quiet virtualenv==16.7.9 --user +# python3 -m virtualenv venv +# source venv/bin/activate +# pip install --quiet tox==3.24.4 +# - name: Install yq +# if: github.event.inputs.bump-version != 'false' && success() +# run: | +# sudo apt-key adv --keyserver keyserver.ubuntu.com --recv-keys CC86BB64 +# sudo add-apt-repository ppa:rmescandon/yq +# sudo apt update +# sudo apt install yq -y +# - name: Test and install CI scripts +# # all CI python packages have the prefix "ci_" +# run: | +# source venv/bin/activate +# tox -r -c ./tools/tox_ci.ini +# pip install --quiet -e ./tools/ci_* +# - name: Get Credentials for ${{ github.event.inputs.connector }} +# run: | +# source venv/bin/activate +# ci_credentials ${{ github.event.inputs.connector }} +# env: +# GCP_GSM_CREDENTIALS: ${{ secrets.GCP_GSM_CREDENTIALS }} +# # TODO: seems like this should run in post-merge workflow +# # - name: Prepare Sentry +# # if: startsWith(github.event.inputs.connector, 'connectors') +# # run: | +# # curl -sL https://sentry.io/get-cli/ | bash +# # - name: Create Sentry Release +# # if: startsWith(github.event.inputs.connector, 'connectors') +# # run: | +# # sentry-cli releases set-commits "${{ env.IMAGE_NAME }}@${{ env.IMAGE_VERSION }}" --auto --ignore-missing +# # env: +# # SENTRY_AUTH_TOKEN: ${{ secrets.SENTRY_CONNECTOR_RELEASE_AUTH_TOKEN }} +# # SENTRY_ORG: airbyte-5j +# # SENTRY_PROJECT: airbyte-connectors +# # - name: Build and Test Success Comment +# # if: github.event.inputs.comment-id && success() +# # uses: peter-evans/create-or-update-comment@v1 +# # with: +# # comment-id: ${{ github.event.inputs.comment-id }} +# # body: | +# # > :rocket: Successfully built and tested ${{github.event.inputs.connector}} +# # - name: Build and Test Failure Comment +# # if: github.event.inputs.comment-id && !success() +# # uses: peter-evans/create-or-update-comment@v1 +# # with: +# # comment-id: ${{ github.event.inputs.comment-id }} +# # body: | +# # > :x: Failed to build and test ${{github.event.inputs.connector}} +# # - name: Slack Notification - Failure +# # if: failure() +# # uses: rtCamp/action-slack-notify@master +# # env: +# # SLACK_WEBHOOK: ${{ secrets.BUILD_SLACK_WEBHOOK }} +# # SLACK_USERNAME: Buildozer +# # SLACK_ICON: https://avatars.slack-edge.com/temp/2020-09-01/1342729352468_209b10acd6ff13a649a1.jpg +# # SLACK_COLOR: DC143C +# # SLACK_TITLE: "Failed to build and test connector ${{ github.event.inputs.connector }} from branch ${{ github.ref }}" +# # SLACK_FOOTER: "" +# # - name: Add Final Success Comment +# # if: github.event.inputs.comment-id && success() +# # uses: peter-evans/create-or-update-comment@v1 +# # with: +# # comment-id: ${{ github.event.inputs.comment-id }} +# # body: | +# # > :white_check_mark: ${{github.event.inputs.connector}} 
https://github.com/${{github.repository}}/actions/runs/${{github.run_id}} +# # - name: Set publish label +# # if: success() +# # run: | +# # echo "set some label on PR" +# # In case of self-hosted EC2 errors, remove this block. +# stop-bump-build-test-connector-runner: +# name: Stop Build EC2 Runner +# needs: +# - start-bump-build-test-connector-runner # required to get output from the start-runner job +# - bump-build-test-connector # required to wait when the main job is done +# - find_valid_pat +# runs-on: ubuntu-latest +# if: ${{ always() }} # required to stop the runner even if the error happened in the previous jobs +# steps: +# - name: Configure AWS credentials +# uses: aws-actions/configure-aws-credentials@v1 +# with: +# aws-access-key-id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }} +# aws-secret-access-key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }} +# aws-region: us-east-2 +# - name: Stop EC2 runner +# uses: supertopher/ec2-github-runner@base64v1.0.10 +# with: +# mode: stop +# github-token: ${{ needs.find_valid_pat.outputs.pat }} +# label: ${{ needs.start-bump-build-test-connector-runner.outputs.label }} +# ec2-instance-id: ${{ needs.start-bump-build-test-connector-runner.outputs.ec2-instance-id }} diff --git a/.github/workflows/shared-issues.yml b/.github/workflows/shared-issues.yml new file mode 100644 index 0000000000000..03bbb8eb0554f --- /dev/null +++ b/.github/workflows/shared-issues.yml @@ -0,0 +1,16 @@ +name: "Shared Issues" +on: + issues: + types: [opened, labeled, unlabeled] + +jobs: + shared-issues: + runs-on: ubuntu-latest + steps: + - uses: nick-fields/private-action-loader@v3 + with: + pal-repo-token: "${{ secrets.OCTAVIA_PAT }}" + pal-repo-name: airbytehq/workflow-actions@production + # the following input gets passed to the private action + token: "${{ secrets.OCTAVIA_PAT }}" + command: "issue" diff --git a/.github/workflows/slash-commands.yml b/.github/workflows/slash-commands.yml index 6f01c940e447e..1c2aed0b941e1 100644 --- a/.github/workflows/slash-commands.yml +++ b/.github/workflows/slash-commands.yml @@ -22,6 +22,8 @@ jobs: commands: | test test-performance + build-connector + publish-connector publish publish-external publish-cdk diff --git a/airbyte-api/src/main/openapi/config.yaml b/airbyte-api/src/main/openapi/config.yaml index 477d7ee688daa..dd4f72314461d 100644 --- a/airbyte-api/src/main/openapi/config.yaml +++ b/airbyte-api/src/main/openapi/config.yaml @@ -2323,6 +2323,7 @@ components: type: string CustomerioNotificationConfiguration: type: object + nullable: true NotificationType: type: string enum: @@ -3140,10 +3141,10 @@ components: sourceCatalogId: type: string format: uuid + nullable: true WebBackendConnectionCreate: type: object required: - - connection - sourceId - destinationId - status @@ -3218,6 +3219,10 @@ components: $ref: "#/components/schemas/ConnectionStatus" resourceRequirements: $ref: "#/components/schemas/ResourceRequirements" + sourceCatalogId: + type: string + format: uuid + nullable: true WebBackendConnectionUpdate: type: object required: @@ -3258,6 +3263,9 @@ components: type: array items: $ref: "#/components/schemas/WebBackendOperationCreateOrUpdate" + sourceCatalogId: + type: string + format: uuid ConnectionRead: type: object required: @@ -3298,6 +3306,10 @@ components: $ref: "#/components/schemas/ConnectionStatus" resourceRequirements: $ref: "#/components/schemas/ResourceRequirements" + sourceCatalogId: + type: string + format: uuid + nullable: true ConnectionSearch: type: object properties: @@ -3507,6 +3519,7 @@ 
components: #- unnesting OperatorDbt: type: object + nullable: true required: - gitRepoUrl properties: @@ -3587,6 +3600,7 @@ components: sourceDefinedCursor: description: If the source defines the cursor field, then any other cursor field inputs will be ignored. If it does not, either the user_provided one is used, or the default one is used as a backup. type: boolean + nullable: true defaultCursorField: description: Path to the field that will be used to determine if a record is new or modified since the last sync. If not provided by the source, the end user will have to specify the comparable themselves. type: array @@ -3601,6 +3615,7 @@ components: type: string namespace: type: string + nullable: true description: Optional Source-defined namespace. Airbyte streams from the same sources should have the same namespace. Currently only used by JDBC destinations to determine what schema to write to. StreamJsonSchema: type: object @@ -3781,6 +3796,7 @@ components: $ref: "#/components/schemas/AttemptFailureSummary" AttemptStats: type: object + nullable: true properties: recordsEmitted: type: integer @@ -3806,6 +3822,7 @@ components: $ref: "#/components/schemas/AttemptStats" AttemptFailureSummary: type: object + nullable: true required: - failures properties: @@ -4016,6 +4033,7 @@ components: ResourceRequirements: description: optional resource requirements to run workers (blank for unbounded allocations) type: object + nullable: true properties: cpu_request: type: string @@ -4359,6 +4377,9 @@ components: type: boolean resourceRequirements: $ref: "#/components/schemas/ResourceRequirements" + catalogId: + type: string + format: uuid WebBackendConnectionReadList: type: object required: diff --git a/airbyte-bootloader/Dockerfile b/airbyte-bootloader/Dockerfile index e2753b0957d2a..f272323d3ff8f 100644 --- a/airbyte-bootloader/Dockerfile +++ b/airbyte-bootloader/Dockerfile @@ -1,7 +1,7 @@ ARG JDK_VERSION=17.0.1 FROM openjdk:${JDK_VERSION}-slim -ARG VERSION=0.36.3-alpha +ARG VERSION=0.38.1-alpha ENV APPLICATION airbyte-bootloader ENV VERSION ${VERSION} diff --git a/airbyte-bootloader/build.gradle b/airbyte-bootloader/build.gradle index 499b05531c03b..77cf5c872fd3f 100644 --- a/airbyte-bootloader/build.gradle +++ b/airbyte-bootloader/build.gradle @@ -10,13 +10,13 @@ dependencies { implementation project(':airbyte-config:persistence') implementation project(':airbyte-db:lib') implementation project(":airbyte-json-validation") + implementation project(':airbyte-protocol:models') implementation project(':airbyte-scheduler:persistence') - implementation project(':airbyte-scheduler:models') implementation 'io.temporal:temporal-sdk:1.8.1' - implementation "org.flywaydb:flyway-core:7.14.0" + implementation libs.flyway.core - testImplementation "org.testcontainers:postgresql:1.15.3" + testImplementation libs.testcontainers.postgresql testImplementation 'uk.org.webcompere:system-stubs-jupiter:1.2.0' } diff --git a/airbyte-bootloader/src/main/java/io/airbyte/bootloader/BootloaderApp.java b/airbyte-bootloader/src/main/java/io/airbyte/bootloader/BootloaderApp.java index 3b495c40f90cb..089f5c0f9d417 100644 --- a/airbyte-bootloader/src/main/java/io/airbyte/bootloader/BootloaderApp.java +++ b/airbyte-bootloader/src/main/java/io/airbyte/bootloader/BootloaderApp.java @@ -4,9 +4,9 @@ package io.airbyte.bootloader; -import com.google.common.annotations.VisibleForTesting; import io.airbyte.commons.features.EnvVariableFeatureFlags; import io.airbyte.commons.features.FeatureFlags; +import 
io.airbyte.commons.lang.CloseableShutdownHook; import io.airbyte.commons.resources.MoreResources; import io.airbyte.commons.version.AirbyteVersion; import io.airbyte.config.Configs; @@ -17,23 +17,27 @@ import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.config.persistence.DatabaseConfigPersistence; import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor; +import io.airbyte.config.persistence.split_secrets.SecretPersistence; import io.airbyte.db.Database; +import io.airbyte.db.factory.DSLContextFactory; +import io.airbyte.db.factory.DataSourceFactory; +import io.airbyte.db.factory.DatabaseDriver; +import io.airbyte.db.factory.FlywayFactory; import io.airbyte.db.instance.DatabaseMigrator; import io.airbyte.db.instance.configs.ConfigsDatabaseInstance; import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator; import io.airbyte.db.instance.jobs.JobsDatabaseInstance; import io.airbyte.db.instance.jobs.JobsDatabaseMigrator; -import io.airbyte.scheduler.models.Job; -import io.airbyte.scheduler.models.JobStatus; import io.airbyte.scheduler.persistence.DefaultJobPersistence; import io.airbyte.scheduler.persistence.JobPersistence; import io.airbyte.validation.json.JsonValidationException; -import io.temporal.client.WorkflowClient; -import io.temporal.serviceclient.WorkflowServiceStubs; -import io.temporal.serviceclient.WorkflowServiceStubsOptions; import java.io.IOException; import java.util.Optional; import java.util.UUID; +import javax.sql.DataSource; +import org.flywaydb.core.Flyway; +import org.jooq.DSLContext; +import org.jooq.SQLDialect; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,16 +57,18 @@ public class BootloaderApp { private static final Logger LOGGER = LoggerFactory.getLogger(BootloaderApp.class); private static final AirbyteVersion VERSION_BREAK = new AirbyteVersion("0.32.0-alpha"); + private static final String DRIVER_CLASS_NAME = DatabaseDriver.POSTGRESQL.getDriverClassName(); private final Configs configs; - private Runnable postLoadExecution; + private final Runnable postLoadExecution; private final FeatureFlags featureFlags; - - @VisibleForTesting - public BootloaderApp(final Configs configs, final FeatureFlags featureFlags) { - this.configs = configs; - this.featureFlags = featureFlags; - } + private final SecretMigrator secretMigrator; + private ConfigPersistence configPersistence; + private Database configDatabase; + private Database jobDatabase; + private JobPersistence jobPersistence; + private final Flyway configsFlyway; + private final Flyway jobsFlyway; /** * This method is exposed for Airbyte Cloud consumption. 
This lets us override the seed loading @@ -71,82 +77,155 @@ public BootloaderApp(final Configs configs, final FeatureFlags featureFlags) { * * @param configs * @param postLoadExecution + * @param featureFlags + * @param secretMigrator + * @param configsDslContext */ - public BootloaderApp(final Configs configs, final Runnable postLoadExecution, final FeatureFlags featureFlags) { + public BootloaderApp(final Configs configs, + final Runnable postLoadExecution, + final FeatureFlags featureFlags, + final SecretMigrator secretMigrator, + final DSLContext configsDslContext, + final DSLContext jobsDslContext, + final Flyway configsFlyway, + final Flyway jobsFlyway) { this.configs = configs; this.postLoadExecution = postLoadExecution; this.featureFlags = featureFlags; + this.secretMigrator = secretMigrator; + this.configsFlyway = configsFlyway; + this.jobsFlyway = jobsFlyway; + + initPersistences(configsDslContext, jobsDslContext); } - public BootloaderApp() { - configs = new EnvConfigs(); - featureFlags = new EnvVariableFeatureFlags(); + public BootloaderApp(final Configs configs, + final FeatureFlags featureFlags, + final SecretMigrator secretMigrator, + final DSLContext configsDslContext, + final DSLContext jobsDslContext, + final Flyway configsFlyway, + final Flyway jobsFlyway) { + this.configs = configs; + this.featureFlags = featureFlags; + this.secretMigrator = secretMigrator; + this.configsFlyway = configsFlyway; + this.jobsFlyway = jobsFlyway; + + initPersistences(configsDslContext, jobsDslContext); + postLoadExecution = () -> { try { - final Database configDatabase = - new ConfigsDatabaseInstance(configs.getConfigDatabaseUser(), configs.getConfigDatabasePassword(), configs.getConfigDatabaseUrl()) - .getAndInitialize(); - final JsonSecretsProcessor jsonSecretsProcessor = JsonSecretsProcessor.builder() - .maskSecrets(!featureFlags.exposeSecretsInExport()) - .copySecrets(true) - .build(); - final ConfigPersistence configPersistence = - DatabaseConfigPersistence.createWithValidation(configDatabase, jsonSecretsProcessor); configPersistence.loadData(YamlSeedConfigPersistence.getDefault()); + + if (featureFlags.forceSecretMigration() || !jobPersistence.isSecretMigrated()) { + if (this.secretMigrator != null) { + this.secretMigrator.migrateSecrets(); + LOGGER.info("Secrets successfully migrated."); + } + } LOGGER.info("Loaded seed data.."); - } catch (final IOException e) { - e.printStackTrace(); + } catch (final IOException | JsonValidationException e) { + throw new RuntimeException(e); } }; } public void load() throws Exception { - LOGGER.info("Setting up config database and default workspace.."); - - try ( - final Database configDatabase = - new ConfigsDatabaseInstance(configs.getConfigDatabaseUser(), configs.getConfigDatabasePassword(), configs.getConfigDatabaseUrl()) - .getAndInitialize(); - final Database jobDatabase = - new JobsDatabaseInstance(configs.getDatabaseUser(), configs.getDatabasePassword(), configs.getDatabaseUrl()).getAndInitialize()) { - LOGGER.info("Created initial jobs and configs database..."); - - final JobPersistence jobPersistence = new DefaultJobPersistence(jobDatabase); - final AirbyteVersion currAirbyteVersion = configs.getAirbyteVersion(); - assertNonBreakingMigration(jobPersistence, currAirbyteVersion); - - runFlywayMigration(configs, configDatabase, jobDatabase); - LOGGER.info("Ran Flyway migrations..."); - - final JsonSecretsProcessor jsonSecretsProcessor = JsonSecretsProcessor.builder() - .maskSecrets(!featureFlags.exposeSecretsInExport()) - .copySecrets(false) 
- .build(); - final ConfigPersistence configPersistence = DatabaseConfigPersistence.createWithValidation(configDatabase, jsonSecretsProcessor); - final ConfigRepository configRepository = - new ConfigRepository(configPersistence, configDatabase); - - createWorkspaceIfNoneExists(configRepository); - LOGGER.info("Default workspace created.."); - - createDeploymentIfNoneExists(jobPersistence); - LOGGER.info("Default deployment created.."); - - jobPersistence.setVersion(currAirbyteVersion.serialize()); - LOGGER.info("Set version to {}", currAirbyteVersion); - } + LOGGER.info("Setting up config database and default workspace..."); + final JobPersistence jobPersistence = new DefaultJobPersistence(jobDatabase); + final AirbyteVersion currAirbyteVersion = configs.getAirbyteVersion(); + assertNonBreakingMigration(jobPersistence, currAirbyteVersion); - if (postLoadExecution != null) { - postLoadExecution.run(); - LOGGER.info("Finished running post load Execution.."); - } + // TODO Will be converted to an injected singleton during DI migration + final DatabaseMigrator configDbMigrator = new ConfigsDatabaseMigrator(configDatabase, configsFlyway); + final DatabaseMigrator jobDbMigrator = new JobsDatabaseMigrator(jobDatabase, jobsFlyway); + + runFlywayMigration(configs, configDbMigrator, jobDbMigrator); + LOGGER.info("Ran Flyway migrations."); + + final ConfigRepository configRepository = + new ConfigRepository(configPersistence, configDatabase); + + createWorkspaceIfNoneExists(configRepository); + LOGGER.info("Default workspace created."); + + createDeploymentIfNoneExists(jobPersistence); + LOGGER.info("Default deployment created."); + + jobPersistence.setVersion(currAirbyteVersion.serialize()); + LOGGER.info("Set version to {}", currAirbyteVersion); - LOGGER.info("Finished bootstrapping Airbyte environment.."); + postLoadExecution.run(); + + LOGGER.info("Finished running post load Execution."); + + LOGGER.info("Finished bootstrapping Airbyte environment."); + } + + private static Database getConfigDatabase(final DSLContext dslContext) throws IOException { + return new ConfigsDatabaseInstance(dslContext).getAndInitialize(); + } + + private static ConfigPersistence getConfigPersistence(final Database configDatabase) throws IOException { + final JsonSecretsProcessor jsonSecretsProcessor = JsonSecretsProcessor.builder() + .maskSecrets(true) + .copySecrets(true) + .build(); + + return DatabaseConfigPersistence.createWithValidation(configDatabase, jsonSecretsProcessor); + } + + private static Database getJobDatabase(final DSLContext dslContext) throws IOException { + return new JobsDatabaseInstance(dslContext).getAndInitialize(); + } + + private static JobPersistence getJobPersistence(final Database jobDatabase) throws IOException { + return new DefaultJobPersistence(jobDatabase); + } + + private void initPersistences(final DSLContext configsDslContext, final DSLContext jobsDslContext) { + try { + configDatabase = getConfigDatabase(configsDslContext); + configPersistence = getConfigPersistence(configDatabase); + jobDatabase = getJobDatabase(jobsDslContext); + jobPersistence = getJobPersistence(jobDatabase); + } catch (final IOException e) { + LOGGER.error("Unable to initialize persistence.", e); + } } public static void main(final String[] args) throws Exception { - final var bootloader = new BootloaderApp(); - bootloader.load(); + final Configs configs = new EnvConfigs(); + final FeatureFlags featureFlags = new EnvVariableFeatureFlags(); + + // Manual configuration that will be replaced by Dependency 
Injection in the future + final DataSource configsDataSource = DataSourceFactory.create(configs.getConfigDatabaseUser(), configs.getConfigDatabasePassword(), + DRIVER_CLASS_NAME, configs.getConfigDatabaseUrl()); + final DataSource jobsDataSource = + DataSourceFactory.create(configs.getDatabaseUser(), configs.getDatabasePassword(), DRIVER_CLASS_NAME, configs.getDatabaseUrl()); + + try (final DSLContext configsDslContext = DSLContextFactory.create(configsDataSource, SQLDialect.POSTGRES); + final DSLContext jobsDslContext = DSLContextFactory.create(jobsDataSource, SQLDialect.POSTGRES)) { + + // TODO Will be converted to an injected singleton during DI migration + final Database configDatabase = getConfigDatabase(configsDslContext); + final ConfigPersistence configPersistence = getConfigPersistence(configDatabase); + final Database jobDatabase = getJobDatabase(jobsDslContext); + final JobPersistence jobPersistence = getJobPersistence(jobDatabase); + final SecretMigrator secretMigrator = + new SecretMigrator(configPersistence, jobPersistence, SecretPersistence.getLongLived(configsDslContext, configs)); + final Flyway configsFlyway = FlywayFactory.create(configsDataSource, BootloaderApp.class.getSimpleName(), ConfigsDatabaseMigrator.DB_IDENTIFIER, + ConfigsDatabaseMigrator.MIGRATION_FILE_LOCATION); + final Flyway jobsFlyway = FlywayFactory.create(jobsDataSource, BootloaderApp.class.getSimpleName(), JobsDatabaseMigrator.DB_IDENTIFIER, + JobsDatabaseMigrator.MIGRATION_FILE_LOCATION); + + // Ensure that the database resources are closed on application shutdown + CloseableShutdownHook.registerRuntimeShutdownHook(configsDataSource, jobsDataSource, configsDslContext, jobsDslContext); + + final var bootloader = new BootloaderApp(configs, featureFlags, secretMigrator, configsDslContext, jobsDslContext, configsFlyway, jobsFlyway); + bootloader.load(); + } } private static void createDeploymentIfNoneExists(final JobPersistence jobPersistence) throws IOException { @@ -213,10 +292,7 @@ static boolean isLegalUpgrade(final AirbyteVersion airbyteDatabaseVersion, final return !isUpgradingThroughVersionBreak; } - private static void runFlywayMigration(final Configs configs, final Database configDatabase, final Database jobDatabase) { - final DatabaseMigrator configDbMigrator = new ConfigsDatabaseMigrator(configDatabase, BootloaderApp.class.getSimpleName()); - final DatabaseMigrator jobDbMigrator = new JobsDatabaseMigrator(jobDatabase, BootloaderApp.class.getSimpleName()); - + private static void runFlywayMigration(final Configs configs, final DatabaseMigrator configDbMigrator, final DatabaseMigrator jobDbMigrator) { configDbMigrator.createBaseline(); jobDbMigrator.createBaseline(); @@ -230,16 +306,4 @@ private static void runFlywayMigration(final Configs configs, final Database con } } - private static void cleanupZombies(final JobPersistence jobPersistence) throws IOException { - final Configs configs = new EnvConfigs(); - final WorkflowClient wfClient = - WorkflowClient.newInstance(WorkflowServiceStubs.newInstance( - WorkflowServiceStubsOptions.newBuilder().setTarget(configs.getTemporalHost()).build())); - for (final Job zombieJob : jobPersistence.listJobsWithStatus(JobStatus.RUNNING)) { - LOGGER.info("Kill zombie job {} for connection {}", zombieJob.getId(), zombieJob.getScope()); - wfClient.newUntypedWorkflowStub("sync_" + zombieJob.getId()) - .terminate("Zombie"); - } - } - } diff --git a/airbyte-bootloader/src/main/java/io/airbyte/bootloader/SecretMigrator.java
b/airbyte-bootloader/src/main/java/io/airbyte/bootloader/SecretMigrator.java new file mode 100644 index 0000000000000..d00978b7875c3 --- /dev/null +++ b/airbyte-bootloader/src/main/java/io/airbyte/bootloader/SecretMigrator.java @@ -0,0 +1,210 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.bootloader; + +import static io.airbyte.config.persistence.split_secrets.SecretsHelpers.COORDINATE_FIELD; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.annotations.VisibleForTesting; +import io.airbyte.commons.json.JsonPaths; +import io.airbyte.commons.json.Jsons; +import io.airbyte.config.ConfigSchema; +import io.airbyte.config.DestinationConnection; +import io.airbyte.config.SourceConnection; +import io.airbyte.config.StandardDestinationDefinition; +import io.airbyte.config.StandardSourceDefinition; +import io.airbyte.config.persistence.ConfigPersistence; +import io.airbyte.config.persistence.split_secrets.SecretCoordinate; +import io.airbyte.config.persistence.split_secrets.SecretPersistence; +import io.airbyte.config.persistence.split_secrets.SecretsHelpers; +import io.airbyte.scheduler.persistence.JobPersistence; +import io.airbyte.validation.json.JsonValidationException; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import lombok.AllArgsConstructor; +import lombok.Value; +import lombok.extern.slf4j.Slf4j; + +@AllArgsConstructor +@Slf4j +public class SecretMigrator { + + private final ConfigPersistence configPersistence; + private final JobPersistence jobPersistence; + private final Optional<SecretPersistence> secretPersistence; + + @Value + static class ConnectorConfiguration { + + private final UUID workspace; + private final JsonNode configuration; + private final JsonNode spec; + + } + + /** + * Perform a secret migration. It will load all the actor specs and extract the secret JsonPaths from them. + * Then, for each secret stored in plain text, it will save the plain text in + * the secret manager and store the coordinate in the config DB.
+ */ + public void migrateSecrets() throws JsonValidationException, IOException { + if (secretPersistence.isEmpty()) { + log.info("No secret persistence is provided, the migration won't be run."); + + return; + } + final List<StandardSourceDefinition> standardSourceDefinitions = + configPersistence.listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class); + + final Map<UUID, JsonNode> definitionIdToSourceSpecs = standardSourceDefinitions + .stream().collect(Collectors.toMap(StandardSourceDefinition::getSourceDefinitionId, + def -> def.getSpec().getConnectionSpecification())); + + final List<SourceConnection> sources = configPersistence.listConfigs(ConfigSchema.SOURCE_CONNECTION, SourceConnection.class); + + migrateSources(sources, definitionIdToSourceSpecs); + + final List<StandardDestinationDefinition> standardDestinationDefinitions = + configPersistence.listConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION, + StandardDestinationDefinition.class); + + final Map<UUID, JsonNode> definitionIdToDestinationSpecs = standardDestinationDefinitions.stream() + .collect(Collectors.toMap(StandardDestinationDefinition::getDestinationDefinitionId, + def -> def.getSpec().getConnectionSpecification())); + + final List<DestinationConnection> destinations = configPersistence.listConfigs(ConfigSchema.DESTINATION_CONNECTION, DestinationConnection.class); + + migrateDestinations(destinations, definitionIdToDestinationSpecs); + + jobPersistence.setSecretMigrationDone(); + } + + /** + * Migrates the secrets for the source actors + */ + @VisibleForTesting + void migrateSources(final List<SourceConnection> sources, final Map<UUID, JsonNode> definitionIdToSourceSpecs) + throws JsonValidationException, IOException { + log.info("Migrating Sources"); + final List<SourceConnection> sourceConnections = sources.stream() + .map(source -> { + final JsonNode migratedConfig = migrateConfiguration(new ConnectorConfiguration( + source.getWorkspaceId(), + source.getConfiguration(), + definitionIdToSourceSpecs.get(source.getSourceDefinitionId())), + () -> UUID.randomUUID()); + source.setConfiguration(migratedConfig); + return source; + }) + .toList(); + + for (final SourceConnection source : sourceConnections) { + configPersistence.writeConfig(ConfigSchema.SOURCE_CONNECTION, source.getSourceId().toString(), source); + } + } + + /** + * Migrates the secrets for the destination actors + */ + @VisibleForTesting + void migrateDestinations(final List<DestinationConnection> destinations, final Map<UUID, JsonNode> definitionIdToDestinationSpecs) + throws JsonValidationException, IOException { + log.info("Migrating Destinations"); + + final List<DestinationConnection> destinationConnections = destinations.stream().map(destination -> { + final JsonNode migratedConfig = migrateConfiguration(new ConnectorConfiguration( + destination.getWorkspaceId(), + destination.getConfiguration(), + definitionIdToDestinationSpecs.get(destination.getDestinationDefinitionId())), + () -> UUID.randomUUID()); + destination.setConfiguration(migratedConfig); + return destination; + }) + .toList(); + for (final DestinationConnection destination : destinationConnections) { + configPersistence.writeConfig(ConfigSchema.DESTINATION_CONNECTION, destination.getDestinationId().toString(), destination); + } + } + + /** + * This is a generic method to migrate an actor configuration. It will extract the secret paths from + * the provided spec and then replace them with coordinates in the actor configuration + */ + @VisibleForTesting + JsonNode migrateConfiguration(final ConnectorConfiguration connectorConfiguration, final Supplier<UUID> uuidProvider) { + if (connectorConfiguration.getSpec() == null) { + throw new IllegalStateException("No connector definition to match
the connector"); + } + + final AtomicReference connectorConfigurationJson = new AtomicReference<>(connectorConfiguration.getConfiguration()); + final List uniqSecretPaths = getSecretPath(connectorConfiguration.getSpec()) + .stream() + .flatMap(secretPath -> getAllExplodedPath(connectorConfigurationJson.get(), secretPath).stream()) + .toList(); + + final UUID workspaceId = connectorConfiguration.getWorkspace(); + uniqSecretPaths.forEach(secretPath -> { + final Optional secretValue = getValueForPath(connectorConfigurationJson.get(), secretPath); + if (secretValue.isEmpty()) { + throw new IllegalStateException("Missing secret for the path: " + secretPath); + } + + // Only migrate plain text. + if (secretValue.get().isTextual()) { + final JsonNode stringSecretValue = secretValue.get(); + + final SecretCoordinate coordinate = + new SecretCoordinate(SecretsHelpers.getCoordinatorBase("airbyte_workspace_", workspaceId, uuidProvider), 1); + secretPersistence.get().write(coordinate, stringSecretValue.textValue()); + connectorConfigurationJson.set(replaceAtJsonNode(connectorConfigurationJson.get(), secretPath, + Jsons.jsonNode(Map.of(COORDINATE_FIELD, coordinate.getFullCoordinate())))); + } else { + log.error("Not migrating already migrated secrets"); + } + + }); + + return connectorConfigurationJson.get(); + } + + /** + * Wrapper to help to mock static methods + */ + @VisibleForTesting + JsonNode replaceAtJsonNode(final JsonNode connectorConfigurationJson, final String secretPath, final JsonNode replacement) { + return JsonPaths.replaceAtJsonNode(connectorConfigurationJson, secretPath, replacement); + } + + /** + * Wrapper to help to mock static methods + */ + @VisibleForTesting + List getSecretPath(final JsonNode specs) { + return SecretsHelpers.getSortedSecretPaths(specs); + } + + /** + * Wrapper to help to mock static methods + */ + @VisibleForTesting + List getAllExplodedPath(final JsonNode node, final String path) { + return JsonPaths.getPaths(node, path); + } + + /** + * Wrapper to help to mock static methods + */ + @VisibleForTesting + Optional getValueForPath(final JsonNode node, final String path) { + return JsonPaths.getSingleValue(node, path); + } + +} diff --git a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java index 243bbb8227b77..7436ac1ceb75f 100644 --- a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java +++ b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java @@ -4,24 +4,50 @@ package io.airbyte.bootloader; +import static io.airbyte.config.Configs.SecretPersistenceType.TESTING_CONFIG_DB_TABLE; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; +import com.fasterxml.jackson.databind.ObjectMapper; import io.airbyte.commons.features.FeatureFlags; import io.airbyte.commons.version.AirbyteVersion; import io.airbyte.config.Configs; +import io.airbyte.config.SourceConnection; +import io.airbyte.config.StandardWorkspace; +import io.airbyte.config.init.YamlSeedConfigPersistence; 
+import io.airbyte.config.persistence.ConfigPersistence; +import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.config.persistence.DatabaseConfigPersistence; +import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor; +import io.airbyte.config.persistence.split_secrets.SecretPersistence; +import io.airbyte.db.Database; +import io.airbyte.db.factory.DSLContextFactory; +import io.airbyte.db.factory.DataSourceFactory; +import io.airbyte.db.factory.FlywayFactory; import io.airbyte.db.instance.configs.ConfigsDatabaseInstance; import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator; import io.airbyte.db.instance.jobs.JobsDatabaseInstance; import io.airbyte.db.instance.jobs.JobsDatabaseMigrator; import io.airbyte.scheduler.persistence.DefaultJobPersistence; +import java.io.Closeable; +import java.io.IOException; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; +import javax.sql.DataSource; import lombok.val; +import org.flywaydb.core.Flyway; +import org.jooq.SQLDialect; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.testcontainers.containers.PostgreSQLContainer; @@ -32,16 +58,36 @@ @ExtendWith(SystemStubsExtension.class) public class BootloaderAppTest { - @SystemStub - private EnvironmentVariables environmentVariables; + private PostgreSQLContainer container; + private DataSource configsDataSource; + private DataSource jobsDataSource; - @Test - void testBootloaderAppBlankDb() throws Exception { - val container = new PostgreSQLContainer<>("postgres:13-alpine") + @BeforeEach + void setup() { + container = new PostgreSQLContainer<>("postgres:13-alpine") .withDatabaseName("public") .withUsername("docker") .withPassword("docker"); container.start(); + + configsDataSource = + DataSourceFactory.create(container.getUsername(), container.getPassword(), container.getDriverClassName(), container.getJdbcUrl()); + jobsDataSource = + DataSourceFactory.create(container.getUsername(), container.getPassword(), container.getDriverClassName(), container.getJdbcUrl()); + } + + @AfterEach + void cleanup() throws IOException { + closeDataSource(configsDataSource); + closeDataSource(jobsDataSource); + container.stop(); + } + + @SystemStub + private EnvironmentVariables environmentVariables; + + @Test + void testBootloaderAppBlankDb() throws Exception { val version = "0.33.0-alpha"; val mockedConfigs = mock(Configs.class); @@ -57,6 +103,8 @@ void testBootloaderAppBlankDb() throws Exception { val mockedFeatureFlags = mock(FeatureFlags.class); when(mockedFeatureFlags.usesNewScheduler()).thenReturn(false); + val mockedSecretMigrator = mock(SecretMigrator.class); + // Although we are able to inject mocked configs into the Bootloader, a particular migration in the // configs database // requires the env var to be set. Flyway prevents injection, so we dynamically set this instead. 
@@ -64,28 +112,149 @@ void testBootloaderAppBlankDb() throws Exception { environmentVariables.set("DATABASE_PASSWORD", "docker"); environmentVariables.set("DATABASE_URL", container.getJdbcUrl()); - val bootloader = new BootloaderApp(mockedConfigs, mockedFeatureFlags); - bootloader.load(); + try (val configsDslContext = DSLContextFactory.create(configsDataSource, SQLDialect.POSTGRES); + val jobsDslContext = DSLContextFactory.create(configsDataSource, SQLDialect.POSTGRES)) { + + val configsFlyway = createConfigsFlyway(configsDataSource); + val jobsFlyway = createJobsFlyway(jobsDataSource); + + val bootloader = + new BootloaderApp(mockedConfigs, mockedFeatureFlags, mockedSecretMigrator, configsDslContext, jobsDslContext, configsFlyway, jobsFlyway); + bootloader.load(); + + val jobDatabase = new JobsDatabaseInstance(jobsDslContext).getInitialized(); + val jobsMigrator = new JobsDatabaseMigrator(jobDatabase, jobsFlyway); + assertEquals("0.35.62.001", jobsMigrator.getLatestMigration().getVersion().getVersion()); + + val configDatabase = new ConfigsDatabaseInstance(configsDslContext).getAndInitialize(); + val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, configsFlyway); + assertEquals("0.35.65.001", configsMigrator.getLatestMigration().getVersion().getVersion()); + + val jobsPersistence = new DefaultJobPersistence(jobDatabase); + assertEquals(version, jobsPersistence.getVersion().get()); + + assertNotEquals(Optional.empty(), jobsPersistence.getDeployment().get()); + } + } + + @Test + void testBootloaderAppRunSecretMigration() throws Exception { + val version = "0.33.0-alpha"; + + val mockedConfigs = mock(Configs.class); + when(mockedConfigs.getConfigDatabaseUrl()).thenReturn(container.getJdbcUrl()); + when(mockedConfigs.getConfigDatabaseUser()).thenReturn(container.getUsername()); + when(mockedConfigs.getConfigDatabasePassword()).thenReturn(container.getPassword()); + when(mockedConfigs.getDatabaseUrl()).thenReturn(container.getJdbcUrl()); + when(mockedConfigs.getDatabaseUser()).thenReturn(container.getUsername()); + when(mockedConfigs.getDatabasePassword()).thenReturn(container.getPassword()); + when(mockedConfigs.getAirbyteVersion()).thenReturn(new AirbyteVersion(version)); + when(mockedConfigs.runDatabaseMigrationOnStartup()).thenReturn(true); + when(mockedConfigs.getSecretPersistenceType()).thenReturn(TESTING_CONFIG_DB_TABLE); + + val mockedFeatureFlags = mock(FeatureFlags.class); + when(mockedFeatureFlags.usesNewScheduler()).thenReturn(false); + + final JsonSecretsProcessor jsonSecretsProcessor = JsonSecretsProcessor.builder() + .copySecrets(true) + .maskSecrets(true) + .build(); + + try (val configsDslContext = DSLContextFactory.create(configsDataSource, SQLDialect.POSTGRES); + val jobsDslContext = DSLContextFactory.create(configsDataSource, SQLDialect.POSTGRES)) { + + val configsFlyway = createConfigsFlyway(configsDataSource); + val jobsFlyway = createJobsFlyway(jobsDataSource); + + final Database configDatabase = new Database(configsDslContext); + final ConfigPersistence configPersistence = new DatabaseConfigPersistence(configDatabase, jsonSecretsProcessor); + + val jobsPersistence = new DefaultJobPersistence(configDatabase); + + val spiedSecretMigrator = + spy(new SecretMigrator(configPersistence, jobsPersistence, SecretPersistence.getLongLived(configsDslContext, mockedConfigs))); + + // Although we are able to inject mocked configs into the Bootloader, a particular migration in the + // configs database requires the env var to be set. 
Flyway prevents injection, so we dynamically set + // this instead. + environmentVariables.set("DATABASE_USER", "docker"); + environmentVariables.set("DATABASE_PASSWORD", "docker"); + environmentVariables.set("DATABASE_URL", container.getJdbcUrl()); + + // Bootstrap the database for the test + val initBootloader = new BootloaderApp(mockedConfigs, mockedFeatureFlags, null, configsDslContext, jobsDslContext, configsFlyway, jobsFlyway); + initBootloader.load(); + + final ConfigPersistence localSchema = YamlSeedConfigPersistence.getDefault(); + final ConfigRepository configRepository = new ConfigRepository(configPersistence, configDatabase); + configRepository.loadDataNoSecrets(localSchema); + + final String sourceSpecs = """ + { + "account_id": "1234567891234567", + "start_date": "2022-04-01T00:00:00Z", + "access_token": "nonhiddensecret", + "include_deleted": false, + "fetch_thumbnail_images": false + } + + """; + + final ObjectMapper mapper = new ObjectMapper(); + + final UUID workspaceId = UUID.randomUUID(); + configRepository.writeStandardWorkspace(new StandardWorkspace() + .withWorkspaceId(workspaceId) + .withName("wName") + .withSlug("wSlug") + .withEmail("email@mail.com") + .withTombstone(false) + .withInitialSetupComplete(false)); + final UUID sourceId = UUID.randomUUID(); + configRepository.writeSourceConnectionNoSecrets(new SourceConnection() + .withSourceDefinitionId(UUID.fromString("e7778cfc-e97c-4458-9ecb-b4f2bba8946c")) // Facebook Marketing + .withSourceId(sourceId) + .withName("test source") + .withWorkspaceId(workspaceId) + .withConfiguration(mapper.readTree(sourceSpecs))); + + when(mockedFeatureFlags.forceSecretMigration()).thenReturn(false); + + // Perform secrets migration + var bootloader = + new BootloaderApp(mockedConfigs, mockedFeatureFlags, spiedSecretMigrator, configsDslContext, jobsDslContext, configsFlyway, jobsFlyway); + boolean isMigrated = jobsPersistence.isSecretMigrated(); - val jobDatabase = new JobsDatabaseInstance( - container.getUsername(), - container.getPassword(), - container.getJdbcUrl()).getInitialized(); - val jobsMigrator = new JobsDatabaseMigrator(jobDatabase, this.getClass().getName()); - assertEquals("0.35.62.001", jobsMigrator.getLatestMigration().getVersion().getVersion()); + assertFalse(isMigrated); - val configDatabase = new ConfigsDatabaseInstance( - mockedConfigs.getConfigDatabaseUser(), - mockedConfigs.getConfigDatabasePassword(), - mockedConfigs.getConfigDatabaseUrl()) - .getAndInitialize(); - val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, this.getClass().getName()); - assertEquals("0.35.65.001", configsMigrator.getLatestMigration().getVersion().getVersion()); + bootloader.load(); + verify(spiedSecretMigrator).migrateSecrets(); - val jobsPersistence = new DefaultJobPersistence(jobDatabase); - assertEquals(version, jobsPersistence.getVersion().get()); + final SourceConnection sourceConnection = configRepository.getSourceConnection(sourceId); - assertNotEquals(Optional.empty(), jobsPersistence.getDeployment().get()); + assertFalse(sourceConnection.getConfiguration().toString().contains("nonhiddensecret")); + assertTrue(sourceConnection.getConfiguration().toString().contains("_secret")); + + isMigrated = jobsPersistence.isSecretMigrated(); + assertTrue(isMigrated); + + // Verify that the migration does not happen if it has already been performed + reset(spiedSecretMigrator); + // We need to re-create the bootloader because it is closing the persistence after running load + bootloader = + new BootloaderApp(mockedConfigs, 
mockedFeatureFlags, spiedSecretMigrator, configsDslContext, jobsDslContext, configsFlyway, jobsFlyway);
+      bootloader.load();
+      verifyNoInteractions(spiedSecretMigrator);
+
+      // Verify that the migration occurs if the force migration feature flag is enabled
+      reset(spiedSecretMigrator);
+      when(mockedFeatureFlags.forceSecretMigration()).thenReturn(true);
+      // We need to re-create the bootloader because it is closing the persistence after running load
+      bootloader =
+          new BootloaderApp(mockedConfigs, mockedFeatureFlags, spiedSecretMigrator, configsDslContext, jobsDslContext, configsFlyway, jobsFlyway);
+      bootloader.load();
+      verify(spiedSecretMigrator).migrateSecrets();
+    }
   }
 
   @Test
@@ -113,12 +282,6 @@ void testIsLegalUpgradePredicate() {
   @Test
   void testPostLoadExecutionExecutes() throws Exception {
     final var testTriggered = new AtomicBoolean();
-
-    val container = new PostgreSQLContainer<>("postgres:13-alpine")
-        .withDatabaseName("public")
-        .withUsername("docker")
-        .withPassword("docker");
-    container.start();
 
     val version = "0.33.0-alpha";
     val mockedConfigs = mock(Configs.class);
@@ -134,9 +297,36 @@ void testPostLoadExecutionExecutes() throws Exception {
     val mockedFeatureFlags = mock(FeatureFlags.class);
     when(mockedFeatureFlags.usesNewScheduler()).thenReturn(false);
 
-    new BootloaderApp(mockedConfigs, () -> testTriggered.set(true), mockedFeatureFlags).load();
+    val mockedSecretMigrator = mock(SecretMigrator.class);
+
+    try (val configsDslContext = DSLContextFactory.create(configsDataSource, SQLDialect.POSTGRES);
+        val jobsDslContext = DSLContextFactory.create(configsDataSource, SQLDialect.POSTGRES)) {
+
+      val configsFlyway = createConfigsFlyway(configsDataSource);
+      val jobsFlyway = createJobsFlyway(jobsDataSource);
+
+      new BootloaderApp(mockedConfigs, () -> testTriggered.set(true), mockedFeatureFlags, mockedSecretMigrator, configsDslContext, jobsDslContext,
+          configsFlyway, jobsFlyway)
+              .load();
+
+      assertTrue(testTriggered.get());
+    }
+  }
+
+  private Flyway createConfigsFlyway(final DataSource dataSource) {
+    return FlywayFactory.create(dataSource, getClass().getName(), ConfigsDatabaseMigrator.DB_IDENTIFIER,
+        ConfigsDatabaseMigrator.MIGRATION_FILE_LOCATION);
+  }
+
+  private Flyway createJobsFlyway(final DataSource dataSource) {
+    return FlywayFactory.create(dataSource, getClass().getName(), JobsDatabaseMigrator.DB_IDENTIFIER,
+        JobsDatabaseMigrator.MIGRATION_FILE_LOCATION);
+  }
 
-    assertTrue(testTriggered.get());
+  private void closeDataSource(final DataSource dataSource) throws IOException {
+    if (dataSource instanceof Closeable closeable) {
+      closeable.close();
+    }
   }
 
 }
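Taken together, the assertions in `testBootloaderAppRunSecretMigration` pin down the gating behavior: the migration runs on a fresh deployment, is skipped once `isSecretMigrated()` reports true, and runs again only when `forceSecretMigration()` is set. Below is a sketch of gating logic consistent with those tests; the actual private wiring inside `BootloaderApp` is not part of this diff, so the method shape is an assumption:

```java
// Assumed shape, inferred from testBootloaderAppRunSecretMigration; not the literal
// BootloaderApp code, which this diff does not show.
private static void runSecretMigration(final SecretMigrator secretMigrator,
                                       final JobPersistence jobPersistence,
                                       final FeatureFlags featureFlags)
    throws Exception {
  if (featureFlags.forceSecretMigration() || !jobPersistence.isSecretMigrated()) {
    // migrateSecrets() ends with jobPersistence.setSecretMigrationDone(), which is why the
    // second, un-forced load() in the test never touches the spied migrator.
    secretMigrator.migrateSecrets();
  }
}
```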
diff --git a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/SecretMigratorTest.java b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/SecretMigratorTest.java
new file mode 100644
index 0000000000000..7240778710ef8
--- /dev/null
+++ b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/SecretMigratorTest.java
@@ -0,0 +1,246 @@
+/*
+ * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.bootloader;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.Lists;
+import io.airbyte.bootloader.SecretMigrator.ConnectorConfiguration;
+import io.airbyte.commons.json.Jsons;
+import io.airbyte.config.ConfigSchema;
+import io.airbyte.config.DestinationConnection;
+import io.airbyte.config.SourceConnection;
+import io.airbyte.config.StandardDestinationDefinition;
+import io.airbyte.config.StandardSourceDefinition;
+import io.airbyte.config.persistence.ConfigPersistence;
+import io.airbyte.config.persistence.split_secrets.SecretCoordinate;
+import io.airbyte.config.persistence.split_secrets.SecretPersistence;
+import io.airbyte.protocol.models.ConnectorSpecification;
+import io.airbyte.scheduler.persistence.JobPersistence;
+import io.airbyte.validation.json.JsonValidationException;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class SecretMigratorTest {
+
+  private final UUID workspaceId = UUID.randomUUID();
+
+  @Mock
+  private ConfigPersistence configPersistence;
+
+  @Mock
+  private SecretPersistence secretPersistence;
+
+  @Mock
+  private JobPersistence jobPersistence;
+
+  private SecretMigrator secretMigrator;
+
+  @BeforeEach
+  void setup() {
+    secretMigrator = Mockito.spy(new SecretMigrator(configPersistence, jobPersistence, Optional.of(secretPersistence)));
+  }
+
+  @Test
+  public void testMigrateSecret() throws JsonValidationException, IOException {
+    final JsonNode sourceSpec = Jsons.jsonNode("sourceSpec");
+    final UUID sourceDefinitionId = UUID.randomUUID();
+    final StandardSourceDefinition standardSourceDefinition = new StandardSourceDefinition()
+        .withSourceDefinitionId(sourceDefinitionId)
+        .withSpec(
+            new ConnectorSpecification()
+                .withConnectionSpecification(sourceSpec));
+    final Map<UUID, JsonNode> standardSourceDefinitions = new HashMap<>();
+    standardSourceDefinitions.put(sourceDefinitionId, standardSourceDefinition.getSpec().getConnectionSpecification());
+    Mockito.when(configPersistence.listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class))
+        .thenReturn(Lists.newArrayList(standardSourceDefinition));
+
+    final JsonNode sourceConfiguration = Jsons.jsonNode("sourceConfiguration");
+    final SourceConnection sourceConnection = new SourceConnection()
+        .withSourceId(UUID.randomUUID())
+        .withSourceDefinitionId(sourceDefinitionId)
+        .withConfiguration(sourceConfiguration)
+        .withWorkspaceId(workspaceId);
+    final List<SourceConnection> sourceConnections = Lists.newArrayList(sourceConnection);
+    Mockito.when(configPersistence.listConfigs(ConfigSchema.SOURCE_CONNECTION, SourceConnection.class))
+        .thenReturn(sourceConnections);
+
+    final JsonNode destinationSpec = Jsons.jsonNode("destinationSpec");
+    final UUID destinationDefinitionId = UUID.randomUUID();
+    final StandardDestinationDefinition standardDestinationDefinition = new StandardDestinationDefinition()
+        .withDestinationDefinitionId(destinationDefinitionId)
+        .withSpec(
+            new ConnectorSpecification()
+                .withConnectionSpecification(destinationSpec));
+    final Map<UUID, JsonNode> standardDestinationDefinitions = new HashMap<>();
+    standardDestinationDefinitions.put(destinationDefinitionId, standardDestinationDefinition.getSpec().getConnectionSpecification());
+    Mockito.when(configPersistence.listConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION, StandardDestinationDefinition.class))
+        .thenReturn(Lists.newArrayList(standardDestinationDefinition));
+
+    final JsonNode destinationConfiguration = Jsons.jsonNode("destinationConfiguration");
+    final DestinationConnection destinationConnection = new DestinationConnection()
+        .withDestinationId(UUID.randomUUID())
+        .withDestinationDefinitionId(destinationDefinitionId)
+        .withConfiguration(destinationConfiguration)
+        .withWorkspaceId(workspaceId);
+    final List<DestinationConnection> destinationConnections = Lists.newArrayList(destinationConnection);
+    Mockito.when(configPersistence.listConfigs(ConfigSchema.DESTINATION_CONNECTION, DestinationConnection.class))
+        .thenReturn(destinationConnections);
+
+    final String path = "Mocked static call source";
+    Mockito.doReturn(Lists.newArrayList(path)).when(secretMigrator).getSecretPath(sourceSpec);
+    Mockito.doReturn(Lists.newArrayList(path)).when(secretMigrator).getAllExplodedPath(sourceConfiguration, path);
+    final String sourceSecret = "sourceSecret";
+    Mockito.doReturn(Optional.of(Jsons.jsonNode(sourceSecret))).when(secretMigrator).getValueForPath(sourceConfiguration, path);
+    Mockito.doReturn(Lists.newArrayList(path)).when(secretMigrator).getSecretPath(destinationSpec);
+    Mockito.doReturn(Lists.newArrayList(path)).when(secretMigrator).getAllExplodedPath(destinationConfiguration, path);
+    final String destinationSecret = "destinationSecret";
+    Mockito.doReturn(Optional.of(Jsons.jsonNode(destinationSecret))).when(secretMigrator).getValueForPath(destinationConfiguration, path);
+
+    Mockito.doReturn(Jsons.jsonNode("sanitized")).when(secretMigrator).replaceAtJsonNode(Mockito.any(), Mockito.any(), Mockito.any());
+    secretMigrator.migrateSecrets();
+
+    Mockito.verify(secretMigrator).migrateSources(sourceConnections, standardSourceDefinitions);
+    Mockito.verify(secretPersistence).write(Mockito.any(), Mockito.eq(sourceSecret));
+    Mockito.verify(secretMigrator).migrateDestinations(destinationConnections, standardDestinationDefinitions);
+    Mockito.verify(secretPersistence).write(Mockito.any(), Mockito.eq(destinationSecret));
+
+    Mockito.verify(jobPersistence).setSecretMigrationDone();
+  }
+
+  @Test
+  void testSourceMigration() throws JsonValidationException, IOException {
+    final UUID definitionId1 = UUID.randomUUID();
+    final UUID definitionId2 = UUID.randomUUID();
+    final UUID sourceId1 = UUID.randomUUID();
+    final UUID sourceId2 = UUID.randomUUID();
+    final JsonNode sourceConfiguration1 = Jsons.jsonNode("conf1");
+    final JsonNode sourceConfiguration2 = Jsons.jsonNode("conf2");
+    final JsonNode sourceDefinition1 = Jsons.jsonNode("def1");
+    final JsonNode sourceDefinition2 = Jsons.jsonNode("def2");
+    final SourceConnection sourceConnection1 = new SourceConnection()
+        .withSourceId(sourceId1)
+        .withSourceDefinitionId(definitionId1)
+        .withConfiguration(sourceConfiguration1);
+    final SourceConnection sourceConnection2 = new SourceConnection()
+        .withSourceId(sourceId2)
+        .withSourceDefinitionId(definitionId2)
+        .withConfiguration(sourceConfiguration2);
+
+    final List<SourceConnection> sources = Lists.newArrayList(sourceConnection1, sourceConnection2);
+    final Map<UUID, JsonNode> definitionIdToSourceSpecs = new HashMap<>();
+    definitionIdToSourceSpecs.put(definitionId1, sourceDefinition1);
+    definitionIdToSourceSpecs.put(definitionId2, sourceDefinition2);
+
+    Mockito.doReturn(Jsons.emptyObject()).when(secretMigrator).migrateConfiguration(
+        Mockito.any(),
+        Mockito.any());
+
+    secretMigrator.migrateSources(sources, definitionIdToSourceSpecs);
+
+    Mockito.verify(configPersistence).writeConfig(ConfigSchema.SOURCE_CONNECTION, sourceId1.toString(), sourceConnection1);
+    Mockito.verify(configPersistence).writeConfig(ConfigSchema.SOURCE_CONNECTION, sourceId2.toString(), sourceConnection2);
+  }
+
+  @Test
+  void testDestinationMigration() throws JsonValidationException, IOException {
+    final UUID definitionId1 = UUID.randomUUID();
+    final UUID definitionId2 = UUID.randomUUID();
+    final UUID destinationId1 = UUID.randomUUID();
+    final UUID destinationId2 = UUID.randomUUID();
+    final JsonNode destinationConfiguration1 = Jsons.jsonNode("conf1");
+    final JsonNode destinationConfiguration2 = Jsons.jsonNode("conf2");
+    final JsonNode destinationDefinition1 = Jsons.jsonNode("def1");
+    final JsonNode destinationDefinition2 = Jsons.jsonNode("def2");
+    final DestinationConnection destinationConnection1 = new DestinationConnection()
+        .withDestinationId(destinationId1)
+        .withDestinationDefinitionId(definitionId1)
+        .withConfiguration(destinationConfiguration1);
+    final DestinationConnection destinationConnection2 = new DestinationConnection()
+        .withDestinationId(destinationId2)
+        .withDestinationDefinitionId(definitionId2)
+        .withConfiguration(destinationConfiguration2);
+
+    final List<DestinationConnection> destinations = Lists.newArrayList(destinationConnection1, destinationConnection2);
+    final Map<UUID, JsonNode> definitionIdToDestinationSpecs = new HashMap<>();
+    definitionIdToDestinationSpecs.put(definitionId1, destinationDefinition1);
+    definitionIdToDestinationSpecs.put(definitionId2, destinationDefinition2);
+
+    Mockito.doReturn(Jsons.emptyObject()).when(secretMigrator).migrateConfiguration(
+        Mockito.any(),
+        Mockito.any());
+
+    secretMigrator.migrateDestinations(destinations, definitionIdToDestinationSpecs);
+
+    Mockito.verify(configPersistence).writeConfig(ConfigSchema.DESTINATION_CONNECTION, destinationId1.toString(), destinationConnection1);
+    Mockito.verify(configPersistence).writeConfig(ConfigSchema.DESTINATION_CONNECTION, destinationId2.toString(), destinationConnection2);
+  }
+
+  @Test
+  void testMigrateConfigurationWithoutSpecs() {
+    final ConnectorConfiguration connectorConfiguration = new ConnectorConfiguration(null, null, null);
+
+    Assertions.assertThrows(IllegalStateException.class, () -> secretMigrator.migrateConfiguration(connectorConfiguration, null));
+  }
+
+  @Test
+  void testMissingSecret() {
+    final List<String> secretPathList = Lists.newArrayList("secretPath");
+
+    Mockito.doReturn(secretPathList).when(secretMigrator).getSecretPath(Mockito.any());
+    Mockito.doReturn(secretPathList).when(secretMigrator).getAllExplodedPath(Mockito.any(), Mockito.eq(secretPathList.get(0)));
+    Mockito.doReturn(Optional.empty()).when(secretMigrator).getValueForPath(Mockito.any(), Mockito.eq(secretPathList.get(0)));
+
+    final ConnectorConfiguration connectorConfiguration = new ConnectorConfiguration(UUID.randomUUID(), Jsons.emptyObject(), Jsons.emptyObject());
+    Assertions.assertThrows(IllegalStateException.class, () -> secretMigrator.migrateConfiguration(connectorConfiguration, () -> UUID.randomUUID()));
+  }
+
+  @Test
+  void testMigrateConfiguration() {
+    final List<String> secretPathList = Lists.newArrayList("$.secretPath");
+
+    Mockito.doReturn(secretPathList).when(secretMigrator).getSecretPath(Mockito.any());
+    Mockito.doReturn(secretPathList).when(secretMigrator).getAllExplodedPath(Mockito.any(), Mockito.eq(secretPathList.get(0)));
+    Mockito.doReturn(Optional.of(Jsons.jsonNode(secretPathList.get(0)))).when(secretMigrator).getValueForPath(Mockito.any(),
+        Mockito.eq(secretPathList.get(0)));
+
+    final ConnectorConfiguration connectorConfiguration = new ConnectorConfiguration(UUID.randomUUID(), Jsons.emptyObject(), Jsons.emptyObject());
+
+    secretMigrator.migrateConfiguration(connectorConfiguration, () -> UUID.randomUUID());
+    Mockito.verify(secretPersistence).write(Mockito.any(), Mockito.any());
+  }
+
+  @Test
+  void testMigrateConfigurationAlreadyInSecretManager() {
+    final List<String> secretPathList = Lists.newArrayList("$.secretPath");
+
+    Mockito.doReturn(secretPathList).when(secretMigrator).getSecretPath(Mockito.any());
+    Mockito.doReturn(secretPathList).when(secretMigrator).getAllExplodedPath(Mockito.any(), Mockito.eq(secretPathList.get(0)));
+
+    final SecretCoordinate fakeCoordinate = new SecretCoordinate("fake", 1);
+    Mockito.doReturn(Optional.of(Jsons.jsonNode(fakeCoordinate))).when(secretMigrator).getValueForPath(Mockito.any(),
+        Mockito.eq(secretPathList.get(0)));
+
+    final ConnectorConfiguration connectorConfiguration = new ConnectorConfiguration(UUID.randomUUID(), Jsons.emptyObject(), Jsons.emptyObject());
+
+    secretMigrator.migrateConfiguration(connectorConfiguration, () -> UUID.randomUUID());
+    Mockito.verify(secretPersistence, Mockito.times(0)).write(Mockito.any(), Mockito.any());
+  }
+
+}
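The fake `SecretCoordinate` in the last test mirrors the real coordinates the migrator writes. As a rough illustration of what ends up behind the `_secret` pointer (the base produced by `SecretsHelpers.getCoordinatorBase` and the version suffix of `getFullCoordinate` are inferred from the calls in `SecretMigrator`, so treat the exact format as an assumption):

```java
// Hypothetical coordinate; the "airbyte_workspace_<id>_secret_<id>" base and "_v1"
// suffix are inferred, not confirmed by this diff.
final UUID workspaceId = UUID.randomUUID();
final UUID secretId = UUID.randomUUID();
final SecretCoordinate coordinate =
    new SecretCoordinate("airbyte_workspace_" + workspaceId + "_secret_" + secretId, 1);
// Roughly: "airbyte_workspace_<workspaceId>_secret_<secretId>_v1"
final String pointer = coordinate.getFullCoordinate();
```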
diff --git a/airbyte-cdk/python/CHANGELOG.md b/airbyte-cdk/python/CHANGELOG.md
index 145ef289cc7c3..cd74c76c10b5b 100644
--- a/airbyte-cdk/python/CHANGELOG.md
+++ b/airbyte-cdk/python/CHANGELOG.md
@@ -1,5 +1,10 @@
 # Changelog
 
+## 0.1.56
+- Update protocol models to include `AirbyteTraceMessage`
+- Emit an `AirbyteTraceMessage` on uncaught exceptions
+- Add `AirbyteTracedException`
+
 ## 0.1.55
 Add support for reading the spec from a YAML file (`spec.yaml`)
 
diff --git a/airbyte-cdk/python/airbyte_cdk/entrypoint.py b/airbyte-cdk/python/airbyte_cdk/entrypoint.py
index 493911b3293e5..2292628ab5d5d 100644
--- a/airbyte-cdk/python/airbyte_cdk/entrypoint.py
+++ b/airbyte-cdk/python/airbyte_cdk/entrypoint.py
@@ -11,15 +11,17 @@
 import tempfile
 from typing import Any, Dict, Iterable, List
 
-from airbyte_cdk.logger import AirbyteLogFormatter, init_logger
+from airbyte_cdk.exception_handler import init_uncaught_exception_handler
+from airbyte_cdk.logger import init_logger
 from airbyte_cdk.models import AirbyteMessage, Status, Type
 from airbyte_cdk.models.airbyte_protocol import ConnectorSpecification
 from airbyte_cdk.sources import Source
 from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit, get_secret_values, split_config
 from airbyte_cdk.sources.utils.sentry import AirbyteSentry
-from airbyte_cdk.utils.airbyte_secrets_utils import get_secrets
+from airbyte_cdk.utils.airbyte_secrets_utils import get_secrets, update_secrets
 
 logger = init_logger("airbyte")
+init_uncaught_exception_handler(logger)
 
 
 class AirbyteEntrypoint(object):
@@ -89,7 +91,7 @@ def run(self, parsed_args: argparse.Namespace) -> Iterable[str]:
             # Now that we have the config, we can use it to get a list of ai airbyte_secrets
             # that we should filter in logging to avoid leaking secrets
             config_secrets = get_secrets(self.source, config, self.logger)
-            AirbyteLogFormatter.update_secrets(config_secrets)
+            update_secrets(config_secrets)
 
             #
Remove internal flags from config before validating so # jsonschema's additionalProperties flag wont fail the validation diff --git a/airbyte-cdk/python/airbyte_cdk/exception_handler.py b/airbyte-cdk/python/airbyte_cdk/exception_handler.py new file mode 100644 index 0000000000000..3e295d61bdfe5 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/exception_handler.py @@ -0,0 +1,34 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# + +import logging +import sys + +from airbyte_cdk.utils.traced_exception import AirbyteTracedException + + +def init_uncaught_exception_handler(logger: logging.Logger) -> None: + """ + Handles uncaught exceptions by emitting an AirbyteTraceMessage and making sure they are not + printed to the console without having secrets removed. + """ + + def hook_fn(exception_type, exception_value, traceback_): + # For developer ergonomics, we want to see the stack trace in the logs when we do a ctrl-c + if issubclass(exception_type, KeyboardInterrupt): + sys.__excepthook__(exception_type, exception_value, traceback_) + return + + logger.fatal(exception_value, exc_info=exception_value) + + # emit an AirbyteTraceMessage for any exception that gets to this spot + traced_exc = ( + exception_value + if issubclass(exception_type, AirbyteTracedException) + else AirbyteTracedException.from_exception(exception_value) + ) + + traced_exc.emit_message() + + sys.excepthook = hook_fn diff --git a/airbyte-cdk/python/airbyte_cdk/logger.py b/airbyte-cdk/python/airbyte_cdk/logger.py index a54c59325fbd8..add926ccf18f0 100644 --- a/airbyte-cdk/python/airbyte_cdk/logger.py +++ b/airbyte-cdk/python/airbyte_cdk/logger.py @@ -4,11 +4,11 @@ import logging import logging.config -import sys import traceback -from typing import List, Tuple +from typing import Tuple from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage +from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets from deprecated import deprecated TRACE_LEVEL_NUM = 5 @@ -32,42 +32,18 @@ } -def init_unhandled_exception_output_filtering(logger: logging.Logger) -> None: - """ - Make sure unhandled exceptions are not printed to the console without passing through the Airbyte logger and having - secrets removed. 
- """ - - def hook_fn(exception_type, exception_value, traceback_): - # For developer ergonomics, we want to see the stack trace in the logs when we do a ctrl-c - if issubclass(exception_type, KeyboardInterrupt): - sys.__excepthook__(exception_type, exception_value, traceback_) - else: - logger.critical(exception_value, exc_info=exception_value) - - sys.excepthook = hook_fn - - def init_logger(name: str = None): """Initial set up of logger""" logging.addLevelName(TRACE_LEVEL_NUM, "TRACE") logger = logging.getLogger(name) logger.setLevel(TRACE_LEVEL_NUM) logging.config.dictConfig(LOGGING_CONFIG) - init_unhandled_exception_output_filtering(logger) return logger class AirbyteLogFormatter(logging.Formatter): """Output log records using AirbyteMessage""" - _secrets: List[str] = [] - - @classmethod - def update_secrets(cls, secrets: List[str]): - """Update the list of secrets to be replaced in the log message""" - cls._secrets = secrets - # Transforming Python log levels to Airbyte protocol log levels level_mapping = { logging.FATAL: "FATAL", @@ -82,8 +58,7 @@ def format(self, record: logging.LogRecord) -> str: """Return a JSON representation of the log message""" message = super().format(record) airbyte_level = self.level_mapping.get(record.levelno, "INFO") - for secret in AirbyteLogFormatter._secrets: - message = message.replace(secret, "****") + message = filter_secrets(message) log_message = AirbyteMessage(type="LOG", log=AirbyteLogMessage(level=airbyte_level, message=message)) return log_message.json(exclude_unset=True) diff --git a/airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py b/airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py index f703cbd8080c7..e309b7f00d8d7 100644 --- a/airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py +++ b/airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py @@ -20,6 +20,7 @@ class Type(Enum): SPEC = "SPEC" CONNECTION_STATUS = "CONNECTION_STATUS" CATALOG = "CATALOG" + TRACE = "TRACE" class AirbyteRecordMessage(BaseModel): @@ -59,6 +60,25 @@ class Config: message: str = Field(..., description="the log message") +class TraceType(Enum): + ERROR = "ERROR" + + +class FailureType(Enum): + system_error = "system_error" + config_error = "config_error" + + +class AirbyteErrorTraceMessage(BaseModel): + class Config: + extra = Extra.allow + + message: str = Field(..., description="A user-friendly message that indicates the cause of the error") + internal_message: Optional[str] = Field(None, description="The internal error that caused the failure") + stack_trace: Optional[str] = Field(None, description="The full stack trace of the error") + failure_type: Optional[FailureType] = Field(None, description="The type of error") + + class Status(Enum): SUCCEEDED = "SUCCEEDED" FAILED = "FAILED" @@ -137,6 +157,15 @@ class OAuthConfigSpecification(BaseModel): ) +class AirbyteTraceMessage(BaseModel): + class Config: + extra = Extra.allow + + type: TraceType = Field(..., description="the type of trace message", title="trace type") + emitted_at: float = Field(..., description="the time in ms that the message was emitted") + error: Optional[AirbyteErrorTraceMessage] = Field(None, description="error trace message: the error object") + + class AirbyteStream(BaseModel): class Config: extra = Extra.allow @@ -246,6 +275,10 @@ class Config: None, description="schema message: the state. Must be the last message produced. 
The platform uses this information", ) + trace: Optional[AirbyteTraceMessage] = Field( + None, + description="trace message: a message to communicate information about the status and performance of a connector", + ) class AirbyteProtocol(BaseModel): diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/rate_limiting.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/rate_limiting.py index 2401e51005d52..6a7eeb64f9cfd 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/rate_limiting.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/rate_limiting.py @@ -2,21 +2,19 @@ # Copyright (c) 2021 Airbyte, Inc., all rights reserved. # - +import logging import sys import time from typing import Optional import backoff -from airbyte_cdk.logger import AirbyteLogger from requests import codes, exceptions from .exceptions import DefaultBackoffException, UserDefinedBackoffException TRANSIENT_EXCEPTIONS = (DefaultBackoffException, exceptions.ConnectTimeout, exceptions.ReadTimeout, exceptions.ConnectionError) -# TODO inject singleton logger? -logger = AirbyteLogger() +logger = logging.getLogger("airbyte") def default_backoff_handler(max_tries: Optional[int], factor: float, **kwargs): diff --git a/airbyte-cdk/python/airbyte_cdk/sources/utils/transform.py b/airbyte-cdk/python/airbyte_cdk/sources/utils/transform.py index ed974ef1305b8..7825eae3e5980 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/utils/transform.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/utils/transform.py @@ -2,14 +2,14 @@ # Copyright (c) 2021 Airbyte, Inc., all rights reserved. # +import logging from distutils.util import strtobool from enum import Flag, auto from typing import Any, Callable, Dict, Mapping, Optional -from airbyte_cdk.logger import AirbyteLogger from jsonschema import Draft7Validator, validators -logger = AirbyteLogger() +logger = logging.getLogger("airbyte") class TransformConfig(Flag): @@ -174,4 +174,4 @@ def transform(self, record: Dict[str, Any], schema: Mapping[str, Any]): just calling normalizer.validate() would throw an exception on first validation occurences and stop processing rest of schema. 
""" - logger.warn(e.message) + logger.warning(e.message) diff --git a/airbyte-cdk/python/airbyte_cdk/utils/airbyte_secrets_utils.py b/airbyte-cdk/python/airbyte_cdk/utils/airbyte_secrets_utils.py index 0a64efed7df4f..95a3d55926bd8 100644 --- a/airbyte-cdk/python/airbyte_cdk/utils/airbyte_secrets_utils.py +++ b/airbyte-cdk/python/airbyte_cdk/utils/airbyte_secrets_utils.py @@ -18,3 +18,19 @@ def get_secrets(source: Source, config: Mapping[str, Any], logger: logging.Logge ".".join(key.split(".")[:1]) for key, value in flattened_key_values.items() if value and key.endswith("airbyte_secret") ] return [str(get_value_by_dot_notation(config, key)) for key in secret_key_names if config.get(key)] + + +__SECRETS_FROM_CONFIG: List[str] = [] + + +def update_secrets(secrets: List[str]): + """Update the list of secrets to be replaced""" + global __SECRETS_FROM_CONFIG + __SECRETS_FROM_CONFIG = secrets + + +def filter_secrets(string: str) -> str: + """Filter secrets from a string by replacing them with ****""" + for secret in __SECRETS_FROM_CONFIG: + string = string.replace(secret, "****") + return string diff --git a/airbyte-cdk/python/airbyte_cdk/utils/event_timing.py b/airbyte-cdk/python/airbyte_cdk/utils/event_timing.py index 25983c42c71a4..1f70404e06ba8 100644 --- a/airbyte-cdk/python/airbyte_cdk/utils/event_timing.py +++ b/airbyte-cdk/python/airbyte_cdk/utils/event_timing.py @@ -3,14 +3,13 @@ # import datetime +import logging import time from contextlib import contextmanager from dataclasses import dataclass, field from typing import Optional -from airbyte_cdk.logger import AirbyteLogger - -logger = AirbyteLogger() +logger = logging.getLogger("airbyte") class EventTimer: @@ -42,7 +41,7 @@ def finish_event(self): event = self.stack.pop(0) event.finish() else: - logger.warn(f"{self.name} finish_event called without start_event") + logger.warning(f"{self.name} finish_event called without start_event") def report(self, order_by="name"): """ diff --git a/airbyte-cdk/python/airbyte_cdk/utils/traced_exception.py b/airbyte-cdk/python/airbyte_cdk/utils/traced_exception.py new file mode 100644 index 0000000000000..0b6388a4cd23d --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/utils/traced_exception.py @@ -0,0 +1,74 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
+# + +import traceback +from datetime import datetime + +from airbyte_cdk.models import AirbyteErrorTraceMessage, AirbyteMessage, AirbyteTraceMessage, FailureType, TraceType +from airbyte_cdk.models import Type as MessageType +from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets + + +class AirbyteTracedException(Exception): + """ + An exception that should be emitted as an AirbyteTraceMessage + """ + + def __init__( + self, + internal_message: str = None, + message: str = None, + failure_type: FailureType = FailureType.system_error, + exception: BaseException = None, + ): + """ + :param internal_message: the internal error that caused the failure + :param message: a user-friendly message that indicates the cause of the error + :param failure_type: the type of error + :param exception: the exception that caused the error, from which the stack trace should be retrieved + """ + self.internal_message = internal_message + self.message = message + self.failure_type = failure_type + self._exception = exception + super().__init__(internal_message) + + def as_airbyte_message(self) -> AirbyteMessage: + """ + Builds an AirbyteTraceMessage from the exception + """ + now_millis = datetime.now().timestamp() * 1000.0 + + trace_exc = self._exception or self + stack_trace_str = "".join(traceback.TracebackException.from_exception(trace_exc).format()) + + trace_message = AirbyteTraceMessage( + type=TraceType.ERROR, + emitted_at=now_millis, + error=AirbyteErrorTraceMessage( + message=self.message or "Something went wrong in the connector. See the logs for more details.", + internal_message=self.internal_message, + failure_type=self.failure_type, + stack_trace=stack_trace_str, + ), + ) + + return AirbyteMessage(type=MessageType.TRACE, trace=trace_message) + + def emit_message(self): + """ + Prints the exception as an AirbyteTraceMessage. + Note that this will be called automatically on uncaught exceptions when using the airbyte_cdk entrypoint. 
+ """ + message = self.as_airbyte_message().json(exclude_unset=True) + filtered_message = filter_secrets(message) + print(filtered_message) + + @classmethod + def from_exception(cls, exc: Exception, *args, **kwargs) -> "AirbyteTracedException": + """ + Helper to create an AirbyteTracedException from an existing exception + :param exc: the exception that caused the error + """ + return cls(internal_message=str(exc), exception=exc, *args, **kwargs) diff --git a/airbyte-cdk/python/bin/generate-protocol-files.sh b/airbyte-cdk/python/bin/generate-protocol-files.sh index b268f4d531852..e3fea30173b3e 100755 --- a/airbyte-cdk/python/bin/generate-protocol-files.sh +++ b/airbyte-cdk/python/bin/generate-protocol-files.sh @@ -18,6 +18,7 @@ function main() { docker run --user "$(id -u):$(id -g)" -v "$ROOT_DIR":/airbyte airbyte/code-generator:dev \ --input "/airbyte/$YAML_DIR/$filename_wo_ext.yaml" \ --output "/airbyte/$OUTPUT_DIR/$filename_wo_ext.py" \ + --use-title-as-name \ --disable-timestamp done } diff --git a/airbyte-cdk/python/docs/tutorials/http_api_source.md b/airbyte-cdk/python/docs/tutorials/http_api_source.md index c622fe0e310b6..3d8327596f296 100644 --- a/airbyte-cdk/python/docs/tutorials/http_api_source.md +++ b/airbyte-cdk/python/docs/tutorials/http_api_source.md @@ -312,7 +312,8 @@ Backoff policy options: - `max_retries` Specifies maximum amount of retries for backoff policy (by default is 5) - `raise_on_http_errors` If set to False, allows opting-out of raising HTTP code exception (by default is True) -There are many other customizable options - you can find them in the [`base_python.cdk.streams.http.HttpStream`](https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/bases/base-python/base_python/cdk/streams/http.py) class. + +There are many other customizable options - you can find them in the [`airbyte_cdk.sources.streams.http.HttpStream`](https://github.com/airbytehq/airbyte/blob/master/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py) class. So in order to read data from the exchange rates API, we'll fill out the necessary information for the stream to do its work. First, we'll implement a basic read that just reads the last day's exchange rates, then we'll implement incremental sync using stream slicing. 
diff --git a/airbyte-cdk/python/docs/tutorials/http_api_source_assets/configured_catalog.json b/airbyte-cdk/python/docs/tutorials/http_api_source_assets/configured_catalog.json index 66ab9be9e7bb5..7aa9a7e9b2229 100644 --- a/airbyte-cdk/python/docs/tutorials/http_api_source_assets/configured_catalog.json +++ b/airbyte-cdk/python/docs/tutorials/http_api_source_assets/configured_catalog.json @@ -7,6 +7,9 @@ "$schema": "http://json-schema.org/draft-04/schema#", "type": "object", "properties": { + "access_key": { + "type": "string" + }, "base": { "type": "string" }, diff --git a/airbyte-cdk/python/docs/tutorials/http_api_source_assets/exchange_rates.json b/airbyte-cdk/python/docs/tutorials/http_api_source_assets/exchange_rates.json index 7476b088094e2..9462ce0079e6e 100644 --- a/airbyte-cdk/python/docs/tutorials/http_api_source_assets/exchange_rates.json +++ b/airbyte-cdk/python/docs/tutorials/http_api_source_assets/exchange_rates.json @@ -2,6 +2,9 @@ "type": "object", "required": ["base", "date", "rates"], "properties": { + "access_key": { + "type": "string" + }, "base": { "type": "string" }, diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py index df9eaacc46fb0..e246d802e08ab 100644 --- a/airbyte-cdk/python/setup.py +++ b/airbyte-cdk/python/setup.py @@ -15,7 +15,7 @@ setup( name="airbyte-cdk", - version="0.1.55", + version="0.1.56", description="A framework for writing Airbyte Connectors.", long_description=README, long_description_content_type="text/markdown", @@ -70,7 +70,4 @@ "sphinx-rtd-theme~=1.0", ], }, - entry_points={ - "console_scripts": ["base-python=base_python.entrypoint:main"], - }, ) diff --git a/airbyte-cdk/python/unit_tests/sources/utils/test_transform.py b/airbyte-cdk/python/unit_tests/sources/utils/test_transform.py index e801e475b92a8..c74bab22f6ef2 100644 --- a/airbyte-cdk/python/unit_tests/sources/utils/test_transform.py +++ b/airbyte-cdk/python/unit_tests/sources/utils/test_transform.py @@ -151,15 +151,17 @@ ), ], ) -def test_transform(schema, actual, expected, expected_warns, capsys): +def test_transform(schema, actual, expected, expected_warns, caplog): t = TypeTransformer(TransformConfig.DefaultSchemaNormalization) t.transform(actual, schema) assert json.dumps(actual) == json.dumps(expected) - stdout = capsys.readouterr().out if expected_warns: - assert expected_warns in stdout + record = caplog.records[0] + assert record.name == "airbyte" + assert record.levelname == "WARNING" + assert record.message == expected_warns else: - assert not stdout + assert len(caplog.records) == 0 def test_transform_wrong_config(): diff --git a/airbyte-cdk/python/unit_tests/test_exception_handler.py b/airbyte-cdk/python/unit_tests/test_exception_handler.py new file mode 100644 index 0000000000000..188f9e7e39db7 --- /dev/null +++ b/airbyte-cdk/python/unit_tests/test_exception_handler.py @@ -0,0 +1,57 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
+#
+
+
+import json
+import subprocess
+import sys
+
+import pytest
+from airbyte_cdk.models import AirbyteErrorTraceMessage, AirbyteLogMessage, AirbyteMessage, AirbyteTraceMessage
+
+
+def test_uncaught_exception_handler():
+    cmd = "from airbyte_cdk.logger import init_logger; from airbyte_cdk.exception_handler import init_uncaught_exception_handler; logger = init_logger('airbyte'); init_uncaught_exception_handler(logger); raise 1"
+    exception_message = "exceptions must derive from BaseException"
+    exception_trace = (
+        "Traceback (most recent call last):\n"
+        '  File "<string>", line 1, in <module>\n'
+        "TypeError: exceptions must derive from BaseException"
+    )
+
+    expected_log_message = AirbyteMessage(
+        type="LOG", log=AirbyteLogMessage(level="FATAL", message=f"{exception_message}\n{exception_trace}")
+    )
+
+    expected_trace_message = AirbyteMessage(
+        type="TRACE",
+        trace=AirbyteTraceMessage(
+            type="ERROR",
+            emitted_at=0.0,
+            error=AirbyteErrorTraceMessage(
+                failure_type="system_error",
+                message="Something went wrong in the connector. See the logs for more details.",
+                internal_message=exception_message,
+                stack_trace=f"{exception_trace}\n",
+            ),
+        ),
+    )
+
+    with pytest.raises(subprocess.CalledProcessError) as err:
+        subprocess.check_output([sys.executable, "-c", cmd], stderr=subprocess.STDOUT)
+
+    assert not err.value.stderr, "nothing on the stderr"
+
+    stdout_lines = err.value.output.decode("utf-8").strip().split("\n")
+    assert len(stdout_lines) == 2
+
+    log_output, trace_output = stdout_lines
+
+    out_log_message = AirbyteMessage.parse_obj(json.loads(log_output))
+    assert out_log_message == expected_log_message, "Log message should be emitted in expected form"
+
+    out_trace_message = AirbyteMessage.parse_obj(json.loads(trace_output))
+    assert out_trace_message.trace.emitted_at > 0
+    out_trace_message.trace.emitted_at = 0.0  # set a specific emitted_at value for testing
+    assert out_trace_message == expected_trace_message, "Trace message should be emitted in expected form"
diff --git a/airbyte-cdk/python/unit_tests/test_logger.py b/airbyte-cdk/python/unit_tests/test_logger.py
index 96b5be2052089..dbf99fab59557 100644
--- a/airbyte-cdk/python/unit_tests/test_logger.py
+++ b/airbyte-cdk/python/unit_tests/test_logger.py
@@ -5,13 +5,10 @@
 
 import json
 import logging
-import subprocess
-import sys
 from typing import Dict
 
 import pytest
 from airbyte_cdk.logger import AirbyteLogFormatter
-from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage
 
 
 @pytest.fixture(scope="session")
@@ -95,21 +92,3 @@ def test_fatal(logger, caplog):
     record = caplog.records[0]
     assert record.levelname == "CRITICAL"
     assert record.message == "Test fatal 1"
-
-
-def test_unhandled_logger():
-    cmd = "from airbyte_cdk.logger import init_logger; init_logger('airbyte'); raise 1"
-    expected_message = (
-        "exceptions must derive from BaseException\n"
-        "Traceback (most recent call last):\n"
-        '  File "<string>", line 1, in <module>\n'
-        "TypeError: exceptions must derive from BaseException"
-    )
-    log_message = AirbyteMessage(type="LOG", log=AirbyteLogMessage(level="FATAL", message=expected_message))
-    expected_output = log_message.json(exclude_unset=True)
-
-    with pytest.raises(subprocess.CalledProcessError) as err:
-        subprocess.check_output([sys.executable, "-c", cmd], stderr=subprocess.STDOUT)
-
-    assert not err.value.stderr, "nothing on the stderr"
-    assert err.value.output.decode("utf-8").strip() == expected_output, "Error should be printed in expected form"
diff --git a/airbyte-cdk/python/unit_tests/test_secure_logger.py b/airbyte-cdk/python/unit_tests/test_secure_logger.py
index ed1e86cfc4dbb..458850cf07845 100644
--- a/airbyte-cdk/python/unit_tests/test_secure_logger.py
+++ b/airbyte-cdk/python/unit_tests/test_secure_logger.py
@@ -145,7 +145,7 @@ def test_airbyte_secret_is_masked_on_logger_output(source_spec, mocker, config,
     assert all([str(v) in log_result for v in expected_plain_text_values])
 
 
-def test_airbyte_secrets_are_masked_on_uncaught_exceptions(mocker, caplog):
+def test_airbyte_secrets_are_masked_on_uncaught_exceptions(mocker, caplog, capsys):
     caplog.set_level(logging.DEBUG, logger="airbyte.test")
     caplog.handler.setFormatter(AirbyteLogFormatter())
 
@@ -188,10 +188,11 @@ def read(
         list(entrypoint.run(parsed_args))
     except Exception:
         sys.excepthook(*sys.exc_info())
-    assert I_AM_A_SECRET_VALUE not in caplog.text, "Should have filtered secret value from exception"
+    assert I_AM_A_SECRET_VALUE not in capsys.readouterr().out, "Should have filtered secret value from exception trace message"
+    assert I_AM_A_SECRET_VALUE not in caplog.text, "Should have filtered secret value from exception log message"
 
 
-def test_non_airbyte_secrets_are_not_masked_on_uncaught_exceptions(mocker, caplog):
+def test_non_airbyte_secrets_are_not_masked_on_uncaught_exceptions(mocker, caplog, capsys):
     caplog.set_level(logging.DEBUG, logger="airbyte.test")
     caplog.handler.setFormatter(AirbyteLogFormatter())
 
@@ -235,4 +236,5 @@ def read(
         list(entrypoint.run(parsed_args))
     except Exception:
         sys.excepthook(*sys.exc_info())
-    assert NOT_A_SECRET_VALUE in caplog.text, "Should not have filtered non-secret value from exception"
+    assert NOT_A_SECRET_VALUE in capsys.readouterr().out, "Should not have filtered non-secret value from exception trace message"
+    assert NOT_A_SECRET_VALUE in caplog.text, "Should not have filtered non-secret value from exception log message"
diff --git a/airbyte-cdk/python/unit_tests/utils/__init__.py b/airbyte-cdk/python/unit_tests/utils/__init__.py
new file mode 100644
index 0000000000000..e69de29bb2d1d
diff --git a/airbyte-cdk/python/unit_tests/utils/test_secret_utils.py b/airbyte-cdk/python/unit_tests/utils/test_secret_utils.py
new file mode 100644
index 0000000000000..3ded3d6a588b9
--- /dev/null
+++ b/airbyte-cdk/python/unit_tests/utils/test_secret_utils.py
@@ -0,0 +1,22 @@
+#
+# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+#
+
+
+from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets, update_secrets
+
+SECRET_VALUE = "i am a very sensitive secret"
+ANOTHER_SECRET_VALUE = "also super secret"
+NOT_SECRET_VALUE = "unimportant value"
+
+
+def test_secret_filtering():
+    sensitive_str = f"{SECRET_VALUE} {NOT_SECRET_VALUE} {SECRET_VALUE} {ANOTHER_SECRET_VALUE}"
+
+    update_secrets([])
+    filtered = filter_secrets(sensitive_str)
+    assert filtered == sensitive_str
+
+    update_secrets([SECRET_VALUE, ANOTHER_SECRET_VALUE])
+    filtered = filter_secrets(sensitive_str)
+    assert filtered == f"**** {NOT_SECRET_VALUE} **** ****"
diff --git a/airbyte-cdk/python/unit_tests/utils/test_traced_exception.py b/airbyte-cdk/python/unit_tests/utils/test_traced_exception.py
new file mode 100644
index 0000000000000..7f072a2663b28
--- /dev/null
+++ b/airbyte-cdk/python/unit_tests/utils/test_traced_exception.py
@@ -0,0 +1,83 @@
+#
+# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+# + +import json + +import pytest +from airbyte_cdk.models.airbyte_protocol import AirbyteErrorTraceMessage, AirbyteMessage, AirbyteTraceMessage, FailureType, TraceType +from airbyte_cdk.models.airbyte_protocol import Type as MessageType +from airbyte_cdk.utils.traced_exception import AirbyteTracedException + + +@pytest.fixture +def raised_exception(): + try: + raise RuntimeError("an error has occurred") + except RuntimeError as e: + return e + + +def test_build_from_existing_exception(raised_exception): + traced_exc = AirbyteTracedException.from_exception(raised_exception, message="my user-friendly message") + assert traced_exc.message == "my user-friendly message" + assert traced_exc.internal_message == "an error has occurred" + assert traced_exc.failure_type == FailureType.system_error + assert traced_exc._exception == raised_exception + + +def test_exception_as_airbyte_message(): + traced_exc = AirbyteTracedException("an internal message") + airbyte_message = traced_exc.as_airbyte_message() + + assert type(airbyte_message) == AirbyteMessage + assert airbyte_message.type == MessageType.TRACE + assert airbyte_message.trace.type == TraceType.ERROR + assert airbyte_message.trace.emitted_at > 0 + assert airbyte_message.trace.error.failure_type == FailureType.system_error + assert airbyte_message.trace.error.message == "Something went wrong in the connector. See the logs for more details." + assert airbyte_message.trace.error.internal_message == "an internal message" + assert airbyte_message.trace.error.stack_trace == "airbyte_cdk.utils.traced_exception.AirbyteTracedException: an internal message\n" + + +def test_existing_exception_as_airbyte_message(raised_exception): + traced_exc = AirbyteTracedException.from_exception(raised_exception) + airbyte_message = traced_exc.as_airbyte_message() + + assert type(airbyte_message) == AirbyteMessage + assert airbyte_message.type == MessageType.TRACE + assert airbyte_message.trace.type == TraceType.ERROR + assert airbyte_message.trace.error.message == "Something went wrong in the connector. See the logs for more details." 
+ assert airbyte_message.trace.error.internal_message == "an error has occurred" + assert airbyte_message.trace.error.stack_trace.startswith("Traceback (most recent call last):") + assert airbyte_message.trace.error.stack_trace.endswith( + 'raise RuntimeError("an error has occurred")\n' "RuntimeError: an error has occurred\n" + ) + + +def test_emit_message(capsys): + traced_exc = AirbyteTracedException( + internal_message="internal message", message="user-friendly message", exception=RuntimeError("oh no") + ) + + expected_message = AirbyteMessage( + type="TRACE", + trace=AirbyteTraceMessage( + type="ERROR", + emitted_at=0.0, + error=AirbyteErrorTraceMessage( + failure_type="system_error", + message="user-friendly message", + internal_message="internal message", + stack_trace="RuntimeError: oh no\n", + ), + ), + ) + + traced_exc.emit_message() + + stdout = capsys.readouterr().out + printed_message = AirbyteMessage.parse_obj(json.loads(stdout)) + printed_message.trace.emitted_at = 0.0 + + assert printed_message == expected_message diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java b/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java index a0963e0765f09..77cbe6ffb2fa5 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java @@ -28,4 +28,9 @@ public boolean exposeSecretsInExport() { return Boolean.parseBoolean(System.getenv("EXPOSE_SECRETS_IN_EXPORT")); } + @Override + public boolean forceSecretMigration() { + return Boolean.parseBoolean(System.getenv("FORCE_MIGRATE_SECRET_STORE")); + } + } diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java index 78516bcb00a29..5853df64c9768 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java @@ -16,4 +16,6 @@ public interface FeatureFlags { boolean exposeSecretsInExport(); + boolean forceSecretMigration(); + } diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/json/JsonPaths.java b/airbyte-commons/src/main/java/io/airbyte/commons/json/JsonPaths.java index 428299aa958b4..0b75b981500dd 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/json/JsonPaths.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/json/JsonPaths.java @@ -42,15 +42,15 @@ * returning a list for query results. In addition, we provide helper functions that will just * return a single value (see: {@link JsonPaths#getSingleValue(JsonNode, String)}). These should * only be used if it is not possible for a query to return more than one value. - * - * Note: Package private as most uses of JsonPaths seems like they can be hidden inside other - * commons libraries (i.e. Jsons and JsonsSchemas). If this assumption proves incorrect, we can open - * it up. */ -class JsonPaths { +public class JsonPaths { private static final Logger LOGGER = LoggerFactory.getLogger(JsonPaths.class); + static final String JSON_PATH_START_CHARACTER = "$"; + static final String JSON_PATH_LIST_SPLAT = "[*]"; + static final String JSON_PATH_FIELD_SEPARATOR = "."; + // set default configurations at start up to match our JSON setup. static { Configuration.setDefaults(new Configuration.Defaults() { @@ -82,6 +82,18 @@ public Set