diff --git a/airbyte-integrations/connectors/source-asana/BOOTSTRAP.md b/airbyte-integrations/connectors/source-asana/BOOTSTRAP.md index 56b017415f2e8..4f31f66f078ee 100644 --- a/airbyte-integrations/connectors/source-asana/BOOTSTRAP.md +++ b/airbyte-integrations/connectors/source-asana/BOOTSTRAP.md @@ -8,7 +8,8 @@ Some streams depend on: - workspaces (Teams, Users, CustomFields, Projects, Tags, Users streams); - projects (Events, SectionsCompact, Sections, Tasks streams); -- tasks (Events, Stories stream); +- tasks (Events, StoriesCompact stream); +- storiescompact (Stories stream) - teams (TeamMemberships stream). Each record can be uniquely identified by a `gid` key. diff --git a/airbyte-integrations/connectors/source-asana/Dockerfile b/airbyte-integrations/connectors/source-asana/Dockerfile index b2998fa12c425..e9b78991f5f77 100644 --- a/airbyte-integrations/connectors/source-asana/Dockerfile +++ b/airbyte-integrations/connectors/source-asana/Dockerfile @@ -12,5 +12,5 @@ RUN pip install . ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.3.0 +LABEL io.airbyte.version=0.4.0 LABEL io.airbyte.name=airbyte/source-asana diff --git a/airbyte-integrations/connectors/source-asana/acceptance-test-config.yml b/airbyte-integrations/connectors/source-asana/acceptance-test-config.yml index 39a5cacfd6553..1e60bd84f8f63 100644 --- a/airbyte-integrations/connectors/source-asana/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-asana/acceptance-test-config.yml @@ -24,6 +24,8 @@ acceptance_tests: empty_streams: - name: custom_fields bypass_reason: "This stream is not available on the account we're currently using. Please follow https://github.com/airbytehq/airbyte/issues/19662." + - name: events + bypass_reason: "This stream is not available on our current account." full_refresh: # tests: # - config_path: "secrets/config.json" diff --git a/airbyte-integrations/connectors/source-asana/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-asana/integration_tests/configured_catalog.json index 0f570991e07c0..f7076719ea815 100644 --- a/airbyte-integrations/connectors/source-asana/integration_tests/configured_catalog.json +++ b/airbyte-integrations/connectors/source-asana/integration_tests/configured_catalog.json @@ -54,6 +54,15 @@ "sync_mode": "full_refresh", "destination_sync_mode": "overwrite" }, + { + "stream": { + "name": "stories_compact", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + }, { "stream": { "name": "stories", diff --git a/airbyte-integrations/connectors/source-asana/metadata.yaml b/airbyte-integrations/connectors/source-asana/metadata.yaml index eb593f76a88c6..4ecb8ee2205f8 100644 --- a/airbyte-integrations/connectors/source-asana/metadata.yaml +++ b/airbyte-integrations/connectors/source-asana/metadata.yaml @@ -8,7 +8,7 @@ data: connectorSubtype: api connectorType: source definitionId: d0243522-dccf-4978-8ba0-37ed47a0bdbf - dockerImageTag: 0.3.0 + dockerImageTag: 0.4.0 dockerRepository: airbyte/source-asana documentationUrl: https://docs.airbyte.com/integrations/sources/asana githubIssueLabel: source-asana diff --git a/airbyte-integrations/connectors/source-asana/source_asana/schemas/stories.json b/airbyte-integrations/connectors/source-asana/source_asana/schemas/stories.json index 238e48ed47046..fddd089f3091f 100644 --- a/airbyte-integrations/connectors/source-asana/source_asana/schemas/stories.json +++ b/airbyte-integrations/connectors/source-asana/source_asana/schemas/stories.json @@ -1,38 +1,43 @@ { "type": ["null", "object"], "properties": { - "gid": { - "type": ["null", "string"] - }, - "resource_type": { - "type": ["null", "string"] - }, - "created_at": { - "type": ["null", "string"], - "format": "date-time" - }, + "gid": { "type": ["null", "string"] }, + "resource_type": { "type": ["null", "string"] }, + "created_at": { "type": ["null", "string"], "format": "date-time" }, "created_by": { "type": ["null", "object"], "properties": { - "gid": { - "type": ["null", "string"] - }, - "resource_type": { - "type": ["null", "string"] - }, - "name": { - "type": ["null", "string"] - } + "gid": { "type": ["null", "string"] }, + "resource_type": { "type": ["null", "string"] }, + "name": { "type": ["null", "string"] } } }, - "resource_subtype": { - "type": ["null", "string"] - }, - "text": { - "type": ["null", "string"] + "resource_subtype": { "type": ["null", "string"] }, + "text": { "type": ["null", "string"] }, + "type": { "type": ["null", "string"] }, + "html_text": { "type": ["null", "string"] }, + "is_pinned": { "type": ["null", "boolean"] }, + "sticker_name": { "type": ["null", "string"] }, + "is_editable": { "type": ["null", "boolean"] }, + "is_edited": { "type": ["null", "boolean"] }, + "liked": { "type": ["null", "boolean"] }, + "likes": { + "type": ["null", "array"], + "items": { + "type": ["null", "object"], + "properties": { + "gid": { "type": ["null", "string"] }, + "user": { + "type": ["null", "object"], + "properties": { + "gid": { "type": ["null", "string"] }, + "resource_type": { "type": ["null", "string"] }, + "name": { "type": ["null", "string"] } + } + } + } + } }, - "type": { - "type": ["null", "string"] - } + "num_likes": { "type": ["null", "integer"] } } } diff --git a/airbyte-integrations/connectors/source-asana/source_asana/schemas/stories_compact.json b/airbyte-integrations/connectors/source-asana/source_asana/schemas/stories_compact.json new file mode 100644 index 0000000000000..238e48ed47046 --- /dev/null +++ b/airbyte-integrations/connectors/source-asana/source_asana/schemas/stories_compact.json @@ -0,0 +1,38 @@ +{ + "type": ["null", "object"], + "properties": { + "gid": { + "type": ["null", "string"] + }, + "resource_type": { + "type": ["null", "string"] + }, + "created_at": { + "type": ["null", "string"], + "format": "date-time" + }, + "created_by": { + "type": ["null", "object"], + "properties": { + "gid": { + "type": ["null", "string"] + }, + "resource_type": { + "type": ["null", "string"] + }, + "name": { + "type": ["null", "string"] + } + } + }, + "resource_subtype": { + "type": ["null", "string"] + }, + "text": { + "type": ["null", "string"] + }, + "type": { + "type": ["null", "string"] + } + } +} diff --git a/airbyte-integrations/connectors/source-asana/source_asana/source.py b/airbyte-integrations/connectors/source-asana/source_asana/source.py index 7479933180f9d..1baf61b747015 100644 --- a/airbyte-integrations/connectors/source-asana/source_asana/source.py +++ b/airbyte-integrations/connectors/source-asana/source_asana/source.py @@ -21,6 +21,7 @@ Sections, SectionsCompact, Stories, + StoriesCompact, Tags, Tasks, TeamMemberships, @@ -57,7 +58,7 @@ def _get_authenticator(config: dict) -> Union[TokenAuthenticator, AsanaOauth2Aut ) def streams(self, config: Mapping[str, Any]) -> List[Stream]: - args = {"authenticator": self._get_authenticator(config)} + args = {"authenticator": self._get_authenticator(config), "test_mode": config["test_mode"]} streams = [ AttachmentsCompact(**args), Attachments(**args), @@ -65,6 +66,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: Projects(**args), SectionsCompact(**args), Sections(**args), + StoriesCompact(**args), Stories(**args), Tags(**args), Tasks(**args), diff --git a/airbyte-integrations/connectors/source-asana/source_asana/spec.json b/airbyte-integrations/connectors/source-asana/source_asana/spec.json index ecb485b8cc16f..165744a559f3f 100644 --- a/airbyte-integrations/connectors/source-asana/source_asana/spec.json +++ b/airbyte-integrations/connectors/source-asana/source_asana/spec.json @@ -63,6 +63,12 @@ } ] }, + "test_mode": { + "type": "boolean", + "title": "Test Mode", + "description": "This flag is used for testing purposes for certain streams that return a lot of data. This flag is not meant to be enabled for prod.", + "airbyte_hidden": true + }, "organization_export_ids": { "title": "Organization Export IDs", "description": "Globally unique identifiers for the organization exports", diff --git a/airbyte-integrations/connectors/source-asana/source_asana/streams.py b/airbyte-integrations/connectors/source-asana/source_asana/streams.py index 6d384b3f1dfa0..2365ec9335fba 100644 --- a/airbyte-integrations/connectors/source-asana/source_asana/streams.py +++ b/airbyte-integrations/connectors/source-asana/source_asana/streams.py @@ -4,11 +4,14 @@ from abc import ABC -from typing import Any, Iterable, Mapping, MutableMapping, Optional, Type +from itertools import islice +from typing import Any, Iterable, Mapping, MutableMapping, Optional, Type, Union import requests from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.streams.http import HttpStream +from airbyte_cdk.sources.streams.http.auth.core import HttpAuthenticator +from requests.auth import AuthBase ASANA_ERRORS_MAPPING = { 402: "This stream is available to premium organizations and workspaces only", @@ -24,11 +27,16 @@ class AsanaStream(HttpStream, ABC): # Asana pagination could be from 1 to 100. page_size = 100 raise_on_http_errors = True + test_mode = False @property def AsanaStreamType(self) -> Type: return self.__class__ + def __init__(self, authenticator: Union[AuthBase, HttpAuthenticator] = None, test_mode: bool = False): + super().__init__(authenticator=authenticator) + self.test_mode = test_mode + def should_retry(self, response: requests.Response) -> bool: if response.status_code in ASANA_ERRORS_MAPPING.keys(): self.logger.error( @@ -101,6 +109,7 @@ def read_slices_from_records(self, stream_class: AsanaStreamType, slice_field: s """ stream = stream_class(authenticator=self.authenticator) stream_slices = stream.stream_slices(sync_mode=SyncMode.full_refresh) + for stream_slice in stream_slices: for record in stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice): yield {slice_field: record["gid"]} @@ -186,9 +195,48 @@ def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: class Events(AsanaStream): + primary_key = "created_at" + sync_token = None + def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: return "events" + def read_records(self, *args, **kwargs): + # Check if sync token is available + if self.sync_token is not None: + # Pass the sync token as a request parameter + kwargs["next_page_token"] = {"sync": self.sync_token} + + yield from super().read_records(*args, **kwargs) + + # After reading records, update the sync token + self.sync_token = self.get_latest_sync_token() + + def get_latest_sync_token(self) -> str: + latest_sync_token = self.state.get("last_sync_token") # Get the previous sync token + + if latest_sync_token is None: + return None + + return latest_sync_token["sync"] # Extract the sync token value + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + if response.status_code == 412: # Check if response is a 412 error + response_json = response.json() + if "sync" in response_json: # Check if new sync token is available + self.sync_token = response_json["sync"] + else: + self.sync_token = None + self.logger.warning("Sync token expired. Fetch the full dataset for this query now.") + else: + response_json = response.json() + + # Check if response has new sync token + if "sync" in response_json: + self.sync_token = response_json["sync"] + + yield from response_json.get("data", []) + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: decoded_response = response.json() last_sync = decoded_response.get("sync") @@ -226,6 +274,8 @@ def path(self, **kwargs) -> str: class SectionsCompact(ProjectRelatedStream): + use_cache = True + def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: project_gid = stream_slice["project_gid"] return f"projects/{project_gid}/sections" @@ -248,13 +298,40 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp yield from section_data -class Stories(AsanaStream): +class StoriesCompact(AsanaStream): + use_cache = True + def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: task_gid = stream_slice["task_gid"] return f"tasks/{task_gid}/stories" def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]: - yield from self.read_slices_from_records(stream_class=Tasks, slice_field="task_gid") + # This streams causes tests to timeout (> 2hrs), so we limit stream slices to 100 to make tests less noisy + if self.test_mode: + yield from islice(self.read_slices_from_records(stream_class=Tasks, slice_field="task_gid"), 100) + else: + yield from self.read_slices_from_records(stream_class=Tasks, slice_field="task_gid") + + +class Stories(AsanaStream): + def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: + story_gid = stream_slice["story_gid"] + return f"stories/{story_gid}" + + def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]: + # This streams causes tests to timeout (> 2hrs), so we limit stream slices to 100 to make tests less noisy + if self.test_mode: + yield from islice(self.read_slices_from_records(stream_class=StoriesCompact, slice_field="story_gid"), 100) + else: + yield from self.read_slices_from_records(stream_class=StoriesCompact, slice_field="story_gid") + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + response_json = response.json() + section_data = response_json.get("data", {}) + if isinstance(section_data, dict): # Check if section_data is a dictionary + yield section_data + elif isinstance(section_data, list): # Check if section_data is a list + yield from section_data class Tags(WorkspaceRequestParamsRelatedStream): diff --git a/docs/integrations/sources/asana.md b/docs/integrations/sources/asana.md index 2961ddaa6d53f..945f097946559 100644 --- a/docs/integrations/sources/asana.md +++ b/docs/integrations/sources/asana.md @@ -66,17 +66,18 @@ The connector is restricted by normal Asana [requests limitation](https://develo ## Changelog -| Version | Date | Pull Request | Subject | -| :------ | :--------- | :------------------------------------------------------- | :--------------------------------------------------------- | -| 0.3.0 | 2023-10-24 | [31634](https://github.com/airbytehq/airbyte/pull/31634) | Add OrganizationExports stream | -| 0.2.0 | 2023-10-17 | [31090](https://github.com/airbytehq/airbyte/pull/31090) | Add Attachments stream | -| 0.1.9 | 2023-10-16 | [31089](https://github.com/airbytehq/airbyte/pull/31089) | Add Events stream | -| 0.1.8 | 2023-10-16 | [31009](https://github.com/airbytehq/airbyte/pull/31009) | Add SectionsCompact stream | -| 0.1.7 | 2023-05-29 | [26716](https://github.com/airbytehq/airbyte/pull/26716) | Remove authSpecification from spec.json, use advancedAuth instead | -| 0.1.6 | 2023-05-26 | [26653](https://github.com/airbytehq/airbyte/pull/26653) | Fix order of authentication methods | -| 0.1.5 | 2022-11-16 | [19561](https://github.com/airbytehq/airbyte/pull/19561) | Added errors handling, updated SAT with new format | -| 0.1.4 | 2022-08-18 | [15749](https://github.com/airbytehq/airbyte/pull/15749) | Add cache to project stream | -| 0.1.3 | 2021-10-06 | [6832](https://github.com/airbytehq/airbyte/pull/6832) | Add oauth init flow parameters support | -| 0.1.2 | 2021-09-24 | [6402](https://github.com/airbytehq/airbyte/pull/6402) | Fix SAT tests: update schemas and invalid_config.json file | -| 0.1.1 | 2021-06-09 | [3973](https://github.com/airbytehq/airbyte/pull/3973) | Add entrypoint and bump version for connector | -| 0.1.0 | 2021-05-25 | [3510](https://github.com/airbytehq/airbyte/pull/3510) | New Source: Asana | +| Version | Date | Pull Request | Subject | +| :------ | :--------- | :------------------------------------------------------- | :---------------------------------------------------------------- | +| 0.4.0 | 2023-10-24 | [31084](https://github.com/airbytehq/airbyte/pull/31084) | Add StoriesCompact stream | +| 0.3.0 | 2023-10-24 | [31634](https://github.com/airbytehq/airbyte/pull/31634) | Add OrganizationExports stream | +| 0.2.0 | 2023-10-17 | [31090](https://github.com/airbytehq/airbyte/pull/31090) | Add Attachments stream | +| 0.1.9 | 2023-10-16 | [31089](https://github.com/airbytehq/airbyte/pull/31089) | Add Events stream | +| 0.1.8 | 2023-10-16 | [31009](https://github.com/airbytehq/airbyte/pull/31009) | Add SectionsCompact stream | +| 0.1.7 | 2023-05-29 | [26716](https://github.com/airbytehq/airbyte/pull/26716) | Remove authSpecification from spec.json, use advancedAuth instead | +| 0.1.6 | 2023-05-26 | [26653](https://github.com/airbytehq/airbyte/pull/26653) | Fix order of authentication methods | +| 0.1.5 | 2022-11-16 | [19561](https://github.com/airbytehq/airbyte/pull/19561) | Added errors handling, updated SAT with new format | +| 0.1.4 | 2022-08-18 | [15749](https://github.com/airbytehq/airbyte/pull/15749) | Add cache to project stream | +| 0.1.3 | 2021-10-06 | [6832](https://github.com/airbytehq/airbyte/pull/6832) | Add oauth init flow parameters support | +| 0.1.2 | 2021-09-24 | [6402](https://github.com/airbytehq/airbyte/pull/6402) | Fix SAT tests: update schemas and invalid_config.json file | +| 0.1.1 | 2021-06-09 | [3973](https://github.com/airbytehq/airbyte/pull/3973) | Add entrypoint and bump version for connector | +| 0.1.0 | 2021-05-25 | [3510](https://github.com/airbytehq/airbyte/pull/3510) | New Source: Asana |