Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

🐛 Source Marketo: handle null responses #33623

Merged
merged 10 commits into from Dec 19, 2023
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 9e0556f4-69df-4522-a3fb-03264d36b348
dockerImageTag: 1.2.2
dockerImageTag: 1.2.3
dockerRepository: airbyte/source-marketo
documentationUrl: https://docs.airbyte.com/integrations/sources/marketo
githubIssueLabel: source-marketo
Expand Down
Expand Up @@ -231,7 +231,10 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
schema = self.get_json_schema()["properties"]
response.encoding = "utf-8"

reader = csv.DictReader(response.iter_lines(chunk_size=1024, decode_unicode=True))
response_lines = response.iter_lines(chunk_size=1024, decode_unicode=True)
filtered_response_lines = self.filter_null_bytes(response_lines)
reader = self.csv_rows(filtered_response_lines)

for record in reader:
new_record = {**record}
attributes = json.loads(new_record.pop("attributes", "{}"))
Expand All @@ -257,6 +260,23 @@ def read_records(
self.sleep_till_export_completed(stream_slice)
return super().read_records(sync_mode, cursor_field, stream_slice, stream_state)

def filter_null_bytes(self, response_lines: Iterable[str]) -> Iterable[str]:
    """
    Lazily yield every input line with NUL ("\\x00") characters removed.

    A warning is logged for each line that actually contained NUL characters,
    reporting the line length before and after stripping.

    :param response_lines: iterable of decoded text lines
    :return: generator over the cleaned lines, in input order
    """
    for raw_line in response_lines:
        cleaned = raw_line.replace("\x00", "")
        original_size, cleaned_size = len(raw_line), len(cleaned)
        # A shorter result is the only way to tell that something was stripped.
        if cleaned_size < original_size:
            self.logger.warning("Filter 'null' bytes from string, size reduced %d -> %d chars", original_size, cleaned_size)
        yield cleaned

@staticmethod
def csv_rows(lines: Iterable[str]) -> Iterable[Mapping]:
reader = csv.reader(lines)
headers = None
for row in reader:
if headers is None:
headers = row
else:
yield dict(zip(headers, row))


class MarketoExportCreate(MarketoStream):
"""
Expand Down
Expand Up @@ -11,7 +11,7 @@
import pendulum
import pytest
from airbyte_cdk.models.airbyte_protocol import SyncMode
from source_marketo.source import Activities, Campaigns, Leads, MarketoStream, Programs, SourceMarketo
from source_marketo.source import Activities, Campaigns, IncrementalMarketoStream, Leads, MarketoStream, Programs, SourceMarketo


def test_create_export_job(mocker, send_email_stream, caplog):
Expand Down Expand Up @@ -314,3 +314,56 @@ def test_get_updated_state(config, latest_record, current_state, expected_state)
if expected_state == "start_date":
expected_state = {"updatedAt": config["start_date"]}
assert stream.get_updated_state(latest_record, current_state) == expected_state


def test_filter_null_bytes(config):
    """NUL characters are stripped from each line; lines without them pass through unchanged."""
    stream = Leads(config)

    test_lines = [
        "Hello\x00World\n",
        "Name,Email\n",
        "John\x00Doe,john.doe@example.com\n",
    ]
    expected_lines = [
        "HelloWorld\n",
        "Name,Email\n",
        "JohnDoe,john.doe@example.com\n",
    ]
    # Compare whole lists: zip() stops at the shorter iterable, so a pairwise
    # loop would pass even if the generator yielded too few lines (or none).
    assert list(stream.filter_null_bytes(test_lines)) == expected_lines


def test_csv_rows(config):
    """The first CSV line is used as the header; every following line becomes a dict keyed by it."""
    stream = Leads(config)

    test_lines = [
        "Name,Email\n",
        "John Doe,john.doe@example.com\n",
        "Jane Doe,jane.doe@example.com\n",
    ]
    expected_records = [
        {"Name": "John Doe", "Email": "john.doe@example.com"},
        {"Name": "Jane Doe", "Email": "jane.doe@example.com"},
    ]
    # Compare whole lists: zip() stops at the shorter iterable, so a pairwise
    # loop would pass even if the generator yielded too few records (or none).
    assert list(stream.csv_rows(test_lines)) == expected_records

def test_availablity_strategy(config):
    """The Leads stream must expose no availability strategy."""
    stream = Leads(config)
    # Identity check: `== None` can be satisfied by any object with a permissive __eq__.
    assert stream.availability_strategy is None

def test_path(config):
    """The base MarketoStream reports the expected REST path."""
    expected_path = "rest/v1/marketo_stream.json"
    assert MarketoStream(config).path() == expected_path

def test_get_state(config):
    """A newly constructed incremental stream reports an empty state."""
    assert IncrementalMarketoStream(config).state == {}

def test_set_tate(config):
    """Assigning to `state` must be reflected in the stream's `_state` attribute."""
    new_state = {"id": 1}
    stream = IncrementalMarketoStream(config)
    stream.state = new_state
    assert stream._state == new_state
Expand Up @@ -3,8 +3,10 @@
#


from datetime import datetime

import pytest
from source_marketo.utils import clean_string, format_value
from source_marketo.utils import clean_string, format_value, to_datetime_str

test_data = [
(1, {"type": "integer"}, int),
Expand All @@ -15,11 +17,12 @@
("1.5", {"type": "integer"}, int),
("15", {"type": "integer"}, int),
("true", {"type": "boolean"}, bool),
("test_custom", {"type": "custom_type"}, str),
]


@pytest.mark.parametrize("value,schema,expected_output_type", test_data)
def test_fromat_value(value, schema, expected_output_type):
def test_format_value(value, schema, expected_output_type):
    """format_value must return an instance of the type expected for the given schema."""
    formatted = format_value(value, schema)
    assert isinstance(formatted, expected_output_type)
Expand Down Expand Up @@ -55,3 +58,9 @@ def test_clean_string(value, expected):
test = clean_string(value)

assert test == expected

def test_to_datetime_str():
    """A datetime is rendered as a `Z`-suffixed ISO-8601 string."""
    # Named `value` rather than `input` so the builtin is not shadowed.
    value = datetime(2023, 1, 1)
    expected = "2023-01-01T00:00:00Z"

    assert to_datetime_str(value) == expected
3 changes: 2 additions & 1 deletion docs/integrations/sources/marketo.md
Expand Up @@ -117,7 +117,8 @@ If the 50,000 limit is too stringent, contact Marketo support for a quota increa

| Version | Date | Pull Request | Subject |
|:---------|:-----------|:---------------------------------------------------------|:-------------------------------------------------------------------------------------------------|
| 1.2.2 | 2023-10-19 | [31599](https://github.com/airbytehq/airbyte/pull/31599) | Base image migration: remove Dockerfile and use the python-connector-base image |
| `1.2.3` | 2023-08-02 | [28999](https://github.com/airbytehq/airbyte/pull/28999) | Fix for `_csv.Error: line contains NUL` |
| `1.2.2` | 2023-10-19 | [31599](https://github.com/airbytehq/airbyte/pull/31599) | Base image migration: remove Dockerfile and use the python-connector-base image |
| `1.2.1` | 2023-09-18 | [30533](https://github.com/airbytehq/airbyte/pull/30533) | Fix `json_schema` for stream `Leads` |
| `1.2.0` | 2023-06-26 | [27726](https://github.com/airbytehq/airbyte/pull/27726) | License Update: Elv2 |
| `1.1.0` | 2023-04-18 | [23956](https://github.com/airbytehq/airbyte/pull/23956) | Add `Segmentations` Stream |
Expand Down