Skip to content

Commit

Permalink
Source Marketo: fix semi incremental streams (#15824)
Browse files Browse the repository at this point in the history
* #15823 source marketo: fix semi incremental streams

* source marketo - upd changelog

* #15823 fix SATs

* #15823 source marketo: fix tests

* #15823 source marketo: one more test fix

* #15823 source marketo - fix incremental catalog

* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
davydov-d and octavia-squidington-iii committed Aug 23, 2022
1 parent e040362 commit 43beec3
Show file tree
Hide file tree
Showing 8 changed files with 165 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,7 @@
- name: Marketo
sourceDefinitionId: 9e0556f4-69df-4522-a3fb-03264d36b348
dockerRepository: airbyte/source-marketo
dockerImageTag: 0.1.5
dockerImageTag: 0.1.6
documentationUrl: https://docs.airbyte.io/integrations/sources/marketo
icon: marketo.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5182,7 +5182,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-marketo:0.1.5"
- dockerImage: "airbyte/source-marketo:0.1.6"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/marketo"
connectionSpecification:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-marketo/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ COPY source_marketo ./source_marketo
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.5
LABEL io.airbyte.version=0.1.6
LABEL io.airbyte.name=airbyte/source-marketo
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@ tests:
basic_read:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
empty_streams: ["activities_visit_webpage"]
timeout_seconds: 4800
empty_streams: ["lists", "campaigns"]
timeout_seconds: 3600
expect_records:
path: "integration_tests/expected_records.txt"
incremental:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
configured_catalog_path: "integration_tests/incremental_catalog.json"
future_state_path: "integration_tests/abnormal_state.json"
timeout_seconds: 4800
timeout_seconds: 3600
full_refresh:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
timeout_seconds: 4800
timeout_seconds: 3600

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
{
"streams": [
{
"stream": {
"name": "programs",
"json_schema": {},
"supported_sync_modes": ["incremental"],
"source_defined_cursor": true,
"default_cursor_field": []
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "campaigns",
"json_schema": {},
"supported_sync_modes": ["incremental"],
"source_defined_cursor": true,
"default_cursor_field": []
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "lists",
"json_schema": {},
"supported_sync_modes": ["incremental"],
"source_defined_cursor": true,
"default_cursor_field": []
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "leads",
"json_schema": {},
"supported_sync_modes": ["incremental"],
"source_defined_cursor": true,
"default_cursor_field": []
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "activities_visit_webpage",
"json_schema": {},
"supported_sync_modes": ["incremental"],
"source_defined_cursor": true,
"default_cursor_field": []
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ def __init__(self, config: Mapping[str, Any], stream_name: str = None, param: Ma
super().__init__(authenticator=config["authenticator"])
self.config = config
self.start_date = config["start_date"]
# this is done for test purposes, the field is not exposed to spec.json!
self.end_date = config.get("end_date")
self.window_in_days = config.get("window_in_days", 30)
self._url_base = config["domain_url"].rstrip("/") + "/"
self.stream_name = stream_name
Expand Down Expand Up @@ -76,14 +78,18 @@ def normalize_datetime(self, dt: str, format="%Y-%m-%dT%H:%M:%SZ%z"):
class IncrementalMarketoStream(MarketoStream):
cursor_field = "createdAt"

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._state = {}

def filter_by_state(self, stream_state: Mapping[str, Any] = None, record: Mapping[str, Any] = None) -> Iterable:
"""
Endpoint does not provide query filtering params, but they provide us
cursor field in most cases, so we used that as incremental filtering
during the parsing.
"""

if not stream_state or record[self.cursor_field] >= stream_state.get(self.cursor_field):
if record[self.cursor_field] >= (stream_state or {}).get(self.cursor_field, self.start_date):
yield record

def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
Expand All @@ -92,12 +98,21 @@ def parse_response(self, response: requests.Response, stream_state: Mapping[str,
for record in json_response:
yield from self.filter_by_state(stream_state=stream_state, record=record)

@property
def state(self):
return self._state

@state.setter
def state(self, value):
self._state = value

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
return {
self._state = {
self.cursor_field: max(
latest_record.get(self.cursor_field, self.start_date), current_stream_state.get(self.cursor_field, self.start_date)
)
}
return self._state

def stream_slices(self, sync_mode, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[MutableMapping[str, any]]]:
"""
Expand All @@ -114,16 +129,16 @@ def stream_slices(self, sync_mode, stream_state: Mapping[str, Any] = None, **kwa
"""

start_date = pendulum.parse(self.start_date)
end_date = pendulum.now()

# Determine stream_state, if no stream_state we use start_date
if stream_state:
start_date = pendulum.parse(stream_state.get(self.cursor_field))

# use the lowest date between start_date and self.end_date, otherwise API fails if start_date is in future
start_date = min(start_date, end_date)
start_date = min(start_date, pendulum.now())
date_slices = []

end_date = pendulum.parse(self.end_date) if self.end_date else pendulum.now()
while start_date <= end_date:
# the amount of days for each data-chunk begining from start_date
end_date_slice = start_date.add(days=self.window_in_days)
Expand All @@ -136,6 +151,11 @@ def stream_slices(self, sync_mode, stream_state: Mapping[str, Any] = None, **kwa
return date_slices


class SemiIncrementalMarketoStream(IncrementalMarketoStream):
def stream_slices(self, sync_mode, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[MutableMapping[str, any]]]:
return [None]


class MarketoExportBase(IncrementalMarketoStream):
"""
Base class for all the streams which support bulk extract.
Expand Down Expand Up @@ -459,14 +479,14 @@ def parse_response(self, response: requests.Response, stream_state: Mapping[str,
yield record


class Campaigns(IncrementalMarketoStream):
class Campaigns(SemiIncrementalMarketoStream):
"""
Return list of all campaigns.
API Docs: http://developers.marketo.com/rest-api/endpoint-reference/lead-database-endpoint-reference/#!/Campaigns/getCampaignsUsingGET
"""


class Lists(IncrementalMarketoStream):
class Lists(SemiIncrementalMarketoStream):
"""
Return list of all lists.
API Docs: http://developers.marketo.com/rest-api/endpoint-reference/lead-database-endpoint-reference/#!/Static_Lists/getListsUsingGET
Expand Down
17 changes: 9 additions & 8 deletions docs/integrations/sources/marketo.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,12 @@ If the 50,000 limit is too stringent, contact Marketo support for a quota increa

## Changelog

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:------------------------------------------------------|
| `0.1.5` | 2022-08-16 | [15683](https://github.com/airbytehq/airbyte/pull/15683) | Retry failed creation of a job instead of skipping it |
| `0.1.4` | 2022-06-20 | [13930](https://github.com/airbytehq/airbyte/pull/13930) | Process failing creation of export jobs |
| `0.1.3` | 2021-12-10 | [8429](https://github.com/airbytehq/airbyte/pull/8578) | Updated titles and descriptions |
| `0.1.2` | 2021-12-03 | [8483](https://github.com/airbytehq/airbyte/pull/8483) | Improve field conversion to conform schema |
| `0.1.1` | 2021-11-29 | [0000](https://github.com/airbytehq/airbyte/pull/0000) | Fix timestamp value format issue |
| `0.1.0` | 2021-09-06 | [5863](https://github.com/airbytehq/airbyte/pull/5863) | Release Marketo CDK Connector |
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------|
| `0.1.6` | 2022-08-21 | [15824](https://github.com/airbytehq/airbyte/pull/15824) | Fix semi incremental streams: do not ignore start date, make one api call instead of multiple |
| `0.1.5` | 2022-08-16 | [15683](https://github.com/airbytehq/airbyte/pull/15683) | Retry failed creation of a job instead of skipping it |
| `0.1.4` | 2022-06-20 | [13930](https://github.com/airbytehq/airbyte/pull/13930) | Process failing creation of export jobs |
| `0.1.3` | 2021-12-10 | [8429](https://github.com/airbytehq/airbyte/pull/8578) | Updated titles and descriptions |
| `0.1.2` | 2021-12-03 | [8483](https://github.com/airbytehq/airbyte/pull/8483) | Improve field conversion to conform schema |
| `0.1.1` | 2021-11-29 | [0000](https://github.com/airbytehq/airbyte/pull/0000) | Fix timestamp value format issue |
| `0.1.0` | 2021-09-06 | [5863](https://github.com/airbytehq/airbyte/pull/5863) | Release Marketo CDK Connector |

0 comments on commit 43beec3

Please sign in to comment.