Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source Amplitude: handle null values and ampty strings in datetimefields #21957

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.19
LABEL io.airbyte.version=0.1.20
LABEL io.airbyte.name=airbyte/source-amplitude
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,14 @@
#


def test_dummy_test():
assert True
import pytest

pytest_plugins = ("source_acceptance_test.plugin",)


@pytest.fixture(scope="session", autouse=True)
def connector_setup():
"""This fixture is a placeholder for external resources that acceptance test might require."""
# TODO: setup test dependencies if needed. otherwise remove the TODO comments
yield
# TODO: clean up test dependencies
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,19 @@ def _get_date_time_items_from_schema(self):
result.append(key)
return result

def _date_time_to_rfc3339(self, record: Mapping[str, Any]) -> Mapping[str, Any]:
def _date_time_to_rfc3339(self, record: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
"""
Transform 'date-time' items to RFC3339 format
"""
date_time_fields = self._get_date_time_items_from_schema()
for item in record:
if item in date_time_fields:
record[item] = pendulum.parse(record[item]).to_rfc3339_string()
dt_value = record[item]
if not dt_value:
# either null or empty string, leave it as it
record[item] = dt_value
else:
record[item] = pendulum.parse(dt_value).to_rfc3339_string()
return record

def _get_end_date(self, current_date: pendulum, end_date: pendulum = pendulum.now()):
Expand Down Expand Up @@ -168,7 +173,7 @@ def parse_response(self, response: requests.Response, stream_state: Mapping[str,
if record[self.cursor_field] >= state_value:
yield self._date_time_to_rfc3339(record) # transform all `date-time` to RFC3339

def _parse_zip_file(self, zip_file: IO[bytes]) -> Iterable[Mapping]:
def _parse_zip_file(self, zip_file: IO[bytes]) -> Iterable[MutableMapping]:
with gzip.open(zip_file) as file:
for record in file:
yield json.loads(record)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def _convert_auth_to_token(self, username: str, password: str) -> str:
def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, any]:
try:
auth = TokenAuthenticator(token=self._convert_auth_to_token(config["api_key"], config["secret_key"]), auth_method="Basic")
list(Cohorts(authenticator=auth, data_region=config["data_region"]).read_records(SyncMode.full_refresh))
list(Cohorts(authenticator=auth, data_region=config.get("data_region", "Standard Server")).read_records(SyncMode.full_refresh))
return True, None
except Exception as error:
return False, f"Unable to connect to Amplitude API with the provided credentials - {repr(error)}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,10 @@ def test_get_date_time_items_from_schema(self):
[
({}, {}),
({"event_time": "2021-05-27 11:59:53.710000"}, {"event_time": "2021-05-27T11:59:53.710000+00:00"}),
({"event_time": None}, {"event_time": None}),
({"event_time": ""}, {"event_time": ""}),
],
ids=["empty_record", "transformed_record"],
ids=["empty_record", "transformed_record", "null_value", "empty_value"],
)
def test_date_time_to_rfc3339(self, record, expected):
stream = Events(pendulum.now().isoformat(), data_region="Standard Server")
Expand Down
9 changes: 5 additions & 4 deletions docs/integrations/sources/amplitude.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,11 @@ The Amplitude connector ideally should gracefully handle Amplitude API limitatio

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:------------------------------------------------------------------------------------------------|
| 0.1.19 | 2022-12-09 | [19727](https://github.com/airbytehq/airbyte/pull/19727) | Remove `data_region` as required |
| 0.1.18 | 2022-12-08 | [19727](https://github.com/airbytehq/airbyte/pull/19727) | Add parameter to select region |
| 0.1.17 | 2022-10-31 | [18684](https://github.com/airbytehq/airbyte/pull/18684) | Add empty `series` validation for `AverageSessionLength` stream |
| 0.1.16 | 2022-10-11 | [17854](https://github.com/airbytehq/airbyte/pull/17854) | Add empty `series` validation for `ActtiveUsers` steam |
| 0.1.20 | 2023-01-27 | [21957](https://github.com/airbytehq/airbyte/pull/21957) | Handle null values and empty strings in date-time fields |
| 0.1.19 | 2022-12-09 | [19727](https://github.com/airbytehq/airbyte/pull/19727) | Remove `data_region` as required |
| 0.1.18 | 2022-12-08 | [19727](https://github.com/airbytehq/airbyte/pull/19727) | Add parameter to select region |
| 0.1.17 | 2022-10-31 | [18684](https://github.com/airbytehq/airbyte/pull/18684) | Add empty `series` validation for `AverageSessionLength` stream |
| 0.1.16 | 2022-10-11 | [17854](https://github.com/airbytehq/airbyte/pull/17854) | Add empty `series` validation for `ActtiveUsers` steam |
| 0.1.15 | 2022-10-03 | [17320](https://github.com/airbytehq/airbyte/pull/17320) | Add validation `start_date` filed if it's in the future |
| 0.1.14 | 2022-09-28 | [17326](https://github.com/airbytehq/airbyte/pull/17326) | Migrate to per-stream states. |
| 0.1.13 | 2022-08-31 | [16185](https://github.com/airbytehq/airbyte/pull/16185) | Re-release on new `airbyte_cdk==0.1.81` |
Expand Down