Skip to content

Commit

Permalink
Source Marketo: handle null cursor values (#22203)
Browse files Browse the repository at this point in the history
* #1407 source Marketo: handle null cursor values

* #1407 source marketo: upd changelog

* auto-bump connector version

---------

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
davydov-d and octavia-squidington-iii committed Feb 1, 2023
1 parent 3dd42d7 commit 52ce3de
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1023,7 +1023,7 @@
- name: Marketo
sourceDefinitionId: 9e0556f4-69df-4522-a3fb-03264d36b348
dockerRepository: airbyte/source-marketo
dockerImageTag: 1.0.1
dockerImageTag: 1.0.2
documentationUrl: https://docs.airbyte.com/integrations/sources/marketo
icon: marketo.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8141,7 +8141,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-marketo:1.0.1"
- dockerImage: "airbyte/source-marketo:1.0.2"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/marketo"
connectionSpecification:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-marketo/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ COPY source_marketo ./source_marketo
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=1.0.1
LABEL io.airbyte.version=1.0.2
LABEL io.airbyte.name=airbyte/source-marketo
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,9 @@ def state(self, value):
self._state = value

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
self._state = {
self.cursor_field: max(
latest_record.get(self.cursor_field, self.start_date), current_stream_state.get(self.cursor_field, self.start_date)
)
}
latest_cursor_value = latest_record.get(self.cursor_field, self.start_date) or self.start_date
current_cursor_value = current_stream_state.get(self.cursor_field, self.start_date) or self.start_date
self._state = {self.cursor_field: max(latest_cursor_value, current_cursor_value)}
return self._state

def stream_slices(self, sync_mode, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[MutableMapping[str, any]]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@
from functools import partial
from unittest.mock import ANY, Mock, patch

import pendulum
import pytest
from airbyte_cdk.models.airbyte_protocol import SyncMode
from source_marketo.source import Activities, Campaigns, MarketoStream, Programs, SourceMarketo
from source_marketo.source import Activities, Campaigns, Leads, MarketoStream, Programs, SourceMarketo


def test_create_export_job(mocker, send_email_stream, caplog):
Expand Down Expand Up @@ -286,3 +287,30 @@ def test_check_connection(config, requests_mock, status_code, response, is_conne
def test_normalize_datetime(config, input, format, expected_result):
stream = Programs(config)
assert stream.normalize_datetime(input, format) == expected_result


today = pendulum.now()
yesterday = pendulum.now().subtract(days=1).strftime("%Y-%m-%dT%H:%M:%SZ")
today = today.strftime("%Y-%m-%dT%H:%M:%SZ")


@pytest.mark.parametrize(
"latest_record, current_state, expected_state",
(
({}, {}, "start_date"),
({"updatedAt": None}, {"updatedAt": None}, "start_date"),
({}, {"updatedAt": None}, "start_date"),
({"updatedAt": None}, {}, "start_date"),
({}, {"updatedAt": today}, {"updatedAt": today}),
({"updatedAt": None}, {"updatedAt": today}, {"updatedAt": today}),
({"updatedAt": today}, {"updatedAt": None}, {"updatedAt": today}),
({"updatedAt": today}, {}, {"updatedAt": today}),
({"updatedAt": yesterday}, {"updatedAt": today}, {"updatedAt": today}),
({"updatedAt": today}, {"updatedAt": yesterday}, {"updatedAt": today})
)
)
def test_get_updated_state(config, latest_record, current_state, expected_state):
stream = Leads(config)
if expected_state == "start_date":
expected_state = {"updatedAt": config["start_date"]}
assert stream.get_updated_state(latest_record, current_state) == expected_state
3 changes: 2 additions & 1 deletion docs/integrations/sources/marketo.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ If the 50,000 limit is too stringent, contact Marketo support for a quota increa

| Version | Date | Pull Request | Subject |
|:---------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------|
| `1.0.1` | 2023-01-31 | [22015](https://github.com/airbytehq/airbyte/pull/22015) | Set `AvailabilityStrategy` for streams explicitly to `None` |
| `1.0.2` | 2023-02-01 | [22203](https://github.com/airbytehq/airbyte/pull/22203) | Handle Null cursor values |
| `1.0.1` | 2023-01-31 | [22015](https://github.com/airbytehq/airbyte/pull/22015) | Set `AvailabilityStrategy` for streams explicitly to `None` |
| `1.0.0` | 2023-01-25 | [21790](https://github.com/airbytehq/airbyte/pull/21790) | Fix `activities_*` stream schemas |
| `0.1.12` | 2023-01-19 | [20973](https://github.com/airbytehq/airbyte/pull/20973) | Fix encoding error (note: this change is not in version 1.0.0, but is in later versions |
| `0.1.11` | 2022-09-30 | [17445](https://github.com/airbytehq/airbyte/pull/17445) | Do not use temporary files for memory optimization |
Expand Down

0 comments on commit 52ce3de

Please sign in to comment.