diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 696529098a8bd..2ec10b50edd89 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -843,7 +843,7 @@ - name: Zendesk Support sourceDefinitionId: 79c1aa37-dae3-42ae-b333-d1c105477715 dockerRepository: airbyte/source-zendesk-support - dockerImageTag: 0.2.2 + dockerImageTag: 0.2.3 documentationUrl: https://docs.airbyte.io/integrations/sources/zendesk-support icon: zendesk.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index a66b2596ec5d0..d37f3765e3910 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -8906,7 +8906,7 @@ path_in_connector_config: - "credentials" - "client_secret" -- dockerImage: "airbyte/source-zendesk-support:0.2.2" +- dockerImage: "airbyte/source-zendesk-support:0.2.3" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/zendesk-support" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-zendesk-support/Dockerfile b/airbyte-integrations/connectors/source-zendesk-support/Dockerfile index 03358e75d66f6..b79601765c4e5 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/Dockerfile +++ b/airbyte-integrations/connectors/source-zendesk-support/Dockerfile @@ -25,5 +25,5 @@ COPY source_zendesk_support ./source_zendesk_support ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.2.2 +LABEL io.airbyte.version=0.2.3 LABEL io.airbyte.name=airbyte/source-zendesk-support diff --git a/airbyte-integrations/connectors/source-zendesk-support/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-zendesk-support/integration_tests/configured_catalog.json index b2aac225ba8ad..55c43dc76d9ed 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/integration_tests/configured_catalog.json +++ b/airbyte-integrations/connectors/source-zendesk-support/integration_tests/configured_catalog.json @@ -60,6 +60,18 @@ "sync_mode": "incremental", "destination_sync_mode": "append" }, + { + "stream": { + "name": "schedules", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"], + "source_defined_cursor": false, + "default_cursor_field": [], + "source_defined_primary_key": [["id"]] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + }, { "stream": { "name": "sla_policies", diff --git a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py index c35f75f72425d..5e2481f5c68e1 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py +++ b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py @@ -294,6 +294,7 @@ class SourceZendeskSupportFullRefreshStream(BaseSourceZendeskSupportStream): # thus we can't implement an incremental logic for them """ + page_size = 100 primary_key = "id" response_list_name: str = None @@ -340,7 +341,7 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, start_time = dict(parse_qsl(urlparse(response.json().get(self.next_page_field), "").query)).get("start_time") if start_time != self.prev_start_time: self.prev_start_time = start_time - return {self.cursor_field: start_time} + return {self.cursor_field: int(start_time)} def request_params( self, stream_state: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, **kwargs @@ -359,49 +360,77 @@ def request_params( return params -class ZendeskSupportTicketEventsExportStream(SourceZendeskSupportCursorPaginationStream): - """Incremental Export from TicketEvents stream: - https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#incremental-ticket-event-export +class SourceZendeskTicketExportStream(SourceZendeskSupportCursorPaginationStream): + """Incremental Export from Tickets stream: + https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#incremental-ticket-export-time-based - @ param response_list_name: the main nested entity to look at inside of response, defualt = "ticket_events" - @ param response_target_entity: nested property inside of `response_list_name`, default = "child_events" - @ param list_entities_from_event : the list of nested child_events entities to include from parent record - @ param sideload_param : parameter variable to include various information to child_events property + @ param response_list_name: the main nested entity to look at inside of response, defualt = response_list_name + @ param sideload_param : parameter variable to include various information to response more info: https://developer.zendesk.com/documentation/ticketing/using-the-zendesk-api/side_loading/#supported-endpoints - @ param event_type : specific event_type to check ["Audit", "Change", "Comment", etc] """ - - response_list_name: str = "ticket_events" - response_target_entity: str = "child_events" - list_entities_from_event: List[str] = None + + cursor_field = "updated_at" + response_list_name: str = "tickets" sideload_param: str = None - event_type: str = None - - @property - def update_event_from_record(self) -> bool: - """Returns True/False based on list_entities_from_event property""" - return True if len(self.list_entities_from_event) > 0 else False - + + @staticmethod + def check_start_time_param(requested_start_time: int, value: int = 1): + """ + Requesting tickets in the future is not allowed, hits 400 - bad request. + We get current UNIX timestamp minus `value` from now(), default = 1 (minute). + + Returns: either close to now UNIX timestamp or previously requested UNIX timestamp. + """ + now = calendar.timegm(pendulum.now().subtract(minutes=value).utctimetuple()) + return now if requested_start_time > now else requested_start_time + def path(self, **kwargs) -> str: - return "incremental/ticket_events" - + return f"incremental/{self.response_list_name}.json" + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: """ Returns next_page_token based on `end_of_stream` parameter inside of response """ next_page_token = super().next_page_token(response) return None if response.json().get(END_OF_STREAM_KEY, False) else next_page_token - + def request_params( self, stream_state: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, **kwargs ) -> MutableMapping[str, Any]: params = super().request_params(stream_state, next_page_token, **kwargs) + # check "start_time" is not in the future + params["start_time"] = self.check_start_time_param(params["start_time"]) if self.sideload_param: params["include"] = self.sideload_param return params - + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: for record in response.json().get(self.response_list_name, []): + yield record + + +class SourceZendeskSupportTicketEventsExportStream(SourceZendeskTicketExportStream): + """Incremental Export from TicketEvents stream: + https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#incremental-ticket-event-export + + @ param response_list_name: the main nested entity to look at inside of response, defualt = "ticket_events" + @ param response_target_entity: nested property inside of `response_list_name`, default = "child_events" + @ param list_entities_from_event : the list of nested child_events entities to include from parent record + @ param event_type : specific event_type to check ["Audit", "Change", "Comment", etc] + """ + cursor_field = "created_at" + response_list_name: str = "ticket_events" + response_target_entity: str = "child_events" + list_entities_from_event: List[str] = None + event_type: str = None + + @property + def update_event_from_record(self) -> bool: + """Returns True/False based on list_entities_from_event property""" + return True if len(self.list_entities_from_event) > 0 else False + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + for record in super().parse_response(response, **kwargs): for event in record.get(self.response_target_entity, []): if event.get("event_type") == self.event_type: if self.update_event_from_record: @@ -418,27 +447,15 @@ class Organizations(SourceZendeskSupportStream): """Organizations stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/""" -class Tickets(SourceZendeskSupportStream): - """Tickets stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/""" - - # The API compares the start_time with the ticket's generated_timestamp value, not its updated_at value. - # The generated_timestamp value is updated for all entity updates, including system updates. - # If a system update occurs after an event, the unchanged updated_at time will become earlier - # relative to the updated generated_timestamp time. - - def request_params(self, **kwargs) -> MutableMapping[str, Any]: - """Adds the field 'comment_count'""" - params = super().request_params(**kwargs) - params["include"] = "comment_count" - return params +class Tickets(SourceZendeskTicketExportStream): + """Tickets stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#incremental-ticket-export-time-based""" -class TicketComments(ZendeskSupportTicketEventsExportStream): +class TicketComments(SourceZendeskSupportTicketEventsExportStream): """ Fetch the TicketComments incrementaly from TicketEvents Export stream """ - cursor_field = "created_at" list_entities_from_event = ["via_reference_id", "ticket_id", "timestamp"] sideload_param = "comment_events" event_type = "Comment" diff --git a/airbyte-integrations/connectors/source-zendesk-support/unit_tests/test_futures.py b/airbyte-integrations/connectors/source-zendesk-support/unit_tests/test_futures.py index 32fbb78ca389f..50c334fefb688 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/unit_tests/test_futures.py +++ b/airbyte-integrations/connectors/source-zendesk-support/unit_tests/test_futures.py @@ -12,12 +12,10 @@ from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException from source_zendesk_support.source import BasicApiTokenAuthenticator -from source_zendesk_support.streams import Tickets +from source_zendesk_support.streams import Macros -@pytest.fixture(scope="module") -def stream_args(): - return { +STREAM_ARGS: dict = { "subdomain": "fake-subdomain", "start_date": "2021-01-27T00:00:00Z", "authenticator": BasicApiTokenAuthenticator("test@airbyte.io", "api_token"), @@ -34,34 +32,29 @@ def stream_args(): (101, 100, 2), ], ) -def test_proper_number_of_future_requests_generated(stream_args, records_count, page_size, expected_futures_deque_len): - stream = Tickets(**stream_args) +def test_proper_number_of_future_requests_generated(records_count, page_size, expected_futures_deque_len): + stream = Macros(**STREAM_ARGS) stream.page_size = page_size with requests_mock.Mocker() as m: count_url = urljoin(stream.url_base, f"{stream.path()}/count.json") m.get(count_url, text=json.dumps({"count": {"value": records_count}})) - records_url = urljoin(stream.url_base, stream.path()) m.get(records_url) - stream.generate_future_requests(sync_mode=SyncMode.full_refresh, cursor_field=stream.cursor_field) - assert len(stream.future_requests) == expected_futures_deque_len @pytest.mark.parametrize( "records_count,page_size,expected_futures_deque_len", [ - (1000, 100, 10), - (1000, 10, 100), - (0, 100, 0), - (1, 100, 1), - (101, 100, 2), + (10, 10, 10), + (10, 100, 10), + (10, 10, 0), ], ) -def test_parse_future_records(stream_args, records_count, page_size, expected_futures_deque_len): - stream = Tickets(**stream_args) +def test_parse_future_records(records_count, page_size, expected_futures_deque_len): + stream = Macros(**STREAM_ARGS) stream.page_size = page_size expected_records = [ {f"key{i}": f"val{i}", stream.cursor_field: (pendulum.parse("2020-01-01") + timedelta(days=i)).isoformat()} @@ -97,8 +90,8 @@ def test_parse_future_records(stream_args, records_count, page_size, expected_fu # (101, 100, 2, False), ], ) -def test_read_records(stream_args, records_count, page_size, expected_futures_deque_len, should_retry): - stream = Tickets(**stream_args) +def test_read_records(records_count, page_size, expected_futures_deque_len, should_retry): + stream = Macros(**STREAM_ARGS) stream.page_size = page_size expected_records = [ {f"key{i}": f"val{i}", stream.cursor_field: (pendulum.parse("2020-01-01") + timedelta(days=i)).isoformat()} diff --git a/airbyte-integrations/connectors/source-zendesk-support/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-zendesk-support/unit_tests/unit_test.py index 902cd43ea8859..0a98d9ebeb72c 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/unit_tests/unit_test.py +++ b/airbyte-integrations/connectors/source-zendesk-support/unit_tests/unit_test.py @@ -10,7 +10,13 @@ import pytz import requests from source_zendesk_support.source import BasicApiTokenAuthenticator -from source_zendesk_support.streams import DATETIME_FORMAT, END_OF_STREAM_KEY, BaseSourceZendeskSupportStream, TicketComments +from source_zendesk_support.streams import ( + DATETIME_FORMAT, + END_OF_STREAM_KEY, + BaseSourceZendeskSupportStream, + TicketComments, + SourceZendeskTicketExportStream, +) # config STREAM_ARGS = { @@ -77,6 +83,12 @@ def test_str2unixtime(): expected = calendar.timegm(DATETIME_FROM_STR.utctimetuple()) output = BaseSourceZendeskSupportStream.str2unixtime(DATETIME_STR) assert output == expected + +def test_check_start_time_param(): + expected = 1626936955 + start_time = calendar.timegm(pendulum.parse(DATETIME_STR).utctimetuple()) + output = SourceZendeskTicketExportStream.check_start_time_param(start_time) + assert output == expected def test_parse_next_page_number(requests_mock): @@ -90,7 +102,7 @@ def test_parse_next_page_number(requests_mock): def test_next_page_token(requests_mock): # mocking the logic of next_page_token if STREAM_RESPONSE.get(END_OF_STREAM_KEY) is False: - expected = {"created_at": "1122334455"} + expected = {"created_at": 1122334455} else: expected = None requests_mock.get(STREAM_URL, json=STREAM_RESPONSE) diff --git a/docs/integrations/sources/zendesk-support.md b/docs/integrations/sources/zendesk-support.md index 0cd4228a0754b..e823361e9e5d1 100644 --- a/docs/integrations/sources/zendesk-support.md +++ b/docs/integrations/sources/zendesk-support.md @@ -20,7 +20,7 @@ This Source is capable of syncing the following core Streams: * [Schedules](https://developer.zendesk.com/api-reference/ticketing/ticket-management/schedules/#list-schedules) * [SLA Policies](https://developer.zendesk.com/rest_api/docs/support/sla_policies) * [Tags](https://developer.zendesk.com/rest_api/docs/support/tags) -* [Tickets](https://developer.zendesk.com/rest_api/docs/support/tickets) +* [Tickets](https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#incremental-ticket-export-time-based) * [Ticket Audits](https://developer.zendesk.com/rest_api/docs/support/ticket_audits) * [Ticket Comments](https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#incremental-ticket-event-export) * [Ticket Fields](https://developer.zendesk.com/rest_api/docs/support/ticket_fields) @@ -102,6 +102,7 @@ Simply proceed by pressing "Authenticate your Account" and complete the authenti | Version | Date | Pull Request | Subject | |:---------|:-----------| :----- |:-------------------------------------------------------| +| `0.2.3` | 2022-03-23 | [11349](https://github.com/airbytehq/airbyte/pull/11349) | Fixed the bug when Tickets stream didn't return deleted records | `0.2.2` | 2022-03-17 | [11237](https://github.com/airbytehq/airbyte/pull/11237) | Fixed the bug when TicketComments stream didn't return all records | `0.2.1` | 2022-03-15 | [11162](https://github.com/airbytehq/airbyte/pull/11162) | Added support of OAuth2.0 authentication method | `0.2.0` | 2022-03-01 | [9456](https://github.com/airbytehq/airbyte/pull/9456) | Update source to use future requests |