From 30a07dadfaa01d0ec757826151d66b3a0000fed5 Mon Sep 17 00:00:00 2001 From: Maksym Pavlenok Date: Wed, 1 Sep 2021 12:06:40 +0300 Subject: [PATCH 01/14] fix negative backoff --- .../acceptance-test-docker.sh | 7 +++ .../integration_tests/invalid_config.json | 6 +- .../source_zendesk_support/spec.json | 32 ++++++++++- .../source_zendesk_support/streams.py | 55 ++++++++++++------- .../unit_tests/unit_test.py | 44 ++++++++++++--- 5 files changed, 110 insertions(+), 34 deletions(-) diff --git a/airbyte-integrations/connectors/source-zendesk-support/acceptance-test-docker.sh b/airbyte-integrations/connectors/source-zendesk-support/acceptance-test-docker.sh index db28f196367cf0..03a3c6d3de3218 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/acceptance-test-docker.sh +++ b/airbyte-integrations/connectors/source-zendesk-support/acceptance-test-docker.sh @@ -1,4 +1,11 @@ #!/usr/bin/env sh +image_name=$(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2-) +# Build latest connector image +echo "try to build: ${image_name}" +docker build . -t ${image_name} + +# Pull latest acctest image +docker pull airbyte/source-acceptance-test:latest docker run --rm -it \ -v /var/run/docker.sock:/var/run/docker.sock \ diff --git a/airbyte-integrations/connectors/source-zendesk-support/integration_tests/invalid_config.json b/airbyte-integrations/connectors/source-zendesk-support/integration_tests/invalid_config.json index b0855267d84160..c1562ac3660e64 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/integration_tests/invalid_config.json +++ b/airbyte-integrations/connectors/source-zendesk-support/integration_tests/invalid_config.json @@ -1,6 +1,8 @@ { - "email": "broken.email@invalid.config", - "api_token": "", + "auth_method": { + "api_token": "", + "email": "broken.email@invalid.config" + }, "subdomain": "test-failure-airbyte", "start_date": "2030-01-01T00:00:00Z" } diff --git a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/spec.json b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/spec.json index 20f4af4f65e39e..890301be1c294f 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/spec.json +++ b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/spec.json @@ -4,13 +4,19 @@ "$schema": "http://json-schema.org/draft-07/schema#", "title": "Source Zendesk Support Spec", "type": "object", - "required": ["start_date", "subdomain", "auth_method"], + "required": [ + "start_date", + "subdomain", + "auth_method" + ], "additionalProperties": false, "properties": { "start_date": { "type": "string", "description": "The date from which you'd like to replicate data for Zendesk Support API, in the format YYYY-MM-DDT00:00:00Z. All data generated after this date will be replicated.", - "examples": ["2020-10-15T00:00:00Z"], + "examples": [ + "2020-10-15T00:00:00Z" + ], "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$" }, "subdomain": { @@ -20,14 +26,34 @@ "auth_method": { "title": "ZenDesk Authorization Method", "type": "object", + "default": "API Token", "description": "Zendesk service provides 2 auth method: API token and oAuth2. Now only the first one is available. 
Another one will be added in the future", "oneOf": [ + { + "title": "oAuth2", + "type": "object", + "required": [], + "additionalProperties": false, + "properties": { + "auth_method": { + "type": "string", + "const": "oauth2" + } + } + }, { "title": "API Token", "type": "object", - "required": ["email", "api_token"], + "required": [ + "email", + "api_token" + ], "additionalProperties": false, "properties": { + "auth_method": { + "type": "string", + "const": "api_token" + }, "email": { "type": "string", "description": "The user email for your Zendesk account" diff --git a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py index 8eb0da8623d159..87d19edd3a406d 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py +++ b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py @@ -27,7 +27,7 @@ import time from abc import ABC, abstractmethod from datetime import datetime -from typing import Any, Iterable, List, Mapping, MutableMapping, Optional +from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union from urllib.parse import parse_qsl, urlparse import pytz @@ -69,23 +69,26 @@ def _parse_next_page_number(response: requests.Response) -> Optional[int]: return dict(parse_qsl(urlparse(next_page).query)).get("page") return None - def backoff_time(self, response: requests.Response) -> int: + def backoff_time(self, response: requests.Response) -> Union[int, float]: """ The rate limit is 700 requests per minute # monitoring-your-request-activity See https://developer.zendesk.com/api-reference/ticketing/account-configuration/usage_limits/ The response has a Retry-After header that tells you for how many seconds to wait before retrying. 
""" - retry_after = response.headers.get("Retry-After") - if retry_after: - return int(retry_after) - # the header X-Rate-Limit returns a amount of requests per minute - # we try to wait twice as long - rate_limit = float(response.headers.get("X-Rate-Limit") or 0) - if rate_limit: - return (60.0 / rate_limit) * 2 # default value if there is not any headers - return 60 + sleep_timeout = 60 + retry_after = int(response.headers.get("Retry-After") or 0) + if retry_after and retry_after > 0: + sleep_timeout = int(retry_after) + else: + # the header X-Rate-Limit returns a amount of requests per minute + # we try to wait twice as long + rate_limit = float(response.headers.get("X-Rate-Limit") or 0) + if rate_limit and rate_limit > 0: + return (60.0 / rate_limit) * 2 + + return sleep_timeout @staticmethod def str2datetime(str_dt: str) -> datetime: @@ -140,7 +143,8 @@ class IncrementalEntityStream(SourceZendeskSupportStream, ABC): def __init__(self, start_date: str, **kwargs): super().__init__(**kwargs) # add the custom value for skiping of not relevant records - self._start_date = self.str2datetime(start_date) if isinstance(start_date, str) else start_date + self._start_date = self.str2datetime( + start_date) if isinstance(start_date, str) else start_date def path(self, **kwargs) -> str: return f"{self.name}.json" @@ -158,7 +162,8 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late # try to save maximum value of a cursor field return { self.cursor_field: max( - str((latest_record or {}).get(self.cursor_field, "")), str((current_stream_state or {}).get(self.cursor_field, "")) + str((latest_record or {}).get(self.cursor_field, "")), str( + (current_stream_state or {}).get(self.cursor_field, "")) ) } @@ -210,7 +215,8 @@ def request_params( if current_state and isinstance(current_state, str) and not current_state.isdigit(): # try to save a stage with UnixTime format current_state = self.str2unixtime(current_state) - start_time = int(current_state or time.mktime(self._start_date.timetuple())) + 1 + start_time = int(current_state or time.mktime( + self._start_date.timetuple())) + 1 # +1 because the API returns all records where generated_timestamp >= start_time now = calendar.timegm(datetime.now().utctimetuple()) @@ -264,7 +270,8 @@ def parse_response(self, response: requests.Response, stream_state: Mapping[str, def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: - max_updated_at = self.datetime2str(self._max_cursor_date) if self._max_cursor_date else "" + max_updated_at = self.datetime2str( + self._max_cursor_date) if self._max_cursor_date else "" return {self.cursor_field: max(max_updated_at, (current_stream_state or {}).get(self.cursor_field, ""))} @property @@ -306,7 +313,8 @@ class IncrementalSortedCursorStream(IncrementalUnsortedStream, ABC): def request_params(self, next_page_token: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]: params = super().request_params(next_page_token=next_page_token, **kwargs) - params.update({"sort_by": self.cursor_field, "sort_order": "desc", "limit": self.page_size}) + params.update({"sort_by": self.cursor_field, + "sort_order": "desc", "limit": self.page_size}) if next_page_token: params["cursor"] = next_page_token @@ -324,7 +332,8 @@ class IncrementalSortedPageStream(IncrementalUnsortedPageStream, ABC): def request_params(self, **kwargs) -> MutableMapping[str, Any]: params = super().request_params(**kwargs) if params: - 
params.update({"sort_by": self.cursor_field, "sort_order": "desc", "limit": self.page_size}) + params.update({"sort_by": self.cursor_field, + "sort_order": "desc", "limit": self.page_size}) return params @@ -348,12 +357,15 @@ def stream_slices( stream_state = stream_state or {} # convert a comment state value to a ticket one # Comment state: {"created_at": "2021-07-30T12:30:09Z"} => Ticket state {"generated_timestamp": 1627637409} - ticket_stream_value = Tickets.str2unixtime(stream_state.get(self.cursor_field)) + ticket_stream_value = Tickets.str2unixtime( + stream_state.get(self.cursor_field)) tickets = Tickets(self._start_date, subdomain=self._subdomain, authenticator=self.authenticator).read_records( - sync_mode=sync_mode, cursor_field=cursor_field, stream_state={Tickets.cursor_field: ticket_stream_value} + sync_mode=sync_mode, cursor_field=cursor_field, stream_state={ + Tickets.cursor_field: ticket_stream_value} ) - stream_state_dt = self.str2datetime(stream_state.get(self.cursor_field)) + stream_state_dt = self.str2datetime( + stream_state.get(self.cursor_field)) # selects all tickets what have at least one comment ticket_ids = [ @@ -365,7 +377,8 @@ def stream_slices( for ticket in tickets if ticket["comment_count"] ] - self.logger.info(f"Found updated {len(ticket_ids)} ticket(s) with comments") + self.logger.info( + f"Found updated {len(ticket_ids)} ticket(s) with comments") # sort slices by generated_timestamp ticket_ids.sort(key=lambda ticket: ticket[Tickets.cursor_field]) return ticket_ids diff --git a/airbyte-integrations/connectors/source-zendesk-support/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-zendesk-support/unit_tests/unit_test.py index 2fecd7df1ce682..f60b111cf15db3 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/unit_tests/unit_test.py +++ b/airbyte-integrations/connectors/source-zendesk-support/unit_tests/unit_test.py @@ -26,6 +26,7 @@ from unittest import TestCase import requests_mock +import requests import timeout_decorator from airbyte_cdk.sources.streams.http.exceptions import UserDefinedBackoffException from source_zendesk_support import SourceZendeskSupport @@ -46,14 +47,41 @@ def prepare_stream_args(): with open(CONFIG_FILE, "r") as f: return SourceZendeskSupport.convert_config2stream_args(json.loads(f.read())) - @timeout_decorator.timeout(10) - def test_backoff(self): - """Zendesk sends the header 'Retry-After' about needed delay. - All streams have to handle it""" - timeout = 1 + # @timeout_decorator.timeout(10) + # def test_backoff(self): + # """Zendesk sends the header 'Retry-After' about needed delay. 
+ # All streams have to handle it""" + # timeout = 1 + # stream = Tags(**self.prepare_stream_args()) + # with requests_mock.Mocker() as m: + # url = stream.url_base + stream.path() + # m.get(url, text=json.dumps({}), status_code=429, + # headers={"Retry-After": str(timeout)}) + # with self.assertRaises(UserDefinedBackoffException): + # list(stream.read_records(sync_mode=None)) + + def test_backoff_cases(self): + """Zendesk sends the header different value for backoff logic""" + stream = Tags(**self.prepare_stream_args()) + default_timeout = 60 with requests_mock.Mocker() as m: url = stream.url_base + stream.path() - m.get(url, text=json.dumps({}), status_code=429, headers={"Retry-After": str(timeout)}) - with self.assertRaises(UserDefinedBackoffException): - list(stream.read_records(sync_mode=None)) + + # with the Retry-After header > 0 + m.get(url, headers={"Retry-After": str(123)}) + assert stream.backoff_time(requests.get(url)) == 123 + # with the Retry-After header < 0, must return a default value + m.get(url, headers={"Retry-After": str(-123)}) + assert stream.backoff_time(requests.get(url)) == default_timeout + + # with the Retry-After header > 0 + m.get(url, headers={"X-Rate-Limit": str(100)}) + assert (stream.backoff_time(requests.get(url)) - 1.2) < 0.0005 + # with the Retry-After header < 0, must return a default value + m.get(url, headers={"X-Rate-Limit": str(-100)}) + assert stream.backoff_time(requests.get(url)) == default_timeout + + # without rate headers + m.get(url) + assert stream.backoff_time(requests.get(url)) == default_timeout From ceeecee12a4a721db799770ca91d5368769352d1 Mon Sep 17 00:00:00 2001 From: Maksym Pavlenok Date: Wed, 1 Sep 2021 20:07:08 +0300 Subject: [PATCH 02/14] correct incremental logic for comments --- ....py::TestBasicRead::test_read[inputs0].txt | 0 ...efresh::test_sequential_reads[inputs0].txt | 0 .../source_zendesk_support/spec.json | 15 +--- .../source_zendesk_support/streams.py | 85 ++++++++++--------- .../unit_tests/unit_test.py | 25 +++--- 5 files changed, 60 insertions(+), 65 deletions(-) create mode 100644 airbyte-integrations/connectors/source-zendesk-support/acceptance_tests_logs/test_core.py::TestBasicRead::test_read[inputs0].txt create mode 100644 airbyte-integrations/connectors/source-zendesk-support/acceptance_tests_logs/test_full_refresh.py::TestFullRefresh::test_sequential_reads[inputs0].txt diff --git a/airbyte-integrations/connectors/source-zendesk-support/acceptance_tests_logs/test_core.py::TestBasicRead::test_read[inputs0].txt b/airbyte-integrations/connectors/source-zendesk-support/acceptance_tests_logs/test_core.py::TestBasicRead::test_read[inputs0].txt new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/airbyte-integrations/connectors/source-zendesk-support/acceptance_tests_logs/test_full_refresh.py::TestFullRefresh::test_sequential_reads[inputs0].txt b/airbyte-integrations/connectors/source-zendesk-support/acceptance_tests_logs/test_full_refresh.py::TestFullRefresh::test_sequential_reads[inputs0].txt new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/spec.json b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/spec.json index 890301be1c294f..fa5db8e27d3e36 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/spec.json +++ b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/spec.json @@ -4,19 +4,13 @@ "$schema": 
"http://json-schema.org/draft-07/schema#", "title": "Source Zendesk Support Spec", "type": "object", - "required": [ - "start_date", - "subdomain", - "auth_method" - ], + "required": ["start_date", "subdomain", "auth_method"], "additionalProperties": false, "properties": { "start_date": { "type": "string", "description": "The date from which you'd like to replicate data for Zendesk Support API, in the format YYYY-MM-DDT00:00:00Z. All data generated after this date will be replicated.", - "examples": [ - "2020-10-15T00:00:00Z" - ], + "examples": ["2020-10-15T00:00:00Z"], "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$" }, "subdomain": { @@ -44,10 +38,7 @@ { "title": "API Token", "type": "object", - "required": [ - "email", - "api_token" - ], + "required": ["email", "api_token"], "additionalProperties": false, "properties": { "auth_method": { diff --git a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py index 87d19edd3a406d..e0b139e5204312 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py +++ b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py @@ -143,8 +143,7 @@ class IncrementalEntityStream(SourceZendeskSupportStream, ABC): def __init__(self, start_date: str, **kwargs): super().__init__(**kwargs) # add the custom value for skiping of not relevant records - self._start_date = self.str2datetime( - start_date) if isinstance(start_date, str) else start_date + self._start_date = self.str2datetime(start_date) if isinstance(start_date, str) else start_date def path(self, **kwargs) -> str: return f"{self.name}.json" @@ -156,16 +155,14 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp if record.get(self.created_at_field) and self.str2datetime(record[self.created_at_field]) < self._start_date: continue yield record - yield from [] def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: # try to save maximum value of a cursor field - return { - self.cursor_field: max( - str((latest_record or {}).get(self.cursor_field, "")), str( - (current_stream_state or {}).get(self.cursor_field, "")) - ) - } + old_value = str((current_stream_state or {}).get(self.cursor_field, "")) + new_value = max(str((latest_record or {}).get(self.cursor_field, "")), old_value) + if old_value != new_value: + return {self.cursor_field: new_value} + return None class IncrementalExportStream(IncrementalEntityStream, ABC): @@ -215,8 +212,7 @@ def request_params( if current_state and isinstance(current_state, str) and not current_state.isdigit(): # try to save a stage with UnixTime format current_state = self.str2unixtime(current_state) - start_time = int(current_state or time.mktime( - self._start_date.timetuple())) + 1 + start_time = int(current_state or time.mktime(self._start_date.timetuple())) + 1 # +1 because the API returns all records where generated_timestamp >= start_time now = calendar.timegm(datetime.now().utctimetuple()) @@ -244,10 +240,6 @@ def __init__(self, **kwargs): # For saving of a relevant last updated date self._max_cursor_date = None - def _get_stream_date(self, stream_state: Mapping[str, Any], **kwargs) -> datetime: - """Can change a date of comparison""" - return self.str2datetime((stream_state or {}).get(self.cursor_field)) - def parse_response(self, response: 
requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]: """try to select relevant data only""" @@ -255,10 +247,9 @@ def parse_response(self, response: requests.Response, stream_state: Mapping[str, yield from super().parse_response(response, stream_state=stream_state, **kwargs) else: send_cnt = 0 - cursor_date = self._get_stream_date(stream_state, **kwargs) - + cursor_date = (stream_state or {}).get(self.cursor_field) for record in super().parse_response(response, stream_state=stream_state, **kwargs): - updated = self.str2datetime(record[self.cursor_field]) + updated = record[self.cursor_field] if not self._max_cursor_date or self._max_cursor_date < updated: self._max_cursor_date = updated if not cursor_date or updated > cursor_date: @@ -266,13 +257,11 @@ def parse_response(self, response: requests.Response, stream_state: Mapping[str, yield record if not send_cnt: self._finished = True - yield from [] + else: + self.logger.info(f"found new {send_cnt} record(s)") def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: - - max_updated_at = self.datetime2str( - self._max_cursor_date) if self._max_cursor_date else "" - return {self.cursor_field: max(max_updated_at, (current_stream_state or {}).get(self.cursor_field, ""))} + return {self.cursor_field: max(self._max_cursor_date or "", (current_stream_state or {}).get(self.cursor_field, ""))} @property def is_finished(self): @@ -313,8 +302,7 @@ class IncrementalSortedCursorStream(IncrementalUnsortedStream, ABC): def request_params(self, next_page_token: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]: params = super().request_params(next_page_token=next_page_token, **kwargs) - params.update({"sort_by": self.cursor_field, - "sort_order": "desc", "limit": self.page_size}) + params.update({"sort_by": self.cursor_field, "sort_order": "desc", "limit": self.page_size}) if next_page_token: params["cursor"] = next_page_token @@ -332,8 +320,7 @@ class IncrementalSortedPageStream(IncrementalUnsortedPageStream, ABC): def request_params(self, **kwargs) -> MutableMapping[str, Any]: params = super().request_params(**kwargs) if params: - params.update({"sort_by": self.cursor_field, - "sort_order": "desc", "limit": self.page_size}) + params.update({"sort_by": self.cursor_field, "sort_order": "desc", "limit": self.page_size}) return params @@ -346,46 +333,62 @@ class TicketComments(IncrementalSortedPageStream): response_list_name = "comments" cursor_field = IncrementalSortedPageStream.created_at_field + def __init__(self, **kwargs): + super().__init__(**kwargs) + # need to save a slice ticket state + # because the function get_updated_state doesn't have a stream_slice as argument + self._slice_cursor_date = None + def path(self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: ticket_id = stream_slice["id"] return f"tickets/{ticket_id}/comments.json" + def parse_response( + self, response: requests.Response, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, **kwargs + ) -> Iterable[Mapping]: + """try to select relevant data only""" + self._cursor_ticket_date = stream_slice[Tickets.cursor_field] + yield from super().parse_response(response, stream_state=stream_state, stream_slice=stream_slice, **kwargs) + def stream_slices( self, sync_mode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None ) -> Iterable[Optional[Mapping[str, Any]]]: """Loads all updated 
tickets after last stream state""" stream_state = stream_state or {} # convert a comment state value to a ticket one - # Comment state: {"created_at": "2021-07-30T12:30:09Z"} => Ticket state {"generated_timestamp": 1627637409} - ticket_stream_value = Tickets.str2unixtime( - stream_state.get(self.cursor_field)) + # tickets and comments have different cursor formats. For example: + # Ticket state {"generated_timestamp": 1627637409} + # Comment state: {"created_at": "2021-07-30T12:30:09Z"} + # At the first try to find a ticket cursor value + ticket_stream_value = stream_state.get(Tickets.cursor_field) + if not ticket_stream_value: + # for backward compatibility because not all relevant states can have some last ticket state + ticket_stream_value = Tickets.str2unixtime(stream_state.get(self.cursor_field)) tickets = Tickets(self._start_date, subdomain=self._subdomain, authenticator=self.authenticator).read_records( - sync_mode=sync_mode, cursor_field=cursor_field, stream_state={ - Tickets.cursor_field: ticket_stream_value} + sync_mode=sync_mode, cursor_field=cursor_field, stream_state={Tickets.cursor_field: ticket_stream_value} ) - stream_state_dt = self.str2datetime( - stream_state.get(self.cursor_field)) # selects all tickets what have at least one comment ticket_ids = [ { "id": ticket["id"], - "start_stream_state": stream_state_dt, Tickets.cursor_field: ticket[Tickets.cursor_field], } for ticket in tickets if ticket["comment_count"] ] - self.logger.info( - f"Found updated {len(ticket_ids)} ticket(s) with comments") + self.logger.info(f"Found updated {len(ticket_ids)} ticket(s) with comments") # sort slices by generated_timestamp ticket_ids.sort(key=lambda ticket: ticket[Tickets.cursor_field]) return ticket_ids - def _get_stream_date(self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any], **kwargs) -> datetime: - """For each tickets all comments must be compared with a start value of stream state""" - return stream_slice["start_stream_state"] + def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: + """Adds a last cursor ticket updated time for a comment state""" + new_state = super().get_updated_state(current_stream_state=current_stream_state, latest_record=latest_record) + if new_state: + new_state[Tickets.cursor_field] = self._cursor_ticket_date + return new_state # NOTE: all Zendesk endpoints can be splitted into several templates of data loading. 
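# A minimal sketch (assumed, not part of the patch) of the two-format state
# that the updated TicketComments.get_updated_state emits: the comment cursor
# stays an ISO-8601 "created_at" string, while the parent ticket cursor rides
# along as a unix "generated_timestamp". The helper below is hypothetical;
# the field names and sample values come from the comments in the diff above.
def build_comment_state(comment_created_at: str, ticket_generated_ts: int) -> dict:
    # mirrors {self.cursor_field: ..., Tickets.cursor_field: ...} from get_updated_state
    return {"created_at": comment_created_at, "generated_timestamp": ticket_generated_ts}

# build_comment_state("2021-07-30T12:30:09Z", 1627637409)
# -> {"created_at": "2021-07-30T12:30:09Z", "generated_timestamp": 1627637409}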
@@ -466,6 +469,8 @@ class Macros(IncrementalSortedPageStream): class TicketAudits(IncrementalSortedCursorStream): """TicketAudits stream: https://developer.zendesk.com/api-reference/ticketing/tickets/ticket_audits/""" + # can request a maximum of 1,000 results + page_size = 1000 # ticket audits doesn't have the 'updated_by' field cursor_field = "created_at" diff --git a/airbyte-integrations/connectors/source-zendesk-support/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-zendesk-support/unit_tests/unit_test.py index f60b111cf15db3..f45aa5b42139ef 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/unit_tests/unit_test.py +++ b/airbyte-integrations/connectors/source-zendesk-support/unit_tests/unit_test.py @@ -25,8 +25,8 @@ import json from unittest import TestCase -import requests_mock import requests +import requests_mock import timeout_decorator from airbyte_cdk.sources.streams.http.exceptions import UserDefinedBackoffException from source_zendesk_support import SourceZendeskSupport @@ -47,18 +47,17 @@ def prepare_stream_args(): with open(CONFIG_FILE, "r") as f: return SourceZendeskSupport.convert_config2stream_args(json.loads(f.read())) - # @timeout_decorator.timeout(10) - # def test_backoff(self): - # """Zendesk sends the header 'Retry-After' about needed delay. - # All streams have to handle it""" - # timeout = 1 - # stream = Tags(**self.prepare_stream_args()) - # with requests_mock.Mocker() as m: - # url = stream.url_base + stream.path() - # m.get(url, text=json.dumps({}), status_code=429, - # headers={"Retry-After": str(timeout)}) - # with self.assertRaises(UserDefinedBackoffException): - # list(stream.read_records(sync_mode=None)) + @timeout_decorator.timeout(10) + def test_backoff(self): + """Zendesk sends the header 'Retry-After' about needed delay. 
+ All streams have to handle it""" + timeout = 1 + stream = Tags(**self.prepare_stream_args()) + with requests_mock.Mocker() as m: + url = stream.url_base + stream.path() + m.get(url, text=json.dumps({}), status_code=429, headers={"Retry-After": str(timeout)}) + with self.assertRaises(UserDefinedBackoffException): + list(stream.read_records(sync_mode=None)) def test_backoff_cases(self): """Zendesk sends the header different value for backoff logic""" From 9bf24d9bea24dd95290334cdecb0fd06f9b71197 Mon Sep 17 00:00:00 2001 From: Maksym Pavlenok Date: Wed, 1 Sep 2021 20:12:44 +0300 Subject: [PATCH 03/14] remove test logs --- .../test_core.py::TestBasicRead::test_read[inputs0].txt | 0 ...efresh.py::TestFullRefresh::test_sequential_reads[inputs0].txt | 0 2 files changed, 0 insertions(+), 0 deletions(-) delete mode 100644 airbyte-integrations/connectors/source-zendesk-support/acceptance_tests_logs/test_core.py::TestBasicRead::test_read[inputs0].txt delete mode 100644 airbyte-integrations/connectors/source-zendesk-support/acceptance_tests_logs/test_full_refresh.py::TestFullRefresh::test_sequential_reads[inputs0].txt diff --git a/airbyte-integrations/connectors/source-zendesk-support/acceptance_tests_logs/test_core.py::TestBasicRead::test_read[inputs0].txt b/airbyte-integrations/connectors/source-zendesk-support/acceptance_tests_logs/test_core.py::TestBasicRead::test_read[inputs0].txt deleted file mode 100644 index e69de29bb2d1d6..00000000000000 diff --git a/airbyte-integrations/connectors/source-zendesk-support/acceptance_tests_logs/test_full_refresh.py::TestFullRefresh::test_sequential_reads[inputs0].txt b/airbyte-integrations/connectors/source-zendesk-support/acceptance_tests_logs/test_full_refresh.py::TestFullRefresh::test_sequential_reads[inputs0].txt deleted file mode 100644 index e69de29bb2d1d6..00000000000000 From c2f18656dd5dd94cf9cd21cbb2c3d2584832774b Mon Sep 17 00:00:00 2001 From: Maksym Pavlenok Date: Thu, 2 Sep 2021 00:27:18 +0300 Subject: [PATCH 04/14] bump version --- .../79c1aa37-dae3-42ae-b333-d1c105477715.json | 2 +- .../init/src/main/resources/seed/source_definitions.yaml | 2 +- .../connectors/source-zendesk-support/Dockerfile | 2 +- .../source_zendesk_support/streams.py | 7 ++----- docs/integrations/sources/zendesk-support.md | 3 ++- 5 files changed, 7 insertions(+), 9 deletions(-) diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/79c1aa37-dae3-42ae-b333-d1c105477715.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/79c1aa37-dae3-42ae-b333-d1c105477715.json index 6b1b06d003d647..d54ab6bab2e52c 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/79c1aa37-dae3-42ae-b333-d1c105477715.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/79c1aa37-dae3-42ae-b333-d1c105477715.json @@ -2,7 +2,7 @@ "sourceDefinitionId": "79c1aa37-dae3-42ae-b333-d1c105477715", "name": "Zendesk Support", "dockerRepository": "airbyte/source-zendesk-support", - "dockerImageTag": "0.1.0", + "dockerImageTag": "0.1.1", "documentationUrl": "https://docs.airbyte.io/integrations/sources/zendesk-support", "icon": "zendesk.svg" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index f0e99b1e11a59d..fd710be446de50 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -188,7 +188,7 @@ 
- sourceDefinitionId: 79c1aa37-dae3-42ae-b333-d1c105477715 name: Zendesk Support dockerRepository: airbyte/source-zendesk-support - dockerImageTag: 0.1.0 + dockerImageTag: 0.1.1 documentationUrl: https://docs.airbyte.io/integrations/sources/zendesk-support icon: zendesk.svg - sourceDefinitionId: d8313939-3782-41b0-be29-b3ca20d8dd3a diff --git a/airbyte-integrations/connectors/source-zendesk-support/Dockerfile b/airbyte-integrations/connectors/source-zendesk-support/Dockerfile index 7d8ff019a95c94..867f7a9c27faa6 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/Dockerfile +++ b/airbyte-integrations/connectors/source-zendesk-support/Dockerfile @@ -21,5 +21,5 @@ COPY source_zendesk_support ./source_zendesk_support ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.version=0.1.1 LABEL io.airbyte.name=airbyte/source-zendesk-support diff --git a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py index e0b139e5204312..25eb9d0755c0f3 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py +++ b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py @@ -76,7 +76,7 @@ def backoff_time(self, response: requests.Response) -> Union[int, float]: See https://developer.zendesk.com/api-reference/ticketing/account-configuration/usage_limits/ The response has a Retry-After header that tells you for how many seconds to wait before retrying. """ - # default value if there is not any headers + # default value if there is not any header sleep_timeout = 60 retry_after = int(response.headers.get("Retry-After") or 0) if retry_after and retry_after > 0: @@ -87,7 +87,6 @@ def backoff_time(self, response: requests.Response) -> Union[int, float]: rate_limit = float(response.headers.get("X-Rate-Limit") or 0) if rate_limit and rate_limit > 0: return (60.0 / rate_limit) * 2 - return sleep_timeout @staticmethod @@ -257,8 +256,6 @@ def parse_response(self, response: requests.Response, stream_state: Mapping[str, yield record if not send_cnt: self._finished = True - else: - self.logger.info(f"found new {send_cnt} record(s)") def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: return {self.cursor_field: max(self._max_cursor_date or "", (current_stream_state or {}).get(self.cursor_field, ""))} @@ -346,7 +343,7 @@ def path(self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str def parse_response( self, response: requests.Response, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, **kwargs ) -> Iterable[Mapping]: - """try to select relevant data only""" + # save a slice ticket state self._cursor_ticket_date = stream_slice[Tickets.cursor_field] yield from super().parse_response(response, stream_state=stream_state, stream_slice=stream_slice, **kwargs) diff --git a/docs/integrations/sources/zendesk-support.md b/docs/integrations/sources/zendesk-support.md index 2c313d616fb884..510154b59446ba 100644 --- a/docs/integrations/sources/zendesk-support.md +++ b/docs/integrations/sources/zendesk-support.md @@ -91,5 +91,6 @@ We recommend creating a restricted, read-only key specifically for Airbyte acces ### CHANGELOG | Version | Date | Pull Request | Subject | | :------ | 
:-------- | :----- | :------ | -| `0.1.0` | 2021-07-21 | [4861](https://github.com/airbytehq/airbyte/issues/3698) | created CDK native zendesk connector | +| `0.1.1` | 2021-09-02 | [5787](https://github.com/airbytehq/airbyte/pull/5787) | fixed incremental logic for the ticket_comments stream | +| `0.1.0` | 2021-07-21 | [4861](https://github.com/airbytehq/airbyte/pull/4861) | created CDK native zendesk connector | From c7c0d40f4f91da937512a9e30c6b33f98e0ec6a6 Mon Sep 17 00:00:00 2001 From: Maksym Pavlenok Date: Thu, 2 Sep 2021 00:49:51 +0300 Subject: [PATCH 05/14] fix spec --- .../source_zendesk_support/spec.json | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/spec.json b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/spec.json index fa5db8e27d3e36..5bd782541f0f5c 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/spec.json +++ b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/spec.json @@ -20,21 +20,9 @@ "auth_method": { "title": "ZenDesk Authorization Method", "type": "object", - "default": "API Token", + "default": "api_token", "description": "Zendesk service provides 2 auth method: API token and oAuth2. Now only the first one is available. Another one will be added in the future", "oneOf": [ - { - "title": "oAuth2", - "type": "object", - "required": [], - "additionalProperties": false, - "properties": { - "auth_method": { - "type": "string", - "const": "oauth2" - } - } - }, { "title": "API Token", "type": "object", From 77dee5334172870bf026db70d04eab4dc0562cfb Mon Sep 17 00:00:00 2001 From: Maksym Pavlenok Date: Thu, 2 Sep 2021 01:09:19 +0300 Subject: [PATCH 06/14] remove old non-relevant test --- .../connectors/source-zendesk-support/setup.py | 2 +- .../source-zendesk-support/unit_tests/unit_test.py | 14 -------------- 2 files changed, 1 insertion(+), 15 deletions(-) diff --git a/airbyte-integrations/connectors/source-zendesk-support/setup.py b/airbyte-integrations/connectors/source-zendesk-support/setup.py index 90c4a34f1c54d2..eccd3a98498330 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/setup.py +++ b/airbyte-integrations/connectors/source-zendesk-support/setup.py @@ -27,7 +27,7 @@ MAIN_REQUIREMENTS = ["airbyte-cdk", "pytz"] -TEST_REQUIREMENTS = ["pytest~=6.1", "source-acceptance-test", "requests-mock==1.9.3", "timeout-decorator==0.5.0"] +TEST_REQUIREMENTS = ["pytest~=6.1", "source-acceptance-test", "requests-mock==1.9.3"] setup( version="0.1.0", diff --git a/airbyte-integrations/connectors/source-zendesk-support/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-zendesk-support/unit_tests/unit_test.py index f45aa5b42139ef..e45c5a7e979070 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/unit_tests/unit_test.py +++ b/airbyte-integrations/connectors/source-zendesk-support/unit_tests/unit_test.py @@ -27,8 +27,6 @@ import requests import requests_mock -import timeout_decorator -from airbyte_cdk.sources.streams.http.exceptions import UserDefinedBackoffException from source_zendesk_support import SourceZendeskSupport from source_zendesk_support.streams import Tags @@ -47,18 +45,6 @@ def prepare_stream_args(): with open(CONFIG_FILE, "r") as f: return SourceZendeskSupport.convert_config2stream_args(json.loads(f.read())) - @timeout_decorator.timeout(10) - def test_backoff(self): - """Zendesk sends the header 'Retry-After' about 
needed delay. - All streams have to handle it""" - timeout = 1 - stream = Tags(**self.prepare_stream_args()) - with requests_mock.Mocker() as m: - url = stream.url_base + stream.path() - m.get(url, text=json.dumps({}), status_code=429, headers={"Retry-After": str(timeout)}) - with self.assertRaises(UserDefinedBackoffException): - list(stream.read_records(sync_mode=None)) - def test_backoff_cases(self): """Zendesk sends the header different value for backoff logic""" From 95ad9e32cc50399d4559823e4acfcc4dd8d6f416 Mon Sep 17 00:00:00 2001 From: Maksym Pavlenok Date: Thu, 2 Sep 2021 23:18:52 +0300 Subject: [PATCH 07/14] correct backoff default value --- .../source_zendesk_support/streams.py | 23 ++++++++----------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py index 25eb9d0755c0f3..f444e82ed70ce2 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py +++ b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py @@ -76,18 +76,17 @@ def backoff_time(self, response: requests.Response) -> Union[int, float]: See https://developer.zendesk.com/api-reference/ticketing/account-configuration/usage_limits/ The response has a Retry-After header that tells you for how many seconds to wait before retrying. """ - # default value if there is not any header - sleep_timeout = 60 + retry_after = int(response.headers.get("Retry-After") or 0) if retry_after and retry_after > 0: - sleep_timeout = int(retry_after) - else: - # the header X-Rate-Limit returns a amount of requests per minute - # we try to wait twice as long - rate_limit = float(response.headers.get("X-Rate-Limit") or 0) - if rate_limit and rate_limit > 0: - return (60.0 / rate_limit) * 2 - return sleep_timeout + return int(retry_after) + # the header X-Rate-Limit returns a amount of requests per minute + # we try to wait twice as long + + rate_limit = float(response.headers.get("X-Rate-Limit") or 2) + if rate_limit and rate_limit > 0: + return (60.0 / rate_limit) * 2 + return 60 @staticmethod def str2datetime(str_dt: str) -> datetime: @@ -159,9 +158,7 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late # try to save maximum value of a cursor field old_value = str((current_stream_state or {}).get(self.cursor_field, "")) new_value = max(str((latest_record or {}).get(self.cursor_field, "")), old_value) - if old_value != new_value: - return {self.cursor_field: new_value} - return None + return {self.cursor_field: new_value} class IncrementalExportStream(IncrementalEntityStream, ABC): From 8c6d61fe4c7142bf84dedb95bfe69fd01bcc4ec0 Mon Sep 17 00:00:00 2001 From: Maksym Pavlenok Date: Fri, 3 Sep 2021 13:57:15 +0300 Subject: [PATCH 08/14] correction a test --- .../integration_tests/configured_catalog.json | 5 +---- .../sample_files/configured_catalog.json | 9 ++------- .../source-bamboo-hr/source_bamboo_hr/source.py | 1 + .../integrations/source/jdbc/AbstractJdbcSource.java | 4 ++-- .../source/relationaldb/AbstractRelationalDbSource.java | 4 ++-- .../integration_tests/integration_test.py | 1 + 6 files changed, 9 insertions(+), 15 deletions(-) diff --git a/airbyte-integrations/connectors/source-bamboo-hr/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-bamboo-hr/integration_tests/configured_catalog.json index 
0ab48212a5a03a..eb6044b93377b5 100644 --- a/airbyte-integrations/connectors/source-bamboo-hr/integration_tests/configured_catalog.json +++ b/airbyte-integrations/connectors/source-bamboo-hr/integration_tests/configured_catalog.json @@ -5,10 +5,7 @@ "name": "employee", "json_schema": {}, "supported_sync_modes": ["full_refresh"], - "supported_destination_sync_modes": [ - "overwrite", - "append_dedup" - ] + "supported_destination_sync_modes": ["overwrite", "append_dedup"] }, "sync_mode": "full_refresh", "destination_sync_mode": "append_dedup" diff --git a/airbyte-integrations/connectors/source-bamboo-hr/sample_files/configured_catalog.json b/airbyte-integrations/connectors/source-bamboo-hr/sample_files/configured_catalog.json index 3711028958bdc0..b21ac15c28bcf4 100644 --- a/airbyte-integrations/connectors/source-bamboo-hr/sample_files/configured_catalog.json +++ b/airbyte-integrations/connectors/source-bamboo-hr/sample_files/configured_catalog.json @@ -828,13 +828,8 @@ } } }, - "supported_sync_modes": [ - "full_refresh" - ], - "supported_destination_sync_modes": [ - "overwrite", - "append_dedup" - ] + "supported_sync_modes": ["full_refresh"], + "supported_destination_sync_modes": ["overwrite", "append_dedup"] }, "sync_mode": "full_refresh", "destination_sync_mode": "append_dedup" diff --git a/airbyte-integrations/connectors/source-bamboo-hr/source_bamboo_hr/source.py b/airbyte-integrations/connectors/source-bamboo-hr/source_bamboo_hr/source.py index c9c82797b52781..54b5f3a2260ec0 100644 --- a/airbyte-integrations/connectors/source-bamboo-hr/source_bamboo_hr/source.py +++ b/airbyte-integrations/connectors/source-bamboo-hr/source_bamboo_hr/source.py @@ -71,6 +71,7 @@ def request(self, uri: str, method: str = "GET", data={}, **kwargs) -> Response: response.raise_for_status() return response + class SourceBambooHr(Source): def check(self, logger: AirbyteLogger, config: json) -> AirbyteConnectionStatus: """ diff --git a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java index 035652e98e74c3..5d785d5d03e7cf 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java +++ b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java @@ -60,8 +60,8 @@ /** * This class contains helper functions and boilerplate for implementing a source connector for a - * relational DB source which can be accessed via JDBC driver. If you are implementing a connector for - * a relational DB which has a JDBC driver, make an effort to use this class. + * relational DB source which can be accessed via JDBC driver. If you are implementing a connector + * for a relational DB which has a JDBC driver, make an effort to use this class. 
*/ public abstract class AbstractJdbcSource extends AbstractRelationalDbSource implements Source { diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractRelationalDbSource.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractRelationalDbSource.java index 94d1a97f0b0bb4..7eff8c76a99c99 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractRelationalDbSource.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractRelationalDbSource.java @@ -74,8 +74,8 @@ * This class contains helper functions and boilerplate for implementing a source connector for a * relational DB source. * - * @see io.airbyte.integrations.source.jdbc.AbstractJdbcSource if you are implementing a relational DB which - * can be accessed via JDBC driver. + * @see io.airbyte.integrations.source.jdbc.AbstractJdbcSource if you are implementing a relational + * DB which can be accessed via JDBC driver. */ public abstract class AbstractRelationalDbSource extends BaseConnector implements Source { diff --git a/airbyte-integrations/connectors/source-zendesk-support/integration_tests/integration_test.py b/airbyte-integrations/connectors/source-zendesk-support/integration_tests/integration_test.py index 17ae998f78087f..35fbdaa535f255 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/integration_tests/integration_test.py +++ b/airbyte-integrations/connectors/source-zendesk-support/integration_tests/integration_test.py @@ -55,6 +55,7 @@ def _test_export_stream(self, stream_cls: type): for record_id, timestamp in record_timestamps.items(): state = {stream.cursor_field: timestamp} for record in stream.read_records(sync_mode=None, stream_state=state): + print(f"First record: {record}!={record_id} ({timestamp}), state:{state}") assert record["id"] != record_id break From 879663adebb5c13ae1116d317cfd7cece8fc1df6 Mon Sep 17 00:00:00 2001 From: Maksym Pavlenok Date: Fri, 3 Sep 2021 23:42:30 +0300 Subject: [PATCH 09/14] save addl export cursor field --- .../source_zendesk_support/streams.py | 62 ++++++++++++++----- 1 file changed, 45 insertions(+), 17 deletions(-) diff --git a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py index f444e82ed70ce2..84c1f38402f20c 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py +++ b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py @@ -142,6 +142,12 @@ def __init__(self, start_date: str, **kwargs): super().__init__(**kwargs) # add the custom value for skiping of not relevant records self._start_date = self.str2datetime(start_date) if isinstance(start_date, str) else start_date + # Flag for marking of completed process + self._finished = False + + @property + def is_finished(self): + return self._finished def path(self, **kwargs) -> str: return f"{self.name}.json" @@ -176,6 +182,12 @@ class IncrementalExportStream(IncrementalEntityStream, ABC): # this endpoint provides responces in ascending order. 
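# A hedged, self-contained sketch of the export-cursor handshake this stream
# is assumed to follow (the request/response shapes below are illustrative,
# reconstructed from the surrounding diff; the connector's real pagination
# lives in next_page_token/request_params):
#
#   GET incremental/tickets.json?start_time=1627637409&per_page=100
#   -> {"tickets": [...], "end_time": 1627641009, "end_of_stream": false}
from typing import Optional

def next_start_time(page: dict) -> Optional[int]:
    # each page's "end_time" seeds the next request's "start_time"
    # until the API reports end_of_stream=true
    return None if page["end_of_stream"] else page["end_time"]

assert next_start_time({"end_time": 1627641009, "end_of_stream": False}) == 1627641009
assert next_start_time({"end_time": 1627641009, "end_of_stream": True}) is None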
state_checkpoint_interval = 100 + def __init__(self, **kwargs): + super().__init__(**kwargs) + # for saving of last page cursor value + # endpoints can have different cursor format but incremental logic uses unixtime format only + self._last_end_time = None + @staticmethod def str2unixtime(str_dt: str) -> int: """convert string to unixtime number @@ -188,11 +200,9 @@ def str2unixtime(str_dt: str) -> int: return calendar.timegm(dt.utctimetuple()) def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: - data = response.json() - if data["end_of_stream"]: - # true if the current request has returned all the results up to the current time; false otherwise + if self.is_finished: return None - return {"start_time": data["end_time"]} + return {"start_time": self._last_end_time} def path(self, *args, **kwargs) -> str: return f"incremental/{self.name}.json" @@ -203,12 +213,15 @@ def request_params( params = {"per_page": self.page_size} if not next_page_token: - # try to search all reconds with generated_timestamp > start_time - current_state = stream_state.get(self.cursor_field) - if current_state and isinstance(current_state, str) and not current_state.isdigit(): - # try to save a stage with UnixTime format - current_state = self.str2unixtime(current_state) - start_time = int(current_state or time.mktime(self._start_date.timetuple())) + 1 + current_state = stream_state.get("_last_end_time") + if not current_state: + # try to search all reconds with generated_timestamp > start_time + current_state = stream_state.get(self.cursor_field) + if current_state and isinstance(current_state, str) and not current_state.isdigit(): + # try to save a stage with UnixTime format + current_state = self.str2unixtime(current_state) + + start_time = int(current_state or time.mktime(self._start_date.timetuple())) # +1 because the API returns all records where generated_timestamp >= start_time now = calendar.timegm(datetime.now().utctimetuple()) @@ -221,6 +234,27 @@ def request_params( params.update(next_page_token) return params + def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: + # try to save maximum value of a cursor field + state = super().get_updated_state(current_stream_state=current_stream_state, latest_record=latest_record) + + if self._last_end_time: + state["_last_end_time"] = self._last_end_time + self.logger.warn(str(state)) + current_stream_state.update(state) + return current_stream_state + + def parse_response( + self, response: requests.Response, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, **kwargs + ) -> Iterable[Mapping]: + + data = response.json() + # save a last end time for the next attempt + self._last_end_time = data["end_time"] + # end_of_stream is true if the current request has returned all the results up to the current time; false otherwise + self._finished = data["end_of_stream"] + yield from super().parse_response(response, stream_state=stream_state, stream_slice=stream_slice, **kwargs) + class IncrementalUnsortedStream(IncrementalEntityStream, ABC): """Stream for loading without sorting @@ -231,8 +265,6 @@ class IncrementalUnsortedStream(IncrementalEntityStream, ABC): def __init__(self, **kwargs): super().__init__(**kwargs) - # Flag for marking of completed process - self._finished = False # For saving of a relevant last updated date self._max_cursor_date = None @@ -257,10 +289,6 @@ def parse_response(self, response: requests.Response, 
stream_state: Mapping[str, def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: return {self.cursor_field: max(self._max_cursor_date or "", (current_stream_state or {}).get(self.cursor_field, ""))} - @property - def is_finished(self): - return self._finished - @abstractmethod def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: """can be different for each case""" @@ -359,7 +387,7 @@ def stream_slices( # for backward compatibility because not all relevant states can have some last ticket state ticket_stream_value = Tickets.str2unixtime(stream_state.get(self.cursor_field)) - tickets = Tickets(self._start_date, subdomain=self._subdomain, authenticator=self.authenticator).read_records( + tickets = Tickets(start_date=self._start_date, subdomain=self._subdomain, authenticator=self.authenticator).read_records( sync_mode=sync_mode, cursor_field=cursor_field, stream_state={Tickets.cursor_field: ticket_stream_value} ) From 477ee2b81e157b56fdd69e0a86854868ec6175fb Mon Sep 17 00:00:00 2001 From: Maksym Pavlenok Date: Sat, 4 Sep 2021 01:46:14 +0300 Subject: [PATCH 10/14] fix incorrect optinal date-time --- .../schemas/organizations.json | 3 +-- .../schemas/ticket_metrics.json | 15 +++++---------- .../source_zendesk_support/schemas/tickets.json | 3 +-- .../source_zendesk_support/schemas/users.json | 3 +-- .../source_zendesk_support/streams.py | 5 ++--- 5 files changed, 10 insertions(+), 19 deletions(-) diff --git a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/schemas/organizations.json b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/schemas/organizations.json index f01e405d584306..40ddaa5c4291f4 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/schemas/organizations.json +++ b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/schemas/organizations.json @@ -53,8 +53,7 @@ "type": ["null", "integer"] }, "deleted_at": { - "type": ["null", "string"], - "format": "date-time" + "type": ["null", "string"] } } } diff --git a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/schemas/ticket_metrics.json b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/schemas/ticket_metrics.json index a139c863d2b91b..a6acd2e3b18136 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/schemas/ticket_metrics.json +++ b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/schemas/ticket_metrics.json @@ -73,8 +73,7 @@ "type": ["null", "integer"] }, "latest_comment_added_at": { - "type": ["null", "string"], - "format": "date-time" + "type": ["null", "string"] }, "on_hold_time_in_minutes": { "type": ["null", "object"], @@ -131,20 +130,16 @@ "type": ["null", "string"] }, "initially_assigned_at": { - "type": ["null", "string"], - "format": "date-time" + "type": ["null", "string"] }, "assigned_at": { - "type": ["null", "string"], - "format": "date-time" + "type": ["null", "string"] }, "solved_at": { - "type": ["null", "string"], - "format": "date-time" + "type": ["null", "string"] }, "assignee_updated_at": { - "type": ["null", "string"], - "format": "date-time" + "type": ["null", "string"] } }, "type": ["null", "object"] diff --git a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/schemas/tickets.json 
b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/schemas/tickets.json index 20bfc48b707473..47cf5fd97b24d5 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/schemas/tickets.json +++ b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/schemas/tickets.json @@ -93,8 +93,7 @@ "type": ["null", "boolean"] }, "due_at": { - "type": ["null", "string"], - "format": "date-time" + "type": ["null", "string"] }, "followup_ids": { "items": { diff --git a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/schemas/users.json b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/schemas/users.json index 11df801acee39d..3444626f3ac387 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/schemas/users.json +++ b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/schemas/users.json @@ -153,8 +153,7 @@ "additionalProperties": true }, "last_login_at": { - "type": ["null", "string"], - "format": "date-time" + "type": ["null", "string"] }, "alias": { "type": ["null", "string"] diff --git a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py index 84c1f38402f20c..baeb7a7308adba 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py +++ b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py @@ -77,13 +77,13 @@ def backoff_time(self, response: requests.Response) -> Union[int, float]: The response has a Retry-After header that tells you for how many seconds to wait before retrying. 
""" - retry_after = int(response.headers.get("Retry-After") or 0) + retry_after = int(response.headers.get("Retry-After", 0)) if retry_after and retry_after > 0: return int(retry_after) # the header X-Rate-Limit returns a amount of requests per minute # we try to wait twice as long - rate_limit = float(response.headers.get("X-Rate-Limit") or 2) + rate_limit = float(response.headers.get("X-Rate-Limit", 2)) if rate_limit and rate_limit > 0: return (60.0 / rate_limit) * 2 return 60 @@ -240,7 +240,6 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late if self._last_end_time: state["_last_end_time"] = self._last_end_time - self.logger.warn(str(state)) current_stream_state.update(state) return current_stream_state From 861f09c56440f2b06b126aa640c3a6bc6018a404 Mon Sep 17 00:00:00 2001 From: Maksym Pavlenok Date: Wed, 8 Sep 2021 12:29:04 +0300 Subject: [PATCH 11/14] add format date-time again --- .../schemas/organizations.json | 3 ++- .../schemas/ticket_metrics.json | 12 ++++++++---- .../source_zendesk_support/schemas/tickets.json | 3 ++- .../source_zendesk_support/schemas/users.json | 3 ++- .../source_zendesk_support/streams.py | 4 ++-- 5 files changed, 16 insertions(+), 9 deletions(-) diff --git a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/schemas/organizations.json b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/schemas/organizations.json index 40ddaa5c4291f4..f01e405d584306 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/schemas/organizations.json +++ b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/schemas/organizations.json @@ -53,7 +53,8 @@ "type": ["null", "integer"] }, "deleted_at": { - "type": ["null", "string"] + "type": ["null", "string"], + "format": "date-time" } } } diff --git a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/schemas/ticket_metrics.json b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/schemas/ticket_metrics.json index a6acd2e3b18136..249ca0d1c93e69 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/schemas/ticket_metrics.json +++ b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/schemas/ticket_metrics.json @@ -73,7 +73,8 @@ "type": ["null", "integer"] }, "latest_comment_added_at": { - "type": ["null", "string"] + "type": ["null", "string"], + "format": "date-time" }, "on_hold_time_in_minutes": { "type": ["null", "object"], @@ -133,13 +134,16 @@ "type": ["null", "string"] }, "assigned_at": { - "type": ["null", "string"] + "type": ["null", "string"], + "format": "date-time" }, "solved_at": { - "type": ["null", "string"] + "type": ["null", "string"], + "format": "date-time" }, "assignee_updated_at": { - "type": ["null", "string"] + "type": ["null", "string"], + "format": "date-time" } }, "type": ["null", "object"] diff --git a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/schemas/tickets.json b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/schemas/tickets.json index 47cf5fd97b24d5..20bfc48b707473 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/schemas/tickets.json +++ b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/schemas/tickets.json @@ -93,7 +93,8 @@ "type": ["null", "boolean"] }, "due_at": { - "type": ["null", "string"] + "type": 
["null", "string"], + "format": "date-time" }, "followup_ids": { "items": { diff --git a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/schemas/users.json b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/schemas/users.json index 3444626f3ac387..11df801acee39d 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/schemas/users.json +++ b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/schemas/users.json @@ -153,7 +153,8 @@ "additionalProperties": true }, "last_login_at": { - "type": ["null", "string"] + "type": ["null", "string"], + "format": "date-time" }, "alias": { "type": ["null", "string"] diff --git a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py index baeb7a7308adba..e051ec488e3d22 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py +++ b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py @@ -163,8 +163,8 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: # try to save maximum value of a cursor field old_value = str((current_stream_state or {}).get(self.cursor_field, "")) - new_value = max(str((latest_record or {}).get(self.cursor_field, "")), old_value) - return {self.cursor_field: new_value} + new_value = str((latest_record or {}).get(self.cursor_field, "")) + return {self.cursor_field: max(new_value, old_value)} class IncrementalExportStream(IncrementalEntityStream, ABC): From eec6908ba268ac1df53027f559368bfbf2937c78 Mon Sep 17 00:00:00 2001 From: Maksym Pavlenok Date: Wed, 8 Sep 2021 20:02:34 +0300 Subject: [PATCH 12/14] correction after review --- .../source_zendesk_support/schemas/ticket_metrics.json | 3 ++- .../source-zendesk-support/source_zendesk_support/streams.py | 3 +-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/schemas/ticket_metrics.json b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/schemas/ticket_metrics.json index 249ca0d1c93e69..454ab85dffc8ae 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/schemas/ticket_metrics.json +++ b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/schemas/ticket_metrics.json @@ -131,7 +131,8 @@ "type": ["null", "string"] }, "initially_assigned_at": { - "type": ["null", "string"] + "type": ["null", "string"], + "format": "datetime" }, "assigned_at": { "type": ["null", "string"], diff --git a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py index e051ec488e3d22..14d2f0010e6aad 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py +++ b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py @@ -86,7 +86,7 @@ def backoff_time(self, response: requests.Response) -> Union[int, float]: rate_limit = float(response.headers.get("X-Rate-Limit", 2)) if rate_limit and rate_limit > 0: return (60.0 / rate_limit) * 2 - return 60 + return 
@@ -218,7 +218,6 @@ def request_params(
         # try to search all records with generated_timestamp > start_time
         current_state = stream_state.get(self.cursor_field)
         if current_state and isinstance(current_state, str) and not current_state.isdigit():
-            # try to save a stage with UnixTime format
             current_state = self.str2unixtime(current_state)
         start_time = int(current_state or time.mktime(self._start_date.timetuple()))

From 87fabd295c8a25461031391906250afbeee90361 Mon Sep 17 00:00:00 2001
From: Maksym Pavlenok
Date: Thu, 9 Sep 2021 02:06:43 +0300
Subject: [PATCH 13/14] fix integration test

---
 .../integration_tests/integration_test.py | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/airbyte-integrations/connectors/source-zendesk-support/integration_tests/integration_test.py b/airbyte-integrations/connectors/source-zendesk-support/integration_tests/integration_test.py
index 35fbdaa535f255..4d761ca23bcb0c 100644
--- a/airbyte-integrations/connectors/source-zendesk-support/integration_tests/integration_test.py
+++ b/airbyte-integrations/connectors/source-zendesk-support/integration_tests/integration_test.py
@@ -46,16 +46,19 @@ def prepare_stream_args():
     def _test_export_stream(self, stream_cls: type):
         stream = stream_cls(**self.prepare_stream_args())
+        stream.page_size = 1
         record_timestamps = {}
         for record in stream.read_records(sync_mode=None):
             # save the first 5 records
             if len(record_timestamps) > 5:
                 break
-            record_timestamps[record["id"]] = record[stream.cursor_field]
+            if stream._last_end_time not in record_timestamps.values():
+                record_timestamps[record["id"]] = stream._last_end_time
+
+        stream.page_size = 10
         for record_id, timestamp in record_timestamps.items():
-            state = {stream.cursor_field: timestamp}
+            state = {"_last_end_time": timestamp}
             for record in stream.read_records(sync_mode=None, stream_state=state):
-                print(f"First record: {record}!={record_id} ({timestamp}), state:{state}")
                 assert record["id"] != record_id
                 break

From 63df67b0ab0cd46730678c957f294303ddb846e9 Mon Sep 17 00:00:00 2001
From: Maksym Pavlenok
Date: Thu, 9 Sep 2021 02:28:23 +0300
Subject: [PATCH 14/14] fix unit test

---
 .../source_zendesk_support/streams.py              |  4 ++--
 .../source-zendesk-support/unit_tests/unit_test.py | 10 +++++-----
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py
index 14d2f0010e6aad..1ade4e8eff4016 100644
--- a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py
+++ b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py
@@ -80,10 +80,10 @@ def backoff_time(self, response: requests.Response) -> Union[int, float]:
         retry_after = int(response.headers.get("Retry-After", 0))
         if retry_after and retry_after > 0:
             return int(retry_after)
+
         # the X-Rate-Limit header returns the allowed number of requests per minute
         # we try to wait twice as long
-
-        rate_limit = float(response.headers.get("X-Rate-Limit", 2))
+        rate_limit = float(response.headers.get("X-Rate-Limit", 0))
         if rate_limit and rate_limit > 0:
             return (60.0 / rate_limit) * 2
         return super().backoff_time(response)
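Note on the request_params hunk above: incremental export streams keep a string cursor in state, but the export API expects a Unix start_time, hence the str2unixtime conversion. A sketch of that step; DATETIME_FORMAT and the calendar.timegm call are assumptions here (the surrounding diff shows the connector itself using time.mktime, which applies the local timezone):

import calendar
from datetime import datetime

DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"  # assumed cursor string format

def str2unixtime(str_dt: str) -> int:
    # UTC-stable conversion of a cursor string to epoch seconds
    return calendar.timegm(datetime.strptime(str_dt, DATETIME_FORMAT).timetuple())

current_state = "2021-09-01T00:00:00Z"
if current_state and isinstance(current_state, str) and not current_state.isdigit():
    current_state = str2unixtime(current_state)
assert current_state == 1630454400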
diff --git a/airbyte-integrations/connectors/source-zendesk-support/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-zendesk-support/unit_tests/unit_test.py
index e45c5a7e979070..f6bf158f7e9d8a 100644
--- a/airbyte-integrations/connectors/source-zendesk-support/unit_tests/unit_test.py
+++ b/airbyte-integrations/connectors/source-zendesk-support/unit_tests/unit_test.py
@@ -49,22 +49,22 @@ def test_backoff_cases(self):
         """Zendesk sends different header values for its backoff logic"""
         stream = Tags(**self.prepare_stream_args())
-        default_timeout = 60
+        default_timeout = None
         with requests_mock.Mocker() as m:
             url = stream.url_base + stream.path()

             # with the Retry-After header > 0
-            m.get(url, headers={"Retry-After": str(123)})
+            m.get(url, headers={"Retry-After": str(123)}, status_code=429)
             assert stream.backoff_time(requests.get(url)) == 123

             # with the Retry-After header < 0, must return a default value
-            m.get(url, headers={"Retry-After": str(-123)})
+            m.get(url, headers={"Retry-After": str(-123)}, status_code=429)
             assert stream.backoff_time(requests.get(url)) == default_timeout

             # with the X-Rate-Limit header > 0
-            m.get(url, headers={"X-Rate-Limit": str(100)})
+            m.get(url, headers={"X-Rate-Limit": str(100)}, status_code=429)
             assert (stream.backoff_time(requests.get(url)) - 1.2) < 0.0005

             # with the X-Rate-Limit header < 0, must return a default value
-            m.get(url, headers={"X-Rate-Limit": str(-100)})
+            m.get(url, headers={"X-Rate-Limit": str(-100)}, status_code=429)
             assert stream.backoff_time(requests.get(url)) == default_timeout
             # without rate headers
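For reference, a self-contained sketch of the test pattern above: requests_mock serves canned rate-limit headers, and a simplified stand-in for backoff_time (not the connector's actual method) is asserted against them. Re-registering a matcher for the same URL makes the newest response win, which is why one url can be reused:

import requests
import requests_mock

def backoff_time(response: requests.Response):
    # prefer the server-provided Retry-After when it is positive
    retry_after = int(response.headers.get("Retry-After", 0))
    if retry_after > 0:
        return retry_after
    # X-Rate-Limit is requests allowed per minute; wait twice the implied interval
    rate_limit = float(response.headers.get("X-Rate-Limit", 0))
    if rate_limit > 0:
        return (60.0 / rate_limit) * 2
    return None  # defer to the caller's default backoff

with requests_mock.Mocker() as m:
    url = "https://example.zendesk.com/api/v2/tags.json"  # placeholder URL
    m.get(url, headers={"Retry-After": "123"}, status_code=429)
    assert backoff_time(requests.get(url)) == 123

    m.get(url, headers={"X-Rate-Limit": "100"}, status_code=429)
    assert backoff_time(requests.get(url)) == 1.2

    m.get(url, status_code=429)  # no rate headers -> fall back to the default
    assert backoff_time(requests.get(url)) is None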