From 4d76ac58095dd97b5a7ca8594288dcde27dc45a5 Mon Sep 17 00:00:00 2001
From: Ryan Fu
Date: Wed, 21 Jun 2023 14:13:20 -0700
Subject: [PATCH 1/4] Turn off java formatter (#27579)

---
 .github/workflows/connectors_tests.yml | 24 ++++++++++++------------
 1 file changed, 12 insertions(+), 12 deletions(-)

diff --git a/.github/workflows/connectors_tests.yml b/.github/workflows/connectors_tests.yml
index 12987f5b19aa..dd45b932c71b 100644
--- a/.github/workflows/connectors_tests.yml
+++ b/.github/workflows/connectors_tests.yml
@@ -30,18 +30,18 @@ jobs:
         if: github.event_name == 'workflow_dispatch'
         run: echo "branch=${GITHUB_REF#refs/heads/}" >> $GITHUB_OUTPUT
         id: extract_branch
-      - name: Format connectors [PULL REQUESTS]
-        if: github.event_name == 'pull_request'
-        uses: ./.github/actions/run-dagger-pipeline
-        with:
-          context: "pull_request"
-          docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
-          docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
-          gcp_gsm_credentials: ${{ secrets.GCP_GSM_CREDENTIALS }}
-          git_branch: ${{ github.head_ref }}
-          git_revision: ${{ github.sha }}
-          github_token: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }}
-          subcommand: "connectors --modified format"
+#      - name: Format connectors [PULL REQUESTS]
+#        if: github.event_name == 'pull_request'
+#        uses: ./.github/actions/run-dagger-pipeline
+#        with:
+#          context: "pull_request"
+#          docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
+#          docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
+#          gcp_gsm_credentials: ${{ secrets.GCP_GSM_CREDENTIALS }}
+#          git_branch: ${{ github.head_ref }}
+#          git_revision: ${{ github.sha }}
+#          github_token: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }}
+#          subcommand: "connectors --modified format"
       - name: Fetch last commit id from remote branch [PULL REQUESTS]
         if: github.event_name == 'pull_request'
         id: fetch_last_commit_id_pr
From 77ea559f33d79e89482e25db2c1c07cbfa1b737d Mon Sep 17 00:00:00 2001
From: Lee Danilek
Date: Wed, 21 Jun 2023 14:18:34 -0700
Subject: [PATCH 2/4] =?UTF-8?q?=F0=9F=9A=A8=F0=9F=9A=A8=20=F0=9F=90=9B=20C?=
 =?UTF-8?q?onvex=20source=20fix=20skipped=20records=20(#27226)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* update cursor in next_page_token, add request_headers

* .

* fix unit tests

* also fix SyncMode.incremental so it doesn't return duplicate records in race conditions

* fix client version in json_schemas

* fix version number

* fix sync_mode

* cache json_schemas to pass typecheck

* never mind

* fix sync_mode

* update metadata and doc

* backward compatibility bypass

---------

Co-authored-by: Marcos Marx
Co-authored-by: marcosmarxm
---
 .../connectors/source-convex/Dockerfile      |  2 +-
 .../source-convex/acceptance-test-config.yml |  2 +
 .../connectors/source-convex/metadata.yaml   |  2 +-
 .../source-convex/source_convex/source.py    | 47 ++++++++++++++-----
 .../unit_tests/test_incremental_streams.py   |  4 ++
 .../source-convex/unit_tests/test_streams.py |  3 +-
 docs/integrations/sources/convex.md          |  3 +-
 7 files changed, 45 insertions(+), 18 deletions(-)

diff --git a/airbyte-integrations/connectors/source-convex/Dockerfile b/airbyte-integrations/connectors/source-convex/Dockerfile
index 0bf904d7c506..472b313e8dd7 100644
--- a/airbyte-integrations/connectors/source-convex/Dockerfile
+++ b/airbyte-integrations/connectors/source-convex/Dockerfile
@@ -34,5 +34,5 @@ COPY source_convex ./source_convex
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
 
-LABEL io.airbyte.version=0.1.1
+LABEL io.airbyte.version=0.2.0
 LABEL io.airbyte.name=airbyte/source-convex
diff --git a/airbyte-integrations/connectors/source-convex/acceptance-test-config.yml b/airbyte-integrations/connectors/source-convex/acceptance-test-config.yml
index 6b0426a2ba08..4f422a6a0546 100644
--- a/airbyte-integrations/connectors/source-convex/acceptance-test-config.yml
+++ b/airbyte-integrations/connectors/source-convex/acceptance-test-config.yml
@@ -15,6 +15,8 @@ acceptance_tests:
   discovery:
     tests:
       - config_path: "secrets/config.json"
+        backward_compatibility_tests_config:
+          disable_for_version: "0.1.1"
 # TODO: Need creds to run integration tests https://github.com/airbytehq/airbyte/issues/25571
 #  basic_read:
 #    tests:
diff --git a/airbyte-integrations/connectors/source-convex/metadata.yaml b/airbyte-integrations/connectors/source-convex/metadata.yaml
index 76e7c0f598bb..858c98d69af7 100644
--- a/airbyte-integrations/connectors/source-convex/metadata.yaml
+++ b/airbyte-integrations/connectors/source-convex/metadata.yaml
@@ -2,7 +2,7 @@ data:
   connectorSubtype: api
   connectorType: source
   definitionId: c332628c-f55c-4017-8222-378cfafda9b2
-  dockerImageTag: 0.1.1
+  dockerImageTag: 0.2.0
   dockerRepository: airbyte/source-convex
   githubIssueLabel: source-convex
   icon: convex.svg
diff --git a/airbyte-integrations/connectors/source-convex/source_convex/source.py b/airbyte-integrations/connectors/source-convex/source_convex/source.py
index 5df2576bdbc2..ecf094ff9c01 100644
--- a/airbyte-integrations/connectors/source-convex/source_convex/source.py
+++ b/airbyte-integrations/connectors/source-convex/source_convex/source.py
@@ -8,6 +8,7 @@
 from typing import Any, Dict, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Tuple, TypedDict
 
 import requests
+from airbyte_cdk.models import SyncMode
 from airbyte_cdk.sources import AbstractSource
 from airbyte_cdk.sources.streams import IncrementalMixin, Stream
 from airbyte_cdk.sources.streams.http import HttpStream
@@ -30,6 +31,8 @@
     },
 )
 
+CONVEX_CLIENT_VERSION = "0.2.0"
+
 
 # Source
 class SourceConvex(AbstractSource):
@@ -37,7 +40,10 @@ def _json_schemas(self, config: ConvexConfig) -> requests.Response:
         deployment_url = config["deployment_url"]
         access_key = config["access_key"]
         url = f"{deployment_url}/api/json_schemas?deltaSchema=true&format=convex_json"
f"{deployment_url}/api/json_schemas?deltaSchema=true&format=convex_json" - headers = {"Authorization": f"Convex {access_key}"} + headers = { + "Authorization": f"Convex {access_key}", + "Convex-Client": f"airbyte-export-{CONVEX_CLIENT_VERSION}", + } return requests.get(url, headers=headers) def check_connection(self, logger: Any, config: ConvexConfig) -> Tuple[bool, Any]: @@ -124,8 +130,16 @@ def state(self, value: ConvexState) -> None: self._delta_cursor_value = value["delta_cursor"] def next_page_token(self, response: requests.Response) -> Optional[ConvexState]: - # Inner level of pagination shares the same state as outer, - # and returns None to indicate that we're done. + if response.status_code != 200: + raise Exception(format_http_error("Failed request", response)) + resp_json = response.json() + if self._snapshot_has_more: + self._snapshot_cursor_value = resp_json["cursor"] + self._snapshot_has_more = resp_json["hasMore"] + self._delta_cursor_value = resp_json["snapshot"] + else: + self._delta_cursor_value = resp_json["cursor"] + self._delta_has_more = resp_json["hasMore"] return self.state if self._delta_has_more else None def path( @@ -150,13 +164,6 @@ def parse_response( if response.status_code != 200: raise Exception(format_http_error("Failed request", response)) resp_json = response.json() - if self._snapshot_has_more: - self._snapshot_cursor_value = resp_json["cursor"] - self._snapshot_has_more = resp_json["hasMore"] - self._delta_cursor_value = resp_json["snapshot"] - else: - self._delta_cursor_value = resp_json["cursor"] - self._delta_has_more = resp_json["hasMore"] return list(resp_json["values"]) def request_params( @@ -176,14 +183,28 @@ def request_params( params["cursor"] = self._delta_cursor_value return params + def request_headers( + self, + stream_state: ConvexState, + stream_slice: Optional[Mapping[str, Any]] = None, + next_page_token: Optional[ConvexState] = None, + ) -> Dict[str, str]: + """ + Custom headers for each HTTP request, not including Authorization. + """ + return { + "Convex-Client": f"airbyte-export-{CONVEX_CLIENT_VERSION}", + } + def get_updated_state(self, current_stream_state: ConvexState, latest_record: Mapping[str, Any]) -> ConvexState: """ This (deprecated) method is still used by AbstractSource to update state between calls to `read_records`. """ return self.state - def read_records(self, *args: Any, **kwargs: Any) -> Iterator[Any]: - for record in super().read_records(*args, **kwargs): + def read_records(self, sync_mode: SyncMode, *args: Any, **kwargs: Any) -> Iterator[Any]: + self._delta_has_more = sync_mode == SyncMode.incremental + for record in super().read_records(sync_mode, *args, **kwargs): ts_ns = record["_ts"] ts_seconds = ts_ns / 1e9 # convert from nanoseconds. 
diff --git a/airbyte-integrations/connectors/source-convex/unit_tests/test_incremental_streams.py b/airbyte-integrations/connectors/source-convex/unit_tests/test_incremental_streams.py
index 910f48ad1681..6f7a56c5577c 100644
--- a/airbyte-integrations/connectors/source-convex/unit_tests/test_incremental_streams.py
+++ b/airbyte-integrations/connectors/source-convex/unit_tests/test_incremental_streams.py
@@ -30,6 +30,7 @@ def test_get_updated_state(patch_incremental_base_class):
     resp.json = lambda: {"values": [{"_id": "my_id", "field": "f", "_ts": 123}], "cursor": 1234, "snapshot": 3000, "hasMore": True}
     resp.status_code = 200
     stream.parse_response(resp, {})
+    stream.next_page_token(resp)
     assert stream.get_updated_state(None, None) == {
         "snapshot_cursor": 1234,
         "snapshot_has_more": True,
@@ -37,6 +38,7 @@ def test_get_updated_state(patch_incremental_base_class):
     }
     resp.json = lambda: {"values": [{"_id": "my_id", "field": "f", "_ts": 1235}], "cursor": 1235, "snapshot": 3000, "hasMore": False}
     stream.parse_response(resp, {})
+    stream.next_page_token(resp)
     assert stream.get_updated_state(None, None) == {
         "snapshot_cursor": 1235,
         "snapshot_has_more": False,
@@ -44,6 +46,7 @@ def test_get_updated_state(patch_incremental_base_class):
     }
     resp.json = lambda: {"values": [{"_id": "my_id", "field": "f", "_ts": 1235}], "cursor": 8000, "hasMore": True}
     stream.parse_response(resp, {})
+    stream.next_page_token(resp)
     assert stream.get_updated_state(None, None) == {
         "snapshot_cursor": 1235,
         "snapshot_has_more": False,
@@ -52,6 +55,7 @@ def test_get_updated_state(patch_incremental_base_class):
     }
     assert stream._delta_has_more is True
     resp.json = lambda: {"values": [{"_id": "my_id", "field": "f", "_ts": 1235}], "cursor": 9000, "hasMore": False}
     stream.parse_response(resp, {})
+    stream.next_page_token(resp)
     assert stream.get_updated_state(None, None) == {
         "snapshot_cursor": 1235,
         "snapshot_has_more": False,
diff --git a/airbyte-integrations/connectors/source-convex/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-convex/unit_tests/test_streams.py
index c02bcd81f287..267126670024 100644
--- a/airbyte-integrations/connectors/source-convex/unit_tests/test_streams.py
+++ b/airbyte-integrations/connectors/source-convex/unit_tests/test_streams.py
@@ -70,13 +70,12 @@ def test_parse_response(patch_base_class):
     inputs = {"response": resp, "stream_state": {}}
     expected_parsed_objects = [{"_id": "my_id", "field": "f", "_ts": 1234}]
     assert stream.parse_response(**inputs) == expected_parsed_objects
-    assert stream.state == {"snapshot_cursor": 1234, "snapshot_has_more": True, "delta_cursor": 2000}
 
 
 def test_request_headers(patch_base_class):
     stream = ConvexStream("murky-swan-635", "accesskey", "messages", None)
     inputs = {"stream_slice": None, "stream_state": None, "next_page_token": None}
-    assert stream.request_headers(**inputs) == {}
+    assert stream.request_headers(**inputs) == {"Convex-Client": "airbyte-export-0.2.0"}
 
 
 def test_http_method(patch_base_class):
diff --git a/docs/integrations/sources/convex.md b/docs/integrations/sources/convex.md
index c1a4796c7784..c0dd127574aa 100644
--- a/docs/integrations/sources/convex.md
+++ b/docs/integrations/sources/convex.md
@@ -72,5 +72,6 @@ In the Data tab, you should see the tables and a sample of the data that will be
 
 | Version | Date       | Pull Request                                             | Subject               |
 | :------ | :--------- | :-------------------------------------------------------- | :-------------------- |
+| 0.2.0   | 2023-06-21 | [27226](https://github.com/airbytehq/airbyte/pull/27226) | 🐛 Convex source fix skipped records |
 | 0.1.1   | 2023-03-06 | [23797](https://github.com/airbytehq/airbyte/pull/23797) | 🐛 Convex source connector error messages |
-| 0.1.0   | 2022-10-24 | [18403](https://github.com/airbytehq/airbyte/pull/18403) | 🎉 New Source: Convex |
+| 0.1.0   | 2022-10-24 | [18403](https://github.com/airbytehq/airbyte/pull/18403) | 🎉 New Source: Convex |
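
The heart of this patch is moving cursor mutation out of `parse_response` and into `next_page_token`, which is why the unit tests above now call `stream.next_page_token(resp)` after each `stream.parse_response(resp, {})`. The ordering of the Airbyte CDK's HTTP read loop is what makes the placement matter: a page's records are emitted before the next page token is computed, so a cursor that only advances in `next_page_token` cannot run ahead of records that have not yet been delivered. A simplified model of that loop, assuming it mirrors the CDK's behavior at the time (a sketch, not the CDK's literal code):

```python
from typing import Any, Iterator, Optional

def read_pages(stream: Any) -> Iterator[Any]:
    """Simplified model of HttpStream pagination: emit records, then advance."""
    next_page_token: Optional[dict] = None
    while True:
        # fetch_page is a stand-in for the CDK's request machinery
        response = stream.fetch_page(next_page_token)
        yield from stream.parse_response(response)          # records emitted first...
        next_page_token = stream.next_page_token(response)  # ...cursor advances after
        if next_page_token is None:
            break
```

With the old placement, state checkpointed while a page was still being emitted could already point past undelivered records, so a sync that failed mid-page would resume beyond them; that is the "skipped records" of the patch title. With the new placement the worst case is re-reading a page, producing duplicates rather than gaps.
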
From bf52bcffcebe67c8f7c4d85e911d864d8864d3df Mon Sep 17 00:00:00 2001
From: Charles
Date: Wed, 21 Jun 2023 16:44:05 -0700
Subject: [PATCH 3/4] deprecate otel docs (#27589)

---
 docs/operator-guides/collecting-metrics.md | 43 ++++++++++++++++++++++
 docs/operator-guides/scaling-airbyte.md    | 41 ---------------------
 docs/release_notes/july_2022.md            |  2 +-
 docusaurus/sidebars.js                     |  1 -
 4 files changed, 44 insertions(+), 43 deletions(-)

diff --git a/docs/operator-guides/collecting-metrics.md b/docs/operator-guides/collecting-metrics.md
index 09406df8e564..9d210329daa4 100644
--- a/docs/operator-guides/collecting-metrics.md
+++ b/docs/operator-guides/collecting-metrics.md
@@ -268,3 +268,46 @@ Visit [OssMetricsRegistry.java](https://github.com/airbytehq/airbyte-platform/bl
 ## Additional information
 
 Suppose you are looking for a non-production way of collecting metrics with dbt and Metabase, the tutorial [Airbyte Monitoring with dbt and Metabase](https://airbyte.com/blog/airbyte-monitoring-with-dbt-and-metabase) by accessing Airbyte's Postgres DB. The source code is open on [airbytehq/open-data-stack](https://github.com/airbytehq/open-data-stack). Think of it as an exploratory for data analysts and data engineers of building a dashboard on top of the existing Airbyte Postgres database versus the Prometheus more for DevOps engineers in production.
+
+# Scaling Airbyte
+
+## Metrics
+Airbyte supports exporting built-in metrics to Datadog or OpenTelemetry.
+
+### Key Metrics
+
+| Key Metrics                     | Description                                                                                                                                      |
+|---------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------|
+| ``oldest_pending_job_age_secs`` | Shows how long a pending job waits before it is scheduled. If a job is in pending state for a long time, more workers may be required.           |
+| ``oldest_running_job_age_secs`` | Shows how long the oldest job has been running. A running job that is too large can indicate stuck jobs. This is relative to each job’s runtime. |
+| ``job_failed_by_release_stage`` | Shows jobs that have failed in that release stage and is tagged as alpha, beta, or GA.                                                           |
+
+:::note
+
+Metrics with ``by_release_stage`` in their name are tagged by connector release stage (alpha, beta, or GA). These tags allow you to filter by release stage. Alpha and beta connectors are less stable and have a higher failure rate than GA connectors, so filtering by those release stages can help you find failed jobs.
+
+:::
+
+### Recommended Metrics
+
+| Recommended Metrics                     | Description                                                                                                                                    |
+|-----------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------|
+| ``num_running_jobs & num_pending_jobs`` | Shows how many jobs are currently running and how many jobs are in pending state. These metrics help you understand the general system state. |
+| ``job_succeeded_by_release_stage``      | Shows successful jobs in that release stage and is tagged as alpha, beta, or GA.                                                               |
+| ``job_created_by_release_stage``        | Shows the jobs created in that release stage and is tagged as alpha, beta, or GA.                                                              |
+
+### Example
+If a job was created for an Alpha source to a Beta destination and the outcome of the job is a success, the following metrics are displayed:
+
+```
+job_created_by_release_stage[“alpha”] = 1;
+job_created_by_release_stage[“beta”] = 1;
+job_failed_by_release_stage[“alpha”] = 1;
+job_succeeded_by_release_stage[“beta”] = 1;
+```
+
+:::note
+
+Each job has a source and destination, so each metric is counted twice — once for source and once for destination.
+
+:::
diff --git a/docs/operator-guides/scaling-airbyte.md b/docs/operator-guides/scaling-airbyte.md
index 5019757f4860..062cbd33d715 100644
--- a/docs/operator-guides/scaling-airbyte.md
+++ b/docs/operator-guides/scaling-airbyte.md
@@ -65,44 +65,3 @@ is capped by `SQL_MAX_CONNS`.
 
 The advice here is best-effort and by no means comprehensive. Please reach out on Slack if anything doesn't make sense or if something can be improved.
 If you've been running Airbyte in production and have more tips up your sleeve, we welcome contributions!
-
-## Metrics
-Airbyte supports exporting built-in metrics to Datadog or [OpenTelemetry](https://docs.airbyte.com/operator-guides/collecting-metrics/)
-
-### Key Metrics
-
-| Key Metrics                     | Description                                                                                                                                      |
-|---------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------|
-| ``oldest_pending_job_age_secs`` | Shows how long a pending job waits before it is scheduled. If a job is in pending state for a long time, more workers may be required.           |
-| ``oldest_running_job_age_secs`` | Shows how long the oldest job has been running. A running job that is too large can indicate stuck jobs. This is relative to each job’s runtime. |
-| ``job_failed_by_release_stage`` | Shows jobs that have failed in that release stage and is tagged as alpha, beta, or GA.                                                           |
-
-:::note
-
-Metrics with ``by_release_stage`` in their name are tagged by connector release stage (alpha, beta, or GA). These tags allow you to filter by release stage. Alpha and beta connectors are less stable and have a higher failure rate than GA connectors, so filtering by those release stages can help you find failed jobs.
-
-:::
-
-### Recommended Metrics
-
-| Recommended Metrics                     | Description                                                                                                                                    |
-|-----------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------|
-| ``num_running_jobs & num_pending_jobs`` | Shows how many jobs are currently running and how many jobs are in pending state. These metrics help you understand the general system state. |
-| ``job_succeeded_by_release_stage``      | Shows successful jobs in that release stage and is tagged as alpha, beta, or GA.                                                               |
-| ``job_created_by_release_stage``        | Shows the jobs created in that release stage and is tagged as alpha, beta, or GA.                                                              |
-
-### Example
-If a job was created for an Alpha source to a Beta destination and the outcome of the job is a success, the following metrics are displayed:
-
-```
-job_created_by_release_stage[“alpha”] = 1;
-job_created_by_release_stage[“beta”] = 1;
-job_failed_by_release_stage[“alpha”] = 1;
-job_succeeded_by_release_stage[“beta”] = 1;
-```
-
-:::note
-
-Each job has a source and destination, so each metric is counted twice — once for source and once for destination.
-
-:::
diff --git a/docs/release_notes/july_2022.md b/docs/release_notes/july_2022.md
index 0cf73f7d4b7b..0c6cbc35e004 100644
--- a/docs/release_notes/july_2022.md
+++ b/docs/release_notes/july_2022.md
@@ -26,7 +26,7 @@ This page includes new features and improvements to the Airbyte Cloud and Airbyt
 
 * Improved Airbyte Open Source self-hosting by refactoring and publishing Helm charts according to best practices as we prepare to formally support Helm deployments. [#14794](https://github.com/airbytehq/airbyte/pull/14794)
 
-* Improved Airbyte Open Source by supporting the [OpenTelemetry (OTEL) Collector](https://docs.airbyte.com/operator-guides/collecting-metrics/). Airbyte Open Source now sends telemetry data to the OTEL collector, and we included a set of [recommended metrics](https://docs.airbyte.com/operator-guides/scaling-airbyte/#metrics) to export to OTEL when running Airbyte Open Source at scale. [#12908](https://github.com/airbytehq/airbyte/issues/12908)
+* Improved Airbyte Open Source by supporting the OpenTelemetry (OTEL) Collector. Airbyte Open Source now sends telemetry data to the OTEL collector, and we included a set of [recommended metrics](https://docs.airbyte.com/operator-guides/scaling-airbyte/#metrics) to export to OTEL when running Airbyte Open Source at scale. [#12908](https://github.com/airbytehq/airbyte/issues/12908)
 
 * Improved the [Airbyte Connector Development Kit (CDK)](https://airbyte.com/connector-development-kit) by enabling detailed bug logs from the command line. In addition to the preset CDK debug logs, you can also create custom debug statements and display custom debug logs in the command line. [#14521](https://github.com/airbytehq/airbyte/pull/14521)
diff --git a/docusaurus/sidebars.js b/docusaurus/sidebars.js
index 2680d9308031..0ac2bb40b14b 100644
--- a/docusaurus/sidebars.js
+++ b/docusaurus/sidebars.js
@@ -383,7 +383,6 @@ const operatorGuide = {
     'operator-guides/using-custom-connectors',
     'operator-guides/scaling-airbyte',
     'operator-guides/configuring-sync-notifications',
-    'operator-guides/collecting-metrics',
   ],
 };

From 3bd340a971ca5f7a1fed4962f5101bbcd951ea07 Mon Sep 17 00:00:00 2001
From: Ben Church
Date: Wed, 21 Jun 2023 18:42:56 -0700
Subject: [PATCH 4/4] Update to use rev-parse (#27598)

Co-authored-by: Octavia Squidington III
---
 .github/workflows/connectors_tests.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/connectors_tests.yml b/.github/workflows/connectors_tests.yml
index dd45b932c71b..745e28c26999 100644
--- a/.github/workflows/connectors_tests.yml
+++ b/.github/workflows/connectors_tests.yml
@@ -49,7 +49,7 @@ jobs:
       - name: Fetch last commit id from remote branch [WORKFLOW DISPATCH]
         if: github.event_name == 'workflow_dispatch'
         id: fetch_last_commit_id_wd
-        run: echo "commit_id=$(git ls-remote --heads origin ${{ steps.extract_branch.outputs.branch }} | cut -f 1)" >> $GITHUB_OUTPUT
+        run: echo "commit_id=$(git rev-parse origin/${{ steps.extract_branch.outputs.branch }})" >> $GITHUB_OUTPUT
      - name: Pull formatting changes [PULL REQUESTS]
         if: github.event_name == 'pull_request'
         uses: actions/checkout@v3
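
A closing note on the final patch: `git ls-remote --heads origin <branch>` asks the remote for the branch head on every run and prints tab-separated `<sha> <ref>` lines, hence the `| cut -f 1`; `git rev-parse origin/<branch>` instead resolves the remote-tracking ref already present in the local clone, so it needs no network round trip but is only as fresh as the last fetch, presumably relying on the earlier checkout having fetched the branch. A sketch contrasting the two commands (hypothetical helper functions, assuming they run inside a clone where `origin/<branch>` exists):

```python
import subprocess

def sha_via_ls_remote(branch: str) -> str:
    """Old approach: query the remote directly; output lines look like '<sha>\t<ref>'."""
    out = subprocess.run(
        ["git", "ls-remote", "--heads", "origin", branch],
        capture_output=True, text=True, check=True,
    ).stdout
    return out.split("\t", 1)[0] if out else ""  # first field, like `cut -f 1`

def sha_via_rev_parse(branch: str) -> str:
    """New approach: resolve the local remote-tracking ref; no network access."""
    return subprocess.run(
        ["git", "rev-parse", f"origin/{branch}"],
        capture_output=True, text=True, check=True,
    ).stdout.strip()

# The two agree whenever the local clone is up to date with the remote, e.g.:
# sha_via_ls_remote("master") == sha_via_rev_parse("master")
```
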