Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

馃悰 Source Zendesk-Support: fixed bug when Tickets stream didn't return removed records #11349

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -843,7 +843,7 @@
- name: Zendesk Support
sourceDefinitionId: 79c1aa37-dae3-42ae-b333-d1c105477715
dockerRepository: airbyte/source-zendesk-support
dockerImageTag: 0.2.2
dockerImageTag: 0.2.3
documentationUrl: https://docs.airbyte.io/integrations/sources/zendesk-support
icon: zendesk.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8906,7 +8906,7 @@
path_in_connector_config:
- "credentials"
- "client_secret"
- dockerImage: "airbyte/source-zendesk-support:0.2.2"
- dockerImage: "airbyte/source-zendesk-support:0.2.3"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/zendesk-support"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ COPY source_zendesk_support ./source_zendesk_support
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.2
LABEL io.airbyte.version=0.2.3
LABEL io.airbyte.name=airbyte/source-zendesk-support
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,18 @@
"sync_mode": "incremental",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "schedules",
"json_schema": {},
"supported_sync_modes": ["full_refresh"],
"source_defined_cursor": false,
"default_cursor_field": [],
"source_defined_primary_key": [["id"]]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "sla_policies",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ class SourceZendeskSupportFullRefreshStream(BaseSourceZendeskSupportStream):
# thus we can't implement an incremental logic for them
"""

page_size = 100
primary_key = "id"
response_list_name: str = None

Expand Down Expand Up @@ -340,7 +341,7 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,
start_time = dict(parse_qsl(urlparse(response.json().get(self.next_page_field), "").query)).get("start_time")
if start_time != self.prev_start_time:
self.prev_start_time = start_time
return {self.cursor_field: start_time}
return {self.cursor_field: int(start_time)}

def request_params(
self, stream_state: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, **kwargs
Expand All @@ -359,49 +360,77 @@ def request_params(
return params


class ZendeskSupportTicketEventsExportStream(SourceZendeskSupportCursorPaginationStream):
"""Incremental Export from TicketEvents stream:
https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#incremental-ticket-event-export
class SourceZendeskTicketExportStream(SourceZendeskSupportCursorPaginationStream):
"""Incremental Export from Tickets stream:
https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#incremental-ticket-export-time-based

@ param response_list_name: the main nested entity to look at inside of response, defualt = "ticket_events"
@ param response_target_entity: nested property inside of `response_list_name`, default = "child_events"
@ param list_entities_from_event : the list of nested child_events entities to include from parent record
@ param sideload_param : parameter variable to include various information to child_events property
@ param response_list_name: the main nested entity to look at inside of response, defualt = response_list_name
@ param sideload_param : parameter variable to include various information to response
more info: https://developer.zendesk.com/documentation/ticketing/using-the-zendesk-api/side_loading/#supported-endpoints
@ param event_type : specific event_type to check ["Audit", "Change", "Comment", etc]
"""

response_list_name: str = "ticket_events"
response_target_entity: str = "child_events"
list_entities_from_event: List[str] = None

cursor_field = "updated_at"
response_list_name: str = "tickets"
sideload_param: str = None
event_type: str = None

@property
def update_event_from_record(self) -> bool:
"""Returns True/False based on list_entities_from_event property"""
return True if len(self.list_entities_from_event) > 0 else False


@staticmethod
def check_start_time_param(requested_start_time: int, value: int = 1):
"""
Requesting tickets in the future is not allowed, hits 400 - bad request.
We get current UNIX timestamp minus `value` from now(), default = 1 (minute).

Returns: either close to now UNIX timestamp or previously requested UNIX timestamp.
"""
now = calendar.timegm(pendulum.now().subtract(minutes=value).utctimetuple())
return now if requested_start_time > now else requested_start_time

def path(self, **kwargs) -> str:
return "incremental/ticket_events"

return f"incremental/{self.response_list_name}.json"
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
"""
Returns next_page_token based on `end_of_stream` parameter inside of response
"""
next_page_token = super().next_page_token(response)
return None if response.json().get(END_OF_STREAM_KEY, False) else next_page_token

def request_params(
self, stream_state: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, **kwargs
) -> MutableMapping[str, Any]:
params = super().request_params(stream_state, next_page_token, **kwargs)
# check "start_time" is not in the future
params["start_time"] = self.check_start_time_param(params["start_time"])
if self.sideload_param:
params["include"] = self.sideload_param
return params

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
for record in response.json().get(self.response_list_name, []):
yield record


class SourceZendeskSupportTicketEventsExportStream(SourceZendeskTicketExportStream):
"""Incremental Export from TicketEvents stream:
https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#incremental-ticket-event-export

@ param response_list_name: the main nested entity to look at inside of response, defualt = "ticket_events"
@ param response_target_entity: nested property inside of `response_list_name`, default = "child_events"
@ param list_entities_from_event : the list of nested child_events entities to include from parent record
@ param event_type : specific event_type to check ["Audit", "Change", "Comment", etc]
"""
cursor_field = "created_at"
response_list_name: str = "ticket_events"
response_target_entity: str = "child_events"
list_entities_from_event: List[str] = None
event_type: str = None

@property
def update_event_from_record(self) -> bool:
"""Returns True/False based on list_entities_from_event property"""
return True if len(self.list_entities_from_event) > 0 else False

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
for record in super().parse_response(response, **kwargs):
for event in record.get(self.response_target_entity, []):
if event.get("event_type") == self.event_type:
if self.update_event_from_record:
Expand All @@ -418,27 +447,15 @@ class Organizations(SourceZendeskSupportStream):
"""Organizations stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/"""


class Tickets(SourceZendeskSupportStream):
"""Tickets stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/"""

# The API compares the start_time with the ticket's generated_timestamp value, not its updated_at value.
# The generated_timestamp value is updated for all entity updates, including system updates.
# If a system update occurs after an event, the unchanged updated_at time will become earlier
# relative to the updated generated_timestamp time.

def request_params(self, **kwargs) -> MutableMapping[str, Any]:
"""Adds the field 'comment_count'"""
params = super().request_params(**kwargs)
params["include"] = "comment_count"
return params
class Tickets(SourceZendeskTicketExportStream):
"""Tickets stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#incremental-ticket-export-time-based"""


class TicketComments(ZendeskSupportTicketEventsExportStream):
class TicketComments(SourceZendeskSupportTicketEventsExportStream):
"""
Fetch the TicketComments incrementaly from TicketEvents Export stream
"""

cursor_field = "created_at"
list_entities_from_event = ["via_reference_id", "ticket_id", "timestamp"]
sideload_param = "comment_events"
event_type = "Comment"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,10 @@
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException
from source_zendesk_support.source import BasicApiTokenAuthenticator
from source_zendesk_support.streams import Tickets
from source_zendesk_support.streams import Macros


@pytest.fixture(scope="module")
def stream_args():
return {
STREAM_ARGS: dict = {
"subdomain": "fake-subdomain",
"start_date": "2021-01-27T00:00:00Z",
"authenticator": BasicApiTokenAuthenticator("test@airbyte.io", "api_token"),
Expand All @@ -34,34 +32,29 @@ def stream_args():
(101, 100, 2),
],
)
def test_proper_number_of_future_requests_generated(stream_args, records_count, page_size, expected_futures_deque_len):
stream = Tickets(**stream_args)
def test_proper_number_of_future_requests_generated(records_count, page_size, expected_futures_deque_len):
stream = Macros(**STREAM_ARGS)
stream.page_size = page_size

with requests_mock.Mocker() as m:
count_url = urljoin(stream.url_base, f"{stream.path()}/count.json")
m.get(count_url, text=json.dumps({"count": {"value": records_count}}))

records_url = urljoin(stream.url_base, stream.path())
m.get(records_url)

stream.generate_future_requests(sync_mode=SyncMode.full_refresh, cursor_field=stream.cursor_field)

assert len(stream.future_requests) == expected_futures_deque_len


@pytest.mark.parametrize(
"records_count,page_size,expected_futures_deque_len",
[
(1000, 100, 10),
(1000, 10, 100),
(0, 100, 0),
(1, 100, 1),
(101, 100, 2),
(10, 10, 10),
(10, 100, 10),
(10, 10, 0),
],
)
def test_parse_future_records(stream_args, records_count, page_size, expected_futures_deque_len):
stream = Tickets(**stream_args)
def test_parse_future_records(records_count, page_size, expected_futures_deque_len):
stream = Macros(**STREAM_ARGS)
stream.page_size = page_size
expected_records = [
{f"key{i}": f"val{i}", stream.cursor_field: (pendulum.parse("2020-01-01") + timedelta(days=i)).isoformat()}
Expand Down Expand Up @@ -97,8 +90,8 @@ def test_parse_future_records(stream_args, records_count, page_size, expected_fu
# (101, 100, 2, False),
],
)
def test_read_records(stream_args, records_count, page_size, expected_futures_deque_len, should_retry):
stream = Tickets(**stream_args)
def test_read_records(records_count, page_size, expected_futures_deque_len, should_retry):
stream = Macros(**STREAM_ARGS)
stream.page_size = page_size
expected_records = [
{f"key{i}": f"val{i}", stream.cursor_field: (pendulum.parse("2020-01-01") + timedelta(days=i)).isoformat()}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@
import pytz
import requests
from source_zendesk_support.source import BasicApiTokenAuthenticator
from source_zendesk_support.streams import DATETIME_FORMAT, END_OF_STREAM_KEY, BaseSourceZendeskSupportStream, TicketComments
from source_zendesk_support.streams import (
DATETIME_FORMAT,
END_OF_STREAM_KEY,
BaseSourceZendeskSupportStream,
TicketComments,
SourceZendeskTicketExportStream,
)

# config
STREAM_ARGS = {
Expand Down Expand Up @@ -77,6 +83,12 @@ def test_str2unixtime():
expected = calendar.timegm(DATETIME_FROM_STR.utctimetuple())
output = BaseSourceZendeskSupportStream.str2unixtime(DATETIME_STR)
assert output == expected

def test_check_start_time_param():
expected = 1626936955
start_time = calendar.timegm(pendulum.parse(DATETIME_STR).utctimetuple())
output = SourceZendeskTicketExportStream.check_start_time_param(start_time)
assert output == expected


def test_parse_next_page_number(requests_mock):
Expand All @@ -90,7 +102,7 @@ def test_parse_next_page_number(requests_mock):
def test_next_page_token(requests_mock):
# mocking the logic of next_page_token
if STREAM_RESPONSE.get(END_OF_STREAM_KEY) is False:
expected = {"created_at": "1122334455"}
expected = {"created_at": 1122334455}
else:
expected = None
requests_mock.get(STREAM_URL, json=STREAM_RESPONSE)
Expand Down
3 changes: 2 additions & 1 deletion docs/integrations/sources/zendesk-support.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ This Source is capable of syncing the following core Streams:
* [Schedules](https://developer.zendesk.com/api-reference/ticketing/ticket-management/schedules/#list-schedules)
* [SLA Policies](https://developer.zendesk.com/rest_api/docs/support/sla_policies)
* [Tags](https://developer.zendesk.com/rest_api/docs/support/tags)
* [Tickets](https://developer.zendesk.com/rest_api/docs/support/tickets)
* [Tickets](https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#incremental-ticket-export-time-based)
* [Ticket Audits](https://developer.zendesk.com/rest_api/docs/support/ticket_audits)
* [Ticket Comments](https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#incremental-ticket-event-export)
* [Ticket Fields](https://developer.zendesk.com/rest_api/docs/support/ticket_fields)
Expand Down Expand Up @@ -102,6 +102,7 @@ Simply proceed by pressing "Authenticate your Account" and complete the authenti

| Version | Date | Pull Request | Subject |
|:---------|:-----------| :----- |:-------------------------------------------------------|
| `0.2.3` | 2022-03-23 | [11349](https://github.com/airbytehq/airbyte/pull/11349) | Fixed the bug when Tickets stream didn't return deleted records
| `0.2.2` | 2022-03-17 | [11237](https://github.com/airbytehq/airbyte/pull/11237) | Fixed the bug when TicketComments stream didn't return all records
| `0.2.1` | 2022-03-15 | [11162](https://github.com/airbytehq/airbyte/pull/11162) | Added support of OAuth2.0 authentication method
| `0.2.0` | 2022-03-01 | [9456](https://github.com/airbytehq/airbyte/pull/9456) | Update source to use future requests |
Expand Down