Skip to content

Commit

Permalink
Source Marketo: certify GA (#17445)
Browse files Browse the repository at this point in the history
* Source Marketo: certify GA

* source marketo: upd changelog

* auto-bump connector version [ci skip]

* #238 source marketo: do not use temp files to optimize memory consumption

* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
2 people authored and artem1205 committed Sep 30, 2022
1 parent facab0d commit f2e0c71
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -614,11 +614,11 @@
- name: Marketo
sourceDefinitionId: 9e0556f4-69df-4522-a3fb-03264d36b348
dockerRepository: airbyte/source-marketo
dockerImageTag: 0.1.9
dockerImageTag: 0.1.11
documentationUrl: https://docs.airbyte.io/integrations/sources/marketo
icon: marketo.svg
sourceType: api
releaseStage: beta
releaseStage: generally_available
- name: Metabase
sourceDefinitionId: c7cb421b-942e-4468-99ee-e369bcabaec5
dockerRepository: airbyte/source-metabase
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6046,7 +6046,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-marketo:0.1.9"
- dockerImage: "airbyte/source-marketo:0.1.11"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/marketo"
connectionSpecification:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-marketo/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ COPY source_marketo ./source_marketo
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.9
LABEL io.airbyte.version=0.1.11
LABEL io.airbyte.name=airbyte/source-marketo
1 change: 1 addition & 0 deletions airbyte-integrations/connectors/source-marketo/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

TEST_REQUIREMENTS = [
"pytest~=6.1",
"pytest-faker==2.0.0",
"pytest-mock~=3.6.1",
"requests-mock",
"source-acceptance-test",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

import csv
import datetime
import io
import json
from abc import ABC
from time import sleep
Expand Down Expand Up @@ -228,8 +227,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
default_prop = {"type": ["null", "string"]}
schema = self.get_json_schema()["properties"]

fp = io.StringIO(response.text)
reader = csv.DictReader(fp)
reader = csv.DictReader(response.iter_lines(chunk_size=1024, decode_unicode=True))
for record in reader:
new_record = {**record}
attributes = json.loads(new_record.pop("attributes", "{}"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import os.path
import sys
import time

import pendulum
import pytest
from source_marketo.source import Activities, MarketoAuthenticator
Expand Down Expand Up @@ -60,3 +64,30 @@ def send_email_stream(config, activity):
stream_name = f"activities_{activity['name']}"
cls = type(stream_name, (Activities,), {"activity": activity})
return cls(config)


@pytest.fixture
def file_generator(faker):
def _generator(min_size: int):
print(f"Generating a test file of {min_size // 1024 ** 2} MB, this could take some time")

def fake_records_gen():
new_line = "\n"
for i in range(1000):
yield f"{str(faker.random_int())},{faker.random_int()},{faker.date_of_birth()},{faker.random_int()}," f"{faker.random_int()},{faker.email()},{faker.postcode()}{new_line}"

size, records = 0, 0
path = os.path.realpath(str(time.time()))
with open(path, "w") as output:
output.write("marketoGUID,leadId,activityDate,activityTypeId,campaignId,primaryAttributeValueId,primaryAttributeValue\n")
while size < min_size:
frg = fake_records_gen()
print("Writing another 1000 records..")
for person in frg:
output.write(person)
records += 1
size += sys.getsizeof(person)
print(f"Finished: {records} records written to {path}")
return path, records

return _generator
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
#

import logging
import os
import tracemalloc
from functools import partial
from unittest.mock import ANY, Mock, patch

import pytest
Expand Down Expand Up @@ -123,7 +126,47 @@ def test_activities_schema(activity, expected_schema, config):
),
)
def test_export_parse_response(send_email_stream, response_text, expected_records):
assert list(send_email_stream.parse_response(Mock(text=response_text))) == expected_records
def iter_lines(*args, **kwargs):
yield from response_text.splitlines()

assert list(send_email_stream.parse_response(Mock(iter_lines=iter_lines, request=Mock(url="/send_email/1")))) == expected_records


def test_memory_usage(send_email_stream, file_generator):
min_file_size = 5 * (1024**2) # 5 MB
big_file_path, records_generated = file_generator(min_size=min_file_size)
small_file_path, _ = file_generator(min_size=1)

def iter_lines(file_path="", **kwargs):
with open(file_path, "r") as file:
for line in file:
yield line

tracemalloc.start()
records = 0

for _ in send_email_stream.parse_response(
Mock(iter_lines=partial(iter_lines, file_path=big_file_path), request=Mock(url="/send_email/1"))
):
records += 1
_, big_file_peak = tracemalloc.get_traced_memory()
assert records == records_generated

tracemalloc.reset_peak()
tracemalloc.clear_traces()

for _ in send_email_stream.parse_response(
Mock(iter_lines=partial(iter_lines, file_path=small_file_path), request=Mock(url="/send_email/1"))
):
pass
_, small_file_peak = tracemalloc.get_traced_memory()

os.remove(big_file_path)
os.remove(small_file_path)
# First we run parse_response() on a large file and track how much memory was consumed.
# Then we do the same with a tiny file. The goal is not to load the whole file into memory when parsing the response,
# so we assert the memory consumed was almost the same for two runs. Allowed delta is 50 KB which is 1% of a big file size.
assert abs(big_file_peak - small_file_peak) < 50 * 1024


@pytest.mark.parametrize(
Expand Down
24 changes: 13 additions & 11 deletions docs/integrations/sources/marketo.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,16 @@ If the 50,000 limit is too stringent, contact Marketo support for a quota increa

## Changelog

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------|
| `0.1.9` | 2022-09-28 | [17304](https://github.com/airbytehq/airbyte/pull/17304) | Migrate to per-stream sate. |
| `0.1.7` | 2022-08-23 | [15817](https://github.com/airbytehq/airbyte/pull/15817) | Improved unit test coverage |
| `0.1.6` | 2022-08-21 | [15824](https://github.com/airbytehq/airbyte/pull/15824) | Fix semi incremental streams: do not ignore start date, make one api call instead of multiple |
| `0.1.5` | 2022-08-16 | [15683](https://github.com/airbytehq/airbyte/pull/15683) | Retry failed creation of a job instead of skipping it |
| `0.1.4` | 2022-06-20 | [13930](https://github.com/airbytehq/airbyte/pull/13930) | Process failing creation of export jobs |
| `0.1.3` | 2021-12-10 | [8429](https://github.com/airbytehq/airbyte/pull/8578) | Updated titles and descriptions |
| `0.1.2` | 2021-12-03 | [8483](https://github.com/airbytehq/airbyte/pull/8483) | Improve field conversion to conform schema |
| `0.1.1` | 2021-11-29 | [0000](https://github.com/airbytehq/airbyte/pull/0000) | Fix timestamp value format issue |
| `0.1.0` | 2021-09-06 | [5863](https://github.com/airbytehq/airbyte/pull/5863) | Release Marketo CDK Connector |
| Version | Date | Pull Request | Subject |
|:---------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------|
| `0.1.11` | 2022-09-30 | [17445](https://github.com/airbytehq/airbyte/pull/17445) | Do not use temporary files for memory optimization |
| `0.1.10` | 2022-09-30 | [17445](https://github.com/airbytehq/airbyte/pull/17445) | Optimize memory consumption |
| `0.1.9` | 2022-09-28 | [17304](https://github.com/airbytehq/airbyte/pull/17304) | Migrate to per-stream sate. |
| `0.1.7` | 2022-08-23 | [15817](https://github.com/airbytehq/airbyte/pull/15817) | Improved unit test coverage |
| `0.1.6` | 2022-08-21 | [15824](https://github.com/airbytehq/airbyte/pull/15824) | Fix semi incremental streams: do not ignore start date, make one api call instead of multiple |
| `0.1.5` | 2022-08-16 | [15683](https://github.com/airbytehq/airbyte/pull/15683) | Retry failed creation of a job instead of skipping it |
| `0.1.4` | 2022-06-20 | [13930](https://github.com/airbytehq/airbyte/pull/13930) | Process failing creation of export jobs |
| `0.1.3` | 2021-12-10 | [8429](https://github.com/airbytehq/airbyte/pull/8578) | Updated titles and descriptions |
| `0.1.2` | 2021-12-03 | [8483](https://github.com/airbytehq/airbyte/pull/8483) | Improve field conversion to conform schema |
| `0.1.1` | 2021-11-29 | [0000](https://github.com/airbytehq/airbyte/pull/0000) | Fix timestamp value format issue |
| `0.1.0` | 2021-09-06 | [5863](https://github.com/airbytehq/airbyte/pull/5863) | Release Marketo CDK Connector |

0 comments on commit f2e0c71

Please sign in to comment.