Merge branch 'master' into state-structure-change-xmin-ctid
subodh1810 committed Jun 22, 2023
2 parents 27fb1a2 + 3bd340a commit f0a1dc4
Showing 12 changed files with 102 additions and 74 deletions.
26 changes: 13 additions & 13 deletions .github/workflows/connectors_tests.yml
@@ -30,26 +30,26 @@ jobs:
if: github.event_name == 'workflow_dispatch'
run: echo "branch=${GITHUB_REF#refs/heads/}" >> $GITHUB_OUTPUT
id: extract_branch
- name: Format connectors [PULL REQUESTS]
if: github.event_name == 'pull_request'
uses: ./.github/actions/run-dagger-pipeline
with:
context: "pull_request"
docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
gcp_gsm_credentials: ${{ secrets.GCP_GSM_CREDENTIALS }}
git_branch: ${{ github.head_ref }}
git_revision: ${{ github.sha }}
github_token: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }}
subcommand: "connectors --modified format"
# - name: Format connectors [PULL REQUESTS]
# if: github.event_name == 'pull_request'
# uses: ./.github/actions/run-dagger-pipeline
# with:
# context: "pull_request"
# docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
# docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
# gcp_gsm_credentials: ${{ secrets.GCP_GSM_CREDENTIALS }}
# git_branch: ${{ github.head_ref }}
# git_revision: ${{ github.sha }}
# github_token: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }}
# subcommand: "connectors --modified format"
- name: Fetch last commit id from remote branch [PULL REQUESTS]
if: github.event_name == 'pull_request'
id: fetch_last_commit_id_pr
run: echo "commit_id=$(git ls-remote --heads origin ${{ github.head_ref }} | cut -f 1)" >> $GITHUB_OUTPUT
- name: Fetch last commit id from remote branch [WORKFLOW DISPATCH]
if: github.event_name == 'workflow_dispatch'
id: fetch_last_commit_id_wd
run: echo "commit_id=$(git ls-remote --heads origin ${{ steps.extract_branch.outputs.branch }} | cut -f 1)" >> $GITHUB_OUTPUT
run: echo "commit_id=$(git rev-parse origin/${{ steps.extract_branch.outputs.branch }})" >> $GITHUB_OUTPUT
- name: Pull formatting changes [PULL REQUESTS]
if: github.event_name == 'pull_request'
uses: actions/checkout@v3
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-convex/Dockerfile
@@ -34,5 +34,5 @@ COPY source_convex ./source_convex
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.name=airbyte/source-convex
@@ -15,6 +15,8 @@ acceptance_tests:
discovery:
tests:
- config_path: "secrets/config.json"
backward_compatibility_tests_config:
disable_for_version: "0.1.1"
# TODO: Need creds to run integration tests https://github.com/airbytehq/airbyte/issues/25571
# basic_read:
# tests:
@@ -2,7 +2,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: c332628c-f55c-4017-8222-378cfafda9b2
dockerImageTag: 0.1.1
dockerImageTag: 0.2.0
dockerRepository: airbyte/source-convex
githubIssueLabel: source-convex
icon: convex.svg
@@ -8,6 +8,7 @@
from typing import Any, Dict, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Tuple, TypedDict

import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import IncrementalMixin, Stream
from airbyte_cdk.sources.streams.http import HttpStream
@@ -30,14 +31,19 @@
},
)

CONVEX_CLIENT_VERSION = "0.2.0"


# Source
class SourceConvex(AbstractSource):
def _json_schemas(self, config: ConvexConfig) -> requests.Response:
deployment_url = config["deployment_url"]
access_key = config["access_key"]
url = f"{deployment_url}/api/json_schemas?deltaSchema=true&format=convex_json"
headers = {"Authorization": f"Convex {access_key}"}
headers = {
"Authorization": f"Convex {access_key}",
"Convex-Client": f"airbyte-export-{CONVEX_CLIENT_VERSION}",
}
return requests.get(url, headers=headers)

def check_connection(self, logger: Any, config: ConvexConfig) -> Tuple[bool, Any]:
@@ -124,8 +130,16 @@ def state(self, value: ConvexState) -> None:
self._delta_cursor_value = value["delta_cursor"]

def next_page_token(self, response: requests.Response) -> Optional[ConvexState]:
# Inner level of pagination shares the same state as outer,
# and returns None to indicate that we're done.
if response.status_code != 200:
raise Exception(format_http_error("Failed request", response))
resp_json = response.json()
if self._snapshot_has_more:
self._snapshot_cursor_value = resp_json["cursor"]
self._snapshot_has_more = resp_json["hasMore"]
self._delta_cursor_value = resp_json["snapshot"]
else:
self._delta_cursor_value = resp_json["cursor"]
self._delta_has_more = resp_json["hasMore"]
return self.state if self._delta_has_more else None

def path(
@@ -150,13 +164,6 @@ def parse_response(
if response.status_code != 200:
raise Exception(format_http_error("Failed request", response))
resp_json = response.json()
if self._snapshot_has_more:
self._snapshot_cursor_value = resp_json["cursor"]
self._snapshot_has_more = resp_json["hasMore"]
self._delta_cursor_value = resp_json["snapshot"]
else:
self._delta_cursor_value = resp_json["cursor"]
self._delta_has_more = resp_json["hasMore"]
return list(resp_json["values"])

def request_params(
@@ -176,14 +183,28 @@ def request_params(
params["cursor"] = self._delta_cursor_value
return params

def request_headers(
self,
stream_state: ConvexState,
stream_slice: Optional[Mapping[str, Any]] = None,
next_page_token: Optional[ConvexState] = None,
) -> Dict[str, str]:
"""
Custom headers for each HTTP request, not including Authorization.
"""
return {
"Convex-Client": f"airbyte-export-{CONVEX_CLIENT_VERSION}",
}

def get_updated_state(self, current_stream_state: ConvexState, latest_record: Mapping[str, Any]) -> ConvexState:
"""
This (deprecated) method is still used by AbstractSource to update state between calls to `read_records`.
"""
return self.state

def read_records(self, *args: Any, **kwargs: Any) -> Iterator[Any]:
for record in super().read_records(*args, **kwargs):
def read_records(self, sync_mode: SyncMode, *args: Any, **kwargs: Any) -> Iterator[Any]:
self._delta_has_more = sync_mode == SyncMode.incremental
for record in super().read_records(sync_mode, *args, **kwargs):
ts_ns = record["_ts"]
ts_seconds = ts_ns / 1e9 # convert from nanoseconds.
# equivalent of java's `new Timestamp(transactionMillis).toInstant().toString()`
@@ -203,5 +224,5 @@ def format_http_error(context: str, resp: requests.Response) -> str:
try:
err = resp.json()
return f"{context}: {resp.status_code}: {err['code']}: {err['message']}"
except JSONDecodeError or KeyError:
except (JSONDecodeError, KeyError):
return f"{context}: {resp.text}"
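For readers skimming the connector diff above, here is a minimal standalone sketch of the two-phase cursor logic that `next_page_token` now owns: the stream first pages through a snapshot, then switches to the delta log, and stops once the delta cursor reports no more pages. This is illustrative only, not the connector's actual class; the dict-based responses and the driver loop at the bottom are assumptions for the example, while the field and key names mirror the diff above.

```python
from typing import Optional


class TwoPhaseCursor:
    """Illustrative sketch of the snapshot-then-delta pagination state."""

    def __init__(self) -> None:
        self._snapshot_cursor_value: Optional[int] = None
        self._snapshot_has_more = True   # phase 1: paging through the snapshot
        self._delta_cursor_value: Optional[int] = None
        self._delta_has_more = True      # phase 2: paging through the delta log

    @property
    def state(self) -> dict:
        return {
            "snapshot_cursor": self._snapshot_cursor_value,
            "snapshot_has_more": self._snapshot_has_more,
            "delta_cursor": self._delta_cursor_value,
        }

    def next_page_token(self, resp_json: dict) -> Optional[dict]:
        if self._snapshot_has_more:
            # Still reading the initial snapshot; remember where the delta
            # phase should start once the snapshot is exhausted.
            self._snapshot_cursor_value = resp_json["cursor"]
            self._snapshot_has_more = resp_json["hasMore"]
            self._delta_cursor_value = resp_json["snapshot"]
        else:
            # Snapshot finished; follow the delta cursor from here on.
            self._delta_cursor_value = resp_json["cursor"]
            self._delta_has_more = resp_json["hasMore"]
        return self.state if self._delta_has_more else None


cursor = TwoPhaseCursor()
pages = [
    {"cursor": 1234, "snapshot": 3000, "hasMore": True},   # snapshot page 1
    {"cursor": 1235, "snapshot": 3000, "hasMore": False},  # snapshot page 2 (last)
    {"cursor": 9000, "hasMore": False},                    # delta page (last)
]
for page in pages:
    print(cursor.next_page_token(page))
# Prints the state after each snapshot page, then None once delta paging ends.
```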
@@ -30,20 +30,23 @@ def test_get_updated_state(patch_incremental_base_class):
resp.json = lambda: {"values": [{"_id": "my_id", "field": "f", "_ts": 123}], "cursor": 1234, "snapshot": 3000, "hasMore": True}
resp.status_code = 200
stream.parse_response(resp, {})
stream.next_page_token(resp)
assert stream.get_updated_state(None, None) == {
"snapshot_cursor": 1234,
"snapshot_has_more": True,
"delta_cursor": 3000,
}
resp.json = lambda: {"values": [{"_id": "my_id", "field": "f", "_ts": 1235}], "cursor": 1235, "snapshot": 3000, "hasMore": False}
stream.parse_response(resp, {})
stream.next_page_token(resp)
assert stream.get_updated_state(None, None) == {
"snapshot_cursor": 1235,
"snapshot_has_more": False,
"delta_cursor": 3000,
}
resp.json = lambda: {"values": [{"_id": "my_id", "field": "f", "_ts": 1235}], "cursor": 8000, "hasMore": True}
stream.parse_response(resp, {})
stream.next_page_token(resp)
assert stream.get_updated_state(None, None) == {
"snapshot_cursor": 1235,
"snapshot_has_more": False,
@@ -52,6 +55,7 @@ def test_get_updated_state(patch_incremental_base_class):
assert stream._delta_has_more is True
resp.json = lambda: {"values": [{"_id": "my_id", "field": "f", "_ts": 1235}], "cursor": 9000, "hasMore": False}
stream.parse_response(resp, {})
stream.next_page_token(resp)
assert stream.get_updated_state(None, None) == {
"snapshot_cursor": 1235,
"snapshot_has_more": False,
@@ -70,13 +70,12 @@ def test_parse_response(patch_base_class):
inputs = {"response": resp, "stream_state": {}}
expected_parsed_objects = [{"_id": "my_id", "field": "f", "_ts": 1234}]
assert stream.parse_response(**inputs) == expected_parsed_objects
assert stream.state == {"snapshot_cursor": 1234, "snapshot_has_more": True, "delta_cursor": 2000}


def test_request_headers(patch_base_class):
stream = ConvexStream("murky-swan-635", "accesskey", "messages", None)
inputs = {"stream_slice": None, "stream_state": None, "next_page_token": None}
assert stream.request_headers(**inputs) == {}
assert stream.request_headers(**inputs) == {"Convex-Client": "airbyte-export-0.2.0"}


def test_http_method(patch_base_class):
3 changes: 2 additions & 1 deletion docs/integrations/sources/convex.md
@@ -72,5 +72,6 @@ In the Data tab, you should see the tables and a sample of the data that will be

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :------------------------------------------------------- | :-------------------- |
| 0.2.0 | 2023-06-21 | [27226](https://github.com/airbytehq/airbyte/pull/27226) | 🐛 Convex source fix skipped records |
| 0.1.1 | 2023-03-06 | [23797](https://github.com/airbytehq/airbyte/pull/23797) | 🐛 Convex source connector error messages |
| 0.1.0 | 2022-10-24 | [18403](https://github.com/airbytehq/airbyte/pull/18403) | 🎉 New Source: Convex |
| 0.1.0 | 2022-10-24 | [18403](https://github.com/airbytehq/airbyte/pull/18403) | 🎉 New Source: Convex |
43 changes: 43 additions & 0 deletions docs/operator-guides/collecting-metrics.md
@@ -268,3 +268,46 @@ Visit [OssMetricsRegistry.java](https://github.com/airbytehq/airbyte-platform/bl

## Additional information
If you are looking for a non-production way of collecting metrics, the tutorial [Airbyte Monitoring with dbt and Metabase](https://airbyte.com/blog/airbyte-monitoring-with-dbt-and-metabase) shows how to build dashboards with dbt and Metabase by querying Airbyte's Postgres DB directly. The source code is open on [airbytehq/open-data-stack](https://github.com/airbytehq/open-data-stack). Think of it as an exploratory approach for data analysts and data engineers who want to build a dashboard on top of the existing Airbyte Postgres database, whereas the Prometheus-based setup is aimed more at DevOps engineers in production.

# Scaling Airbyte

## Metrics
Airbyte supports exporting built-in metrics to Datadog or OpenTelemetry.

### Key Metrics

| Key Metrics | Description |
|---------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------|
| ``oldest_pending_job_age_secs`` | Shows how long a pending job waits before it is scheduled. If a job is in pending state for a long time, more workers may be required. |
| ``oldest_running_job_age_secs`` | Shows how long the oldest job has been running. A running job that is too large can indicate stuck jobs. This is relative to each job’s runtime. |
| ``job_failed_by_release_stage`` | Shows jobs that have failed in that release stage and is tagged as alpha, beta, or GA. |

:::note

Metrics with ``by_release_stage`` in their name are tagged by connector release stage (alpha, beta, or GA). These tags allow you to filter by release stage. Alpha and beta connectors are less stable and have a higher failure rate than GA connectors, so filtering by those release stages can help you find failed jobs.

:::

### Recommended Metrics

| Recommended Metrics | Description |
|-----------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------|
| ``num_running_jobs & num_pending_jobs`` | Shows how many jobs are currently running and how many jobs are in pending state. These metrics help you understand the general system state. |
| ``job_succeeded_by_release_stage`` | Shows successful jobs in that release stage and is tagged as alpha, beta, or GA. |
| ``job_created_by_release_stage`` | Shows the jobs created in that release stage and is tagged as alpha, beta, or GA. |

### Example
If a job was created for an Alpha source to a Beta destination and the outcome of the job is a success, the following metrics are displayed:

```
job_created_by_release_stage[“alpha”] = 1;
job_created_by_release_stage[“beta”] = 1;
job_failed_by_release_stage[“alpha”] = 1;
job_succeeded_by_release_stage[“beta”] = 1;
```

:::note

Each job has a source and destination, so each metric is counted twice — once for source and once for destination.

:::
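To make the double-counting in the note above concrete, here is a tiny hypothetical sketch (not Airbyte's actual metrics code) of how a release-stage-tagged counter ends up incremented once for the job's source and once for its destination; the `on_job_created` helper and the dict-backed counter are assumptions for illustration.

```python
from collections import Counter

# Hypothetical illustration of the note above: one job produces two
# increments, because the counter is tagged once with the source's release
# stage and once with the destination's.
job_created_by_release_stage: Counter = Counter()


def on_job_created(source_stage: str, destination_stage: str) -> None:
    # One increment per side of the connection.
    job_created_by_release_stage[source_stage] += 1
    job_created_by_release_stage[destination_stage] += 1


# The example above: an alpha source syncing to a beta destination.
on_job_created("alpha", "beta")
print(dict(job_created_by_release_stage))  # {'alpha': 1, 'beta': 1}
```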
41 changes: 0 additions & 41 deletions docs/operator-guides/scaling-airbyte.md
@@ -65,44 +65,3 @@ is capped by `SQL_MAX_CONNS`.
The advice here is best-effort and by no means comprehensive. Please reach out on Slack if anything doesn't make sense or if something can be improved.

If you've been running Airbyte in production and have more tips up your sleeve, we welcome contributions!

## Metrics
Airbyte supports exporting built-in metrics to Datadog or [OpenTelemetry](https://docs.airbyte.com/operator-guides/collecting-metrics/)

### Key Metrics

| Key Metrics | Description |
|---------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------|
| ``oldest_pending_job_age_secs`` | Shows how long a pending job waits before it is scheduled. If a job is in pending state for a long time, more workers may be required. |
| ``oldest_running_job_age_secs`` | Shows how long the oldest job has been running. A running job that is too large can indicate stuck jobs. This is relative to each job’s runtime. |
| ``job_failed_by_release_stage`` | Shows jobs that have failed in that release stage and is tagged as alpha, beta, or GA. |
:::note

Metrics with ``by_release_stage`` in their name are tagged by connector release stage (alpha, beta, or GA). These tags allow you to filter by release stage. Alpha and beta connectors are less stable and have a higher failure rate than GA connectors, so filtering by those release stages can help you find failed jobs.

:::

### Recommended Metrics

| Recommended Metrics | Description |
|-----------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------|
| ``num_running_jobs & num_pending_jobs`` | Shows how many jobs are currently running and how many jobs are in pending state. These metrics help you understand the general system state. |
| ``job_succeeded_by_release_stage`` | Shows successful jobs in that release stage and is tagged as alpha, beta, or GA. |
| ``job_created_by_release_stage`` | Shows the jobs created in that release stage and is tagged as alpha, beta, or GA. |

### Example
If a job was created for an Alpha source to a Beta destination and the outcome of the job is a success, the following metrics are displayed:

```
job_created_by_release_stage[“alpha”] = 1;
job_created_by_release_stage[“beta”] = 1;
job_failed_by_release_stage[“alpha”] = 1;
job_succeeded_by_release_stage[“beta”] = 1;
```

:::note

Each job has a source and destination, so each metric is counted twice — once for source and once for destination.

:::
2 changes: 1 addition & 1 deletion docs/release_notes/july_2022.md
@@ -26,7 +26,7 @@ This page includes new features and improvements to the Airbyte Cloud and Airbyt

* Improved Airbyte Open Source self-hosting by refactoring and publishing Helm charts according to best practices as we prepare to formally support Helm deployments. [#14794](https://github.com/airbytehq/airbyte/pull/14794)

* Improved Airbyte Open Source by supporting the [OpenTelemetry (OTEL) Collector](https://docs.airbyte.com/operator-guides/collecting-metrics/). Airbyte Open Source now sends telemetry data to the OTEL collector, and we included a set of [recommended metrics](https://docs.airbyte.com/operator-guides/scaling-airbyte/#metrics) to export to OTEL when running Airbyte Open Source at scale. [#12908](https://github.com/airbytehq/airbyte/issues/12908)
* Improved Airbyte Open Source by supporting the OpenTelemetry (OTEL) Collector. Airbyte Open Source now sends telemetry data to the OTEL collector, and we included a set of [recommended metrics](https://docs.airbyte.com/operator-guides/scaling-airbyte/#metrics) to export to OTEL when running Airbyte Open Source at scale. [#12908](https://github.com/airbytehq/airbyte/issues/12908)

* Improved the [Airbyte Connector Development Kit (CDK)](https://airbyte.com/connector-development-kit) by enabling detailed bug logs from the command line. In addition to the preset CDK debug logs, you can also create custom debug statements and display custom debug logs in the command line. [#14521](https://github.com/airbytehq/airbyte/pull/14521)

1 change: 0 additions & 1 deletion docusaurus/sidebars.js
@@ -383,7 +383,6 @@ const operatorGuide = {
'operator-guides/using-custom-connectors',
'operator-guides/scaling-airbyte',
'operator-guides/configuring-sync-notifications',
'operator-guides/collecting-metrics',
],
};

