Skip to content

Commit

Permalink
šŸ› Source Zendesk-Support: fixed bug when Tickets stream didn't retuā€¦
Browse files Browse the repository at this point in the history
ā€¦rn removed records (#11349)
  • Loading branch information
bazarnov committed Mar 27, 2022
1 parent 2de581d commit a305e49
Show file tree
Hide file tree
Showing 8 changed files with 98 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -843,7 +843,7 @@
- name: Zendesk Support
sourceDefinitionId: 79c1aa37-dae3-42ae-b333-d1c105477715
dockerRepository: airbyte/source-zendesk-support
dockerImageTag: 0.2.2
dockerImageTag: 0.2.3
documentationUrl: https://docs.airbyte.io/integrations/sources/zendesk-support
icon: zendesk.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8951,7 +8951,7 @@
path_in_connector_config:
- "credentials"
- "client_secret"
- dockerImage: "airbyte/source-zendesk-support:0.2.2"
- dockerImage: "airbyte/source-zendesk-support:0.2.3"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/zendesk-support"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ COPY source_zendesk_support ./source_zendesk_support
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.2
LABEL io.airbyte.version=0.2.3
LABEL io.airbyte.name=airbyte/source-zendesk-support
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,18 @@
"sync_mode": "incremental",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "schedules",
"json_schema": {},
"supported_sync_modes": ["full_refresh"],
"source_defined_cursor": false,
"default_cursor_field": [],
"source_defined_primary_key": [["id"]]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "sla_policies",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ class SourceZendeskSupportFullRefreshStream(BaseSourceZendeskSupportStream):
# thus we can't implement an incremental logic for them
"""

page_size = 100
primary_key = "id"
response_list_name: str = None

Expand Down Expand Up @@ -340,7 +341,7 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,
start_time = dict(parse_qsl(urlparse(response.json().get(self.next_page_field), "").query)).get("start_time")
if start_time != self.prev_start_time:
self.prev_start_time = start_time
return {self.cursor_field: start_time}
return {self.cursor_field: int(start_time)}

def request_params(
self, stream_state: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, **kwargs
Expand All @@ -359,49 +360,77 @@ def request_params(
return params


class ZendeskSupportTicketEventsExportStream(SourceZendeskSupportCursorPaginationStream):
"""Incremental Export from TicketEvents stream:
https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#incremental-ticket-event-export
class SourceZendeskTicketExportStream(SourceZendeskSupportCursorPaginationStream):
"""Incremental Export from Tickets stream:
https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#incremental-ticket-export-time-based
@ param response_list_name: the main nested entity to look at inside of response, defualt = "ticket_events"
@ param response_target_entity: nested property inside of `response_list_name`, default = "child_events"
@ param list_entities_from_event : the list of nested child_events entities to include from parent record
@ param sideload_param : parameter variable to include various information to child_events property
@ param response_list_name: the main nested entity to look at inside of response, defualt = response_list_name
@ param sideload_param : parameter variable to include various information to response
more info: https://developer.zendesk.com/documentation/ticketing/using-the-zendesk-api/side_loading/#supported-endpoints
@ param event_type : specific event_type to check ["Audit", "Change", "Comment", etc]
"""

response_list_name: str = "ticket_events"
response_target_entity: str = "child_events"
list_entities_from_event: List[str] = None

cursor_field = "updated_at"
response_list_name: str = "tickets"
sideload_param: str = None
event_type: str = None

@property
def update_event_from_record(self) -> bool:
"""Returns True/False based on list_entities_from_event property"""
return True if len(self.list_entities_from_event) > 0 else False


@staticmethod
def check_start_time_param(requested_start_time: int, value: int = 1):
"""
Requesting tickets in the future is not allowed, hits 400 - bad request.
We get current UNIX timestamp minus `value` from now(), default = 1 (minute).
Returns: either close to now UNIX timestamp or previously requested UNIX timestamp.
"""
now = calendar.timegm(pendulum.now().subtract(minutes=value).utctimetuple())
return now if requested_start_time > now else requested_start_time

def path(self, **kwargs) -> str:
return "incremental/ticket_events"

return f"incremental/{self.response_list_name}.json"
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
"""
Returns next_page_token based on `end_of_stream` parameter inside of response
"""
next_page_token = super().next_page_token(response)
return None if response.json().get(END_OF_STREAM_KEY, False) else next_page_token

def request_params(
self, stream_state: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, **kwargs
) -> MutableMapping[str, Any]:
params = super().request_params(stream_state, next_page_token, **kwargs)
# check "start_time" is not in the future
params["start_time"] = self.check_start_time_param(params["start_time"])
if self.sideload_param:
params["include"] = self.sideload_param
return params

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
for record in response.json().get(self.response_list_name, []):
yield record


class SourceZendeskSupportTicketEventsExportStream(SourceZendeskTicketExportStream):
"""Incremental Export from TicketEvents stream:
https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#incremental-ticket-event-export
@ param response_list_name: the main nested entity to look at inside of response, defualt = "ticket_events"
@ param response_target_entity: nested property inside of `response_list_name`, default = "child_events"
@ param list_entities_from_event : the list of nested child_events entities to include from parent record
@ param event_type : specific event_type to check ["Audit", "Change", "Comment", etc]
"""
cursor_field = "created_at"
response_list_name: str = "ticket_events"
response_target_entity: str = "child_events"
list_entities_from_event: List[str] = None
event_type: str = None

@property
def update_event_from_record(self) -> bool:
"""Returns True/False based on list_entities_from_event property"""
return True if len(self.list_entities_from_event) > 0 else False

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
for record in super().parse_response(response, **kwargs):
for event in record.get(self.response_target_entity, []):
if event.get("event_type") == self.event_type:
if self.update_event_from_record:
Expand All @@ -418,27 +447,15 @@ class Organizations(SourceZendeskSupportStream):
"""Organizations stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/"""


class Tickets(SourceZendeskSupportStream):
"""Tickets stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/"""

# The API compares the start_time with the ticket's generated_timestamp value, not its updated_at value.
# The generated_timestamp value is updated for all entity updates, including system updates.
# If a system update occurs after an event, the unchanged updated_at time will become earlier
# relative to the updated generated_timestamp time.

def request_params(self, **kwargs) -> MutableMapping[str, Any]:
"""Adds the field 'comment_count'"""
params = super().request_params(**kwargs)
params["include"] = "comment_count"
return params
class Tickets(SourceZendeskTicketExportStream):
"""Tickets stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#incremental-ticket-export-time-based"""


class TicketComments(ZendeskSupportTicketEventsExportStream):
class TicketComments(SourceZendeskSupportTicketEventsExportStream):
"""
Fetch the TicketComments incrementaly from TicketEvents Export stream
"""

cursor_field = "created_at"
list_entities_from_event = ["via_reference_id", "ticket_id", "timestamp"]
sideload_param = "comment_events"
event_type = "Comment"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,10 @@
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException
from source_zendesk_support.source import BasicApiTokenAuthenticator
from source_zendesk_support.streams import Tickets
from source_zendesk_support.streams import Macros


@pytest.fixture(scope="module")
def stream_args():
return {
STREAM_ARGS: dict = {
"subdomain": "fake-subdomain",
"start_date": "2021-01-27T00:00:00Z",
"authenticator": BasicApiTokenAuthenticator("test@airbyte.io", "api_token"),
Expand All @@ -34,34 +32,29 @@ def stream_args():
(101, 100, 2),
],
)
def test_proper_number_of_future_requests_generated(stream_args, records_count, page_size, expected_futures_deque_len):
stream = Tickets(**stream_args)
def test_proper_number_of_future_requests_generated(records_count, page_size, expected_futures_deque_len):
stream = Macros(**STREAM_ARGS)
stream.page_size = page_size

with requests_mock.Mocker() as m:
count_url = urljoin(stream.url_base, f"{stream.path()}/count.json")
m.get(count_url, text=json.dumps({"count": {"value": records_count}}))

records_url = urljoin(stream.url_base, stream.path())
m.get(records_url)

stream.generate_future_requests(sync_mode=SyncMode.full_refresh, cursor_field=stream.cursor_field)

assert len(stream.future_requests) == expected_futures_deque_len


@pytest.mark.parametrize(
"records_count,page_size,expected_futures_deque_len",
[
(1000, 100, 10),
(1000, 10, 100),
(0, 100, 0),
(1, 100, 1),
(101, 100, 2),
(10, 10, 10),
(10, 100, 10),
(10, 10, 0),
],
)
def test_parse_future_records(stream_args, records_count, page_size, expected_futures_deque_len):
stream = Tickets(**stream_args)
def test_parse_future_records(records_count, page_size, expected_futures_deque_len):
stream = Macros(**STREAM_ARGS)
stream.page_size = page_size
expected_records = [
{f"key{i}": f"val{i}", stream.cursor_field: (pendulum.parse("2020-01-01") + timedelta(days=i)).isoformat()}
Expand Down Expand Up @@ -97,8 +90,8 @@ def test_parse_future_records(stream_args, records_count, page_size, expected_fu
# (101, 100, 2, False),
],
)
def test_read_records(stream_args, records_count, page_size, expected_futures_deque_len, should_retry):
stream = Tickets(**stream_args)
def test_read_records(records_count, page_size, expected_futures_deque_len, should_retry):
stream = Macros(**STREAM_ARGS)
stream.page_size = page_size
expected_records = [
{f"key{i}": f"val{i}", stream.cursor_field: (pendulum.parse("2020-01-01") + timedelta(days=i)).isoformat()}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@
import pytz
import requests
from source_zendesk_support.source import BasicApiTokenAuthenticator
from source_zendesk_support.streams import DATETIME_FORMAT, END_OF_STREAM_KEY, BaseSourceZendeskSupportStream, TicketComments
from source_zendesk_support.streams import (
DATETIME_FORMAT,
END_OF_STREAM_KEY,
BaseSourceZendeskSupportStream,
TicketComments,
SourceZendeskTicketExportStream,
)

# config
STREAM_ARGS = {
Expand Down Expand Up @@ -77,6 +83,12 @@ def test_str2unixtime():
expected = calendar.timegm(DATETIME_FROM_STR.utctimetuple())
output = BaseSourceZendeskSupportStream.str2unixtime(DATETIME_STR)
assert output == expected

def test_check_start_time_param():
expected = 1626936955
start_time = calendar.timegm(pendulum.parse(DATETIME_STR).utctimetuple())
output = SourceZendeskTicketExportStream.check_start_time_param(start_time)
assert output == expected


def test_parse_next_page_number(requests_mock):
Expand All @@ -90,7 +102,7 @@ def test_parse_next_page_number(requests_mock):
def test_next_page_token(requests_mock):
# mocking the logic of next_page_token
if STREAM_RESPONSE.get(END_OF_STREAM_KEY) is False:
expected = {"created_at": "1122334455"}
expected = {"created_at": 1122334455}
else:
expected = None
requests_mock.get(STREAM_URL, json=STREAM_RESPONSE)
Expand Down
3 changes: 2 additions & 1 deletion docs/integrations/sources/zendesk-support.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ This Source is capable of syncing the following core Streams:
* [Schedules](https://developer.zendesk.com/api-reference/ticketing/ticket-management/schedules/#list-schedules)
* [SLA Policies](https://developer.zendesk.com/rest_api/docs/support/sla_policies)
* [Tags](https://developer.zendesk.com/rest_api/docs/support/tags)
* [Tickets](https://developer.zendesk.com/rest_api/docs/support/tickets)
* [Tickets](https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#incremental-ticket-export-time-based)
* [Ticket Audits](https://developer.zendesk.com/rest_api/docs/support/ticket_audits)
* [Ticket Comments](https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#incremental-ticket-event-export)
* [Ticket Fields](https://developer.zendesk.com/rest_api/docs/support/ticket_fields)
Expand Down Expand Up @@ -102,6 +102,7 @@ Simply proceed by pressing "Authenticate your Account" and complete the authenti

| Version | Date | Pull Request | Subject |
|:---------|:-----------| :----- |:-------------------------------------------------------|
| `0.2.3` | 2022-03-23 | [11349](https://github.com/airbytehq/airbyte/pull/11349) | Fixed the bug when Tickets stream didn't return deleted records
| `0.2.2` | 2022-03-17 | [11237](https://github.com/airbytehq/airbyte/pull/11237) | Fixed the bug when TicketComments stream didn't return all records
| `0.2.1` | 2022-03-15 | [11162](https://github.com/airbytehq/airbyte/pull/11162) | Added support of OAuth2.0 authentication method
| `0.2.0` | 2022-03-01 | [9456](https://github.com/airbytehq/airbyte/pull/9456) | Update source to use future requests |
Expand Down

0 comments on commit a305e49

Please sign in to comment.