Skip to content

Commit

Permalink
馃悰 Source Zendesk Support: refactor TicketComments stream (#11237)
Browse files Browse the repository at this point in the history
  • Loading branch information
bazarnov committed Mar 21, 2022
1 parent eeb3587 commit 94a862b
Show file tree
Hide file tree
Showing 7 changed files with 252 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -843,7 +843,7 @@
- name: Zendesk Support
sourceDefinitionId: 79c1aa37-dae3-42ae-b333-d1c105477715
dockerRepository: airbyte/source-zendesk-support
dockerImageTag: 0.2.1
dockerImageTag: 0.2.2
documentationUrl: https://docs.airbyte.io/integrations/sources/zendesk-support
icon: zendesk.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8905,7 +8905,7 @@
path_in_connector_config:
- "credentials"
- "client_secret"
- dockerImage: "airbyte/source-zendesk-support:0.2.1"
- dockerImage: "airbyte/source-zendesk-support:0.2.2"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/zendesk-support"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ COPY source_zendesk_support ./source_zendesk_support
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.1
LABEL io.airbyte.version=0.2.2
LABEL io.airbyte.name=airbyte/source-zendesk-support
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
"type": ["null", "string"],
"format": "date-time"
},
"timestamp": {
"type": ["null", "integer"]
},
"body": {
"type": ["null", "string"]
},
Expand All @@ -16,6 +19,9 @@
"type": {
"type": ["null", "string"]
},
"via_reference_id": {
"type": ["null", "integer"]
},
"html_body": {
"type": ["null", "string"]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@
from requests.auth import AuthBase
from requests_futures.sessions import PICKLE_ERROR, FuturesSession

DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
LAST_END_TIME_KEY = "_last_end_time"
DATETIME_FORMAT: str = "%Y-%m-%dT%H:%M:%SZ"
LAST_END_TIME_KEY: str = "_last_end_time"
END_OF_STREAM_KEY: str = "end_of_stream"


class SourceZendeskException(Exception):
Expand Down Expand Up @@ -114,6 +115,12 @@ def str2unixtime(str_dt: str) -> Optional[int]:
dt = datetime.strptime(str_dt, DATETIME_FORMAT)
return calendar.timegm(dt.utctimetuple())

@staticmethod
def _parse_next_page_number(response: requests.Response) -> Optional[int]:
"""Parses a response and tries to find next page number"""
next_page = response.json().get("next_page")
return dict(parse_qsl(urlparse(next_page).query)).get("page") if next_page else None

def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
"""try to select relevant data only"""

Expand Down Expand Up @@ -149,6 +156,16 @@ def __init__(self, authenticator: Union[AuthBase, HttpAuthenticator] = None, **k
self._session.auth = authenticator
self.future_requests = deque()

@property
def url_base(self) -> str:
return f"https://{self._subdomain}.zendesk.com/api/v2/"

def path(self, **kwargs):
return self.name

def next_page_token(self, *args, **kwargs):
return None

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
latest_benchmark = latest_record[self.cursor_field]
if current_stream_state.get(self.cursor_field):
Expand Down Expand Up @@ -270,24 +287,6 @@ def read_records(
else:
yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice)

@property
def url_base(self) -> str:
return f"https://{self._subdomain}.zendesk.com/api/v2/"

@staticmethod
def _parse_next_page_number(response: requests.Response) -> Optional[int]:
"""Parses a response and tries to find next page number"""
next_page = response.json().get("next_page")
if next_page:
return dict(parse_qsl(urlparse(next_page).query)).get("page")
return None

def path(self, **kwargs):
return self.name

def next_page_token(self, *args, **kwargs):
return None


class SourceZendeskSupportFullRefreshStream(BaseSourceZendeskSupportStream):
"""
Expand All @@ -305,14 +304,6 @@ def url_base(self) -> str:
def path(self, **kwargs):
return self.name

@staticmethod
def _parse_next_page_number(response: requests.Response) -> Optional[int]:
"""Parses a response and tries to find next page number"""
next_page = response.json().get("next_page")
if next_page:
return dict(parse_qsl(urlparse(next_page).query)).get("page")
return None

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
next_page = self._parse_next_page_number(response)
if not next_page:
Expand Down Expand Up @@ -351,17 +342,74 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,
self.prev_start_time = start_time
return {self.cursor_field: start_time}

def request_params(self, next_page_token: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]:
def request_params(
self, stream_state: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, **kwargs
) -> MutableMapping[str, Any]:
next_page_token = next_page_token or {}
if stream_state:
# use the state value if exists
parsed_state = calendar.timegm(pendulum.parse(stream_state.get(self.cursor_field)).utctimetuple())
else:
# for full-refresh use start_date
parsed_state = calendar.timegm(pendulum.parse(self._start_date).utctimetuple())
if self.cursor_field:
params = {
"start_time": next_page_token.get(self.cursor_field, calendar.timegm(pendulum.parse(self._start_date).utctimetuple()))
}
params = {"start_time": next_page_token.get(self.cursor_field, parsed_state)}
else:
params = {"start_time": calendar.timegm(pendulum.parse(self._start_date).utctimetuple())}
return params


class ZendeskSupportTicketEventsExportStream(SourceZendeskSupportCursorPaginationStream):
"""Incremental Export from TicketEvents stream:
https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#incremental-ticket-event-export
@ param response_list_name: the main nested entity to look at inside of response, defualt = "ticket_events"
@ param response_target_entity: nested property inside of `response_list_name`, default = "child_events"
@ param list_entities_from_event : the list of nested child_events entities to include from parent record
@ param sideload_param : parameter variable to include various information to child_events property
more info: https://developer.zendesk.com/documentation/ticketing/using-the-zendesk-api/side_loading/#supported-endpoints
@ param event_type : specific event_type to check ["Audit", "Change", "Comment", etc]
"""

response_list_name: str = "ticket_events"
response_target_entity: str = "child_events"
list_entities_from_event: List[str] = None
sideload_param: str = None
event_type: str = None

@property
def update_event_from_record(self) -> bool:
"""Returns True/False based on list_entities_from_event property"""
return True if len(self.list_entities_from_event) > 0 else False

def path(self, **kwargs) -> str:
return "incremental/ticket_events"

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
"""
Returns next_page_token based on `end_of_stream` parameter inside of response
"""
next_page_token = super().next_page_token(response)
return None if response.json().get(END_OF_STREAM_KEY, False) else next_page_token

def request_params(
self, stream_state: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, **kwargs
) -> MutableMapping[str, Any]:
params = super().request_params(stream_state, next_page_token, **kwargs)
if self.sideload_param:
params["include"] = self.sideload_param
return params

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
for record in response.json().get(self.response_list_name, []):
for event in record.get(self.response_target_entity, []):
if event.get("event_type") == self.event_type:
if self.update_event_from_record:
for prop in self.list_entities_from_event:
event[prop] = record.get(prop)
yield event


class Users(SourceZendeskSupportStream):
"""Users stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/"""

Expand All @@ -385,32 +433,15 @@ def request_params(self, **kwargs) -> MutableMapping[str, Any]:
return params


class TicketComments(SourceZendeskSupportStream):
"""TicketComments stream: https://developer.zendesk.com/api-reference/ticketing/tickets/ticket_comments/
ZenDesk doesn't provide API for loading of all comments by one direct endpoints.
Thus at first we loads all updated tickets and after this tries to load all created/updated
comments per every ticket"""

# Tickets can be removed throughout synchronization. The ZendDesk API will return a response
# with 404 code if a ticket is not exists. But it shouldn't break loading of other comments.
# raise_on_http_errors = False
class TicketComments(ZendeskSupportTicketEventsExportStream):
"""
Fetch the TicketComments incrementaly from TicketEvents Export stream
"""

parent = Tickets
cursor_field = "created_at"

response_list_name = "comments"

def path(self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
ticket_id = stream_slice["id"]
return f"tickets/{ticket_id}/comments"

def stream_slices(
self, sync_mode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
tickets_stream = self.parent(start_date=self._start_date, subdomain=self._subdomain, authenticator=self._session.auth)
for ticket in tickets_stream.read_records(sync_mode=SyncMode.full_refresh, cursor_field=cursor_field, stream_state=stream_state):
if ticket["comment_count"]:
yield {"id": ticket["id"], "child_count": ticket["comment_count"]}
list_entities_from_event = ["via_reference_id", "ticket_id", "timestamp"]
sideload_param = "comment_events"
event_type = "Comment"


class Groups(SourceZendeskSupportStream):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import calendar
from datetime import datetime
from urllib.parse import parse_qsl, urlparse

import pendulum
import pytz
import requests
from source_zendesk_support.source import BasicApiTokenAuthenticator
from source_zendesk_support.streams import DATETIME_FORMAT, END_OF_STREAM_KEY, BaseSourceZendeskSupportStream, TicketComments

# config
STREAM_ARGS = {
"subdomain": "test",
"start_date": "2022-01-27T00:00:00Z",
"authenticator": BasicApiTokenAuthenticator("test@airbyte.io", "api_token"),
}

DATETIME_STR = "2021-07-22T06:55:55Z"
DATETIME_FROM_STR = datetime.strptime(DATETIME_STR, DATETIME_FORMAT)
STREAM_URL = "https://subdomain.zendesk.com/api/v2/stream.json?&start_time=1647532987&page=1"
STREAM_RESPONSE: dict = {
"ticket_events": [
{
"child_events": [
{
"id": 99999,
"via": {},
"via_reference_id": None,
"type": "Comment",
"author_id": 10,
"body": "test_comment",
"html_body": '<div class="zd-comment" dir="auto">test_comment<br></div>',
"plain_body": "test_comment",
"public": True,
"attachments": [],
"audit_id": 123456,
"created_at": "2022-03-17T16:03:07Z",
"event_type": "Comment",
}
],
"id": 999999,
"ticket_id": 3,
"timestamp": 1647532987,
"created_at": "2022-03-17T16:03:07Z",
"updater_id": 9999999,
"via": "Web form",
"system": {},
"metadata": {},
"event_type": "Audit",
}
],
"next_page": "https://subdomain.zendesk.com/api/v2/stream.json?&start_time=1122334455&page=2",
"count": 215,
"end_of_stream": False,
"end_time": 1647532987,
}
TEST_STREAM = TicketComments(**STREAM_ARGS)


def test_str2datetime():
expected = datetime.strptime(DATETIME_STR, DATETIME_FORMAT)
output = BaseSourceZendeskSupportStream.str2datetime(DATETIME_STR)
assert output == expected


def test_datetime2str():
expected = datetime.strftime(DATETIME_FROM_STR.replace(tzinfo=pytz.UTC), DATETIME_FORMAT)
output = BaseSourceZendeskSupportStream.datetime2str(DATETIME_FROM_STR)
assert output == expected


def test_str2unixtime():
expected = calendar.timegm(DATETIME_FROM_STR.utctimetuple())
output = BaseSourceZendeskSupportStream.str2unixtime(DATETIME_STR)
assert output == expected


def test_parse_next_page_number(requests_mock):
expected = dict(parse_qsl(urlparse(STREAM_RESPONSE.get("next_page")).query)).get("page")
requests_mock.get(STREAM_URL, json=STREAM_RESPONSE)
test_response = requests.get(STREAM_URL)
output = BaseSourceZendeskSupportStream._parse_next_page_number(test_response)
assert output == expected


def test_next_page_token(requests_mock):
# mocking the logic of next_page_token
if STREAM_RESPONSE.get(END_OF_STREAM_KEY) is False:
expected = {"created_at": "1122334455"}
else:
expected = None
requests_mock.get(STREAM_URL, json=STREAM_RESPONSE)
test_response = requests.get(STREAM_URL)
output = TEST_STREAM.next_page_token(test_response)
assert expected == output


def test_request_params(requests_mock):
expected = {"start_time": calendar.timegm(pendulum.parse(STREAM_ARGS.get("start_date")).utctimetuple()), "include": "comment_events"}
stream_state = None
requests_mock.get(STREAM_URL, json=STREAM_RESPONSE)
test_response = requests.get(STREAM_URL)
next_page_token = TEST_STREAM.next_page_token(test_response)
output = TEST_STREAM.request_params(stream_state, next_page_token)
assert expected == output


def test_parse_response(requests_mock):
requests_mock.get(STREAM_URL, json=STREAM_RESPONSE)
test_response = requests.get(STREAM_URL)
output = TEST_STREAM.parse_response(test_response)
# get the first parsed element from generator
parsed_output = list(output)[0]
# check, if we have all transformations correctly
for entity in TicketComments.list_entities_from_event:
assert True if entity in parsed_output else False
Loading

0 comments on commit 94a862b

Please sign in to comment.