Source Zendesk Support: refactor and debug stuck syncs #20900

Merged
@@ -1991,7 +1991,7 @@
- name: Zendesk Support
sourceDefinitionId: 79c1aa37-dae3-42ae-b333-d1c105477715
dockerRepository: airbyte/source-zendesk-support
- dockerImageTag: 0.2.19
+ dockerImageTag: 0.2.20
documentationUrl: https://docs.airbyte.com/integrations/sources/zendesk-support
icon: zendesk-support.svg
sourceType: api
@@ -16227,7 +16227,7 @@
path_in_connector_config:
- "credentials"
- "client_secret"
- dockerImage: "airbyte/source-zendesk-support:0.2.19"
- dockerImage: "airbyte/source-zendesk-support:0.2.20"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/zendesk-support"
connectionSpecification:
@@ -16299,6 +16299,12 @@
https://docs.airbyte.com/integrations/sources/zendesk-support#setup-guide\"\
>docs</a> for more information."
airbyte_secret: true
ignore_pagination:
type: "boolean"
default: false
description: "Makes each stream read a single page of data."
title: "Should the connector read the second and further pages of data."
airbyte_hidden: true
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
@@ -25,5 +25,5 @@ COPY source_zendesk_support ./source_zendesk_support
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

- LABEL io.airbyte.version=0.2.19
+ LABEL io.airbyte.version=0.2.20
LABEL io.airbyte.name=airbyte/source-zendesk-support
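The version bump to 0.2.20 is mirrored in all the usual places: the seed source definitions, the generated spec file above, this Dockerfile label, and the changelog at the end of the diff.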


@@ -101,6 +101,7 @@ def convert_config2stream_args(cls, config: Mapping[str, Any]) -> Mapping[str, A
"subdomain": config["subdomain"],
"start_date": config["start_date"],
"authenticator": cls.get_authenticator(config),
"ignore_pagination": config.get("ignore_pagination", False),
}
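For illustration, here is how the new flag flows from a user config into the stream kwargs; because it is read with `config.get(..., False)`, configs created before this version keep paginating as usual. A minimal sketch (the config values below are placeholders, not a real account):

config = {
    "subdomain": "my-subdomain",           # placeholder
    "start_date": "2021-01-01T00:00:00Z",  # placeholder
}

stream_args = {
    "subdomain": config["subdomain"],
    "start_date": config["start_date"],
    # A missing key falls back to False, so pagination stays on by default.
    "ignore_pagination": config.get("ignore_pagination", False),
}
assert stream_args["ignore_pagination"] is False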

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
@@ -68,6 +68,13 @@
}
}
]
},
"ignore_pagination": {
"type": "boolean",
"default": false,
"description": "Makes each stream read a single page of data.",
"title": "Should the connector read the second and further pages of data.",
"airbyte_hidden": true
}
}
},
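Note that this `ignore_pagination` block in the connector's `spec.json` mirrors the entry added to the generated `source_specs.yaml` above; the two definitions must stay in sync.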
@@ -3,12 +3,14 @@
#

import calendar
import functools
import logging
import re
import time
from abc import ABC
from collections import deque
from concurrent.futures import Future, ProcessPoolExecutor
- from datetime import datetime
+ from datetime import datetime, timedelta
from functools import partial
from math import ceil
from pickle import PickleError, dumps
@@ -31,6 +33,33 @@
LAST_END_TIME_KEY: str = "_last_end_time"
END_OF_STREAM_KEY: str = "end_of_stream"

logger = logging.getLogger("airbyte")

# For some streams, multiple http requests are running at the same time for performance reasons.
# However, it may result in hitting the rate limit, therefore subsequent requests have to be made after a pause.
# The idea is to sustain a pause once and continue making multiple requests at a time.
# A single `retry_at` variable is introduced here, which prevents us from duplicate sleeping in the main thread
# before each request is made as it used to be in prior versions.
# It acts like a global counter - increased each time a 429 status is met
# only if it is greater than the current value. On the other hand, no request may be made before this moment.
# Because the requests are made in parallel, time.sleep will be called in parallel as well.
# This is possible because it is a point in time, not timedelta.
retry_at: Optional[datetime] = None
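Put differently, `retry_at` is a shared deadline that only ever moves forward. A minimal sketch of the update rule the comment describes (the helper name here is made up for illustration; the connector inlines this logic in `_retry`):

from datetime import datetime, timedelta
from typing import Optional

retry_at: Optional[datetime] = None

def push_back_deadline(finished_at: datetime, sleep_time: float) -> None:
    # Advance the shared deadline only if the new candidate is later.
    global retry_at
    candidate = finished_at + timedelta(seconds=sleep_time)
    if not retry_at or retry_at < candidate:
        retry_at = candidate

# Two parallel workers hit 429s one second apart, each with a 6 s backoff:
t0 = datetime.utcnow()
push_back_deadline(t0, 6)                         # deadline becomes t0 + 6 s
push_back_deadline(t0 + timedelta(seconds=1), 6)  # deadline moves to t0 + 7 s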


def sleep_before_executing(sleep_time: float):
def wrapper(function):
@functools.wraps(function)
def inner(*args, **kwargs):
logger.info(f"Sleeping {sleep_time} seconds before next request")
time.sleep(int(sleep_time))
result = function(*args, **kwargs)
return result, datetime.utcnow()

return inner

return wrapper
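For reference, the wrapper both delays the call and timestamps its completion, which is why callers later unpack a `(response, finished_at)` tuple. A toy usage, with `fake_send` as a hypothetical stand-in for `requests.Session.send`:

def fake_send() -> str:
    # Hypothetical stand-in; the real target is requests.Session.send.
    return "response"

wrapped = sleep_before_executing(0.5)(fake_send)
result, finished_at = wrapped()
# Note: time.sleep(int(0.5)) truncates to 0, so sub-second waits are skipped.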


def to_int(s):
"https://github.com/airbytehq/airbyte/issues/13673"
@@ -60,10 +89,16 @@ def send_future(self, request: requests.PreparedRequest, **kwargs) -> Future:
if self.session:
func = self.session.send
else:
sleep_time = 0
now = datetime.utcnow()
if retry_at and retry_at > now:
sleep_time = (retry_at - datetime.utcnow()).seconds
# avoid calling super to not break pickled method
func = partial(requests.Session.send, self)
func = sleep_before_executing(sleep_time)(func)

if isinstance(self.executor, ProcessPoolExecutor):
self.logger.warning("ProcessPoolExecutor is used to perform IO related tasks for unknown reason!")
# verify function can be pickled
try:
dumps(func)
@@ -74,11 +109,12 @@ def send_future(self, request: requests.PreparedRequest, **kwargs) -> Future:


class BaseSourceZendeskSupportStream(HttpStream, ABC):
- def __init__(self, subdomain: str, start_date: str, **kwargs):
+ def __init__(self, subdomain: str, start_date: str, ignore_pagination: bool = False, **kwargs):
super().__init__(**kwargs)

self._start_date = start_date
self._subdomain = subdomain
self._ignore_pagination = ignore_pagination

def backoff_time(self, response: requests.Response) -> Union[int, float]:
"""
@@ -93,10 +129,9 @@ def backoff_time(self, response: requests.Response) -> Union[int, float]:
return retry_after

# the header X-Rate-Limit returns the amount of requests per minute
- # we try to wait twice as long
rate_limit = float(response.headers.get("X-Rate-Limit", 0))
if rate_limit and rate_limit > 0:
- return (60.0 / rate_limit) * 2
+ return 60.0 / rate_limit
return super().backoff_time(response)
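The effect of dropping the `* 2` multiplier is easy to check: with `X-Rate-Limit: 10` (the value used in the new unit test below) the per-429 wait halves from 12 to 6 seconds. A worked check:

x_rate_limit = 10.0  # requests allowed per minute, from the response header
old_backoff = (60.0 / x_rate_limit) * 2  # 12.0 s in 0.2.19
new_backoff = 60.0 / x_rate_limit        #  6.0 s in 0.2.20
assert (old_backoff, new_backoff) == (12.0, 6.0)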

@staticmethod
@@ -211,7 +246,7 @@ def generate_future_requests(
stream_state: Mapping[str, Any] = None,
):
records_count = self.get_api_records_count(stream_slice=stream_slice, stream_state=stream_state)

self.logger.info(f"Records count is {records_count}")
page_count = ceil(records_count / self.page_size)
for page_number in range(1, page_count + 1):
params = self.request_params(stream_state=stream_state, stream_slice=stream_slice)
@@ -228,8 +263,14 @@

request_kwargs = self.request_kwargs(stream_state=stream_state, stream_slice=stream_slice)
self.future_requests.append(
{"future": self._send_request(request, request_kwargs), "request": request, "request_kwargs": request_kwargs, "retries": 0}
{
"future": self._send_request(request, request_kwargs),
"request": request,
"request_kwargs": request_kwargs,
"retries": 0,
}
)
self.logger.info(f"Generated {len(self.future_requests)} future requests")

def _send(self, request: requests.PreparedRequest, request_kwargs: Mapping[str, Any]) -> Future:
response: Future = self._session.send_future(request, **request_kwargs)
@@ -264,15 +305,20 @@ def _retry(
retries: int,
original_exception: Exception = None,
response: requests.Response = None,
finished_at: Optional[datetime] = None,
**request_kwargs,
):
if retries == self.max_retries:
if original_exception:
raise original_exception
raise DefaultBackoffException(request=request, response=response)
- if response is not None:
-     backoff_time = self.backoff_time(response)
-     time.sleep(max(0, int(backoff_time - response.elapsed.total_seconds())))
+ sleep_time = self.backoff_time(response)
+ if response is not None and finished_at and sleep_time:
+     current_retry_at = finished_at + timedelta(seconds=sleep_time)
+     global retry_at
+     if not retry_at or (retry_at < current_retry_at):
+         retry_at = current_retry_at
+ self.logger.info(f"Adding a request to be retried in {sleep_time} seconds")
self.future_requests.append(
{
"future": self._send_request(request, request_kwargs),
@@ -292,17 +338,21 @@ def read_records(
self.generate_future_requests(sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state)

while len(self.future_requests) > 0:
self.logger.info("Starting another while loop iteration")
item = self.future_requests.popleft()
request, retries, future, kwargs = item["request"], item["retries"], item["future"], item["request_kwargs"]

try:
- response = future.result()
+ response, finished_at = future.result()
except TRANSIENT_EXCEPTIONS as exc:
self.logger.info("Will retry the request because of a transient exception")
self._retry(request=request, retries=retries, original_exception=exc, **kwargs)
continue
if self.should_retry(response):
- self._retry(request=request, retries=retries, response=response, **kwargs)
+ self.logger.info("Will retry the request for other reason")
+ self._retry(request=request, retries=retries, response=response, finished_at=finished_at, **kwargs)
continue
self.logger.info("Request successful, will parse the response now")
yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice)
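Note the design choice here: a failed request is re-appended to the `future_requests` deque with its retry counter bumped rather than retried inline, and the wait itself happens inside the worker via the shared `retry_at` deadline, so the main loop never blocks on `time.sleep` between requests.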


@@ -324,6 +374,8 @@ def path(self, **kwargs):
return self.name

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
if self._ignore_pagination:
return None
next_page = self._parse_next_page_number(response)
if not next_page:
self._finished = True
@@ -357,6 +409,8 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late
return {self.cursor_field: max(new_value, old_value)}

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
if self._ignore_pagination:
return None
start_time = dict(parse_qsl(urlparse(response.json().get(self.next_page_field, "")).query)).get("start_time")
if start_time != self.prev_start_time:
self.prev_start_time = start_time
@@ -502,6 +556,8 @@ class GroupMemberships(SourceZendeskSupportCursorPaginationStream):
"""GroupMemberships stream: https://developer.zendesk.com/api-reference/ticketing/groups/group_memberships/"""

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
if self._ignore_pagination:
return None
next_page = self._parse_next_page_number(response)
return next_page if next_page else None

@@ -522,6 +578,8 @@ class SatisfactionRatings(SourceZendeskSupportCursorPaginationStream):
"""

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
if self._ignore_pagination:
return None
next_page = self._parse_next_page_number(response)
return next_page if next_page else None

@@ -548,6 +606,8 @@ class TicketMetrics(SourceZendeskSupportCursorPaginationStream):
"""TicketMetric stream: https://developer.zendesk.com/api-reference/ticketing/tickets/ticket_metrics/"""

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
if self._ignore_pagination:
return None
next_page = self._parse_next_page_number(response)
return next_page if next_page else None

@@ -601,6 +661,8 @@ def request_params(self, next_page_token: Mapping[str, Any] = None, **kwargs) ->
return params

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
if self._ignore_pagination:
return None
return response.json().get("before_cursor")


@@ -2,6 +2,7 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import datetime
import json
from datetime import timedelta
from urllib.parse import urljoin
@@ -14,7 +15,7 @@
from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException
from requests.exceptions import ConnectionError
from source_zendesk_support.source import BasicApiTokenAuthenticator
- from source_zendesk_support.streams import Macros
+ from source_zendesk_support.streams import Macros, Organizations

STREAM_ARGS: dict = {
"subdomain": "fake-subdomain",
@@ -23,7 +24,7 @@
}


- @pytest.fixture(autouse=True)
+ @pytest.fixture()
def time_sleep_mock(mocker):
time_mock = mocker.patch("time.sleep", lambda x: None)
yield time_mock
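(Dropping `autouse=True` makes the sleep mock opt-in: the existing tests request `time_sleep_mock` explicitly, while the new `test_sleep_time` below needs real sleeping to measure elapsed time.)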
@@ -39,7 +40,7 @@ def time_sleep_mock(mocker):
(101, 100, 2),
],
)
- def test_proper_number_of_future_requests_generated(records_count, page_size, expected_futures_deque_len):
+ def test_proper_number_of_future_requests_generated(records_count, page_size, expected_futures_deque_len, time_sleep_mock):
stream = Macros(**STREAM_ARGS)
stream.page_size = page_size

@@ -60,7 +61,7 @@ def test_proper_number_of_future_requests_generated(records_count, page_size, ex
(10, 10, 0),
],
)
- def test_parse_future_records(records_count, page_size, expected_futures_deque_len):
+ def test_parse_future_records(records_count, page_size, expected_futures_deque_len, time_sleep_mock):
stream = Macros(**STREAM_ARGS)
stream.page_size = page_size
expected_records = [
@@ -82,7 +83,7 @@ def test_parse_future_records(records_count, page_size, expected_futures_deque_l
if not stream.future_requests and not expected_futures_deque_len:
assert len(stream.future_requests) == 0 and not expected_records
else:
- response = stream.future_requests[0]["future"].result()
+ response, _ = stream.future_requests[0]["future"].result()
records = list(stream.parse_response(response, stream_state=None, stream_slice=None))
assert records == expected_records

@@ -97,7 +98,7 @@ def test_parse_future_records(records_count, page_size, expected_futures_deque_l
(101, 101, 2, None),
],
)
- def test_read_records(mocker, records_count, page_size, expected_futures_deque_len, expected_exception):
+ def test_read_records(mocker, records_count, page_size, expected_futures_deque_len, expected_exception, time_sleep_mock):
stream = Macros(**STREAM_ARGS)
stream.page_size = page_size
should_retry = bool(expected_exception)
@@ -131,3 +132,45 @@ def record_gen(start=0, end=page_size):
list(stream.read_records(sync_mode=SyncMode.full_refresh))
else:
assert list(stream.read_records(sync_mode=SyncMode.full_refresh)) == list(record_gen(end=expected_records_count))


def test_sleep_time():
page_size = 100
x_rate_limit = 10
records_count = 350
pages = 4

start = datetime.datetime.now()
stream = Organizations(**STREAM_ARGS)
stream.page_size = page_size

def record_gen(start=0, end=100):
for i in range(start, end):
yield {f"key{i}": f"val{i}", stream.cursor_field: (pendulum.parse("2020-01-01") + timedelta(days=i)).isoformat()}

with requests_mock.Mocker() as m:
count_url = urljoin(stream.url_base, f"{stream.path()}/count.json")
m.get(count_url, text=json.dumps({"count": {"value": records_count}}))

records_url = urljoin(stream.url_base, stream.path())
responses = [
{
"status_code": 429,
"headers": {"X-Rate-Limit": str(x_rate_limit)},
"text": "{}"
}
for _ in range(pages)
] + [
{
"status_code": 200,
"headers": {},
"text": json.dumps({"organizations": list(record_gen(page * page_size, min(records_count, (page + 1) * page_size)))})
}
for page in range(pages)
]
m.get(records_url, responses)
records = list(stream.read_records(sync_mode=SyncMode.full_refresh))
assert len(records) == records_count
end = datetime.datetime.now()
sleep_time = int(60 / x_rate_limit)
assert sleep_time - 1 <= (end - start).seconds <= sleep_time + 1
Collaborator (author) comment: this test shows 6 seconds of sleeping, versus 44 seconds on the previous version.
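That lines up with the arithmetic above: all four 429 responses at `X-Rate-Limit: 10` now collapse into one shared ~6-second `retry_at` deadline, whereas the old code slept `(60 / 10) * 2 = 12` seconds in the main thread for each retried page, which accumulates to roughly 44 seconds across four pages once elapsed request time is subtracted.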

5 changes: 3 additions & 2 deletions docs/integrations/sources/zendesk-support.md
@@ -59,8 +59,9 @@ The Zendesk connector ideally should not run into Zendesk API limitations under
## Changelog

| Version | Date | Pull Request | Subject |
- |:---------|:-----------| :--------------------------------------------------------- | :---------------------------------------------------------------- |
- | `0.2.19` | 2022-12-09 | [19967](https://github.com/airbytehq/airbyte/pull/19967) | Fix reading response for more than 100k records |
+ |:---------|:-----------|:----------------------------------------------------------|:-----------------------------------------------------------------|
+ | `0.2.20` | 2022-12-28 | [20900](https://github.com/airbytehq/airbyte/pull/20900) | Remove synchronous time.sleep, add logging, reduce backoff time |
+ | `0.2.19` | 2022-12-09 | [19967](https://github.com/airbytehq/airbyte/pull/19967) | Fix reading response for more than 100k records |
| `0.2.18` | 2022-11-29 | [19432](https://github.com/airbytehq/airbyte/pull/19432) | Revert changes from version 0.2.15, use a test read instead |
| `0.2.17` | 2022-11-24 | [19792](https://github.com/airbytehq/airbyte/pull/19792) | Transform `ticket_comments.via` "-" to null |
| `0.2.16` | 2022-09-28 | [17326](https://github.com/airbytehq/airbyte/pull/17326) | Migrate to per-stream states. |