Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source Marketo: handle null cursor values #22203

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1023,7 +1023,7 @@
- name: Marketo
sourceDefinitionId: 9e0556f4-69df-4522-a3fb-03264d36b348
dockerRepository: airbyte/source-marketo
dockerImageTag: 1.0.1
dockerImageTag: 1.0.2
documentationUrl: https://docs.airbyte.com/integrations/sources/marketo
icon: marketo.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8141,7 +8141,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-marketo:1.0.1"
- dockerImage: "airbyte/source-marketo:1.0.2"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/marketo"
connectionSpecification:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-marketo/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ COPY source_marketo ./source_marketo
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=1.0.1
LABEL io.airbyte.version=1.0.2
LABEL io.airbyte.name=airbyte/source-marketo
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,9 @@ def state(self, value):
self._state = value

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
self._state = {
self.cursor_field: max(
latest_record.get(self.cursor_field, self.start_date), current_stream_state.get(self.cursor_field, self.start_date)
)
}
latest_cursor_value = latest_record.get(self.cursor_field, self.start_date) or self.start_date
current_cursor_value = current_stream_state.get(self.cursor_field, self.start_date) or self.start_date
self._state = {self.cursor_field: max(latest_cursor_value, current_cursor_value)}
return self._state

def stream_slices(self, sync_mode, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[MutableMapping[str, any]]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@
from functools import partial
from unittest.mock import ANY, Mock, patch

import pendulum
import pytest
from airbyte_cdk.models.airbyte_protocol import SyncMode
from source_marketo.source import Activities, Campaigns, MarketoStream, Programs, SourceMarketo
from source_marketo.source import Activities, Campaigns, Leads, MarketoStream, Programs, SourceMarketo


def test_create_export_job(mocker, send_email_stream, caplog):
Expand Down Expand Up @@ -286,3 +287,30 @@ def test_check_connection(config, requests_mock, status_code, response, is_conne
def test_normalize_datetime(config, input, format, expected_result):
stream = Programs(config)
assert stream.normalize_datetime(input, format) == expected_result


today = pendulum.now()
yesterday = pendulum.now().subtract(days=1).strftime("%Y-%m-%dT%H:%M:%SZ")
today = today.strftime("%Y-%m-%dT%H:%M:%SZ")


@pytest.mark.parametrize(
"latest_record, current_state, expected_state",
(
({}, {}, "start_date"),
({"updatedAt": None}, {"updatedAt": None}, "start_date"),
({}, {"updatedAt": None}, "start_date"),
({"updatedAt": None}, {}, "start_date"),
({}, {"updatedAt": today}, {"updatedAt": today}),
({"updatedAt": None}, {"updatedAt": today}, {"updatedAt": today}),
({"updatedAt": today}, {"updatedAt": None}, {"updatedAt": today}),
({"updatedAt": today}, {}, {"updatedAt": today}),
({"updatedAt": yesterday}, {"updatedAt": today}, {"updatedAt": today}),
({"updatedAt": today}, {"updatedAt": yesterday}, {"updatedAt": today})
)
)
def test_get_updated_state(config, latest_record, current_state, expected_state):
stream = Leads(config)
if expected_state == "start_date":
expected_state = {"updatedAt": config["start_date"]}
assert stream.get_updated_state(latest_record, current_state) == expected_state
3 changes: 2 additions & 1 deletion docs/integrations/sources/marketo.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ If the 50,000 limit is too stringent, contact Marketo support for a quota increa

| Version | Date | Pull Request | Subject |
|:---------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------|
| `1.0.1` | 2023-01-31 | [22015](https://github.com/airbytehq/airbyte/pull/22015) | Set `AvailabilityStrategy` for streams explicitly to `None` |
| `1.0.2` | 2023-02-01 | [22203](https://github.com/airbytehq/airbyte/pull/22203) | Handle Null cursor values |
| `1.0.1` | 2023-01-31 | [22015](https://github.com/airbytehq/airbyte/pull/22015) | Set `AvailabilityStrategy` for streams explicitly to `None` |
| `1.0.0` | 2023-01-25 | [21790](https://github.com/airbytehq/airbyte/pull/21790) | Fix `activities_*` stream schemas |
| `0.1.12` | 2023-01-19 | [20973](https://github.com/airbytehq/airbyte/pull/20973) | Fix encoding error (note: this change is not in version 1.0.0, but is in later versions |
| `0.1.11` | 2022-09-30 | [17445](https://github.com/airbytehq/airbyte/pull/17445) | Do not use temporary files for memory optimization |
Expand Down