Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Source Zendesk Support: refactor TicketComments stream #11237

Merged
merged 13 commits into from
Mar 21, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -843,7 +843,7 @@
- name: Zendesk Support
sourceDefinitionId: 79c1aa37-dae3-42ae-b333-d1c105477715
dockerRepository: airbyte/source-zendesk-support
dockerImageTag: 0.2.1
dockerImageTag: 0.2.2
documentationUrl: https://docs.airbyte.io/integrations/sources/zendesk-support
icon: zendesk.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8905,7 +8905,7 @@
path_in_connector_config:
- "credentials"
- "client_secret"
- dockerImage: "airbyte/source-zendesk-support:0.2.1"
- dockerImage: "airbyte/source-zendesk-support:0.2.2"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/zendesk-support"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ COPY source_zendesk_support ./source_zendesk_support
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.1
LABEL io.airbyte.version=0.2.2
LABEL io.airbyte.name=airbyte/source-zendesk-support
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
"type": ["null", "string"],
"format": "date-time"
},
"timestamp": {
"type": ["null", "integer"]
},
"body": {
"type": ["null", "string"]
},
Expand All @@ -16,6 +19,9 @@
"type": {
"type": ["null", "string"]
},
"via_reference_id": {
"type": ["null", "integer"]
},
"html_body": {
"type": ["null", "string"]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class SourceZendeskSupport(AbstractSource):

@classmethod
def get_authenticator(cls, config: Mapping[str, Any]) -> BasicApiTokenAuthenticator:

# old authentication flow support
auth_old = config.get("auth_method")
if auth_old:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@
from requests.auth import AuthBase
from requests_futures.sessions import PICKLE_ERROR, FuturesSession

DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
LAST_END_TIME_KEY = "_last_end_time"
DATETIME_FORMAT: str = "%Y-%m-%dT%H:%M:%SZ"
LAST_END_TIME_KEY: str = "_last_end_time"
END_OF_STREAM_KEY: str = "end_of_stream"


class SourceZendeskException(Exception):
Expand Down Expand Up @@ -114,6 +115,12 @@ def str2unixtime(str_dt: str) -> Optional[int]:
dt = datetime.strptime(str_dt, DATETIME_FORMAT)
return calendar.timegm(dt.utctimetuple())

@staticmethod
def _parse_next_page_number(response: requests.Response) -> Optional[str]:
    """Extract the `page` query parameter from the response's `next_page` URL.

    Returns the raw query-string value (a string such as "2"), or None when the
    payload has no `next_page` link or the link carries no `page` parameter.
    """
    next_page = response.json().get("next_page")
    if not next_page:
        return None
    # parse_qsl yields (name, value) string pairs; the value is returned unconverted
    return dict(parse_qsl(urlparse(next_page).query)).get("page")

def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
"""try to select relevant data only"""

Expand Down Expand Up @@ -149,6 +156,16 @@ def __init__(self, authenticator: Union[AuthBase, HttpAuthenticator] = None, **k
self._session.auth = authenticator
self.future_requests = deque()

@property
def url_base(self) -> str:
    """Build the Zendesk API v2 base URL from the stream's `_subdomain`."""
    subdomain = self._subdomain
    return f"https://{subdomain}.zendesk.com/api/v2/"

def path(self, **kwargs):
    """Use the stream's name as the request path; keyword arguments are ignored."""
    stream_name = self.name
    return stream_name

def next_page_token(self, *args, **kwargs):
    """Always return None: this implementation never produces a pagination token."""
    token = None
    return token

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
latest_benchmark = latest_record[self.cursor_field]
if current_stream_state.get(self.cursor_field):
Expand Down Expand Up @@ -270,24 +287,6 @@ def read_records(
else:
yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice)

@property
def url_base(self) -> str:
return f"https://{self._subdomain}.zendesk.com/api/v2/"

@staticmethod
def _parse_next_page_number(response: requests.Response) -> Optional[int]:
"""Parses a response and tries to find next page number"""
next_page = response.json().get("next_page")
if next_page:
return dict(parse_qsl(urlparse(next_page).query)).get("page")
return None

def path(self, **kwargs):
return self.name

def next_page_token(self, *args, **kwargs):
return None


class SourceZendeskSupportFullRefreshStream(BaseSourceZendeskSupportStream):
"""
Expand All @@ -305,14 +304,6 @@ def url_base(self) -> str:
def path(self, **kwargs):
    """Use the stream's name as the request path; keyword arguments are ignored."""
    stream_name = self.name
    return stream_name

@staticmethod
def _parse_next_page_number(response: requests.Response) -> Optional[int]:
"""Parses a response and tries to find next page number"""
next_page = response.json().get("next_page")
if next_page:
return dict(parse_qsl(urlparse(next_page).query)).get("page")
return None

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
next_page = self._parse_next_page_number(response)
if not next_page:
Expand Down Expand Up @@ -351,17 +342,74 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,
self.prev_start_time = start_time
return {self.cursor_field: start_time}

def request_params(self, next_page_token: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]:
def request_params(
self, stream_state: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, **kwargs
) -> MutableMapping[str, Any]:
next_page_token = next_page_token or {}
if stream_state:
# use the state value if exists
parsed_state = calendar.timegm(pendulum.parse(stream_state.get(self.cursor_field)).utctimetuple())
else:
# for full-refresh use start_date
parsed_state = calendar.timegm(pendulum.parse(self._start_date).utctimetuple())
if self.cursor_field:
params = {
"start_time": next_page_token.get(self.cursor_field, calendar.timegm(pendulum.parse(self._start_date).utctimetuple()))
}
params = {"start_time": next_page_token.get(self.cursor_field, parsed_state)}
else:
params = {"start_time": calendar.timegm(pendulum.parse(self._start_date).utctimetuple())}
return params


class ZendeskSupportTicketEventsExportStream(SourceZendeskSupportCursorPaginationStream):
    """Incremental Export from TicketEvents stream:
    https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#incremental-ticket-event-export

    @ param response_list_name: the main nested entity to look at inside of response, default = "ticket_events"
    @ param response_target_entity: nested property inside of `response_list_name`, default = "child_events"
    @ param list_entities_from_event : names of parent-record properties copied onto every yielded child event
    @ param sideload_param : value sent as the `include` request parameter, when set
        more info: https://developer.zendesk.com/documentation/ticketing/using-the-zendesk-api/side_loading/#supported-endpoints
    @ param event_type : specific event_type to check ["Audit", "Change", "Comment", etc]
    """

    response_list_name: str = "ticket_events"
    response_target_entity: str = "child_events"
    list_entities_from_event: Optional[List[str]] = None
    sideload_param: Optional[str] = None
    event_type: Optional[str] = None

    @property
    def update_event_from_record(self) -> bool:
        """Returns True when `list_entities_from_event` names at least one property to copy."""
        # The class default is None, so rely on truthiness rather than len(),
        # which would raise TypeError for subclasses that leave the default in place.
        return bool(self.list_entities_from_event)

    def path(self, **kwargs) -> str:
        """Returns the fixed path of the incremental ticket events endpoint."""
        return "incremental/ticket_events"

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """
        Returns next_page_token based on `end_of_stream` parameter inside of response
        """
        # The parent implementation is always invoked first, exactly as before,
        # and its result is discarded once the response flags the end of the stream.
        next_page_token = super().next_page_token(response)
        return None if response.json().get(END_OF_STREAM_KEY, False) else next_page_token

    def request_params(
        self, stream_state: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, **kwargs
    ) -> MutableMapping[str, Any]:
        """Returns the parent's request params, plus `include` when `sideload_param` is set."""
        params = super().request_params(stream_state, next_page_token, **kwargs)
        if self.sideload_param:
            params["include"] = self.sideload_param
        return params

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """Yields the child events whose `event_type` equals `self.event_type`.

        When `list_entities_from_event` is non-empty, each listed property is read
        from the parent record and written onto the child event before it is yielded.
        """
        for record in response.json().get(self.response_list_name, []):
            for event in record.get(self.response_target_entity, []):
                if event.get("event_type") != self.event_type:
                    continue
                if self.update_event_from_record:
                    for prop in self.list_entities_from_event:
                        event[prop] = record.get(prop)
                yield event


class Users(SourceZendeskSupportStream):
"""Users stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/"""

Expand All @@ -385,32 +433,15 @@ def request_params(self, **kwargs) -> MutableMapping[str, Any]:
return params


class TicketComments(SourceZendeskSupportStream):
"""TicketComments stream: https://developer.zendesk.com/api-reference/ticketing/tickets/ticket_comments/
ZenDesk doesn't provide API for loading of all comments by one direct endpoints.
Thus at first we loads all updated tickets and after this tries to load all created/updated
comments per every ticket"""

# Tickets can be removed throughout synchronization. The ZendDesk API will return a response
# with 404 code if a ticket is not exists. But it shouldn't break loading of other comments.
# raise_on_http_errors = False
class TicketComments(ZendeskSupportTicketEventsExportStream):
"""
Fetch the TicketComments incrementally from TicketEvents Export stream
"""

parent = Tickets
cursor_field = "created_at"

response_list_name = "comments"

def path(self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
ticket_id = stream_slice["id"]
return f"tickets/{ticket_id}/comments"

def stream_slices(
self, sync_mode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
tickets_stream = self.parent(start_date=self._start_date, subdomain=self._subdomain, authenticator=self._session.auth)
for ticket in tickets_stream.read_records(sync_mode=SyncMode.full_refresh, cursor_field=cursor_field, stream_state=stream_state):
if ticket["comment_count"]:
yield {"id": ticket["id"], "child_count": ticket["comment_count"]}
list_entities_from_event = ["via_reference_id", "ticket_id", "timestamp"]
sideload_param = "comment_events"
event_type = "Comment"


class Groups(SourceZendeskSupportStream):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import calendar
from datetime import datetime
from urllib.parse import parse_qsl, urlparse

import pendulum
import pytz
import requests
from source_zendesk_support.source import BasicApiTokenAuthenticator
from source_zendesk_support.streams import DATETIME_FORMAT, END_OF_STREAM_KEY, BaseSourceZendeskSupportStream, TicketComments

# config
# Keyword arguments used to construct TEST_STREAM at the bottom of this section.
STREAM_ARGS = {
"subdomain": "test",
"start_date": "2022-01-27T00:00:00Z",
"authenticator": BasicApiTokenAuthenticator("test@airbyte.io", "api_token"),
}

# Sample timestamp string and its parsed counterpart, used by the datetime helper tests.
DATETIME_STR = "2021-07-22T06:55:55Z"
DATETIME_FROM_STR = datetime.strptime(DATETIME_STR, DATETIME_FORMAT)
# URL registered with requests_mock by the tests below.
STREAM_URL = "https://subdomain.zendesk.com/api/v2/stream.json?&start_time=1647532987&page=1"
# Payload served for STREAM_URL: one parent "Audit" record wrapping a single
# "Comment" child event, followed by the paging fields the tests read.
STREAM_RESPONSE: dict = {
"ticket_events": [
{
"child_events": [
{
"id": 99999,
"via": {},
"via_reference_id": None,
"type": "Comment",
"author_id": 10,
"body": "test_comment",
"html_body": '<div class="zd-comment" dir="auto">test_comment<br></div>',
"plain_body": "test_comment",
"public": True,
"attachments": [],
"audit_id": 123456,
"created_at": "2022-03-17T16:03:07Z",
"event_type": "Comment",
}
],
"id": 999999,
"ticket_id": 3,
"timestamp": 1647532987,
"created_at": "2022-03-17T16:03:07Z",
"updater_id": 9999999,
"via": "Web form",
"system": {},
"metadata": {},
"event_type": "Audit",
}
],
"next_page": "https://subdomain.zendesk.com/api/v2/stream.json?&start_time=1122334455&page=2",
"count": 215,
"end_of_stream": False,
"end_time": 1647532987,
}
# Single stream instance shared by the stream-level tests below.
TEST_STREAM = TicketComments(**STREAM_ARGS)


def test_str2datetime():
    """str2datetime output must equal strptime with DATETIME_FORMAT on the same string."""
    parsed = BaseSourceZendeskSupportStream.str2datetime(DATETIME_STR)
    assert parsed == datetime.strptime(DATETIME_STR, DATETIME_FORMAT)


def test_datetime2str():
    """datetime2str output must equal the UTC-tagged datetime formatted with DATETIME_FORMAT."""
    utc_aware = DATETIME_FROM_STR.replace(tzinfo=pytz.UTC)
    formatted = BaseSourceZendeskSupportStream.datetime2str(DATETIME_FROM_STR)
    assert formatted == utc_aware.strftime(DATETIME_FORMAT)


def test_str2unixtime():
    """str2unixtime output must equal calendar.timegm of the parsed sample datetime."""
    unix_time = BaseSourceZendeskSupportStream.str2unixtime(DATETIME_STR)
    assert unix_time == calendar.timegm(DATETIME_FROM_STR.utctimetuple())


def test_parse_next_page_number(requests_mock):
    """The parsed page must equal the `page` query value of the mocked `next_page` URL."""
    requests_mock.get(STREAM_URL, json=STREAM_RESPONSE)
    response = requests.get(STREAM_URL)

    next_page_query = urlparse(STREAM_RESPONSE.get("next_page")).query
    expected_page = dict(parse_qsl(next_page_query)).get("page")

    assert BaseSourceZendeskSupportStream._parse_next_page_number(response) == expected_page


def test_next_page_token(requests_mock):
    """A token is expected only while the mocked payload reports `end_of_stream` as False."""
    # mocking the logic of next_page_token
    stream_continues = STREAM_RESPONSE.get(END_OF_STREAM_KEY) is False
    expected = {"created_at": "1122334455"} if stream_continues else None

    requests_mock.get(STREAM_URL, json=STREAM_RESPONSE)
    response = requests.get(STREAM_URL)

    assert expected == TEST_STREAM.next_page_token(response)


def test_request_params(requests_mock):
    """With no stream state, params must hold the start_date timestamp and the comment_events sideload."""
    start_time = calendar.timegm(pendulum.parse(STREAM_ARGS.get("start_date")).utctimetuple())
    expected = {"start_time": start_time, "include": "comment_events"}

    requests_mock.get(STREAM_URL, json=STREAM_RESPONSE)
    token = TEST_STREAM.next_page_token(requests.get(STREAM_URL))

    assert expected == TEST_STREAM.request_params(None, token)


def test_parse_response(requests_mock):
    """The first parsed comment must carry every parent-record property listed on the stream.

    Checks both that each name in `TicketComments.list_entities_from_event` is
    present on the event and that its value equals the parent record's value.
    """
    requests_mock.get(STREAM_URL, json=STREAM_RESPONSE)
    test_response = requests.get(STREAM_URL)
    output = TEST_STREAM.parse_response(test_response)
    # get the first parsed element from generator
    parsed_output = next(iter(output))
    parent_record = STREAM_RESPONSE["ticket_events"][0]
    # check, if we have all transformations correctly
    for entity in TicketComments.list_entities_from_event:
        assert entity in parsed_output
        # parse_response copies the value with record.get(), so a key missing on the parent yields None
        assert parsed_output[entity] == parent_record.get(entity)
Loading